[project @ 2001-12-07 20:57:53 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.109 2001/12/07 20:57:53 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #include "RetainerProfile.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113
114 #include <stdarg.h>
115
116 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
117 //@subsection Variables and Data structures
118
119 /* Main threads:
120  *
121  * These are the threads which clients have requested that we run.  
122  *
123  * In an SMP build, we might have several concurrent clients all
124  * waiting for results, and each one will wait on a condition variable
125  * until the result is available.
126  *
127  * In non-SMP, clients are strictly nested: the first client calls
128  * into the RTS, which might call out again to C with a _ccall_GC, and
129  * eventually re-enter the RTS.
130  *
131  * Main threads information is kept in a linked list:
132  */
133 //@cindex StgMainThread
134 typedef struct StgMainThread_ {
135   StgTSO *         tso;
136   SchedulerStatus  stat;
137   StgClosure **    ret;
138 #ifdef SMP
139   pthread_cond_t wakeup;
140 #endif
141   struct StgMainThread_ *link;
142 } StgMainThread;
143
144 /* Main thread queue.
145  * Locks required: sched_mutex.
146  */
147 static StgMainThread *main_threads;
148
149 /* Thread queues.
150  * Locks required: sched_mutex.
151  */
152 #if defined(GRAN)
153
154 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
155 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
156
157 /* 
158    In GranSim we have a runable and a blocked queue for each processor.
159    In order to minimise code changes new arrays run_queue_hds/tls
160    are created. run_queue_hd is then a short cut (macro) for
161    run_queue_hds[CurrentProc] (see GranSim.h).
162    -- HWL
163 */
164 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
165 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
166 StgTSO *ccalling_threadss[MAX_PROC];
167 /* We use the same global list of threads (all_threads) in GranSim as in
168    the std RTS (i.e. we are cheating). However, we don't use this list in
169    the GranSim specific code at the moment (so we are only potentially
170    cheating).  */
171
172 #else /* !GRAN */
173
174 StgTSO *run_queue_hd, *run_queue_tl;
175 StgTSO *blocked_queue_hd, *blocked_queue_tl;
176 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
177
178 #endif
179
180 /* Linked list of all threads.
181  * Used for detecting garbage collected threads.
182  */
183 StgTSO *all_threads;
184
185 /* Threads suspended in _ccall_GC.
186  */
187 static StgTSO *suspended_ccalling_threads;
188
189 static StgTSO *threadStackOverflow(StgTSO *tso);
190
191 /* KH: The following two flags are shared memory locations.  There is no need
192        to lock them, since they are only unset at the end of a scheduler
193        operation.
194 */
195
196 /* flag set by signal handler to precipitate a context switch */
197 //@cindex context_switch
198 nat context_switch;
199
200 /* if this flag is set as well, give up execution */
201 //@cindex interrupted
202 rtsBool interrupted;
203
204 /* Next thread ID to allocate.
205  * Locks required: sched_mutex
206  */
207 //@cindex next_thread_id
208 StgThreadID next_thread_id = 1;
209
210 /*
211  * Pointers to the state of the current thread.
212  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
213  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
214  */
215  
216 /* The smallest stack size that makes any sense is:
217  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
218  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
219  *  + 1                       (the realworld token for an IO thread)
220  *  + 1                       (the closure to enter)
221  *
222  * A thread with this stack will bomb immediately with a stack
223  * overflow, which will increase its stack size.  
224  */
225
226 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
227
228 /* Free capability list.
229  * Locks required: sched_mutex.
230  */
231 #ifdef SMP
232 Capability *free_capabilities; /* Available capabilities for running threads */
233 nat n_free_capabilities;       /* total number of available capabilities */
234 #else
235 Capability MainCapability;     /* for non-SMP, we have one global capability */
236 #endif
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 rtsBool ready_to_gc;
249
250 /* All our current task ids, saved in case we need to kill them later.
251  */
252 #ifdef SMP
253 //@cindex task_ids
254 task_info *task_ids;
255 #endif
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( void );
260        void     interruptStgRts   ( void );
261 #if defined(GRAN)
262 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
263 #else
264 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
265 #endif
266
267 static void     detectBlackHoles  ( void );
268
269 #ifdef DEBUG
270 static void sched_belch(char *s, ...);
271 #endif
272
273 #ifdef SMP
274 //@cindex sched_mutex
275 //@cindex term_mutex
276 //@cindex thread_ready_cond
277 //@cindex gc_pending_cond
278 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
279 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
280 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
281 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
282
283 nat await_death;
284 #endif
285
286 #if defined(PAR)
287 StgTSO *LastTSO;
288 rtsTime TimeOfLastYield;
289 rtsBool emitSchedule = rtsTrue;
290 #endif
291
292 #if DEBUG
293 char *whatNext_strs[] = {
294   "ThreadEnterGHC",
295   "ThreadRunGHC",
296   "ThreadEnterInterp",
297   "ThreadKilled",
298   "ThreadComplete"
299 };
300
301 char *threadReturnCode_strs[] = {
302   "HeapOverflow",                       /* might also be StackOverflow */
303   "StackOverflow",
304   "ThreadYielding",
305   "ThreadBlocked",
306   "ThreadFinished"
307 };
308 #endif
309
310 #ifdef PAR
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);  
313 #endif
314
315 /*
316  * The thread state for the main thread.
317 // ToDo: check whether not needed any more
318 StgTSO   *MainTSO;
319  */
320
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
323
324 /* ---------------------------------------------------------------------------
325    Main scheduling loop.
326
327    We use round-robin scheduling, each thread returning to the
328    scheduler loop when one of these conditions is detected:
329
330       * out of heap space
331       * timer expires (thread yields)
332       * thread blocks
333       * thread ends
334       * stack overflow
335
336    Locking notes:  we acquire the scheduler lock once at the beginning
337    of the scheduler loop, and release it when
338     
339       * running a thread, or
340       * waiting for work, or
341       * waiting for a GC to complete.
342
343    GRAN version:
344      In a GranSim setup this loop iterates over the global event queue.
345      This revolves around the global event queue, which determines what 
346      to do next. Therefore, it's more complicated than either the 
347      concurrent or the parallel (GUM) setup.
348
349    GUM version:
350      GUM iterates over incoming messages.
351      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352      and sends out a fish whenever it has nothing to do; in-between
353      doing the actual reductions (shared code below) it processes the
354      incoming messages and deals with delayed operations 
355      (see PendingFetches).
356      This is not the ugliest code you could imagine, but it's bloody close.
357
358    ------------------------------------------------------------------------ */
359 //@cindex schedule
360 static void
361 schedule( void )
362 {
363   StgTSO *t;
364   Capability *cap;
365   StgThreadReturnCode ret;
366 #if defined(GRAN)
367   rtsEvent *event;
368 #elif defined(PAR)
369   StgSparkPool *pool;
370   rtsSpark spark;
371   StgTSO *tso;
372   GlobalTaskId pe;
373   rtsBool receivedFinish = rtsFalse;
374 # if defined(DEBUG)
375   nat tp_size, sp_size; // stats only
376 # endif
377 #endif
378   rtsBool was_interrupted = rtsFalse;
379   
380   ACQUIRE_LOCK(&sched_mutex);
381
382 #if defined(GRAN)
383
384   /* set up first event to get things going */
385   /* ToDo: assign costs for system setup and init MainTSO ! */
386   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387             ContinueThread, 
388             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
389
390   IF_DEBUG(gran,
391            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392            G_TSO(CurrentTSO, 5));
393
394   if (RtsFlags.GranFlags.Light) {
395     /* Save current time; GranSim Light only */
396     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
397   }      
398
399   event = get_next_event();
400
401   while (event!=(rtsEvent*)NULL) {
402     /* Choose the processor with the next event */
403     CurrentProc = event->proc;
404     CurrentTSO = event->tso;
405
406 #elif defined(PAR)
407
408   while (!receivedFinish) {    /* set by processMessages */
409                                /* when receiving PP_FINISH message         */ 
410 #else
411
412   while (1) {
413
414 #endif
415
416     IF_DEBUG(scheduler, printAllThreads());
417
418     /* If we're interrupted (the user pressed ^C, or some other
419      * termination condition occurred), kill all the currently running
420      * threads.
421      */
422     if (interrupted) {
423       IF_DEBUG(scheduler, sched_belch("interrupted"));
424       deleteAllThreads();
425       interrupted = rtsFalse;
426       was_interrupted = rtsTrue;
427     }
428
429     /* Go through the list of main threads and wake up any
430      * clients whose computations have finished.  ToDo: this
431      * should be done more efficiently without a linear scan
432      * of the main threads list, somehow...
433      */
434 #ifdef SMP
435     { 
436       StgMainThread *m, **prev;
437       prev = &main_threads;
438       for (m = main_threads; m != NULL; m = m->link) {
439         switch (m->tso->what_next) {
440         case ThreadComplete:
441           if (m->ret) {
442             *(m->ret) = (StgClosure *)m->tso->sp[0];
443           }
444           *prev = m->link;
445           m->stat = Success;
446           pthread_cond_broadcast(&m->wakeup);
447           break;
448         case ThreadKilled:
449           if (m->ret) *(m->ret) = NULL;
450           *prev = m->link;
451           if (was_interrupted) {
452             m->stat = Interrupted;
453           } else {
454             m->stat = Killed;
455           }
456           pthread_cond_broadcast(&m->wakeup);
457           break;
458         default:
459           break;
460         }
461       }
462     }
463
464 #else // not SMP
465
466 # if defined(PAR)
467     /* in GUM do this only on the Main PE */
468     if (IAmMainThread)
469 # endif
470     /* If our main thread has finished or been killed, return.
471      */
472     {
473       StgMainThread *m = main_threads;
474       if (m->tso->what_next == ThreadComplete
475           || m->tso->what_next == ThreadKilled) {
476         main_threads = main_threads->link;
477         if (m->tso->what_next == ThreadComplete) {
478           /* we finished successfully, fill in the return value */
479           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
480           m->stat = Success;
481           return;
482         } else {
483           if (m->ret) { *(m->ret) = NULL; };
484           if (was_interrupted) {
485             m->stat = Interrupted;
486           } else {
487             m->stat = Killed;
488           }
489           return;
490         }
491       }
492     }
493 #endif
494
495     /* Top up the run queue from our spark pool.  We try to make the
496      * number of threads in the run queue equal to the number of
497      * free capabilities.
498      */
499 #if defined(SMP)
500     {
501       nat n = n_free_capabilities;
502       StgTSO *tso = run_queue_hd;
503
504       /* Count the run queue */
505       while (n > 0 && tso != END_TSO_QUEUE) {
506         tso = tso->link;
507         n--;
508       }
509
510       for (; n > 0; n--) {
511         StgClosure *spark;
512         spark = findSpark(rtsFalse);
513         if (spark == NULL) {
514           break; /* no more sparks in the pool */
515         } else {
516           /* I'd prefer this to be done in activateSpark -- HWL */
517           /* tricky - it needs to hold the scheduler lock and
518            * not try to re-acquire it -- SDM */
519           createSparkThread(spark);       
520           IF_DEBUG(scheduler,
521                    sched_belch("==^^ turning spark of closure %p into a thread",
522                                (StgClosure *)spark));
523         }
524       }
525       /* We need to wake up the other tasks if we just created some
526        * work for them.
527        */
528       if (n_free_capabilities - n > 1) {
529           pthread_cond_signal(&thread_ready_cond);
530       }
531     }
532 #endif // SMP
533
534     /* check for signals each time around the scheduler */
535 #ifndef mingw32_TARGET_OS
536     if (signals_pending()) {
537       startSignalHandlers();
538     }
539 #endif
540
541     /* Check whether any waiting threads need to be woken up.  If the
542      * run queue is empty, and there are no other tasks running, we
543      * can wait indefinitely for something to happen.
544      * ToDo: what if another client comes along & requests another
545      * main thread?
546      */
547     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
548       awaitEvent(
549            (run_queue_hd == END_TSO_QUEUE)
550 #ifdef SMP
551         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
552 #endif
553         );
554     }
555     /* we can be interrupted while waiting for I/O... */
556     if (interrupted) continue;
557
558     /* 
559      * Detect deadlock: when we have no threads to run, there are no
560      * threads waiting on I/O or sleeping, and all the other tasks are
561      * waiting for work, we must have a deadlock of some description.
562      *
563      * We first try to find threads blocked on themselves (ie. black
564      * holes), and generate NonTermination exceptions where necessary.
565      *
566      * If no threads are black holed, we have a deadlock situation, so
567      * inform all the main threads.
568      */
569 #ifndef PAR
570     if (blocked_queue_hd == END_TSO_QUEUE
571         && run_queue_hd == END_TSO_QUEUE
572         && sleeping_queue == END_TSO_QUEUE
573 #ifdef SMP
574         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
575 #endif
576         )
577     {
578         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
579         GarbageCollect(GetRoots,rtsTrue);
580         if (blocked_queue_hd == END_TSO_QUEUE
581             && run_queue_hd == END_TSO_QUEUE
582             && sleeping_queue == END_TSO_QUEUE) {
583             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
584             detectBlackHoles();
585             if (run_queue_hd == END_TSO_QUEUE) {
586                 StgMainThread *m = main_threads;
587 #ifdef SMP
588                 for (; m != NULL; m = m->link) {
589                     deleteThread(m->tso);
590                     m->ret = NULL;
591                     m->stat = Deadlock;
592                     pthread_cond_broadcast(&m->wakeup);
593                 }
594                 main_threads = NULL;
595 #else
596                 deleteThread(m->tso);
597                 m->ret = NULL;
598                 m->stat = Deadlock;
599                 main_threads = m->link;
600                 return;
601 #endif
602             }
603         }
604     }
605 #elif defined(PAR)
606     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
607 #endif
608
609 #ifdef SMP
610     /* If there's a GC pending, don't do anything until it has
611      * completed.
612      */
613     if (ready_to_gc) {
614       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
615       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
616     }
617     
618     /* block until we've got a thread on the run queue and a free
619      * capability.
620      */
621     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
622       IF_DEBUG(scheduler, sched_belch("waiting for work"));
623       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
624       IF_DEBUG(scheduler, sched_belch("work now available"));
625     }
626 #endif
627
628 #if defined(GRAN)
629
630     if (RtsFlags.GranFlags.Light)
631       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
632
633     /* adjust time based on time-stamp */
634     if (event->time > CurrentTime[CurrentProc] &&
635         event->evttype != ContinueThread)
636       CurrentTime[CurrentProc] = event->time;
637     
638     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
639     if (!RtsFlags.GranFlags.Light)
640       handleIdlePEs();
641
642     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
643
644     /* main event dispatcher in GranSim */
645     switch (event->evttype) {
646       /* Should just be continuing execution */
647     case ContinueThread:
648       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
649       /* ToDo: check assertion
650       ASSERT(run_queue_hd != (StgTSO*)NULL &&
651              run_queue_hd != END_TSO_QUEUE);
652       */
653       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
654       if (!RtsFlags.GranFlags.DoAsyncFetch &&
655           procStatus[CurrentProc]==Fetching) {
656         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
657               CurrentTSO->id, CurrentTSO, CurrentProc);
658         goto next_thread;
659       } 
660       /* Ignore ContinueThreads for completed threads */
661       if (CurrentTSO->what_next == ThreadComplete) {
662         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
663               CurrentTSO->id, CurrentTSO, CurrentProc);
664         goto next_thread;
665       } 
666       /* Ignore ContinueThreads for threads that are being migrated */
667       if (PROCS(CurrentTSO)==Nowhere) { 
668         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
669               CurrentTSO->id, CurrentTSO, CurrentProc);
670         goto next_thread;
671       }
672       /* The thread should be at the beginning of the run queue */
673       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
674         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
675               CurrentTSO->id, CurrentTSO, CurrentProc);
676         break; // run the thread anyway
677       }
678       /*
679       new_event(proc, proc, CurrentTime[proc],
680                 FindWork,
681                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
682       goto next_thread; 
683       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
684       break; // now actually run the thread; DaH Qu'vam yImuHbej 
685
686     case FetchNode:
687       do_the_fetchnode(event);
688       goto next_thread;             /* handle next event in event queue  */
689       
690     case GlobalBlock:
691       do_the_globalblock(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case FetchReply:
695       do_the_fetchreply(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case UnblockThread:   /* Move from the blocked queue to the tail of */
699       do_the_unblock(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case ResumeThread:  /* Move from the blocked queue to the tail of */
703       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
704       event->tso->gran.blocktime += 
705         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
706       do_the_startthread(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case StartThread:
710       do_the_startthread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case MoveThread:
714       do_the_movethread(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case MoveSpark:
718       do_the_movespark(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     case FindWork:
722       do_the_findwork(event);
723       goto next_thread;             /* handle next event in event queue  */
724       
725     default:
726       barf("Illegal event type %u\n", event->evttype);
727     }  /* switch */
728     
729     /* This point was scheduler_loop in the old RTS */
730
731     IF_DEBUG(gran, belch("GRAN: after main switch"));
732
733     TimeOfLastEvent = CurrentTime[CurrentProc];
734     TimeOfNextEvent = get_time_of_next_event();
735     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
736     // CurrentTSO = ThreadQueueHd;
737
738     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
739                          TimeOfNextEvent));
740
741     if (RtsFlags.GranFlags.Light) 
742       GranSimLight_leave_system(event, &ActiveTSO); 
743
744     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
745
746     IF_DEBUG(gran, 
747              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
748
749     /* in a GranSim setup the TSO stays on the run queue */
750     t = CurrentTSO;
751     /* Take a thread from the run queue. */
752     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
753
754     IF_DEBUG(gran, 
755              fprintf(stderr, "GRAN: About to run current thread, which is\n");
756              G_TSO(t,5));
757
758     context_switch = 0; // turned on via GranYield, checking events and time slice
759
760     IF_DEBUG(gran, 
761              DumpGranEvent(GR_SCHEDULE, t));
762
763     procStatus[CurrentProc] = Busy;
764
765 #elif defined(PAR)
766     if (PendingFetches != END_BF_QUEUE) {
767         processFetches();
768     }
769
770     /* ToDo: phps merge with spark activation above */
771     /* check whether we have local work and send requests if we have none */
772     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
773       /* :-[  no local threads => look out for local sparks */
774       /* the spark pool for the current PE */
775       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
776       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
777           pool->hd < pool->tl) {
778         /* 
779          * ToDo: add GC code check that we really have enough heap afterwards!!
780          * Old comment:
781          * If we're here (no runnable threads) and we have pending
782          * sparks, we must have a space problem.  Get enough space
783          * to turn one of those pending sparks into a
784          * thread... 
785          */
786
787         spark = findSpark(rtsFalse);                /* get a spark */
788         if (spark != (rtsSpark) NULL) {
789           tso = activateSpark(spark);       /* turn the spark into a thread */
790           IF_PAR_DEBUG(schedule,
791                        belch("==== schedule: Created TSO %d (%p); %d threads active",
792                              tso->id, tso, advisory_thread_count));
793
794           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
795             belch("==^^ failed to activate spark");
796             goto next_thread;
797           }               /* otherwise fall through & pick-up new tso */
798         } else {
799           IF_PAR_DEBUG(verbose,
800                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
801                              spark_queue_len(pool)));
802           goto next_thread;
803         }
804       }
805
806       /* If we still have no work we need to send a FISH to get a spark
807          from another PE 
808       */
809       if (EMPTY_RUN_QUEUE()) {
810       /* =8-[  no local sparks => look for work on other PEs */
811         /*
812          * We really have absolutely no work.  Send out a fish
813          * (there may be some out there already), and wait for
814          * something to arrive.  We clearly can't run any threads
815          * until a SCHEDULE or RESUME arrives, and so that's what
816          * we're hoping to see.  (Of course, we still have to
817          * respond to other types of messages.)
818          */
819         TIME now = msTime() /*CURRENT_TIME*/;
820         IF_PAR_DEBUG(verbose, 
821                      belch("--  now=%ld", now));
822         IF_PAR_DEBUG(verbose,
823                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
824                          (last_fish_arrived_at!=0 &&
825                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
826                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
827                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
828                              last_fish_arrived_at,
829                              RtsFlags.ParFlags.fishDelay, now);
830                      });
831         
832         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
833             (last_fish_arrived_at==0 ||
834              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
835           /* outstandingFishes is set in sendFish, processFish;
836              avoid flooding system with fishes via delay */
837           pe = choosePE();
838           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
839                    NEW_FISH_HUNGER);
840
841           // Global statistics: count no. of fishes
842           if (RtsFlags.ParFlags.ParStats.Global &&
843               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
844             globalParStats.tot_fish_mess++;
845           }
846         }
847       
848         receivedFinish = processMessages();
849         goto next_thread;
850       }
851     } else if (PacketsWaiting()) {  /* Look for incoming messages */
852       receivedFinish = processMessages();
853     }
854
855     /* Now we are sure that we have some work available */
856     ASSERT(run_queue_hd != END_TSO_QUEUE);
857
858     /* Take a thread from the run queue, if we have work */
859     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
860     IF_DEBUG(sanity,checkTSO(t));
861
862     /* ToDo: write something to the log-file
863     if (RTSflags.ParFlags.granSimStats && !sameThread)
864         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
865
866     CurrentTSO = t;
867     */
868     /* the spark pool for the current PE */
869     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
870
871     IF_DEBUG(scheduler, 
872              belch("--=^ %d threads, %d sparks on [%#x]", 
873                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
874
875 #if 1
876     if (0 && RtsFlags.ParFlags.ParStats.Full && 
877         t && LastTSO && t->id != LastTSO->id && 
878         LastTSO->why_blocked == NotBlocked && 
879         LastTSO->what_next != ThreadComplete) {
880       // if previously scheduled TSO not blocked we have to record the context switch
881       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
882                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
883     }
884
885     if (RtsFlags.ParFlags.ParStats.Full && 
886         (emitSchedule /* forced emit */ ||
887         (t && LastTSO && t->id != LastTSO->id))) {
888       /* 
889          we are running a different TSO, so write a schedule event to log file
890          NB: If we use fair scheduling we also have to write  a deschedule 
891              event for LastTSO; with unfair scheduling we know that the
892              previous tso has blocked whenever we switch to another tso, so
893              we don't need it in GUM for now
894       */
895       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
896                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
897       emitSchedule = rtsFalse;
898     }
899      
900 #endif
901 #else /* !GRAN && !PAR */
902   
903     /* grab a thread from the run queue
904      */
905     ASSERT(run_queue_hd != END_TSO_QUEUE);
906     t = POP_RUN_QUEUE();
907
908     // Sanity check the thread we're about to run.  This can be
909     // expensive if there is lots of thread switching going on...
910     IF_DEBUG(sanity,checkTSO(t));
911
912 #endif
913     
914     /* grab a capability
915      */
916 #ifdef SMP
917     cap = free_capabilities;
918     free_capabilities = cap->link;
919     n_free_capabilities--;
920 #else
921     cap = &MainCapability;
922 #endif
923
924     cap->r.rCurrentTSO = t;
925     
926     /* context switches are now initiated by the timer signal, unless
927      * the user specified "context switch as often as possible", with
928      * +RTS -C0
929      */
930     if (
931 #ifdef PROFILING
932         RtsFlags.ProfFlags.profileInterval == 0 ||
933 #endif
934         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
935          && (run_queue_hd != END_TSO_QUEUE
936              || blocked_queue_hd != END_TSO_QUEUE
937              || sleeping_queue != END_TSO_QUEUE)))
938         context_switch = 1;
939     else
940         context_switch = 0;
941
942     RELEASE_LOCK(&sched_mutex);
943
944     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
945                               t->id, t, whatNext_strs[t->what_next]));
946
947 #ifdef PROFILING
948     startHeapProfTimer();
949 #endif
950
951     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
952     /* Run the current thread 
953      */
954     switch (cap->r.rCurrentTSO->what_next) {
955     case ThreadKilled:
956     case ThreadComplete:
957         /* Thread already finished, return to scheduler. */
958         ret = ThreadFinished;
959         break;
960     case ThreadEnterGHC:
961         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
962         break;
963     case ThreadRunGHC:
964         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
965         break;
966     case ThreadEnterInterp:
967         ret = interpretBCO(cap);
968         break;
969     default:
970       barf("schedule: invalid what_next field");
971     }
972     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
973     
974     /* Costs for the scheduler are assigned to CCS_SYSTEM */
975 #ifdef PROFILING
976     stopHeapProfTimer();
977     CCCS = CCS_SYSTEM;
978 #endif
979     
980     ACQUIRE_LOCK(&sched_mutex);
981
982 #ifdef SMP
983     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
984 #elif !defined(GRAN) && !defined(PAR)
985     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
986 #endif
987     t = cap->r.rCurrentTSO;
988     
989 #if defined(PAR)
990     /* HACK 675: if the last thread didn't yield, make sure to print a 
991        SCHEDULE event to the log file when StgRunning the next thread, even
992        if it is the same one as before */
993     LastTSO = t; 
994     TimeOfLastYield = CURRENT_TIME;
995 #endif
996
997     switch (ret) {
998     case HeapOverflow:
999 #if defined(GRAN)
1000       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1001       globalGranStats.tot_heapover++;
1002 #elif defined(PAR)
1003       globalParStats.tot_heapover++;
1004 #endif
1005
1006       // did the task ask for a large block?
1007       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1008           // if so, get one and push it on the front of the nursery.
1009           bdescr *bd;
1010           nat blocks;
1011           
1012           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1013
1014           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1015                                    t->id, t,
1016                                    whatNext_strs[t->what_next], blocks));
1017
1018           // don't do this if it would push us over the
1019           // alloc_blocks_lim limit; we'll GC first.
1020           if (alloc_blocks + blocks < alloc_blocks_lim) {
1021
1022               alloc_blocks += blocks;
1023               bd = allocGroup( blocks );
1024
1025               // link the new group into the list
1026               bd->link = cap->r.rCurrentNursery;
1027               bd->u.back = cap->r.rCurrentNursery->u.back;
1028               if (cap->r.rCurrentNursery->u.back != NULL) {
1029                   cap->r.rCurrentNursery->u.back->link = bd;
1030               } else {
1031                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1032                          g0s0->blocks == cap->r.rNursery);
1033                   cap->r.rNursery = g0s0->blocks = bd;
1034               }           
1035               cap->r.rCurrentNursery->u.back = bd;
1036
1037               // initialise it as a nursery block
1038               bd->step = g0s0;
1039               bd->gen_no = 0;
1040               bd->flags = 0;
1041               bd->free = bd->start;
1042
1043               // don't forget to update the block count in g0s0.
1044               g0s0->n_blocks += blocks;
1045               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1046
1047               // now update the nursery to point to the new block
1048               cap->r.rCurrentNursery = bd;
1049
1050               // we might be unlucky and have another thread get on the
1051               // run queue before us and steal the large block, but in that
1052               // case the thread will just end up requesting another large
1053               // block.
1054               PUSH_ON_RUN_QUEUE(t);
1055               break;
1056           }
1057       }
1058
1059       /* make all the running tasks block on a condition variable,
1060        * maybe set context_switch and wait till they all pile in,
1061        * then have them wait on a GC condition variable.
1062        */
1063       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1064                                t->id, t, whatNext_strs[t->what_next]));
1065       threadPaused(t);
1066 #if defined(GRAN)
1067       ASSERT(!is_on_queue(t,CurrentProc));
1068 #elif defined(PAR)
1069       /* Currently we emit a DESCHEDULE event before GC in GUM.
1070          ToDo: either add separate event to distinguish SYSTEM time from rest
1071                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1072       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1073         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1074                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1075         emitSchedule = rtsTrue;
1076       }
1077 #endif
1078       
1079       ready_to_gc = rtsTrue;
1080       context_switch = 1;               /* stop other threads ASAP */
1081       PUSH_ON_RUN_QUEUE(t);
1082       /* actual GC is done at the end of the while loop */
1083       break;
1084       
1085     case StackOverflow:
1086 #if defined(GRAN)
1087       IF_DEBUG(gran, 
1088                DumpGranEvent(GR_DESCHEDULE, t));
1089       globalGranStats.tot_stackover++;
1090 #elif defined(PAR)
1091       // IF_DEBUG(par, 
1092       // DumpGranEvent(GR_DESCHEDULE, t);
1093       globalParStats.tot_stackover++;
1094 #endif
1095       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1096                                t->id, t, whatNext_strs[t->what_next]));
1097       /* just adjust the stack for this thread, then pop it back
1098        * on the run queue.
1099        */
1100       threadPaused(t);
1101       { 
1102         StgMainThread *m;
1103         /* enlarge the stack */
1104         StgTSO *new_t = threadStackOverflow(t);
1105         
1106         /* This TSO has moved, so update any pointers to it from the
1107          * main thread stack.  It better not be on any other queues...
1108          * (it shouldn't be).
1109          */
1110         for (m = main_threads; m != NULL; m = m->link) {
1111           if (m->tso == t) {
1112             m->tso = new_t;
1113           }
1114         }
1115         threadPaused(new_t);
1116         PUSH_ON_RUN_QUEUE(new_t);
1117       }
1118       break;
1119
1120     case ThreadYielding:
1121 #if defined(GRAN)
1122       IF_DEBUG(gran, 
1123                DumpGranEvent(GR_DESCHEDULE, t));
1124       globalGranStats.tot_yields++;
1125 #elif defined(PAR)
1126       // IF_DEBUG(par, 
1127       // DumpGranEvent(GR_DESCHEDULE, t);
1128       globalParStats.tot_yields++;
1129 #endif
1130       /* put the thread back on the run queue.  Then, if we're ready to
1131        * GC, check whether this is the last task to stop.  If so, wake
1132        * up the GC thread.  getThread will block during a GC until the
1133        * GC is finished.
1134        */
1135       IF_DEBUG(scheduler,
1136                if (t->what_next == ThreadEnterInterp) {
1137                    /* ToDo: or maybe a timer expired when we were in Hugs?
1138                     * or maybe someone hit ctrl-C
1139                     */
1140                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1141                          t->id, t, whatNext_strs[t->what_next]);
1142                } else {
1143                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1144                          t->id, t, whatNext_strs[t->what_next]);
1145                }
1146                );
1147
1148       threadPaused(t);
1149
1150       IF_DEBUG(sanity,
1151                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1152                checkTSO(t));
1153       ASSERT(t->link == END_TSO_QUEUE);
1154 #if defined(GRAN)
1155       ASSERT(!is_on_queue(t,CurrentProc));
1156
1157       IF_DEBUG(sanity,
1158                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1159                checkThreadQsSanity(rtsTrue));
1160 #endif
1161 #if defined(PAR)
1162       if (RtsFlags.ParFlags.doFairScheduling) { 
1163         /* this does round-robin scheduling; good for concurrency */
1164         APPEND_TO_RUN_QUEUE(t);
1165       } else {
1166         /* this does unfair scheduling; good for parallelism */
1167         PUSH_ON_RUN_QUEUE(t);
1168       }
1169 #else
1170       /* this does round-robin scheduling; good for concurrency */
1171       APPEND_TO_RUN_QUEUE(t);
1172 #endif
1173 #if defined(GRAN)
1174       /* add a ContinueThread event to actually process the thread */
1175       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1176                 ContinueThread,
1177                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1178       IF_GRAN_DEBUG(bq, 
1179                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1180                G_EVENTQ(0);
1181                G_CURR_THREADQ(0));
1182 #endif /* GRAN */
1183       break;
1184       
1185     case ThreadBlocked:
1186 #if defined(GRAN)
1187       IF_DEBUG(scheduler,
1188                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1189                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1190                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1191
1192       // ??? needed; should emit block before
1193       IF_DEBUG(gran, 
1194                DumpGranEvent(GR_DESCHEDULE, t)); 
1195       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1196       /*
1197         ngoq Dogh!
1198       ASSERT(procStatus[CurrentProc]==Busy || 
1199               ((procStatus[CurrentProc]==Fetching) && 
1200               (t->block_info.closure!=(StgClosure*)NULL)));
1201       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1202           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1203             procStatus[CurrentProc]==Fetching)) 
1204         procStatus[CurrentProc] = Idle;
1205       */
1206 #elif defined(PAR)
1207       IF_DEBUG(scheduler,
1208                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1209                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1210       IF_PAR_DEBUG(bq,
1211
1212                    if (t->block_info.closure!=(StgClosure*)NULL) 
1213                      print_bq(t->block_info.closure));
1214
1215       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1216       blockThread(t);
1217
1218       /* whatever we schedule next, we must log that schedule */
1219       emitSchedule = rtsTrue;
1220
1221 #else /* !GRAN */
1222       /* don't need to do anything.  Either the thread is blocked on
1223        * I/O, in which case we'll have called addToBlockedQueue
1224        * previously, or it's blocked on an MVar or Blackhole, in which
1225        * case it'll be on the relevant queue already.
1226        */
1227       IF_DEBUG(scheduler,
1228                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1229                printThreadBlockage(t);
1230                fprintf(stderr, "\n"));
1231
1232       /* Only for dumping event to log file 
1233          ToDo: do I need this in GranSim, too?
1234       blockThread(t);
1235       */
1236 #endif
1237       threadPaused(t);
1238       break;
1239       
1240     case ThreadFinished:
1241       /* Need to check whether this was a main thread, and if so, signal
1242        * the task that started it with the return value.  If we have no
1243        * more main threads, we probably need to stop all the tasks until
1244        * we get a new one.
1245        */
1246       /* We also end up here if the thread kills itself with an
1247        * uncaught exception, see Exception.hc.
1248        */
1249       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1250 #if defined(GRAN)
1251       endThread(t, CurrentProc); // clean-up the thread
1252 #elif defined(PAR)
1253       /* For now all are advisory -- HWL */
1254       //if(t->priority==AdvisoryPriority) ??
1255       advisory_thread_count--;
1256       
1257 # ifdef DIST
1258       if(t->dist.priority==RevalPriority)
1259         FinishReval(t);
1260 # endif
1261       
1262       if (RtsFlags.ParFlags.ParStats.Full &&
1263           !RtsFlags.ParFlags.ParStats.Suppressed) 
1264         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1265 #endif
1266       break;
1267       
1268     default:
1269       barf("schedule: invalid thread return code %d", (int)ret);
1270     }
1271     
1272 #ifdef SMP
1273     cap->link = free_capabilities;
1274     free_capabilities = cap;
1275     n_free_capabilities++;
1276 #endif
1277
1278 #ifdef PROFILING
1279     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1280         GarbageCollect(GetRoots, rtsTrue);
1281         heapCensus();
1282         performHeapProfile = rtsFalse;
1283         ready_to_gc = rtsFalse; // we already GC'd
1284     }
1285 #endif
1286
1287 #ifdef SMP
1288     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1289 #else
1290     if (ready_to_gc) 
1291 #endif
1292       {
1293       /* everybody back, start the GC.
1294        * Could do it in this thread, or signal a condition var
1295        * to do it in another thread.  Either way, we need to
1296        * broadcast on gc_pending_cond afterward.
1297        */
1298 #ifdef SMP
1299       IF_DEBUG(scheduler,sched_belch("doing GC"));
1300 #endif
1301       GarbageCollect(GetRoots,rtsFalse);
1302       ready_to_gc = rtsFalse;
1303 #ifdef SMP
1304       pthread_cond_broadcast(&gc_pending_cond);
1305 #endif
1306 #if defined(GRAN)
1307       /* add a ContinueThread event to continue execution of current thread */
1308       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1309                 ContinueThread,
1310                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1311       IF_GRAN_DEBUG(bq, 
1312                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1313                G_EVENTQ(0);
1314                G_CURR_THREADQ(0));
1315 #endif /* GRAN */
1316     }
1317
1318 #if defined(GRAN)
1319   next_thread:
1320     IF_GRAN_DEBUG(unused,
1321                   print_eventq(EventHd));
1322
1323     event = get_next_event();
1324 #elif defined(PAR)
1325   next_thread:
1326     /* ToDo: wait for next message to arrive rather than busy wait */
1327 #endif /* GRAN */
1328
1329   } /* end of while(1) */
1330
1331   IF_PAR_DEBUG(verbose,
1332                belch("== Leaving schedule() after having received Finish"));
1333 }
1334
1335 /* ---------------------------------------------------------------------------
1336  * deleteAllThreads():  kill all the live threads.
1337  *
1338  * This is used when we catch a user interrupt (^C), before performing
1339  * any necessary cleanups and running finalizers.
1340  * ------------------------------------------------------------------------- */
1341    
1342 void deleteAllThreads ( void )
1343 {
1344   StgTSO* t;
1345   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1346   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1347       deleteThread(t);
1348   }
1349   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1350       deleteThread(t);
1351   }
1352   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1353       deleteThread(t);
1354   }
1355   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1356   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1357   sleeping_queue = END_TSO_QUEUE;
1358 }
1359
1360 /* startThread and  insertThread are now in GranSim.c -- HWL */
1361
1362 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1363 //@subsection Suspend and Resume
1364
1365 /* ---------------------------------------------------------------------------
1366  * Suspending & resuming Haskell threads.
1367  * 
1368  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1369  * its capability before calling the C function.  This allows another
1370  * task to pick up the capability and carry on running Haskell
1371  * threads.  It also means that if the C call blocks, it won't lock
1372  * the whole system.
1373  *
1374  * The Haskell thread making the C call is put to sleep for the
1375  * duration of the call, on the susepended_ccalling_threads queue.  We
1376  * give out a token to the task, which it can use to resume the thread
1377  * on return from the C function.
1378  * ------------------------------------------------------------------------- */
1379    
1380 StgInt
1381 suspendThread( StgRegTable *reg )
1382 {
1383   nat tok;
1384   Capability *cap;
1385
1386   // assume that *reg is a pointer to the StgRegTable part of a Capability
1387   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1388
1389   ACQUIRE_LOCK(&sched_mutex);
1390
1391   IF_DEBUG(scheduler,
1392            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1393
1394   threadPaused(cap->r.rCurrentTSO);
1395   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1396   suspended_ccalling_threads = cap->r.rCurrentTSO;
1397
1398   /* Use the thread ID as the token; it should be unique */
1399   tok = cap->r.rCurrentTSO->id;
1400
1401 #ifdef SMP
1402   cap->link = free_capabilities;
1403   free_capabilities = cap;
1404   n_free_capabilities++;
1405 #endif
1406
1407   RELEASE_LOCK(&sched_mutex);
1408   return tok; 
1409 }
1410
1411 StgRegTable *
1412 resumeThread( StgInt tok )
1413 {
1414   StgTSO *tso, **prev;
1415   Capability *cap;
1416
1417   ACQUIRE_LOCK(&sched_mutex);
1418
1419   prev = &suspended_ccalling_threads;
1420   for (tso = suspended_ccalling_threads; 
1421        tso != END_TSO_QUEUE; 
1422        prev = &tso->link, tso = tso->link) {
1423     if (tso->id == (StgThreadID)tok) {
1424       *prev = tso->link;
1425       break;
1426     }
1427   }
1428   if (tso == END_TSO_QUEUE) {
1429     barf("resumeThread: thread not found");
1430   }
1431   tso->link = END_TSO_QUEUE;
1432
1433 #ifdef SMP
1434   while (free_capabilities == NULL) {
1435     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1436     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1437     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1438   }
1439   cap = free_capabilities;
1440   free_capabilities = cap->link;
1441   n_free_capabilities--;
1442 #else  
1443   cap = &MainCapability;
1444 #endif
1445
1446   cap->r.rCurrentTSO = tso;
1447
1448   RELEASE_LOCK(&sched_mutex);
1449   return &cap->r;
1450 }
1451
1452
1453 /* ---------------------------------------------------------------------------
1454  * Static functions
1455  * ------------------------------------------------------------------------ */
1456 static void unblockThread(StgTSO *tso);
1457
1458 /* ---------------------------------------------------------------------------
1459  * Comparing Thread ids.
1460  *
1461  * This is used from STG land in the implementation of the
1462  * instances of Eq/Ord for ThreadIds.
1463  * ------------------------------------------------------------------------ */
1464
1465 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1466
1467   StgThreadID id1 = tso1->id; 
1468   StgThreadID id2 = tso2->id;
1469  
1470   if (id1 < id2) return (-1);
1471   if (id1 > id2) return 1;
1472   return 0;
1473 }
1474
1475 /* ---------------------------------------------------------------------------
1476  * Fetching the ThreadID from an StgTSO.
1477  *
1478  * This is used in the implementation of Show for ThreadIds.
1479  * ------------------------------------------------------------------------ */
1480 int rts_getThreadId(const StgTSO *tso) 
1481 {
1482   return tso->id;
1483 }
1484
1485 /* ---------------------------------------------------------------------------
1486    Create a new thread.
1487
1488    The new thread starts with the given stack size.  Before the
1489    scheduler can run, however, this thread needs to have a closure
1490    (and possibly some arguments) pushed on its stack.  See
1491    pushClosure() in Schedule.h.
1492
1493    createGenThread() and createIOThread() (in SchedAPI.h) are
1494    convenient packaged versions of this function.
1495
1496    currently pri (priority) is only used in a GRAN setup -- HWL
1497    ------------------------------------------------------------------------ */
1498 //@cindex createThread
1499 #if defined(GRAN)
1500 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1501 StgTSO *
1502 createThread(nat stack_size, StgInt pri)
1503 {
1504   return createThread_(stack_size, rtsFalse, pri);
1505 }
1506
1507 static StgTSO *
1508 createThread_(nat size, rtsBool have_lock, StgInt pri)
1509 {
1510 #else
1511 StgTSO *
1512 createThread(nat stack_size)
1513 {
1514   return createThread_(stack_size, rtsFalse);
1515 }
1516
1517 static StgTSO *
1518 createThread_(nat size, rtsBool have_lock)
1519 {
1520 #endif
1521
1522     StgTSO *tso;
1523     nat stack_size;
1524
1525     /* First check whether we should create a thread at all */
1526 #if defined(PAR)
1527   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1528   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1529     threadsIgnored++;
1530     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1531           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1532     return END_TSO_QUEUE;
1533   }
1534   threadsCreated++;
1535 #endif
1536
1537 #if defined(GRAN)
1538   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1539 #endif
1540
1541   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1542
1543   /* catch ridiculously small stack sizes */
1544   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1545     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1546   }
1547
1548   stack_size = size - TSO_STRUCT_SIZEW;
1549
1550   tso = (StgTSO *)allocate(size);
1551   TICK_ALLOC_TSO(stack_size, 0);
1552
1553   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1554 #if defined(GRAN)
1555   SET_GRAN_HDR(tso, ThisPE);
1556 #endif
1557   tso->what_next     = ThreadEnterGHC;
1558
1559   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1560    * protect the increment operation on next_thread_id.
1561    * In future, we could use an atomic increment instead.
1562    */
1563   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1564   tso->id = next_thread_id++; 
1565   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1566
1567   tso->why_blocked  = NotBlocked;
1568   tso->blocked_exceptions = NULL;
1569
1570   tso->stack_size   = stack_size;
1571   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1572                               - TSO_STRUCT_SIZEW;
1573   tso->sp           = (P_)&(tso->stack) + stack_size;
1574
1575 #ifdef PROFILING
1576   tso->prof.CCCS = CCS_MAIN;
1577 #endif
1578
1579   /* put a stop frame on the stack */
1580   tso->sp -= sizeofW(StgStopFrame);
1581   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1582   tso->su = (StgUpdateFrame*)tso->sp;
1583
1584   // ToDo: check this
1585 #if defined(GRAN)
1586   tso->link = END_TSO_QUEUE;
1587   /* uses more flexible routine in GranSim */
1588   insertThread(tso, CurrentProc);
1589 #else
1590   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1591    * from its creation
1592    */
1593 #endif
1594
1595 #if defined(GRAN) 
1596   if (RtsFlags.GranFlags.GranSimStats.Full) 
1597     DumpGranEvent(GR_START,tso);
1598 #elif defined(PAR)
1599   if (RtsFlags.ParFlags.ParStats.Full) 
1600     DumpGranEvent(GR_STARTQ,tso);
1601   /* HACk to avoid SCHEDULE 
1602      LastTSO = tso; */
1603 #endif
1604
1605   /* Link the new thread on the global thread list.
1606    */
1607   tso->global_link = all_threads;
1608   all_threads = tso;
1609
1610 #if defined(DIST)
1611   tso->dist.priority = MandatoryPriority; //by default that is...
1612 #endif
1613
1614 #if defined(GRAN)
1615   tso->gran.pri = pri;
1616 # if defined(DEBUG)
1617   tso->gran.magic = TSO_MAGIC; // debugging only
1618 # endif
1619   tso->gran.sparkname   = 0;
1620   tso->gran.startedat   = CURRENT_TIME; 
1621   tso->gran.exported    = 0;
1622   tso->gran.basicblocks = 0;
1623   tso->gran.allocs      = 0;
1624   tso->gran.exectime    = 0;
1625   tso->gran.fetchtime   = 0;
1626   tso->gran.fetchcount  = 0;
1627   tso->gran.blocktime   = 0;
1628   tso->gran.blockcount  = 0;
1629   tso->gran.blockedat   = 0;
1630   tso->gran.globalsparks = 0;
1631   tso->gran.localsparks  = 0;
1632   if (RtsFlags.GranFlags.Light)
1633     tso->gran.clock  = Now; /* local clock */
1634   else
1635     tso->gran.clock  = 0;
1636
1637   IF_DEBUG(gran,printTSO(tso));
1638 #elif defined(PAR)
1639 # if defined(DEBUG)
1640   tso->par.magic = TSO_MAGIC; // debugging only
1641 # endif
1642   tso->par.sparkname   = 0;
1643   tso->par.startedat   = CURRENT_TIME; 
1644   tso->par.exported    = 0;
1645   tso->par.basicblocks = 0;
1646   tso->par.allocs      = 0;
1647   tso->par.exectime    = 0;
1648   tso->par.fetchtime   = 0;
1649   tso->par.fetchcount  = 0;
1650   tso->par.blocktime   = 0;
1651   tso->par.blockcount  = 0;
1652   tso->par.blockedat   = 0;
1653   tso->par.globalsparks = 0;
1654   tso->par.localsparks  = 0;
1655 #endif
1656
1657 #if defined(GRAN)
1658   globalGranStats.tot_threads_created++;
1659   globalGranStats.threads_created_on_PE[CurrentProc]++;
1660   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1661   globalGranStats.tot_sq_probes++;
1662 #elif defined(PAR)
1663   // collect parallel global statistics (currently done together with GC stats)
1664   if (RtsFlags.ParFlags.ParStats.Global &&
1665       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1666     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1667     globalParStats.tot_threads_created++;
1668   }
1669 #endif 
1670
1671 #if defined(GRAN)
1672   IF_GRAN_DEBUG(pri,
1673                 belch("==__ schedule: Created TSO %d (%p);",
1674                       CurrentProc, tso, tso->id));
1675 #elif defined(PAR)
1676     IF_PAR_DEBUG(verbose,
1677                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1678                        tso->id, tso, advisory_thread_count));
1679 #else
1680   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1681                                  tso->id, tso->stack_size));
1682 #endif    
1683   return tso;
1684 }
1685
1686 #if defined(PAR)
1687 /* RFP:
1688    all parallel thread creation calls should fall through the following routine.
1689 */
1690 StgTSO *
1691 createSparkThread(rtsSpark spark) 
1692 { StgTSO *tso;
1693   ASSERT(spark != (rtsSpark)NULL);
1694   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1695   { threadsIgnored++;
1696     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1697           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1698     return END_TSO_QUEUE;
1699   }
1700   else
1701   { threadsCreated++;
1702     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1703     if (tso==END_TSO_QUEUE)     
1704       barf("createSparkThread: Cannot create TSO");
1705 #if defined(DIST)
1706     tso->priority = AdvisoryPriority;
1707 #endif
1708     pushClosure(tso,spark);
1709     PUSH_ON_RUN_QUEUE(tso);
1710     advisory_thread_count++;    
1711   }
1712   return tso;
1713 }
1714 #endif
1715
1716 /*
1717   Turn a spark into a thread.
1718   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1719 */
1720 #if defined(PAR)
1721 //@cindex activateSpark
1722 StgTSO *
1723 activateSpark (rtsSpark spark) 
1724 {
1725   StgTSO *tso;
1726
1727   tso = createSparkThread(spark);
1728   if (RtsFlags.ParFlags.ParStats.Full) {   
1729     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1730     IF_PAR_DEBUG(verbose,
1731                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1732                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1733   }
1734   // ToDo: fwd info on local/global spark to thread -- HWL
1735   // tso->gran.exported =  spark->exported;
1736   // tso->gran.locked =   !spark->global;
1737   // tso->gran.sparkname = spark->name;
1738
1739   return tso;
1740 }
1741 #endif
1742
1743 /* ---------------------------------------------------------------------------
1744  * scheduleThread()
1745  *
1746  * scheduleThread puts a thread on the head of the runnable queue.
1747  * This will usually be done immediately after a thread is created.
1748  * The caller of scheduleThread must create the thread using e.g.
1749  * createThread and push an appropriate closure
1750  * on this thread's stack before the scheduler is invoked.
1751  * ------------------------------------------------------------------------ */
1752
1753 void
1754 scheduleThread(StgTSO *tso)
1755 {
1756   ACQUIRE_LOCK(&sched_mutex);
1757
1758   /* Put the new thread on the head of the runnable queue.  The caller
1759    * better push an appropriate closure on this thread's stack
1760    * beforehand.  In the SMP case, the thread may start running as
1761    * soon as we release the scheduler lock below.
1762    */
1763   PUSH_ON_RUN_QUEUE(tso);
1764   THREAD_RUNNABLE();
1765
1766 #if 0
1767   IF_DEBUG(scheduler,printTSO(tso));
1768 #endif
1769   RELEASE_LOCK(&sched_mutex);
1770 }
1771
1772 /* ---------------------------------------------------------------------------
1773  * startTasks()
1774  *
1775  * Start up Posix threads to run each of the scheduler tasks.
1776  * I believe the task ids are not needed in the system as defined.
1777  *  KH @ 25/10/99
1778  * ------------------------------------------------------------------------ */
1779
1780 #if defined(PAR) || defined(SMP)
1781 void
1782 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1783 {
1784   schedule();
1785 }
1786 #endif
1787
1788 /* ---------------------------------------------------------------------------
1789  * initScheduler()
1790  *
1791  * Initialise the scheduler.  This resets all the queues - if the
1792  * queues contained any threads, they'll be garbage collected at the
1793  * next pass.
1794  *
1795  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1796  * ------------------------------------------------------------------------ */
1797
1798 #ifdef SMP
1799 static void
1800 term_handler(int sig STG_UNUSED)
1801 {
1802   stat_workerStop();
1803   ACQUIRE_LOCK(&term_mutex);
1804   await_death--;
1805   RELEASE_LOCK(&term_mutex);
1806   pthread_exit(NULL);
1807 }
1808 #endif
1809
1810 static void
1811 initCapability( Capability *cap )
1812 {
1813     cap->f.stgChk0         = (F_)__stg_chk_0;
1814     cap->f.stgChk1         = (F_)__stg_chk_1;
1815     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1816     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1817 }
1818
1819 void 
1820 initScheduler(void)
1821 {
1822 #if defined(GRAN)
1823   nat i;
1824
1825   for (i=0; i<=MAX_PROC; i++) {
1826     run_queue_hds[i]      = END_TSO_QUEUE;
1827     run_queue_tls[i]      = END_TSO_QUEUE;
1828     blocked_queue_hds[i]  = END_TSO_QUEUE;
1829     blocked_queue_tls[i]  = END_TSO_QUEUE;
1830     ccalling_threadss[i]  = END_TSO_QUEUE;
1831     sleeping_queue        = END_TSO_QUEUE;
1832   }
1833 #else
1834   run_queue_hd      = END_TSO_QUEUE;
1835   run_queue_tl      = END_TSO_QUEUE;
1836   blocked_queue_hd  = END_TSO_QUEUE;
1837   blocked_queue_tl  = END_TSO_QUEUE;
1838   sleeping_queue    = END_TSO_QUEUE;
1839 #endif 
1840
1841   suspended_ccalling_threads  = END_TSO_QUEUE;
1842
1843   main_threads = NULL;
1844   all_threads  = END_TSO_QUEUE;
1845
1846   context_switch = 0;
1847   interrupted    = 0;
1848
1849   RtsFlags.ConcFlags.ctxtSwitchTicks =
1850       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1851
1852   /* Install the SIGHUP handler */
1853 #ifdef SMP
1854   {
1855     struct sigaction action,oact;
1856
1857     action.sa_handler = term_handler;
1858     sigemptyset(&action.sa_mask);
1859     action.sa_flags = 0;
1860     if (sigaction(SIGTERM, &action, &oact) != 0) {
1861       barf("can't install TERM handler");
1862     }
1863   }
1864 #endif
1865
1866 #ifdef SMP
1867   /* Allocate N Capabilities */
1868   {
1869     nat i;
1870     Capability *cap, *prev;
1871     cap  = NULL;
1872     prev = NULL;
1873     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1874       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1875       initCapability(cap);
1876       cap->link = prev;
1877       prev = cap;
1878     }
1879     free_capabilities = cap;
1880     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1881   }
1882   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1883                              n_free_capabilities););
1884 #else
1885   initCapability(&MainCapability);
1886 #endif
1887
1888 #if defined(SMP) || defined(PAR)
1889   initSparkPools();
1890 #endif
1891 }
1892
1893 #ifdef SMP
1894 void
1895 startTasks( void )
1896 {
1897   nat i;
1898   int r;
1899   pthread_t tid;
1900   
1901   /* make some space for saving all the thread ids */
1902   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1903                             "initScheduler:task_ids");
1904   
1905   /* and create all the threads */
1906   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1907     r = pthread_create(&tid,NULL,taskStart,NULL);
1908     if (r != 0) {
1909       barf("startTasks: Can't create new Posix thread");
1910     }
1911     task_ids[i].id = tid;
1912     task_ids[i].mut_time = 0.0;
1913     task_ids[i].mut_etime = 0.0;
1914     task_ids[i].gc_time = 0.0;
1915     task_ids[i].gc_etime = 0.0;
1916     task_ids[i].elapsedtimestart = elapsedtime();
1917     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1918   }
1919 }
1920 #endif
1921
1922 void
1923 exitScheduler( void )
1924 {
1925 #ifdef SMP
1926   nat i;
1927
1928   /* Don't want to use pthread_cancel, since we'd have to install
1929    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1930    * all our locks.
1931    */
1932 #if 0
1933   /* Cancel all our tasks */
1934   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1935     pthread_cancel(task_ids[i].id);
1936   }
1937   
1938   /* Wait for all the tasks to terminate */
1939   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1940     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1941                                task_ids[i].id));
1942     pthread_join(task_ids[i].id, NULL);
1943   }
1944 #endif
1945
1946   /* Send 'em all a SIGHUP.  That should shut 'em up.
1947    */
1948   await_death = RtsFlags.ParFlags.nNodes;
1949   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1950     pthread_kill(task_ids[i].id,SIGTERM);
1951   }
1952   while (await_death > 0) {
1953     sched_yield();
1954   }
1955 #endif
1956 }
1957
1958 /* -----------------------------------------------------------------------------
1959    Managing the per-task allocation areas.
1960    
1961    Each capability comes with an allocation area.  These are
1962    fixed-length block lists into which allocation can be done.
1963
1964    ToDo: no support for two-space collection at the moment???
1965    -------------------------------------------------------------------------- */
1966
1967 /* -----------------------------------------------------------------------------
1968  * waitThread is the external interface for running a new computation
1969  * and waiting for the result.
1970  *
1971  * In the non-SMP case, we create a new main thread, push it on the 
1972  * main-thread stack, and invoke the scheduler to run it.  The
1973  * scheduler will return when the top main thread on the stack has
1974  * completed or died, and fill in the necessary fields of the
1975  * main_thread structure.
1976  *
1977  * In the SMP case, we create a main thread as before, but we then
1978  * create a new condition variable and sleep on it.  When our new
1979  * main thread has completed, we'll be woken up and the status/result
1980  * will be in the main_thread struct.
1981  * -------------------------------------------------------------------------- */
1982
1983 int 
1984 howManyThreadsAvail ( void )
1985 {
1986    int i = 0;
1987    StgTSO* q;
1988    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1989       i++;
1990    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1991       i++;
1992    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1993       i++;
1994    return i;
1995 }
1996
1997 void
1998 finishAllThreads ( void )
1999 {
2000    do {
2001       while (run_queue_hd != END_TSO_QUEUE) {
2002          waitThread ( run_queue_hd, NULL );
2003       }
2004       while (blocked_queue_hd != END_TSO_QUEUE) {
2005          waitThread ( blocked_queue_hd, NULL );
2006       }
2007       while (sleeping_queue != END_TSO_QUEUE) {
2008          waitThread ( blocked_queue_hd, NULL );
2009       }
2010    } while 
2011       (blocked_queue_hd != END_TSO_QUEUE || 
2012        run_queue_hd     != END_TSO_QUEUE ||
2013        sleeping_queue   != END_TSO_QUEUE);
2014 }
2015
2016 SchedulerStatus
2017 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2018 {
2019   StgMainThread *m;
2020   SchedulerStatus stat;
2021
2022   ACQUIRE_LOCK(&sched_mutex);
2023   
2024   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2025
2026   m->tso = tso;
2027   m->ret = ret;
2028   m->stat = NoStatus;
2029 #ifdef SMP
2030   pthread_cond_init(&m->wakeup, NULL);
2031 #endif
2032
2033   m->link = main_threads;
2034   main_threads = m;
2035
2036   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2037                               m->tso->id));
2038
2039 #ifdef SMP
2040   do {
2041     pthread_cond_wait(&m->wakeup, &sched_mutex);
2042   } while (m->stat == NoStatus);
2043 #elif defined(GRAN)
2044   /* GranSim specific init */
2045   CurrentTSO = m->tso;                // the TSO to run
2046   procStatus[MainProc] = Busy;        // status of main PE
2047   CurrentProc = MainProc;             // PE to run it on
2048
2049   schedule();
2050 #else
2051   schedule();
2052   ASSERT(m->stat != NoStatus);
2053 #endif
2054
2055   stat = m->stat;
2056
2057 #ifdef SMP
2058   pthread_cond_destroy(&m->wakeup);
2059 #endif
2060
2061   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2062                               m->tso->id));
2063   free(m);
2064
2065   RELEASE_LOCK(&sched_mutex);
2066
2067   return stat;
2068 }
2069
2070 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2071 //@subsection Run queue code 
2072
2073 #if 0
2074 /* 
2075    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2076        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2077        implicit global variable that has to be correct when calling these
2078        fcts -- HWL 
2079 */
2080
2081 /* Put the new thread on the head of the runnable queue.
2082  * The caller of createThread better push an appropriate closure
2083  * on this thread's stack before the scheduler is invoked.
2084  */
2085 static /* inline */ void
2086 add_to_run_queue(tso)
2087 StgTSO* tso; 
2088 {
2089   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2090   tso->link = run_queue_hd;
2091   run_queue_hd = tso;
2092   if (run_queue_tl == END_TSO_QUEUE) {
2093     run_queue_tl = tso;
2094   }
2095 }
2096
2097 /* Put the new thread at the end of the runnable queue. */
2098 static /* inline */ void
2099 push_on_run_queue(tso)
2100 StgTSO* tso; 
2101 {
2102   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2103   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2104   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2105   if (run_queue_hd == END_TSO_QUEUE) {
2106     run_queue_hd = tso;
2107   } else {
2108     run_queue_tl->link = tso;
2109   }
2110   run_queue_tl = tso;
2111 }
2112
2113 /* 
2114    Should be inlined because it's used very often in schedule.  The tso
2115    argument is actually only needed in GranSim, where we want to have the
2116    possibility to schedule *any* TSO on the run queue, irrespective of the
2117    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2118    the run queue and dequeue the tso, adjusting the links in the queue. 
2119 */
2120 //@cindex take_off_run_queue
2121 static /* inline */ StgTSO*
2122 take_off_run_queue(StgTSO *tso) {
2123   StgTSO *t, *prev;
2124
2125   /* 
2126      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2127
2128      if tso is specified, unlink that tso from the run_queue (doesn't have
2129      to be at the beginning of the queue); GranSim only 
2130   */
2131   if (tso!=END_TSO_QUEUE) {
2132     /* find tso in queue */
2133     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2134          t!=END_TSO_QUEUE && t!=tso;
2135          prev=t, t=t->link) 
2136       /* nothing */ ;
2137     ASSERT(t==tso);
2138     /* now actually dequeue the tso */
2139     if (prev!=END_TSO_QUEUE) {
2140       ASSERT(run_queue_hd!=t);
2141       prev->link = t->link;
2142     } else {
2143       /* t is at beginning of thread queue */
2144       ASSERT(run_queue_hd==t);
2145       run_queue_hd = t->link;
2146     }
2147     /* t is at end of thread queue */
2148     if (t->link==END_TSO_QUEUE) {
2149       ASSERT(t==run_queue_tl);
2150       run_queue_tl = prev;
2151     } else {
2152       ASSERT(run_queue_tl!=t);
2153     }
2154     t->link = END_TSO_QUEUE;
2155   } else {
2156     /* take tso from the beginning of the queue; std concurrent code */
2157     t = run_queue_hd;
2158     if (t != END_TSO_QUEUE) {
2159       run_queue_hd = t->link;
2160       t->link = END_TSO_QUEUE;
2161       if (run_queue_hd == END_TSO_QUEUE) {
2162         run_queue_tl = END_TSO_QUEUE;
2163       }
2164     }
2165   }
2166   return t;
2167 }
2168
2169 #endif /* 0 */
2170
2171 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2172 //@subsection Garbage Collextion Routines
2173
2174 /* ---------------------------------------------------------------------------
2175    Where are the roots that we know about?
2176
2177         - all the threads on the runnable queue
2178         - all the threads on the blocked queue
2179         - all the threads on the sleeping queue
2180         - all the thread currently executing a _ccall_GC
2181         - all the "main threads"
2182      
2183    ------------------------------------------------------------------------ */
2184
2185 /* This has to be protected either by the scheduler monitor, or by the
2186         garbage collection monitor (probably the latter).
2187         KH @ 25/10/99
2188 */
2189
2190 void
2191 GetRoots(evac_fn evac)
2192 {
2193   StgMainThread *m;
2194
2195 #if defined(GRAN)
2196   {
2197     nat i;
2198     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2199       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2200           evac((StgClosure **)&run_queue_hds[i]);
2201       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2202           evac((StgClosure **)&run_queue_tls[i]);
2203       
2204       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2205           evac((StgClosure **)&blocked_queue_hds[i]);
2206       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2207           evac((StgClosure **)&blocked_queue_tls[i]);
2208       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2209           evac((StgClosure **)&ccalling_threads[i]);
2210     }
2211   }
2212
2213   markEventQueue();
2214
2215 #else /* !GRAN */
2216   if (run_queue_hd != END_TSO_QUEUE) {
2217       ASSERT(run_queue_tl != END_TSO_QUEUE);
2218       evac((StgClosure **)&run_queue_hd);
2219       evac((StgClosure **)&run_queue_tl);
2220   }
2221   
2222   if (blocked_queue_hd != END_TSO_QUEUE) {
2223       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2224       evac((StgClosure **)&blocked_queue_hd);
2225       evac((StgClosure **)&blocked_queue_tl);
2226   }
2227   
2228   if (sleeping_queue != END_TSO_QUEUE) {
2229       evac((StgClosure **)&sleeping_queue);
2230   }
2231 #endif 
2232
2233   for (m = main_threads; m != NULL; m = m->link) {
2234       evac((StgClosure **)&m->tso);
2235   }
2236   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2237       evac((StgClosure **)&suspended_ccalling_threads);
2238   }
2239
2240 #if defined(SMP) || defined(PAR) || defined(GRAN)
2241   markSparkQueue(evac);
2242 #endif
2243 }
2244
2245 /* -----------------------------------------------------------------------------
2246    performGC
2247
2248    This is the interface to the garbage collector from Haskell land.
2249    We provide this so that external C code can allocate and garbage
2250    collect when called from Haskell via _ccall_GC.
2251
2252    It might be useful to provide an interface whereby the programmer
2253    can specify more roots (ToDo).
2254    
2255    This needs to be protected by the GC condition variable above.  KH.
2256    -------------------------------------------------------------------------- */
2257
2258 void (*extra_roots)(evac_fn);
2259
2260 void
2261 performGC(void)
2262 {
2263   GarbageCollect(GetRoots,rtsFalse);
2264 }
2265
2266 void
2267 performMajorGC(void)
2268 {
2269   GarbageCollect(GetRoots,rtsTrue);
2270 }
2271
2272 static void
2273 AllRoots(evac_fn evac)
2274 {
2275     GetRoots(evac);             // the scheduler's roots
2276     extra_roots(evac);          // the user's roots
2277 }
2278
2279 void
2280 performGCWithRoots(void (*get_roots)(evac_fn))
2281 {
2282   extra_roots = get_roots;
2283   GarbageCollect(AllRoots,rtsFalse);
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287    Stack overflow
2288
2289    If the thread has reached its maximum stack size, then raise the
2290    StackOverflow exception in the offending thread.  Otherwise
2291    relocate the TSO into a larger chunk of memory and adjust its stack
2292    size appropriately.
2293    -------------------------------------------------------------------------- */
2294
2295 static StgTSO *
2296 threadStackOverflow(StgTSO *tso)
2297 {
2298   nat new_stack_size, new_tso_size, diff, stack_words;
2299   StgPtr new_sp;
2300   StgTSO *dest;
2301
2302   IF_DEBUG(sanity,checkTSO(tso));
2303   if (tso->stack_size >= tso->max_stack_size) {
2304
2305     IF_DEBUG(gc,
2306              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2307                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2308              /* If we're debugging, just print out the top of the stack */
2309              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2310                                               tso->sp+64)));
2311
2312     /* Send this thread the StackOverflow exception */
2313     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2314     return tso;
2315   }
2316
2317   /* Try to double the current stack size.  If that takes us over the
2318    * maximum stack size for this thread, then use the maximum instead.
2319    * Finally round up so the TSO ends up as a whole number of blocks.
2320    */
2321   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2323                                        TSO_STRUCT_SIZE)/sizeof(W_);
2324   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2325   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2326
2327   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2328
2329   dest = (StgTSO *)allocate(new_tso_size);
2330   TICK_ALLOC_TSO(new_stack_size,0);
2331
2332   /* copy the TSO block and the old stack into the new area */
2333   memcpy(dest,tso,TSO_STRUCT_SIZE);
2334   stack_words = tso->stack + tso->stack_size - tso->sp;
2335   new_sp = (P_)dest + new_tso_size - stack_words;
2336   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2337
2338   /* relocate the stack pointers... */
2339   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2340   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2341   dest->sp    = new_sp;
2342   dest->stack_size = new_stack_size;
2343         
2344   /* and relocate the update frame list */
2345   relocate_stack(dest, diff);
2346
2347   /* Mark the old TSO as relocated.  We have to check for relocated
2348    * TSOs in the garbage collector and any primops that deal with TSOs.
2349    *
2350    * It's important to set the sp and su values to just beyond the end
2351    * of the stack, so we don't attempt to scavenge any part of the
2352    * dead TSO's stack.
2353    */
2354   tso->what_next = ThreadRelocated;
2355   tso->link = dest;
2356   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2357   tso->su = (StgUpdateFrame *)tso->sp;
2358   tso->why_blocked = NotBlocked;
2359   dest->mut_link = NULL;
2360
2361   IF_PAR_DEBUG(verbose,
2362                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2363                      tso->id, tso, tso->stack_size);
2364                /* If we're debugging, just print out the top of the stack */
2365                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2366                                                 tso->sp+64)));
2367   
2368   IF_DEBUG(sanity,checkTSO(tso));
2369 #if 0
2370   IF_DEBUG(scheduler,printTSO(dest));
2371 #endif
2372
2373   return dest;
2374 }
2375
2376 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2377 //@subsection Blocking Queue Routines
2378
2379 /* ---------------------------------------------------------------------------
2380    Wake up a queue that was blocked on some resource.
2381    ------------------------------------------------------------------------ */
2382
2383 #if defined(GRAN)
2384 static inline void
2385 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2386 {
2387 }
2388 #elif defined(PAR)
2389 static inline void
2390 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2391 {
2392   /* write RESUME events to log file and
2393      update blocked and fetch time (depending on type of the orig closure) */
2394   if (RtsFlags.ParFlags.ParStats.Full) {
2395     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2396                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2397                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2398     if (EMPTY_RUN_QUEUE())
2399       emitSchedule = rtsTrue;
2400
2401     switch (get_itbl(node)->type) {
2402         case FETCH_ME_BQ:
2403           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2404           break;
2405         case RBH:
2406         case FETCH_ME:
2407         case BLACKHOLE_BQ:
2408           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2409           break;
2410 #ifdef DIST
2411         case MVAR:
2412           break;
2413 #endif    
2414         default:
2415           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2416         }
2417       }
2418 }
2419 #endif
2420
2421 #if defined(GRAN)
2422 static StgBlockingQueueElement *
2423 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2424 {
2425     StgTSO *tso;
2426     PEs node_loc, tso_loc;
2427
2428     node_loc = where_is(node); // should be lifted out of loop
2429     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2430     tso_loc = where_is((StgClosure *)tso);
2431     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2432       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2433       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2434       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2435       // insertThread(tso, node_loc);
2436       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2437                 ResumeThread,
2438                 tso, node, (rtsSpark*)NULL);
2439       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2440       // len_local++;
2441       // len++;
2442     } else { // TSO is remote (actually should be FMBQ)
2443       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2444                                   RtsFlags.GranFlags.Costs.gunblocktime +
2445                                   RtsFlags.GranFlags.Costs.latency;
2446       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2447                 UnblockThread,
2448                 tso, node, (rtsSpark*)NULL);
2449       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2450       // len++;
2451     }
2452     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2453     IF_GRAN_DEBUG(bq,
2454                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2455                           (node_loc==tso_loc ? "Local" : "Global"), 
2456                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2457     tso->block_info.closure = NULL;
2458     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2459                              tso->id, tso));
2460 }
2461 #elif defined(PAR)
2462 static StgBlockingQueueElement *
2463 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2464 {
2465     StgBlockingQueueElement *next;
2466
2467     switch (get_itbl(bqe)->type) {
2468     case TSO:
2469       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2470       /* if it's a TSO just push it onto the run_queue */
2471       next = bqe->link;
2472       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2473       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2474       THREAD_RUNNABLE();
2475       unblockCount(bqe, node);
2476       /* reset blocking status after dumping event */
2477       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2478       break;
2479
2480     case BLOCKED_FETCH:
2481       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2482       next = bqe->link;
2483       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2484       PendingFetches = (StgBlockedFetch *)bqe;
2485       break;
2486
2487 # if defined(DEBUG)
2488       /* can ignore this case in a non-debugging setup; 
2489          see comments on RBHSave closures above */
2490     case CONSTR:
2491       /* check that the closure is an RBHSave closure */
2492       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2493              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2494              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2495       break;
2496
2497     default:
2498       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2499            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2500            (StgClosure *)bqe);
2501 # endif
2502     }
2503   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2504   return next;
2505 }
2506
2507 #else /* !GRAN && !PAR */
2508 static StgTSO *
2509 unblockOneLocked(StgTSO *tso)
2510 {
2511   StgTSO *next;
2512
2513   ASSERT(get_itbl(tso)->type == TSO);
2514   ASSERT(tso->why_blocked != NotBlocked);
2515   tso->why_blocked = NotBlocked;
2516   next = tso->link;
2517   PUSH_ON_RUN_QUEUE(tso);
2518   THREAD_RUNNABLE();
2519   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2520   return next;
2521 }
2522 #endif
2523
2524 #if defined(GRAN) || defined(PAR)
2525 inline StgBlockingQueueElement *
2526 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2527 {
2528   ACQUIRE_LOCK(&sched_mutex);
2529   bqe = unblockOneLocked(bqe, node);
2530   RELEASE_LOCK(&sched_mutex);
2531   return bqe;
2532 }
2533 #else
2534 inline StgTSO *
2535 unblockOne(StgTSO *tso)
2536 {
2537   ACQUIRE_LOCK(&sched_mutex);
2538   tso = unblockOneLocked(tso);
2539   RELEASE_LOCK(&sched_mutex);
2540   return tso;
2541 }
2542 #endif
2543
2544 #if defined(GRAN)
2545 void 
2546 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2547 {
2548   StgBlockingQueueElement *bqe;
2549   PEs node_loc;
2550   nat len = 0; 
2551
2552   IF_GRAN_DEBUG(bq, 
2553                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2554                       node, CurrentProc, CurrentTime[CurrentProc], 
2555                       CurrentTSO->id, CurrentTSO));
2556
2557   node_loc = where_is(node);
2558
2559   ASSERT(q == END_BQ_QUEUE ||
2560          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2561          get_itbl(q)->type == CONSTR); // closure (type constructor)
2562   ASSERT(is_unique(node));
2563
2564   /* FAKE FETCH: magically copy the node to the tso's proc;
2565      no Fetch necessary because in reality the node should not have been 
2566      moved to the other PE in the first place
2567   */
2568   if (CurrentProc!=node_loc) {
2569     IF_GRAN_DEBUG(bq, 
2570                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2571                         node, node_loc, CurrentProc, CurrentTSO->id, 
2572                         // CurrentTSO, where_is(CurrentTSO),
2573                         node->header.gran.procs));
2574     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2575     IF_GRAN_DEBUG(bq, 
2576                   belch("## new bitmask of node %p is %#x",
2577                         node, node->header.gran.procs));
2578     if (RtsFlags.GranFlags.GranSimStats.Global) {
2579       globalGranStats.tot_fake_fetches++;
2580     }
2581   }
2582
2583   bqe = q;
2584   // ToDo: check: ASSERT(CurrentProc==node_loc);
2585   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2586     //next = bqe->link;
2587     /* 
2588        bqe points to the current element in the queue
2589        next points to the next element in the queue
2590     */
2591     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2592     //tso_loc = where_is(tso);
2593     len++;
2594     bqe = unblockOneLocked(bqe, node);
2595   }
2596
2597   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2598      the closure to make room for the anchor of the BQ */
2599   if (bqe!=END_BQ_QUEUE) {
2600     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2601     /*
2602     ASSERT((info_ptr==&RBH_Save_0_info) ||
2603            (info_ptr==&RBH_Save_1_info) ||
2604            (info_ptr==&RBH_Save_2_info));
2605     */
2606     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2607     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2608     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2609
2610     IF_GRAN_DEBUG(bq,
2611                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2612                         node, info_type(node)));
2613   }
2614
2615   /* statistics gathering */
2616   if (RtsFlags.GranFlags.GranSimStats.Global) {
2617     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2618     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2619     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2620     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2621   }
2622   IF_GRAN_DEBUG(bq,
2623                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2624                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2625 }
2626 #elif defined(PAR)
2627 void 
2628 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2629 {
2630   StgBlockingQueueElement *bqe;
2631
2632   ACQUIRE_LOCK(&sched_mutex);
2633
2634   IF_PAR_DEBUG(verbose, 
2635                belch("##-_ AwBQ for node %p on [%x]: ",
2636                      node, mytid));
2637 #ifdef DIST  
2638   //RFP
2639   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2640     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2641     return;
2642   }
2643 #endif
2644   
2645   ASSERT(q == END_BQ_QUEUE ||
2646          get_itbl(q)->type == TSO ||           
2647          get_itbl(q)->type == BLOCKED_FETCH || 
2648          get_itbl(q)->type == CONSTR); 
2649
2650   bqe = q;
2651   while (get_itbl(bqe)->type==TSO || 
2652          get_itbl(bqe)->type==BLOCKED_FETCH) {
2653     bqe = unblockOneLocked(bqe, node);
2654   }
2655   RELEASE_LOCK(&sched_mutex);
2656 }
2657
2658 #else   /* !GRAN && !PAR */
2659 void
2660 awakenBlockedQueue(StgTSO *tso)
2661 {
2662   ACQUIRE_LOCK(&sched_mutex);
2663   while (tso != END_TSO_QUEUE) {
2664     tso = unblockOneLocked(tso);
2665   }
2666   RELEASE_LOCK(&sched_mutex);
2667 }
2668 #endif
2669
2670 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2671 //@subsection Exception Handling Routines
2672
2673 /* ---------------------------------------------------------------------------
2674    Interrupt execution
2675    - usually called inside a signal handler so it mustn't do anything fancy.   
2676    ------------------------------------------------------------------------ */
2677
2678 void
2679 interruptStgRts(void)
2680 {
2681     interrupted    = 1;
2682     context_switch = 1;
2683 }
2684
2685 /* -----------------------------------------------------------------------------
2686    Unblock a thread
2687
2688    This is for use when we raise an exception in another thread, which
2689    may be blocked.
2690    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2691    -------------------------------------------------------------------------- */
2692
2693 #if defined(GRAN) || defined(PAR)
2694 /*
2695   NB: only the type of the blocking queue is different in GranSim and GUM
2696       the operations on the queue-elements are the same
2697       long live polymorphism!
2698 */
2699 static void
2700 unblockThread(StgTSO *tso)
2701 {
2702   StgBlockingQueueElement *t, **last;
2703
2704   ACQUIRE_LOCK(&sched_mutex);
2705   switch (tso->why_blocked) {
2706
2707   case NotBlocked:
2708     return;  /* not blocked */
2709
2710   case BlockedOnMVar:
2711     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2712     {
2713       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2714       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2715
2716       last = (StgBlockingQueueElement **)&mvar->head;
2717       for (t = (StgBlockingQueueElement *)mvar->head; 
2718            t != END_BQ_QUEUE; 
2719            last = &t->link, last_tso = t, t = t->link) {
2720         if (t == (StgBlockingQueueElement *)tso) {
2721           *last = (StgBlockingQueueElement *)tso->link;
2722           if (mvar->tail == tso) {
2723             mvar->tail = (StgTSO *)last_tso;
2724           }
2725           goto done;
2726         }
2727       }
2728       barf("unblockThread (MVAR): TSO not found");
2729     }
2730
2731   case BlockedOnBlackHole:
2732     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2733     {
2734       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2735
2736       last = &bq->blocking_queue;
2737       for (t = bq->blocking_queue; 
2738            t != END_BQ_QUEUE; 
2739            last = &t->link, t = t->link) {
2740         if (t == (StgBlockingQueueElement *)tso) {
2741           *last = (StgBlockingQueueElement *)tso->link;
2742           goto done;
2743         }
2744       }
2745       barf("unblockThread (BLACKHOLE): TSO not found");
2746     }
2747
2748   case BlockedOnException:
2749     {
2750       StgTSO *target  = tso->block_info.tso;
2751
2752       ASSERT(get_itbl(target)->type == TSO);
2753
2754       if (target->what_next == ThreadRelocated) {
2755           target = target->link;
2756           ASSERT(get_itbl(target)->type == TSO);
2757       }
2758
2759       ASSERT(target->blocked_exceptions != NULL);
2760
2761       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2762       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2763            t != END_BQ_QUEUE; 
2764            last = &t->link, t = t->link) {
2765         ASSERT(get_itbl(t)->type == TSO);
2766         if (t == (StgBlockingQueueElement *)tso) {
2767           *last = (StgBlockingQueueElement *)tso->link;
2768           goto done;
2769         }
2770       }
2771       barf("unblockThread (Exception): TSO not found");
2772     }
2773
2774   case BlockedOnRead:
2775   case BlockedOnWrite:
2776     {
2777       /* take TSO off blocked_queue */
2778       StgBlockingQueueElement *prev = NULL;
2779       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2780            prev = t, t = t->link) {
2781         if (t == (StgBlockingQueueElement *)tso) {
2782           if (prev == NULL) {
2783             blocked_queue_hd = (StgTSO *)t->link;
2784             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2785               blocked_queue_tl = END_TSO_QUEUE;
2786             }
2787           } else {
2788             prev->link = t->link;
2789             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2790               blocked_queue_tl = (StgTSO *)prev;
2791             }
2792           }
2793           goto done;
2794         }
2795       }
2796       barf("unblockThread (I/O): TSO not found");
2797     }
2798
2799   case BlockedOnDelay:
2800     {
2801       /* take TSO off sleeping_queue */
2802       StgBlockingQueueElement *prev = NULL;
2803       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2804            prev = t, t = t->link) {
2805         if (t == (StgBlockingQueueElement *)tso) {
2806           if (prev == NULL) {
2807             sleeping_queue = (StgTSO *)t->link;
2808           } else {
2809             prev->link = t->link;
2810           }
2811           goto done;
2812         }
2813       }
2814       barf("unblockThread (I/O): TSO not found");
2815     }
2816
2817   default:
2818     barf("unblockThread");
2819   }
2820
2821  done:
2822   tso->link = END_TSO_QUEUE;
2823   tso->why_blocked = NotBlocked;
2824   tso->block_info.closure = NULL;
2825   PUSH_ON_RUN_QUEUE(tso);
2826   RELEASE_LOCK(&sched_mutex);
2827 }
2828 #else
2829 static void
2830 unblockThread(StgTSO *tso)
2831 {
2832   StgTSO *t, **last;
2833
2834   ACQUIRE_LOCK(&sched_mutex);
2835   switch (tso->why_blocked) {
2836
2837   case NotBlocked:
2838     return;  /* not blocked */
2839
2840   case BlockedOnMVar:
2841     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2842     {
2843       StgTSO *last_tso = END_TSO_QUEUE;
2844       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2845
2846       last = &mvar->head;
2847       for (t = mvar->head; t != END_TSO_QUEUE; 
2848            last = &t->link, last_tso = t, t = t->link) {
2849         if (t == tso) {
2850           *last = tso->link;
2851           if (mvar->tail == tso) {
2852             mvar->tail = last_tso;
2853           }
2854           goto done;
2855         }
2856       }
2857       barf("unblockThread (MVAR): TSO not found");
2858     }
2859
2860   case BlockedOnBlackHole:
2861     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2862     {
2863       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2864
2865       last = &bq->blocking_queue;
2866       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2867            last = &t->link, t = t->link) {
2868         if (t == tso) {
2869           *last = tso->link;
2870           goto done;
2871         }
2872       }
2873       barf("unblockThread (BLACKHOLE): TSO not found");
2874     }
2875
2876   case BlockedOnException:
2877     {
2878       StgTSO *target  = tso->block_info.tso;
2879
2880       ASSERT(get_itbl(target)->type == TSO);
2881
2882       while (target->what_next == ThreadRelocated) {
2883           target = target->link;
2884           ASSERT(get_itbl(target)->type == TSO);
2885       }
2886       
2887       ASSERT(target->blocked_exceptions != NULL);
2888
2889       last = &target->blocked_exceptions;
2890       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2891            last = &t->link, t = t->link) {
2892         ASSERT(get_itbl(t)->type == TSO);
2893         if (t == tso) {
2894           *last = tso->link;
2895           goto done;
2896         }
2897       }
2898       barf("unblockThread (Exception): TSO not found");
2899     }
2900
2901   case BlockedOnRead:
2902   case BlockedOnWrite:
2903     {
2904       StgTSO *prev = NULL;
2905       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2906            prev = t, t = t->link) {
2907         if (t == tso) {
2908           if (prev == NULL) {
2909             blocked_queue_hd = t->link;
2910             if (blocked_queue_tl == t) {
2911               blocked_queue_tl = END_TSO_QUEUE;
2912             }
2913           } else {
2914             prev->link = t->link;
2915             if (blocked_queue_tl == t) {
2916               blocked_queue_tl = prev;
2917             }
2918           }
2919           goto done;
2920         }
2921       }
2922       barf("unblockThread (I/O): TSO not found");
2923     }
2924
2925   case BlockedOnDelay:
2926     {
2927       StgTSO *prev = NULL;
2928       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2929            prev = t, t = t->link) {
2930         if (t == tso) {
2931           if (prev == NULL) {
2932             sleeping_queue = t->link;
2933           } else {
2934             prev->link = t->link;
2935           }
2936           goto done;
2937         }
2938       }
2939       barf("unblockThread (I/O): TSO not found");
2940     }
2941
2942   default:
2943     barf("unblockThread");
2944   }
2945
2946  done:
2947   tso->link = END_TSO_QUEUE;
2948   tso->why_blocked = NotBlocked;
2949   tso->block_info.closure = NULL;
2950   PUSH_ON_RUN_QUEUE(tso);
2951   RELEASE_LOCK(&sched_mutex);
2952 }
2953 #endif
2954
2955 /* -----------------------------------------------------------------------------
2956  * raiseAsync()
2957  *
2958  * The following function implements the magic for raising an
2959  * asynchronous exception in an existing thread.
2960  *
2961  * We first remove the thread from any queue on which it might be
2962  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2963  *
2964  * We strip the stack down to the innermost CATCH_FRAME, building
2965  * thunks in the heap for all the active computations, so they can 
2966  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2967  * an application of the handler to the exception, and push it on
2968  * the top of the stack.
2969  * 
2970  * How exactly do we save all the active computations?  We create an
2971  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2972  * AP_UPDs pushes everything from the corresponding update frame
2973  * upwards onto the stack.  (Actually, it pushes everything up to the
2974  * next update frame plus a pointer to the next AP_UPD object.
2975  * Entering the next AP_UPD object pushes more onto the stack until we
2976  * reach the last AP_UPD object - at which point the stack should look
2977  * exactly as it did when we killed the TSO and we can continue
2978  * execution by entering the closure on top of the stack.
2979  *
2980  * We can also kill a thread entirely - this happens if either (a) the 
2981  * exception passed to raiseAsync is NULL, or (b) there's no
2982  * CATCH_FRAME on the stack.  In either case, we strip the entire
2983  * stack and replace the thread with a zombie.
2984  *
2985  * -------------------------------------------------------------------------- */
2986  
2987 void 
2988 deleteThread(StgTSO *tso)
2989 {
2990   raiseAsync(tso,NULL);
2991 }
2992
2993 void
2994 raiseAsync(StgTSO *tso, StgClosure *exception)
2995 {
2996   StgUpdateFrame* su = tso->su;
2997   StgPtr          sp = tso->sp;
2998   
2999   /* Thread already dead? */
3000   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3001     return;
3002   }
3003
3004   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3005
3006   /* Remove it from any blocking queues */
3007   unblockThread(tso);
3008
3009   /* The stack freezing code assumes there's a closure pointer on
3010    * the top of the stack.  This isn't always the case with compiled
3011    * code, so we have to push a dummy closure on the top which just
3012    * returns to the next return address on the stack.
3013    */
3014   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3015     *(--sp) = (W_)&stg_dummy_ret_closure;
3016   }
3017
3018   while (1) {
3019     nat words = ((P_)su - (P_)sp) - 1;
3020     nat i;
3021     StgAP_UPD * ap;
3022
3023     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3024      * then build PAP(handler,exception,realworld#), and leave it on
3025      * top of the stack ready to enter.
3026      */
3027     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3028       StgCatchFrame *cf = (StgCatchFrame *)su;
3029       /* we've got an exception to raise, so let's pass it to the
3030        * handler in this frame.
3031        */
3032       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3033       TICK_ALLOC_UPD_PAP(3,0);
3034       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3035               
3036       ap->n_args = 2;
3037       ap->fun = cf->handler;    /* :: Exception -> IO a */
3038       ap->payload[0] = exception;
3039       ap->payload[1] = ARG_TAG(0); /* realworld token */
3040
3041       /* throw away the stack from Sp up to and including the
3042        * CATCH_FRAME.
3043        */
3044       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3045       tso->su = cf->link;
3046
3047       /* Restore the blocked/unblocked state for asynchronous exceptions
3048        * at the CATCH_FRAME.  
3049        *
3050        * If exceptions were unblocked at the catch, arrange that they
3051        * are unblocked again after executing the handler by pushing an
3052        * unblockAsyncExceptions_ret stack frame.
3053        */
3054       if (!cf->exceptions_blocked) {
3055         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3056       }
3057       
3058       /* Ensure that async exceptions are blocked when running the handler.
3059        */
3060       if (tso->blocked_exceptions == NULL) {
3061         tso->blocked_exceptions = END_TSO_QUEUE;
3062       }
3063       
3064       /* Put the newly-built PAP on top of the stack, ready to execute
3065        * when the thread restarts.
3066        */
3067       sp[0] = (W_)ap;
3068       tso->sp = sp;
3069       tso->what_next = ThreadEnterGHC;
3070       IF_DEBUG(sanity, checkTSO(tso));
3071       return;
3072     }
3073
3074     /* First build an AP_UPD consisting of the stack chunk above the
3075      * current update frame, with the top word on the stack as the
3076      * fun field.
3077      */
3078     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3079     
3080     ASSERT(words >= 0);
3081     
3082     ap->n_args = words;
3083     ap->fun    = (StgClosure *)sp[0];
3084     sp++;
3085     for(i=0; i < (nat)words; ++i) {
3086       ap->payload[i] = (StgClosure *)*sp++;
3087     }
3088     
3089     switch (get_itbl(su)->type) {
3090       
3091     case UPDATE_FRAME:
3092       {
3093         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3094         TICK_ALLOC_UP_THK(words+1,0);
3095         
3096         IF_DEBUG(scheduler,
3097                  fprintf(stderr,  "scheduler: Updating ");
3098                  printPtr((P_)su->updatee); 
3099                  fprintf(stderr,  " with ");
3100                  printObj((StgClosure *)ap);
3101                  );
3102         
3103         /* Replace the updatee with an indirection - happily
3104          * this will also wake up any threads currently
3105          * waiting on the result.
3106          *
3107          * Warning: if we're in a loop, more than one update frame on
3108          * the stack may point to the same object.  Be careful not to
3109          * overwrite an IND_OLDGEN in this case, because we'll screw
3110          * up the mutable lists.  To be on the safe side, don't
3111          * overwrite any kind of indirection at all.  See also
3112          * threadSqueezeStack in GC.c, where we have to make a similar
3113          * check.
3114          */
3115         if (!closure_IND(su->updatee)) {
3116             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3117         }
3118         su = su->link;
3119         sp += sizeofW(StgUpdateFrame) -1;
3120         sp[0] = (W_)ap; /* push onto stack */
3121         break;
3122       }
3123
3124     case CATCH_FRAME:
3125       {
3126         StgCatchFrame *cf = (StgCatchFrame *)su;
3127         StgClosure* o;
3128         
3129         /* We want a PAP, not an AP_UPD.  Fortunately, the
3130          * layout's the same.
3131          */
3132         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3133         TICK_ALLOC_UPD_PAP(words+1,0);
3134         
3135         /* now build o = FUN(catch,ap,handler) */
3136         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3137         TICK_ALLOC_FUN(2,0);
3138         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3139         o->payload[0] = (StgClosure *)ap;
3140         o->payload[1] = cf->handler;
3141         
3142         IF_DEBUG(scheduler,
3143                  fprintf(stderr,  "scheduler: Built ");
3144                  printObj((StgClosure *)o);
3145                  );
3146         
3147         /* pop the old handler and put o on the stack */
3148         su = cf->link;
3149         sp += sizeofW(StgCatchFrame) - 1;
3150         sp[0] = (W_)o;
3151         break;
3152       }
3153       
3154     case SEQ_FRAME:
3155       {
3156         StgSeqFrame *sf = (StgSeqFrame *)su;
3157         StgClosure* o;
3158         
3159         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3160         TICK_ALLOC_UPD_PAP(words+1,0);
3161         
3162         /* now build o = FUN(seq,ap) */
3163         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3164         TICK_ALLOC_SE_THK(1,0);
3165         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3166         o->payload[0] = (StgClosure *)ap;
3167         
3168         IF_DEBUG(scheduler,
3169                  fprintf(stderr,  "scheduler: Built ");
3170                  printObj((StgClosure *)o);
3171                  );
3172         
3173         /* pop the old handler and put o on the stack */
3174         su = sf->link;
3175         sp += sizeofW(StgSeqFrame) - 1;
3176         sp[0] = (W_)o;
3177         break;
3178       }
3179       
3180     case STOP_FRAME:
3181       /* We've stripped the entire stack, the thread is now dead. */
3182       sp += sizeofW(StgStopFrame) - 1;
3183       sp[0] = (W_)exception;    /* save the exception */
3184       tso->what_next = ThreadKilled;
3185       tso->su = (StgUpdateFrame *)(sp+1);
3186       tso->sp = sp;
3187       return;
3188
3189     default:
3190       barf("raiseAsync");
3191     }
3192   }
3193   barf("raiseAsync");
3194 }
3195
3196 /* -----------------------------------------------------------------------------
3197    resurrectThreads is called after garbage collection on the list of
3198    threads found to be garbage.  Each of these threads will be woken
3199    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3200    on an MVar, or NonTermination if the thread was blocked on a Black
3201    Hole.
3202    -------------------------------------------------------------------------- */
3203
3204 void
3205 resurrectThreads( StgTSO *threads )
3206 {
3207   StgTSO *tso, *next;
3208
3209   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3210     next = tso->global_link;
3211     tso->global_link = all_threads;
3212     all_threads = tso;
3213     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3214
3215     switch (tso->why_blocked) {
3216     case BlockedOnMVar:
3217     case BlockedOnException:
3218       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3219       break;
3220     case BlockedOnBlackHole:
3221       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3222       break;
3223     case NotBlocked:
3224       /* This might happen if the thread was blocked on a black hole
3225        * belonging to a thread that we've just woken up (raiseAsync
3226        * can wake up threads, remember...).
3227        */
3228       continue;
3229     default:
3230       barf("resurrectThreads: thread blocked in a strange way");
3231     }
3232   }
3233 }
3234
3235 /* -----------------------------------------------------------------------------
3236  * Blackhole detection: if we reach a deadlock, test whether any
3237  * threads are blocked on themselves.  Any threads which are found to
3238  * be self-blocked get sent a NonTermination exception.
3239  *
3240  * This is only done in a deadlock situation in order to avoid
3241  * performance overhead in the normal case.
3242  * -------------------------------------------------------------------------- */
3243
3244 static void
3245 detectBlackHoles( void )
3246 {
3247     StgTSO *t = all_threads;
3248     StgUpdateFrame *frame;
3249     StgClosure *blocked_on;
3250
3251     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3252
3253         while (t->what_next == ThreadRelocated) {
3254             t = t->link;
3255             ASSERT(get_itbl(t)->type == TSO);
3256         }
3257       
3258         if (t->why_blocked != BlockedOnBlackHole) {
3259             continue;
3260         }
3261
3262         blocked_on = t->block_info.closure;
3263
3264         for (frame = t->su; ; frame = frame->link) {
3265             switch (get_itbl(frame)->type) {
3266
3267             case UPDATE_FRAME:
3268                 if (frame->updatee == blocked_on) {
3269                     /* We are blocking on one of our own computations, so
3270                      * send this thread the NonTermination exception.  
3271                      */
3272                     IF_DEBUG(scheduler, 
3273                              sched_belch("thread %d is blocked on itself", t->id));
3274                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3275                     goto done;
3276                 }
3277                 else {
3278                     continue;
3279                 }
3280
3281             case CATCH_FRAME:
3282             case SEQ_FRAME:
3283                 continue;
3284                 
3285             case STOP_FRAME:
3286                 break;
3287             }
3288             break;
3289         }
3290
3291     done: ;
3292     }   
3293 }
3294
3295 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3296 //@subsection Debugging Routines
3297
3298 /* -----------------------------------------------------------------------------
3299    Debugging: why is a thread blocked
3300    -------------------------------------------------------------------------- */
3301
3302 #ifdef DEBUG
3303
3304 void
3305 printThreadBlockage(StgTSO *tso)
3306 {
3307   switch (tso->why_blocked) {
3308   case BlockedOnRead:
3309     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3310     break;
3311   case BlockedOnWrite:
3312     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3313     break;
3314   case BlockedOnDelay:
3315     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3316     break;
3317   case BlockedOnMVar:
3318     fprintf(stderr,"is blocked on an MVar");
3319     break;
3320   case BlockedOnException:
3321     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3322             tso->block_info.tso->id);
3323     break;
3324   case BlockedOnBlackHole:
3325     fprintf(stderr,"is blocked on a black hole");
3326     break;
3327   case NotBlocked:
3328     fprintf(stderr,"is not blocked");
3329     break;
3330 #if defined(PAR)
3331   case BlockedOnGA:
3332     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3333             tso->block_info.closure, info_type(tso->block_info.closure));
3334     break;
3335   case BlockedOnGA_NoSend:
3336     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3337             tso->block_info.closure, info_type(tso->block_info.closure));
3338     break;
3339 #endif
3340   default:
3341     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3342          tso->why_blocked, tso->id, tso);
3343   }
3344 }
3345
3346 void
3347 printThreadStatus(StgTSO *tso)
3348 {
3349   switch (tso->what_next) {
3350   case ThreadKilled:
3351     fprintf(stderr,"has been killed");
3352     break;
3353   case ThreadComplete:
3354     fprintf(stderr,"has completed");
3355     break;
3356   default:
3357     printThreadBlockage(tso);
3358   }
3359 }
3360
3361 void
3362 printAllThreads(void)
3363 {
3364   StgTSO *t;
3365
3366 # if defined(GRAN)
3367   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3368   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3369                        time_string, rtsFalse/*no commas!*/);
3370
3371   sched_belch("all threads at [%s]:", time_string);
3372 # elif defined(PAR)
3373   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3374   ullong_format_string(CURRENT_TIME,
3375                        time_string, rtsFalse/*no commas!*/);
3376
3377   sched_belch("all threads at [%s]:", time_string);
3378 # else
3379   sched_belch("all threads:");
3380 # endif
3381
3382   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3383     fprintf(stderr, "\tthread %d ", t->id);
3384     printThreadStatus(t);
3385     fprintf(stderr,"\n");
3386   }
3387 }
3388     
3389 /* 
3390    Print a whole blocking queue attached to node (debugging only).
3391 */
3392 //@cindex print_bq
3393 # if defined(PAR)
3394 void 
3395 print_bq (StgClosure *node)
3396 {
3397   StgBlockingQueueElement *bqe;
3398   StgTSO *tso;
3399   rtsBool end;
3400
3401   fprintf(stderr,"## BQ of closure %p (%s): ",
3402           node, info_type(node));
3403
3404   /* should cover all closures that may have a blocking queue */
3405   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3406          get_itbl(node)->type == FETCH_ME_BQ ||
3407          get_itbl(node)->type == RBH ||
3408          get_itbl(node)->type == MVAR);
3409     
3410   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3411
3412   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3413 }
3414
3415 /* 
3416    Print a whole blocking queue starting with the element bqe.
3417 */
3418 void 
3419 print_bqe (StgBlockingQueueElement *bqe)
3420 {
3421   rtsBool end;
3422
3423   /* 
3424      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3425   */
3426   for (end = (bqe==END_BQ_QUEUE);
3427        !end; // iterate until bqe points to a CONSTR
3428        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3429        bqe = end ? END_BQ_QUEUE : bqe->link) {
3430     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3431     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3432     /* types of closures that may appear in a blocking queue */
3433     ASSERT(get_itbl(bqe)->type == TSO ||           
3434            get_itbl(bqe)->type == BLOCKED_FETCH || 
3435            get_itbl(bqe)->type == CONSTR); 
3436     /* only BQs of an RBH end with an RBH_Save closure */
3437     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3438
3439     switch (get_itbl(bqe)->type) {
3440     case TSO:
3441       fprintf(stderr," TSO %u (%x),",
3442               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3443       break;
3444     case BLOCKED_FETCH:
3445       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3446               ((StgBlockedFetch *)bqe)->node, 
3447               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3448               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3449               ((StgBlockedFetch *)bqe)->ga.weight);
3450       break;
3451     case CONSTR:
3452       fprintf(stderr," %s (IP %p),",
3453               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3454                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3455                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3456                "RBH_Save_?"), get_itbl(bqe));
3457       break;
3458     default:
3459       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3460            info_type((StgClosure *)bqe)); // , node, info_type(node));
3461       break;
3462     }
3463   } /* for */
3464   fputc('\n', stderr);
3465 }
3466 # elif defined(GRAN)
3467 void 
3468 print_bq (StgClosure *node)
3469 {
3470   StgBlockingQueueElement *bqe;
3471   PEs node_loc, tso_loc;
3472   rtsBool end;
3473
3474   /* should cover all closures that may have a blocking queue */
3475   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3476          get_itbl(node)->type == FETCH_ME_BQ ||
3477          get_itbl(node)->type == RBH);
3478     
3479   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3480   node_loc = where_is(node);
3481
3482   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3483           node, info_type(node), node_loc);
3484
3485   /* 
3486      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3487   */
3488   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3489        !end; // iterate until bqe points to a CONSTR
3490        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3491     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3492     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3493     /* types of closures that may appear in a blocking queue */
3494     ASSERT(get_itbl(bqe)->type == TSO ||           
3495            get_itbl(bqe)->type == CONSTR); 
3496     /* only BQs of an RBH end with an RBH_Save closure */
3497     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3498
3499     tso_loc = where_is((StgClosure *)bqe);
3500     switch (get_itbl(bqe)->type) {
3501     case TSO:
3502       fprintf(stderr," TSO %d (%p) on [PE %d],",
3503               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3504       break;
3505     case CONSTR:
3506       fprintf(stderr," %s (IP %p),",
3507               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3508                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3509                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3510                "RBH_Save_?"), get_itbl(bqe));
3511       break;
3512     default:
3513       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3514            info_type((StgClosure *)bqe), node, info_type(node));
3515       break;
3516     }
3517   } /* for */
3518   fputc('\n', stderr);
3519 }
3520 #else
3521 /* 
3522    Nice and easy: only TSOs on the blocking queue
3523 */
3524 void 
3525 print_bq (StgClosure *node)
3526 {
3527   StgTSO *tso;
3528
3529   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3530   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3531        tso != END_TSO_QUEUE; 
3532        tso=tso->link) {
3533     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3534     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3535     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3536   }
3537   fputc('\n', stderr);
3538 }
3539 # endif
3540
3541 #if defined(PAR)
3542 static nat
3543 run_queue_len(void)
3544 {
3545   nat i;
3546   StgTSO *tso;
3547
3548   for (i=0, tso=run_queue_hd; 
3549        tso != END_TSO_QUEUE;
3550        i++, tso=tso->link)
3551     /* nothing */
3552
3553   return i;
3554 }
3555 #endif
3556
3557 static void
3558 sched_belch(char *s, ...)
3559 {
3560   va_list ap;
3561   va_start(ap,s);
3562 #ifdef SMP
3563   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3564 #elif defined(PAR)
3565   fprintf(stderr, "== ");
3566 #else
3567   fprintf(stderr, "scheduler: ");
3568 #endif
3569   vfprintf(stderr, s, ap);
3570   fprintf(stderr, "\n");
3571 }
3572
3573 #endif /* DEBUG */
3574
3575
3576 //@node Index,  , Debugging Routines, Main scheduling code
3577 //@subsection Index
3578
3579 //@index
3580 //* MainRegTable::  @cindex\s-+MainRegTable
3581 //* StgMainThread::  @cindex\s-+StgMainThread
3582 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3583 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3584 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3585 //* context_switch::  @cindex\s-+context_switch
3586 //* createThread::  @cindex\s-+createThread
3587 //* free_capabilities::  @cindex\s-+free_capabilities
3588 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3589 //* initScheduler::  @cindex\s-+initScheduler
3590 //* interrupted::  @cindex\s-+interrupted
3591 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3592 //* next_thread_id::  @cindex\s-+next_thread_id
3593 //* print_bq::  @cindex\s-+print_bq
3594 //* run_queue_hd::  @cindex\s-+run_queue_hd
3595 //* run_queue_tl::  @cindex\s-+run_queue_tl
3596 //* sched_mutex::  @cindex\s-+sched_mutex
3597 //* schedule::  @cindex\s-+schedule
3598 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3599 //* task_ids::  @cindex\s-+task_ids
3600 //* term_mutex::  @cindex\s-+term_mutex
3601 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3602 //@end index