[project @ 2002-01-15 05:39:14 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.110 2001/12/18 12:33:45 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #endif
102 #if defined(GRAN) || defined(PAR)
103 # include "GranSimRts.h"
104 # include "GranSim.h"
105 # include "ParallelRts.h"
106 # include "Parallel.h"
107 # include "ParallelDebug.h"
108 # include "FetchMe.h"
109 # include "HLC.h"
110 #endif
111 #include "Sparks.h"
112
113 #include <stdarg.h>
114
115 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
116 //@subsection Variables and Data structures
117
118 /* Main threads:
119  *
120  * These are the threads which clients have requested that we run.  
121  *
122  * In an SMP build, we might have several concurrent clients all
123  * waiting for results, and each one will wait on a condition variable
124  * until the result is available.
125  *
126  * In non-SMP, clients are strictly nested: the first client calls
127  * into the RTS, which might call out again to C with a _ccall_GC, and
128  * eventually re-enter the RTS.
129  *
130  * Main threads information is kept in a linked list:
131  */
132 //@cindex StgMainThread
133 typedef struct StgMainThread_ {
134   StgTSO *         tso;
135   SchedulerStatus  stat;
136   StgClosure **    ret;
137 #ifdef SMP
138   pthread_cond_t wakeup;
139 #endif
140   struct StgMainThread_ *link;
141 } StgMainThread;
142
143 /* Main thread queue.
144  * Locks required: sched_mutex.
145  */
146 static StgMainThread *main_threads;
147
148 /* Thread queues.
149  * Locks required: sched_mutex.
150  */
151 #if defined(GRAN)
152
153 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
154 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
155
156 /* 
157    In GranSim we have a runable and a blocked queue for each processor.
158    In order to minimise code changes new arrays run_queue_hds/tls
159    are created. run_queue_hd is then a short cut (macro) for
160    run_queue_hds[CurrentProc] (see GranSim.h).
161    -- HWL
162 */
163 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
164 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
165 StgTSO *ccalling_threadss[MAX_PROC];
166 /* We use the same global list of threads (all_threads) in GranSim as in
167    the std RTS (i.e. we are cheating). However, we don't use this list in
168    the GranSim specific code at the moment (so we are only potentially
169    cheating).  */
170
171 #else /* !GRAN */
172
173 StgTSO *run_queue_hd, *run_queue_tl;
174 StgTSO *blocked_queue_hd, *blocked_queue_tl;
175 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads;
183
184 /* Threads suspended in _ccall_GC.
185  */
186 static StgTSO *suspended_ccalling_threads;
187
188 static StgTSO *threadStackOverflow(StgTSO *tso);
189
190 /* KH: The following two flags are shared memory locations.  There is no need
191        to lock them, since they are only unset at the end of a scheduler
192        operation.
193 */
194
195 /* flag set by signal handler to precipitate a context switch */
196 //@cindex context_switch
197 nat context_switch;
198
199 /* if this flag is set as well, give up execution */
200 //@cindex interrupted
201 rtsBool interrupted;
202
203 /* Next thread ID to allocate.
204  * Locks required: sched_mutex
205  */
206 //@cindex next_thread_id
207 StgThreadID next_thread_id = 1;
208
209 /*
210  * Pointers to the state of the current thread.
211  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
212  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
213  */
214  
215 /* The smallest stack size that makes any sense is:
216  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
217  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
218  *  + 1                       (the realworld token for an IO thread)
219  *  + 1                       (the closure to enter)
220  *
221  * A thread with this stack will bomb immediately with a stack
222  * overflow, which will increase its stack size.  
223  */
224
225 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
226
227 /* Free capability list.
228  * Locks required: sched_mutex.
229  */
230 #ifdef SMP
231 Capability *free_capabilities; /* Available capabilities for running threads */
232 nat n_free_capabilities;       /* total number of available capabilities */
233 #else
234 Capability MainCapability;     /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           if (m->ret) *(m->ret) = NULL;
449           *prev = m->link;
450           if (was_interrupted) {
451             m->stat = Interrupted;
452           } else {
453             m->stat = Killed;
454           }
455           pthread_cond_broadcast(&m->wakeup);
456           break;
457         default:
458           break;
459         }
460       }
461     }
462
463 #else // not SMP
464
465 # if defined(PAR)
466     /* in GUM do this only on the Main PE */
467     if (IAmMainThread)
468 # endif
469     /* If our main thread has finished or been killed, return.
470      */
471     {
472       StgMainThread *m = main_threads;
473       if (m->tso->what_next == ThreadComplete
474           || m->tso->what_next == ThreadKilled) {
475         main_threads = main_threads->link;
476         if (m->tso->what_next == ThreadComplete) {
477           /* we finished successfully, fill in the return value */
478           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
479           m->stat = Success;
480           return;
481         } else {
482           if (m->ret) { *(m->ret) = NULL; };
483           if (was_interrupted) {
484             m->stat = Interrupted;
485           } else {
486             m->stat = Killed;
487           }
488           return;
489         }
490       }
491     }
492 #endif
493
494     /* Top up the run queue from our spark pool.  We try to make the
495      * number of threads in the run queue equal to the number of
496      * free capabilities.
497      */
498 #if defined(SMP)
499     {
500       nat n = n_free_capabilities;
501       StgTSO *tso = run_queue_hd;
502
503       /* Count the run queue */
504       while (n > 0 && tso != END_TSO_QUEUE) {
505         tso = tso->link;
506         n--;
507       }
508
509       for (; n > 0; n--) {
510         StgClosure *spark;
511         spark = findSpark(rtsFalse);
512         if (spark == NULL) {
513           break; /* no more sparks in the pool */
514         } else {
515           /* I'd prefer this to be done in activateSpark -- HWL */
516           /* tricky - it needs to hold the scheduler lock and
517            * not try to re-acquire it -- SDM */
518           createSparkThread(spark);       
519           IF_DEBUG(scheduler,
520                    sched_belch("==^^ turning spark of closure %p into a thread",
521                                (StgClosure *)spark));
522         }
523       }
524       /* We need to wake up the other tasks if we just created some
525        * work for them.
526        */
527       if (n_free_capabilities - n > 1) {
528           pthread_cond_signal(&thread_ready_cond);
529       }
530     }
531 #endif // SMP
532
533     /* check for signals each time around the scheduler */
534 #ifndef mingw32_TARGET_OS
535     if (signals_pending()) {
536       startSignalHandlers();
537     }
538 #endif
539
540     /* Check whether any waiting threads need to be woken up.  If the
541      * run queue is empty, and there are no other tasks running, we
542      * can wait indefinitely for something to happen.
543      * ToDo: what if another client comes along & requests another
544      * main thread?
545      */
546     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
547       awaitEvent(
548            (run_queue_hd == END_TSO_QUEUE)
549 #ifdef SMP
550         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
551 #endif
552         );
553     }
554     /* we can be interrupted while waiting for I/O... */
555     if (interrupted) continue;
556
557     /* 
558      * Detect deadlock: when we have no threads to run, there are no
559      * threads waiting on I/O or sleeping, and all the other tasks are
560      * waiting for work, we must have a deadlock of some description.
561      *
562      * We first try to find threads blocked on themselves (ie. black
563      * holes), and generate NonTermination exceptions where necessary.
564      *
565      * If no threads are black holed, we have a deadlock situation, so
566      * inform all the main threads.
567      */
568 #ifndef PAR
569     if (blocked_queue_hd == END_TSO_QUEUE
570         && run_queue_hd == END_TSO_QUEUE
571         && sleeping_queue == END_TSO_QUEUE
572 #ifdef SMP
573         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
574 #endif
575         )
576     {
577         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
578         GarbageCollect(GetRoots,rtsTrue);
579         if (blocked_queue_hd == END_TSO_QUEUE
580             && run_queue_hd == END_TSO_QUEUE
581             && sleeping_queue == END_TSO_QUEUE) {
582             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
583             detectBlackHoles();
584             if (run_queue_hd == END_TSO_QUEUE) {
585                 StgMainThread *m = main_threads;
586 #ifdef SMP
587                 for (; m != NULL; m = m->link) {
588                     deleteThread(m->tso);
589                     m->ret = NULL;
590                     m->stat = Deadlock;
591                     pthread_cond_broadcast(&m->wakeup);
592                 }
593                 main_threads = NULL;
594 #else
595                 deleteThread(m->tso);
596                 m->ret = NULL;
597                 m->stat = Deadlock;
598                 main_threads = m->link;
599                 return;
600 #endif
601             }
602         }
603     }
604 #elif defined(PAR)
605     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
606 #endif
607
608 #ifdef SMP
609     /* If there's a GC pending, don't do anything until it has
610      * completed.
611      */
612     if (ready_to_gc) {
613       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
614       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
615     }
616     
617     /* block until we've got a thread on the run queue and a free
618      * capability.
619      */
620     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
621       IF_DEBUG(scheduler, sched_belch("waiting for work"));
622       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
623       IF_DEBUG(scheduler, sched_belch("work now available"));
624     }
625 #endif
626
627 #if defined(GRAN)
628
629     if (RtsFlags.GranFlags.Light)
630       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
631
632     /* adjust time based on time-stamp */
633     if (event->time > CurrentTime[CurrentProc] &&
634         event->evttype != ContinueThread)
635       CurrentTime[CurrentProc] = event->time;
636     
637     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
638     if (!RtsFlags.GranFlags.Light)
639       handleIdlePEs();
640
641     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
642
643     /* main event dispatcher in GranSim */
644     switch (event->evttype) {
645       /* Should just be continuing execution */
646     case ContinueThread:
647       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
648       /* ToDo: check assertion
649       ASSERT(run_queue_hd != (StgTSO*)NULL &&
650              run_queue_hd != END_TSO_QUEUE);
651       */
652       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
653       if (!RtsFlags.GranFlags.DoAsyncFetch &&
654           procStatus[CurrentProc]==Fetching) {
655         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
656               CurrentTSO->id, CurrentTSO, CurrentProc);
657         goto next_thread;
658       } 
659       /* Ignore ContinueThreads for completed threads */
660       if (CurrentTSO->what_next == ThreadComplete) {
661         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
662               CurrentTSO->id, CurrentTSO, CurrentProc);
663         goto next_thread;
664       } 
665       /* Ignore ContinueThreads for threads that are being migrated */
666       if (PROCS(CurrentTSO)==Nowhere) { 
667         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
668               CurrentTSO->id, CurrentTSO, CurrentProc);
669         goto next_thread;
670       }
671       /* The thread should be at the beginning of the run queue */
672       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
673         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
674               CurrentTSO->id, CurrentTSO, CurrentProc);
675         break; // run the thread anyway
676       }
677       /*
678       new_event(proc, proc, CurrentTime[proc],
679                 FindWork,
680                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
681       goto next_thread; 
682       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
683       break; // now actually run the thread; DaH Qu'vam yImuHbej 
684
685     case FetchNode:
686       do_the_fetchnode(event);
687       goto next_thread;             /* handle next event in event queue  */
688       
689     case GlobalBlock:
690       do_the_globalblock(event);
691       goto next_thread;             /* handle next event in event queue  */
692       
693     case FetchReply:
694       do_the_fetchreply(event);
695       goto next_thread;             /* handle next event in event queue  */
696       
697     case UnblockThread:   /* Move from the blocked queue to the tail of */
698       do_the_unblock(event);
699       goto next_thread;             /* handle next event in event queue  */
700       
701     case ResumeThread:  /* Move from the blocked queue to the tail of */
702       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
703       event->tso->gran.blocktime += 
704         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
705       do_the_startthread(event);
706       goto next_thread;             /* handle next event in event queue  */
707       
708     case StartThread:
709       do_the_startthread(event);
710       goto next_thread;             /* handle next event in event queue  */
711       
712     case MoveThread:
713       do_the_movethread(event);
714       goto next_thread;             /* handle next event in event queue  */
715       
716     case MoveSpark:
717       do_the_movespark(event);
718       goto next_thread;             /* handle next event in event queue  */
719       
720     case FindWork:
721       do_the_findwork(event);
722       goto next_thread;             /* handle next event in event queue  */
723       
724     default:
725       barf("Illegal event type %u\n", event->evttype);
726     }  /* switch */
727     
728     /* This point was scheduler_loop in the old RTS */
729
730     IF_DEBUG(gran, belch("GRAN: after main switch"));
731
732     TimeOfLastEvent = CurrentTime[CurrentProc];
733     TimeOfNextEvent = get_time_of_next_event();
734     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
735     // CurrentTSO = ThreadQueueHd;
736
737     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
738                          TimeOfNextEvent));
739
740     if (RtsFlags.GranFlags.Light) 
741       GranSimLight_leave_system(event, &ActiveTSO); 
742
743     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
744
745     IF_DEBUG(gran, 
746              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
747
748     /* in a GranSim setup the TSO stays on the run queue */
749     t = CurrentTSO;
750     /* Take a thread from the run queue. */
751     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
752
753     IF_DEBUG(gran, 
754              fprintf(stderr, "GRAN: About to run current thread, which is\n");
755              G_TSO(t,5));
756
757     context_switch = 0; // turned on via GranYield, checking events and time slice
758
759     IF_DEBUG(gran, 
760              DumpGranEvent(GR_SCHEDULE, t));
761
762     procStatus[CurrentProc] = Busy;
763
764 #elif defined(PAR)
765     if (PendingFetches != END_BF_QUEUE) {
766         processFetches();
767     }
768
769     /* ToDo: phps merge with spark activation above */
770     /* check whether we have local work and send requests if we have none */
771     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
772       /* :-[  no local threads => look out for local sparks */
773       /* the spark pool for the current PE */
774       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
775       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
776           pool->hd < pool->tl) {
777         /* 
778          * ToDo: add GC code check that we really have enough heap afterwards!!
779          * Old comment:
780          * If we're here (no runnable threads) and we have pending
781          * sparks, we must have a space problem.  Get enough space
782          * to turn one of those pending sparks into a
783          * thread... 
784          */
785
786         spark = findSpark(rtsFalse);                /* get a spark */
787         if (spark != (rtsSpark) NULL) {
788           tso = activateSpark(spark);       /* turn the spark into a thread */
789           IF_PAR_DEBUG(schedule,
790                        belch("==== schedule: Created TSO %d (%p); %d threads active",
791                              tso->id, tso, advisory_thread_count));
792
793           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
794             belch("==^^ failed to activate spark");
795             goto next_thread;
796           }               /* otherwise fall through & pick-up new tso */
797         } else {
798           IF_PAR_DEBUG(verbose,
799                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
800                              spark_queue_len(pool)));
801           goto next_thread;
802         }
803       }
804
805       /* If we still have no work we need to send a FISH to get a spark
806          from another PE 
807       */
808       if (EMPTY_RUN_QUEUE()) {
809       /* =8-[  no local sparks => look for work on other PEs */
810         /*
811          * We really have absolutely no work.  Send out a fish
812          * (there may be some out there already), and wait for
813          * something to arrive.  We clearly can't run any threads
814          * until a SCHEDULE or RESUME arrives, and so that's what
815          * we're hoping to see.  (Of course, we still have to
816          * respond to other types of messages.)
817          */
818         TIME now = msTime() /*CURRENT_TIME*/;
819         IF_PAR_DEBUG(verbose, 
820                      belch("--  now=%ld", now));
821         IF_PAR_DEBUG(verbose,
822                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
823                          (last_fish_arrived_at!=0 &&
824                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
825                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
826                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
827                              last_fish_arrived_at,
828                              RtsFlags.ParFlags.fishDelay, now);
829                      });
830         
831         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
832             (last_fish_arrived_at==0 ||
833              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
834           /* outstandingFishes is set in sendFish, processFish;
835              avoid flooding system with fishes via delay */
836           pe = choosePE();
837           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
838                    NEW_FISH_HUNGER);
839
840           // Global statistics: count no. of fishes
841           if (RtsFlags.ParFlags.ParStats.Global &&
842               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
843             globalParStats.tot_fish_mess++;
844           }
845         }
846       
847         receivedFinish = processMessages();
848         goto next_thread;
849       }
850     } else if (PacketsWaiting()) {  /* Look for incoming messages */
851       receivedFinish = processMessages();
852     }
853
854     /* Now we are sure that we have some work available */
855     ASSERT(run_queue_hd != END_TSO_QUEUE);
856
857     /* Take a thread from the run queue, if we have work */
858     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
859     IF_DEBUG(sanity,checkTSO(t));
860
861     /* ToDo: write something to the log-file
862     if (RTSflags.ParFlags.granSimStats && !sameThread)
863         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
864
865     CurrentTSO = t;
866     */
867     /* the spark pool for the current PE */
868     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
869
870     IF_DEBUG(scheduler, 
871              belch("--=^ %d threads, %d sparks on [%#x]", 
872                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
873
874 #if 1
875     if (0 && RtsFlags.ParFlags.ParStats.Full && 
876         t && LastTSO && t->id != LastTSO->id && 
877         LastTSO->why_blocked == NotBlocked && 
878         LastTSO->what_next != ThreadComplete) {
879       // if previously scheduled TSO not blocked we have to record the context switch
880       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
881                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
882     }
883
884     if (RtsFlags.ParFlags.ParStats.Full && 
885         (emitSchedule /* forced emit */ ||
886         (t && LastTSO && t->id != LastTSO->id))) {
887       /* 
888          we are running a different TSO, so write a schedule event to log file
889          NB: If we use fair scheduling we also have to write  a deschedule 
890              event for LastTSO; with unfair scheduling we know that the
891              previous tso has blocked whenever we switch to another tso, so
892              we don't need it in GUM for now
893       */
894       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
895                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
896       emitSchedule = rtsFalse;
897     }
898      
899 #endif
900 #else /* !GRAN && !PAR */
901   
902     /* grab a thread from the run queue
903      */
904     ASSERT(run_queue_hd != END_TSO_QUEUE);
905     t = POP_RUN_QUEUE();
906
907     // Sanity check the thread we're about to run.  This can be
908     // expensive if there is lots of thread switching going on...
909     IF_DEBUG(sanity,checkTSO(t));
910
911 #endif
912     
913     /* grab a capability
914      */
915 #ifdef SMP
916     cap = free_capabilities;
917     free_capabilities = cap->link;
918     n_free_capabilities--;
919 #else
920     cap = &MainCapability;
921 #endif
922
923     cap->r.rCurrentTSO = t;
924     
925     /* context switches are now initiated by the timer signal, unless
926      * the user specified "context switch as often as possible", with
927      * +RTS -C0
928      */
929     if (
930 #ifdef PROFILING
931         RtsFlags.ProfFlags.profileInterval == 0 ||
932 #endif
933         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
934          && (run_queue_hd != END_TSO_QUEUE
935              || blocked_queue_hd != END_TSO_QUEUE
936              || sleeping_queue != END_TSO_QUEUE)))
937         context_switch = 1;
938     else
939         context_switch = 0;
940
941     RELEASE_LOCK(&sched_mutex);
942
943     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
944                               t->id, t, whatNext_strs[t->what_next]));
945
946 #ifdef PROFILING
947     startHeapProfTimer();
948 #endif
949
950     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
951     /* Run the current thread 
952      */
953     switch (cap->r.rCurrentTSO->what_next) {
954     case ThreadKilled:
955     case ThreadComplete:
956         /* Thread already finished, return to scheduler. */
957         ret = ThreadFinished;
958         break;
959     case ThreadEnterGHC:
960         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
961         break;
962     case ThreadRunGHC:
963         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
964         break;
965     case ThreadEnterInterp:
966         ret = interpretBCO(cap);
967         break;
968     default:
969       barf("schedule: invalid what_next field");
970     }
971     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
972     
973     /* Costs for the scheduler are assigned to CCS_SYSTEM */
974 #ifdef PROFILING
975     stopHeapProfTimer();
976     CCCS = CCS_SYSTEM;
977 #endif
978     
979     ACQUIRE_LOCK(&sched_mutex);
980
981 #ifdef SMP
982     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
983 #elif !defined(GRAN) && !defined(PAR)
984     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
985 #endif
986     t = cap->r.rCurrentTSO;
987     
988 #if defined(PAR)
989     /* HACK 675: if the last thread didn't yield, make sure to print a 
990        SCHEDULE event to the log file when StgRunning the next thread, even
991        if it is the same one as before */
992     LastTSO = t; 
993     TimeOfLastYield = CURRENT_TIME;
994 #endif
995
996     switch (ret) {
997     case HeapOverflow:
998 #if defined(GRAN)
999       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1000       globalGranStats.tot_heapover++;
1001 #elif defined(PAR)
1002       globalParStats.tot_heapover++;
1003 #endif
1004
1005       // did the task ask for a large block?
1006       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1007           // if so, get one and push it on the front of the nursery.
1008           bdescr *bd;
1009           nat blocks;
1010           
1011           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1012
1013           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1014                                    t->id, t,
1015                                    whatNext_strs[t->what_next], blocks));
1016
1017           // don't do this if it would push us over the
1018           // alloc_blocks_lim limit; we'll GC first.
1019           if (alloc_blocks + blocks < alloc_blocks_lim) {
1020
1021               alloc_blocks += blocks;
1022               bd = allocGroup( blocks );
1023
1024               // link the new group into the list
1025               bd->link = cap->r.rCurrentNursery;
1026               bd->u.back = cap->r.rCurrentNursery->u.back;
1027               if (cap->r.rCurrentNursery->u.back != NULL) {
1028                   cap->r.rCurrentNursery->u.back->link = bd;
1029               } else {
1030                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1031                          g0s0->blocks == cap->r.rNursery);
1032                   cap->r.rNursery = g0s0->blocks = bd;
1033               }           
1034               cap->r.rCurrentNursery->u.back = bd;
1035
1036               // initialise it as a nursery block
1037               bd->step = g0s0;
1038               bd->gen_no = 0;
1039               bd->flags = 0;
1040               bd->free = bd->start;
1041
1042               // don't forget to update the block count in g0s0.
1043               g0s0->n_blocks += blocks;
1044               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1045
1046               // now update the nursery to point to the new block
1047               cap->r.rCurrentNursery = bd;
1048
1049               // we might be unlucky and have another thread get on the
1050               // run queue before us and steal the large block, but in that
1051               // case the thread will just end up requesting another large
1052               // block.
1053               PUSH_ON_RUN_QUEUE(t);
1054               break;
1055           }
1056       }
1057
1058       /* make all the running tasks block on a condition variable,
1059        * maybe set context_switch and wait till they all pile in,
1060        * then have them wait on a GC condition variable.
1061        */
1062       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1063                                t->id, t, whatNext_strs[t->what_next]));
1064       threadPaused(t);
1065 #if defined(GRAN)
1066       ASSERT(!is_on_queue(t,CurrentProc));
1067 #elif defined(PAR)
1068       /* Currently we emit a DESCHEDULE event before GC in GUM.
1069          ToDo: either add separate event to distinguish SYSTEM time from rest
1070                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1071       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1072         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1073                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1074         emitSchedule = rtsTrue;
1075       }
1076 #endif
1077       
1078       ready_to_gc = rtsTrue;
1079       context_switch = 1;               /* stop other threads ASAP */
1080       PUSH_ON_RUN_QUEUE(t);
1081       /* actual GC is done at the end of the while loop */
1082       break;
1083       
1084     case StackOverflow:
1085 #if defined(GRAN)
1086       IF_DEBUG(gran, 
1087                DumpGranEvent(GR_DESCHEDULE, t));
1088       globalGranStats.tot_stackover++;
1089 #elif defined(PAR)
1090       // IF_DEBUG(par, 
1091       // DumpGranEvent(GR_DESCHEDULE, t);
1092       globalParStats.tot_stackover++;
1093 #endif
1094       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1095                                t->id, t, whatNext_strs[t->what_next]));
1096       /* just adjust the stack for this thread, then pop it back
1097        * on the run queue.
1098        */
1099       threadPaused(t);
1100       { 
1101         StgMainThread *m;
1102         /* enlarge the stack */
1103         StgTSO *new_t = threadStackOverflow(t);
1104         
1105         /* This TSO has moved, so update any pointers to it from the
1106          * main thread stack.  It better not be on any other queues...
1107          * (it shouldn't be).
1108          */
1109         for (m = main_threads; m != NULL; m = m->link) {
1110           if (m->tso == t) {
1111             m->tso = new_t;
1112           }
1113         }
1114         threadPaused(new_t);
1115         PUSH_ON_RUN_QUEUE(new_t);
1116       }
1117       break;
1118
1119     case ThreadYielding:
1120 #if defined(GRAN)
1121       IF_DEBUG(gran, 
1122                DumpGranEvent(GR_DESCHEDULE, t));
1123       globalGranStats.tot_yields++;
1124 #elif defined(PAR)
1125       // IF_DEBUG(par, 
1126       // DumpGranEvent(GR_DESCHEDULE, t);
1127       globalParStats.tot_yields++;
1128 #endif
1129       /* put the thread back on the run queue.  Then, if we're ready to
1130        * GC, check whether this is the last task to stop.  If so, wake
1131        * up the GC thread.  getThread will block during a GC until the
1132        * GC is finished.
1133        */
1134       IF_DEBUG(scheduler,
1135                if (t->what_next == ThreadEnterInterp) {
1136                    /* ToDo: or maybe a timer expired when we were in Hugs?
1137                     * or maybe someone hit ctrl-C
1138                     */
1139                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1140                          t->id, t, whatNext_strs[t->what_next]);
1141                } else {
1142                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1143                          t->id, t, whatNext_strs[t->what_next]);
1144                }
1145                );
1146
1147       threadPaused(t);
1148
1149       IF_DEBUG(sanity,
1150                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1151                checkTSO(t));
1152       ASSERT(t->link == END_TSO_QUEUE);
1153 #if defined(GRAN)
1154       ASSERT(!is_on_queue(t,CurrentProc));
1155
1156       IF_DEBUG(sanity,
1157                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1158                checkThreadQsSanity(rtsTrue));
1159 #endif
1160 #if defined(PAR)
1161       if (RtsFlags.ParFlags.doFairScheduling) { 
1162         /* this does round-robin scheduling; good for concurrency */
1163         APPEND_TO_RUN_QUEUE(t);
1164       } else {
1165         /* this does unfair scheduling; good for parallelism */
1166         PUSH_ON_RUN_QUEUE(t);
1167       }
1168 #else
1169       /* this does round-robin scheduling; good for concurrency */
1170       APPEND_TO_RUN_QUEUE(t);
1171 #endif
1172 #if defined(GRAN)
1173       /* add a ContinueThread event to actually process the thread */
1174       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1175                 ContinueThread,
1176                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1177       IF_GRAN_DEBUG(bq, 
1178                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1179                G_EVENTQ(0);
1180                G_CURR_THREADQ(0));
1181 #endif /* GRAN */
1182       break;
1183       
1184     case ThreadBlocked:
1185 #if defined(GRAN)
1186       IF_DEBUG(scheduler,
1187                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1188                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1189                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1190
1191       // ??? needed; should emit block before
1192       IF_DEBUG(gran, 
1193                DumpGranEvent(GR_DESCHEDULE, t)); 
1194       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1195       /*
1196         ngoq Dogh!
1197       ASSERT(procStatus[CurrentProc]==Busy || 
1198               ((procStatus[CurrentProc]==Fetching) && 
1199               (t->block_info.closure!=(StgClosure*)NULL)));
1200       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1201           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1202             procStatus[CurrentProc]==Fetching)) 
1203         procStatus[CurrentProc] = Idle;
1204       */
1205 #elif defined(PAR)
1206       IF_DEBUG(scheduler,
1207                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1208                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1209       IF_PAR_DEBUG(bq,
1210
1211                    if (t->block_info.closure!=(StgClosure*)NULL) 
1212                      print_bq(t->block_info.closure));
1213
1214       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1215       blockThread(t);
1216
1217       /* whatever we schedule next, we must log that schedule */
1218       emitSchedule = rtsTrue;
1219
1220 #else /* !GRAN */
1221       /* don't need to do anything.  Either the thread is blocked on
1222        * I/O, in which case we'll have called addToBlockedQueue
1223        * previously, or it's blocked on an MVar or Blackhole, in which
1224        * case it'll be on the relevant queue already.
1225        */
1226       IF_DEBUG(scheduler,
1227                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1228                printThreadBlockage(t);
1229                fprintf(stderr, "\n"));
1230
1231       /* Only for dumping event to log file 
1232          ToDo: do I need this in GranSim, too?
1233       blockThread(t);
1234       */
1235 #endif
1236       threadPaused(t);
1237       break;
1238       
1239     case ThreadFinished:
1240       /* Need to check whether this was a main thread, and if so, signal
1241        * the task that started it with the return value.  If we have no
1242        * more main threads, we probably need to stop all the tasks until
1243        * we get a new one.
1244        */
1245       /* We also end up here if the thread kills itself with an
1246        * uncaught exception, see Exception.hc.
1247        */
1248       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1249 #if defined(GRAN)
1250       endThread(t, CurrentProc); // clean-up the thread
1251 #elif defined(PAR)
1252       /* For now all are advisory -- HWL */
1253       //if(t->priority==AdvisoryPriority) ??
1254       advisory_thread_count--;
1255       
1256 # ifdef DIST
1257       if(t->dist.priority==RevalPriority)
1258         FinishReval(t);
1259 # endif
1260       
1261       if (RtsFlags.ParFlags.ParStats.Full &&
1262           !RtsFlags.ParFlags.ParStats.Suppressed) 
1263         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1264 #endif
1265       break;
1266       
1267     default:
1268       barf("schedule: invalid thread return code %d", (int)ret);
1269     }
1270     
1271 #ifdef SMP
1272     cap->link = free_capabilities;
1273     free_capabilities = cap;
1274     n_free_capabilities++;
1275 #endif
1276
1277 #ifdef PROFILING
1278     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1279         GarbageCollect(GetRoots, rtsTrue);
1280         heapCensus();
1281         performHeapProfile = rtsFalse;
1282         ready_to_gc = rtsFalse; // we already GC'd
1283     }
1284 #endif
1285
1286 #ifdef SMP
1287     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1288 #else
1289     if (ready_to_gc) 
1290 #endif
1291       {
1292       /* everybody back, start the GC.
1293        * Could do it in this thread, or signal a condition var
1294        * to do it in another thread.  Either way, we need to
1295        * broadcast on gc_pending_cond afterward.
1296        */
1297 #ifdef SMP
1298       IF_DEBUG(scheduler,sched_belch("doing GC"));
1299 #endif
1300       GarbageCollect(GetRoots,rtsFalse);
1301       ready_to_gc = rtsFalse;
1302 #ifdef SMP
1303       pthread_cond_broadcast(&gc_pending_cond);
1304 #endif
1305 #if defined(GRAN)
1306       /* add a ContinueThread event to continue execution of current thread */
1307       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1308                 ContinueThread,
1309                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1310       IF_GRAN_DEBUG(bq, 
1311                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1312                G_EVENTQ(0);
1313                G_CURR_THREADQ(0));
1314 #endif /* GRAN */
1315     }
1316
1317 #if defined(GRAN)
1318   next_thread:
1319     IF_GRAN_DEBUG(unused,
1320                   print_eventq(EventHd));
1321
1322     event = get_next_event();
1323 #elif defined(PAR)
1324   next_thread:
1325     /* ToDo: wait for next message to arrive rather than busy wait */
1326 #endif /* GRAN */
1327
1328   } /* end of while(1) */
1329
1330   IF_PAR_DEBUG(verbose,
1331                belch("== Leaving schedule() after having received Finish"));
1332 }
1333
1334 /* ---------------------------------------------------------------------------
1335  * deleteAllThreads():  kill all the live threads.
1336  *
1337  * This is used when we catch a user interrupt (^C), before performing
1338  * any necessary cleanups and running finalizers.
1339  * ------------------------------------------------------------------------- */
1340    
1341 void deleteAllThreads ( void )
1342 {
1343   StgTSO* t;
1344   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1345   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1346       deleteThread(t);
1347   }
1348   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1349       deleteThread(t);
1350   }
1351   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1352       deleteThread(t);
1353   }
1354   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1355   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1356   sleeping_queue = END_TSO_QUEUE;
1357 }
1358
1359 /* startThread and  insertThread are now in GranSim.c -- HWL */
1360
1361 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1362 //@subsection Suspend and Resume
1363
1364 /* ---------------------------------------------------------------------------
1365  * Suspending & resuming Haskell threads.
1366  * 
1367  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1368  * its capability before calling the C function.  This allows another
1369  * task to pick up the capability and carry on running Haskell
1370  * threads.  It also means that if the C call blocks, it won't lock
1371  * the whole system.
1372  *
1373  * The Haskell thread making the C call is put to sleep for the
1374  * duration of the call, on the susepended_ccalling_threads queue.  We
1375  * give out a token to the task, which it can use to resume the thread
1376  * on return from the C function.
1377  * ------------------------------------------------------------------------- */
1378    
1379 StgInt
1380 suspendThread( StgRegTable *reg )
1381 {
1382   nat tok;
1383   Capability *cap;
1384
1385   // assume that *reg is a pointer to the StgRegTable part of a Capability
1386   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1387
1388   ACQUIRE_LOCK(&sched_mutex);
1389
1390   IF_DEBUG(scheduler,
1391            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1392
1393   threadPaused(cap->r.rCurrentTSO);
1394   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1395   suspended_ccalling_threads = cap->r.rCurrentTSO;
1396
1397   /* Use the thread ID as the token; it should be unique */
1398   tok = cap->r.rCurrentTSO->id;
1399
1400 #ifdef SMP
1401   cap->link = free_capabilities;
1402   free_capabilities = cap;
1403   n_free_capabilities++;
1404 #endif
1405
1406   RELEASE_LOCK(&sched_mutex);
1407   return tok; 
1408 }
1409
1410 StgRegTable *
1411 resumeThread( StgInt tok )
1412 {
1413   StgTSO *tso, **prev;
1414   Capability *cap;
1415
1416   ACQUIRE_LOCK(&sched_mutex);
1417
1418   prev = &suspended_ccalling_threads;
1419   for (tso = suspended_ccalling_threads; 
1420        tso != END_TSO_QUEUE; 
1421        prev = &tso->link, tso = tso->link) {
1422     if (tso->id == (StgThreadID)tok) {
1423       *prev = tso->link;
1424       break;
1425     }
1426   }
1427   if (tso == END_TSO_QUEUE) {
1428     barf("resumeThread: thread not found");
1429   }
1430   tso->link = END_TSO_QUEUE;
1431
1432 #ifdef SMP
1433   while (free_capabilities == NULL) {
1434     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1435     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1436     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1437   }
1438   cap = free_capabilities;
1439   free_capabilities = cap->link;
1440   n_free_capabilities--;
1441 #else  
1442   cap = &MainCapability;
1443 #endif
1444
1445   cap->r.rCurrentTSO = tso;
1446
1447   RELEASE_LOCK(&sched_mutex);
1448   return &cap->r;
1449 }
1450
1451
1452 /* ---------------------------------------------------------------------------
1453  * Static functions
1454  * ------------------------------------------------------------------------ */
1455 static void unblockThread(StgTSO *tso);
1456
1457 /* ---------------------------------------------------------------------------
1458  * Comparing Thread ids.
1459  *
1460  * This is used from STG land in the implementation of the
1461  * instances of Eq/Ord for ThreadIds.
1462  * ------------------------------------------------------------------------ */
1463
1464 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1465
1466   StgThreadID id1 = tso1->id; 
1467   StgThreadID id2 = tso2->id;
1468  
1469   if (id1 < id2) return (-1);
1470   if (id1 > id2) return 1;
1471   return 0;
1472 }
1473
1474 /* ---------------------------------------------------------------------------
1475  * Fetching the ThreadID from an StgTSO.
1476  *
1477  * This is used in the implementation of Show for ThreadIds.
1478  * ------------------------------------------------------------------------ */
1479 int rts_getThreadId(const StgTSO *tso) 
1480 {
1481   return tso->id;
1482 }
1483
1484 /* ---------------------------------------------------------------------------
1485    Create a new thread.
1486
1487    The new thread starts with the given stack size.  Before the
1488    scheduler can run, however, this thread needs to have a closure
1489    (and possibly some arguments) pushed on its stack.  See
1490    pushClosure() in Schedule.h.
1491
1492    createGenThread() and createIOThread() (in SchedAPI.h) are
1493    convenient packaged versions of this function.
1494
1495    currently pri (priority) is only used in a GRAN setup -- HWL
1496    ------------------------------------------------------------------------ */
1497 //@cindex createThread
1498 #if defined(GRAN)
1499 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1500 StgTSO *
1501 createThread(nat stack_size, StgInt pri)
1502 {
1503   return createThread_(stack_size, rtsFalse, pri);
1504 }
1505
1506 static StgTSO *
1507 createThread_(nat size, rtsBool have_lock, StgInt pri)
1508 {
1509 #else
1510 StgTSO *
1511 createThread(nat stack_size)
1512 {
1513   return createThread_(stack_size, rtsFalse);
1514 }
1515
1516 static StgTSO *
1517 createThread_(nat size, rtsBool have_lock)
1518 {
1519 #endif
1520
1521     StgTSO *tso;
1522     nat stack_size;
1523
1524     /* First check whether we should create a thread at all */
1525 #if defined(PAR)
1526   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1527   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1528     threadsIgnored++;
1529     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1530           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1531     return END_TSO_QUEUE;
1532   }
1533   threadsCreated++;
1534 #endif
1535
1536 #if defined(GRAN)
1537   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1538 #endif
1539
1540   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1541
1542   /* catch ridiculously small stack sizes */
1543   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1544     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1545   }
1546
1547   stack_size = size - TSO_STRUCT_SIZEW;
1548
1549   tso = (StgTSO *)allocate(size);
1550   TICK_ALLOC_TSO(stack_size, 0);
1551
1552   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1553 #if defined(GRAN)
1554   SET_GRAN_HDR(tso, ThisPE);
1555 #endif
1556   tso->what_next     = ThreadEnterGHC;
1557
1558   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1559    * protect the increment operation on next_thread_id.
1560    * In future, we could use an atomic increment instead.
1561    */
1562   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1563   tso->id = next_thread_id++; 
1564   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1565
1566   tso->why_blocked  = NotBlocked;
1567   tso->blocked_exceptions = NULL;
1568
1569   tso->stack_size   = stack_size;
1570   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1571                               - TSO_STRUCT_SIZEW;
1572   tso->sp           = (P_)&(tso->stack) + stack_size;
1573
1574 #ifdef PROFILING
1575   tso->prof.CCCS = CCS_MAIN;
1576 #endif
1577
1578   /* put a stop frame on the stack */
1579   tso->sp -= sizeofW(StgStopFrame);
1580   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1581   tso->su = (StgUpdateFrame*)tso->sp;
1582
1583   // ToDo: check this
1584 #if defined(GRAN)
1585   tso->link = END_TSO_QUEUE;
1586   /* uses more flexible routine in GranSim */
1587   insertThread(tso, CurrentProc);
1588 #else
1589   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1590    * from its creation
1591    */
1592 #endif
1593
1594 #if defined(GRAN) 
1595   if (RtsFlags.GranFlags.GranSimStats.Full) 
1596     DumpGranEvent(GR_START,tso);
1597 #elif defined(PAR)
1598   if (RtsFlags.ParFlags.ParStats.Full) 
1599     DumpGranEvent(GR_STARTQ,tso);
1600   /* HACk to avoid SCHEDULE 
1601      LastTSO = tso; */
1602 #endif
1603
1604   /* Link the new thread on the global thread list.
1605    */
1606   tso->global_link = all_threads;
1607   all_threads = tso;
1608
1609 #if defined(DIST)
1610   tso->dist.priority = MandatoryPriority; //by default that is...
1611 #endif
1612
1613 #if defined(GRAN)
1614   tso->gran.pri = pri;
1615 # if defined(DEBUG)
1616   tso->gran.magic = TSO_MAGIC; // debugging only
1617 # endif
1618   tso->gran.sparkname   = 0;
1619   tso->gran.startedat   = CURRENT_TIME; 
1620   tso->gran.exported    = 0;
1621   tso->gran.basicblocks = 0;
1622   tso->gran.allocs      = 0;
1623   tso->gran.exectime    = 0;
1624   tso->gran.fetchtime   = 0;
1625   tso->gran.fetchcount  = 0;
1626   tso->gran.blocktime   = 0;
1627   tso->gran.blockcount  = 0;
1628   tso->gran.blockedat   = 0;
1629   tso->gran.globalsparks = 0;
1630   tso->gran.localsparks  = 0;
1631   if (RtsFlags.GranFlags.Light)
1632     tso->gran.clock  = Now; /* local clock */
1633   else
1634     tso->gran.clock  = 0;
1635
1636   IF_DEBUG(gran,printTSO(tso));
1637 #elif defined(PAR)
1638 # if defined(DEBUG)
1639   tso->par.magic = TSO_MAGIC; // debugging only
1640 # endif
1641   tso->par.sparkname   = 0;
1642   tso->par.startedat   = CURRENT_TIME; 
1643   tso->par.exported    = 0;
1644   tso->par.basicblocks = 0;
1645   tso->par.allocs      = 0;
1646   tso->par.exectime    = 0;
1647   tso->par.fetchtime   = 0;
1648   tso->par.fetchcount  = 0;
1649   tso->par.blocktime   = 0;
1650   tso->par.blockcount  = 0;
1651   tso->par.blockedat   = 0;
1652   tso->par.globalsparks = 0;
1653   tso->par.localsparks  = 0;
1654 #endif
1655
1656 #if defined(GRAN)
1657   globalGranStats.tot_threads_created++;
1658   globalGranStats.threads_created_on_PE[CurrentProc]++;
1659   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1660   globalGranStats.tot_sq_probes++;
1661 #elif defined(PAR)
1662   // collect parallel global statistics (currently done together with GC stats)
1663   if (RtsFlags.ParFlags.ParStats.Global &&
1664       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1665     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1666     globalParStats.tot_threads_created++;
1667   }
1668 #endif 
1669
1670 #if defined(GRAN)
1671   IF_GRAN_DEBUG(pri,
1672                 belch("==__ schedule: Created TSO %d (%p);",
1673                       CurrentProc, tso, tso->id));
1674 #elif defined(PAR)
1675     IF_PAR_DEBUG(verbose,
1676                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1677                        tso->id, tso, advisory_thread_count));
1678 #else
1679   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1680                                  tso->id, tso->stack_size));
1681 #endif    
1682   return tso;
1683 }
1684
1685 #if defined(PAR)
1686 /* RFP:
1687    all parallel thread creation calls should fall through the following routine.
1688 */
1689 StgTSO *
1690 createSparkThread(rtsSpark spark) 
1691 { StgTSO *tso;
1692   ASSERT(spark != (rtsSpark)NULL);
1693   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1694   { threadsIgnored++;
1695     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1696           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1697     return END_TSO_QUEUE;
1698   }
1699   else
1700   { threadsCreated++;
1701     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1702     if (tso==END_TSO_QUEUE)     
1703       barf("createSparkThread: Cannot create TSO");
1704 #if defined(DIST)
1705     tso->priority = AdvisoryPriority;
1706 #endif
1707     pushClosure(tso,spark);
1708     PUSH_ON_RUN_QUEUE(tso);
1709     advisory_thread_count++;    
1710   }
1711   return tso;
1712 }
1713 #endif
1714
1715 /*
1716   Turn a spark into a thread.
1717   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1718 */
1719 #if defined(PAR)
1720 //@cindex activateSpark
1721 StgTSO *
1722 activateSpark (rtsSpark spark) 
1723 {
1724   StgTSO *tso;
1725
1726   tso = createSparkThread(spark);
1727   if (RtsFlags.ParFlags.ParStats.Full) {   
1728     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1729     IF_PAR_DEBUG(verbose,
1730                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1731                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1732   }
1733   // ToDo: fwd info on local/global spark to thread -- HWL
1734   // tso->gran.exported =  spark->exported;
1735   // tso->gran.locked =   !spark->global;
1736   // tso->gran.sparkname = spark->name;
1737
1738   return tso;
1739 }
1740 #endif
1741
1742 /* ---------------------------------------------------------------------------
1743  * scheduleThread()
1744  *
1745  * scheduleThread puts a thread on the head of the runnable queue.
1746  * This will usually be done immediately after a thread is created.
1747  * The caller of scheduleThread must create the thread using e.g.
1748  * createThread and push an appropriate closure
1749  * on this thread's stack before the scheduler is invoked.
1750  * ------------------------------------------------------------------------ */
1751
1752 void
1753 scheduleThread(StgTSO *tso)
1754 {
1755   ACQUIRE_LOCK(&sched_mutex);
1756
1757   /* Put the new thread on the head of the runnable queue.  The caller
1758    * better push an appropriate closure on this thread's stack
1759    * beforehand.  In the SMP case, the thread may start running as
1760    * soon as we release the scheduler lock below.
1761    */
1762   PUSH_ON_RUN_QUEUE(tso);
1763   THREAD_RUNNABLE();
1764
1765 #if 0
1766   IF_DEBUG(scheduler,printTSO(tso));
1767 #endif
1768   RELEASE_LOCK(&sched_mutex);
1769 }
1770
1771 /* ---------------------------------------------------------------------------
1772  * startTasks()
1773  *
1774  * Start up Posix threads to run each of the scheduler tasks.
1775  * I believe the task ids are not needed in the system as defined.
1776  *  KH @ 25/10/99
1777  * ------------------------------------------------------------------------ */
1778
1779 #if defined(PAR) || defined(SMP)
1780 void
1781 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1782 {
1783   schedule();
1784 }
1785 #endif
1786
1787 /* ---------------------------------------------------------------------------
1788  * initScheduler()
1789  *
1790  * Initialise the scheduler.  This resets all the queues - if the
1791  * queues contained any threads, they'll be garbage collected at the
1792  * next pass.
1793  *
1794  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1795  * ------------------------------------------------------------------------ */
1796
1797 #ifdef SMP
1798 static void
1799 term_handler(int sig STG_UNUSED)
1800 {
1801   stat_workerStop();
1802   ACQUIRE_LOCK(&term_mutex);
1803   await_death--;
1804   RELEASE_LOCK(&term_mutex);
1805   pthread_exit(NULL);
1806 }
1807 #endif
1808
1809 static void
1810 initCapability( Capability *cap )
1811 {
1812     cap->f.stgChk0         = (F_)__stg_chk_0;
1813     cap->f.stgChk1         = (F_)__stg_chk_1;
1814     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1815     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1816 }
1817
1818 void 
1819 initScheduler(void)
1820 {
1821 #if defined(GRAN)
1822   nat i;
1823
1824   for (i=0; i<=MAX_PROC; i++) {
1825     run_queue_hds[i]      = END_TSO_QUEUE;
1826     run_queue_tls[i]      = END_TSO_QUEUE;
1827     blocked_queue_hds[i]  = END_TSO_QUEUE;
1828     blocked_queue_tls[i]  = END_TSO_QUEUE;
1829     ccalling_threadss[i]  = END_TSO_QUEUE;
1830     sleeping_queue        = END_TSO_QUEUE;
1831   }
1832 #else
1833   run_queue_hd      = END_TSO_QUEUE;
1834   run_queue_tl      = END_TSO_QUEUE;
1835   blocked_queue_hd  = END_TSO_QUEUE;
1836   blocked_queue_tl  = END_TSO_QUEUE;
1837   sleeping_queue    = END_TSO_QUEUE;
1838 #endif 
1839
1840   suspended_ccalling_threads  = END_TSO_QUEUE;
1841
1842   main_threads = NULL;
1843   all_threads  = END_TSO_QUEUE;
1844
1845   context_switch = 0;
1846   interrupted    = 0;
1847
1848   RtsFlags.ConcFlags.ctxtSwitchTicks =
1849       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1850
1851   /* Install the SIGHUP handler */
1852 #ifdef SMP
1853   {
1854     struct sigaction action,oact;
1855
1856     action.sa_handler = term_handler;
1857     sigemptyset(&action.sa_mask);
1858     action.sa_flags = 0;
1859     if (sigaction(SIGTERM, &action, &oact) != 0) {
1860       barf("can't install TERM handler");
1861     }
1862   }
1863 #endif
1864
1865 #ifdef SMP
1866   /* Allocate N Capabilities */
1867   {
1868     nat i;
1869     Capability *cap, *prev;
1870     cap  = NULL;
1871     prev = NULL;
1872     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1873       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1874       initCapability(cap);
1875       cap->link = prev;
1876       prev = cap;
1877     }
1878     free_capabilities = cap;
1879     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1880   }
1881   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1882                              n_free_capabilities););
1883 #else
1884   initCapability(&MainCapability);
1885 #endif
1886
1887 #if defined(SMP) || defined(PAR)
1888   initSparkPools();
1889 #endif
1890 }
1891
1892 #ifdef SMP
1893 void
1894 startTasks( void )
1895 {
1896   nat i;
1897   int r;
1898   pthread_t tid;
1899   
1900   /* make some space for saving all the thread ids */
1901   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1902                             "initScheduler:task_ids");
1903   
1904   /* and create all the threads */
1905   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1906     r = pthread_create(&tid,NULL,taskStart,NULL);
1907     if (r != 0) {
1908       barf("startTasks: Can't create new Posix thread");
1909     }
1910     task_ids[i].id = tid;
1911     task_ids[i].mut_time = 0.0;
1912     task_ids[i].mut_etime = 0.0;
1913     task_ids[i].gc_time = 0.0;
1914     task_ids[i].gc_etime = 0.0;
1915     task_ids[i].elapsedtimestart = elapsedtime();
1916     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1917   }
1918 }
1919 #endif
1920
1921 void
1922 exitScheduler( void )
1923 {
1924 #ifdef SMP
1925   nat i;
1926
1927   /* Don't want to use pthread_cancel, since we'd have to install
1928    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1929    * all our locks.
1930    */
1931 #if 0
1932   /* Cancel all our tasks */
1933   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1934     pthread_cancel(task_ids[i].id);
1935   }
1936   
1937   /* Wait for all the tasks to terminate */
1938   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1939     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1940                                task_ids[i].id));
1941     pthread_join(task_ids[i].id, NULL);
1942   }
1943 #endif
1944
1945   /* Send 'em all a SIGHUP.  That should shut 'em up.
1946    */
1947   await_death = RtsFlags.ParFlags.nNodes;
1948   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1949     pthread_kill(task_ids[i].id,SIGTERM);
1950   }
1951   while (await_death > 0) {
1952     sched_yield();
1953   }
1954 #endif
1955 }
1956
1957 /* -----------------------------------------------------------------------------
1958    Managing the per-task allocation areas.
1959    
1960    Each capability comes with an allocation area.  These are
1961    fixed-length block lists into which allocation can be done.
1962
1963    ToDo: no support for two-space collection at the moment???
1964    -------------------------------------------------------------------------- */
1965
1966 /* -----------------------------------------------------------------------------
1967  * waitThread is the external interface for running a new computation
1968  * and waiting for the result.
1969  *
1970  * In the non-SMP case, we create a new main thread, push it on the 
1971  * main-thread stack, and invoke the scheduler to run it.  The
1972  * scheduler will return when the top main thread on the stack has
1973  * completed or died, and fill in the necessary fields of the
1974  * main_thread structure.
1975  *
1976  * In the SMP case, we create a main thread as before, but we then
1977  * create a new condition variable and sleep on it.  When our new
1978  * main thread has completed, we'll be woken up and the status/result
1979  * will be in the main_thread struct.
1980  * -------------------------------------------------------------------------- */
1981
1982 int 
1983 howManyThreadsAvail ( void )
1984 {
1985    int i = 0;
1986    StgTSO* q;
1987    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1988       i++;
1989    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1990       i++;
1991    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1992       i++;
1993    return i;
1994 }
1995
1996 void
1997 finishAllThreads ( void )
1998 {
1999    do {
2000       while (run_queue_hd != END_TSO_QUEUE) {
2001          waitThread ( run_queue_hd, NULL );
2002       }
2003       while (blocked_queue_hd != END_TSO_QUEUE) {
2004          waitThread ( blocked_queue_hd, NULL );
2005       }
2006       while (sleeping_queue != END_TSO_QUEUE) {
2007          waitThread ( blocked_queue_hd, NULL );
2008       }
2009    } while 
2010       (blocked_queue_hd != END_TSO_QUEUE || 
2011        run_queue_hd     != END_TSO_QUEUE ||
2012        sleeping_queue   != END_TSO_QUEUE);
2013 }
2014
2015 SchedulerStatus
2016 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2017 {
2018   StgMainThread *m;
2019   SchedulerStatus stat;
2020
2021   ACQUIRE_LOCK(&sched_mutex);
2022   
2023   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2024
2025   m->tso = tso;
2026   m->ret = ret;
2027   m->stat = NoStatus;
2028 #ifdef SMP
2029   pthread_cond_init(&m->wakeup, NULL);
2030 #endif
2031
2032   m->link = main_threads;
2033   main_threads = m;
2034
2035   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2036                               m->tso->id));
2037
2038 #ifdef SMP
2039   do {
2040     pthread_cond_wait(&m->wakeup, &sched_mutex);
2041   } while (m->stat == NoStatus);
2042 #elif defined(GRAN)
2043   /* GranSim specific init */
2044   CurrentTSO = m->tso;                // the TSO to run
2045   procStatus[MainProc] = Busy;        // status of main PE
2046   CurrentProc = MainProc;             // PE to run it on
2047
2048   schedule();
2049 #else
2050   schedule();
2051   ASSERT(m->stat != NoStatus);
2052 #endif
2053
2054   stat = m->stat;
2055
2056 #ifdef SMP
2057   pthread_cond_destroy(&m->wakeup);
2058 #endif
2059
2060   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2061                               m->tso->id));
2062   free(m);
2063
2064   RELEASE_LOCK(&sched_mutex);
2065
2066   return stat;
2067 }
2068
2069 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2070 //@subsection Run queue code 
2071
2072 #if 0
2073 /* 
2074    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2075        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2076        implicit global variable that has to be correct when calling these
2077        fcts -- HWL 
2078 */
2079
2080 /* Put the new thread on the head of the runnable queue.
2081  * The caller of createThread better push an appropriate closure
2082  * on this thread's stack before the scheduler is invoked.
2083  */
2084 static /* inline */ void
2085 add_to_run_queue(tso)
2086 StgTSO* tso; 
2087 {
2088   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2089   tso->link = run_queue_hd;
2090   run_queue_hd = tso;
2091   if (run_queue_tl == END_TSO_QUEUE) {
2092     run_queue_tl = tso;
2093   }
2094 }
2095
2096 /* Put the new thread at the end of the runnable queue. */
2097 static /* inline */ void
2098 push_on_run_queue(tso)
2099 StgTSO* tso; 
2100 {
2101   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2102   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2103   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2104   if (run_queue_hd == END_TSO_QUEUE) {
2105     run_queue_hd = tso;
2106   } else {
2107     run_queue_tl->link = tso;
2108   }
2109   run_queue_tl = tso;
2110 }
2111
2112 /* 
2113    Should be inlined because it's used very often in schedule.  The tso
2114    argument is actually only needed in GranSim, where we want to have the
2115    possibility to schedule *any* TSO on the run queue, irrespective of the
2116    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2117    the run queue and dequeue the tso, adjusting the links in the queue. 
2118 */
2119 //@cindex take_off_run_queue
2120 static /* inline */ StgTSO*
2121 take_off_run_queue(StgTSO *tso) {
2122   StgTSO *t, *prev;
2123
2124   /* 
2125      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2126
2127      if tso is specified, unlink that tso from the run_queue (doesn't have
2128      to be at the beginning of the queue); GranSim only 
2129   */
2130   if (tso!=END_TSO_QUEUE) {
2131     /* find tso in queue */
2132     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2133          t!=END_TSO_QUEUE && t!=tso;
2134          prev=t, t=t->link) 
2135       /* nothing */ ;
2136     ASSERT(t==tso);
2137     /* now actually dequeue the tso */
2138     if (prev!=END_TSO_QUEUE) {
2139       ASSERT(run_queue_hd!=t);
2140       prev->link = t->link;
2141     } else {
2142       /* t is at beginning of thread queue */
2143       ASSERT(run_queue_hd==t);
2144       run_queue_hd = t->link;
2145     }
2146     /* t is at end of thread queue */
2147     if (t->link==END_TSO_QUEUE) {
2148       ASSERT(t==run_queue_tl);
2149       run_queue_tl = prev;
2150     } else {
2151       ASSERT(run_queue_tl!=t);
2152     }
2153     t->link = END_TSO_QUEUE;
2154   } else {
2155     /* take tso from the beginning of the queue; std concurrent code */
2156     t = run_queue_hd;
2157     if (t != END_TSO_QUEUE) {
2158       run_queue_hd = t->link;
2159       t->link = END_TSO_QUEUE;
2160       if (run_queue_hd == END_TSO_QUEUE) {
2161         run_queue_tl = END_TSO_QUEUE;
2162       }
2163     }
2164   }
2165   return t;
2166 }
2167
2168 #endif /* 0 */
2169
2170 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2171 //@subsection Garbage Collextion Routines
2172
2173 /* ---------------------------------------------------------------------------
2174    Where are the roots that we know about?
2175
2176         - all the threads on the runnable queue
2177         - all the threads on the blocked queue
2178         - all the threads on the sleeping queue
2179         - all the thread currently executing a _ccall_GC
2180         - all the "main threads"
2181      
2182    ------------------------------------------------------------------------ */
2183
2184 /* This has to be protected either by the scheduler monitor, or by the
2185         garbage collection monitor (probably the latter).
2186         KH @ 25/10/99
2187 */
2188
2189 void
2190 GetRoots(evac_fn evac)
2191 {
2192   StgMainThread *m;
2193
2194 #if defined(GRAN)
2195   {
2196     nat i;
2197     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2198       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2199           evac((StgClosure **)&run_queue_hds[i]);
2200       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2201           evac((StgClosure **)&run_queue_tls[i]);
2202       
2203       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2204           evac((StgClosure **)&blocked_queue_hds[i]);
2205       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2206           evac((StgClosure **)&blocked_queue_tls[i]);
2207       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2208           evac((StgClosure **)&ccalling_threads[i]);
2209     }
2210   }
2211
2212   markEventQueue();
2213
2214 #else /* !GRAN */
2215   if (run_queue_hd != END_TSO_QUEUE) {
2216       ASSERT(run_queue_tl != END_TSO_QUEUE);
2217       evac((StgClosure **)&run_queue_hd);
2218       evac((StgClosure **)&run_queue_tl);
2219   }
2220   
2221   if (blocked_queue_hd != END_TSO_QUEUE) {
2222       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2223       evac((StgClosure **)&blocked_queue_hd);
2224       evac((StgClosure **)&blocked_queue_tl);
2225   }
2226   
2227   if (sleeping_queue != END_TSO_QUEUE) {
2228       evac((StgClosure **)&sleeping_queue);
2229   }
2230 #endif 
2231
2232   for (m = main_threads; m != NULL; m = m->link) {
2233       evac((StgClosure **)&m->tso);
2234   }
2235   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2236       evac((StgClosure **)&suspended_ccalling_threads);
2237   }
2238
2239 #if defined(SMP) || defined(PAR) || defined(GRAN)
2240   markSparkQueue(evac);
2241 #endif
2242 }
2243
2244 /* -----------------------------------------------------------------------------
2245    performGC
2246
2247    This is the interface to the garbage collector from Haskell land.
2248    We provide this so that external C code can allocate and garbage
2249    collect when called from Haskell via _ccall_GC.
2250
2251    It might be useful to provide an interface whereby the programmer
2252    can specify more roots (ToDo).
2253    
2254    This needs to be protected by the GC condition variable above.  KH.
2255    -------------------------------------------------------------------------- */
2256
2257 void (*extra_roots)(evac_fn);
2258
2259 void
2260 performGC(void)
2261 {
2262   GarbageCollect(GetRoots,rtsFalse);
2263 }
2264
2265 void
2266 performMajorGC(void)
2267 {
2268   GarbageCollect(GetRoots,rtsTrue);
2269 }
2270
2271 static void
2272 AllRoots(evac_fn evac)
2273 {
2274     GetRoots(evac);             // the scheduler's roots
2275     extra_roots(evac);          // the user's roots
2276 }
2277
2278 void
2279 performGCWithRoots(void (*get_roots)(evac_fn))
2280 {
2281   extra_roots = get_roots;
2282   GarbageCollect(AllRoots,rtsFalse);
2283 }
2284
2285 /* -----------------------------------------------------------------------------
2286    Stack overflow
2287
2288    If the thread has reached its maximum stack size, then raise the
2289    StackOverflow exception in the offending thread.  Otherwise
2290    relocate the TSO into a larger chunk of memory and adjust its stack
2291    size appropriately.
2292    -------------------------------------------------------------------------- */
2293
2294 static StgTSO *
2295 threadStackOverflow(StgTSO *tso)
2296 {
2297   nat new_stack_size, new_tso_size, diff, stack_words;
2298   StgPtr new_sp;
2299   StgTSO *dest;
2300
2301   IF_DEBUG(sanity,checkTSO(tso));
2302   if (tso->stack_size >= tso->max_stack_size) {
2303
2304     IF_DEBUG(gc,
2305              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2306                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2307              /* If we're debugging, just print out the top of the stack */
2308              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2309                                               tso->sp+64)));
2310
2311     /* Send this thread the StackOverflow exception */
2312     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2313     return tso;
2314   }
2315
2316   /* Try to double the current stack size.  If that takes us over the
2317    * maximum stack size for this thread, then use the maximum instead.
2318    * Finally round up so the TSO ends up as a whole number of blocks.
2319    */
2320   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2321   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2322                                        TSO_STRUCT_SIZE)/sizeof(W_);
2323   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2324   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2325
2326   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2327
2328   dest = (StgTSO *)allocate(new_tso_size);
2329   TICK_ALLOC_TSO(new_stack_size,0);
2330
2331   /* copy the TSO block and the old stack into the new area */
2332   memcpy(dest,tso,TSO_STRUCT_SIZE);
2333   stack_words = tso->stack + tso->stack_size - tso->sp;
2334   new_sp = (P_)dest + new_tso_size - stack_words;
2335   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2336
2337   /* relocate the stack pointers... */
2338   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2339   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2340   dest->sp    = new_sp;
2341   dest->stack_size = new_stack_size;
2342         
2343   /* and relocate the update frame list */
2344   relocate_stack(dest, diff);
2345
2346   /* Mark the old TSO as relocated.  We have to check for relocated
2347    * TSOs in the garbage collector and any primops that deal with TSOs.
2348    *
2349    * It's important to set the sp and su values to just beyond the end
2350    * of the stack, so we don't attempt to scavenge any part of the
2351    * dead TSO's stack.
2352    */
2353   tso->what_next = ThreadRelocated;
2354   tso->link = dest;
2355   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2356   tso->su = (StgUpdateFrame *)tso->sp;
2357   tso->why_blocked = NotBlocked;
2358   dest->mut_link = NULL;
2359
2360   IF_PAR_DEBUG(verbose,
2361                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2362                      tso->id, tso, tso->stack_size);
2363                /* If we're debugging, just print out the top of the stack */
2364                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2365                                                 tso->sp+64)));
2366   
2367   IF_DEBUG(sanity,checkTSO(tso));
2368 #if 0
2369   IF_DEBUG(scheduler,printTSO(dest));
2370 #endif
2371
2372   return dest;
2373 }
2374
2375 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2376 //@subsection Blocking Queue Routines
2377
2378 /* ---------------------------------------------------------------------------
2379    Wake up a queue that was blocked on some resource.
2380    ------------------------------------------------------------------------ */
2381
2382 #if defined(GRAN)
2383 static inline void
2384 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2385 {
2386 }
2387 #elif defined(PAR)
2388 static inline void
2389 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2390 {
2391   /* write RESUME events to log file and
2392      update blocked and fetch time (depending on type of the orig closure) */
2393   if (RtsFlags.ParFlags.ParStats.Full) {
2394     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2395                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2396                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2397     if (EMPTY_RUN_QUEUE())
2398       emitSchedule = rtsTrue;
2399
2400     switch (get_itbl(node)->type) {
2401         case FETCH_ME_BQ:
2402           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2403           break;
2404         case RBH:
2405         case FETCH_ME:
2406         case BLACKHOLE_BQ:
2407           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2408           break;
2409 #ifdef DIST
2410         case MVAR:
2411           break;
2412 #endif    
2413         default:
2414           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2415         }
2416       }
2417 }
2418 #endif
2419
2420 #if defined(GRAN)
2421 static StgBlockingQueueElement *
2422 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2423 {
2424     StgTSO *tso;
2425     PEs node_loc, tso_loc;
2426
2427     node_loc = where_is(node); // should be lifted out of loop
2428     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2429     tso_loc = where_is((StgClosure *)tso);
2430     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2431       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2432       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2433       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2434       // insertThread(tso, node_loc);
2435       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2436                 ResumeThread,
2437                 tso, node, (rtsSpark*)NULL);
2438       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2439       // len_local++;
2440       // len++;
2441     } else { // TSO is remote (actually should be FMBQ)
2442       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2443                                   RtsFlags.GranFlags.Costs.gunblocktime +
2444                                   RtsFlags.GranFlags.Costs.latency;
2445       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2446                 UnblockThread,
2447                 tso, node, (rtsSpark*)NULL);
2448       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2449       // len++;
2450     }
2451     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2452     IF_GRAN_DEBUG(bq,
2453                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2454                           (node_loc==tso_loc ? "Local" : "Global"), 
2455                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2456     tso->block_info.closure = NULL;
2457     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2458                              tso->id, tso));
2459 }
2460 #elif defined(PAR)
2461 static StgBlockingQueueElement *
2462 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2463 {
2464     StgBlockingQueueElement *next;
2465
2466     switch (get_itbl(bqe)->type) {
2467     case TSO:
2468       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2469       /* if it's a TSO just push it onto the run_queue */
2470       next = bqe->link;
2471       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2472       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2473       THREAD_RUNNABLE();
2474       unblockCount(bqe, node);
2475       /* reset blocking status after dumping event */
2476       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2477       break;
2478
2479     case BLOCKED_FETCH:
2480       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2481       next = bqe->link;
2482       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2483       PendingFetches = (StgBlockedFetch *)bqe;
2484       break;
2485
2486 # if defined(DEBUG)
2487       /* can ignore this case in a non-debugging setup; 
2488          see comments on RBHSave closures above */
2489     case CONSTR:
2490       /* check that the closure is an RBHSave closure */
2491       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2492              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2493              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2494       break;
2495
2496     default:
2497       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2498            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2499            (StgClosure *)bqe);
2500 # endif
2501     }
2502   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2503   return next;
2504 }
2505
2506 #else /* !GRAN && !PAR */
2507 static StgTSO *
2508 unblockOneLocked(StgTSO *tso)
2509 {
2510   StgTSO *next;
2511
2512   ASSERT(get_itbl(tso)->type == TSO);
2513   ASSERT(tso->why_blocked != NotBlocked);
2514   tso->why_blocked = NotBlocked;
2515   next = tso->link;
2516   PUSH_ON_RUN_QUEUE(tso);
2517   THREAD_RUNNABLE();
2518   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2519   return next;
2520 }
2521 #endif
2522
2523 #if defined(GRAN) || defined(PAR)
2524 inline StgBlockingQueueElement *
2525 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2526 {
2527   ACQUIRE_LOCK(&sched_mutex);
2528   bqe = unblockOneLocked(bqe, node);
2529   RELEASE_LOCK(&sched_mutex);
2530   return bqe;
2531 }
2532 #else
2533 inline StgTSO *
2534 unblockOne(StgTSO *tso)
2535 {
2536   ACQUIRE_LOCK(&sched_mutex);
2537   tso = unblockOneLocked(tso);
2538   RELEASE_LOCK(&sched_mutex);
2539   return tso;
2540 }
2541 #endif
2542
2543 #if defined(GRAN)
2544 void 
2545 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2546 {
2547   StgBlockingQueueElement *bqe;
2548   PEs node_loc;
2549   nat len = 0; 
2550
2551   IF_GRAN_DEBUG(bq, 
2552                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2553                       node, CurrentProc, CurrentTime[CurrentProc], 
2554                       CurrentTSO->id, CurrentTSO));
2555
2556   node_loc = where_is(node);
2557
2558   ASSERT(q == END_BQ_QUEUE ||
2559          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2560          get_itbl(q)->type == CONSTR); // closure (type constructor)
2561   ASSERT(is_unique(node));
2562
2563   /* FAKE FETCH: magically copy the node to the tso's proc;
2564      no Fetch necessary because in reality the node should not have been 
2565      moved to the other PE in the first place
2566   */
2567   if (CurrentProc!=node_loc) {
2568     IF_GRAN_DEBUG(bq, 
2569                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2570                         node, node_loc, CurrentProc, CurrentTSO->id, 
2571                         // CurrentTSO, where_is(CurrentTSO),
2572                         node->header.gran.procs));
2573     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2574     IF_GRAN_DEBUG(bq, 
2575                   belch("## new bitmask of node %p is %#x",
2576                         node, node->header.gran.procs));
2577     if (RtsFlags.GranFlags.GranSimStats.Global) {
2578       globalGranStats.tot_fake_fetches++;
2579     }
2580   }
2581
2582   bqe = q;
2583   // ToDo: check: ASSERT(CurrentProc==node_loc);
2584   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2585     //next = bqe->link;
2586     /* 
2587        bqe points to the current element in the queue
2588        next points to the next element in the queue
2589     */
2590     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2591     //tso_loc = where_is(tso);
2592     len++;
2593     bqe = unblockOneLocked(bqe, node);
2594   }
2595
2596   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2597      the closure to make room for the anchor of the BQ */
2598   if (bqe!=END_BQ_QUEUE) {
2599     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2600     /*
2601     ASSERT((info_ptr==&RBH_Save_0_info) ||
2602            (info_ptr==&RBH_Save_1_info) ||
2603            (info_ptr==&RBH_Save_2_info));
2604     */
2605     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2606     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2607     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2608
2609     IF_GRAN_DEBUG(bq,
2610                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2611                         node, info_type(node)));
2612   }
2613
2614   /* statistics gathering */
2615   if (RtsFlags.GranFlags.GranSimStats.Global) {
2616     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2617     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2618     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2619     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2620   }
2621   IF_GRAN_DEBUG(bq,
2622                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2623                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2624 }
2625 #elif defined(PAR)
2626 void 
2627 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2628 {
2629   StgBlockingQueueElement *bqe;
2630
2631   ACQUIRE_LOCK(&sched_mutex);
2632
2633   IF_PAR_DEBUG(verbose, 
2634                belch("##-_ AwBQ for node %p on [%x]: ",
2635                      node, mytid));
2636 #ifdef DIST  
2637   //RFP
2638   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2639     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2640     return;
2641   }
2642 #endif
2643   
2644   ASSERT(q == END_BQ_QUEUE ||
2645          get_itbl(q)->type == TSO ||           
2646          get_itbl(q)->type == BLOCKED_FETCH || 
2647          get_itbl(q)->type == CONSTR); 
2648
2649   bqe = q;
2650   while (get_itbl(bqe)->type==TSO || 
2651          get_itbl(bqe)->type==BLOCKED_FETCH) {
2652     bqe = unblockOneLocked(bqe, node);
2653   }
2654   RELEASE_LOCK(&sched_mutex);
2655 }
2656
2657 #else   /* !GRAN && !PAR */
2658 void
2659 awakenBlockedQueue(StgTSO *tso)
2660 {
2661   ACQUIRE_LOCK(&sched_mutex);
2662   while (tso != END_TSO_QUEUE) {
2663     tso = unblockOneLocked(tso);
2664   }
2665   RELEASE_LOCK(&sched_mutex);
2666 }
2667 #endif
2668
2669 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2670 //@subsection Exception Handling Routines
2671
2672 /* ---------------------------------------------------------------------------
2673    Interrupt execution
2674    - usually called inside a signal handler so it mustn't do anything fancy.   
2675    ------------------------------------------------------------------------ */
2676
2677 void
2678 interruptStgRts(void)
2679 {
2680     interrupted    = 1;
2681     context_switch = 1;
2682 }
2683
2684 /* -----------------------------------------------------------------------------
2685    Unblock a thread
2686
2687    This is for use when we raise an exception in another thread, which
2688    may be blocked.
2689    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2690    -------------------------------------------------------------------------- */
2691
2692 #if defined(GRAN) || defined(PAR)
2693 /*
2694   NB: only the type of the blocking queue is different in GranSim and GUM
2695       the operations on the queue-elements are the same
2696       long live polymorphism!
2697 */
2698 static void
2699 unblockThread(StgTSO *tso)
2700 {
2701   StgBlockingQueueElement *t, **last;
2702
2703   ACQUIRE_LOCK(&sched_mutex);
2704   switch (tso->why_blocked) {
2705
2706   case NotBlocked:
2707     return;  /* not blocked */
2708
2709   case BlockedOnMVar:
2710     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2711     {
2712       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2713       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2714
2715       last = (StgBlockingQueueElement **)&mvar->head;
2716       for (t = (StgBlockingQueueElement *)mvar->head; 
2717            t != END_BQ_QUEUE; 
2718            last = &t->link, last_tso = t, t = t->link) {
2719         if (t == (StgBlockingQueueElement *)tso) {
2720           *last = (StgBlockingQueueElement *)tso->link;
2721           if (mvar->tail == tso) {
2722             mvar->tail = (StgTSO *)last_tso;
2723           }
2724           goto done;
2725         }
2726       }
2727       barf("unblockThread (MVAR): TSO not found");
2728     }
2729
2730   case BlockedOnBlackHole:
2731     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2732     {
2733       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2734
2735       last = &bq->blocking_queue;
2736       for (t = bq->blocking_queue; 
2737            t != END_BQ_QUEUE; 
2738            last = &t->link, t = t->link) {
2739         if (t == (StgBlockingQueueElement *)tso) {
2740           *last = (StgBlockingQueueElement *)tso->link;
2741           goto done;
2742         }
2743       }
2744       barf("unblockThread (BLACKHOLE): TSO not found");
2745     }
2746
2747   case BlockedOnException:
2748     {
2749       StgTSO *target  = tso->block_info.tso;
2750
2751       ASSERT(get_itbl(target)->type == TSO);
2752
2753       if (target->what_next == ThreadRelocated) {
2754           target = target->link;
2755           ASSERT(get_itbl(target)->type == TSO);
2756       }
2757
2758       ASSERT(target->blocked_exceptions != NULL);
2759
2760       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2761       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2762            t != END_BQ_QUEUE; 
2763            last = &t->link, t = t->link) {
2764         ASSERT(get_itbl(t)->type == TSO);
2765         if (t == (StgBlockingQueueElement *)tso) {
2766           *last = (StgBlockingQueueElement *)tso->link;
2767           goto done;
2768         }
2769       }
2770       barf("unblockThread (Exception): TSO not found");
2771     }
2772
2773   case BlockedOnRead:
2774   case BlockedOnWrite:
2775     {
2776       /* take TSO off blocked_queue */
2777       StgBlockingQueueElement *prev = NULL;
2778       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2779            prev = t, t = t->link) {
2780         if (t == (StgBlockingQueueElement *)tso) {
2781           if (prev == NULL) {
2782             blocked_queue_hd = (StgTSO *)t->link;
2783             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2784               blocked_queue_tl = END_TSO_QUEUE;
2785             }
2786           } else {
2787             prev->link = t->link;
2788             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2789               blocked_queue_tl = (StgTSO *)prev;
2790             }
2791           }
2792           goto done;
2793         }
2794       }
2795       barf("unblockThread (I/O): TSO not found");
2796     }
2797
2798   case BlockedOnDelay:
2799     {
2800       /* take TSO off sleeping_queue */
2801       StgBlockingQueueElement *prev = NULL;
2802       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2803            prev = t, t = t->link) {
2804         if (t == (StgBlockingQueueElement *)tso) {
2805           if (prev == NULL) {
2806             sleeping_queue = (StgTSO *)t->link;
2807           } else {
2808             prev->link = t->link;
2809           }
2810           goto done;
2811         }
2812       }
2813       barf("unblockThread (I/O): TSO not found");
2814     }
2815
2816   default:
2817     barf("unblockThread");
2818   }
2819
2820  done:
2821   tso->link = END_TSO_QUEUE;
2822   tso->why_blocked = NotBlocked;
2823   tso->block_info.closure = NULL;
2824   PUSH_ON_RUN_QUEUE(tso);
2825   RELEASE_LOCK(&sched_mutex);
2826 }
2827 #else
2828 static void
2829 unblockThread(StgTSO *tso)
2830 {
2831   StgTSO *t, **last;
2832
2833   ACQUIRE_LOCK(&sched_mutex);
2834   switch (tso->why_blocked) {
2835
2836   case NotBlocked:
2837     return;  /* not blocked */
2838
2839   case BlockedOnMVar:
2840     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2841     {
2842       StgTSO *last_tso = END_TSO_QUEUE;
2843       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2844
2845       last = &mvar->head;
2846       for (t = mvar->head; t != END_TSO_QUEUE; 
2847            last = &t->link, last_tso = t, t = t->link) {
2848         if (t == tso) {
2849           *last = tso->link;
2850           if (mvar->tail == tso) {
2851             mvar->tail = last_tso;
2852           }
2853           goto done;
2854         }
2855       }
2856       barf("unblockThread (MVAR): TSO not found");
2857     }
2858
2859   case BlockedOnBlackHole:
2860     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2861     {
2862       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2863
2864       last = &bq->blocking_queue;
2865       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2866            last = &t->link, t = t->link) {
2867         if (t == tso) {
2868           *last = tso->link;
2869           goto done;
2870         }
2871       }
2872       barf("unblockThread (BLACKHOLE): TSO not found");
2873     }
2874
2875   case BlockedOnException:
2876     {
2877       StgTSO *target  = tso->block_info.tso;
2878
2879       ASSERT(get_itbl(target)->type == TSO);
2880
2881       while (target->what_next == ThreadRelocated) {
2882           target = target->link;
2883           ASSERT(get_itbl(target)->type == TSO);
2884       }
2885       
2886       ASSERT(target->blocked_exceptions != NULL);
2887
2888       last = &target->blocked_exceptions;
2889       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2890            last = &t->link, t = t->link) {
2891         ASSERT(get_itbl(t)->type == TSO);
2892         if (t == tso) {
2893           *last = tso->link;
2894           goto done;
2895         }
2896       }
2897       barf("unblockThread (Exception): TSO not found");
2898     }
2899
2900   case BlockedOnRead:
2901   case BlockedOnWrite:
2902     {
2903       StgTSO *prev = NULL;
2904       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2905            prev = t, t = t->link) {
2906         if (t == tso) {
2907           if (prev == NULL) {
2908             blocked_queue_hd = t->link;
2909             if (blocked_queue_tl == t) {
2910               blocked_queue_tl = END_TSO_QUEUE;
2911             }
2912           } else {
2913             prev->link = t->link;
2914             if (blocked_queue_tl == t) {
2915               blocked_queue_tl = prev;
2916             }
2917           }
2918           goto done;
2919         }
2920       }
2921       barf("unblockThread (I/O): TSO not found");
2922     }
2923
2924   case BlockedOnDelay:
2925     {
2926       StgTSO *prev = NULL;
2927       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2928            prev = t, t = t->link) {
2929         if (t == tso) {
2930           if (prev == NULL) {
2931             sleeping_queue = t->link;
2932           } else {
2933             prev->link = t->link;
2934           }
2935           goto done;
2936         }
2937       }
2938       barf("unblockThread (I/O): TSO not found");
2939     }
2940
2941   default:
2942     barf("unblockThread");
2943   }
2944
2945  done:
2946   tso->link = END_TSO_QUEUE;
2947   tso->why_blocked = NotBlocked;
2948   tso->block_info.closure = NULL;
2949   PUSH_ON_RUN_QUEUE(tso);
2950   RELEASE_LOCK(&sched_mutex);
2951 }
2952 #endif
2953
2954 /* -----------------------------------------------------------------------------
2955  * raiseAsync()
2956  *
2957  * The following function implements the magic for raising an
2958  * asynchronous exception in an existing thread.
2959  *
2960  * We first remove the thread from any queue on which it might be
2961  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2962  *
2963  * We strip the stack down to the innermost CATCH_FRAME, building
2964  * thunks in the heap for all the active computations, so they can 
2965  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2966  * an application of the handler to the exception, and push it on
2967  * the top of the stack.
2968  * 
2969  * How exactly do we save all the active computations?  We create an
2970  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2971  * AP_UPDs pushes everything from the corresponding update frame
2972  * upwards onto the stack.  (Actually, it pushes everything up to the
2973  * next update frame plus a pointer to the next AP_UPD object.
2974  * Entering the next AP_UPD object pushes more onto the stack until we
2975  * reach the last AP_UPD object - at which point the stack should look
2976  * exactly as it did when we killed the TSO and we can continue
2977  * execution by entering the closure on top of the stack.
2978  *
2979  * We can also kill a thread entirely - this happens if either (a) the 
2980  * exception passed to raiseAsync is NULL, or (b) there's no
2981  * CATCH_FRAME on the stack.  In either case, we strip the entire
2982  * stack and replace the thread with a zombie.
2983  *
2984  * -------------------------------------------------------------------------- */
2985  
2986 void 
2987 deleteThread(StgTSO *tso)
2988 {
2989   raiseAsync(tso,NULL);
2990 }
2991
2992 void
2993 raiseAsync(StgTSO *tso, StgClosure *exception)
2994 {
2995   StgUpdateFrame* su = tso->su;
2996   StgPtr          sp = tso->sp;
2997   
2998   /* Thread already dead? */
2999   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3000     return;
3001   }
3002
3003   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3004
3005   /* Remove it from any blocking queues */
3006   unblockThread(tso);
3007
3008   /* The stack freezing code assumes there's a closure pointer on
3009    * the top of the stack.  This isn't always the case with compiled
3010    * code, so we have to push a dummy closure on the top which just
3011    * returns to the next return address on the stack.
3012    */
3013   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3014     *(--sp) = (W_)&stg_dummy_ret_closure;
3015   }
3016
3017   while (1) {
3018     nat words = ((P_)su - (P_)sp) - 1;
3019     nat i;
3020     StgAP_UPD * ap;
3021
3022     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3023      * then build PAP(handler,exception,realworld#), and leave it on
3024      * top of the stack ready to enter.
3025      */
3026     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3027       StgCatchFrame *cf = (StgCatchFrame *)su;
3028       /* we've got an exception to raise, so let's pass it to the
3029        * handler in this frame.
3030        */
3031       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3032       TICK_ALLOC_UPD_PAP(3,0);
3033       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3034               
3035       ap->n_args = 2;
3036       ap->fun = cf->handler;    /* :: Exception -> IO a */
3037       ap->payload[0] = exception;
3038       ap->payload[1] = ARG_TAG(0); /* realworld token */
3039
3040       /* throw away the stack from Sp up to and including the
3041        * CATCH_FRAME.
3042        */
3043       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3044       tso->su = cf->link;
3045
3046       /* Restore the blocked/unblocked state for asynchronous exceptions
3047        * at the CATCH_FRAME.  
3048        *
3049        * If exceptions were unblocked at the catch, arrange that they
3050        * are unblocked again after executing the handler by pushing an
3051        * unblockAsyncExceptions_ret stack frame.
3052        */
3053       if (!cf->exceptions_blocked) {
3054         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3055       }
3056       
3057       /* Ensure that async exceptions are blocked when running the handler.
3058        */
3059       if (tso->blocked_exceptions == NULL) {
3060         tso->blocked_exceptions = END_TSO_QUEUE;
3061       }
3062       
3063       /* Put the newly-built PAP on top of the stack, ready to execute
3064        * when the thread restarts.
3065        */
3066       sp[0] = (W_)ap;
3067       tso->sp = sp;
3068       tso->what_next = ThreadEnterGHC;
3069       IF_DEBUG(sanity, checkTSO(tso));
3070       return;
3071     }
3072
3073     /* First build an AP_UPD consisting of the stack chunk above the
3074      * current update frame, with the top word on the stack as the
3075      * fun field.
3076      */
3077     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3078     
3079     ASSERT(words >= 0);
3080     
3081     ap->n_args = words;
3082     ap->fun    = (StgClosure *)sp[0];
3083     sp++;
3084     for(i=0; i < (nat)words; ++i) {
3085       ap->payload[i] = (StgClosure *)*sp++;
3086     }
3087     
3088     switch (get_itbl(su)->type) {
3089       
3090     case UPDATE_FRAME:
3091       {
3092         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3093         TICK_ALLOC_UP_THK(words+1,0);
3094         
3095         IF_DEBUG(scheduler,
3096                  fprintf(stderr,  "scheduler: Updating ");
3097                  printPtr((P_)su->updatee); 
3098                  fprintf(stderr,  " with ");
3099                  printObj((StgClosure *)ap);
3100                  );
3101         
3102         /* Replace the updatee with an indirection - happily
3103          * this will also wake up any threads currently
3104          * waiting on the result.
3105          *
3106          * Warning: if we're in a loop, more than one update frame on
3107          * the stack may point to the same object.  Be careful not to
3108          * overwrite an IND_OLDGEN in this case, because we'll screw
3109          * up the mutable lists.  To be on the safe side, don't
3110          * overwrite any kind of indirection at all.  See also
3111          * threadSqueezeStack in GC.c, where we have to make a similar
3112          * check.
3113          */
3114         if (!closure_IND(su->updatee)) {
3115             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3116         }
3117         su = su->link;
3118         sp += sizeofW(StgUpdateFrame) -1;
3119         sp[0] = (W_)ap; /* push onto stack */
3120         break;
3121       }
3122
3123     case CATCH_FRAME:
3124       {
3125         StgCatchFrame *cf = (StgCatchFrame *)su;
3126         StgClosure* o;
3127         
3128         /* We want a PAP, not an AP_UPD.  Fortunately, the
3129          * layout's the same.
3130          */
3131         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3132         TICK_ALLOC_UPD_PAP(words+1,0);
3133         
3134         /* now build o = FUN(catch,ap,handler) */
3135         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3136         TICK_ALLOC_FUN(2,0);
3137         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3138         o->payload[0] = (StgClosure *)ap;
3139         o->payload[1] = cf->handler;
3140         
3141         IF_DEBUG(scheduler,
3142                  fprintf(stderr,  "scheduler: Built ");
3143                  printObj((StgClosure *)o);
3144                  );
3145         
3146         /* pop the old handler and put o on the stack */
3147         su = cf->link;
3148         sp += sizeofW(StgCatchFrame) - 1;
3149         sp[0] = (W_)o;
3150         break;
3151       }
3152       
3153     case SEQ_FRAME:
3154       {
3155         StgSeqFrame *sf = (StgSeqFrame *)su;
3156         StgClosure* o;
3157         
3158         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3159         TICK_ALLOC_UPD_PAP(words+1,0);
3160         
3161         /* now build o = FUN(seq,ap) */
3162         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3163         TICK_ALLOC_SE_THK(1,0);
3164         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3165         o->payload[0] = (StgClosure *)ap;
3166         
3167         IF_DEBUG(scheduler,
3168                  fprintf(stderr,  "scheduler: Built ");
3169                  printObj((StgClosure *)o);
3170                  );
3171         
3172         /* pop the old handler and put o on the stack */
3173         su = sf->link;
3174         sp += sizeofW(StgSeqFrame) - 1;
3175         sp[0] = (W_)o;
3176         break;
3177       }
3178       
3179     case STOP_FRAME:
3180       /* We've stripped the entire stack, the thread is now dead. */
3181       sp += sizeofW(StgStopFrame) - 1;
3182       sp[0] = (W_)exception;    /* save the exception */
3183       tso->what_next = ThreadKilled;
3184       tso->su = (StgUpdateFrame *)(sp+1);
3185       tso->sp = sp;
3186       return;
3187
3188     default:
3189       barf("raiseAsync");
3190     }
3191   }
3192   barf("raiseAsync");
3193 }
3194
3195 /* -----------------------------------------------------------------------------
3196    resurrectThreads is called after garbage collection on the list of
3197    threads found to be garbage.  Each of these threads will be woken
3198    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3199    on an MVar, or NonTermination if the thread was blocked on a Black
3200    Hole.
3201    -------------------------------------------------------------------------- */
3202
3203 void
3204 resurrectThreads( StgTSO *threads )
3205 {
3206   StgTSO *tso, *next;
3207
3208   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3209     next = tso->global_link;
3210     tso->global_link = all_threads;
3211     all_threads = tso;
3212     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3213
3214     switch (tso->why_blocked) {
3215     case BlockedOnMVar:
3216     case BlockedOnException:
3217       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3218       break;
3219     case BlockedOnBlackHole:
3220       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3221       break;
3222     case NotBlocked:
3223       /* This might happen if the thread was blocked on a black hole
3224        * belonging to a thread that we've just woken up (raiseAsync
3225        * can wake up threads, remember...).
3226        */
3227       continue;
3228     default:
3229       barf("resurrectThreads: thread blocked in a strange way");
3230     }
3231   }
3232 }
3233
3234 /* -----------------------------------------------------------------------------
3235  * Blackhole detection: if we reach a deadlock, test whether any
3236  * threads are blocked on themselves.  Any threads which are found to
3237  * be self-blocked get sent a NonTermination exception.
3238  *
3239  * This is only done in a deadlock situation in order to avoid
3240  * performance overhead in the normal case.
3241  * -------------------------------------------------------------------------- */
3242
3243 static void
3244 detectBlackHoles( void )
3245 {
3246     StgTSO *t = all_threads;
3247     StgUpdateFrame *frame;
3248     StgClosure *blocked_on;
3249
3250     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3251
3252         while (t->what_next == ThreadRelocated) {
3253             t = t->link;
3254             ASSERT(get_itbl(t)->type == TSO);
3255         }
3256       
3257         if (t->why_blocked != BlockedOnBlackHole) {
3258             continue;
3259         }
3260
3261         blocked_on = t->block_info.closure;
3262
3263         for (frame = t->su; ; frame = frame->link) {
3264             switch (get_itbl(frame)->type) {
3265
3266             case UPDATE_FRAME:
3267                 if (frame->updatee == blocked_on) {
3268                     /* We are blocking on one of our own computations, so
3269                      * send this thread the NonTermination exception.  
3270                      */
3271                     IF_DEBUG(scheduler, 
3272                              sched_belch("thread %d is blocked on itself", t->id));
3273                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3274                     goto done;
3275                 }
3276                 else {
3277                     continue;
3278                 }
3279
3280             case CATCH_FRAME:
3281             case SEQ_FRAME:
3282                 continue;
3283                 
3284             case STOP_FRAME:
3285                 break;
3286             }
3287             break;
3288         }
3289
3290     done: ;
3291     }   
3292 }
3293
3294 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3295 //@subsection Debugging Routines
3296
3297 /* -----------------------------------------------------------------------------
3298    Debugging: why is a thread blocked
3299    -------------------------------------------------------------------------- */
3300
3301 #ifdef DEBUG
3302
3303 void
3304 printThreadBlockage(StgTSO *tso)
3305 {
3306   switch (tso->why_blocked) {
3307   case BlockedOnRead:
3308     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3309     break;
3310   case BlockedOnWrite:
3311     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3312     break;
3313   case BlockedOnDelay:
3314     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3315     break;
3316   case BlockedOnMVar:
3317     fprintf(stderr,"is blocked on an MVar");
3318     break;
3319   case BlockedOnException:
3320     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3321             tso->block_info.tso->id);
3322     break;
3323   case BlockedOnBlackHole:
3324     fprintf(stderr,"is blocked on a black hole");
3325     break;
3326   case NotBlocked:
3327     fprintf(stderr,"is not blocked");
3328     break;
3329 #if defined(PAR)
3330   case BlockedOnGA:
3331     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3332             tso->block_info.closure, info_type(tso->block_info.closure));
3333     break;
3334   case BlockedOnGA_NoSend:
3335     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3336             tso->block_info.closure, info_type(tso->block_info.closure));
3337     break;
3338 #endif
3339   default:
3340     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3341          tso->why_blocked, tso->id, tso);
3342   }
3343 }
3344
3345 void
3346 printThreadStatus(StgTSO *tso)
3347 {
3348   switch (tso->what_next) {
3349   case ThreadKilled:
3350     fprintf(stderr,"has been killed");
3351     break;
3352   case ThreadComplete:
3353     fprintf(stderr,"has completed");
3354     break;
3355   default:
3356     printThreadBlockage(tso);
3357   }
3358 }
3359
3360 void
3361 printAllThreads(void)
3362 {
3363   StgTSO *t;
3364
3365 # if defined(GRAN)
3366   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3367   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3368                        time_string, rtsFalse/*no commas!*/);
3369
3370   sched_belch("all threads at [%s]:", time_string);
3371 # elif defined(PAR)
3372   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3373   ullong_format_string(CURRENT_TIME,
3374                        time_string, rtsFalse/*no commas!*/);
3375
3376   sched_belch("all threads at [%s]:", time_string);
3377 # else
3378   sched_belch("all threads:");
3379 # endif
3380
3381   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3382     fprintf(stderr, "\tthread %d ", t->id);
3383     printThreadStatus(t);
3384     fprintf(stderr,"\n");
3385   }
3386 }
3387     
3388 /* 
3389    Print a whole blocking queue attached to node (debugging only).
3390 */
3391 //@cindex print_bq
3392 # if defined(PAR)
3393 void 
3394 print_bq (StgClosure *node)
3395 {
3396   StgBlockingQueueElement *bqe;
3397   StgTSO *tso;
3398   rtsBool end;
3399
3400   fprintf(stderr,"## BQ of closure %p (%s): ",
3401           node, info_type(node));
3402
3403   /* should cover all closures that may have a blocking queue */
3404   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3405          get_itbl(node)->type == FETCH_ME_BQ ||
3406          get_itbl(node)->type == RBH ||
3407          get_itbl(node)->type == MVAR);
3408     
3409   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3410
3411   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3412 }
3413
3414 /* 
3415    Print a whole blocking queue starting with the element bqe.
3416 */
3417 void 
3418 print_bqe (StgBlockingQueueElement *bqe)
3419 {
3420   rtsBool end;
3421
3422   /* 
3423      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3424   */
3425   for (end = (bqe==END_BQ_QUEUE);
3426        !end; // iterate until bqe points to a CONSTR
3427        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3428        bqe = end ? END_BQ_QUEUE : bqe->link) {
3429     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3430     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3431     /* types of closures that may appear in a blocking queue */
3432     ASSERT(get_itbl(bqe)->type == TSO ||           
3433            get_itbl(bqe)->type == BLOCKED_FETCH || 
3434            get_itbl(bqe)->type == CONSTR); 
3435     /* only BQs of an RBH end with an RBH_Save closure */
3436     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3437
3438     switch (get_itbl(bqe)->type) {
3439     case TSO:
3440       fprintf(stderr," TSO %u (%x),",
3441               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3442       break;
3443     case BLOCKED_FETCH:
3444       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3445               ((StgBlockedFetch *)bqe)->node, 
3446               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3447               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3448               ((StgBlockedFetch *)bqe)->ga.weight);
3449       break;
3450     case CONSTR:
3451       fprintf(stderr," %s (IP %p),",
3452               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3453                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3454                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3455                "RBH_Save_?"), get_itbl(bqe));
3456       break;
3457     default:
3458       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3459            info_type((StgClosure *)bqe)); // , node, info_type(node));
3460       break;
3461     }
3462   } /* for */
3463   fputc('\n', stderr);
3464 }
3465 # elif defined(GRAN)
3466 void 
3467 print_bq (StgClosure *node)
3468 {
3469   StgBlockingQueueElement *bqe;
3470   PEs node_loc, tso_loc;
3471   rtsBool end;
3472
3473   /* should cover all closures that may have a blocking queue */
3474   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3475          get_itbl(node)->type == FETCH_ME_BQ ||
3476          get_itbl(node)->type == RBH);
3477     
3478   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3479   node_loc = where_is(node);
3480
3481   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3482           node, info_type(node), node_loc);
3483
3484   /* 
3485      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3486   */
3487   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3488        !end; // iterate until bqe points to a CONSTR
3489        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3490     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3491     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3492     /* types of closures that may appear in a blocking queue */
3493     ASSERT(get_itbl(bqe)->type == TSO ||           
3494            get_itbl(bqe)->type == CONSTR); 
3495     /* only BQs of an RBH end with an RBH_Save closure */
3496     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3497
3498     tso_loc = where_is((StgClosure *)bqe);
3499     switch (get_itbl(bqe)->type) {
3500     case TSO:
3501       fprintf(stderr," TSO %d (%p) on [PE %d],",
3502               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3503       break;
3504     case CONSTR:
3505       fprintf(stderr," %s (IP %p),",
3506               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3507                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3508                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3509                "RBH_Save_?"), get_itbl(bqe));
3510       break;
3511     default:
3512       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3513            info_type((StgClosure *)bqe), node, info_type(node));
3514       break;
3515     }
3516   } /* for */
3517   fputc('\n', stderr);
3518 }
3519 #else
3520 /* 
3521    Nice and easy: only TSOs on the blocking queue
3522 */
3523 void 
3524 print_bq (StgClosure *node)
3525 {
3526   StgTSO *tso;
3527
3528   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3529   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3530        tso != END_TSO_QUEUE; 
3531        tso=tso->link) {
3532     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3533     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3534     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3535   }
3536   fputc('\n', stderr);
3537 }
3538 # endif
3539
3540 #if defined(PAR)
3541 static nat
3542 run_queue_len(void)
3543 {
3544   nat i;
3545   StgTSO *tso;
3546
3547   for (i=0, tso=run_queue_hd; 
3548        tso != END_TSO_QUEUE;
3549        i++, tso=tso->link)
3550     /* nothing */
3551
3552   return i;
3553 }
3554 #endif
3555
3556 static void
3557 sched_belch(char *s, ...)
3558 {
3559   va_list ap;
3560   va_start(ap,s);
3561 #ifdef SMP
3562   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3563 #elif defined(PAR)
3564   fprintf(stderr, "== ");
3565 #else
3566   fprintf(stderr, "scheduler: ");
3567 #endif
3568   vfprintf(stderr, s, ap);
3569   fprintf(stderr, "\n");
3570 }
3571
3572 #endif /* DEBUG */
3573
3574
3575 //@node Index,  , Debugging Routines, Main scheduling code
3576 //@subsection Index
3577
3578 //@index
3579 //* MainRegTable::  @cindex\s-+MainRegTable
3580 //* StgMainThread::  @cindex\s-+StgMainThread
3581 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3582 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3583 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3584 //* context_switch::  @cindex\s-+context_switch
3585 //* createThread::  @cindex\s-+createThread
3586 //* free_capabilities::  @cindex\s-+free_capabilities
3587 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3588 //* initScheduler::  @cindex\s-+initScheduler
3589 //* interrupted::  @cindex\s-+interrupted
3590 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3591 //* next_thread_id::  @cindex\s-+next_thread_id
3592 //* print_bq::  @cindex\s-+print_bq
3593 //* run_queue_hd::  @cindex\s-+run_queue_hd
3594 //* run_queue_tl::  @cindex\s-+run_queue_tl
3595 //* sched_mutex::  @cindex\s-+sched_mutex
3596 //* schedule::  @cindex\s-+schedule
3597 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3598 //* task_ids::  @cindex\s-+task_ids
3599 //* term_mutex::  @cindex\s-+term_mutex
3600 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3601 //@end index