c0b47208c0ba72c56bf9657f9c54a77bfa60bd29
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.94 2001/03/22 03:51:10 hwloidl Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "Rts.h"
78 #include "SchedAPI.h"
79 #include "RtsUtils.h"
80 #include "RtsFlags.h"
81 #include "Storage.h"
82 #include "StgRun.h"
83 #include "StgStartup.h"
84 #include "GC.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(void);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           *prev = m->link;
449           if (was_interrupted) {
450             m->stat = Interrupted;
451           } else {
452             m->stat = Killed;
453           }
454           pthread_cond_broadcast(&m->wakeup);
455           break;
456         default:
457           break;
458         }
459       }
460     }
461
462 #else
463 # if defined(PAR)
464     /* in GUM do this only on the Main PE */
465     if (IAmMainThread)
466 # endif
467     /* If our main thread has finished or been killed, return.
468      */
469     {
470       StgMainThread *m = main_threads;
471       if (m->tso->what_next == ThreadComplete
472           || m->tso->what_next == ThreadKilled) {
473         main_threads = main_threads->link;
474         if (m->tso->what_next == ThreadComplete) {
475           /* we finished successfully, fill in the return value */
476           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
477           m->stat = Success;
478           return;
479         } else {
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif /* SMP */
529
530     /* Check whether any waiting threads need to be woken up.  If the
531      * run queue is empty, and there are no other tasks running, we
532      * can wait indefinitely for something to happen.
533      * ToDo: what if another client comes along & requests another
534      * main thread?
535      */
536     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
537       awaitEvent(
538            (run_queue_hd == END_TSO_QUEUE)
539 #ifdef SMP
540         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
541 #endif
542         );
543     }
544     /* we can be interrupted while waiting for I/O... */
545     if (interrupted) continue;
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       start_signal_handlers();
551     }
552 #endif
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifdef SMP
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569         && (n_free_capabilities == RtsFlags.ParFlags.nNodes))
570     {
571         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
572         detectBlackHoles();
573         if (run_queue_hd == END_TSO_QUEUE) {
574             StgMainThread *m;
575             for (m = main_threads; m != NULL; m = m->link) {
576                 m->ret = NULL;
577                 m->stat = Deadlock;
578                 pthread_cond_broadcast(&m->wakeup);
579             }
580             main_threads = NULL;
581         }
582     }
583 #elif defined(PAR)
584     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
585 #else /* ! SMP */
586     if (blocked_queue_hd == END_TSO_QUEUE
587         && run_queue_hd == END_TSO_QUEUE
588         && sleeping_queue == END_TSO_QUEUE)
589     {
590         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
591         detectBlackHoles();
592         if (run_queue_hd == END_TSO_QUEUE) {
593             StgMainThread *m = main_threads;
594             m->ret = NULL;
595             m->stat = Deadlock;
596             main_threads = m->link;
597             return;
598         }
599     }
600 #endif
601
602 #ifdef SMP
603     /* If there's a GC pending, don't do anything until it has
604      * completed.
605      */
606     if (ready_to_gc) {
607       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
608       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
609     }
610     
611     /* block until we've got a thread on the run queue and a free
612      * capability.
613      */
614     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
615       IF_DEBUG(scheduler, sched_belch("waiting for work"));
616       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
617       IF_DEBUG(scheduler, sched_belch("work now available"));
618     }
619 #endif
620
621 #if defined(GRAN)
622
623     if (RtsFlags.GranFlags.Light)
624       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
625
626     /* adjust time based on time-stamp */
627     if (event->time > CurrentTime[CurrentProc] &&
628         event->evttype != ContinueThread)
629       CurrentTime[CurrentProc] = event->time;
630     
631     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
632     if (!RtsFlags.GranFlags.Light)
633       handleIdlePEs();
634
635     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
636
637     /* main event dispatcher in GranSim */
638     switch (event->evttype) {
639       /* Should just be continuing execution */
640     case ContinueThread:
641       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
642       /* ToDo: check assertion
643       ASSERT(run_queue_hd != (StgTSO*)NULL &&
644              run_queue_hd != END_TSO_QUEUE);
645       */
646       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
647       if (!RtsFlags.GranFlags.DoAsyncFetch &&
648           procStatus[CurrentProc]==Fetching) {
649         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
650               CurrentTSO->id, CurrentTSO, CurrentProc);
651         goto next_thread;
652       } 
653       /* Ignore ContinueThreads for completed threads */
654       if (CurrentTSO->what_next == ThreadComplete) {
655         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
656               CurrentTSO->id, CurrentTSO, CurrentProc);
657         goto next_thread;
658       } 
659       /* Ignore ContinueThreads for threads that are being migrated */
660       if (PROCS(CurrentTSO)==Nowhere) { 
661         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
662               CurrentTSO->id, CurrentTSO, CurrentProc);
663         goto next_thread;
664       }
665       /* The thread should be at the beginning of the run queue */
666       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
667         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
668               CurrentTSO->id, CurrentTSO, CurrentProc);
669         break; // run the thread anyway
670       }
671       /*
672       new_event(proc, proc, CurrentTime[proc],
673                 FindWork,
674                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
675       goto next_thread; 
676       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
677       break; // now actually run the thread; DaH Qu'vam yImuHbej 
678
679     case FetchNode:
680       do_the_fetchnode(event);
681       goto next_thread;             /* handle next event in event queue  */
682       
683     case GlobalBlock:
684       do_the_globalblock(event);
685       goto next_thread;             /* handle next event in event queue  */
686       
687     case FetchReply:
688       do_the_fetchreply(event);
689       goto next_thread;             /* handle next event in event queue  */
690       
691     case UnblockThread:   /* Move from the blocked queue to the tail of */
692       do_the_unblock(event);
693       goto next_thread;             /* handle next event in event queue  */
694       
695     case ResumeThread:  /* Move from the blocked queue to the tail of */
696       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
697       event->tso->gran.blocktime += 
698         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
699       do_the_startthread(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case StartThread:
703       do_the_startthread(event);
704       goto next_thread;             /* handle next event in event queue  */
705       
706     case MoveThread:
707       do_the_movethread(event);
708       goto next_thread;             /* handle next event in event queue  */
709       
710     case MoveSpark:
711       do_the_movespark(event);
712       goto next_thread;             /* handle next event in event queue  */
713       
714     case FindWork:
715       do_the_findwork(event);
716       goto next_thread;             /* handle next event in event queue  */
717       
718     default:
719       barf("Illegal event type %u\n", event->evttype);
720     }  /* switch */
721     
722     /* This point was scheduler_loop in the old RTS */
723
724     IF_DEBUG(gran, belch("GRAN: after main switch"));
725
726     TimeOfLastEvent = CurrentTime[CurrentProc];
727     TimeOfNextEvent = get_time_of_next_event();
728     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
729     // CurrentTSO = ThreadQueueHd;
730
731     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
732                          TimeOfNextEvent));
733
734     if (RtsFlags.GranFlags.Light) 
735       GranSimLight_leave_system(event, &ActiveTSO); 
736
737     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
738
739     IF_DEBUG(gran, 
740              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
741
742     /* in a GranSim setup the TSO stays on the run queue */
743     t = CurrentTSO;
744     /* Take a thread from the run queue. */
745     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
746
747     IF_DEBUG(gran, 
748              fprintf(stderr, "GRAN: About to run current thread, which is\n");
749              G_TSO(t,5));
750
751     context_switch = 0; // turned on via GranYield, checking events and time slice
752
753     IF_DEBUG(gran, 
754              DumpGranEvent(GR_SCHEDULE, t));
755
756     procStatus[CurrentProc] = Busy;
757
758 #elif defined(PAR)
759     if (PendingFetches != END_BF_QUEUE) {
760         processFetches();
761     }
762
763     /* ToDo: phps merge with spark activation above */
764     /* check whether we have local work and send requests if we have none */
765     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
766       /* :-[  no local threads => look out for local sparks */
767       /* the spark pool for the current PE */
768       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
769       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
770           pool->hd < pool->tl) {
771         /* 
772          * ToDo: add GC code check that we really have enough heap afterwards!!
773          * Old comment:
774          * If we're here (no runnable threads) and we have pending
775          * sparks, we must have a space problem.  Get enough space
776          * to turn one of those pending sparks into a
777          * thread... 
778          */
779
780         spark = findSpark(rtsFalse);                /* get a spark */
781         if (spark != (rtsSpark) NULL) {
782           tso = activateSpark(spark);       /* turn the spark into a thread */
783           IF_PAR_DEBUG(schedule,
784                        belch("==== schedule: Created TSO %d (%p); %d threads active",
785                              tso->id, tso, advisory_thread_count));
786
787           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
788             belch("==^^ failed to activate spark");
789             goto next_thread;
790           }               /* otherwise fall through & pick-up new tso */
791         } else {
792           IF_PAR_DEBUG(verbose,
793                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
794                              spark_queue_len(pool)));
795           goto next_thread;
796         }
797       }
798
799       /* If we still have no work we need to send a FISH to get a spark
800          from another PE 
801       */
802       if (EMPTY_RUN_QUEUE()) {
803       /* =8-[  no local sparks => look for work on other PEs */
804         /*
805          * We really have absolutely no work.  Send out a fish
806          * (there may be some out there already), and wait for
807          * something to arrive.  We clearly can't run any threads
808          * until a SCHEDULE or RESUME arrives, and so that's what
809          * we're hoping to see.  (Of course, we still have to
810          * respond to other types of messages.)
811          */
812         TIME now = msTime() /*CURRENT_TIME*/;
813         IF_PAR_DEBUG(verbose, 
814                      belch("--  now=%ld", now));
815         IF_PAR_DEBUG(verbose,
816                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
817                          (last_fish_arrived_at!=0 &&
818                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
819                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
820                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
821                              last_fish_arrived_at,
822                              RtsFlags.ParFlags.fishDelay, now);
823                      });
824         
825         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
826             (last_fish_arrived_at==0 ||
827              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
828           /* outstandingFishes is set in sendFish, processFish;
829              avoid flooding system with fishes via delay */
830           pe = choosePE();
831           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
832                    NEW_FISH_HUNGER);
833
834           // Global statistics: count no. of fishes
835           if (RtsFlags.ParFlags.ParStats.Global &&
836               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
837             globalParStats.tot_fish_mess++;
838           }
839         }
840       
841         receivedFinish = processMessages();
842         goto next_thread;
843       }
844     } else if (PacketsWaiting()) {  /* Look for incoming messages */
845       receivedFinish = processMessages();
846     }
847
848     /* Now we are sure that we have some work available */
849     ASSERT(run_queue_hd != END_TSO_QUEUE);
850
851     /* Take a thread from the run queue, if we have work */
852     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
853     IF_DEBUG(sanity,checkTSO(t));
854
855     /* ToDo: write something to the log-file
856     if (RTSflags.ParFlags.granSimStats && !sameThread)
857         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
858
859     CurrentTSO = t;
860     */
861     /* the spark pool for the current PE */
862     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
863
864     IF_DEBUG(scheduler, 
865              belch("--=^ %d threads, %d sparks on [%#x]", 
866                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
867
868 #if 1
869     if (0 && RtsFlags.ParFlags.ParStats.Full && 
870         t && LastTSO && t->id != LastTSO->id && 
871         LastTSO->why_blocked == NotBlocked && 
872         LastTSO->what_next != ThreadComplete) {
873       // if previously scheduled TSO not blocked we have to record the context switch
874       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
875                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
876     }
877
878     if (RtsFlags.ParFlags.ParStats.Full && 
879         (emitSchedule /* forced emit */ ||
880         (t && LastTSO && t->id != LastTSO->id))) {
881       /* 
882          we are running a different TSO, so write a schedule event to log file
883          NB: If we use fair scheduling we also have to write  a deschedule 
884              event for LastTSO; with unfair scheduling we know that the
885              previous tso has blocked whenever we switch to another tso, so
886              we don't need it in GUM for now
887       */
888       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
889                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
890       emitSchedule = rtsFalse;
891     }
892      
893 #endif
894 #else /* !GRAN && !PAR */
895   
896     /* grab a thread from the run queue
897      */
898     ASSERT(run_queue_hd != END_TSO_QUEUE);
899     t = POP_RUN_QUEUE();
900     IF_DEBUG(sanity,checkTSO(t));
901
902 #endif
903     
904     /* grab a capability
905      */
906 #ifdef SMP
907     cap = free_capabilities;
908     free_capabilities = cap->link;
909     n_free_capabilities--;
910 #else
911     cap = &MainRegTable;
912 #endif
913     
914     cap->rCurrentTSO = t;
915     
916     /* context switches are now initiated by the timer signal, unless
917      * the user specified "context switch as often as possible", with
918      * +RTS -C0
919      */
920     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
921         && (run_queue_hd != END_TSO_QUEUE
922             || blocked_queue_hd != END_TSO_QUEUE
923             || sleeping_queue != END_TSO_QUEUE))
924         context_switch = 1;
925     else
926         context_switch = 0;
927
928     RELEASE_LOCK(&sched_mutex);
929
930     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
931                               t->id, t, whatNext_strs[t->what_next]));
932
933     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
934     /* Run the current thread 
935      */
936     switch (cap->rCurrentTSO->what_next) {
937     case ThreadKilled:
938     case ThreadComplete:
939         /* Thread already finished, return to scheduler. */
940         ret = ThreadFinished;
941         break;
942     case ThreadEnterGHC:
943         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
944         break;
945     case ThreadRunGHC:
946         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
947         break;
948     case ThreadEnterInterp:
949         ret = interpretBCO(cap);
950         break;
951     default:
952       barf("schedule: invalid what_next field");
953     }
954     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
955     
956     /* Costs for the scheduler are assigned to CCS_SYSTEM */
957 #ifdef PROFILING
958     CCCS = CCS_SYSTEM;
959 #endif
960     
961     ACQUIRE_LOCK(&sched_mutex);
962
963 #ifdef SMP
964     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
965 #elif !defined(GRAN) && !defined(PAR)
966     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
967 #endif
968     t = cap->rCurrentTSO;
969     
970 #if defined(PAR)
971     /* HACK 675: if the last thread didn't yield, make sure to print a 
972        SCHEDULE event to the log file when StgRunning the next thread, even
973        if it is the same one as before */
974     LastTSO = t; 
975     TimeOfLastYield = CURRENT_TIME;
976 #endif
977
978     switch (ret) {
979     case HeapOverflow:
980 #if defined(GRAN)
981       IF_DEBUG(gran, 
982                DumpGranEvent(GR_DESCHEDULE, t));
983       globalGranStats.tot_heapover++;
984 #elif defined(PAR)
985       // IF_DEBUG(par, 
986       //DumpGranEvent(GR_DESCHEDULE, t);
987       globalParStats.tot_heapover++;
988 #endif
989       /* make all the running tasks block on a condition variable,
990        * maybe set context_switch and wait till they all pile in,
991        * then have them wait on a GC condition variable.
992        */
993       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
994                                t->id, t, whatNext_strs[t->what_next]));
995       threadPaused(t);
996 #if defined(GRAN)
997       ASSERT(!is_on_queue(t,CurrentProc));
998 #elif defined(PAR)
999       /* Currently we emit a DESCHEDULE event before GC in GUM.
1000          ToDo: either add separate event to distinguish SYSTEM time from rest
1001                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1002       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1003         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1004                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1005         emitSchedule = rtsTrue;
1006       }
1007 #endif
1008       
1009       ready_to_gc = rtsTrue;
1010       context_switch = 1;               /* stop other threads ASAP */
1011       PUSH_ON_RUN_QUEUE(t);
1012       /* actual GC is done at the end of the while loop */
1013       break;
1014       
1015     case StackOverflow:
1016 #if defined(GRAN)
1017       IF_DEBUG(gran, 
1018                DumpGranEvent(GR_DESCHEDULE, t));
1019       globalGranStats.tot_stackover++;
1020 #elif defined(PAR)
1021       // IF_DEBUG(par, 
1022       // DumpGranEvent(GR_DESCHEDULE, t);
1023       globalParStats.tot_stackover++;
1024 #endif
1025       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1026                                t->id, t, whatNext_strs[t->what_next]));
1027       /* just adjust the stack for this thread, then pop it back
1028        * on the run queue.
1029        */
1030       threadPaused(t);
1031       { 
1032         StgMainThread *m;
1033         /* enlarge the stack */
1034         StgTSO *new_t = threadStackOverflow(t);
1035         
1036         /* This TSO has moved, so update any pointers to it from the
1037          * main thread stack.  It better not be on any other queues...
1038          * (it shouldn't be).
1039          */
1040         for (m = main_threads; m != NULL; m = m->link) {
1041           if (m->tso == t) {
1042             m->tso = new_t;
1043           }
1044         }
1045         threadPaused(new_t);
1046         PUSH_ON_RUN_QUEUE(new_t);
1047       }
1048       break;
1049
1050     case ThreadYielding:
1051 #if defined(GRAN)
1052       IF_DEBUG(gran, 
1053                DumpGranEvent(GR_DESCHEDULE, t));
1054       globalGranStats.tot_yields++;
1055 #elif defined(PAR)
1056       // IF_DEBUG(par, 
1057       // DumpGranEvent(GR_DESCHEDULE, t);
1058       globalParStats.tot_yields++;
1059 #endif
1060       /* put the thread back on the run queue.  Then, if we're ready to
1061        * GC, check whether this is the last task to stop.  If so, wake
1062        * up the GC thread.  getThread will block during a GC until the
1063        * GC is finished.
1064        */
1065       IF_DEBUG(scheduler,
1066                if (t->what_next == ThreadEnterInterp) {
1067                    /* ToDo: or maybe a timer expired when we were in Hugs?
1068                     * or maybe someone hit ctrl-C
1069                     */
1070                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1071                          t->id, t, whatNext_strs[t->what_next]);
1072                } else {
1073                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1074                          t->id, t, whatNext_strs[t->what_next]);
1075                }
1076                );
1077
1078       threadPaused(t);
1079
1080       IF_DEBUG(sanity,
1081                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1082                checkTSO(t));
1083       ASSERT(t->link == END_TSO_QUEUE);
1084 #if defined(GRAN)
1085       ASSERT(!is_on_queue(t,CurrentProc));
1086
1087       IF_DEBUG(sanity,
1088                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1089                checkThreadQsSanity(rtsTrue));
1090 #endif
1091 #if defined(PAR)
1092       if (RtsFlags.ParFlags.doFairScheduling) { 
1093         /* this does round-robin scheduling; good for concurrency */
1094         APPEND_TO_RUN_QUEUE(t);
1095       } else {
1096         /* this does unfair scheduling; good for parallelism */
1097         PUSH_ON_RUN_QUEUE(t);
1098       }
1099 #else
1100       /* this does round-robin scheduling; good for concurrency */
1101       APPEND_TO_RUN_QUEUE(t);
1102 #endif
1103 #if defined(GRAN)
1104       /* add a ContinueThread event to actually process the thread */
1105       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1106                 ContinueThread,
1107                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1108       IF_GRAN_DEBUG(bq, 
1109                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1110                G_EVENTQ(0);
1111                G_CURR_THREADQ(0));
1112 #endif /* GRAN */
1113       break;
1114       
1115     case ThreadBlocked:
1116 #if defined(GRAN)
1117       IF_DEBUG(scheduler,
1118                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1119                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1120                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1121
1122       // ??? needed; should emit block before
1123       IF_DEBUG(gran, 
1124                DumpGranEvent(GR_DESCHEDULE, t)); 
1125       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1126       /*
1127         ngoq Dogh!
1128       ASSERT(procStatus[CurrentProc]==Busy || 
1129               ((procStatus[CurrentProc]==Fetching) && 
1130               (t->block_info.closure!=(StgClosure*)NULL)));
1131       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1132           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1133             procStatus[CurrentProc]==Fetching)) 
1134         procStatus[CurrentProc] = Idle;
1135       */
1136 #elif defined(PAR)
1137       IF_DEBUG(scheduler,
1138                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1139                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1140       IF_PAR_DEBUG(bq,
1141
1142                    if (t->block_info.closure!=(StgClosure*)NULL) 
1143                      print_bq(t->block_info.closure));
1144
1145       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1146       blockThread(t);
1147
1148       /* whatever we schedule next, we must log that schedule */
1149       emitSchedule = rtsTrue;
1150
1151 #else /* !GRAN */
1152       /* don't need to do anything.  Either the thread is blocked on
1153        * I/O, in which case we'll have called addToBlockedQueue
1154        * previously, or it's blocked on an MVar or Blackhole, in which
1155        * case it'll be on the relevant queue already.
1156        */
1157       IF_DEBUG(scheduler,
1158                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1159                printThreadBlockage(t);
1160                fprintf(stderr, "\n"));
1161
1162       /* Only for dumping event to log file 
1163          ToDo: do I need this in GranSim, too?
1164       blockThread(t);
1165       */
1166 #endif
1167       threadPaused(t);
1168       break;
1169       
1170     case ThreadFinished:
1171       /* Need to check whether this was a main thread, and if so, signal
1172        * the task that started it with the return value.  If we have no
1173        * more main threads, we probably need to stop all the tasks until
1174        * we get a new one.
1175        */
1176       /* We also end up here if the thread kills itself with an
1177        * uncaught exception, see Exception.hc.
1178        */
1179       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1180 #if defined(GRAN)
1181       endThread(t, CurrentProc); // clean-up the thread
1182 #elif defined(PAR)
1183       /* For now all are advisory -- HWL */
1184       //if(t->priority==AdvisoryPriority) ??
1185       advisory_thread_count--;
1186       
1187 # ifdef DIST
1188       if(t->dist.priority==RevalPriority)
1189         FinishReval(t);
1190 # endif
1191       
1192       if (RtsFlags.ParFlags.ParStats.Full &&
1193           !RtsFlags.ParFlags.ParStats.Suppressed) 
1194         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1195 #endif
1196       break;
1197       
1198     default:
1199       barf("schedule: invalid thread return code %d", (int)ret);
1200     }
1201     
1202 #ifdef SMP
1203     cap->link = free_capabilities;
1204     free_capabilities = cap;
1205     n_free_capabilities++;
1206 #endif
1207
1208 #ifdef SMP
1209     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1210 #else
1211     if (ready_to_gc) 
1212 #endif
1213       {
1214       /* everybody back, start the GC.
1215        * Could do it in this thread, or signal a condition var
1216        * to do it in another thread.  Either way, we need to
1217        * broadcast on gc_pending_cond afterward.
1218        */
1219 #ifdef SMP
1220       IF_DEBUG(scheduler,sched_belch("doing GC"));
1221 #endif
1222       GarbageCollect(GetRoots,rtsFalse);
1223       ready_to_gc = rtsFalse;
1224 #ifdef SMP
1225       pthread_cond_broadcast(&gc_pending_cond);
1226 #endif
1227 #if defined(GRAN)
1228       /* add a ContinueThread event to continue execution of current thread */
1229       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1230                 ContinueThread,
1231                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1232       IF_GRAN_DEBUG(bq, 
1233                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1234                G_EVENTQ(0);
1235                G_CURR_THREADQ(0));
1236 #endif /* GRAN */
1237     }
1238 #if defined(GRAN)
1239   next_thread:
1240     IF_GRAN_DEBUG(unused,
1241                   print_eventq(EventHd));
1242
1243     event = get_next_event();
1244
1245 #elif defined(PAR)
1246   next_thread:
1247     /* ToDo: wait for next message to arrive rather than busy wait */
1248
1249 #else /* GRAN */
1250   /* not any more
1251   next_thread:
1252     t = take_off_run_queue(END_TSO_QUEUE);
1253   */
1254 #endif /* GRAN */
1255   } /* end of while(1) */
1256   IF_PAR_DEBUG(verbose,
1257                belch("== Leaving schedule() after having received Finish"));
1258 }
1259
1260 /* ---------------------------------------------------------------------------
1261  * deleteAllThreads():  kill all the live threads.
1262  *
1263  * This is used when we catch a user interrupt (^C), before performing
1264  * any necessary cleanups and running finalizers.
1265  * ------------------------------------------------------------------------- */
1266    
1267 void deleteAllThreads ( void )
1268 {
1269   StgTSO* t;
1270   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1271   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1272       deleteThread(t);
1273   }
1274   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1275       deleteThread(t);
1276   }
1277   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1278       deleteThread(t);
1279   }
1280   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1281   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1282   sleeping_queue = END_TSO_QUEUE;
1283 }
1284
1285 /* startThread and  insertThread are now in GranSim.c -- HWL */
1286
1287 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1288 //@subsection Suspend and Resume
1289
1290 /* ---------------------------------------------------------------------------
1291  * Suspending & resuming Haskell threads.
1292  * 
1293  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1294  * its capability before calling the C function.  This allows another
1295  * task to pick up the capability and carry on running Haskell
1296  * threads.  It also means that if the C call blocks, it won't lock
1297  * the whole system.
1298  *
1299  * The Haskell thread making the C call is put to sleep for the
1300  * duration of the call, on the susepended_ccalling_threads queue.  We
1301  * give out a token to the task, which it can use to resume the thread
1302  * on return from the C function.
1303  * ------------------------------------------------------------------------- */
1304    
1305 StgInt
1306 suspendThread( Capability *cap )
1307 {
1308   nat tok;
1309
1310   ACQUIRE_LOCK(&sched_mutex);
1311
1312   IF_DEBUG(scheduler,
1313            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1314
1315   threadPaused(cap->rCurrentTSO);
1316   cap->rCurrentTSO->link = suspended_ccalling_threads;
1317   suspended_ccalling_threads = cap->rCurrentTSO;
1318
1319   /* Use the thread ID as the token; it should be unique */
1320   tok = cap->rCurrentTSO->id;
1321
1322 #ifdef SMP
1323   cap->link = free_capabilities;
1324   free_capabilities = cap;
1325   n_free_capabilities++;
1326 #endif
1327
1328   RELEASE_LOCK(&sched_mutex);
1329   return tok; 
1330 }
1331
1332 Capability *
1333 resumeThread( StgInt tok )
1334 {
1335   StgTSO *tso, **prev;
1336   Capability *cap;
1337
1338   ACQUIRE_LOCK(&sched_mutex);
1339
1340   prev = &suspended_ccalling_threads;
1341   for (tso = suspended_ccalling_threads; 
1342        tso != END_TSO_QUEUE; 
1343        prev = &tso->link, tso = tso->link) {
1344     if (tso->id == (StgThreadID)tok) {
1345       *prev = tso->link;
1346       break;
1347     }
1348   }
1349   if (tso == END_TSO_QUEUE) {
1350     barf("resumeThread: thread not found");
1351   }
1352   tso->link = END_TSO_QUEUE;
1353
1354 #ifdef SMP
1355   while (free_capabilities == NULL) {
1356     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1357     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1358     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1359   }
1360   cap = free_capabilities;
1361   free_capabilities = cap->link;
1362   n_free_capabilities--;
1363 #else  
1364   cap = &MainRegTable;
1365 #endif
1366
1367   cap->rCurrentTSO = tso;
1368
1369   RELEASE_LOCK(&sched_mutex);
1370   return cap;
1371 }
1372
1373
1374 /* ---------------------------------------------------------------------------
1375  * Static functions
1376  * ------------------------------------------------------------------------ */
1377 static void unblockThread(StgTSO *tso);
1378
1379 /* ---------------------------------------------------------------------------
1380  * Comparing Thread ids.
1381  *
1382  * This is used from STG land in the implementation of the
1383  * instances of Eq/Ord for ThreadIds.
1384  * ------------------------------------------------------------------------ */
1385
1386 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1387
1388   StgThreadID id1 = tso1->id; 
1389   StgThreadID id2 = tso2->id;
1390  
1391   if (id1 < id2) return (-1);
1392   if (id1 > id2) return 1;
1393   return 0;
1394 }
1395
1396 /* ---------------------------------------------------------------------------
1397    Create a new thread.
1398
1399    The new thread starts with the given stack size.  Before the
1400    scheduler can run, however, this thread needs to have a closure
1401    (and possibly some arguments) pushed on its stack.  See
1402    pushClosure() in Schedule.h.
1403
1404    createGenThread() and createIOThread() (in SchedAPI.h) are
1405    convenient packaged versions of this function.
1406
1407    currently pri (priority) is only used in a GRAN setup -- HWL
1408    ------------------------------------------------------------------------ */
1409 //@cindex createThread
1410 #if defined(GRAN)
1411 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1412 StgTSO *
1413 createThread(nat stack_size, StgInt pri)
1414 {
1415   return createThread_(stack_size, rtsFalse, pri);
1416 }
1417
1418 static StgTSO *
1419 createThread_(nat size, rtsBool have_lock, StgInt pri)
1420 {
1421 #else
1422 StgTSO *
1423 createThread(nat stack_size)
1424 {
1425   return createThread_(stack_size, rtsFalse);
1426 }
1427
1428 static StgTSO *
1429 createThread_(nat size, rtsBool have_lock)
1430 {
1431 #endif
1432
1433     StgTSO *tso;
1434     nat stack_size;
1435
1436     /* First check whether we should create a thread at all */
1437 #if defined(PAR)
1438   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1439   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1440     threadsIgnored++;
1441     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1442           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1443     return END_TSO_QUEUE;
1444   }
1445   threadsCreated++;
1446 #endif
1447
1448 #if defined(GRAN)
1449   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1450 #endif
1451
1452   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1453
1454   /* catch ridiculously small stack sizes */
1455   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1456     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1457   }
1458
1459   stack_size = size - TSO_STRUCT_SIZEW;
1460
1461   tso = (StgTSO *)allocate(size);
1462   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1463
1464   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1465 #if defined(GRAN)
1466   SET_GRAN_HDR(tso, ThisPE);
1467 #endif
1468   tso->what_next     = ThreadEnterGHC;
1469
1470   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1471    * protect the increment operation on next_thread_id.
1472    * In future, we could use an atomic increment instead.
1473    */
1474   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1475   tso->id = next_thread_id++; 
1476   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1477
1478   tso->why_blocked  = NotBlocked;
1479   tso->blocked_exceptions = NULL;
1480
1481   //tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1482   tso->stack_size   = stack_size;
1483   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1484                               - TSO_STRUCT_SIZEW;
1485   tso->sp           = (P_)&(tso->stack) + stack_size;
1486
1487 #ifdef PROFILING
1488   tso->prof.CCCS = CCS_MAIN;
1489 #endif
1490
1491   /* put a stop frame on the stack */
1492   tso->sp -= sizeofW(StgStopFrame);
1493   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1494   tso->su = (StgUpdateFrame*)tso->sp;
1495
1496   // ToDo: check this
1497 #if defined(GRAN)
1498   tso->link = END_TSO_QUEUE;
1499   /* uses more flexible routine in GranSim */
1500   insertThread(tso, CurrentProc);
1501 #else
1502   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1503    * from its creation
1504    */
1505 #endif
1506
1507 #if defined(GRAN) 
1508   if (RtsFlags.GranFlags.GranSimStats.Full) 
1509     DumpGranEvent(GR_START,tso);
1510 #elif defined(PAR)
1511   if (RtsFlags.ParFlags.ParStats.Full) 
1512     DumpGranEvent(GR_STARTQ,tso);
1513   /* HACk to avoid SCHEDULE 
1514      LastTSO = tso; */
1515 #endif
1516
1517   /* Link the new thread on the global thread list.
1518    */
1519   tso->global_link = all_threads;
1520   all_threads = tso;
1521
1522 #if defined(DIST)
1523   tso->dist.priority = MandatoryPriority; //by default that is...
1524 #endif
1525
1526 #if defined(GRAN)
1527   tso->gran.pri = pri;
1528 # if defined(DEBUG)
1529   tso->gran.magic = TSO_MAGIC; // debugging only
1530 # endif
1531   tso->gran.sparkname   = 0;
1532   tso->gran.startedat   = CURRENT_TIME; 
1533   tso->gran.exported    = 0;
1534   tso->gran.basicblocks = 0;
1535   tso->gran.allocs      = 0;
1536   tso->gran.exectime    = 0;
1537   tso->gran.fetchtime   = 0;
1538   tso->gran.fetchcount  = 0;
1539   tso->gran.blocktime   = 0;
1540   tso->gran.blockcount  = 0;
1541   tso->gran.blockedat   = 0;
1542   tso->gran.globalsparks = 0;
1543   tso->gran.localsparks  = 0;
1544   if (RtsFlags.GranFlags.Light)
1545     tso->gran.clock  = Now; /* local clock */
1546   else
1547     tso->gran.clock  = 0;
1548
1549   IF_DEBUG(gran,printTSO(tso));
1550 #elif defined(PAR)
1551 # if defined(DEBUG)
1552   tso->par.magic = TSO_MAGIC; // debugging only
1553 # endif
1554   tso->par.sparkname   = 0;
1555   tso->par.startedat   = CURRENT_TIME; 
1556   tso->par.exported    = 0;
1557   tso->par.basicblocks = 0;
1558   tso->par.allocs      = 0;
1559   tso->par.exectime    = 0;
1560   tso->par.fetchtime   = 0;
1561   tso->par.fetchcount  = 0;
1562   tso->par.blocktime   = 0;
1563   tso->par.blockcount  = 0;
1564   tso->par.blockedat   = 0;
1565   tso->par.globalsparks = 0;
1566   tso->par.localsparks  = 0;
1567 #endif
1568
1569 #if defined(GRAN)
1570   globalGranStats.tot_threads_created++;
1571   globalGranStats.threads_created_on_PE[CurrentProc]++;
1572   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1573   globalGranStats.tot_sq_probes++;
1574 #elif defined(PAR)
1575   // collect parallel global statistics (currently done together with GC stats)
1576   if (RtsFlags.ParFlags.ParStats.Global &&
1577       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1578     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1579     globalParStats.tot_threads_created++;
1580   }
1581 #endif 
1582
1583 #if defined(GRAN)
1584   IF_GRAN_DEBUG(pri,
1585                 belch("==__ schedule: Created TSO %d (%p);",
1586                       CurrentProc, tso, tso->id));
1587 #elif defined(PAR)
1588     IF_PAR_DEBUG(verbose,
1589                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1590                        tso->id, tso, advisory_thread_count));
1591 #else
1592   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1593                                  tso->id, tso->stack_size));
1594 #endif    
1595   return tso;
1596 }
1597
1598 #if defined(PAR)
1599 /* RFP:
1600    all parallel thread creation calls should fall through the following routine.
1601 */
1602 StgTSO *
1603 createSparkThread(rtsSpark spark) 
1604 { StgTSO *tso;
1605   ASSERT(spark != (rtsSpark)NULL);
1606   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1607   { threadsIgnored++;
1608     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1609           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1610     return END_TSO_QUEUE;
1611   }
1612   else
1613   { threadsCreated++;
1614     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1615     if (tso==END_TSO_QUEUE)     
1616       barf("createSparkThread: Cannot create TSO");
1617 #if defined(DIST)
1618     tso->priority = AdvisoryPriority;
1619 #endif
1620     pushClosure(tso,spark);
1621     PUSH_ON_RUN_QUEUE(tso);
1622     advisory_thread_count++;    
1623   }
1624   return tso;
1625 }
1626 #endif
1627
1628 /*
1629   Turn a spark into a thread.
1630   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1631 */
1632 #if defined(PAR)
1633 //@cindex activateSpark
1634 StgTSO *
1635 activateSpark (rtsSpark spark) 
1636 {
1637   StgTSO *tso;
1638
1639   tso = createSparkThread(spark);
1640   if (RtsFlags.ParFlags.ParStats.Full) {   
1641     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1642     IF_PAR_DEBUG(verbose,
1643                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1644                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1645   }
1646   // ToDo: fwd info on local/global spark to thread -- HWL
1647   // tso->gran.exported =  spark->exported;
1648   // tso->gran.locked =   !spark->global;
1649   // tso->gran.sparkname = spark->name;
1650
1651   return tso;
1652 }
1653 #endif
1654
1655 /* ---------------------------------------------------------------------------
1656  * scheduleThread()
1657  *
1658  * scheduleThread puts a thread on the head of the runnable queue.
1659  * This will usually be done immediately after a thread is created.
1660  * The caller of scheduleThread must create the thread using e.g.
1661  * createThread and push an appropriate closure
1662  * on this thread's stack before the scheduler is invoked.
1663  * ------------------------------------------------------------------------ */
1664
1665 void
1666 scheduleThread(StgTSO *tso)
1667 {
1668   if (tso==END_TSO_QUEUE){    
1669     schedule();
1670     return;
1671   }
1672
1673   ACQUIRE_LOCK(&sched_mutex);
1674
1675   /* Put the new thread on the head of the runnable queue.  The caller
1676    * better push an appropriate closure on this thread's stack
1677    * beforehand.  In the SMP case, the thread may start running as
1678    * soon as we release the scheduler lock below.
1679    */
1680   PUSH_ON_RUN_QUEUE(tso);
1681   THREAD_RUNNABLE();
1682
1683 #if 0
1684   IF_DEBUG(scheduler,printTSO(tso));
1685 #endif
1686   RELEASE_LOCK(&sched_mutex);
1687 }
1688
1689 /* ---------------------------------------------------------------------------
1690  * startTasks()
1691  *
1692  * Start up Posix threads to run each of the scheduler tasks.
1693  * I believe the task ids are not needed in the system as defined.
1694  *  KH @ 25/10/99
1695  * ------------------------------------------------------------------------ */
1696
1697 #if defined(PAR) || defined(SMP)
1698 void
1699 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1700 {
1701   scheduleThread(END_TSO_QUEUE);
1702 }
1703 #endif
1704
1705 /* ---------------------------------------------------------------------------
1706  * initScheduler()
1707  *
1708  * Initialise the scheduler.  This resets all the queues - if the
1709  * queues contained any threads, they'll be garbage collected at the
1710  * next pass.
1711  *
1712  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1713  * ------------------------------------------------------------------------ */
1714
1715 #ifdef SMP
1716 static void
1717 term_handler(int sig STG_UNUSED)
1718 {
1719   stat_workerStop();
1720   ACQUIRE_LOCK(&term_mutex);
1721   await_death--;
1722   RELEASE_LOCK(&term_mutex);
1723   pthread_exit(NULL);
1724 }
1725 #endif
1726
1727 //@cindex initScheduler
1728 void 
1729 initScheduler(void)
1730 {
1731 #if defined(GRAN)
1732   nat i;
1733
1734   for (i=0; i<=MAX_PROC; i++) {
1735     run_queue_hds[i]      = END_TSO_QUEUE;
1736     run_queue_tls[i]      = END_TSO_QUEUE;
1737     blocked_queue_hds[i]  = END_TSO_QUEUE;
1738     blocked_queue_tls[i]  = END_TSO_QUEUE;
1739     ccalling_threadss[i]  = END_TSO_QUEUE;
1740     sleeping_queue        = END_TSO_QUEUE;
1741   }
1742 #else
1743   run_queue_hd      = END_TSO_QUEUE;
1744   run_queue_tl      = END_TSO_QUEUE;
1745   blocked_queue_hd  = END_TSO_QUEUE;
1746   blocked_queue_tl  = END_TSO_QUEUE;
1747   sleeping_queue    = END_TSO_QUEUE;
1748 #endif 
1749
1750   suspended_ccalling_threads  = END_TSO_QUEUE;
1751
1752   main_threads = NULL;
1753   all_threads  = END_TSO_QUEUE;
1754
1755   context_switch = 0;
1756   interrupted    = 0;
1757
1758   RtsFlags.ConcFlags.ctxtSwitchTicks =
1759       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1760
1761   /* Install the SIGHUP handler */
1762 #ifdef SMP
1763   {
1764     struct sigaction action,oact;
1765
1766     action.sa_handler = term_handler;
1767     sigemptyset(&action.sa_mask);
1768     action.sa_flags = 0;
1769     if (sigaction(SIGTERM, &action, &oact) != 0) {
1770       barf("can't install TERM handler");
1771     }
1772   }
1773 #endif
1774
1775 #ifdef SMP
1776   /* Allocate N Capabilities */
1777   {
1778     nat i;
1779     Capability *cap, *prev;
1780     cap  = NULL;
1781     prev = NULL;
1782     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1783       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1784       cap->link = prev;
1785       prev = cap;
1786     }
1787     free_capabilities = cap;
1788     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1789   }
1790   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1791                              n_free_capabilities););
1792 #endif
1793
1794 #if defined(SMP) || defined(PAR)
1795   initSparkPools();
1796 #endif
1797 }
1798
1799 #ifdef SMP
1800 void
1801 startTasks( void )
1802 {
1803   nat i;
1804   int r;
1805   pthread_t tid;
1806   
1807   /* make some space for saving all the thread ids */
1808   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1809                             "initScheduler:task_ids");
1810   
1811   /* and create all the threads */
1812   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1813     r = pthread_create(&tid,NULL,taskStart,NULL);
1814     if (r != 0) {
1815       barf("startTasks: Can't create new Posix thread");
1816     }
1817     task_ids[i].id = tid;
1818     task_ids[i].mut_time = 0.0;
1819     task_ids[i].mut_etime = 0.0;
1820     task_ids[i].gc_time = 0.0;
1821     task_ids[i].gc_etime = 0.0;
1822     task_ids[i].elapsedtimestart = elapsedtime();
1823     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1824   }
1825 }
1826 #endif
1827
1828 void
1829 exitScheduler( void )
1830 {
1831 #ifdef SMP
1832   nat i;
1833
1834   /* Don't want to use pthread_cancel, since we'd have to install
1835    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1836    * all our locks.
1837    */
1838 #if 0
1839   /* Cancel all our tasks */
1840   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1841     pthread_cancel(task_ids[i].id);
1842   }
1843   
1844   /* Wait for all the tasks to terminate */
1845   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1846     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1847                                task_ids[i].id));
1848     pthread_join(task_ids[i].id, NULL);
1849   }
1850 #endif
1851
1852   /* Send 'em all a SIGHUP.  That should shut 'em up.
1853    */
1854   await_death = RtsFlags.ParFlags.nNodes;
1855   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1856     pthread_kill(task_ids[i].id,SIGTERM);
1857   }
1858   while (await_death > 0) {
1859     sched_yield();
1860   }
1861 #endif
1862 }
1863
1864 /* -----------------------------------------------------------------------------
1865    Managing the per-task allocation areas.
1866    
1867    Each capability comes with an allocation area.  These are
1868    fixed-length block lists into which allocation can be done.
1869
1870    ToDo: no support for two-space collection at the moment???
1871    -------------------------------------------------------------------------- */
1872
1873 /* -----------------------------------------------------------------------------
1874  * waitThread is the external interface for running a new computation
1875  * and waiting for the result.
1876  *
1877  * In the non-SMP case, we create a new main thread, push it on the 
1878  * main-thread stack, and invoke the scheduler to run it.  The
1879  * scheduler will return when the top main thread on the stack has
1880  * completed or died, and fill in the necessary fields of the
1881  * main_thread structure.
1882  *
1883  * In the SMP case, we create a main thread as before, but we then
1884  * create a new condition variable and sleep on it.  When our new
1885  * main thread has completed, we'll be woken up and the status/result
1886  * will be in the main_thread struct.
1887  * -------------------------------------------------------------------------- */
1888
1889 int 
1890 howManyThreadsAvail ( void )
1891 {
1892    int i = 0;
1893    StgTSO* q;
1894    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1895       i++;
1896    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1897       i++;
1898    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1899       i++;
1900    return i;
1901 }
1902
1903 void
1904 finishAllThreads ( void )
1905 {
1906    do {
1907       while (run_queue_hd != END_TSO_QUEUE) {
1908          waitThread ( run_queue_hd, NULL );
1909       }
1910       while (blocked_queue_hd != END_TSO_QUEUE) {
1911          waitThread ( blocked_queue_hd, NULL );
1912       }
1913       while (sleeping_queue != END_TSO_QUEUE) {
1914          waitThread ( blocked_queue_hd, NULL );
1915       }
1916    } while 
1917       (blocked_queue_hd != END_TSO_QUEUE || 
1918        run_queue_hd     != END_TSO_QUEUE ||
1919        sleeping_queue   != END_TSO_QUEUE);
1920 }
1921
1922 SchedulerStatus
1923 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1924 {
1925   StgMainThread *m;
1926   SchedulerStatus stat;
1927
1928   ACQUIRE_LOCK(&sched_mutex);
1929   
1930   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1931
1932   m->tso = tso;
1933   m->ret = ret;
1934   m->stat = NoStatus;
1935 #ifdef SMP
1936   pthread_cond_init(&m->wakeup, NULL);
1937 #endif
1938
1939   m->link = main_threads;
1940   main_threads = m;
1941
1942   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1943                               m->tso->id));
1944
1945 #ifdef SMP
1946   do {
1947     pthread_cond_wait(&m->wakeup, &sched_mutex);
1948   } while (m->stat == NoStatus);
1949 #elif defined(GRAN)
1950   /* GranSim specific init */
1951   CurrentTSO = m->tso;                // the TSO to run
1952   procStatus[MainProc] = Busy;        // status of main PE
1953   CurrentProc = MainProc;             // PE to run it on
1954
1955   schedule();
1956 #else
1957   schedule();
1958   ASSERT(m->stat != NoStatus);
1959 #endif
1960
1961   stat = m->stat;
1962
1963 #ifdef SMP
1964   pthread_cond_destroy(&m->wakeup);
1965 #endif
1966
1967   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1968                               m->tso->id));
1969   free(m);
1970
1971   RELEASE_LOCK(&sched_mutex);
1972
1973   return stat;
1974 }
1975
1976 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1977 //@subsection Run queue code 
1978
1979 #if 0
1980 /* 
1981    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1982        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1983        implicit global variable that has to be correct when calling these
1984        fcts -- HWL 
1985 */
1986
1987 /* Put the new thread on the head of the runnable queue.
1988  * The caller of createThread better push an appropriate closure
1989  * on this thread's stack before the scheduler is invoked.
1990  */
1991 static /* inline */ void
1992 add_to_run_queue(tso)
1993 StgTSO* tso; 
1994 {
1995   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1996   tso->link = run_queue_hd;
1997   run_queue_hd = tso;
1998   if (run_queue_tl == END_TSO_QUEUE) {
1999     run_queue_tl = tso;
2000   }
2001 }
2002
2003 /* Put the new thread at the end of the runnable queue. */
2004 static /* inline */ void
2005 push_on_run_queue(tso)
2006 StgTSO* tso; 
2007 {
2008   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2009   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2010   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2011   if (run_queue_hd == END_TSO_QUEUE) {
2012     run_queue_hd = tso;
2013   } else {
2014     run_queue_tl->link = tso;
2015   }
2016   run_queue_tl = tso;
2017 }
2018
2019 /* 
2020    Should be inlined because it's used very often in schedule.  The tso
2021    argument is actually only needed in GranSim, where we want to have the
2022    possibility to schedule *any* TSO on the run queue, irrespective of the
2023    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2024    the run queue and dequeue the tso, adjusting the links in the queue. 
2025 */
2026 //@cindex take_off_run_queue
2027 static /* inline */ StgTSO*
2028 take_off_run_queue(StgTSO *tso) {
2029   StgTSO *t, *prev;
2030
2031   /* 
2032      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2033
2034      if tso is specified, unlink that tso from the run_queue (doesn't have
2035      to be at the beginning of the queue); GranSim only 
2036   */
2037   if (tso!=END_TSO_QUEUE) {
2038     /* find tso in queue */
2039     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2040          t!=END_TSO_QUEUE && t!=tso;
2041          prev=t, t=t->link) 
2042       /* nothing */ ;
2043     ASSERT(t==tso);
2044     /* now actually dequeue the tso */
2045     if (prev!=END_TSO_QUEUE) {
2046       ASSERT(run_queue_hd!=t);
2047       prev->link = t->link;
2048     } else {
2049       /* t is at beginning of thread queue */
2050       ASSERT(run_queue_hd==t);
2051       run_queue_hd = t->link;
2052     }
2053     /* t is at end of thread queue */
2054     if (t->link==END_TSO_QUEUE) {
2055       ASSERT(t==run_queue_tl);
2056       run_queue_tl = prev;
2057     } else {
2058       ASSERT(run_queue_tl!=t);
2059     }
2060     t->link = END_TSO_QUEUE;
2061   } else {
2062     /* take tso from the beginning of the queue; std concurrent code */
2063     t = run_queue_hd;
2064     if (t != END_TSO_QUEUE) {
2065       run_queue_hd = t->link;
2066       t->link = END_TSO_QUEUE;
2067       if (run_queue_hd == END_TSO_QUEUE) {
2068         run_queue_tl = END_TSO_QUEUE;
2069       }
2070     }
2071   }
2072   return t;
2073 }
2074
2075 #endif /* 0 */
2076
2077 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2078 //@subsection Garbage Collextion Routines
2079
2080 /* ---------------------------------------------------------------------------
2081    Where are the roots that we know about?
2082
2083         - all the threads on the runnable queue
2084         - all the threads on the blocked queue
2085         - all the threads on the sleeping queue
2086         - all the thread currently executing a _ccall_GC
2087         - all the "main threads"
2088      
2089    ------------------------------------------------------------------------ */
2090
2091 /* This has to be protected either by the scheduler monitor, or by the
2092         garbage collection monitor (probably the latter).
2093         KH @ 25/10/99
2094 */
2095
2096 static void GetRoots(void)
2097 {
2098   StgMainThread *m;
2099
2100 #if defined(GRAN)
2101   {
2102     nat i;
2103     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2104       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2105         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
2106       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2107         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
2108       
2109       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2110         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
2111       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2112         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
2113       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2114         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
2115     }
2116   }
2117
2118   markEventQueue();
2119
2120 #else /* !GRAN */
2121   if (run_queue_hd != END_TSO_QUEUE) {
2122     ASSERT(run_queue_tl != END_TSO_QUEUE);
2123     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
2124     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
2125   }
2126
2127   if (blocked_queue_hd != END_TSO_QUEUE) {
2128     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2129     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
2130     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
2131   }
2132
2133   if (sleeping_queue != END_TSO_QUEUE) {
2134     sleeping_queue  = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
2135   }
2136 #endif 
2137
2138   for (m = main_threads; m != NULL; m = m->link) {
2139     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
2140   }
2141   if (suspended_ccalling_threads != END_TSO_QUEUE)
2142     suspended_ccalling_threads = 
2143       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
2144
2145 #if defined(SMP) || defined(PAR) || defined(GRAN)
2146   markSparkQueue();
2147 #endif
2148 }
2149
2150 /* -----------------------------------------------------------------------------
2151    performGC
2152
2153    This is the interface to the garbage collector from Haskell land.
2154    We provide this so that external C code can allocate and garbage
2155    collect when called from Haskell via _ccall_GC.
2156
2157    It might be useful to provide an interface whereby the programmer
2158    can specify more roots (ToDo).
2159    
2160    This needs to be protected by the GC condition variable above.  KH.
2161    -------------------------------------------------------------------------- */
2162
2163 void (*extra_roots)(void);
2164
2165 void
2166 performGC(void)
2167 {
2168   GarbageCollect(GetRoots,rtsFalse);
2169 }
2170
2171 void
2172 performMajorGC(void)
2173 {
2174   GarbageCollect(GetRoots,rtsTrue);
2175 }
2176
2177 static void
2178 AllRoots(void)
2179 {
2180   GetRoots();                   /* the scheduler's roots */
2181   extra_roots();                /* the user's roots */
2182 }
2183
2184 void
2185 performGCWithRoots(void (*get_roots)(void))
2186 {
2187   extra_roots = get_roots;
2188
2189   GarbageCollect(AllRoots,rtsFalse);
2190 }
2191
2192 /* -----------------------------------------------------------------------------
2193    Stack overflow
2194
2195    If the thread has reached its maximum stack size, then raise the
2196    StackOverflow exception in the offending thread.  Otherwise
2197    relocate the TSO into a larger chunk of memory and adjust its stack
2198    size appropriately.
2199    -------------------------------------------------------------------------- */
2200
2201 static StgTSO *
2202 threadStackOverflow(StgTSO *tso)
2203 {
2204   nat new_stack_size, new_tso_size, diff, stack_words;
2205   StgPtr new_sp;
2206   StgTSO *dest;
2207
2208   IF_DEBUG(sanity,checkTSO(tso));
2209   if (tso->stack_size >= tso->max_stack_size) {
2210
2211     IF_DEBUG(gc,
2212              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2213                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2214              /* If we're debugging, just print out the top of the stack */
2215              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2216                                               tso->sp+64)));
2217
2218     /* Send this thread the StackOverflow exception */
2219     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2220     return tso;
2221   }
2222
2223   /* Try to double the current stack size.  If that takes us over the
2224    * maximum stack size for this thread, then use the maximum instead.
2225    * Finally round up so the TSO ends up as a whole number of blocks.
2226    */
2227   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2228   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2229                                        TSO_STRUCT_SIZE)/sizeof(W_);
2230   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2231   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2232
2233   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2234
2235   dest = (StgTSO *)allocate(new_tso_size);
2236   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2237
2238   /* copy the TSO block and the old stack into the new area */
2239   memcpy(dest,tso,TSO_STRUCT_SIZE);
2240   stack_words = tso->stack + tso->stack_size - tso->sp;
2241   new_sp = (P_)dest + new_tso_size - stack_words;
2242   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2243
2244   /* relocate the stack pointers... */
2245   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2246   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2247   dest->sp    = new_sp;
2248   //dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2249   dest->stack_size = new_stack_size;
2250         
2251   /* and relocate the update frame list */
2252   relocate_TSO(tso, dest);
2253
2254   /* Mark the old TSO as relocated.  We have to check for relocated
2255    * TSOs in the garbage collector and any primops that deal with TSOs.
2256    *
2257    * It's important to set the sp and su values to just beyond the end
2258    * of the stack, so we don't attempt to scavenge any part of the
2259    * dead TSO's stack.
2260    */
2261   tso->what_next = ThreadRelocated;
2262   tso->link = dest;
2263   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2264   tso->su = (StgUpdateFrame *)tso->sp;
2265   tso->why_blocked = NotBlocked;
2266   dest->mut_link = NULL;
2267
2268   IF_PAR_DEBUG(verbose,
2269                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2270                      tso->id, tso, tso->stack_size);
2271                /* If we're debugging, just print out the top of the stack */
2272                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2273                                                 tso->sp+64)));
2274   
2275   IF_DEBUG(sanity,checkTSO(tso));
2276 #if 0
2277   IF_DEBUG(scheduler,printTSO(dest));
2278 #endif
2279
2280   return dest;
2281 }
2282
2283 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2284 //@subsection Blocking Queue Routines
2285
2286 /* ---------------------------------------------------------------------------
2287    Wake up a queue that was blocked on some resource.
2288    ------------------------------------------------------------------------ */
2289
2290 #if defined(GRAN)
2291 static inline void
2292 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2293 {
2294 }
2295 #elif defined(PAR)
2296 static inline void
2297 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2298 {
2299   /* write RESUME events to log file and
2300      update blocked and fetch time (depending on type of the orig closure) */
2301   if (RtsFlags.ParFlags.ParStats.Full) {
2302     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2303                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2304                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2305     if (EMPTY_RUN_QUEUE())
2306       emitSchedule = rtsTrue;
2307
2308     switch (get_itbl(node)->type) {
2309         case FETCH_ME_BQ:
2310           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2311           break;
2312         case RBH:
2313         case FETCH_ME:
2314         case BLACKHOLE_BQ:
2315           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2316           break;
2317 #ifdef DIST
2318         case MVAR:
2319           break;
2320 #endif    
2321         default:
2322           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2323         }
2324       }
2325 }
2326 #endif
2327
2328 #if defined(GRAN)
2329 static StgBlockingQueueElement *
2330 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2331 {
2332     StgTSO *tso;
2333     PEs node_loc, tso_loc;
2334
2335     node_loc = where_is(node); // should be lifted out of loop
2336     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2337     tso_loc = where_is((StgClosure *)tso);
2338     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2339       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2340       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2341       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2342       // insertThread(tso, node_loc);
2343       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2344                 ResumeThread,
2345                 tso, node, (rtsSpark*)NULL);
2346       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2347       // len_local++;
2348       // len++;
2349     } else { // TSO is remote (actually should be FMBQ)
2350       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2351                                   RtsFlags.GranFlags.Costs.gunblocktime +
2352                                   RtsFlags.GranFlags.Costs.latency;
2353       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2354                 UnblockThread,
2355                 tso, node, (rtsSpark*)NULL);
2356       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2357       // len++;
2358     }
2359     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2360     IF_GRAN_DEBUG(bq,
2361                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2362                           (node_loc==tso_loc ? "Local" : "Global"), 
2363                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2364     tso->block_info.closure = NULL;
2365     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2366                              tso->id, tso));
2367 }
2368 #elif defined(PAR)
2369 static StgBlockingQueueElement *
2370 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2371 {
2372     StgBlockingQueueElement *next;
2373
2374     switch (get_itbl(bqe)->type) {
2375     case TSO:
2376       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2377       /* if it's a TSO just push it onto the run_queue */
2378       next = bqe->link;
2379       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2380       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2381       THREAD_RUNNABLE();
2382       unblockCount(bqe, node);
2383       /* reset blocking status after dumping event */
2384       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2385       break;
2386
2387     case BLOCKED_FETCH:
2388       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2389       next = bqe->link;
2390       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2391       PendingFetches = (StgBlockedFetch *)bqe;
2392       break;
2393
2394 # if defined(DEBUG)
2395       /* can ignore this case in a non-debugging setup; 
2396          see comments on RBHSave closures above */
2397     case CONSTR:
2398       /* check that the closure is an RBHSave closure */
2399       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2400              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2401              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2402       break;
2403
2404     default:
2405       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2406            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2407            (StgClosure *)bqe);
2408 # endif
2409     }
2410   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2411   return next;
2412 }
2413
2414 #else /* !GRAN && !PAR */
2415 static StgTSO *
2416 unblockOneLocked(StgTSO *tso)
2417 {
2418   StgTSO *next;
2419
2420   ASSERT(get_itbl(tso)->type == TSO);
2421   ASSERT(tso->why_blocked != NotBlocked);
2422   tso->why_blocked = NotBlocked;
2423   next = tso->link;
2424   PUSH_ON_RUN_QUEUE(tso);
2425   THREAD_RUNNABLE();
2426   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2427   return next;
2428 }
2429 #endif
2430
2431 #if defined(GRAN) || defined(PAR)
2432 inline StgBlockingQueueElement *
2433 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2434 {
2435   ACQUIRE_LOCK(&sched_mutex);
2436   bqe = unblockOneLocked(bqe, node);
2437   RELEASE_LOCK(&sched_mutex);
2438   return bqe;
2439 }
2440 #else
2441 inline StgTSO *
2442 unblockOne(StgTSO *tso)
2443 {
2444   ACQUIRE_LOCK(&sched_mutex);
2445   tso = unblockOneLocked(tso);
2446   RELEASE_LOCK(&sched_mutex);
2447   return tso;
2448 }
2449 #endif
2450
2451 #if defined(GRAN)
2452 void 
2453 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2454 {
2455   StgBlockingQueueElement *bqe;
2456   PEs node_loc;
2457   nat len = 0; 
2458
2459   IF_GRAN_DEBUG(bq, 
2460                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2461                       node, CurrentProc, CurrentTime[CurrentProc], 
2462                       CurrentTSO->id, CurrentTSO));
2463
2464   node_loc = where_is(node);
2465
2466   ASSERT(q == END_BQ_QUEUE ||
2467          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2468          get_itbl(q)->type == CONSTR); // closure (type constructor)
2469   ASSERT(is_unique(node));
2470
2471   /* FAKE FETCH: magically copy the node to the tso's proc;
2472      no Fetch necessary because in reality the node should not have been 
2473      moved to the other PE in the first place
2474   */
2475   if (CurrentProc!=node_loc) {
2476     IF_GRAN_DEBUG(bq, 
2477                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2478                         node, node_loc, CurrentProc, CurrentTSO->id, 
2479                         // CurrentTSO, where_is(CurrentTSO),
2480                         node->header.gran.procs));
2481     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2482     IF_GRAN_DEBUG(bq, 
2483                   belch("## new bitmask of node %p is %#x",
2484                         node, node->header.gran.procs));
2485     if (RtsFlags.GranFlags.GranSimStats.Global) {
2486       globalGranStats.tot_fake_fetches++;
2487     }
2488   }
2489
2490   bqe = q;
2491   // ToDo: check: ASSERT(CurrentProc==node_loc);
2492   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2493     //next = bqe->link;
2494     /* 
2495        bqe points to the current element in the queue
2496        next points to the next element in the queue
2497     */
2498     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2499     //tso_loc = where_is(tso);
2500     len++;
2501     bqe = unblockOneLocked(bqe, node);
2502   }
2503
2504   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2505      the closure to make room for the anchor of the BQ */
2506   if (bqe!=END_BQ_QUEUE) {
2507     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2508     /*
2509     ASSERT((info_ptr==&RBH_Save_0_info) ||
2510            (info_ptr==&RBH_Save_1_info) ||
2511            (info_ptr==&RBH_Save_2_info));
2512     */
2513     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2514     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2515     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2516
2517     IF_GRAN_DEBUG(bq,
2518                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2519                         node, info_type(node)));
2520   }
2521
2522   /* statistics gathering */
2523   if (RtsFlags.GranFlags.GranSimStats.Global) {
2524     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2525     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2526     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2527     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2528   }
2529   IF_GRAN_DEBUG(bq,
2530                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2531                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2532 }
2533 #elif defined(PAR)
2534 void 
2535 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2536 {
2537   StgBlockingQueueElement *bqe;
2538
2539   ACQUIRE_LOCK(&sched_mutex);
2540
2541   IF_PAR_DEBUG(verbose, 
2542                belch("##-_ AwBQ for node %p on [%x]: ",
2543                      node, mytid));
2544 #ifdef DIST  
2545   //RFP
2546   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2547     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2548     return;
2549   }
2550 #endif
2551   
2552   ASSERT(q == END_BQ_QUEUE ||
2553          get_itbl(q)->type == TSO ||           
2554          get_itbl(q)->type == BLOCKED_FETCH || 
2555          get_itbl(q)->type == CONSTR); 
2556
2557   bqe = q;
2558   while (get_itbl(bqe)->type==TSO || 
2559          get_itbl(bqe)->type==BLOCKED_FETCH) {
2560     bqe = unblockOneLocked(bqe, node);
2561   }
2562   RELEASE_LOCK(&sched_mutex);
2563 }
2564
2565 #else   /* !GRAN && !PAR */
2566 void
2567 awakenBlockedQueue(StgTSO *tso)
2568 {
2569   ACQUIRE_LOCK(&sched_mutex);
2570   while (tso != END_TSO_QUEUE) {
2571     tso = unblockOneLocked(tso);
2572   }
2573   RELEASE_LOCK(&sched_mutex);
2574 }
2575 #endif
2576
2577 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2578 //@subsection Exception Handling Routines
2579
2580 /* ---------------------------------------------------------------------------
2581    Interrupt execution
2582    - usually called inside a signal handler so it mustn't do anything fancy.   
2583    ------------------------------------------------------------------------ */
2584
2585 void
2586 interruptStgRts(void)
2587 {
2588     interrupted    = 1;
2589     context_switch = 1;
2590 }
2591
2592 /* -----------------------------------------------------------------------------
2593    Unblock a thread
2594
2595    This is for use when we raise an exception in another thread, which
2596    may be blocked.
2597    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2598    -------------------------------------------------------------------------- */
2599
2600 #if defined(GRAN) || defined(PAR)
2601 /*
2602   NB: only the type of the blocking queue is different in GranSim and GUM
2603       the operations on the queue-elements are the same
2604       long live polymorphism!
2605 */
2606 static void
2607 unblockThread(StgTSO *tso)
2608 {
2609   StgBlockingQueueElement *t, **last;
2610
2611   ACQUIRE_LOCK(&sched_mutex);
2612   switch (tso->why_blocked) {
2613
2614   case NotBlocked:
2615     return;  /* not blocked */
2616
2617   case BlockedOnMVar:
2618     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2619     {
2620       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2621       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2622
2623       last = (StgBlockingQueueElement **)&mvar->head;
2624       for (t = (StgBlockingQueueElement *)mvar->head; 
2625            t != END_BQ_QUEUE; 
2626            last = &t->link, last_tso = t, t = t->link) {
2627         if (t == (StgBlockingQueueElement *)tso) {
2628           *last = (StgBlockingQueueElement *)tso->link;
2629           if (mvar->tail == tso) {
2630             mvar->tail = (StgTSO *)last_tso;
2631           }
2632           goto done;
2633         }
2634       }
2635       barf("unblockThread (MVAR): TSO not found");
2636     }
2637
2638   case BlockedOnBlackHole:
2639     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2640     {
2641       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2642
2643       last = &bq->blocking_queue;
2644       for (t = bq->blocking_queue; 
2645            t != END_BQ_QUEUE; 
2646            last = &t->link, t = t->link) {
2647         if (t == (StgBlockingQueueElement *)tso) {
2648           *last = (StgBlockingQueueElement *)tso->link;
2649           goto done;
2650         }
2651       }
2652       barf("unblockThread (BLACKHOLE): TSO not found");
2653     }
2654
2655   case BlockedOnException:
2656     {
2657       StgTSO *target  = tso->block_info.tso;
2658
2659       ASSERT(get_itbl(target)->type == TSO);
2660
2661       if (target->what_next == ThreadRelocated) {
2662           target = target->link;
2663           ASSERT(get_itbl(target)->type == TSO);
2664       }
2665
2666       ASSERT(target->blocked_exceptions != NULL);
2667
2668       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2669       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2670            t != END_BQ_QUEUE; 
2671            last = &t->link, t = t->link) {
2672         ASSERT(get_itbl(t)->type == TSO);
2673         if (t == (StgBlockingQueueElement *)tso) {
2674           *last = (StgBlockingQueueElement *)tso->link;
2675           goto done;
2676         }
2677       }
2678       barf("unblockThread (Exception): TSO not found");
2679     }
2680
2681   case BlockedOnRead:
2682   case BlockedOnWrite:
2683     {
2684       /* take TSO off blocked_queue */
2685       StgBlockingQueueElement *prev = NULL;
2686       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2687            prev = t, t = t->link) {
2688         if (t == (StgBlockingQueueElement *)tso) {
2689           if (prev == NULL) {
2690             blocked_queue_hd = (StgTSO *)t->link;
2691             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2692               blocked_queue_tl = END_TSO_QUEUE;
2693             }
2694           } else {
2695             prev->link = t->link;
2696             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2697               blocked_queue_tl = (StgTSO *)prev;
2698             }
2699           }
2700           goto done;
2701         }
2702       }
2703       barf("unblockThread (I/O): TSO not found");
2704     }
2705
2706   case BlockedOnDelay:
2707     {
2708       /* take TSO off sleeping_queue */
2709       StgBlockingQueueElement *prev = NULL;
2710       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2711            prev = t, t = t->link) {
2712         if (t == (StgBlockingQueueElement *)tso) {
2713           if (prev == NULL) {
2714             sleeping_queue = (StgTSO *)t->link;
2715           } else {
2716             prev->link = t->link;
2717           }
2718           goto done;
2719         }
2720       }
2721       barf("unblockThread (I/O): TSO not found");
2722     }
2723
2724   default:
2725     barf("unblockThread");
2726   }
2727
2728  done:
2729   tso->link = END_TSO_QUEUE;
2730   tso->why_blocked = NotBlocked;
2731   tso->block_info.closure = NULL;
2732   PUSH_ON_RUN_QUEUE(tso);
2733   RELEASE_LOCK(&sched_mutex);
2734 }
2735 #else
2736 static void
2737 unblockThread(StgTSO *tso)
2738 {
2739   StgTSO *t, **last;
2740
2741   ACQUIRE_LOCK(&sched_mutex);
2742   switch (tso->why_blocked) {
2743
2744   case NotBlocked:
2745     return;  /* not blocked */
2746
2747   case BlockedOnMVar:
2748     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2749     {
2750       StgTSO *last_tso = END_TSO_QUEUE;
2751       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2752
2753       last = &mvar->head;
2754       for (t = mvar->head; t != END_TSO_QUEUE; 
2755            last = &t->link, last_tso = t, t = t->link) {
2756         if (t == tso) {
2757           *last = tso->link;
2758           if (mvar->tail == tso) {
2759             mvar->tail = last_tso;
2760           }
2761           goto done;
2762         }
2763       }
2764       barf("unblockThread (MVAR): TSO not found");
2765     }
2766
2767   case BlockedOnBlackHole:
2768     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2769     {
2770       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2771
2772       last = &bq->blocking_queue;
2773       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2774            last = &t->link, t = t->link) {
2775         if (t == tso) {
2776           *last = tso->link;
2777           goto done;
2778         }
2779       }
2780       barf("unblockThread (BLACKHOLE): TSO not found");
2781     }
2782
2783   case BlockedOnException:
2784     {
2785       StgTSO *target  = tso->block_info.tso;
2786
2787       ASSERT(get_itbl(target)->type == TSO);
2788
2789       while (target->what_next == ThreadRelocated) {
2790           target = target->link;
2791           ASSERT(get_itbl(target)->type == TSO);
2792       }
2793       
2794       ASSERT(target->blocked_exceptions != NULL);
2795
2796       last = &target->blocked_exceptions;
2797       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2798            last = &t->link, t = t->link) {
2799         ASSERT(get_itbl(t)->type == TSO);
2800         if (t == tso) {
2801           *last = tso->link;
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (Exception): TSO not found");
2806     }
2807
2808   case BlockedOnRead:
2809   case BlockedOnWrite:
2810     {
2811       StgTSO *prev = NULL;
2812       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2813            prev = t, t = t->link) {
2814         if (t == tso) {
2815           if (prev == NULL) {
2816             blocked_queue_hd = t->link;
2817             if (blocked_queue_tl == t) {
2818               blocked_queue_tl = END_TSO_QUEUE;
2819             }
2820           } else {
2821             prev->link = t->link;
2822             if (blocked_queue_tl == t) {
2823               blocked_queue_tl = prev;
2824             }
2825           }
2826           goto done;
2827         }
2828       }
2829       barf("unblockThread (I/O): TSO not found");
2830     }
2831
2832   case BlockedOnDelay:
2833     {
2834       StgTSO *prev = NULL;
2835       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2836            prev = t, t = t->link) {
2837         if (t == tso) {
2838           if (prev == NULL) {
2839             sleeping_queue = t->link;
2840           } else {
2841             prev->link = t->link;
2842           }
2843           goto done;
2844         }
2845       }
2846       barf("unblockThread (I/O): TSO not found");
2847     }
2848
2849   default:
2850     barf("unblockThread");
2851   }
2852
2853  done:
2854   tso->link = END_TSO_QUEUE;
2855   tso->why_blocked = NotBlocked;
2856   tso->block_info.closure = NULL;
2857   PUSH_ON_RUN_QUEUE(tso);
2858   RELEASE_LOCK(&sched_mutex);
2859 }
2860 #endif
2861
2862 /* -----------------------------------------------------------------------------
2863  * raiseAsync()
2864  *
2865  * The following function implements the magic for raising an
2866  * asynchronous exception in an existing thread.
2867  *
2868  * We first remove the thread from any queue on which it might be
2869  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2870  *
2871  * We strip the stack down to the innermost CATCH_FRAME, building
2872  * thunks in the heap for all the active computations, so they can 
2873  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2874  * an application of the handler to the exception, and push it on
2875  * the top of the stack.
2876  * 
2877  * How exactly do we save all the active computations?  We create an
2878  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2879  * AP_UPDs pushes everything from the corresponding update frame
2880  * upwards onto the stack.  (Actually, it pushes everything up to the
2881  * next update frame plus a pointer to the next AP_UPD object.
2882  * Entering the next AP_UPD object pushes more onto the stack until we
2883  * reach the last AP_UPD object - at which point the stack should look
2884  * exactly as it did when we killed the TSO and we can continue
2885  * execution by entering the closure on top of the stack.
2886  *
2887  * We can also kill a thread entirely - this happens if either (a) the 
2888  * exception passed to raiseAsync is NULL, or (b) there's no
2889  * CATCH_FRAME on the stack.  In either case, we strip the entire
2890  * stack and replace the thread with a zombie.
2891  *
2892  * -------------------------------------------------------------------------- */
2893  
2894 void 
2895 deleteThread(StgTSO *tso)
2896 {
2897   raiseAsync(tso,NULL);
2898 }
2899
2900 void
2901 raiseAsync(StgTSO *tso, StgClosure *exception)
2902 {
2903   StgUpdateFrame* su = tso->su;
2904   StgPtr          sp = tso->sp;
2905   
2906   /* Thread already dead? */
2907   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2908     return;
2909   }
2910
2911   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2912
2913   /* Remove it from any blocking queues */
2914   unblockThread(tso);
2915
2916   /* The stack freezing code assumes there's a closure pointer on
2917    * the top of the stack.  This isn't always the case with compiled
2918    * code, so we have to push a dummy closure on the top which just
2919    * returns to the next return address on the stack.
2920    */
2921   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2922     *(--sp) = (W_)&stg_dummy_ret_closure;
2923   }
2924
2925   while (1) {
2926     int words = ((P_)su - (P_)sp) - 1;
2927     nat i;
2928     StgAP_UPD * ap;
2929
2930     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2931      * then build PAP(handler,exception,realworld#), and leave it on
2932      * top of the stack ready to enter.
2933      */
2934     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2935       StgCatchFrame *cf = (StgCatchFrame *)su;
2936       /* we've got an exception to raise, so let's pass it to the
2937        * handler in this frame.
2938        */
2939       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2940       TICK_ALLOC_UPD_PAP(3,0);
2941       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2942               
2943       ap->n_args = 2;
2944       ap->fun = cf->handler;    /* :: Exception -> IO a */
2945       ap->payload[0] = exception;
2946       ap->payload[1] = ARG_TAG(0); /* realworld token */
2947
2948       /* throw away the stack from Sp up to and including the
2949        * CATCH_FRAME.
2950        */
2951       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2952       tso->su = cf->link;
2953
2954       /* Restore the blocked/unblocked state for asynchronous exceptions
2955        * at the CATCH_FRAME.  
2956        *
2957        * If exceptions were unblocked at the catch, arrange that they
2958        * are unblocked again after executing the handler by pushing an
2959        * unblockAsyncExceptions_ret stack frame.
2960        */
2961       if (!cf->exceptions_blocked) {
2962         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2963       }
2964       
2965       /* Ensure that async exceptions are blocked when running the handler.
2966        */
2967       if (tso->blocked_exceptions == NULL) {
2968         tso->blocked_exceptions = END_TSO_QUEUE;
2969       }
2970       
2971       /* Put the newly-built PAP on top of the stack, ready to execute
2972        * when the thread restarts.
2973        */
2974       sp[0] = (W_)ap;
2975       tso->sp = sp;
2976       tso->what_next = ThreadEnterGHC;
2977       IF_DEBUG(sanity, checkTSO(tso));
2978       return;
2979     }
2980
2981     /* First build an AP_UPD consisting of the stack chunk above the
2982      * current update frame, with the top word on the stack as the
2983      * fun field.
2984      */
2985     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2986     
2987     ASSERT(words >= 0);
2988     
2989     ap->n_args = words;
2990     ap->fun    = (StgClosure *)sp[0];
2991     sp++;
2992     for(i=0; i < (nat)words; ++i) {
2993       ap->payload[i] = (StgClosure *)*sp++;
2994     }
2995     
2996     switch (get_itbl(su)->type) {
2997       
2998     case UPDATE_FRAME:
2999       {
3000         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3001         TICK_ALLOC_UP_THK(words+1,0);
3002         
3003         IF_DEBUG(scheduler,
3004                  fprintf(stderr,  "scheduler: Updating ");
3005                  printPtr((P_)su->updatee); 
3006                  fprintf(stderr,  " with ");
3007                  printObj((StgClosure *)ap);
3008                  );
3009         
3010         /* Replace the updatee with an indirection - happily
3011          * this will also wake up any threads currently
3012          * waiting on the result.
3013          *
3014          * Warning: if we're in a loop, more than one update frame on
3015          * the stack may point to the same object.  Be careful not to
3016          * overwrite an IND_OLDGEN in this case, because we'll screw
3017          * up the mutable lists.  To be on the safe side, don't
3018          * overwrite any kind of indirection at all.  See also
3019          * threadSqueezeStack in GC.c, where we have to make a similar
3020          * check.
3021          */
3022         if (!closure_IND(su->updatee)) {
3023             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3024         }
3025         su = su->link;
3026         sp += sizeofW(StgUpdateFrame) -1;
3027         sp[0] = (W_)ap; /* push onto stack */
3028         break;
3029       }
3030
3031     case CATCH_FRAME:
3032       {
3033         StgCatchFrame *cf = (StgCatchFrame *)su;
3034         StgClosure* o;
3035         
3036         /* We want a PAP, not an AP_UPD.  Fortunately, the
3037          * layout's the same.
3038          */
3039         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3040         TICK_ALLOC_UPD_PAP(words+1,0);
3041         
3042         /* now build o = FUN(catch,ap,handler) */
3043         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3044         TICK_ALLOC_FUN(2,0);
3045         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3046         o->payload[0] = (StgClosure *)ap;
3047         o->payload[1] = cf->handler;
3048         
3049         IF_DEBUG(scheduler,
3050                  fprintf(stderr,  "scheduler: Built ");
3051                  printObj((StgClosure *)o);
3052                  );
3053         
3054         /* pop the old handler and put o on the stack */
3055         su = cf->link;
3056         sp += sizeofW(StgCatchFrame) - 1;
3057         sp[0] = (W_)o;
3058         break;
3059       }
3060       
3061     case SEQ_FRAME:
3062       {
3063         StgSeqFrame *sf = (StgSeqFrame *)su;
3064         StgClosure* o;
3065         
3066         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3067         TICK_ALLOC_UPD_PAP(words+1,0);
3068         
3069         /* now build o = FUN(seq,ap) */
3070         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3071         TICK_ALLOC_SE_THK(1,0);
3072         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3073         o->payload[0] = (StgClosure *)ap;
3074         
3075         IF_DEBUG(scheduler,
3076                  fprintf(stderr,  "scheduler: Built ");
3077                  printObj((StgClosure *)o);
3078                  );
3079         
3080         /* pop the old handler and put o on the stack */
3081         su = sf->link;
3082         sp += sizeofW(StgSeqFrame) - 1;
3083         sp[0] = (W_)o;
3084         break;
3085       }
3086       
3087     case STOP_FRAME:
3088       /* We've stripped the entire stack, the thread is now dead. */
3089       sp += sizeofW(StgStopFrame) - 1;
3090       sp[0] = (W_)exception;    /* save the exception */
3091       tso->what_next = ThreadKilled;
3092       tso->su = (StgUpdateFrame *)(sp+1);
3093       tso->sp = sp;
3094       return;
3095
3096     default:
3097       barf("raiseAsync");
3098     }
3099   }
3100   barf("raiseAsync");
3101 }
3102
3103 /* -----------------------------------------------------------------------------
3104    resurrectThreads is called after garbage collection on the list of
3105    threads found to be garbage.  Each of these threads will be woken
3106    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3107    on an MVar, or NonTermination if the thread was blocked on a Black
3108    Hole.
3109    -------------------------------------------------------------------------- */
3110
3111 void
3112 resurrectThreads( StgTSO *threads )
3113 {
3114   StgTSO *tso, *next;
3115
3116   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3117     next = tso->global_link;
3118     tso->global_link = all_threads;
3119     all_threads = tso;
3120     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3121
3122     switch (tso->why_blocked) {
3123     case BlockedOnMVar:
3124     case BlockedOnException:
3125       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3126       break;
3127     case BlockedOnBlackHole:
3128       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3129       break;
3130     case NotBlocked:
3131       /* This might happen if the thread was blocked on a black hole
3132        * belonging to a thread that we've just woken up (raiseAsync
3133        * can wake up threads, remember...).
3134        */
3135       continue;
3136     default:
3137       barf("resurrectThreads: thread blocked in a strange way");
3138     }
3139   }
3140 }
3141
3142 /* -----------------------------------------------------------------------------
3143  * Blackhole detection: if we reach a deadlock, test whether any
3144  * threads are blocked on themselves.  Any threads which are found to
3145  * be self-blocked get sent a NonTermination exception.
3146  *
3147  * This is only done in a deadlock situation in order to avoid
3148  * performance overhead in the normal case.
3149  * -------------------------------------------------------------------------- */
3150
3151 static void
3152 detectBlackHoles( void )
3153 {
3154     StgTSO *t = all_threads;
3155     StgUpdateFrame *frame;
3156     StgClosure *blocked_on;
3157
3158     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3159
3160         while (t->what_next == ThreadRelocated) {
3161             t = t->link;
3162             ASSERT(get_itbl(t)->type == TSO);
3163         }
3164       
3165         if (t->why_blocked != BlockedOnBlackHole) {
3166             continue;
3167         }
3168
3169         blocked_on = t->block_info.closure;
3170
3171         for (frame = t->su; ; frame = frame->link) {
3172             switch (get_itbl(frame)->type) {
3173
3174             case UPDATE_FRAME:
3175                 if (frame->updatee == blocked_on) {
3176                     /* We are blocking on one of our own computations, so
3177                      * send this thread the NonTermination exception.  
3178                      */
3179                     IF_DEBUG(scheduler, 
3180                              sched_belch("thread %d is blocked on itself", t->id));
3181                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3182                     goto done;
3183                 }
3184                 else {
3185                     continue;
3186                 }
3187
3188             case CATCH_FRAME:
3189             case SEQ_FRAME:
3190                 continue;
3191                 
3192             case STOP_FRAME:
3193                 break;
3194             }
3195             break;
3196         }
3197
3198     done: ;
3199     }   
3200 }
3201
3202 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3203 //@subsection Debugging Routines
3204
3205 /* -----------------------------------------------------------------------------
3206    Debugging: why is a thread blocked
3207    -------------------------------------------------------------------------- */
3208
3209 #ifdef DEBUG
3210
3211 void
3212 printThreadBlockage(StgTSO *tso)
3213 {
3214   switch (tso->why_blocked) {
3215   case BlockedOnRead:
3216     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3217     break;
3218   case BlockedOnWrite:
3219     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3220     break;
3221   case BlockedOnDelay:
3222     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3223     break;
3224   case BlockedOnMVar:
3225     fprintf(stderr,"is blocked on an MVar");
3226     break;
3227   case BlockedOnException:
3228     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3229             tso->block_info.tso->id);
3230     break;
3231   case BlockedOnBlackHole:
3232     fprintf(stderr,"is blocked on a black hole");
3233     break;
3234   case NotBlocked:
3235     fprintf(stderr,"is not blocked");
3236     break;
3237 #if defined(PAR)
3238   case BlockedOnGA:
3239     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3240             tso->block_info.closure, info_type(tso->block_info.closure));
3241     break;
3242   case BlockedOnGA_NoSend:
3243     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3244             tso->block_info.closure, info_type(tso->block_info.closure));
3245     break;
3246 #endif
3247   default:
3248     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3249          tso->why_blocked, tso->id, tso);
3250   }
3251 }
3252
3253 void
3254 printThreadStatus(StgTSO *tso)
3255 {
3256   switch (tso->what_next) {
3257   case ThreadKilled:
3258     fprintf(stderr,"has been killed");
3259     break;
3260   case ThreadComplete:
3261     fprintf(stderr,"has completed");
3262     break;
3263   default:
3264     printThreadBlockage(tso);
3265   }
3266 }
3267
3268 void
3269 printAllThreads(void)
3270 {
3271   StgTSO *t;
3272
3273 # if defined(GRAN)
3274   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3275   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3276                        time_string, rtsFalse/*no commas!*/);
3277
3278   sched_belch("all threads at [%s]:", time_string);
3279 # elif defined(PAR)
3280   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3281   ullong_format_string(CURRENT_TIME,
3282                        time_string, rtsFalse/*no commas!*/);
3283
3284   sched_belch("all threads at [%s]:", time_string);
3285 # else
3286   sched_belch("all threads:");
3287 # endif
3288
3289   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3290     fprintf(stderr, "\tthread %d ", t->id);
3291     printThreadStatus(t);
3292     fprintf(stderr,"\n");
3293   }
3294 }
3295     
3296 /* 
3297    Print a whole blocking queue attached to node (debugging only).
3298 */
3299 //@cindex print_bq
3300 # if defined(PAR)
3301 void 
3302 print_bq (StgClosure *node)
3303 {
3304   StgBlockingQueueElement *bqe;
3305   StgTSO *tso;
3306   rtsBool end;
3307
3308   fprintf(stderr,"## BQ of closure %p (%s): ",
3309           node, info_type(node));
3310
3311   /* should cover all closures that may have a blocking queue */
3312   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3313          get_itbl(node)->type == FETCH_ME_BQ ||
3314          get_itbl(node)->type == RBH ||
3315          get_itbl(node)->type == MVAR);
3316     
3317   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3318
3319   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3320 }
3321
3322 /* 
3323    Print a whole blocking queue starting with the element bqe.
3324 */
3325 void 
3326 print_bqe (StgBlockingQueueElement *bqe)
3327 {
3328   rtsBool end;
3329
3330   /* 
3331      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3332   */
3333   for (end = (bqe==END_BQ_QUEUE);
3334        !end; // iterate until bqe points to a CONSTR
3335        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3336        bqe = end ? END_BQ_QUEUE : bqe->link) {
3337     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3338     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3339     /* types of closures that may appear in a blocking queue */
3340     ASSERT(get_itbl(bqe)->type == TSO ||           
3341            get_itbl(bqe)->type == BLOCKED_FETCH || 
3342            get_itbl(bqe)->type == CONSTR); 
3343     /* only BQs of an RBH end with an RBH_Save closure */
3344     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3345
3346     switch (get_itbl(bqe)->type) {
3347     case TSO:
3348       fprintf(stderr," TSO %u (%x),",
3349               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3350       break;
3351     case BLOCKED_FETCH:
3352       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3353               ((StgBlockedFetch *)bqe)->node, 
3354               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3355               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3356               ((StgBlockedFetch *)bqe)->ga.weight);
3357       break;
3358     case CONSTR:
3359       fprintf(stderr," %s (IP %p),",
3360               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3361                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3362                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3363                "RBH_Save_?"), get_itbl(bqe));
3364       break;
3365     default:
3366       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3367            info_type((StgClosure *)bqe)); // , node, info_type(node));
3368       break;
3369     }
3370   } /* for */
3371   fputc('\n', stderr);
3372 }
3373 # elif defined(GRAN)
3374 void 
3375 print_bq (StgClosure *node)
3376 {
3377   StgBlockingQueueElement *bqe;
3378   PEs node_loc, tso_loc;
3379   rtsBool end;
3380
3381   /* should cover all closures that may have a blocking queue */
3382   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3383          get_itbl(node)->type == FETCH_ME_BQ ||
3384          get_itbl(node)->type == RBH);
3385     
3386   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3387   node_loc = where_is(node);
3388
3389   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3390           node, info_type(node), node_loc);
3391
3392   /* 
3393      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3394   */
3395   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3396        !end; // iterate until bqe points to a CONSTR
3397        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3398     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3399     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3400     /* types of closures that may appear in a blocking queue */
3401     ASSERT(get_itbl(bqe)->type == TSO ||           
3402            get_itbl(bqe)->type == CONSTR); 
3403     /* only BQs of an RBH end with an RBH_Save closure */
3404     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3405
3406     tso_loc = where_is((StgClosure *)bqe);
3407     switch (get_itbl(bqe)->type) {
3408     case TSO:
3409       fprintf(stderr," TSO %d (%p) on [PE %d],",
3410               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3411       break;
3412     case CONSTR:
3413       fprintf(stderr," %s (IP %p),",
3414               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3415                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3416                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3417                "RBH_Save_?"), get_itbl(bqe));
3418       break;
3419     default:
3420       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3421            info_type((StgClosure *)bqe), node, info_type(node));
3422       break;
3423     }
3424   } /* for */
3425   fputc('\n', stderr);
3426 }
3427 #else
3428 /* 
3429    Nice and easy: only TSOs on the blocking queue
3430 */
3431 void 
3432 print_bq (StgClosure *node)
3433 {
3434   StgTSO *tso;
3435
3436   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3437   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3438        tso != END_TSO_QUEUE; 
3439        tso=tso->link) {
3440     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3441     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3442     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3443   }
3444   fputc('\n', stderr);
3445 }
3446 # endif
3447
3448 #if defined(PAR)
3449 static nat
3450 run_queue_len(void)
3451 {
3452   nat i;
3453   StgTSO *tso;
3454
3455   for (i=0, tso=run_queue_hd; 
3456        tso != END_TSO_QUEUE;
3457        i++, tso=tso->link)
3458     /* nothing */
3459
3460   return i;
3461 }
3462 #endif
3463
3464 static void
3465 sched_belch(char *s, ...)
3466 {
3467   va_list ap;
3468   va_start(ap,s);
3469 #ifdef SMP
3470   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3471 #elif defined(PAR)
3472   fprintf(stderr, "== ");
3473 #else
3474   fprintf(stderr, "scheduler: ");
3475 #endif
3476   vfprintf(stderr, s, ap);
3477   fprintf(stderr, "\n");
3478 }
3479
3480 #endif /* DEBUG */
3481
3482
3483 //@node Index,  , Debugging Routines, Main scheduling code
3484 //@subsection Index
3485
3486 //@index
3487 //* MainRegTable::  @cindex\s-+MainRegTable
3488 //* StgMainThread::  @cindex\s-+StgMainThread
3489 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3490 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3491 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3492 //* context_switch::  @cindex\s-+context_switch
3493 //* createThread::  @cindex\s-+createThread
3494 //* free_capabilities::  @cindex\s-+free_capabilities
3495 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3496 //* initScheduler::  @cindex\s-+initScheduler
3497 //* interrupted::  @cindex\s-+interrupted
3498 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3499 //* next_thread_id::  @cindex\s-+next_thread_id
3500 //* print_bq::  @cindex\s-+print_bq
3501 //* run_queue_hd::  @cindex\s-+run_queue_hd
3502 //* run_queue_tl::  @cindex\s-+run_queue_tl
3503 //* sched_mutex::  @cindex\s-+sched_mutex
3504 //* schedule::  @cindex\s-+schedule
3505 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3506 //* task_ids::  @cindex\s-+task_ids
3507 //* term_mutex::  @cindex\s-+term_mutex
3508 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3509 //@end index