[project @ 2004-02-27 16:16:31 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.192 2004/02/27 16:16:31 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   schedule(NULL,NULL);
263   RELEASE_LOCK(&sched_mutex);
264 }
265
266 void
267 startSchedulerTaskIfNecessary(void)
268 {
269   if(run_queue_hd != END_TSO_QUEUE
270     || blocked_queue_hd != END_TSO_QUEUE
271     || sleeping_queue != END_TSO_QUEUE)
272   {
273     if(!startingWorkerThread)
274     { // we don't want to start another worker thread
275       // just because the last one hasn't yet reached the
276       // "waiting for capability" state
277       startingWorkerThread = rtsTrue;
278       startTask(taskStart);
279     }
280   }
281 }
282 #endif
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 static void
320 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
321           Capability *initialCapability )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333   rtsBool receivedFinish = rtsFalse;
334 # if defined(DEBUG)
335   nat tp_size, sp_size; // stats only
336 # endif
337 #endif
338   rtsBool was_interrupted = rtsFalse;
339   StgTSOWhatNext prev_what_next;
340   
341   // Pre-condition: sched_mutex is held.
342   // We might have a capability, passed in as initialCapability.
343   cap = initialCapability;
344
345 #if defined(RTS_SUPPORTS_THREADS)
346   //
347   // in the threaded case, the capability is either passed in via the
348   // initialCapability parameter, or initialized inside the scheduler
349   // loop 
350   //
351   IF_DEBUG(scheduler,
352            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
353                        mainThread, initialCapability);
354       );
355 #else
356   // simply initialise it in the non-threaded case
357   grabCapability(&cap);
358 #endif
359
360 #if defined(GRAN)
361   /* set up first event to get things going */
362   /* ToDo: assign costs for system setup and init MainTSO ! */
363   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
364             ContinueThread, 
365             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
366
367   IF_DEBUG(gran,
368            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
369            G_TSO(CurrentTSO, 5));
370
371   if (RtsFlags.GranFlags.Light) {
372     /* Save current time; GranSim Light only */
373     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
374   }      
375
376   event = get_next_event();
377
378   while (event!=(rtsEvent*)NULL) {
379     /* Choose the processor with the next event */
380     CurrentProc = event->proc;
381     CurrentTSO = event->tso;
382
383 #elif defined(PAR)
384
385   while (!receivedFinish) {    /* set by processMessages */
386                                /* when receiving PP_FINISH message         */ 
387
388 #else // everything except GRAN and PAR
389
390   while (1) {
391
392 #endif
393
394      IF_DEBUG(scheduler, printAllThreads());
395
396 #if defined(RTS_SUPPORTS_THREADS)
397       // Yield the capability to higher-priority tasks if necessary.
398       //
399       if (cap != NULL) {
400           yieldCapability(&cap);
401       }
402
403       // If we do not currently hold a capability, we wait for one
404       //
405       if (cap == NULL) {
406           waitForCapability(&sched_mutex, &cap,
407                             mainThread ? &mainThread->bound_thread_cond : NULL);
408       }
409
410       // We now have a capability...
411 #endif
412
413     //
414     // If we're interrupted (the user pressed ^C, or some other
415     // termination condition occurred), kill all the currently running
416     // threads.
417     //
418     if (interrupted) {
419         IF_DEBUG(scheduler, sched_belch("interrupted"));
420         interrupted = rtsFalse;
421         was_interrupted = rtsTrue;
422 #if defined(RTS_SUPPORTS_THREADS)
423         // In the threaded RTS, deadlock detection doesn't work,
424         // so just exit right away.
425         prog_belch("interrupted");
426         releaseCapability(cap);
427         RELEASE_LOCK(&sched_mutex);
428         shutdownHaskellAndExit(EXIT_SUCCESS);
429 #else
430         deleteAllThreads();
431 #endif
432     }
433
434     //
435     // Go through the list of main threads and wake up any
436     // clients whose computations have finished.  ToDo: this
437     // should be done more efficiently without a linear scan
438     // of the main threads list, somehow...
439     //
440 #if defined(RTS_SUPPORTS_THREADS)
441     { 
442         StgMainThread *m, **prev;
443         prev = &main_threads;
444         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
445             if (m->tso->what_next == ThreadComplete
446                 || m->tso->what_next == ThreadKilled) {
447                 if (m == mainThread) {
448                     if (m->tso->what_next == ThreadComplete) {
449                         if (m->ret) {
450                             // NOTE: return val is tso->sp[1]
451                             // (see StgStartup.hc)
452                             *(m->ret) = (StgClosure *)m->tso->sp[1]; 
453                         }
454                         m->stat = Success;
455                     } else {
456                         if (m->ret) {
457                             *(m->ret) = NULL;
458                         }
459                         if (was_interrupted) {
460                             m->stat = Interrupted;
461                         } else {
462                             m->stat = Killed;
463                         }
464                     }
465                     *prev = m->link;
466 #ifdef DEBUG
467                     removeThreadLabel((StgWord)m->tso->id);
468 #endif
469                     releaseCapability(cap);
470                     return;
471                 } else {
472                     // The current OS thread can not handle the fact that
473                     // the Haskell thread "m" has ended.  "m" is bound;
474                     // the scheduler loop in its bound OS thread has to
475                     // return, so let's pass the capability directly to
476                     // that thread.
477                     passCapability(&m->bound_thread_cond);
478                     continue;
479                 }
480             }
481         }
482     }
483     
484 #else /* not threaded */
485
486 # if defined(PAR)
487     /* in GUM do this only on the Main PE */
488     if (IAmMainThread)
489 # endif
490     /* If our main thread has finished or been killed, return.
491      */
492     {
493       StgMainThread *m = main_threads;
494       if (m->tso->what_next == ThreadComplete
495           || m->tso->what_next == ThreadKilled) {
496 #ifdef DEBUG
497         removeThreadLabel((StgWord)m->tso->id);
498 #endif
499         main_threads = main_threads->link;
500         if (m->tso->what_next == ThreadComplete) {
501             // We finished successfully, fill in the return value
502             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
503             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
504             m->stat = Success;
505             return;
506         } else {
507           if (m->ret) { *(m->ret) = NULL; };
508           if (was_interrupted) {
509             m->stat = Interrupted;
510           } else {
511             m->stat = Killed;
512           }
513           return;
514         }
515       }
516     }
517 #endif
518
519
520 #if defined(RTS_USER_SIGNALS)
521     // check for signals each time around the scheduler
522     if (signals_pending()) {
523       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
524       startSignalHandlers();
525       ACQUIRE_LOCK(&sched_mutex);
526     }
527 #endif
528
529     //
530     // Check whether any waiting threads need to be woken up.  If the
531     // run queue is empty, and there are no other tasks running, we
532     // can wait indefinitely for something to happen.
533     //
534     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
535 #if defined(RTS_SUPPORTS_THREADS)
536                 || EMPTY_RUN_QUEUE()
537 #endif
538         )
539     {
540       awaitEvent( EMPTY_RUN_QUEUE() );
541     }
542     // we can be interrupted while waiting for I/O...
543     if (interrupted) continue;
544
545     /* 
546      * Detect deadlock: when we have no threads to run, there are no
547      * threads waiting on I/O or sleeping, and all the other tasks are
548      * waiting for work, we must have a deadlock of some description.
549      *
550      * We first try to find threads blocked on themselves (ie. black
551      * holes), and generate NonTermination exceptions where necessary.
552      *
553      * If no threads are black holed, we have a deadlock situation, so
554      * inform all the main threads.
555      */
556 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
557     if (   EMPTY_THREAD_QUEUES() )
558     {
559         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
560         // Garbage collection can release some new threads due to
561         // either (a) finalizers or (b) threads resurrected because
562         // they are about to be send BlockedOnDeadMVar.  Any threads
563         // thus released will be immediately runnable.
564         GarbageCollect(GetRoots,rtsTrue);
565
566         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
567
568         IF_DEBUG(scheduler, 
569                  sched_belch("still deadlocked, checking for black holes..."));
570         detectBlackHoles();
571
572         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
573
574 #if defined(RTS_USER_SIGNALS)
575         /* If we have user-installed signal handlers, then wait
576          * for signals to arrive rather then bombing out with a
577          * deadlock.
578          */
579         if ( anyUserHandlers() ) {
580             IF_DEBUG(scheduler, 
581                      sched_belch("still deadlocked, waiting for signals..."));
582
583             awaitUserSignals();
584
585             // we might be interrupted...
586             if (interrupted) { continue; }
587
588             if (signals_pending()) {
589                 RELEASE_LOCK(&sched_mutex);
590                 startSignalHandlers();
591                 ACQUIRE_LOCK(&sched_mutex);
592             }
593             ASSERT(!EMPTY_RUN_QUEUE());
594             goto not_deadlocked;
595         }
596 #endif
597
598         /* Probably a real deadlock.  Send the current main thread the
599          * Deadlock exception (or in the SMP build, send *all* main
600          * threads the deadlock exception, since none of them can make
601          * progress).
602          */
603         {
604             StgMainThread *m;
605             m = main_threads;
606             switch (m->tso->why_blocked) {
607             case BlockedOnBlackHole:
608                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
609                 break;
610             case BlockedOnException:
611             case BlockedOnMVar:
612                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
613                 break;
614             default:
615                 barf("deadlock: main thread blocked in a strange way");
616             }
617         }
618     }
619   not_deadlocked:
620
621 #elif defined(RTS_SUPPORTS_THREADS)
622     // ToDo: add deadlock detection in threaded RTS
623 #elif defined(PAR)
624     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
625 #endif
626
627 #if defined(RTS_SUPPORTS_THREADS)
628     if ( EMPTY_RUN_QUEUE() ) {
629         continue; // nothing to do
630     }
631 #endif
632
633 #if defined(GRAN)
634     if (RtsFlags.GranFlags.Light)
635       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
636
637     /* adjust time based on time-stamp */
638     if (event->time > CurrentTime[CurrentProc] &&
639         event->evttype != ContinueThread)
640       CurrentTime[CurrentProc] = event->time;
641     
642     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
643     if (!RtsFlags.GranFlags.Light)
644       handleIdlePEs();
645
646     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
647
648     /* main event dispatcher in GranSim */
649     switch (event->evttype) {
650       /* Should just be continuing execution */
651     case ContinueThread:
652       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
653       /* ToDo: check assertion
654       ASSERT(run_queue_hd != (StgTSO*)NULL &&
655              run_queue_hd != END_TSO_QUEUE);
656       */
657       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
658       if (!RtsFlags.GranFlags.DoAsyncFetch &&
659           procStatus[CurrentProc]==Fetching) {
660         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
661               CurrentTSO->id, CurrentTSO, CurrentProc);
662         goto next_thread;
663       } 
664       /* Ignore ContinueThreads for completed threads */
665       if (CurrentTSO->what_next == ThreadComplete) {
666         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
667               CurrentTSO->id, CurrentTSO, CurrentProc);
668         goto next_thread;
669       } 
670       /* Ignore ContinueThreads for threads that are being migrated */
671       if (PROCS(CurrentTSO)==Nowhere) { 
672         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
673               CurrentTSO->id, CurrentTSO, CurrentProc);
674         goto next_thread;
675       }
676       /* The thread should be at the beginning of the run queue */
677       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
678         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
679               CurrentTSO->id, CurrentTSO, CurrentProc);
680         break; // run the thread anyway
681       }
682       /*
683       new_event(proc, proc, CurrentTime[proc],
684                 FindWork,
685                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
686       goto next_thread; 
687       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
688       break; // now actually run the thread; DaH Qu'vam yImuHbej 
689
690     case FetchNode:
691       do_the_fetchnode(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case GlobalBlock:
695       do_the_globalblock(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case FetchReply:
699       do_the_fetchreply(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case UnblockThread:   /* Move from the blocked queue to the tail of */
703       do_the_unblock(event);
704       goto next_thread;             /* handle next event in event queue  */
705       
706     case ResumeThread:  /* Move from the blocked queue to the tail of */
707       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
708       event->tso->gran.blocktime += 
709         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
710       do_the_startthread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case StartThread:
714       do_the_startthread(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case MoveThread:
718       do_the_movethread(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     case MoveSpark:
722       do_the_movespark(event);
723       goto next_thread;             /* handle next event in event queue  */
724       
725     case FindWork:
726       do_the_findwork(event);
727       goto next_thread;             /* handle next event in event queue  */
728       
729     default:
730       barf("Illegal event type %u\n", event->evttype);
731     }  /* switch */
732     
733     /* This point was scheduler_loop in the old RTS */
734
735     IF_DEBUG(gran, belch("GRAN: after main switch"));
736
737     TimeOfLastEvent = CurrentTime[CurrentProc];
738     TimeOfNextEvent = get_time_of_next_event();
739     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
740     // CurrentTSO = ThreadQueueHd;
741
742     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
743                          TimeOfNextEvent));
744
745     if (RtsFlags.GranFlags.Light) 
746       GranSimLight_leave_system(event, &ActiveTSO); 
747
748     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
749
750     IF_DEBUG(gran, 
751              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
752
753     /* in a GranSim setup the TSO stays on the run queue */
754     t = CurrentTSO;
755     /* Take a thread from the run queue. */
756     POP_RUN_QUEUE(t); // take_off_run_queue(t);
757
758     IF_DEBUG(gran, 
759              fprintf(stderr, "GRAN: About to run current thread, which is\n");
760              G_TSO(t,5));
761
762     context_switch = 0; // turned on via GranYield, checking events and time slice
763
764     IF_DEBUG(gran, 
765              DumpGranEvent(GR_SCHEDULE, t));
766
767     procStatus[CurrentProc] = Busy;
768
769 #elif defined(PAR)
770     if (PendingFetches != END_BF_QUEUE) {
771         processFetches();
772     }
773
774     /* ToDo: phps merge with spark activation above */
775     /* check whether we have local work and send requests if we have none */
776     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
777       /* :-[  no local threads => look out for local sparks */
778       /* the spark pool for the current PE */
779       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
780       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
781           pool->hd < pool->tl) {
782         /* 
783          * ToDo: add GC code check that we really have enough heap afterwards!!
784          * Old comment:
785          * If we're here (no runnable threads) and we have pending
786          * sparks, we must have a space problem.  Get enough space
787          * to turn one of those pending sparks into a
788          * thread... 
789          */
790
791         spark = findSpark(rtsFalse);                /* get a spark */
792         if (spark != (rtsSpark) NULL) {
793           tso = activateSpark(spark);       /* turn the spark into a thread */
794           IF_PAR_DEBUG(schedule,
795                        belch("==== schedule: Created TSO %d (%p); %d threads active",
796                              tso->id, tso, advisory_thread_count));
797
798           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
799             belch("==^^ failed to activate spark");
800             goto next_thread;
801           }               /* otherwise fall through & pick-up new tso */
802         } else {
803           IF_PAR_DEBUG(verbose,
804                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
805                              spark_queue_len(pool)));
806           goto next_thread;
807         }
808       }
809
810       /* If we still have no work we need to send a FISH to get a spark
811          from another PE 
812       */
813       if (EMPTY_RUN_QUEUE()) {
814       /* =8-[  no local sparks => look for work on other PEs */
815         /*
816          * We really have absolutely no work.  Send out a fish
817          * (there may be some out there already), and wait for
818          * something to arrive.  We clearly can't run any threads
819          * until a SCHEDULE or RESUME arrives, and so that's what
820          * we're hoping to see.  (Of course, we still have to
821          * respond to other types of messages.)
822          */
823         TIME now = msTime() /*CURRENT_TIME*/;
824         IF_PAR_DEBUG(verbose, 
825                      belch("--  now=%ld", now));
826         IF_PAR_DEBUG(verbose,
827                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
828                          (last_fish_arrived_at!=0 &&
829                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
830                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
831                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
832                              last_fish_arrived_at,
833                              RtsFlags.ParFlags.fishDelay, now);
834                      });
835         
836         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
837             (last_fish_arrived_at==0 ||
838              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
839           /* outstandingFishes is set in sendFish, processFish;
840              avoid flooding system with fishes via delay */
841           pe = choosePE();
842           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
843                    NEW_FISH_HUNGER);
844
845           // Global statistics: count no. of fishes
846           if (RtsFlags.ParFlags.ParStats.Global &&
847               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
848             globalParStats.tot_fish_mess++;
849           }
850         }
851       
852         receivedFinish = processMessages();
853         goto next_thread;
854       }
855     } else if (PacketsWaiting()) {  /* Look for incoming messages */
856       receivedFinish = processMessages();
857     }
858
859     /* Now we are sure that we have some work available */
860     ASSERT(run_queue_hd != END_TSO_QUEUE);
861
862     /* Take a thread from the run queue, if we have work */
863     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
864     IF_DEBUG(sanity,checkTSO(t));
865
866     /* ToDo: write something to the log-file
867     if (RTSflags.ParFlags.granSimStats && !sameThread)
868         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
869
870     CurrentTSO = t;
871     */
872     /* the spark pool for the current PE */
873     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
874
875     IF_DEBUG(scheduler, 
876              belch("--=^ %d threads, %d sparks on [%#x]", 
877                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
878
879 # if 1
880     if (0 && RtsFlags.ParFlags.ParStats.Full && 
881         t && LastTSO && t->id != LastTSO->id && 
882         LastTSO->why_blocked == NotBlocked && 
883         LastTSO->what_next != ThreadComplete) {
884       // if previously scheduled TSO not blocked we have to record the context switch
885       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
886                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
887     }
888
889     if (RtsFlags.ParFlags.ParStats.Full && 
890         (emitSchedule /* forced emit */ ||
891         (t && LastTSO && t->id != LastTSO->id))) {
892       /* 
893          we are running a different TSO, so write a schedule event to log file
894          NB: If we use fair scheduling we also have to write  a deschedule 
895              event for LastTSO; with unfair scheduling we know that the
896              previous tso has blocked whenever we switch to another tso, so
897              we don't need it in GUM for now
898       */
899       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
900                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
901       emitSchedule = rtsFalse;
902     }
903      
904 # endif
905 #else /* !GRAN && !PAR */
906   
907     // grab a thread from the run queue
908     ASSERT(run_queue_hd != END_TSO_QUEUE);
909     POP_RUN_QUEUE(t);
910
911     // Sanity check the thread we're about to run.  This can be
912     // expensive if there is lots of thread switching going on...
913     IF_DEBUG(sanity,checkTSO(t));
914 #endif
915
916 #ifdef THREADED_RTS
917     {
918       StgMainThread *m;
919       for(m = main_threads; m; m = m->link)
920       {
921         if(m->tso == t)
922           break;
923       }
924       
925       if(m)
926       {
927         if(m == mainThread)
928         {
929           IF_DEBUG(scheduler,
930             sched_belch("### Running thread %d in bound thread", t->id));
931           // yes, the Haskell thread is bound to the current native thread
932         }
933         else
934         {
935           IF_DEBUG(scheduler,
936             sched_belch("### thread %d bound to another OS thread", t->id));
937           // no, bound to a different Haskell thread: pass to that thread
938           PUSH_ON_RUN_QUEUE(t);
939           passCapability(&m->bound_thread_cond);
940           continue;
941         }
942       }
943       else
944       {
945         if(mainThread != NULL)
946         // The thread we want to run is bound.
947         {
948           IF_DEBUG(scheduler,
949             sched_belch("### this OS thread cannot run thread %d", t->id));
950           // no, the current native thread is bound to a different
951           // Haskell thread, so pass it to any worker thread
952           PUSH_ON_RUN_QUEUE(t);
953           passCapabilityToWorker();
954           continue; 
955         }
956       }
957     }
958 #endif
959
960     cap->r.rCurrentTSO = t;
961     
962     /* context switches are now initiated by the timer signal, unless
963      * the user specified "context switch as often as possible", with
964      * +RTS -C0
965      */
966     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
967          && (run_queue_hd != END_TSO_QUEUE
968              || blocked_queue_hd != END_TSO_QUEUE
969              || sleeping_queue != END_TSO_QUEUE)))
970         context_switch = 1;
971     else
972         context_switch = 0;
973
974 run_thread:
975
976     RELEASE_LOCK(&sched_mutex);
977
978     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
979                               t->id, whatNext_strs[t->what_next]));
980
981 #ifdef PROFILING
982     startHeapProfTimer();
983 #endif
984
985     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
986     /* Run the current thread 
987      */
988     prev_what_next = t->what_next;
989     switch (prev_what_next) {
990     case ThreadKilled:
991     case ThreadComplete:
992         /* Thread already finished, return to scheduler. */
993         ret = ThreadFinished;
994         break;
995     case ThreadRunGHC:
996         errno = t->saved_errno;
997         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
998         t->saved_errno = errno;
999         break;
1000     case ThreadInterpret:
1001         ret = interpretBCO(cap);
1002         break;
1003     default:
1004       barf("schedule: invalid what_next field");
1005     }
1006     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1007     
1008     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1009 #ifdef PROFILING
1010     stopHeapProfTimer();
1011     CCCS = CCS_SYSTEM;
1012 #endif
1013     
1014     ACQUIRE_LOCK(&sched_mutex);
1015     
1016 #ifdef RTS_SUPPORTS_THREADS
1017     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1018 #elif !defined(GRAN) && !defined(PAR)
1019     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1020 #endif
1021     t = cap->r.rCurrentTSO;
1022     
1023 #if defined(PAR)
1024     /* HACK 675: if the last thread didn't yield, make sure to print a 
1025        SCHEDULE event to the log file when StgRunning the next thread, even
1026        if it is the same one as before */
1027     LastTSO = t; 
1028     TimeOfLastYield = CURRENT_TIME;
1029 #endif
1030
1031     switch (ret) {
1032     case HeapOverflow:
1033 #if defined(GRAN)
1034       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1035       globalGranStats.tot_heapover++;
1036 #elif defined(PAR)
1037       globalParStats.tot_heapover++;
1038 #endif
1039
1040       // did the task ask for a large block?
1041       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1042           // if so, get one and push it on the front of the nursery.
1043           bdescr *bd;
1044           nat blocks;
1045           
1046           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1047
1048           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1049                                    t->id, whatNext_strs[t->what_next], blocks));
1050
1051           // don't do this if it would push us over the
1052           // alloc_blocks_lim limit; we'll GC first.
1053           if (alloc_blocks + blocks < alloc_blocks_lim) {
1054
1055               alloc_blocks += blocks;
1056               bd = allocGroup( blocks );
1057
1058               // link the new group into the list
1059               bd->link = cap->r.rCurrentNursery;
1060               bd->u.back = cap->r.rCurrentNursery->u.back;
1061               if (cap->r.rCurrentNursery->u.back != NULL) {
1062                   cap->r.rCurrentNursery->u.back->link = bd;
1063               } else {
1064                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1065                          g0s0->blocks == cap->r.rNursery);
1066                   cap->r.rNursery = g0s0->blocks = bd;
1067               }           
1068               cap->r.rCurrentNursery->u.back = bd;
1069
1070               // initialise it as a nursery block.  We initialise the
1071               // step, gen_no, and flags field of *every* sub-block in
1072               // this large block, because this is easier than making
1073               // sure that we always find the block head of a large
1074               // block whenever we call Bdescr() (eg. evacuate() and
1075               // isAlive() in the GC would both have to do this, at
1076               // least).
1077               { 
1078                   bdescr *x;
1079                   for (x = bd; x < bd + blocks; x++) {
1080                       x->step = g0s0;
1081                       x->gen_no = 0;
1082                       x->flags = 0;
1083                   }
1084               }
1085
1086               // don't forget to update the block count in g0s0.
1087               g0s0->n_blocks += blocks;
1088               // This assert can be a killer if the app is doing lots
1089               // of large block allocations.
1090               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1091
1092               // now update the nursery to point to the new block
1093               cap->r.rCurrentNursery = bd;
1094
1095               // we might be unlucky and have another thread get on the
1096               // run queue before us and steal the large block, but in that
1097               // case the thread will just end up requesting another large
1098               // block.
1099               PUSH_ON_RUN_QUEUE(t);
1100               break;
1101           }
1102       }
1103
1104       /* make all the running tasks block on a condition variable,
1105        * maybe set context_switch and wait till they all pile in,
1106        * then have them wait on a GC condition variable.
1107        */
1108       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1109                                t->id, whatNext_strs[t->what_next]));
1110       threadPaused(t);
1111 #if defined(GRAN)
1112       ASSERT(!is_on_queue(t,CurrentProc));
1113 #elif defined(PAR)
1114       /* Currently we emit a DESCHEDULE event before GC in GUM.
1115          ToDo: either add separate event to distinguish SYSTEM time from rest
1116                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1117       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1118         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1119                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1120         emitSchedule = rtsTrue;
1121       }
1122 #endif
1123       
1124       ready_to_gc = rtsTrue;
1125       context_switch = 1;               /* stop other threads ASAP */
1126       PUSH_ON_RUN_QUEUE(t);
1127       /* actual GC is done at the end of the while loop */
1128       break;
1129       
1130     case StackOverflow:
1131 #if defined(GRAN)
1132       IF_DEBUG(gran, 
1133                DumpGranEvent(GR_DESCHEDULE, t));
1134       globalGranStats.tot_stackover++;
1135 #elif defined(PAR)
1136       // IF_DEBUG(par, 
1137       // DumpGranEvent(GR_DESCHEDULE, t);
1138       globalParStats.tot_stackover++;
1139 #endif
1140       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1141                                t->id, whatNext_strs[t->what_next]));
1142       /* just adjust the stack for this thread, then pop it back
1143        * on the run queue.
1144        */
1145       threadPaused(t);
1146       { 
1147         StgMainThread *m;
1148         /* enlarge the stack */
1149         StgTSO *new_t = threadStackOverflow(t);
1150         
1151         /* This TSO has moved, so update any pointers to it from the
1152          * main thread stack.  It better not be on any other queues...
1153          * (it shouldn't be).
1154          */
1155         for (m = main_threads; m != NULL; m = m->link) {
1156           if (m->tso == t) {
1157             m->tso = new_t;
1158           }
1159         }
1160         threadPaused(new_t);
1161         PUSH_ON_RUN_QUEUE(new_t);
1162       }
1163       break;
1164
1165     case ThreadYielding:
1166 #if defined(GRAN)
1167       IF_DEBUG(gran, 
1168                DumpGranEvent(GR_DESCHEDULE, t));
1169       globalGranStats.tot_yields++;
1170 #elif defined(PAR)
1171       // IF_DEBUG(par, 
1172       // DumpGranEvent(GR_DESCHEDULE, t);
1173       globalParStats.tot_yields++;
1174 #endif
1175       /* put the thread back on the run queue.  Then, if we're ready to
1176        * GC, check whether this is the last task to stop.  If so, wake
1177        * up the GC thread.  getThread will block during a GC until the
1178        * GC is finished.
1179        */
1180       IF_DEBUG(scheduler,
1181                if (t->what_next != prev_what_next) {
1182                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1183                          t->id, whatNext_strs[t->what_next]);
1184                } else {
1185                    belch("--<< thread %ld (%s) stopped, yielding", 
1186                          t->id, whatNext_strs[t->what_next]);
1187                }
1188                );
1189
1190       IF_DEBUG(sanity,
1191                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1192                checkTSO(t));
1193       ASSERT(t->link == END_TSO_QUEUE);
1194
1195       // Shortcut if we're just switching evaluators: don't bother
1196       // doing stack squeezing (which can be expensive), just run the
1197       // thread.
1198       if (t->what_next != prev_what_next) {
1199           goto run_thread;
1200       }
1201
1202       threadPaused(t);
1203
1204 #if defined(GRAN)
1205       ASSERT(!is_on_queue(t,CurrentProc));
1206
1207       IF_DEBUG(sanity,
1208                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1209                checkThreadQsSanity(rtsTrue));
1210 #endif
1211
1212 #if defined(PAR)
1213       if (RtsFlags.ParFlags.doFairScheduling) { 
1214         /* this does round-robin scheduling; good for concurrency */
1215         APPEND_TO_RUN_QUEUE(t);
1216       } else {
1217         /* this does unfair scheduling; good for parallelism */
1218         PUSH_ON_RUN_QUEUE(t);
1219       }
1220 #else
1221       // this does round-robin scheduling; good for concurrency
1222       APPEND_TO_RUN_QUEUE(t);
1223 #endif
1224
1225 #if defined(GRAN)
1226       /* add a ContinueThread event to actually process the thread */
1227       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1228                 ContinueThread,
1229                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1230       IF_GRAN_DEBUG(bq, 
1231                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1232                G_EVENTQ(0);
1233                G_CURR_THREADQ(0));
1234 #endif /* GRAN */
1235       break;
1236
1237     case ThreadBlocked:
1238 #if defined(GRAN)
1239       IF_DEBUG(scheduler,
1240                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1241                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1242                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1243
1244       // ??? needed; should emit block before
1245       IF_DEBUG(gran, 
1246                DumpGranEvent(GR_DESCHEDULE, t)); 
1247       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1248       /*
1249         ngoq Dogh!
1250       ASSERT(procStatus[CurrentProc]==Busy || 
1251               ((procStatus[CurrentProc]==Fetching) && 
1252               (t->block_info.closure!=(StgClosure*)NULL)));
1253       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1254           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1255             procStatus[CurrentProc]==Fetching)) 
1256         procStatus[CurrentProc] = Idle;
1257       */
1258 #elif defined(PAR)
1259       IF_DEBUG(scheduler,
1260                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1261                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1262       IF_PAR_DEBUG(bq,
1263
1264                    if (t->block_info.closure!=(StgClosure*)NULL) 
1265                      print_bq(t->block_info.closure));
1266
1267       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1268       blockThread(t);
1269
1270       /* whatever we schedule next, we must log that schedule */
1271       emitSchedule = rtsTrue;
1272
1273 #else /* !GRAN */
1274       /* don't need to do anything.  Either the thread is blocked on
1275        * I/O, in which case we'll have called addToBlockedQueue
1276        * previously, or it's blocked on an MVar or Blackhole, in which
1277        * case it'll be on the relevant queue already.
1278        */
1279       IF_DEBUG(scheduler,
1280                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1281                        t->id, whatNext_strs[t->what_next]);
1282                printThreadBlockage(t);
1283                fprintf(stderr, "\n"));
1284       fflush(stderr);
1285
1286       /* Only for dumping event to log file 
1287          ToDo: do I need this in GranSim, too?
1288       blockThread(t);
1289       */
1290 #endif
1291       threadPaused(t);
1292       break;
1293
1294     case ThreadFinished:
1295       /* Need to check whether this was a main thread, and if so, signal
1296        * the task that started it with the return value.  If we have no
1297        * more main threads, we probably need to stop all the tasks until
1298        * we get a new one.
1299        */
1300       /* We also end up here if the thread kills itself with an
1301        * uncaught exception, see Exception.hc.
1302        */
1303       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1304                                t->id, whatNext_strs[t->what_next]));
1305 #if defined(GRAN)
1306       endThread(t, CurrentProc); // clean-up the thread
1307 #elif defined(PAR)
1308       /* For now all are advisory -- HWL */
1309       //if(t->priority==AdvisoryPriority) ??
1310       advisory_thread_count--;
1311       
1312 # ifdef DIST
1313       if(t->dist.priority==RevalPriority)
1314         FinishReval(t);
1315 # endif
1316       
1317       if (RtsFlags.ParFlags.ParStats.Full &&
1318           !RtsFlags.ParFlags.ParStats.Suppressed) 
1319         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1320 #endif
1321       break;
1322       
1323     default:
1324       barf("schedule: invalid thread return code %d", (int)ret);
1325     }
1326
1327 #ifdef PROFILING
1328     // When we have +RTS -i0 and we're heap profiling, do a census at
1329     // every GC.  This lets us get repeatable runs for debugging.
1330     if (performHeapProfile ||
1331         (RtsFlags.ProfFlags.profileInterval==0 &&
1332          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1333         GarbageCollect(GetRoots, rtsTrue);
1334         heapCensus();
1335         performHeapProfile = rtsFalse;
1336         ready_to_gc = rtsFalse; // we already GC'd
1337     }
1338 #endif
1339
1340     if (ready_to_gc) {
1341       /* everybody back, start the GC.
1342        * Could do it in this thread, or signal a condition var
1343        * to do it in another thread.  Either way, we need to
1344        * broadcast on gc_pending_cond afterward.
1345        */
1346 #if defined(RTS_SUPPORTS_THREADS)
1347       IF_DEBUG(scheduler,sched_belch("doing GC"));
1348 #endif
1349       GarbageCollect(GetRoots,rtsFalse);
1350       ready_to_gc = rtsFalse;
1351 #if defined(GRAN)
1352       /* add a ContinueThread event to continue execution of current thread */
1353       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1354                 ContinueThread,
1355                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1356       IF_GRAN_DEBUG(bq, 
1357                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1358                G_EVENTQ(0);
1359                G_CURR_THREADQ(0));
1360 #endif /* GRAN */
1361     }
1362
1363 #if defined(GRAN)
1364   next_thread:
1365     IF_GRAN_DEBUG(unused,
1366                   print_eventq(EventHd));
1367
1368     event = get_next_event();
1369 #elif defined(PAR)
1370   next_thread:
1371     /* ToDo: wait for next message to arrive rather than busy wait */
1372 #endif /* GRAN */
1373
1374   } /* end of while(1) */
1375
1376   IF_PAR_DEBUG(verbose,
1377                belch("== Leaving schedule() after having received Finish"));
1378 }
1379
1380 /* ---------------------------------------------------------------------------
1381  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1382  * used by Control.Concurrent for error checking.
1383  * ------------------------------------------------------------------------- */
1384  
1385 StgBool
1386 rtsSupportsBoundThreads(void)
1387 {
1388 #ifdef THREADED_RTS
1389   return rtsTrue;
1390 #else
1391   return rtsFalse;
1392 #endif
1393 }
1394
1395 /* ---------------------------------------------------------------------------
1396  * isThreadBound(tso): check whether tso is bound to an OS thread.
1397  * ------------------------------------------------------------------------- */
1398  
1399 StgBool
1400 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1401 {
1402 #ifdef THREADED_RTS
1403   StgMainThread *m;
1404   for(m = main_threads; m; m = m->link)
1405   {
1406     if(m->tso == tso)
1407       return rtsTrue;
1408   }
1409 #endif
1410   return rtsFalse;
1411 }
1412
1413 /* ---------------------------------------------------------------------------
1414  * Singleton fork(). Do not copy any running threads.
1415  * ------------------------------------------------------------------------- */
1416
1417 static void 
1418 deleteThreadImmediately(StgTSO *tso);
1419
1420 StgInt
1421 forkProcess(HsStablePtr *entry)
1422 {
1423 #ifndef mingw32_TARGET_OS
1424   pid_t pid;
1425   StgTSO* t,*next;
1426   StgMainThread *m;
1427   SchedulerStatus rc;
1428
1429   IF_DEBUG(scheduler,sched_belch("forking!"));
1430   rts_lock(); // This not only acquires sched_mutex, it also
1431               // makes sure that no other threads are running
1432
1433   pid = fork();
1434
1435   if (pid) { /* parent */
1436
1437   /* just return the pid */
1438     rts_unlock();
1439     return pid;
1440     
1441   } else { /* child */
1442     
1443     
1444       // delete all threads
1445     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1446     
1447     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1448       next = t->link;
1449
1450         // don't allow threads to catch the ThreadKilled exception
1451       deleteThreadImmediately(t);
1452     }
1453     
1454       // wipe the main thread list
1455     while((m = main_threads) != NULL) {
1456       main_threads = m->link;
1457 #ifdef THREADED_RTS
1458       closeCondition(&m->bound_thread_cond);
1459 #endif
1460       stgFree(m);
1461     }
1462     
1463 #ifdef RTS_SUPPORTS_THREADS
1464     resetTaskManagerAfterFork();      // tell startTask() and friends that
1465     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1466     resetWorkerWakeupPipeAfterFork();
1467 #endif
1468     
1469     rc = rts_evalStableIO(entry, NULL);  // run the action
1470     rts_checkSchedStatus("forkProcess",rc);
1471     
1472     rts_unlock();
1473     
1474     hs_exit();                      // clean up and exit
1475     stg_exit(0);
1476   }
1477 #else /* mingw32 */
1478   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1479   return -1;
1480 #endif /* mingw32 */
1481 }
1482
1483 /* ---------------------------------------------------------------------------
1484  * deleteAllThreads():  kill all the live threads.
1485  *
1486  * This is used when we catch a user interrupt (^C), before performing
1487  * any necessary cleanups and running finalizers.
1488  *
1489  * Locks: sched_mutex held.
1490  * ------------------------------------------------------------------------- */
1491    
1492 void
1493 deleteAllThreads ( void )
1494 {
1495   StgTSO* t, *next;
1496   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1497   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1498       next = t->global_link;
1499       deleteThread(t);
1500   }      
1501   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1502   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1503   sleeping_queue = END_TSO_QUEUE;
1504 }
1505
1506 /* startThread and  insertThread are now in GranSim.c -- HWL */
1507
1508
1509 /* ---------------------------------------------------------------------------
1510  * Suspending & resuming Haskell threads.
1511  * 
1512  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1513  * its capability before calling the C function.  This allows another
1514  * task to pick up the capability and carry on running Haskell
1515  * threads.  It also means that if the C call blocks, it won't lock
1516  * the whole system.
1517  *
1518  * The Haskell thread making the C call is put to sleep for the
1519  * duration of the call, on the susepended_ccalling_threads queue.  We
1520  * give out a token to the task, which it can use to resume the thread
1521  * on return from the C function.
1522  * ------------------------------------------------------------------------- */
1523    
1524 StgInt
1525 suspendThread( StgRegTable *reg, 
1526                rtsBool concCall
1527 #if !defined(DEBUG)
1528                STG_UNUSED
1529 #endif
1530                )
1531 {
1532   nat tok;
1533   Capability *cap;
1534   int saved_errno = errno;
1535
1536   /* assume that *reg is a pointer to the StgRegTable part
1537    * of a Capability.
1538    */
1539   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1540
1541   ACQUIRE_LOCK(&sched_mutex);
1542
1543   IF_DEBUG(scheduler,
1544            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1545
1546   // XXX this might not be necessary --SDM
1547   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1548
1549   threadPaused(cap->r.rCurrentTSO);
1550   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1551   suspended_ccalling_threads = cap->r.rCurrentTSO;
1552
1553 #if defined(RTS_SUPPORTS_THREADS)
1554   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1555   {
1556       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1557       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1558   }
1559   else
1560   {
1561       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1562   }
1563 #endif
1564
1565   /* Use the thread ID as the token; it should be unique */
1566   tok = cap->r.rCurrentTSO->id;
1567
1568   /* Hand back capability */
1569   releaseCapability(cap);
1570   
1571 #if defined(RTS_SUPPORTS_THREADS)
1572   /* Preparing to leave the RTS, so ensure there's a native thread/task
1573      waiting to take over.
1574   */
1575   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1576 #endif
1577
1578   /* Other threads _might_ be available for execution; signal this */
1579   THREAD_RUNNABLE();
1580   RELEASE_LOCK(&sched_mutex);
1581   
1582   errno = saved_errno;
1583   return tok; 
1584 }
1585
1586 StgRegTable *
1587 resumeThread( StgInt tok,
1588               rtsBool concCall STG_UNUSED )
1589 {
1590   StgTSO *tso, **prev;
1591   Capability *cap;
1592   int saved_errno = errno;
1593
1594 #if defined(RTS_SUPPORTS_THREADS)
1595   /* Wait for permission to re-enter the RTS with the result. */
1596   ACQUIRE_LOCK(&sched_mutex);
1597   waitForReturnCapability(&sched_mutex, &cap);
1598
1599   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1600 #else
1601   grabCapability(&cap);
1602 #endif
1603
1604   /* Remove the thread off of the suspended list */
1605   prev = &suspended_ccalling_threads;
1606   for (tso = suspended_ccalling_threads; 
1607        tso != END_TSO_QUEUE; 
1608        prev = &tso->link, tso = tso->link) {
1609     if (tso->id == (StgThreadID)tok) {
1610       *prev = tso->link;
1611       break;
1612     }
1613   }
1614   if (tso == END_TSO_QUEUE) {
1615     barf("resumeThread: thread not found");
1616   }
1617   tso->link = END_TSO_QUEUE;
1618   
1619 #if defined(RTS_SUPPORTS_THREADS)
1620   if(tso->why_blocked == BlockedOnCCall)
1621   {
1622       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1623       tso->blocked_exceptions = NULL;
1624   }
1625 #endif
1626   
1627   /* Reset blocking status */
1628   tso->why_blocked  = NotBlocked;
1629
1630   cap->r.rCurrentTSO = tso;
1631   RELEASE_LOCK(&sched_mutex);
1632   errno = saved_errno;
1633   return &cap->r;
1634 }
1635
1636
1637 /* ---------------------------------------------------------------------------
1638  * Static functions
1639  * ------------------------------------------------------------------------ */
1640 static void unblockThread(StgTSO *tso);
1641
1642 /* ---------------------------------------------------------------------------
1643  * Comparing Thread ids.
1644  *
1645  * This is used from STG land in the implementation of the
1646  * instances of Eq/Ord for ThreadIds.
1647  * ------------------------------------------------------------------------ */
1648
1649 int
1650 cmp_thread(StgPtr tso1, StgPtr tso2) 
1651
1652   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1653   StgThreadID id2 = ((StgTSO *)tso2)->id;
1654  
1655   if (id1 < id2) return (-1);
1656   if (id1 > id2) return 1;
1657   return 0;
1658 }
1659
1660 /* ---------------------------------------------------------------------------
1661  * Fetching the ThreadID from an StgTSO.
1662  *
1663  * This is used in the implementation of Show for ThreadIds.
1664  * ------------------------------------------------------------------------ */
1665 int
1666 rts_getThreadId(StgPtr tso) 
1667 {
1668   return ((StgTSO *)tso)->id;
1669 }
1670
1671 #ifdef DEBUG
1672 void
1673 labelThread(StgPtr tso, char *label)
1674 {
1675   int len;
1676   void *buf;
1677
1678   /* Caveat: Once set, you can only set the thread name to "" */
1679   len = strlen(label)+1;
1680   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1681   strncpy(buf,label,len);
1682   /* Update will free the old memory for us */
1683   updateThreadLabel(((StgTSO *)tso)->id,buf);
1684 }
1685 #endif /* DEBUG */
1686
1687 /* ---------------------------------------------------------------------------
1688    Create a new thread.
1689
1690    The new thread starts with the given stack size.  Before the
1691    scheduler can run, however, this thread needs to have a closure
1692    (and possibly some arguments) pushed on its stack.  See
1693    pushClosure() in Schedule.h.
1694
1695    createGenThread() and createIOThread() (in SchedAPI.h) are
1696    convenient packaged versions of this function.
1697
1698    currently pri (priority) is only used in a GRAN setup -- HWL
1699    ------------------------------------------------------------------------ */
1700 #if defined(GRAN)
1701 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1702 StgTSO *
1703 createThread(nat size, StgInt pri)
1704 #else
1705 StgTSO *
1706 createThread(nat size)
1707 #endif
1708 {
1709
1710     StgTSO *tso;
1711     nat stack_size;
1712
1713     /* First check whether we should create a thread at all */
1714 #if defined(PAR)
1715   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1716   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1717     threadsIgnored++;
1718     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1719           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1720     return END_TSO_QUEUE;
1721   }
1722   threadsCreated++;
1723 #endif
1724
1725 #if defined(GRAN)
1726   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1727 #endif
1728
1729   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1730
1731   /* catch ridiculously small stack sizes */
1732   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1733     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1734   }
1735
1736   stack_size = size - TSO_STRUCT_SIZEW;
1737
1738   tso = (StgTSO *)allocate(size);
1739   TICK_ALLOC_TSO(stack_size, 0);
1740
1741   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1742 #if defined(GRAN)
1743   SET_GRAN_HDR(tso, ThisPE);
1744 #endif
1745
1746   // Always start with the compiled code evaluator
1747   tso->what_next = ThreadRunGHC;
1748
1749   tso->id = next_thread_id++; 
1750   tso->why_blocked  = NotBlocked;
1751   tso->blocked_exceptions = NULL;
1752
1753   tso->saved_errno = 0;
1754   
1755   tso->stack_size   = stack_size;
1756   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1757                               - TSO_STRUCT_SIZEW;
1758   tso->sp           = (P_)&(tso->stack) + stack_size;
1759
1760 #ifdef PROFILING
1761   tso->prof.CCCS = CCS_MAIN;
1762 #endif
1763
1764   /* put a stop frame on the stack */
1765   tso->sp -= sizeofW(StgStopFrame);
1766   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1767   // ToDo: check this
1768 #if defined(GRAN)
1769   tso->link = END_TSO_QUEUE;
1770   /* uses more flexible routine in GranSim */
1771   insertThread(tso, CurrentProc);
1772 #else
1773   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1774    * from its creation
1775    */
1776 #endif
1777
1778 #if defined(GRAN) 
1779   if (RtsFlags.GranFlags.GranSimStats.Full) 
1780     DumpGranEvent(GR_START,tso);
1781 #elif defined(PAR)
1782   if (RtsFlags.ParFlags.ParStats.Full) 
1783     DumpGranEvent(GR_STARTQ,tso);
1784   /* HACk to avoid SCHEDULE 
1785      LastTSO = tso; */
1786 #endif
1787
1788   /* Link the new thread on the global thread list.
1789    */
1790   tso->global_link = all_threads;
1791   all_threads = tso;
1792
1793 #if defined(DIST)
1794   tso->dist.priority = MandatoryPriority; //by default that is...
1795 #endif
1796
1797 #if defined(GRAN)
1798   tso->gran.pri = pri;
1799 # if defined(DEBUG)
1800   tso->gran.magic = TSO_MAGIC; // debugging only
1801 # endif
1802   tso->gran.sparkname   = 0;
1803   tso->gran.startedat   = CURRENT_TIME; 
1804   tso->gran.exported    = 0;
1805   tso->gran.basicblocks = 0;
1806   tso->gran.allocs      = 0;
1807   tso->gran.exectime    = 0;
1808   tso->gran.fetchtime   = 0;
1809   tso->gran.fetchcount  = 0;
1810   tso->gran.blocktime   = 0;
1811   tso->gran.blockcount  = 0;
1812   tso->gran.blockedat   = 0;
1813   tso->gran.globalsparks = 0;
1814   tso->gran.localsparks  = 0;
1815   if (RtsFlags.GranFlags.Light)
1816     tso->gran.clock  = Now; /* local clock */
1817   else
1818     tso->gran.clock  = 0;
1819
1820   IF_DEBUG(gran,printTSO(tso));
1821 #elif defined(PAR)
1822 # if defined(DEBUG)
1823   tso->par.magic = TSO_MAGIC; // debugging only
1824 # endif
1825   tso->par.sparkname   = 0;
1826   tso->par.startedat   = CURRENT_TIME; 
1827   tso->par.exported    = 0;
1828   tso->par.basicblocks = 0;
1829   tso->par.allocs      = 0;
1830   tso->par.exectime    = 0;
1831   tso->par.fetchtime   = 0;
1832   tso->par.fetchcount  = 0;
1833   tso->par.blocktime   = 0;
1834   tso->par.blockcount  = 0;
1835   tso->par.blockedat   = 0;
1836   tso->par.globalsparks = 0;
1837   tso->par.localsparks  = 0;
1838 #endif
1839
1840 #if defined(GRAN)
1841   globalGranStats.tot_threads_created++;
1842   globalGranStats.threads_created_on_PE[CurrentProc]++;
1843   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1844   globalGranStats.tot_sq_probes++;
1845 #elif defined(PAR)
1846   // collect parallel global statistics (currently done together with GC stats)
1847   if (RtsFlags.ParFlags.ParStats.Global &&
1848       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1849     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1850     globalParStats.tot_threads_created++;
1851   }
1852 #endif 
1853
1854 #if defined(GRAN)
1855   IF_GRAN_DEBUG(pri,
1856                 belch("==__ schedule: Created TSO %d (%p);",
1857                       CurrentProc, tso, tso->id));
1858 #elif defined(PAR)
1859     IF_PAR_DEBUG(verbose,
1860                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1861                        tso->id, tso, advisory_thread_count));
1862 #else
1863   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1864                                  tso->id, tso->stack_size));
1865 #endif    
1866   return tso;
1867 }
1868
1869 #if defined(PAR)
1870 /* RFP:
1871    all parallel thread creation calls should fall through the following routine.
1872 */
1873 StgTSO *
1874 createSparkThread(rtsSpark spark) 
1875 { StgTSO *tso;
1876   ASSERT(spark != (rtsSpark)NULL);
1877   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1878   { threadsIgnored++;
1879     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1880           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1881     return END_TSO_QUEUE;
1882   }
1883   else
1884   { threadsCreated++;
1885     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1886     if (tso==END_TSO_QUEUE)     
1887       barf("createSparkThread: Cannot create TSO");
1888 #if defined(DIST)
1889     tso->priority = AdvisoryPriority;
1890 #endif
1891     pushClosure(tso,spark);
1892     PUSH_ON_RUN_QUEUE(tso);
1893     advisory_thread_count++;    
1894   }
1895   return tso;
1896 }
1897 #endif
1898
1899 /*
1900   Turn a spark into a thread.
1901   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1902 */
1903 #if defined(PAR)
1904 StgTSO *
1905 activateSpark (rtsSpark spark) 
1906 {
1907   StgTSO *tso;
1908
1909   tso = createSparkThread(spark);
1910   if (RtsFlags.ParFlags.ParStats.Full) {   
1911     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1912     IF_PAR_DEBUG(verbose,
1913                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1914                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1915   }
1916   // ToDo: fwd info on local/global spark to thread -- HWL
1917   // tso->gran.exported =  spark->exported;
1918   // tso->gran.locked =   !spark->global;
1919   // tso->gran.sparkname = spark->name;
1920
1921   return tso;
1922 }
1923 #endif
1924
1925 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1926                                    Capability *initialCapability
1927                                    );
1928
1929
1930 /* ---------------------------------------------------------------------------
1931  * scheduleThread()
1932  *
1933  * scheduleThread puts a thread on the head of the runnable queue.
1934  * This will usually be done immediately after a thread is created.
1935  * The caller of scheduleThread must create the thread using e.g.
1936  * createThread and push an appropriate closure
1937  * on this thread's stack before the scheduler is invoked.
1938  * ------------------------------------------------------------------------ */
1939
1940 static void scheduleThread_ (StgTSO* tso);
1941
1942 void
1943 scheduleThread_(StgTSO *tso)
1944 {
1945   // Precondition: sched_mutex must be held.
1946   PUSH_ON_RUN_QUEUE(tso);
1947   THREAD_RUNNABLE();
1948 }
1949
1950 void
1951 scheduleThread(StgTSO* tso)
1952 {
1953   ACQUIRE_LOCK(&sched_mutex);
1954   scheduleThread_(tso);
1955   RELEASE_LOCK(&sched_mutex);
1956 }
1957
1958 #if defined(RTS_SUPPORTS_THREADS)
1959 static Condition bound_cond_cache;
1960 static int bound_cond_cache_full = 0;
1961 #endif
1962
1963
1964 SchedulerStatus
1965 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1966                    Capability *initialCapability)
1967 {
1968     // Precondition: sched_mutex must be held
1969     StgMainThread *m;
1970
1971     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1972     m->tso = tso;
1973     m->ret = ret;
1974     m->stat = NoStatus;
1975 #if defined(RTS_SUPPORTS_THREADS)
1976     // Allocating a new condition for each thread is expensive, so we
1977     // cache one.  This is a pretty feeble hack, but it helps speed up
1978     // consecutive call-ins quite a bit.
1979     if (bound_cond_cache_full) {
1980         m->bound_thread_cond = bound_cond_cache;
1981         bound_cond_cache_full = 0;
1982     } else {
1983         initCondition(&m->bound_thread_cond);
1984     }
1985 #endif
1986
1987     /* Put the thread on the main-threads list prior to scheduling the TSO.
1988        Failure to do so introduces a race condition in the MT case (as
1989        identified by Wolfgang Thaller), whereby the new task/OS thread 
1990        created by scheduleThread_() would complete prior to the thread
1991        that spawned it managed to put 'itself' on the main-threads list.
1992        The upshot of it all being that the worker thread wouldn't get to
1993        signal the completion of the its work item for the main thread to
1994        see (==> it got stuck waiting.)    -- sof 6/02.
1995     */
1996     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1997     
1998     m->link = main_threads;
1999     main_threads = m;
2000     
2001     PUSH_ON_RUN_QUEUE(tso);
2002     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2003     // bound and only runnable by *this* OS thread, so waking up other
2004     // workers will just slow things down.
2005
2006     return waitThread_(m, initialCapability);
2007 }
2008
2009 /* ---------------------------------------------------------------------------
2010  * initScheduler()
2011  *
2012  * Initialise the scheduler.  This resets all the queues - if the
2013  * queues contained any threads, they'll be garbage collected at the
2014  * next pass.
2015  *
2016  * ------------------------------------------------------------------------ */
2017
2018 void 
2019 initScheduler(void)
2020 {
2021 #if defined(GRAN)
2022   nat i;
2023
2024   for (i=0; i<=MAX_PROC; i++) {
2025     run_queue_hds[i]      = END_TSO_QUEUE;
2026     run_queue_tls[i]      = END_TSO_QUEUE;
2027     blocked_queue_hds[i]  = END_TSO_QUEUE;
2028     blocked_queue_tls[i]  = END_TSO_QUEUE;
2029     ccalling_threadss[i]  = END_TSO_QUEUE;
2030     sleeping_queue        = END_TSO_QUEUE;
2031   }
2032 #else
2033   run_queue_hd      = END_TSO_QUEUE;
2034   run_queue_tl      = END_TSO_QUEUE;
2035   blocked_queue_hd  = END_TSO_QUEUE;
2036   blocked_queue_tl  = END_TSO_QUEUE;
2037   sleeping_queue    = END_TSO_QUEUE;
2038 #endif 
2039
2040   suspended_ccalling_threads  = END_TSO_QUEUE;
2041
2042   main_threads = NULL;
2043   all_threads  = END_TSO_QUEUE;
2044
2045   context_switch = 0;
2046   interrupted    = 0;
2047
2048   RtsFlags.ConcFlags.ctxtSwitchTicks =
2049       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2050       
2051 #if defined(RTS_SUPPORTS_THREADS)
2052   /* Initialise the mutex and condition variables used by
2053    * the scheduler. */
2054   initMutex(&sched_mutex);
2055   initMutex(&term_mutex);
2056 #endif
2057   
2058   ACQUIRE_LOCK(&sched_mutex);
2059
2060   /* A capability holds the state a native thread needs in
2061    * order to execute STG code. At least one capability is
2062    * floating around (only SMP builds have more than one).
2063    */
2064   initCapabilities();
2065   
2066 #if defined(RTS_SUPPORTS_THREADS)
2067     /* start our haskell execution tasks */
2068     startTaskManager(0,taskStart);
2069 #endif
2070
2071 #if /* defined(SMP) ||*/ defined(PAR)
2072   initSparkPools();
2073 #endif
2074
2075   RELEASE_LOCK(&sched_mutex);
2076 }
2077
2078 void
2079 exitScheduler( void )
2080 {
2081 #if defined(RTS_SUPPORTS_THREADS)
2082   stopTaskManager();
2083 #endif
2084   shutting_down_scheduler = rtsTrue;
2085 }
2086
2087 /* ----------------------------------------------------------------------------
2088    Managing the per-task allocation areas.
2089    
2090    Each capability comes with an allocation area.  These are
2091    fixed-length block lists into which allocation can be done.
2092
2093    ToDo: no support for two-space collection at the moment???
2094    ------------------------------------------------------------------------- */
2095
2096 static
2097 SchedulerStatus
2098 waitThread_(StgMainThread* m, Capability *initialCapability)
2099 {
2100   SchedulerStatus stat;
2101
2102   // Precondition: sched_mutex must be held.
2103   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2104
2105 #if defined(GRAN)
2106   /* GranSim specific init */
2107   CurrentTSO = m->tso;                // the TSO to run
2108   procStatus[MainProc] = Busy;        // status of main PE
2109   CurrentProc = MainProc;             // PE to run it on
2110   schedule(m,initialCapability);
2111 #else
2112   schedule(m,initialCapability);
2113   ASSERT(m->stat != NoStatus);
2114 #endif
2115
2116   stat = m->stat;
2117
2118 #if defined(RTS_SUPPORTS_THREADS)
2119   // Free the condition variable, returning it to the cache if possible.
2120   if (!bound_cond_cache_full) {
2121       bound_cond_cache = m->bound_thread_cond;
2122       bound_cond_cache_full = 1;
2123   } else {
2124       closeCondition(&m->bound_thread_cond);
2125   }
2126 #endif
2127
2128   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2129   stgFree(m);
2130
2131   // Postcondition: sched_mutex still held
2132   return stat;
2133 }
2134
2135 /* ---------------------------------------------------------------------------
2136    Where are the roots that we know about?
2137
2138         - all the threads on the runnable queue
2139         - all the threads on the blocked queue
2140         - all the threads on the sleeping queue
2141         - all the thread currently executing a _ccall_GC
2142         - all the "main threads"
2143      
2144    ------------------------------------------------------------------------ */
2145
2146 /* This has to be protected either by the scheduler monitor, or by the
2147         garbage collection monitor (probably the latter).
2148         KH @ 25/10/99
2149 */
2150
2151 void
2152 GetRoots( evac_fn evac )
2153 {
2154 #if defined(GRAN)
2155   {
2156     nat i;
2157     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2158       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2159           evac((StgClosure **)&run_queue_hds[i]);
2160       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2161           evac((StgClosure **)&run_queue_tls[i]);
2162       
2163       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2164           evac((StgClosure **)&blocked_queue_hds[i]);
2165       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2166           evac((StgClosure **)&blocked_queue_tls[i]);
2167       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2168           evac((StgClosure **)&ccalling_threads[i]);
2169     }
2170   }
2171
2172   markEventQueue();
2173
2174 #else /* !GRAN */
2175   if (run_queue_hd != END_TSO_QUEUE) {
2176       ASSERT(run_queue_tl != END_TSO_QUEUE);
2177       evac((StgClosure **)&run_queue_hd);
2178       evac((StgClosure **)&run_queue_tl);
2179   }
2180   
2181   if (blocked_queue_hd != END_TSO_QUEUE) {
2182       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2183       evac((StgClosure **)&blocked_queue_hd);
2184       evac((StgClosure **)&blocked_queue_tl);
2185   }
2186   
2187   if (sleeping_queue != END_TSO_QUEUE) {
2188       evac((StgClosure **)&sleeping_queue);
2189   }
2190 #endif 
2191
2192   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2193       evac((StgClosure **)&suspended_ccalling_threads);
2194   }
2195
2196 #if defined(PAR) || defined(GRAN)
2197   markSparkQueue(evac);
2198 #endif
2199
2200 #if defined(RTS_USER_SIGNALS)
2201   // mark the signal handlers (signals should be already blocked)
2202   markSignalHandlers(evac);
2203 #endif
2204
2205   // main threads which have completed need to be retained until they
2206   // are dealt with in the main scheduler loop.  They won't be
2207   // retained any other way: the GC will drop them from the
2208   // all_threads list, so we have to be careful to treat them as roots
2209   // here.
2210   { 
2211       StgMainThread *m;
2212       for (m = main_threads; m != NULL; m = m->link) {
2213           switch (m->tso->what_next) {
2214           case ThreadComplete:
2215           case ThreadKilled:
2216               evac((StgClosure **)&m->tso);
2217               break;
2218           default:
2219               break;
2220           }
2221       }
2222   }
2223 }
2224
2225 /* -----------------------------------------------------------------------------
2226    performGC
2227
2228    This is the interface to the garbage collector from Haskell land.
2229    We provide this so that external C code can allocate and garbage
2230    collect when called from Haskell via _ccall_GC.
2231
2232    It might be useful to provide an interface whereby the programmer
2233    can specify more roots (ToDo).
2234    
2235    This needs to be protected by the GC condition variable above.  KH.
2236    -------------------------------------------------------------------------- */
2237
2238 static void (*extra_roots)(evac_fn);
2239
2240 void
2241 performGC(void)
2242 {
2243   /* Obligated to hold this lock upon entry */
2244   ACQUIRE_LOCK(&sched_mutex);
2245   GarbageCollect(GetRoots,rtsFalse);
2246   RELEASE_LOCK(&sched_mutex);
2247 }
2248
2249 void
2250 performMajorGC(void)
2251 {
2252   ACQUIRE_LOCK(&sched_mutex);
2253   GarbageCollect(GetRoots,rtsTrue);
2254   RELEASE_LOCK(&sched_mutex);
2255 }
2256
2257 static void
2258 AllRoots(evac_fn evac)
2259 {
2260     GetRoots(evac);             // the scheduler's roots
2261     extra_roots(evac);          // the user's roots
2262 }
2263
2264 void
2265 performGCWithRoots(void (*get_roots)(evac_fn))
2266 {
2267   ACQUIRE_LOCK(&sched_mutex);
2268   extra_roots = get_roots;
2269   GarbageCollect(AllRoots,rtsFalse);
2270   RELEASE_LOCK(&sched_mutex);
2271 }
2272
2273 /* -----------------------------------------------------------------------------
2274    Stack overflow
2275
2276    If the thread has reached its maximum stack size, then raise the
2277    StackOverflow exception in the offending thread.  Otherwise
2278    relocate the TSO into a larger chunk of memory and adjust its stack
2279    size appropriately.
2280    -------------------------------------------------------------------------- */
2281
2282 static StgTSO *
2283 threadStackOverflow(StgTSO *tso)
2284 {
2285   nat new_stack_size, new_tso_size, stack_words;
2286   StgPtr new_sp;
2287   StgTSO *dest;
2288
2289   IF_DEBUG(sanity,checkTSO(tso));
2290   if (tso->stack_size >= tso->max_stack_size) {
2291
2292     IF_DEBUG(gc,
2293              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2294                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2295              /* If we're debugging, just print out the top of the stack */
2296              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2297                                               tso->sp+64)));
2298
2299     /* Send this thread the StackOverflow exception */
2300     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2301     return tso;
2302   }
2303
2304   /* Try to double the current stack size.  If that takes us over the
2305    * maximum stack size for this thread, then use the maximum instead.
2306    * Finally round up so the TSO ends up as a whole number of blocks.
2307    */
2308   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2309   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2310                                        TSO_STRUCT_SIZE)/sizeof(W_);
2311   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2312   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2313
2314   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2315
2316   dest = (StgTSO *)allocate(new_tso_size);
2317   TICK_ALLOC_TSO(new_stack_size,0);
2318
2319   /* copy the TSO block and the old stack into the new area */
2320   memcpy(dest,tso,TSO_STRUCT_SIZE);
2321   stack_words = tso->stack + tso->stack_size - tso->sp;
2322   new_sp = (P_)dest + new_tso_size - stack_words;
2323   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2324
2325   /* relocate the stack pointers... */
2326   dest->sp         = new_sp;
2327   dest->stack_size = new_stack_size;
2328         
2329   /* Mark the old TSO as relocated.  We have to check for relocated
2330    * TSOs in the garbage collector and any primops that deal with TSOs.
2331    *
2332    * It's important to set the sp value to just beyond the end
2333    * of the stack, so we don't attempt to scavenge any part of the
2334    * dead TSO's stack.
2335    */
2336   tso->what_next = ThreadRelocated;
2337   tso->link = dest;
2338   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2339   tso->why_blocked = NotBlocked;
2340   dest->mut_link = NULL;
2341
2342   IF_PAR_DEBUG(verbose,
2343                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2344                      tso->id, tso, tso->stack_size);
2345                /* If we're debugging, just print out the top of the stack */
2346                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2347                                                 tso->sp+64)));
2348   
2349   IF_DEBUG(sanity,checkTSO(tso));
2350 #if 0
2351   IF_DEBUG(scheduler,printTSO(dest));
2352 #endif
2353
2354   return dest;
2355 }
2356
2357 /* ---------------------------------------------------------------------------
2358    Wake up a queue that was blocked on some resource.
2359    ------------------------------------------------------------------------ */
2360
2361 #if defined(GRAN)
2362 STATIC_INLINE void
2363 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2364 {
2365 }
2366 #elif defined(PAR)
2367 STATIC_INLINE void
2368 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2369 {
2370   /* write RESUME events to log file and
2371      update blocked and fetch time (depending on type of the orig closure) */
2372   if (RtsFlags.ParFlags.ParStats.Full) {
2373     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2374                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2375                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2376     if (EMPTY_RUN_QUEUE())
2377       emitSchedule = rtsTrue;
2378
2379     switch (get_itbl(node)->type) {
2380         case FETCH_ME_BQ:
2381           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2382           break;
2383         case RBH:
2384         case FETCH_ME:
2385         case BLACKHOLE_BQ:
2386           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2387           break;
2388 #ifdef DIST
2389         case MVAR:
2390           break;
2391 #endif    
2392         default:
2393           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2394         }
2395       }
2396 }
2397 #endif
2398
2399 #if defined(GRAN)
2400 static StgBlockingQueueElement *
2401 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2402 {
2403     StgTSO *tso;
2404     PEs node_loc, tso_loc;
2405
2406     node_loc = where_is(node); // should be lifted out of loop
2407     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2408     tso_loc = where_is((StgClosure *)tso);
2409     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2410       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2411       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2412       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2413       // insertThread(tso, node_loc);
2414       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2415                 ResumeThread,
2416                 tso, node, (rtsSpark*)NULL);
2417       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2418       // len_local++;
2419       // len++;
2420     } else { // TSO is remote (actually should be FMBQ)
2421       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2422                                   RtsFlags.GranFlags.Costs.gunblocktime +
2423                                   RtsFlags.GranFlags.Costs.latency;
2424       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2425                 UnblockThread,
2426                 tso, node, (rtsSpark*)NULL);
2427       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2428       // len++;
2429     }
2430     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2431     IF_GRAN_DEBUG(bq,
2432                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2433                           (node_loc==tso_loc ? "Local" : "Global"), 
2434                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2435     tso->block_info.closure = NULL;
2436     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2437                              tso->id, tso));
2438 }
2439 #elif defined(PAR)
2440 static StgBlockingQueueElement *
2441 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2442 {
2443     StgBlockingQueueElement *next;
2444
2445     switch (get_itbl(bqe)->type) {
2446     case TSO:
2447       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2448       /* if it's a TSO just push it onto the run_queue */
2449       next = bqe->link;
2450       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2451       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2452       THREAD_RUNNABLE();
2453       unblockCount(bqe, node);
2454       /* reset blocking status after dumping event */
2455       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2456       break;
2457
2458     case BLOCKED_FETCH:
2459       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2460       next = bqe->link;
2461       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2462       PendingFetches = (StgBlockedFetch *)bqe;
2463       break;
2464
2465 # if defined(DEBUG)
2466       /* can ignore this case in a non-debugging setup; 
2467          see comments on RBHSave closures above */
2468     case CONSTR:
2469       /* check that the closure is an RBHSave closure */
2470       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2471              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2472              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2473       break;
2474
2475     default:
2476       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2477            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2478            (StgClosure *)bqe);
2479 # endif
2480     }
2481   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2482   return next;
2483 }
2484
2485 #else /* !GRAN && !PAR */
2486 static StgTSO *
2487 unblockOneLocked(StgTSO *tso)
2488 {
2489   StgTSO *next;
2490
2491   ASSERT(get_itbl(tso)->type == TSO);
2492   ASSERT(tso->why_blocked != NotBlocked);
2493   tso->why_blocked = NotBlocked;
2494   next = tso->link;
2495   PUSH_ON_RUN_QUEUE(tso);
2496   THREAD_RUNNABLE();
2497   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2498   return next;
2499 }
2500 #endif
2501
2502 #if defined(GRAN) || defined(PAR)
2503 INLINE_ME StgBlockingQueueElement *
2504 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2505 {
2506   ACQUIRE_LOCK(&sched_mutex);
2507   bqe = unblockOneLocked(bqe, node);
2508   RELEASE_LOCK(&sched_mutex);
2509   return bqe;
2510 }
2511 #else
2512 INLINE_ME StgTSO *
2513 unblockOne(StgTSO *tso)
2514 {
2515   ACQUIRE_LOCK(&sched_mutex);
2516   tso = unblockOneLocked(tso);
2517   RELEASE_LOCK(&sched_mutex);
2518   return tso;
2519 }
2520 #endif
2521
2522 #if defined(GRAN)
2523 void 
2524 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2525 {
2526   StgBlockingQueueElement *bqe;
2527   PEs node_loc;
2528   nat len = 0; 
2529
2530   IF_GRAN_DEBUG(bq, 
2531                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2532                       node, CurrentProc, CurrentTime[CurrentProc], 
2533                       CurrentTSO->id, CurrentTSO));
2534
2535   node_loc = where_is(node);
2536
2537   ASSERT(q == END_BQ_QUEUE ||
2538          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2539          get_itbl(q)->type == CONSTR); // closure (type constructor)
2540   ASSERT(is_unique(node));
2541
2542   /* FAKE FETCH: magically copy the node to the tso's proc;
2543      no Fetch necessary because in reality the node should not have been 
2544      moved to the other PE in the first place
2545   */
2546   if (CurrentProc!=node_loc) {
2547     IF_GRAN_DEBUG(bq, 
2548                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2549                         node, node_loc, CurrentProc, CurrentTSO->id, 
2550                         // CurrentTSO, where_is(CurrentTSO),
2551                         node->header.gran.procs));
2552     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2553     IF_GRAN_DEBUG(bq, 
2554                   belch("## new bitmask of node %p is %#x",
2555                         node, node->header.gran.procs));
2556     if (RtsFlags.GranFlags.GranSimStats.Global) {
2557       globalGranStats.tot_fake_fetches++;
2558     }
2559   }
2560
2561   bqe = q;
2562   // ToDo: check: ASSERT(CurrentProc==node_loc);
2563   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2564     //next = bqe->link;
2565     /* 
2566        bqe points to the current element in the queue
2567        next points to the next element in the queue
2568     */
2569     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2570     //tso_loc = where_is(tso);
2571     len++;
2572     bqe = unblockOneLocked(bqe, node);
2573   }
2574
2575   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2576      the closure to make room for the anchor of the BQ */
2577   if (bqe!=END_BQ_QUEUE) {
2578     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2579     /*
2580     ASSERT((info_ptr==&RBH_Save_0_info) ||
2581            (info_ptr==&RBH_Save_1_info) ||
2582            (info_ptr==&RBH_Save_2_info));
2583     */
2584     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2585     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2586     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2587
2588     IF_GRAN_DEBUG(bq,
2589                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2590                         node, info_type(node)));
2591   }
2592
2593   /* statistics gathering */
2594   if (RtsFlags.GranFlags.GranSimStats.Global) {
2595     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2596     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2597     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2598     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2599   }
2600   IF_GRAN_DEBUG(bq,
2601                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2602                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2603 }
2604 #elif defined(PAR)
2605 void 
2606 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2607 {
2608   StgBlockingQueueElement *bqe;
2609
2610   ACQUIRE_LOCK(&sched_mutex);
2611
2612   IF_PAR_DEBUG(verbose, 
2613                belch("##-_ AwBQ for node %p on [%x]: ",
2614                      node, mytid));
2615 #ifdef DIST  
2616   //RFP
2617   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2618     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2619     return;
2620   }
2621 #endif
2622   
2623   ASSERT(q == END_BQ_QUEUE ||
2624          get_itbl(q)->type == TSO ||           
2625          get_itbl(q)->type == BLOCKED_FETCH || 
2626          get_itbl(q)->type == CONSTR); 
2627
2628   bqe = q;
2629   while (get_itbl(bqe)->type==TSO || 
2630          get_itbl(bqe)->type==BLOCKED_FETCH) {
2631     bqe = unblockOneLocked(bqe, node);
2632   }
2633   RELEASE_LOCK(&sched_mutex);
2634 }
2635
2636 #else   /* !GRAN && !PAR */
2637
2638 #ifdef RTS_SUPPORTS_THREADS
2639 void
2640 awakenBlockedQueueNoLock(StgTSO *tso)
2641 {
2642   while (tso != END_TSO_QUEUE) {
2643     tso = unblockOneLocked(tso);
2644   }
2645 }
2646 #endif
2647
2648 void
2649 awakenBlockedQueue(StgTSO *tso)
2650 {
2651   ACQUIRE_LOCK(&sched_mutex);
2652   while (tso != END_TSO_QUEUE) {
2653     tso = unblockOneLocked(tso);
2654   }
2655   RELEASE_LOCK(&sched_mutex);
2656 }
2657 #endif
2658
2659 /* ---------------------------------------------------------------------------
2660    Interrupt execution
2661    - usually called inside a signal handler so it mustn't do anything fancy.   
2662    ------------------------------------------------------------------------ */
2663
2664 void
2665 interruptStgRts(void)
2666 {
2667     interrupted    = 1;
2668     context_switch = 1;
2669 #ifdef RTS_SUPPORTS_THREADS
2670     wakeBlockedWorkerThread();
2671 #endif
2672 }
2673
2674 /* -----------------------------------------------------------------------------
2675    Unblock a thread
2676
2677    This is for use when we raise an exception in another thread, which
2678    may be blocked.
2679    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2680    -------------------------------------------------------------------------- */
2681
2682 #if defined(GRAN) || defined(PAR)
2683 /*
2684   NB: only the type of the blocking queue is different in GranSim and GUM
2685       the operations on the queue-elements are the same
2686       long live polymorphism!
2687
2688   Locks: sched_mutex is held upon entry and exit.
2689
2690 */
2691 static void
2692 unblockThread(StgTSO *tso)
2693 {
2694   StgBlockingQueueElement *t, **last;
2695
2696   switch (tso->why_blocked) {
2697
2698   case NotBlocked:
2699     return;  /* not blocked */
2700
2701   case BlockedOnMVar:
2702     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2703     {
2704       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2705       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2706
2707       last = (StgBlockingQueueElement **)&mvar->head;
2708       for (t = (StgBlockingQueueElement *)mvar->head; 
2709            t != END_BQ_QUEUE; 
2710            last = &t->link, last_tso = t, t = t->link) {
2711         if (t == (StgBlockingQueueElement *)tso) {
2712           *last = (StgBlockingQueueElement *)tso->link;
2713           if (mvar->tail == tso) {
2714             mvar->tail = (StgTSO *)last_tso;
2715           }
2716           goto done;
2717         }
2718       }
2719       barf("unblockThread (MVAR): TSO not found");
2720     }
2721
2722   case BlockedOnBlackHole:
2723     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2724     {
2725       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2726
2727       last = &bq->blocking_queue;
2728       for (t = bq->blocking_queue; 
2729            t != END_BQ_QUEUE; 
2730            last = &t->link, t = t->link) {
2731         if (t == (StgBlockingQueueElement *)tso) {
2732           *last = (StgBlockingQueueElement *)tso->link;
2733           goto done;
2734         }
2735       }
2736       barf("unblockThread (BLACKHOLE): TSO not found");
2737     }
2738
2739   case BlockedOnException:
2740     {
2741       StgTSO *target  = tso->block_info.tso;
2742
2743       ASSERT(get_itbl(target)->type == TSO);
2744
2745       if (target->what_next == ThreadRelocated) {
2746           target = target->link;
2747           ASSERT(get_itbl(target)->type == TSO);
2748       }
2749
2750       ASSERT(target->blocked_exceptions != NULL);
2751
2752       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2753       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2754            t != END_BQ_QUEUE; 
2755            last = &t->link, t = t->link) {
2756         ASSERT(get_itbl(t)->type == TSO);
2757         if (t == (StgBlockingQueueElement *)tso) {
2758           *last = (StgBlockingQueueElement *)tso->link;
2759           goto done;
2760         }
2761       }
2762       barf("unblockThread (Exception): TSO not found");
2763     }
2764
2765   case BlockedOnRead:
2766   case BlockedOnWrite:
2767 #if defined(mingw32_TARGET_OS)
2768   case BlockedOnDoProc:
2769 #endif
2770     {
2771       /* take TSO off blocked_queue */
2772       StgBlockingQueueElement *prev = NULL;
2773       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2774            prev = t, t = t->link) {
2775         if (t == (StgBlockingQueueElement *)tso) {
2776           if (prev == NULL) {
2777             blocked_queue_hd = (StgTSO *)t->link;
2778             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2779               blocked_queue_tl = END_TSO_QUEUE;
2780             }
2781           } else {
2782             prev->link = t->link;
2783             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2784               blocked_queue_tl = (StgTSO *)prev;
2785             }
2786           }
2787           goto done;
2788         }
2789       }
2790       barf("unblockThread (I/O): TSO not found");
2791     }
2792
2793   case BlockedOnDelay:
2794     {
2795       /* take TSO off sleeping_queue */
2796       StgBlockingQueueElement *prev = NULL;
2797       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2798            prev = t, t = t->link) {
2799         if (t == (StgBlockingQueueElement *)tso) {
2800           if (prev == NULL) {
2801             sleeping_queue = (StgTSO *)t->link;
2802           } else {
2803             prev->link = t->link;
2804           }
2805           goto done;
2806         }
2807       }
2808       barf("unblockThread (delay): TSO not found");
2809     }
2810
2811   default:
2812     barf("unblockThread");
2813   }
2814
2815  done:
2816   tso->link = END_TSO_QUEUE;
2817   tso->why_blocked = NotBlocked;
2818   tso->block_info.closure = NULL;
2819   PUSH_ON_RUN_QUEUE(tso);
2820 }
2821 #else
2822 static void
2823 unblockThread(StgTSO *tso)
2824 {
2825   StgTSO *t, **last;
2826   
2827   /* To avoid locking unnecessarily. */
2828   if (tso->why_blocked == NotBlocked) {
2829     return;
2830   }
2831
2832   switch (tso->why_blocked) {
2833
2834   case BlockedOnMVar:
2835     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2836     {
2837       StgTSO *last_tso = END_TSO_QUEUE;
2838       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2839
2840       last = &mvar->head;
2841       for (t = mvar->head; t != END_TSO_QUEUE; 
2842            last = &t->link, last_tso = t, t = t->link) {
2843         if (t == tso) {
2844           *last = tso->link;
2845           if (mvar->tail == tso) {
2846             mvar->tail = last_tso;
2847           }
2848           goto done;
2849         }
2850       }
2851       barf("unblockThread (MVAR): TSO not found");
2852     }
2853
2854   case BlockedOnBlackHole:
2855     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2856     {
2857       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2858
2859       last = &bq->blocking_queue;
2860       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2861            last = &t->link, t = t->link) {
2862         if (t == tso) {
2863           *last = tso->link;
2864           goto done;
2865         }
2866       }
2867       barf("unblockThread (BLACKHOLE): TSO not found");
2868     }
2869
2870   case BlockedOnException:
2871     {
2872       StgTSO *target  = tso->block_info.tso;
2873
2874       ASSERT(get_itbl(target)->type == TSO);
2875
2876       while (target->what_next == ThreadRelocated) {
2877           target = target->link;
2878           ASSERT(get_itbl(target)->type == TSO);
2879       }
2880       
2881       ASSERT(target->blocked_exceptions != NULL);
2882
2883       last = &target->blocked_exceptions;
2884       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2885            last = &t->link, t = t->link) {
2886         ASSERT(get_itbl(t)->type == TSO);
2887         if (t == tso) {
2888           *last = tso->link;
2889           goto done;
2890         }
2891       }
2892       barf("unblockThread (Exception): TSO not found");
2893     }
2894
2895   case BlockedOnRead:
2896   case BlockedOnWrite:
2897 #if defined(mingw32_TARGET_OS)
2898   case BlockedOnDoProc:
2899 #endif
2900     {
2901       StgTSO *prev = NULL;
2902       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2903            prev = t, t = t->link) {
2904         if (t == tso) {
2905           if (prev == NULL) {
2906             blocked_queue_hd = t->link;
2907             if (blocked_queue_tl == t) {
2908               blocked_queue_tl = END_TSO_QUEUE;
2909             }
2910           } else {
2911             prev->link = t->link;
2912             if (blocked_queue_tl == t) {
2913               blocked_queue_tl = prev;
2914             }
2915           }
2916           goto done;
2917         }
2918       }
2919       barf("unblockThread (I/O): TSO not found");
2920     }
2921
2922   case BlockedOnDelay:
2923     {
2924       StgTSO *prev = NULL;
2925       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2926            prev = t, t = t->link) {
2927         if (t == tso) {
2928           if (prev == NULL) {
2929             sleeping_queue = t->link;
2930           } else {
2931             prev->link = t->link;
2932           }
2933           goto done;
2934         }
2935       }
2936       barf("unblockThread (delay): TSO not found");
2937     }
2938
2939   default:
2940     barf("unblockThread");
2941   }
2942
2943  done:
2944   tso->link = END_TSO_QUEUE;
2945   tso->why_blocked = NotBlocked;
2946   tso->block_info.closure = NULL;
2947   PUSH_ON_RUN_QUEUE(tso);
2948 }
2949 #endif
2950
2951 /* -----------------------------------------------------------------------------
2952  * raiseAsync()
2953  *
2954  * The following function implements the magic for raising an
2955  * asynchronous exception in an existing thread.
2956  *
2957  * We first remove the thread from any queue on which it might be
2958  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2959  *
2960  * We strip the stack down to the innermost CATCH_FRAME, building
2961  * thunks in the heap for all the active computations, so they can 
2962  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2963  * an application of the handler to the exception, and push it on
2964  * the top of the stack.
2965  * 
2966  * How exactly do we save all the active computations?  We create an
2967  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2968  * AP_STACKs pushes everything from the corresponding update frame
2969  * upwards onto the stack.  (Actually, it pushes everything up to the
2970  * next update frame plus a pointer to the next AP_STACK object.
2971  * Entering the next AP_STACK object pushes more onto the stack until we
2972  * reach the last AP_STACK object - at which point the stack should look
2973  * exactly as it did when we killed the TSO and we can continue
2974  * execution by entering the closure on top of the stack.
2975  *
2976  * We can also kill a thread entirely - this happens if either (a) the 
2977  * exception passed to raiseAsync is NULL, or (b) there's no
2978  * CATCH_FRAME on the stack.  In either case, we strip the entire
2979  * stack and replace the thread with a zombie.
2980  *
2981  * Locks: sched_mutex held upon entry nor exit.
2982  *
2983  * -------------------------------------------------------------------------- */
2984  
2985 void 
2986 deleteThread(StgTSO *tso)
2987 {
2988   raiseAsync(tso,NULL);
2989 }
2990
2991 static void 
2992 deleteThreadImmediately(StgTSO *tso)
2993 { // for forkProcess only:
2994   // delete thread without giving it a chance to catch the KillThread exception
2995
2996   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2997       return;
2998   }
2999 #if defined(RTS_SUPPORTS_THREADS)
3000   if (tso->why_blocked != BlockedOnCCall
3001       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3002 #endif
3003     unblockThread(tso);
3004   tso->what_next = ThreadKilled;
3005 }
3006
3007 void
3008 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3009 {
3010   /* When raising async exs from contexts where sched_mutex isn't held;
3011      use raiseAsyncWithLock(). */
3012   ACQUIRE_LOCK(&sched_mutex);
3013   raiseAsync(tso,exception);
3014   RELEASE_LOCK(&sched_mutex);
3015 }
3016
3017 void
3018 raiseAsync(StgTSO *tso, StgClosure *exception)
3019 {
3020     StgRetInfoTable *info;
3021     StgPtr sp;
3022   
3023     // Thread already dead?
3024     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3025         return;
3026     }
3027
3028     IF_DEBUG(scheduler, 
3029              sched_belch("raising exception in thread %ld.", tso->id));
3030     
3031     // Remove it from any blocking queues
3032     unblockThread(tso);
3033
3034     sp = tso->sp;
3035     
3036     // The stack freezing code assumes there's a closure pointer on
3037     // the top of the stack, so we have to arrange that this is the case...
3038     //
3039     if (sp[0] == (W_)&stg_enter_info) {
3040         sp++;
3041     } else {
3042         sp--;
3043         sp[0] = (W_)&stg_dummy_ret_closure;
3044     }
3045
3046     while (1) {
3047         nat i;
3048
3049         // 1. Let the top of the stack be the "current closure"
3050         //
3051         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3052         // CATCH_FRAME.
3053         //
3054         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3055         // current closure applied to the chunk of stack up to (but not
3056         // including) the update frame.  This closure becomes the "current
3057         // closure".  Go back to step 2.
3058         //
3059         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3060         // top of the stack applied to the exception.
3061         // 
3062         // 5. If it's a STOP_FRAME, then kill the thread.
3063         
3064         StgPtr frame;
3065         
3066         frame = sp + 1;
3067         info = get_ret_itbl((StgClosure *)frame);
3068         
3069         while (info->i.type != UPDATE_FRAME
3070                && (info->i.type != CATCH_FRAME || exception == NULL)
3071                && info->i.type != STOP_FRAME) {
3072             frame += stack_frame_sizeW((StgClosure *)frame);
3073             info = get_ret_itbl((StgClosure *)frame);
3074         }
3075         
3076         switch (info->i.type) {
3077             
3078         case CATCH_FRAME:
3079             // If we find a CATCH_FRAME, and we've got an exception to raise,
3080             // then build the THUNK raise(exception), and leave it on
3081             // top of the CATCH_FRAME ready to enter.
3082             //
3083         {
3084 #ifdef PROFILING
3085             StgCatchFrame *cf = (StgCatchFrame *)frame;
3086 #endif
3087             StgClosure *raise;
3088             
3089             // we've got an exception to raise, so let's pass it to the
3090             // handler in this frame.
3091             //
3092             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3093             TICK_ALLOC_SE_THK(1,0);
3094             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3095             raise->payload[0] = exception;
3096             
3097             // throw away the stack from Sp up to the CATCH_FRAME.
3098             //
3099             sp = frame - 1;
3100             
3101             /* Ensure that async excpetions are blocked now, so we don't get
3102              * a surprise exception before we get around to executing the
3103              * handler.
3104              */
3105             if (tso->blocked_exceptions == NULL) {
3106                 tso->blocked_exceptions = END_TSO_QUEUE;
3107             }
3108             
3109             /* Put the newly-built THUNK on top of the stack, ready to execute
3110              * when the thread restarts.
3111              */
3112             sp[0] = (W_)raise;
3113             sp[-1] = (W_)&stg_enter_info;
3114             tso->sp = sp-1;
3115             tso->what_next = ThreadRunGHC;
3116             IF_DEBUG(sanity, checkTSO(tso));
3117             return;
3118         }
3119         
3120         case UPDATE_FRAME:
3121         {
3122             StgAP_STACK * ap;
3123             nat words;
3124             
3125             // First build an AP_STACK consisting of the stack chunk above the
3126             // current update frame, with the top word on the stack as the
3127             // fun field.
3128             //
3129             words = frame - sp - 1;
3130             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3131             
3132             ap->size = words;
3133             ap->fun  = (StgClosure *)sp[0];
3134             sp++;
3135             for(i=0; i < (nat)words; ++i) {
3136                 ap->payload[i] = (StgClosure *)*sp++;
3137             }
3138             
3139             SET_HDR(ap,&stg_AP_STACK_info,
3140                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3141             TICK_ALLOC_UP_THK(words+1,0);
3142             
3143             IF_DEBUG(scheduler,
3144                      fprintf(stderr,  "sched: Updating ");
3145                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3146                      fprintf(stderr,  " with ");
3147                      printObj((StgClosure *)ap);
3148                 );
3149
3150             // Replace the updatee with an indirection - happily
3151             // this will also wake up any threads currently
3152             // waiting on the result.
3153             //
3154             // Warning: if we're in a loop, more than one update frame on
3155             // the stack may point to the same object.  Be careful not to
3156             // overwrite an IND_OLDGEN in this case, because we'll screw
3157             // up the mutable lists.  To be on the safe side, don't
3158             // overwrite any kind of indirection at all.  See also
3159             // threadSqueezeStack in GC.c, where we have to make a similar
3160             // check.
3161             //
3162             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3163                 // revert the black hole
3164                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3165             }
3166             sp += sizeofW(StgUpdateFrame) - 1;
3167             sp[0] = (W_)ap; // push onto stack
3168             break;
3169         }
3170         
3171         case STOP_FRAME:
3172             // We've stripped the entire stack, the thread is now dead.
3173             sp += sizeofW(StgStopFrame);
3174             tso->what_next = ThreadKilled;
3175             tso->sp = sp;
3176             return;
3177             
3178         default:
3179             barf("raiseAsync");
3180         }
3181     }
3182     barf("raiseAsync");
3183 }
3184
3185 /* -----------------------------------------------------------------------------
3186    resurrectThreads is called after garbage collection on the list of
3187    threads found to be garbage.  Each of these threads will be woken
3188    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3189    on an MVar, or NonTermination if the thread was blocked on a Black
3190    Hole.
3191
3192    Locks: sched_mutex isn't held upon entry nor exit.
3193    -------------------------------------------------------------------------- */
3194
3195 void
3196 resurrectThreads( StgTSO *threads )
3197 {
3198   StgTSO *tso, *next;
3199
3200   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3201     next = tso->global_link;
3202     tso->global_link = all_threads;
3203     all_threads = tso;
3204     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3205
3206     switch (tso->why_blocked) {
3207     case BlockedOnMVar:
3208     case BlockedOnException:
3209       /* Called by GC - sched_mutex lock is currently held. */
3210       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3211       break;
3212     case BlockedOnBlackHole:
3213       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3214       break;
3215     case NotBlocked:
3216       /* This might happen if the thread was blocked on a black hole
3217        * belonging to a thread that we've just woken up (raiseAsync
3218        * can wake up threads, remember...).
3219        */
3220       continue;
3221     default:
3222       barf("resurrectThreads: thread blocked in a strange way");
3223     }
3224   }
3225 }
3226
3227 /* -----------------------------------------------------------------------------
3228  * Blackhole detection: if we reach a deadlock, test whether any
3229  * threads are blocked on themselves.  Any threads which are found to
3230  * be self-blocked get sent a NonTermination exception.
3231  *
3232  * This is only done in a deadlock situation in order to avoid
3233  * performance overhead in the normal case.
3234  *
3235  * Locks: sched_mutex is held upon entry and exit.
3236  * -------------------------------------------------------------------------- */
3237
3238 static void
3239 detectBlackHoles( void )
3240 {
3241     StgTSO *tso = all_threads;
3242     StgClosure *frame;
3243     StgClosure *blocked_on;
3244     StgRetInfoTable *info;
3245
3246     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3247
3248         while (tso->what_next == ThreadRelocated) {
3249             tso = tso->link;
3250             ASSERT(get_itbl(tso)->type == TSO);
3251         }
3252       
3253         if (tso->why_blocked != BlockedOnBlackHole) {
3254             continue;
3255         }
3256         blocked_on = tso->block_info.closure;
3257
3258         frame = (StgClosure *)tso->sp;
3259
3260         while(1) {
3261             info = get_ret_itbl(frame);
3262             switch (info->i.type) {
3263             case UPDATE_FRAME:
3264                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3265                     /* We are blocking on one of our own computations, so
3266                      * send this thread the NonTermination exception.  
3267                      */
3268                     IF_DEBUG(scheduler, 
3269                              sched_belch("thread %d is blocked on itself", tso->id));
3270                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3271                     goto done;
3272                 }
3273                 
3274                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3275                 continue;
3276
3277             case STOP_FRAME:
3278                 goto done;
3279
3280                 // normal stack frames; do nothing except advance the pointer
3281             default:
3282                 (StgPtr)frame += stack_frame_sizeW(frame);
3283             }
3284         }   
3285         done: ;
3286     }
3287 }
3288
3289 /* ----------------------------------------------------------------------------
3290  * Debugging: why is a thread blocked
3291  * [Also provides useful information when debugging threaded programs
3292  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3293    ------------------------------------------------------------------------- */
3294
3295 static
3296 void
3297 printThreadBlockage(StgTSO *tso)
3298 {
3299   switch (tso->why_blocked) {
3300   case BlockedOnRead:
3301     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3302     break;
3303   case BlockedOnWrite:
3304     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3305     break;
3306 #if defined(mingw32_TARGET_OS)
3307     case BlockedOnDoProc:
3308     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3309     break;
3310 #endif
3311   case BlockedOnDelay:
3312     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3313     break;
3314   case BlockedOnMVar:
3315     fprintf(stderr,"is blocked on an MVar");
3316     break;
3317   case BlockedOnException:
3318     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3319             tso->block_info.tso->id);
3320     break;
3321   case BlockedOnBlackHole:
3322     fprintf(stderr,"is blocked on a black hole");
3323     break;
3324   case NotBlocked:
3325     fprintf(stderr,"is not blocked");
3326     break;
3327 #if defined(PAR)
3328   case BlockedOnGA:
3329     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3330             tso->block_info.closure, info_type(tso->block_info.closure));
3331     break;
3332   case BlockedOnGA_NoSend:
3333     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3334             tso->block_info.closure, info_type(tso->block_info.closure));
3335     break;
3336 #endif
3337 #if defined(RTS_SUPPORTS_THREADS)
3338   case BlockedOnCCall:
3339     fprintf(stderr,"is blocked on an external call");
3340     break;
3341   case BlockedOnCCall_NoUnblockExc:
3342     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3343     break;
3344 #endif
3345   default:
3346     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3347          tso->why_blocked, tso->id, tso);
3348   }
3349 }
3350
3351 static
3352 void
3353 printThreadStatus(StgTSO *tso)
3354 {
3355   switch (tso->what_next) {
3356   case ThreadKilled:
3357     fprintf(stderr,"has been killed");
3358     break;
3359   case ThreadComplete:
3360     fprintf(stderr,"has completed");
3361     break;
3362   default:
3363     printThreadBlockage(tso);
3364   }
3365 }
3366
3367 void
3368 printAllThreads(void)
3369 {
3370   StgTSO *t;
3371   void *label;
3372
3373 # if defined(GRAN)
3374   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3375   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3376                        time_string, rtsFalse/*no commas!*/);
3377
3378   fprintf(stderr, "all threads at [%s]:\n", time_string);
3379 # elif defined(PAR)
3380   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3381   ullong_format_string(CURRENT_TIME,
3382                        time_string, rtsFalse/*no commas!*/);
3383
3384   fprintf(stderr,"all threads at [%s]:\n", time_string);
3385 # else
3386   fprintf(stderr,"all threads:\n");
3387 # endif
3388
3389   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3390     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3391     label = lookupThreadLabel(t->id);
3392     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3393     printThreadStatus(t);
3394     fprintf(stderr,"\n");
3395   }
3396 }
3397     
3398 #ifdef DEBUG
3399
3400 /* 
3401    Print a whole blocking queue attached to node (debugging only).
3402 */
3403 # if defined(PAR)
3404 void 
3405 print_bq (StgClosure *node)
3406 {
3407   StgBlockingQueueElement *bqe;
3408   StgTSO *tso;
3409   rtsBool end;
3410
3411   fprintf(stderr,"## BQ of closure %p (%s): ",
3412           node, info_type(node));
3413
3414   /* should cover all closures that may have a blocking queue */
3415   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3416          get_itbl(node)->type == FETCH_ME_BQ ||
3417          get_itbl(node)->type == RBH ||
3418          get_itbl(node)->type == MVAR);
3419     
3420   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3421
3422   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3423 }
3424
3425 /* 
3426    Print a whole blocking queue starting with the element bqe.
3427 */
3428 void 
3429 print_bqe (StgBlockingQueueElement *bqe)
3430 {
3431   rtsBool end;
3432
3433   /* 
3434      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3435   */
3436   for (end = (bqe==END_BQ_QUEUE);
3437        !end; // iterate until bqe points to a CONSTR
3438        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3439        bqe = end ? END_BQ_QUEUE : bqe->link) {
3440     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3441     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3442     /* types of closures that may appear in a blocking queue */
3443     ASSERT(get_itbl(bqe)->type == TSO ||           
3444            get_itbl(bqe)->type == BLOCKED_FETCH || 
3445            get_itbl(bqe)->type == CONSTR); 
3446     /* only BQs of an RBH end with an RBH_Save closure */
3447     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3448
3449     switch (get_itbl(bqe)->type) {
3450     case TSO:
3451       fprintf(stderr," TSO %u (%x),",
3452               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3453       break;
3454     case BLOCKED_FETCH:
3455       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3456               ((StgBlockedFetch *)bqe)->node, 
3457               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3458               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3459               ((StgBlockedFetch *)bqe)->ga.weight);
3460       break;
3461     case CONSTR:
3462       fprintf(stderr," %s (IP %p),",
3463               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3464                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3465                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3466                "RBH_Save_?"), get_itbl(bqe));
3467       break;
3468     default:
3469       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3470            info_type((StgClosure *)bqe)); // , node, info_type(node));
3471       break;
3472     }
3473   } /* for */
3474   fputc('\n', stderr);
3475 }
3476 # elif defined(GRAN)
3477 void 
3478 print_bq (StgClosure *node)
3479 {
3480   StgBlockingQueueElement *bqe;
3481   PEs node_loc, tso_loc;
3482   rtsBool end;
3483
3484   /* should cover all closures that may have a blocking queue */
3485   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3486          get_itbl(node)->type == FETCH_ME_BQ ||
3487          get_itbl(node)->type == RBH);
3488     
3489   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3490   node_loc = where_is(node);
3491
3492   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3493           node, info_type(node), node_loc);
3494
3495   /* 
3496      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3497   */
3498   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3499        !end; // iterate until bqe points to a CONSTR
3500        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3501     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3502     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3503     /* types of closures that may appear in a blocking queue */
3504     ASSERT(get_itbl(bqe)->type == TSO ||           
3505            get_itbl(bqe)->type == CONSTR); 
3506     /* only BQs of an RBH end with an RBH_Save closure */
3507     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3508
3509     tso_loc = where_is((StgClosure *)bqe);
3510     switch (get_itbl(bqe)->type) {
3511     case TSO:
3512       fprintf(stderr," TSO %d (%p) on [PE %d],",
3513               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3514       break;
3515     case CONSTR:
3516       fprintf(stderr," %s (IP %p),",
3517               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3518                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3519                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3520                "RBH_Save_?"), get_itbl(bqe));
3521       break;
3522     default:
3523       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3524            info_type((StgClosure *)bqe), node, info_type(node));
3525       break;
3526     }
3527   } /* for */
3528   fputc('\n', stderr);
3529 }
3530 #else
3531 /* 
3532    Nice and easy: only TSOs on the blocking queue
3533 */
3534 void 
3535 print_bq (StgClosure *node)
3536 {
3537   StgTSO *tso;
3538
3539   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3540   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3541        tso != END_TSO_QUEUE; 
3542        tso=tso->link) {
3543     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3544     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3545     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3546   }
3547   fputc('\n', stderr);
3548 }
3549 # endif
3550
3551 #if defined(PAR)
3552 static nat
3553 run_queue_len(void)
3554 {
3555   nat i;
3556   StgTSO *tso;
3557
3558   for (i=0, tso=run_queue_hd; 
3559        tso != END_TSO_QUEUE;
3560        i++, tso=tso->link)
3561     /* nothing */
3562
3563   return i;
3564 }
3565 #endif
3566
3567 void
3568 sched_belch(char *s, ...)
3569 {
3570   va_list ap;
3571   va_start(ap,s);
3572 #ifdef RTS_SUPPORTS_THREADS
3573   fprintf(stderr, "sched (task %p): ", osThreadId());
3574 #elif defined(PAR)
3575   fprintf(stderr, "== ");
3576 #else
3577   fprintf(stderr, "sched: ");
3578 #endif
3579   vfprintf(stderr, s, ap);
3580   fprintf(stderr, "\n");
3581   fflush(stderr);
3582   va_end(ap);
3583 }
3584
3585 #endif /* DEBUG */