f8a65871a3292d9db593e922559a25c48757aa36
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.198 2004/05/27 15:21:37 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       if(!startTask(taskStart))
280       {
281         startingWorkerThread = rtsFalse;
282       }
283     }
284   }
285 }
286 #endif
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    Locking notes:  we acquire the scheduler lock once at the beginning
301    of the scheduler loop, and release it when
302     
303       * running a thread, or
304       * waiting for work, or
305       * waiting for a GC to complete.
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323 static void
324 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
325           Capability *initialCapability )
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PAR)
333   StgSparkPool *pool;
334   rtsSpark spark;
335   StgTSO *tso;
336   GlobalTaskId pe;
337   rtsBool receivedFinish = rtsFalse;
338 # if defined(DEBUG)
339   nat tp_size, sp_size; // stats only
340 # endif
341 #endif
342   rtsBool was_interrupted = rtsFalse;
343   StgTSOWhatNext prev_what_next;
344   
345   // Pre-condition: sched_mutex is held.
346   // We might have a capability, passed in as initialCapability.
347   cap = initialCapability;
348
349 #if defined(RTS_SUPPORTS_THREADS)
350   //
351   // in the threaded case, the capability is either passed in via the
352   // initialCapability parameter, or initialized inside the scheduler
353   // loop 
354   //
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357                        mainThread, initialCapability);
358       );
359 #else
360   // simply initialise it in the non-threaded case
361   grabCapability(&cap);
362 #endif
363
364 #if defined(GRAN)
365   /* set up first event to get things going */
366   /* ToDo: assign costs for system setup and init MainTSO ! */
367   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
368             ContinueThread, 
369             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
370
371   IF_DEBUG(gran,
372            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373            G_TSO(CurrentTSO, 5));
374
375   if (RtsFlags.GranFlags.Light) {
376     /* Save current time; GranSim Light only */
377     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
378   }      
379
380   event = get_next_event();
381
382   while (event!=(rtsEvent*)NULL) {
383     /* Choose the processor with the next event */
384     CurrentProc = event->proc;
385     CurrentTSO = event->tso;
386
387 #elif defined(PAR)
388
389   while (!receivedFinish) {    /* set by processMessages */
390                                /* when receiving PP_FINISH message         */ 
391
392 #else // everything except GRAN and PAR
393
394   while (1) {
395
396 #endif
397
398      IF_DEBUG(scheduler, printAllThreads());
399
400 #if defined(RTS_SUPPORTS_THREADS)
401       // Yield the capability to higher-priority tasks if necessary.
402       //
403       if (cap != NULL) {
404           yieldCapability(&cap);
405       }
406
407       // If we do not currently hold a capability, we wait for one
408       //
409       if (cap == NULL) {
410           waitForCapability(&sched_mutex, &cap,
411                             mainThread ? &mainThread->bound_thread_cond : NULL);
412       }
413
414       // We now have a capability...
415 #endif
416
417     //
418     // If we're interrupted (the user pressed ^C, or some other
419     // termination condition occurred), kill all the currently running
420     // threads.
421     //
422     if (interrupted) {
423         IF_DEBUG(scheduler, sched_belch("interrupted"));
424         interrupted = rtsFalse;
425         was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427         // In the threaded RTS, deadlock detection doesn't work,
428         // so just exit right away.
429         prog_belch("interrupted");
430         releaseCapability(cap);
431         RELEASE_LOCK(&sched_mutex);
432         shutdownHaskellAndExit(EXIT_SUCCESS);
433 #else
434         deleteAllThreads();
435 #endif
436     }
437
438 #if defined(RTS_USER_SIGNALS)
439     // check for signals each time around the scheduler
440     if (signals_pending()) {
441       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
442       startSignalHandlers();
443       ACQUIRE_LOCK(&sched_mutex);
444     }
445 #endif
446
447     //
448     // Check whether any waiting threads need to be woken up.  If the
449     // run queue is empty, and there are no other tasks running, we
450     // can wait indefinitely for something to happen.
451     //
452     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
453 #if defined(RTS_SUPPORTS_THREADS)
454                 || EMPTY_RUN_QUEUE()
455 #endif
456         )
457     {
458       awaitEvent( EMPTY_RUN_QUEUE() );
459     }
460     // we can be interrupted while waiting for I/O...
461     if (interrupted) continue;
462
463     /* 
464      * Detect deadlock: when we have no threads to run, there are no
465      * threads waiting on I/O or sleeping, and all the other tasks are
466      * waiting for work, we must have a deadlock of some description.
467      *
468      * We first try to find threads blocked on themselves (ie. black
469      * holes), and generate NonTermination exceptions where necessary.
470      *
471      * If no threads are black holed, we have a deadlock situation, so
472      * inform all the main threads.
473      */
474 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
475     if (   EMPTY_THREAD_QUEUES() )
476     {
477         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
478         // Garbage collection can release some new threads due to
479         // either (a) finalizers or (b) threads resurrected because
480         // they are about to be send BlockedOnDeadMVar.  Any threads
481         // thus released will be immediately runnable.
482         GarbageCollect(GetRoots,rtsTrue);
483
484         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
485
486         IF_DEBUG(scheduler, 
487                  sched_belch("still deadlocked, checking for black holes..."));
488         detectBlackHoles();
489
490         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491
492 #if defined(RTS_USER_SIGNALS)
493         /* If we have user-installed signal handlers, then wait
494          * for signals to arrive rather then bombing out with a
495          * deadlock.
496          */
497         if ( anyUserHandlers() ) {
498             IF_DEBUG(scheduler, 
499                      sched_belch("still deadlocked, waiting for signals..."));
500
501             awaitUserSignals();
502
503             // we might be interrupted...
504             if (interrupted) { continue; }
505
506             if (signals_pending()) {
507                 RELEASE_LOCK(&sched_mutex);
508                 startSignalHandlers();
509                 ACQUIRE_LOCK(&sched_mutex);
510             }
511             ASSERT(!EMPTY_RUN_QUEUE());
512             goto not_deadlocked;
513         }
514 #endif
515
516         /* Probably a real deadlock.  Send the current main thread the
517          * Deadlock exception (or in the SMP build, send *all* main
518          * threads the deadlock exception, since none of them can make
519          * progress).
520          */
521         {
522             StgMainThread *m;
523             m = main_threads;
524             switch (m->tso->why_blocked) {
525             case BlockedOnBlackHole:
526             case BlockedOnException:
527             case BlockedOnMVar:
528                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
529                 break;
530             default:
531                 barf("deadlock: main thread blocked in a strange way");
532             }
533         }
534     }
535   not_deadlocked:
536
537 #elif defined(RTS_SUPPORTS_THREADS)
538     // ToDo: add deadlock detection in threaded RTS
539 #elif defined(PAR)
540     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
541 #endif
542
543 #if defined(RTS_SUPPORTS_THREADS)
544     if ( EMPTY_RUN_QUEUE() ) {
545         continue; // nothing to do
546     }
547 #endif
548
549 #if defined(GRAN)
550     if (RtsFlags.GranFlags.Light)
551       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
552
553     /* adjust time based on time-stamp */
554     if (event->time > CurrentTime[CurrentProc] &&
555         event->evttype != ContinueThread)
556       CurrentTime[CurrentProc] = event->time;
557     
558     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
559     if (!RtsFlags.GranFlags.Light)
560       handleIdlePEs();
561
562     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
563
564     /* main event dispatcher in GranSim */
565     switch (event->evttype) {
566       /* Should just be continuing execution */
567     case ContinueThread:
568       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
569       /* ToDo: check assertion
570       ASSERT(run_queue_hd != (StgTSO*)NULL &&
571              run_queue_hd != END_TSO_QUEUE);
572       */
573       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
574       if (!RtsFlags.GranFlags.DoAsyncFetch &&
575           procStatus[CurrentProc]==Fetching) {
576         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
577               CurrentTSO->id, CurrentTSO, CurrentProc);
578         goto next_thread;
579       } 
580       /* Ignore ContinueThreads for completed threads */
581       if (CurrentTSO->what_next == ThreadComplete) {
582         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
583               CurrentTSO->id, CurrentTSO, CurrentProc);
584         goto next_thread;
585       } 
586       /* Ignore ContinueThreads for threads that are being migrated */
587       if (PROCS(CurrentTSO)==Nowhere) { 
588         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
589               CurrentTSO->id, CurrentTSO, CurrentProc);
590         goto next_thread;
591       }
592       /* The thread should be at the beginning of the run queue */
593       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
594         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
595               CurrentTSO->id, CurrentTSO, CurrentProc);
596         break; // run the thread anyway
597       }
598       /*
599       new_event(proc, proc, CurrentTime[proc],
600                 FindWork,
601                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
602       goto next_thread; 
603       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
604       break; // now actually run the thread; DaH Qu'vam yImuHbej 
605
606     case FetchNode:
607       do_the_fetchnode(event);
608       goto next_thread;             /* handle next event in event queue  */
609       
610     case GlobalBlock:
611       do_the_globalblock(event);
612       goto next_thread;             /* handle next event in event queue  */
613       
614     case FetchReply:
615       do_the_fetchreply(event);
616       goto next_thread;             /* handle next event in event queue  */
617       
618     case UnblockThread:   /* Move from the blocked queue to the tail of */
619       do_the_unblock(event);
620       goto next_thread;             /* handle next event in event queue  */
621       
622     case ResumeThread:  /* Move from the blocked queue to the tail of */
623       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
624       event->tso->gran.blocktime += 
625         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
626       do_the_startthread(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case StartThread:
630       do_the_startthread(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case MoveThread:
634       do_the_movethread(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case MoveSpark:
638       do_the_movespark(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case FindWork:
642       do_the_findwork(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     default:
646       barf("Illegal event type %u\n", event->evttype);
647     }  /* switch */
648     
649     /* This point was scheduler_loop in the old RTS */
650
651     IF_DEBUG(gran, belch("GRAN: after main switch"));
652
653     TimeOfLastEvent = CurrentTime[CurrentProc];
654     TimeOfNextEvent = get_time_of_next_event();
655     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
656     // CurrentTSO = ThreadQueueHd;
657
658     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
659                          TimeOfNextEvent));
660
661     if (RtsFlags.GranFlags.Light) 
662       GranSimLight_leave_system(event, &ActiveTSO); 
663
664     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
665
666     IF_DEBUG(gran, 
667              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
668
669     /* in a GranSim setup the TSO stays on the run queue */
670     t = CurrentTSO;
671     /* Take a thread from the run queue. */
672     POP_RUN_QUEUE(t); // take_off_run_queue(t);
673
674     IF_DEBUG(gran, 
675              fprintf(stderr, "GRAN: About to run current thread, which is\n");
676              G_TSO(t,5));
677
678     context_switch = 0; // turned on via GranYield, checking events and time slice
679
680     IF_DEBUG(gran, 
681              DumpGranEvent(GR_SCHEDULE, t));
682
683     procStatus[CurrentProc] = Busy;
684
685 #elif defined(PAR)
686     if (PendingFetches != END_BF_QUEUE) {
687         processFetches();
688     }
689
690     /* ToDo: phps merge with spark activation above */
691     /* check whether we have local work and send requests if we have none */
692     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
693       /* :-[  no local threads => look out for local sparks */
694       /* the spark pool for the current PE */
695       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
696       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
697           pool->hd < pool->tl) {
698         /* 
699          * ToDo: add GC code check that we really have enough heap afterwards!!
700          * Old comment:
701          * If we're here (no runnable threads) and we have pending
702          * sparks, we must have a space problem.  Get enough space
703          * to turn one of those pending sparks into a
704          * thread... 
705          */
706
707         spark = findSpark(rtsFalse);                /* get a spark */
708         if (spark != (rtsSpark) NULL) {
709           tso = activateSpark(spark);       /* turn the spark into a thread */
710           IF_PAR_DEBUG(schedule,
711                        belch("==== schedule: Created TSO %d (%p); %d threads active",
712                              tso->id, tso, advisory_thread_count));
713
714           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
715             belch("==^^ failed to activate spark");
716             goto next_thread;
717           }               /* otherwise fall through & pick-up new tso */
718         } else {
719           IF_PAR_DEBUG(verbose,
720                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
721                              spark_queue_len(pool)));
722           goto next_thread;
723         }
724       }
725
726       /* If we still have no work we need to send a FISH to get a spark
727          from another PE 
728       */
729       if (EMPTY_RUN_QUEUE()) {
730       /* =8-[  no local sparks => look for work on other PEs */
731         /*
732          * We really have absolutely no work.  Send out a fish
733          * (there may be some out there already), and wait for
734          * something to arrive.  We clearly can't run any threads
735          * until a SCHEDULE or RESUME arrives, and so that's what
736          * we're hoping to see.  (Of course, we still have to
737          * respond to other types of messages.)
738          */
739         TIME now = msTime() /*CURRENT_TIME*/;
740         IF_PAR_DEBUG(verbose, 
741                      belch("--  now=%ld", now));
742         IF_PAR_DEBUG(verbose,
743                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
744                          (last_fish_arrived_at!=0 &&
745                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
746                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
747                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
748                              last_fish_arrived_at,
749                              RtsFlags.ParFlags.fishDelay, now);
750                      });
751         
752         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
753             (last_fish_arrived_at==0 ||
754              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
755           /* outstandingFishes is set in sendFish, processFish;
756              avoid flooding system with fishes via delay */
757           pe = choosePE();
758           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
759                    NEW_FISH_HUNGER);
760
761           // Global statistics: count no. of fishes
762           if (RtsFlags.ParFlags.ParStats.Global &&
763               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
764             globalParStats.tot_fish_mess++;
765           }
766         }
767       
768         receivedFinish = processMessages();
769         goto next_thread;
770       }
771     } else if (PacketsWaiting()) {  /* Look for incoming messages */
772       receivedFinish = processMessages();
773     }
774
775     /* Now we are sure that we have some work available */
776     ASSERT(run_queue_hd != END_TSO_QUEUE);
777
778     /* Take a thread from the run queue, if we have work */
779     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
780     IF_DEBUG(sanity,checkTSO(t));
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, 
792              belch("--=^ %d threads, %d sparks on [%#x]", 
793                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
794
795 # if 1
796     if (0 && RtsFlags.ParFlags.ParStats.Full && 
797         t && LastTSO && t->id != LastTSO->id && 
798         LastTSO->why_blocked == NotBlocked && 
799         LastTSO->what_next != ThreadComplete) {
800       // if previously scheduled TSO not blocked we have to record the context switch
801       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
802                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
803     }
804
805     if (RtsFlags.ParFlags.ParStats.Full && 
806         (emitSchedule /* forced emit */ ||
807         (t && LastTSO && t->id != LastTSO->id))) {
808       /* 
809          we are running a different TSO, so write a schedule event to log file
810          NB: If we use fair scheduling we also have to write  a deschedule 
811              event for LastTSO; with unfair scheduling we know that the
812              previous tso has blocked whenever we switch to another tso, so
813              we don't need it in GUM for now
814       */
815       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817       emitSchedule = rtsFalse;
818     }
819      
820 # endif
821 #else /* !GRAN && !PAR */
822   
823     // grab a thread from the run queue
824     ASSERT(run_queue_hd != END_TSO_QUEUE);
825     POP_RUN_QUEUE(t);
826
827     // Sanity check the thread we're about to run.  This can be
828     // expensive if there is lots of thread switching going on...
829     IF_DEBUG(sanity,checkTSO(t));
830 #endif
831
832 #ifdef THREADED_RTS
833     {
834       StgMainThread *m = t->main;
835       
836       if(m)
837       {
838         if(m == mainThread)
839         {
840           IF_DEBUG(scheduler,
841             sched_belch("### Running thread %d in bound thread", t->id));
842           // yes, the Haskell thread is bound to the current native thread
843         }
844         else
845         {
846           IF_DEBUG(scheduler,
847             sched_belch("### thread %d bound to another OS thread", t->id));
848           // no, bound to a different Haskell thread: pass to that thread
849           PUSH_ON_RUN_QUEUE(t);
850           passCapability(&m->bound_thread_cond);
851           continue;
852         }
853       }
854       else
855       {
856         if(mainThread != NULL)
857         // The thread we want to run is bound.
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### this OS thread cannot run thread %d", t->id));
861           // no, the current native thread is bound to a different
862           // Haskell thread, so pass it to any worker thread
863           PUSH_ON_RUN_QUEUE(t);
864           passCapabilityToWorker();
865           continue; 
866         }
867       }
868     }
869 #endif
870
871     cap->r.rCurrentTSO = t;
872     
873     /* context switches are now initiated by the timer signal, unless
874      * the user specified "context switch as often as possible", with
875      * +RTS -C0
876      */
877     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
878          && (run_queue_hd != END_TSO_QUEUE
879              || blocked_queue_hd != END_TSO_QUEUE
880              || sleeping_queue != END_TSO_QUEUE)))
881         context_switch = 1;
882     else
883         context_switch = 0;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
971                                    t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1031                                t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1063                                t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084 #if defined(GRAN)
1085       IF_DEBUG(gran, 
1086                DumpGranEvent(GR_DESCHEDULE, t));
1087       globalGranStats.tot_yields++;
1088 #elif defined(PAR)
1089       // IF_DEBUG(par, 
1090       // DumpGranEvent(GR_DESCHEDULE, t);
1091       globalParStats.tot_yields++;
1092 #endif
1093       /* put the thread back on the run queue.  Then, if we're ready to
1094        * GC, check whether this is the last task to stop.  If so, wake
1095        * up the GC thread.  getThread will block during a GC until the
1096        * GC is finished.
1097        */
1098       IF_DEBUG(scheduler,
1099                if (t->what_next != prev_what_next) {
1100                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1101                          t->id, whatNext_strs[t->what_next]);
1102                } else {
1103                    belch("--<< thread %ld (%s) stopped, yielding", 
1104                          t->id, whatNext_strs[t->what_next]);
1105                }
1106                );
1107
1108       IF_DEBUG(sanity,
1109                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1110                checkTSO(t));
1111       ASSERT(t->link == END_TSO_QUEUE);
1112
1113       // Shortcut if we're just switching evaluators: don't bother
1114       // doing stack squeezing (which can be expensive), just run the
1115       // thread.
1116       if (t->what_next != prev_what_next) {
1117           goto run_thread;
1118       }
1119
1120       threadPaused(t);
1121
1122 #if defined(GRAN)
1123       ASSERT(!is_on_queue(t,CurrentProc));
1124
1125       IF_DEBUG(sanity,
1126                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1127                checkThreadQsSanity(rtsTrue));
1128 #endif
1129
1130 #if defined(PAR)
1131       if (RtsFlags.ParFlags.doFairScheduling) { 
1132         /* this does round-robin scheduling; good for concurrency */
1133         APPEND_TO_RUN_QUEUE(t);
1134       } else {
1135         /* this does unfair scheduling; good for parallelism */
1136         PUSH_ON_RUN_QUEUE(t);
1137       }
1138 #else
1139       // this does round-robin scheduling; good for concurrency
1140       APPEND_TO_RUN_QUEUE(t);
1141 #endif
1142
1143 #if defined(GRAN)
1144       /* add a ContinueThread event to actually process the thread */
1145       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1146                 ContinueThread,
1147                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1148       IF_GRAN_DEBUG(bq, 
1149                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1150                G_EVENTQ(0);
1151                G_CURR_THREADQ(0));
1152 #endif /* GRAN */
1153       break;
1154
1155     case ThreadBlocked:
1156 #if defined(GRAN)
1157       IF_DEBUG(scheduler,
1158                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1159                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1160                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1161
1162       // ??? needed; should emit block before
1163       IF_DEBUG(gran, 
1164                DumpGranEvent(GR_DESCHEDULE, t)); 
1165       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1166       /*
1167         ngoq Dogh!
1168       ASSERT(procStatus[CurrentProc]==Busy || 
1169               ((procStatus[CurrentProc]==Fetching) && 
1170               (t->block_info.closure!=(StgClosure*)NULL)));
1171       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1172           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1173             procStatus[CurrentProc]==Fetching)) 
1174         procStatus[CurrentProc] = Idle;
1175       */
1176 #elif defined(PAR)
1177       IF_DEBUG(scheduler,
1178                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1179                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1180       IF_PAR_DEBUG(bq,
1181
1182                    if (t->block_info.closure!=(StgClosure*)NULL) 
1183                      print_bq(t->block_info.closure));
1184
1185       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1186       blockThread(t);
1187
1188       /* whatever we schedule next, we must log that schedule */
1189       emitSchedule = rtsTrue;
1190
1191 #else /* !GRAN */
1192       /* don't need to do anything.  Either the thread is blocked on
1193        * I/O, in which case we'll have called addToBlockedQueue
1194        * previously, or it's blocked on an MVar or Blackhole, in which
1195        * case it'll be on the relevant queue already.
1196        */
1197       IF_DEBUG(scheduler,
1198                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1199                        t->id, whatNext_strs[t->what_next]);
1200                printThreadBlockage(t);
1201                fprintf(stderr, "\n"));
1202       fflush(stderr);
1203
1204       /* Only for dumping event to log file 
1205          ToDo: do I need this in GranSim, too?
1206       blockThread(t);
1207       */
1208 #endif
1209       threadPaused(t);
1210       break;
1211
1212     case ThreadFinished:
1213       /* Need to check whether this was a main thread, and if so, signal
1214        * the task that started it with the return value.  If we have no
1215        * more main threads, we probably need to stop all the tasks until
1216        * we get a new one.
1217        */
1218       /* We also end up here if the thread kills itself with an
1219        * uncaught exception, see Exception.hc.
1220        */
1221       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1222                                t->id, whatNext_strs[t->what_next]));
1223 #if defined(GRAN)
1224       endThread(t, CurrentProc); // clean-up the thread
1225 #elif defined(PAR)
1226       /* For now all are advisory -- HWL */
1227       //if(t->priority==AdvisoryPriority) ??
1228       advisory_thread_count--;
1229       
1230 # ifdef DIST
1231       if(t->dist.priority==RevalPriority)
1232         FinishReval(t);
1233 # endif
1234       
1235       if (RtsFlags.ParFlags.ParStats.Full &&
1236           !RtsFlags.ParFlags.ParStats.Suppressed) 
1237         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1238 #endif
1239
1240       //
1241       // Check whether the thread that just completed was a main
1242       // thread, and if so return with the result.  
1243       //
1244       // There is an assumption here that all thread completion goes
1245       // through this point; we need to make sure that if a thread
1246       // ends up in the ThreadKilled state, that it stays on the run
1247       // queue so it can be dealt with here.
1248       //
1249       if (
1250 #if defined(RTS_SUPPORTS_THREADS)
1251           mainThread != NULL
1252 #else
1253           mainThread->tso == t
1254 #endif
1255           )
1256       {
1257           // We are a bound thread: this must be our thread that just
1258           // completed.
1259           ASSERT(mainThread->tso == t);
1260
1261           if (t->what_next == ThreadComplete) {
1262               if (mainThread->ret) {
1263                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1264                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1265               }
1266               mainThread->stat = Success;
1267           } else {
1268               if (mainThread->ret) {
1269                   *(mainThread->ret) = NULL;
1270               }
1271               if (was_interrupted) {
1272                   mainThread->stat = Interrupted;
1273               } else {
1274                   mainThread->stat = Killed;
1275               }
1276           }
1277 #ifdef DEBUG
1278           removeThreadLabel((StgWord)mainThread->tso->id);
1279 #endif
1280           if (mainThread->prev == NULL) {
1281               main_threads = mainThread->link;
1282           } else {
1283               mainThread->prev->link = mainThread->link;
1284           }
1285           if (mainThread->link != NULL) {
1286               mainThread->link->prev = NULL;
1287           }
1288           releaseCapability(cap);
1289           return;
1290       }
1291
1292 #ifdef RTS_SUPPORTS_THREADS
1293       ASSERT(t->main == NULL);
1294 #else
1295       if (t->main != NULL) {
1296           // Must be a main thread that is not the topmost one.  Leave
1297           // it on the run queue until the stack has unwound to the
1298           // point where we can deal with this.  Leaving it on the run
1299           // queue also ensures that the garbage collector knows about
1300           // this thread and its return value (it gets dropped from the
1301           // all_threads list so there's no other way to find it).
1302           APPEND_TO_RUN_QUEUE(t);
1303       }
1304 #endif
1305       break;
1306
1307     default:
1308       barf("schedule: invalid thread return code %d", (int)ret);
1309     }
1310
1311 #ifdef PROFILING
1312     // When we have +RTS -i0 and we're heap profiling, do a census at
1313     // every GC.  This lets us get repeatable runs for debugging.
1314     if (performHeapProfile ||
1315         (RtsFlags.ProfFlags.profileInterval==0 &&
1316          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1317         GarbageCollect(GetRoots, rtsTrue);
1318         heapCensus();
1319         performHeapProfile = rtsFalse;
1320         ready_to_gc = rtsFalse; // we already GC'd
1321     }
1322 #endif
1323
1324     if (ready_to_gc) {
1325       /* everybody back, start the GC.
1326        * Could do it in this thread, or signal a condition var
1327        * to do it in another thread.  Either way, we need to
1328        * broadcast on gc_pending_cond afterward.
1329        */
1330 #if defined(RTS_SUPPORTS_THREADS)
1331       IF_DEBUG(scheduler,sched_belch("doing GC"));
1332 #endif
1333       GarbageCollect(GetRoots,rtsFalse);
1334       ready_to_gc = rtsFalse;
1335 #if defined(GRAN)
1336       /* add a ContinueThread event to continue execution of current thread */
1337       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1338                 ContinueThread,
1339                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1340       IF_GRAN_DEBUG(bq, 
1341                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1342                G_EVENTQ(0);
1343                G_CURR_THREADQ(0));
1344 #endif /* GRAN */
1345     }
1346
1347 #if defined(GRAN)
1348   next_thread:
1349     IF_GRAN_DEBUG(unused,
1350                   print_eventq(EventHd));
1351
1352     event = get_next_event();
1353 #elif defined(PAR)
1354   next_thread:
1355     /* ToDo: wait for next message to arrive rather than busy wait */
1356 #endif /* GRAN */
1357
1358   } /* end of while(1) */
1359
1360   IF_PAR_DEBUG(verbose,
1361                belch("== Leaving schedule() after having received Finish"));
1362 }
1363
1364 /* ---------------------------------------------------------------------------
1365  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1366  * used by Control.Concurrent for error checking.
1367  * ------------------------------------------------------------------------- */
1368  
1369 StgBool
1370 rtsSupportsBoundThreads(void)
1371 {
1372 #ifdef THREADED_RTS
1373   return rtsTrue;
1374 #else
1375   return rtsFalse;
1376 #endif
1377 }
1378
1379 /* ---------------------------------------------------------------------------
1380  * isThreadBound(tso): check whether tso is bound to an OS thread.
1381  * ------------------------------------------------------------------------- */
1382  
1383 StgBool
1384 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1385 {
1386 #ifdef THREADED_RTS
1387   return (tso->main != NULL);
1388 #endif
1389   return rtsFalse;
1390 }
1391
1392 /* ---------------------------------------------------------------------------
1393  * Singleton fork(). Do not copy any running threads.
1394  * ------------------------------------------------------------------------- */
1395
1396 #ifndef mingw32_TARGET_OS
1397 #define FORKPROCESS_PRIMOP_SUPPORTED
1398 #endif
1399
1400 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1401 static void 
1402 deleteThreadImmediately(StgTSO *tso);
1403 #endif
1404 StgInt
1405 forkProcess(HsStablePtr *entry
1406 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1407             STG_UNUSED
1408 #endif
1409            )
1410 {
1411 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1412   pid_t pid;
1413   StgTSO* t,*next;
1414   StgMainThread *m;
1415   SchedulerStatus rc;
1416
1417   IF_DEBUG(scheduler,sched_belch("forking!"));
1418   rts_lock(); // This not only acquires sched_mutex, it also
1419               // makes sure that no other threads are running
1420
1421   pid = fork();
1422
1423   if (pid) { /* parent */
1424
1425   /* just return the pid */
1426     rts_unlock();
1427     return pid;
1428     
1429   } else { /* child */
1430     
1431     
1432       // delete all threads
1433     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1434     
1435     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1436       next = t->link;
1437
1438         // don't allow threads to catch the ThreadKilled exception
1439       deleteThreadImmediately(t);
1440     }
1441     
1442       // wipe the main thread list
1443     while((m = main_threads) != NULL) {
1444       main_threads = m->link;
1445 # ifdef THREADED_RTS
1446       closeCondition(&m->bound_thread_cond);
1447 # endif
1448       stgFree(m);
1449     }
1450     
1451 # ifdef RTS_SUPPORTS_THREADS
1452     resetTaskManagerAfterFork();      // tell startTask() and friends that
1453     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1454     resetWorkerWakeupPipeAfterFork();
1455 # endif
1456     
1457     rc = rts_evalStableIO(entry, NULL);  // run the action
1458     rts_checkSchedStatus("forkProcess",rc);
1459     
1460     rts_unlock();
1461     
1462     hs_exit();                      // clean up and exit
1463     stg_exit(0);
1464   }
1465 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1466   barf("forkProcess#: primop not supported, sorry!\n");
1467   return -1;
1468 #endif
1469 }
1470
1471 /* ---------------------------------------------------------------------------
1472  * deleteAllThreads():  kill all the live threads.
1473  *
1474  * This is used when we catch a user interrupt (^C), before performing
1475  * any necessary cleanups and running finalizers.
1476  *
1477  * Locks: sched_mutex held.
1478  * ------------------------------------------------------------------------- */
1479    
1480 void
1481 deleteAllThreads ( void )
1482 {
1483   StgTSO* t, *next;
1484   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1485   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1486       next = t->global_link;
1487       deleteThread(t);
1488   }      
1489
1490   // The run queue now contains a bunch of ThreadKilled threads.  We
1491   // must not throw these away: the main thread(s) will be in there
1492   // somewhere, and the main scheduler loop has to deal with it.
1493   // Also, the run queue is the only thing keeping these threads from
1494   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1495
1496   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1497   ASSERT(sleeping_queue == END_TSO_QUEUE);
1498 }
1499
1500 /* startThread and  insertThread are now in GranSim.c -- HWL */
1501
1502
1503 /* ---------------------------------------------------------------------------
1504  * Suspending & resuming Haskell threads.
1505  * 
1506  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1507  * its capability before calling the C function.  This allows another
1508  * task to pick up the capability and carry on running Haskell
1509  * threads.  It also means that if the C call blocks, it won't lock
1510  * the whole system.
1511  *
1512  * The Haskell thread making the C call is put to sleep for the
1513  * duration of the call, on the susepended_ccalling_threads queue.  We
1514  * give out a token to the task, which it can use to resume the thread
1515  * on return from the C function.
1516  * ------------------------------------------------------------------------- */
1517    
1518 StgInt
1519 suspendThread( StgRegTable *reg, 
1520                rtsBool concCall
1521 #if !defined(DEBUG)
1522                STG_UNUSED
1523 #endif
1524                )
1525 {
1526   nat tok;
1527   Capability *cap;
1528   int saved_errno = errno;
1529
1530   /* assume that *reg is a pointer to the StgRegTable part
1531    * of a Capability.
1532    */
1533   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1534
1535   ACQUIRE_LOCK(&sched_mutex);
1536
1537   IF_DEBUG(scheduler,
1538            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1539
1540   // XXX this might not be necessary --SDM
1541   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1542
1543   threadPaused(cap->r.rCurrentTSO);
1544   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1545   suspended_ccalling_threads = cap->r.rCurrentTSO;
1546
1547   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1548       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1549       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1550   } else {
1551       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1552   }
1553
1554   /* Use the thread ID as the token; it should be unique */
1555   tok = cap->r.rCurrentTSO->id;
1556
1557   /* Hand back capability */
1558   releaseCapability(cap);
1559   
1560 #if defined(RTS_SUPPORTS_THREADS)
1561   /* Preparing to leave the RTS, so ensure there's a native thread/task
1562      waiting to take over.
1563   */
1564   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1565 #endif
1566
1567   /* Other threads _might_ be available for execution; signal this */
1568   THREAD_RUNNABLE();
1569   RELEASE_LOCK(&sched_mutex);
1570   
1571   errno = saved_errno;
1572   return tok; 
1573 }
1574
1575 StgRegTable *
1576 resumeThread( StgInt tok,
1577               rtsBool concCall STG_UNUSED )
1578 {
1579   StgTSO *tso, **prev;
1580   Capability *cap;
1581   int saved_errno = errno;
1582
1583 #if defined(RTS_SUPPORTS_THREADS)
1584   /* Wait for permission to re-enter the RTS with the result. */
1585   ACQUIRE_LOCK(&sched_mutex);
1586   waitForReturnCapability(&sched_mutex, &cap);
1587
1588   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1589 #else
1590   grabCapability(&cap);
1591 #endif
1592
1593   /* Remove the thread off of the suspended list */
1594   prev = &suspended_ccalling_threads;
1595   for (tso = suspended_ccalling_threads; 
1596        tso != END_TSO_QUEUE; 
1597        prev = &tso->link, tso = tso->link) {
1598     if (tso->id == (StgThreadID)tok) {
1599       *prev = tso->link;
1600       break;
1601     }
1602   }
1603   if (tso == END_TSO_QUEUE) {
1604     barf("resumeThread: thread not found");
1605   }
1606   tso->link = END_TSO_QUEUE;
1607   
1608   if(tso->why_blocked == BlockedOnCCall) {
1609       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1610       tso->blocked_exceptions = NULL;
1611   }
1612   
1613   /* Reset blocking status */
1614   tso->why_blocked  = NotBlocked;
1615
1616   cap->r.rCurrentTSO = tso;
1617   RELEASE_LOCK(&sched_mutex);
1618   errno = saved_errno;
1619   return &cap->r;
1620 }
1621
1622
1623 /* ---------------------------------------------------------------------------
1624  * Static functions
1625  * ------------------------------------------------------------------------ */
1626 static void unblockThread(StgTSO *tso);
1627
1628 /* ---------------------------------------------------------------------------
1629  * Comparing Thread ids.
1630  *
1631  * This is used from STG land in the implementation of the
1632  * instances of Eq/Ord for ThreadIds.
1633  * ------------------------------------------------------------------------ */
1634
1635 int
1636 cmp_thread(StgPtr tso1, StgPtr tso2) 
1637
1638   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1639   StgThreadID id2 = ((StgTSO *)tso2)->id;
1640  
1641   if (id1 < id2) return (-1);
1642   if (id1 > id2) return 1;
1643   return 0;
1644 }
1645
1646 /* ---------------------------------------------------------------------------
1647  * Fetching the ThreadID from an StgTSO.
1648  *
1649  * This is used in the implementation of Show for ThreadIds.
1650  * ------------------------------------------------------------------------ */
1651 int
1652 rts_getThreadId(StgPtr tso) 
1653 {
1654   return ((StgTSO *)tso)->id;
1655 }
1656
1657 #ifdef DEBUG
1658 void
1659 labelThread(StgPtr tso, char *label)
1660 {
1661   int len;
1662   void *buf;
1663
1664   /* Caveat: Once set, you can only set the thread name to "" */
1665   len = strlen(label)+1;
1666   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1667   strncpy(buf,label,len);
1668   /* Update will free the old memory for us */
1669   updateThreadLabel(((StgTSO *)tso)->id,buf);
1670 }
1671 #endif /* DEBUG */
1672
1673 /* ---------------------------------------------------------------------------
1674    Create a new thread.
1675
1676    The new thread starts with the given stack size.  Before the
1677    scheduler can run, however, this thread needs to have a closure
1678    (and possibly some arguments) pushed on its stack.  See
1679    pushClosure() in Schedule.h.
1680
1681    createGenThread() and createIOThread() (in SchedAPI.h) are
1682    convenient packaged versions of this function.
1683
1684    currently pri (priority) is only used in a GRAN setup -- HWL
1685    ------------------------------------------------------------------------ */
1686 #if defined(GRAN)
1687 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1688 StgTSO *
1689 createThread(nat size, StgInt pri)
1690 #else
1691 StgTSO *
1692 createThread(nat size)
1693 #endif
1694 {
1695
1696     StgTSO *tso;
1697     nat stack_size;
1698
1699     /* First check whether we should create a thread at all */
1700 #if defined(PAR)
1701   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1702   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1703     threadsIgnored++;
1704     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1705           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1706     return END_TSO_QUEUE;
1707   }
1708   threadsCreated++;
1709 #endif
1710
1711 #if defined(GRAN)
1712   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1713 #endif
1714
1715   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1716
1717   /* catch ridiculously small stack sizes */
1718   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1719     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1720   }
1721
1722   stack_size = size - TSO_STRUCT_SIZEW;
1723
1724   tso = (StgTSO *)allocate(size);
1725   TICK_ALLOC_TSO(stack_size, 0);
1726
1727   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1728 #if defined(GRAN)
1729   SET_GRAN_HDR(tso, ThisPE);
1730 #endif
1731
1732   // Always start with the compiled code evaluator
1733   tso->what_next = ThreadRunGHC;
1734
1735   tso->id = next_thread_id++; 
1736   tso->why_blocked  = NotBlocked;
1737   tso->blocked_exceptions = NULL;
1738
1739   tso->saved_errno = 0;
1740   tso->main = NULL;
1741   
1742   tso->stack_size   = stack_size;
1743   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1744                               - TSO_STRUCT_SIZEW;
1745   tso->sp           = (P_)&(tso->stack) + stack_size;
1746
1747 #ifdef PROFILING
1748   tso->prof.CCCS = CCS_MAIN;
1749 #endif
1750
1751   /* put a stop frame on the stack */
1752   tso->sp -= sizeofW(StgStopFrame);
1753   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1754   // ToDo: check this
1755 #if defined(GRAN)
1756   tso->link = END_TSO_QUEUE;
1757   /* uses more flexible routine in GranSim */
1758   insertThread(tso, CurrentProc);
1759 #else
1760   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1761    * from its creation
1762    */
1763 #endif
1764
1765 #if defined(GRAN) 
1766   if (RtsFlags.GranFlags.GranSimStats.Full) 
1767     DumpGranEvent(GR_START,tso);
1768 #elif defined(PAR)
1769   if (RtsFlags.ParFlags.ParStats.Full) 
1770     DumpGranEvent(GR_STARTQ,tso);
1771   /* HACk to avoid SCHEDULE 
1772      LastTSO = tso; */
1773 #endif
1774
1775   /* Link the new thread on the global thread list.
1776    */
1777   tso->global_link = all_threads;
1778   all_threads = tso;
1779
1780 #if defined(DIST)
1781   tso->dist.priority = MandatoryPriority; //by default that is...
1782 #endif
1783
1784 #if defined(GRAN)
1785   tso->gran.pri = pri;
1786 # if defined(DEBUG)
1787   tso->gran.magic = TSO_MAGIC; // debugging only
1788 # endif
1789   tso->gran.sparkname   = 0;
1790   tso->gran.startedat   = CURRENT_TIME; 
1791   tso->gran.exported    = 0;
1792   tso->gran.basicblocks = 0;
1793   tso->gran.allocs      = 0;
1794   tso->gran.exectime    = 0;
1795   tso->gran.fetchtime   = 0;
1796   tso->gran.fetchcount  = 0;
1797   tso->gran.blocktime   = 0;
1798   tso->gran.blockcount  = 0;
1799   tso->gran.blockedat   = 0;
1800   tso->gran.globalsparks = 0;
1801   tso->gran.localsparks  = 0;
1802   if (RtsFlags.GranFlags.Light)
1803     tso->gran.clock  = Now; /* local clock */
1804   else
1805     tso->gran.clock  = 0;
1806
1807   IF_DEBUG(gran,printTSO(tso));
1808 #elif defined(PAR)
1809 # if defined(DEBUG)
1810   tso->par.magic = TSO_MAGIC; // debugging only
1811 # endif
1812   tso->par.sparkname   = 0;
1813   tso->par.startedat   = CURRENT_TIME; 
1814   tso->par.exported    = 0;
1815   tso->par.basicblocks = 0;
1816   tso->par.allocs      = 0;
1817   tso->par.exectime    = 0;
1818   tso->par.fetchtime   = 0;
1819   tso->par.fetchcount  = 0;
1820   tso->par.blocktime   = 0;
1821   tso->par.blockcount  = 0;
1822   tso->par.blockedat   = 0;
1823   tso->par.globalsparks = 0;
1824   tso->par.localsparks  = 0;
1825 #endif
1826
1827 #if defined(GRAN)
1828   globalGranStats.tot_threads_created++;
1829   globalGranStats.threads_created_on_PE[CurrentProc]++;
1830   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1831   globalGranStats.tot_sq_probes++;
1832 #elif defined(PAR)
1833   // collect parallel global statistics (currently done together with GC stats)
1834   if (RtsFlags.ParFlags.ParStats.Global &&
1835       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1836     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1837     globalParStats.tot_threads_created++;
1838   }
1839 #endif 
1840
1841 #if defined(GRAN)
1842   IF_GRAN_DEBUG(pri,
1843                 belch("==__ schedule: Created TSO %d (%p);",
1844                       CurrentProc, tso, tso->id));
1845 #elif defined(PAR)
1846     IF_PAR_DEBUG(verbose,
1847                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1848                        tso->id, tso, advisory_thread_count));
1849 #else
1850   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1851                                  tso->id, tso->stack_size));
1852 #endif    
1853   return tso;
1854 }
1855
1856 #if defined(PAR)
1857 /* RFP:
1858    all parallel thread creation calls should fall through the following routine.
1859 */
1860 StgTSO *
1861 createSparkThread(rtsSpark spark) 
1862 { StgTSO *tso;
1863   ASSERT(spark != (rtsSpark)NULL);
1864   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1865   { threadsIgnored++;
1866     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1867           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1868     return END_TSO_QUEUE;
1869   }
1870   else
1871   { threadsCreated++;
1872     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1873     if (tso==END_TSO_QUEUE)     
1874       barf("createSparkThread: Cannot create TSO");
1875 #if defined(DIST)
1876     tso->priority = AdvisoryPriority;
1877 #endif
1878     pushClosure(tso,spark);
1879     PUSH_ON_RUN_QUEUE(tso);
1880     advisory_thread_count++;    
1881   }
1882   return tso;
1883 }
1884 #endif
1885
1886 /*
1887   Turn a spark into a thread.
1888   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1889 */
1890 #if defined(PAR)
1891 StgTSO *
1892 activateSpark (rtsSpark spark) 
1893 {
1894   StgTSO *tso;
1895
1896   tso = createSparkThread(spark);
1897   if (RtsFlags.ParFlags.ParStats.Full) {   
1898     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1899     IF_PAR_DEBUG(verbose,
1900                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1901                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1902   }
1903   // ToDo: fwd info on local/global spark to thread -- HWL
1904   // tso->gran.exported =  spark->exported;
1905   // tso->gran.locked =   !spark->global;
1906   // tso->gran.sparkname = spark->name;
1907
1908   return tso;
1909 }
1910 #endif
1911
1912 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1913                                    Capability *initialCapability
1914                                    );
1915
1916
1917 /* ---------------------------------------------------------------------------
1918  * scheduleThread()
1919  *
1920  * scheduleThread puts a thread on the head of the runnable queue.
1921  * This will usually be done immediately after a thread is created.
1922  * The caller of scheduleThread must create the thread using e.g.
1923  * createThread and push an appropriate closure
1924  * on this thread's stack before the scheduler is invoked.
1925  * ------------------------------------------------------------------------ */
1926
1927 static void scheduleThread_ (StgTSO* tso);
1928
1929 void
1930 scheduleThread_(StgTSO *tso)
1931 {
1932   // Precondition: sched_mutex must be held.
1933   PUSH_ON_RUN_QUEUE(tso);
1934   THREAD_RUNNABLE();
1935 }
1936
1937 void
1938 scheduleThread(StgTSO* tso)
1939 {
1940   ACQUIRE_LOCK(&sched_mutex);
1941   scheduleThread_(tso);
1942   RELEASE_LOCK(&sched_mutex);
1943 }
1944
1945 #if defined(RTS_SUPPORTS_THREADS)
1946 static Condition bound_cond_cache;
1947 static int bound_cond_cache_full = 0;
1948 #endif
1949
1950
1951 SchedulerStatus
1952 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1953                    Capability *initialCapability)
1954 {
1955     // Precondition: sched_mutex must be held
1956     StgMainThread *m;
1957
1958     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1959     m->tso = tso;
1960     tso->main = m;
1961     m->ret = ret;
1962     m->stat = NoStatus;
1963     m->link = main_threads;
1964     m->prev = NULL;
1965     if (main_threads != NULL) {
1966         main_threads->prev = m;
1967     }
1968     main_threads = m;
1969
1970 #if defined(RTS_SUPPORTS_THREADS)
1971     // Allocating a new condition for each thread is expensive, so we
1972     // cache one.  This is a pretty feeble hack, but it helps speed up
1973     // consecutive call-ins quite a bit.
1974     if (bound_cond_cache_full) {
1975         m->bound_thread_cond = bound_cond_cache;
1976         bound_cond_cache_full = 0;
1977     } else {
1978         initCondition(&m->bound_thread_cond);
1979     }
1980 #endif
1981
1982     /* Put the thread on the main-threads list prior to scheduling the TSO.
1983        Failure to do so introduces a race condition in the MT case (as
1984        identified by Wolfgang Thaller), whereby the new task/OS thread 
1985        created by scheduleThread_() would complete prior to the thread
1986        that spawned it managed to put 'itself' on the main-threads list.
1987        The upshot of it all being that the worker thread wouldn't get to
1988        signal the completion of the its work item for the main thread to
1989        see (==> it got stuck waiting.)    -- sof 6/02.
1990     */
1991     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1992     
1993     PUSH_ON_RUN_QUEUE(tso);
1994     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1995     // bound and only runnable by *this* OS thread, so waking up other
1996     // workers will just slow things down.
1997
1998     return waitThread_(m, initialCapability);
1999 }
2000
2001 /* ---------------------------------------------------------------------------
2002  * initScheduler()
2003  *
2004  * Initialise the scheduler.  This resets all the queues - if the
2005  * queues contained any threads, they'll be garbage collected at the
2006  * next pass.
2007  *
2008  * ------------------------------------------------------------------------ */
2009
2010 void 
2011 initScheduler(void)
2012 {
2013 #if defined(GRAN)
2014   nat i;
2015
2016   for (i=0; i<=MAX_PROC; i++) {
2017     run_queue_hds[i]      = END_TSO_QUEUE;
2018     run_queue_tls[i]      = END_TSO_QUEUE;
2019     blocked_queue_hds[i]  = END_TSO_QUEUE;
2020     blocked_queue_tls[i]  = END_TSO_QUEUE;
2021     ccalling_threadss[i]  = END_TSO_QUEUE;
2022     sleeping_queue        = END_TSO_QUEUE;
2023   }
2024 #else
2025   run_queue_hd      = END_TSO_QUEUE;
2026   run_queue_tl      = END_TSO_QUEUE;
2027   blocked_queue_hd  = END_TSO_QUEUE;
2028   blocked_queue_tl  = END_TSO_QUEUE;
2029   sleeping_queue    = END_TSO_QUEUE;
2030 #endif 
2031
2032   suspended_ccalling_threads  = END_TSO_QUEUE;
2033
2034   main_threads = NULL;
2035   all_threads  = END_TSO_QUEUE;
2036
2037   context_switch = 0;
2038   interrupted    = 0;
2039
2040   RtsFlags.ConcFlags.ctxtSwitchTicks =
2041       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2042       
2043 #if defined(RTS_SUPPORTS_THREADS)
2044   /* Initialise the mutex and condition variables used by
2045    * the scheduler. */
2046   initMutex(&sched_mutex);
2047   initMutex(&term_mutex);
2048 #endif
2049   
2050   ACQUIRE_LOCK(&sched_mutex);
2051
2052   /* A capability holds the state a native thread needs in
2053    * order to execute STG code. At least one capability is
2054    * floating around (only SMP builds have more than one).
2055    */
2056   initCapabilities();
2057   
2058 #if defined(RTS_SUPPORTS_THREADS)
2059     /* start our haskell execution tasks */
2060     startTaskManager(0,taskStart);
2061 #endif
2062
2063 #if /* defined(SMP) ||*/ defined(PAR)
2064   initSparkPools();
2065 #endif
2066
2067   RELEASE_LOCK(&sched_mutex);
2068 }
2069
2070 void
2071 exitScheduler( void )
2072 {
2073 #if defined(RTS_SUPPORTS_THREADS)
2074   stopTaskManager();
2075 #endif
2076   shutting_down_scheduler = rtsTrue;
2077 }
2078
2079 /* ----------------------------------------------------------------------------
2080    Managing the per-task allocation areas.
2081    
2082    Each capability comes with an allocation area.  These are
2083    fixed-length block lists into which allocation can be done.
2084
2085    ToDo: no support for two-space collection at the moment???
2086    ------------------------------------------------------------------------- */
2087
2088 static
2089 SchedulerStatus
2090 waitThread_(StgMainThread* m, Capability *initialCapability)
2091 {
2092   SchedulerStatus stat;
2093
2094   // Precondition: sched_mutex must be held.
2095   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2096
2097 #if defined(GRAN)
2098   /* GranSim specific init */
2099   CurrentTSO = m->tso;                // the TSO to run
2100   procStatus[MainProc] = Busy;        // status of main PE
2101   CurrentProc = MainProc;             // PE to run it on
2102   schedule(m,initialCapability);
2103 #else
2104   schedule(m,initialCapability);
2105   ASSERT(m->stat != NoStatus);
2106 #endif
2107
2108   stat = m->stat;
2109
2110 #if defined(RTS_SUPPORTS_THREADS)
2111   // Free the condition variable, returning it to the cache if possible.
2112   if (!bound_cond_cache_full) {
2113       bound_cond_cache = m->bound_thread_cond;
2114       bound_cond_cache_full = 1;
2115   } else {
2116       closeCondition(&m->bound_thread_cond);
2117   }
2118 #endif
2119
2120   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2121   stgFree(m);
2122
2123   // Postcondition: sched_mutex still held
2124   return stat;
2125 }
2126
2127 /* ---------------------------------------------------------------------------
2128    Where are the roots that we know about?
2129
2130         - all the threads on the runnable queue
2131         - all the threads on the blocked queue
2132         - all the threads on the sleeping queue
2133         - all the thread currently executing a _ccall_GC
2134         - all the "main threads"
2135      
2136    ------------------------------------------------------------------------ */
2137
2138 /* This has to be protected either by the scheduler monitor, or by the
2139         garbage collection monitor (probably the latter).
2140         KH @ 25/10/99
2141 */
2142
2143 void
2144 GetRoots( evac_fn evac )
2145 {
2146 #if defined(GRAN)
2147   {
2148     nat i;
2149     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2150       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2151           evac((StgClosure **)&run_queue_hds[i]);
2152       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2153           evac((StgClosure **)&run_queue_tls[i]);
2154       
2155       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2156           evac((StgClosure **)&blocked_queue_hds[i]);
2157       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2158           evac((StgClosure **)&blocked_queue_tls[i]);
2159       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2160           evac((StgClosure **)&ccalling_threads[i]);
2161     }
2162   }
2163
2164   markEventQueue();
2165
2166 #else /* !GRAN */
2167   if (run_queue_hd != END_TSO_QUEUE) {
2168       ASSERT(run_queue_tl != END_TSO_QUEUE);
2169       evac((StgClosure **)&run_queue_hd);
2170       evac((StgClosure **)&run_queue_tl);
2171   }
2172   
2173   if (blocked_queue_hd != END_TSO_QUEUE) {
2174       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2175       evac((StgClosure **)&blocked_queue_hd);
2176       evac((StgClosure **)&blocked_queue_tl);
2177   }
2178   
2179   if (sleeping_queue != END_TSO_QUEUE) {
2180       evac((StgClosure **)&sleeping_queue);
2181   }
2182 #endif 
2183
2184   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2185       evac((StgClosure **)&suspended_ccalling_threads);
2186   }
2187
2188 #if defined(PAR) || defined(GRAN)
2189   markSparkQueue(evac);
2190 #endif
2191
2192 #if defined(RTS_USER_SIGNALS)
2193   // mark the signal handlers (signals should be already blocked)
2194   markSignalHandlers(evac);
2195 #endif
2196 }
2197
2198 /* -----------------------------------------------------------------------------
2199    performGC
2200
2201    This is the interface to the garbage collector from Haskell land.
2202    We provide this so that external C code can allocate and garbage
2203    collect when called from Haskell via _ccall_GC.
2204
2205    It might be useful to provide an interface whereby the programmer
2206    can specify more roots (ToDo).
2207    
2208    This needs to be protected by the GC condition variable above.  KH.
2209    -------------------------------------------------------------------------- */
2210
2211 static void (*extra_roots)(evac_fn);
2212
2213 void
2214 performGC(void)
2215 {
2216   /* Obligated to hold this lock upon entry */
2217   ACQUIRE_LOCK(&sched_mutex);
2218   GarbageCollect(GetRoots,rtsFalse);
2219   RELEASE_LOCK(&sched_mutex);
2220 }
2221
2222 void
2223 performMajorGC(void)
2224 {
2225   ACQUIRE_LOCK(&sched_mutex);
2226   GarbageCollect(GetRoots,rtsTrue);
2227   RELEASE_LOCK(&sched_mutex);
2228 }
2229
2230 static void
2231 AllRoots(evac_fn evac)
2232 {
2233     GetRoots(evac);             // the scheduler's roots
2234     extra_roots(evac);          // the user's roots
2235 }
2236
2237 void
2238 performGCWithRoots(void (*get_roots)(evac_fn))
2239 {
2240   ACQUIRE_LOCK(&sched_mutex);
2241   extra_roots = get_roots;
2242   GarbageCollect(AllRoots,rtsFalse);
2243   RELEASE_LOCK(&sched_mutex);
2244 }
2245
2246 /* -----------------------------------------------------------------------------
2247    Stack overflow
2248
2249    If the thread has reached its maximum stack size, then raise the
2250    StackOverflow exception in the offending thread.  Otherwise
2251    relocate the TSO into a larger chunk of memory and adjust its stack
2252    size appropriately.
2253    -------------------------------------------------------------------------- */
2254
2255 static StgTSO *
2256 threadStackOverflow(StgTSO *tso)
2257 {
2258   nat new_stack_size, new_tso_size, stack_words;
2259   StgPtr new_sp;
2260   StgTSO *dest;
2261
2262   IF_DEBUG(sanity,checkTSO(tso));
2263   if (tso->stack_size >= tso->max_stack_size) {
2264
2265     IF_DEBUG(gc,
2266              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2267                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2268              /* If we're debugging, just print out the top of the stack */
2269              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2270                                               tso->sp+64)));
2271
2272     /* Send this thread the StackOverflow exception */
2273     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2274     return tso;
2275   }
2276
2277   /* Try to double the current stack size.  If that takes us over the
2278    * maximum stack size for this thread, then use the maximum instead.
2279    * Finally round up so the TSO ends up as a whole number of blocks.
2280    */
2281   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2282   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2283                                        TSO_STRUCT_SIZE)/sizeof(W_);
2284   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2285   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2286
2287   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2288
2289   dest = (StgTSO *)allocate(new_tso_size);
2290   TICK_ALLOC_TSO(new_stack_size,0);
2291
2292   /* copy the TSO block and the old stack into the new area */
2293   memcpy(dest,tso,TSO_STRUCT_SIZE);
2294   stack_words = tso->stack + tso->stack_size - tso->sp;
2295   new_sp = (P_)dest + new_tso_size - stack_words;
2296   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2297
2298   /* relocate the stack pointers... */
2299   dest->sp         = new_sp;
2300   dest->stack_size = new_stack_size;
2301         
2302   /* Mark the old TSO as relocated.  We have to check for relocated
2303    * TSOs in the garbage collector and any primops that deal with TSOs.
2304    *
2305    * It's important to set the sp value to just beyond the end
2306    * of the stack, so we don't attempt to scavenge any part of the
2307    * dead TSO's stack.
2308    */
2309   tso->what_next = ThreadRelocated;
2310   tso->link = dest;
2311   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2312   tso->why_blocked = NotBlocked;
2313   dest->mut_link = NULL;
2314
2315   IF_PAR_DEBUG(verbose,
2316                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2317                      tso->id, tso, tso->stack_size);
2318                /* If we're debugging, just print out the top of the stack */
2319                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2320                                                 tso->sp+64)));
2321   
2322   IF_DEBUG(sanity,checkTSO(tso));
2323 #if 0
2324   IF_DEBUG(scheduler,printTSO(dest));
2325 #endif
2326
2327   return dest;
2328 }
2329
2330 /* ---------------------------------------------------------------------------
2331    Wake up a queue that was blocked on some resource.
2332    ------------------------------------------------------------------------ */
2333
2334 #if defined(GRAN)
2335 STATIC_INLINE void
2336 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2337 {
2338 }
2339 #elif defined(PAR)
2340 STATIC_INLINE void
2341 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2342 {
2343   /* write RESUME events to log file and
2344      update blocked and fetch time (depending on type of the orig closure) */
2345   if (RtsFlags.ParFlags.ParStats.Full) {
2346     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2347                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2348                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2349     if (EMPTY_RUN_QUEUE())
2350       emitSchedule = rtsTrue;
2351
2352     switch (get_itbl(node)->type) {
2353         case FETCH_ME_BQ:
2354           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2355           break;
2356         case RBH:
2357         case FETCH_ME:
2358         case BLACKHOLE_BQ:
2359           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2360           break;
2361 #ifdef DIST
2362         case MVAR:
2363           break;
2364 #endif    
2365         default:
2366           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2367         }
2368       }
2369 }
2370 #endif
2371
2372 #if defined(GRAN)
2373 static StgBlockingQueueElement *
2374 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2375 {
2376     StgTSO *tso;
2377     PEs node_loc, tso_loc;
2378
2379     node_loc = where_is(node); // should be lifted out of loop
2380     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2381     tso_loc = where_is((StgClosure *)tso);
2382     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2383       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2384       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2385       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2386       // insertThread(tso, node_loc);
2387       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2388                 ResumeThread,
2389                 tso, node, (rtsSpark*)NULL);
2390       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2391       // len_local++;
2392       // len++;
2393     } else { // TSO is remote (actually should be FMBQ)
2394       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2395                                   RtsFlags.GranFlags.Costs.gunblocktime +
2396                                   RtsFlags.GranFlags.Costs.latency;
2397       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2398                 UnblockThread,
2399                 tso, node, (rtsSpark*)NULL);
2400       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2401       // len++;
2402     }
2403     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2404     IF_GRAN_DEBUG(bq,
2405                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2406                           (node_loc==tso_loc ? "Local" : "Global"), 
2407                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2408     tso->block_info.closure = NULL;
2409     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2410                              tso->id, tso));
2411 }
2412 #elif defined(PAR)
2413 static StgBlockingQueueElement *
2414 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2415 {
2416     StgBlockingQueueElement *next;
2417
2418     switch (get_itbl(bqe)->type) {
2419     case TSO:
2420       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2421       /* if it's a TSO just push it onto the run_queue */
2422       next = bqe->link;
2423       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2424       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2425       THREAD_RUNNABLE();
2426       unblockCount(bqe, node);
2427       /* reset blocking status after dumping event */
2428       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2429       break;
2430
2431     case BLOCKED_FETCH:
2432       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2433       next = bqe->link;
2434       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2435       PendingFetches = (StgBlockedFetch *)bqe;
2436       break;
2437
2438 # if defined(DEBUG)
2439       /* can ignore this case in a non-debugging setup; 
2440          see comments on RBHSave closures above */
2441     case CONSTR:
2442       /* check that the closure is an RBHSave closure */
2443       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2444              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2445              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2446       break;
2447
2448     default:
2449       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2450            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2451            (StgClosure *)bqe);
2452 # endif
2453     }
2454   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2455   return next;
2456 }
2457
2458 #else /* !GRAN && !PAR */
2459 static StgTSO *
2460 unblockOneLocked(StgTSO *tso)
2461 {
2462   StgTSO *next;
2463
2464   ASSERT(get_itbl(tso)->type == TSO);
2465   ASSERT(tso->why_blocked != NotBlocked);
2466   tso->why_blocked = NotBlocked;
2467   next = tso->link;
2468   PUSH_ON_RUN_QUEUE(tso);
2469   THREAD_RUNNABLE();
2470   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2471   return next;
2472 }
2473 #endif
2474
2475 #if defined(GRAN) || defined(PAR)
2476 INLINE_ME StgBlockingQueueElement *
2477 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2478 {
2479   ACQUIRE_LOCK(&sched_mutex);
2480   bqe = unblockOneLocked(bqe, node);
2481   RELEASE_LOCK(&sched_mutex);
2482   return bqe;
2483 }
2484 #else
2485 INLINE_ME StgTSO *
2486 unblockOne(StgTSO *tso)
2487 {
2488   ACQUIRE_LOCK(&sched_mutex);
2489   tso = unblockOneLocked(tso);
2490   RELEASE_LOCK(&sched_mutex);
2491   return tso;
2492 }
2493 #endif
2494
2495 #if defined(GRAN)
2496 void 
2497 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2498 {
2499   StgBlockingQueueElement *bqe;
2500   PEs node_loc;
2501   nat len = 0; 
2502
2503   IF_GRAN_DEBUG(bq, 
2504                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2505                       node, CurrentProc, CurrentTime[CurrentProc], 
2506                       CurrentTSO->id, CurrentTSO));
2507
2508   node_loc = where_is(node);
2509
2510   ASSERT(q == END_BQ_QUEUE ||
2511          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2512          get_itbl(q)->type == CONSTR); // closure (type constructor)
2513   ASSERT(is_unique(node));
2514
2515   /* FAKE FETCH: magically copy the node to the tso's proc;
2516      no Fetch necessary because in reality the node should not have been 
2517      moved to the other PE in the first place
2518   */
2519   if (CurrentProc!=node_loc) {
2520     IF_GRAN_DEBUG(bq, 
2521                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2522                         node, node_loc, CurrentProc, CurrentTSO->id, 
2523                         // CurrentTSO, where_is(CurrentTSO),
2524                         node->header.gran.procs));
2525     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2526     IF_GRAN_DEBUG(bq, 
2527                   belch("## new bitmask of node %p is %#x",
2528                         node, node->header.gran.procs));
2529     if (RtsFlags.GranFlags.GranSimStats.Global) {
2530       globalGranStats.tot_fake_fetches++;
2531     }
2532   }
2533
2534   bqe = q;
2535   // ToDo: check: ASSERT(CurrentProc==node_loc);
2536   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2537     //next = bqe->link;
2538     /* 
2539        bqe points to the current element in the queue
2540        next points to the next element in the queue
2541     */
2542     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2543     //tso_loc = where_is(tso);
2544     len++;
2545     bqe = unblockOneLocked(bqe, node);
2546   }
2547
2548   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2549      the closure to make room for the anchor of the BQ */
2550   if (bqe!=END_BQ_QUEUE) {
2551     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2552     /*
2553     ASSERT((info_ptr==&RBH_Save_0_info) ||
2554            (info_ptr==&RBH_Save_1_info) ||
2555            (info_ptr==&RBH_Save_2_info));
2556     */
2557     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2558     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2559     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2560
2561     IF_GRAN_DEBUG(bq,
2562                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2563                         node, info_type(node)));
2564   }
2565
2566   /* statistics gathering */
2567   if (RtsFlags.GranFlags.GranSimStats.Global) {
2568     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2569     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2570     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2571     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2572   }
2573   IF_GRAN_DEBUG(bq,
2574                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2575                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2576 }
2577 #elif defined(PAR)
2578 void 
2579 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2580 {
2581   StgBlockingQueueElement *bqe;
2582
2583   ACQUIRE_LOCK(&sched_mutex);
2584
2585   IF_PAR_DEBUG(verbose, 
2586                belch("##-_ AwBQ for node %p on [%x]: ",
2587                      node, mytid));
2588 #ifdef DIST  
2589   //RFP
2590   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2591     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2592     return;
2593   }
2594 #endif
2595   
2596   ASSERT(q == END_BQ_QUEUE ||
2597          get_itbl(q)->type == TSO ||           
2598          get_itbl(q)->type == BLOCKED_FETCH || 
2599          get_itbl(q)->type == CONSTR); 
2600
2601   bqe = q;
2602   while (get_itbl(bqe)->type==TSO || 
2603          get_itbl(bqe)->type==BLOCKED_FETCH) {
2604     bqe = unblockOneLocked(bqe, node);
2605   }
2606   RELEASE_LOCK(&sched_mutex);
2607 }
2608
2609 #else   /* !GRAN && !PAR */
2610
2611 void
2612 awakenBlockedQueueNoLock(StgTSO *tso)
2613 {
2614   while (tso != END_TSO_QUEUE) {
2615     tso = unblockOneLocked(tso);
2616   }
2617 }
2618
2619 void
2620 awakenBlockedQueue(StgTSO *tso)
2621 {
2622   ACQUIRE_LOCK(&sched_mutex);
2623   while (tso != END_TSO_QUEUE) {
2624     tso = unblockOneLocked(tso);
2625   }
2626   RELEASE_LOCK(&sched_mutex);
2627 }
2628 #endif
2629
2630 /* ---------------------------------------------------------------------------
2631    Interrupt execution
2632    - usually called inside a signal handler so it mustn't do anything fancy.   
2633    ------------------------------------------------------------------------ */
2634
2635 void
2636 interruptStgRts(void)
2637 {
2638     interrupted    = 1;
2639     context_switch = 1;
2640 #ifdef RTS_SUPPORTS_THREADS
2641     wakeBlockedWorkerThread();
2642 #endif
2643 }
2644
2645 /* -----------------------------------------------------------------------------
2646    Unblock a thread
2647
2648    This is for use when we raise an exception in another thread, which
2649    may be blocked.
2650    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2651    -------------------------------------------------------------------------- */
2652
2653 #if defined(GRAN) || defined(PAR)
2654 /*
2655   NB: only the type of the blocking queue is different in GranSim and GUM
2656       the operations on the queue-elements are the same
2657       long live polymorphism!
2658
2659   Locks: sched_mutex is held upon entry and exit.
2660
2661 */
2662 static void
2663 unblockThread(StgTSO *tso)
2664 {
2665   StgBlockingQueueElement *t, **last;
2666
2667   switch (tso->why_blocked) {
2668
2669   case NotBlocked:
2670     return;  /* not blocked */
2671
2672   case BlockedOnMVar:
2673     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2674     {
2675       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2676       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2677
2678       last = (StgBlockingQueueElement **)&mvar->head;
2679       for (t = (StgBlockingQueueElement *)mvar->head; 
2680            t != END_BQ_QUEUE; 
2681            last = &t->link, last_tso = t, t = t->link) {
2682         if (t == (StgBlockingQueueElement *)tso) {
2683           *last = (StgBlockingQueueElement *)tso->link;
2684           if (mvar->tail == tso) {
2685             mvar->tail = (StgTSO *)last_tso;
2686           }
2687           goto done;
2688         }
2689       }
2690       barf("unblockThread (MVAR): TSO not found");
2691     }
2692
2693   case BlockedOnBlackHole:
2694     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2695     {
2696       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2697
2698       last = &bq->blocking_queue;
2699       for (t = bq->blocking_queue; 
2700            t != END_BQ_QUEUE; 
2701            last = &t->link, t = t->link) {
2702         if (t == (StgBlockingQueueElement *)tso) {
2703           *last = (StgBlockingQueueElement *)tso->link;
2704           goto done;
2705         }
2706       }
2707       barf("unblockThread (BLACKHOLE): TSO not found");
2708     }
2709
2710   case BlockedOnException:
2711     {
2712       StgTSO *target  = tso->block_info.tso;
2713
2714       ASSERT(get_itbl(target)->type == TSO);
2715
2716       if (target->what_next == ThreadRelocated) {
2717           target = target->link;
2718           ASSERT(get_itbl(target)->type == TSO);
2719       }
2720
2721       ASSERT(target->blocked_exceptions != NULL);
2722
2723       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2724       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2725            t != END_BQ_QUEUE; 
2726            last = &t->link, t = t->link) {
2727         ASSERT(get_itbl(t)->type == TSO);
2728         if (t == (StgBlockingQueueElement *)tso) {
2729           *last = (StgBlockingQueueElement *)tso->link;
2730           goto done;
2731         }
2732       }
2733       barf("unblockThread (Exception): TSO not found");
2734     }
2735
2736   case BlockedOnRead:
2737   case BlockedOnWrite:
2738 #if defined(mingw32_TARGET_OS)
2739   case BlockedOnDoProc:
2740 #endif
2741     {
2742       /* take TSO off blocked_queue */
2743       StgBlockingQueueElement *prev = NULL;
2744       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2745            prev = t, t = t->link) {
2746         if (t == (StgBlockingQueueElement *)tso) {
2747           if (prev == NULL) {
2748             blocked_queue_hd = (StgTSO *)t->link;
2749             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2750               blocked_queue_tl = END_TSO_QUEUE;
2751             }
2752           } else {
2753             prev->link = t->link;
2754             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2755               blocked_queue_tl = (StgTSO *)prev;
2756             }
2757           }
2758           goto done;
2759         }
2760       }
2761       barf("unblockThread (I/O): TSO not found");
2762     }
2763
2764   case BlockedOnDelay:
2765     {
2766       /* take TSO off sleeping_queue */
2767       StgBlockingQueueElement *prev = NULL;
2768       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2769            prev = t, t = t->link) {
2770         if (t == (StgBlockingQueueElement *)tso) {
2771           if (prev == NULL) {
2772             sleeping_queue = (StgTSO *)t->link;
2773           } else {
2774             prev->link = t->link;
2775           }
2776           goto done;
2777         }
2778       }
2779       barf("unblockThread (delay): TSO not found");
2780     }
2781
2782   default:
2783     barf("unblockThread");
2784   }
2785
2786  done:
2787   tso->link = END_TSO_QUEUE;
2788   tso->why_blocked = NotBlocked;
2789   tso->block_info.closure = NULL;
2790   PUSH_ON_RUN_QUEUE(tso);
2791 }
2792 #else
2793 static void
2794 unblockThread(StgTSO *tso)
2795 {
2796   StgTSO *t, **last;
2797   
2798   /* To avoid locking unnecessarily. */
2799   if (tso->why_blocked == NotBlocked) {
2800     return;
2801   }
2802
2803   switch (tso->why_blocked) {
2804
2805   case BlockedOnMVar:
2806     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2807     {
2808       StgTSO *last_tso = END_TSO_QUEUE;
2809       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2810
2811       last = &mvar->head;
2812       for (t = mvar->head; t != END_TSO_QUEUE; 
2813            last = &t->link, last_tso = t, t = t->link) {
2814         if (t == tso) {
2815           *last = tso->link;
2816           if (mvar->tail == tso) {
2817             mvar->tail = last_tso;
2818           }
2819           goto done;
2820         }
2821       }
2822       barf("unblockThread (MVAR): TSO not found");
2823     }
2824
2825   case BlockedOnBlackHole:
2826     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2827     {
2828       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2829
2830       last = &bq->blocking_queue;
2831       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2832            last = &t->link, t = t->link) {
2833         if (t == tso) {
2834           *last = tso->link;
2835           goto done;
2836         }
2837       }
2838       barf("unblockThread (BLACKHOLE): TSO not found");
2839     }
2840
2841   case BlockedOnException:
2842     {
2843       StgTSO *target  = tso->block_info.tso;
2844
2845       ASSERT(get_itbl(target)->type == TSO);
2846
2847       while (target->what_next == ThreadRelocated) {
2848           target = target->link;
2849           ASSERT(get_itbl(target)->type == TSO);
2850       }
2851       
2852       ASSERT(target->blocked_exceptions != NULL);
2853
2854       last = &target->blocked_exceptions;
2855       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2856            last = &t->link, t = t->link) {
2857         ASSERT(get_itbl(t)->type == TSO);
2858         if (t == tso) {
2859           *last = tso->link;
2860           goto done;
2861         }
2862       }
2863       barf("unblockThread (Exception): TSO not found");
2864     }
2865
2866   case BlockedOnRead:
2867   case BlockedOnWrite:
2868 #if defined(mingw32_TARGET_OS)
2869   case BlockedOnDoProc:
2870 #endif
2871     {
2872       StgTSO *prev = NULL;
2873       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2874            prev = t, t = t->link) {
2875         if (t == tso) {
2876           if (prev == NULL) {
2877             blocked_queue_hd = t->link;
2878             if (blocked_queue_tl == t) {
2879               blocked_queue_tl = END_TSO_QUEUE;
2880             }
2881           } else {
2882             prev->link = t->link;
2883             if (blocked_queue_tl == t) {
2884               blocked_queue_tl = prev;
2885             }
2886           }
2887           goto done;
2888         }
2889       }
2890       barf("unblockThread (I/O): TSO not found");
2891     }
2892
2893   case BlockedOnDelay:
2894     {
2895       StgTSO *prev = NULL;
2896       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2897            prev = t, t = t->link) {
2898         if (t == tso) {
2899           if (prev == NULL) {
2900             sleeping_queue = t->link;
2901           } else {
2902             prev->link = t->link;
2903           }
2904           goto done;
2905         }
2906       }
2907       barf("unblockThread (delay): TSO not found");
2908     }
2909
2910   default:
2911     barf("unblockThread");
2912   }
2913
2914  done:
2915   tso->link = END_TSO_QUEUE;
2916   tso->why_blocked = NotBlocked;
2917   tso->block_info.closure = NULL;
2918   PUSH_ON_RUN_QUEUE(tso);
2919 }
2920 #endif
2921
2922 /* -----------------------------------------------------------------------------
2923  * raiseAsync()
2924  *
2925  * The following function implements the magic for raising an
2926  * asynchronous exception in an existing thread.
2927  *
2928  * We first remove the thread from any queue on which it might be
2929  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2930  *
2931  * We strip the stack down to the innermost CATCH_FRAME, building
2932  * thunks in the heap for all the active computations, so they can 
2933  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2934  * an application of the handler to the exception, and push it on
2935  * the top of the stack.
2936  * 
2937  * How exactly do we save all the active computations?  We create an
2938  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2939  * AP_STACKs pushes everything from the corresponding update frame
2940  * upwards onto the stack.  (Actually, it pushes everything up to the
2941  * next update frame plus a pointer to the next AP_STACK object.
2942  * Entering the next AP_STACK object pushes more onto the stack until we
2943  * reach the last AP_STACK object - at which point the stack should look
2944  * exactly as it did when we killed the TSO and we can continue
2945  * execution by entering the closure on top of the stack.
2946  *
2947  * We can also kill a thread entirely - this happens if either (a) the 
2948  * exception passed to raiseAsync is NULL, or (b) there's no
2949  * CATCH_FRAME on the stack.  In either case, we strip the entire
2950  * stack and replace the thread with a zombie.
2951  *
2952  * Locks: sched_mutex held upon entry nor exit.
2953  *
2954  * -------------------------------------------------------------------------- */
2955  
2956 void 
2957 deleteThread(StgTSO *tso)
2958 {
2959   raiseAsync(tso,NULL);
2960 }
2961
2962 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2963 static void 
2964 deleteThreadImmediately(StgTSO *tso)
2965 { // for forkProcess only:
2966   // delete thread without giving it a chance to catch the KillThread exception
2967
2968   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2969       return;
2970   }
2971
2972   if (tso->why_blocked != BlockedOnCCall &&
2973       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2974     unblockThread(tso);
2975   }
2976
2977   tso->what_next = ThreadKilled;
2978 }
2979 #endif
2980
2981 void
2982 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2983 {
2984   /* When raising async exs from contexts where sched_mutex isn't held;
2985      use raiseAsyncWithLock(). */
2986   ACQUIRE_LOCK(&sched_mutex);
2987   raiseAsync(tso,exception);
2988   RELEASE_LOCK(&sched_mutex);
2989 }
2990
2991 void
2992 raiseAsync(StgTSO *tso, StgClosure *exception)
2993 {
2994     StgRetInfoTable *info;
2995     StgPtr sp;
2996   
2997     // Thread already dead?
2998     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2999         return;
3000     }
3001
3002     IF_DEBUG(scheduler, 
3003              sched_belch("raising exception in thread %ld.", tso->id));
3004     
3005     // Remove it from any blocking queues
3006     unblockThread(tso);
3007
3008     sp = tso->sp;
3009     
3010     // The stack freezing code assumes there's a closure pointer on
3011     // the top of the stack, so we have to arrange that this is the case...
3012     //
3013     if (sp[0] == (W_)&stg_enter_info) {
3014         sp++;
3015     } else {
3016         sp--;
3017         sp[0] = (W_)&stg_dummy_ret_closure;
3018     }
3019
3020     while (1) {
3021         nat i;
3022
3023         // 1. Let the top of the stack be the "current closure"
3024         //
3025         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3026         // CATCH_FRAME.
3027         //
3028         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3029         // current closure applied to the chunk of stack up to (but not
3030         // including) the update frame.  This closure becomes the "current
3031         // closure".  Go back to step 2.
3032         //
3033         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3034         // top of the stack applied to the exception.
3035         // 
3036         // 5. If it's a STOP_FRAME, then kill the thread.
3037         
3038         StgPtr frame;
3039         
3040         frame = sp + 1;
3041         info = get_ret_itbl((StgClosure *)frame);
3042         
3043         while (info->i.type != UPDATE_FRAME
3044                && (info->i.type != CATCH_FRAME || exception == NULL)
3045                && info->i.type != STOP_FRAME) {
3046             frame += stack_frame_sizeW((StgClosure *)frame);
3047             info = get_ret_itbl((StgClosure *)frame);
3048         }
3049         
3050         switch (info->i.type) {
3051             
3052         case CATCH_FRAME:
3053             // If we find a CATCH_FRAME, and we've got an exception to raise,
3054             // then build the THUNK raise(exception), and leave it on
3055             // top of the CATCH_FRAME ready to enter.
3056             //
3057         {
3058 #ifdef PROFILING
3059             StgCatchFrame *cf = (StgCatchFrame *)frame;
3060 #endif
3061             StgClosure *raise;
3062             
3063             // we've got an exception to raise, so let's pass it to the
3064             // handler in this frame.
3065             //
3066             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3067             TICK_ALLOC_SE_THK(1,0);
3068             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3069             raise->payload[0] = exception;
3070             
3071             // throw away the stack from Sp up to the CATCH_FRAME.
3072             //
3073             sp = frame - 1;
3074             
3075             /* Ensure that async excpetions are blocked now, so we don't get
3076              * a surprise exception before we get around to executing the
3077              * handler.
3078              */
3079             if (tso->blocked_exceptions == NULL) {
3080                 tso->blocked_exceptions = END_TSO_QUEUE;
3081             }
3082             
3083             /* Put the newly-built THUNK on top of the stack, ready to execute
3084              * when the thread restarts.
3085              */
3086             sp[0] = (W_)raise;
3087             sp[-1] = (W_)&stg_enter_info;
3088             tso->sp = sp-1;
3089             tso->what_next = ThreadRunGHC;
3090             IF_DEBUG(sanity, checkTSO(tso));
3091             return;
3092         }
3093         
3094         case UPDATE_FRAME:
3095         {
3096             StgAP_STACK * ap;
3097             nat words;
3098             
3099             // First build an AP_STACK consisting of the stack chunk above the
3100             // current update frame, with the top word on the stack as the
3101             // fun field.
3102             //
3103             words = frame - sp - 1;
3104             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3105             
3106             ap->size = words;
3107             ap->fun  = (StgClosure *)sp[0];
3108             sp++;
3109             for(i=0; i < (nat)words; ++i) {
3110                 ap->payload[i] = (StgClosure *)*sp++;
3111             }
3112             
3113             SET_HDR(ap,&stg_AP_STACK_info,
3114                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3115             TICK_ALLOC_UP_THK(words+1,0);
3116             
3117             IF_DEBUG(scheduler,
3118                      fprintf(stderr,  "sched: Updating ");
3119                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3120                      fprintf(stderr,  " with ");
3121                      printObj((StgClosure *)ap);
3122                 );
3123
3124             // Replace the updatee with an indirection - happily
3125             // this will also wake up any threads currently
3126             // waiting on the result.
3127             //
3128             // Warning: if we're in a loop, more than one update frame on
3129             // the stack may point to the same object.  Be careful not to
3130             // overwrite an IND_OLDGEN in this case, because we'll screw
3131             // up the mutable lists.  To be on the safe side, don't
3132             // overwrite any kind of indirection at all.  See also
3133             // threadSqueezeStack in GC.c, where we have to make a similar
3134             // check.
3135             //
3136             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3137                 // revert the black hole
3138                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3139             }
3140             sp += sizeofW(StgUpdateFrame) - 1;
3141             sp[0] = (W_)ap; // push onto stack
3142             break;
3143         }
3144         
3145         case STOP_FRAME:
3146             // We've stripped the entire stack, the thread is now dead.
3147             sp += sizeofW(StgStopFrame);
3148             tso->what_next = ThreadKilled;
3149             tso->sp = sp;
3150             return;
3151             
3152         default:
3153             barf("raiseAsync");
3154         }
3155     }
3156     barf("raiseAsync");
3157 }
3158
3159 /* -----------------------------------------------------------------------------
3160    resurrectThreads is called after garbage collection on the list of
3161    threads found to be garbage.  Each of these threads will be woken
3162    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3163    on an MVar, or NonTermination if the thread was blocked on a Black
3164    Hole.
3165
3166    Locks: sched_mutex isn't held upon entry nor exit.
3167    -------------------------------------------------------------------------- */
3168
3169 void
3170 resurrectThreads( StgTSO *threads )
3171 {
3172   StgTSO *tso, *next;
3173
3174   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3175     next = tso->global_link;
3176     tso->global_link = all_threads;
3177     all_threads = tso;
3178     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3179
3180     switch (tso->why_blocked) {
3181     case BlockedOnMVar:
3182     case BlockedOnException:
3183       /* Called by GC - sched_mutex lock is currently held. */
3184       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3185       break;
3186     case BlockedOnBlackHole:
3187       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3188       break;
3189     case NotBlocked:
3190       /* This might happen if the thread was blocked on a black hole
3191        * belonging to a thread that we've just woken up (raiseAsync
3192        * can wake up threads, remember...).
3193        */
3194       continue;
3195     default:
3196       barf("resurrectThreads: thread blocked in a strange way");
3197     }
3198   }
3199 }
3200
3201 /* -----------------------------------------------------------------------------
3202  * Blackhole detection: if we reach a deadlock, test whether any
3203  * threads are blocked on themselves.  Any threads which are found to
3204  * be self-blocked get sent a NonTermination exception.
3205  *
3206  * This is only done in a deadlock situation in order to avoid
3207  * performance overhead in the normal case.
3208  *
3209  * Locks: sched_mutex is held upon entry and exit.
3210  * -------------------------------------------------------------------------- */
3211
3212 static void
3213 detectBlackHoles( void )
3214 {
3215     StgTSO *tso = all_threads;
3216     StgClosure *frame;
3217     StgClosure *blocked_on;
3218     StgRetInfoTable *info;
3219
3220     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3221
3222         while (tso->what_next == ThreadRelocated) {
3223             tso = tso->link;
3224             ASSERT(get_itbl(tso)->type == TSO);
3225         }
3226       
3227         if (tso->why_blocked != BlockedOnBlackHole) {
3228             continue;
3229         }
3230         blocked_on = tso->block_info.closure;
3231
3232         frame = (StgClosure *)tso->sp;
3233
3234         while(1) {
3235             info = get_ret_itbl(frame);
3236             switch (info->i.type) {
3237             case UPDATE_FRAME:
3238                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3239                     /* We are blocking on one of our own computations, so
3240                      * send this thread the NonTermination exception.  
3241                      */
3242                     IF_DEBUG(scheduler, 
3243                              sched_belch("thread %d is blocked on itself", tso->id));
3244                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3245                     goto done;
3246                 }
3247                 
3248                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3249                 continue;
3250
3251             case STOP_FRAME:
3252                 goto done;
3253
3254                 // normal stack frames; do nothing except advance the pointer
3255             default:
3256                 (StgPtr)frame += stack_frame_sizeW(frame);
3257             }
3258         }   
3259         done: ;
3260     }
3261 }
3262
3263 /* ----------------------------------------------------------------------------
3264  * Debugging: why is a thread blocked
3265  * [Also provides useful information when debugging threaded programs
3266  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3267    ------------------------------------------------------------------------- */
3268
3269 static
3270 void
3271 printThreadBlockage(StgTSO *tso)
3272 {
3273   switch (tso->why_blocked) {
3274   case BlockedOnRead:
3275     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3276     break;
3277   case BlockedOnWrite:
3278     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3279     break;
3280 #if defined(mingw32_TARGET_OS)
3281     case BlockedOnDoProc:
3282     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3283     break;
3284 #endif
3285   case BlockedOnDelay:
3286     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3287     break;
3288   case BlockedOnMVar:
3289     fprintf(stderr,"is blocked on an MVar");
3290     break;
3291   case BlockedOnException:
3292     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3293             tso->block_info.tso->id);
3294     break;
3295   case BlockedOnBlackHole:
3296     fprintf(stderr,"is blocked on a black hole");
3297     break;
3298   case NotBlocked:
3299     fprintf(stderr,"is not blocked");
3300     break;
3301 #if defined(PAR)
3302   case BlockedOnGA:
3303     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3304             tso->block_info.closure, info_type(tso->block_info.closure));
3305     break;
3306   case BlockedOnGA_NoSend:
3307     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3308             tso->block_info.closure, info_type(tso->block_info.closure));
3309     break;
3310 #endif
3311   case BlockedOnCCall:
3312     fprintf(stderr,"is blocked on an external call");
3313     break;
3314   case BlockedOnCCall_NoUnblockExc:
3315     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3316     break;
3317   default:
3318     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3319          tso->why_blocked, tso->id, tso);
3320   }
3321 }
3322
3323 static
3324 void
3325 printThreadStatus(StgTSO *tso)
3326 {
3327   switch (tso->what_next) {
3328   case ThreadKilled:
3329     fprintf(stderr,"has been killed");
3330     break;
3331   case ThreadComplete:
3332     fprintf(stderr,"has completed");
3333     break;
3334   default:
3335     printThreadBlockage(tso);
3336   }
3337 }
3338
3339 void
3340 printAllThreads(void)
3341 {
3342   StgTSO *t;
3343   void *label;
3344
3345 # if defined(GRAN)
3346   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3347   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3348                        time_string, rtsFalse/*no commas!*/);
3349
3350   fprintf(stderr, "all threads at [%s]:\n", time_string);
3351 # elif defined(PAR)
3352   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3353   ullong_format_string(CURRENT_TIME,
3354                        time_string, rtsFalse/*no commas!*/);
3355
3356   fprintf(stderr,"all threads at [%s]:\n", time_string);
3357 # else
3358   fprintf(stderr,"all threads:\n");
3359 # endif
3360
3361   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3362     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3363     label = lookupThreadLabel(t->id);
3364     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3365     printThreadStatus(t);
3366     fprintf(stderr,"\n");
3367   }
3368 }
3369     
3370 #ifdef DEBUG
3371
3372 /* 
3373    Print a whole blocking queue attached to node (debugging only).
3374 */
3375 # if defined(PAR)
3376 void 
3377 print_bq (StgClosure *node)
3378 {
3379   StgBlockingQueueElement *bqe;
3380   StgTSO *tso;
3381   rtsBool end;
3382
3383   fprintf(stderr,"## BQ of closure %p (%s): ",
3384           node, info_type(node));
3385
3386   /* should cover all closures that may have a blocking queue */
3387   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3388          get_itbl(node)->type == FETCH_ME_BQ ||
3389          get_itbl(node)->type == RBH ||
3390          get_itbl(node)->type == MVAR);
3391     
3392   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3393
3394   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3395 }
3396
3397 /* 
3398    Print a whole blocking queue starting with the element bqe.
3399 */
3400 void 
3401 print_bqe (StgBlockingQueueElement *bqe)
3402 {
3403   rtsBool end;
3404
3405   /* 
3406      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3407   */
3408   for (end = (bqe==END_BQ_QUEUE);
3409        !end; // iterate until bqe points to a CONSTR
3410        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3411        bqe = end ? END_BQ_QUEUE : bqe->link) {
3412     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3413     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3414     /* types of closures that may appear in a blocking queue */
3415     ASSERT(get_itbl(bqe)->type == TSO ||           
3416            get_itbl(bqe)->type == BLOCKED_FETCH || 
3417            get_itbl(bqe)->type == CONSTR); 
3418     /* only BQs of an RBH end with an RBH_Save closure */
3419     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3420
3421     switch (get_itbl(bqe)->type) {
3422     case TSO:
3423       fprintf(stderr," TSO %u (%x),",
3424               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3425       break;
3426     case BLOCKED_FETCH:
3427       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3428               ((StgBlockedFetch *)bqe)->node, 
3429               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3430               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3431               ((StgBlockedFetch *)bqe)->ga.weight);
3432       break;
3433     case CONSTR:
3434       fprintf(stderr," %s (IP %p),",
3435               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3436                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3437                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3438                "RBH_Save_?"), get_itbl(bqe));
3439       break;
3440     default:
3441       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3442            info_type((StgClosure *)bqe)); // , node, info_type(node));
3443       break;
3444     }
3445   } /* for */
3446   fputc('\n', stderr);
3447 }
3448 # elif defined(GRAN)
3449 void 
3450 print_bq (StgClosure *node)
3451 {
3452   StgBlockingQueueElement *bqe;
3453   PEs node_loc, tso_loc;
3454   rtsBool end;
3455
3456   /* should cover all closures that may have a blocking queue */
3457   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3458          get_itbl(node)->type == FETCH_ME_BQ ||
3459          get_itbl(node)->type == RBH);
3460     
3461   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3462   node_loc = where_is(node);
3463
3464   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3465           node, info_type(node), node_loc);
3466
3467   /* 
3468      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3469   */
3470   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3471        !end; // iterate until bqe points to a CONSTR
3472        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3473     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3474     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3475     /* types of closures that may appear in a blocking queue */
3476     ASSERT(get_itbl(bqe)->type == TSO ||           
3477            get_itbl(bqe)->type == CONSTR); 
3478     /* only BQs of an RBH end with an RBH_Save closure */
3479     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3480
3481     tso_loc = where_is((StgClosure *)bqe);
3482     switch (get_itbl(bqe)->type) {
3483     case TSO:
3484       fprintf(stderr," TSO %d (%p) on [PE %d],",
3485               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3486       break;
3487     case CONSTR:
3488       fprintf(stderr," %s (IP %p),",
3489               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3490                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3491                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3492                "RBH_Save_?"), get_itbl(bqe));
3493       break;
3494     default:
3495       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3496            info_type((StgClosure *)bqe), node, info_type(node));
3497       break;
3498     }
3499   } /* for */
3500   fputc('\n', stderr);
3501 }
3502 #else
3503 /* 
3504    Nice and easy: only TSOs on the blocking queue
3505 */
3506 void 
3507 print_bq (StgClosure *node)
3508 {
3509   StgTSO *tso;
3510
3511   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3512   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3513        tso != END_TSO_QUEUE; 
3514        tso=tso->link) {
3515     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3516     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3517     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3518   }
3519   fputc('\n', stderr);
3520 }
3521 # endif
3522
3523 #if defined(PAR)
3524 static nat
3525 run_queue_len(void)
3526 {
3527   nat i;
3528   StgTSO *tso;
3529
3530   for (i=0, tso=run_queue_hd; 
3531        tso != END_TSO_QUEUE;
3532        i++, tso=tso->link)
3533     /* nothing */
3534
3535   return i;
3536 }
3537 #endif
3538
3539 void
3540 sched_belch(char *s, ...)
3541 {
3542   va_list ap;
3543   va_start(ap,s);
3544 #ifdef RTS_SUPPORTS_THREADS
3545   fprintf(stderr, "sched (task %p): ", osThreadId());
3546 #elif defined(PAR)
3547   fprintf(stderr, "== ");
3548 #else
3549   fprintf(stderr, "sched: ");
3550 #endif
3551   vfprintf(stderr, s, ap);
3552   fprintf(stderr, "\n");
3553   fflush(stderr);
3554   va_end(ap);
3555 }
3556
3557 #endif /* DEBUG */