[project @ 2004-05-06 12:20:04 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.196 2004/05/06 12:20:04 wolfgang Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       if(!startTask(taskStart))
280       {
281         startingWorkerThread = rtsFalse;
282       }
283     }
284   }
285 }
286 #endif
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    Locking notes:  we acquire the scheduler lock once at the beginning
301    of the scheduler loop, and release it when
302     
303       * running a thread, or
304       * waiting for work, or
305       * waiting for a GC to complete.
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323 static void
324 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
325           Capability *initialCapability )
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PAR)
333   StgSparkPool *pool;
334   rtsSpark spark;
335   StgTSO *tso;
336   GlobalTaskId pe;
337   rtsBool receivedFinish = rtsFalse;
338 # if defined(DEBUG)
339   nat tp_size, sp_size; // stats only
340 # endif
341 #endif
342   rtsBool was_interrupted = rtsFalse;
343   StgTSOWhatNext prev_what_next;
344   
345   // Pre-condition: sched_mutex is held.
346   // We might have a capability, passed in as initialCapability.
347   cap = initialCapability;
348
349 #if defined(RTS_SUPPORTS_THREADS)
350   //
351   // in the threaded case, the capability is either passed in via the
352   // initialCapability parameter, or initialized inside the scheduler
353   // loop 
354   //
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357                        mainThread, initialCapability);
358       );
359 #else
360   // simply initialise it in the non-threaded case
361   grabCapability(&cap);
362 #endif
363
364 #if defined(GRAN)
365   /* set up first event to get things going */
366   /* ToDo: assign costs for system setup and init MainTSO ! */
367   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
368             ContinueThread, 
369             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
370
371   IF_DEBUG(gran,
372            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373            G_TSO(CurrentTSO, 5));
374
375   if (RtsFlags.GranFlags.Light) {
376     /* Save current time; GranSim Light only */
377     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
378   }      
379
380   event = get_next_event();
381
382   while (event!=(rtsEvent*)NULL) {
383     /* Choose the processor with the next event */
384     CurrentProc = event->proc;
385     CurrentTSO = event->tso;
386
387 #elif defined(PAR)
388
389   while (!receivedFinish) {    /* set by processMessages */
390                                /* when receiving PP_FINISH message         */ 
391
392 #else // everything except GRAN and PAR
393
394   while (1) {
395
396 #endif
397
398      IF_DEBUG(scheduler, printAllThreads());
399
400 #if defined(RTS_SUPPORTS_THREADS)
401       // Yield the capability to higher-priority tasks if necessary.
402       //
403       if (cap != NULL) {
404           yieldCapability(&cap);
405       }
406
407       // If we do not currently hold a capability, we wait for one
408       //
409       if (cap == NULL) {
410           waitForCapability(&sched_mutex, &cap,
411                             mainThread ? &mainThread->bound_thread_cond : NULL);
412       }
413
414       // We now have a capability...
415 #endif
416
417     //
418     // If we're interrupted (the user pressed ^C, or some other
419     // termination condition occurred), kill all the currently running
420     // threads.
421     //
422     if (interrupted) {
423         IF_DEBUG(scheduler, sched_belch("interrupted"));
424         interrupted = rtsFalse;
425         was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427         // In the threaded RTS, deadlock detection doesn't work,
428         // so just exit right away.
429         prog_belch("interrupted");
430         releaseCapability(cap);
431         RELEASE_LOCK(&sched_mutex);
432         shutdownHaskellAndExit(EXIT_SUCCESS);
433 #else
434         deleteAllThreads();
435 #endif
436     }
437
438 #if defined(RTS_USER_SIGNALS)
439     // check for signals each time around the scheduler
440     if (signals_pending()) {
441       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
442       startSignalHandlers();
443       ACQUIRE_LOCK(&sched_mutex);
444     }
445 #endif
446
447     //
448     // Check whether any waiting threads need to be woken up.  If the
449     // run queue is empty, and there are no other tasks running, we
450     // can wait indefinitely for something to happen.
451     //
452     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
453 #if defined(RTS_SUPPORTS_THREADS)
454                 || EMPTY_RUN_QUEUE()
455 #endif
456         )
457     {
458       awaitEvent( EMPTY_RUN_QUEUE() );
459     }
460     // we can be interrupted while waiting for I/O...
461     if (interrupted) continue;
462
463     /* 
464      * Detect deadlock: when we have no threads to run, there are no
465      * threads waiting on I/O or sleeping, and all the other tasks are
466      * waiting for work, we must have a deadlock of some description.
467      *
468      * We first try to find threads blocked on themselves (ie. black
469      * holes), and generate NonTermination exceptions where necessary.
470      *
471      * If no threads are black holed, we have a deadlock situation, so
472      * inform all the main threads.
473      */
474 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
475     if (   EMPTY_THREAD_QUEUES() )
476     {
477         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
478         // Garbage collection can release some new threads due to
479         // either (a) finalizers or (b) threads resurrected because
480         // they are about to be send BlockedOnDeadMVar.  Any threads
481         // thus released will be immediately runnable.
482         GarbageCollect(GetRoots,rtsTrue);
483
484         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
485
486         IF_DEBUG(scheduler, 
487                  sched_belch("still deadlocked, checking for black holes..."));
488         detectBlackHoles();
489
490         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491
492 #if defined(RTS_USER_SIGNALS)
493         /* If we have user-installed signal handlers, then wait
494          * for signals to arrive rather then bombing out with a
495          * deadlock.
496          */
497         if ( anyUserHandlers() ) {
498             IF_DEBUG(scheduler, 
499                      sched_belch("still deadlocked, waiting for signals..."));
500
501             awaitUserSignals();
502
503             // we might be interrupted...
504             if (interrupted) { continue; }
505
506             if (signals_pending()) {
507                 RELEASE_LOCK(&sched_mutex);
508                 startSignalHandlers();
509                 ACQUIRE_LOCK(&sched_mutex);
510             }
511             ASSERT(!EMPTY_RUN_QUEUE());
512             goto not_deadlocked;
513         }
514 #endif
515
516         /* Probably a real deadlock.  Send the current main thread the
517          * Deadlock exception (or in the SMP build, send *all* main
518          * threads the deadlock exception, since none of them can make
519          * progress).
520          */
521         {
522             StgMainThread *m;
523             m = main_threads;
524             switch (m->tso->why_blocked) {
525             case BlockedOnBlackHole:
526             case BlockedOnException:
527             case BlockedOnMVar:
528                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
529                 break;
530             default:
531                 barf("deadlock: main thread blocked in a strange way");
532             }
533         }
534     }
535   not_deadlocked:
536
537 #elif defined(RTS_SUPPORTS_THREADS)
538     // ToDo: add deadlock detection in threaded RTS
539 #elif defined(PAR)
540     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
541 #endif
542
543 #if defined(RTS_SUPPORTS_THREADS)
544     if ( EMPTY_RUN_QUEUE() ) {
545         continue; // nothing to do
546     }
547 #endif
548
549 #if defined(GRAN)
550     if (RtsFlags.GranFlags.Light)
551       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
552
553     /* adjust time based on time-stamp */
554     if (event->time > CurrentTime[CurrentProc] &&
555         event->evttype != ContinueThread)
556       CurrentTime[CurrentProc] = event->time;
557     
558     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
559     if (!RtsFlags.GranFlags.Light)
560       handleIdlePEs();
561
562     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
563
564     /* main event dispatcher in GranSim */
565     switch (event->evttype) {
566       /* Should just be continuing execution */
567     case ContinueThread:
568       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
569       /* ToDo: check assertion
570       ASSERT(run_queue_hd != (StgTSO*)NULL &&
571              run_queue_hd != END_TSO_QUEUE);
572       */
573       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
574       if (!RtsFlags.GranFlags.DoAsyncFetch &&
575           procStatus[CurrentProc]==Fetching) {
576         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
577               CurrentTSO->id, CurrentTSO, CurrentProc);
578         goto next_thread;
579       } 
580       /* Ignore ContinueThreads for completed threads */
581       if (CurrentTSO->what_next == ThreadComplete) {
582         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
583               CurrentTSO->id, CurrentTSO, CurrentProc);
584         goto next_thread;
585       } 
586       /* Ignore ContinueThreads for threads that are being migrated */
587       if (PROCS(CurrentTSO)==Nowhere) { 
588         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
589               CurrentTSO->id, CurrentTSO, CurrentProc);
590         goto next_thread;
591       }
592       /* The thread should be at the beginning of the run queue */
593       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
594         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
595               CurrentTSO->id, CurrentTSO, CurrentProc);
596         break; // run the thread anyway
597       }
598       /*
599       new_event(proc, proc, CurrentTime[proc],
600                 FindWork,
601                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
602       goto next_thread; 
603       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
604       break; // now actually run the thread; DaH Qu'vam yImuHbej 
605
606     case FetchNode:
607       do_the_fetchnode(event);
608       goto next_thread;             /* handle next event in event queue  */
609       
610     case GlobalBlock:
611       do_the_globalblock(event);
612       goto next_thread;             /* handle next event in event queue  */
613       
614     case FetchReply:
615       do_the_fetchreply(event);
616       goto next_thread;             /* handle next event in event queue  */
617       
618     case UnblockThread:   /* Move from the blocked queue to the tail of */
619       do_the_unblock(event);
620       goto next_thread;             /* handle next event in event queue  */
621       
622     case ResumeThread:  /* Move from the blocked queue to the tail of */
623       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
624       event->tso->gran.blocktime += 
625         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
626       do_the_startthread(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case StartThread:
630       do_the_startthread(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case MoveThread:
634       do_the_movethread(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case MoveSpark:
638       do_the_movespark(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case FindWork:
642       do_the_findwork(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     default:
646       barf("Illegal event type %u\n", event->evttype);
647     }  /* switch */
648     
649     /* This point was scheduler_loop in the old RTS */
650
651     IF_DEBUG(gran, belch("GRAN: after main switch"));
652
653     TimeOfLastEvent = CurrentTime[CurrentProc];
654     TimeOfNextEvent = get_time_of_next_event();
655     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
656     // CurrentTSO = ThreadQueueHd;
657
658     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
659                          TimeOfNextEvent));
660
661     if (RtsFlags.GranFlags.Light) 
662       GranSimLight_leave_system(event, &ActiveTSO); 
663
664     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
665
666     IF_DEBUG(gran, 
667              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
668
669     /* in a GranSim setup the TSO stays on the run queue */
670     t = CurrentTSO;
671     /* Take a thread from the run queue. */
672     POP_RUN_QUEUE(t); // take_off_run_queue(t);
673
674     IF_DEBUG(gran, 
675              fprintf(stderr, "GRAN: About to run current thread, which is\n");
676              G_TSO(t,5));
677
678     context_switch = 0; // turned on via GranYield, checking events and time slice
679
680     IF_DEBUG(gran, 
681              DumpGranEvent(GR_SCHEDULE, t));
682
683     procStatus[CurrentProc] = Busy;
684
685 #elif defined(PAR)
686     if (PendingFetches != END_BF_QUEUE) {
687         processFetches();
688     }
689
690     /* ToDo: phps merge with spark activation above */
691     /* check whether we have local work and send requests if we have none */
692     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
693       /* :-[  no local threads => look out for local sparks */
694       /* the spark pool for the current PE */
695       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
696       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
697           pool->hd < pool->tl) {
698         /* 
699          * ToDo: add GC code check that we really have enough heap afterwards!!
700          * Old comment:
701          * If we're here (no runnable threads) and we have pending
702          * sparks, we must have a space problem.  Get enough space
703          * to turn one of those pending sparks into a
704          * thread... 
705          */
706
707         spark = findSpark(rtsFalse);                /* get a spark */
708         if (spark != (rtsSpark) NULL) {
709           tso = activateSpark(spark);       /* turn the spark into a thread */
710           IF_PAR_DEBUG(schedule,
711                        belch("==== schedule: Created TSO %d (%p); %d threads active",
712                              tso->id, tso, advisory_thread_count));
713
714           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
715             belch("==^^ failed to activate spark");
716             goto next_thread;
717           }               /* otherwise fall through & pick-up new tso */
718         } else {
719           IF_PAR_DEBUG(verbose,
720                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
721                              spark_queue_len(pool)));
722           goto next_thread;
723         }
724       }
725
726       /* If we still have no work we need to send a FISH to get a spark
727          from another PE 
728       */
729       if (EMPTY_RUN_QUEUE()) {
730       /* =8-[  no local sparks => look for work on other PEs */
731         /*
732          * We really have absolutely no work.  Send out a fish
733          * (there may be some out there already), and wait for
734          * something to arrive.  We clearly can't run any threads
735          * until a SCHEDULE or RESUME arrives, and so that's what
736          * we're hoping to see.  (Of course, we still have to
737          * respond to other types of messages.)
738          */
739         TIME now = msTime() /*CURRENT_TIME*/;
740         IF_PAR_DEBUG(verbose, 
741                      belch("--  now=%ld", now));
742         IF_PAR_DEBUG(verbose,
743                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
744                          (last_fish_arrived_at!=0 &&
745                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
746                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
747                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
748                              last_fish_arrived_at,
749                              RtsFlags.ParFlags.fishDelay, now);
750                      });
751         
752         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
753             (last_fish_arrived_at==0 ||
754              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
755           /* outstandingFishes is set in sendFish, processFish;
756              avoid flooding system with fishes via delay */
757           pe = choosePE();
758           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
759                    NEW_FISH_HUNGER);
760
761           // Global statistics: count no. of fishes
762           if (RtsFlags.ParFlags.ParStats.Global &&
763               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
764             globalParStats.tot_fish_mess++;
765           }
766         }
767       
768         receivedFinish = processMessages();
769         goto next_thread;
770       }
771     } else if (PacketsWaiting()) {  /* Look for incoming messages */
772       receivedFinish = processMessages();
773     }
774
775     /* Now we are sure that we have some work available */
776     ASSERT(run_queue_hd != END_TSO_QUEUE);
777
778     /* Take a thread from the run queue, if we have work */
779     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
780     IF_DEBUG(sanity,checkTSO(t));
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, 
792              belch("--=^ %d threads, %d sparks on [%#x]", 
793                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
794
795 # if 1
796     if (0 && RtsFlags.ParFlags.ParStats.Full && 
797         t && LastTSO && t->id != LastTSO->id && 
798         LastTSO->why_blocked == NotBlocked && 
799         LastTSO->what_next != ThreadComplete) {
800       // if previously scheduled TSO not blocked we have to record the context switch
801       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
802                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
803     }
804
805     if (RtsFlags.ParFlags.ParStats.Full && 
806         (emitSchedule /* forced emit */ ||
807         (t && LastTSO && t->id != LastTSO->id))) {
808       /* 
809          we are running a different TSO, so write a schedule event to log file
810          NB: If we use fair scheduling we also have to write  a deschedule 
811              event for LastTSO; with unfair scheduling we know that the
812              previous tso has blocked whenever we switch to another tso, so
813              we don't need it in GUM for now
814       */
815       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817       emitSchedule = rtsFalse;
818     }
819      
820 # endif
821 #else /* !GRAN && !PAR */
822   
823     // grab a thread from the run queue
824     ASSERT(run_queue_hd != END_TSO_QUEUE);
825     POP_RUN_QUEUE(t);
826
827     // Sanity check the thread we're about to run.  This can be
828     // expensive if there is lots of thread switching going on...
829     IF_DEBUG(sanity,checkTSO(t));
830 #endif
831
832 #ifdef THREADED_RTS
833     {
834       StgMainThread *m = t->main;
835       
836       if(m)
837       {
838         if(m == mainThread)
839         {
840           IF_DEBUG(scheduler,
841             sched_belch("### Running thread %d in bound thread", t->id));
842           // yes, the Haskell thread is bound to the current native thread
843         }
844         else
845         {
846           IF_DEBUG(scheduler,
847             sched_belch("### thread %d bound to another OS thread", t->id));
848           // no, bound to a different Haskell thread: pass to that thread
849           PUSH_ON_RUN_QUEUE(t);
850           passCapability(&m->bound_thread_cond);
851           continue;
852         }
853       }
854       else
855       {
856         if(mainThread != NULL)
857         // The thread we want to run is bound.
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### this OS thread cannot run thread %d", t->id));
861           // no, the current native thread is bound to a different
862           // Haskell thread, so pass it to any worker thread
863           PUSH_ON_RUN_QUEUE(t);
864           passCapabilityToWorker();
865           continue; 
866         }
867       }
868     }
869 #endif
870
871     cap->r.rCurrentTSO = t;
872     
873     /* context switches are now initiated by the timer signal, unless
874      * the user specified "context switch as often as possible", with
875      * +RTS -C0
876      */
877     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
878          && (run_queue_hd != END_TSO_QUEUE
879              || blocked_queue_hd != END_TSO_QUEUE
880              || sleeping_queue != END_TSO_QUEUE)))
881         context_switch = 1;
882     else
883         context_switch = 0;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900     switch (prev_what_next) {
901     case ThreadKilled:
902     case ThreadComplete:
903         /* Thread already finished, return to scheduler. */
904         ret = ThreadFinished;
905         break;
906     case ThreadRunGHC:
907         errno = t->saved_errno;
908         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
909         t->saved_errno = errno;
910         break;
911     case ThreadInterpret:
912         ret = interpretBCO(cap);
913         break;
914     default:
915       barf("schedule: invalid what_next field");
916     }
917     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
918     
919     /* Costs for the scheduler are assigned to CCS_SYSTEM */
920 #ifdef PROFILING
921     stopHeapProfTimer();
922     CCCS = CCS_SYSTEM;
923 #endif
924     
925     ACQUIRE_LOCK(&sched_mutex);
926     
927 #ifdef RTS_SUPPORTS_THREADS
928     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
929 #elif !defined(GRAN) && !defined(PAR)
930     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
931 #endif
932     t = cap->r.rCurrentTSO;
933     
934 #if defined(PAR)
935     /* HACK 675: if the last thread didn't yield, make sure to print a 
936        SCHEDULE event to the log file when StgRunning the next thread, even
937        if it is the same one as before */
938     LastTSO = t; 
939     TimeOfLastYield = CURRENT_TIME;
940 #endif
941
942     switch (ret) {
943     case HeapOverflow:
944 #if defined(GRAN)
945       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
946       globalGranStats.tot_heapover++;
947 #elif defined(PAR)
948       globalParStats.tot_heapover++;
949 #endif
950
951       // did the task ask for a large block?
952       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
953           // if so, get one and push it on the front of the nursery.
954           bdescr *bd;
955           nat blocks;
956           
957           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
958
959           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
960                                    t->id, whatNext_strs[t->what_next], blocks));
961
962           // don't do this if it would push us over the
963           // alloc_blocks_lim limit; we'll GC first.
964           if (alloc_blocks + blocks < alloc_blocks_lim) {
965
966               alloc_blocks += blocks;
967               bd = allocGroup( blocks );
968
969               // link the new group into the list
970               bd->link = cap->r.rCurrentNursery;
971               bd->u.back = cap->r.rCurrentNursery->u.back;
972               if (cap->r.rCurrentNursery->u.back != NULL) {
973                   cap->r.rCurrentNursery->u.back->link = bd;
974               } else {
975                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
976                          g0s0->blocks == cap->r.rNursery);
977                   cap->r.rNursery = g0s0->blocks = bd;
978               }           
979               cap->r.rCurrentNursery->u.back = bd;
980
981               // initialise it as a nursery block.  We initialise the
982               // step, gen_no, and flags field of *every* sub-block in
983               // this large block, because this is easier than making
984               // sure that we always find the block head of a large
985               // block whenever we call Bdescr() (eg. evacuate() and
986               // isAlive() in the GC would both have to do this, at
987               // least).
988               { 
989                   bdescr *x;
990                   for (x = bd; x < bd + blocks; x++) {
991                       x->step = g0s0;
992                       x->gen_no = 0;
993                       x->flags = 0;
994                   }
995               }
996
997               // don't forget to update the block count in g0s0.
998               g0s0->n_blocks += blocks;
999               // This assert can be a killer if the app is doing lots
1000               // of large block allocations.
1001               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1002
1003               // now update the nursery to point to the new block
1004               cap->r.rCurrentNursery = bd;
1005
1006               // we might be unlucky and have another thread get on the
1007               // run queue before us and steal the large block, but in that
1008               // case the thread will just end up requesting another large
1009               // block.
1010               PUSH_ON_RUN_QUEUE(t);
1011               break;
1012           }
1013       }
1014
1015       /* make all the running tasks block on a condition variable,
1016        * maybe set context_switch and wait till they all pile in,
1017        * then have them wait on a GC condition variable.
1018        */
1019       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1020                                t->id, whatNext_strs[t->what_next]));
1021       threadPaused(t);
1022 #if defined(GRAN)
1023       ASSERT(!is_on_queue(t,CurrentProc));
1024 #elif defined(PAR)
1025       /* Currently we emit a DESCHEDULE event before GC in GUM.
1026          ToDo: either add separate event to distinguish SYSTEM time from rest
1027                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1028       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1029         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1030                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1031         emitSchedule = rtsTrue;
1032       }
1033 #endif
1034       
1035       ready_to_gc = rtsTrue;
1036       context_switch = 1;               /* stop other threads ASAP */
1037       PUSH_ON_RUN_QUEUE(t);
1038       /* actual GC is done at the end of the while loop */
1039       break;
1040       
1041     case StackOverflow:
1042 #if defined(GRAN)
1043       IF_DEBUG(gran, 
1044                DumpGranEvent(GR_DESCHEDULE, t));
1045       globalGranStats.tot_stackover++;
1046 #elif defined(PAR)
1047       // IF_DEBUG(par, 
1048       // DumpGranEvent(GR_DESCHEDULE, t);
1049       globalParStats.tot_stackover++;
1050 #endif
1051       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1052                                t->id, whatNext_strs[t->what_next]));
1053       /* just adjust the stack for this thread, then pop it back
1054        * on the run queue.
1055        */
1056       threadPaused(t);
1057       { 
1058         /* enlarge the stack */
1059         StgTSO *new_t = threadStackOverflow(t);
1060         
1061         /* This TSO has moved, so update any pointers to it from the
1062          * main thread stack.  It better not be on any other queues...
1063          * (it shouldn't be).
1064          */
1065         if (t->main != NULL) {
1066             t->main->tso = new_t;
1067         }
1068         PUSH_ON_RUN_QUEUE(new_t);
1069       }
1070       break;
1071
1072     case ThreadYielding:
1073 #if defined(GRAN)
1074       IF_DEBUG(gran, 
1075                DumpGranEvent(GR_DESCHEDULE, t));
1076       globalGranStats.tot_yields++;
1077 #elif defined(PAR)
1078       // IF_DEBUG(par, 
1079       // DumpGranEvent(GR_DESCHEDULE, t);
1080       globalParStats.tot_yields++;
1081 #endif
1082       /* put the thread back on the run queue.  Then, if we're ready to
1083        * GC, check whether this is the last task to stop.  If so, wake
1084        * up the GC thread.  getThread will block during a GC until the
1085        * GC is finished.
1086        */
1087       IF_DEBUG(scheduler,
1088                if (t->what_next != prev_what_next) {
1089                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1090                          t->id, whatNext_strs[t->what_next]);
1091                } else {
1092                    belch("--<< thread %ld (%s) stopped, yielding", 
1093                          t->id, whatNext_strs[t->what_next]);
1094                }
1095                );
1096
1097       IF_DEBUG(sanity,
1098                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1099                checkTSO(t));
1100       ASSERT(t->link == END_TSO_QUEUE);
1101
1102       // Shortcut if we're just switching evaluators: don't bother
1103       // doing stack squeezing (which can be expensive), just run the
1104       // thread.
1105       if (t->what_next != prev_what_next) {
1106           goto run_thread;
1107       }
1108
1109       threadPaused(t);
1110
1111 #if defined(GRAN)
1112       ASSERT(!is_on_queue(t,CurrentProc));
1113
1114       IF_DEBUG(sanity,
1115                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1116                checkThreadQsSanity(rtsTrue));
1117 #endif
1118
1119 #if defined(PAR)
1120       if (RtsFlags.ParFlags.doFairScheduling) { 
1121         /* this does round-robin scheduling; good for concurrency */
1122         APPEND_TO_RUN_QUEUE(t);
1123       } else {
1124         /* this does unfair scheduling; good for parallelism */
1125         PUSH_ON_RUN_QUEUE(t);
1126       }
1127 #else
1128       // this does round-robin scheduling; good for concurrency
1129       APPEND_TO_RUN_QUEUE(t);
1130 #endif
1131
1132 #if defined(GRAN)
1133       /* add a ContinueThread event to actually process the thread */
1134       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1135                 ContinueThread,
1136                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1137       IF_GRAN_DEBUG(bq, 
1138                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1139                G_EVENTQ(0);
1140                G_CURR_THREADQ(0));
1141 #endif /* GRAN */
1142       break;
1143
1144     case ThreadBlocked:
1145 #if defined(GRAN)
1146       IF_DEBUG(scheduler,
1147                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1148                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1149                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1150
1151       // ??? needed; should emit block before
1152       IF_DEBUG(gran, 
1153                DumpGranEvent(GR_DESCHEDULE, t)); 
1154       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1155       /*
1156         ngoq Dogh!
1157       ASSERT(procStatus[CurrentProc]==Busy || 
1158               ((procStatus[CurrentProc]==Fetching) && 
1159               (t->block_info.closure!=(StgClosure*)NULL)));
1160       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1161           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1162             procStatus[CurrentProc]==Fetching)) 
1163         procStatus[CurrentProc] = Idle;
1164       */
1165 #elif defined(PAR)
1166       IF_DEBUG(scheduler,
1167                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1168                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1169       IF_PAR_DEBUG(bq,
1170
1171                    if (t->block_info.closure!=(StgClosure*)NULL) 
1172                      print_bq(t->block_info.closure));
1173
1174       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1175       blockThread(t);
1176
1177       /* whatever we schedule next, we must log that schedule */
1178       emitSchedule = rtsTrue;
1179
1180 #else /* !GRAN */
1181       /* don't need to do anything.  Either the thread is blocked on
1182        * I/O, in which case we'll have called addToBlockedQueue
1183        * previously, or it's blocked on an MVar or Blackhole, in which
1184        * case it'll be on the relevant queue already.
1185        */
1186       IF_DEBUG(scheduler,
1187                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1188                        t->id, whatNext_strs[t->what_next]);
1189                printThreadBlockage(t);
1190                fprintf(stderr, "\n"));
1191       fflush(stderr);
1192
1193       /* Only for dumping event to log file 
1194          ToDo: do I need this in GranSim, too?
1195       blockThread(t);
1196       */
1197 #endif
1198       threadPaused(t);
1199       break;
1200
1201     case ThreadFinished:
1202       /* Need to check whether this was a main thread, and if so, signal
1203        * the task that started it with the return value.  If we have no
1204        * more main threads, we probably need to stop all the tasks until
1205        * we get a new one.
1206        */
1207       /* We also end up here if the thread kills itself with an
1208        * uncaught exception, see Exception.hc.
1209        */
1210       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1211                                t->id, whatNext_strs[t->what_next]));
1212 #if defined(GRAN)
1213       endThread(t, CurrentProc); // clean-up the thread
1214 #elif defined(PAR)
1215       /* For now all are advisory -- HWL */
1216       //if(t->priority==AdvisoryPriority) ??
1217       advisory_thread_count--;
1218       
1219 # ifdef DIST
1220       if(t->dist.priority==RevalPriority)
1221         FinishReval(t);
1222 # endif
1223       
1224       if (RtsFlags.ParFlags.ParStats.Full &&
1225           !RtsFlags.ParFlags.ParStats.Suppressed) 
1226         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1227 #endif
1228
1229       //
1230       // Check whether the thread that just completed was a main
1231       // thread, and if so return with the result.  
1232       //
1233       // There is an assumption here that all thread completion goes
1234       // through this point; we need to make sure that if a thread
1235       // ends up in the ThreadKilled state, that it stays on the run
1236       // queue so it can be dealt with here.
1237       //
1238       if (
1239 #if defined(RTS_SUPPORTS_THREADS)
1240           mainThread != NULL
1241 #else
1242           mainThread->tso == t
1243 #endif
1244           )
1245       {
1246           // We are a bound thread: this must be our thread that just
1247           // completed.
1248           ASSERT(mainThread->tso == t);
1249
1250           if (t->what_next == ThreadComplete) {
1251               if (mainThread->ret) {
1252                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1253                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1254               }
1255               mainThread->stat = Success;
1256           } else {
1257               if (mainThread->ret) {
1258                   *(mainThread->ret) = NULL;
1259               }
1260               if (was_interrupted) {
1261                   mainThread->stat = Interrupted;
1262               } else {
1263                   mainThread->stat = Killed;
1264               }
1265           }
1266 #ifdef DEBUG
1267           removeThreadLabel((StgWord)mainThread->tso->id);
1268 #endif
1269           if (mainThread->prev == NULL) {
1270               main_threads = mainThread->link;
1271           } else {
1272               mainThread->prev->link = mainThread->link;
1273           }
1274           if (mainThread->link != NULL) {
1275               mainThread->link->prev = NULL;
1276           }
1277           releaseCapability(cap);
1278           return;
1279       }
1280
1281 #ifdef RTS_SUPPORTS_THREADS
1282       ASSERT(t->main == NULL);
1283 #else
1284       if (t->main != NULL) {
1285           // Must be a main thread that is not the topmost one.  Leave
1286           // it on the run queue until the stack has unwound to the
1287           // point where we can deal with this.  Leaving it on the run
1288           // queue also ensures that the garbage collector knows about
1289           // this thread and its return value (it gets dropped from the
1290           // all_threads list so there's no other way to find it).
1291           APPEND_TO_RUN_QUEUE(t);
1292       }
1293 #endif
1294       break;
1295
1296     default:
1297       barf("schedule: invalid thread return code %d", (int)ret);
1298     }
1299
1300 #ifdef PROFILING
1301     // When we have +RTS -i0 and we're heap profiling, do a census at
1302     // every GC.  This lets us get repeatable runs for debugging.
1303     if (performHeapProfile ||
1304         (RtsFlags.ProfFlags.profileInterval==0 &&
1305          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1306         GarbageCollect(GetRoots, rtsTrue);
1307         heapCensus();
1308         performHeapProfile = rtsFalse;
1309         ready_to_gc = rtsFalse; // we already GC'd
1310     }
1311 #endif
1312
1313     if (ready_to_gc) {
1314       /* everybody back, start the GC.
1315        * Could do it in this thread, or signal a condition var
1316        * to do it in another thread.  Either way, we need to
1317        * broadcast on gc_pending_cond afterward.
1318        */
1319 #if defined(RTS_SUPPORTS_THREADS)
1320       IF_DEBUG(scheduler,sched_belch("doing GC"));
1321 #endif
1322       GarbageCollect(GetRoots,rtsFalse);
1323       ready_to_gc = rtsFalse;
1324 #if defined(GRAN)
1325       /* add a ContinueThread event to continue execution of current thread */
1326       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1327                 ContinueThread,
1328                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1329       IF_GRAN_DEBUG(bq, 
1330                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1331                G_EVENTQ(0);
1332                G_CURR_THREADQ(0));
1333 #endif /* GRAN */
1334     }
1335
1336 #if defined(GRAN)
1337   next_thread:
1338     IF_GRAN_DEBUG(unused,
1339                   print_eventq(EventHd));
1340
1341     event = get_next_event();
1342 #elif defined(PAR)
1343   next_thread:
1344     /* ToDo: wait for next message to arrive rather than busy wait */
1345 #endif /* GRAN */
1346
1347   } /* end of while(1) */
1348
1349   IF_PAR_DEBUG(verbose,
1350                belch("== Leaving schedule() after having received Finish"));
1351 }
1352
1353 /* ---------------------------------------------------------------------------
1354  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1355  * used by Control.Concurrent for error checking.
1356  * ------------------------------------------------------------------------- */
1357  
1358 StgBool
1359 rtsSupportsBoundThreads(void)
1360 {
1361 #ifdef THREADED_RTS
1362   return rtsTrue;
1363 #else
1364   return rtsFalse;
1365 #endif
1366 }
1367
1368 /* ---------------------------------------------------------------------------
1369  * isThreadBound(tso): check whether tso is bound to an OS thread.
1370  * ------------------------------------------------------------------------- */
1371  
1372 StgBool
1373 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1374 {
1375 #ifdef THREADED_RTS
1376   return (tso->main != NULL);
1377 #endif
1378   return rtsFalse;
1379 }
1380
1381 /* ---------------------------------------------------------------------------
1382  * Singleton fork(). Do not copy any running threads.
1383  * ------------------------------------------------------------------------- */
1384
1385 #ifndef mingw32_TARGET_OS
1386 #define FORKPROCESS_PRIMOP_SUPPORTED
1387 #endif
1388
1389 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1390 static void 
1391 deleteThreadImmediately(StgTSO *tso);
1392 #endif
1393 StgInt
1394 forkProcess(HsStablePtr *entry
1395 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1396             STG_UNUSED
1397 #endif
1398            )
1399 {
1400 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1401   pid_t pid;
1402   StgTSO* t,*next;
1403   StgMainThread *m;
1404   SchedulerStatus rc;
1405
1406   IF_DEBUG(scheduler,sched_belch("forking!"));
1407   rts_lock(); // This not only acquires sched_mutex, it also
1408               // makes sure that no other threads are running
1409
1410   pid = fork();
1411
1412   if (pid) { /* parent */
1413
1414   /* just return the pid */
1415     rts_unlock();
1416     return pid;
1417     
1418   } else { /* child */
1419     
1420     
1421       // delete all threads
1422     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1423     
1424     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1425       next = t->link;
1426
1427         // don't allow threads to catch the ThreadKilled exception
1428       deleteThreadImmediately(t);
1429     }
1430     
1431       // wipe the main thread list
1432     while((m = main_threads) != NULL) {
1433       main_threads = m->link;
1434 # ifdef THREADED_RTS
1435       closeCondition(&m->bound_thread_cond);
1436 # endif
1437       stgFree(m);
1438     }
1439     
1440 # ifdef RTS_SUPPORTS_THREADS
1441     resetTaskManagerAfterFork();      // tell startTask() and friends that
1442     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1443     resetWorkerWakeupPipeAfterFork();
1444 # endif
1445     
1446     rc = rts_evalStableIO(entry, NULL);  // run the action
1447     rts_checkSchedStatus("forkProcess",rc);
1448     
1449     rts_unlock();
1450     
1451     hs_exit();                      // clean up and exit
1452     stg_exit(0);
1453   }
1454 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1455   barf("forkProcess#: primop not supported, sorry!\n");
1456   return -1;
1457 #endif
1458 }
1459
1460 /* ---------------------------------------------------------------------------
1461  * deleteAllThreads():  kill all the live threads.
1462  *
1463  * This is used when we catch a user interrupt (^C), before performing
1464  * any necessary cleanups and running finalizers.
1465  *
1466  * Locks: sched_mutex held.
1467  * ------------------------------------------------------------------------- */
1468    
1469 void
1470 deleteAllThreads ( void )
1471 {
1472   StgTSO* t, *next;
1473   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1474   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1475       next = t->global_link;
1476       deleteThread(t);
1477   }      
1478
1479   // The run queue now contains a bunch of ThreadKilled threads.  We
1480   // must not throw these away: the main thread(s) will be in there
1481   // somewhere, and the main scheduler loop has to deal with it.
1482   // Also, the run queue is the only thing keeping these threads from
1483   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1484
1485   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1486   ASSERT(sleeping_queue == END_TSO_QUEUE);
1487 }
1488
1489 /* startThread and  insertThread are now in GranSim.c -- HWL */
1490
1491
1492 /* ---------------------------------------------------------------------------
1493  * Suspending & resuming Haskell threads.
1494  * 
1495  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1496  * its capability before calling the C function.  This allows another
1497  * task to pick up the capability and carry on running Haskell
1498  * threads.  It also means that if the C call blocks, it won't lock
1499  * the whole system.
1500  *
1501  * The Haskell thread making the C call is put to sleep for the
1502  * duration of the call, on the susepended_ccalling_threads queue.  We
1503  * give out a token to the task, which it can use to resume the thread
1504  * on return from the C function.
1505  * ------------------------------------------------------------------------- */
1506    
1507 StgInt
1508 suspendThread( StgRegTable *reg, 
1509                rtsBool concCall
1510 #if !defined(DEBUG)
1511                STG_UNUSED
1512 #endif
1513                )
1514 {
1515   nat tok;
1516   Capability *cap;
1517   int saved_errno = errno;
1518
1519   /* assume that *reg is a pointer to the StgRegTable part
1520    * of a Capability.
1521    */
1522   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1523
1524   ACQUIRE_LOCK(&sched_mutex);
1525
1526   IF_DEBUG(scheduler,
1527            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1528
1529   // XXX this might not be necessary --SDM
1530   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1531
1532   threadPaused(cap->r.rCurrentTSO);
1533   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1534   suspended_ccalling_threads = cap->r.rCurrentTSO;
1535
1536   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1537       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1538       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1539   } else {
1540       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1541   }
1542
1543   /* Use the thread ID as the token; it should be unique */
1544   tok = cap->r.rCurrentTSO->id;
1545
1546   /* Hand back capability */
1547   releaseCapability(cap);
1548   
1549 #if defined(RTS_SUPPORTS_THREADS)
1550   /* Preparing to leave the RTS, so ensure there's a native thread/task
1551      waiting to take over.
1552   */
1553   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1554 #endif
1555
1556   /* Other threads _might_ be available for execution; signal this */
1557   THREAD_RUNNABLE();
1558   RELEASE_LOCK(&sched_mutex);
1559   
1560   errno = saved_errno;
1561   return tok; 
1562 }
1563
1564 StgRegTable *
1565 resumeThread( StgInt tok,
1566               rtsBool concCall STG_UNUSED )
1567 {
1568   StgTSO *tso, **prev;
1569   Capability *cap;
1570   int saved_errno = errno;
1571
1572 #if defined(RTS_SUPPORTS_THREADS)
1573   /* Wait for permission to re-enter the RTS with the result. */
1574   ACQUIRE_LOCK(&sched_mutex);
1575   waitForReturnCapability(&sched_mutex, &cap);
1576
1577   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1578 #else
1579   grabCapability(&cap);
1580 #endif
1581
1582   /* Remove the thread off of the suspended list */
1583   prev = &suspended_ccalling_threads;
1584   for (tso = suspended_ccalling_threads; 
1585        tso != END_TSO_QUEUE; 
1586        prev = &tso->link, tso = tso->link) {
1587     if (tso->id == (StgThreadID)tok) {
1588       *prev = tso->link;
1589       break;
1590     }
1591   }
1592   if (tso == END_TSO_QUEUE) {
1593     barf("resumeThread: thread not found");
1594   }
1595   tso->link = END_TSO_QUEUE;
1596   
1597   if(tso->why_blocked == BlockedOnCCall) {
1598       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1599       tso->blocked_exceptions = NULL;
1600   }
1601   
1602   /* Reset blocking status */
1603   tso->why_blocked  = NotBlocked;
1604
1605   cap->r.rCurrentTSO = tso;
1606   RELEASE_LOCK(&sched_mutex);
1607   errno = saved_errno;
1608   return &cap->r;
1609 }
1610
1611
1612 /* ---------------------------------------------------------------------------
1613  * Static functions
1614  * ------------------------------------------------------------------------ */
1615 static void unblockThread(StgTSO *tso);
1616
1617 /* ---------------------------------------------------------------------------
1618  * Comparing Thread ids.
1619  *
1620  * This is used from STG land in the implementation of the
1621  * instances of Eq/Ord for ThreadIds.
1622  * ------------------------------------------------------------------------ */
1623
1624 int
1625 cmp_thread(StgPtr tso1, StgPtr tso2) 
1626
1627   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1628   StgThreadID id2 = ((StgTSO *)tso2)->id;
1629  
1630   if (id1 < id2) return (-1);
1631   if (id1 > id2) return 1;
1632   return 0;
1633 }
1634
1635 /* ---------------------------------------------------------------------------
1636  * Fetching the ThreadID from an StgTSO.
1637  *
1638  * This is used in the implementation of Show for ThreadIds.
1639  * ------------------------------------------------------------------------ */
1640 int
1641 rts_getThreadId(StgPtr tso) 
1642 {
1643   return ((StgTSO *)tso)->id;
1644 }
1645
1646 #ifdef DEBUG
1647 void
1648 labelThread(StgPtr tso, char *label)
1649 {
1650   int len;
1651   void *buf;
1652
1653   /* Caveat: Once set, you can only set the thread name to "" */
1654   len = strlen(label)+1;
1655   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1656   strncpy(buf,label,len);
1657   /* Update will free the old memory for us */
1658   updateThreadLabel(((StgTSO *)tso)->id,buf);
1659 }
1660 #endif /* DEBUG */
1661
1662 /* ---------------------------------------------------------------------------
1663    Create a new thread.
1664
1665    The new thread starts with the given stack size.  Before the
1666    scheduler can run, however, this thread needs to have a closure
1667    (and possibly some arguments) pushed on its stack.  See
1668    pushClosure() in Schedule.h.
1669
1670    createGenThread() and createIOThread() (in SchedAPI.h) are
1671    convenient packaged versions of this function.
1672
1673    currently pri (priority) is only used in a GRAN setup -- HWL
1674    ------------------------------------------------------------------------ */
1675 #if defined(GRAN)
1676 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1677 StgTSO *
1678 createThread(nat size, StgInt pri)
1679 #else
1680 StgTSO *
1681 createThread(nat size)
1682 #endif
1683 {
1684
1685     StgTSO *tso;
1686     nat stack_size;
1687
1688     /* First check whether we should create a thread at all */
1689 #if defined(PAR)
1690   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1691   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1692     threadsIgnored++;
1693     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1694           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1695     return END_TSO_QUEUE;
1696   }
1697   threadsCreated++;
1698 #endif
1699
1700 #if defined(GRAN)
1701   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1702 #endif
1703
1704   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1705
1706   /* catch ridiculously small stack sizes */
1707   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1708     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1709   }
1710
1711   stack_size = size - TSO_STRUCT_SIZEW;
1712
1713   tso = (StgTSO *)allocate(size);
1714   TICK_ALLOC_TSO(stack_size, 0);
1715
1716   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1717 #if defined(GRAN)
1718   SET_GRAN_HDR(tso, ThisPE);
1719 #endif
1720
1721   // Always start with the compiled code evaluator
1722   tso->what_next = ThreadRunGHC;
1723
1724   tso->id = next_thread_id++; 
1725   tso->why_blocked  = NotBlocked;
1726   tso->blocked_exceptions = NULL;
1727
1728   tso->saved_errno = 0;
1729   tso->main = NULL;
1730   
1731   tso->stack_size   = stack_size;
1732   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1733                               - TSO_STRUCT_SIZEW;
1734   tso->sp           = (P_)&(tso->stack) + stack_size;
1735
1736 #ifdef PROFILING
1737   tso->prof.CCCS = CCS_MAIN;
1738 #endif
1739
1740   /* put a stop frame on the stack */
1741   tso->sp -= sizeofW(StgStopFrame);
1742   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1743   // ToDo: check this
1744 #if defined(GRAN)
1745   tso->link = END_TSO_QUEUE;
1746   /* uses more flexible routine in GranSim */
1747   insertThread(tso, CurrentProc);
1748 #else
1749   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1750    * from its creation
1751    */
1752 #endif
1753
1754 #if defined(GRAN) 
1755   if (RtsFlags.GranFlags.GranSimStats.Full) 
1756     DumpGranEvent(GR_START,tso);
1757 #elif defined(PAR)
1758   if (RtsFlags.ParFlags.ParStats.Full) 
1759     DumpGranEvent(GR_STARTQ,tso);
1760   /* HACk to avoid SCHEDULE 
1761      LastTSO = tso; */
1762 #endif
1763
1764   /* Link the new thread on the global thread list.
1765    */
1766   tso->global_link = all_threads;
1767   all_threads = tso;
1768
1769 #if defined(DIST)
1770   tso->dist.priority = MandatoryPriority; //by default that is...
1771 #endif
1772
1773 #if defined(GRAN)
1774   tso->gran.pri = pri;
1775 # if defined(DEBUG)
1776   tso->gran.magic = TSO_MAGIC; // debugging only
1777 # endif
1778   tso->gran.sparkname   = 0;
1779   tso->gran.startedat   = CURRENT_TIME; 
1780   tso->gran.exported    = 0;
1781   tso->gran.basicblocks = 0;
1782   tso->gran.allocs      = 0;
1783   tso->gran.exectime    = 0;
1784   tso->gran.fetchtime   = 0;
1785   tso->gran.fetchcount  = 0;
1786   tso->gran.blocktime   = 0;
1787   tso->gran.blockcount  = 0;
1788   tso->gran.blockedat   = 0;
1789   tso->gran.globalsparks = 0;
1790   tso->gran.localsparks  = 0;
1791   if (RtsFlags.GranFlags.Light)
1792     tso->gran.clock  = Now; /* local clock */
1793   else
1794     tso->gran.clock  = 0;
1795
1796   IF_DEBUG(gran,printTSO(tso));
1797 #elif defined(PAR)
1798 # if defined(DEBUG)
1799   tso->par.magic = TSO_MAGIC; // debugging only
1800 # endif
1801   tso->par.sparkname   = 0;
1802   tso->par.startedat   = CURRENT_TIME; 
1803   tso->par.exported    = 0;
1804   tso->par.basicblocks = 0;
1805   tso->par.allocs      = 0;
1806   tso->par.exectime    = 0;
1807   tso->par.fetchtime   = 0;
1808   tso->par.fetchcount  = 0;
1809   tso->par.blocktime   = 0;
1810   tso->par.blockcount  = 0;
1811   tso->par.blockedat   = 0;
1812   tso->par.globalsparks = 0;
1813   tso->par.localsparks  = 0;
1814 #endif
1815
1816 #if defined(GRAN)
1817   globalGranStats.tot_threads_created++;
1818   globalGranStats.threads_created_on_PE[CurrentProc]++;
1819   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1820   globalGranStats.tot_sq_probes++;
1821 #elif defined(PAR)
1822   // collect parallel global statistics (currently done together with GC stats)
1823   if (RtsFlags.ParFlags.ParStats.Global &&
1824       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1825     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1826     globalParStats.tot_threads_created++;
1827   }
1828 #endif 
1829
1830 #if defined(GRAN)
1831   IF_GRAN_DEBUG(pri,
1832                 belch("==__ schedule: Created TSO %d (%p);",
1833                       CurrentProc, tso, tso->id));
1834 #elif defined(PAR)
1835     IF_PAR_DEBUG(verbose,
1836                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1837                        tso->id, tso, advisory_thread_count));
1838 #else
1839   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1840                                  tso->id, tso->stack_size));
1841 #endif    
1842   return tso;
1843 }
1844
1845 #if defined(PAR)
1846 /* RFP:
1847    all parallel thread creation calls should fall through the following routine.
1848 */
1849 StgTSO *
1850 createSparkThread(rtsSpark spark) 
1851 { StgTSO *tso;
1852   ASSERT(spark != (rtsSpark)NULL);
1853   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1854   { threadsIgnored++;
1855     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1856           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1857     return END_TSO_QUEUE;
1858   }
1859   else
1860   { threadsCreated++;
1861     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1862     if (tso==END_TSO_QUEUE)     
1863       barf("createSparkThread: Cannot create TSO");
1864 #if defined(DIST)
1865     tso->priority = AdvisoryPriority;
1866 #endif
1867     pushClosure(tso,spark);
1868     PUSH_ON_RUN_QUEUE(tso);
1869     advisory_thread_count++;    
1870   }
1871   return tso;
1872 }
1873 #endif
1874
1875 /*
1876   Turn a spark into a thread.
1877   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1878 */
1879 #if defined(PAR)
1880 StgTSO *
1881 activateSpark (rtsSpark spark) 
1882 {
1883   StgTSO *tso;
1884
1885   tso = createSparkThread(spark);
1886   if (RtsFlags.ParFlags.ParStats.Full) {   
1887     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1888     IF_PAR_DEBUG(verbose,
1889                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1890                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1891   }
1892   // ToDo: fwd info on local/global spark to thread -- HWL
1893   // tso->gran.exported =  spark->exported;
1894   // tso->gran.locked =   !spark->global;
1895   // tso->gran.sparkname = spark->name;
1896
1897   return tso;
1898 }
1899 #endif
1900
1901 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1902                                    Capability *initialCapability
1903                                    );
1904
1905
1906 /* ---------------------------------------------------------------------------
1907  * scheduleThread()
1908  *
1909  * scheduleThread puts a thread on the head of the runnable queue.
1910  * This will usually be done immediately after a thread is created.
1911  * The caller of scheduleThread must create the thread using e.g.
1912  * createThread and push an appropriate closure
1913  * on this thread's stack before the scheduler is invoked.
1914  * ------------------------------------------------------------------------ */
1915
1916 static void scheduleThread_ (StgTSO* tso);
1917
1918 void
1919 scheduleThread_(StgTSO *tso)
1920 {
1921   // Precondition: sched_mutex must be held.
1922   PUSH_ON_RUN_QUEUE(tso);
1923   THREAD_RUNNABLE();
1924 }
1925
1926 void
1927 scheduleThread(StgTSO* tso)
1928 {
1929   ACQUIRE_LOCK(&sched_mutex);
1930   scheduleThread_(tso);
1931   RELEASE_LOCK(&sched_mutex);
1932 }
1933
1934 #if defined(RTS_SUPPORTS_THREADS)
1935 static Condition bound_cond_cache;
1936 static int bound_cond_cache_full = 0;
1937 #endif
1938
1939
1940 SchedulerStatus
1941 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1942                    Capability *initialCapability)
1943 {
1944     // Precondition: sched_mutex must be held
1945     StgMainThread *m;
1946
1947     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1948     m->tso = tso;
1949     tso->main = m;
1950     m->ret = ret;
1951     m->stat = NoStatus;
1952     m->link = main_threads;
1953     m->prev = NULL;
1954     if (main_threads != NULL) {
1955         main_threads->prev = m;
1956     }
1957     main_threads = m;
1958
1959 #if defined(RTS_SUPPORTS_THREADS)
1960     // Allocating a new condition for each thread is expensive, so we
1961     // cache one.  This is a pretty feeble hack, but it helps speed up
1962     // consecutive call-ins quite a bit.
1963     if (bound_cond_cache_full) {
1964         m->bound_thread_cond = bound_cond_cache;
1965         bound_cond_cache_full = 0;
1966     } else {
1967         initCondition(&m->bound_thread_cond);
1968     }
1969 #endif
1970
1971     /* Put the thread on the main-threads list prior to scheduling the TSO.
1972        Failure to do so introduces a race condition in the MT case (as
1973        identified by Wolfgang Thaller), whereby the new task/OS thread 
1974        created by scheduleThread_() would complete prior to the thread
1975        that spawned it managed to put 'itself' on the main-threads list.
1976        The upshot of it all being that the worker thread wouldn't get to
1977        signal the completion of the its work item for the main thread to
1978        see (==> it got stuck waiting.)    -- sof 6/02.
1979     */
1980     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1981     
1982     PUSH_ON_RUN_QUEUE(tso);
1983     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1984     // bound and only runnable by *this* OS thread, so waking up other
1985     // workers will just slow things down.
1986
1987     return waitThread_(m, initialCapability);
1988 }
1989
1990 /* ---------------------------------------------------------------------------
1991  * initScheduler()
1992  *
1993  * Initialise the scheduler.  This resets all the queues - if the
1994  * queues contained any threads, they'll be garbage collected at the
1995  * next pass.
1996  *
1997  * ------------------------------------------------------------------------ */
1998
1999 void 
2000 initScheduler(void)
2001 {
2002 #if defined(GRAN)
2003   nat i;
2004
2005   for (i=0; i<=MAX_PROC; i++) {
2006     run_queue_hds[i]      = END_TSO_QUEUE;
2007     run_queue_tls[i]      = END_TSO_QUEUE;
2008     blocked_queue_hds[i]  = END_TSO_QUEUE;
2009     blocked_queue_tls[i]  = END_TSO_QUEUE;
2010     ccalling_threadss[i]  = END_TSO_QUEUE;
2011     sleeping_queue        = END_TSO_QUEUE;
2012   }
2013 #else
2014   run_queue_hd      = END_TSO_QUEUE;
2015   run_queue_tl      = END_TSO_QUEUE;
2016   blocked_queue_hd  = END_TSO_QUEUE;
2017   blocked_queue_tl  = END_TSO_QUEUE;
2018   sleeping_queue    = END_TSO_QUEUE;
2019 #endif 
2020
2021   suspended_ccalling_threads  = END_TSO_QUEUE;
2022
2023   main_threads = NULL;
2024   all_threads  = END_TSO_QUEUE;
2025
2026   context_switch = 0;
2027   interrupted    = 0;
2028
2029   RtsFlags.ConcFlags.ctxtSwitchTicks =
2030       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2031       
2032 #if defined(RTS_SUPPORTS_THREADS)
2033   /* Initialise the mutex and condition variables used by
2034    * the scheduler. */
2035   initMutex(&sched_mutex);
2036   initMutex(&term_mutex);
2037 #endif
2038   
2039   ACQUIRE_LOCK(&sched_mutex);
2040
2041   /* A capability holds the state a native thread needs in
2042    * order to execute STG code. At least one capability is
2043    * floating around (only SMP builds have more than one).
2044    */
2045   initCapabilities();
2046   
2047 #if defined(RTS_SUPPORTS_THREADS)
2048     /* start our haskell execution tasks */
2049     startTaskManager(0,taskStart);
2050 #endif
2051
2052 #if /* defined(SMP) ||*/ defined(PAR)
2053   initSparkPools();
2054 #endif
2055
2056   RELEASE_LOCK(&sched_mutex);
2057 }
2058
2059 void
2060 exitScheduler( void )
2061 {
2062 #if defined(RTS_SUPPORTS_THREADS)
2063   stopTaskManager();
2064 #endif
2065   shutting_down_scheduler = rtsTrue;
2066 }
2067
2068 /* ----------------------------------------------------------------------------
2069    Managing the per-task allocation areas.
2070    
2071    Each capability comes with an allocation area.  These are
2072    fixed-length block lists into which allocation can be done.
2073
2074    ToDo: no support for two-space collection at the moment???
2075    ------------------------------------------------------------------------- */
2076
2077 static
2078 SchedulerStatus
2079 waitThread_(StgMainThread* m, Capability *initialCapability)
2080 {
2081   SchedulerStatus stat;
2082
2083   // Precondition: sched_mutex must be held.
2084   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2085
2086 #if defined(GRAN)
2087   /* GranSim specific init */
2088   CurrentTSO = m->tso;                // the TSO to run
2089   procStatus[MainProc] = Busy;        // status of main PE
2090   CurrentProc = MainProc;             // PE to run it on
2091   schedule(m,initialCapability);
2092 #else
2093   schedule(m,initialCapability);
2094   ASSERT(m->stat != NoStatus);
2095 #endif
2096
2097   stat = m->stat;
2098
2099 #if defined(RTS_SUPPORTS_THREADS)
2100   // Free the condition variable, returning it to the cache if possible.
2101   if (!bound_cond_cache_full) {
2102       bound_cond_cache = m->bound_thread_cond;
2103       bound_cond_cache_full = 1;
2104   } else {
2105       closeCondition(&m->bound_thread_cond);
2106   }
2107 #endif
2108
2109   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2110   stgFree(m);
2111
2112   // Postcondition: sched_mutex still held
2113   return stat;
2114 }
2115
2116 /* ---------------------------------------------------------------------------
2117    Where are the roots that we know about?
2118
2119         - all the threads on the runnable queue
2120         - all the threads on the blocked queue
2121         - all the threads on the sleeping queue
2122         - all the thread currently executing a _ccall_GC
2123         - all the "main threads"
2124      
2125    ------------------------------------------------------------------------ */
2126
2127 /* This has to be protected either by the scheduler monitor, or by the
2128         garbage collection monitor (probably the latter).
2129         KH @ 25/10/99
2130 */
2131
2132 void
2133 GetRoots( evac_fn evac )
2134 {
2135 #if defined(GRAN)
2136   {
2137     nat i;
2138     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2139       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2140           evac((StgClosure **)&run_queue_hds[i]);
2141       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2142           evac((StgClosure **)&run_queue_tls[i]);
2143       
2144       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2145           evac((StgClosure **)&blocked_queue_hds[i]);
2146       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2147           evac((StgClosure **)&blocked_queue_tls[i]);
2148       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2149           evac((StgClosure **)&ccalling_threads[i]);
2150     }
2151   }
2152
2153   markEventQueue();
2154
2155 #else /* !GRAN */
2156   if (run_queue_hd != END_TSO_QUEUE) {
2157       ASSERT(run_queue_tl != END_TSO_QUEUE);
2158       evac((StgClosure **)&run_queue_hd);
2159       evac((StgClosure **)&run_queue_tl);
2160   }
2161   
2162   if (blocked_queue_hd != END_TSO_QUEUE) {
2163       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2164       evac((StgClosure **)&blocked_queue_hd);
2165       evac((StgClosure **)&blocked_queue_tl);
2166   }
2167   
2168   if (sleeping_queue != END_TSO_QUEUE) {
2169       evac((StgClosure **)&sleeping_queue);
2170   }
2171 #endif 
2172
2173   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2174       evac((StgClosure **)&suspended_ccalling_threads);
2175   }
2176
2177 #if defined(PAR) || defined(GRAN)
2178   markSparkQueue(evac);
2179 #endif
2180
2181 #if defined(RTS_USER_SIGNALS)
2182   // mark the signal handlers (signals should be already blocked)
2183   markSignalHandlers(evac);
2184 #endif
2185 }
2186
2187 /* -----------------------------------------------------------------------------
2188    performGC
2189
2190    This is the interface to the garbage collector from Haskell land.
2191    We provide this so that external C code can allocate and garbage
2192    collect when called from Haskell via _ccall_GC.
2193
2194    It might be useful to provide an interface whereby the programmer
2195    can specify more roots (ToDo).
2196    
2197    This needs to be protected by the GC condition variable above.  KH.
2198    -------------------------------------------------------------------------- */
2199
2200 static void (*extra_roots)(evac_fn);
2201
2202 void
2203 performGC(void)
2204 {
2205   /* Obligated to hold this lock upon entry */
2206   ACQUIRE_LOCK(&sched_mutex);
2207   GarbageCollect(GetRoots,rtsFalse);
2208   RELEASE_LOCK(&sched_mutex);
2209 }
2210
2211 void
2212 performMajorGC(void)
2213 {
2214   ACQUIRE_LOCK(&sched_mutex);
2215   GarbageCollect(GetRoots,rtsTrue);
2216   RELEASE_LOCK(&sched_mutex);
2217 }
2218
2219 static void
2220 AllRoots(evac_fn evac)
2221 {
2222     GetRoots(evac);             // the scheduler's roots
2223     extra_roots(evac);          // the user's roots
2224 }
2225
2226 void
2227 performGCWithRoots(void (*get_roots)(evac_fn))
2228 {
2229   ACQUIRE_LOCK(&sched_mutex);
2230   extra_roots = get_roots;
2231   GarbageCollect(AllRoots,rtsFalse);
2232   RELEASE_LOCK(&sched_mutex);
2233 }
2234
2235 /* -----------------------------------------------------------------------------
2236    Stack overflow
2237
2238    If the thread has reached its maximum stack size, then raise the
2239    StackOverflow exception in the offending thread.  Otherwise
2240    relocate the TSO into a larger chunk of memory and adjust its stack
2241    size appropriately.
2242    -------------------------------------------------------------------------- */
2243
2244 static StgTSO *
2245 threadStackOverflow(StgTSO *tso)
2246 {
2247   nat new_stack_size, new_tso_size, stack_words;
2248   StgPtr new_sp;
2249   StgTSO *dest;
2250
2251   IF_DEBUG(sanity,checkTSO(tso));
2252   if (tso->stack_size >= tso->max_stack_size) {
2253
2254     IF_DEBUG(gc,
2255              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2256                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2257              /* If we're debugging, just print out the top of the stack */
2258              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2259                                               tso->sp+64)));
2260
2261     /* Send this thread the StackOverflow exception */
2262     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2263     return tso;
2264   }
2265
2266   /* Try to double the current stack size.  If that takes us over the
2267    * maximum stack size for this thread, then use the maximum instead.
2268    * Finally round up so the TSO ends up as a whole number of blocks.
2269    */
2270   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2271   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2272                                        TSO_STRUCT_SIZE)/sizeof(W_);
2273   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2274   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2275
2276   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2277
2278   dest = (StgTSO *)allocate(new_tso_size);
2279   TICK_ALLOC_TSO(new_stack_size,0);
2280
2281   /* copy the TSO block and the old stack into the new area */
2282   memcpy(dest,tso,TSO_STRUCT_SIZE);
2283   stack_words = tso->stack + tso->stack_size - tso->sp;
2284   new_sp = (P_)dest + new_tso_size - stack_words;
2285   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2286
2287   /* relocate the stack pointers... */
2288   dest->sp         = new_sp;
2289   dest->stack_size = new_stack_size;
2290         
2291   /* Mark the old TSO as relocated.  We have to check for relocated
2292    * TSOs in the garbage collector and any primops that deal with TSOs.
2293    *
2294    * It's important to set the sp value to just beyond the end
2295    * of the stack, so we don't attempt to scavenge any part of the
2296    * dead TSO's stack.
2297    */
2298   tso->what_next = ThreadRelocated;
2299   tso->link = dest;
2300   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2301   tso->why_blocked = NotBlocked;
2302   dest->mut_link = NULL;
2303
2304   IF_PAR_DEBUG(verbose,
2305                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2306                      tso->id, tso, tso->stack_size);
2307                /* If we're debugging, just print out the top of the stack */
2308                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2309                                                 tso->sp+64)));
2310   
2311   IF_DEBUG(sanity,checkTSO(tso));
2312 #if 0
2313   IF_DEBUG(scheduler,printTSO(dest));
2314 #endif
2315
2316   return dest;
2317 }
2318
2319 /* ---------------------------------------------------------------------------
2320    Wake up a queue that was blocked on some resource.
2321    ------------------------------------------------------------------------ */
2322
2323 #if defined(GRAN)
2324 STATIC_INLINE void
2325 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2326 {
2327 }
2328 #elif defined(PAR)
2329 STATIC_INLINE void
2330 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2331 {
2332   /* write RESUME events to log file and
2333      update blocked and fetch time (depending on type of the orig closure) */
2334   if (RtsFlags.ParFlags.ParStats.Full) {
2335     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2336                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2337                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2338     if (EMPTY_RUN_QUEUE())
2339       emitSchedule = rtsTrue;
2340
2341     switch (get_itbl(node)->type) {
2342         case FETCH_ME_BQ:
2343           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2344           break;
2345         case RBH:
2346         case FETCH_ME:
2347         case BLACKHOLE_BQ:
2348           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2349           break;
2350 #ifdef DIST
2351         case MVAR:
2352           break;
2353 #endif    
2354         default:
2355           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2356         }
2357       }
2358 }
2359 #endif
2360
2361 #if defined(GRAN)
2362 static StgBlockingQueueElement *
2363 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2364 {
2365     StgTSO *tso;
2366     PEs node_loc, tso_loc;
2367
2368     node_loc = where_is(node); // should be lifted out of loop
2369     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2370     tso_loc = where_is((StgClosure *)tso);
2371     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2372       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2373       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2374       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2375       // insertThread(tso, node_loc);
2376       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2377                 ResumeThread,
2378                 tso, node, (rtsSpark*)NULL);
2379       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2380       // len_local++;
2381       // len++;
2382     } else { // TSO is remote (actually should be FMBQ)
2383       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2384                                   RtsFlags.GranFlags.Costs.gunblocktime +
2385                                   RtsFlags.GranFlags.Costs.latency;
2386       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2387                 UnblockThread,
2388                 tso, node, (rtsSpark*)NULL);
2389       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2390       // len++;
2391     }
2392     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2393     IF_GRAN_DEBUG(bq,
2394                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2395                           (node_loc==tso_loc ? "Local" : "Global"), 
2396                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2397     tso->block_info.closure = NULL;
2398     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2399                              tso->id, tso));
2400 }
2401 #elif defined(PAR)
2402 static StgBlockingQueueElement *
2403 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2404 {
2405     StgBlockingQueueElement *next;
2406
2407     switch (get_itbl(bqe)->type) {
2408     case TSO:
2409       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2410       /* if it's a TSO just push it onto the run_queue */
2411       next = bqe->link;
2412       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2413       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2414       THREAD_RUNNABLE();
2415       unblockCount(bqe, node);
2416       /* reset blocking status after dumping event */
2417       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2418       break;
2419
2420     case BLOCKED_FETCH:
2421       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2422       next = bqe->link;
2423       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2424       PendingFetches = (StgBlockedFetch *)bqe;
2425       break;
2426
2427 # if defined(DEBUG)
2428       /* can ignore this case in a non-debugging setup; 
2429          see comments on RBHSave closures above */
2430     case CONSTR:
2431       /* check that the closure is an RBHSave closure */
2432       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2433              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2434              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2435       break;
2436
2437     default:
2438       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2439            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2440            (StgClosure *)bqe);
2441 # endif
2442     }
2443   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2444   return next;
2445 }
2446
2447 #else /* !GRAN && !PAR */
2448 static StgTSO *
2449 unblockOneLocked(StgTSO *tso)
2450 {
2451   StgTSO *next;
2452
2453   ASSERT(get_itbl(tso)->type == TSO);
2454   ASSERT(tso->why_blocked != NotBlocked);
2455   tso->why_blocked = NotBlocked;
2456   next = tso->link;
2457   PUSH_ON_RUN_QUEUE(tso);
2458   THREAD_RUNNABLE();
2459   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2460   return next;
2461 }
2462 #endif
2463
2464 #if defined(GRAN) || defined(PAR)
2465 INLINE_ME StgBlockingQueueElement *
2466 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2467 {
2468   ACQUIRE_LOCK(&sched_mutex);
2469   bqe = unblockOneLocked(bqe, node);
2470   RELEASE_LOCK(&sched_mutex);
2471   return bqe;
2472 }
2473 #else
2474 INLINE_ME StgTSO *
2475 unblockOne(StgTSO *tso)
2476 {
2477   ACQUIRE_LOCK(&sched_mutex);
2478   tso = unblockOneLocked(tso);
2479   RELEASE_LOCK(&sched_mutex);
2480   return tso;
2481 }
2482 #endif
2483
2484 #if defined(GRAN)
2485 void 
2486 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2487 {
2488   StgBlockingQueueElement *bqe;
2489   PEs node_loc;
2490   nat len = 0; 
2491
2492   IF_GRAN_DEBUG(bq, 
2493                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2494                       node, CurrentProc, CurrentTime[CurrentProc], 
2495                       CurrentTSO->id, CurrentTSO));
2496
2497   node_loc = where_is(node);
2498
2499   ASSERT(q == END_BQ_QUEUE ||
2500          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2501          get_itbl(q)->type == CONSTR); // closure (type constructor)
2502   ASSERT(is_unique(node));
2503
2504   /* FAKE FETCH: magically copy the node to the tso's proc;
2505      no Fetch necessary because in reality the node should not have been 
2506      moved to the other PE in the first place
2507   */
2508   if (CurrentProc!=node_loc) {
2509     IF_GRAN_DEBUG(bq, 
2510                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2511                         node, node_loc, CurrentProc, CurrentTSO->id, 
2512                         // CurrentTSO, where_is(CurrentTSO),
2513                         node->header.gran.procs));
2514     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2515     IF_GRAN_DEBUG(bq, 
2516                   belch("## new bitmask of node %p is %#x",
2517                         node, node->header.gran.procs));
2518     if (RtsFlags.GranFlags.GranSimStats.Global) {
2519       globalGranStats.tot_fake_fetches++;
2520     }
2521   }
2522
2523   bqe = q;
2524   // ToDo: check: ASSERT(CurrentProc==node_loc);
2525   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2526     //next = bqe->link;
2527     /* 
2528        bqe points to the current element in the queue
2529        next points to the next element in the queue
2530     */
2531     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2532     //tso_loc = where_is(tso);
2533     len++;
2534     bqe = unblockOneLocked(bqe, node);
2535   }
2536
2537   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2538      the closure to make room for the anchor of the BQ */
2539   if (bqe!=END_BQ_QUEUE) {
2540     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2541     /*
2542     ASSERT((info_ptr==&RBH_Save_0_info) ||
2543            (info_ptr==&RBH_Save_1_info) ||
2544            (info_ptr==&RBH_Save_2_info));
2545     */
2546     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2547     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2548     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2549
2550     IF_GRAN_DEBUG(bq,
2551                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2552                         node, info_type(node)));
2553   }
2554
2555   /* statistics gathering */
2556   if (RtsFlags.GranFlags.GranSimStats.Global) {
2557     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2558     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2559     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2560     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2561   }
2562   IF_GRAN_DEBUG(bq,
2563                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2564                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2565 }
2566 #elif defined(PAR)
2567 void 
2568 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2569 {
2570   StgBlockingQueueElement *bqe;
2571
2572   ACQUIRE_LOCK(&sched_mutex);
2573
2574   IF_PAR_DEBUG(verbose, 
2575                belch("##-_ AwBQ for node %p on [%x]: ",
2576                      node, mytid));
2577 #ifdef DIST  
2578   //RFP
2579   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2580     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2581     return;
2582   }
2583 #endif
2584   
2585   ASSERT(q == END_BQ_QUEUE ||
2586          get_itbl(q)->type == TSO ||           
2587          get_itbl(q)->type == BLOCKED_FETCH || 
2588          get_itbl(q)->type == CONSTR); 
2589
2590   bqe = q;
2591   while (get_itbl(bqe)->type==TSO || 
2592          get_itbl(bqe)->type==BLOCKED_FETCH) {
2593     bqe = unblockOneLocked(bqe, node);
2594   }
2595   RELEASE_LOCK(&sched_mutex);
2596 }
2597
2598 #else   /* !GRAN && !PAR */
2599
2600 void
2601 awakenBlockedQueueNoLock(StgTSO *tso)
2602 {
2603   while (tso != END_TSO_QUEUE) {
2604     tso = unblockOneLocked(tso);
2605   }
2606 }
2607
2608 void
2609 awakenBlockedQueue(StgTSO *tso)
2610 {
2611   ACQUIRE_LOCK(&sched_mutex);
2612   while (tso != END_TSO_QUEUE) {
2613     tso = unblockOneLocked(tso);
2614   }
2615   RELEASE_LOCK(&sched_mutex);
2616 }
2617 #endif
2618
2619 /* ---------------------------------------------------------------------------
2620    Interrupt execution
2621    - usually called inside a signal handler so it mustn't do anything fancy.   
2622    ------------------------------------------------------------------------ */
2623
2624 void
2625 interruptStgRts(void)
2626 {
2627     interrupted    = 1;
2628     context_switch = 1;
2629 #ifdef RTS_SUPPORTS_THREADS
2630     wakeBlockedWorkerThread();
2631 #endif
2632 }
2633
2634 /* -----------------------------------------------------------------------------
2635    Unblock a thread
2636
2637    This is for use when we raise an exception in another thread, which
2638    may be blocked.
2639    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2640    -------------------------------------------------------------------------- */
2641
2642 #if defined(GRAN) || defined(PAR)
2643 /*
2644   NB: only the type of the blocking queue is different in GranSim and GUM
2645       the operations on the queue-elements are the same
2646       long live polymorphism!
2647
2648   Locks: sched_mutex is held upon entry and exit.
2649
2650 */
2651 static void
2652 unblockThread(StgTSO *tso)
2653 {
2654   StgBlockingQueueElement *t, **last;
2655
2656   switch (tso->why_blocked) {
2657
2658   case NotBlocked:
2659     return;  /* not blocked */
2660
2661   case BlockedOnMVar:
2662     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2663     {
2664       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2665       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2666
2667       last = (StgBlockingQueueElement **)&mvar->head;
2668       for (t = (StgBlockingQueueElement *)mvar->head; 
2669            t != END_BQ_QUEUE; 
2670            last = &t->link, last_tso = t, t = t->link) {
2671         if (t == (StgBlockingQueueElement *)tso) {
2672           *last = (StgBlockingQueueElement *)tso->link;
2673           if (mvar->tail == tso) {
2674             mvar->tail = (StgTSO *)last_tso;
2675           }
2676           goto done;
2677         }
2678       }
2679       barf("unblockThread (MVAR): TSO not found");
2680     }
2681
2682   case BlockedOnBlackHole:
2683     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2684     {
2685       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2686
2687       last = &bq->blocking_queue;
2688       for (t = bq->blocking_queue; 
2689            t != END_BQ_QUEUE; 
2690            last = &t->link, t = t->link) {
2691         if (t == (StgBlockingQueueElement *)tso) {
2692           *last = (StgBlockingQueueElement *)tso->link;
2693           goto done;
2694         }
2695       }
2696       barf("unblockThread (BLACKHOLE): TSO not found");
2697     }
2698
2699   case BlockedOnException:
2700     {
2701       StgTSO *target  = tso->block_info.tso;
2702
2703       ASSERT(get_itbl(target)->type == TSO);
2704
2705       if (target->what_next == ThreadRelocated) {
2706           target = target->link;
2707           ASSERT(get_itbl(target)->type == TSO);
2708       }
2709
2710       ASSERT(target->blocked_exceptions != NULL);
2711
2712       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2713       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2714            t != END_BQ_QUEUE; 
2715            last = &t->link, t = t->link) {
2716         ASSERT(get_itbl(t)->type == TSO);
2717         if (t == (StgBlockingQueueElement *)tso) {
2718           *last = (StgBlockingQueueElement *)tso->link;
2719           goto done;
2720         }
2721       }
2722       barf("unblockThread (Exception): TSO not found");
2723     }
2724
2725   case BlockedOnRead:
2726   case BlockedOnWrite:
2727 #if defined(mingw32_TARGET_OS)
2728   case BlockedOnDoProc:
2729 #endif
2730     {
2731       /* take TSO off blocked_queue */
2732       StgBlockingQueueElement *prev = NULL;
2733       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2734            prev = t, t = t->link) {
2735         if (t == (StgBlockingQueueElement *)tso) {
2736           if (prev == NULL) {
2737             blocked_queue_hd = (StgTSO *)t->link;
2738             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2739               blocked_queue_tl = END_TSO_QUEUE;
2740             }
2741           } else {
2742             prev->link = t->link;
2743             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2744               blocked_queue_tl = (StgTSO *)prev;
2745             }
2746           }
2747           goto done;
2748         }
2749       }
2750       barf("unblockThread (I/O): TSO not found");
2751     }
2752
2753   case BlockedOnDelay:
2754     {
2755       /* take TSO off sleeping_queue */
2756       StgBlockingQueueElement *prev = NULL;
2757       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2758            prev = t, t = t->link) {
2759         if (t == (StgBlockingQueueElement *)tso) {
2760           if (prev == NULL) {
2761             sleeping_queue = (StgTSO *)t->link;
2762           } else {
2763             prev->link = t->link;
2764           }
2765           goto done;
2766         }
2767       }
2768       barf("unblockThread (delay): TSO not found");
2769     }
2770
2771   default:
2772     barf("unblockThread");
2773   }
2774
2775  done:
2776   tso->link = END_TSO_QUEUE;
2777   tso->why_blocked = NotBlocked;
2778   tso->block_info.closure = NULL;
2779   PUSH_ON_RUN_QUEUE(tso);
2780 }
2781 #else
2782 static void
2783 unblockThread(StgTSO *tso)
2784 {
2785   StgTSO *t, **last;
2786   
2787   /* To avoid locking unnecessarily. */
2788   if (tso->why_blocked == NotBlocked) {
2789     return;
2790   }
2791
2792   switch (tso->why_blocked) {
2793
2794   case BlockedOnMVar:
2795     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2796     {
2797       StgTSO *last_tso = END_TSO_QUEUE;
2798       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2799
2800       last = &mvar->head;
2801       for (t = mvar->head; t != END_TSO_QUEUE; 
2802            last = &t->link, last_tso = t, t = t->link) {
2803         if (t == tso) {
2804           *last = tso->link;
2805           if (mvar->tail == tso) {
2806             mvar->tail = last_tso;
2807           }
2808           goto done;
2809         }
2810       }
2811       barf("unblockThread (MVAR): TSO not found");
2812     }
2813
2814   case BlockedOnBlackHole:
2815     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2816     {
2817       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2818
2819       last = &bq->blocking_queue;
2820       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2821            last = &t->link, t = t->link) {
2822         if (t == tso) {
2823           *last = tso->link;
2824           goto done;
2825         }
2826       }
2827       barf("unblockThread (BLACKHOLE): TSO not found");
2828     }
2829
2830   case BlockedOnException:
2831     {
2832       StgTSO *target  = tso->block_info.tso;
2833
2834       ASSERT(get_itbl(target)->type == TSO);
2835
2836       while (target->what_next == ThreadRelocated) {
2837           target = target->link;
2838           ASSERT(get_itbl(target)->type == TSO);
2839       }
2840       
2841       ASSERT(target->blocked_exceptions != NULL);
2842
2843       last = &target->blocked_exceptions;
2844       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2845            last = &t->link, t = t->link) {
2846         ASSERT(get_itbl(t)->type == TSO);
2847         if (t == tso) {
2848           *last = tso->link;
2849           goto done;
2850         }
2851       }
2852       barf("unblockThread (Exception): TSO not found");
2853     }
2854
2855   case BlockedOnRead:
2856   case BlockedOnWrite:
2857 #if defined(mingw32_TARGET_OS)
2858   case BlockedOnDoProc:
2859 #endif
2860     {
2861       StgTSO *prev = NULL;
2862       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2863            prev = t, t = t->link) {
2864         if (t == tso) {
2865           if (prev == NULL) {
2866             blocked_queue_hd = t->link;
2867             if (blocked_queue_tl == t) {
2868               blocked_queue_tl = END_TSO_QUEUE;
2869             }
2870           } else {
2871             prev->link = t->link;
2872             if (blocked_queue_tl == t) {
2873               blocked_queue_tl = prev;
2874             }
2875           }
2876           goto done;
2877         }
2878       }
2879       barf("unblockThread (I/O): TSO not found");
2880     }
2881
2882   case BlockedOnDelay:
2883     {
2884       StgTSO *prev = NULL;
2885       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2886            prev = t, t = t->link) {
2887         if (t == tso) {
2888           if (prev == NULL) {
2889             sleeping_queue = t->link;
2890           } else {
2891             prev->link = t->link;
2892           }
2893           goto done;
2894         }
2895       }
2896       barf("unblockThread (delay): TSO not found");
2897     }
2898
2899   default:
2900     barf("unblockThread");
2901   }
2902
2903  done:
2904   tso->link = END_TSO_QUEUE;
2905   tso->why_blocked = NotBlocked;
2906   tso->block_info.closure = NULL;
2907   PUSH_ON_RUN_QUEUE(tso);
2908 }
2909 #endif
2910
2911 /* -----------------------------------------------------------------------------
2912  * raiseAsync()
2913  *
2914  * The following function implements the magic for raising an
2915  * asynchronous exception in an existing thread.
2916  *
2917  * We first remove the thread from any queue on which it might be
2918  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2919  *
2920  * We strip the stack down to the innermost CATCH_FRAME, building
2921  * thunks in the heap for all the active computations, so they can 
2922  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2923  * an application of the handler to the exception, and push it on
2924  * the top of the stack.
2925  * 
2926  * How exactly do we save all the active computations?  We create an
2927  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2928  * AP_STACKs pushes everything from the corresponding update frame
2929  * upwards onto the stack.  (Actually, it pushes everything up to the
2930  * next update frame plus a pointer to the next AP_STACK object.
2931  * Entering the next AP_STACK object pushes more onto the stack until we
2932  * reach the last AP_STACK object - at which point the stack should look
2933  * exactly as it did when we killed the TSO and we can continue
2934  * execution by entering the closure on top of the stack.
2935  *
2936  * We can also kill a thread entirely - this happens if either (a) the 
2937  * exception passed to raiseAsync is NULL, or (b) there's no
2938  * CATCH_FRAME on the stack.  In either case, we strip the entire
2939  * stack and replace the thread with a zombie.
2940  *
2941  * Locks: sched_mutex held upon entry nor exit.
2942  *
2943  * -------------------------------------------------------------------------- */
2944  
2945 void 
2946 deleteThread(StgTSO *tso)
2947 {
2948   raiseAsync(tso,NULL);
2949 }
2950
2951 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2952 static void 
2953 deleteThreadImmediately(StgTSO *tso)
2954 { // for forkProcess only:
2955   // delete thread without giving it a chance to catch the KillThread exception
2956
2957   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2958       return;
2959   }
2960
2961   if (tso->why_blocked != BlockedOnCCall &&
2962       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2963     unblockThread(tso);
2964   }
2965
2966   tso->what_next = ThreadKilled;
2967 }
2968 #endif
2969
2970 void
2971 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2972 {
2973   /* When raising async exs from contexts where sched_mutex isn't held;
2974      use raiseAsyncWithLock(). */
2975   ACQUIRE_LOCK(&sched_mutex);
2976   raiseAsync(tso,exception);
2977   RELEASE_LOCK(&sched_mutex);
2978 }
2979
2980 void
2981 raiseAsync(StgTSO *tso, StgClosure *exception)
2982 {
2983     StgRetInfoTable *info;
2984     StgPtr sp;
2985   
2986     // Thread already dead?
2987     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2988         return;
2989     }
2990
2991     IF_DEBUG(scheduler, 
2992              sched_belch("raising exception in thread %ld.", tso->id));
2993     
2994     // Remove it from any blocking queues
2995     unblockThread(tso);
2996
2997     sp = tso->sp;
2998     
2999     // The stack freezing code assumes there's a closure pointer on
3000     // the top of the stack, so we have to arrange that this is the case...
3001     //
3002     if (sp[0] == (W_)&stg_enter_info) {
3003         sp++;
3004     } else {
3005         sp--;
3006         sp[0] = (W_)&stg_dummy_ret_closure;
3007     }
3008
3009     while (1) {
3010         nat i;
3011
3012         // 1. Let the top of the stack be the "current closure"
3013         //
3014         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3015         // CATCH_FRAME.
3016         //
3017         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3018         // current closure applied to the chunk of stack up to (but not
3019         // including) the update frame.  This closure becomes the "current
3020         // closure".  Go back to step 2.
3021         //
3022         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3023         // top of the stack applied to the exception.
3024         // 
3025         // 5. If it's a STOP_FRAME, then kill the thread.
3026         
3027         StgPtr frame;
3028         
3029         frame = sp + 1;
3030         info = get_ret_itbl((StgClosure *)frame);
3031         
3032         while (info->i.type != UPDATE_FRAME
3033                && (info->i.type != CATCH_FRAME || exception == NULL)
3034                && info->i.type != STOP_FRAME) {
3035             frame += stack_frame_sizeW((StgClosure *)frame);
3036             info = get_ret_itbl((StgClosure *)frame);
3037         }
3038         
3039         switch (info->i.type) {
3040             
3041         case CATCH_FRAME:
3042             // If we find a CATCH_FRAME, and we've got an exception to raise,
3043             // then build the THUNK raise(exception), and leave it on
3044             // top of the CATCH_FRAME ready to enter.
3045             //
3046         {
3047 #ifdef PROFILING
3048             StgCatchFrame *cf = (StgCatchFrame *)frame;
3049 #endif
3050             StgClosure *raise;
3051             
3052             // we've got an exception to raise, so let's pass it to the
3053             // handler in this frame.
3054             //
3055             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3056             TICK_ALLOC_SE_THK(1,0);
3057             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3058             raise->payload[0] = exception;
3059             
3060             // throw away the stack from Sp up to the CATCH_FRAME.
3061             //
3062             sp = frame - 1;
3063             
3064             /* Ensure that async excpetions are blocked now, so we don't get
3065              * a surprise exception before we get around to executing the
3066              * handler.
3067              */
3068             if (tso->blocked_exceptions == NULL) {
3069                 tso->blocked_exceptions = END_TSO_QUEUE;
3070             }
3071             
3072             /* Put the newly-built THUNK on top of the stack, ready to execute
3073              * when the thread restarts.
3074              */
3075             sp[0] = (W_)raise;
3076             sp[-1] = (W_)&stg_enter_info;
3077             tso->sp = sp-1;
3078             tso->what_next = ThreadRunGHC;
3079             IF_DEBUG(sanity, checkTSO(tso));
3080             return;
3081         }
3082         
3083         case UPDATE_FRAME:
3084         {
3085             StgAP_STACK * ap;
3086             nat words;
3087             
3088             // First build an AP_STACK consisting of the stack chunk above the
3089             // current update frame, with the top word on the stack as the
3090             // fun field.
3091             //
3092             words = frame - sp - 1;
3093             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3094             
3095             ap->size = words;
3096             ap->fun  = (StgClosure *)sp[0];
3097             sp++;
3098             for(i=0; i < (nat)words; ++i) {
3099                 ap->payload[i] = (StgClosure *)*sp++;
3100             }
3101             
3102             SET_HDR(ap,&stg_AP_STACK_info,
3103                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3104             TICK_ALLOC_UP_THK(words+1,0);
3105             
3106             IF_DEBUG(scheduler,
3107                      fprintf(stderr,  "sched: Updating ");
3108                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3109                      fprintf(stderr,  " with ");
3110                      printObj((StgClosure *)ap);
3111                 );
3112
3113             // Replace the updatee with an indirection - happily
3114             // this will also wake up any threads currently
3115             // waiting on the result.
3116             //
3117             // Warning: if we're in a loop, more than one update frame on
3118             // the stack may point to the same object.  Be careful not to
3119             // overwrite an IND_OLDGEN in this case, because we'll screw
3120             // up the mutable lists.  To be on the safe side, don't
3121             // overwrite any kind of indirection at all.  See also
3122             // threadSqueezeStack in GC.c, where we have to make a similar
3123             // check.
3124             //
3125             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3126                 // revert the black hole
3127                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3128             }
3129             sp += sizeofW(StgUpdateFrame) - 1;
3130             sp[0] = (W_)ap; // push onto stack
3131             break;
3132         }
3133         
3134         case STOP_FRAME:
3135             // We've stripped the entire stack, the thread is now dead.
3136             sp += sizeofW(StgStopFrame);
3137             tso->what_next = ThreadKilled;
3138             tso->sp = sp;
3139             return;
3140             
3141         default:
3142             barf("raiseAsync");
3143         }
3144     }
3145     barf("raiseAsync");
3146 }
3147
3148 /* -----------------------------------------------------------------------------
3149    resurrectThreads is called after garbage collection on the list of
3150    threads found to be garbage.  Each of these threads will be woken
3151    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3152    on an MVar, or NonTermination if the thread was blocked on a Black
3153    Hole.
3154
3155    Locks: sched_mutex isn't held upon entry nor exit.
3156    -------------------------------------------------------------------------- */
3157
3158 void
3159 resurrectThreads( StgTSO *threads )
3160 {
3161   StgTSO *tso, *next;
3162
3163   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3164     next = tso->global_link;
3165     tso->global_link = all_threads;
3166     all_threads = tso;
3167     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3168
3169     switch (tso->why_blocked) {
3170     case BlockedOnMVar:
3171     case BlockedOnException:
3172       /* Called by GC - sched_mutex lock is currently held. */
3173       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3174       break;
3175     case BlockedOnBlackHole:
3176       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3177       break;
3178     case NotBlocked:
3179       /* This might happen if the thread was blocked on a black hole
3180        * belonging to a thread that we've just woken up (raiseAsync
3181        * can wake up threads, remember...).
3182        */
3183       continue;
3184     default:
3185       barf("resurrectThreads: thread blocked in a strange way");
3186     }
3187   }
3188 }
3189
3190 /* -----------------------------------------------------------------------------
3191  * Blackhole detection: if we reach a deadlock, test whether any
3192  * threads are blocked on themselves.  Any threads which are found to
3193  * be self-blocked get sent a NonTermination exception.
3194  *
3195  * This is only done in a deadlock situation in order to avoid
3196  * performance overhead in the normal case.
3197  *
3198  * Locks: sched_mutex is held upon entry and exit.
3199  * -------------------------------------------------------------------------- */
3200
3201 static void
3202 detectBlackHoles( void )
3203 {
3204     StgTSO *tso = all_threads;
3205     StgClosure *frame;
3206     StgClosure *blocked_on;
3207     StgRetInfoTable *info;
3208
3209     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3210
3211         while (tso->what_next == ThreadRelocated) {
3212             tso = tso->link;
3213             ASSERT(get_itbl(tso)->type == TSO);
3214         }
3215       
3216         if (tso->why_blocked != BlockedOnBlackHole) {
3217             continue;
3218         }
3219         blocked_on = tso->block_info.closure;
3220
3221         frame = (StgClosure *)tso->sp;
3222
3223         while(1) {
3224             info = get_ret_itbl(frame);
3225             switch (info->i.type) {
3226             case UPDATE_FRAME:
3227                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3228                     /* We are blocking on one of our own computations, so
3229                      * send this thread the NonTermination exception.  
3230                      */
3231                     IF_DEBUG(scheduler, 
3232                              sched_belch("thread %d is blocked on itself", tso->id));
3233                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3234                     goto done;
3235                 }
3236                 
3237                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3238                 continue;
3239
3240             case STOP_FRAME:
3241                 goto done;
3242
3243                 // normal stack frames; do nothing except advance the pointer
3244             default:
3245                 (StgPtr)frame += stack_frame_sizeW(frame);
3246             }
3247         }   
3248         done: ;
3249     }
3250 }
3251
3252 /* ----------------------------------------------------------------------------
3253  * Debugging: why is a thread blocked
3254  * [Also provides useful information when debugging threaded programs
3255  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3256    ------------------------------------------------------------------------- */
3257
3258 static
3259 void
3260 printThreadBlockage(StgTSO *tso)
3261 {
3262   switch (tso->why_blocked) {
3263   case BlockedOnRead:
3264     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3265     break;
3266   case BlockedOnWrite:
3267     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3268     break;
3269 #if defined(mingw32_TARGET_OS)
3270     case BlockedOnDoProc:
3271     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3272     break;
3273 #endif
3274   case BlockedOnDelay:
3275     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3276     break;
3277   case BlockedOnMVar:
3278     fprintf(stderr,"is blocked on an MVar");
3279     break;
3280   case BlockedOnException:
3281     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3282             tso->block_info.tso->id);
3283     break;
3284   case BlockedOnBlackHole:
3285     fprintf(stderr,"is blocked on a black hole");
3286     break;
3287   case NotBlocked:
3288     fprintf(stderr,"is not blocked");
3289     break;
3290 #if defined(PAR)
3291   case BlockedOnGA:
3292     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3293             tso->block_info.closure, info_type(tso->block_info.closure));
3294     break;
3295   case BlockedOnGA_NoSend:
3296     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3297             tso->block_info.closure, info_type(tso->block_info.closure));
3298     break;
3299 #endif
3300   case BlockedOnCCall:
3301     fprintf(stderr,"is blocked on an external call");
3302     break;
3303   case BlockedOnCCall_NoUnblockExc:
3304     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3305     break;
3306   default:
3307     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3308          tso->why_blocked, tso->id, tso);
3309   }
3310 }
3311
3312 static
3313 void
3314 printThreadStatus(StgTSO *tso)
3315 {
3316   switch (tso->what_next) {
3317   case ThreadKilled:
3318     fprintf(stderr,"has been killed");
3319     break;
3320   case ThreadComplete:
3321     fprintf(stderr,"has completed");
3322     break;
3323   default:
3324     printThreadBlockage(tso);
3325   }
3326 }
3327
3328 void
3329 printAllThreads(void)
3330 {
3331   StgTSO *t;
3332   void *label;
3333
3334 # if defined(GRAN)
3335   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3336   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3337                        time_string, rtsFalse/*no commas!*/);
3338
3339   fprintf(stderr, "all threads at [%s]:\n", time_string);
3340 # elif defined(PAR)
3341   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3342   ullong_format_string(CURRENT_TIME,
3343                        time_string, rtsFalse/*no commas!*/);
3344
3345   fprintf(stderr,"all threads at [%s]:\n", time_string);
3346 # else
3347   fprintf(stderr,"all threads:\n");
3348 # endif
3349
3350   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3351     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3352     label = lookupThreadLabel(t->id);
3353     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3354     printThreadStatus(t);
3355     fprintf(stderr,"\n");
3356   }
3357 }
3358     
3359 #ifdef DEBUG
3360
3361 /* 
3362    Print a whole blocking queue attached to node (debugging only).
3363 */
3364 # if defined(PAR)
3365 void 
3366 print_bq (StgClosure *node)
3367 {
3368   StgBlockingQueueElement *bqe;
3369   StgTSO *tso;
3370   rtsBool end;
3371
3372   fprintf(stderr,"## BQ of closure %p (%s): ",
3373           node, info_type(node));
3374
3375   /* should cover all closures that may have a blocking queue */
3376   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3377          get_itbl(node)->type == FETCH_ME_BQ ||
3378          get_itbl(node)->type == RBH ||
3379          get_itbl(node)->type == MVAR);
3380     
3381   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3382
3383   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3384 }
3385
3386 /* 
3387    Print a whole blocking queue starting with the element bqe.
3388 */
3389 void 
3390 print_bqe (StgBlockingQueueElement *bqe)
3391 {
3392   rtsBool end;
3393
3394   /* 
3395      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3396   */
3397   for (end = (bqe==END_BQ_QUEUE);
3398        !end; // iterate until bqe points to a CONSTR
3399        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3400        bqe = end ? END_BQ_QUEUE : bqe->link) {
3401     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3402     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3403     /* types of closures that may appear in a blocking queue */
3404     ASSERT(get_itbl(bqe)->type == TSO ||           
3405            get_itbl(bqe)->type == BLOCKED_FETCH || 
3406            get_itbl(bqe)->type == CONSTR); 
3407     /* only BQs of an RBH end with an RBH_Save closure */
3408     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3409
3410     switch (get_itbl(bqe)->type) {
3411     case TSO:
3412       fprintf(stderr," TSO %u (%x),",
3413               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3414       break;
3415     case BLOCKED_FETCH:
3416       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3417               ((StgBlockedFetch *)bqe)->node, 
3418               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3419               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3420               ((StgBlockedFetch *)bqe)->ga.weight);
3421       break;
3422     case CONSTR:
3423       fprintf(stderr," %s (IP %p),",
3424               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3425                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3426                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3427                "RBH_Save_?"), get_itbl(bqe));
3428       break;
3429     default:
3430       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3431            info_type((StgClosure *)bqe)); // , node, info_type(node));
3432       break;
3433     }
3434   } /* for */
3435   fputc('\n', stderr);
3436 }
3437 # elif defined(GRAN)
3438 void 
3439 print_bq (StgClosure *node)
3440 {
3441   StgBlockingQueueElement *bqe;
3442   PEs node_loc, tso_loc;
3443   rtsBool end;
3444
3445   /* should cover all closures that may have a blocking queue */
3446   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3447          get_itbl(node)->type == FETCH_ME_BQ ||
3448          get_itbl(node)->type == RBH);
3449     
3450   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3451   node_loc = where_is(node);
3452
3453   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3454           node, info_type(node), node_loc);
3455
3456   /* 
3457      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3458   */
3459   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3460        !end; // iterate until bqe points to a CONSTR
3461        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3462     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3463     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3464     /* types of closures that may appear in a blocking queue */
3465     ASSERT(get_itbl(bqe)->type == TSO ||           
3466            get_itbl(bqe)->type == CONSTR); 
3467     /* only BQs of an RBH end with an RBH_Save closure */
3468     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3469
3470     tso_loc = where_is((StgClosure *)bqe);
3471     switch (get_itbl(bqe)->type) {
3472     case TSO:
3473       fprintf(stderr," TSO %d (%p) on [PE %d],",
3474               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3475       break;
3476     case CONSTR:
3477       fprintf(stderr," %s (IP %p),",
3478               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3479                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3480                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3481                "RBH_Save_?"), get_itbl(bqe));
3482       break;
3483     default:
3484       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3485            info_type((StgClosure *)bqe), node, info_type(node));
3486       break;
3487     }
3488   } /* for */
3489   fputc('\n', stderr);
3490 }
3491 #else
3492 /* 
3493    Nice and easy: only TSOs on the blocking queue
3494 */
3495 void 
3496 print_bq (StgClosure *node)
3497 {
3498   StgTSO *tso;
3499
3500   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3501   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3502        tso != END_TSO_QUEUE; 
3503        tso=tso->link) {
3504     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3505     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3506     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3507   }
3508   fputc('\n', stderr);
3509 }
3510 # endif
3511
3512 #if defined(PAR)
3513 static nat
3514 run_queue_len(void)
3515 {
3516   nat i;
3517   StgTSO *tso;
3518
3519   for (i=0, tso=run_queue_hd; 
3520        tso != END_TSO_QUEUE;
3521        i++, tso=tso->link)
3522     /* nothing */
3523
3524   return i;
3525 }
3526 #endif
3527
3528 void
3529 sched_belch(char *s, ...)
3530 {
3531   va_list ap;
3532   va_start(ap,s);
3533 #ifdef RTS_SUPPORTS_THREADS
3534   fprintf(stderr, "sched (task %p): ", osThreadId());
3535 #elif defined(PAR)
3536   fprintf(stderr, "== ");
3537 #else
3538   fprintf(stderr, "sched: ");
3539 #endif
3540   vfprintf(stderr, s, ap);
3541   fprintf(stderr, "\n");
3542   fflush(stderr);
3543   va_end(ap);
3544 }
3545
3546 #endif /* DEBUG */