b2fb90ad0ed8d0408f695dc77f7a8dbdcab72f6b
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.102 2001/10/23 11:28:51 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           if (m->ret) *(m->ret) = NULL;
449           *prev = m->link;
450           if (was_interrupted) {
451             m->stat = Interrupted;
452           } else {
453             m->stat = Killed;
454           }
455           pthread_cond_broadcast(&m->wakeup);
456           break;
457         default:
458           break;
459         }
460       }
461     }
462
463 #else
464 # if defined(PAR)
465     /* in GUM do this only on the Main PE */
466     if (IAmMainThread)
467 # endif
468     /* If our main thread has finished or been killed, return.
469      */
470     {
471       StgMainThread *m = main_threads;
472       if (m->tso->what_next == ThreadComplete
473           || m->tso->what_next == ThreadKilled) {
474         main_threads = main_threads->link;
475         if (m->tso->what_next == ThreadComplete) {
476           /* we finished successfully, fill in the return value */
477           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
478           m->stat = Success;
479           return;
480         } else {
481           if (m->ret) { *(m->ret) = NULL; };
482           if (was_interrupted) {
483             m->stat = Interrupted;
484           } else {
485             m->stat = Killed;
486           }
487           return;
488         }
489       }
490     }
491 #endif
492
493     /* Top up the run queue from our spark pool.  We try to make the
494      * number of threads in the run queue equal to the number of
495      * free capabilities.
496      */
497 #if defined(SMP)
498     {
499       nat n = n_free_capabilities;
500       StgTSO *tso = run_queue_hd;
501
502       /* Count the run queue */
503       while (n > 0 && tso != END_TSO_QUEUE) {
504         tso = tso->link;
505         n--;
506       }
507
508       for (; n > 0; n--) {
509         StgClosure *spark;
510         spark = findSpark(rtsFalse);
511         if (spark == NULL) {
512           break; /* no more sparks in the pool */
513         } else {
514           /* I'd prefer this to be done in activateSpark -- HWL */
515           /* tricky - it needs to hold the scheduler lock and
516            * not try to re-acquire it -- SDM */
517           createSparkThread(spark);       
518           IF_DEBUG(scheduler,
519                    sched_belch("==^^ turning spark of closure %p into a thread",
520                                (StgClosure *)spark));
521         }
522       }
523       /* We need to wake up the other tasks if we just created some
524        * work for them.
525        */
526       if (n_free_capabilities - n > 1) {
527           pthread_cond_signal(&thread_ready_cond);
528       }
529     }
530 #endif /* SMP */
531
532     /* Check whether any waiting threads need to be woken up.  If the
533      * run queue is empty, and there are no other tasks running, we
534      * can wait indefinitely for something to happen.
535      * ToDo: what if another client comes along & requests another
536      * main thread?
537      */
538     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
539       awaitEvent(
540            (run_queue_hd == END_TSO_QUEUE)
541 #ifdef SMP
542         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
543 #endif
544         );
545     }
546     /* we can be interrupted while waiting for I/O... */
547     if (interrupted) continue;
548
549     /* check for signals each time around the scheduler */
550 #ifndef mingw32_TARGET_OS
551     if (signals_pending()) {
552       start_signal_handlers();
553     }
554 #endif
555
556     /* 
557      * Detect deadlock: when we have no threads to run, there are no
558      * threads waiting on I/O or sleeping, and all the other tasks are
559      * waiting for work, we must have a deadlock of some description.
560      *
561      * We first try to find threads blocked on themselves (ie. black
562      * holes), and generate NonTermination exceptions where necessary.
563      *
564      * If no threads are black holed, we have a deadlock situation, so
565      * inform all the main threads.
566      */
567 #ifndef PAR
568     if (blocked_queue_hd == END_TSO_QUEUE
569         && run_queue_hd == END_TSO_QUEUE
570         && sleeping_queue == END_TSO_QUEUE
571 #ifdef SMP
572         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
573 #endif
574         )
575     {
576         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
577         GarbageCollect(GetRoots,rtsTrue);
578         if (blocked_queue_hd == END_TSO_QUEUE
579             && run_queue_hd == END_TSO_QUEUE
580             && sleeping_queue == END_TSO_QUEUE) {
581             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
582             detectBlackHoles();
583             if (run_queue_hd == END_TSO_QUEUE) {
584                 StgMainThread *m = main_threads;
585 #ifdef SMP
586                 for (; m != NULL; m = m->link) {
587                     deleteThread(m->tso);
588                     m->ret = NULL;
589                     m->stat = Deadlock;
590                     pthread_cond_broadcast(&m->wakeup);
591                 }
592                 main_threads = NULL;
593 #else
594                 deleteThread(m->tso);
595                 m->ret = NULL;
596                 m->stat = Deadlock;
597                 main_threads = m->link;
598                 return;
599 #endif
600             }
601         }
602     }
603 #elif defined(PAR)
604     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
605 #endif
606
607 #ifdef SMP
608     /* If there's a GC pending, don't do anything until it has
609      * completed.
610      */
611     if (ready_to_gc) {
612       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
613       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
614     }
615     
616     /* block until we've got a thread on the run queue and a free
617      * capability.
618      */
619     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
620       IF_DEBUG(scheduler, sched_belch("waiting for work"));
621       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
622       IF_DEBUG(scheduler, sched_belch("work now available"));
623     }
624 #endif
625
626 #if defined(GRAN)
627
628     if (RtsFlags.GranFlags.Light)
629       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
630
631     /* adjust time based on time-stamp */
632     if (event->time > CurrentTime[CurrentProc] &&
633         event->evttype != ContinueThread)
634       CurrentTime[CurrentProc] = event->time;
635     
636     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
637     if (!RtsFlags.GranFlags.Light)
638       handleIdlePEs();
639
640     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
641
642     /* main event dispatcher in GranSim */
643     switch (event->evttype) {
644       /* Should just be continuing execution */
645     case ContinueThread:
646       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
647       /* ToDo: check assertion
648       ASSERT(run_queue_hd != (StgTSO*)NULL &&
649              run_queue_hd != END_TSO_QUEUE);
650       */
651       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
652       if (!RtsFlags.GranFlags.DoAsyncFetch &&
653           procStatus[CurrentProc]==Fetching) {
654         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
655               CurrentTSO->id, CurrentTSO, CurrentProc);
656         goto next_thread;
657       } 
658       /* Ignore ContinueThreads for completed threads */
659       if (CurrentTSO->what_next == ThreadComplete) {
660         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
661               CurrentTSO->id, CurrentTSO, CurrentProc);
662         goto next_thread;
663       } 
664       /* Ignore ContinueThreads for threads that are being migrated */
665       if (PROCS(CurrentTSO)==Nowhere) { 
666         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
667               CurrentTSO->id, CurrentTSO, CurrentProc);
668         goto next_thread;
669       }
670       /* The thread should be at the beginning of the run queue */
671       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
672         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
673               CurrentTSO->id, CurrentTSO, CurrentProc);
674         break; // run the thread anyway
675       }
676       /*
677       new_event(proc, proc, CurrentTime[proc],
678                 FindWork,
679                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
680       goto next_thread; 
681       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
682       break; // now actually run the thread; DaH Qu'vam yImuHbej 
683
684     case FetchNode:
685       do_the_fetchnode(event);
686       goto next_thread;             /* handle next event in event queue  */
687       
688     case GlobalBlock:
689       do_the_globalblock(event);
690       goto next_thread;             /* handle next event in event queue  */
691       
692     case FetchReply:
693       do_the_fetchreply(event);
694       goto next_thread;             /* handle next event in event queue  */
695       
696     case UnblockThread:   /* Move from the blocked queue to the tail of */
697       do_the_unblock(event);
698       goto next_thread;             /* handle next event in event queue  */
699       
700     case ResumeThread:  /* Move from the blocked queue to the tail of */
701       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
702       event->tso->gran.blocktime += 
703         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
704       do_the_startthread(event);
705       goto next_thread;             /* handle next event in event queue  */
706       
707     case StartThread:
708       do_the_startthread(event);
709       goto next_thread;             /* handle next event in event queue  */
710       
711     case MoveThread:
712       do_the_movethread(event);
713       goto next_thread;             /* handle next event in event queue  */
714       
715     case MoveSpark:
716       do_the_movespark(event);
717       goto next_thread;             /* handle next event in event queue  */
718       
719     case FindWork:
720       do_the_findwork(event);
721       goto next_thread;             /* handle next event in event queue  */
722       
723     default:
724       barf("Illegal event type %u\n", event->evttype);
725     }  /* switch */
726     
727     /* This point was scheduler_loop in the old RTS */
728
729     IF_DEBUG(gran, belch("GRAN: after main switch"));
730
731     TimeOfLastEvent = CurrentTime[CurrentProc];
732     TimeOfNextEvent = get_time_of_next_event();
733     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
734     // CurrentTSO = ThreadQueueHd;
735
736     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
737                          TimeOfNextEvent));
738
739     if (RtsFlags.GranFlags.Light) 
740       GranSimLight_leave_system(event, &ActiveTSO); 
741
742     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
743
744     IF_DEBUG(gran, 
745              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
746
747     /* in a GranSim setup the TSO stays on the run queue */
748     t = CurrentTSO;
749     /* Take a thread from the run queue. */
750     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
751
752     IF_DEBUG(gran, 
753              fprintf(stderr, "GRAN: About to run current thread, which is\n");
754              G_TSO(t,5));
755
756     context_switch = 0; // turned on via GranYield, checking events and time slice
757
758     IF_DEBUG(gran, 
759              DumpGranEvent(GR_SCHEDULE, t));
760
761     procStatus[CurrentProc] = Busy;
762
763 #elif defined(PAR)
764     if (PendingFetches != END_BF_QUEUE) {
765         processFetches();
766     }
767
768     /* ToDo: phps merge with spark activation above */
769     /* check whether we have local work and send requests if we have none */
770     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
771       /* :-[  no local threads => look out for local sparks */
772       /* the spark pool for the current PE */
773       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
774       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
775           pool->hd < pool->tl) {
776         /* 
777          * ToDo: add GC code check that we really have enough heap afterwards!!
778          * Old comment:
779          * If we're here (no runnable threads) and we have pending
780          * sparks, we must have a space problem.  Get enough space
781          * to turn one of those pending sparks into a
782          * thread... 
783          */
784
785         spark = findSpark(rtsFalse);                /* get a spark */
786         if (spark != (rtsSpark) NULL) {
787           tso = activateSpark(spark);       /* turn the spark into a thread */
788           IF_PAR_DEBUG(schedule,
789                        belch("==== schedule: Created TSO %d (%p); %d threads active",
790                              tso->id, tso, advisory_thread_count));
791
792           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
793             belch("==^^ failed to activate spark");
794             goto next_thread;
795           }               /* otherwise fall through & pick-up new tso */
796         } else {
797           IF_PAR_DEBUG(verbose,
798                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
799                              spark_queue_len(pool)));
800           goto next_thread;
801         }
802       }
803
804       /* If we still have no work we need to send a FISH to get a spark
805          from another PE 
806       */
807       if (EMPTY_RUN_QUEUE()) {
808       /* =8-[  no local sparks => look for work on other PEs */
809         /*
810          * We really have absolutely no work.  Send out a fish
811          * (there may be some out there already), and wait for
812          * something to arrive.  We clearly can't run any threads
813          * until a SCHEDULE or RESUME arrives, and so that's what
814          * we're hoping to see.  (Of course, we still have to
815          * respond to other types of messages.)
816          */
817         TIME now = msTime() /*CURRENT_TIME*/;
818         IF_PAR_DEBUG(verbose, 
819                      belch("--  now=%ld", now));
820         IF_PAR_DEBUG(verbose,
821                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
822                          (last_fish_arrived_at!=0 &&
823                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
824                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
825                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
826                              last_fish_arrived_at,
827                              RtsFlags.ParFlags.fishDelay, now);
828                      });
829         
830         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
831             (last_fish_arrived_at==0 ||
832              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
833           /* outstandingFishes is set in sendFish, processFish;
834              avoid flooding system with fishes via delay */
835           pe = choosePE();
836           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
837                    NEW_FISH_HUNGER);
838
839           // Global statistics: count no. of fishes
840           if (RtsFlags.ParFlags.ParStats.Global &&
841               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
842             globalParStats.tot_fish_mess++;
843           }
844         }
845       
846         receivedFinish = processMessages();
847         goto next_thread;
848       }
849     } else if (PacketsWaiting()) {  /* Look for incoming messages */
850       receivedFinish = processMessages();
851     }
852
853     /* Now we are sure that we have some work available */
854     ASSERT(run_queue_hd != END_TSO_QUEUE);
855
856     /* Take a thread from the run queue, if we have work */
857     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
858     IF_DEBUG(sanity,checkTSO(t));
859
860     /* ToDo: write something to the log-file
861     if (RTSflags.ParFlags.granSimStats && !sameThread)
862         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
863
864     CurrentTSO = t;
865     */
866     /* the spark pool for the current PE */
867     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
868
869     IF_DEBUG(scheduler, 
870              belch("--=^ %d threads, %d sparks on [%#x]", 
871                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
872
873 #if 1
874     if (0 && RtsFlags.ParFlags.ParStats.Full && 
875         t && LastTSO && t->id != LastTSO->id && 
876         LastTSO->why_blocked == NotBlocked && 
877         LastTSO->what_next != ThreadComplete) {
878       // if previously scheduled TSO not blocked we have to record the context switch
879       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
880                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
881     }
882
883     if (RtsFlags.ParFlags.ParStats.Full && 
884         (emitSchedule /* forced emit */ ||
885         (t && LastTSO && t->id != LastTSO->id))) {
886       /* 
887          we are running a different TSO, so write a schedule event to log file
888          NB: If we use fair scheduling we also have to write  a deschedule 
889              event for LastTSO; with unfair scheduling we know that the
890              previous tso has blocked whenever we switch to another tso, so
891              we don't need it in GUM for now
892       */
893       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
894                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
895       emitSchedule = rtsFalse;
896     }
897      
898 #endif
899 #else /* !GRAN && !PAR */
900   
901     /* grab a thread from the run queue
902      */
903     ASSERT(run_queue_hd != END_TSO_QUEUE);
904     t = POP_RUN_QUEUE();
905     IF_DEBUG(sanity,checkTSO(t));
906
907 #endif
908     
909     /* grab a capability
910      */
911 #ifdef SMP
912     cap = free_capabilities;
913     free_capabilities = cap->link;
914     n_free_capabilities--;
915 #else
916     cap = &MainRegTable;
917 #endif
918
919     cap->rCurrentTSO = t;
920     
921     /* context switches are now initiated by the timer signal, unless
922      * the user specified "context switch as often as possible", with
923      * +RTS -C0
924      */
925     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
926         && (run_queue_hd != END_TSO_QUEUE
927             || blocked_queue_hd != END_TSO_QUEUE
928             || sleeping_queue != END_TSO_QUEUE))
929         context_switch = 1;
930     else
931         context_switch = 0;
932
933     RELEASE_LOCK(&sched_mutex);
934
935     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
936                               t->id, t, whatNext_strs[t->what_next]));
937
938     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
939     /* Run the current thread 
940      */
941     switch (cap->rCurrentTSO->what_next) {
942     case ThreadKilled:
943     case ThreadComplete:
944         /* Thread already finished, return to scheduler. */
945         ret = ThreadFinished;
946         break;
947     case ThreadEnterGHC:
948         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
949         break;
950     case ThreadRunGHC:
951         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
952         break;
953     case ThreadEnterInterp:
954         ret = interpretBCO(cap);
955         break;
956     default:
957       barf("schedule: invalid what_next field");
958     }
959     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
960     
961     /* Costs for the scheduler are assigned to CCS_SYSTEM */
962 #ifdef PROFILING
963     CCCS = CCS_SYSTEM;
964 #endif
965     
966     ACQUIRE_LOCK(&sched_mutex);
967
968 #ifdef SMP
969     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
970 #elif !defined(GRAN) && !defined(PAR)
971     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
972 #endif
973     t = cap->rCurrentTSO;
974     
975 #if defined(PAR)
976     /* HACK 675: if the last thread didn't yield, make sure to print a 
977        SCHEDULE event to the log file when StgRunning the next thread, even
978        if it is the same one as before */
979     LastTSO = t; 
980     TimeOfLastYield = CURRENT_TIME;
981 #endif
982
983     switch (ret) {
984     case HeapOverflow:
985 #if defined(GRAN)
986       IF_DEBUG(gran, 
987                DumpGranEvent(GR_DESCHEDULE, t));
988       globalGranStats.tot_heapover++;
989 #elif defined(PAR)
990       // IF_DEBUG(par, 
991       //DumpGranEvent(GR_DESCHEDULE, t);
992       globalParStats.tot_heapover++;
993 #endif
994       /* make all the running tasks block on a condition variable,
995        * maybe set context_switch and wait till they all pile in,
996        * then have them wait on a GC condition variable.
997        */
998       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
999                                t->id, t, whatNext_strs[t->what_next]));
1000       threadPaused(t);
1001 #if defined(GRAN)
1002       ASSERT(!is_on_queue(t,CurrentProc));
1003 #elif defined(PAR)
1004       /* Currently we emit a DESCHEDULE event before GC in GUM.
1005          ToDo: either add separate event to distinguish SYSTEM time from rest
1006                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1007       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1008         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1009                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1010         emitSchedule = rtsTrue;
1011       }
1012 #endif
1013       
1014       ready_to_gc = rtsTrue;
1015       context_switch = 1;               /* stop other threads ASAP */
1016       PUSH_ON_RUN_QUEUE(t);
1017       /* actual GC is done at the end of the while loop */
1018       break;
1019       
1020     case StackOverflow:
1021 #if defined(GRAN)
1022       IF_DEBUG(gran, 
1023                DumpGranEvent(GR_DESCHEDULE, t));
1024       globalGranStats.tot_stackover++;
1025 #elif defined(PAR)
1026       // IF_DEBUG(par, 
1027       // DumpGranEvent(GR_DESCHEDULE, t);
1028       globalParStats.tot_stackover++;
1029 #endif
1030       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1031                                t->id, t, whatNext_strs[t->what_next]));
1032       /* just adjust the stack for this thread, then pop it back
1033        * on the run queue.
1034        */
1035       threadPaused(t);
1036       { 
1037         StgMainThread *m;
1038         /* enlarge the stack */
1039         StgTSO *new_t = threadStackOverflow(t);
1040         
1041         /* This TSO has moved, so update any pointers to it from the
1042          * main thread stack.  It better not be on any other queues...
1043          * (it shouldn't be).
1044          */
1045         for (m = main_threads; m != NULL; m = m->link) {
1046           if (m->tso == t) {
1047             m->tso = new_t;
1048           }
1049         }
1050         threadPaused(new_t);
1051         PUSH_ON_RUN_QUEUE(new_t);
1052       }
1053       break;
1054
1055     case ThreadYielding:
1056 #if defined(GRAN)
1057       IF_DEBUG(gran, 
1058                DumpGranEvent(GR_DESCHEDULE, t));
1059       globalGranStats.tot_yields++;
1060 #elif defined(PAR)
1061       // IF_DEBUG(par, 
1062       // DumpGranEvent(GR_DESCHEDULE, t);
1063       globalParStats.tot_yields++;
1064 #endif
1065       /* put the thread back on the run queue.  Then, if we're ready to
1066        * GC, check whether this is the last task to stop.  If so, wake
1067        * up the GC thread.  getThread will block during a GC until the
1068        * GC is finished.
1069        */
1070       IF_DEBUG(scheduler,
1071                if (t->what_next == ThreadEnterInterp) {
1072                    /* ToDo: or maybe a timer expired when we were in Hugs?
1073                     * or maybe someone hit ctrl-C
1074                     */
1075                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1076                          t->id, t, whatNext_strs[t->what_next]);
1077                } else {
1078                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1079                          t->id, t, whatNext_strs[t->what_next]);
1080                }
1081                );
1082
1083       threadPaused(t);
1084
1085       IF_DEBUG(sanity,
1086                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1087                checkTSO(t));
1088       ASSERT(t->link == END_TSO_QUEUE);
1089 #if defined(GRAN)
1090       ASSERT(!is_on_queue(t,CurrentProc));
1091
1092       IF_DEBUG(sanity,
1093                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1094                checkThreadQsSanity(rtsTrue));
1095 #endif
1096 #if defined(PAR)
1097       if (RtsFlags.ParFlags.doFairScheduling) { 
1098         /* this does round-robin scheduling; good for concurrency */
1099         APPEND_TO_RUN_QUEUE(t);
1100       } else {
1101         /* this does unfair scheduling; good for parallelism */
1102         PUSH_ON_RUN_QUEUE(t);
1103       }
1104 #else
1105       /* this does round-robin scheduling; good for concurrency */
1106       APPEND_TO_RUN_QUEUE(t);
1107 #endif
1108 #if defined(GRAN)
1109       /* add a ContinueThread event to actually process the thread */
1110       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1111                 ContinueThread,
1112                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1113       IF_GRAN_DEBUG(bq, 
1114                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1115                G_EVENTQ(0);
1116                G_CURR_THREADQ(0));
1117 #endif /* GRAN */
1118       break;
1119       
1120     case ThreadBlocked:
1121 #if defined(GRAN)
1122       IF_DEBUG(scheduler,
1123                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1124                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1125                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1126
1127       // ??? needed; should emit block before
1128       IF_DEBUG(gran, 
1129                DumpGranEvent(GR_DESCHEDULE, t)); 
1130       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1131       /*
1132         ngoq Dogh!
1133       ASSERT(procStatus[CurrentProc]==Busy || 
1134               ((procStatus[CurrentProc]==Fetching) && 
1135               (t->block_info.closure!=(StgClosure*)NULL)));
1136       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1137           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1138             procStatus[CurrentProc]==Fetching)) 
1139         procStatus[CurrentProc] = Idle;
1140       */
1141 #elif defined(PAR)
1142       IF_DEBUG(scheduler,
1143                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1144                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1145       IF_PAR_DEBUG(bq,
1146
1147                    if (t->block_info.closure!=(StgClosure*)NULL) 
1148                      print_bq(t->block_info.closure));
1149
1150       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1151       blockThread(t);
1152
1153       /* whatever we schedule next, we must log that schedule */
1154       emitSchedule = rtsTrue;
1155
1156 #else /* !GRAN */
1157       /* don't need to do anything.  Either the thread is blocked on
1158        * I/O, in which case we'll have called addToBlockedQueue
1159        * previously, or it's blocked on an MVar or Blackhole, in which
1160        * case it'll be on the relevant queue already.
1161        */
1162       IF_DEBUG(scheduler,
1163                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1164                printThreadBlockage(t);
1165                fprintf(stderr, "\n"));
1166
1167       /* Only for dumping event to log file 
1168          ToDo: do I need this in GranSim, too?
1169       blockThread(t);
1170       */
1171 #endif
1172       threadPaused(t);
1173       break;
1174       
1175     case ThreadFinished:
1176       /* Need to check whether this was a main thread, and if so, signal
1177        * the task that started it with the return value.  If we have no
1178        * more main threads, we probably need to stop all the tasks until
1179        * we get a new one.
1180        */
1181       /* We also end up here if the thread kills itself with an
1182        * uncaught exception, see Exception.hc.
1183        */
1184       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1185 #if defined(GRAN)
1186       endThread(t, CurrentProc); // clean-up the thread
1187 #elif defined(PAR)
1188       /* For now all are advisory -- HWL */
1189       //if(t->priority==AdvisoryPriority) ??
1190       advisory_thread_count--;
1191       
1192 # ifdef DIST
1193       if(t->dist.priority==RevalPriority)
1194         FinishReval(t);
1195 # endif
1196       
1197       if (RtsFlags.ParFlags.ParStats.Full &&
1198           !RtsFlags.ParFlags.ParStats.Suppressed) 
1199         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1200 #endif
1201       break;
1202       
1203     default:
1204       barf("schedule: invalid thread return code %d", (int)ret);
1205     }
1206     
1207 #ifdef SMP
1208     cap->link = free_capabilities;
1209     free_capabilities = cap;
1210     n_free_capabilities++;
1211 #endif
1212
1213 #ifdef SMP
1214     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1215 #else
1216     if (ready_to_gc) 
1217 #endif
1218       {
1219       /* everybody back, start the GC.
1220        * Could do it in this thread, or signal a condition var
1221        * to do it in another thread.  Either way, we need to
1222        * broadcast on gc_pending_cond afterward.
1223        */
1224 #ifdef SMP
1225       IF_DEBUG(scheduler,sched_belch("doing GC"));
1226 #endif
1227       GarbageCollect(GetRoots,rtsFalse);
1228       ready_to_gc = rtsFalse;
1229 #ifdef SMP
1230       pthread_cond_broadcast(&gc_pending_cond);
1231 #endif
1232 #if defined(GRAN)
1233       /* add a ContinueThread event to continue execution of current thread */
1234       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1235                 ContinueThread,
1236                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1237       IF_GRAN_DEBUG(bq, 
1238                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1239                G_EVENTQ(0);
1240                G_CURR_THREADQ(0));
1241 #endif /* GRAN */
1242     }
1243 #if defined(GRAN)
1244   next_thread:
1245     IF_GRAN_DEBUG(unused,
1246                   print_eventq(EventHd));
1247
1248     event = get_next_event();
1249
1250 #elif defined(PAR)
1251   next_thread:
1252     /* ToDo: wait for next message to arrive rather than busy wait */
1253
1254 #else /* GRAN */
1255   /* not any more
1256   next_thread:
1257     t = take_off_run_queue(END_TSO_QUEUE);
1258   */
1259 #endif /* GRAN */
1260   } /* end of while(1) */
1261   IF_PAR_DEBUG(verbose,
1262                belch("== Leaving schedule() after having received Finish"));
1263 }
1264
1265 /* ---------------------------------------------------------------------------
1266  * deleteAllThreads():  kill all the live threads.
1267  *
1268  * This is used when we catch a user interrupt (^C), before performing
1269  * any necessary cleanups and running finalizers.
1270  * ------------------------------------------------------------------------- */
1271    
1272 void deleteAllThreads ( void )
1273 {
1274   StgTSO* t;
1275   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1276   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1277       deleteThread(t);
1278   }
1279   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1280       deleteThread(t);
1281   }
1282   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1283       deleteThread(t);
1284   }
1285   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1286   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1287   sleeping_queue = END_TSO_QUEUE;
1288 }
1289
1290 /* startThread and  insertThread are now in GranSim.c -- HWL */
1291
1292 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1293 //@subsection Suspend and Resume
1294
1295 /* ---------------------------------------------------------------------------
1296  * Suspending & resuming Haskell threads.
1297  * 
1298  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1299  * its capability before calling the C function.  This allows another
1300  * task to pick up the capability and carry on running Haskell
1301  * threads.  It also means that if the C call blocks, it won't lock
1302  * the whole system.
1303  *
1304  * The Haskell thread making the C call is put to sleep for the
1305  * duration of the call, on the susepended_ccalling_threads queue.  We
1306  * give out a token to the task, which it can use to resume the thread
1307  * on return from the C function.
1308  * ------------------------------------------------------------------------- */
1309    
1310 StgInt
1311 suspendThread( Capability *cap )
1312 {
1313   nat tok;
1314
1315   ACQUIRE_LOCK(&sched_mutex);
1316
1317   IF_DEBUG(scheduler,
1318            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1319
1320   threadPaused(cap->rCurrentTSO);
1321   cap->rCurrentTSO->link = suspended_ccalling_threads;
1322   suspended_ccalling_threads = cap->rCurrentTSO;
1323
1324   /* Use the thread ID as the token; it should be unique */
1325   tok = cap->rCurrentTSO->id;
1326
1327 #ifdef SMP
1328   cap->link = free_capabilities;
1329   free_capabilities = cap;
1330   n_free_capabilities++;
1331 #endif
1332
1333   RELEASE_LOCK(&sched_mutex);
1334   return tok; 
1335 }
1336
1337 Capability *
1338 resumeThread( StgInt tok )
1339 {
1340   StgTSO *tso, **prev;
1341   Capability *cap;
1342
1343   ACQUIRE_LOCK(&sched_mutex);
1344
1345   prev = &suspended_ccalling_threads;
1346   for (tso = suspended_ccalling_threads; 
1347        tso != END_TSO_QUEUE; 
1348        prev = &tso->link, tso = tso->link) {
1349     if (tso->id == (StgThreadID)tok) {
1350       *prev = tso->link;
1351       break;
1352     }
1353   }
1354   if (tso == END_TSO_QUEUE) {
1355     barf("resumeThread: thread not found");
1356   }
1357   tso->link = END_TSO_QUEUE;
1358
1359 #ifdef SMP
1360   while (free_capabilities == NULL) {
1361     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1362     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1363     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1364   }
1365   cap = free_capabilities;
1366   free_capabilities = cap->link;
1367   n_free_capabilities--;
1368 #else  
1369   cap = &MainRegTable;
1370 #endif
1371
1372   cap->rCurrentTSO = tso;
1373
1374   RELEASE_LOCK(&sched_mutex);
1375   return cap;
1376 }
1377
1378
1379 /* ---------------------------------------------------------------------------
1380  * Static functions
1381  * ------------------------------------------------------------------------ */
1382 static void unblockThread(StgTSO *tso);
1383
1384 /* ---------------------------------------------------------------------------
1385  * Comparing Thread ids.
1386  *
1387  * This is used from STG land in the implementation of the
1388  * instances of Eq/Ord for ThreadIds.
1389  * ------------------------------------------------------------------------ */
1390
1391 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1392
1393   StgThreadID id1 = tso1->id; 
1394   StgThreadID id2 = tso2->id;
1395  
1396   if (id1 < id2) return (-1);
1397   if (id1 > id2) return 1;
1398   return 0;
1399 }
1400
1401 /* ---------------------------------------------------------------------------
1402    Create a new thread.
1403
1404    The new thread starts with the given stack size.  Before the
1405    scheduler can run, however, this thread needs to have a closure
1406    (and possibly some arguments) pushed on its stack.  See
1407    pushClosure() in Schedule.h.
1408
1409    createGenThread() and createIOThread() (in SchedAPI.h) are
1410    convenient packaged versions of this function.
1411
1412    currently pri (priority) is only used in a GRAN setup -- HWL
1413    ------------------------------------------------------------------------ */
1414 //@cindex createThread
1415 #if defined(GRAN)
1416 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1417 StgTSO *
1418 createThread(nat stack_size, StgInt pri)
1419 {
1420   return createThread_(stack_size, rtsFalse, pri);
1421 }
1422
1423 static StgTSO *
1424 createThread_(nat size, rtsBool have_lock, StgInt pri)
1425 {
1426 #else
1427 StgTSO *
1428 createThread(nat stack_size)
1429 {
1430   return createThread_(stack_size, rtsFalse);
1431 }
1432
1433 static StgTSO *
1434 createThread_(nat size, rtsBool have_lock)
1435 {
1436 #endif
1437
1438     StgTSO *tso;
1439     nat stack_size;
1440
1441     /* First check whether we should create a thread at all */
1442 #if defined(PAR)
1443   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1444   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1445     threadsIgnored++;
1446     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1447           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1448     return END_TSO_QUEUE;
1449   }
1450   threadsCreated++;
1451 #endif
1452
1453 #if defined(GRAN)
1454   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1455 #endif
1456
1457   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1458
1459   /* catch ridiculously small stack sizes */
1460   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1461     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1462   }
1463
1464   stack_size = size - TSO_STRUCT_SIZEW;
1465
1466   tso = (StgTSO *)allocate(size);
1467   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1468
1469   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1470 #if defined(GRAN)
1471   SET_GRAN_HDR(tso, ThisPE);
1472 #endif
1473   tso->what_next     = ThreadEnterGHC;
1474
1475   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1476    * protect the increment operation on next_thread_id.
1477    * In future, we could use an atomic increment instead.
1478    */
1479   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1480   tso->id = next_thread_id++; 
1481   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1482
1483   tso->why_blocked  = NotBlocked;
1484   tso->blocked_exceptions = NULL;
1485
1486   tso->stack_size   = stack_size;
1487   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1488                               - TSO_STRUCT_SIZEW;
1489   tso->sp           = (P_)&(tso->stack) + stack_size;
1490
1491 #ifdef PROFILING
1492   tso->prof.CCCS = CCS_MAIN;
1493 #endif
1494
1495   /* put a stop frame on the stack */
1496   tso->sp -= sizeofW(StgStopFrame);
1497   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1498   tso->su = (StgUpdateFrame*)tso->sp;
1499
1500   // ToDo: check this
1501 #if defined(GRAN)
1502   tso->link = END_TSO_QUEUE;
1503   /* uses more flexible routine in GranSim */
1504   insertThread(tso, CurrentProc);
1505 #else
1506   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1507    * from its creation
1508    */
1509 #endif
1510
1511 #if defined(GRAN) 
1512   if (RtsFlags.GranFlags.GranSimStats.Full) 
1513     DumpGranEvent(GR_START,tso);
1514 #elif defined(PAR)
1515   if (RtsFlags.ParFlags.ParStats.Full) 
1516     DumpGranEvent(GR_STARTQ,tso);
1517   /* HACk to avoid SCHEDULE 
1518      LastTSO = tso; */
1519 #endif
1520
1521   /* Link the new thread on the global thread list.
1522    */
1523   tso->global_link = all_threads;
1524   all_threads = tso;
1525
1526 #if defined(DIST)
1527   tso->dist.priority = MandatoryPriority; //by default that is...
1528 #endif
1529
1530 #if defined(GRAN)
1531   tso->gran.pri = pri;
1532 # if defined(DEBUG)
1533   tso->gran.magic = TSO_MAGIC; // debugging only
1534 # endif
1535   tso->gran.sparkname   = 0;
1536   tso->gran.startedat   = CURRENT_TIME; 
1537   tso->gran.exported    = 0;
1538   tso->gran.basicblocks = 0;
1539   tso->gran.allocs      = 0;
1540   tso->gran.exectime    = 0;
1541   tso->gran.fetchtime   = 0;
1542   tso->gran.fetchcount  = 0;
1543   tso->gran.blocktime   = 0;
1544   tso->gran.blockcount  = 0;
1545   tso->gran.blockedat   = 0;
1546   tso->gran.globalsparks = 0;
1547   tso->gran.localsparks  = 0;
1548   if (RtsFlags.GranFlags.Light)
1549     tso->gran.clock  = Now; /* local clock */
1550   else
1551     tso->gran.clock  = 0;
1552
1553   IF_DEBUG(gran,printTSO(tso));
1554 #elif defined(PAR)
1555 # if defined(DEBUG)
1556   tso->par.magic = TSO_MAGIC; // debugging only
1557 # endif
1558   tso->par.sparkname   = 0;
1559   tso->par.startedat   = CURRENT_TIME; 
1560   tso->par.exported    = 0;
1561   tso->par.basicblocks = 0;
1562   tso->par.allocs      = 0;
1563   tso->par.exectime    = 0;
1564   tso->par.fetchtime   = 0;
1565   tso->par.fetchcount  = 0;
1566   tso->par.blocktime   = 0;
1567   tso->par.blockcount  = 0;
1568   tso->par.blockedat   = 0;
1569   tso->par.globalsparks = 0;
1570   tso->par.localsparks  = 0;
1571 #endif
1572
1573 #if defined(GRAN)
1574   globalGranStats.tot_threads_created++;
1575   globalGranStats.threads_created_on_PE[CurrentProc]++;
1576   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1577   globalGranStats.tot_sq_probes++;
1578 #elif defined(PAR)
1579   // collect parallel global statistics (currently done together with GC stats)
1580   if (RtsFlags.ParFlags.ParStats.Global &&
1581       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1582     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1583     globalParStats.tot_threads_created++;
1584   }
1585 #endif 
1586
1587 #if defined(GRAN)
1588   IF_GRAN_DEBUG(pri,
1589                 belch("==__ schedule: Created TSO %d (%p);",
1590                       CurrentProc, tso, tso->id));
1591 #elif defined(PAR)
1592     IF_PAR_DEBUG(verbose,
1593                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1594                        tso->id, tso, advisory_thread_count));
1595 #else
1596   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1597                                  tso->id, tso->stack_size));
1598 #endif    
1599   return tso;
1600 }
1601
1602 #if defined(PAR)
1603 /* RFP:
1604    all parallel thread creation calls should fall through the following routine.
1605 */
1606 StgTSO *
1607 createSparkThread(rtsSpark spark) 
1608 { StgTSO *tso;
1609   ASSERT(spark != (rtsSpark)NULL);
1610   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1611   { threadsIgnored++;
1612     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1613           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1614     return END_TSO_QUEUE;
1615   }
1616   else
1617   { threadsCreated++;
1618     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1619     if (tso==END_TSO_QUEUE)     
1620       barf("createSparkThread: Cannot create TSO");
1621 #if defined(DIST)
1622     tso->priority = AdvisoryPriority;
1623 #endif
1624     pushClosure(tso,spark);
1625     PUSH_ON_RUN_QUEUE(tso);
1626     advisory_thread_count++;    
1627   }
1628   return tso;
1629 }
1630 #endif
1631
1632 /*
1633   Turn a spark into a thread.
1634   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1635 */
1636 #if defined(PAR)
1637 //@cindex activateSpark
1638 StgTSO *
1639 activateSpark (rtsSpark spark) 
1640 {
1641   StgTSO *tso;
1642
1643   tso = createSparkThread(spark);
1644   if (RtsFlags.ParFlags.ParStats.Full) {   
1645     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1646     IF_PAR_DEBUG(verbose,
1647                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1648                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1649   }
1650   // ToDo: fwd info on local/global spark to thread -- HWL
1651   // tso->gran.exported =  spark->exported;
1652   // tso->gran.locked =   !spark->global;
1653   // tso->gran.sparkname = spark->name;
1654
1655   return tso;
1656 }
1657 #endif
1658
1659 /* ---------------------------------------------------------------------------
1660  * scheduleThread()
1661  *
1662  * scheduleThread puts a thread on the head of the runnable queue.
1663  * This will usually be done immediately after a thread is created.
1664  * The caller of scheduleThread must create the thread using e.g.
1665  * createThread and push an appropriate closure
1666  * on this thread's stack before the scheduler is invoked.
1667  * ------------------------------------------------------------------------ */
1668
1669 void
1670 scheduleThread(StgTSO *tso)
1671 {
1672   if (tso==END_TSO_QUEUE){    
1673     schedule();
1674     return;
1675   }
1676
1677   ACQUIRE_LOCK(&sched_mutex);
1678
1679   /* Put the new thread on the head of the runnable queue.  The caller
1680    * better push an appropriate closure on this thread's stack
1681    * beforehand.  In the SMP case, the thread may start running as
1682    * soon as we release the scheduler lock below.
1683    */
1684   PUSH_ON_RUN_QUEUE(tso);
1685   THREAD_RUNNABLE();
1686
1687 #if 0
1688   IF_DEBUG(scheduler,printTSO(tso));
1689 #endif
1690   RELEASE_LOCK(&sched_mutex);
1691 }
1692
1693 /* ---------------------------------------------------------------------------
1694  * startTasks()
1695  *
1696  * Start up Posix threads to run each of the scheduler tasks.
1697  * I believe the task ids are not needed in the system as defined.
1698  *  KH @ 25/10/99
1699  * ------------------------------------------------------------------------ */
1700
1701 #if defined(PAR) || defined(SMP)
1702 void
1703 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1704 {
1705   scheduleThread(END_TSO_QUEUE);
1706 }
1707 #endif
1708
1709 /* ---------------------------------------------------------------------------
1710  * initScheduler()
1711  *
1712  * Initialise the scheduler.  This resets all the queues - if the
1713  * queues contained any threads, they'll be garbage collected at the
1714  * next pass.
1715  *
1716  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1717  * ------------------------------------------------------------------------ */
1718
1719 #ifdef SMP
1720 static void
1721 term_handler(int sig STG_UNUSED)
1722 {
1723   stat_workerStop();
1724   ACQUIRE_LOCK(&term_mutex);
1725   await_death--;
1726   RELEASE_LOCK(&term_mutex);
1727   pthread_exit(NULL);
1728 }
1729 #endif
1730
1731 //@cindex initScheduler
1732 void 
1733 initScheduler(void)
1734 {
1735 #if defined(GRAN)
1736   nat i;
1737
1738   for (i=0; i<=MAX_PROC; i++) {
1739     run_queue_hds[i]      = END_TSO_QUEUE;
1740     run_queue_tls[i]      = END_TSO_QUEUE;
1741     blocked_queue_hds[i]  = END_TSO_QUEUE;
1742     blocked_queue_tls[i]  = END_TSO_QUEUE;
1743     ccalling_threadss[i]  = END_TSO_QUEUE;
1744     sleeping_queue        = END_TSO_QUEUE;
1745   }
1746 #else
1747   run_queue_hd      = END_TSO_QUEUE;
1748   run_queue_tl      = END_TSO_QUEUE;
1749   blocked_queue_hd  = END_TSO_QUEUE;
1750   blocked_queue_tl  = END_TSO_QUEUE;
1751   sleeping_queue    = END_TSO_QUEUE;
1752 #endif 
1753
1754   suspended_ccalling_threads  = END_TSO_QUEUE;
1755
1756   main_threads = NULL;
1757   all_threads  = END_TSO_QUEUE;
1758
1759   context_switch = 0;
1760   interrupted    = 0;
1761
1762   RtsFlags.ConcFlags.ctxtSwitchTicks =
1763       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1764
1765   /* Install the SIGHUP handler */
1766 #ifdef SMP
1767   {
1768     struct sigaction action,oact;
1769
1770     action.sa_handler = term_handler;
1771     sigemptyset(&action.sa_mask);
1772     action.sa_flags = 0;
1773     if (sigaction(SIGTERM, &action, &oact) != 0) {
1774       barf("can't install TERM handler");
1775     }
1776   }
1777 #endif
1778
1779 #ifdef SMP
1780   /* Allocate N Capabilities */
1781   {
1782     nat i;
1783     Capability *cap, *prev;
1784     cap  = NULL;
1785     prev = NULL;
1786     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1787       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1788       cap->link = prev;
1789       prev = cap;
1790     }
1791     free_capabilities = cap;
1792     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1793   }
1794   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1795                              n_free_capabilities););
1796 #endif
1797
1798 #if defined(SMP) || defined(PAR)
1799   initSparkPools();
1800 #endif
1801 }
1802
1803 #ifdef SMP
1804 void
1805 startTasks( void )
1806 {
1807   nat i;
1808   int r;
1809   pthread_t tid;
1810   
1811   /* make some space for saving all the thread ids */
1812   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1813                             "initScheduler:task_ids");
1814   
1815   /* and create all the threads */
1816   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1817     r = pthread_create(&tid,NULL,taskStart,NULL);
1818     if (r != 0) {
1819       barf("startTasks: Can't create new Posix thread");
1820     }
1821     task_ids[i].id = tid;
1822     task_ids[i].mut_time = 0.0;
1823     task_ids[i].mut_etime = 0.0;
1824     task_ids[i].gc_time = 0.0;
1825     task_ids[i].gc_etime = 0.0;
1826     task_ids[i].elapsedtimestart = elapsedtime();
1827     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1828   }
1829 }
1830 #endif
1831
1832 void
1833 exitScheduler( void )
1834 {
1835 #ifdef SMP
1836   nat i;
1837
1838   /* Don't want to use pthread_cancel, since we'd have to install
1839    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1840    * all our locks.
1841    */
1842 #if 0
1843   /* Cancel all our tasks */
1844   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1845     pthread_cancel(task_ids[i].id);
1846   }
1847   
1848   /* Wait for all the tasks to terminate */
1849   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1850     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1851                                task_ids[i].id));
1852     pthread_join(task_ids[i].id, NULL);
1853   }
1854 #endif
1855
1856   /* Send 'em all a SIGHUP.  That should shut 'em up.
1857    */
1858   await_death = RtsFlags.ParFlags.nNodes;
1859   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1860     pthread_kill(task_ids[i].id,SIGTERM);
1861   }
1862   while (await_death > 0) {
1863     sched_yield();
1864   }
1865 #endif
1866 }
1867
1868 /* -----------------------------------------------------------------------------
1869    Managing the per-task allocation areas.
1870    
1871    Each capability comes with an allocation area.  These are
1872    fixed-length block lists into which allocation can be done.
1873
1874    ToDo: no support for two-space collection at the moment???
1875    -------------------------------------------------------------------------- */
1876
1877 /* -----------------------------------------------------------------------------
1878  * waitThread is the external interface for running a new computation
1879  * and waiting for the result.
1880  *
1881  * In the non-SMP case, we create a new main thread, push it on the 
1882  * main-thread stack, and invoke the scheduler to run it.  The
1883  * scheduler will return when the top main thread on the stack has
1884  * completed or died, and fill in the necessary fields of the
1885  * main_thread structure.
1886  *
1887  * In the SMP case, we create a main thread as before, but we then
1888  * create a new condition variable and sleep on it.  When our new
1889  * main thread has completed, we'll be woken up and the status/result
1890  * will be in the main_thread struct.
1891  * -------------------------------------------------------------------------- */
1892
1893 int 
1894 howManyThreadsAvail ( void )
1895 {
1896    int i = 0;
1897    StgTSO* q;
1898    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1899       i++;
1900    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1901       i++;
1902    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1903       i++;
1904    return i;
1905 }
1906
1907 void
1908 finishAllThreads ( void )
1909 {
1910    do {
1911       while (run_queue_hd != END_TSO_QUEUE) {
1912          waitThread ( run_queue_hd, NULL );
1913       }
1914       while (blocked_queue_hd != END_TSO_QUEUE) {
1915          waitThread ( blocked_queue_hd, NULL );
1916       }
1917       while (sleeping_queue != END_TSO_QUEUE) {
1918          waitThread ( blocked_queue_hd, NULL );
1919       }
1920    } while 
1921       (blocked_queue_hd != END_TSO_QUEUE || 
1922        run_queue_hd     != END_TSO_QUEUE ||
1923        sleeping_queue   != END_TSO_QUEUE);
1924 }
1925
1926 SchedulerStatus
1927 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1928 {
1929   StgMainThread *m;
1930   SchedulerStatus stat;
1931
1932   ACQUIRE_LOCK(&sched_mutex);
1933   
1934   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1935
1936   m->tso = tso;
1937   m->ret = ret;
1938   m->stat = NoStatus;
1939 #ifdef SMP
1940   pthread_cond_init(&m->wakeup, NULL);
1941 #endif
1942
1943   m->link = main_threads;
1944   main_threads = m;
1945
1946   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1947                               m->tso->id));
1948
1949 #ifdef SMP
1950   do {
1951     pthread_cond_wait(&m->wakeup, &sched_mutex);
1952   } while (m->stat == NoStatus);
1953 #elif defined(GRAN)
1954   /* GranSim specific init */
1955   CurrentTSO = m->tso;                // the TSO to run
1956   procStatus[MainProc] = Busy;        // status of main PE
1957   CurrentProc = MainProc;             // PE to run it on
1958
1959   schedule();
1960 #else
1961   schedule();
1962   ASSERT(m->stat != NoStatus);
1963 #endif
1964
1965   stat = m->stat;
1966
1967 #ifdef SMP
1968   pthread_cond_destroy(&m->wakeup);
1969 #endif
1970
1971   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1972                               m->tso->id));
1973   free(m);
1974
1975   RELEASE_LOCK(&sched_mutex);
1976
1977   return stat;
1978 }
1979
1980 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1981 //@subsection Run queue code 
1982
1983 #if 0
1984 /* 
1985    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1986        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1987        implicit global variable that has to be correct when calling these
1988        fcts -- HWL 
1989 */
1990
1991 /* Put the new thread on the head of the runnable queue.
1992  * The caller of createThread better push an appropriate closure
1993  * on this thread's stack before the scheduler is invoked.
1994  */
1995 static /* inline */ void
1996 add_to_run_queue(tso)
1997 StgTSO* tso; 
1998 {
1999   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2000   tso->link = run_queue_hd;
2001   run_queue_hd = tso;
2002   if (run_queue_tl == END_TSO_QUEUE) {
2003     run_queue_tl = tso;
2004   }
2005 }
2006
2007 /* Put the new thread at the end of the runnable queue. */
2008 static /* inline */ void
2009 push_on_run_queue(tso)
2010 StgTSO* tso; 
2011 {
2012   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2013   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2014   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2015   if (run_queue_hd == END_TSO_QUEUE) {
2016     run_queue_hd = tso;
2017   } else {
2018     run_queue_tl->link = tso;
2019   }
2020   run_queue_tl = tso;
2021 }
2022
2023 /* 
2024    Should be inlined because it's used very often in schedule.  The tso
2025    argument is actually only needed in GranSim, where we want to have the
2026    possibility to schedule *any* TSO on the run queue, irrespective of the
2027    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2028    the run queue and dequeue the tso, adjusting the links in the queue. 
2029 */
2030 //@cindex take_off_run_queue
2031 static /* inline */ StgTSO*
2032 take_off_run_queue(StgTSO *tso) {
2033   StgTSO *t, *prev;
2034
2035   /* 
2036      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2037
2038      if tso is specified, unlink that tso from the run_queue (doesn't have
2039      to be at the beginning of the queue); GranSim only 
2040   */
2041   if (tso!=END_TSO_QUEUE) {
2042     /* find tso in queue */
2043     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2044          t!=END_TSO_QUEUE && t!=tso;
2045          prev=t, t=t->link) 
2046       /* nothing */ ;
2047     ASSERT(t==tso);
2048     /* now actually dequeue the tso */
2049     if (prev!=END_TSO_QUEUE) {
2050       ASSERT(run_queue_hd!=t);
2051       prev->link = t->link;
2052     } else {
2053       /* t is at beginning of thread queue */
2054       ASSERT(run_queue_hd==t);
2055       run_queue_hd = t->link;
2056     }
2057     /* t is at end of thread queue */
2058     if (t->link==END_TSO_QUEUE) {
2059       ASSERT(t==run_queue_tl);
2060       run_queue_tl = prev;
2061     } else {
2062       ASSERT(run_queue_tl!=t);
2063     }
2064     t->link = END_TSO_QUEUE;
2065   } else {
2066     /* take tso from the beginning of the queue; std concurrent code */
2067     t = run_queue_hd;
2068     if (t != END_TSO_QUEUE) {
2069       run_queue_hd = t->link;
2070       t->link = END_TSO_QUEUE;
2071       if (run_queue_hd == END_TSO_QUEUE) {
2072         run_queue_tl = END_TSO_QUEUE;
2073       }
2074     }
2075   }
2076   return t;
2077 }
2078
2079 #endif /* 0 */
2080
2081 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2082 //@subsection Garbage Collextion Routines
2083
2084 /* ---------------------------------------------------------------------------
2085    Where are the roots that we know about?
2086
2087         - all the threads on the runnable queue
2088         - all the threads on the blocked queue
2089         - all the threads on the sleeping queue
2090         - all the thread currently executing a _ccall_GC
2091         - all the "main threads"
2092      
2093    ------------------------------------------------------------------------ */
2094
2095 /* This has to be protected either by the scheduler monitor, or by the
2096         garbage collection monitor (probably the latter).
2097         KH @ 25/10/99
2098 */
2099
2100 static void
2101 GetRoots(evac_fn evac)
2102 {
2103   StgMainThread *m;
2104
2105 #if defined(GRAN)
2106   {
2107     nat i;
2108     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2109       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2110           evac((StgClosure **)&run_queue_hds[i]);
2111       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2112           evac((StgClosure **)&run_queue_tls[i]);
2113       
2114       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2115           evac((StgClosure **)&blocked_queue_hds[i]);
2116       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2117           evac((StgClosure **)&blocked_queue_tls[i]);
2118       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2119           evac((StgClosure **)&ccalling_threads[i]);
2120     }
2121   }
2122
2123   markEventQueue();
2124
2125 #else /* !GRAN */
2126   if (run_queue_hd != END_TSO_QUEUE) {
2127       ASSERT(run_queue_tl != END_TSO_QUEUE);
2128       evac((StgClosure **)&run_queue_hd);
2129       evac((StgClosure **)&run_queue_tl);
2130   }
2131   
2132   if (blocked_queue_hd != END_TSO_QUEUE) {
2133       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2134       evac((StgClosure **)&blocked_queue_hd);
2135       evac((StgClosure **)&blocked_queue_tl);
2136   }
2137   
2138   if (sleeping_queue != END_TSO_QUEUE) {
2139       evac((StgClosure **)&sleeping_queue);
2140   }
2141 #endif 
2142
2143   for (m = main_threads; m != NULL; m = m->link) {
2144       evac((StgClosure **)&m->tso);
2145   }
2146   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2147       evac((StgClosure **)&suspended_ccalling_threads);
2148   }
2149
2150 #if defined(SMP) || defined(PAR) || defined(GRAN)
2151   markSparkQueue(evac);
2152 #endif
2153 }
2154
2155 /* -----------------------------------------------------------------------------
2156    performGC
2157
2158    This is the interface to the garbage collector from Haskell land.
2159    We provide this so that external C code can allocate and garbage
2160    collect when called from Haskell via _ccall_GC.
2161
2162    It might be useful to provide an interface whereby the programmer
2163    can specify more roots (ToDo).
2164    
2165    This needs to be protected by the GC condition variable above.  KH.
2166    -------------------------------------------------------------------------- */
2167
2168 void (*extra_roots)(evac_fn);
2169
2170 void
2171 performGC(void)
2172 {
2173   GarbageCollect(GetRoots,rtsFalse);
2174 }
2175
2176 void
2177 performMajorGC(void)
2178 {
2179   GarbageCollect(GetRoots,rtsTrue);
2180 }
2181
2182 static void
2183 AllRoots(evac_fn evac)
2184 {
2185     GetRoots(evac);             // the scheduler's roots
2186     extra_roots(evac);          // the user's roots
2187 }
2188
2189 void
2190 performGCWithRoots(void (*get_roots)(evac_fn))
2191 {
2192   extra_roots = get_roots;
2193   GarbageCollect(AllRoots,rtsFalse);
2194 }
2195
2196 /* -----------------------------------------------------------------------------
2197    Stack overflow
2198
2199    If the thread has reached its maximum stack size, then raise the
2200    StackOverflow exception in the offending thread.  Otherwise
2201    relocate the TSO into a larger chunk of memory and adjust its stack
2202    size appropriately.
2203    -------------------------------------------------------------------------- */
2204
2205 static StgTSO *
2206 threadStackOverflow(StgTSO *tso)
2207 {
2208   nat new_stack_size, new_tso_size, diff, stack_words;
2209   StgPtr new_sp;
2210   StgTSO *dest;
2211
2212   IF_DEBUG(sanity,checkTSO(tso));
2213   if (tso->stack_size >= tso->max_stack_size) {
2214
2215     IF_DEBUG(gc,
2216              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2217                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2218              /* If we're debugging, just print out the top of the stack */
2219              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2220                                               tso->sp+64)));
2221
2222     /* Send this thread the StackOverflow exception */
2223     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2224     return tso;
2225   }
2226
2227   /* Try to double the current stack size.  If that takes us over the
2228    * maximum stack size for this thread, then use the maximum instead.
2229    * Finally round up so the TSO ends up as a whole number of blocks.
2230    */
2231   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2232   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2233                                        TSO_STRUCT_SIZE)/sizeof(W_);
2234   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2235   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2236
2237   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2238
2239   dest = (StgTSO *)allocate(new_tso_size);
2240   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2241
2242   /* copy the TSO block and the old stack into the new area */
2243   memcpy(dest,tso,TSO_STRUCT_SIZE);
2244   stack_words = tso->stack + tso->stack_size - tso->sp;
2245   new_sp = (P_)dest + new_tso_size - stack_words;
2246   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2247
2248   /* relocate the stack pointers... */
2249   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2250   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2251   dest->sp    = new_sp;
2252   dest->stack_size = new_stack_size;
2253         
2254   /* and relocate the update frame list */
2255   relocate_stack(dest, diff);
2256
2257   /* Mark the old TSO as relocated.  We have to check for relocated
2258    * TSOs in the garbage collector and any primops that deal with TSOs.
2259    *
2260    * It's important to set the sp and su values to just beyond the end
2261    * of the stack, so we don't attempt to scavenge any part of the
2262    * dead TSO's stack.
2263    */
2264   tso->what_next = ThreadRelocated;
2265   tso->link = dest;
2266   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2267   tso->su = (StgUpdateFrame *)tso->sp;
2268   tso->why_blocked = NotBlocked;
2269   dest->mut_link = NULL;
2270
2271   IF_PAR_DEBUG(verbose,
2272                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2273                      tso->id, tso, tso->stack_size);
2274                /* If we're debugging, just print out the top of the stack */
2275                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2276                                                 tso->sp+64)));
2277   
2278   IF_DEBUG(sanity,checkTSO(tso));
2279 #if 0
2280   IF_DEBUG(scheduler,printTSO(dest));
2281 #endif
2282
2283   return dest;
2284 }
2285
2286 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2287 //@subsection Blocking Queue Routines
2288
2289 /* ---------------------------------------------------------------------------
2290    Wake up a queue that was blocked on some resource.
2291    ------------------------------------------------------------------------ */
2292
2293 #if defined(GRAN)
2294 static inline void
2295 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2296 {
2297 }
2298 #elif defined(PAR)
2299 static inline void
2300 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2301 {
2302   /* write RESUME events to log file and
2303      update blocked and fetch time (depending on type of the orig closure) */
2304   if (RtsFlags.ParFlags.ParStats.Full) {
2305     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2306                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2307                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2308     if (EMPTY_RUN_QUEUE())
2309       emitSchedule = rtsTrue;
2310
2311     switch (get_itbl(node)->type) {
2312         case FETCH_ME_BQ:
2313           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2314           break;
2315         case RBH:
2316         case FETCH_ME:
2317         case BLACKHOLE_BQ:
2318           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2319           break;
2320 #ifdef DIST
2321         case MVAR:
2322           break;
2323 #endif    
2324         default:
2325           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2326         }
2327       }
2328 }
2329 #endif
2330
2331 #if defined(GRAN)
2332 static StgBlockingQueueElement *
2333 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2334 {
2335     StgTSO *tso;
2336     PEs node_loc, tso_loc;
2337
2338     node_loc = where_is(node); // should be lifted out of loop
2339     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2340     tso_loc = where_is((StgClosure *)tso);
2341     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2342       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2343       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2344       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2345       // insertThread(tso, node_loc);
2346       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2347                 ResumeThread,
2348                 tso, node, (rtsSpark*)NULL);
2349       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2350       // len_local++;
2351       // len++;
2352     } else { // TSO is remote (actually should be FMBQ)
2353       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2354                                   RtsFlags.GranFlags.Costs.gunblocktime +
2355                                   RtsFlags.GranFlags.Costs.latency;
2356       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2357                 UnblockThread,
2358                 tso, node, (rtsSpark*)NULL);
2359       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2360       // len++;
2361     }
2362     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2363     IF_GRAN_DEBUG(bq,
2364                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2365                           (node_loc==tso_loc ? "Local" : "Global"), 
2366                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2367     tso->block_info.closure = NULL;
2368     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2369                              tso->id, tso));
2370 }
2371 #elif defined(PAR)
2372 static StgBlockingQueueElement *
2373 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2374 {
2375     StgBlockingQueueElement *next;
2376
2377     switch (get_itbl(bqe)->type) {
2378     case TSO:
2379       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2380       /* if it's a TSO just push it onto the run_queue */
2381       next = bqe->link;
2382       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2383       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2384       THREAD_RUNNABLE();
2385       unblockCount(bqe, node);
2386       /* reset blocking status after dumping event */
2387       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2388       break;
2389
2390     case BLOCKED_FETCH:
2391       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2392       next = bqe->link;
2393       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2394       PendingFetches = (StgBlockedFetch *)bqe;
2395       break;
2396
2397 # if defined(DEBUG)
2398       /* can ignore this case in a non-debugging setup; 
2399          see comments on RBHSave closures above */
2400     case CONSTR:
2401       /* check that the closure is an RBHSave closure */
2402       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2403              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2404              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2405       break;
2406
2407     default:
2408       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2409            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2410            (StgClosure *)bqe);
2411 # endif
2412     }
2413   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2414   return next;
2415 }
2416
2417 #else /* !GRAN && !PAR */
2418 static StgTSO *
2419 unblockOneLocked(StgTSO *tso)
2420 {
2421   StgTSO *next;
2422
2423   ASSERT(get_itbl(tso)->type == TSO);
2424   ASSERT(tso->why_blocked != NotBlocked);
2425   tso->why_blocked = NotBlocked;
2426   next = tso->link;
2427   PUSH_ON_RUN_QUEUE(tso);
2428   THREAD_RUNNABLE();
2429   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2430   return next;
2431 }
2432 #endif
2433
2434 #if defined(GRAN) || defined(PAR)
2435 inline StgBlockingQueueElement *
2436 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2437 {
2438   ACQUIRE_LOCK(&sched_mutex);
2439   bqe = unblockOneLocked(bqe, node);
2440   RELEASE_LOCK(&sched_mutex);
2441   return bqe;
2442 }
2443 #else
2444 inline StgTSO *
2445 unblockOne(StgTSO *tso)
2446 {
2447   ACQUIRE_LOCK(&sched_mutex);
2448   tso = unblockOneLocked(tso);
2449   RELEASE_LOCK(&sched_mutex);
2450   return tso;
2451 }
2452 #endif
2453
2454 #if defined(GRAN)
2455 void 
2456 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2457 {
2458   StgBlockingQueueElement *bqe;
2459   PEs node_loc;
2460   nat len = 0; 
2461
2462   IF_GRAN_DEBUG(bq, 
2463                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2464                       node, CurrentProc, CurrentTime[CurrentProc], 
2465                       CurrentTSO->id, CurrentTSO));
2466
2467   node_loc = where_is(node);
2468
2469   ASSERT(q == END_BQ_QUEUE ||
2470          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2471          get_itbl(q)->type == CONSTR); // closure (type constructor)
2472   ASSERT(is_unique(node));
2473
2474   /* FAKE FETCH: magically copy the node to the tso's proc;
2475      no Fetch necessary because in reality the node should not have been 
2476      moved to the other PE in the first place
2477   */
2478   if (CurrentProc!=node_loc) {
2479     IF_GRAN_DEBUG(bq, 
2480                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2481                         node, node_loc, CurrentProc, CurrentTSO->id, 
2482                         // CurrentTSO, where_is(CurrentTSO),
2483                         node->header.gran.procs));
2484     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2485     IF_GRAN_DEBUG(bq, 
2486                   belch("## new bitmask of node %p is %#x",
2487                         node, node->header.gran.procs));
2488     if (RtsFlags.GranFlags.GranSimStats.Global) {
2489       globalGranStats.tot_fake_fetches++;
2490     }
2491   }
2492
2493   bqe = q;
2494   // ToDo: check: ASSERT(CurrentProc==node_loc);
2495   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2496     //next = bqe->link;
2497     /* 
2498        bqe points to the current element in the queue
2499        next points to the next element in the queue
2500     */
2501     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2502     //tso_loc = where_is(tso);
2503     len++;
2504     bqe = unblockOneLocked(bqe, node);
2505   }
2506
2507   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2508      the closure to make room for the anchor of the BQ */
2509   if (bqe!=END_BQ_QUEUE) {
2510     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2511     /*
2512     ASSERT((info_ptr==&RBH_Save_0_info) ||
2513            (info_ptr==&RBH_Save_1_info) ||
2514            (info_ptr==&RBH_Save_2_info));
2515     */
2516     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2517     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2518     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2519
2520     IF_GRAN_DEBUG(bq,
2521                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2522                         node, info_type(node)));
2523   }
2524
2525   /* statistics gathering */
2526   if (RtsFlags.GranFlags.GranSimStats.Global) {
2527     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2528     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2529     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2530     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2531   }
2532   IF_GRAN_DEBUG(bq,
2533                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2534                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2535 }
2536 #elif defined(PAR)
2537 void 
2538 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2539 {
2540   StgBlockingQueueElement *bqe;
2541
2542   ACQUIRE_LOCK(&sched_mutex);
2543
2544   IF_PAR_DEBUG(verbose, 
2545                belch("##-_ AwBQ for node %p on [%x]: ",
2546                      node, mytid));
2547 #ifdef DIST  
2548   //RFP
2549   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2550     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2551     return;
2552   }
2553 #endif
2554   
2555   ASSERT(q == END_BQ_QUEUE ||
2556          get_itbl(q)->type == TSO ||           
2557          get_itbl(q)->type == BLOCKED_FETCH || 
2558          get_itbl(q)->type == CONSTR); 
2559
2560   bqe = q;
2561   while (get_itbl(bqe)->type==TSO || 
2562          get_itbl(bqe)->type==BLOCKED_FETCH) {
2563     bqe = unblockOneLocked(bqe, node);
2564   }
2565   RELEASE_LOCK(&sched_mutex);
2566 }
2567
2568 #else   /* !GRAN && !PAR */
2569 void
2570 awakenBlockedQueue(StgTSO *tso)
2571 {
2572   ACQUIRE_LOCK(&sched_mutex);
2573   while (tso != END_TSO_QUEUE) {
2574     tso = unblockOneLocked(tso);
2575   }
2576   RELEASE_LOCK(&sched_mutex);
2577 }
2578 #endif
2579
2580 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2581 //@subsection Exception Handling Routines
2582
2583 /* ---------------------------------------------------------------------------
2584    Interrupt execution
2585    - usually called inside a signal handler so it mustn't do anything fancy.   
2586    ------------------------------------------------------------------------ */
2587
2588 void
2589 interruptStgRts(void)
2590 {
2591     interrupted    = 1;
2592     context_switch = 1;
2593 }
2594
2595 /* -----------------------------------------------------------------------------
2596    Unblock a thread
2597
2598    This is for use when we raise an exception in another thread, which
2599    may be blocked.
2600    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2601    -------------------------------------------------------------------------- */
2602
2603 #if defined(GRAN) || defined(PAR)
2604 /*
2605   NB: only the type of the blocking queue is different in GranSim and GUM
2606       the operations on the queue-elements are the same
2607       long live polymorphism!
2608 */
2609 static void
2610 unblockThread(StgTSO *tso)
2611 {
2612   StgBlockingQueueElement *t, **last;
2613
2614   ACQUIRE_LOCK(&sched_mutex);
2615   switch (tso->why_blocked) {
2616
2617   case NotBlocked:
2618     return;  /* not blocked */
2619
2620   case BlockedOnMVar:
2621     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2622     {
2623       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2624       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2625
2626       last = (StgBlockingQueueElement **)&mvar->head;
2627       for (t = (StgBlockingQueueElement *)mvar->head; 
2628            t != END_BQ_QUEUE; 
2629            last = &t->link, last_tso = t, t = t->link) {
2630         if (t == (StgBlockingQueueElement *)tso) {
2631           *last = (StgBlockingQueueElement *)tso->link;
2632           if (mvar->tail == tso) {
2633             mvar->tail = (StgTSO *)last_tso;
2634           }
2635           goto done;
2636         }
2637       }
2638       barf("unblockThread (MVAR): TSO not found");
2639     }
2640
2641   case BlockedOnBlackHole:
2642     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2643     {
2644       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2645
2646       last = &bq->blocking_queue;
2647       for (t = bq->blocking_queue; 
2648            t != END_BQ_QUEUE; 
2649            last = &t->link, t = t->link) {
2650         if (t == (StgBlockingQueueElement *)tso) {
2651           *last = (StgBlockingQueueElement *)tso->link;
2652           goto done;
2653         }
2654       }
2655       barf("unblockThread (BLACKHOLE): TSO not found");
2656     }
2657
2658   case BlockedOnException:
2659     {
2660       StgTSO *target  = tso->block_info.tso;
2661
2662       ASSERT(get_itbl(target)->type == TSO);
2663
2664       if (target->what_next == ThreadRelocated) {
2665           target = target->link;
2666           ASSERT(get_itbl(target)->type == TSO);
2667       }
2668
2669       ASSERT(target->blocked_exceptions != NULL);
2670
2671       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2672       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2673            t != END_BQ_QUEUE; 
2674            last = &t->link, t = t->link) {
2675         ASSERT(get_itbl(t)->type == TSO);
2676         if (t == (StgBlockingQueueElement *)tso) {
2677           *last = (StgBlockingQueueElement *)tso->link;
2678           goto done;
2679         }
2680       }
2681       barf("unblockThread (Exception): TSO not found");
2682     }
2683
2684   case BlockedOnRead:
2685   case BlockedOnWrite:
2686     {
2687       /* take TSO off blocked_queue */
2688       StgBlockingQueueElement *prev = NULL;
2689       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2690            prev = t, t = t->link) {
2691         if (t == (StgBlockingQueueElement *)tso) {
2692           if (prev == NULL) {
2693             blocked_queue_hd = (StgTSO *)t->link;
2694             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2695               blocked_queue_tl = END_TSO_QUEUE;
2696             }
2697           } else {
2698             prev->link = t->link;
2699             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2700               blocked_queue_tl = (StgTSO *)prev;
2701             }
2702           }
2703           goto done;
2704         }
2705       }
2706       barf("unblockThread (I/O): TSO not found");
2707     }
2708
2709   case BlockedOnDelay:
2710     {
2711       /* take TSO off sleeping_queue */
2712       StgBlockingQueueElement *prev = NULL;
2713       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2714            prev = t, t = t->link) {
2715         if (t == (StgBlockingQueueElement *)tso) {
2716           if (prev == NULL) {
2717             sleeping_queue = (StgTSO *)t->link;
2718           } else {
2719             prev->link = t->link;
2720           }
2721           goto done;
2722         }
2723       }
2724       barf("unblockThread (I/O): TSO not found");
2725     }
2726
2727   default:
2728     barf("unblockThread");
2729   }
2730
2731  done:
2732   tso->link = END_TSO_QUEUE;
2733   tso->why_blocked = NotBlocked;
2734   tso->block_info.closure = NULL;
2735   PUSH_ON_RUN_QUEUE(tso);
2736   RELEASE_LOCK(&sched_mutex);
2737 }
2738 #else
2739 static void
2740 unblockThread(StgTSO *tso)
2741 {
2742   StgTSO *t, **last;
2743
2744   ACQUIRE_LOCK(&sched_mutex);
2745   switch (tso->why_blocked) {
2746
2747   case NotBlocked:
2748     return;  /* not blocked */
2749
2750   case BlockedOnMVar:
2751     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2752     {
2753       StgTSO *last_tso = END_TSO_QUEUE;
2754       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2755
2756       last = &mvar->head;
2757       for (t = mvar->head; t != END_TSO_QUEUE; 
2758            last = &t->link, last_tso = t, t = t->link) {
2759         if (t == tso) {
2760           *last = tso->link;
2761           if (mvar->tail == tso) {
2762             mvar->tail = last_tso;
2763           }
2764           goto done;
2765         }
2766       }
2767       barf("unblockThread (MVAR): TSO not found");
2768     }
2769
2770   case BlockedOnBlackHole:
2771     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2772     {
2773       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2774
2775       last = &bq->blocking_queue;
2776       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2777            last = &t->link, t = t->link) {
2778         if (t == tso) {
2779           *last = tso->link;
2780           goto done;
2781         }
2782       }
2783       barf("unblockThread (BLACKHOLE): TSO not found");
2784     }
2785
2786   case BlockedOnException:
2787     {
2788       StgTSO *target  = tso->block_info.tso;
2789
2790       ASSERT(get_itbl(target)->type == TSO);
2791
2792       while (target->what_next == ThreadRelocated) {
2793           target = target->link;
2794           ASSERT(get_itbl(target)->type == TSO);
2795       }
2796       
2797       ASSERT(target->blocked_exceptions != NULL);
2798
2799       last = &target->blocked_exceptions;
2800       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2801            last = &t->link, t = t->link) {
2802         ASSERT(get_itbl(t)->type == TSO);
2803         if (t == tso) {
2804           *last = tso->link;
2805           goto done;
2806         }
2807       }
2808       barf("unblockThread (Exception): TSO not found");
2809     }
2810
2811   case BlockedOnRead:
2812   case BlockedOnWrite:
2813     {
2814       StgTSO *prev = NULL;
2815       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2816            prev = t, t = t->link) {
2817         if (t == tso) {
2818           if (prev == NULL) {
2819             blocked_queue_hd = t->link;
2820             if (blocked_queue_tl == t) {
2821               blocked_queue_tl = END_TSO_QUEUE;
2822             }
2823           } else {
2824             prev->link = t->link;
2825             if (blocked_queue_tl == t) {
2826               blocked_queue_tl = prev;
2827             }
2828           }
2829           goto done;
2830         }
2831       }
2832       barf("unblockThread (I/O): TSO not found");
2833     }
2834
2835   case BlockedOnDelay:
2836     {
2837       StgTSO *prev = NULL;
2838       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2839            prev = t, t = t->link) {
2840         if (t == tso) {
2841           if (prev == NULL) {
2842             sleeping_queue = t->link;
2843           } else {
2844             prev->link = t->link;
2845           }
2846           goto done;
2847         }
2848       }
2849       barf("unblockThread (I/O): TSO not found");
2850     }
2851
2852   default:
2853     barf("unblockThread");
2854   }
2855
2856  done:
2857   tso->link = END_TSO_QUEUE;
2858   tso->why_blocked = NotBlocked;
2859   tso->block_info.closure = NULL;
2860   PUSH_ON_RUN_QUEUE(tso);
2861   RELEASE_LOCK(&sched_mutex);
2862 }
2863 #endif
2864
2865 /* -----------------------------------------------------------------------------
2866  * raiseAsync()
2867  *
2868  * The following function implements the magic for raising an
2869  * asynchronous exception in an existing thread.
2870  *
2871  * We first remove the thread from any queue on which it might be
2872  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2873  *
2874  * We strip the stack down to the innermost CATCH_FRAME, building
2875  * thunks in the heap for all the active computations, so they can 
2876  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2877  * an application of the handler to the exception, and push it on
2878  * the top of the stack.
2879  * 
2880  * How exactly do we save all the active computations?  We create an
2881  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2882  * AP_UPDs pushes everything from the corresponding update frame
2883  * upwards onto the stack.  (Actually, it pushes everything up to the
2884  * next update frame plus a pointer to the next AP_UPD object.
2885  * Entering the next AP_UPD object pushes more onto the stack until we
2886  * reach the last AP_UPD object - at which point the stack should look
2887  * exactly as it did when we killed the TSO and we can continue
2888  * execution by entering the closure on top of the stack.
2889  *
2890  * We can also kill a thread entirely - this happens if either (a) the 
2891  * exception passed to raiseAsync is NULL, or (b) there's no
2892  * CATCH_FRAME on the stack.  In either case, we strip the entire
2893  * stack and replace the thread with a zombie.
2894  *
2895  * -------------------------------------------------------------------------- */
2896  
2897 void 
2898 deleteThread(StgTSO *tso)
2899 {
2900   raiseAsync(tso,NULL);
2901 }
2902
2903 void
2904 raiseAsync(StgTSO *tso, StgClosure *exception)
2905 {
2906   StgUpdateFrame* su = tso->su;
2907   StgPtr          sp = tso->sp;
2908   
2909   /* Thread already dead? */
2910   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2911     return;
2912   }
2913
2914   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2915
2916   /* Remove it from any blocking queues */
2917   unblockThread(tso);
2918
2919   /* The stack freezing code assumes there's a closure pointer on
2920    * the top of the stack.  This isn't always the case with compiled
2921    * code, so we have to push a dummy closure on the top which just
2922    * returns to the next return address on the stack.
2923    */
2924   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2925     *(--sp) = (W_)&stg_dummy_ret_closure;
2926   }
2927
2928   while (1) {
2929     nat words = ((P_)su - (P_)sp) - 1;
2930     nat i;
2931     StgAP_UPD * ap;
2932
2933     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2934      * then build PAP(handler,exception,realworld#), and leave it on
2935      * top of the stack ready to enter.
2936      */
2937     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2938       StgCatchFrame *cf = (StgCatchFrame *)su;
2939       /* we've got an exception to raise, so let's pass it to the
2940        * handler in this frame.
2941        */
2942       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2943       TICK_ALLOC_UPD_PAP(3,0);
2944       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2945               
2946       ap->n_args = 2;
2947       ap->fun = cf->handler;    /* :: Exception -> IO a */
2948       ap->payload[0] = exception;
2949       ap->payload[1] = ARG_TAG(0); /* realworld token */
2950
2951       /* throw away the stack from Sp up to and including the
2952        * CATCH_FRAME.
2953        */
2954       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2955       tso->su = cf->link;
2956
2957       /* Restore the blocked/unblocked state for asynchronous exceptions
2958        * at the CATCH_FRAME.  
2959        *
2960        * If exceptions were unblocked at the catch, arrange that they
2961        * are unblocked again after executing the handler by pushing an
2962        * unblockAsyncExceptions_ret stack frame.
2963        */
2964       if (!cf->exceptions_blocked) {
2965         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2966       }
2967       
2968       /* Ensure that async exceptions are blocked when running the handler.
2969        */
2970       if (tso->blocked_exceptions == NULL) {
2971         tso->blocked_exceptions = END_TSO_QUEUE;
2972       }
2973       
2974       /* Put the newly-built PAP on top of the stack, ready to execute
2975        * when the thread restarts.
2976        */
2977       sp[0] = (W_)ap;
2978       tso->sp = sp;
2979       tso->what_next = ThreadEnterGHC;
2980       IF_DEBUG(sanity, checkTSO(tso));
2981       return;
2982     }
2983
2984     /* First build an AP_UPD consisting of the stack chunk above the
2985      * current update frame, with the top word on the stack as the
2986      * fun field.
2987      */
2988     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2989     
2990     ASSERT(words >= 0);
2991     
2992     ap->n_args = words;
2993     ap->fun    = (StgClosure *)sp[0];
2994     sp++;
2995     for(i=0; i < (nat)words; ++i) {
2996       ap->payload[i] = (StgClosure *)*sp++;
2997     }
2998     
2999     switch (get_itbl(su)->type) {
3000       
3001     case UPDATE_FRAME:
3002       {
3003         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3004         TICK_ALLOC_UP_THK(words+1,0);
3005         
3006         IF_DEBUG(scheduler,
3007                  fprintf(stderr,  "scheduler: Updating ");
3008                  printPtr((P_)su->updatee); 
3009                  fprintf(stderr,  " with ");
3010                  printObj((StgClosure *)ap);
3011                  );
3012         
3013         /* Replace the updatee with an indirection - happily
3014          * this will also wake up any threads currently
3015          * waiting on the result.
3016          *
3017          * Warning: if we're in a loop, more than one update frame on
3018          * the stack may point to the same object.  Be careful not to
3019          * overwrite an IND_OLDGEN in this case, because we'll screw
3020          * up the mutable lists.  To be on the safe side, don't
3021          * overwrite any kind of indirection at all.  See also
3022          * threadSqueezeStack in GC.c, where we have to make a similar
3023          * check.
3024          */
3025         if (!closure_IND(su->updatee)) {
3026             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3027         }
3028         su = su->link;
3029         sp += sizeofW(StgUpdateFrame) -1;
3030         sp[0] = (W_)ap; /* push onto stack */
3031         break;
3032       }
3033
3034     case CATCH_FRAME:
3035       {
3036         StgCatchFrame *cf = (StgCatchFrame *)su;
3037         StgClosure* o;
3038         
3039         /* We want a PAP, not an AP_UPD.  Fortunately, the
3040          * layout's the same.
3041          */
3042         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3043         TICK_ALLOC_UPD_PAP(words+1,0);
3044         
3045         /* now build o = FUN(catch,ap,handler) */
3046         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3047         TICK_ALLOC_FUN(2,0);
3048         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3049         o->payload[0] = (StgClosure *)ap;
3050         o->payload[1] = cf->handler;
3051         
3052         IF_DEBUG(scheduler,
3053                  fprintf(stderr,  "scheduler: Built ");
3054                  printObj((StgClosure *)o);
3055                  );
3056         
3057         /* pop the old handler and put o on the stack */
3058         su = cf->link;
3059         sp += sizeofW(StgCatchFrame) - 1;
3060         sp[0] = (W_)o;
3061         break;
3062       }
3063       
3064     case SEQ_FRAME:
3065       {
3066         StgSeqFrame *sf = (StgSeqFrame *)su;
3067         StgClosure* o;
3068         
3069         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3070         TICK_ALLOC_UPD_PAP(words+1,0);
3071         
3072         /* now build o = FUN(seq,ap) */
3073         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3074         TICK_ALLOC_SE_THK(1,0);
3075         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3076         o->payload[0] = (StgClosure *)ap;
3077         
3078         IF_DEBUG(scheduler,
3079                  fprintf(stderr,  "scheduler: Built ");
3080                  printObj((StgClosure *)o);
3081                  );
3082         
3083         /* pop the old handler and put o on the stack */
3084         su = sf->link;
3085         sp += sizeofW(StgSeqFrame) - 1;
3086         sp[0] = (W_)o;
3087         break;
3088       }
3089       
3090     case STOP_FRAME:
3091       /* We've stripped the entire stack, the thread is now dead. */
3092       sp += sizeofW(StgStopFrame) - 1;
3093       sp[0] = (W_)exception;    /* save the exception */
3094       tso->what_next = ThreadKilled;
3095       tso->su = (StgUpdateFrame *)(sp+1);
3096       tso->sp = sp;
3097       return;
3098
3099     default:
3100       barf("raiseAsync");
3101     }
3102   }
3103   barf("raiseAsync");
3104 }
3105
3106 /* -----------------------------------------------------------------------------
3107    resurrectThreads is called after garbage collection on the list of
3108    threads found to be garbage.  Each of these threads will be woken
3109    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3110    on an MVar, or NonTermination if the thread was blocked on a Black
3111    Hole.
3112    -------------------------------------------------------------------------- */
3113
3114 void
3115 resurrectThreads( StgTSO *threads )
3116 {
3117   StgTSO *tso, *next;
3118
3119   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3120     next = tso->global_link;
3121     tso->global_link = all_threads;
3122     all_threads = tso;
3123     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3124
3125     switch (tso->why_blocked) {
3126     case BlockedOnMVar:
3127     case BlockedOnException:
3128       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3129       break;
3130     case BlockedOnBlackHole:
3131       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3132       break;
3133     case NotBlocked:
3134       /* This might happen if the thread was blocked on a black hole
3135        * belonging to a thread that we've just woken up (raiseAsync
3136        * can wake up threads, remember...).
3137        */
3138       continue;
3139     default:
3140       barf("resurrectThreads: thread blocked in a strange way");
3141     }
3142   }
3143 }
3144
3145 /* -----------------------------------------------------------------------------
3146  * Blackhole detection: if we reach a deadlock, test whether any
3147  * threads are blocked on themselves.  Any threads which are found to
3148  * be self-blocked get sent a NonTermination exception.
3149  *
3150  * This is only done in a deadlock situation in order to avoid
3151  * performance overhead in the normal case.
3152  * -------------------------------------------------------------------------- */
3153
3154 static void
3155 detectBlackHoles( void )
3156 {
3157     StgTSO *t = all_threads;
3158     StgUpdateFrame *frame;
3159     StgClosure *blocked_on;
3160
3161     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3162
3163         while (t->what_next == ThreadRelocated) {
3164             t = t->link;
3165             ASSERT(get_itbl(t)->type == TSO);
3166         }
3167       
3168         if (t->why_blocked != BlockedOnBlackHole) {
3169             continue;
3170         }
3171
3172         blocked_on = t->block_info.closure;
3173
3174         for (frame = t->su; ; frame = frame->link) {
3175             switch (get_itbl(frame)->type) {
3176
3177             case UPDATE_FRAME:
3178                 if (frame->updatee == blocked_on) {
3179                     /* We are blocking on one of our own computations, so
3180                      * send this thread the NonTermination exception.  
3181                      */
3182                     IF_DEBUG(scheduler, 
3183                              sched_belch("thread %d is blocked on itself", t->id));
3184                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3185                     goto done;
3186                 }
3187                 else {
3188                     continue;
3189                 }
3190
3191             case CATCH_FRAME:
3192             case SEQ_FRAME:
3193                 continue;
3194                 
3195             case STOP_FRAME:
3196                 break;
3197             }
3198             break;
3199         }
3200
3201     done: ;
3202     }   
3203 }
3204
3205 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3206 //@subsection Debugging Routines
3207
3208 /* -----------------------------------------------------------------------------
3209    Debugging: why is a thread blocked
3210    -------------------------------------------------------------------------- */
3211
3212 #ifdef DEBUG
3213
3214 void
3215 printThreadBlockage(StgTSO *tso)
3216 {
3217   switch (tso->why_blocked) {
3218   case BlockedOnRead:
3219     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3220     break;
3221   case BlockedOnWrite:
3222     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3223     break;
3224   case BlockedOnDelay:
3225     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3226     break;
3227   case BlockedOnMVar:
3228     fprintf(stderr,"is blocked on an MVar");
3229     break;
3230   case BlockedOnException:
3231     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3232             tso->block_info.tso->id);
3233     break;
3234   case BlockedOnBlackHole:
3235     fprintf(stderr,"is blocked on a black hole");
3236     break;
3237   case NotBlocked:
3238     fprintf(stderr,"is not blocked");
3239     break;
3240 #if defined(PAR)
3241   case BlockedOnGA:
3242     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3243             tso->block_info.closure, info_type(tso->block_info.closure));
3244     break;
3245   case BlockedOnGA_NoSend:
3246     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3247             tso->block_info.closure, info_type(tso->block_info.closure));
3248     break;
3249 #endif
3250   default:
3251     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3252          tso->why_blocked, tso->id, tso);
3253   }
3254 }
3255
3256 void
3257 printThreadStatus(StgTSO *tso)
3258 {
3259   switch (tso->what_next) {
3260   case ThreadKilled:
3261     fprintf(stderr,"has been killed");
3262     break;
3263   case ThreadComplete:
3264     fprintf(stderr,"has completed");
3265     break;
3266   default:
3267     printThreadBlockage(tso);
3268   }
3269 }
3270
3271 void
3272 printAllThreads(void)
3273 {
3274   StgTSO *t;
3275
3276 # if defined(GRAN)
3277   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3278   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3279                        time_string, rtsFalse/*no commas!*/);
3280
3281   sched_belch("all threads at [%s]:", time_string);
3282 # elif defined(PAR)
3283   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3284   ullong_format_string(CURRENT_TIME,
3285                        time_string, rtsFalse/*no commas!*/);
3286
3287   sched_belch("all threads at [%s]:", time_string);
3288 # else
3289   sched_belch("all threads:");
3290 # endif
3291
3292   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3293     fprintf(stderr, "\tthread %d ", t->id);
3294     printThreadStatus(t);
3295     fprintf(stderr,"\n");
3296   }
3297 }
3298     
3299 /* 
3300    Print a whole blocking queue attached to node (debugging only).
3301 */
3302 //@cindex print_bq
3303 # if defined(PAR)
3304 void 
3305 print_bq (StgClosure *node)
3306 {
3307   StgBlockingQueueElement *bqe;
3308   StgTSO *tso;
3309   rtsBool end;
3310
3311   fprintf(stderr,"## BQ of closure %p (%s): ",
3312           node, info_type(node));
3313
3314   /* should cover all closures that may have a blocking queue */
3315   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3316          get_itbl(node)->type == FETCH_ME_BQ ||
3317          get_itbl(node)->type == RBH ||
3318          get_itbl(node)->type == MVAR);
3319     
3320   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3321
3322   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3323 }
3324
3325 /* 
3326    Print a whole blocking queue starting with the element bqe.
3327 */
3328 void 
3329 print_bqe (StgBlockingQueueElement *bqe)
3330 {
3331   rtsBool end;
3332
3333   /* 
3334      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3335   */
3336   for (end = (bqe==END_BQ_QUEUE);
3337        !end; // iterate until bqe points to a CONSTR
3338        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3339        bqe = end ? END_BQ_QUEUE : bqe->link) {
3340     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3341     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3342     /* types of closures that may appear in a blocking queue */
3343     ASSERT(get_itbl(bqe)->type == TSO ||           
3344            get_itbl(bqe)->type == BLOCKED_FETCH || 
3345            get_itbl(bqe)->type == CONSTR); 
3346     /* only BQs of an RBH end with an RBH_Save closure */
3347     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3348
3349     switch (get_itbl(bqe)->type) {
3350     case TSO:
3351       fprintf(stderr," TSO %u (%x),",
3352               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3353       break;
3354     case BLOCKED_FETCH:
3355       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3356               ((StgBlockedFetch *)bqe)->node, 
3357               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3358               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3359               ((StgBlockedFetch *)bqe)->ga.weight);
3360       break;
3361     case CONSTR:
3362       fprintf(stderr," %s (IP %p),",
3363               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3364                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3365                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3366                "RBH_Save_?"), get_itbl(bqe));
3367       break;
3368     default:
3369       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3370            info_type((StgClosure *)bqe)); // , node, info_type(node));
3371       break;
3372     }
3373   } /* for */
3374   fputc('\n', stderr);
3375 }
3376 # elif defined(GRAN)
3377 void 
3378 print_bq (StgClosure *node)
3379 {
3380   StgBlockingQueueElement *bqe;
3381   PEs node_loc, tso_loc;
3382   rtsBool end;
3383
3384   /* should cover all closures that may have a blocking queue */
3385   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3386          get_itbl(node)->type == FETCH_ME_BQ ||
3387          get_itbl(node)->type == RBH);
3388     
3389   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3390   node_loc = where_is(node);
3391
3392   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3393           node, info_type(node), node_loc);
3394
3395   /* 
3396      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3397   */
3398   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3399        !end; // iterate until bqe points to a CONSTR
3400        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3401     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3402     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3403     /* types of closures that may appear in a blocking queue */
3404     ASSERT(get_itbl(bqe)->type == TSO ||           
3405            get_itbl(bqe)->type == CONSTR); 
3406     /* only BQs of an RBH end with an RBH_Save closure */
3407     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3408
3409     tso_loc = where_is((StgClosure *)bqe);
3410     switch (get_itbl(bqe)->type) {
3411     case TSO:
3412       fprintf(stderr," TSO %d (%p) on [PE %d],",
3413               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3414       break;
3415     case CONSTR:
3416       fprintf(stderr," %s (IP %p),",
3417               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3418                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3419                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3420                "RBH_Save_?"), get_itbl(bqe));
3421       break;
3422     default:
3423       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3424            info_type((StgClosure *)bqe), node, info_type(node));
3425       break;
3426     }
3427   } /* for */
3428   fputc('\n', stderr);
3429 }
3430 #else
3431 /* 
3432    Nice and easy: only TSOs on the blocking queue
3433 */
3434 void 
3435 print_bq (StgClosure *node)
3436 {
3437   StgTSO *tso;
3438
3439   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3440   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3441        tso != END_TSO_QUEUE; 
3442        tso=tso->link) {
3443     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3444     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3445     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3446   }
3447   fputc('\n', stderr);
3448 }
3449 # endif
3450
3451 #if defined(PAR)
3452 static nat
3453 run_queue_len(void)
3454 {
3455   nat i;
3456   StgTSO *tso;
3457
3458   for (i=0, tso=run_queue_hd; 
3459        tso != END_TSO_QUEUE;
3460        i++, tso=tso->link)
3461     /* nothing */
3462
3463   return i;
3464 }
3465 #endif
3466
3467 static void
3468 sched_belch(char *s, ...)
3469 {
3470   va_list ap;
3471   va_start(ap,s);
3472 #ifdef SMP
3473   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3474 #elif defined(PAR)
3475   fprintf(stderr, "== ");
3476 #else
3477   fprintf(stderr, "scheduler: ");
3478 #endif
3479   vfprintf(stderr, s, ap);
3480   fprintf(stderr, "\n");
3481 }
3482
3483 #endif /* DEBUG */
3484
3485
3486 //@node Index,  , Debugging Routines, Main scheduling code
3487 //@subsection Index
3488
3489 //@index
3490 //* MainRegTable::  @cindex\s-+MainRegTable
3491 //* StgMainThread::  @cindex\s-+StgMainThread
3492 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3493 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3494 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3495 //* context_switch::  @cindex\s-+context_switch
3496 //* createThread::  @cindex\s-+createThread
3497 //* free_capabilities::  @cindex\s-+free_capabilities
3498 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3499 //* initScheduler::  @cindex\s-+initScheduler
3500 //* interrupted::  @cindex\s-+interrupted
3501 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3502 //* next_thread_id::  @cindex\s-+next_thread_id
3503 //* print_bq::  @cindex\s-+print_bq
3504 //* run_queue_hd::  @cindex\s-+run_queue_hd
3505 //* run_queue_tl::  @cindex\s-+run_queue_tl
3506 //* sched_mutex::  @cindex\s-+sched_mutex
3507 //* schedule::  @cindex\s-+schedule
3508 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3509 //* task_ids::  @cindex\s-+task_ids
3510 //* term_mutex::  @cindex\s-+term_mutex
3511 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3512 //@end index