6296990cefd83d1f2cf0836f1e739f8aaef9519c
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.195 2004/04/13 13:43:11 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       startTask(taskStart);
280     }
281   }
282 }
283 #endif
284
285 /* ---------------------------------------------------------------------------
286    Main scheduling loop.
287
288    We use round-robin scheduling, each thread returning to the
289    scheduler loop when one of these conditions is detected:
290
291       * out of heap space
292       * timer expires (thread yields)
293       * thread blocks
294       * thread ends
295       * stack overflow
296
297    Locking notes:  we acquire the scheduler lock once at the beginning
298    of the scheduler loop, and release it when
299     
300       * running a thread, or
301       * waiting for work, or
302       * waiting for a GC to complete.
303
304    GRAN version:
305      In a GranSim setup this loop iterates over the global event queue.
306      This revolves around the global event queue, which determines what 
307      to do next. Therefore, it's more complicated than either the 
308      concurrent or the parallel (GUM) setup.
309
310    GUM version:
311      GUM iterates over incoming messages.
312      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
313      and sends out a fish whenever it has nothing to do; in-between
314      doing the actual reductions (shared code below) it processes the
315      incoming messages and deals with delayed operations 
316      (see PendingFetches).
317      This is not the ugliest code you could imagine, but it's bloody close.
318
319    ------------------------------------------------------------------------ */
320 static void
321 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
322           Capability *initialCapability )
323 {
324   StgTSO *t;
325   Capability *cap;
326   StgThreadReturnCode ret;
327 #if defined(GRAN)
328   rtsEvent *event;
329 #elif defined(PAR)
330   StgSparkPool *pool;
331   rtsSpark spark;
332   StgTSO *tso;
333   GlobalTaskId pe;
334   rtsBool receivedFinish = rtsFalse;
335 # if defined(DEBUG)
336   nat tp_size, sp_size; // stats only
337 # endif
338 #endif
339   rtsBool was_interrupted = rtsFalse;
340   StgTSOWhatNext prev_what_next;
341   
342   // Pre-condition: sched_mutex is held.
343   // We might have a capability, passed in as initialCapability.
344   cap = initialCapability;
345
346 #if defined(RTS_SUPPORTS_THREADS)
347   //
348   // in the threaded case, the capability is either passed in via the
349   // initialCapability parameter, or initialized inside the scheduler
350   // loop 
351   //
352   IF_DEBUG(scheduler,
353            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
354                        mainThread, initialCapability);
355       );
356 #else
357   // simply initialise it in the non-threaded case
358   grabCapability(&cap);
359 #endif
360
361 #if defined(GRAN)
362   /* set up first event to get things going */
363   /* ToDo: assign costs for system setup and init MainTSO ! */
364   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
365             ContinueThread, 
366             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
367
368   IF_DEBUG(gran,
369            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
370            G_TSO(CurrentTSO, 5));
371
372   if (RtsFlags.GranFlags.Light) {
373     /* Save current time; GranSim Light only */
374     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
375   }      
376
377   event = get_next_event();
378
379   while (event!=(rtsEvent*)NULL) {
380     /* Choose the processor with the next event */
381     CurrentProc = event->proc;
382     CurrentTSO = event->tso;
383
384 #elif defined(PAR)
385
386   while (!receivedFinish) {    /* set by processMessages */
387                                /* when receiving PP_FINISH message         */ 
388
389 #else // everything except GRAN and PAR
390
391   while (1) {
392
393 #endif
394
395      IF_DEBUG(scheduler, printAllThreads());
396
397 #if defined(RTS_SUPPORTS_THREADS)
398       // Yield the capability to higher-priority tasks if necessary.
399       //
400       if (cap != NULL) {
401           yieldCapability(&cap);
402       }
403
404       // If we do not currently hold a capability, we wait for one
405       //
406       if (cap == NULL) {
407           waitForCapability(&sched_mutex, &cap,
408                             mainThread ? &mainThread->bound_thread_cond : NULL);
409       }
410
411       // We now have a capability...
412 #endif
413
414     //
415     // If we're interrupted (the user pressed ^C, or some other
416     // termination condition occurred), kill all the currently running
417     // threads.
418     //
419     if (interrupted) {
420         IF_DEBUG(scheduler, sched_belch("interrupted"));
421         interrupted = rtsFalse;
422         was_interrupted = rtsTrue;
423 #if defined(RTS_SUPPORTS_THREADS)
424         // In the threaded RTS, deadlock detection doesn't work,
425         // so just exit right away.
426         prog_belch("interrupted");
427         releaseCapability(cap);
428         RELEASE_LOCK(&sched_mutex);
429         shutdownHaskellAndExit(EXIT_SUCCESS);
430 #else
431         deleteAllThreads();
432 #endif
433     }
434
435 #if defined(RTS_USER_SIGNALS)
436     // check for signals each time around the scheduler
437     if (signals_pending()) {
438       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
439       startSignalHandlers();
440       ACQUIRE_LOCK(&sched_mutex);
441     }
442 #endif
443
444     //
445     // Check whether any waiting threads need to be woken up.  If the
446     // run queue is empty, and there are no other tasks running, we
447     // can wait indefinitely for something to happen.
448     //
449     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
450 #if defined(RTS_SUPPORTS_THREADS)
451                 || EMPTY_RUN_QUEUE()
452 #endif
453         )
454     {
455       awaitEvent( EMPTY_RUN_QUEUE() );
456     }
457     // we can be interrupted while waiting for I/O...
458     if (interrupted) continue;
459
460     /* 
461      * Detect deadlock: when we have no threads to run, there are no
462      * threads waiting on I/O or sleeping, and all the other tasks are
463      * waiting for work, we must have a deadlock of some description.
464      *
465      * We first try to find threads blocked on themselves (ie. black
466      * holes), and generate NonTermination exceptions where necessary.
467      *
468      * If no threads are black holed, we have a deadlock situation, so
469      * inform all the main threads.
470      */
471 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
472     if (   EMPTY_THREAD_QUEUES() )
473     {
474         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
475         // Garbage collection can release some new threads due to
476         // either (a) finalizers or (b) threads resurrected because
477         // they are about to be send BlockedOnDeadMVar.  Any threads
478         // thus released will be immediately runnable.
479         GarbageCollect(GetRoots,rtsTrue);
480
481         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
482
483         IF_DEBUG(scheduler, 
484                  sched_belch("still deadlocked, checking for black holes..."));
485         detectBlackHoles();
486
487         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
488
489 #if defined(RTS_USER_SIGNALS)
490         /* If we have user-installed signal handlers, then wait
491          * for signals to arrive rather then bombing out with a
492          * deadlock.
493          */
494         if ( anyUserHandlers() ) {
495             IF_DEBUG(scheduler, 
496                      sched_belch("still deadlocked, waiting for signals..."));
497
498             awaitUserSignals();
499
500             // we might be interrupted...
501             if (interrupted) { continue; }
502
503             if (signals_pending()) {
504                 RELEASE_LOCK(&sched_mutex);
505                 startSignalHandlers();
506                 ACQUIRE_LOCK(&sched_mutex);
507             }
508             ASSERT(!EMPTY_RUN_QUEUE());
509             goto not_deadlocked;
510         }
511 #endif
512
513         /* Probably a real deadlock.  Send the current main thread the
514          * Deadlock exception (or in the SMP build, send *all* main
515          * threads the deadlock exception, since none of them can make
516          * progress).
517          */
518         {
519             StgMainThread *m;
520             m = main_threads;
521             switch (m->tso->why_blocked) {
522             case BlockedOnBlackHole:
523             case BlockedOnException:
524             case BlockedOnMVar:
525                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
526                 break;
527             default:
528                 barf("deadlock: main thread blocked in a strange way");
529             }
530         }
531     }
532   not_deadlocked:
533
534 #elif defined(RTS_SUPPORTS_THREADS)
535     // ToDo: add deadlock detection in threaded RTS
536 #elif defined(PAR)
537     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
538 #endif
539
540 #if defined(RTS_SUPPORTS_THREADS)
541     if ( EMPTY_RUN_QUEUE() ) {
542         continue; // nothing to do
543     }
544 #endif
545
546 #if defined(GRAN)
547     if (RtsFlags.GranFlags.Light)
548       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
549
550     /* adjust time based on time-stamp */
551     if (event->time > CurrentTime[CurrentProc] &&
552         event->evttype != ContinueThread)
553       CurrentTime[CurrentProc] = event->time;
554     
555     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
556     if (!RtsFlags.GranFlags.Light)
557       handleIdlePEs();
558
559     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
560
561     /* main event dispatcher in GranSim */
562     switch (event->evttype) {
563       /* Should just be continuing execution */
564     case ContinueThread:
565       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
566       /* ToDo: check assertion
567       ASSERT(run_queue_hd != (StgTSO*)NULL &&
568              run_queue_hd != END_TSO_QUEUE);
569       */
570       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
571       if (!RtsFlags.GranFlags.DoAsyncFetch &&
572           procStatus[CurrentProc]==Fetching) {
573         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
574               CurrentTSO->id, CurrentTSO, CurrentProc);
575         goto next_thread;
576       } 
577       /* Ignore ContinueThreads for completed threads */
578       if (CurrentTSO->what_next == ThreadComplete) {
579         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
580               CurrentTSO->id, CurrentTSO, CurrentProc);
581         goto next_thread;
582       } 
583       /* Ignore ContinueThreads for threads that are being migrated */
584       if (PROCS(CurrentTSO)==Nowhere) { 
585         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
586               CurrentTSO->id, CurrentTSO, CurrentProc);
587         goto next_thread;
588       }
589       /* The thread should be at the beginning of the run queue */
590       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
591         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
592               CurrentTSO->id, CurrentTSO, CurrentProc);
593         break; // run the thread anyway
594       }
595       /*
596       new_event(proc, proc, CurrentTime[proc],
597                 FindWork,
598                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
599       goto next_thread; 
600       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
601       break; // now actually run the thread; DaH Qu'vam yImuHbej 
602
603     case FetchNode:
604       do_the_fetchnode(event);
605       goto next_thread;             /* handle next event in event queue  */
606       
607     case GlobalBlock:
608       do_the_globalblock(event);
609       goto next_thread;             /* handle next event in event queue  */
610       
611     case FetchReply:
612       do_the_fetchreply(event);
613       goto next_thread;             /* handle next event in event queue  */
614       
615     case UnblockThread:   /* Move from the blocked queue to the tail of */
616       do_the_unblock(event);
617       goto next_thread;             /* handle next event in event queue  */
618       
619     case ResumeThread:  /* Move from the blocked queue to the tail of */
620       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
621       event->tso->gran.blocktime += 
622         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
623       do_the_startthread(event);
624       goto next_thread;             /* handle next event in event queue  */
625       
626     case StartThread:
627       do_the_startthread(event);
628       goto next_thread;             /* handle next event in event queue  */
629       
630     case MoveThread:
631       do_the_movethread(event);
632       goto next_thread;             /* handle next event in event queue  */
633       
634     case MoveSpark:
635       do_the_movespark(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case FindWork:
639       do_the_findwork(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     default:
643       barf("Illegal event type %u\n", event->evttype);
644     }  /* switch */
645     
646     /* This point was scheduler_loop in the old RTS */
647
648     IF_DEBUG(gran, belch("GRAN: after main switch"));
649
650     TimeOfLastEvent = CurrentTime[CurrentProc];
651     TimeOfNextEvent = get_time_of_next_event();
652     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
653     // CurrentTSO = ThreadQueueHd;
654
655     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
656                          TimeOfNextEvent));
657
658     if (RtsFlags.GranFlags.Light) 
659       GranSimLight_leave_system(event, &ActiveTSO); 
660
661     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
662
663     IF_DEBUG(gran, 
664              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
665
666     /* in a GranSim setup the TSO stays on the run queue */
667     t = CurrentTSO;
668     /* Take a thread from the run queue. */
669     POP_RUN_QUEUE(t); // take_off_run_queue(t);
670
671     IF_DEBUG(gran, 
672              fprintf(stderr, "GRAN: About to run current thread, which is\n");
673              G_TSO(t,5));
674
675     context_switch = 0; // turned on via GranYield, checking events and time slice
676
677     IF_DEBUG(gran, 
678              DumpGranEvent(GR_SCHEDULE, t));
679
680     procStatus[CurrentProc] = Busy;
681
682 #elif defined(PAR)
683     if (PendingFetches != END_BF_QUEUE) {
684         processFetches();
685     }
686
687     /* ToDo: phps merge with spark activation above */
688     /* check whether we have local work and send requests if we have none */
689     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
690       /* :-[  no local threads => look out for local sparks */
691       /* the spark pool for the current PE */
692       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
693       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
694           pool->hd < pool->tl) {
695         /* 
696          * ToDo: add GC code check that we really have enough heap afterwards!!
697          * Old comment:
698          * If we're here (no runnable threads) and we have pending
699          * sparks, we must have a space problem.  Get enough space
700          * to turn one of those pending sparks into a
701          * thread... 
702          */
703
704         spark = findSpark(rtsFalse);                /* get a spark */
705         if (spark != (rtsSpark) NULL) {
706           tso = activateSpark(spark);       /* turn the spark into a thread */
707           IF_PAR_DEBUG(schedule,
708                        belch("==== schedule: Created TSO %d (%p); %d threads active",
709                              tso->id, tso, advisory_thread_count));
710
711           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
712             belch("==^^ failed to activate spark");
713             goto next_thread;
714           }               /* otherwise fall through & pick-up new tso */
715         } else {
716           IF_PAR_DEBUG(verbose,
717                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
718                              spark_queue_len(pool)));
719           goto next_thread;
720         }
721       }
722
723       /* If we still have no work we need to send a FISH to get a spark
724          from another PE 
725       */
726       if (EMPTY_RUN_QUEUE()) {
727       /* =8-[  no local sparks => look for work on other PEs */
728         /*
729          * We really have absolutely no work.  Send out a fish
730          * (there may be some out there already), and wait for
731          * something to arrive.  We clearly can't run any threads
732          * until a SCHEDULE or RESUME arrives, and so that's what
733          * we're hoping to see.  (Of course, we still have to
734          * respond to other types of messages.)
735          */
736         TIME now = msTime() /*CURRENT_TIME*/;
737         IF_PAR_DEBUG(verbose, 
738                      belch("--  now=%ld", now));
739         IF_PAR_DEBUG(verbose,
740                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
741                          (last_fish_arrived_at!=0 &&
742                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
743                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
744                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
745                              last_fish_arrived_at,
746                              RtsFlags.ParFlags.fishDelay, now);
747                      });
748         
749         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
750             (last_fish_arrived_at==0 ||
751              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
752           /* outstandingFishes is set in sendFish, processFish;
753              avoid flooding system with fishes via delay */
754           pe = choosePE();
755           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
756                    NEW_FISH_HUNGER);
757
758           // Global statistics: count no. of fishes
759           if (RtsFlags.ParFlags.ParStats.Global &&
760               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
761             globalParStats.tot_fish_mess++;
762           }
763         }
764       
765         receivedFinish = processMessages();
766         goto next_thread;
767       }
768     } else if (PacketsWaiting()) {  /* Look for incoming messages */
769       receivedFinish = processMessages();
770     }
771
772     /* Now we are sure that we have some work available */
773     ASSERT(run_queue_hd != END_TSO_QUEUE);
774
775     /* Take a thread from the run queue, if we have work */
776     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
777     IF_DEBUG(sanity,checkTSO(t));
778
779     /* ToDo: write something to the log-file
780     if (RTSflags.ParFlags.granSimStats && !sameThread)
781         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
782
783     CurrentTSO = t;
784     */
785     /* the spark pool for the current PE */
786     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
787
788     IF_DEBUG(scheduler, 
789              belch("--=^ %d threads, %d sparks on [%#x]", 
790                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
791
792 # if 1
793     if (0 && RtsFlags.ParFlags.ParStats.Full && 
794         t && LastTSO && t->id != LastTSO->id && 
795         LastTSO->why_blocked == NotBlocked && 
796         LastTSO->what_next != ThreadComplete) {
797       // if previously scheduled TSO not blocked we have to record the context switch
798       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
799                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
800     }
801
802     if (RtsFlags.ParFlags.ParStats.Full && 
803         (emitSchedule /* forced emit */ ||
804         (t && LastTSO && t->id != LastTSO->id))) {
805       /* 
806          we are running a different TSO, so write a schedule event to log file
807          NB: If we use fair scheduling we also have to write  a deschedule 
808              event for LastTSO; with unfair scheduling we know that the
809              previous tso has blocked whenever we switch to another tso, so
810              we don't need it in GUM for now
811       */
812       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
813                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814       emitSchedule = rtsFalse;
815     }
816      
817 # endif
818 #else /* !GRAN && !PAR */
819   
820     // grab a thread from the run queue
821     ASSERT(run_queue_hd != END_TSO_QUEUE);
822     POP_RUN_QUEUE(t);
823
824     // Sanity check the thread we're about to run.  This can be
825     // expensive if there is lots of thread switching going on...
826     IF_DEBUG(sanity,checkTSO(t));
827 #endif
828
829 #ifdef THREADED_RTS
830     {
831       StgMainThread *m = t->main;
832       
833       if(m)
834       {
835         if(m == mainThread)
836         {
837           IF_DEBUG(scheduler,
838             sched_belch("### Running thread %d in bound thread", t->id));
839           // yes, the Haskell thread is bound to the current native thread
840         }
841         else
842         {
843           IF_DEBUG(scheduler,
844             sched_belch("### thread %d bound to another OS thread", t->id));
845           // no, bound to a different Haskell thread: pass to that thread
846           PUSH_ON_RUN_QUEUE(t);
847           passCapability(&m->bound_thread_cond);
848           continue;
849         }
850       }
851       else
852       {
853         if(mainThread != NULL)
854         // The thread we want to run is bound.
855         {
856           IF_DEBUG(scheduler,
857             sched_belch("### this OS thread cannot run thread %d", t->id));
858           // no, the current native thread is bound to a different
859           // Haskell thread, so pass it to any worker thread
860           PUSH_ON_RUN_QUEUE(t);
861           passCapabilityToWorker();
862           continue; 
863         }
864       }
865     }
866 #endif
867
868     cap->r.rCurrentTSO = t;
869     
870     /* context switches are now initiated by the timer signal, unless
871      * the user specified "context switch as often as possible", with
872      * +RTS -C0
873      */
874     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
875          && (run_queue_hd != END_TSO_QUEUE
876              || blocked_queue_hd != END_TSO_QUEUE
877              || sleeping_queue != END_TSO_QUEUE)))
878         context_switch = 1;
879     else
880         context_switch = 0;
881
882 run_thread:
883
884     RELEASE_LOCK(&sched_mutex);
885
886     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
887                               t->id, whatNext_strs[t->what_next]));
888
889 #ifdef PROFILING
890     startHeapProfTimer();
891 #endif
892
893     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
894     /* Run the current thread 
895      */
896     prev_what_next = t->what_next;
897     switch (prev_what_next) {
898     case ThreadKilled:
899     case ThreadComplete:
900         /* Thread already finished, return to scheduler. */
901         ret = ThreadFinished;
902         break;
903     case ThreadRunGHC:
904         errno = t->saved_errno;
905         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
906         t->saved_errno = errno;
907         break;
908     case ThreadInterpret:
909         ret = interpretBCO(cap);
910         break;
911     default:
912       barf("schedule: invalid what_next field");
913     }
914     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
915     
916     /* Costs for the scheduler are assigned to CCS_SYSTEM */
917 #ifdef PROFILING
918     stopHeapProfTimer();
919     CCCS = CCS_SYSTEM;
920 #endif
921     
922     ACQUIRE_LOCK(&sched_mutex);
923     
924 #ifdef RTS_SUPPORTS_THREADS
925     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
926 #elif !defined(GRAN) && !defined(PAR)
927     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
928 #endif
929     t = cap->r.rCurrentTSO;
930     
931 #if defined(PAR)
932     /* HACK 675: if the last thread didn't yield, make sure to print a 
933        SCHEDULE event to the log file when StgRunning the next thread, even
934        if it is the same one as before */
935     LastTSO = t; 
936     TimeOfLastYield = CURRENT_TIME;
937 #endif
938
939     switch (ret) {
940     case HeapOverflow:
941 #if defined(GRAN)
942       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
943       globalGranStats.tot_heapover++;
944 #elif defined(PAR)
945       globalParStats.tot_heapover++;
946 #endif
947
948       // did the task ask for a large block?
949       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
950           // if so, get one and push it on the front of the nursery.
951           bdescr *bd;
952           nat blocks;
953           
954           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
955
956           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
957                                    t->id, whatNext_strs[t->what_next], blocks));
958
959           // don't do this if it would push us over the
960           // alloc_blocks_lim limit; we'll GC first.
961           if (alloc_blocks + blocks < alloc_blocks_lim) {
962
963               alloc_blocks += blocks;
964               bd = allocGroup( blocks );
965
966               // link the new group into the list
967               bd->link = cap->r.rCurrentNursery;
968               bd->u.back = cap->r.rCurrentNursery->u.back;
969               if (cap->r.rCurrentNursery->u.back != NULL) {
970                   cap->r.rCurrentNursery->u.back->link = bd;
971               } else {
972                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
973                          g0s0->blocks == cap->r.rNursery);
974                   cap->r.rNursery = g0s0->blocks = bd;
975               }           
976               cap->r.rCurrentNursery->u.back = bd;
977
978               // initialise it as a nursery block.  We initialise the
979               // step, gen_no, and flags field of *every* sub-block in
980               // this large block, because this is easier than making
981               // sure that we always find the block head of a large
982               // block whenever we call Bdescr() (eg. evacuate() and
983               // isAlive() in the GC would both have to do this, at
984               // least).
985               { 
986                   bdescr *x;
987                   for (x = bd; x < bd + blocks; x++) {
988                       x->step = g0s0;
989                       x->gen_no = 0;
990                       x->flags = 0;
991                   }
992               }
993
994               // don't forget to update the block count in g0s0.
995               g0s0->n_blocks += blocks;
996               // This assert can be a killer if the app is doing lots
997               // of large block allocations.
998               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
999
1000               // now update the nursery to point to the new block
1001               cap->r.rCurrentNursery = bd;
1002
1003               // we might be unlucky and have another thread get on the
1004               // run queue before us and steal the large block, but in that
1005               // case the thread will just end up requesting another large
1006               // block.
1007               PUSH_ON_RUN_QUEUE(t);
1008               break;
1009           }
1010       }
1011
1012       /* make all the running tasks block on a condition variable,
1013        * maybe set context_switch and wait till they all pile in,
1014        * then have them wait on a GC condition variable.
1015        */
1016       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1017                                t->id, whatNext_strs[t->what_next]));
1018       threadPaused(t);
1019 #if defined(GRAN)
1020       ASSERT(!is_on_queue(t,CurrentProc));
1021 #elif defined(PAR)
1022       /* Currently we emit a DESCHEDULE event before GC in GUM.
1023          ToDo: either add separate event to distinguish SYSTEM time from rest
1024                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1025       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1026         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1027                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1028         emitSchedule = rtsTrue;
1029       }
1030 #endif
1031       
1032       ready_to_gc = rtsTrue;
1033       context_switch = 1;               /* stop other threads ASAP */
1034       PUSH_ON_RUN_QUEUE(t);
1035       /* actual GC is done at the end of the while loop */
1036       break;
1037       
1038     case StackOverflow:
1039 #if defined(GRAN)
1040       IF_DEBUG(gran, 
1041                DumpGranEvent(GR_DESCHEDULE, t));
1042       globalGranStats.tot_stackover++;
1043 #elif defined(PAR)
1044       // IF_DEBUG(par, 
1045       // DumpGranEvent(GR_DESCHEDULE, t);
1046       globalParStats.tot_stackover++;
1047 #endif
1048       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1049                                t->id, whatNext_strs[t->what_next]));
1050       /* just adjust the stack for this thread, then pop it back
1051        * on the run queue.
1052        */
1053       threadPaused(t);
1054       { 
1055         /* enlarge the stack */
1056         StgTSO *new_t = threadStackOverflow(t);
1057         
1058         /* This TSO has moved, so update any pointers to it from the
1059          * main thread stack.  It better not be on any other queues...
1060          * (it shouldn't be).
1061          */
1062         if (t->main != NULL) {
1063             t->main->tso = new_t;
1064         }
1065         PUSH_ON_RUN_QUEUE(new_t);
1066       }
1067       break;
1068
1069     case ThreadYielding:
1070 #if defined(GRAN)
1071       IF_DEBUG(gran, 
1072                DumpGranEvent(GR_DESCHEDULE, t));
1073       globalGranStats.tot_yields++;
1074 #elif defined(PAR)
1075       // IF_DEBUG(par, 
1076       // DumpGranEvent(GR_DESCHEDULE, t);
1077       globalParStats.tot_yields++;
1078 #endif
1079       /* put the thread back on the run queue.  Then, if we're ready to
1080        * GC, check whether this is the last task to stop.  If so, wake
1081        * up the GC thread.  getThread will block during a GC until the
1082        * GC is finished.
1083        */
1084       IF_DEBUG(scheduler,
1085                if (t->what_next != prev_what_next) {
1086                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1087                          t->id, whatNext_strs[t->what_next]);
1088                } else {
1089                    belch("--<< thread %ld (%s) stopped, yielding", 
1090                          t->id, whatNext_strs[t->what_next]);
1091                }
1092                );
1093
1094       IF_DEBUG(sanity,
1095                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1096                checkTSO(t));
1097       ASSERT(t->link == END_TSO_QUEUE);
1098
1099       // Shortcut if we're just switching evaluators: don't bother
1100       // doing stack squeezing (which can be expensive), just run the
1101       // thread.
1102       if (t->what_next != prev_what_next) {
1103           goto run_thread;
1104       }
1105
1106       threadPaused(t);
1107
1108 #if defined(GRAN)
1109       ASSERT(!is_on_queue(t,CurrentProc));
1110
1111       IF_DEBUG(sanity,
1112                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1113                checkThreadQsSanity(rtsTrue));
1114 #endif
1115
1116 #if defined(PAR)
1117       if (RtsFlags.ParFlags.doFairScheduling) { 
1118         /* this does round-robin scheduling; good for concurrency */
1119         APPEND_TO_RUN_QUEUE(t);
1120       } else {
1121         /* this does unfair scheduling; good for parallelism */
1122         PUSH_ON_RUN_QUEUE(t);
1123       }
1124 #else
1125       // this does round-robin scheduling; good for concurrency
1126       APPEND_TO_RUN_QUEUE(t);
1127 #endif
1128
1129 #if defined(GRAN)
1130       /* add a ContinueThread event to actually process the thread */
1131       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1132                 ContinueThread,
1133                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1134       IF_GRAN_DEBUG(bq, 
1135                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1136                G_EVENTQ(0);
1137                G_CURR_THREADQ(0));
1138 #endif /* GRAN */
1139       break;
1140
1141     case ThreadBlocked:
1142 #if defined(GRAN)
1143       IF_DEBUG(scheduler,
1144                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1145                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1146                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1147
1148       // ??? needed; should emit block before
1149       IF_DEBUG(gran, 
1150                DumpGranEvent(GR_DESCHEDULE, t)); 
1151       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1152       /*
1153         ngoq Dogh!
1154       ASSERT(procStatus[CurrentProc]==Busy || 
1155               ((procStatus[CurrentProc]==Fetching) && 
1156               (t->block_info.closure!=(StgClosure*)NULL)));
1157       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1158           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1159             procStatus[CurrentProc]==Fetching)) 
1160         procStatus[CurrentProc] = Idle;
1161       */
1162 #elif defined(PAR)
1163       IF_DEBUG(scheduler,
1164                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1165                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1166       IF_PAR_DEBUG(bq,
1167
1168                    if (t->block_info.closure!=(StgClosure*)NULL) 
1169                      print_bq(t->block_info.closure));
1170
1171       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1172       blockThread(t);
1173
1174       /* whatever we schedule next, we must log that schedule */
1175       emitSchedule = rtsTrue;
1176
1177 #else /* !GRAN */
1178       /* don't need to do anything.  Either the thread is blocked on
1179        * I/O, in which case we'll have called addToBlockedQueue
1180        * previously, or it's blocked on an MVar or Blackhole, in which
1181        * case it'll be on the relevant queue already.
1182        */
1183       IF_DEBUG(scheduler,
1184                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1185                        t->id, whatNext_strs[t->what_next]);
1186                printThreadBlockage(t);
1187                fprintf(stderr, "\n"));
1188       fflush(stderr);
1189
1190       /* Only for dumping event to log file 
1191          ToDo: do I need this in GranSim, too?
1192       blockThread(t);
1193       */
1194 #endif
1195       threadPaused(t);
1196       break;
1197
1198     case ThreadFinished:
1199       /* Need to check whether this was a main thread, and if so, signal
1200        * the task that started it with the return value.  If we have no
1201        * more main threads, we probably need to stop all the tasks until
1202        * we get a new one.
1203        */
1204       /* We also end up here if the thread kills itself with an
1205        * uncaught exception, see Exception.hc.
1206        */
1207       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1208                                t->id, whatNext_strs[t->what_next]));
1209 #if defined(GRAN)
1210       endThread(t, CurrentProc); // clean-up the thread
1211 #elif defined(PAR)
1212       /* For now all are advisory -- HWL */
1213       //if(t->priority==AdvisoryPriority) ??
1214       advisory_thread_count--;
1215       
1216 # ifdef DIST
1217       if(t->dist.priority==RevalPriority)
1218         FinishReval(t);
1219 # endif
1220       
1221       if (RtsFlags.ParFlags.ParStats.Full &&
1222           !RtsFlags.ParFlags.ParStats.Suppressed) 
1223         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1224 #endif
1225
1226       //
1227       // Check whether the thread that just completed was a main
1228       // thread, and if so return with the result.  
1229       //
1230       // There is an assumption here that all thread completion goes
1231       // through this point; we need to make sure that if a thread
1232       // ends up in the ThreadKilled state, that it stays on the run
1233       // queue so it can be dealt with here.
1234       //
1235       if (
1236 #if defined(RTS_SUPPORTS_THREADS)
1237           mainThread != NULL
1238 #else
1239           mainThread->tso == t
1240 #endif
1241           )
1242       {
1243           // We are a bound thread: this must be our thread that just
1244           // completed.
1245           ASSERT(mainThread->tso == t);
1246
1247           if (t->what_next == ThreadComplete) {
1248               if (mainThread->ret) {
1249                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1250                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1251               }
1252               mainThread->stat = Success;
1253           } else {
1254               if (mainThread->ret) {
1255                   *(mainThread->ret) = NULL;
1256               }
1257               if (was_interrupted) {
1258                   mainThread->stat = Interrupted;
1259               } else {
1260                   mainThread->stat = Killed;
1261               }
1262           }
1263 #ifdef DEBUG
1264           removeThreadLabel((StgWord)mainThread->tso->id);
1265 #endif
1266           if (mainThread->prev == NULL) {
1267               main_threads = mainThread->link;
1268           } else {
1269               mainThread->prev->link = mainThread->link;
1270           }
1271           if (mainThread->link != NULL) {
1272               mainThread->link->prev = NULL;
1273           }
1274           releaseCapability(cap);
1275           return;
1276       }
1277
1278 #ifdef RTS_SUPPORTS_THREADS
1279       ASSERT(t->main == NULL);
1280 #else
1281       if (t->main != NULL) {
1282           // Must be a main thread that is not the topmost one.  Leave
1283           // it on the run queue until the stack has unwound to the
1284           // point where we can deal with this.  Leaving it on the run
1285           // queue also ensures that the garbage collector knows about
1286           // this thread and its return value (it gets dropped from the
1287           // all_threads list so there's no other way to find it).
1288           APPEND_TO_RUN_QUEUE(t);
1289       }
1290 #endif
1291       break;
1292
1293     default:
1294       barf("schedule: invalid thread return code %d", (int)ret);
1295     }
1296
1297 #ifdef PROFILING
1298     // When we have +RTS -i0 and we're heap profiling, do a census at
1299     // every GC.  This lets us get repeatable runs for debugging.
1300     if (performHeapProfile ||
1301         (RtsFlags.ProfFlags.profileInterval==0 &&
1302          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1303         GarbageCollect(GetRoots, rtsTrue);
1304         heapCensus();
1305         performHeapProfile = rtsFalse;
1306         ready_to_gc = rtsFalse; // we already GC'd
1307     }
1308 #endif
1309
1310     if (ready_to_gc) {
1311       /* everybody back, start the GC.
1312        * Could do it in this thread, or signal a condition var
1313        * to do it in another thread.  Either way, we need to
1314        * broadcast on gc_pending_cond afterward.
1315        */
1316 #if defined(RTS_SUPPORTS_THREADS)
1317       IF_DEBUG(scheduler,sched_belch("doing GC"));
1318 #endif
1319       GarbageCollect(GetRoots,rtsFalse);
1320       ready_to_gc = rtsFalse;
1321 #if defined(GRAN)
1322       /* add a ContinueThread event to continue execution of current thread */
1323       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1324                 ContinueThread,
1325                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1326       IF_GRAN_DEBUG(bq, 
1327                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1328                G_EVENTQ(0);
1329                G_CURR_THREADQ(0));
1330 #endif /* GRAN */
1331     }
1332
1333 #if defined(GRAN)
1334   next_thread:
1335     IF_GRAN_DEBUG(unused,
1336                   print_eventq(EventHd));
1337
1338     event = get_next_event();
1339 #elif defined(PAR)
1340   next_thread:
1341     /* ToDo: wait for next message to arrive rather than busy wait */
1342 #endif /* GRAN */
1343
1344   } /* end of while(1) */
1345
1346   IF_PAR_DEBUG(verbose,
1347                belch("== Leaving schedule() after having received Finish"));
1348 }
1349
1350 /* ---------------------------------------------------------------------------
1351  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1352  * used by Control.Concurrent for error checking.
1353  * ------------------------------------------------------------------------- */
1354  
1355 StgBool
1356 rtsSupportsBoundThreads(void)
1357 {
1358 #ifdef THREADED_RTS
1359   return rtsTrue;
1360 #else
1361   return rtsFalse;
1362 #endif
1363 }
1364
1365 /* ---------------------------------------------------------------------------
1366  * isThreadBound(tso): check whether tso is bound to an OS thread.
1367  * ------------------------------------------------------------------------- */
1368  
1369 StgBool
1370 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1371 {
1372 #ifdef THREADED_RTS
1373   return (tso->main != NULL);
1374 #endif
1375   return rtsFalse;
1376 }
1377
1378 /* ---------------------------------------------------------------------------
1379  * Singleton fork(). Do not copy any running threads.
1380  * ------------------------------------------------------------------------- */
1381
1382 #ifndef mingw32_TARGET_OS
1383 #define FORKPROCESS_PRIMOP_SUPPORTED
1384 #endif
1385
1386 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1387 static void 
1388 deleteThreadImmediately(StgTSO *tso);
1389 #endif
1390 StgInt
1391 forkProcess(HsStablePtr *entry
1392 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1393             STG_UNUSED
1394 #endif
1395            )
1396 {
1397 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1398   pid_t pid;
1399   StgTSO* t,*next;
1400   StgMainThread *m;
1401   SchedulerStatus rc;
1402
1403   IF_DEBUG(scheduler,sched_belch("forking!"));
1404   rts_lock(); // This not only acquires sched_mutex, it also
1405               // makes sure that no other threads are running
1406
1407   pid = fork();
1408
1409   if (pid) { /* parent */
1410
1411   /* just return the pid */
1412     rts_unlock();
1413     return pid;
1414     
1415   } else { /* child */
1416     
1417     
1418       // delete all threads
1419     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1420     
1421     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1422       next = t->link;
1423
1424         // don't allow threads to catch the ThreadKilled exception
1425       deleteThreadImmediately(t);
1426     }
1427     
1428       // wipe the main thread list
1429     while((m = main_threads) != NULL) {
1430       main_threads = m->link;
1431 # ifdef THREADED_RTS
1432       closeCondition(&m->bound_thread_cond);
1433 # endif
1434       stgFree(m);
1435     }
1436     
1437 # ifdef RTS_SUPPORTS_THREADS
1438     resetTaskManagerAfterFork();      // tell startTask() and friends that
1439     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1440     resetWorkerWakeupPipeAfterFork();
1441 # endif
1442     
1443     rc = rts_evalStableIO(entry, NULL);  // run the action
1444     rts_checkSchedStatus("forkProcess",rc);
1445     
1446     rts_unlock();
1447     
1448     hs_exit();                      // clean up and exit
1449     stg_exit(0);
1450   }
1451 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1452   barf("forkProcess#: primop not supported, sorry!\n");
1453   return -1;
1454 #endif
1455 }
1456
1457 /* ---------------------------------------------------------------------------
1458  * deleteAllThreads():  kill all the live threads.
1459  *
1460  * This is used when we catch a user interrupt (^C), before performing
1461  * any necessary cleanups and running finalizers.
1462  *
1463  * Locks: sched_mutex held.
1464  * ------------------------------------------------------------------------- */
1465    
1466 void
1467 deleteAllThreads ( void )
1468 {
1469   StgTSO* t, *next;
1470   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1471   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1472       next = t->global_link;
1473       deleteThread(t);
1474   }      
1475
1476   // The run queue now contains a bunch of ThreadKilled threads.  We
1477   // must not throw these away: the main thread(s) will be in there
1478   // somewhere, and the main scheduler loop has to deal with it.
1479   // Also, the run queue is the only thing keeping these threads from
1480   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1481
1482   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1483   ASSERT(sleeping_queue == END_TSO_QUEUE);
1484 }
1485
1486 /* startThread and  insertThread are now in GranSim.c -- HWL */
1487
1488
1489 /* ---------------------------------------------------------------------------
1490  * Suspending & resuming Haskell threads.
1491  * 
1492  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1493  * its capability before calling the C function.  This allows another
1494  * task to pick up the capability and carry on running Haskell
1495  * threads.  It also means that if the C call blocks, it won't lock
1496  * the whole system.
1497  *
1498  * The Haskell thread making the C call is put to sleep for the
1499  * duration of the call, on the susepended_ccalling_threads queue.  We
1500  * give out a token to the task, which it can use to resume the thread
1501  * on return from the C function.
1502  * ------------------------------------------------------------------------- */
1503    
1504 StgInt
1505 suspendThread( StgRegTable *reg, 
1506                rtsBool concCall
1507 #if !defined(DEBUG)
1508                STG_UNUSED
1509 #endif
1510                )
1511 {
1512   nat tok;
1513   Capability *cap;
1514   int saved_errno = errno;
1515
1516   /* assume that *reg is a pointer to the StgRegTable part
1517    * of a Capability.
1518    */
1519   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1520
1521   ACQUIRE_LOCK(&sched_mutex);
1522
1523   IF_DEBUG(scheduler,
1524            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1525
1526   // XXX this might not be necessary --SDM
1527   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1528
1529   threadPaused(cap->r.rCurrentTSO);
1530   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1531   suspended_ccalling_threads = cap->r.rCurrentTSO;
1532
1533   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1534       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1535       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1536   } else {
1537       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1538   }
1539
1540   /* Use the thread ID as the token; it should be unique */
1541   tok = cap->r.rCurrentTSO->id;
1542
1543   /* Hand back capability */
1544   releaseCapability(cap);
1545   
1546 #if defined(RTS_SUPPORTS_THREADS)
1547   /* Preparing to leave the RTS, so ensure there's a native thread/task
1548      waiting to take over.
1549   */
1550   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1551 #endif
1552
1553   /* Other threads _might_ be available for execution; signal this */
1554   THREAD_RUNNABLE();
1555   RELEASE_LOCK(&sched_mutex);
1556   
1557   errno = saved_errno;
1558   return tok; 
1559 }
1560
1561 StgRegTable *
1562 resumeThread( StgInt tok,
1563               rtsBool concCall STG_UNUSED )
1564 {
1565   StgTSO *tso, **prev;
1566   Capability *cap;
1567   int saved_errno = errno;
1568
1569 #if defined(RTS_SUPPORTS_THREADS)
1570   /* Wait for permission to re-enter the RTS with the result. */
1571   ACQUIRE_LOCK(&sched_mutex);
1572   waitForReturnCapability(&sched_mutex, &cap);
1573
1574   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1575 #else
1576   grabCapability(&cap);
1577 #endif
1578
1579   /* Remove the thread off of the suspended list */
1580   prev = &suspended_ccalling_threads;
1581   for (tso = suspended_ccalling_threads; 
1582        tso != END_TSO_QUEUE; 
1583        prev = &tso->link, tso = tso->link) {
1584     if (tso->id == (StgThreadID)tok) {
1585       *prev = tso->link;
1586       break;
1587     }
1588   }
1589   if (tso == END_TSO_QUEUE) {
1590     barf("resumeThread: thread not found");
1591   }
1592   tso->link = END_TSO_QUEUE;
1593   
1594   if(tso->why_blocked == BlockedOnCCall) {
1595       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1596       tso->blocked_exceptions = NULL;
1597   }
1598   
1599   /* Reset blocking status */
1600   tso->why_blocked  = NotBlocked;
1601
1602   cap->r.rCurrentTSO = tso;
1603   RELEASE_LOCK(&sched_mutex);
1604   errno = saved_errno;
1605   return &cap->r;
1606 }
1607
1608
1609 /* ---------------------------------------------------------------------------
1610  * Static functions
1611  * ------------------------------------------------------------------------ */
1612 static void unblockThread(StgTSO *tso);
1613
1614 /* ---------------------------------------------------------------------------
1615  * Comparing Thread ids.
1616  *
1617  * This is used from STG land in the implementation of the
1618  * instances of Eq/Ord for ThreadIds.
1619  * ------------------------------------------------------------------------ */
1620
1621 int
1622 cmp_thread(StgPtr tso1, StgPtr tso2) 
1623
1624   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1625   StgThreadID id2 = ((StgTSO *)tso2)->id;
1626  
1627   if (id1 < id2) return (-1);
1628   if (id1 > id2) return 1;
1629   return 0;
1630 }
1631
1632 /* ---------------------------------------------------------------------------
1633  * Fetching the ThreadID from an StgTSO.
1634  *
1635  * This is used in the implementation of Show for ThreadIds.
1636  * ------------------------------------------------------------------------ */
1637 int
1638 rts_getThreadId(StgPtr tso) 
1639 {
1640   return ((StgTSO *)tso)->id;
1641 }
1642
1643 #ifdef DEBUG
1644 void
1645 labelThread(StgPtr tso, char *label)
1646 {
1647   int len;
1648   void *buf;
1649
1650   /* Caveat: Once set, you can only set the thread name to "" */
1651   len = strlen(label)+1;
1652   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1653   strncpy(buf,label,len);
1654   /* Update will free the old memory for us */
1655   updateThreadLabel(((StgTSO *)tso)->id,buf);
1656 }
1657 #endif /* DEBUG */
1658
1659 /* ---------------------------------------------------------------------------
1660    Create a new thread.
1661
1662    The new thread starts with the given stack size.  Before the
1663    scheduler can run, however, this thread needs to have a closure
1664    (and possibly some arguments) pushed on its stack.  See
1665    pushClosure() in Schedule.h.
1666
1667    createGenThread() and createIOThread() (in SchedAPI.h) are
1668    convenient packaged versions of this function.
1669
1670    currently pri (priority) is only used in a GRAN setup -- HWL
1671    ------------------------------------------------------------------------ */
1672 #if defined(GRAN)
1673 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1674 StgTSO *
1675 createThread(nat size, StgInt pri)
1676 #else
1677 StgTSO *
1678 createThread(nat size)
1679 #endif
1680 {
1681
1682     StgTSO *tso;
1683     nat stack_size;
1684
1685     /* First check whether we should create a thread at all */
1686 #if defined(PAR)
1687   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1688   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1689     threadsIgnored++;
1690     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1691           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1692     return END_TSO_QUEUE;
1693   }
1694   threadsCreated++;
1695 #endif
1696
1697 #if defined(GRAN)
1698   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1699 #endif
1700
1701   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1702
1703   /* catch ridiculously small stack sizes */
1704   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1705     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1706   }
1707
1708   stack_size = size - TSO_STRUCT_SIZEW;
1709
1710   tso = (StgTSO *)allocate(size);
1711   TICK_ALLOC_TSO(stack_size, 0);
1712
1713   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1714 #if defined(GRAN)
1715   SET_GRAN_HDR(tso, ThisPE);
1716 #endif
1717
1718   // Always start with the compiled code evaluator
1719   tso->what_next = ThreadRunGHC;
1720
1721   tso->id = next_thread_id++; 
1722   tso->why_blocked  = NotBlocked;
1723   tso->blocked_exceptions = NULL;
1724
1725   tso->saved_errno = 0;
1726   tso->main = NULL;
1727   
1728   tso->stack_size   = stack_size;
1729   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1730                               - TSO_STRUCT_SIZEW;
1731   tso->sp           = (P_)&(tso->stack) + stack_size;
1732
1733 #ifdef PROFILING
1734   tso->prof.CCCS = CCS_MAIN;
1735 #endif
1736
1737   /* put a stop frame on the stack */
1738   tso->sp -= sizeofW(StgStopFrame);
1739   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1740   // ToDo: check this
1741 #if defined(GRAN)
1742   tso->link = END_TSO_QUEUE;
1743   /* uses more flexible routine in GranSim */
1744   insertThread(tso, CurrentProc);
1745 #else
1746   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1747    * from its creation
1748    */
1749 #endif
1750
1751 #if defined(GRAN) 
1752   if (RtsFlags.GranFlags.GranSimStats.Full) 
1753     DumpGranEvent(GR_START,tso);
1754 #elif defined(PAR)
1755   if (RtsFlags.ParFlags.ParStats.Full) 
1756     DumpGranEvent(GR_STARTQ,tso);
1757   /* HACk to avoid SCHEDULE 
1758      LastTSO = tso; */
1759 #endif
1760
1761   /* Link the new thread on the global thread list.
1762    */
1763   tso->global_link = all_threads;
1764   all_threads = tso;
1765
1766 #if defined(DIST)
1767   tso->dist.priority = MandatoryPriority; //by default that is...
1768 #endif
1769
1770 #if defined(GRAN)
1771   tso->gran.pri = pri;
1772 # if defined(DEBUG)
1773   tso->gran.magic = TSO_MAGIC; // debugging only
1774 # endif
1775   tso->gran.sparkname   = 0;
1776   tso->gran.startedat   = CURRENT_TIME; 
1777   tso->gran.exported    = 0;
1778   tso->gran.basicblocks = 0;
1779   tso->gran.allocs      = 0;
1780   tso->gran.exectime    = 0;
1781   tso->gran.fetchtime   = 0;
1782   tso->gran.fetchcount  = 0;
1783   tso->gran.blocktime   = 0;
1784   tso->gran.blockcount  = 0;
1785   tso->gran.blockedat   = 0;
1786   tso->gran.globalsparks = 0;
1787   tso->gran.localsparks  = 0;
1788   if (RtsFlags.GranFlags.Light)
1789     tso->gran.clock  = Now; /* local clock */
1790   else
1791     tso->gran.clock  = 0;
1792
1793   IF_DEBUG(gran,printTSO(tso));
1794 #elif defined(PAR)
1795 # if defined(DEBUG)
1796   tso->par.magic = TSO_MAGIC; // debugging only
1797 # endif
1798   tso->par.sparkname   = 0;
1799   tso->par.startedat   = CURRENT_TIME; 
1800   tso->par.exported    = 0;
1801   tso->par.basicblocks = 0;
1802   tso->par.allocs      = 0;
1803   tso->par.exectime    = 0;
1804   tso->par.fetchtime   = 0;
1805   tso->par.fetchcount  = 0;
1806   tso->par.blocktime   = 0;
1807   tso->par.blockcount  = 0;
1808   tso->par.blockedat   = 0;
1809   tso->par.globalsparks = 0;
1810   tso->par.localsparks  = 0;
1811 #endif
1812
1813 #if defined(GRAN)
1814   globalGranStats.tot_threads_created++;
1815   globalGranStats.threads_created_on_PE[CurrentProc]++;
1816   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1817   globalGranStats.tot_sq_probes++;
1818 #elif defined(PAR)
1819   // collect parallel global statistics (currently done together with GC stats)
1820   if (RtsFlags.ParFlags.ParStats.Global &&
1821       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1822     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1823     globalParStats.tot_threads_created++;
1824   }
1825 #endif 
1826
1827 #if defined(GRAN)
1828   IF_GRAN_DEBUG(pri,
1829                 belch("==__ schedule: Created TSO %d (%p);",
1830                       CurrentProc, tso, tso->id));
1831 #elif defined(PAR)
1832     IF_PAR_DEBUG(verbose,
1833                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1834                        tso->id, tso, advisory_thread_count));
1835 #else
1836   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1837                                  tso->id, tso->stack_size));
1838 #endif    
1839   return tso;
1840 }
1841
1842 #if defined(PAR)
1843 /* RFP:
1844    all parallel thread creation calls should fall through the following routine.
1845 */
1846 StgTSO *
1847 createSparkThread(rtsSpark spark) 
1848 { StgTSO *tso;
1849   ASSERT(spark != (rtsSpark)NULL);
1850   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1851   { threadsIgnored++;
1852     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1853           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1854     return END_TSO_QUEUE;
1855   }
1856   else
1857   { threadsCreated++;
1858     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1859     if (tso==END_TSO_QUEUE)     
1860       barf("createSparkThread: Cannot create TSO");
1861 #if defined(DIST)
1862     tso->priority = AdvisoryPriority;
1863 #endif
1864     pushClosure(tso,spark);
1865     PUSH_ON_RUN_QUEUE(tso);
1866     advisory_thread_count++;    
1867   }
1868   return tso;
1869 }
1870 #endif
1871
1872 /*
1873   Turn a spark into a thread.
1874   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1875 */
1876 #if defined(PAR)
1877 StgTSO *
1878 activateSpark (rtsSpark spark) 
1879 {
1880   StgTSO *tso;
1881
1882   tso = createSparkThread(spark);
1883   if (RtsFlags.ParFlags.ParStats.Full) {   
1884     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1885     IF_PAR_DEBUG(verbose,
1886                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1887                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1888   }
1889   // ToDo: fwd info on local/global spark to thread -- HWL
1890   // tso->gran.exported =  spark->exported;
1891   // tso->gran.locked =   !spark->global;
1892   // tso->gran.sparkname = spark->name;
1893
1894   return tso;
1895 }
1896 #endif
1897
1898 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1899                                    Capability *initialCapability
1900                                    );
1901
1902
1903 /* ---------------------------------------------------------------------------
1904  * scheduleThread()
1905  *
1906  * scheduleThread puts a thread on the head of the runnable queue.
1907  * This will usually be done immediately after a thread is created.
1908  * The caller of scheduleThread must create the thread using e.g.
1909  * createThread and push an appropriate closure
1910  * on this thread's stack before the scheduler is invoked.
1911  * ------------------------------------------------------------------------ */
1912
1913 static void scheduleThread_ (StgTSO* tso);
1914
1915 void
1916 scheduleThread_(StgTSO *tso)
1917 {
1918   // Precondition: sched_mutex must be held.
1919   PUSH_ON_RUN_QUEUE(tso);
1920   THREAD_RUNNABLE();
1921 }
1922
1923 void
1924 scheduleThread(StgTSO* tso)
1925 {
1926   ACQUIRE_LOCK(&sched_mutex);
1927   scheduleThread_(tso);
1928   RELEASE_LOCK(&sched_mutex);
1929 }
1930
1931 #if defined(RTS_SUPPORTS_THREADS)
1932 static Condition bound_cond_cache;
1933 static int bound_cond_cache_full = 0;
1934 #endif
1935
1936
1937 SchedulerStatus
1938 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1939                    Capability *initialCapability)
1940 {
1941     // Precondition: sched_mutex must be held
1942     StgMainThread *m;
1943
1944     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1945     m->tso = tso;
1946     tso->main = m;
1947     m->ret = ret;
1948     m->stat = NoStatus;
1949     m->link = main_threads;
1950     m->prev = NULL;
1951     if (main_threads != NULL) {
1952         main_threads->prev = m;
1953     }
1954     main_threads = m;
1955
1956 #if defined(RTS_SUPPORTS_THREADS)
1957     // Allocating a new condition for each thread is expensive, so we
1958     // cache one.  This is a pretty feeble hack, but it helps speed up
1959     // consecutive call-ins quite a bit.
1960     if (bound_cond_cache_full) {
1961         m->bound_thread_cond = bound_cond_cache;
1962         bound_cond_cache_full = 0;
1963     } else {
1964         initCondition(&m->bound_thread_cond);
1965     }
1966 #endif
1967
1968     /* Put the thread on the main-threads list prior to scheduling the TSO.
1969        Failure to do so introduces a race condition in the MT case (as
1970        identified by Wolfgang Thaller), whereby the new task/OS thread 
1971        created by scheduleThread_() would complete prior to the thread
1972        that spawned it managed to put 'itself' on the main-threads list.
1973        The upshot of it all being that the worker thread wouldn't get to
1974        signal the completion of the its work item for the main thread to
1975        see (==> it got stuck waiting.)    -- sof 6/02.
1976     */
1977     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1978     
1979     PUSH_ON_RUN_QUEUE(tso);
1980     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1981     // bound and only runnable by *this* OS thread, so waking up other
1982     // workers will just slow things down.
1983
1984     return waitThread_(m, initialCapability);
1985 }
1986
1987 /* ---------------------------------------------------------------------------
1988  * initScheduler()
1989  *
1990  * Initialise the scheduler.  This resets all the queues - if the
1991  * queues contained any threads, they'll be garbage collected at the
1992  * next pass.
1993  *
1994  * ------------------------------------------------------------------------ */
1995
1996 void 
1997 initScheduler(void)
1998 {
1999 #if defined(GRAN)
2000   nat i;
2001
2002   for (i=0; i<=MAX_PROC; i++) {
2003     run_queue_hds[i]      = END_TSO_QUEUE;
2004     run_queue_tls[i]      = END_TSO_QUEUE;
2005     blocked_queue_hds[i]  = END_TSO_QUEUE;
2006     blocked_queue_tls[i]  = END_TSO_QUEUE;
2007     ccalling_threadss[i]  = END_TSO_QUEUE;
2008     sleeping_queue        = END_TSO_QUEUE;
2009   }
2010 #else
2011   run_queue_hd      = END_TSO_QUEUE;
2012   run_queue_tl      = END_TSO_QUEUE;
2013   blocked_queue_hd  = END_TSO_QUEUE;
2014   blocked_queue_tl  = END_TSO_QUEUE;
2015   sleeping_queue    = END_TSO_QUEUE;
2016 #endif 
2017
2018   suspended_ccalling_threads  = END_TSO_QUEUE;
2019
2020   main_threads = NULL;
2021   all_threads  = END_TSO_QUEUE;
2022
2023   context_switch = 0;
2024   interrupted    = 0;
2025
2026   RtsFlags.ConcFlags.ctxtSwitchTicks =
2027       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2028       
2029 #if defined(RTS_SUPPORTS_THREADS)
2030   /* Initialise the mutex and condition variables used by
2031    * the scheduler. */
2032   initMutex(&sched_mutex);
2033   initMutex(&term_mutex);
2034 #endif
2035   
2036   ACQUIRE_LOCK(&sched_mutex);
2037
2038   /* A capability holds the state a native thread needs in
2039    * order to execute STG code. At least one capability is
2040    * floating around (only SMP builds have more than one).
2041    */
2042   initCapabilities();
2043   
2044 #if defined(RTS_SUPPORTS_THREADS)
2045     /* start our haskell execution tasks */
2046     startTaskManager(0,taskStart);
2047 #endif
2048
2049 #if /* defined(SMP) ||*/ defined(PAR)
2050   initSparkPools();
2051 #endif
2052
2053   RELEASE_LOCK(&sched_mutex);
2054 }
2055
2056 void
2057 exitScheduler( void )
2058 {
2059 #if defined(RTS_SUPPORTS_THREADS)
2060   stopTaskManager();
2061 #endif
2062   shutting_down_scheduler = rtsTrue;
2063 }
2064
2065 /* ----------------------------------------------------------------------------
2066    Managing the per-task allocation areas.
2067    
2068    Each capability comes with an allocation area.  These are
2069    fixed-length block lists into which allocation can be done.
2070
2071    ToDo: no support for two-space collection at the moment???
2072    ------------------------------------------------------------------------- */
2073
2074 static
2075 SchedulerStatus
2076 waitThread_(StgMainThread* m, Capability *initialCapability)
2077 {
2078   SchedulerStatus stat;
2079
2080   // Precondition: sched_mutex must be held.
2081   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2082
2083 #if defined(GRAN)
2084   /* GranSim specific init */
2085   CurrentTSO = m->tso;                // the TSO to run
2086   procStatus[MainProc] = Busy;        // status of main PE
2087   CurrentProc = MainProc;             // PE to run it on
2088   schedule(m,initialCapability);
2089 #else
2090   schedule(m,initialCapability);
2091   ASSERT(m->stat != NoStatus);
2092 #endif
2093
2094   stat = m->stat;
2095
2096 #if defined(RTS_SUPPORTS_THREADS)
2097   // Free the condition variable, returning it to the cache if possible.
2098   if (!bound_cond_cache_full) {
2099       bound_cond_cache = m->bound_thread_cond;
2100       bound_cond_cache_full = 1;
2101   } else {
2102       closeCondition(&m->bound_thread_cond);
2103   }
2104 #endif
2105
2106   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2107   stgFree(m);
2108
2109   // Postcondition: sched_mutex still held
2110   return stat;
2111 }
2112
2113 /* ---------------------------------------------------------------------------
2114    Where are the roots that we know about?
2115
2116         - all the threads on the runnable queue
2117         - all the threads on the blocked queue
2118         - all the threads on the sleeping queue
2119         - all the thread currently executing a _ccall_GC
2120         - all the "main threads"
2121      
2122    ------------------------------------------------------------------------ */
2123
2124 /* This has to be protected either by the scheduler monitor, or by the
2125         garbage collection monitor (probably the latter).
2126         KH @ 25/10/99
2127 */
2128
2129 void
2130 GetRoots( evac_fn evac )
2131 {
2132 #if defined(GRAN)
2133   {
2134     nat i;
2135     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2136       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2137           evac((StgClosure **)&run_queue_hds[i]);
2138       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2139           evac((StgClosure **)&run_queue_tls[i]);
2140       
2141       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2142           evac((StgClosure **)&blocked_queue_hds[i]);
2143       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2144           evac((StgClosure **)&blocked_queue_tls[i]);
2145       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2146           evac((StgClosure **)&ccalling_threads[i]);
2147     }
2148   }
2149
2150   markEventQueue();
2151
2152 #else /* !GRAN */
2153   if (run_queue_hd != END_TSO_QUEUE) {
2154       ASSERT(run_queue_tl != END_TSO_QUEUE);
2155       evac((StgClosure **)&run_queue_hd);
2156       evac((StgClosure **)&run_queue_tl);
2157   }
2158   
2159   if (blocked_queue_hd != END_TSO_QUEUE) {
2160       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2161       evac((StgClosure **)&blocked_queue_hd);
2162       evac((StgClosure **)&blocked_queue_tl);
2163   }
2164   
2165   if (sleeping_queue != END_TSO_QUEUE) {
2166       evac((StgClosure **)&sleeping_queue);
2167   }
2168 #endif 
2169
2170   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2171       evac((StgClosure **)&suspended_ccalling_threads);
2172   }
2173
2174 #if defined(PAR) || defined(GRAN)
2175   markSparkQueue(evac);
2176 #endif
2177
2178 #if defined(RTS_USER_SIGNALS)
2179   // mark the signal handlers (signals should be already blocked)
2180   markSignalHandlers(evac);
2181 #endif
2182 }
2183
2184 /* -----------------------------------------------------------------------------
2185    performGC
2186
2187    This is the interface to the garbage collector from Haskell land.
2188    We provide this so that external C code can allocate and garbage
2189    collect when called from Haskell via _ccall_GC.
2190
2191    It might be useful to provide an interface whereby the programmer
2192    can specify more roots (ToDo).
2193    
2194    This needs to be protected by the GC condition variable above.  KH.
2195    -------------------------------------------------------------------------- */
2196
2197 static void (*extra_roots)(evac_fn);
2198
2199 void
2200 performGC(void)
2201 {
2202   /* Obligated to hold this lock upon entry */
2203   ACQUIRE_LOCK(&sched_mutex);
2204   GarbageCollect(GetRoots,rtsFalse);
2205   RELEASE_LOCK(&sched_mutex);
2206 }
2207
2208 void
2209 performMajorGC(void)
2210 {
2211   ACQUIRE_LOCK(&sched_mutex);
2212   GarbageCollect(GetRoots,rtsTrue);
2213   RELEASE_LOCK(&sched_mutex);
2214 }
2215
2216 static void
2217 AllRoots(evac_fn evac)
2218 {
2219     GetRoots(evac);             // the scheduler's roots
2220     extra_roots(evac);          // the user's roots
2221 }
2222
2223 void
2224 performGCWithRoots(void (*get_roots)(evac_fn))
2225 {
2226   ACQUIRE_LOCK(&sched_mutex);
2227   extra_roots = get_roots;
2228   GarbageCollect(AllRoots,rtsFalse);
2229   RELEASE_LOCK(&sched_mutex);
2230 }
2231
2232 /* -----------------------------------------------------------------------------
2233    Stack overflow
2234
2235    If the thread has reached its maximum stack size, then raise the
2236    StackOverflow exception in the offending thread.  Otherwise
2237    relocate the TSO into a larger chunk of memory and adjust its stack
2238    size appropriately.
2239    -------------------------------------------------------------------------- */
2240
2241 static StgTSO *
2242 threadStackOverflow(StgTSO *tso)
2243 {
2244   nat new_stack_size, new_tso_size, stack_words;
2245   StgPtr new_sp;
2246   StgTSO *dest;
2247
2248   IF_DEBUG(sanity,checkTSO(tso));
2249   if (tso->stack_size >= tso->max_stack_size) {
2250
2251     IF_DEBUG(gc,
2252              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2253                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2254              /* If we're debugging, just print out the top of the stack */
2255              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2256                                               tso->sp+64)));
2257
2258     /* Send this thread the StackOverflow exception */
2259     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2260     return tso;
2261   }
2262
2263   /* Try to double the current stack size.  If that takes us over the
2264    * maximum stack size for this thread, then use the maximum instead.
2265    * Finally round up so the TSO ends up as a whole number of blocks.
2266    */
2267   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2268   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2269                                        TSO_STRUCT_SIZE)/sizeof(W_);
2270   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2271   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2272
2273   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2274
2275   dest = (StgTSO *)allocate(new_tso_size);
2276   TICK_ALLOC_TSO(new_stack_size,0);
2277
2278   /* copy the TSO block and the old stack into the new area */
2279   memcpy(dest,tso,TSO_STRUCT_SIZE);
2280   stack_words = tso->stack + tso->stack_size - tso->sp;
2281   new_sp = (P_)dest + new_tso_size - stack_words;
2282   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2283
2284   /* relocate the stack pointers... */
2285   dest->sp         = new_sp;
2286   dest->stack_size = new_stack_size;
2287         
2288   /* Mark the old TSO as relocated.  We have to check for relocated
2289    * TSOs in the garbage collector and any primops that deal with TSOs.
2290    *
2291    * It's important to set the sp value to just beyond the end
2292    * of the stack, so we don't attempt to scavenge any part of the
2293    * dead TSO's stack.
2294    */
2295   tso->what_next = ThreadRelocated;
2296   tso->link = dest;
2297   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2298   tso->why_blocked = NotBlocked;
2299   dest->mut_link = NULL;
2300
2301   IF_PAR_DEBUG(verbose,
2302                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2303                      tso->id, tso, tso->stack_size);
2304                /* If we're debugging, just print out the top of the stack */
2305                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2306                                                 tso->sp+64)));
2307   
2308   IF_DEBUG(sanity,checkTSO(tso));
2309 #if 0
2310   IF_DEBUG(scheduler,printTSO(dest));
2311 #endif
2312
2313   return dest;
2314 }
2315
2316 /* ---------------------------------------------------------------------------
2317    Wake up a queue that was blocked on some resource.
2318    ------------------------------------------------------------------------ */
2319
2320 #if defined(GRAN)
2321 STATIC_INLINE void
2322 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2323 {
2324 }
2325 #elif defined(PAR)
2326 STATIC_INLINE void
2327 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2328 {
2329   /* write RESUME events to log file and
2330      update blocked and fetch time (depending on type of the orig closure) */
2331   if (RtsFlags.ParFlags.ParStats.Full) {
2332     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2333                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2334                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2335     if (EMPTY_RUN_QUEUE())
2336       emitSchedule = rtsTrue;
2337
2338     switch (get_itbl(node)->type) {
2339         case FETCH_ME_BQ:
2340           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2341           break;
2342         case RBH:
2343         case FETCH_ME:
2344         case BLACKHOLE_BQ:
2345           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2346           break;
2347 #ifdef DIST
2348         case MVAR:
2349           break;
2350 #endif    
2351         default:
2352           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2353         }
2354       }
2355 }
2356 #endif
2357
2358 #if defined(GRAN)
2359 static StgBlockingQueueElement *
2360 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2361 {
2362     StgTSO *tso;
2363     PEs node_loc, tso_loc;
2364
2365     node_loc = where_is(node); // should be lifted out of loop
2366     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2367     tso_loc = where_is((StgClosure *)tso);
2368     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2369       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2370       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2371       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2372       // insertThread(tso, node_loc);
2373       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2374                 ResumeThread,
2375                 tso, node, (rtsSpark*)NULL);
2376       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2377       // len_local++;
2378       // len++;
2379     } else { // TSO is remote (actually should be FMBQ)
2380       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2381                                   RtsFlags.GranFlags.Costs.gunblocktime +
2382                                   RtsFlags.GranFlags.Costs.latency;
2383       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2384                 UnblockThread,
2385                 tso, node, (rtsSpark*)NULL);
2386       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2387       // len++;
2388     }
2389     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2390     IF_GRAN_DEBUG(bq,
2391                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2392                           (node_loc==tso_loc ? "Local" : "Global"), 
2393                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2394     tso->block_info.closure = NULL;
2395     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2396                              tso->id, tso));
2397 }
2398 #elif defined(PAR)
2399 static StgBlockingQueueElement *
2400 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2401 {
2402     StgBlockingQueueElement *next;
2403
2404     switch (get_itbl(bqe)->type) {
2405     case TSO:
2406       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2407       /* if it's a TSO just push it onto the run_queue */
2408       next = bqe->link;
2409       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2410       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2411       THREAD_RUNNABLE();
2412       unblockCount(bqe, node);
2413       /* reset blocking status after dumping event */
2414       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2415       break;
2416
2417     case BLOCKED_FETCH:
2418       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2419       next = bqe->link;
2420       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2421       PendingFetches = (StgBlockedFetch *)bqe;
2422       break;
2423
2424 # if defined(DEBUG)
2425       /* can ignore this case in a non-debugging setup; 
2426          see comments on RBHSave closures above */
2427     case CONSTR:
2428       /* check that the closure is an RBHSave closure */
2429       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2430              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2431              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2432       break;
2433
2434     default:
2435       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2436            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2437            (StgClosure *)bqe);
2438 # endif
2439     }
2440   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2441   return next;
2442 }
2443
2444 #else /* !GRAN && !PAR */
2445 static StgTSO *
2446 unblockOneLocked(StgTSO *tso)
2447 {
2448   StgTSO *next;
2449
2450   ASSERT(get_itbl(tso)->type == TSO);
2451   ASSERT(tso->why_blocked != NotBlocked);
2452   tso->why_blocked = NotBlocked;
2453   next = tso->link;
2454   PUSH_ON_RUN_QUEUE(tso);
2455   THREAD_RUNNABLE();
2456   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2457   return next;
2458 }
2459 #endif
2460
2461 #if defined(GRAN) || defined(PAR)
2462 INLINE_ME StgBlockingQueueElement *
2463 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2464 {
2465   ACQUIRE_LOCK(&sched_mutex);
2466   bqe = unblockOneLocked(bqe, node);
2467   RELEASE_LOCK(&sched_mutex);
2468   return bqe;
2469 }
2470 #else
2471 INLINE_ME StgTSO *
2472 unblockOne(StgTSO *tso)
2473 {
2474   ACQUIRE_LOCK(&sched_mutex);
2475   tso = unblockOneLocked(tso);
2476   RELEASE_LOCK(&sched_mutex);
2477   return tso;
2478 }
2479 #endif
2480
2481 #if defined(GRAN)
2482 void 
2483 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2484 {
2485   StgBlockingQueueElement *bqe;
2486   PEs node_loc;
2487   nat len = 0; 
2488
2489   IF_GRAN_DEBUG(bq, 
2490                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2491                       node, CurrentProc, CurrentTime[CurrentProc], 
2492                       CurrentTSO->id, CurrentTSO));
2493
2494   node_loc = where_is(node);
2495
2496   ASSERT(q == END_BQ_QUEUE ||
2497          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2498          get_itbl(q)->type == CONSTR); // closure (type constructor)
2499   ASSERT(is_unique(node));
2500
2501   /* FAKE FETCH: magically copy the node to the tso's proc;
2502      no Fetch necessary because in reality the node should not have been 
2503      moved to the other PE in the first place
2504   */
2505   if (CurrentProc!=node_loc) {
2506     IF_GRAN_DEBUG(bq, 
2507                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2508                         node, node_loc, CurrentProc, CurrentTSO->id, 
2509                         // CurrentTSO, where_is(CurrentTSO),
2510                         node->header.gran.procs));
2511     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2512     IF_GRAN_DEBUG(bq, 
2513                   belch("## new bitmask of node %p is %#x",
2514                         node, node->header.gran.procs));
2515     if (RtsFlags.GranFlags.GranSimStats.Global) {
2516       globalGranStats.tot_fake_fetches++;
2517     }
2518   }
2519
2520   bqe = q;
2521   // ToDo: check: ASSERT(CurrentProc==node_loc);
2522   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2523     //next = bqe->link;
2524     /* 
2525        bqe points to the current element in the queue
2526        next points to the next element in the queue
2527     */
2528     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2529     //tso_loc = where_is(tso);
2530     len++;
2531     bqe = unblockOneLocked(bqe, node);
2532   }
2533
2534   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2535      the closure to make room for the anchor of the BQ */
2536   if (bqe!=END_BQ_QUEUE) {
2537     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2538     /*
2539     ASSERT((info_ptr==&RBH_Save_0_info) ||
2540            (info_ptr==&RBH_Save_1_info) ||
2541            (info_ptr==&RBH_Save_2_info));
2542     */
2543     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2544     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2545     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2546
2547     IF_GRAN_DEBUG(bq,
2548                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2549                         node, info_type(node)));
2550   }
2551
2552   /* statistics gathering */
2553   if (RtsFlags.GranFlags.GranSimStats.Global) {
2554     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2555     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2556     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2557     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2558   }
2559   IF_GRAN_DEBUG(bq,
2560                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2561                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2562 }
2563 #elif defined(PAR)
2564 void 
2565 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2566 {
2567   StgBlockingQueueElement *bqe;
2568
2569   ACQUIRE_LOCK(&sched_mutex);
2570
2571   IF_PAR_DEBUG(verbose, 
2572                belch("##-_ AwBQ for node %p on [%x]: ",
2573                      node, mytid));
2574 #ifdef DIST  
2575   //RFP
2576   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2577     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2578     return;
2579   }
2580 #endif
2581   
2582   ASSERT(q == END_BQ_QUEUE ||
2583          get_itbl(q)->type == TSO ||           
2584          get_itbl(q)->type == BLOCKED_FETCH || 
2585          get_itbl(q)->type == CONSTR); 
2586
2587   bqe = q;
2588   while (get_itbl(bqe)->type==TSO || 
2589          get_itbl(bqe)->type==BLOCKED_FETCH) {
2590     bqe = unblockOneLocked(bqe, node);
2591   }
2592   RELEASE_LOCK(&sched_mutex);
2593 }
2594
2595 #else   /* !GRAN && !PAR */
2596
2597 void
2598 awakenBlockedQueueNoLock(StgTSO *tso)
2599 {
2600   while (tso != END_TSO_QUEUE) {
2601     tso = unblockOneLocked(tso);
2602   }
2603 }
2604
2605 void
2606 awakenBlockedQueue(StgTSO *tso)
2607 {
2608   ACQUIRE_LOCK(&sched_mutex);
2609   while (tso != END_TSO_QUEUE) {
2610     tso = unblockOneLocked(tso);
2611   }
2612   RELEASE_LOCK(&sched_mutex);
2613 }
2614 #endif
2615
2616 /* ---------------------------------------------------------------------------
2617    Interrupt execution
2618    - usually called inside a signal handler so it mustn't do anything fancy.   
2619    ------------------------------------------------------------------------ */
2620
2621 void
2622 interruptStgRts(void)
2623 {
2624     interrupted    = 1;
2625     context_switch = 1;
2626 #ifdef RTS_SUPPORTS_THREADS
2627     wakeBlockedWorkerThread();
2628 #endif
2629 }
2630
2631 /* -----------------------------------------------------------------------------
2632    Unblock a thread
2633
2634    This is for use when we raise an exception in another thread, which
2635    may be blocked.
2636    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2637    -------------------------------------------------------------------------- */
2638
2639 #if defined(GRAN) || defined(PAR)
2640 /*
2641   NB: only the type of the blocking queue is different in GranSim and GUM
2642       the operations on the queue-elements are the same
2643       long live polymorphism!
2644
2645   Locks: sched_mutex is held upon entry and exit.
2646
2647 */
2648 static void
2649 unblockThread(StgTSO *tso)
2650 {
2651   StgBlockingQueueElement *t, **last;
2652
2653   switch (tso->why_blocked) {
2654
2655   case NotBlocked:
2656     return;  /* not blocked */
2657
2658   case BlockedOnMVar:
2659     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2660     {
2661       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2662       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2663
2664       last = (StgBlockingQueueElement **)&mvar->head;
2665       for (t = (StgBlockingQueueElement *)mvar->head; 
2666            t != END_BQ_QUEUE; 
2667            last = &t->link, last_tso = t, t = t->link) {
2668         if (t == (StgBlockingQueueElement *)tso) {
2669           *last = (StgBlockingQueueElement *)tso->link;
2670           if (mvar->tail == tso) {
2671             mvar->tail = (StgTSO *)last_tso;
2672           }
2673           goto done;
2674         }
2675       }
2676       barf("unblockThread (MVAR): TSO not found");
2677     }
2678
2679   case BlockedOnBlackHole:
2680     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2681     {
2682       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2683
2684       last = &bq->blocking_queue;
2685       for (t = bq->blocking_queue; 
2686            t != END_BQ_QUEUE; 
2687            last = &t->link, t = t->link) {
2688         if (t == (StgBlockingQueueElement *)tso) {
2689           *last = (StgBlockingQueueElement *)tso->link;
2690           goto done;
2691         }
2692       }
2693       barf("unblockThread (BLACKHOLE): TSO not found");
2694     }
2695
2696   case BlockedOnException:
2697     {
2698       StgTSO *target  = tso->block_info.tso;
2699
2700       ASSERT(get_itbl(target)->type == TSO);
2701
2702       if (target->what_next == ThreadRelocated) {
2703           target = target->link;
2704           ASSERT(get_itbl(target)->type == TSO);
2705       }
2706
2707       ASSERT(target->blocked_exceptions != NULL);
2708
2709       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2710       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2711            t != END_BQ_QUEUE; 
2712            last = &t->link, t = t->link) {
2713         ASSERT(get_itbl(t)->type == TSO);
2714         if (t == (StgBlockingQueueElement *)tso) {
2715           *last = (StgBlockingQueueElement *)tso->link;
2716           goto done;
2717         }
2718       }
2719       barf("unblockThread (Exception): TSO not found");
2720     }
2721
2722   case BlockedOnRead:
2723   case BlockedOnWrite:
2724 #if defined(mingw32_TARGET_OS)
2725   case BlockedOnDoProc:
2726 #endif
2727     {
2728       /* take TSO off blocked_queue */
2729       StgBlockingQueueElement *prev = NULL;
2730       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2731            prev = t, t = t->link) {
2732         if (t == (StgBlockingQueueElement *)tso) {
2733           if (prev == NULL) {
2734             blocked_queue_hd = (StgTSO *)t->link;
2735             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2736               blocked_queue_tl = END_TSO_QUEUE;
2737             }
2738           } else {
2739             prev->link = t->link;
2740             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2741               blocked_queue_tl = (StgTSO *)prev;
2742             }
2743           }
2744           goto done;
2745         }
2746       }
2747       barf("unblockThread (I/O): TSO not found");
2748     }
2749
2750   case BlockedOnDelay:
2751     {
2752       /* take TSO off sleeping_queue */
2753       StgBlockingQueueElement *prev = NULL;
2754       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2755            prev = t, t = t->link) {
2756         if (t == (StgBlockingQueueElement *)tso) {
2757           if (prev == NULL) {
2758             sleeping_queue = (StgTSO *)t->link;
2759           } else {
2760             prev->link = t->link;
2761           }
2762           goto done;
2763         }
2764       }
2765       barf("unblockThread (delay): TSO not found");
2766     }
2767
2768   default:
2769     barf("unblockThread");
2770   }
2771
2772  done:
2773   tso->link = END_TSO_QUEUE;
2774   tso->why_blocked = NotBlocked;
2775   tso->block_info.closure = NULL;
2776   PUSH_ON_RUN_QUEUE(tso);
2777 }
2778 #else
2779 static void
2780 unblockThread(StgTSO *tso)
2781 {
2782   StgTSO *t, **last;
2783   
2784   /* To avoid locking unnecessarily. */
2785   if (tso->why_blocked == NotBlocked) {
2786     return;
2787   }
2788
2789   switch (tso->why_blocked) {
2790
2791   case BlockedOnMVar:
2792     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2793     {
2794       StgTSO *last_tso = END_TSO_QUEUE;
2795       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2796
2797       last = &mvar->head;
2798       for (t = mvar->head; t != END_TSO_QUEUE; 
2799            last = &t->link, last_tso = t, t = t->link) {
2800         if (t == tso) {
2801           *last = tso->link;
2802           if (mvar->tail == tso) {
2803             mvar->tail = last_tso;
2804           }
2805           goto done;
2806         }
2807       }
2808       barf("unblockThread (MVAR): TSO not found");
2809     }
2810
2811   case BlockedOnBlackHole:
2812     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2813     {
2814       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2815
2816       last = &bq->blocking_queue;
2817       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2818            last = &t->link, t = t->link) {
2819         if (t == tso) {
2820           *last = tso->link;
2821           goto done;
2822         }
2823       }
2824       barf("unblockThread (BLACKHOLE): TSO not found");
2825     }
2826
2827   case BlockedOnException:
2828     {
2829       StgTSO *target  = tso->block_info.tso;
2830
2831       ASSERT(get_itbl(target)->type == TSO);
2832
2833       while (target->what_next == ThreadRelocated) {
2834           target = target->link;
2835           ASSERT(get_itbl(target)->type == TSO);
2836       }
2837       
2838       ASSERT(target->blocked_exceptions != NULL);
2839
2840       last = &target->blocked_exceptions;
2841       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2842            last = &t->link, t = t->link) {
2843         ASSERT(get_itbl(t)->type == TSO);
2844         if (t == tso) {
2845           *last = tso->link;
2846           goto done;
2847         }
2848       }
2849       barf("unblockThread (Exception): TSO not found");
2850     }
2851
2852   case BlockedOnRead:
2853   case BlockedOnWrite:
2854 #if defined(mingw32_TARGET_OS)
2855   case BlockedOnDoProc:
2856 #endif
2857     {
2858       StgTSO *prev = NULL;
2859       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2860            prev = t, t = t->link) {
2861         if (t == tso) {
2862           if (prev == NULL) {
2863             blocked_queue_hd = t->link;
2864             if (blocked_queue_tl == t) {
2865               blocked_queue_tl = END_TSO_QUEUE;
2866             }
2867           } else {
2868             prev->link = t->link;
2869             if (blocked_queue_tl == t) {
2870               blocked_queue_tl = prev;
2871             }
2872           }
2873           goto done;
2874         }
2875       }
2876       barf("unblockThread (I/O): TSO not found");
2877     }
2878
2879   case BlockedOnDelay:
2880     {
2881       StgTSO *prev = NULL;
2882       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2883            prev = t, t = t->link) {
2884         if (t == tso) {
2885           if (prev == NULL) {
2886             sleeping_queue = t->link;
2887           } else {
2888             prev->link = t->link;
2889           }
2890           goto done;
2891         }
2892       }
2893       barf("unblockThread (delay): TSO not found");
2894     }
2895
2896   default:
2897     barf("unblockThread");
2898   }
2899
2900  done:
2901   tso->link = END_TSO_QUEUE;
2902   tso->why_blocked = NotBlocked;
2903   tso->block_info.closure = NULL;
2904   PUSH_ON_RUN_QUEUE(tso);
2905 }
2906 #endif
2907
2908 /* -----------------------------------------------------------------------------
2909  * raiseAsync()
2910  *
2911  * The following function implements the magic for raising an
2912  * asynchronous exception in an existing thread.
2913  *
2914  * We first remove the thread from any queue on which it might be
2915  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2916  *
2917  * We strip the stack down to the innermost CATCH_FRAME, building
2918  * thunks in the heap for all the active computations, so they can 
2919  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2920  * an application of the handler to the exception, and push it on
2921  * the top of the stack.
2922  * 
2923  * How exactly do we save all the active computations?  We create an
2924  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2925  * AP_STACKs pushes everything from the corresponding update frame
2926  * upwards onto the stack.  (Actually, it pushes everything up to the
2927  * next update frame plus a pointer to the next AP_STACK object.
2928  * Entering the next AP_STACK object pushes more onto the stack until we
2929  * reach the last AP_STACK object - at which point the stack should look
2930  * exactly as it did when we killed the TSO and we can continue
2931  * execution by entering the closure on top of the stack.
2932  *
2933  * We can also kill a thread entirely - this happens if either (a) the 
2934  * exception passed to raiseAsync is NULL, or (b) there's no
2935  * CATCH_FRAME on the stack.  In either case, we strip the entire
2936  * stack and replace the thread with a zombie.
2937  *
2938  * Locks: sched_mutex held upon entry nor exit.
2939  *
2940  * -------------------------------------------------------------------------- */
2941  
2942 void 
2943 deleteThread(StgTSO *tso)
2944 {
2945   raiseAsync(tso,NULL);
2946 }
2947
2948 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2949 static void 
2950 deleteThreadImmediately(StgTSO *tso)
2951 { // for forkProcess only:
2952   // delete thread without giving it a chance to catch the KillThread exception
2953
2954   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2955       return;
2956   }
2957
2958   if (tso->why_blocked != BlockedOnCCall &&
2959       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2960     unblockThread(tso);
2961   }
2962
2963   tso->what_next = ThreadKilled;
2964 }
2965 #endif
2966
2967 void
2968 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2969 {
2970   /* When raising async exs from contexts where sched_mutex isn't held;
2971      use raiseAsyncWithLock(). */
2972   ACQUIRE_LOCK(&sched_mutex);
2973   raiseAsync(tso,exception);
2974   RELEASE_LOCK(&sched_mutex);
2975 }
2976
2977 void
2978 raiseAsync(StgTSO *tso, StgClosure *exception)
2979 {
2980     StgRetInfoTable *info;
2981     StgPtr sp;
2982   
2983     // Thread already dead?
2984     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2985         return;
2986     }
2987
2988     IF_DEBUG(scheduler, 
2989              sched_belch("raising exception in thread %ld.", tso->id));
2990     
2991     // Remove it from any blocking queues
2992     unblockThread(tso);
2993
2994     sp = tso->sp;
2995     
2996     // The stack freezing code assumes there's a closure pointer on
2997     // the top of the stack, so we have to arrange that this is the case...
2998     //
2999     if (sp[0] == (W_)&stg_enter_info) {
3000         sp++;
3001     } else {
3002         sp--;
3003         sp[0] = (W_)&stg_dummy_ret_closure;
3004     }
3005
3006     while (1) {
3007         nat i;
3008
3009         // 1. Let the top of the stack be the "current closure"
3010         //
3011         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3012         // CATCH_FRAME.
3013         //
3014         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3015         // current closure applied to the chunk of stack up to (but not
3016         // including) the update frame.  This closure becomes the "current
3017         // closure".  Go back to step 2.
3018         //
3019         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3020         // top of the stack applied to the exception.
3021         // 
3022         // 5. If it's a STOP_FRAME, then kill the thread.
3023         
3024         StgPtr frame;
3025         
3026         frame = sp + 1;
3027         info = get_ret_itbl((StgClosure *)frame);
3028         
3029         while (info->i.type != UPDATE_FRAME
3030                && (info->i.type != CATCH_FRAME || exception == NULL)
3031                && info->i.type != STOP_FRAME) {
3032             frame += stack_frame_sizeW((StgClosure *)frame);
3033             info = get_ret_itbl((StgClosure *)frame);
3034         }
3035         
3036         switch (info->i.type) {
3037             
3038         case CATCH_FRAME:
3039             // If we find a CATCH_FRAME, and we've got an exception to raise,
3040             // then build the THUNK raise(exception), and leave it on
3041             // top of the CATCH_FRAME ready to enter.
3042             //
3043         {
3044 #ifdef PROFILING
3045             StgCatchFrame *cf = (StgCatchFrame *)frame;
3046 #endif
3047             StgClosure *raise;
3048             
3049             // we've got an exception to raise, so let's pass it to the
3050             // handler in this frame.
3051             //
3052             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3053             TICK_ALLOC_SE_THK(1,0);
3054             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3055             raise->payload[0] = exception;
3056             
3057             // throw away the stack from Sp up to the CATCH_FRAME.
3058             //
3059             sp = frame - 1;
3060             
3061             /* Ensure that async excpetions are blocked now, so we don't get
3062              * a surprise exception before we get around to executing the
3063              * handler.
3064              */
3065             if (tso->blocked_exceptions == NULL) {
3066                 tso->blocked_exceptions = END_TSO_QUEUE;
3067             }
3068             
3069             /* Put the newly-built THUNK on top of the stack, ready to execute
3070              * when the thread restarts.
3071              */
3072             sp[0] = (W_)raise;
3073             sp[-1] = (W_)&stg_enter_info;
3074             tso->sp = sp-1;
3075             tso->what_next = ThreadRunGHC;
3076             IF_DEBUG(sanity, checkTSO(tso));
3077             return;
3078         }
3079         
3080         case UPDATE_FRAME:
3081         {
3082             StgAP_STACK * ap;
3083             nat words;
3084             
3085             // First build an AP_STACK consisting of the stack chunk above the
3086             // current update frame, with the top word on the stack as the
3087             // fun field.
3088             //
3089             words = frame - sp - 1;
3090             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3091             
3092             ap->size = words;
3093             ap->fun  = (StgClosure *)sp[0];
3094             sp++;
3095             for(i=0; i < (nat)words; ++i) {
3096                 ap->payload[i] = (StgClosure *)*sp++;
3097             }
3098             
3099             SET_HDR(ap,&stg_AP_STACK_info,
3100                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3101             TICK_ALLOC_UP_THK(words+1,0);
3102             
3103             IF_DEBUG(scheduler,
3104                      fprintf(stderr,  "sched: Updating ");
3105                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3106                      fprintf(stderr,  " with ");
3107                      printObj((StgClosure *)ap);
3108                 );
3109
3110             // Replace the updatee with an indirection - happily
3111             // this will also wake up any threads currently
3112             // waiting on the result.
3113             //
3114             // Warning: if we're in a loop, more than one update frame on
3115             // the stack may point to the same object.  Be careful not to
3116             // overwrite an IND_OLDGEN in this case, because we'll screw
3117             // up the mutable lists.  To be on the safe side, don't
3118             // overwrite any kind of indirection at all.  See also
3119             // threadSqueezeStack in GC.c, where we have to make a similar
3120             // check.
3121             //
3122             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3123                 // revert the black hole
3124                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3125             }
3126             sp += sizeofW(StgUpdateFrame) - 1;
3127             sp[0] = (W_)ap; // push onto stack
3128             break;
3129         }
3130         
3131         case STOP_FRAME:
3132             // We've stripped the entire stack, the thread is now dead.
3133             sp += sizeofW(StgStopFrame);
3134             tso->what_next = ThreadKilled;
3135             tso->sp = sp;
3136             return;
3137             
3138         default:
3139             barf("raiseAsync");
3140         }
3141     }
3142     barf("raiseAsync");
3143 }
3144
3145 /* -----------------------------------------------------------------------------
3146    resurrectThreads is called after garbage collection on the list of
3147    threads found to be garbage.  Each of these threads will be woken
3148    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3149    on an MVar, or NonTermination if the thread was blocked on a Black
3150    Hole.
3151
3152    Locks: sched_mutex isn't held upon entry nor exit.
3153    -------------------------------------------------------------------------- */
3154
3155 void
3156 resurrectThreads( StgTSO *threads )
3157 {
3158   StgTSO *tso, *next;
3159
3160   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3161     next = tso->global_link;
3162     tso->global_link = all_threads;
3163     all_threads = tso;
3164     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3165
3166     switch (tso->why_blocked) {
3167     case BlockedOnMVar:
3168     case BlockedOnException:
3169       /* Called by GC - sched_mutex lock is currently held. */
3170       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3171       break;
3172     case BlockedOnBlackHole:
3173       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3174       break;
3175     case NotBlocked:
3176       /* This might happen if the thread was blocked on a black hole
3177        * belonging to a thread that we've just woken up (raiseAsync
3178        * can wake up threads, remember...).
3179        */
3180       continue;
3181     default:
3182       barf("resurrectThreads: thread blocked in a strange way");
3183     }
3184   }
3185 }
3186
3187 /* -----------------------------------------------------------------------------
3188  * Blackhole detection: if we reach a deadlock, test whether any
3189  * threads are blocked on themselves.  Any threads which are found to
3190  * be self-blocked get sent a NonTermination exception.
3191  *
3192  * This is only done in a deadlock situation in order to avoid
3193  * performance overhead in the normal case.
3194  *
3195  * Locks: sched_mutex is held upon entry and exit.
3196  * -------------------------------------------------------------------------- */
3197
3198 static void
3199 detectBlackHoles( void )
3200 {
3201     StgTSO *tso = all_threads;
3202     StgClosure *frame;
3203     StgClosure *blocked_on;
3204     StgRetInfoTable *info;
3205
3206     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3207
3208         while (tso->what_next == ThreadRelocated) {
3209             tso = tso->link;
3210             ASSERT(get_itbl(tso)->type == TSO);
3211         }
3212       
3213         if (tso->why_blocked != BlockedOnBlackHole) {
3214             continue;
3215         }
3216         blocked_on = tso->block_info.closure;
3217
3218         frame = (StgClosure *)tso->sp;
3219
3220         while(1) {
3221             info = get_ret_itbl(frame);
3222             switch (info->i.type) {
3223             case UPDATE_FRAME:
3224                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3225                     /* We are blocking on one of our own computations, so
3226                      * send this thread the NonTermination exception.  
3227                      */
3228                     IF_DEBUG(scheduler, 
3229                              sched_belch("thread %d is blocked on itself", tso->id));
3230                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3231                     goto done;
3232                 }
3233                 
3234                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3235                 continue;
3236
3237             case STOP_FRAME:
3238                 goto done;
3239
3240                 // normal stack frames; do nothing except advance the pointer
3241             default:
3242                 (StgPtr)frame += stack_frame_sizeW(frame);
3243             }
3244         }   
3245         done: ;
3246     }
3247 }
3248
3249 /* ----------------------------------------------------------------------------
3250  * Debugging: why is a thread blocked
3251  * [Also provides useful information when debugging threaded programs
3252  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3253    ------------------------------------------------------------------------- */
3254
3255 static
3256 void
3257 printThreadBlockage(StgTSO *tso)
3258 {
3259   switch (tso->why_blocked) {
3260   case BlockedOnRead:
3261     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3262     break;
3263   case BlockedOnWrite:
3264     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3265     break;
3266 #if defined(mingw32_TARGET_OS)
3267     case BlockedOnDoProc:
3268     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3269     break;
3270 #endif
3271   case BlockedOnDelay:
3272     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3273     break;
3274   case BlockedOnMVar:
3275     fprintf(stderr,"is blocked on an MVar");
3276     break;
3277   case BlockedOnException:
3278     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3279             tso->block_info.tso->id);
3280     break;
3281   case BlockedOnBlackHole:
3282     fprintf(stderr,"is blocked on a black hole");
3283     break;
3284   case NotBlocked:
3285     fprintf(stderr,"is not blocked");
3286     break;
3287 #if defined(PAR)
3288   case BlockedOnGA:
3289     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3290             tso->block_info.closure, info_type(tso->block_info.closure));
3291     break;
3292   case BlockedOnGA_NoSend:
3293     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3294             tso->block_info.closure, info_type(tso->block_info.closure));
3295     break;
3296 #endif
3297   case BlockedOnCCall:
3298     fprintf(stderr,"is blocked on an external call");
3299     break;
3300   case BlockedOnCCall_NoUnblockExc:
3301     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3302     break;
3303   default:
3304     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3305          tso->why_blocked, tso->id, tso);
3306   }
3307 }
3308
3309 static
3310 void
3311 printThreadStatus(StgTSO *tso)
3312 {
3313   switch (tso->what_next) {
3314   case ThreadKilled:
3315     fprintf(stderr,"has been killed");
3316     break;
3317   case ThreadComplete:
3318     fprintf(stderr,"has completed");
3319     break;
3320   default:
3321     printThreadBlockage(tso);
3322   }
3323 }
3324
3325 void
3326 printAllThreads(void)
3327 {
3328   StgTSO *t;
3329   void *label;
3330
3331 # if defined(GRAN)
3332   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3333   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3334                        time_string, rtsFalse/*no commas!*/);
3335
3336   fprintf(stderr, "all threads at [%s]:\n", time_string);
3337 # elif defined(PAR)
3338   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3339   ullong_format_string(CURRENT_TIME,
3340                        time_string, rtsFalse/*no commas!*/);
3341
3342   fprintf(stderr,"all threads at [%s]:\n", time_string);
3343 # else
3344   fprintf(stderr,"all threads:\n");
3345 # endif
3346
3347   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3348     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3349     label = lookupThreadLabel(t->id);
3350     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3351     printThreadStatus(t);
3352     fprintf(stderr,"\n");
3353   }
3354 }
3355     
3356 #ifdef DEBUG
3357
3358 /* 
3359    Print a whole blocking queue attached to node (debugging only).
3360 */
3361 # if defined(PAR)
3362 void 
3363 print_bq (StgClosure *node)
3364 {
3365   StgBlockingQueueElement *bqe;
3366   StgTSO *tso;
3367   rtsBool end;
3368
3369   fprintf(stderr,"## BQ of closure %p (%s): ",
3370           node, info_type(node));
3371
3372   /* should cover all closures that may have a blocking queue */
3373   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3374          get_itbl(node)->type == FETCH_ME_BQ ||
3375          get_itbl(node)->type == RBH ||
3376          get_itbl(node)->type == MVAR);
3377     
3378   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3379
3380   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3381 }
3382
3383 /* 
3384    Print a whole blocking queue starting with the element bqe.
3385 */
3386 void 
3387 print_bqe (StgBlockingQueueElement *bqe)
3388 {
3389   rtsBool end;
3390
3391   /* 
3392      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3393   */
3394   for (end = (bqe==END_BQ_QUEUE);
3395        !end; // iterate until bqe points to a CONSTR
3396        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3397        bqe = end ? END_BQ_QUEUE : bqe->link) {
3398     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3399     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3400     /* types of closures that may appear in a blocking queue */
3401     ASSERT(get_itbl(bqe)->type == TSO ||           
3402            get_itbl(bqe)->type == BLOCKED_FETCH || 
3403            get_itbl(bqe)->type == CONSTR); 
3404     /* only BQs of an RBH end with an RBH_Save closure */
3405     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3406
3407     switch (get_itbl(bqe)->type) {
3408     case TSO:
3409       fprintf(stderr," TSO %u (%x),",
3410               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3411       break;
3412     case BLOCKED_FETCH:
3413       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3414               ((StgBlockedFetch *)bqe)->node, 
3415               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3416               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3417               ((StgBlockedFetch *)bqe)->ga.weight);
3418       break;
3419     case CONSTR:
3420       fprintf(stderr," %s (IP %p),",
3421               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3422                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3423                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3424                "RBH_Save_?"), get_itbl(bqe));
3425       break;
3426     default:
3427       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3428            info_type((StgClosure *)bqe)); // , node, info_type(node));
3429       break;
3430     }
3431   } /* for */
3432   fputc('\n', stderr);
3433 }
3434 # elif defined(GRAN)
3435 void 
3436 print_bq (StgClosure *node)
3437 {
3438   StgBlockingQueueElement *bqe;
3439   PEs node_loc, tso_loc;
3440   rtsBool end;
3441
3442   /* should cover all closures that may have a blocking queue */
3443   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3444          get_itbl(node)->type == FETCH_ME_BQ ||
3445          get_itbl(node)->type == RBH);
3446     
3447   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3448   node_loc = where_is(node);
3449
3450   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3451           node, info_type(node), node_loc);
3452
3453   /* 
3454      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3455   */
3456   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3457        !end; // iterate until bqe points to a CONSTR
3458        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3459     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3460     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3461     /* types of closures that may appear in a blocking queue */
3462     ASSERT(get_itbl(bqe)->type == TSO ||           
3463            get_itbl(bqe)->type == CONSTR); 
3464     /* only BQs of an RBH end with an RBH_Save closure */
3465     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3466
3467     tso_loc = where_is((StgClosure *)bqe);
3468     switch (get_itbl(bqe)->type) {
3469     case TSO:
3470       fprintf(stderr," TSO %d (%p) on [PE %d],",
3471               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3472       break;
3473     case CONSTR:
3474       fprintf(stderr," %s (IP %p),",
3475               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3476                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3477                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3478                "RBH_Save_?"), get_itbl(bqe));
3479       break;
3480     default:
3481       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3482            info_type((StgClosure *)bqe), node, info_type(node));
3483       break;
3484     }
3485   } /* for */
3486   fputc('\n', stderr);
3487 }
3488 #else
3489 /* 
3490    Nice and easy: only TSOs on the blocking queue
3491 */
3492 void 
3493 print_bq (StgClosure *node)
3494 {
3495   StgTSO *tso;
3496
3497   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3498   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3499        tso != END_TSO_QUEUE; 
3500        tso=tso->link) {
3501     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3502     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3503     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3504   }
3505   fputc('\n', stderr);
3506 }
3507 # endif
3508
3509 #if defined(PAR)
3510 static nat
3511 run_queue_len(void)
3512 {
3513   nat i;
3514   StgTSO *tso;
3515
3516   for (i=0, tso=run_queue_hd; 
3517        tso != END_TSO_QUEUE;
3518        i++, tso=tso->link)
3519     /* nothing */
3520
3521   return i;
3522 }
3523 #endif
3524
3525 void
3526 sched_belch(char *s, ...)
3527 {
3528   va_list ap;
3529   va_start(ap,s);
3530 #ifdef RTS_SUPPORTS_THREADS
3531   fprintf(stderr, "sched (task %p): ", osThreadId());
3532 #elif defined(PAR)
3533   fprintf(stderr, "== ");
3534 #else
3535   fprintf(stderr, "sched: ");
3536 #endif
3537   vfprintf(stderr, s, ap);
3538   fprintf(stderr, "\n");
3539   fflush(stderr);
3540   va_end(ap);
3541 }
3542
3543 #endif /* DEBUG */