[project @ 2002-02-05 10:06:24 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.117 2002/02/05 10:06:24 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* Threads suspended in _ccall_GC.
189  */
190 static StgTSO *suspended_ccalling_threads;
191
192 static StgTSO *threadStackOverflow(StgTSO *tso);
193
194 /* KH: The following two flags are shared memory locations.  There is no need
195        to lock them, since they are only unset at the end of a scheduler
196        operation.
197 */
198
199 /* flag set by signal handler to precipitate a context switch */
200 //@cindex context_switch
201 nat context_switch;
202
203 /* if this flag is set as well, give up execution */
204 //@cindex interrupted
205 rtsBool interrupted;
206
207 /* Next thread ID to allocate.
208  * Locks required: sched_mutex
209  */
210 //@cindex next_thread_id
211 StgThreadID next_thread_id = 1;
212
213 /*
214  * Pointers to the state of the current thread.
215  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
216  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
217  */
218  
219 /* The smallest stack size that makes any sense is:
220  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
221  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
222  *  + 1                       (the realworld token for an IO thread)
223  *  + 1                       (the closure to enter)
224  *
225  * A thread with this stack will bomb immediately with a stack
226  * overflow, which will increase its stack size.  
227  */
228
229 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
230
231
232 #if defined(GRAN)
233 StgTSO *CurrentTSO;
234 #endif
235
236 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
237  *  exists - earlier gccs apparently didn't.
238  *  -= chak
239  */
240 StgTSO dummy_tso;
241
242 rtsBool ready_to_gc;
243
244 void            addToBlockedQueue ( StgTSO *tso );
245
246 static void     schedule          ( void );
247        void     interruptStgRts   ( void );
248 #if defined(GRAN)
249 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
250 #else
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
252 #endif
253
254 static void     detectBlackHoles  ( void );
255
256 #ifdef DEBUG
257 static void sched_belch(char *s, ...);
258 #endif
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262  *       with these synchronisation objects.
263  */
264 Mutex     sched_mutex       = INIT_MUTEX_VAR;
265 Mutex     term_mutex        = INIT_MUTEX_VAR;
266 #if defined(THREADED_RTS)
267 /*
268  * The rts_mutex is the 'big lock' that the active native
269  * thread within the RTS holds while executing code
270  * within the RTS. It is given up when the thread makes a
271  * transition out of the RTS (e.g., to perform an external
272  * C call), hopefully for another thread to enter the RTS.
273  *
274  */
275 Mutex     rts_mutex         = INIT_MUTEX_VAR;
276 /*
277  * When a native thread has completed executing an external
278  * call, it needs to communicate the result back to the
279  * (Haskell) thread that made the call. Do this as follows:
280  *
281  *  - in resumeThread(), the thread increments the counter
282  *    ext_threads_waiting, and then blocks on the
283  *    'big' RTS lock.
284  *  - upon entry to the scheduler, the thread that's currently
285  *    holding the RTS lock checks ext_threads_waiting. If there
286  *    are native threads waiting, it gives up its RTS lock
287  *    and tries to re-grab the RTS lock [perhaps after having
288  *    waited for a bit..?]
289  *  - care must be taken to deal with the case where more than
290  *    one external thread are waiting on the lock. [ToDo: more]
291  *    
292  */
293
294 static nat ext_threads_waiting = 0;
295 /*
296  * thread_ready_aux_mutex is used to handle the scenario where the
297  * the RTS executing thread runs out of work, but there are
298  * active external threads. The RTS executing thread gives up
299  * its RTS mutex, and blocks waiting for the thread_ready_cond.
300  * Unfortunately, a condition variable needs to be associated
301  * with a mutex in pthreads, so rts_thread_waiting_mutex is
302  * used for just this purpose.
303  * 
304  */
305 Mutex  thread_ready_aux_mutex = INIT_MUTEX_VAR;
306 #endif
307
308
309 /* thread_ready_cond: when signalled, a thread has
310  * become runnable. When used?
311  */
312 Condition thread_ready_cond = INIT_COND_VAR;
313 Condition gc_pending_cond   = INIT_COND_VAR;
314
315 nat await_death;
316 #endif
317
318 #if defined(PAR)
319 StgTSO *LastTSO;
320 rtsTime TimeOfLastYield;
321 rtsBool emitSchedule = rtsTrue;
322 #endif
323
324 #if DEBUG
325 char *whatNext_strs[] = {
326   "ThreadEnterGHC",
327   "ThreadRunGHC",
328   "ThreadEnterInterp",
329   "ThreadKilled",
330   "ThreadComplete"
331 };
332
333 char *threadReturnCode_strs[] = {
334   "HeapOverflow",                       /* might also be StackOverflow */
335   "StackOverflow",
336   "ThreadYielding",
337   "ThreadBlocked",
338   "ThreadFinished"
339 };
340 #endif
341
342 #if defined(PAR)
343 StgTSO * createSparkThread(rtsSpark spark);
344 StgTSO * activateSpark (rtsSpark spark);  
345 #endif
346
347 /*
348  * The thread state for the main thread.
349 // ToDo: check whether not needed any more
350 StgTSO   *MainTSO;
351  */
352
353 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
354 static void taskStart(void);
355 static void
356 taskStart(void)
357 {
358   /* threads start up using 'taskStart', so make them
359      them grab the RTS lock. */
360 #if defined(THREADED_RTS)
361   ACQUIRE_LOCK(&rts_mutex);
362 #endif
363   schedule();
364 }
365 #endif
366
367
368
369
370 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
371 //@subsection Main scheduling loop
372
373 /* ---------------------------------------------------------------------------
374    Main scheduling loop.
375
376    We use round-robin scheduling, each thread returning to the
377    scheduler loop when one of these conditions is detected:
378
379       * out of heap space
380       * timer expires (thread yields)
381       * thread blocks
382       * thread ends
383       * stack overflow
384
385    Locking notes:  we acquire the scheduler lock once at the beginning
386    of the scheduler loop, and release it when
387     
388       * running a thread, or
389       * waiting for work, or
390       * waiting for a GC to complete.
391
392    GRAN version:
393      In a GranSim setup this loop iterates over the global event queue.
394      This revolves around the global event queue, which determines what 
395      to do next. Therefore, it's more complicated than either the 
396      concurrent or the parallel (GUM) setup.
397
398    GUM version:
399      GUM iterates over incoming messages.
400      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
401      and sends out a fish whenever it has nothing to do; in-between
402      doing the actual reductions (shared code below) it processes the
403      incoming messages and deals with delayed operations 
404      (see PendingFetches).
405      This is not the ugliest code you could imagine, but it's bloody close.
406
407    ------------------------------------------------------------------------ */
408 //@cindex schedule
409 static void
410 schedule( void )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PAR)
418   StgSparkPool *pool;
419   rtsSpark spark;
420   StgTSO *tso;
421   GlobalTaskId pe;
422   rtsBool receivedFinish = rtsFalse;
423 # if defined(DEBUG)
424   nat tp_size, sp_size; // stats only
425 # endif
426 #endif
427   rtsBool was_interrupted = rtsFalse;
428   
429   ACQUIRE_LOCK(&sched_mutex);
430   
431 #if defined(THREADED_RTS)
432   /* ToDo: consider SMP support */
433   if (ext_threads_waiting > 0) {
434     /* (At least) one external thread is waiting to
435      * to deposit the result of an external call.
436      * Give way to one of them by giving up the RTS
437      * lock.
438      */
439     RELEASE_LOCK(&sched_mutex);
440     RELEASE_LOCK(&rts_mutex);
441     /* ToDo: come up with mechanism that guarantees that
442      * the main thread doesn't loop here.
443      */
444     yieldThread();
445     /* ToDo: longjmp() */
446     taskStart();
447   }
448 #endif
449
450 #if defined(GRAN)
451
452   /* set up first event to get things going */
453   /* ToDo: assign costs for system setup and init MainTSO ! */
454   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
455             ContinueThread, 
456             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
457
458   IF_DEBUG(gran,
459            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
460            G_TSO(CurrentTSO, 5));
461
462   if (RtsFlags.GranFlags.Light) {
463     /* Save current time; GranSim Light only */
464     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
465   }      
466
467   event = get_next_event();
468
469   while (event!=(rtsEvent*)NULL) {
470     /* Choose the processor with the next event */
471     CurrentProc = event->proc;
472     CurrentTSO = event->tso;
473
474 #elif defined(PAR)
475
476   while (!receivedFinish) {    /* set by processMessages */
477                                /* when receiving PP_FINISH message         */ 
478 #else
479
480   while (1) {
481
482 #endif
483
484     IF_DEBUG(scheduler, printAllThreads());
485
486     /* If we're interrupted (the user pressed ^C, or some other
487      * termination condition occurred), kill all the currently running
488      * threads.
489      */
490     if (interrupted) {
491       IF_DEBUG(scheduler, sched_belch("interrupted"));
492       deleteAllThreads();
493       interrupted = rtsFalse;
494       was_interrupted = rtsTrue;
495     }
496
497     /* Go through the list of main threads and wake up any
498      * clients whose computations have finished.  ToDo: this
499      * should be done more efficiently without a linear scan
500      * of the main threads list, somehow...
501      */
502 #if defined(RTS_SUPPORTS_THREADS)
503     { 
504       StgMainThread *m, **prev;
505       prev = &main_threads;
506       for (m = main_threads; m != NULL; m = m->link) {
507         switch (m->tso->what_next) {
508         case ThreadComplete:
509           if (m->ret) {
510             *(m->ret) = (StgClosure *)m->tso->sp[0];
511           }
512           *prev = m->link;
513           m->stat = Success;
514           broadcastCondition(&m->wakeup);
515           break;
516         case ThreadKilled:
517           if (m->ret) *(m->ret) = NULL;
518           *prev = m->link;
519           if (was_interrupted) {
520             m->stat = Interrupted;
521           } else {
522             m->stat = Killed;
523           }
524           broadcastCondition(&m->wakeup);
525           break;
526         default:
527           break;
528         }
529       }
530     }
531
532 #else /* not threaded */
533
534 # if defined(PAR)
535     /* in GUM do this only on the Main PE */
536     if (IAmMainThread)
537 # endif
538     /* If our main thread has finished or been killed, return.
539      */
540     {
541       StgMainThread *m = main_threads;
542       if (m->tso->what_next == ThreadComplete
543           || m->tso->what_next == ThreadKilled) {
544         main_threads = main_threads->link;
545         if (m->tso->what_next == ThreadComplete) {
546           /* we finished successfully, fill in the return value */
547           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
548           m->stat = Success;
549           return;
550         } else {
551           if (m->ret) { *(m->ret) = NULL; };
552           if (was_interrupted) {
553             m->stat = Interrupted;
554           } else {
555             m->stat = Killed;
556           }
557           return;
558         }
559       }
560     }
561 #endif
562
563     /* Top up the run queue from our spark pool.  We try to make the
564      * number of threads in the run queue equal to the number of
565      * free capabilities.
566      *
567      * Disable spark support in SMP for now, non-essential & requires
568      * a little bit of work to make it compile cleanly. -- sof 1/02.
569      */
570 #if 0 /* defined(SMP) */
571     {
572       nat n = getFreeCapabilities();
573       StgTSO *tso = run_queue_hd;
574
575       /* Count the run queue */
576       while (n > 0 && tso != END_TSO_QUEUE) {
577         tso = tso->link;
578         n--;
579       }
580
581       for (; n > 0; n--) {
582         StgClosure *spark;
583         spark = findSpark(rtsFalse);
584         if (spark == NULL) {
585           break; /* no more sparks in the pool */
586         } else {
587           /* I'd prefer this to be done in activateSpark -- HWL */
588           /* tricky - it needs to hold the scheduler lock and
589            * not try to re-acquire it -- SDM */
590           createSparkThread(spark);       
591           IF_DEBUG(scheduler,
592                    sched_belch("==^^ turning spark of closure %p into a thread",
593                                (StgClosure *)spark));
594         }
595       }
596       /* We need to wake up the other tasks if we just created some
597        * work for them.
598        */
599       if (getFreeCapabilities() - n > 1) {
600           signalCondition( &thread_ready_cond );
601       }
602     }
603 #endif // SMP
604
605     /* check for signals each time around the scheduler */
606 #ifndef mingw32_TARGET_OS
607     if (signals_pending()) {
608       startSignalHandlers();
609     }
610 #endif
611
612     /* Check whether any waiting threads need to be woken up.  If the
613      * run queue is empty, and there are no other tasks running, we
614      * can wait indefinitely for something to happen.
615      * ToDo: what if another client comes along & requests another
616      * main thread?
617      */
618     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
619       awaitEvent(
620            (run_queue_hd == END_TSO_QUEUE)
621 #if defined(SMP)
622         && allFreeCapabilities()
623 #endif
624         );
625     }
626     /* we can be interrupted while waiting for I/O... */
627     if (interrupted) continue;
628
629     /* 
630      * Detect deadlock: when we have no threads to run, there are no
631      * threads waiting on I/O or sleeping, and all the other tasks are
632      * waiting for work, we must have a deadlock of some description.
633      *
634      * We first try to find threads blocked on themselves (ie. black
635      * holes), and generate NonTermination exceptions where necessary.
636      *
637      * If no threads are black holed, we have a deadlock situation, so
638      * inform all the main threads.
639      */
640 #ifndef PAR
641     if (blocked_queue_hd == END_TSO_QUEUE
642         && run_queue_hd == END_TSO_QUEUE
643         && sleeping_queue == END_TSO_QUEUE
644 #if defined(SMP)
645         && allFreeCapabilities()
646 #elif defined(THREADED_RTS)
647         && suspended_ccalling_threads == END_TSO_QUEUE
648 #endif
649         )
650     {
651         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
652         RELEASE_LOCK(&sched_mutex);
653         GarbageCollect(GetRoots,rtsTrue);
654         ACQUIRE_LOCK(&sched_mutex);
655         IF_DEBUG(scheduler, sched_belch("GC done."));
656         if (blocked_queue_hd == END_TSO_QUEUE
657             && run_queue_hd == END_TSO_QUEUE
658             && sleeping_queue == END_TSO_QUEUE) {
659
660             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
661             detectBlackHoles();
662
663             /* No black holes, so probably a real deadlock.  Send the
664              * current main thread the Deadlock exception (or in the SMP
665              * build, send *all* main threads the deadlock exception,
666              * since none of them can make progress).
667              */
668             if (run_queue_hd == END_TSO_QUEUE) {
669                 StgMainThread *m;
670 #if defined(RTS_SUPPORTS_THREADS)
671                 for (m = main_threads; m != NULL; m = m->link) {
672                     switch (m->tso->why_blocked) {
673                     case BlockedOnBlackHole:
674                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
675                         break;
676                     case BlockedOnException:
677                     case BlockedOnMVar:
678                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
679                         break;
680                     default:
681                         barf("deadlock: main thread blocked in a strange way");
682                     }
683                 }
684 #else
685                 m = main_threads;
686                 switch (m->tso->why_blocked) {
687                 case BlockedOnBlackHole:
688                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
689                     break;
690                 case BlockedOnException:
691                 case BlockedOnMVar:
692                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
693                     break;
694                 default:
695                     barf("deadlock: main thread blocked in a strange way");
696                 }
697 #endif
698             }
699 #if defined(RTS_SUPPORTS_THREADS)
700             if ( run_queue_hd == END_TSO_QUEUE ) {
701               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
702               shutdownHaskellAndExit(0);
703             
704             }
705 #endif
706             ASSERT( run_queue_hd != END_TSO_QUEUE );
707         }
708     }
709 #elif defined(PAR)
710     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
711 #endif
712
713 #if defined(SMP)
714     /* If there's a GC pending, don't do anything until it has
715      * completed.
716      */
717     if (ready_to_gc) {
718       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
719       waitCondition( &gc_pending_cond, &sched_mutex );
720     }
721 #endif    
722
723 #if defined(SMP)
724     /* block until we've got a thread on the run queue and a free
725      * capability.
726      */
727     while ( run_queue_hd == END_TSO_QUEUE 
728             || noFreeCapabilities() 
729             ) {
730       IF_DEBUG(scheduler, sched_belch("waiting for work"));
731       waitCondition( &thread_ready_cond, &sched_mutex );
732       IF_DEBUG(scheduler, sched_belch("work now available"));
733     }
734 #elif defined(THREADED_RTS)
735    if ( run_queue_hd == END_TSO_QUEUE ) {
736      /* no work available, wait for external calls to complete. */
737      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
738      RELEASE_LOCK(&sched_mutex);
739      RELEASE_LOCK(&rts_mutex);
740      /* Sigh - need to have a mutex locked in order to wait on the
741         condition variable. */
742      ACQUIRE_LOCK(&thread_ready_aux_mutex);
743      waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
744      RELEASE_LOCK(&thread_ready_aux_mutex);
745      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
746      /* ToDo: longjmp() */
747      taskStart();
748    
749    }
750 #endif
751
752 #if defined(GRAN)
753
754     if (RtsFlags.GranFlags.Light)
755       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
756
757     /* adjust time based on time-stamp */
758     if (event->time > CurrentTime[CurrentProc] &&
759         event->evttype != ContinueThread)
760       CurrentTime[CurrentProc] = event->time;
761     
762     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
763     if (!RtsFlags.GranFlags.Light)
764       handleIdlePEs();
765
766     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
767
768     /* main event dispatcher in GranSim */
769     switch (event->evttype) {
770       /* Should just be continuing execution */
771     case ContinueThread:
772       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
773       /* ToDo: check assertion
774       ASSERT(run_queue_hd != (StgTSO*)NULL &&
775              run_queue_hd != END_TSO_QUEUE);
776       */
777       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
778       if (!RtsFlags.GranFlags.DoAsyncFetch &&
779           procStatus[CurrentProc]==Fetching) {
780         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
781               CurrentTSO->id, CurrentTSO, CurrentProc);
782         goto next_thread;
783       } 
784       /* Ignore ContinueThreads for completed threads */
785       if (CurrentTSO->what_next == ThreadComplete) {
786         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
787               CurrentTSO->id, CurrentTSO, CurrentProc);
788         goto next_thread;
789       } 
790       /* Ignore ContinueThreads for threads that are being migrated */
791       if (PROCS(CurrentTSO)==Nowhere) { 
792         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
793               CurrentTSO->id, CurrentTSO, CurrentProc);
794         goto next_thread;
795       }
796       /* The thread should be at the beginning of the run queue */
797       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
798         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
799               CurrentTSO->id, CurrentTSO, CurrentProc);
800         break; // run the thread anyway
801       }
802       /*
803       new_event(proc, proc, CurrentTime[proc],
804                 FindWork,
805                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
806       goto next_thread; 
807       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
808       break; // now actually run the thread; DaH Qu'vam yImuHbej 
809
810     case FetchNode:
811       do_the_fetchnode(event);
812       goto next_thread;             /* handle next event in event queue  */
813       
814     case GlobalBlock:
815       do_the_globalblock(event);
816       goto next_thread;             /* handle next event in event queue  */
817       
818     case FetchReply:
819       do_the_fetchreply(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case UnblockThread:   /* Move from the blocked queue to the tail of */
823       do_the_unblock(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case ResumeThread:  /* Move from the blocked queue to the tail of */
827       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
828       event->tso->gran.blocktime += 
829         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
830       do_the_startthread(event);
831       goto next_thread;             /* handle next event in event queue  */
832       
833     case StartThread:
834       do_the_startthread(event);
835       goto next_thread;             /* handle next event in event queue  */
836       
837     case MoveThread:
838       do_the_movethread(event);
839       goto next_thread;             /* handle next event in event queue  */
840       
841     case MoveSpark:
842       do_the_movespark(event);
843       goto next_thread;             /* handle next event in event queue  */
844       
845     case FindWork:
846       do_the_findwork(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     default:
850       barf("Illegal event type %u\n", event->evttype);
851     }  /* switch */
852     
853     /* This point was scheduler_loop in the old RTS */
854
855     IF_DEBUG(gran, belch("GRAN: after main switch"));
856
857     TimeOfLastEvent = CurrentTime[CurrentProc];
858     TimeOfNextEvent = get_time_of_next_event();
859     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
860     // CurrentTSO = ThreadQueueHd;
861
862     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
863                          TimeOfNextEvent));
864
865     if (RtsFlags.GranFlags.Light) 
866       GranSimLight_leave_system(event, &ActiveTSO); 
867
868     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
869
870     IF_DEBUG(gran, 
871              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
872
873     /* in a GranSim setup the TSO stays on the run queue */
874     t = CurrentTSO;
875     /* Take a thread from the run queue. */
876     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
877
878     IF_DEBUG(gran, 
879              fprintf(stderr, "GRAN: About to run current thread, which is\n");
880              G_TSO(t,5));
881
882     context_switch = 0; // turned on via GranYield, checking events and time slice
883
884     IF_DEBUG(gran, 
885              DumpGranEvent(GR_SCHEDULE, t));
886
887     procStatus[CurrentProc] = Busy;
888
889 #elif defined(PAR)
890     if (PendingFetches != END_BF_QUEUE) {
891         processFetches();
892     }
893
894     /* ToDo: phps merge with spark activation above */
895     /* check whether we have local work and send requests if we have none */
896     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
897       /* :-[  no local threads => look out for local sparks */
898       /* the spark pool for the current PE */
899       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
900       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
901           pool->hd < pool->tl) {
902         /* 
903          * ToDo: add GC code check that we really have enough heap afterwards!!
904          * Old comment:
905          * If we're here (no runnable threads) and we have pending
906          * sparks, we must have a space problem.  Get enough space
907          * to turn one of those pending sparks into a
908          * thread... 
909          */
910
911         spark = findSpark(rtsFalse);                /* get a spark */
912         if (spark != (rtsSpark) NULL) {
913           tso = activateSpark(spark);       /* turn the spark into a thread */
914           IF_PAR_DEBUG(schedule,
915                        belch("==== schedule: Created TSO %d (%p); %d threads active",
916                              tso->id, tso, advisory_thread_count));
917
918           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
919             belch("==^^ failed to activate spark");
920             goto next_thread;
921           }               /* otherwise fall through & pick-up new tso */
922         } else {
923           IF_PAR_DEBUG(verbose,
924                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
925                              spark_queue_len(pool)));
926           goto next_thread;
927         }
928       }
929
930       /* If we still have no work we need to send a FISH to get a spark
931          from another PE 
932       */
933       if (EMPTY_RUN_QUEUE()) {
934       /* =8-[  no local sparks => look for work on other PEs */
935         /*
936          * We really have absolutely no work.  Send out a fish
937          * (there may be some out there already), and wait for
938          * something to arrive.  We clearly can't run any threads
939          * until a SCHEDULE or RESUME arrives, and so that's what
940          * we're hoping to see.  (Of course, we still have to
941          * respond to other types of messages.)
942          */
943         TIME now = msTime() /*CURRENT_TIME*/;
944         IF_PAR_DEBUG(verbose, 
945                      belch("--  now=%ld", now));
946         IF_PAR_DEBUG(verbose,
947                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
948                          (last_fish_arrived_at!=0 &&
949                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
950                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
951                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
952                              last_fish_arrived_at,
953                              RtsFlags.ParFlags.fishDelay, now);
954                      });
955         
956         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
957             (last_fish_arrived_at==0 ||
958              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
959           /* outstandingFishes is set in sendFish, processFish;
960              avoid flooding system with fishes via delay */
961           pe = choosePE();
962           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
963                    NEW_FISH_HUNGER);
964
965           // Global statistics: count no. of fishes
966           if (RtsFlags.ParFlags.ParStats.Global &&
967               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
968             globalParStats.tot_fish_mess++;
969           }
970         }
971       
972         receivedFinish = processMessages();
973         goto next_thread;
974       }
975     } else if (PacketsWaiting()) {  /* Look for incoming messages */
976       receivedFinish = processMessages();
977     }
978
979     /* Now we are sure that we have some work available */
980     ASSERT(run_queue_hd != END_TSO_QUEUE);
981
982     /* Take a thread from the run queue, if we have work */
983     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
984     IF_DEBUG(sanity,checkTSO(t));
985
986     /* ToDo: write something to the log-file
987     if (RTSflags.ParFlags.granSimStats && !sameThread)
988         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
989
990     CurrentTSO = t;
991     */
992     /* the spark pool for the current PE */
993     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
994
995     IF_DEBUG(scheduler, 
996              belch("--=^ %d threads, %d sparks on [%#x]", 
997                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
998
999 #if 1
1000     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1001         t && LastTSO && t->id != LastTSO->id && 
1002         LastTSO->why_blocked == NotBlocked && 
1003         LastTSO->what_next != ThreadComplete) {
1004       // if previously scheduled TSO not blocked we have to record the context switch
1005       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1006                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1007     }
1008
1009     if (RtsFlags.ParFlags.ParStats.Full && 
1010         (emitSchedule /* forced emit */ ||
1011         (t && LastTSO && t->id != LastTSO->id))) {
1012       /* 
1013          we are running a different TSO, so write a schedule event to log file
1014          NB: If we use fair scheduling we also have to write  a deschedule 
1015              event for LastTSO; with unfair scheduling we know that the
1016              previous tso has blocked whenever we switch to another tso, so
1017              we don't need it in GUM for now
1018       */
1019       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1020                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1021       emitSchedule = rtsFalse;
1022     }
1023      
1024 #endif
1025 #else /* !GRAN && !PAR */
1026   
1027     /* grab a thread from the run queue
1028      */
1029     ASSERT(run_queue_hd != END_TSO_QUEUE);
1030     t = POP_RUN_QUEUE();
1031     // Sanity check the thread we're about to run.  This can be
1032     // expensive if there is lots of thread switching going on...
1033     IF_DEBUG(sanity,checkTSO(t));
1034 #endif
1035     
1036     grabCapability(&cap);
1037     cap->r.rCurrentTSO = t;
1038     
1039     /* context switches are now initiated by the timer signal, unless
1040      * the user specified "context switch as often as possible", with
1041      * +RTS -C0
1042      */
1043     if (
1044 #ifdef PROFILING
1045         RtsFlags.ProfFlags.profileInterval == 0 ||
1046 #endif
1047         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1048          && (run_queue_hd != END_TSO_QUEUE
1049              || blocked_queue_hd != END_TSO_QUEUE
1050              || sleeping_queue != END_TSO_QUEUE)))
1051         context_switch = 1;
1052     else
1053         context_switch = 0;
1054
1055     RELEASE_LOCK(&sched_mutex);
1056
1057     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1058                               t->id, t, whatNext_strs[t->what_next]));
1059
1060 #ifdef PROFILING
1061     startHeapProfTimer();
1062 #endif
1063
1064     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1065     /* Run the current thread 
1066      */
1067     switch (cap->r.rCurrentTSO->what_next) {
1068     case ThreadKilled:
1069     case ThreadComplete:
1070         /* Thread already finished, return to scheduler. */
1071         ret = ThreadFinished;
1072         break;
1073     case ThreadEnterGHC:
1074         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1075         break;
1076     case ThreadRunGHC:
1077         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1078         break;
1079     case ThreadEnterInterp:
1080         ret = interpretBCO(cap);
1081         break;
1082     default:
1083       barf("schedule: invalid what_next field");
1084     }
1085     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1086     
1087     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1088 #ifdef PROFILING
1089     stopHeapProfTimer();
1090     CCCS = CCS_SYSTEM;
1091 #endif
1092     
1093     ACQUIRE_LOCK(&sched_mutex);
1094
1095 #ifdef SMP
1096     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1097 #elif !defined(GRAN) && !defined(PAR)
1098     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1099 #endif
1100     t = cap->r.rCurrentTSO;
1101     
1102 #if defined(PAR)
1103     /* HACK 675: if the last thread didn't yield, make sure to print a 
1104        SCHEDULE event to the log file when StgRunning the next thread, even
1105        if it is the same one as before */
1106     LastTSO = t; 
1107     TimeOfLastYield = CURRENT_TIME;
1108 #endif
1109
1110     switch (ret) {
1111     case HeapOverflow:
1112 #if defined(GRAN)
1113       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1114       globalGranStats.tot_heapover++;
1115 #elif defined(PAR)
1116       globalParStats.tot_heapover++;
1117 #endif
1118
1119       // did the task ask for a large block?
1120       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1121           // if so, get one and push it on the front of the nursery.
1122           bdescr *bd;
1123           nat blocks;
1124           
1125           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1126
1127           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1128                                    t->id, t,
1129                                    whatNext_strs[t->what_next], blocks));
1130
1131           // don't do this if it would push us over the
1132           // alloc_blocks_lim limit; we'll GC first.
1133           if (alloc_blocks + blocks < alloc_blocks_lim) {
1134
1135               alloc_blocks += blocks;
1136               bd = allocGroup( blocks );
1137
1138               // link the new group into the list
1139               bd->link = cap->r.rCurrentNursery;
1140               bd->u.back = cap->r.rCurrentNursery->u.back;
1141               if (cap->r.rCurrentNursery->u.back != NULL) {
1142                   cap->r.rCurrentNursery->u.back->link = bd;
1143               } else {
1144                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1145                          g0s0->blocks == cap->r.rNursery);
1146                   cap->r.rNursery = g0s0->blocks = bd;
1147               }           
1148               cap->r.rCurrentNursery->u.back = bd;
1149
1150               // initialise it as a nursery block
1151               bd->step = g0s0;
1152               bd->gen_no = 0;
1153               bd->flags = 0;
1154               bd->free = bd->start;
1155
1156               // don't forget to update the block count in g0s0.
1157               g0s0->n_blocks += blocks;
1158               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1159
1160               // now update the nursery to point to the new block
1161               cap->r.rCurrentNursery = bd;
1162
1163               // we might be unlucky and have another thread get on the
1164               // run queue before us and steal the large block, but in that
1165               // case the thread will just end up requesting another large
1166               // block.
1167               PUSH_ON_RUN_QUEUE(t);
1168               break;
1169           }
1170       }
1171
1172       /* make all the running tasks block on a condition variable,
1173        * maybe set context_switch and wait till they all pile in,
1174        * then have them wait on a GC condition variable.
1175        */
1176       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1177                                t->id, t, whatNext_strs[t->what_next]));
1178       threadPaused(t);
1179 #if defined(GRAN)
1180       ASSERT(!is_on_queue(t,CurrentProc));
1181 #elif defined(PAR)
1182       /* Currently we emit a DESCHEDULE event before GC in GUM.
1183          ToDo: either add separate event to distinguish SYSTEM time from rest
1184                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1185       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1186         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1187                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1188         emitSchedule = rtsTrue;
1189       }
1190 #endif
1191       
1192       ready_to_gc = rtsTrue;
1193       context_switch = 1;               /* stop other threads ASAP */
1194       PUSH_ON_RUN_QUEUE(t);
1195       /* actual GC is done at the end of the while loop */
1196       break;
1197       
1198     case StackOverflow:
1199 #if defined(GRAN)
1200       IF_DEBUG(gran, 
1201                DumpGranEvent(GR_DESCHEDULE, t));
1202       globalGranStats.tot_stackover++;
1203 #elif defined(PAR)
1204       // IF_DEBUG(par, 
1205       // DumpGranEvent(GR_DESCHEDULE, t);
1206       globalParStats.tot_stackover++;
1207 #endif
1208       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1209                                t->id, t, whatNext_strs[t->what_next]));
1210       /* just adjust the stack for this thread, then pop it back
1211        * on the run queue.
1212        */
1213       threadPaused(t);
1214       { 
1215         StgMainThread *m;
1216         /* enlarge the stack */
1217         StgTSO *new_t = threadStackOverflow(t);
1218         
1219         /* This TSO has moved, so update any pointers to it from the
1220          * main thread stack.  It better not be on any other queues...
1221          * (it shouldn't be).
1222          */
1223         for (m = main_threads; m != NULL; m = m->link) {
1224           if (m->tso == t) {
1225             m->tso = new_t;
1226           }
1227         }
1228         threadPaused(new_t);
1229         PUSH_ON_RUN_QUEUE(new_t);
1230       }
1231       break;
1232
1233     case ThreadYielding:
1234 #if defined(GRAN)
1235       IF_DEBUG(gran, 
1236                DumpGranEvent(GR_DESCHEDULE, t));
1237       globalGranStats.tot_yields++;
1238 #elif defined(PAR)
1239       // IF_DEBUG(par, 
1240       // DumpGranEvent(GR_DESCHEDULE, t);
1241       globalParStats.tot_yields++;
1242 #endif
1243       /* put the thread back on the run queue.  Then, if we're ready to
1244        * GC, check whether this is the last task to stop.  If so, wake
1245        * up the GC thread.  getThread will block during a GC until the
1246        * GC is finished.
1247        */
1248       IF_DEBUG(scheduler,
1249                if (t->what_next == ThreadEnterInterp) {
1250                    /* ToDo: or maybe a timer expired when we were in Hugs?
1251                     * or maybe someone hit ctrl-C
1252                     */
1253                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1254                          t->id, t, whatNext_strs[t->what_next]);
1255                } else {
1256                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1257                          t->id, t, whatNext_strs[t->what_next]);
1258                }
1259                );
1260
1261       threadPaused(t);
1262
1263       IF_DEBUG(sanity,
1264                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1265                checkTSO(t));
1266       ASSERT(t->link == END_TSO_QUEUE);
1267 #if defined(GRAN)
1268       ASSERT(!is_on_queue(t,CurrentProc));
1269
1270       IF_DEBUG(sanity,
1271                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1272                checkThreadQsSanity(rtsTrue));
1273 #endif
1274 #if defined(PAR)
1275       if (RtsFlags.ParFlags.doFairScheduling) { 
1276         /* this does round-robin scheduling; good for concurrency */
1277         APPEND_TO_RUN_QUEUE(t);
1278       } else {
1279         /* this does unfair scheduling; good for parallelism */
1280         PUSH_ON_RUN_QUEUE(t);
1281       }
1282 #else
1283       /* this does round-robin scheduling; good for concurrency */
1284       APPEND_TO_RUN_QUEUE(t);
1285 #endif
1286 #if defined(GRAN)
1287       /* add a ContinueThread event to actually process the thread */
1288       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1289                 ContinueThread,
1290                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1291       IF_GRAN_DEBUG(bq, 
1292                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1293                G_EVENTQ(0);
1294                G_CURR_THREADQ(0));
1295 #endif /* GRAN */
1296       break;
1297       
1298     case ThreadBlocked:
1299 #if defined(GRAN)
1300       IF_DEBUG(scheduler,
1301                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1302                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1303                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1304
1305       // ??? needed; should emit block before
1306       IF_DEBUG(gran, 
1307                DumpGranEvent(GR_DESCHEDULE, t)); 
1308       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1309       /*
1310         ngoq Dogh!
1311       ASSERT(procStatus[CurrentProc]==Busy || 
1312               ((procStatus[CurrentProc]==Fetching) && 
1313               (t->block_info.closure!=(StgClosure*)NULL)));
1314       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1315           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1316             procStatus[CurrentProc]==Fetching)) 
1317         procStatus[CurrentProc] = Idle;
1318       */
1319 #elif defined(PAR)
1320       IF_DEBUG(scheduler,
1321                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1322                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1323       IF_PAR_DEBUG(bq,
1324
1325                    if (t->block_info.closure!=(StgClosure*)NULL) 
1326                      print_bq(t->block_info.closure));
1327
1328       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1329       blockThread(t);
1330
1331       /* whatever we schedule next, we must log that schedule */
1332       emitSchedule = rtsTrue;
1333
1334 #else /* !GRAN */
1335       /* don't need to do anything.  Either the thread is blocked on
1336        * I/O, in which case we'll have called addToBlockedQueue
1337        * previously, or it's blocked on an MVar or Blackhole, in which
1338        * case it'll be on the relevant queue already.
1339        */
1340       IF_DEBUG(scheduler,
1341                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1342                printThreadBlockage(t);
1343                fprintf(stderr, "\n"));
1344
1345       /* Only for dumping event to log file 
1346          ToDo: do I need this in GranSim, too?
1347       blockThread(t);
1348       */
1349 #endif
1350       threadPaused(t);
1351       break;
1352       
1353     case ThreadFinished:
1354       /* Need to check whether this was a main thread, and if so, signal
1355        * the task that started it with the return value.  If we have no
1356        * more main threads, we probably need to stop all the tasks until
1357        * we get a new one.
1358        */
1359       /* We also end up here if the thread kills itself with an
1360        * uncaught exception, see Exception.hc.
1361        */
1362       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1363 #if defined(GRAN)
1364       endThread(t, CurrentProc); // clean-up the thread
1365 #elif defined(PAR)
1366       /* For now all are advisory -- HWL */
1367       //if(t->priority==AdvisoryPriority) ??
1368       advisory_thread_count--;
1369       
1370 # ifdef DIST
1371       if(t->dist.priority==RevalPriority)
1372         FinishReval(t);
1373 # endif
1374       
1375       if (RtsFlags.ParFlags.ParStats.Full &&
1376           !RtsFlags.ParFlags.ParStats.Suppressed) 
1377         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1378 #endif
1379       break;
1380       
1381     default:
1382       barf("schedule: invalid thread return code %d", (int)ret);
1383     }
1384     
1385 #ifdef SMP
1386     grabCapability(&cap);
1387 #endif
1388
1389 #ifdef PROFILING
1390     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1391         GarbageCollect(GetRoots, rtsTrue);
1392         heapCensus();
1393         performHeapProfile = rtsFalse;
1394         ready_to_gc = rtsFalse; // we already GC'd
1395     }
1396 #endif
1397
1398 #ifdef SMP
1399     if (ready_to_gc && allFreeCapabilities() )
1400 #else
1401     if (ready_to_gc) 
1402 #endif
1403       {
1404       /* everybody back, start the GC.
1405        * Could do it in this thread, or signal a condition var
1406        * to do it in another thread.  Either way, we need to
1407        * broadcast on gc_pending_cond afterward.
1408        */
1409 #if defined(RTS_SUPPORTS_THREADS)
1410       IF_DEBUG(scheduler,sched_belch("doing GC"));
1411 #endif
1412       GarbageCollect(GetRoots,rtsFalse);
1413       ready_to_gc = rtsFalse;
1414 #ifdef SMP
1415       broadcastCondition(&gc_pending_cond);
1416 #endif
1417 #if defined(GRAN)
1418       /* add a ContinueThread event to continue execution of current thread */
1419       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1420                 ContinueThread,
1421                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1422       IF_GRAN_DEBUG(bq, 
1423                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1424                G_EVENTQ(0);
1425                G_CURR_THREADQ(0));
1426 #endif /* GRAN */
1427     }
1428
1429 #if defined(GRAN)
1430   next_thread:
1431     IF_GRAN_DEBUG(unused,
1432                   print_eventq(EventHd));
1433
1434     event = get_next_event();
1435 #elif defined(PAR)
1436   next_thread:
1437     /* ToDo: wait for next message to arrive rather than busy wait */
1438 #endif /* GRAN */
1439
1440   } /* end of while(1) */
1441
1442   IF_PAR_DEBUG(verbose,
1443                belch("== Leaving schedule() after having received Finish"));
1444 }
1445
1446 /* ---------------------------------------------------------------------------
1447  * deleteAllThreads():  kill all the live threads.
1448  *
1449  * This is used when we catch a user interrupt (^C), before performing
1450  * any necessary cleanups and running finalizers.
1451  * ------------------------------------------------------------------------- */
1452    
1453 void deleteAllThreads ( void )
1454 {
1455   StgTSO* t, *next;
1456   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1457   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1458       next = t->link;
1459       deleteThread(t);
1460   }
1461   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1462       next = t->link;
1463       deleteThread(t);
1464   }
1465   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1466       next = t->link;
1467       deleteThread(t);
1468   }
1469   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1470   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1471   sleeping_queue = END_TSO_QUEUE;
1472 }
1473
1474 /* startThread and  insertThread are now in GranSim.c -- HWL */
1475
1476
1477 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1478 //@subsection Suspend and Resume
1479
1480 /* ---------------------------------------------------------------------------
1481  * Suspending & resuming Haskell threads.
1482  * 
1483  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1484  * its capability before calling the C function.  This allows another
1485  * task to pick up the capability and carry on running Haskell
1486  * threads.  It also means that if the C call blocks, it won't lock
1487  * the whole system.
1488  *
1489  * The Haskell thread making the C call is put to sleep for the
1490  * duration of the call, on the susepended_ccalling_threads queue.  We
1491  * give out a token to the task, which it can use to resume the thread
1492  * on return from the C function.
1493  * ------------------------------------------------------------------------- */
1494    
1495 StgInt
1496 suspendThread( StgRegTable *reg )
1497 {
1498   nat tok;
1499   Capability *cap;
1500
1501   /* assume that *reg is a pointer to the StgRegTable part
1502    * of a Capability.
1503    */
1504   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1505
1506   ACQUIRE_LOCK(&sched_mutex);
1507
1508   IF_DEBUG(scheduler,
1509            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1510
1511   threadPaused(cap->r.rCurrentTSO);
1512   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1513   suspended_ccalling_threads = cap->r.rCurrentTSO;
1514
1515   /* Use the thread ID as the token; it should be unique */
1516   tok = cap->r.rCurrentTSO->id;
1517
1518   /* Hand back capability */
1519   releaseCapability(&cap);
1520   
1521 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1522   IF_DEBUG(scheduler, sched_belch("thread %d leaving RTS\n", tok));
1523   startTask(taskStart);
1524 #endif
1525   
1526   RELEASE_LOCK(&sched_mutex);
1527   RELEASE_LOCK(&rts_mutex);
1528   return tok; 
1529 }
1530
1531 StgRegTable *
1532 resumeThread( StgInt tok )
1533 {
1534   StgTSO *tso, **prev;
1535   Capability *cap;
1536
1537 #if defined(THREADED_RTS)
1538   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1539   ACQUIRE_LOCK(&sched_mutex);
1540   ext_threads_waiting++;
1541   IF_DEBUG(scheduler, sched_belch("thread %d returning, ext_thread count: %d.\n", tok, ext_threads_waiting));
1542   RELEASE_LOCK(&sched_mutex);
1543   
1544   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1545   ACQUIRE_LOCK(&rts_mutex);
1546   ext_threads_waiting--;
1547   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1548 #endif
1549
1550 #if defined(THREADED_RTS)
1551   /* Free up any RTS-blocked threads. */
1552   broadcastCondition(&thread_ready_cond);
1553 #endif
1554
1555   /* Remove the thread off of the suspended list */
1556   prev = &suspended_ccalling_threads;
1557   for (tso = suspended_ccalling_threads; 
1558        tso != END_TSO_QUEUE; 
1559        prev = &tso->link, tso = tso->link) {
1560     if (tso->id == (StgThreadID)tok) {
1561       *prev = tso->link;
1562       break;
1563     }
1564   }
1565   if (tso == END_TSO_QUEUE) {
1566     barf("resumeThread: thread not found");
1567   }
1568   tso->link = END_TSO_QUEUE;
1569
1570 #if defined(SMP)
1571   while ( noFreeCapabilities() ) {
1572     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1573     waitCondition(&thread_ready_cond, &sched_mutex);
1574     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1575   }
1576 #endif
1577
1578   grabCapability(&cap);
1579
1580   cap->r.rCurrentTSO = tso;
1581
1582   return &cap->r;
1583 }
1584
1585
1586 /* ---------------------------------------------------------------------------
1587  * Static functions
1588  * ------------------------------------------------------------------------ */
1589 static void unblockThread(StgTSO *tso);
1590
1591 /* ---------------------------------------------------------------------------
1592  * Comparing Thread ids.
1593  *
1594  * This is used from STG land in the implementation of the
1595  * instances of Eq/Ord for ThreadIds.
1596  * ------------------------------------------------------------------------ */
1597
1598 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1599
1600   StgThreadID id1 = tso1->id; 
1601   StgThreadID id2 = tso2->id;
1602  
1603   if (id1 < id2) return (-1);
1604   if (id1 > id2) return 1;
1605   return 0;
1606 }
1607
1608 /* ---------------------------------------------------------------------------
1609  * Fetching the ThreadID from an StgTSO.
1610  *
1611  * This is used in the implementation of Show for ThreadIds.
1612  * ------------------------------------------------------------------------ */
1613 int rts_getThreadId(const StgTSO *tso) 
1614 {
1615   return tso->id;
1616 }
1617
1618 /* ---------------------------------------------------------------------------
1619    Create a new thread.
1620
1621    The new thread starts with the given stack size.  Before the
1622    scheduler can run, however, this thread needs to have a closure
1623    (and possibly some arguments) pushed on its stack.  See
1624    pushClosure() in Schedule.h.
1625
1626    createGenThread() and createIOThread() (in SchedAPI.h) are
1627    convenient packaged versions of this function.
1628
1629    currently pri (priority) is only used in a GRAN setup -- HWL
1630    ------------------------------------------------------------------------ */
1631 //@cindex createThread
1632 #if defined(GRAN)
1633 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1634 StgTSO *
1635 createThread(nat stack_size, StgInt pri)
1636 {
1637   return createThread_(stack_size, rtsFalse, pri);
1638 }
1639
1640 static StgTSO *
1641 createThread_(nat size, rtsBool have_lock, StgInt pri)
1642 {
1643 #else
1644 StgTSO *
1645 createThread(nat stack_size)
1646 {
1647   return createThread_(stack_size, rtsFalse);
1648 }
1649
1650 static StgTSO *
1651 createThread_(nat size, rtsBool have_lock)
1652 {
1653 #endif
1654
1655     StgTSO *tso;
1656     nat stack_size;
1657
1658     /* First check whether we should create a thread at all */
1659 #if defined(PAR)
1660   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1661   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1662     threadsIgnored++;
1663     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1664           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1665     return END_TSO_QUEUE;
1666   }
1667   threadsCreated++;
1668 #endif
1669
1670 #if defined(GRAN)
1671   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1672 #endif
1673
1674   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1675
1676   /* catch ridiculously small stack sizes */
1677   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1678     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1679   }
1680
1681   stack_size = size - TSO_STRUCT_SIZEW;
1682
1683   tso = (StgTSO *)allocate(size);
1684   TICK_ALLOC_TSO(stack_size, 0);
1685
1686   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1687 #if defined(GRAN)
1688   SET_GRAN_HDR(tso, ThisPE);
1689 #endif
1690   tso->what_next     = ThreadEnterGHC;
1691
1692   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1693    * protect the increment operation on next_thread_id.
1694    * In future, we could use an atomic increment instead.
1695    */
1696   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1697   tso->id = next_thread_id++; 
1698   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1699
1700   tso->why_blocked  = NotBlocked;
1701   tso->blocked_exceptions = NULL;
1702
1703   tso->stack_size   = stack_size;
1704   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1705                               - TSO_STRUCT_SIZEW;
1706   tso->sp           = (P_)&(tso->stack) + stack_size;
1707
1708 #ifdef PROFILING
1709   tso->prof.CCCS = CCS_MAIN;
1710 #endif
1711
1712   /* put a stop frame on the stack */
1713   tso->sp -= sizeofW(StgStopFrame);
1714   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1715   tso->su = (StgUpdateFrame*)tso->sp;
1716
1717   // ToDo: check this
1718 #if defined(GRAN)
1719   tso->link = END_TSO_QUEUE;
1720   /* uses more flexible routine in GranSim */
1721   insertThread(tso, CurrentProc);
1722 #else
1723   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1724    * from its creation
1725    */
1726 #endif
1727
1728 #if defined(GRAN) 
1729   if (RtsFlags.GranFlags.GranSimStats.Full) 
1730     DumpGranEvent(GR_START,tso);
1731 #elif defined(PAR)
1732   if (RtsFlags.ParFlags.ParStats.Full) 
1733     DumpGranEvent(GR_STARTQ,tso);
1734   /* HACk to avoid SCHEDULE 
1735      LastTSO = tso; */
1736 #endif
1737
1738   /* Link the new thread on the global thread list.
1739    */
1740   tso->global_link = all_threads;
1741   all_threads = tso;
1742
1743 #if defined(DIST)
1744   tso->dist.priority = MandatoryPriority; //by default that is...
1745 #endif
1746
1747 #if defined(GRAN)
1748   tso->gran.pri = pri;
1749 # if defined(DEBUG)
1750   tso->gran.magic = TSO_MAGIC; // debugging only
1751 # endif
1752   tso->gran.sparkname   = 0;
1753   tso->gran.startedat   = CURRENT_TIME; 
1754   tso->gran.exported    = 0;
1755   tso->gran.basicblocks = 0;
1756   tso->gran.allocs      = 0;
1757   tso->gran.exectime    = 0;
1758   tso->gran.fetchtime   = 0;
1759   tso->gran.fetchcount  = 0;
1760   tso->gran.blocktime   = 0;
1761   tso->gran.blockcount  = 0;
1762   tso->gran.blockedat   = 0;
1763   tso->gran.globalsparks = 0;
1764   tso->gran.localsparks  = 0;
1765   if (RtsFlags.GranFlags.Light)
1766     tso->gran.clock  = Now; /* local clock */
1767   else
1768     tso->gran.clock  = 0;
1769
1770   IF_DEBUG(gran,printTSO(tso));
1771 #elif defined(PAR)
1772 # if defined(DEBUG)
1773   tso->par.magic = TSO_MAGIC; // debugging only
1774 # endif
1775   tso->par.sparkname   = 0;
1776   tso->par.startedat   = CURRENT_TIME; 
1777   tso->par.exported    = 0;
1778   tso->par.basicblocks = 0;
1779   tso->par.allocs      = 0;
1780   tso->par.exectime    = 0;
1781   tso->par.fetchtime   = 0;
1782   tso->par.fetchcount  = 0;
1783   tso->par.blocktime   = 0;
1784   tso->par.blockcount  = 0;
1785   tso->par.blockedat   = 0;
1786   tso->par.globalsparks = 0;
1787   tso->par.localsparks  = 0;
1788 #endif
1789
1790 #if defined(GRAN)
1791   globalGranStats.tot_threads_created++;
1792   globalGranStats.threads_created_on_PE[CurrentProc]++;
1793   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1794   globalGranStats.tot_sq_probes++;
1795 #elif defined(PAR)
1796   // collect parallel global statistics (currently done together with GC stats)
1797   if (RtsFlags.ParFlags.ParStats.Global &&
1798       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1799     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1800     globalParStats.tot_threads_created++;
1801   }
1802 #endif 
1803
1804 #if defined(GRAN)
1805   IF_GRAN_DEBUG(pri,
1806                 belch("==__ schedule: Created TSO %d (%p);",
1807                       CurrentProc, tso, tso->id));
1808 #elif defined(PAR)
1809     IF_PAR_DEBUG(verbose,
1810                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1811                        tso->id, tso, advisory_thread_count));
1812 #else
1813   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1814                                  tso->id, tso->stack_size));
1815 #endif    
1816   return tso;
1817 }
1818
1819 #if defined(PAR)
1820 /* RFP:
1821    all parallel thread creation calls should fall through the following routine.
1822 */
1823 StgTSO *
1824 createSparkThread(rtsSpark spark) 
1825 { StgTSO *tso;
1826   ASSERT(spark != (rtsSpark)NULL);
1827   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1828   { threadsIgnored++;
1829     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1830           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1831     return END_TSO_QUEUE;
1832   }
1833   else
1834   { threadsCreated++;
1835     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1836     if (tso==END_TSO_QUEUE)     
1837       barf("createSparkThread: Cannot create TSO");
1838 #if defined(DIST)
1839     tso->priority = AdvisoryPriority;
1840 #endif
1841     pushClosure(tso,spark);
1842     PUSH_ON_RUN_QUEUE(tso);
1843     advisory_thread_count++;    
1844   }
1845   return tso;
1846 }
1847 #endif
1848
1849 /*
1850   Turn a spark into a thread.
1851   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1852 */
1853 #if defined(PAR)
1854 //@cindex activateSpark
1855 StgTSO *
1856 activateSpark (rtsSpark spark) 
1857 {
1858   StgTSO *tso;
1859
1860   tso = createSparkThread(spark);
1861   if (RtsFlags.ParFlags.ParStats.Full) {   
1862     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1863     IF_PAR_DEBUG(verbose,
1864                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1865                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1866   }
1867   // ToDo: fwd info on local/global spark to thread -- HWL
1868   // tso->gran.exported =  spark->exported;
1869   // tso->gran.locked =   !spark->global;
1870   // tso->gran.sparkname = spark->name;
1871
1872   return tso;
1873 }
1874 #endif
1875
1876 /* ---------------------------------------------------------------------------
1877  * scheduleThread()
1878  *
1879  * scheduleThread puts a thread on the head of the runnable queue.
1880  * This will usually be done immediately after a thread is created.
1881  * The caller of scheduleThread must create the thread using e.g.
1882  * createThread and push an appropriate closure
1883  * on this thread's stack before the scheduler is invoked.
1884  * ------------------------------------------------------------------------ */
1885
1886 void
1887 scheduleThread(StgTSO *tso)
1888 {
1889   ACQUIRE_LOCK(&sched_mutex);
1890
1891   /* Put the new thread on the head of the runnable queue.  The caller
1892    * better push an appropriate closure on this thread's stack
1893    * beforehand.  In the SMP case, the thread may start running as
1894    * soon as we release the scheduler lock below.
1895    */
1896   PUSH_ON_RUN_QUEUE(tso);
1897   THREAD_RUNNABLE();
1898
1899 #if 0
1900   IF_DEBUG(scheduler,printTSO(tso));
1901 #endif
1902   RELEASE_LOCK(&sched_mutex);
1903 }
1904
1905 /* ---------------------------------------------------------------------------
1906  * initScheduler()
1907  *
1908  * Initialise the scheduler.  This resets all the queues - if the
1909  * queues contained any threads, they'll be garbage collected at the
1910  * next pass.
1911  *
1912  * ------------------------------------------------------------------------ */
1913
1914 #ifdef SMP
1915 static void
1916 term_handler(int sig STG_UNUSED)
1917 {
1918   stat_workerStop();
1919   ACQUIRE_LOCK(&term_mutex);
1920   await_death--;
1921   RELEASE_LOCK(&term_mutex);
1922   shutdownThread();
1923 }
1924 #endif
1925
1926 void 
1927 initScheduler(void)
1928 {
1929 #if defined(GRAN)
1930   nat i;
1931
1932   for (i=0; i<=MAX_PROC; i++) {
1933     run_queue_hds[i]      = END_TSO_QUEUE;
1934     run_queue_tls[i]      = END_TSO_QUEUE;
1935     blocked_queue_hds[i]  = END_TSO_QUEUE;
1936     blocked_queue_tls[i]  = END_TSO_QUEUE;
1937     ccalling_threadss[i]  = END_TSO_QUEUE;
1938     sleeping_queue        = END_TSO_QUEUE;
1939   }
1940 #else
1941   run_queue_hd      = END_TSO_QUEUE;
1942   run_queue_tl      = END_TSO_QUEUE;
1943   blocked_queue_hd  = END_TSO_QUEUE;
1944   blocked_queue_tl  = END_TSO_QUEUE;
1945   sleeping_queue    = END_TSO_QUEUE;
1946 #endif 
1947
1948   suspended_ccalling_threads  = END_TSO_QUEUE;
1949
1950   main_threads = NULL;
1951   all_threads  = END_TSO_QUEUE;
1952
1953   context_switch = 0;
1954   interrupted    = 0;
1955
1956   RtsFlags.ConcFlags.ctxtSwitchTicks =
1957       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1958       
1959 #if defined(RTS_SUPPORTS_THREADS)
1960   /* Initialise the mutex and condition variables used by
1961    * the scheduler. */
1962   initMutex(&rts_mutex);
1963   initMutex(&sched_mutex);
1964   initMutex(&term_mutex);
1965 #if defined(THREADED_RTS)
1966   initMutex(&thread_ready_aux_mutex);
1967 #endif
1968   
1969   initCondition(&thread_ready_cond);
1970   initCondition(&gc_pending_cond);
1971 #endif
1972
1973 #if defined(THREADED_RTS)
1974   /* Grab big lock */
1975   ACQUIRE_LOCK(&rts_mutex);
1976   IF_DEBUG(scheduler, 
1977            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1978 #endif
1979
1980   /* Install the SIGHUP handler */
1981 #ifdef SMP
1982   {
1983     struct sigaction action,oact;
1984
1985     action.sa_handler = term_handler;
1986     sigemptyset(&action.sa_mask);
1987     action.sa_flags = 0;
1988     if (sigaction(SIGTERM, &action, &oact) != 0) {
1989       barf("can't install TERM handler");
1990     }
1991   }
1992 #endif
1993
1994   /* A capability holds the state a native thread needs in
1995    * order to execute STG code. At least one capability is
1996    * floating around (only SMP builds have more than one).
1997    */
1998   initCapabilities();
1999   
2000 #if defined(RTS_SUPPORTS_THREADS)
2001     /* start our haskell execution tasks */
2002 # if defined(SMP)
2003     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2004 # else
2005     startTaskManager(0,taskStart);
2006 # endif
2007 #endif
2008
2009 #if /* defined(SMP) ||*/ defined(PAR)
2010   initSparkPools();
2011 #endif
2012 }
2013
2014 void
2015 exitScheduler( void )
2016 {
2017 #if defined(RTS_SUPPORTS_THREADS)
2018   stopTaskManager();
2019 #endif
2020 }
2021
2022 /* -----------------------------------------------------------------------------
2023    Managing the per-task allocation areas.
2024    
2025    Each capability comes with an allocation area.  These are
2026    fixed-length block lists into which allocation can be done.
2027
2028    ToDo: no support for two-space collection at the moment???
2029    -------------------------------------------------------------------------- */
2030
2031 /* -----------------------------------------------------------------------------
2032  * waitThread is the external interface for running a new computation
2033  * and waiting for the result.
2034  *
2035  * In the non-SMP case, we create a new main thread, push it on the 
2036  * main-thread stack, and invoke the scheduler to run it.  The
2037  * scheduler will return when the top main thread on the stack has
2038  * completed or died, and fill in the necessary fields of the
2039  * main_thread structure.
2040  *
2041  * In the SMP case, we create a main thread as before, but we then
2042  * create a new condition variable and sleep on it.  When our new
2043  * main thread has completed, we'll be woken up and the status/result
2044  * will be in the main_thread struct.
2045  * -------------------------------------------------------------------------- */
2046
2047 int 
2048 howManyThreadsAvail ( void )
2049 {
2050    int i = 0;
2051    StgTSO* q;
2052    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2053       i++;
2054    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2055       i++;
2056    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2057       i++;
2058    return i;
2059 }
2060
2061 void
2062 finishAllThreads ( void )
2063 {
2064    do {
2065       while (run_queue_hd != END_TSO_QUEUE) {
2066          waitThread ( run_queue_hd, NULL );
2067       }
2068       while (blocked_queue_hd != END_TSO_QUEUE) {
2069          waitThread ( blocked_queue_hd, NULL );
2070       }
2071       while (sleeping_queue != END_TSO_QUEUE) {
2072          waitThread ( blocked_queue_hd, NULL );
2073       }
2074    } while 
2075       (blocked_queue_hd != END_TSO_QUEUE || 
2076        run_queue_hd     != END_TSO_QUEUE ||
2077        sleeping_queue   != END_TSO_QUEUE);
2078 }
2079
2080 SchedulerStatus
2081 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2082 {
2083   StgMainThread *m;
2084   SchedulerStatus stat;
2085
2086   ACQUIRE_LOCK(&sched_mutex);
2087   
2088   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2089
2090   m->tso = tso;
2091   m->ret = ret;
2092   m->stat = NoStatus;
2093 #if defined(RTS_SUPPORTS_THREADS)
2094   initCondition(&m->wakeup);
2095 #endif
2096
2097   m->link = main_threads;
2098   main_threads = m;
2099
2100   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2101                               m->tso->id));
2102
2103 #ifdef SMP
2104   do {
2105     waitCondition(&m->wakeup, &sched_mutex);
2106   } while (m->stat == NoStatus);
2107 #elif defined(GRAN)
2108   /* GranSim specific init */
2109   CurrentTSO = m->tso;                // the TSO to run
2110   procStatus[MainProc] = Busy;        // status of main PE
2111   CurrentProc = MainProc;             // PE to run it on
2112
2113   schedule();
2114 #else
2115   RELEASE_LOCK(&sched_mutex);
2116   schedule();
2117   ASSERT(m->stat != NoStatus);
2118 #endif
2119
2120   stat = m->stat;
2121
2122 #if defined(RTS_SUPPORTS_THREADS)
2123   closeCondition(&m->wakeup);
2124 #endif
2125
2126   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2127                               m->tso->id));
2128   free(m);
2129
2130   RELEASE_LOCK(&sched_mutex);
2131
2132   return stat;
2133 }
2134
2135 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2136 //@subsection Run queue code 
2137
2138 #if 0
2139 /* 
2140    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2141        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2142        implicit global variable that has to be correct when calling these
2143        fcts -- HWL 
2144 */
2145
2146 /* Put the new thread on the head of the runnable queue.
2147  * The caller of createThread better push an appropriate closure
2148  * on this thread's stack before the scheduler is invoked.
2149  */
2150 static /* inline */ void
2151 add_to_run_queue(tso)
2152 StgTSO* tso; 
2153 {
2154   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2155   tso->link = run_queue_hd;
2156   run_queue_hd = tso;
2157   if (run_queue_tl == END_TSO_QUEUE) {
2158     run_queue_tl = tso;
2159   }
2160 }
2161
2162 /* Put the new thread at the end of the runnable queue. */
2163 static /* inline */ void
2164 push_on_run_queue(tso)
2165 StgTSO* tso; 
2166 {
2167   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2168   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2169   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2170   if (run_queue_hd == END_TSO_QUEUE) {
2171     run_queue_hd = tso;
2172   } else {
2173     run_queue_tl->link = tso;
2174   }
2175   run_queue_tl = tso;
2176 }
2177
2178 /* 
2179    Should be inlined because it's used very often in schedule.  The tso
2180    argument is actually only needed in GranSim, where we want to have the
2181    possibility to schedule *any* TSO on the run queue, irrespective of the
2182    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2183    the run queue and dequeue the tso, adjusting the links in the queue. 
2184 */
2185 //@cindex take_off_run_queue
2186 static /* inline */ StgTSO*
2187 take_off_run_queue(StgTSO *tso) {
2188   StgTSO *t, *prev;
2189
2190   /* 
2191      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2192
2193      if tso is specified, unlink that tso from the run_queue (doesn't have
2194      to be at the beginning of the queue); GranSim only 
2195   */
2196   if (tso!=END_TSO_QUEUE) {
2197     /* find tso in queue */
2198     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2199          t!=END_TSO_QUEUE && t!=tso;
2200          prev=t, t=t->link) 
2201       /* nothing */ ;
2202     ASSERT(t==tso);
2203     /* now actually dequeue the tso */
2204     if (prev!=END_TSO_QUEUE) {
2205       ASSERT(run_queue_hd!=t);
2206       prev->link = t->link;
2207     } else {
2208       /* t is at beginning of thread queue */
2209       ASSERT(run_queue_hd==t);
2210       run_queue_hd = t->link;
2211     }
2212     /* t is at end of thread queue */
2213     if (t->link==END_TSO_QUEUE) {
2214       ASSERT(t==run_queue_tl);
2215       run_queue_tl = prev;
2216     } else {
2217       ASSERT(run_queue_tl!=t);
2218     }
2219     t->link = END_TSO_QUEUE;
2220   } else {
2221     /* take tso from the beginning of the queue; std concurrent code */
2222     t = run_queue_hd;
2223     if (t != END_TSO_QUEUE) {
2224       run_queue_hd = t->link;
2225       t->link = END_TSO_QUEUE;
2226       if (run_queue_hd == END_TSO_QUEUE) {
2227         run_queue_tl = END_TSO_QUEUE;
2228       }
2229     }
2230   }
2231   return t;
2232 }
2233
2234 #endif /* 0 */
2235
2236 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2237 //@subsection Garbage Collextion Routines
2238
2239 /* ---------------------------------------------------------------------------
2240    Where are the roots that we know about?
2241
2242         - all the threads on the runnable queue
2243         - all the threads on the blocked queue
2244         - all the threads on the sleeping queue
2245         - all the thread currently executing a _ccall_GC
2246         - all the "main threads"
2247      
2248    ------------------------------------------------------------------------ */
2249
2250 /* This has to be protected either by the scheduler monitor, or by the
2251         garbage collection monitor (probably the latter).
2252         KH @ 25/10/99
2253 */
2254
2255 void
2256 GetRoots(evac_fn evac)
2257 {
2258   StgMainThread *m;
2259
2260 #if defined(GRAN)
2261   {
2262     nat i;
2263     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2264       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2265           evac((StgClosure **)&run_queue_hds[i]);
2266       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2267           evac((StgClosure **)&run_queue_tls[i]);
2268       
2269       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2270           evac((StgClosure **)&blocked_queue_hds[i]);
2271       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2272           evac((StgClosure **)&blocked_queue_tls[i]);
2273       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2274           evac((StgClosure **)&ccalling_threads[i]);
2275     }
2276   }
2277
2278   markEventQueue();
2279
2280 #else /* !GRAN */
2281   if (run_queue_hd != END_TSO_QUEUE) {
2282       ASSERT(run_queue_tl != END_TSO_QUEUE);
2283       evac((StgClosure **)&run_queue_hd);
2284       evac((StgClosure **)&run_queue_tl);
2285   }
2286   
2287   if (blocked_queue_hd != END_TSO_QUEUE) {
2288       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2289       evac((StgClosure **)&blocked_queue_hd);
2290       evac((StgClosure **)&blocked_queue_tl);
2291   }
2292   
2293   if (sleeping_queue != END_TSO_QUEUE) {
2294       evac((StgClosure **)&sleeping_queue);
2295   }
2296 #endif 
2297
2298   for (m = main_threads; m != NULL; m = m->link) {
2299       evac((StgClosure **)&m->tso);
2300   }
2301   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2302       evac((StgClosure **)&suspended_ccalling_threads);
2303   }
2304
2305 #if defined(PAR) || defined(GRAN)
2306   markSparkQueue(evac);
2307 #endif
2308 }
2309
2310 /* -----------------------------------------------------------------------------
2311    performGC
2312
2313    This is the interface to the garbage collector from Haskell land.
2314    We provide this so that external C code can allocate and garbage
2315    collect when called from Haskell via _ccall_GC.
2316
2317    It might be useful to provide an interface whereby the programmer
2318    can specify more roots (ToDo).
2319    
2320    This needs to be protected by the GC condition variable above.  KH.
2321    -------------------------------------------------------------------------- */
2322
2323 void (*extra_roots)(evac_fn);
2324
2325 void
2326 performGC(void)
2327 {
2328   GarbageCollect(GetRoots,rtsFalse);
2329 }
2330
2331 void
2332 performMajorGC(void)
2333 {
2334   GarbageCollect(GetRoots,rtsTrue);
2335 }
2336
2337 static void
2338 AllRoots(evac_fn evac)
2339 {
2340     GetRoots(evac);             // the scheduler's roots
2341     extra_roots(evac);          // the user's roots
2342 }
2343
2344 void
2345 performGCWithRoots(void (*get_roots)(evac_fn))
2346 {
2347   extra_roots = get_roots;
2348   GarbageCollect(AllRoots,rtsFalse);
2349 }
2350
2351 /* -----------------------------------------------------------------------------
2352    Stack overflow
2353
2354    If the thread has reached its maximum stack size, then raise the
2355    StackOverflow exception in the offending thread.  Otherwise
2356    relocate the TSO into a larger chunk of memory and adjust its stack
2357    size appropriately.
2358    -------------------------------------------------------------------------- */
2359
2360 static StgTSO *
2361 threadStackOverflow(StgTSO *tso)
2362 {
2363   nat new_stack_size, new_tso_size, diff, stack_words;
2364   StgPtr new_sp;
2365   StgTSO *dest;
2366
2367   IF_DEBUG(sanity,checkTSO(tso));
2368   if (tso->stack_size >= tso->max_stack_size) {
2369
2370     IF_DEBUG(gc,
2371              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2372                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2373              /* If we're debugging, just print out the top of the stack */
2374              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2375                                               tso->sp+64)));
2376
2377     /* Send this thread the StackOverflow exception */
2378     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2379     return tso;
2380   }
2381
2382   /* Try to double the current stack size.  If that takes us over the
2383    * maximum stack size for this thread, then use the maximum instead.
2384    * Finally round up so the TSO ends up as a whole number of blocks.
2385    */
2386   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2387   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2388                                        TSO_STRUCT_SIZE)/sizeof(W_);
2389   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2390   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2391
2392   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2393
2394   dest = (StgTSO *)allocate(new_tso_size);
2395   TICK_ALLOC_TSO(new_stack_size,0);
2396
2397   /* copy the TSO block and the old stack into the new area */
2398   memcpy(dest,tso,TSO_STRUCT_SIZE);
2399   stack_words = tso->stack + tso->stack_size - tso->sp;
2400   new_sp = (P_)dest + new_tso_size - stack_words;
2401   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2402
2403   /* relocate the stack pointers... */
2404   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2405   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2406   dest->sp    = new_sp;
2407   dest->stack_size = new_stack_size;
2408         
2409   /* and relocate the update frame list */
2410   relocate_stack(dest, diff);
2411
2412   /* Mark the old TSO as relocated.  We have to check for relocated
2413    * TSOs in the garbage collector and any primops that deal with TSOs.
2414    *
2415    * It's important to set the sp and su values to just beyond the end
2416    * of the stack, so we don't attempt to scavenge any part of the
2417    * dead TSO's stack.
2418    */
2419   tso->what_next = ThreadRelocated;
2420   tso->link = dest;
2421   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2422   tso->su = (StgUpdateFrame *)tso->sp;
2423   tso->why_blocked = NotBlocked;
2424   dest->mut_link = NULL;
2425
2426   IF_PAR_DEBUG(verbose,
2427                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2428                      tso->id, tso, tso->stack_size);
2429                /* If we're debugging, just print out the top of the stack */
2430                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2431                                                 tso->sp+64)));
2432   
2433   IF_DEBUG(sanity,checkTSO(tso));
2434 #if 0
2435   IF_DEBUG(scheduler,printTSO(dest));
2436 #endif
2437
2438   return dest;
2439 }
2440
2441 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2442 //@subsection Blocking Queue Routines
2443
2444 /* ---------------------------------------------------------------------------
2445    Wake up a queue that was blocked on some resource.
2446    ------------------------------------------------------------------------ */
2447
2448 #if defined(GRAN)
2449 static inline void
2450 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2451 {
2452 }
2453 #elif defined(PAR)
2454 static inline void
2455 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2456 {
2457   /* write RESUME events to log file and
2458      update blocked and fetch time (depending on type of the orig closure) */
2459   if (RtsFlags.ParFlags.ParStats.Full) {
2460     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2461                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2462                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2463     if (EMPTY_RUN_QUEUE())
2464       emitSchedule = rtsTrue;
2465
2466     switch (get_itbl(node)->type) {
2467         case FETCH_ME_BQ:
2468           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2469           break;
2470         case RBH:
2471         case FETCH_ME:
2472         case BLACKHOLE_BQ:
2473           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2474           break;
2475 #ifdef DIST
2476         case MVAR:
2477           break;
2478 #endif    
2479         default:
2480           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2481         }
2482       }
2483 }
2484 #endif
2485
2486 #if defined(GRAN)
2487 static StgBlockingQueueElement *
2488 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2489 {
2490     StgTSO *tso;
2491     PEs node_loc, tso_loc;
2492
2493     node_loc = where_is(node); // should be lifted out of loop
2494     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2495     tso_loc = where_is((StgClosure *)tso);
2496     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2497       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2498       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2499       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2500       // insertThread(tso, node_loc);
2501       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2502                 ResumeThread,
2503                 tso, node, (rtsSpark*)NULL);
2504       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2505       // len_local++;
2506       // len++;
2507     } else { // TSO is remote (actually should be FMBQ)
2508       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2509                                   RtsFlags.GranFlags.Costs.gunblocktime +
2510                                   RtsFlags.GranFlags.Costs.latency;
2511       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2512                 UnblockThread,
2513                 tso, node, (rtsSpark*)NULL);
2514       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2515       // len++;
2516     }
2517     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2518     IF_GRAN_DEBUG(bq,
2519                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2520                           (node_loc==tso_loc ? "Local" : "Global"), 
2521                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2522     tso->block_info.closure = NULL;
2523     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2524                              tso->id, tso));
2525 }
2526 #elif defined(PAR)
2527 static StgBlockingQueueElement *
2528 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2529 {
2530     StgBlockingQueueElement *next;
2531
2532     switch (get_itbl(bqe)->type) {
2533     case TSO:
2534       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2535       /* if it's a TSO just push it onto the run_queue */
2536       next = bqe->link;
2537       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2538       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2539       THREAD_RUNNABLE();
2540       unblockCount(bqe, node);
2541       /* reset blocking status after dumping event */
2542       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2543       break;
2544
2545     case BLOCKED_FETCH:
2546       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2547       next = bqe->link;
2548       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2549       PendingFetches = (StgBlockedFetch *)bqe;
2550       break;
2551
2552 # if defined(DEBUG)
2553       /* can ignore this case in a non-debugging setup; 
2554          see comments on RBHSave closures above */
2555     case CONSTR:
2556       /* check that the closure is an RBHSave closure */
2557       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2558              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2559              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2560       break;
2561
2562     default:
2563       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2564            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2565            (StgClosure *)bqe);
2566 # endif
2567     }
2568   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2569   return next;
2570 }
2571
2572 #else /* !GRAN && !PAR */
2573 static StgTSO *
2574 unblockOneLocked(StgTSO *tso)
2575 {
2576   StgTSO *next;
2577
2578   ASSERT(get_itbl(tso)->type == TSO);
2579   ASSERT(tso->why_blocked != NotBlocked);
2580   tso->why_blocked = NotBlocked;
2581   next = tso->link;
2582   PUSH_ON_RUN_QUEUE(tso);
2583   THREAD_RUNNABLE();
2584   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2585   return next;
2586 }
2587 #endif
2588
2589 #if defined(GRAN) || defined(PAR)
2590 inline StgBlockingQueueElement *
2591 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2592 {
2593   ACQUIRE_LOCK(&sched_mutex);
2594   bqe = unblockOneLocked(bqe, node);
2595   RELEASE_LOCK(&sched_mutex);
2596   return bqe;
2597 }
2598 #else
2599 inline StgTSO *
2600 unblockOne(StgTSO *tso)
2601 {
2602   ACQUIRE_LOCK(&sched_mutex);
2603   tso = unblockOneLocked(tso);
2604   RELEASE_LOCK(&sched_mutex);
2605   return tso;
2606 }
2607 #endif
2608
2609 #if defined(GRAN)
2610 void 
2611 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2612 {
2613   StgBlockingQueueElement *bqe;
2614   PEs node_loc;
2615   nat len = 0; 
2616
2617   IF_GRAN_DEBUG(bq, 
2618                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2619                       node, CurrentProc, CurrentTime[CurrentProc], 
2620                       CurrentTSO->id, CurrentTSO));
2621
2622   node_loc = where_is(node);
2623
2624   ASSERT(q == END_BQ_QUEUE ||
2625          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2626          get_itbl(q)->type == CONSTR); // closure (type constructor)
2627   ASSERT(is_unique(node));
2628
2629   /* FAKE FETCH: magically copy the node to the tso's proc;
2630      no Fetch necessary because in reality the node should not have been 
2631      moved to the other PE in the first place
2632   */
2633   if (CurrentProc!=node_loc) {
2634     IF_GRAN_DEBUG(bq, 
2635                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2636                         node, node_loc, CurrentProc, CurrentTSO->id, 
2637                         // CurrentTSO, where_is(CurrentTSO),
2638                         node->header.gran.procs));
2639     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2640     IF_GRAN_DEBUG(bq, 
2641                   belch("## new bitmask of node %p is %#x",
2642                         node, node->header.gran.procs));
2643     if (RtsFlags.GranFlags.GranSimStats.Global) {
2644       globalGranStats.tot_fake_fetches++;
2645     }
2646   }
2647
2648   bqe = q;
2649   // ToDo: check: ASSERT(CurrentProc==node_loc);
2650   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2651     //next = bqe->link;
2652     /* 
2653        bqe points to the current element in the queue
2654        next points to the next element in the queue
2655     */
2656     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2657     //tso_loc = where_is(tso);
2658     len++;
2659     bqe = unblockOneLocked(bqe, node);
2660   }
2661
2662   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2663      the closure to make room for the anchor of the BQ */
2664   if (bqe!=END_BQ_QUEUE) {
2665     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2666     /*
2667     ASSERT((info_ptr==&RBH_Save_0_info) ||
2668            (info_ptr==&RBH_Save_1_info) ||
2669            (info_ptr==&RBH_Save_2_info));
2670     */
2671     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2672     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2673     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2674
2675     IF_GRAN_DEBUG(bq,
2676                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2677                         node, info_type(node)));
2678   }
2679
2680   /* statistics gathering */
2681   if (RtsFlags.GranFlags.GranSimStats.Global) {
2682     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2683     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2684     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2685     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2686   }
2687   IF_GRAN_DEBUG(bq,
2688                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2689                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2690 }
2691 #elif defined(PAR)
2692 void 
2693 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2694 {
2695   StgBlockingQueueElement *bqe;
2696
2697   ACQUIRE_LOCK(&sched_mutex);
2698
2699   IF_PAR_DEBUG(verbose, 
2700                belch("##-_ AwBQ for node %p on [%x]: ",
2701                      node, mytid));
2702 #ifdef DIST  
2703   //RFP
2704   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2705     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2706     return;
2707   }
2708 #endif
2709   
2710   ASSERT(q == END_BQ_QUEUE ||
2711          get_itbl(q)->type == TSO ||           
2712          get_itbl(q)->type == BLOCKED_FETCH || 
2713          get_itbl(q)->type == CONSTR); 
2714
2715   bqe = q;
2716   while (get_itbl(bqe)->type==TSO || 
2717          get_itbl(bqe)->type==BLOCKED_FETCH) {
2718     bqe = unblockOneLocked(bqe, node);
2719   }
2720   RELEASE_LOCK(&sched_mutex);
2721 }
2722
2723 #else   /* !GRAN && !PAR */
2724 void
2725 awakenBlockedQueue(StgTSO *tso)
2726 {
2727   ACQUIRE_LOCK(&sched_mutex);
2728   while (tso != END_TSO_QUEUE) {
2729     tso = unblockOneLocked(tso);
2730   }
2731   RELEASE_LOCK(&sched_mutex);
2732 }
2733 #endif
2734
2735 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2736 //@subsection Exception Handling Routines
2737
2738 /* ---------------------------------------------------------------------------
2739    Interrupt execution
2740    - usually called inside a signal handler so it mustn't do anything fancy.   
2741    ------------------------------------------------------------------------ */
2742
2743 void
2744 interruptStgRts(void)
2745 {
2746     interrupted    = 1;
2747     context_switch = 1;
2748 }
2749
2750 /* -----------------------------------------------------------------------------
2751    Unblock a thread
2752
2753    This is for use when we raise an exception in another thread, which
2754    may be blocked.
2755    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2756    -------------------------------------------------------------------------- */
2757
2758 #if defined(GRAN) || defined(PAR)
2759 /*
2760   NB: only the type of the blocking queue is different in GranSim and GUM
2761       the operations on the queue-elements are the same
2762       long live polymorphism!
2763 */
2764 static void
2765 unblockThread(StgTSO *tso)
2766 {
2767   StgBlockingQueueElement *t, **last;
2768
2769   ACQUIRE_LOCK(&sched_mutex);
2770   switch (tso->why_blocked) {
2771
2772   case NotBlocked:
2773     return;  /* not blocked */
2774
2775   case BlockedOnMVar:
2776     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2777     {
2778       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2779       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2780
2781       last = (StgBlockingQueueElement **)&mvar->head;
2782       for (t = (StgBlockingQueueElement *)mvar->head; 
2783            t != END_BQ_QUEUE; 
2784            last = &t->link, last_tso = t, t = t->link) {
2785         if (t == (StgBlockingQueueElement *)tso) {
2786           *last = (StgBlockingQueueElement *)tso->link;
2787           if (mvar->tail == tso) {
2788             mvar->tail = (StgTSO *)last_tso;
2789           }
2790           goto done;
2791         }
2792       }
2793       barf("unblockThread (MVAR): TSO not found");
2794     }
2795
2796   case BlockedOnBlackHole:
2797     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2798     {
2799       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2800
2801       last = &bq->blocking_queue;
2802       for (t = bq->blocking_queue; 
2803            t != END_BQ_QUEUE; 
2804            last = &t->link, t = t->link) {
2805         if (t == (StgBlockingQueueElement *)tso) {
2806           *last = (StgBlockingQueueElement *)tso->link;
2807           goto done;
2808         }
2809       }
2810       barf("unblockThread (BLACKHOLE): TSO not found");
2811     }
2812
2813   case BlockedOnException:
2814     {
2815       StgTSO *target  = tso->block_info.tso;
2816
2817       ASSERT(get_itbl(target)->type == TSO);
2818
2819       if (target->what_next == ThreadRelocated) {
2820           target = target->link;
2821           ASSERT(get_itbl(target)->type == TSO);
2822       }
2823
2824       ASSERT(target->blocked_exceptions != NULL);
2825
2826       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2827       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2828            t != END_BQ_QUEUE; 
2829            last = &t->link, t = t->link) {
2830         ASSERT(get_itbl(t)->type == TSO);
2831         if (t == (StgBlockingQueueElement *)tso) {
2832           *last = (StgBlockingQueueElement *)tso->link;
2833           goto done;
2834         }
2835       }
2836       barf("unblockThread (Exception): TSO not found");
2837     }
2838
2839   case BlockedOnRead:
2840   case BlockedOnWrite:
2841     {
2842       /* take TSO off blocked_queue */
2843       StgBlockingQueueElement *prev = NULL;
2844       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2845            prev = t, t = t->link) {
2846         if (t == (StgBlockingQueueElement *)tso) {
2847           if (prev == NULL) {
2848             blocked_queue_hd = (StgTSO *)t->link;
2849             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2850               blocked_queue_tl = END_TSO_QUEUE;
2851             }
2852           } else {
2853             prev->link = t->link;
2854             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2855               blocked_queue_tl = (StgTSO *)prev;
2856             }
2857           }
2858           goto done;
2859         }
2860       }
2861       barf("unblockThread (I/O): TSO not found");
2862     }
2863
2864   case BlockedOnDelay:
2865     {
2866       /* take TSO off sleeping_queue */
2867       StgBlockingQueueElement *prev = NULL;
2868       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2869            prev = t, t = t->link) {
2870         if (t == (StgBlockingQueueElement *)tso) {
2871           if (prev == NULL) {
2872             sleeping_queue = (StgTSO *)t->link;
2873           } else {
2874             prev->link = t->link;
2875           }
2876           goto done;
2877         }
2878       }
2879       barf("unblockThread (I/O): TSO not found");
2880     }
2881
2882   default:
2883     barf("unblockThread");
2884   }
2885
2886  done:
2887   tso->link = END_TSO_QUEUE;
2888   tso->why_blocked = NotBlocked;
2889   tso->block_info.closure = NULL;
2890   PUSH_ON_RUN_QUEUE(tso);
2891   RELEASE_LOCK(&sched_mutex);
2892 }
2893 #else
2894 static void
2895 unblockThread(StgTSO *tso)
2896 {
2897   StgTSO *t, **last;
2898
2899   ACQUIRE_LOCK(&sched_mutex);
2900   switch (tso->why_blocked) {
2901
2902   case NotBlocked:
2903     return;  /* not blocked */
2904
2905   case BlockedOnMVar:
2906     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2907     {
2908       StgTSO *last_tso = END_TSO_QUEUE;
2909       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2910
2911       last = &mvar->head;
2912       for (t = mvar->head; t != END_TSO_QUEUE; 
2913            last = &t->link, last_tso = t, t = t->link) {
2914         if (t == tso) {
2915           *last = tso->link;
2916           if (mvar->tail == tso) {
2917             mvar->tail = last_tso;
2918           }
2919           goto done;
2920         }
2921       }
2922       barf("unblockThread (MVAR): TSO not found");
2923     }
2924
2925   case BlockedOnBlackHole:
2926     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2927     {
2928       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2929
2930       last = &bq->blocking_queue;
2931       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2932            last = &t->link, t = t->link) {
2933         if (t == tso) {
2934           *last = tso->link;
2935           goto done;
2936         }
2937       }
2938       barf("unblockThread (BLACKHOLE): TSO not found");
2939     }
2940
2941   case BlockedOnException:
2942     {
2943       StgTSO *target  = tso->block_info.tso;
2944
2945       ASSERT(get_itbl(target)->type == TSO);
2946
2947       while (target->what_next == ThreadRelocated) {
2948           target = target->link;
2949           ASSERT(get_itbl(target)->type == TSO);
2950       }
2951       
2952       ASSERT(target->blocked_exceptions != NULL);
2953
2954       last = &target->blocked_exceptions;
2955       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2956            last = &t->link, t = t->link) {
2957         ASSERT(get_itbl(t)->type == TSO);
2958         if (t == tso) {
2959           *last = tso->link;
2960           goto done;
2961         }
2962       }
2963       barf("unblockThread (Exception): TSO not found");
2964     }
2965
2966   case BlockedOnRead:
2967   case BlockedOnWrite:
2968     {
2969       StgTSO *prev = NULL;
2970       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2971            prev = t, t = t->link) {
2972         if (t == tso) {
2973           if (prev == NULL) {
2974             blocked_queue_hd = t->link;
2975             if (blocked_queue_tl == t) {
2976               blocked_queue_tl = END_TSO_QUEUE;
2977             }
2978           } else {
2979             prev->link = t->link;
2980             if (blocked_queue_tl == t) {
2981               blocked_queue_tl = prev;
2982             }
2983           }
2984           goto done;
2985         }
2986       }
2987       barf("unblockThread (I/O): TSO not found");
2988     }
2989
2990   case BlockedOnDelay:
2991     {
2992       StgTSO *prev = NULL;
2993       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2994            prev = t, t = t->link) {
2995         if (t == tso) {
2996           if (prev == NULL) {
2997             sleeping_queue = t->link;
2998           } else {
2999             prev->link = t->link;
3000           }
3001           goto done;
3002         }
3003       }
3004       barf("unblockThread (I/O): TSO not found");
3005     }
3006
3007   default:
3008     barf("unblockThread");
3009   }
3010
3011  done:
3012   tso->link = END_TSO_QUEUE;
3013   tso->why_blocked = NotBlocked;
3014   tso->block_info.closure = NULL;
3015   PUSH_ON_RUN_QUEUE(tso);
3016   RELEASE_LOCK(&sched_mutex);
3017 }
3018 #endif
3019
3020 /* -----------------------------------------------------------------------------
3021  * raiseAsync()
3022  *
3023  * The following function implements the magic for raising an
3024  * asynchronous exception in an existing thread.
3025  *
3026  * We first remove the thread from any queue on which it might be
3027  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3028  *
3029  * We strip the stack down to the innermost CATCH_FRAME, building
3030  * thunks in the heap for all the active computations, so they can 
3031  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3032  * an application of the handler to the exception, and push it on
3033  * the top of the stack.
3034  * 
3035  * How exactly do we save all the active computations?  We create an
3036  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3037  * AP_UPDs pushes everything from the corresponding update frame
3038  * upwards onto the stack.  (Actually, it pushes everything up to the
3039  * next update frame plus a pointer to the next AP_UPD object.
3040  * Entering the next AP_UPD object pushes more onto the stack until we
3041  * reach the last AP_UPD object - at which point the stack should look
3042  * exactly as it did when we killed the TSO and we can continue
3043  * execution by entering the closure on top of the stack.
3044  *
3045  * We can also kill a thread entirely - this happens if either (a) the 
3046  * exception passed to raiseAsync is NULL, or (b) there's no
3047  * CATCH_FRAME on the stack.  In either case, we strip the entire
3048  * stack and replace the thread with a zombie.
3049  *
3050  * -------------------------------------------------------------------------- */
3051  
3052 void 
3053 deleteThread(StgTSO *tso)
3054 {
3055   raiseAsync(tso,NULL);
3056 }
3057
3058 void
3059 raiseAsync(StgTSO *tso, StgClosure *exception)
3060 {
3061   StgUpdateFrame* su = tso->su;
3062   StgPtr          sp = tso->sp;
3063   
3064   /* Thread already dead? */
3065   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3066     return;
3067   }
3068
3069   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3070
3071   /* Remove it from any blocking queues */
3072   unblockThread(tso);
3073
3074   /* The stack freezing code assumes there's a closure pointer on
3075    * the top of the stack.  This isn't always the case with compiled
3076    * code, so we have to push a dummy closure on the top which just
3077    * returns to the next return address on the stack.
3078    */
3079   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3080     *(--sp) = (W_)&stg_dummy_ret_closure;
3081   }
3082
3083   while (1) {
3084     nat words = ((P_)su - (P_)sp) - 1;
3085     nat i;
3086     StgAP_UPD * ap;
3087
3088     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3089      * then build PAP(handler,exception,realworld#), and leave it on
3090      * top of the stack ready to enter.
3091      */
3092     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3093       StgCatchFrame *cf = (StgCatchFrame *)su;
3094       /* we've got an exception to raise, so let's pass it to the
3095        * handler in this frame.
3096        */
3097       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3098       TICK_ALLOC_UPD_PAP(3,0);
3099       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3100               
3101       ap->n_args = 2;
3102       ap->fun = cf->handler;    /* :: Exception -> IO a */
3103       ap->payload[0] = exception;
3104       ap->payload[1] = ARG_TAG(0); /* realworld token */
3105
3106       /* throw away the stack from Sp up to and including the
3107        * CATCH_FRAME.
3108        */
3109       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3110       tso->su = cf->link;
3111
3112       /* Restore the blocked/unblocked state for asynchronous exceptions
3113        * at the CATCH_FRAME.  
3114        *
3115        * If exceptions were unblocked at the catch, arrange that they
3116        * are unblocked again after executing the handler by pushing an
3117        * unblockAsyncExceptions_ret stack frame.
3118        */
3119       if (!cf->exceptions_blocked) {
3120         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3121       }
3122       
3123       /* Ensure that async exceptions are blocked when running the handler.
3124        */
3125       if (tso->blocked_exceptions == NULL) {
3126         tso->blocked_exceptions = END_TSO_QUEUE;
3127       }
3128       
3129       /* Put the newly-built PAP on top of the stack, ready to execute
3130        * when the thread restarts.
3131        */
3132       sp[0] = (W_)ap;
3133       tso->sp = sp;
3134       tso->what_next = ThreadEnterGHC;
3135       IF_DEBUG(sanity, checkTSO(tso));
3136       return;
3137     }
3138
3139     /* First build an AP_UPD consisting of the stack chunk above the
3140      * current update frame, with the top word on the stack as the
3141      * fun field.
3142      */
3143     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3144     
3145     ASSERT(words >= 0);
3146     
3147     ap->n_args = words;
3148     ap->fun    = (StgClosure *)sp[0];
3149     sp++;
3150     for(i=0; i < (nat)words; ++i) {
3151       ap->payload[i] = (StgClosure *)*sp++;
3152     }
3153     
3154     switch (get_itbl(su)->type) {
3155       
3156     case UPDATE_FRAME:
3157       {
3158         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3159         TICK_ALLOC_UP_THK(words+1,0);
3160         
3161         IF_DEBUG(scheduler,
3162                  fprintf(stderr,  "scheduler: Updating ");
3163                  printPtr((P_)su->updatee); 
3164                  fprintf(stderr,  " with ");
3165                  printObj((StgClosure *)ap);
3166                  );
3167         
3168         /* Replace the updatee with an indirection - happily
3169          * this will also wake up any threads currently
3170          * waiting on the result.
3171          *
3172          * Warning: if we're in a loop, more than one update frame on
3173          * the stack may point to the same object.  Be careful not to
3174          * overwrite an IND_OLDGEN in this case, because we'll screw
3175          * up the mutable lists.  To be on the safe side, don't
3176          * overwrite any kind of indirection at all.  See also
3177          * threadSqueezeStack in GC.c, where we have to make a similar
3178          * check.
3179          */
3180         if (!closure_IND(su->updatee)) {
3181             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3182         }
3183         su = su->link;
3184         sp += sizeofW(StgUpdateFrame) -1;
3185         sp[0] = (W_)ap; /* push onto stack */
3186         break;
3187       }
3188
3189     case CATCH_FRAME:
3190       {
3191         StgCatchFrame *cf = (StgCatchFrame *)su;
3192         StgClosure* o;
3193         
3194         /* We want a PAP, not an AP_UPD.  Fortunately, the
3195          * layout's the same.
3196          */
3197         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3198         TICK_ALLOC_UPD_PAP(words+1,0);
3199         
3200         /* now build o = FUN(catch,ap,handler) */
3201         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3202         TICK_ALLOC_FUN(2,0);
3203         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3204         o->payload[0] = (StgClosure *)ap;
3205         o->payload[1] = cf->handler;
3206         
3207         IF_DEBUG(scheduler,
3208                  fprintf(stderr,  "scheduler: Built ");
3209                  printObj((StgClosure *)o);
3210                  );
3211         
3212         /* pop the old handler and put o on the stack */
3213         su = cf->link;
3214         sp += sizeofW(StgCatchFrame) - 1;
3215         sp[0] = (W_)o;
3216         break;
3217       }
3218       
3219     case SEQ_FRAME:
3220       {
3221         StgSeqFrame *sf = (StgSeqFrame *)su;
3222         StgClosure* o;
3223         
3224         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3225         TICK_ALLOC_UPD_PAP(words+1,0);
3226         
3227         /* now build o = FUN(seq,ap) */
3228         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3229         TICK_ALLOC_SE_THK(1,0);
3230         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3231         o->payload[0] = (StgClosure *)ap;
3232         
3233         IF_DEBUG(scheduler,
3234                  fprintf(stderr,  "scheduler: Built ");
3235                  printObj((StgClosure *)o);
3236                  );
3237         
3238         /* pop the old handler and put o on the stack */
3239         su = sf->link;
3240         sp += sizeofW(StgSeqFrame) - 1;
3241         sp[0] = (W_)o;
3242         break;
3243       }
3244       
3245     case STOP_FRAME:
3246       /* We've stripped the entire stack, the thread is now dead. */
3247       sp += sizeofW(StgStopFrame) - 1;
3248       sp[0] = (W_)exception;    /* save the exception */
3249       tso->what_next = ThreadKilled;
3250       tso->su = (StgUpdateFrame *)(sp+1);
3251       tso->sp = sp;
3252       return;
3253
3254     default:
3255       barf("raiseAsync");
3256     }
3257   }
3258   barf("raiseAsync");
3259 }
3260
3261 /* -----------------------------------------------------------------------------
3262    resurrectThreads is called after garbage collection on the list of
3263    threads found to be garbage.  Each of these threads will be woken
3264    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3265    on an MVar, or NonTermination if the thread was blocked on a Black
3266    Hole.
3267    -------------------------------------------------------------------------- */
3268
3269 void
3270 resurrectThreads( StgTSO *threads )
3271 {
3272   StgTSO *tso, *next;
3273
3274   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3275     next = tso->global_link;
3276     tso->global_link = all_threads;
3277     all_threads = tso;
3278     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3279
3280     switch (tso->why_blocked) {
3281     case BlockedOnMVar:
3282     case BlockedOnException:
3283       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3284       break;
3285     case BlockedOnBlackHole:
3286       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3287       break;
3288     case NotBlocked:
3289       /* This might happen if the thread was blocked on a black hole
3290        * belonging to a thread that we've just woken up (raiseAsync
3291        * can wake up threads, remember...).
3292        */
3293       continue;
3294     default:
3295       barf("resurrectThreads: thread blocked in a strange way");
3296     }
3297   }
3298 }
3299
3300 /* -----------------------------------------------------------------------------
3301  * Blackhole detection: if we reach a deadlock, test whether any
3302  * threads are blocked on themselves.  Any threads which are found to
3303  * be self-blocked get sent a NonTermination exception.
3304  *
3305  * This is only done in a deadlock situation in order to avoid
3306  * performance overhead in the normal case.
3307  * -------------------------------------------------------------------------- */
3308
3309 static void
3310 detectBlackHoles( void )
3311 {
3312     StgTSO *t = all_threads;
3313     StgUpdateFrame *frame;
3314     StgClosure *blocked_on;
3315
3316     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3317
3318         while (t->what_next == ThreadRelocated) {
3319             t = t->link;
3320             ASSERT(get_itbl(t)->type == TSO);
3321         }
3322       
3323         if (t->why_blocked != BlockedOnBlackHole) {
3324             continue;
3325         }
3326
3327         blocked_on = t->block_info.closure;
3328
3329         for (frame = t->su; ; frame = frame->link) {
3330             switch (get_itbl(frame)->type) {
3331
3332             case UPDATE_FRAME:
3333                 if (frame->updatee == blocked_on) {
3334                     /* We are blocking on one of our own computations, so
3335                      * send this thread the NonTermination exception.  
3336                      */
3337                     IF_DEBUG(scheduler, 
3338                              sched_belch("thread %d is blocked on itself", t->id));
3339                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3340                     goto done;
3341                 }
3342                 else {
3343                     continue;
3344                 }
3345
3346             case CATCH_FRAME:
3347             case SEQ_FRAME:
3348                 continue;
3349                 
3350             case STOP_FRAME:
3351                 break;
3352             }
3353             break;
3354         }
3355
3356     done: ;
3357     }   
3358 }
3359
3360 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3361 //@subsection Debugging Routines
3362
3363 /* -----------------------------------------------------------------------------
3364    Debugging: why is a thread blocked
3365    -------------------------------------------------------------------------- */
3366
3367 #ifdef DEBUG
3368
3369 void
3370 printThreadBlockage(StgTSO *tso)
3371 {
3372   switch (tso->why_blocked) {
3373   case BlockedOnRead:
3374     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3375     break;
3376   case BlockedOnWrite:
3377     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3378     break;
3379   case BlockedOnDelay:
3380     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3381     break;
3382   case BlockedOnMVar:
3383     fprintf(stderr,"is blocked on an MVar");
3384     break;
3385   case BlockedOnException:
3386     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3387             tso->block_info.tso->id);
3388     break;
3389   case BlockedOnBlackHole:
3390     fprintf(stderr,"is blocked on a black hole");
3391     break;
3392   case NotBlocked:
3393     fprintf(stderr,"is not blocked");
3394     break;
3395 #if defined(PAR)
3396   case BlockedOnGA:
3397     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3398             tso->block_info.closure, info_type(tso->block_info.closure));
3399     break;
3400   case BlockedOnGA_NoSend:
3401     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3402             tso->block_info.closure, info_type(tso->block_info.closure));
3403     break;
3404 #endif
3405   default:
3406     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3407          tso->why_blocked, tso->id, tso);
3408   }
3409 }
3410
3411 void
3412 printThreadStatus(StgTSO *tso)
3413 {
3414   switch (tso->what_next) {
3415   case ThreadKilled:
3416     fprintf(stderr,"has been killed");
3417     break;
3418   case ThreadComplete:
3419     fprintf(stderr,"has completed");
3420     break;
3421   default:
3422     printThreadBlockage(tso);
3423   }
3424 }
3425
3426 void
3427 printAllThreads(void)
3428 {
3429   StgTSO *t;
3430
3431 # if defined(GRAN)
3432   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3433   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3434                        time_string, rtsFalse/*no commas!*/);
3435
3436   sched_belch("all threads at [%s]:", time_string);
3437 # elif defined(PAR)
3438   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3439   ullong_format_string(CURRENT_TIME,
3440                        time_string, rtsFalse/*no commas!*/);
3441
3442   sched_belch("all threads at [%s]:", time_string);
3443 # else
3444   sched_belch("all threads:");
3445 # endif
3446
3447   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3448     fprintf(stderr, "\tthread %d ", t->id);
3449     printThreadStatus(t);
3450     fprintf(stderr,"\n");
3451   }
3452 }
3453     
3454 /* 
3455    Print a whole blocking queue attached to node (debugging only).
3456 */
3457 //@cindex print_bq
3458 # if defined(PAR)
3459 void 
3460 print_bq (StgClosure *node)
3461 {
3462   StgBlockingQueueElement *bqe;
3463   StgTSO *tso;
3464   rtsBool end;
3465
3466   fprintf(stderr,"## BQ of closure %p (%s): ",
3467           node, info_type(node));
3468
3469   /* should cover all closures that may have a blocking queue */
3470   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3471          get_itbl(node)->type == FETCH_ME_BQ ||
3472          get_itbl(node)->type == RBH ||
3473          get_itbl(node)->type == MVAR);
3474     
3475   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3476
3477   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3478 }
3479
3480 /* 
3481    Print a whole blocking queue starting with the element bqe.
3482 */
3483 void 
3484 print_bqe (StgBlockingQueueElement *bqe)
3485 {
3486   rtsBool end;
3487
3488   /* 
3489      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3490   */
3491   for (end = (bqe==END_BQ_QUEUE);
3492        !end; // iterate until bqe points to a CONSTR
3493        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3494        bqe = end ? END_BQ_QUEUE : bqe->link) {
3495     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3496     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3497     /* types of closures that may appear in a blocking queue */
3498     ASSERT(get_itbl(bqe)->type == TSO ||           
3499            get_itbl(bqe)->type == BLOCKED_FETCH || 
3500            get_itbl(bqe)->type == CONSTR); 
3501     /* only BQs of an RBH end with an RBH_Save closure */
3502     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3503
3504     switch (get_itbl(bqe)->type) {
3505     case TSO:
3506       fprintf(stderr," TSO %u (%x),",
3507               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3508       break;
3509     case BLOCKED_FETCH:
3510       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3511               ((StgBlockedFetch *)bqe)->node, 
3512               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3513               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3514               ((StgBlockedFetch *)bqe)->ga.weight);
3515       break;
3516     case CONSTR:
3517       fprintf(stderr," %s (IP %p),",
3518               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3519                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3520                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3521                "RBH_Save_?"), get_itbl(bqe));
3522       break;
3523     default:
3524       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3525            info_type((StgClosure *)bqe)); // , node, info_type(node));
3526       break;
3527     }
3528   } /* for */
3529   fputc('\n', stderr);
3530 }
3531 # elif defined(GRAN)
3532 void 
3533 print_bq (StgClosure *node)
3534 {
3535   StgBlockingQueueElement *bqe;
3536   PEs node_loc, tso_loc;
3537   rtsBool end;
3538
3539   /* should cover all closures that may have a blocking queue */
3540   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3541          get_itbl(node)->type == FETCH_ME_BQ ||
3542          get_itbl(node)->type == RBH);
3543     
3544   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3545   node_loc = where_is(node);
3546
3547   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3548           node, info_type(node), node_loc);
3549
3550   /* 
3551      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3552   */
3553   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3554        !end; // iterate until bqe points to a CONSTR
3555        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3556     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3557     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3558     /* types of closures that may appear in a blocking queue */
3559     ASSERT(get_itbl(bqe)->type == TSO ||           
3560            get_itbl(bqe)->type == CONSTR); 
3561     /* only BQs of an RBH end with an RBH_Save closure */
3562     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3563
3564     tso_loc = where_is((StgClosure *)bqe);
3565     switch (get_itbl(bqe)->type) {
3566     case TSO:
3567       fprintf(stderr," TSO %d (%p) on [PE %d],",
3568               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3569       break;
3570     case CONSTR:
3571       fprintf(stderr," %s (IP %p),",
3572               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3573                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3574                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3575                "RBH_Save_?"), get_itbl(bqe));
3576       break;
3577     default:
3578       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3579            info_type((StgClosure *)bqe), node, info_type(node));
3580       break;
3581     }
3582   } /* for */
3583   fputc('\n', stderr);
3584 }
3585 #else
3586 /* 
3587    Nice and easy: only TSOs on the blocking queue
3588 */
3589 void 
3590 print_bq (StgClosure *node)
3591 {
3592   StgTSO *tso;
3593
3594   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3595   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3596        tso != END_TSO_QUEUE; 
3597        tso=tso->link) {
3598     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3599     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3600     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3601   }
3602   fputc('\n', stderr);
3603 }
3604 # endif
3605
3606 #if defined(PAR)
3607 static nat
3608 run_queue_len(void)
3609 {
3610   nat i;
3611   StgTSO *tso;
3612
3613   for (i=0, tso=run_queue_hd; 
3614        tso != END_TSO_QUEUE;
3615        i++, tso=tso->link)
3616     /* nothing */
3617
3618   return i;
3619 }
3620 #endif
3621
3622 static void
3623 sched_belch(char *s, ...)
3624 {
3625   va_list ap;
3626   va_start(ap,s);
3627 #ifdef SMP
3628   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3629 #elif defined(PAR)
3630   fprintf(stderr, "== ");
3631 #else
3632   fprintf(stderr, "scheduler: ");
3633 #endif
3634   vfprintf(stderr, s, ap);
3635   fprintf(stderr, "\n");
3636 }
3637
3638 #endif /* DEBUG */
3639
3640
3641 //@node Index,  , Debugging Routines, Main scheduling code
3642 //@subsection Index
3643
3644 //@index
3645 //* MainRegTable::  @cindex\s-+MainRegTable
3646 //* StgMainThread::  @cindex\s-+StgMainThread
3647 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3648 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3649 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3650 //* context_switch::  @cindex\s-+context_switch
3651 //* createThread::  @cindex\s-+createThread
3652 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3653 //* initScheduler::  @cindex\s-+initScheduler
3654 //* interrupted::  @cindex\s-+interrupted
3655 //* next_thread_id::  @cindex\s-+next_thread_id
3656 //* print_bq::  @cindex\s-+print_bq
3657 //* run_queue_hd::  @cindex\s-+run_queue_hd
3658 //* run_queue_tl::  @cindex\s-+run_queue_tl
3659 //* sched_mutex::  @cindex\s-+sched_mutex
3660 //* schedule::  @cindex\s-+schedule
3661 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3662 //* term_mutex::  @cindex\s-+term_mutex
3663 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3664 //@end index