[project @ 2001-10-27 22:05:48 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.103 2001/10/27 22:05:48 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           if (m->ret) *(m->ret) = NULL;
449           *prev = m->link;
450           if (was_interrupted) {
451             m->stat = Interrupted;
452           } else {
453             m->stat = Killed;
454           }
455           pthread_cond_broadcast(&m->wakeup);
456           break;
457         default:
458           break;
459         }
460       }
461     }
462
463 #else
464 # if defined(PAR)
465     /* in GUM do this only on the Main PE */
466     if (IAmMainThread)
467 # endif
468     /* If our main thread has finished or been killed, return.
469      */
470     {
471       StgMainThread *m = main_threads;
472       if (m->tso->what_next == ThreadComplete
473           || m->tso->what_next == ThreadKilled) {
474         main_threads = main_threads->link;
475         if (m->tso->what_next == ThreadComplete) {
476           /* we finished successfully, fill in the return value */
477           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
478           m->stat = Success;
479           return;
480         } else {
481           if (m->ret) { *(m->ret) = NULL; };
482           if (was_interrupted) {
483             m->stat = Interrupted;
484           } else {
485             m->stat = Killed;
486           }
487           return;
488         }
489       }
490     }
491 #endif
492
493     /* Top up the run queue from our spark pool.  We try to make the
494      * number of threads in the run queue equal to the number of
495      * free capabilities.
496      */
497 #if defined(SMP)
498     {
499       nat n = n_free_capabilities;
500       StgTSO *tso = run_queue_hd;
501
502       /* Count the run queue */
503       while (n > 0 && tso != END_TSO_QUEUE) {
504         tso = tso->link;
505         n--;
506       }
507
508       for (; n > 0; n--) {
509         StgClosure *spark;
510         spark = findSpark(rtsFalse);
511         if (spark == NULL) {
512           break; /* no more sparks in the pool */
513         } else {
514           /* I'd prefer this to be done in activateSpark -- HWL */
515           /* tricky - it needs to hold the scheduler lock and
516            * not try to re-acquire it -- SDM */
517           createSparkThread(spark);       
518           IF_DEBUG(scheduler,
519                    sched_belch("==^^ turning spark of closure %p into a thread",
520                                (StgClosure *)spark));
521         }
522       }
523       /* We need to wake up the other tasks if we just created some
524        * work for them.
525        */
526       if (n_free_capabilities - n > 1) {
527           pthread_cond_signal(&thread_ready_cond);
528       }
529     }
530 #endif /* SMP */
531
532     /* Check whether any waiting threads need to be woken up.  If the
533      * run queue is empty, and there are no other tasks running, we
534      * can wait indefinitely for something to happen.
535      * ToDo: what if another client comes along & requests another
536      * main thread?
537      */
538     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
539       awaitEvent(
540            (run_queue_hd == END_TSO_QUEUE)
541 #ifdef SMP
542         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
543 #endif
544         );
545     }
546     /* we can be interrupted while waiting for I/O... */
547     if (interrupted) continue;
548
549     /* check for signals each time around the scheduler */
550 #ifndef mingw32_TARGET_OS
551     if (signals_pending()) {
552       start_signal_handlers();
553     }
554 #endif
555
556     /* 
557      * Detect deadlock: when we have no threads to run, there are no
558      * threads waiting on I/O or sleeping, and all the other tasks are
559      * waiting for work, we must have a deadlock of some description.
560      *
561      * We first try to find threads blocked on themselves (ie. black
562      * holes), and generate NonTermination exceptions where necessary.
563      *
564      * If no threads are black holed, we have a deadlock situation, so
565      * inform all the main threads.
566      */
567 #ifndef PAR
568     if (blocked_queue_hd == END_TSO_QUEUE
569         && run_queue_hd == END_TSO_QUEUE
570         && sleeping_queue == END_TSO_QUEUE
571 #ifdef SMP
572         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
573 #endif
574         )
575     {
576         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
577         GarbageCollect(GetRoots,rtsTrue);
578         if (blocked_queue_hd == END_TSO_QUEUE
579             && run_queue_hd == END_TSO_QUEUE
580             && sleeping_queue == END_TSO_QUEUE) {
581             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
582             detectBlackHoles();
583             if (run_queue_hd == END_TSO_QUEUE) {
584                 StgMainThread *m = main_threads;
585 #ifdef SMP
586                 for (; m != NULL; m = m->link) {
587                     deleteThread(m->tso);
588                     m->ret = NULL;
589                     m->stat = Deadlock;
590                     pthread_cond_broadcast(&m->wakeup);
591                 }
592                 main_threads = NULL;
593 #else
594                 deleteThread(m->tso);
595                 m->ret = NULL;
596                 m->stat = Deadlock;
597                 main_threads = m->link;
598                 return;
599 #endif
600             }
601         }
602     }
603 #elif defined(PAR)
604     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
605 #endif
606
607 #ifdef SMP
608     /* If there's a GC pending, don't do anything until it has
609      * completed.
610      */
611     if (ready_to_gc) {
612       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
613       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
614     }
615     
616     /* block until we've got a thread on the run queue and a free
617      * capability.
618      */
619     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
620       IF_DEBUG(scheduler, sched_belch("waiting for work"));
621       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
622       IF_DEBUG(scheduler, sched_belch("work now available"));
623     }
624 #endif
625
626 #if defined(GRAN)
627
628     if (RtsFlags.GranFlags.Light)
629       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
630
631     /* adjust time based on time-stamp */
632     if (event->time > CurrentTime[CurrentProc] &&
633         event->evttype != ContinueThread)
634       CurrentTime[CurrentProc] = event->time;
635     
636     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
637     if (!RtsFlags.GranFlags.Light)
638       handleIdlePEs();
639
640     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
641
642     /* main event dispatcher in GranSim */
643     switch (event->evttype) {
644       /* Should just be continuing execution */
645     case ContinueThread:
646       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
647       /* ToDo: check assertion
648       ASSERT(run_queue_hd != (StgTSO*)NULL &&
649              run_queue_hd != END_TSO_QUEUE);
650       */
651       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
652       if (!RtsFlags.GranFlags.DoAsyncFetch &&
653           procStatus[CurrentProc]==Fetching) {
654         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
655               CurrentTSO->id, CurrentTSO, CurrentProc);
656         goto next_thread;
657       } 
658       /* Ignore ContinueThreads for completed threads */
659       if (CurrentTSO->what_next == ThreadComplete) {
660         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
661               CurrentTSO->id, CurrentTSO, CurrentProc);
662         goto next_thread;
663       } 
664       /* Ignore ContinueThreads for threads that are being migrated */
665       if (PROCS(CurrentTSO)==Nowhere) { 
666         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
667               CurrentTSO->id, CurrentTSO, CurrentProc);
668         goto next_thread;
669       }
670       /* The thread should be at the beginning of the run queue */
671       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
672         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
673               CurrentTSO->id, CurrentTSO, CurrentProc);
674         break; // run the thread anyway
675       }
676       /*
677       new_event(proc, proc, CurrentTime[proc],
678                 FindWork,
679                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
680       goto next_thread; 
681       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
682       break; // now actually run the thread; DaH Qu'vam yImuHbej 
683
684     case FetchNode:
685       do_the_fetchnode(event);
686       goto next_thread;             /* handle next event in event queue  */
687       
688     case GlobalBlock:
689       do_the_globalblock(event);
690       goto next_thread;             /* handle next event in event queue  */
691       
692     case FetchReply:
693       do_the_fetchreply(event);
694       goto next_thread;             /* handle next event in event queue  */
695       
696     case UnblockThread:   /* Move from the blocked queue to the tail of */
697       do_the_unblock(event);
698       goto next_thread;             /* handle next event in event queue  */
699       
700     case ResumeThread:  /* Move from the blocked queue to the tail of */
701       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
702       event->tso->gran.blocktime += 
703         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
704       do_the_startthread(event);
705       goto next_thread;             /* handle next event in event queue  */
706       
707     case StartThread:
708       do_the_startthread(event);
709       goto next_thread;             /* handle next event in event queue  */
710       
711     case MoveThread:
712       do_the_movethread(event);
713       goto next_thread;             /* handle next event in event queue  */
714       
715     case MoveSpark:
716       do_the_movespark(event);
717       goto next_thread;             /* handle next event in event queue  */
718       
719     case FindWork:
720       do_the_findwork(event);
721       goto next_thread;             /* handle next event in event queue  */
722       
723     default:
724       barf("Illegal event type %u\n", event->evttype);
725     }  /* switch */
726     
727     /* This point was scheduler_loop in the old RTS */
728
729     IF_DEBUG(gran, belch("GRAN: after main switch"));
730
731     TimeOfLastEvent = CurrentTime[CurrentProc];
732     TimeOfNextEvent = get_time_of_next_event();
733     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
734     // CurrentTSO = ThreadQueueHd;
735
736     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
737                          TimeOfNextEvent));
738
739     if (RtsFlags.GranFlags.Light) 
740       GranSimLight_leave_system(event, &ActiveTSO); 
741
742     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
743
744     IF_DEBUG(gran, 
745              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
746
747     /* in a GranSim setup the TSO stays on the run queue */
748     t = CurrentTSO;
749     /* Take a thread from the run queue. */
750     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
751
752     IF_DEBUG(gran, 
753              fprintf(stderr, "GRAN: About to run current thread, which is\n");
754              G_TSO(t,5));
755
756     context_switch = 0; // turned on via GranYield, checking events and time slice
757
758     IF_DEBUG(gran, 
759              DumpGranEvent(GR_SCHEDULE, t));
760
761     procStatus[CurrentProc] = Busy;
762
763 #elif defined(PAR)
764     if (PendingFetches != END_BF_QUEUE) {
765         processFetches();
766     }
767
768     /* ToDo: phps merge with spark activation above */
769     /* check whether we have local work and send requests if we have none */
770     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
771       /* :-[  no local threads => look out for local sparks */
772       /* the spark pool for the current PE */
773       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
774       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
775           pool->hd < pool->tl) {
776         /* 
777          * ToDo: add GC code check that we really have enough heap afterwards!!
778          * Old comment:
779          * If we're here (no runnable threads) and we have pending
780          * sparks, we must have a space problem.  Get enough space
781          * to turn one of those pending sparks into a
782          * thread... 
783          */
784
785         spark = findSpark(rtsFalse);                /* get a spark */
786         if (spark != (rtsSpark) NULL) {
787           tso = activateSpark(spark);       /* turn the spark into a thread */
788           IF_PAR_DEBUG(schedule,
789                        belch("==== schedule: Created TSO %d (%p); %d threads active",
790                              tso->id, tso, advisory_thread_count));
791
792           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
793             belch("==^^ failed to activate spark");
794             goto next_thread;
795           }               /* otherwise fall through & pick-up new tso */
796         } else {
797           IF_PAR_DEBUG(verbose,
798                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
799                              spark_queue_len(pool)));
800           goto next_thread;
801         }
802       }
803
804       /* If we still have no work we need to send a FISH to get a spark
805          from another PE 
806       */
807       if (EMPTY_RUN_QUEUE()) {
808       /* =8-[  no local sparks => look for work on other PEs */
809         /*
810          * We really have absolutely no work.  Send out a fish
811          * (there may be some out there already), and wait for
812          * something to arrive.  We clearly can't run any threads
813          * until a SCHEDULE or RESUME arrives, and so that's what
814          * we're hoping to see.  (Of course, we still have to
815          * respond to other types of messages.)
816          */
817         TIME now = msTime() /*CURRENT_TIME*/;
818         IF_PAR_DEBUG(verbose, 
819                      belch("--  now=%ld", now));
820         IF_PAR_DEBUG(verbose,
821                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
822                          (last_fish_arrived_at!=0 &&
823                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
824                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
825                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
826                              last_fish_arrived_at,
827                              RtsFlags.ParFlags.fishDelay, now);
828                      });
829         
830         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
831             (last_fish_arrived_at==0 ||
832              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
833           /* outstandingFishes is set in sendFish, processFish;
834              avoid flooding system with fishes via delay */
835           pe = choosePE();
836           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
837                    NEW_FISH_HUNGER);
838
839           // Global statistics: count no. of fishes
840           if (RtsFlags.ParFlags.ParStats.Global &&
841               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
842             globalParStats.tot_fish_mess++;
843           }
844         }
845       
846         receivedFinish = processMessages();
847         goto next_thread;
848       }
849     } else if (PacketsWaiting()) {  /* Look for incoming messages */
850       receivedFinish = processMessages();
851     }
852
853     /* Now we are sure that we have some work available */
854     ASSERT(run_queue_hd != END_TSO_QUEUE);
855
856     /* Take a thread from the run queue, if we have work */
857     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
858     IF_DEBUG(sanity,checkTSO(t));
859
860     /* ToDo: write something to the log-file
861     if (RTSflags.ParFlags.granSimStats && !sameThread)
862         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
863
864     CurrentTSO = t;
865     */
866     /* the spark pool for the current PE */
867     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
868
869     IF_DEBUG(scheduler, 
870              belch("--=^ %d threads, %d sparks on [%#x]", 
871                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
872
873 #if 1
874     if (0 && RtsFlags.ParFlags.ParStats.Full && 
875         t && LastTSO && t->id != LastTSO->id && 
876         LastTSO->why_blocked == NotBlocked && 
877         LastTSO->what_next != ThreadComplete) {
878       // if previously scheduled TSO not blocked we have to record the context switch
879       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
880                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
881     }
882
883     if (RtsFlags.ParFlags.ParStats.Full && 
884         (emitSchedule /* forced emit */ ||
885         (t && LastTSO && t->id != LastTSO->id))) {
886       /* 
887          we are running a different TSO, so write a schedule event to log file
888          NB: If we use fair scheduling we also have to write  a deschedule 
889              event for LastTSO; with unfair scheduling we know that the
890              previous tso has blocked whenever we switch to another tso, so
891              we don't need it in GUM for now
892       */
893       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
894                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
895       emitSchedule = rtsFalse;
896     }
897      
898 #endif
899 #else /* !GRAN && !PAR */
900   
901     /* grab a thread from the run queue
902      */
903     ASSERT(run_queue_hd != END_TSO_QUEUE);
904     t = POP_RUN_QUEUE();
905     IF_DEBUG(sanity,checkTSO(t));
906
907 #endif
908     
909     /* grab a capability
910      */
911 #ifdef SMP
912     cap = free_capabilities;
913     free_capabilities = cap->link;
914     n_free_capabilities--;
915 #else
916     cap = &MainRegTable;
917 #endif
918
919     cap->rCurrentTSO = t;
920     
921     /* context switches are now initiated by the timer signal, unless
922      * the user specified "context switch as often as possible", with
923      * +RTS -C0
924      */
925     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
926         && (run_queue_hd != END_TSO_QUEUE
927             || blocked_queue_hd != END_TSO_QUEUE
928             || sleeping_queue != END_TSO_QUEUE))
929         context_switch = 1;
930     else
931         context_switch = 0;
932
933     RELEASE_LOCK(&sched_mutex);
934
935     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
936                               t->id, t, whatNext_strs[t->what_next]));
937
938     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
939     /* Run the current thread 
940      */
941     switch (cap->rCurrentTSO->what_next) {
942     case ThreadKilled:
943     case ThreadComplete:
944         /* Thread already finished, return to scheduler. */
945         ret = ThreadFinished;
946         break;
947     case ThreadEnterGHC:
948         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
949         break;
950     case ThreadRunGHC:
951         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
952         break;
953     case ThreadEnterInterp:
954         ret = interpretBCO(cap);
955         break;
956     default:
957       barf("schedule: invalid what_next field");
958     }
959     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
960     
961     /* Costs for the scheduler are assigned to CCS_SYSTEM */
962 #ifdef PROFILING
963     CCCS = CCS_SYSTEM;
964 #endif
965     
966     ACQUIRE_LOCK(&sched_mutex);
967
968 #ifdef SMP
969     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
970 #elif !defined(GRAN) && !defined(PAR)
971     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
972 #endif
973     t = cap->rCurrentTSO;
974     
975 #if defined(PAR)
976     /* HACK 675: if the last thread didn't yield, make sure to print a 
977        SCHEDULE event to the log file when StgRunning the next thread, even
978        if it is the same one as before */
979     LastTSO = t; 
980     TimeOfLastYield = CURRENT_TIME;
981 #endif
982
983     switch (ret) {
984     case HeapOverflow:
985 #if defined(GRAN)
986       IF_DEBUG(gran, 
987                DumpGranEvent(GR_DESCHEDULE, t));
988       globalGranStats.tot_heapover++;
989 #elif defined(PAR)
990       // IF_DEBUG(par, 
991       //DumpGranEvent(GR_DESCHEDULE, t);
992       globalParStats.tot_heapover++;
993 #endif
994       /* make all the running tasks block on a condition variable,
995        * maybe set context_switch and wait till they all pile in,
996        * then have them wait on a GC condition variable.
997        */
998       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
999                                t->id, t, whatNext_strs[t->what_next]));
1000       threadPaused(t);
1001 #if defined(GRAN)
1002       ASSERT(!is_on_queue(t,CurrentProc));
1003 #elif defined(PAR)
1004       /* Currently we emit a DESCHEDULE event before GC in GUM.
1005          ToDo: either add separate event to distinguish SYSTEM time from rest
1006                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1007       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1008         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1009                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1010         emitSchedule = rtsTrue;
1011       }
1012 #endif
1013       
1014       ready_to_gc = rtsTrue;
1015       context_switch = 1;               /* stop other threads ASAP */
1016       PUSH_ON_RUN_QUEUE(t);
1017       /* actual GC is done at the end of the while loop */
1018       break;
1019       
1020     case StackOverflow:
1021 #if defined(GRAN)
1022       IF_DEBUG(gran, 
1023                DumpGranEvent(GR_DESCHEDULE, t));
1024       globalGranStats.tot_stackover++;
1025 #elif defined(PAR)
1026       // IF_DEBUG(par, 
1027       // DumpGranEvent(GR_DESCHEDULE, t);
1028       globalParStats.tot_stackover++;
1029 #endif
1030       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1031                                t->id, t, whatNext_strs[t->what_next]));
1032       /* just adjust the stack for this thread, then pop it back
1033        * on the run queue.
1034        */
1035       threadPaused(t);
1036       { 
1037         StgMainThread *m;
1038         /* enlarge the stack */
1039         StgTSO *new_t = threadStackOverflow(t);
1040         
1041         /* This TSO has moved, so update any pointers to it from the
1042          * main thread stack.  It better not be on any other queues...
1043          * (it shouldn't be).
1044          */
1045         for (m = main_threads; m != NULL; m = m->link) {
1046           if (m->tso == t) {
1047             m->tso = new_t;
1048           }
1049         }
1050         threadPaused(new_t);
1051         PUSH_ON_RUN_QUEUE(new_t);
1052       }
1053       break;
1054
1055     case ThreadYielding:
1056 #if defined(GRAN)
1057       IF_DEBUG(gran, 
1058                DumpGranEvent(GR_DESCHEDULE, t));
1059       globalGranStats.tot_yields++;
1060 #elif defined(PAR)
1061       // IF_DEBUG(par, 
1062       // DumpGranEvent(GR_DESCHEDULE, t);
1063       globalParStats.tot_yields++;
1064 #endif
1065       /* put the thread back on the run queue.  Then, if we're ready to
1066        * GC, check whether this is the last task to stop.  If so, wake
1067        * up the GC thread.  getThread will block during a GC until the
1068        * GC is finished.
1069        */
1070       IF_DEBUG(scheduler,
1071                if (t->what_next == ThreadEnterInterp) {
1072                    /* ToDo: or maybe a timer expired when we were in Hugs?
1073                     * or maybe someone hit ctrl-C
1074                     */
1075                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1076                          t->id, t, whatNext_strs[t->what_next]);
1077                } else {
1078                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1079                          t->id, t, whatNext_strs[t->what_next]);
1080                }
1081                );
1082
1083       threadPaused(t);
1084
1085       IF_DEBUG(sanity,
1086                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1087                checkTSO(t));
1088       ASSERT(t->link == END_TSO_QUEUE);
1089 #if defined(GRAN)
1090       ASSERT(!is_on_queue(t,CurrentProc));
1091
1092       IF_DEBUG(sanity,
1093                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1094                checkThreadQsSanity(rtsTrue));
1095 #endif
1096 #if defined(PAR)
1097       if (RtsFlags.ParFlags.doFairScheduling) { 
1098         /* this does round-robin scheduling; good for concurrency */
1099         APPEND_TO_RUN_QUEUE(t);
1100       } else {
1101         /* this does unfair scheduling; good for parallelism */
1102         PUSH_ON_RUN_QUEUE(t);
1103       }
1104 #else
1105       /* this does round-robin scheduling; good for concurrency */
1106       APPEND_TO_RUN_QUEUE(t);
1107 #endif
1108 #if defined(GRAN)
1109       /* add a ContinueThread event to actually process the thread */
1110       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1111                 ContinueThread,
1112                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1113       IF_GRAN_DEBUG(bq, 
1114                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1115                G_EVENTQ(0);
1116                G_CURR_THREADQ(0));
1117 #endif /* GRAN */
1118       break;
1119       
1120     case ThreadBlocked:
1121 #if defined(GRAN)
1122       IF_DEBUG(scheduler,
1123                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1124                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1125                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1126
1127       // ??? needed; should emit block before
1128       IF_DEBUG(gran, 
1129                DumpGranEvent(GR_DESCHEDULE, t)); 
1130       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1131       /*
1132         ngoq Dogh!
1133       ASSERT(procStatus[CurrentProc]==Busy || 
1134               ((procStatus[CurrentProc]==Fetching) && 
1135               (t->block_info.closure!=(StgClosure*)NULL)));
1136       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1137           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1138             procStatus[CurrentProc]==Fetching)) 
1139         procStatus[CurrentProc] = Idle;
1140       */
1141 #elif defined(PAR)
1142       IF_DEBUG(scheduler,
1143                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1144                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1145       IF_PAR_DEBUG(bq,
1146
1147                    if (t->block_info.closure!=(StgClosure*)NULL) 
1148                      print_bq(t->block_info.closure));
1149
1150       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1151       blockThread(t);
1152
1153       /* whatever we schedule next, we must log that schedule */
1154       emitSchedule = rtsTrue;
1155
1156 #else /* !GRAN */
1157       /* don't need to do anything.  Either the thread is blocked on
1158        * I/O, in which case we'll have called addToBlockedQueue
1159        * previously, or it's blocked on an MVar or Blackhole, in which
1160        * case it'll be on the relevant queue already.
1161        */
1162       IF_DEBUG(scheduler,
1163                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1164                printThreadBlockage(t);
1165                fprintf(stderr, "\n"));
1166
1167       /* Only for dumping event to log file 
1168          ToDo: do I need this in GranSim, too?
1169       blockThread(t);
1170       */
1171 #endif
1172       threadPaused(t);
1173       break;
1174       
1175     case ThreadFinished:
1176       /* Need to check whether this was a main thread, and if so, signal
1177        * the task that started it with the return value.  If we have no
1178        * more main threads, we probably need to stop all the tasks until
1179        * we get a new one.
1180        */
1181       /* We also end up here if the thread kills itself with an
1182        * uncaught exception, see Exception.hc.
1183        */
1184       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1185 #if defined(GRAN)
1186       endThread(t, CurrentProc); // clean-up the thread
1187 #elif defined(PAR)
1188       /* For now all are advisory -- HWL */
1189       //if(t->priority==AdvisoryPriority) ??
1190       advisory_thread_count--;
1191       
1192 # ifdef DIST
1193       if(t->dist.priority==RevalPriority)
1194         FinishReval(t);
1195 # endif
1196       
1197       if (RtsFlags.ParFlags.ParStats.Full &&
1198           !RtsFlags.ParFlags.ParStats.Suppressed) 
1199         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1200 #endif
1201       break;
1202       
1203     default:
1204       barf("schedule: invalid thread return code %d", (int)ret);
1205     }
1206     
1207 #ifdef SMP
1208     cap->link = free_capabilities;
1209     free_capabilities = cap;
1210     n_free_capabilities++;
1211 #endif
1212
1213 #ifdef SMP
1214     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1215 #else
1216     if (ready_to_gc) 
1217 #endif
1218       {
1219       /* everybody back, start the GC.
1220        * Could do it in this thread, or signal a condition var
1221        * to do it in another thread.  Either way, we need to
1222        * broadcast on gc_pending_cond afterward.
1223        */
1224 #ifdef SMP
1225       IF_DEBUG(scheduler,sched_belch("doing GC"));
1226 #endif
1227       GarbageCollect(GetRoots,rtsFalse);
1228       ready_to_gc = rtsFalse;
1229 #ifdef SMP
1230       pthread_cond_broadcast(&gc_pending_cond);
1231 #endif
1232 #if defined(GRAN)
1233       /* add a ContinueThread event to continue execution of current thread */
1234       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1235                 ContinueThread,
1236                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1237       IF_GRAN_DEBUG(bq, 
1238                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1239                G_EVENTQ(0);
1240                G_CURR_THREADQ(0));
1241 #endif /* GRAN */
1242     }
1243 #if defined(GRAN)
1244   next_thread:
1245     IF_GRAN_DEBUG(unused,
1246                   print_eventq(EventHd));
1247
1248     event = get_next_event();
1249
1250 #elif defined(PAR)
1251   next_thread:
1252     /* ToDo: wait for next message to arrive rather than busy wait */
1253
1254 #else /* GRAN */
1255   /* not any more
1256   next_thread:
1257     t = take_off_run_queue(END_TSO_QUEUE);
1258   */
1259 #endif /* GRAN */
1260   } /* end of while(1) */
1261   IF_PAR_DEBUG(verbose,
1262                belch("== Leaving schedule() after having received Finish"));
1263 }
1264
1265 /* ---------------------------------------------------------------------------
1266  * deleteAllThreads():  kill all the live threads.
1267  *
1268  * This is used when we catch a user interrupt (^C), before performing
1269  * any necessary cleanups and running finalizers.
1270  * ------------------------------------------------------------------------- */
1271    
1272 void deleteAllThreads ( void )
1273 {
1274   StgTSO* t;
1275   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1276   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1277       deleteThread(t);
1278   }
1279   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1280       deleteThread(t);
1281   }
1282   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1283       deleteThread(t);
1284   }
1285   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1286   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1287   sleeping_queue = END_TSO_QUEUE;
1288 }
1289
1290 /* startThread and  insertThread are now in GranSim.c -- HWL */
1291
1292 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1293 //@subsection Suspend and Resume
1294
1295 /* ---------------------------------------------------------------------------
1296  * Suspending & resuming Haskell threads.
1297  * 
1298  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1299  * its capability before calling the C function.  This allows another
1300  * task to pick up the capability and carry on running Haskell
1301  * threads.  It also means that if the C call blocks, it won't lock
1302  * the whole system.
1303  *
1304  * The Haskell thread making the C call is put to sleep for the
1305  * duration of the call, on the susepended_ccalling_threads queue.  We
1306  * give out a token to the task, which it can use to resume the thread
1307  * on return from the C function.
1308  * ------------------------------------------------------------------------- */
1309    
1310 StgInt
1311 suspendThread( Capability *cap )
1312 {
1313   nat tok;
1314
1315   ACQUIRE_LOCK(&sched_mutex);
1316
1317   IF_DEBUG(scheduler,
1318            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1319
1320   threadPaused(cap->rCurrentTSO);
1321   cap->rCurrentTSO->link = suspended_ccalling_threads;
1322   suspended_ccalling_threads = cap->rCurrentTSO;
1323
1324   /* Use the thread ID as the token; it should be unique */
1325   tok = cap->rCurrentTSO->id;
1326
1327 #ifdef SMP
1328   cap->link = free_capabilities;
1329   free_capabilities = cap;
1330   n_free_capabilities++;
1331 #endif
1332
1333   RELEASE_LOCK(&sched_mutex);
1334   return tok; 
1335 }
1336
1337 Capability *
1338 resumeThread( StgInt tok )
1339 {
1340   StgTSO *tso, **prev;
1341   Capability *cap;
1342
1343   ACQUIRE_LOCK(&sched_mutex);
1344
1345   prev = &suspended_ccalling_threads;
1346   for (tso = suspended_ccalling_threads; 
1347        tso != END_TSO_QUEUE; 
1348        prev = &tso->link, tso = tso->link) {
1349     if (tso->id == (StgThreadID)tok) {
1350       *prev = tso->link;
1351       break;
1352     }
1353   }
1354   if (tso == END_TSO_QUEUE) {
1355     barf("resumeThread: thread not found");
1356   }
1357   tso->link = END_TSO_QUEUE;
1358
1359 #ifdef SMP
1360   while (free_capabilities == NULL) {
1361     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1362     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1363     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1364   }
1365   cap = free_capabilities;
1366   free_capabilities = cap->link;
1367   n_free_capabilities--;
1368 #else  
1369   cap = &MainRegTable;
1370 #endif
1371
1372   cap->rCurrentTSO = tso;
1373
1374   RELEASE_LOCK(&sched_mutex);
1375   return cap;
1376 }
1377
1378
1379 /* ---------------------------------------------------------------------------
1380  * Static functions
1381  * ------------------------------------------------------------------------ */
1382 static void unblockThread(StgTSO *tso);
1383
1384 /* ---------------------------------------------------------------------------
1385  * Comparing Thread ids.
1386  *
1387  * This is used from STG land in the implementation of the
1388  * instances of Eq/Ord for ThreadIds.
1389  * ------------------------------------------------------------------------ */
1390
1391 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1392
1393   StgThreadID id1 = tso1->id; 
1394   StgThreadID id2 = tso2->id;
1395  
1396   if (id1 < id2) return (-1);
1397   if (id1 > id2) return 1;
1398   return 0;
1399 }
1400
1401 /* ---------------------------------------------------------------------------
1402  * Fetching the ThreadID from an StgTSO.
1403  *
1404  * This is used in the implementation of Show for ThreadIds.
1405  * ------------------------------------------------------------------------ */
1406 int rts_getThreadId(const StgTSO *tso) 
1407 {
1408   return tso->id;
1409 }
1410
1411 /* ---------------------------------------------------------------------------
1412    Create a new thread.
1413
1414    The new thread starts with the given stack size.  Before the
1415    scheduler can run, however, this thread needs to have a closure
1416    (and possibly some arguments) pushed on its stack.  See
1417    pushClosure() in Schedule.h.
1418
1419    createGenThread() and createIOThread() (in SchedAPI.h) are
1420    convenient packaged versions of this function.
1421
1422    currently pri (priority) is only used in a GRAN setup -- HWL
1423    ------------------------------------------------------------------------ */
1424 //@cindex createThread
1425 #if defined(GRAN)
1426 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1427 StgTSO *
1428 createThread(nat stack_size, StgInt pri)
1429 {
1430   return createThread_(stack_size, rtsFalse, pri);
1431 }
1432
1433 static StgTSO *
1434 createThread_(nat size, rtsBool have_lock, StgInt pri)
1435 {
1436 #else
1437 StgTSO *
1438 createThread(nat stack_size)
1439 {
1440   return createThread_(stack_size, rtsFalse);
1441 }
1442
1443 static StgTSO *
1444 createThread_(nat size, rtsBool have_lock)
1445 {
1446 #endif
1447
1448     StgTSO *tso;
1449     nat stack_size;
1450
1451     /* First check whether we should create a thread at all */
1452 #if defined(PAR)
1453   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1454   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1455     threadsIgnored++;
1456     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1457           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1458     return END_TSO_QUEUE;
1459   }
1460   threadsCreated++;
1461 #endif
1462
1463 #if defined(GRAN)
1464   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1465 #endif
1466
1467   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1468
1469   /* catch ridiculously small stack sizes */
1470   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1471     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1472   }
1473
1474   stack_size = size - TSO_STRUCT_SIZEW;
1475
1476   tso = (StgTSO *)allocate(size);
1477   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1478
1479   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1480 #if defined(GRAN)
1481   SET_GRAN_HDR(tso, ThisPE);
1482 #endif
1483   tso->what_next     = ThreadEnterGHC;
1484
1485   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1486    * protect the increment operation on next_thread_id.
1487    * In future, we could use an atomic increment instead.
1488    */
1489   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1490   tso->id = next_thread_id++; 
1491   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1492
1493   tso->why_blocked  = NotBlocked;
1494   tso->blocked_exceptions = NULL;
1495
1496   tso->stack_size   = stack_size;
1497   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1498                               - TSO_STRUCT_SIZEW;
1499   tso->sp           = (P_)&(tso->stack) + stack_size;
1500
1501 #ifdef PROFILING
1502   tso->prof.CCCS = CCS_MAIN;
1503 #endif
1504
1505   /* put a stop frame on the stack */
1506   tso->sp -= sizeofW(StgStopFrame);
1507   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1508   tso->su = (StgUpdateFrame*)tso->sp;
1509
1510   // ToDo: check this
1511 #if defined(GRAN)
1512   tso->link = END_TSO_QUEUE;
1513   /* uses more flexible routine in GranSim */
1514   insertThread(tso, CurrentProc);
1515 #else
1516   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1517    * from its creation
1518    */
1519 #endif
1520
1521 #if defined(GRAN) 
1522   if (RtsFlags.GranFlags.GranSimStats.Full) 
1523     DumpGranEvent(GR_START,tso);
1524 #elif defined(PAR)
1525   if (RtsFlags.ParFlags.ParStats.Full) 
1526     DumpGranEvent(GR_STARTQ,tso);
1527   /* HACk to avoid SCHEDULE 
1528      LastTSO = tso; */
1529 #endif
1530
1531   /* Link the new thread on the global thread list.
1532    */
1533   tso->global_link = all_threads;
1534   all_threads = tso;
1535
1536 #if defined(DIST)
1537   tso->dist.priority = MandatoryPriority; //by default that is...
1538 #endif
1539
1540 #if defined(GRAN)
1541   tso->gran.pri = pri;
1542 # if defined(DEBUG)
1543   tso->gran.magic = TSO_MAGIC; // debugging only
1544 # endif
1545   tso->gran.sparkname   = 0;
1546   tso->gran.startedat   = CURRENT_TIME; 
1547   tso->gran.exported    = 0;
1548   tso->gran.basicblocks = 0;
1549   tso->gran.allocs      = 0;
1550   tso->gran.exectime    = 0;
1551   tso->gran.fetchtime   = 0;
1552   tso->gran.fetchcount  = 0;
1553   tso->gran.blocktime   = 0;
1554   tso->gran.blockcount  = 0;
1555   tso->gran.blockedat   = 0;
1556   tso->gran.globalsparks = 0;
1557   tso->gran.localsparks  = 0;
1558   if (RtsFlags.GranFlags.Light)
1559     tso->gran.clock  = Now; /* local clock */
1560   else
1561     tso->gran.clock  = 0;
1562
1563   IF_DEBUG(gran,printTSO(tso));
1564 #elif defined(PAR)
1565 # if defined(DEBUG)
1566   tso->par.magic = TSO_MAGIC; // debugging only
1567 # endif
1568   tso->par.sparkname   = 0;
1569   tso->par.startedat   = CURRENT_TIME; 
1570   tso->par.exported    = 0;
1571   tso->par.basicblocks = 0;
1572   tso->par.allocs      = 0;
1573   tso->par.exectime    = 0;
1574   tso->par.fetchtime   = 0;
1575   tso->par.fetchcount  = 0;
1576   tso->par.blocktime   = 0;
1577   tso->par.blockcount  = 0;
1578   tso->par.blockedat   = 0;
1579   tso->par.globalsparks = 0;
1580   tso->par.localsparks  = 0;
1581 #endif
1582
1583 #if defined(GRAN)
1584   globalGranStats.tot_threads_created++;
1585   globalGranStats.threads_created_on_PE[CurrentProc]++;
1586   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1587   globalGranStats.tot_sq_probes++;
1588 #elif defined(PAR)
1589   // collect parallel global statistics (currently done together with GC stats)
1590   if (RtsFlags.ParFlags.ParStats.Global &&
1591       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1592     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1593     globalParStats.tot_threads_created++;
1594   }
1595 #endif 
1596
1597 #if defined(GRAN)
1598   IF_GRAN_DEBUG(pri,
1599                 belch("==__ schedule: Created TSO %d (%p);",
1600                       CurrentProc, tso, tso->id));
1601 #elif defined(PAR)
1602     IF_PAR_DEBUG(verbose,
1603                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1604                        tso->id, tso, advisory_thread_count));
1605 #else
1606   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1607                                  tso->id, tso->stack_size));
1608 #endif    
1609   return tso;
1610 }
1611
1612 #if defined(PAR)
1613 /* RFP:
1614    all parallel thread creation calls should fall through the following routine.
1615 */
1616 StgTSO *
1617 createSparkThread(rtsSpark spark) 
1618 { StgTSO *tso;
1619   ASSERT(spark != (rtsSpark)NULL);
1620   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1621   { threadsIgnored++;
1622     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1623           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1624     return END_TSO_QUEUE;
1625   }
1626   else
1627   { threadsCreated++;
1628     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1629     if (tso==END_TSO_QUEUE)     
1630       barf("createSparkThread: Cannot create TSO");
1631 #if defined(DIST)
1632     tso->priority = AdvisoryPriority;
1633 #endif
1634     pushClosure(tso,spark);
1635     PUSH_ON_RUN_QUEUE(tso);
1636     advisory_thread_count++;    
1637   }
1638   return tso;
1639 }
1640 #endif
1641
1642 /*
1643   Turn a spark into a thread.
1644   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1645 */
1646 #if defined(PAR)
1647 //@cindex activateSpark
1648 StgTSO *
1649 activateSpark (rtsSpark spark) 
1650 {
1651   StgTSO *tso;
1652
1653   tso = createSparkThread(spark);
1654   if (RtsFlags.ParFlags.ParStats.Full) {   
1655     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1656     IF_PAR_DEBUG(verbose,
1657                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1658                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1659   }
1660   // ToDo: fwd info on local/global spark to thread -- HWL
1661   // tso->gran.exported =  spark->exported;
1662   // tso->gran.locked =   !spark->global;
1663   // tso->gran.sparkname = spark->name;
1664
1665   return tso;
1666 }
1667 #endif
1668
1669 /* ---------------------------------------------------------------------------
1670  * scheduleThread()
1671  *
1672  * scheduleThread puts a thread on the head of the runnable queue.
1673  * This will usually be done immediately after a thread is created.
1674  * The caller of scheduleThread must create the thread using e.g.
1675  * createThread and push an appropriate closure
1676  * on this thread's stack before the scheduler is invoked.
1677  * ------------------------------------------------------------------------ */
1678
1679 void
1680 scheduleThread(StgTSO *tso)
1681 {
1682   if (tso==END_TSO_QUEUE){    
1683     schedule();
1684     return;
1685   }
1686
1687   ACQUIRE_LOCK(&sched_mutex);
1688
1689   /* Put the new thread on the head of the runnable queue.  The caller
1690    * better push an appropriate closure on this thread's stack
1691    * beforehand.  In the SMP case, the thread may start running as
1692    * soon as we release the scheduler lock below.
1693    */
1694   PUSH_ON_RUN_QUEUE(tso);
1695   THREAD_RUNNABLE();
1696
1697 #if 0
1698   IF_DEBUG(scheduler,printTSO(tso));
1699 #endif
1700   RELEASE_LOCK(&sched_mutex);
1701 }
1702
1703 /* ---------------------------------------------------------------------------
1704  * startTasks()
1705  *
1706  * Start up Posix threads to run each of the scheduler tasks.
1707  * I believe the task ids are not needed in the system as defined.
1708  *  KH @ 25/10/99
1709  * ------------------------------------------------------------------------ */
1710
1711 #if defined(PAR) || defined(SMP)
1712 void
1713 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1714 {
1715   scheduleThread(END_TSO_QUEUE);
1716 }
1717 #endif
1718
1719 /* ---------------------------------------------------------------------------
1720  * initScheduler()
1721  *
1722  * Initialise the scheduler.  This resets all the queues - if the
1723  * queues contained any threads, they'll be garbage collected at the
1724  * next pass.
1725  *
1726  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1727  * ------------------------------------------------------------------------ */
1728
1729 #ifdef SMP
1730 static void
1731 term_handler(int sig STG_UNUSED)
1732 {
1733   stat_workerStop();
1734   ACQUIRE_LOCK(&term_mutex);
1735   await_death--;
1736   RELEASE_LOCK(&term_mutex);
1737   pthread_exit(NULL);
1738 }
1739 #endif
1740
1741 //@cindex initScheduler
1742 void 
1743 initScheduler(void)
1744 {
1745 #if defined(GRAN)
1746   nat i;
1747
1748   for (i=0; i<=MAX_PROC; i++) {
1749     run_queue_hds[i]      = END_TSO_QUEUE;
1750     run_queue_tls[i]      = END_TSO_QUEUE;
1751     blocked_queue_hds[i]  = END_TSO_QUEUE;
1752     blocked_queue_tls[i]  = END_TSO_QUEUE;
1753     ccalling_threadss[i]  = END_TSO_QUEUE;
1754     sleeping_queue        = END_TSO_QUEUE;
1755   }
1756 #else
1757   run_queue_hd      = END_TSO_QUEUE;
1758   run_queue_tl      = END_TSO_QUEUE;
1759   blocked_queue_hd  = END_TSO_QUEUE;
1760   blocked_queue_tl  = END_TSO_QUEUE;
1761   sleeping_queue    = END_TSO_QUEUE;
1762 #endif 
1763
1764   suspended_ccalling_threads  = END_TSO_QUEUE;
1765
1766   main_threads = NULL;
1767   all_threads  = END_TSO_QUEUE;
1768
1769   context_switch = 0;
1770   interrupted    = 0;
1771
1772   RtsFlags.ConcFlags.ctxtSwitchTicks =
1773       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1774
1775   /* Install the SIGHUP handler */
1776 #ifdef SMP
1777   {
1778     struct sigaction action,oact;
1779
1780     action.sa_handler = term_handler;
1781     sigemptyset(&action.sa_mask);
1782     action.sa_flags = 0;
1783     if (sigaction(SIGTERM, &action, &oact) != 0) {
1784       barf("can't install TERM handler");
1785     }
1786   }
1787 #endif
1788
1789 #ifdef SMP
1790   /* Allocate N Capabilities */
1791   {
1792     nat i;
1793     Capability *cap, *prev;
1794     cap  = NULL;
1795     prev = NULL;
1796     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1797       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1798       cap->link = prev;
1799       prev = cap;
1800     }
1801     free_capabilities = cap;
1802     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1803   }
1804   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1805                              n_free_capabilities););
1806 #endif
1807
1808 #if defined(SMP) || defined(PAR)
1809   initSparkPools();
1810 #endif
1811 }
1812
1813 #ifdef SMP
1814 void
1815 startTasks( void )
1816 {
1817   nat i;
1818   int r;
1819   pthread_t tid;
1820   
1821   /* make some space for saving all the thread ids */
1822   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1823                             "initScheduler:task_ids");
1824   
1825   /* and create all the threads */
1826   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1827     r = pthread_create(&tid,NULL,taskStart,NULL);
1828     if (r != 0) {
1829       barf("startTasks: Can't create new Posix thread");
1830     }
1831     task_ids[i].id = tid;
1832     task_ids[i].mut_time = 0.0;
1833     task_ids[i].mut_etime = 0.0;
1834     task_ids[i].gc_time = 0.0;
1835     task_ids[i].gc_etime = 0.0;
1836     task_ids[i].elapsedtimestart = elapsedtime();
1837     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1838   }
1839 }
1840 #endif
1841
1842 void
1843 exitScheduler( void )
1844 {
1845 #ifdef SMP
1846   nat i;
1847
1848   /* Don't want to use pthread_cancel, since we'd have to install
1849    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1850    * all our locks.
1851    */
1852 #if 0
1853   /* Cancel all our tasks */
1854   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1855     pthread_cancel(task_ids[i].id);
1856   }
1857   
1858   /* Wait for all the tasks to terminate */
1859   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1860     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1861                                task_ids[i].id));
1862     pthread_join(task_ids[i].id, NULL);
1863   }
1864 #endif
1865
1866   /* Send 'em all a SIGHUP.  That should shut 'em up.
1867    */
1868   await_death = RtsFlags.ParFlags.nNodes;
1869   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1870     pthread_kill(task_ids[i].id,SIGTERM);
1871   }
1872   while (await_death > 0) {
1873     sched_yield();
1874   }
1875 #endif
1876 }
1877
1878 /* -----------------------------------------------------------------------------
1879    Managing the per-task allocation areas.
1880    
1881    Each capability comes with an allocation area.  These are
1882    fixed-length block lists into which allocation can be done.
1883
1884    ToDo: no support for two-space collection at the moment???
1885    -------------------------------------------------------------------------- */
1886
1887 /* -----------------------------------------------------------------------------
1888  * waitThread is the external interface for running a new computation
1889  * and waiting for the result.
1890  *
1891  * In the non-SMP case, we create a new main thread, push it on the 
1892  * main-thread stack, and invoke the scheduler to run it.  The
1893  * scheduler will return when the top main thread on the stack has
1894  * completed or died, and fill in the necessary fields of the
1895  * main_thread structure.
1896  *
1897  * In the SMP case, we create a main thread as before, but we then
1898  * create a new condition variable and sleep on it.  When our new
1899  * main thread has completed, we'll be woken up and the status/result
1900  * will be in the main_thread struct.
1901  * -------------------------------------------------------------------------- */
1902
1903 int 
1904 howManyThreadsAvail ( void )
1905 {
1906    int i = 0;
1907    StgTSO* q;
1908    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1909       i++;
1910    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1911       i++;
1912    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1913       i++;
1914    return i;
1915 }
1916
1917 void
1918 finishAllThreads ( void )
1919 {
1920    do {
1921       while (run_queue_hd != END_TSO_QUEUE) {
1922          waitThread ( run_queue_hd, NULL );
1923       }
1924       while (blocked_queue_hd != END_TSO_QUEUE) {
1925          waitThread ( blocked_queue_hd, NULL );
1926       }
1927       while (sleeping_queue != END_TSO_QUEUE) {
1928          waitThread ( blocked_queue_hd, NULL );
1929       }
1930    } while 
1931       (blocked_queue_hd != END_TSO_QUEUE || 
1932        run_queue_hd     != END_TSO_QUEUE ||
1933        sleeping_queue   != END_TSO_QUEUE);
1934 }
1935
1936 SchedulerStatus
1937 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1938 {
1939   StgMainThread *m;
1940   SchedulerStatus stat;
1941
1942   ACQUIRE_LOCK(&sched_mutex);
1943   
1944   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1945
1946   m->tso = tso;
1947   m->ret = ret;
1948   m->stat = NoStatus;
1949 #ifdef SMP
1950   pthread_cond_init(&m->wakeup, NULL);
1951 #endif
1952
1953   m->link = main_threads;
1954   main_threads = m;
1955
1956   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1957                               m->tso->id));
1958
1959 #ifdef SMP
1960   do {
1961     pthread_cond_wait(&m->wakeup, &sched_mutex);
1962   } while (m->stat == NoStatus);
1963 #elif defined(GRAN)
1964   /* GranSim specific init */
1965   CurrentTSO = m->tso;                // the TSO to run
1966   procStatus[MainProc] = Busy;        // status of main PE
1967   CurrentProc = MainProc;             // PE to run it on
1968
1969   schedule();
1970 #else
1971   schedule();
1972   ASSERT(m->stat != NoStatus);
1973 #endif
1974
1975   stat = m->stat;
1976
1977 #ifdef SMP
1978   pthread_cond_destroy(&m->wakeup);
1979 #endif
1980
1981   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1982                               m->tso->id));
1983   free(m);
1984
1985   RELEASE_LOCK(&sched_mutex);
1986
1987   return stat;
1988 }
1989
1990 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1991 //@subsection Run queue code 
1992
1993 #if 0
1994 /* 
1995    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1996        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1997        implicit global variable that has to be correct when calling these
1998        fcts -- HWL 
1999 */
2000
2001 /* Put the new thread on the head of the runnable queue.
2002  * The caller of createThread better push an appropriate closure
2003  * on this thread's stack before the scheduler is invoked.
2004  */
2005 static /* inline */ void
2006 add_to_run_queue(tso)
2007 StgTSO* tso; 
2008 {
2009   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2010   tso->link = run_queue_hd;
2011   run_queue_hd = tso;
2012   if (run_queue_tl == END_TSO_QUEUE) {
2013     run_queue_tl = tso;
2014   }
2015 }
2016
2017 /* Put the new thread at the end of the runnable queue. */
2018 static /* inline */ void
2019 push_on_run_queue(tso)
2020 StgTSO* tso; 
2021 {
2022   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2023   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2024   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2025   if (run_queue_hd == END_TSO_QUEUE) {
2026     run_queue_hd = tso;
2027   } else {
2028     run_queue_tl->link = tso;
2029   }
2030   run_queue_tl = tso;
2031 }
2032
2033 /* 
2034    Should be inlined because it's used very often in schedule.  The tso
2035    argument is actually only needed in GranSim, where we want to have the
2036    possibility to schedule *any* TSO on the run queue, irrespective of the
2037    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2038    the run queue and dequeue the tso, adjusting the links in the queue. 
2039 */
2040 //@cindex take_off_run_queue
2041 static /* inline */ StgTSO*
2042 take_off_run_queue(StgTSO *tso) {
2043   StgTSO *t, *prev;
2044
2045   /* 
2046      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2047
2048      if tso is specified, unlink that tso from the run_queue (doesn't have
2049      to be at the beginning of the queue); GranSim only 
2050   */
2051   if (tso!=END_TSO_QUEUE) {
2052     /* find tso in queue */
2053     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2054          t!=END_TSO_QUEUE && t!=tso;
2055          prev=t, t=t->link) 
2056       /* nothing */ ;
2057     ASSERT(t==tso);
2058     /* now actually dequeue the tso */
2059     if (prev!=END_TSO_QUEUE) {
2060       ASSERT(run_queue_hd!=t);
2061       prev->link = t->link;
2062     } else {
2063       /* t is at beginning of thread queue */
2064       ASSERT(run_queue_hd==t);
2065       run_queue_hd = t->link;
2066     }
2067     /* t is at end of thread queue */
2068     if (t->link==END_TSO_QUEUE) {
2069       ASSERT(t==run_queue_tl);
2070       run_queue_tl = prev;
2071     } else {
2072       ASSERT(run_queue_tl!=t);
2073     }
2074     t->link = END_TSO_QUEUE;
2075   } else {
2076     /* take tso from the beginning of the queue; std concurrent code */
2077     t = run_queue_hd;
2078     if (t != END_TSO_QUEUE) {
2079       run_queue_hd = t->link;
2080       t->link = END_TSO_QUEUE;
2081       if (run_queue_hd == END_TSO_QUEUE) {
2082         run_queue_tl = END_TSO_QUEUE;
2083       }
2084     }
2085   }
2086   return t;
2087 }
2088
2089 #endif /* 0 */
2090
2091 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2092 //@subsection Garbage Collextion Routines
2093
2094 /* ---------------------------------------------------------------------------
2095    Where are the roots that we know about?
2096
2097         - all the threads on the runnable queue
2098         - all the threads on the blocked queue
2099         - all the threads on the sleeping queue
2100         - all the thread currently executing a _ccall_GC
2101         - all the "main threads"
2102      
2103    ------------------------------------------------------------------------ */
2104
2105 /* This has to be protected either by the scheduler monitor, or by the
2106         garbage collection monitor (probably the latter).
2107         KH @ 25/10/99
2108 */
2109
2110 static void
2111 GetRoots(evac_fn evac)
2112 {
2113   StgMainThread *m;
2114
2115 #if defined(GRAN)
2116   {
2117     nat i;
2118     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2119       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2120           evac((StgClosure **)&run_queue_hds[i]);
2121       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2122           evac((StgClosure **)&run_queue_tls[i]);
2123       
2124       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2125           evac((StgClosure **)&blocked_queue_hds[i]);
2126       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2127           evac((StgClosure **)&blocked_queue_tls[i]);
2128       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2129           evac((StgClosure **)&ccalling_threads[i]);
2130     }
2131   }
2132
2133   markEventQueue();
2134
2135 #else /* !GRAN */
2136   if (run_queue_hd != END_TSO_QUEUE) {
2137       ASSERT(run_queue_tl != END_TSO_QUEUE);
2138       evac((StgClosure **)&run_queue_hd);
2139       evac((StgClosure **)&run_queue_tl);
2140   }
2141   
2142   if (blocked_queue_hd != END_TSO_QUEUE) {
2143       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2144       evac((StgClosure **)&blocked_queue_hd);
2145       evac((StgClosure **)&blocked_queue_tl);
2146   }
2147   
2148   if (sleeping_queue != END_TSO_QUEUE) {
2149       evac((StgClosure **)&sleeping_queue);
2150   }
2151 #endif 
2152
2153   for (m = main_threads; m != NULL; m = m->link) {
2154       evac((StgClosure **)&m->tso);
2155   }
2156   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2157       evac((StgClosure **)&suspended_ccalling_threads);
2158   }
2159
2160 #if defined(SMP) || defined(PAR) || defined(GRAN)
2161   markSparkQueue(evac);
2162 #endif
2163 }
2164
2165 /* -----------------------------------------------------------------------------
2166    performGC
2167
2168    This is the interface to the garbage collector from Haskell land.
2169    We provide this so that external C code can allocate and garbage
2170    collect when called from Haskell via _ccall_GC.
2171
2172    It might be useful to provide an interface whereby the programmer
2173    can specify more roots (ToDo).
2174    
2175    This needs to be protected by the GC condition variable above.  KH.
2176    -------------------------------------------------------------------------- */
2177
2178 void (*extra_roots)(evac_fn);
2179
2180 void
2181 performGC(void)
2182 {
2183   GarbageCollect(GetRoots,rtsFalse);
2184 }
2185
2186 void
2187 performMajorGC(void)
2188 {
2189   GarbageCollect(GetRoots,rtsTrue);
2190 }
2191
2192 static void
2193 AllRoots(evac_fn evac)
2194 {
2195     GetRoots(evac);             // the scheduler's roots
2196     extra_roots(evac);          // the user's roots
2197 }
2198
2199 void
2200 performGCWithRoots(void (*get_roots)(evac_fn))
2201 {
2202   extra_roots = get_roots;
2203   GarbageCollect(AllRoots,rtsFalse);
2204 }
2205
2206 /* -----------------------------------------------------------------------------
2207    Stack overflow
2208
2209    If the thread has reached its maximum stack size, then raise the
2210    StackOverflow exception in the offending thread.  Otherwise
2211    relocate the TSO into a larger chunk of memory and adjust its stack
2212    size appropriately.
2213    -------------------------------------------------------------------------- */
2214
2215 static StgTSO *
2216 threadStackOverflow(StgTSO *tso)
2217 {
2218   nat new_stack_size, new_tso_size, diff, stack_words;
2219   StgPtr new_sp;
2220   StgTSO *dest;
2221
2222   IF_DEBUG(sanity,checkTSO(tso));
2223   if (tso->stack_size >= tso->max_stack_size) {
2224
2225     IF_DEBUG(gc,
2226              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2227                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2228              /* If we're debugging, just print out the top of the stack */
2229              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2230                                               tso->sp+64)));
2231
2232     /* Send this thread the StackOverflow exception */
2233     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2234     return tso;
2235   }
2236
2237   /* Try to double the current stack size.  If that takes us over the
2238    * maximum stack size for this thread, then use the maximum instead.
2239    * Finally round up so the TSO ends up as a whole number of blocks.
2240    */
2241   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2242   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2243                                        TSO_STRUCT_SIZE)/sizeof(W_);
2244   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2245   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2246
2247   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2248
2249   dest = (StgTSO *)allocate(new_tso_size);
2250   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2251
2252   /* copy the TSO block and the old stack into the new area */
2253   memcpy(dest,tso,TSO_STRUCT_SIZE);
2254   stack_words = tso->stack + tso->stack_size - tso->sp;
2255   new_sp = (P_)dest + new_tso_size - stack_words;
2256   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2257
2258   /* relocate the stack pointers... */
2259   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2260   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2261   dest->sp    = new_sp;
2262   dest->stack_size = new_stack_size;
2263         
2264   /* and relocate the update frame list */
2265   relocate_stack(dest, diff);
2266
2267   /* Mark the old TSO as relocated.  We have to check for relocated
2268    * TSOs in the garbage collector and any primops that deal with TSOs.
2269    *
2270    * It's important to set the sp and su values to just beyond the end
2271    * of the stack, so we don't attempt to scavenge any part of the
2272    * dead TSO's stack.
2273    */
2274   tso->what_next = ThreadRelocated;
2275   tso->link = dest;
2276   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2277   tso->su = (StgUpdateFrame *)tso->sp;
2278   tso->why_blocked = NotBlocked;
2279   dest->mut_link = NULL;
2280
2281   IF_PAR_DEBUG(verbose,
2282                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2283                      tso->id, tso, tso->stack_size);
2284                /* If we're debugging, just print out the top of the stack */
2285                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2286                                                 tso->sp+64)));
2287   
2288   IF_DEBUG(sanity,checkTSO(tso));
2289 #if 0
2290   IF_DEBUG(scheduler,printTSO(dest));
2291 #endif
2292
2293   return dest;
2294 }
2295
2296 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2297 //@subsection Blocking Queue Routines
2298
2299 /* ---------------------------------------------------------------------------
2300    Wake up a queue that was blocked on some resource.
2301    ------------------------------------------------------------------------ */
2302
2303 #if defined(GRAN)
2304 static inline void
2305 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2306 {
2307 }
2308 #elif defined(PAR)
2309 static inline void
2310 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2311 {
2312   /* write RESUME events to log file and
2313      update blocked and fetch time (depending on type of the orig closure) */
2314   if (RtsFlags.ParFlags.ParStats.Full) {
2315     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2316                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2317                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2318     if (EMPTY_RUN_QUEUE())
2319       emitSchedule = rtsTrue;
2320
2321     switch (get_itbl(node)->type) {
2322         case FETCH_ME_BQ:
2323           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2324           break;
2325         case RBH:
2326         case FETCH_ME:
2327         case BLACKHOLE_BQ:
2328           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2329           break;
2330 #ifdef DIST
2331         case MVAR:
2332           break;
2333 #endif    
2334         default:
2335           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2336         }
2337       }
2338 }
2339 #endif
2340
2341 #if defined(GRAN)
2342 static StgBlockingQueueElement *
2343 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2344 {
2345     StgTSO *tso;
2346     PEs node_loc, tso_loc;
2347
2348     node_loc = where_is(node); // should be lifted out of loop
2349     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2350     tso_loc = where_is((StgClosure *)tso);
2351     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2352       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2353       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2354       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2355       // insertThread(tso, node_loc);
2356       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2357                 ResumeThread,
2358                 tso, node, (rtsSpark*)NULL);
2359       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2360       // len_local++;
2361       // len++;
2362     } else { // TSO is remote (actually should be FMBQ)
2363       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2364                                   RtsFlags.GranFlags.Costs.gunblocktime +
2365                                   RtsFlags.GranFlags.Costs.latency;
2366       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2367                 UnblockThread,
2368                 tso, node, (rtsSpark*)NULL);
2369       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2370       // len++;
2371     }
2372     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2373     IF_GRAN_DEBUG(bq,
2374                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2375                           (node_loc==tso_loc ? "Local" : "Global"), 
2376                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2377     tso->block_info.closure = NULL;
2378     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2379                              tso->id, tso));
2380 }
2381 #elif defined(PAR)
2382 static StgBlockingQueueElement *
2383 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2384 {
2385     StgBlockingQueueElement *next;
2386
2387     switch (get_itbl(bqe)->type) {
2388     case TSO:
2389       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2390       /* if it's a TSO just push it onto the run_queue */
2391       next = bqe->link;
2392       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2393       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2394       THREAD_RUNNABLE();
2395       unblockCount(bqe, node);
2396       /* reset blocking status after dumping event */
2397       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2398       break;
2399
2400     case BLOCKED_FETCH:
2401       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2402       next = bqe->link;
2403       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2404       PendingFetches = (StgBlockedFetch *)bqe;
2405       break;
2406
2407 # if defined(DEBUG)
2408       /* can ignore this case in a non-debugging setup; 
2409          see comments on RBHSave closures above */
2410     case CONSTR:
2411       /* check that the closure is an RBHSave closure */
2412       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2413              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2414              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2415       break;
2416
2417     default:
2418       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2419            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2420            (StgClosure *)bqe);
2421 # endif
2422     }
2423   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2424   return next;
2425 }
2426
2427 #else /* !GRAN && !PAR */
2428 static StgTSO *
2429 unblockOneLocked(StgTSO *tso)
2430 {
2431   StgTSO *next;
2432
2433   ASSERT(get_itbl(tso)->type == TSO);
2434   ASSERT(tso->why_blocked != NotBlocked);
2435   tso->why_blocked = NotBlocked;
2436   next = tso->link;
2437   PUSH_ON_RUN_QUEUE(tso);
2438   THREAD_RUNNABLE();
2439   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2440   return next;
2441 }
2442 #endif
2443
2444 #if defined(GRAN) || defined(PAR)
2445 inline StgBlockingQueueElement *
2446 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2447 {
2448   ACQUIRE_LOCK(&sched_mutex);
2449   bqe = unblockOneLocked(bqe, node);
2450   RELEASE_LOCK(&sched_mutex);
2451   return bqe;
2452 }
2453 #else
2454 inline StgTSO *
2455 unblockOne(StgTSO *tso)
2456 {
2457   ACQUIRE_LOCK(&sched_mutex);
2458   tso = unblockOneLocked(tso);
2459   RELEASE_LOCK(&sched_mutex);
2460   return tso;
2461 }
2462 #endif
2463
2464 #if defined(GRAN)
2465 void 
2466 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2467 {
2468   StgBlockingQueueElement *bqe;
2469   PEs node_loc;
2470   nat len = 0; 
2471
2472   IF_GRAN_DEBUG(bq, 
2473                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2474                       node, CurrentProc, CurrentTime[CurrentProc], 
2475                       CurrentTSO->id, CurrentTSO));
2476
2477   node_loc = where_is(node);
2478
2479   ASSERT(q == END_BQ_QUEUE ||
2480          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2481          get_itbl(q)->type == CONSTR); // closure (type constructor)
2482   ASSERT(is_unique(node));
2483
2484   /* FAKE FETCH: magically copy the node to the tso's proc;
2485      no Fetch necessary because in reality the node should not have been 
2486      moved to the other PE in the first place
2487   */
2488   if (CurrentProc!=node_loc) {
2489     IF_GRAN_DEBUG(bq, 
2490                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2491                         node, node_loc, CurrentProc, CurrentTSO->id, 
2492                         // CurrentTSO, where_is(CurrentTSO),
2493                         node->header.gran.procs));
2494     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2495     IF_GRAN_DEBUG(bq, 
2496                   belch("## new bitmask of node %p is %#x",
2497                         node, node->header.gran.procs));
2498     if (RtsFlags.GranFlags.GranSimStats.Global) {
2499       globalGranStats.tot_fake_fetches++;
2500     }
2501   }
2502
2503   bqe = q;
2504   // ToDo: check: ASSERT(CurrentProc==node_loc);
2505   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2506     //next = bqe->link;
2507     /* 
2508        bqe points to the current element in the queue
2509        next points to the next element in the queue
2510     */
2511     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2512     //tso_loc = where_is(tso);
2513     len++;
2514     bqe = unblockOneLocked(bqe, node);
2515   }
2516
2517   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2518      the closure to make room for the anchor of the BQ */
2519   if (bqe!=END_BQ_QUEUE) {
2520     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2521     /*
2522     ASSERT((info_ptr==&RBH_Save_0_info) ||
2523            (info_ptr==&RBH_Save_1_info) ||
2524            (info_ptr==&RBH_Save_2_info));
2525     */
2526     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2527     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2528     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2529
2530     IF_GRAN_DEBUG(bq,
2531                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2532                         node, info_type(node)));
2533   }
2534
2535   /* statistics gathering */
2536   if (RtsFlags.GranFlags.GranSimStats.Global) {
2537     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2538     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2539     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2540     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2541   }
2542   IF_GRAN_DEBUG(bq,
2543                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2544                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2545 }
2546 #elif defined(PAR)
2547 void 
2548 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2549 {
2550   StgBlockingQueueElement *bqe;
2551
2552   ACQUIRE_LOCK(&sched_mutex);
2553
2554   IF_PAR_DEBUG(verbose, 
2555                belch("##-_ AwBQ for node %p on [%x]: ",
2556                      node, mytid));
2557 #ifdef DIST  
2558   //RFP
2559   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2560     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2561     return;
2562   }
2563 #endif
2564   
2565   ASSERT(q == END_BQ_QUEUE ||
2566          get_itbl(q)->type == TSO ||           
2567          get_itbl(q)->type == BLOCKED_FETCH || 
2568          get_itbl(q)->type == CONSTR); 
2569
2570   bqe = q;
2571   while (get_itbl(bqe)->type==TSO || 
2572          get_itbl(bqe)->type==BLOCKED_FETCH) {
2573     bqe = unblockOneLocked(bqe, node);
2574   }
2575   RELEASE_LOCK(&sched_mutex);
2576 }
2577
2578 #else   /* !GRAN && !PAR */
2579 void
2580 awakenBlockedQueue(StgTSO *tso)
2581 {
2582   ACQUIRE_LOCK(&sched_mutex);
2583   while (tso != END_TSO_QUEUE) {
2584     tso = unblockOneLocked(tso);
2585   }
2586   RELEASE_LOCK(&sched_mutex);
2587 }
2588 #endif
2589
2590 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2591 //@subsection Exception Handling Routines
2592
2593 /* ---------------------------------------------------------------------------
2594    Interrupt execution
2595    - usually called inside a signal handler so it mustn't do anything fancy.   
2596    ------------------------------------------------------------------------ */
2597
2598 void
2599 interruptStgRts(void)
2600 {
2601     interrupted    = 1;
2602     context_switch = 1;
2603 }
2604
2605 /* -----------------------------------------------------------------------------
2606    Unblock a thread
2607
2608    This is for use when we raise an exception in another thread, which
2609    may be blocked.
2610    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2611    -------------------------------------------------------------------------- */
2612
2613 #if defined(GRAN) || defined(PAR)
2614 /*
2615   NB: only the type of the blocking queue is different in GranSim and GUM
2616       the operations on the queue-elements are the same
2617       long live polymorphism!
2618 */
2619 static void
2620 unblockThread(StgTSO *tso)
2621 {
2622   StgBlockingQueueElement *t, **last;
2623
2624   ACQUIRE_LOCK(&sched_mutex);
2625   switch (tso->why_blocked) {
2626
2627   case NotBlocked:
2628     return;  /* not blocked */
2629
2630   case BlockedOnMVar:
2631     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2632     {
2633       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2634       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2635
2636       last = (StgBlockingQueueElement **)&mvar->head;
2637       for (t = (StgBlockingQueueElement *)mvar->head; 
2638            t != END_BQ_QUEUE; 
2639            last = &t->link, last_tso = t, t = t->link) {
2640         if (t == (StgBlockingQueueElement *)tso) {
2641           *last = (StgBlockingQueueElement *)tso->link;
2642           if (mvar->tail == tso) {
2643             mvar->tail = (StgTSO *)last_tso;
2644           }
2645           goto done;
2646         }
2647       }
2648       barf("unblockThread (MVAR): TSO not found");
2649     }
2650
2651   case BlockedOnBlackHole:
2652     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2653     {
2654       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2655
2656       last = &bq->blocking_queue;
2657       for (t = bq->blocking_queue; 
2658            t != END_BQ_QUEUE; 
2659            last = &t->link, t = t->link) {
2660         if (t == (StgBlockingQueueElement *)tso) {
2661           *last = (StgBlockingQueueElement *)tso->link;
2662           goto done;
2663         }
2664       }
2665       barf("unblockThread (BLACKHOLE): TSO not found");
2666     }
2667
2668   case BlockedOnException:
2669     {
2670       StgTSO *target  = tso->block_info.tso;
2671
2672       ASSERT(get_itbl(target)->type == TSO);
2673
2674       if (target->what_next == ThreadRelocated) {
2675           target = target->link;
2676           ASSERT(get_itbl(target)->type == TSO);
2677       }
2678
2679       ASSERT(target->blocked_exceptions != NULL);
2680
2681       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2682       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2683            t != END_BQ_QUEUE; 
2684            last = &t->link, t = t->link) {
2685         ASSERT(get_itbl(t)->type == TSO);
2686         if (t == (StgBlockingQueueElement *)tso) {
2687           *last = (StgBlockingQueueElement *)tso->link;
2688           goto done;
2689         }
2690       }
2691       barf("unblockThread (Exception): TSO not found");
2692     }
2693
2694   case BlockedOnRead:
2695   case BlockedOnWrite:
2696     {
2697       /* take TSO off blocked_queue */
2698       StgBlockingQueueElement *prev = NULL;
2699       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2700            prev = t, t = t->link) {
2701         if (t == (StgBlockingQueueElement *)tso) {
2702           if (prev == NULL) {
2703             blocked_queue_hd = (StgTSO *)t->link;
2704             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2705               blocked_queue_tl = END_TSO_QUEUE;
2706             }
2707           } else {
2708             prev->link = t->link;
2709             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2710               blocked_queue_tl = (StgTSO *)prev;
2711             }
2712           }
2713           goto done;
2714         }
2715       }
2716       barf("unblockThread (I/O): TSO not found");
2717     }
2718
2719   case BlockedOnDelay:
2720     {
2721       /* take TSO off sleeping_queue */
2722       StgBlockingQueueElement *prev = NULL;
2723       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2724            prev = t, t = t->link) {
2725         if (t == (StgBlockingQueueElement *)tso) {
2726           if (prev == NULL) {
2727             sleeping_queue = (StgTSO *)t->link;
2728           } else {
2729             prev->link = t->link;
2730           }
2731           goto done;
2732         }
2733       }
2734       barf("unblockThread (I/O): TSO not found");
2735     }
2736
2737   default:
2738     barf("unblockThread");
2739   }
2740
2741  done:
2742   tso->link = END_TSO_QUEUE;
2743   tso->why_blocked = NotBlocked;
2744   tso->block_info.closure = NULL;
2745   PUSH_ON_RUN_QUEUE(tso);
2746   RELEASE_LOCK(&sched_mutex);
2747 }
2748 #else
2749 static void
2750 unblockThread(StgTSO *tso)
2751 {
2752   StgTSO *t, **last;
2753
2754   ACQUIRE_LOCK(&sched_mutex);
2755   switch (tso->why_blocked) {
2756
2757   case NotBlocked:
2758     return;  /* not blocked */
2759
2760   case BlockedOnMVar:
2761     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2762     {
2763       StgTSO *last_tso = END_TSO_QUEUE;
2764       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2765
2766       last = &mvar->head;
2767       for (t = mvar->head; t != END_TSO_QUEUE; 
2768            last = &t->link, last_tso = t, t = t->link) {
2769         if (t == tso) {
2770           *last = tso->link;
2771           if (mvar->tail == tso) {
2772             mvar->tail = last_tso;
2773           }
2774           goto done;
2775         }
2776       }
2777       barf("unblockThread (MVAR): TSO not found");
2778     }
2779
2780   case BlockedOnBlackHole:
2781     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2782     {
2783       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2784
2785       last = &bq->blocking_queue;
2786       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2787            last = &t->link, t = t->link) {
2788         if (t == tso) {
2789           *last = tso->link;
2790           goto done;
2791         }
2792       }
2793       barf("unblockThread (BLACKHOLE): TSO not found");
2794     }
2795
2796   case BlockedOnException:
2797     {
2798       StgTSO *target  = tso->block_info.tso;
2799
2800       ASSERT(get_itbl(target)->type == TSO);
2801
2802       while (target->what_next == ThreadRelocated) {
2803           target = target->link;
2804           ASSERT(get_itbl(target)->type == TSO);
2805       }
2806       
2807       ASSERT(target->blocked_exceptions != NULL);
2808
2809       last = &target->blocked_exceptions;
2810       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2811            last = &t->link, t = t->link) {
2812         ASSERT(get_itbl(t)->type == TSO);
2813         if (t == tso) {
2814           *last = tso->link;
2815           goto done;
2816         }
2817       }
2818       barf("unblockThread (Exception): TSO not found");
2819     }
2820
2821   case BlockedOnRead:
2822   case BlockedOnWrite:
2823     {
2824       StgTSO *prev = NULL;
2825       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2826            prev = t, t = t->link) {
2827         if (t == tso) {
2828           if (prev == NULL) {
2829             blocked_queue_hd = t->link;
2830             if (blocked_queue_tl == t) {
2831               blocked_queue_tl = END_TSO_QUEUE;
2832             }
2833           } else {
2834             prev->link = t->link;
2835             if (blocked_queue_tl == t) {
2836               blocked_queue_tl = prev;
2837             }
2838           }
2839           goto done;
2840         }
2841       }
2842       barf("unblockThread (I/O): TSO not found");
2843     }
2844
2845   case BlockedOnDelay:
2846     {
2847       StgTSO *prev = NULL;
2848       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2849            prev = t, t = t->link) {
2850         if (t == tso) {
2851           if (prev == NULL) {
2852             sleeping_queue = t->link;
2853           } else {
2854             prev->link = t->link;
2855           }
2856           goto done;
2857         }
2858       }
2859       barf("unblockThread (I/O): TSO not found");
2860     }
2861
2862   default:
2863     barf("unblockThread");
2864   }
2865
2866  done:
2867   tso->link = END_TSO_QUEUE;
2868   tso->why_blocked = NotBlocked;
2869   tso->block_info.closure = NULL;
2870   PUSH_ON_RUN_QUEUE(tso);
2871   RELEASE_LOCK(&sched_mutex);
2872 }
2873 #endif
2874
2875 /* -----------------------------------------------------------------------------
2876  * raiseAsync()
2877  *
2878  * The following function implements the magic for raising an
2879  * asynchronous exception in an existing thread.
2880  *
2881  * We first remove the thread from any queue on which it might be
2882  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2883  *
2884  * We strip the stack down to the innermost CATCH_FRAME, building
2885  * thunks in the heap for all the active computations, so they can 
2886  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2887  * an application of the handler to the exception, and push it on
2888  * the top of the stack.
2889  * 
2890  * How exactly do we save all the active computations?  We create an
2891  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2892  * AP_UPDs pushes everything from the corresponding update frame
2893  * upwards onto the stack.  (Actually, it pushes everything up to the
2894  * next update frame plus a pointer to the next AP_UPD object.
2895  * Entering the next AP_UPD object pushes more onto the stack until we
2896  * reach the last AP_UPD object - at which point the stack should look
2897  * exactly as it did when we killed the TSO and we can continue
2898  * execution by entering the closure on top of the stack.
2899  *
2900  * We can also kill a thread entirely - this happens if either (a) the 
2901  * exception passed to raiseAsync is NULL, or (b) there's no
2902  * CATCH_FRAME on the stack.  In either case, we strip the entire
2903  * stack and replace the thread with a zombie.
2904  *
2905  * -------------------------------------------------------------------------- */
2906  
2907 void 
2908 deleteThread(StgTSO *tso)
2909 {
2910   raiseAsync(tso,NULL);
2911 }
2912
2913 void
2914 raiseAsync(StgTSO *tso, StgClosure *exception)
2915 {
2916   StgUpdateFrame* su = tso->su;
2917   StgPtr          sp = tso->sp;
2918   
2919   /* Thread already dead? */
2920   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2921     return;
2922   }
2923
2924   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2925
2926   /* Remove it from any blocking queues */
2927   unblockThread(tso);
2928
2929   /* The stack freezing code assumes there's a closure pointer on
2930    * the top of the stack.  This isn't always the case with compiled
2931    * code, so we have to push a dummy closure on the top which just
2932    * returns to the next return address on the stack.
2933    */
2934   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2935     *(--sp) = (W_)&stg_dummy_ret_closure;
2936   }
2937
2938   while (1) {
2939     nat words = ((P_)su - (P_)sp) - 1;
2940     nat i;
2941     StgAP_UPD * ap;
2942
2943     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2944      * then build PAP(handler,exception,realworld#), and leave it on
2945      * top of the stack ready to enter.
2946      */
2947     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2948       StgCatchFrame *cf = (StgCatchFrame *)su;
2949       /* we've got an exception to raise, so let's pass it to the
2950        * handler in this frame.
2951        */
2952       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2953       TICK_ALLOC_UPD_PAP(3,0);
2954       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2955               
2956       ap->n_args = 2;
2957       ap->fun = cf->handler;    /* :: Exception -> IO a */
2958       ap->payload[0] = exception;
2959       ap->payload[1] = ARG_TAG(0); /* realworld token */
2960
2961       /* throw away the stack from Sp up to and including the
2962        * CATCH_FRAME.
2963        */
2964       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2965       tso->su = cf->link;
2966
2967       /* Restore the blocked/unblocked state for asynchronous exceptions
2968        * at the CATCH_FRAME.  
2969        *
2970        * If exceptions were unblocked at the catch, arrange that they
2971        * are unblocked again after executing the handler by pushing an
2972        * unblockAsyncExceptions_ret stack frame.
2973        */
2974       if (!cf->exceptions_blocked) {
2975         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2976       }
2977       
2978       /* Ensure that async exceptions are blocked when running the handler.
2979        */
2980       if (tso->blocked_exceptions == NULL) {
2981         tso->blocked_exceptions = END_TSO_QUEUE;
2982       }
2983       
2984       /* Put the newly-built PAP on top of the stack, ready to execute
2985        * when the thread restarts.
2986        */
2987       sp[0] = (W_)ap;
2988       tso->sp = sp;
2989       tso->what_next = ThreadEnterGHC;
2990       IF_DEBUG(sanity, checkTSO(tso));
2991       return;
2992     }
2993
2994     /* First build an AP_UPD consisting of the stack chunk above the
2995      * current update frame, with the top word on the stack as the
2996      * fun field.
2997      */
2998     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2999     
3000     ASSERT(words >= 0);
3001     
3002     ap->n_args = words;
3003     ap->fun    = (StgClosure *)sp[0];
3004     sp++;
3005     for(i=0; i < (nat)words; ++i) {
3006       ap->payload[i] = (StgClosure *)*sp++;
3007     }
3008     
3009     switch (get_itbl(su)->type) {
3010       
3011     case UPDATE_FRAME:
3012       {
3013         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3014         TICK_ALLOC_UP_THK(words+1,0);
3015         
3016         IF_DEBUG(scheduler,
3017                  fprintf(stderr,  "scheduler: Updating ");
3018                  printPtr((P_)su->updatee); 
3019                  fprintf(stderr,  " with ");
3020                  printObj((StgClosure *)ap);
3021                  );
3022         
3023         /* Replace the updatee with an indirection - happily
3024          * this will also wake up any threads currently
3025          * waiting on the result.
3026          *
3027          * Warning: if we're in a loop, more than one update frame on
3028          * the stack may point to the same object.  Be careful not to
3029          * overwrite an IND_OLDGEN in this case, because we'll screw
3030          * up the mutable lists.  To be on the safe side, don't
3031          * overwrite any kind of indirection at all.  See also
3032          * threadSqueezeStack in GC.c, where we have to make a similar
3033          * check.
3034          */
3035         if (!closure_IND(su->updatee)) {
3036             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3037         }
3038         su = su->link;
3039         sp += sizeofW(StgUpdateFrame) -1;
3040         sp[0] = (W_)ap; /* push onto stack */
3041         break;
3042       }
3043
3044     case CATCH_FRAME:
3045       {
3046         StgCatchFrame *cf = (StgCatchFrame *)su;
3047         StgClosure* o;
3048         
3049         /* We want a PAP, not an AP_UPD.  Fortunately, the
3050          * layout's the same.
3051          */
3052         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3053         TICK_ALLOC_UPD_PAP(words+1,0);
3054         
3055         /* now build o = FUN(catch,ap,handler) */
3056         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3057         TICK_ALLOC_FUN(2,0);
3058         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3059         o->payload[0] = (StgClosure *)ap;
3060         o->payload[1] = cf->handler;
3061         
3062         IF_DEBUG(scheduler,
3063                  fprintf(stderr,  "scheduler: Built ");
3064                  printObj((StgClosure *)o);
3065                  );
3066         
3067         /* pop the old handler and put o on the stack */
3068         su = cf->link;
3069         sp += sizeofW(StgCatchFrame) - 1;
3070         sp[0] = (W_)o;
3071         break;
3072       }
3073       
3074     case SEQ_FRAME:
3075       {
3076         StgSeqFrame *sf = (StgSeqFrame *)su;
3077         StgClosure* o;
3078         
3079         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3080         TICK_ALLOC_UPD_PAP(words+1,0);
3081         
3082         /* now build o = FUN(seq,ap) */
3083         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3084         TICK_ALLOC_SE_THK(1,0);
3085         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3086         o->payload[0] = (StgClosure *)ap;
3087         
3088         IF_DEBUG(scheduler,
3089                  fprintf(stderr,  "scheduler: Built ");
3090                  printObj((StgClosure *)o);
3091                  );
3092         
3093         /* pop the old handler and put o on the stack */
3094         su = sf->link;
3095         sp += sizeofW(StgSeqFrame) - 1;
3096         sp[0] = (W_)o;
3097         break;
3098       }
3099       
3100     case STOP_FRAME:
3101       /* We've stripped the entire stack, the thread is now dead. */
3102       sp += sizeofW(StgStopFrame) - 1;
3103       sp[0] = (W_)exception;    /* save the exception */
3104       tso->what_next = ThreadKilled;
3105       tso->su = (StgUpdateFrame *)(sp+1);
3106       tso->sp = sp;
3107       return;
3108
3109     default:
3110       barf("raiseAsync");
3111     }
3112   }
3113   barf("raiseAsync");
3114 }
3115
3116 /* -----------------------------------------------------------------------------
3117    resurrectThreads is called after garbage collection on the list of
3118    threads found to be garbage.  Each of these threads will be woken
3119    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3120    on an MVar, or NonTermination if the thread was blocked on a Black
3121    Hole.
3122    -------------------------------------------------------------------------- */
3123
3124 void
3125 resurrectThreads( StgTSO *threads )
3126 {
3127   StgTSO *tso, *next;
3128
3129   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3130     next = tso->global_link;
3131     tso->global_link = all_threads;
3132     all_threads = tso;
3133     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3134
3135     switch (tso->why_blocked) {
3136     case BlockedOnMVar:
3137     case BlockedOnException:
3138       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3139       break;
3140     case BlockedOnBlackHole:
3141       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3142       break;
3143     case NotBlocked:
3144       /* This might happen if the thread was blocked on a black hole
3145        * belonging to a thread that we've just woken up (raiseAsync
3146        * can wake up threads, remember...).
3147        */
3148       continue;
3149     default:
3150       barf("resurrectThreads: thread blocked in a strange way");
3151     }
3152   }
3153 }
3154
3155 /* -----------------------------------------------------------------------------
3156  * Blackhole detection: if we reach a deadlock, test whether any
3157  * threads are blocked on themselves.  Any threads which are found to
3158  * be self-blocked get sent a NonTermination exception.
3159  *
3160  * This is only done in a deadlock situation in order to avoid
3161  * performance overhead in the normal case.
3162  * -------------------------------------------------------------------------- */
3163
3164 static void
3165 detectBlackHoles( void )
3166 {
3167     StgTSO *t = all_threads;
3168     StgUpdateFrame *frame;
3169     StgClosure *blocked_on;
3170
3171     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3172
3173         while (t->what_next == ThreadRelocated) {
3174             t = t->link;
3175             ASSERT(get_itbl(t)->type == TSO);
3176         }
3177       
3178         if (t->why_blocked != BlockedOnBlackHole) {
3179             continue;
3180         }
3181
3182         blocked_on = t->block_info.closure;
3183
3184         for (frame = t->su; ; frame = frame->link) {
3185             switch (get_itbl(frame)->type) {
3186
3187             case UPDATE_FRAME:
3188                 if (frame->updatee == blocked_on) {
3189                     /* We are blocking on one of our own computations, so
3190                      * send this thread the NonTermination exception.  
3191                      */
3192                     IF_DEBUG(scheduler, 
3193                              sched_belch("thread %d is blocked on itself", t->id));
3194                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3195                     goto done;
3196                 }
3197                 else {
3198                     continue;
3199                 }
3200
3201             case CATCH_FRAME:
3202             case SEQ_FRAME:
3203                 continue;
3204                 
3205             case STOP_FRAME:
3206                 break;
3207             }
3208             break;
3209         }
3210
3211     done: ;
3212     }   
3213 }
3214
3215 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3216 //@subsection Debugging Routines
3217
3218 /* -----------------------------------------------------------------------------
3219    Debugging: why is a thread blocked
3220    -------------------------------------------------------------------------- */
3221
3222 #ifdef DEBUG
3223
3224 void
3225 printThreadBlockage(StgTSO *tso)
3226 {
3227   switch (tso->why_blocked) {
3228   case BlockedOnRead:
3229     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3230     break;
3231   case BlockedOnWrite:
3232     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3233     break;
3234   case BlockedOnDelay:
3235     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3236     break;
3237   case BlockedOnMVar:
3238     fprintf(stderr,"is blocked on an MVar");
3239     break;
3240   case BlockedOnException:
3241     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3242             tso->block_info.tso->id);
3243     break;
3244   case BlockedOnBlackHole:
3245     fprintf(stderr,"is blocked on a black hole");
3246     break;
3247   case NotBlocked:
3248     fprintf(stderr,"is not blocked");
3249     break;
3250 #if defined(PAR)
3251   case BlockedOnGA:
3252     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3253             tso->block_info.closure, info_type(tso->block_info.closure));
3254     break;
3255   case BlockedOnGA_NoSend:
3256     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3257             tso->block_info.closure, info_type(tso->block_info.closure));
3258     break;
3259 #endif
3260   default:
3261     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3262          tso->why_blocked, tso->id, tso);
3263   }
3264 }
3265
3266 void
3267 printThreadStatus(StgTSO *tso)
3268 {
3269   switch (tso->what_next) {
3270   case ThreadKilled:
3271     fprintf(stderr,"has been killed");
3272     break;
3273   case ThreadComplete:
3274     fprintf(stderr,"has completed");
3275     break;
3276   default:
3277     printThreadBlockage(tso);
3278   }
3279 }
3280
3281 void
3282 printAllThreads(void)
3283 {
3284   StgTSO *t;
3285
3286 # if defined(GRAN)
3287   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3288   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3289                        time_string, rtsFalse/*no commas!*/);
3290
3291   sched_belch("all threads at [%s]:", time_string);
3292 # elif defined(PAR)
3293   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3294   ullong_format_string(CURRENT_TIME,
3295                        time_string, rtsFalse/*no commas!*/);
3296
3297   sched_belch("all threads at [%s]:", time_string);
3298 # else
3299   sched_belch("all threads:");
3300 # endif
3301
3302   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3303     fprintf(stderr, "\tthread %d ", t->id);
3304     printThreadStatus(t);
3305     fprintf(stderr,"\n");
3306   }
3307 }
3308     
3309 /* 
3310    Print a whole blocking queue attached to node (debugging only).
3311 */
3312 //@cindex print_bq
3313 # if defined(PAR)
3314 void 
3315 print_bq (StgClosure *node)
3316 {
3317   StgBlockingQueueElement *bqe;
3318   StgTSO *tso;
3319   rtsBool end;
3320
3321   fprintf(stderr,"## BQ of closure %p (%s): ",
3322           node, info_type(node));
3323
3324   /* should cover all closures that may have a blocking queue */
3325   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3326          get_itbl(node)->type == FETCH_ME_BQ ||
3327          get_itbl(node)->type == RBH ||
3328          get_itbl(node)->type == MVAR);
3329     
3330   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3331
3332   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3333 }
3334
3335 /* 
3336    Print a whole blocking queue starting with the element bqe.
3337 */
3338 void 
3339 print_bqe (StgBlockingQueueElement *bqe)
3340 {
3341   rtsBool end;
3342
3343   /* 
3344      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3345   */
3346   for (end = (bqe==END_BQ_QUEUE);
3347        !end; // iterate until bqe points to a CONSTR
3348        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3349        bqe = end ? END_BQ_QUEUE : bqe->link) {
3350     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3351     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3352     /* types of closures that may appear in a blocking queue */
3353     ASSERT(get_itbl(bqe)->type == TSO ||           
3354            get_itbl(bqe)->type == BLOCKED_FETCH || 
3355            get_itbl(bqe)->type == CONSTR); 
3356     /* only BQs of an RBH end with an RBH_Save closure */
3357     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3358
3359     switch (get_itbl(bqe)->type) {
3360     case TSO:
3361       fprintf(stderr," TSO %u (%x),",
3362               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3363       break;
3364     case BLOCKED_FETCH:
3365       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3366               ((StgBlockedFetch *)bqe)->node, 
3367               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3368               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3369               ((StgBlockedFetch *)bqe)->ga.weight);
3370       break;
3371     case CONSTR:
3372       fprintf(stderr," %s (IP %p),",
3373               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3374                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3375                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3376                "RBH_Save_?"), get_itbl(bqe));
3377       break;
3378     default:
3379       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3380            info_type((StgClosure *)bqe)); // , node, info_type(node));
3381       break;
3382     }
3383   } /* for */
3384   fputc('\n', stderr);
3385 }
3386 # elif defined(GRAN)
3387 void 
3388 print_bq (StgClosure *node)
3389 {
3390   StgBlockingQueueElement *bqe;
3391   PEs node_loc, tso_loc;
3392   rtsBool end;
3393
3394   /* should cover all closures that may have a blocking queue */
3395   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3396          get_itbl(node)->type == FETCH_ME_BQ ||
3397          get_itbl(node)->type == RBH);
3398     
3399   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3400   node_loc = where_is(node);
3401
3402   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3403           node, info_type(node), node_loc);
3404
3405   /* 
3406      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3407   */
3408   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3409        !end; // iterate until bqe points to a CONSTR
3410        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3411     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3412     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3413     /* types of closures that may appear in a blocking queue */
3414     ASSERT(get_itbl(bqe)->type == TSO ||           
3415            get_itbl(bqe)->type == CONSTR); 
3416     /* only BQs of an RBH end with an RBH_Save closure */
3417     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3418
3419     tso_loc = where_is((StgClosure *)bqe);
3420     switch (get_itbl(bqe)->type) {
3421     case TSO:
3422       fprintf(stderr," TSO %d (%p) on [PE %d],",
3423               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3424       break;
3425     case CONSTR:
3426       fprintf(stderr," %s (IP %p),",
3427               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3428                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3429                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3430                "RBH_Save_?"), get_itbl(bqe));
3431       break;
3432     default:
3433       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3434            info_type((StgClosure *)bqe), node, info_type(node));
3435       break;
3436     }
3437   } /* for */
3438   fputc('\n', stderr);
3439 }
3440 #else
3441 /* 
3442    Nice and easy: only TSOs on the blocking queue
3443 */
3444 void 
3445 print_bq (StgClosure *node)
3446 {
3447   StgTSO *tso;
3448
3449   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3450   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3451        tso != END_TSO_QUEUE; 
3452        tso=tso->link) {
3453     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3454     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3455     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3456   }
3457   fputc('\n', stderr);
3458 }
3459 # endif
3460
3461 #if defined(PAR)
3462 static nat
3463 run_queue_len(void)
3464 {
3465   nat i;
3466   StgTSO *tso;
3467
3468   for (i=0, tso=run_queue_hd; 
3469        tso != END_TSO_QUEUE;
3470        i++, tso=tso->link)
3471     /* nothing */
3472
3473   return i;
3474 }
3475 #endif
3476
3477 static void
3478 sched_belch(char *s, ...)
3479 {
3480   va_list ap;
3481   va_start(ap,s);
3482 #ifdef SMP
3483   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3484 #elif defined(PAR)
3485   fprintf(stderr, "== ");
3486 #else
3487   fprintf(stderr, "scheduler: ");
3488 #endif
3489   vfprintf(stderr, s, ap);
3490   fprintf(stderr, "\n");
3491 }
3492
3493 #endif /* DEBUG */
3494
3495
3496 //@node Index,  , Debugging Routines, Main scheduling code
3497 //@subsection Index
3498
3499 //@index
3500 //* MainRegTable::  @cindex\s-+MainRegTable
3501 //* StgMainThread::  @cindex\s-+StgMainThread
3502 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3503 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3504 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3505 //* context_switch::  @cindex\s-+context_switch
3506 //* createThread::  @cindex\s-+createThread
3507 //* free_capabilities::  @cindex\s-+free_capabilities
3508 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3509 //* initScheduler::  @cindex\s-+initScheduler
3510 //* interrupted::  @cindex\s-+interrupted
3511 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3512 //* next_thread_id::  @cindex\s-+next_thread_id
3513 //* print_bq::  @cindex\s-+print_bq
3514 //* run_queue_hd::  @cindex\s-+run_queue_hd
3515 //* run_queue_tl::  @cindex\s-+run_queue_tl
3516 //* sched_mutex::  @cindex\s-+sched_mutex
3517 //* schedule::  @cindex\s-+schedule
3518 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3519 //* task_ids::  @cindex\s-+task_ids
3520 //* term_mutex::  @cindex\s-+term_mutex
3521 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3522 //@end index