[project @ 2004-08-19 11:27:45 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "Timer.h"
59 #include "Prelude.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
62 #include "Updates.h"
63 #ifdef PROFILING
64 #include "Proftimer.h"
65 #include "ProfHeap.h"
66 #endif
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
69 # include "GranSim.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
73 # include "FetchMe.h"
74 # include "HLC.h"
75 #endif
76 #include "Sparks.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 #ifdef THREADED_RTS
97 #define USED_IN_THREADED_RTS
98 #else
99 #define USED_IN_THREADED_RTS STG_UNUSED
100 #endif
101
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #else
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
106 #endif
107
108 /* Main thread queue.
109  * Locks required: sched_mutex.
110  */
111 StgMainThread *main_threads = NULL;
112
113 /* Thread queues.
114  * Locks required: sched_mutex.
115  */
116 #if defined(GRAN)
117
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
120
121 /* 
122    In GranSim we have a runnable and a blocked queue for each processor.
123    In order to minimise code changes new arrays run_queue_hds/tls
124    are created. run_queue_hd is then a short cut (macro) for
125    run_queue_hds[CurrentProc] (see GranSim.h).
126    -- HWL
127 */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132    the std RTS (i.e. we are cheating). However, we don't use this list in
133    the GranSim specific code at the moment (so we are only potentially
134    cheating).  */
135
136 #else /* !GRAN */
137
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
143
144 #endif
145
146 /* Linked list of all threads.
147  * Used for detecting garbage collected threads.
148  */
149 StgTSO *all_threads = NULL;
150
151 /* When a thread performs a safe C call (_ccall_GC, using old
152  * terminology), it gets put on the suspended_ccalling_threads
153  * list. Used by the garbage collector.
154  */
155 static StgTSO *suspended_ccalling_threads;
156
157 static StgTSO *threadStackOverflow(StgTSO *tso);
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 nat context_switch = 0;
166
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
169
170 /* Next thread ID to allocate.
171  * Locks required: thread_id_mutex
172  */
173 static StgThreadID next_thread_id = 1;
174
175 /*
176  * Pointers to the state of the current thread.
177  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
179  */
180  
181 /* The smallest stack size that makes any sense is:
182  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
183  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
184  *  + 1                       (the closure to enter)
185  *  + 1                       (stg_ap_v_ret)
186  *  + 1                       (spare slot req'd by stg_ap_v_ret)
187  *
188  * A thread with this stack will bomb immediately with a stack
189  * overflow, which will increase its stack size.  
190  */
191
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
193
194
195 #if defined(GRAN)
196 StgTSO *CurrentTSO;
197 #endif
198
199 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
200  *  exists - earlier gccs apparently didn't.
201  *  -= chak
202  */
203 StgTSO dummy_tso;
204
205 static rtsBool ready_to_gc;
206
207 /*
208  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209  * in an MT setting, needed to signal that a worker thread shouldn't hang around
210  * in the scheduler when it is out of work.
211  */
212 static rtsBool shutting_down_scheduler = rtsFalse;
213
214 void            addToBlockedQueue ( StgTSO *tso );
215
216 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
217        void     interruptStgRts   ( void );
218
219 static void     detectBlackHoles  ( void );
220
221 #if defined(RTS_SUPPORTS_THREADS)
222 /* ToDo: carefully document the invariants that go together
223  *       with these synchronisation objects.
224  */
225 Mutex     sched_mutex       = INIT_MUTEX_VAR;
226 Mutex     term_mutex        = INIT_MUTEX_VAR;
227
228 #endif /* RTS_SUPPORTS_THREADS */
229
230 #if defined(PAR)
231 StgTSO *LastTSO;
232 rtsTime TimeOfLastYield;
233 rtsBool emitSchedule = rtsTrue;
234 #endif
235
236 #if DEBUG
237 static char *whatNext_strs[] = {
238   "(unknown)",
239   "ThreadRunGHC",
240   "ThreadInterpret",
241   "ThreadKilled",
242   "ThreadRelocated",
243   "ThreadComplete"
244 };
245 #endif
246
247 #if defined(PAR)
248 StgTSO * createSparkThread(rtsSpark spark);
249 StgTSO * activateSpark (rtsSpark spark);  
250 #endif
251
252 /* ----------------------------------------------------------------------------
253  * Starting Tasks
254  * ------------------------------------------------------------------------- */
255
256 #if defined(RTS_SUPPORTS_THREADS)
257 static rtsBool startingWorkerThread = rtsFalse;
258
259 static void taskStart(void);
260 static void
261 taskStart(void)
262 {
263   ACQUIRE_LOCK(&sched_mutex);
264   startingWorkerThread = rtsFalse;
265   schedule(NULL,NULL);
266   RELEASE_LOCK(&sched_mutex);
267 }
268
269 void
270 startSchedulerTaskIfNecessary(void)
271 {
272   if(run_queue_hd != END_TSO_QUEUE
273     || blocked_queue_hd != END_TSO_QUEUE
274     || sleeping_queue != END_TSO_QUEUE)
275   {
276     if(!startingWorkerThread)
277     { // we don't want to start another worker thread
278       // just because the last one hasn't yet reached the
279       // "waiting for capability" state
280       startingWorkerThread = rtsTrue;
281       if(!startTask(taskStart))
282       {
283         startingWorkerThread = rtsFalse;
284       }
285     }
286   }
287 }
288 #endif
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    Locking notes:  we acquire the scheduler lock once at the beginning
303    of the scheduler loop, and release it when
304     
305       * running a thread, or
306       * waiting for work, or
307       * waiting for a GC to complete.
308
309    GRAN version:
310      In a GranSim setup this loop iterates over the global event queue.
311      This revolves around the global event queue, which determines what 
312      to do next. Therefore, it's more complicated than either the 
313      concurrent or the parallel (GUM) setup.
314
315    GUM version:
316      GUM iterates over incoming messages.
317      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318      and sends out a fish whenever it has nothing to do; in-between
319      doing the actual reductions (shared code below) it processes the
320      incoming messages and deals with delayed operations 
321      (see PendingFetches).
322      This is not the ugliest code you could imagine, but it's bloody close.
323
324    ------------------------------------------------------------------------ */
325 static void
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327           Capability *initialCapability )
328 {
329   StgTSO *t;
330   Capability *cap;
331   StgThreadReturnCode ret;
332 #if defined(GRAN)
333   rtsEvent *event;
334 #elif defined(PAR)
335   StgSparkPool *pool;
336   rtsSpark spark;
337   StgTSO *tso;
338   GlobalTaskId pe;
339   rtsBool receivedFinish = rtsFalse;
340 # if defined(DEBUG)
341   nat tp_size, sp_size; // stats only
342 # endif
343 #endif
344   rtsBool was_interrupted = rtsFalse;
345   nat prev_what_next;
346   
347   // Pre-condition: sched_mutex is held.
348   // We might have a capability, passed in as initialCapability.
349   cap = initialCapability;
350
351 #if defined(RTS_SUPPORTS_THREADS)
352   //
353   // in the threaded case, the capability is either passed in via the
354   // initialCapability parameter, or initialized inside the scheduler
355   // loop 
356   //
357   IF_DEBUG(scheduler,
358            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
359                        mainThread, initialCapability);
360       );
361 #else
362   // simply initialise it in the non-threaded case
363   grabCapability(&cap);
364 #endif
365
366 #if defined(GRAN)
367   /* set up first event to get things going */
368   /* ToDo: assign costs for system setup and init MainTSO ! */
369   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
370             ContinueThread, 
371             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
372
373   IF_DEBUG(gran,
374            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
375            G_TSO(CurrentTSO, 5));
376
377   if (RtsFlags.GranFlags.Light) {
378     /* Save current time; GranSim Light only */
379     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
380   }      
381
382   event = get_next_event();
383
384   while (event!=(rtsEvent*)NULL) {
385     /* Choose the processor with the next event */
386     CurrentProc = event->proc;
387     CurrentTSO = event->tso;
388
389 #elif defined(PAR)
390
391   while (!receivedFinish) {    /* set by processMessages */
392                                /* when receiving PP_FINISH message         */ 
393
394 #else // everything except GRAN and PAR
395
396   while (1) {
397
398 #endif
399
400      IF_DEBUG(scheduler, printAllThreads());
401
402 #if defined(RTS_SUPPORTS_THREADS)
403       // Yield the capability to higher-priority tasks if necessary.
404       //
405       if (cap != NULL) {
406           yieldCapability(&cap);
407       }
408
409       // If we do not currently hold a capability, we wait for one
410       //
411       if (cap == NULL) {
412           waitForCapability(&sched_mutex, &cap,
413                             mainThread ? &mainThread->bound_thread_cond : NULL);
414       }
415
416       // We now have a capability...
417 #endif
418
419     //
420     // If we're interrupted (the user pressed ^C, or some other
421     // termination condition occurred), kill all the currently running
422     // threads.
423     //
424     if (interrupted) {
425         IF_DEBUG(scheduler, sched_belch("interrupted"));
426         interrupted = rtsFalse;
427         was_interrupted = rtsTrue;
428 #if defined(RTS_SUPPORTS_THREADS)
429         // In the threaded RTS, deadlock detection doesn't work,
430         // so just exit right away.
431         prog_belch("interrupted");
432         releaseCapability(cap);
433         RELEASE_LOCK(&sched_mutex);
434         shutdownHaskellAndExit(EXIT_SUCCESS);
435 #else
436         deleteAllThreads();
437 #endif
438     }
439
440 #if defined(RTS_USER_SIGNALS)
441     // check for signals each time around the scheduler
442     if (signals_pending()) {
443       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
444       startSignalHandlers();
445       ACQUIRE_LOCK(&sched_mutex);
446     }
447 #endif
448
449     //
450     // Check whether any waiting threads need to be woken up.  If the
451     // run queue is empty, and there are no other tasks running, we
452     // can wait indefinitely for something to happen.
453     //
454     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
455 #if defined(RTS_SUPPORTS_THREADS)
456                 || EMPTY_RUN_QUEUE()
457 #endif
458         )
459     {
460       awaitEvent( EMPTY_RUN_QUEUE() );
461     }
462     // we can be interrupted while waiting for I/O...
463     if (interrupted) continue;
464
465     /* 
466      * Detect deadlock: when we have no threads to run, there are no
467      * threads waiting on I/O or sleeping, and all the other tasks are
468      * waiting for work, we must have a deadlock of some description.
469      *
470      * We first try to find threads blocked on themselves (ie. black
471      * holes), and generate NonTermination exceptions where necessary.
472      *
473      * If no threads are black holed, we have a deadlock situation, so
474      * inform all the main threads.
475      */
476 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
477     if (   EMPTY_THREAD_QUEUES() )
478     {
479         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
480         // Garbage collection can release some new threads due to
481         // either (a) finalizers or (b) threads resurrected because
482         // they are about to be send BlockedOnDeadMVar.  Any threads
483         // thus released will be immediately runnable.
484         GarbageCollect(GetRoots,rtsTrue);
485
486         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
487
488         IF_DEBUG(scheduler, 
489                  sched_belch("still deadlocked, checking for black holes..."));
490         detectBlackHoles();
491
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS)
546     if ( EMPTY_RUN_QUEUE() ) {
547         continue; // nothing to do
548     }
549 #endif
550
551 #if defined(GRAN)
552     if (RtsFlags.GranFlags.Light)
553       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
554
555     /* adjust time based on time-stamp */
556     if (event->time > CurrentTime[CurrentProc] &&
557         event->evttype != ContinueThread)
558       CurrentTime[CurrentProc] = event->time;
559     
560     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
561     if (!RtsFlags.GranFlags.Light)
562       handleIdlePEs();
563
564     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
565
566     /* main event dispatcher in GranSim */
567     switch (event->evttype) {
568       /* Should just be continuing execution */
569     case ContinueThread:
570       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
571       /* ToDo: check assertion
572       ASSERT(run_queue_hd != (StgTSO*)NULL &&
573              run_queue_hd != END_TSO_QUEUE);
574       */
575       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
576       if (!RtsFlags.GranFlags.DoAsyncFetch &&
577           procStatus[CurrentProc]==Fetching) {
578         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
579               CurrentTSO->id, CurrentTSO, CurrentProc);
580         goto next_thread;
581       } 
582       /* Ignore ContinueThreads for completed threads */
583       if (CurrentTSO->what_next == ThreadComplete) {
584         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
585               CurrentTSO->id, CurrentTSO, CurrentProc);
586         goto next_thread;
587       } 
588       /* Ignore ContinueThreads for threads that are being migrated */
589       if (PROCS(CurrentTSO)==Nowhere) { 
590         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
591               CurrentTSO->id, CurrentTSO, CurrentProc);
592         goto next_thread;
593       }
594       /* The thread should be at the beginning of the run queue */
595       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
596         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
597               CurrentTSO->id, CurrentTSO, CurrentProc);
598         break; // run the thread anyway
599       }
600       /*
601       new_event(proc, proc, CurrentTime[proc],
602                 FindWork,
603                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
604       goto next_thread; 
605       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
606       break; // now actually run the thread; DaH Qu'vam yImuHbej 
607
608     case FetchNode:
609       do_the_fetchnode(event);
610       goto next_thread;             /* handle next event in event queue  */
611       
612     case GlobalBlock:
613       do_the_globalblock(event);
614       goto next_thread;             /* handle next event in event queue  */
615       
616     case FetchReply:
617       do_the_fetchreply(event);
618       goto next_thread;             /* handle next event in event queue  */
619       
620     case UnblockThread:   /* Move from the blocked queue to the tail of */
621       do_the_unblock(event);
622       goto next_thread;             /* handle next event in event queue  */
623       
624     case ResumeThread:  /* Move from the blocked queue to the tail of */
625       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
626       event->tso->gran.blocktime += 
627         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
628       do_the_startthread(event);
629       goto next_thread;             /* handle next event in event queue  */
630       
631     case StartThread:
632       do_the_startthread(event);
633       goto next_thread;             /* handle next event in event queue  */
634       
635     case MoveThread:
636       do_the_movethread(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case MoveSpark:
640       do_the_movespark(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FindWork:
644       do_the_findwork(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     default:
648       barf("Illegal event type %u\n", event->evttype);
649     }  /* switch */
650     
651     /* This point was scheduler_loop in the old RTS */
652
653     IF_DEBUG(gran, belch("GRAN: after main switch"));
654
655     TimeOfLastEvent = CurrentTime[CurrentProc];
656     TimeOfNextEvent = get_time_of_next_event();
657     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
658     // CurrentTSO = ThreadQueueHd;
659
660     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
661                          TimeOfNextEvent));
662
663     if (RtsFlags.GranFlags.Light) 
664       GranSimLight_leave_system(event, &ActiveTSO); 
665
666     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667
668     IF_DEBUG(gran, 
669              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
670
671     /* in a GranSim setup the TSO stays on the run queue */
672     t = CurrentTSO;
673     /* Take a thread from the run queue. */
674     POP_RUN_QUEUE(t); // take_off_run_queue(t);
675
676     IF_DEBUG(gran, 
677              fprintf(stderr, "GRAN: About to run current thread, which is\n");
678              G_TSO(t,5));
679
680     context_switch = 0; // turned on via GranYield, checking events and time slice
681
682     IF_DEBUG(gran, 
683              DumpGranEvent(GR_SCHEDULE, t));
684
685     procStatus[CurrentProc] = Busy;
686
687 #elif defined(PAR)
688     if (PendingFetches != END_BF_QUEUE) {
689         processFetches();
690     }
691
692     /* ToDo: phps merge with spark activation above */
693     /* check whether we have local work and send requests if we have none */
694     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
695       /* :-[  no local threads => look out for local sparks */
696       /* the spark pool for the current PE */
697       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
698       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
699           pool->hd < pool->tl) {
700         /* 
701          * ToDo: add GC code check that we really have enough heap afterwards!!
702          * Old comment:
703          * If we're here (no runnable threads) and we have pending
704          * sparks, we must have a space problem.  Get enough space
705          * to turn one of those pending sparks into a
706          * thread... 
707          */
708
709         spark = findSpark(rtsFalse);                /* get a spark */
710         if (spark != (rtsSpark) NULL) {
711           tso = activateSpark(spark);       /* turn the spark into a thread */
712           IF_PAR_DEBUG(schedule,
713                        belch("==== schedule: Created TSO %d (%p); %d threads active",
714                              tso->id, tso, advisory_thread_count));
715
716           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
717             belch("==^^ failed to activate spark");
718             goto next_thread;
719           }               /* otherwise fall through & pick-up new tso */
720         } else {
721           IF_PAR_DEBUG(verbose,
722                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
723                              spark_queue_len(pool)));
724           goto next_thread;
725         }
726       }
727
728       /* If we still have no work we need to send a FISH to get a spark
729          from another PE 
730       */
731       if (EMPTY_RUN_QUEUE()) {
732       /* =8-[  no local sparks => look for work on other PEs */
733         /*
734          * We really have absolutely no work.  Send out a fish
735          * (there may be some out there already), and wait for
736          * something to arrive.  We clearly can't run any threads
737          * until a SCHEDULE or RESUME arrives, and so that's what
738          * we're hoping to see.  (Of course, we still have to
739          * respond to other types of messages.)
740          */
741         TIME now = msTime() /*CURRENT_TIME*/;
742         IF_PAR_DEBUG(verbose, 
743                      belch("--  now=%ld", now));
744         IF_PAR_DEBUG(verbose,
745                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
746                          (last_fish_arrived_at!=0 &&
747                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
748                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
749                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
750                              last_fish_arrived_at,
751                              RtsFlags.ParFlags.fishDelay, now);
752                      });
753         
754         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
755             (last_fish_arrived_at==0 ||
756              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
757           /* outstandingFishes is set in sendFish, processFish;
758              avoid flooding system with fishes via delay */
759           pe = choosePE();
760           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
761                    NEW_FISH_HUNGER);
762
763           // Global statistics: count no. of fishes
764           if (RtsFlags.ParFlags.ParStats.Global &&
765               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
766             globalParStats.tot_fish_mess++;
767           }
768         }
769       
770         receivedFinish = processMessages();
771         goto next_thread;
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       receivedFinish = processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779
780     /* Take a thread from the run queue, if we have work */
781     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
782     IF_DEBUG(sanity,checkTSO(t));
783
784     /* ToDo: write something to the log-file
785     if (RTSflags.ParFlags.granSimStats && !sameThread)
786         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787
788     CurrentTSO = t;
789     */
790     /* the spark pool for the current PE */
791     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792
793     IF_DEBUG(scheduler, 
794              belch("--=^ %d threads, %d sparks on [%#x]", 
795                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796
797 # if 1
798     if (0 && RtsFlags.ParFlags.ParStats.Full && 
799         t && LastTSO && t->id != LastTSO->id && 
800         LastTSO->why_blocked == NotBlocked && 
801         LastTSO->what_next != ThreadComplete) {
802       // if previously scheduled TSO not blocked we have to record the context switch
803       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
804                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805     }
806
807     if (RtsFlags.ParFlags.ParStats.Full && 
808         (emitSchedule /* forced emit */ ||
809         (t && LastTSO && t->id != LastTSO->id))) {
810       /* 
811          we are running a different TSO, so write a schedule event to log file
812          NB: If we use fair scheduling we also have to write  a deschedule 
813              event for LastTSO; with unfair scheduling we know that the
814              previous tso has blocked whenever we switch to another tso, so
815              we don't need it in GUM for now
816       */
817       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
818                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819       emitSchedule = rtsFalse;
820     }
821      
822 # endif
823 #else /* !GRAN && !PAR */
824   
825     // grab a thread from the run queue
826     ASSERT(run_queue_hd != END_TSO_QUEUE);
827     POP_RUN_QUEUE(t);
828
829     // Sanity check the thread we're about to run.  This can be
830     // expensive if there is lots of thread switching going on...
831     IF_DEBUG(sanity,checkTSO(t));
832 #endif
833
834 #ifdef THREADED_RTS
835     {
836       StgMainThread *m = t->main;
837       
838       if(m)
839       {
840         if(m == mainThread)
841         {
842           IF_DEBUG(scheduler,
843             sched_belch("### Running thread %d in bound thread", t->id));
844           // yes, the Haskell thread is bound to the current native thread
845         }
846         else
847         {
848           IF_DEBUG(scheduler,
849             sched_belch("### thread %d bound to another OS thread", t->id));
850           // no, bound to a different Haskell thread: pass to that thread
851           PUSH_ON_RUN_QUEUE(t);
852           passCapability(&m->bound_thread_cond);
853           continue;
854         }
855       }
856       else
857       {
858         if(mainThread != NULL)
859         // The thread we want to run is bound.
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### this OS thread cannot run thread %d", t->id));
863           // no, the current native thread is bound to a different
864           // Haskell thread, so pass it to any worker thread
865           PUSH_ON_RUN_QUEUE(t);
866           passCapabilityToWorker();
867           continue; 
868         }
869       }
870     }
871 #endif
872
873     cap->r.rCurrentTSO = t;
874     
875     /* context switches are now initiated by the timer signal, unless
876      * the user specified "context switch as often as possible", with
877      * +RTS -C0
878      */
879     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
880          && (run_queue_hd != END_TSO_QUEUE
881              || blocked_queue_hd != END_TSO_QUEUE
882              || sleeping_queue != END_TSO_QUEUE)))
883         context_switch = 1;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
971                                    t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1031                                t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1063                                t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084       // Reset the context switch flag.  We don't do this just before
1085       // running the thread, because that would mean we would lose ticks
1086       // during GC, which can lead to unfair scheduling (a thread hogs
1087       // the CPU because the tick always arrives during GC).  This way
1088       // penalises threads that do a lot of allocation, but that seems
1089       // better than the alternative.
1090       context_switch = 0;
1091
1092 #if defined(GRAN)
1093       IF_DEBUG(gran, 
1094                DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_yields++;
1096 #elif defined(PAR)
1097       // IF_DEBUG(par, 
1098       // DumpGranEvent(GR_DESCHEDULE, t);
1099       globalParStats.tot_yields++;
1100 #endif
1101       /* put the thread back on the run queue.  Then, if we're ready to
1102        * GC, check whether this is the last task to stop.  If so, wake
1103        * up the GC thread.  getThread will block during a GC until the
1104        * GC is finished.
1105        */
1106       IF_DEBUG(scheduler,
1107                if (t->what_next != prev_what_next) {
1108                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1109                          t->id, whatNext_strs[t->what_next]);
1110                } else {
1111                    belch("--<< thread %ld (%s) stopped, yielding", 
1112                          t->id, whatNext_strs[t->what_next]);
1113                }
1114                );
1115
1116       IF_DEBUG(sanity,
1117                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1118                checkTSO(t));
1119       ASSERT(t->link == END_TSO_QUEUE);
1120
1121       // Shortcut if we're just switching evaluators: don't bother
1122       // doing stack squeezing (which can be expensive), just run the
1123       // thread.
1124       if (t->what_next != prev_what_next) {
1125           goto run_thread;
1126       }
1127
1128       threadPaused(t);
1129
1130 #if defined(GRAN)
1131       ASSERT(!is_on_queue(t,CurrentProc));
1132
1133       IF_DEBUG(sanity,
1134                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1135                checkThreadQsSanity(rtsTrue));
1136 #endif
1137
1138 #if defined(PAR)
1139       if (RtsFlags.ParFlags.doFairScheduling) { 
1140         /* this does round-robin scheduling; good for concurrency */
1141         APPEND_TO_RUN_QUEUE(t);
1142       } else {
1143         /* this does unfair scheduling; good for parallelism */
1144         PUSH_ON_RUN_QUEUE(t);
1145       }
1146 #else
1147       // this does round-robin scheduling; good for concurrency
1148       APPEND_TO_RUN_QUEUE(t);
1149 #endif
1150
1151 #if defined(GRAN)
1152       /* add a ContinueThread event to actually process the thread */
1153       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1154                 ContinueThread,
1155                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1156       IF_GRAN_DEBUG(bq, 
1157                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1158                G_EVENTQ(0);
1159                G_CURR_THREADQ(0));
1160 #endif /* GRAN */
1161       break;
1162
1163     case ThreadBlocked:
1164 #if defined(GRAN)
1165       IF_DEBUG(scheduler,
1166                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1167                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1168                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1169
1170       // ??? needed; should emit block before
1171       IF_DEBUG(gran, 
1172                DumpGranEvent(GR_DESCHEDULE, t)); 
1173       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174       /*
1175         ngoq Dogh!
1176       ASSERT(procStatus[CurrentProc]==Busy || 
1177               ((procStatus[CurrentProc]==Fetching) && 
1178               (t->block_info.closure!=(StgClosure*)NULL)));
1179       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1180           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1181             procStatus[CurrentProc]==Fetching)) 
1182         procStatus[CurrentProc] = Idle;
1183       */
1184 #elif defined(PAR)
1185       IF_DEBUG(scheduler,
1186                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1187                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188       IF_PAR_DEBUG(bq,
1189
1190                    if (t->block_info.closure!=(StgClosure*)NULL) 
1191                      print_bq(t->block_info.closure));
1192
1193       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194       blockThread(t);
1195
1196       /* whatever we schedule next, we must log that schedule */
1197       emitSchedule = rtsTrue;
1198
1199 #else /* !GRAN */
1200       /* don't need to do anything.  Either the thread is blocked on
1201        * I/O, in which case we'll have called addToBlockedQueue
1202        * previously, or it's blocked on an MVar or Blackhole, in which
1203        * case it'll be on the relevant queue already.
1204        */
1205       IF_DEBUG(scheduler,
1206                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1207                        t->id, whatNext_strs[t->what_next]);
1208                printThreadBlockage(t);
1209                fprintf(stderr, "\n"));
1210       fflush(stderr);
1211
1212       /* Only for dumping event to log file 
1213          ToDo: do I need this in GranSim, too?
1214       blockThread(t);
1215       */
1216 #endif
1217       threadPaused(t);
1218       break;
1219
1220     case ThreadFinished:
1221       /* Need to check whether this was a main thread, and if so, signal
1222        * the task that started it with the return value.  If we have no
1223        * more main threads, we probably need to stop all the tasks until
1224        * we get a new one.
1225        */
1226       /* We also end up here if the thread kills itself with an
1227        * uncaught exception, see Exception.hc.
1228        */
1229       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1230                                t->id, whatNext_strs[t->what_next]));
1231 #if defined(GRAN)
1232       endThread(t, CurrentProc); // clean-up the thread
1233 #elif defined(PAR)
1234       /* For now all are advisory -- HWL */
1235       //if(t->priority==AdvisoryPriority) ??
1236       advisory_thread_count--;
1237       
1238 # ifdef DIST
1239       if(t->dist.priority==RevalPriority)
1240         FinishReval(t);
1241 # endif
1242       
1243       if (RtsFlags.ParFlags.ParStats.Full &&
1244           !RtsFlags.ParFlags.ParStats.Suppressed) 
1245         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1246 #endif
1247
1248       //
1249       // Check whether the thread that just completed was a main
1250       // thread, and if so return with the result.  
1251       //
1252       // There is an assumption here that all thread completion goes
1253       // through this point; we need to make sure that if a thread
1254       // ends up in the ThreadKilled state, that it stays on the run
1255       // queue so it can be dealt with here.
1256       //
1257       if (
1258 #if defined(RTS_SUPPORTS_THREADS)
1259           mainThread != NULL
1260 #else
1261           mainThread->tso == t
1262 #endif
1263           )
1264       {
1265           // We are a bound thread: this must be our thread that just
1266           // completed.
1267           ASSERT(mainThread->tso == t);
1268
1269           if (t->what_next == ThreadComplete) {
1270               if (mainThread->ret) {
1271                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1272                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1273               }
1274               mainThread->stat = Success;
1275           } else {
1276               if (mainThread->ret) {
1277                   *(mainThread->ret) = NULL;
1278               }
1279               if (was_interrupted) {
1280                   mainThread->stat = Interrupted;
1281               } else {
1282                   mainThread->stat = Killed;
1283               }
1284           }
1285 #ifdef DEBUG
1286           removeThreadLabel((StgWord)mainThread->tso->id);
1287 #endif
1288           if (mainThread->prev == NULL) {
1289               main_threads = mainThread->link;
1290           } else {
1291               mainThread->prev->link = mainThread->link;
1292           }
1293           if (mainThread->link != NULL) {
1294               mainThread->link->prev = NULL;
1295           }
1296           releaseCapability(cap);
1297           return;
1298       }
1299
1300 #ifdef RTS_SUPPORTS_THREADS
1301       ASSERT(t->main == NULL);
1302 #else
1303       if (t->main != NULL) {
1304           // Must be a main thread that is not the topmost one.  Leave
1305           // it on the run queue until the stack has unwound to the
1306           // point where we can deal with this.  Leaving it on the run
1307           // queue also ensures that the garbage collector knows about
1308           // this thread and its return value (it gets dropped from the
1309           // all_threads list so there's no other way to find it).
1310           APPEND_TO_RUN_QUEUE(t);
1311       }
1312 #endif
1313       break;
1314
1315     default:
1316       barf("schedule: invalid thread return code %d", (int)ret);
1317     }
1318
1319 #ifdef PROFILING
1320     // When we have +RTS -i0 and we're heap profiling, do a census at
1321     // every GC.  This lets us get repeatable runs for debugging.
1322     if (performHeapProfile ||
1323         (RtsFlags.ProfFlags.profileInterval==0 &&
1324          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1325         GarbageCollect(GetRoots, rtsTrue);
1326         heapCensus();
1327         performHeapProfile = rtsFalse;
1328         ready_to_gc = rtsFalse; // we already GC'd
1329     }
1330 #endif
1331
1332     if (ready_to_gc) {
1333       /* everybody back, start the GC.
1334        * Could do it in this thread, or signal a condition var
1335        * to do it in another thread.  Either way, we need to
1336        * broadcast on gc_pending_cond afterward.
1337        */
1338 #if defined(RTS_SUPPORTS_THREADS)
1339       IF_DEBUG(scheduler,sched_belch("doing GC"));
1340 #endif
1341       GarbageCollect(GetRoots,rtsFalse);
1342       ready_to_gc = rtsFalse;
1343 #if defined(GRAN)
1344       /* add a ContinueThread event to continue execution of current thread */
1345       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1346                 ContinueThread,
1347                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1348       IF_GRAN_DEBUG(bq, 
1349                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1350                G_EVENTQ(0);
1351                G_CURR_THREADQ(0));
1352 #endif /* GRAN */
1353     }
1354
1355 #if defined(GRAN)
1356   next_thread:
1357     IF_GRAN_DEBUG(unused,
1358                   print_eventq(EventHd));
1359
1360     event = get_next_event();
1361 #elif defined(PAR)
1362   next_thread:
1363     /* ToDo: wait for next message to arrive rather than busy wait */
1364 #endif /* GRAN */
1365
1366   } /* end of while(1) */
1367
1368   IF_PAR_DEBUG(verbose,
1369                belch("== Leaving schedule() after having received Finish"));
1370 }
1371
1372 /* ---------------------------------------------------------------------------
1373  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1374  * used by Control.Concurrent for error checking.
1375  * ------------------------------------------------------------------------- */
1376  
1377 StgBool
1378 rtsSupportsBoundThreads(void)
1379 {
1380 #ifdef THREADED_RTS
1381   return rtsTrue;
1382 #else
1383   return rtsFalse;
1384 #endif
1385 }
1386
1387 /* ---------------------------------------------------------------------------
1388  * isThreadBound(tso): check whether tso is bound to an OS thread.
1389  * ------------------------------------------------------------------------- */
1390  
1391 StgBool
1392 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1393 {
1394 #ifdef THREADED_RTS
1395   return (tso->main != NULL);
1396 #endif
1397   return rtsFalse;
1398 }
1399
1400 /* ---------------------------------------------------------------------------
1401  * Singleton fork(). Do not copy any running threads.
1402  * ------------------------------------------------------------------------- */
1403
1404 #ifndef mingw32_TARGET_OS
1405 #define FORKPROCESS_PRIMOP_SUPPORTED
1406 #endif
1407
1408 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1409 static void 
1410 deleteThreadImmediately(StgTSO *tso);
1411 #endif
1412 StgInt
1413 forkProcess(HsStablePtr *entry
1414 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1415             STG_UNUSED
1416 #endif
1417            )
1418 {
1419 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1420   pid_t pid;
1421   StgTSO* t,*next;
1422   StgMainThread *m;
1423   SchedulerStatus rc;
1424
1425   IF_DEBUG(scheduler,sched_belch("forking!"));
1426   rts_lock(); // This not only acquires sched_mutex, it also
1427               // makes sure that no other threads are running
1428
1429   pid = fork();
1430
1431   if (pid) { /* parent */
1432
1433   /* just return the pid */
1434     rts_unlock();
1435     return pid;
1436     
1437   } else { /* child */
1438     
1439     
1440       // delete all threads
1441     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1442     
1443     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1444       next = t->link;
1445
1446         // don't allow threads to catch the ThreadKilled exception
1447       deleteThreadImmediately(t);
1448     }
1449     
1450       // wipe the main thread list
1451     while((m = main_threads) != NULL) {
1452       main_threads = m->link;
1453 # ifdef THREADED_RTS
1454       closeCondition(&m->bound_thread_cond);
1455 # endif
1456       stgFree(m);
1457     }
1458     
1459 # ifdef RTS_SUPPORTS_THREADS
1460     resetTaskManagerAfterFork();      // tell startTask() and friends that
1461     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1462     resetWorkerWakeupPipeAfterFork();
1463 # endif
1464     
1465     rc = rts_evalStableIO(entry, NULL);  // run the action
1466     rts_checkSchedStatus("forkProcess",rc);
1467     
1468     rts_unlock();
1469     
1470     hs_exit();                      // clean up and exit
1471     stg_exit(0);
1472   }
1473 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1474   barf("forkProcess#: primop not supported, sorry!\n");
1475   return -1;
1476 #endif
1477 }
1478
1479 /* ---------------------------------------------------------------------------
1480  * deleteAllThreads():  kill all the live threads.
1481  *
1482  * This is used when we catch a user interrupt (^C), before performing
1483  * any necessary cleanups and running finalizers.
1484  *
1485  * Locks: sched_mutex held.
1486  * ------------------------------------------------------------------------- */
1487    
1488 void
1489 deleteAllThreads ( void )
1490 {
1491   StgTSO* t, *next;
1492   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1493   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1494       next = t->global_link;
1495       deleteThread(t);
1496   }      
1497
1498   // The run queue now contains a bunch of ThreadKilled threads.  We
1499   // must not throw these away: the main thread(s) will be in there
1500   // somewhere, and the main scheduler loop has to deal with it.
1501   // Also, the run queue is the only thing keeping these threads from
1502   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1503
1504   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1505   ASSERT(sleeping_queue == END_TSO_QUEUE);
1506 }
1507
1508 /* startThread and  insertThread are now in GranSim.c -- HWL */
1509
1510
1511 /* ---------------------------------------------------------------------------
1512  * Suspending & resuming Haskell threads.
1513  * 
1514  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1515  * its capability before calling the C function.  This allows another
1516  * task to pick up the capability and carry on running Haskell
1517  * threads.  It also means that if the C call blocks, it won't lock
1518  * the whole system.
1519  *
1520  * The Haskell thread making the C call is put to sleep for the
1521  * duration of the call, on the susepended_ccalling_threads queue.  We
1522  * give out a token to the task, which it can use to resume the thread
1523  * on return from the C function.
1524  * ------------------------------------------------------------------------- */
1525    
1526 StgInt
1527 suspendThread( StgRegTable *reg )
1528 {
1529   nat tok;
1530   Capability *cap;
1531   int saved_errno = errno;
1532
1533   /* assume that *reg is a pointer to the StgRegTable part
1534    * of a Capability.
1535    */
1536   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1537
1538   ACQUIRE_LOCK(&sched_mutex);
1539
1540   IF_DEBUG(scheduler,
1541            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1542
1543   // XXX this might not be necessary --SDM
1544   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1545
1546   threadPaused(cap->r.rCurrentTSO);
1547   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1548   suspended_ccalling_threads = cap->r.rCurrentTSO;
1549
1550   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1551       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1552       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1553   } else {
1554       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1555   }
1556
1557   /* Use the thread ID as the token; it should be unique */
1558   tok = cap->r.rCurrentTSO->id;
1559
1560   /* Hand back capability */
1561   releaseCapability(cap);
1562   
1563 #if defined(RTS_SUPPORTS_THREADS)
1564   /* Preparing to leave the RTS, so ensure there's a native thread/task
1565      waiting to take over.
1566   */
1567   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1568 #endif
1569
1570   /* Other threads _might_ be available for execution; signal this */
1571   THREAD_RUNNABLE();
1572   RELEASE_LOCK(&sched_mutex);
1573   
1574   errno = saved_errno;
1575   return tok; 
1576 }
1577
1578 StgRegTable *
1579 resumeThread( StgInt tok )
1580 {
1581   StgTSO *tso, **prev;
1582   Capability *cap;
1583   int saved_errno = errno;
1584
1585 #if defined(RTS_SUPPORTS_THREADS)
1586   /* Wait for permission to re-enter the RTS with the result. */
1587   ACQUIRE_LOCK(&sched_mutex);
1588   waitForReturnCapability(&sched_mutex, &cap);
1589
1590   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1591 #else
1592   grabCapability(&cap);
1593 #endif
1594
1595   /* Remove the thread off of the suspended list */
1596   prev = &suspended_ccalling_threads;
1597   for (tso = suspended_ccalling_threads; 
1598        tso != END_TSO_QUEUE; 
1599        prev = &tso->link, tso = tso->link) {
1600     if (tso->id == (StgThreadID)tok) {
1601       *prev = tso->link;
1602       break;
1603     }
1604   }
1605   if (tso == END_TSO_QUEUE) {
1606     barf("resumeThread: thread not found");
1607   }
1608   tso->link = END_TSO_QUEUE;
1609   
1610   if(tso->why_blocked == BlockedOnCCall) {
1611       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1612       tso->blocked_exceptions = NULL;
1613   }
1614   
1615   /* Reset blocking status */
1616   tso->why_blocked  = NotBlocked;
1617
1618   cap->r.rCurrentTSO = tso;
1619   RELEASE_LOCK(&sched_mutex);
1620   errno = saved_errno;
1621   return &cap->r;
1622 }
1623
1624
1625 /* ---------------------------------------------------------------------------
1626  * Static functions
1627  * ------------------------------------------------------------------------ */
1628 static void unblockThread(StgTSO *tso);
1629
1630 /* ---------------------------------------------------------------------------
1631  * Comparing Thread ids.
1632  *
1633  * This is used from STG land in the implementation of the
1634  * instances of Eq/Ord for ThreadIds.
1635  * ------------------------------------------------------------------------ */
1636
1637 int
1638 cmp_thread(StgPtr tso1, StgPtr tso2) 
1639
1640   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1641   StgThreadID id2 = ((StgTSO *)tso2)->id;
1642  
1643   if (id1 < id2) return (-1);
1644   if (id1 > id2) return 1;
1645   return 0;
1646 }
1647
1648 /* ---------------------------------------------------------------------------
1649  * Fetching the ThreadID from an StgTSO.
1650  *
1651  * This is used in the implementation of Show for ThreadIds.
1652  * ------------------------------------------------------------------------ */
1653 int
1654 rts_getThreadId(StgPtr tso) 
1655 {
1656   return ((StgTSO *)tso)->id;
1657 }
1658
1659 #ifdef DEBUG
1660 void
1661 labelThread(StgPtr tso, char *label)
1662 {
1663   int len;
1664   void *buf;
1665
1666   /* Caveat: Once set, you can only set the thread name to "" */
1667   len = strlen(label)+1;
1668   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1669   strncpy(buf,label,len);
1670   /* Update will free the old memory for us */
1671   updateThreadLabel(((StgTSO *)tso)->id,buf);
1672 }
1673 #endif /* DEBUG */
1674
1675 /* ---------------------------------------------------------------------------
1676    Create a new thread.
1677
1678    The new thread starts with the given stack size.  Before the
1679    scheduler can run, however, this thread needs to have a closure
1680    (and possibly some arguments) pushed on its stack.  See
1681    pushClosure() in Schedule.h.
1682
1683    createGenThread() and createIOThread() (in SchedAPI.h) are
1684    convenient packaged versions of this function.
1685
1686    currently pri (priority) is only used in a GRAN setup -- HWL
1687    ------------------------------------------------------------------------ */
1688 #if defined(GRAN)
1689 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1690 StgTSO *
1691 createThread(nat size, StgInt pri)
1692 #else
1693 StgTSO *
1694 createThread(nat size)
1695 #endif
1696 {
1697
1698     StgTSO *tso;
1699     nat stack_size;
1700
1701     /* First check whether we should create a thread at all */
1702 #if defined(PAR)
1703   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1704   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1705     threadsIgnored++;
1706     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1707           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1708     return END_TSO_QUEUE;
1709   }
1710   threadsCreated++;
1711 #endif
1712
1713 #if defined(GRAN)
1714   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1715 #endif
1716
1717   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1718
1719   /* catch ridiculously small stack sizes */
1720   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1721     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1722   }
1723
1724   stack_size = size - TSO_STRUCT_SIZEW;
1725
1726   tso = (StgTSO *)allocate(size);
1727   TICK_ALLOC_TSO(stack_size, 0);
1728
1729   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1730 #if defined(GRAN)
1731   SET_GRAN_HDR(tso, ThisPE);
1732 #endif
1733
1734   // Always start with the compiled code evaluator
1735   tso->what_next = ThreadRunGHC;
1736
1737   tso->id = next_thread_id++; 
1738   tso->why_blocked  = NotBlocked;
1739   tso->blocked_exceptions = NULL;
1740
1741   tso->saved_errno = 0;
1742   tso->main = NULL;
1743   
1744   tso->stack_size   = stack_size;
1745   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1746                               - TSO_STRUCT_SIZEW;
1747   tso->sp           = (P_)&(tso->stack) + stack_size;
1748
1749 #ifdef PROFILING
1750   tso->prof.CCCS = CCS_MAIN;
1751 #endif
1752
1753   /* put a stop frame on the stack */
1754   tso->sp -= sizeofW(StgStopFrame);
1755   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1756   tso->link = END_TSO_QUEUE;
1757
1758   // ToDo: check this
1759 #if defined(GRAN)
1760   /* uses more flexible routine in GranSim */
1761   insertThread(tso, CurrentProc);
1762 #else
1763   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1764    * from its creation
1765    */
1766 #endif
1767
1768 #if defined(GRAN) 
1769   if (RtsFlags.GranFlags.GranSimStats.Full) 
1770     DumpGranEvent(GR_START,tso);
1771 #elif defined(PAR)
1772   if (RtsFlags.ParFlags.ParStats.Full) 
1773     DumpGranEvent(GR_STARTQ,tso);
1774   /* HACk to avoid SCHEDULE 
1775      LastTSO = tso; */
1776 #endif
1777
1778   /* Link the new thread on the global thread list.
1779    */
1780   tso->global_link = all_threads;
1781   all_threads = tso;
1782
1783 #if defined(DIST)
1784   tso->dist.priority = MandatoryPriority; //by default that is...
1785 #endif
1786
1787 #if defined(GRAN)
1788   tso->gran.pri = pri;
1789 # if defined(DEBUG)
1790   tso->gran.magic = TSO_MAGIC; // debugging only
1791 # endif
1792   tso->gran.sparkname   = 0;
1793   tso->gran.startedat   = CURRENT_TIME; 
1794   tso->gran.exported    = 0;
1795   tso->gran.basicblocks = 0;
1796   tso->gran.allocs      = 0;
1797   tso->gran.exectime    = 0;
1798   tso->gran.fetchtime   = 0;
1799   tso->gran.fetchcount  = 0;
1800   tso->gran.blocktime   = 0;
1801   tso->gran.blockcount  = 0;
1802   tso->gran.blockedat   = 0;
1803   tso->gran.globalsparks = 0;
1804   tso->gran.localsparks  = 0;
1805   if (RtsFlags.GranFlags.Light)
1806     tso->gran.clock  = Now; /* local clock */
1807   else
1808     tso->gran.clock  = 0;
1809
1810   IF_DEBUG(gran,printTSO(tso));
1811 #elif defined(PAR)
1812 # if defined(DEBUG)
1813   tso->par.magic = TSO_MAGIC; // debugging only
1814 # endif
1815   tso->par.sparkname   = 0;
1816   tso->par.startedat   = CURRENT_TIME; 
1817   tso->par.exported    = 0;
1818   tso->par.basicblocks = 0;
1819   tso->par.allocs      = 0;
1820   tso->par.exectime    = 0;
1821   tso->par.fetchtime   = 0;
1822   tso->par.fetchcount  = 0;
1823   tso->par.blocktime   = 0;
1824   tso->par.blockcount  = 0;
1825   tso->par.blockedat   = 0;
1826   tso->par.globalsparks = 0;
1827   tso->par.localsparks  = 0;
1828 #endif
1829
1830 #if defined(GRAN)
1831   globalGranStats.tot_threads_created++;
1832   globalGranStats.threads_created_on_PE[CurrentProc]++;
1833   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1834   globalGranStats.tot_sq_probes++;
1835 #elif defined(PAR)
1836   // collect parallel global statistics (currently done together with GC stats)
1837   if (RtsFlags.ParFlags.ParStats.Global &&
1838       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1839     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1840     globalParStats.tot_threads_created++;
1841   }
1842 #endif 
1843
1844 #if defined(GRAN)
1845   IF_GRAN_DEBUG(pri,
1846                 belch("==__ schedule: Created TSO %d (%p);",
1847                       CurrentProc, tso, tso->id));
1848 #elif defined(PAR)
1849     IF_PAR_DEBUG(verbose,
1850                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1851                        tso->id, tso, advisory_thread_count));
1852 #else
1853   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1854                                  tso->id, tso->stack_size));
1855 #endif    
1856   return tso;
1857 }
1858
1859 #if defined(PAR)
1860 /* RFP:
1861    all parallel thread creation calls should fall through the following routine.
1862 */
1863 StgTSO *
1864 createSparkThread(rtsSpark spark) 
1865 { StgTSO *tso;
1866   ASSERT(spark != (rtsSpark)NULL);
1867   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1868   { threadsIgnored++;
1869     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1870           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1871     return END_TSO_QUEUE;
1872   }
1873   else
1874   { threadsCreated++;
1875     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1876     if (tso==END_TSO_QUEUE)     
1877       barf("createSparkThread: Cannot create TSO");
1878 #if defined(DIST)
1879     tso->priority = AdvisoryPriority;
1880 #endif
1881     pushClosure(tso,spark);
1882     PUSH_ON_RUN_QUEUE(tso);
1883     advisory_thread_count++;    
1884   }
1885   return tso;
1886 }
1887 #endif
1888
1889 /*
1890   Turn a spark into a thread.
1891   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1892 */
1893 #if defined(PAR)
1894 StgTSO *
1895 activateSpark (rtsSpark spark) 
1896 {
1897   StgTSO *tso;
1898
1899   tso = createSparkThread(spark);
1900   if (RtsFlags.ParFlags.ParStats.Full) {   
1901     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1902     IF_PAR_DEBUG(verbose,
1903                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1904                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1905   }
1906   // ToDo: fwd info on local/global spark to thread -- HWL
1907   // tso->gran.exported =  spark->exported;
1908   // tso->gran.locked =   !spark->global;
1909   // tso->gran.sparkname = spark->name;
1910
1911   return tso;
1912 }
1913 #endif
1914
1915 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1916                                    Capability *initialCapability
1917                                    );
1918
1919
1920 /* ---------------------------------------------------------------------------
1921  * scheduleThread()
1922  *
1923  * scheduleThread puts a thread on the head of the runnable queue.
1924  * This will usually be done immediately after a thread is created.
1925  * The caller of scheduleThread must create the thread using e.g.
1926  * createThread and push an appropriate closure
1927  * on this thread's stack before the scheduler is invoked.
1928  * ------------------------------------------------------------------------ */
1929
1930 static void scheduleThread_ (StgTSO* tso);
1931
1932 void
1933 scheduleThread_(StgTSO *tso)
1934 {
1935   // Precondition: sched_mutex must be held.
1936   // The thread goes at the *end* of the run-queue, to avoid possible
1937   // starvation of any threads already on the queue.
1938   APPEND_TO_RUN_QUEUE(tso);
1939   THREAD_RUNNABLE();
1940 }
1941
1942 void
1943 scheduleThread(StgTSO* tso)
1944 {
1945   ACQUIRE_LOCK(&sched_mutex);
1946   scheduleThread_(tso);
1947   RELEASE_LOCK(&sched_mutex);
1948 }
1949
1950 #if defined(RTS_SUPPORTS_THREADS)
1951 static Condition bound_cond_cache;
1952 static int bound_cond_cache_full = 0;
1953 #endif
1954
1955
1956 SchedulerStatus
1957 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1958                    Capability *initialCapability)
1959 {
1960     // Precondition: sched_mutex must be held
1961     StgMainThread *m;
1962
1963     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1964     m->tso = tso;
1965     tso->main = m;
1966     m->ret = ret;
1967     m->stat = NoStatus;
1968     m->link = main_threads;
1969     m->prev = NULL;
1970     if (main_threads != NULL) {
1971         main_threads->prev = m;
1972     }
1973     main_threads = m;
1974
1975 #if defined(RTS_SUPPORTS_THREADS)
1976     // Allocating a new condition for each thread is expensive, so we
1977     // cache one.  This is a pretty feeble hack, but it helps speed up
1978     // consecutive call-ins quite a bit.
1979     if (bound_cond_cache_full) {
1980         m->bound_thread_cond = bound_cond_cache;
1981         bound_cond_cache_full = 0;
1982     } else {
1983         initCondition(&m->bound_thread_cond);
1984     }
1985 #endif
1986
1987     /* Put the thread on the main-threads list prior to scheduling the TSO.
1988        Failure to do so introduces a race condition in the MT case (as
1989        identified by Wolfgang Thaller), whereby the new task/OS thread 
1990        created by scheduleThread_() would complete prior to the thread
1991        that spawned it managed to put 'itself' on the main-threads list.
1992        The upshot of it all being that the worker thread wouldn't get to
1993        signal the completion of the its work item for the main thread to
1994        see (==> it got stuck waiting.)    -- sof 6/02.
1995     */
1996     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1997     
1998     APPEND_TO_RUN_QUEUE(tso);
1999     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2000     // bound and only runnable by *this* OS thread, so waking up other
2001     // workers will just slow things down.
2002
2003     return waitThread_(m, initialCapability);
2004 }
2005
2006 /* ---------------------------------------------------------------------------
2007  * initScheduler()
2008  *
2009  * Initialise the scheduler.  This resets all the queues - if the
2010  * queues contained any threads, they'll be garbage collected at the
2011  * next pass.
2012  *
2013  * ------------------------------------------------------------------------ */
2014
2015 void 
2016 initScheduler(void)
2017 {
2018 #if defined(GRAN)
2019   nat i;
2020
2021   for (i=0; i<=MAX_PROC; i++) {
2022     run_queue_hds[i]      = END_TSO_QUEUE;
2023     run_queue_tls[i]      = END_TSO_QUEUE;
2024     blocked_queue_hds[i]  = END_TSO_QUEUE;
2025     blocked_queue_tls[i]  = END_TSO_QUEUE;
2026     ccalling_threadss[i]  = END_TSO_QUEUE;
2027     sleeping_queue        = END_TSO_QUEUE;
2028   }
2029 #else
2030   run_queue_hd      = END_TSO_QUEUE;
2031   run_queue_tl      = END_TSO_QUEUE;
2032   blocked_queue_hd  = END_TSO_QUEUE;
2033   blocked_queue_tl  = END_TSO_QUEUE;
2034   sleeping_queue    = END_TSO_QUEUE;
2035 #endif 
2036
2037   suspended_ccalling_threads  = END_TSO_QUEUE;
2038
2039   main_threads = NULL;
2040   all_threads  = END_TSO_QUEUE;
2041
2042   context_switch = 0;
2043   interrupted    = 0;
2044
2045   RtsFlags.ConcFlags.ctxtSwitchTicks =
2046       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2047       
2048 #if defined(RTS_SUPPORTS_THREADS)
2049   /* Initialise the mutex and condition variables used by
2050    * the scheduler. */
2051   initMutex(&sched_mutex);
2052   initMutex(&term_mutex);
2053 #endif
2054   
2055   ACQUIRE_LOCK(&sched_mutex);
2056
2057   /* A capability holds the state a native thread needs in
2058    * order to execute STG code. At least one capability is
2059    * floating around (only SMP builds have more than one).
2060    */
2061   initCapabilities();
2062   
2063 #if defined(RTS_SUPPORTS_THREADS)
2064     /* start our haskell execution tasks */
2065     startTaskManager(0,taskStart);
2066 #endif
2067
2068 #if /* defined(SMP) ||*/ defined(PAR)
2069   initSparkPools();
2070 #endif
2071
2072   RELEASE_LOCK(&sched_mutex);
2073 }
2074
2075 void
2076 exitScheduler( void )
2077 {
2078 #if defined(RTS_SUPPORTS_THREADS)
2079   stopTaskManager();
2080 #endif
2081   shutting_down_scheduler = rtsTrue;
2082 }
2083
2084 /* ----------------------------------------------------------------------------
2085    Managing the per-task allocation areas.
2086    
2087    Each capability comes with an allocation area.  These are
2088    fixed-length block lists into which allocation can be done.
2089
2090    ToDo: no support for two-space collection at the moment???
2091    ------------------------------------------------------------------------- */
2092
2093 static
2094 SchedulerStatus
2095 waitThread_(StgMainThread* m, Capability *initialCapability)
2096 {
2097   SchedulerStatus stat;
2098
2099   // Precondition: sched_mutex must be held.
2100   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2101
2102 #if defined(GRAN)
2103   /* GranSim specific init */
2104   CurrentTSO = m->tso;                // the TSO to run
2105   procStatus[MainProc] = Busy;        // status of main PE
2106   CurrentProc = MainProc;             // PE to run it on
2107   schedule(m,initialCapability);
2108 #else
2109   schedule(m,initialCapability);
2110   ASSERT(m->stat != NoStatus);
2111 #endif
2112
2113   stat = m->stat;
2114
2115 #if defined(RTS_SUPPORTS_THREADS)
2116   // Free the condition variable, returning it to the cache if possible.
2117   if (!bound_cond_cache_full) {
2118       bound_cond_cache = m->bound_thread_cond;
2119       bound_cond_cache_full = 1;
2120   } else {
2121       closeCondition(&m->bound_thread_cond);
2122   }
2123 #endif
2124
2125   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2126   stgFree(m);
2127
2128   // Postcondition: sched_mutex still held
2129   return stat;
2130 }
2131
2132 /* ---------------------------------------------------------------------------
2133    Where are the roots that we know about?
2134
2135         - all the threads on the runnable queue
2136         - all the threads on the blocked queue
2137         - all the threads on the sleeping queue
2138         - all the thread currently executing a _ccall_GC
2139         - all the "main threads"
2140      
2141    ------------------------------------------------------------------------ */
2142
2143 /* This has to be protected either by the scheduler monitor, or by the
2144         garbage collection monitor (probably the latter).
2145         KH @ 25/10/99
2146 */
2147
2148 void
2149 GetRoots( evac_fn evac )
2150 {
2151 #if defined(GRAN)
2152   {
2153     nat i;
2154     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2155       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2156           evac((StgClosure **)&run_queue_hds[i]);
2157       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2158           evac((StgClosure **)&run_queue_tls[i]);
2159       
2160       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2161           evac((StgClosure **)&blocked_queue_hds[i]);
2162       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2163           evac((StgClosure **)&blocked_queue_tls[i]);
2164       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2165           evac((StgClosure **)&ccalling_threads[i]);
2166     }
2167   }
2168
2169   markEventQueue();
2170
2171 #else /* !GRAN */
2172   if (run_queue_hd != END_TSO_QUEUE) {
2173       ASSERT(run_queue_tl != END_TSO_QUEUE);
2174       evac((StgClosure **)&run_queue_hd);
2175       evac((StgClosure **)&run_queue_tl);
2176   }
2177   
2178   if (blocked_queue_hd != END_TSO_QUEUE) {
2179       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2180       evac((StgClosure **)&blocked_queue_hd);
2181       evac((StgClosure **)&blocked_queue_tl);
2182   }
2183   
2184   if (sleeping_queue != END_TSO_QUEUE) {
2185       evac((StgClosure **)&sleeping_queue);
2186   }
2187 #endif 
2188
2189   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2190       evac((StgClosure **)&suspended_ccalling_threads);
2191   }
2192
2193 #if defined(PAR) || defined(GRAN)
2194   markSparkQueue(evac);
2195 #endif
2196
2197 #if defined(RTS_USER_SIGNALS)
2198   // mark the signal handlers (signals should be already blocked)
2199   markSignalHandlers(evac);
2200 #endif
2201 }
2202
2203 /* -----------------------------------------------------------------------------
2204    performGC
2205
2206    This is the interface to the garbage collector from Haskell land.
2207    We provide this so that external C code can allocate and garbage
2208    collect when called from Haskell via _ccall_GC.
2209
2210    It might be useful to provide an interface whereby the programmer
2211    can specify more roots (ToDo).
2212    
2213    This needs to be protected by the GC condition variable above.  KH.
2214    -------------------------------------------------------------------------- */
2215
2216 static void (*extra_roots)(evac_fn);
2217
2218 void
2219 performGC(void)
2220 {
2221   /* Obligated to hold this lock upon entry */
2222   ACQUIRE_LOCK(&sched_mutex);
2223   GarbageCollect(GetRoots,rtsFalse);
2224   RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 void
2228 performMajorGC(void)
2229 {
2230   ACQUIRE_LOCK(&sched_mutex);
2231   GarbageCollect(GetRoots,rtsTrue);
2232   RELEASE_LOCK(&sched_mutex);
2233 }
2234
2235 static void
2236 AllRoots(evac_fn evac)
2237 {
2238     GetRoots(evac);             // the scheduler's roots
2239     extra_roots(evac);          // the user's roots
2240 }
2241
2242 void
2243 performGCWithRoots(void (*get_roots)(evac_fn))
2244 {
2245   ACQUIRE_LOCK(&sched_mutex);
2246   extra_roots = get_roots;
2247   GarbageCollect(AllRoots,rtsFalse);
2248   RELEASE_LOCK(&sched_mutex);
2249 }
2250
2251 /* -----------------------------------------------------------------------------
2252    Stack overflow
2253
2254    If the thread has reached its maximum stack size, then raise the
2255    StackOverflow exception in the offending thread.  Otherwise
2256    relocate the TSO into a larger chunk of memory and adjust its stack
2257    size appropriately.
2258    -------------------------------------------------------------------------- */
2259
2260 static StgTSO *
2261 threadStackOverflow(StgTSO *tso)
2262 {
2263   nat new_stack_size, new_tso_size, stack_words;
2264   StgPtr new_sp;
2265   StgTSO *dest;
2266
2267   IF_DEBUG(sanity,checkTSO(tso));
2268   if (tso->stack_size >= tso->max_stack_size) {
2269
2270     IF_DEBUG(gc,
2271              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2272                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2273              /* If we're debugging, just print out the top of the stack */
2274              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2275                                               tso->sp+64)));
2276
2277     /* Send this thread the StackOverflow exception */
2278     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2279     return tso;
2280   }
2281
2282   /* Try to double the current stack size.  If that takes us over the
2283    * maximum stack size for this thread, then use the maximum instead.
2284    * Finally round up so the TSO ends up as a whole number of blocks.
2285    */
2286   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2287   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2288                                        TSO_STRUCT_SIZE)/sizeof(W_);
2289   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2290   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2291
2292   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2293
2294   dest = (StgTSO *)allocate(new_tso_size);
2295   TICK_ALLOC_TSO(new_stack_size,0);
2296
2297   /* copy the TSO block and the old stack into the new area */
2298   memcpy(dest,tso,TSO_STRUCT_SIZE);
2299   stack_words = tso->stack + tso->stack_size - tso->sp;
2300   new_sp = (P_)dest + new_tso_size - stack_words;
2301   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2302
2303   /* relocate the stack pointers... */
2304   dest->sp         = new_sp;
2305   dest->stack_size = new_stack_size;
2306         
2307   /* Mark the old TSO as relocated.  We have to check for relocated
2308    * TSOs in the garbage collector and any primops that deal with TSOs.
2309    *
2310    * It's important to set the sp value to just beyond the end
2311    * of the stack, so we don't attempt to scavenge any part of the
2312    * dead TSO's stack.
2313    */
2314   tso->what_next = ThreadRelocated;
2315   tso->link = dest;
2316   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2317   tso->why_blocked = NotBlocked;
2318   dest->mut_link = NULL;
2319
2320   IF_PAR_DEBUG(verbose,
2321                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2322                      tso->id, tso, tso->stack_size);
2323                /* If we're debugging, just print out the top of the stack */
2324                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2325                                                 tso->sp+64)));
2326   
2327   IF_DEBUG(sanity,checkTSO(tso));
2328 #if 0
2329   IF_DEBUG(scheduler,printTSO(dest));
2330 #endif
2331
2332   return dest;
2333 }
2334
2335 /* ---------------------------------------------------------------------------
2336    Wake up a queue that was blocked on some resource.
2337    ------------------------------------------------------------------------ */
2338
2339 #if defined(GRAN)
2340 STATIC_INLINE void
2341 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2342 {
2343 }
2344 #elif defined(PAR)
2345 STATIC_INLINE void
2346 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2347 {
2348   /* write RESUME events to log file and
2349      update blocked and fetch time (depending on type of the orig closure) */
2350   if (RtsFlags.ParFlags.ParStats.Full) {
2351     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2352                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2353                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2354     if (EMPTY_RUN_QUEUE())
2355       emitSchedule = rtsTrue;
2356
2357     switch (get_itbl(node)->type) {
2358         case FETCH_ME_BQ:
2359           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2360           break;
2361         case RBH:
2362         case FETCH_ME:
2363         case BLACKHOLE_BQ:
2364           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2365           break;
2366 #ifdef DIST
2367         case MVAR:
2368           break;
2369 #endif    
2370         default:
2371           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2372         }
2373       }
2374 }
2375 #endif
2376
2377 #if defined(GRAN)
2378 static StgBlockingQueueElement *
2379 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2380 {
2381     StgTSO *tso;
2382     PEs node_loc, tso_loc;
2383
2384     node_loc = where_is(node); // should be lifted out of loop
2385     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2386     tso_loc = where_is((StgClosure *)tso);
2387     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2388       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2389       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2390       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2391       // insertThread(tso, node_loc);
2392       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2393                 ResumeThread,
2394                 tso, node, (rtsSpark*)NULL);
2395       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2396       // len_local++;
2397       // len++;
2398     } else { // TSO is remote (actually should be FMBQ)
2399       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2400                                   RtsFlags.GranFlags.Costs.gunblocktime +
2401                                   RtsFlags.GranFlags.Costs.latency;
2402       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2403                 UnblockThread,
2404                 tso, node, (rtsSpark*)NULL);
2405       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2406       // len++;
2407     }
2408     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2409     IF_GRAN_DEBUG(bq,
2410                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2411                           (node_loc==tso_loc ? "Local" : "Global"), 
2412                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2413     tso->block_info.closure = NULL;
2414     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2415                              tso->id, tso));
2416 }
2417 #elif defined(PAR)
2418 static StgBlockingQueueElement *
2419 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2420 {
2421     StgBlockingQueueElement *next;
2422
2423     switch (get_itbl(bqe)->type) {
2424     case TSO:
2425       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2426       /* if it's a TSO just push it onto the run_queue */
2427       next = bqe->link;
2428       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2429       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2430       THREAD_RUNNABLE();
2431       unblockCount(bqe, node);
2432       /* reset blocking status after dumping event */
2433       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2434       break;
2435
2436     case BLOCKED_FETCH:
2437       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2438       next = bqe->link;
2439       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2440       PendingFetches = (StgBlockedFetch *)bqe;
2441       break;
2442
2443 # if defined(DEBUG)
2444       /* can ignore this case in a non-debugging setup; 
2445          see comments on RBHSave closures above */
2446     case CONSTR:
2447       /* check that the closure is an RBHSave closure */
2448       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2449              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2450              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2451       break;
2452
2453     default:
2454       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2455            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2456            (StgClosure *)bqe);
2457 # endif
2458     }
2459   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2460   return next;
2461 }
2462
2463 #else /* !GRAN && !PAR */
2464 static StgTSO *
2465 unblockOneLocked(StgTSO *tso)
2466 {
2467   StgTSO *next;
2468
2469   ASSERT(get_itbl(tso)->type == TSO);
2470   ASSERT(tso->why_blocked != NotBlocked);
2471   tso->why_blocked = NotBlocked;
2472   next = tso->link;
2473   tso->link = END_TSO_QUEUE;
2474   APPEND_TO_RUN_QUEUE(tso);
2475   THREAD_RUNNABLE();
2476   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2477   return next;
2478 }
2479 #endif
2480
2481 #if defined(GRAN) || defined(PAR)
2482 INLINE_ME StgBlockingQueueElement *
2483 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2484 {
2485   ACQUIRE_LOCK(&sched_mutex);
2486   bqe = unblockOneLocked(bqe, node);
2487   RELEASE_LOCK(&sched_mutex);
2488   return bqe;
2489 }
2490 #else
2491 INLINE_ME StgTSO *
2492 unblockOne(StgTSO *tso)
2493 {
2494   ACQUIRE_LOCK(&sched_mutex);
2495   tso = unblockOneLocked(tso);
2496   RELEASE_LOCK(&sched_mutex);
2497   return tso;
2498 }
2499 #endif
2500
2501 #if defined(GRAN)
2502 void 
2503 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2504 {
2505   StgBlockingQueueElement *bqe;
2506   PEs node_loc;
2507   nat len = 0; 
2508
2509   IF_GRAN_DEBUG(bq, 
2510                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2511                       node, CurrentProc, CurrentTime[CurrentProc], 
2512                       CurrentTSO->id, CurrentTSO));
2513
2514   node_loc = where_is(node);
2515
2516   ASSERT(q == END_BQ_QUEUE ||
2517          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2518          get_itbl(q)->type == CONSTR); // closure (type constructor)
2519   ASSERT(is_unique(node));
2520
2521   /* FAKE FETCH: magically copy the node to the tso's proc;
2522      no Fetch necessary because in reality the node should not have been 
2523      moved to the other PE in the first place
2524   */
2525   if (CurrentProc!=node_loc) {
2526     IF_GRAN_DEBUG(bq, 
2527                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2528                         node, node_loc, CurrentProc, CurrentTSO->id, 
2529                         // CurrentTSO, where_is(CurrentTSO),
2530                         node->header.gran.procs));
2531     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2532     IF_GRAN_DEBUG(bq, 
2533                   belch("## new bitmask of node %p is %#x",
2534                         node, node->header.gran.procs));
2535     if (RtsFlags.GranFlags.GranSimStats.Global) {
2536       globalGranStats.tot_fake_fetches++;
2537     }
2538   }
2539
2540   bqe = q;
2541   // ToDo: check: ASSERT(CurrentProc==node_loc);
2542   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2543     //next = bqe->link;
2544     /* 
2545        bqe points to the current element in the queue
2546        next points to the next element in the queue
2547     */
2548     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2549     //tso_loc = where_is(tso);
2550     len++;
2551     bqe = unblockOneLocked(bqe, node);
2552   }
2553
2554   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2555      the closure to make room for the anchor of the BQ */
2556   if (bqe!=END_BQ_QUEUE) {
2557     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2558     /*
2559     ASSERT((info_ptr==&RBH_Save_0_info) ||
2560            (info_ptr==&RBH_Save_1_info) ||
2561            (info_ptr==&RBH_Save_2_info));
2562     */
2563     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2564     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2565     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2566
2567     IF_GRAN_DEBUG(bq,
2568                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2569                         node, info_type(node)));
2570   }
2571
2572   /* statistics gathering */
2573   if (RtsFlags.GranFlags.GranSimStats.Global) {
2574     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2575     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2576     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2577     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2578   }
2579   IF_GRAN_DEBUG(bq,
2580                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2581                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2582 }
2583 #elif defined(PAR)
2584 void 
2585 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2586 {
2587   StgBlockingQueueElement *bqe;
2588
2589   ACQUIRE_LOCK(&sched_mutex);
2590
2591   IF_PAR_DEBUG(verbose, 
2592                belch("##-_ AwBQ for node %p on [%x]: ",
2593                      node, mytid));
2594 #ifdef DIST  
2595   //RFP
2596   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2597     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2598     return;
2599   }
2600 #endif
2601   
2602   ASSERT(q == END_BQ_QUEUE ||
2603          get_itbl(q)->type == TSO ||           
2604          get_itbl(q)->type == BLOCKED_FETCH || 
2605          get_itbl(q)->type == CONSTR); 
2606
2607   bqe = q;
2608   while (get_itbl(bqe)->type==TSO || 
2609          get_itbl(bqe)->type==BLOCKED_FETCH) {
2610     bqe = unblockOneLocked(bqe, node);
2611   }
2612   RELEASE_LOCK(&sched_mutex);
2613 }
2614
2615 #else   /* !GRAN && !PAR */
2616
2617 void
2618 awakenBlockedQueueNoLock(StgTSO *tso)
2619 {
2620   while (tso != END_TSO_QUEUE) {
2621     tso = unblockOneLocked(tso);
2622   }
2623 }
2624
2625 void
2626 awakenBlockedQueue(StgTSO *tso)
2627 {
2628   ACQUIRE_LOCK(&sched_mutex);
2629   while (tso != END_TSO_QUEUE) {
2630     tso = unblockOneLocked(tso);
2631   }
2632   RELEASE_LOCK(&sched_mutex);
2633 }
2634 #endif
2635
2636 /* ---------------------------------------------------------------------------
2637    Interrupt execution
2638    - usually called inside a signal handler so it mustn't do anything fancy.   
2639    ------------------------------------------------------------------------ */
2640
2641 void
2642 interruptStgRts(void)
2643 {
2644     interrupted    = 1;
2645     context_switch = 1;
2646 #ifdef RTS_SUPPORTS_THREADS
2647     wakeBlockedWorkerThread();
2648 #endif
2649 }
2650
2651 /* -----------------------------------------------------------------------------
2652    Unblock a thread
2653
2654    This is for use when we raise an exception in another thread, which
2655    may be blocked.
2656    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2657    -------------------------------------------------------------------------- */
2658
2659 #if defined(GRAN) || defined(PAR)
2660 /*
2661   NB: only the type of the blocking queue is different in GranSim and GUM
2662       the operations on the queue-elements are the same
2663       long live polymorphism!
2664
2665   Locks: sched_mutex is held upon entry and exit.
2666
2667 */
2668 static void
2669 unblockThread(StgTSO *tso)
2670 {
2671   StgBlockingQueueElement *t, **last;
2672
2673   switch (tso->why_blocked) {
2674
2675   case NotBlocked:
2676     return;  /* not blocked */
2677
2678   case BlockedOnMVar:
2679     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2680     {
2681       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2682       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2683
2684       last = (StgBlockingQueueElement **)&mvar->head;
2685       for (t = (StgBlockingQueueElement *)mvar->head; 
2686            t != END_BQ_QUEUE; 
2687            last = &t->link, last_tso = t, t = t->link) {
2688         if (t == (StgBlockingQueueElement *)tso) {
2689           *last = (StgBlockingQueueElement *)tso->link;
2690           if (mvar->tail == tso) {
2691             mvar->tail = (StgTSO *)last_tso;
2692           }
2693           goto done;
2694         }
2695       }
2696       barf("unblockThread (MVAR): TSO not found");
2697     }
2698
2699   case BlockedOnBlackHole:
2700     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2701     {
2702       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2703
2704       last = &bq->blocking_queue;
2705       for (t = bq->blocking_queue; 
2706            t != END_BQ_QUEUE; 
2707            last = &t->link, t = t->link) {
2708         if (t == (StgBlockingQueueElement *)tso) {
2709           *last = (StgBlockingQueueElement *)tso->link;
2710           goto done;
2711         }
2712       }
2713       barf("unblockThread (BLACKHOLE): TSO not found");
2714     }
2715
2716   case BlockedOnException:
2717     {
2718       StgTSO *target  = tso->block_info.tso;
2719
2720       ASSERT(get_itbl(target)->type == TSO);
2721
2722       if (target->what_next == ThreadRelocated) {
2723           target = target->link;
2724           ASSERT(get_itbl(target)->type == TSO);
2725       }
2726
2727       ASSERT(target->blocked_exceptions != NULL);
2728
2729       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2730       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2731            t != END_BQ_QUEUE; 
2732            last = &t->link, t = t->link) {
2733         ASSERT(get_itbl(t)->type == TSO);
2734         if (t == (StgBlockingQueueElement *)tso) {
2735           *last = (StgBlockingQueueElement *)tso->link;
2736           goto done;
2737         }
2738       }
2739       barf("unblockThread (Exception): TSO not found");
2740     }
2741
2742   case BlockedOnRead:
2743   case BlockedOnWrite:
2744 #if defined(mingw32_TARGET_OS)
2745   case BlockedOnDoProc:
2746 #endif
2747     {
2748       /* take TSO off blocked_queue */
2749       StgBlockingQueueElement *prev = NULL;
2750       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2751            prev = t, t = t->link) {
2752         if (t == (StgBlockingQueueElement *)tso) {
2753           if (prev == NULL) {
2754             blocked_queue_hd = (StgTSO *)t->link;
2755             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2756               blocked_queue_tl = END_TSO_QUEUE;
2757             }
2758           } else {
2759             prev->link = t->link;
2760             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2761               blocked_queue_tl = (StgTSO *)prev;
2762             }
2763           }
2764           goto done;
2765         }
2766       }
2767       barf("unblockThread (I/O): TSO not found");
2768     }
2769
2770   case BlockedOnDelay:
2771     {
2772       /* take TSO off sleeping_queue */
2773       StgBlockingQueueElement *prev = NULL;
2774       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2775            prev = t, t = t->link) {
2776         if (t == (StgBlockingQueueElement *)tso) {
2777           if (prev == NULL) {
2778             sleeping_queue = (StgTSO *)t->link;
2779           } else {
2780             prev->link = t->link;
2781           }
2782           goto done;
2783         }
2784       }
2785       barf("unblockThread (delay): TSO not found");
2786     }
2787
2788   default:
2789     barf("unblockThread");
2790   }
2791
2792  done:
2793   tso->link = END_TSO_QUEUE;
2794   tso->why_blocked = NotBlocked;
2795   tso->block_info.closure = NULL;
2796   PUSH_ON_RUN_QUEUE(tso);
2797 }
2798 #else
2799 static void
2800 unblockThread(StgTSO *tso)
2801 {
2802   StgTSO *t, **last;
2803   
2804   /* To avoid locking unnecessarily. */
2805   if (tso->why_blocked == NotBlocked) {
2806     return;
2807   }
2808
2809   switch (tso->why_blocked) {
2810
2811   case BlockedOnMVar:
2812     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2813     {
2814       StgTSO *last_tso = END_TSO_QUEUE;
2815       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2816
2817       last = &mvar->head;
2818       for (t = mvar->head; t != END_TSO_QUEUE; 
2819            last = &t->link, last_tso = t, t = t->link) {
2820         if (t == tso) {
2821           *last = tso->link;
2822           if (mvar->tail == tso) {
2823             mvar->tail = last_tso;
2824           }
2825           goto done;
2826         }
2827       }
2828       barf("unblockThread (MVAR): TSO not found");
2829     }
2830
2831   case BlockedOnBlackHole:
2832     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2833     {
2834       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2835
2836       last = &bq->blocking_queue;
2837       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2838            last = &t->link, t = t->link) {
2839         if (t == tso) {
2840           *last = tso->link;
2841           goto done;
2842         }
2843       }
2844       barf("unblockThread (BLACKHOLE): TSO not found");
2845     }
2846
2847   case BlockedOnException:
2848     {
2849       StgTSO *target  = tso->block_info.tso;
2850
2851       ASSERT(get_itbl(target)->type == TSO);
2852
2853       while (target->what_next == ThreadRelocated) {
2854           target = target->link;
2855           ASSERT(get_itbl(target)->type == TSO);
2856       }
2857       
2858       ASSERT(target->blocked_exceptions != NULL);
2859
2860       last = &target->blocked_exceptions;
2861       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2862            last = &t->link, t = t->link) {
2863         ASSERT(get_itbl(t)->type == TSO);
2864         if (t == tso) {
2865           *last = tso->link;
2866           goto done;
2867         }
2868       }
2869       barf("unblockThread (Exception): TSO not found");
2870     }
2871
2872   case BlockedOnRead:
2873   case BlockedOnWrite:
2874 #if defined(mingw32_TARGET_OS)
2875   case BlockedOnDoProc:
2876 #endif
2877     {
2878       StgTSO *prev = NULL;
2879       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2880            prev = t, t = t->link) {
2881         if (t == tso) {
2882           if (prev == NULL) {
2883             blocked_queue_hd = t->link;
2884             if (blocked_queue_tl == t) {
2885               blocked_queue_tl = END_TSO_QUEUE;
2886             }
2887           } else {
2888             prev->link = t->link;
2889             if (blocked_queue_tl == t) {
2890               blocked_queue_tl = prev;
2891             }
2892           }
2893           goto done;
2894         }
2895       }
2896       barf("unblockThread (I/O): TSO not found");
2897     }
2898
2899   case BlockedOnDelay:
2900     {
2901       StgTSO *prev = NULL;
2902       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2903            prev = t, t = t->link) {
2904         if (t == tso) {
2905           if (prev == NULL) {
2906             sleeping_queue = t->link;
2907           } else {
2908             prev->link = t->link;
2909           }
2910           goto done;
2911         }
2912       }
2913       barf("unblockThread (delay): TSO not found");
2914     }
2915
2916   default:
2917     barf("unblockThread");
2918   }
2919
2920  done:
2921   tso->link = END_TSO_QUEUE;
2922   tso->why_blocked = NotBlocked;
2923   tso->block_info.closure = NULL;
2924   APPEND_TO_RUN_QUEUE(tso);
2925 }
2926 #endif
2927
2928 /* -----------------------------------------------------------------------------
2929  * raiseAsync()
2930  *
2931  * The following function implements the magic for raising an
2932  * asynchronous exception in an existing thread.
2933  *
2934  * We first remove the thread from any queue on which it might be
2935  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2936  *
2937  * We strip the stack down to the innermost CATCH_FRAME, building
2938  * thunks in the heap for all the active computations, so they can 
2939  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2940  * an application of the handler to the exception, and push it on
2941  * the top of the stack.
2942  * 
2943  * How exactly do we save all the active computations?  We create an
2944  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2945  * AP_STACKs pushes everything from the corresponding update frame
2946  * upwards onto the stack.  (Actually, it pushes everything up to the
2947  * next update frame plus a pointer to the next AP_STACK object.
2948  * Entering the next AP_STACK object pushes more onto the stack until we
2949  * reach the last AP_STACK object - at which point the stack should look
2950  * exactly as it did when we killed the TSO and we can continue
2951  * execution by entering the closure on top of the stack.
2952  *
2953  * We can also kill a thread entirely - this happens if either (a) the 
2954  * exception passed to raiseAsync is NULL, or (b) there's no
2955  * CATCH_FRAME on the stack.  In either case, we strip the entire
2956  * stack and replace the thread with a zombie.
2957  *
2958  * Locks: sched_mutex held upon entry nor exit.
2959  *
2960  * -------------------------------------------------------------------------- */
2961  
2962 void 
2963 deleteThread(StgTSO *tso)
2964 {
2965   raiseAsync(tso,NULL);
2966 }
2967
2968 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2969 static void 
2970 deleteThreadImmediately(StgTSO *tso)
2971 { // for forkProcess only:
2972   // delete thread without giving it a chance to catch the KillThread exception
2973
2974   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2975       return;
2976   }
2977
2978   if (tso->why_blocked != BlockedOnCCall &&
2979       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2980     unblockThread(tso);
2981   }
2982
2983   tso->what_next = ThreadKilled;
2984 }
2985 #endif
2986
2987 void
2988 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2989 {
2990   /* When raising async exs from contexts where sched_mutex isn't held;
2991      use raiseAsyncWithLock(). */
2992   ACQUIRE_LOCK(&sched_mutex);
2993   raiseAsync(tso,exception);
2994   RELEASE_LOCK(&sched_mutex);
2995 }
2996
2997 void
2998 raiseAsync(StgTSO *tso, StgClosure *exception)
2999 {
3000     StgRetInfoTable *info;
3001     StgPtr sp;
3002   
3003     // Thread already dead?
3004     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3005         return;
3006     }
3007
3008     IF_DEBUG(scheduler, 
3009              sched_belch("raising exception in thread %ld.", tso->id));
3010     
3011     // Remove it from any blocking queues
3012     unblockThread(tso);
3013
3014     sp = tso->sp;
3015     
3016     // The stack freezing code assumes there's a closure pointer on
3017     // the top of the stack, so we have to arrange that this is the case...
3018     //
3019     if (sp[0] == (W_)&stg_enter_info) {
3020         sp++;
3021     } else {
3022         sp--;
3023         sp[0] = (W_)&stg_dummy_ret_closure;
3024     }
3025
3026     while (1) {
3027         nat i;
3028
3029         // 1. Let the top of the stack be the "current closure"
3030         //
3031         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3032         // CATCH_FRAME.
3033         //
3034         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3035         // current closure applied to the chunk of stack up to (but not
3036         // including) the update frame.  This closure becomes the "current
3037         // closure".  Go back to step 2.
3038         //
3039         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3040         // top of the stack applied to the exception.
3041         // 
3042         // 5. If it's a STOP_FRAME, then kill the thread.
3043         
3044         StgPtr frame;
3045         
3046         frame = sp + 1;
3047         info = get_ret_itbl((StgClosure *)frame);
3048         
3049         while (info->i.type != UPDATE_FRAME
3050                && (info->i.type != CATCH_FRAME || exception == NULL)
3051                && info->i.type != STOP_FRAME) {
3052             frame += stack_frame_sizeW((StgClosure *)frame);
3053             info = get_ret_itbl((StgClosure *)frame);
3054         }
3055         
3056         switch (info->i.type) {
3057             
3058         case CATCH_FRAME:
3059             // If we find a CATCH_FRAME, and we've got an exception to raise,
3060             // then build the THUNK raise(exception), and leave it on
3061             // top of the CATCH_FRAME ready to enter.
3062             //
3063         {
3064 #ifdef PROFILING
3065             StgCatchFrame *cf = (StgCatchFrame *)frame;
3066 #endif
3067             StgClosure *raise;
3068             
3069             // we've got an exception to raise, so let's pass it to the
3070             // handler in this frame.
3071             //
3072             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3073             TICK_ALLOC_SE_THK(1,0);
3074             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3075             raise->payload[0] = exception;
3076             
3077             // throw away the stack from Sp up to the CATCH_FRAME.
3078             //
3079             sp = frame - 1;
3080             
3081             /* Ensure that async excpetions are blocked now, so we don't get
3082              * a surprise exception before we get around to executing the
3083              * handler.
3084              */
3085             if (tso->blocked_exceptions == NULL) {
3086                 tso->blocked_exceptions = END_TSO_QUEUE;
3087             }
3088             
3089             /* Put the newly-built THUNK on top of the stack, ready to execute
3090              * when the thread restarts.
3091              */
3092             sp[0] = (W_)raise;
3093             sp[-1] = (W_)&stg_enter_info;
3094             tso->sp = sp-1;
3095             tso->what_next = ThreadRunGHC;
3096             IF_DEBUG(sanity, checkTSO(tso));
3097             return;
3098         }
3099         
3100         case UPDATE_FRAME:
3101         {
3102             StgAP_STACK * ap;
3103             nat words;
3104             
3105             // First build an AP_STACK consisting of the stack chunk above the
3106             // current update frame, with the top word on the stack as the
3107             // fun field.
3108             //
3109             words = frame - sp - 1;
3110             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3111             
3112             ap->size = words;
3113             ap->fun  = (StgClosure *)sp[0];
3114             sp++;
3115             for(i=0; i < (nat)words; ++i) {
3116                 ap->payload[i] = (StgClosure *)*sp++;
3117             }
3118             
3119             SET_HDR(ap,&stg_AP_STACK_info,
3120                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3121             TICK_ALLOC_UP_THK(words+1,0);
3122             
3123             IF_DEBUG(scheduler,
3124                      fprintf(stderr,  "sched: Updating ");
3125                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3126                      fprintf(stderr,  " with ");
3127                      printObj((StgClosure *)ap);
3128                 );
3129
3130             // Replace the updatee with an indirection - happily
3131             // this will also wake up any threads currently
3132             // waiting on the result.
3133             //
3134             // Warning: if we're in a loop, more than one update frame on
3135             // the stack may point to the same object.  Be careful not to
3136             // overwrite an IND_OLDGEN in this case, because we'll screw
3137             // up the mutable lists.  To be on the safe side, don't
3138             // overwrite any kind of indirection at all.  See also
3139             // threadSqueezeStack in GC.c, where we have to make a similar
3140             // check.
3141             //
3142             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3143                 // revert the black hole
3144                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3145                                (StgClosure *)ap);
3146             }
3147             sp += sizeofW(StgUpdateFrame) - 1;
3148             sp[0] = (W_)ap; // push onto stack
3149             break;
3150         }
3151         
3152         case STOP_FRAME:
3153             // We've stripped the entire stack, the thread is now dead.
3154             sp += sizeofW(StgStopFrame);
3155             tso->what_next = ThreadKilled;
3156             tso->sp = sp;
3157             return;
3158             
3159         default:
3160             barf("raiseAsync");
3161         }
3162     }
3163     barf("raiseAsync");
3164 }
3165
3166 /* -----------------------------------------------------------------------------
3167    raiseExceptionHelper
3168    
3169    This function is called by the raise# primitve, just so that we can
3170    move some of the tricky bits of raising an exception from C-- into
3171    C.  Who knows, it might be a useful re-useable thing here too.
3172    -------------------------------------------------------------------------- */
3173
3174 StgWord
3175 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3176 {
3177     StgClosure *raise_closure = NULL;
3178     StgPtr p, next;
3179     StgRetInfoTable *info;
3180     //
3181     // This closure represents the expression 'raise# E' where E
3182     // is the exception raise.  It is used to overwrite all the
3183     // thunks which are currently under evaluataion.
3184     //
3185
3186     //    
3187     // LDV profiling: stg_raise_info has THUNK as its closure
3188     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3189     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3190     // 1 does not cause any problem unless profiling is performed.
3191     // However, when LDV profiling goes on, we need to linearly scan
3192     // small object pool, where raise_closure is stored, so we should
3193     // use MIN_UPD_SIZE.
3194     //
3195     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3196     //                                 sizeofW(StgClosure)+1);
3197     //
3198
3199     //
3200     // Walk up the stack, looking for the catch frame.  On the way,
3201     // we update any closures pointed to from update frames with the
3202     // raise closure that we just built.
3203     //
3204     p = tso->sp;
3205     while(1) {
3206         info = get_ret_itbl((StgClosure *)p);
3207         next = p + stack_frame_sizeW((StgClosure *)p);
3208         switch (info->i.type) {
3209             
3210         case UPDATE_FRAME:
3211             // Only create raise_closure if we need to.
3212             if (raise_closure == NULL) {
3213                 raise_closure = 
3214                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3215                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3216                 raise_closure->payload[0] = exception;
3217             }
3218             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3219             p = next;
3220             continue;
3221             
3222         case CATCH_FRAME:
3223             tso->sp = p;
3224             return CATCH_FRAME;
3225             
3226         case STOP_FRAME:
3227             tso->sp = p;
3228             return STOP_FRAME;
3229
3230         default:
3231             p = next; 
3232             continue;
3233         }
3234     }
3235 }
3236
3237 /* -----------------------------------------------------------------------------
3238    resurrectThreads is called after garbage collection on the list of
3239    threads found to be garbage.  Each of these threads will be woken
3240    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3241    on an MVar, or NonTermination if the thread was blocked on a Black
3242    Hole.
3243
3244    Locks: sched_mutex isn't held upon entry nor exit.
3245    -------------------------------------------------------------------------- */
3246
3247 void
3248 resurrectThreads( StgTSO *threads )
3249 {
3250   StgTSO *tso, *next;
3251
3252   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3253     next = tso->global_link;
3254     tso->global_link = all_threads;
3255     all_threads = tso;
3256     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3257
3258     switch (tso->why_blocked) {
3259     case BlockedOnMVar:
3260     case BlockedOnException:
3261       /* Called by GC - sched_mutex lock is currently held. */
3262       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3263       break;
3264     case BlockedOnBlackHole:
3265       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3266       break;
3267     case NotBlocked:
3268       /* This might happen if the thread was blocked on a black hole
3269        * belonging to a thread that we've just woken up (raiseAsync
3270        * can wake up threads, remember...).
3271        */
3272       continue;
3273     default:
3274       barf("resurrectThreads: thread blocked in a strange way");
3275     }
3276   }
3277 }
3278
3279 /* -----------------------------------------------------------------------------
3280  * Blackhole detection: if we reach a deadlock, test whether any
3281  * threads are blocked on themselves.  Any threads which are found to
3282  * be self-blocked get sent a NonTermination exception.
3283  *
3284  * This is only done in a deadlock situation in order to avoid
3285  * performance overhead in the normal case.
3286  *
3287  * Locks: sched_mutex is held upon entry and exit.
3288  * -------------------------------------------------------------------------- */
3289
3290 static void
3291 detectBlackHoles( void )
3292 {
3293     StgTSO *tso = all_threads;
3294     StgClosure *frame;
3295     StgClosure *blocked_on;
3296     StgRetInfoTable *info;
3297
3298     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3299
3300         while (tso->what_next == ThreadRelocated) {
3301             tso = tso->link;
3302             ASSERT(get_itbl(tso)->type == TSO);
3303         }
3304       
3305         if (tso->why_blocked != BlockedOnBlackHole) {
3306             continue;
3307         }
3308         blocked_on = tso->block_info.closure;
3309
3310         frame = (StgClosure *)tso->sp;
3311
3312         while(1) {
3313             info = get_ret_itbl(frame);
3314             switch (info->i.type) {
3315             case UPDATE_FRAME:
3316                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3317                     /* We are blocking on one of our own computations, so
3318                      * send this thread the NonTermination exception.  
3319                      */
3320                     IF_DEBUG(scheduler, 
3321                              sched_belch("thread %d is blocked on itself", tso->id));
3322                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3323                     goto done;
3324                 }
3325                 
3326                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3327                 continue;
3328
3329             case STOP_FRAME:
3330                 goto done;
3331
3332                 // normal stack frames; do nothing except advance the pointer
3333             default:
3334                 (StgPtr)frame += stack_frame_sizeW(frame);
3335             }
3336         }   
3337         done: ;
3338     }
3339 }
3340
3341 /* ----------------------------------------------------------------------------
3342  * Debugging: why is a thread blocked
3343  * [Also provides useful information when debugging threaded programs
3344  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3345    ------------------------------------------------------------------------- */
3346
3347 static
3348 void
3349 printThreadBlockage(StgTSO *tso)
3350 {
3351   switch (tso->why_blocked) {
3352   case BlockedOnRead:
3353     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3354     break;
3355   case BlockedOnWrite:
3356     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3357     break;
3358 #if defined(mingw32_TARGET_OS)
3359     case BlockedOnDoProc:
3360     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3361     break;
3362 #endif
3363   case BlockedOnDelay:
3364     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3365     break;
3366   case BlockedOnMVar:
3367     fprintf(stderr,"is blocked on an MVar");
3368     break;
3369   case BlockedOnException:
3370     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3371             tso->block_info.tso->id);
3372     break;
3373   case BlockedOnBlackHole:
3374     fprintf(stderr,"is blocked on a black hole");
3375     break;
3376   case NotBlocked:
3377     fprintf(stderr,"is not blocked");
3378     break;
3379 #if defined(PAR)
3380   case BlockedOnGA:
3381     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3382             tso->block_info.closure, info_type(tso->block_info.closure));
3383     break;
3384   case BlockedOnGA_NoSend:
3385     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3386             tso->block_info.closure, info_type(tso->block_info.closure));
3387     break;
3388 #endif
3389   case BlockedOnCCall:
3390     fprintf(stderr,"is blocked on an external call");
3391     break;
3392   case BlockedOnCCall_NoUnblockExc:
3393     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3394     break;
3395   default:
3396     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3397          tso->why_blocked, tso->id, tso);
3398   }
3399 }
3400
3401 static
3402 void
3403 printThreadStatus(StgTSO *tso)
3404 {
3405   switch (tso->what_next) {
3406   case ThreadKilled:
3407     fprintf(stderr,"has been killed");
3408     break;
3409   case ThreadComplete:
3410     fprintf(stderr,"has completed");
3411     break;
3412   default:
3413     printThreadBlockage(tso);
3414   }
3415 }
3416
3417 void
3418 printAllThreads(void)
3419 {
3420   StgTSO *t;
3421   void *label;
3422
3423 # if defined(GRAN)
3424   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3425   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3426                        time_string, rtsFalse/*no commas!*/);
3427
3428   fprintf(stderr, "all threads at [%s]:\n", time_string);
3429 # elif defined(PAR)
3430   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3431   ullong_format_string(CURRENT_TIME,
3432                        time_string, rtsFalse/*no commas!*/);
3433
3434   fprintf(stderr,"all threads at [%s]:\n", time_string);
3435 # else
3436   fprintf(stderr,"all threads:\n");
3437 # endif
3438
3439   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3440     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3441     label = lookupThreadLabel(t->id);
3442     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3443     printThreadStatus(t);
3444     fprintf(stderr,"\n");
3445   }
3446 }
3447     
3448 #ifdef DEBUG
3449
3450 /* 
3451    Print a whole blocking queue attached to node (debugging only).
3452 */
3453 # if defined(PAR)
3454 void 
3455 print_bq (StgClosure *node)
3456 {
3457   StgBlockingQueueElement *bqe;
3458   StgTSO *tso;
3459   rtsBool end;
3460
3461   fprintf(stderr,"## BQ of closure %p (%s): ",
3462           node, info_type(node));
3463
3464   /* should cover all closures that may have a blocking queue */
3465   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3466          get_itbl(node)->type == FETCH_ME_BQ ||
3467          get_itbl(node)->type == RBH ||
3468          get_itbl(node)->type == MVAR);
3469     
3470   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3471
3472   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3473 }
3474
3475 /* 
3476    Print a whole blocking queue starting with the element bqe.
3477 */
3478 void 
3479 print_bqe (StgBlockingQueueElement *bqe)
3480 {
3481   rtsBool end;
3482
3483   /* 
3484      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3485   */
3486   for (end = (bqe==END_BQ_QUEUE);
3487        !end; // iterate until bqe points to a CONSTR
3488        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3489        bqe = end ? END_BQ_QUEUE : bqe->link) {
3490     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3491     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3492     /* types of closures that may appear in a blocking queue */
3493     ASSERT(get_itbl(bqe)->type == TSO ||           
3494            get_itbl(bqe)->type == BLOCKED_FETCH || 
3495            get_itbl(bqe)->type == CONSTR); 
3496     /* only BQs of an RBH end with an RBH_Save closure */
3497     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3498
3499     switch (get_itbl(bqe)->type) {
3500     case TSO:
3501       fprintf(stderr," TSO %u (%x),",
3502               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3503       break;
3504     case BLOCKED_FETCH:
3505       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3506               ((StgBlockedFetch *)bqe)->node, 
3507               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3508               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3509               ((StgBlockedFetch *)bqe)->ga.weight);
3510       break;
3511     case CONSTR:
3512       fprintf(stderr," %s (IP %p),",
3513               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3514                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3515                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3516                "RBH_Save_?"), get_itbl(bqe));
3517       break;
3518     default:
3519       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3520            info_type((StgClosure *)bqe)); // , node, info_type(node));
3521       break;
3522     }
3523   } /* for */
3524   fputc('\n', stderr);
3525 }
3526 # elif defined(GRAN)
3527 void 
3528 print_bq (StgClosure *node)
3529 {
3530   StgBlockingQueueElement *bqe;
3531   PEs node_loc, tso_loc;
3532   rtsBool end;
3533
3534   /* should cover all closures that may have a blocking queue */
3535   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3536          get_itbl(node)->type == FETCH_ME_BQ ||
3537          get_itbl(node)->type == RBH);
3538     
3539   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3540   node_loc = where_is(node);
3541
3542   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3543           node, info_type(node), node_loc);
3544
3545   /* 
3546      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3547   */
3548   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3549        !end; // iterate until bqe points to a CONSTR
3550        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3551     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3552     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3553     /* types of closures that may appear in a blocking queue */
3554     ASSERT(get_itbl(bqe)->type == TSO ||           
3555            get_itbl(bqe)->type == CONSTR); 
3556     /* only BQs of an RBH end with an RBH_Save closure */
3557     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3558
3559     tso_loc = where_is((StgClosure *)bqe);
3560     switch (get_itbl(bqe)->type) {
3561     case TSO:
3562       fprintf(stderr," TSO %d (%p) on [PE %d],",
3563               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3564       break;
3565     case CONSTR:
3566       fprintf(stderr," %s (IP %p),",
3567               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3568                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3569                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3570                "RBH_Save_?"), get_itbl(bqe));
3571       break;
3572     default:
3573       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3574            info_type((StgClosure *)bqe), node, info_type(node));
3575       break;
3576     }
3577   } /* for */
3578   fputc('\n', stderr);
3579 }
3580 #else
3581 /* 
3582    Nice and easy: only TSOs on the blocking queue
3583 */
3584 void 
3585 print_bq (StgClosure *node)
3586 {
3587   StgTSO *tso;
3588
3589   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3590   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3591        tso != END_TSO_QUEUE; 
3592        tso=tso->link) {
3593     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3594     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3595     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3596   }
3597   fputc('\n', stderr);
3598 }
3599 # endif
3600
3601 #if defined(PAR)
3602 static nat
3603 run_queue_len(void)
3604 {
3605   nat i;
3606   StgTSO *tso;
3607
3608   for (i=0, tso=run_queue_hd; 
3609        tso != END_TSO_QUEUE;
3610        i++, tso=tso->link)
3611     /* nothing */
3612
3613   return i;
3614 }
3615 #endif
3616
3617 void
3618 sched_belch(char *s, ...)
3619 {
3620   va_list ap;
3621   va_start(ap,s);
3622 #ifdef RTS_SUPPORTS_THREADS
3623   fprintf(stderr, "sched (task %p): ", osThreadId());
3624 #elif defined(PAR)
3625   fprintf(stderr, "== ");
3626 #else
3627   fprintf(stderr, "sched: ");
3628 #endif
3629   vfprintf(stderr, s, ap);
3630   fprintf(stderr, "\n");
3631   fflush(stderr);
3632   va_end(ap);
3633 }
3634
3635 #endif /* DEBUG */