[project @ 2004-03-01 14:18:35 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.193 2004/03/01 14:18:35 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       startTask(taskStart);
280     }
281   }
282 }
283 #endif
284
285 /* ---------------------------------------------------------------------------
286    Main scheduling loop.
287
288    We use round-robin scheduling, each thread returning to the
289    scheduler loop when one of these conditions is detected:
290
291       * out of heap space
292       * timer expires (thread yields)
293       * thread blocks
294       * thread ends
295       * stack overflow
296
297    Locking notes:  we acquire the scheduler lock once at the beginning
298    of the scheduler loop, and release it when
299     
300       * running a thread, or
301       * waiting for work, or
302       * waiting for a GC to complete.
303
304    GRAN version:
305      In a GranSim setup this loop iterates over the global event queue.
306      This revolves around the global event queue, which determines what 
307      to do next. Therefore, it's more complicated than either the 
308      concurrent or the parallel (GUM) setup.
309
310    GUM version:
311      GUM iterates over incoming messages.
312      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
313      and sends out a fish whenever it has nothing to do; in-between
314      doing the actual reductions (shared code below) it processes the
315      incoming messages and deals with delayed operations 
316      (see PendingFetches).
317      This is not the ugliest code you could imagine, but it's bloody close.
318
319    ------------------------------------------------------------------------ */
320 static void
321 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
322           Capability *initialCapability )
323 {
324   StgTSO *t;
325   Capability *cap;
326   StgThreadReturnCode ret;
327 #if defined(GRAN)
328   rtsEvent *event;
329 #elif defined(PAR)
330   StgSparkPool *pool;
331   rtsSpark spark;
332   StgTSO *tso;
333   GlobalTaskId pe;
334   rtsBool receivedFinish = rtsFalse;
335 # if defined(DEBUG)
336   nat tp_size, sp_size; // stats only
337 # endif
338 #endif
339   rtsBool was_interrupted = rtsFalse;
340   StgTSOWhatNext prev_what_next;
341   
342   // Pre-condition: sched_mutex is held.
343   // We might have a capability, passed in as initialCapability.
344   cap = initialCapability;
345
346 #if defined(RTS_SUPPORTS_THREADS)
347   //
348   // in the threaded case, the capability is either passed in via the
349   // initialCapability parameter, or initialized inside the scheduler
350   // loop 
351   //
352   IF_DEBUG(scheduler,
353            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
354                        mainThread, initialCapability);
355       );
356 #else
357   // simply initialise it in the non-threaded case
358   grabCapability(&cap);
359 #endif
360
361 #if defined(GRAN)
362   /* set up first event to get things going */
363   /* ToDo: assign costs for system setup and init MainTSO ! */
364   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
365             ContinueThread, 
366             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
367
368   IF_DEBUG(gran,
369            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
370            G_TSO(CurrentTSO, 5));
371
372   if (RtsFlags.GranFlags.Light) {
373     /* Save current time; GranSim Light only */
374     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
375   }      
376
377   event = get_next_event();
378
379   while (event!=(rtsEvent*)NULL) {
380     /* Choose the processor with the next event */
381     CurrentProc = event->proc;
382     CurrentTSO = event->tso;
383
384 #elif defined(PAR)
385
386   while (!receivedFinish) {    /* set by processMessages */
387                                /* when receiving PP_FINISH message         */ 
388
389 #else // everything except GRAN and PAR
390
391   while (1) {
392
393 #endif
394
395      IF_DEBUG(scheduler, printAllThreads());
396
397 #if defined(RTS_SUPPORTS_THREADS)
398       // Yield the capability to higher-priority tasks if necessary.
399       //
400       if (cap != NULL) {
401           yieldCapability(&cap);
402       }
403
404       // If we do not currently hold a capability, we wait for one
405       //
406       if (cap == NULL) {
407           waitForCapability(&sched_mutex, &cap,
408                             mainThread ? &mainThread->bound_thread_cond : NULL);
409       }
410
411       // We now have a capability...
412 #endif
413
414     //
415     // If we're interrupted (the user pressed ^C, or some other
416     // termination condition occurred), kill all the currently running
417     // threads.
418     //
419     if (interrupted) {
420         IF_DEBUG(scheduler, sched_belch("interrupted"));
421         interrupted = rtsFalse;
422         was_interrupted = rtsTrue;
423 #if defined(RTS_SUPPORTS_THREADS)
424         // In the threaded RTS, deadlock detection doesn't work,
425         // so just exit right away.
426         prog_belch("interrupted");
427         releaseCapability(cap);
428         RELEASE_LOCK(&sched_mutex);
429         shutdownHaskellAndExit(EXIT_SUCCESS);
430 #else
431         deleteAllThreads();
432 #endif
433     }
434
435 #if defined(RTS_USER_SIGNALS)
436     // check for signals each time around the scheduler
437     if (signals_pending()) {
438       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
439       startSignalHandlers();
440       ACQUIRE_LOCK(&sched_mutex);
441     }
442 #endif
443
444     //
445     // Check whether any waiting threads need to be woken up.  If the
446     // run queue is empty, and there are no other tasks running, we
447     // can wait indefinitely for something to happen.
448     //
449     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
450 #if defined(RTS_SUPPORTS_THREADS)
451                 || EMPTY_RUN_QUEUE()
452 #endif
453         )
454     {
455       awaitEvent( EMPTY_RUN_QUEUE() );
456     }
457     // we can be interrupted while waiting for I/O...
458     if (interrupted) continue;
459
460     /* 
461      * Detect deadlock: when we have no threads to run, there are no
462      * threads waiting on I/O or sleeping, and all the other tasks are
463      * waiting for work, we must have a deadlock of some description.
464      *
465      * We first try to find threads blocked on themselves (ie. black
466      * holes), and generate NonTermination exceptions where necessary.
467      *
468      * If no threads are black holed, we have a deadlock situation, so
469      * inform all the main threads.
470      */
471 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
472     if (   EMPTY_THREAD_QUEUES() )
473     {
474         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
475         // Garbage collection can release some new threads due to
476         // either (a) finalizers or (b) threads resurrected because
477         // they are about to be send BlockedOnDeadMVar.  Any threads
478         // thus released will be immediately runnable.
479         GarbageCollect(GetRoots,rtsTrue);
480
481         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
482
483         IF_DEBUG(scheduler, 
484                  sched_belch("still deadlocked, checking for black holes..."));
485         detectBlackHoles();
486
487         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
488
489 #if defined(RTS_USER_SIGNALS)
490         /* If we have user-installed signal handlers, then wait
491          * for signals to arrive rather then bombing out with a
492          * deadlock.
493          */
494         if ( anyUserHandlers() ) {
495             IF_DEBUG(scheduler, 
496                      sched_belch("still deadlocked, waiting for signals..."));
497
498             awaitUserSignals();
499
500             // we might be interrupted...
501             if (interrupted) { continue; }
502
503             if (signals_pending()) {
504                 RELEASE_LOCK(&sched_mutex);
505                 startSignalHandlers();
506                 ACQUIRE_LOCK(&sched_mutex);
507             }
508             ASSERT(!EMPTY_RUN_QUEUE());
509             goto not_deadlocked;
510         }
511 #endif
512
513         /* Probably a real deadlock.  Send the current main thread the
514          * Deadlock exception (or in the SMP build, send *all* main
515          * threads the deadlock exception, since none of them can make
516          * progress).
517          */
518         {
519             StgMainThread *m;
520             m = main_threads;
521             switch (m->tso->why_blocked) {
522             case BlockedOnBlackHole:
523             case BlockedOnException:
524             case BlockedOnMVar:
525                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
526                 break;
527             default:
528                 barf("deadlock: main thread blocked in a strange way");
529             }
530         }
531     }
532   not_deadlocked:
533
534 #elif defined(RTS_SUPPORTS_THREADS)
535     // ToDo: add deadlock detection in threaded RTS
536 #elif defined(PAR)
537     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
538 #endif
539
540 #if defined(RTS_SUPPORTS_THREADS)
541     if ( EMPTY_RUN_QUEUE() ) {
542         continue; // nothing to do
543     }
544 #endif
545
546 #if defined(GRAN)
547     if (RtsFlags.GranFlags.Light)
548       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
549
550     /* adjust time based on time-stamp */
551     if (event->time > CurrentTime[CurrentProc] &&
552         event->evttype != ContinueThread)
553       CurrentTime[CurrentProc] = event->time;
554     
555     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
556     if (!RtsFlags.GranFlags.Light)
557       handleIdlePEs();
558
559     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
560
561     /* main event dispatcher in GranSim */
562     switch (event->evttype) {
563       /* Should just be continuing execution */
564     case ContinueThread:
565       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
566       /* ToDo: check assertion
567       ASSERT(run_queue_hd != (StgTSO*)NULL &&
568              run_queue_hd != END_TSO_QUEUE);
569       */
570       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
571       if (!RtsFlags.GranFlags.DoAsyncFetch &&
572           procStatus[CurrentProc]==Fetching) {
573         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
574               CurrentTSO->id, CurrentTSO, CurrentProc);
575         goto next_thread;
576       } 
577       /* Ignore ContinueThreads for completed threads */
578       if (CurrentTSO->what_next == ThreadComplete) {
579         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
580               CurrentTSO->id, CurrentTSO, CurrentProc);
581         goto next_thread;
582       } 
583       /* Ignore ContinueThreads for threads that are being migrated */
584       if (PROCS(CurrentTSO)==Nowhere) { 
585         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
586               CurrentTSO->id, CurrentTSO, CurrentProc);
587         goto next_thread;
588       }
589       /* The thread should be at the beginning of the run queue */
590       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
591         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
592               CurrentTSO->id, CurrentTSO, CurrentProc);
593         break; // run the thread anyway
594       }
595       /*
596       new_event(proc, proc, CurrentTime[proc],
597                 FindWork,
598                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
599       goto next_thread; 
600       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
601       break; // now actually run the thread; DaH Qu'vam yImuHbej 
602
603     case FetchNode:
604       do_the_fetchnode(event);
605       goto next_thread;             /* handle next event in event queue  */
606       
607     case GlobalBlock:
608       do_the_globalblock(event);
609       goto next_thread;             /* handle next event in event queue  */
610       
611     case FetchReply:
612       do_the_fetchreply(event);
613       goto next_thread;             /* handle next event in event queue  */
614       
615     case UnblockThread:   /* Move from the blocked queue to the tail of */
616       do_the_unblock(event);
617       goto next_thread;             /* handle next event in event queue  */
618       
619     case ResumeThread:  /* Move from the blocked queue to the tail of */
620       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
621       event->tso->gran.blocktime += 
622         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
623       do_the_startthread(event);
624       goto next_thread;             /* handle next event in event queue  */
625       
626     case StartThread:
627       do_the_startthread(event);
628       goto next_thread;             /* handle next event in event queue  */
629       
630     case MoveThread:
631       do_the_movethread(event);
632       goto next_thread;             /* handle next event in event queue  */
633       
634     case MoveSpark:
635       do_the_movespark(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case FindWork:
639       do_the_findwork(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     default:
643       barf("Illegal event type %u\n", event->evttype);
644     }  /* switch */
645     
646     /* This point was scheduler_loop in the old RTS */
647
648     IF_DEBUG(gran, belch("GRAN: after main switch"));
649
650     TimeOfLastEvent = CurrentTime[CurrentProc];
651     TimeOfNextEvent = get_time_of_next_event();
652     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
653     // CurrentTSO = ThreadQueueHd;
654
655     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
656                          TimeOfNextEvent));
657
658     if (RtsFlags.GranFlags.Light) 
659       GranSimLight_leave_system(event, &ActiveTSO); 
660
661     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
662
663     IF_DEBUG(gran, 
664              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
665
666     /* in a GranSim setup the TSO stays on the run queue */
667     t = CurrentTSO;
668     /* Take a thread from the run queue. */
669     POP_RUN_QUEUE(t); // take_off_run_queue(t);
670
671     IF_DEBUG(gran, 
672              fprintf(stderr, "GRAN: About to run current thread, which is\n");
673              G_TSO(t,5));
674
675     context_switch = 0; // turned on via GranYield, checking events and time slice
676
677     IF_DEBUG(gran, 
678              DumpGranEvent(GR_SCHEDULE, t));
679
680     procStatus[CurrentProc] = Busy;
681
682 #elif defined(PAR)
683     if (PendingFetches != END_BF_QUEUE) {
684         processFetches();
685     }
686
687     /* ToDo: phps merge with spark activation above */
688     /* check whether we have local work and send requests if we have none */
689     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
690       /* :-[  no local threads => look out for local sparks */
691       /* the spark pool for the current PE */
692       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
693       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
694           pool->hd < pool->tl) {
695         /* 
696          * ToDo: add GC code check that we really have enough heap afterwards!!
697          * Old comment:
698          * If we're here (no runnable threads) and we have pending
699          * sparks, we must have a space problem.  Get enough space
700          * to turn one of those pending sparks into a
701          * thread... 
702          */
703
704         spark = findSpark(rtsFalse);                /* get a spark */
705         if (spark != (rtsSpark) NULL) {
706           tso = activateSpark(spark);       /* turn the spark into a thread */
707           IF_PAR_DEBUG(schedule,
708                        belch("==== schedule: Created TSO %d (%p); %d threads active",
709                              tso->id, tso, advisory_thread_count));
710
711           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
712             belch("==^^ failed to activate spark");
713             goto next_thread;
714           }               /* otherwise fall through & pick-up new tso */
715         } else {
716           IF_PAR_DEBUG(verbose,
717                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
718                              spark_queue_len(pool)));
719           goto next_thread;
720         }
721       }
722
723       /* If we still have no work we need to send a FISH to get a spark
724          from another PE 
725       */
726       if (EMPTY_RUN_QUEUE()) {
727       /* =8-[  no local sparks => look for work on other PEs */
728         /*
729          * We really have absolutely no work.  Send out a fish
730          * (there may be some out there already), and wait for
731          * something to arrive.  We clearly can't run any threads
732          * until a SCHEDULE or RESUME arrives, and so that's what
733          * we're hoping to see.  (Of course, we still have to
734          * respond to other types of messages.)
735          */
736         TIME now = msTime() /*CURRENT_TIME*/;
737         IF_PAR_DEBUG(verbose, 
738                      belch("--  now=%ld", now));
739         IF_PAR_DEBUG(verbose,
740                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
741                          (last_fish_arrived_at!=0 &&
742                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
743                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
744                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
745                              last_fish_arrived_at,
746                              RtsFlags.ParFlags.fishDelay, now);
747                      });
748         
749         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
750             (last_fish_arrived_at==0 ||
751              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
752           /* outstandingFishes is set in sendFish, processFish;
753              avoid flooding system with fishes via delay */
754           pe = choosePE();
755           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
756                    NEW_FISH_HUNGER);
757
758           // Global statistics: count no. of fishes
759           if (RtsFlags.ParFlags.ParStats.Global &&
760               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
761             globalParStats.tot_fish_mess++;
762           }
763         }
764       
765         receivedFinish = processMessages();
766         goto next_thread;
767       }
768     } else if (PacketsWaiting()) {  /* Look for incoming messages */
769       receivedFinish = processMessages();
770     }
771
772     /* Now we are sure that we have some work available */
773     ASSERT(run_queue_hd != END_TSO_QUEUE);
774
775     /* Take a thread from the run queue, if we have work */
776     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
777     IF_DEBUG(sanity,checkTSO(t));
778
779     /* ToDo: write something to the log-file
780     if (RTSflags.ParFlags.granSimStats && !sameThread)
781         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
782
783     CurrentTSO = t;
784     */
785     /* the spark pool for the current PE */
786     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
787
788     IF_DEBUG(scheduler, 
789              belch("--=^ %d threads, %d sparks on [%#x]", 
790                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
791
792 # if 1
793     if (0 && RtsFlags.ParFlags.ParStats.Full && 
794         t && LastTSO && t->id != LastTSO->id && 
795         LastTSO->why_blocked == NotBlocked && 
796         LastTSO->what_next != ThreadComplete) {
797       // if previously scheduled TSO not blocked we have to record the context switch
798       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
799                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
800     }
801
802     if (RtsFlags.ParFlags.ParStats.Full && 
803         (emitSchedule /* forced emit */ ||
804         (t && LastTSO && t->id != LastTSO->id))) {
805       /* 
806          we are running a different TSO, so write a schedule event to log file
807          NB: If we use fair scheduling we also have to write  a deschedule 
808              event for LastTSO; with unfair scheduling we know that the
809              previous tso has blocked whenever we switch to another tso, so
810              we don't need it in GUM for now
811       */
812       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
813                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814       emitSchedule = rtsFalse;
815     }
816      
817 # endif
818 #else /* !GRAN && !PAR */
819   
820     // grab a thread from the run queue
821     ASSERT(run_queue_hd != END_TSO_QUEUE);
822     POP_RUN_QUEUE(t);
823
824     // Sanity check the thread we're about to run.  This can be
825     // expensive if there is lots of thread switching going on...
826     IF_DEBUG(sanity,checkTSO(t));
827 #endif
828
829 #ifdef THREADED_RTS
830     {
831       StgMainThread *m = t->main;
832       
833       if(m)
834       {
835         if(m == mainThread)
836         {
837           IF_DEBUG(scheduler,
838             sched_belch("### Running thread %d in bound thread", t->id));
839           // yes, the Haskell thread is bound to the current native thread
840         }
841         else
842         {
843           IF_DEBUG(scheduler,
844             sched_belch("### thread %d bound to another OS thread", t->id));
845           // no, bound to a different Haskell thread: pass to that thread
846           PUSH_ON_RUN_QUEUE(t);
847           passCapability(&m->bound_thread_cond);
848           continue;
849         }
850       }
851       else
852       {
853         if(mainThread != NULL)
854         // The thread we want to run is bound.
855         {
856           IF_DEBUG(scheduler,
857             sched_belch("### this OS thread cannot run thread %d", t->id));
858           // no, the current native thread is bound to a different
859           // Haskell thread, so pass it to any worker thread
860           PUSH_ON_RUN_QUEUE(t);
861           passCapabilityToWorker();
862           continue; 
863         }
864       }
865     }
866 #endif
867
868     cap->r.rCurrentTSO = t;
869     
870     /* context switches are now initiated by the timer signal, unless
871      * the user specified "context switch as often as possible", with
872      * +RTS -C0
873      */
874     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
875          && (run_queue_hd != END_TSO_QUEUE
876              || blocked_queue_hd != END_TSO_QUEUE
877              || sleeping_queue != END_TSO_QUEUE)))
878         context_switch = 1;
879     else
880         context_switch = 0;
881
882 run_thread:
883
884     RELEASE_LOCK(&sched_mutex);
885
886     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
887                               t->id, whatNext_strs[t->what_next]));
888
889 #ifdef PROFILING
890     startHeapProfTimer();
891 #endif
892
893     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
894     /* Run the current thread 
895      */
896     prev_what_next = t->what_next;
897     switch (prev_what_next) {
898     case ThreadKilled:
899     case ThreadComplete:
900         /* Thread already finished, return to scheduler. */
901         ret = ThreadFinished;
902         break;
903     case ThreadRunGHC:
904         errno = t->saved_errno;
905         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
906         t->saved_errno = errno;
907         break;
908     case ThreadInterpret:
909         ret = interpretBCO(cap);
910         break;
911     default:
912       barf("schedule: invalid what_next field");
913     }
914     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
915     
916     /* Costs for the scheduler are assigned to CCS_SYSTEM */
917 #ifdef PROFILING
918     stopHeapProfTimer();
919     CCCS = CCS_SYSTEM;
920 #endif
921     
922     ACQUIRE_LOCK(&sched_mutex);
923     
924 #ifdef RTS_SUPPORTS_THREADS
925     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
926 #elif !defined(GRAN) && !defined(PAR)
927     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
928 #endif
929     t = cap->r.rCurrentTSO;
930     
931 #if defined(PAR)
932     /* HACK 675: if the last thread didn't yield, make sure to print a 
933        SCHEDULE event to the log file when StgRunning the next thread, even
934        if it is the same one as before */
935     LastTSO = t; 
936     TimeOfLastYield = CURRENT_TIME;
937 #endif
938
939     switch (ret) {
940     case HeapOverflow:
941 #if defined(GRAN)
942       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
943       globalGranStats.tot_heapover++;
944 #elif defined(PAR)
945       globalParStats.tot_heapover++;
946 #endif
947
948       // did the task ask for a large block?
949       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
950           // if so, get one and push it on the front of the nursery.
951           bdescr *bd;
952           nat blocks;
953           
954           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
955
956           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
957                                    t->id, whatNext_strs[t->what_next], blocks));
958
959           // don't do this if it would push us over the
960           // alloc_blocks_lim limit; we'll GC first.
961           if (alloc_blocks + blocks < alloc_blocks_lim) {
962
963               alloc_blocks += blocks;
964               bd = allocGroup( blocks );
965
966               // link the new group into the list
967               bd->link = cap->r.rCurrentNursery;
968               bd->u.back = cap->r.rCurrentNursery->u.back;
969               if (cap->r.rCurrentNursery->u.back != NULL) {
970                   cap->r.rCurrentNursery->u.back->link = bd;
971               } else {
972                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
973                          g0s0->blocks == cap->r.rNursery);
974                   cap->r.rNursery = g0s0->blocks = bd;
975               }           
976               cap->r.rCurrentNursery->u.back = bd;
977
978               // initialise it as a nursery block.  We initialise the
979               // step, gen_no, and flags field of *every* sub-block in
980               // this large block, because this is easier than making
981               // sure that we always find the block head of a large
982               // block whenever we call Bdescr() (eg. evacuate() and
983               // isAlive() in the GC would both have to do this, at
984               // least).
985               { 
986                   bdescr *x;
987                   for (x = bd; x < bd + blocks; x++) {
988                       x->step = g0s0;
989                       x->gen_no = 0;
990                       x->flags = 0;
991                   }
992               }
993
994               // don't forget to update the block count in g0s0.
995               g0s0->n_blocks += blocks;
996               // This assert can be a killer if the app is doing lots
997               // of large block allocations.
998               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
999
1000               // now update the nursery to point to the new block
1001               cap->r.rCurrentNursery = bd;
1002
1003               // we might be unlucky and have another thread get on the
1004               // run queue before us and steal the large block, but in that
1005               // case the thread will just end up requesting another large
1006               // block.
1007               PUSH_ON_RUN_QUEUE(t);
1008               break;
1009           }
1010       }
1011
1012       /* make all the running tasks block on a condition variable,
1013        * maybe set context_switch and wait till they all pile in,
1014        * then have them wait on a GC condition variable.
1015        */
1016       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1017                                t->id, whatNext_strs[t->what_next]));
1018       threadPaused(t);
1019 #if defined(GRAN)
1020       ASSERT(!is_on_queue(t,CurrentProc));
1021 #elif defined(PAR)
1022       /* Currently we emit a DESCHEDULE event before GC in GUM.
1023          ToDo: either add separate event to distinguish SYSTEM time from rest
1024                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1025       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1026         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1027                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1028         emitSchedule = rtsTrue;
1029       }
1030 #endif
1031       
1032       ready_to_gc = rtsTrue;
1033       context_switch = 1;               /* stop other threads ASAP */
1034       PUSH_ON_RUN_QUEUE(t);
1035       /* actual GC is done at the end of the while loop */
1036       break;
1037       
1038     case StackOverflow:
1039 #if defined(GRAN)
1040       IF_DEBUG(gran, 
1041                DumpGranEvent(GR_DESCHEDULE, t));
1042       globalGranStats.tot_stackover++;
1043 #elif defined(PAR)
1044       // IF_DEBUG(par, 
1045       // DumpGranEvent(GR_DESCHEDULE, t);
1046       globalParStats.tot_stackover++;
1047 #endif
1048       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1049                                t->id, whatNext_strs[t->what_next]));
1050       /* just adjust the stack for this thread, then pop it back
1051        * on the run queue.
1052        */
1053       threadPaused(t);
1054       { 
1055         /* enlarge the stack */
1056         StgTSO *new_t = threadStackOverflow(t);
1057         
1058         /* This TSO has moved, so update any pointers to it from the
1059          * main thread stack.  It better not be on any other queues...
1060          * (it shouldn't be).
1061          */
1062         if (t->main != NULL) {
1063             t->main->tso = new_t;
1064         }
1065         threadPaused(new_t);
1066         PUSH_ON_RUN_QUEUE(new_t);
1067       }
1068       break;
1069
1070     case ThreadYielding:
1071 #if defined(GRAN)
1072       IF_DEBUG(gran, 
1073                DumpGranEvent(GR_DESCHEDULE, t));
1074       globalGranStats.tot_yields++;
1075 #elif defined(PAR)
1076       // IF_DEBUG(par, 
1077       // DumpGranEvent(GR_DESCHEDULE, t);
1078       globalParStats.tot_yields++;
1079 #endif
1080       /* put the thread back on the run queue.  Then, if we're ready to
1081        * GC, check whether this is the last task to stop.  If so, wake
1082        * up the GC thread.  getThread will block during a GC until the
1083        * GC is finished.
1084        */
1085       IF_DEBUG(scheduler,
1086                if (t->what_next != prev_what_next) {
1087                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1088                          t->id, whatNext_strs[t->what_next]);
1089                } else {
1090                    belch("--<< thread %ld (%s) stopped, yielding", 
1091                          t->id, whatNext_strs[t->what_next]);
1092                }
1093                );
1094
1095       IF_DEBUG(sanity,
1096                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1097                checkTSO(t));
1098       ASSERT(t->link == END_TSO_QUEUE);
1099
1100       // Shortcut if we're just switching evaluators: don't bother
1101       // doing stack squeezing (which can be expensive), just run the
1102       // thread.
1103       if (t->what_next != prev_what_next) {
1104           goto run_thread;
1105       }
1106
1107       threadPaused(t);
1108
1109 #if defined(GRAN)
1110       ASSERT(!is_on_queue(t,CurrentProc));
1111
1112       IF_DEBUG(sanity,
1113                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1114                checkThreadQsSanity(rtsTrue));
1115 #endif
1116
1117 #if defined(PAR)
1118       if (RtsFlags.ParFlags.doFairScheduling) { 
1119         /* this does round-robin scheduling; good for concurrency */
1120         APPEND_TO_RUN_QUEUE(t);
1121       } else {
1122         /* this does unfair scheduling; good for parallelism */
1123         PUSH_ON_RUN_QUEUE(t);
1124       }
1125 #else
1126       // this does round-robin scheduling; good for concurrency
1127       APPEND_TO_RUN_QUEUE(t);
1128 #endif
1129
1130 #if defined(GRAN)
1131       /* add a ContinueThread event to actually process the thread */
1132       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1133                 ContinueThread,
1134                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1135       IF_GRAN_DEBUG(bq, 
1136                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1137                G_EVENTQ(0);
1138                G_CURR_THREADQ(0));
1139 #endif /* GRAN */
1140       break;
1141
1142     case ThreadBlocked:
1143 #if defined(GRAN)
1144       IF_DEBUG(scheduler,
1145                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1146                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1147                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1148
1149       // ??? needed; should emit block before
1150       IF_DEBUG(gran, 
1151                DumpGranEvent(GR_DESCHEDULE, t)); 
1152       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1153       /*
1154         ngoq Dogh!
1155       ASSERT(procStatus[CurrentProc]==Busy || 
1156               ((procStatus[CurrentProc]==Fetching) && 
1157               (t->block_info.closure!=(StgClosure*)NULL)));
1158       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1159           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1160             procStatus[CurrentProc]==Fetching)) 
1161         procStatus[CurrentProc] = Idle;
1162       */
1163 #elif defined(PAR)
1164       IF_DEBUG(scheduler,
1165                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1166                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1167       IF_PAR_DEBUG(bq,
1168
1169                    if (t->block_info.closure!=(StgClosure*)NULL) 
1170                      print_bq(t->block_info.closure));
1171
1172       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1173       blockThread(t);
1174
1175       /* whatever we schedule next, we must log that schedule */
1176       emitSchedule = rtsTrue;
1177
1178 #else /* !GRAN */
1179       /* don't need to do anything.  Either the thread is blocked on
1180        * I/O, in which case we'll have called addToBlockedQueue
1181        * previously, or it's blocked on an MVar or Blackhole, in which
1182        * case it'll be on the relevant queue already.
1183        */
1184       IF_DEBUG(scheduler,
1185                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1186                        t->id, whatNext_strs[t->what_next]);
1187                printThreadBlockage(t);
1188                fprintf(stderr, "\n"));
1189       fflush(stderr);
1190
1191       /* Only for dumping event to log file 
1192          ToDo: do I need this in GranSim, too?
1193       blockThread(t);
1194       */
1195 #endif
1196       threadPaused(t);
1197       break;
1198
1199     case ThreadFinished:
1200       /* Need to check whether this was a main thread, and if so, signal
1201        * the task that started it with the return value.  If we have no
1202        * more main threads, we probably need to stop all the tasks until
1203        * we get a new one.
1204        */
1205       /* We also end up here if the thread kills itself with an
1206        * uncaught exception, see Exception.hc.
1207        */
1208       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1209                                t->id, whatNext_strs[t->what_next]));
1210 #if defined(GRAN)
1211       endThread(t, CurrentProc); // clean-up the thread
1212 #elif defined(PAR)
1213       /* For now all are advisory -- HWL */
1214       //if(t->priority==AdvisoryPriority) ??
1215       advisory_thread_count--;
1216       
1217 # ifdef DIST
1218       if(t->dist.priority==RevalPriority)
1219         FinishReval(t);
1220 # endif
1221       
1222       if (RtsFlags.ParFlags.ParStats.Full &&
1223           !RtsFlags.ParFlags.ParStats.Suppressed) 
1224         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1225 #endif
1226
1227       //
1228       // Check whether the thread that just completed was a main
1229       // thread, and if so return with the result.  
1230       //
1231       // There is an assumption here that all thread completion goes
1232       // through this point; we need to make sure that if a thread
1233       // ends up in the ThreadKilled state, that it stays on the run
1234       // queue so it can be dealt with here.
1235       //
1236       if (
1237 #if defined(RTS_SUPPORTS_THREADS)
1238           mainThread != NULL
1239 #else
1240           mainThread->tso == t
1241 #endif
1242           )
1243       {
1244           // We are a bound thread: this must be our thread that just
1245           // completed.
1246           ASSERT(mainThread->tso == t);
1247
1248           if (t->what_next == ThreadComplete) {
1249               if (mainThread->ret) {
1250                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1251                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1252               }
1253               mainThread->stat = Success;
1254           } else {
1255               if (mainThread->ret) {
1256                   *(mainThread->ret) = NULL;
1257               }
1258               if (was_interrupted) {
1259                   mainThread->stat = Interrupted;
1260               } else {
1261                   mainThread->stat = Killed;
1262               }
1263           }
1264 #ifdef DEBUG
1265           removeThreadLabel((StgWord)mainThread->tso->id);
1266 #endif
1267           if (mainThread->prev == NULL) {
1268               main_threads = mainThread->link;
1269           } else {
1270               mainThread->prev->link = mainThread->link;
1271           }
1272           if (mainThread->link != NULL) {
1273               mainThread->link->prev = NULL;
1274           }
1275           releaseCapability(cap);
1276           return;
1277       }
1278
1279 #ifdef RTS_SUPPORTS_THREADS
1280       ASSERT(t->main == NULL);
1281 #else
1282       if (t->main != NULL) {
1283           // Must be a main thread that is not the topmost one.  Leave
1284           // it on the run queue until the stack has unwound to the
1285           // point where we can deal with this.  Leaving it on the run
1286           // queue also ensures that the garbage collector knows about
1287           // this thread and its return value (it gets dropped from the
1288           // all_threads list so there's no other way to find it).
1289           APPEND_TO_RUN_QUEUE(t);
1290       }
1291 #endif
1292       break;
1293
1294     default:
1295       barf("schedule: invalid thread return code %d", (int)ret);
1296     }
1297
1298 #ifdef PROFILING
1299     // When we have +RTS -i0 and we're heap profiling, do a census at
1300     // every GC.  This lets us get repeatable runs for debugging.
1301     if (performHeapProfile ||
1302         (RtsFlags.ProfFlags.profileInterval==0 &&
1303          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1304         GarbageCollect(GetRoots, rtsTrue);
1305         heapCensus();
1306         performHeapProfile = rtsFalse;
1307         ready_to_gc = rtsFalse; // we already GC'd
1308     }
1309 #endif
1310
1311     if (ready_to_gc) {
1312       /* everybody back, start the GC.
1313        * Could do it in this thread, or signal a condition var
1314        * to do it in another thread.  Either way, we need to
1315        * broadcast on gc_pending_cond afterward.
1316        */
1317 #if defined(RTS_SUPPORTS_THREADS)
1318       IF_DEBUG(scheduler,sched_belch("doing GC"));
1319 #endif
1320       GarbageCollect(GetRoots,rtsFalse);
1321       ready_to_gc = rtsFalse;
1322 #if defined(GRAN)
1323       /* add a ContinueThread event to continue execution of current thread */
1324       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1325                 ContinueThread,
1326                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1327       IF_GRAN_DEBUG(bq, 
1328                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1329                G_EVENTQ(0);
1330                G_CURR_THREADQ(0));
1331 #endif /* GRAN */
1332     }
1333
1334 #if defined(GRAN)
1335   next_thread:
1336     IF_GRAN_DEBUG(unused,
1337                   print_eventq(EventHd));
1338
1339     event = get_next_event();
1340 #elif defined(PAR)
1341   next_thread:
1342     /* ToDo: wait for next message to arrive rather than busy wait */
1343 #endif /* GRAN */
1344
1345   } /* end of while(1) */
1346
1347   IF_PAR_DEBUG(verbose,
1348                belch("== Leaving schedule() after having received Finish"));
1349 }
1350
1351 /* ---------------------------------------------------------------------------
1352  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1353  * used by Control.Concurrent for error checking.
1354  * ------------------------------------------------------------------------- */
1355  
1356 StgBool
1357 rtsSupportsBoundThreads(void)
1358 {
1359 #ifdef THREADED_RTS
1360   return rtsTrue;
1361 #else
1362   return rtsFalse;
1363 #endif
1364 }
1365
1366 /* ---------------------------------------------------------------------------
1367  * isThreadBound(tso): check whether tso is bound to an OS thread.
1368  * ------------------------------------------------------------------------- */
1369  
1370 StgBool
1371 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1372 {
1373 #ifdef THREADED_RTS
1374   return (tso->main != NULL);
1375 #endif
1376   return rtsFalse;
1377 }
1378
1379 /* ---------------------------------------------------------------------------
1380  * Singleton fork(). Do not copy any running threads.
1381  * ------------------------------------------------------------------------- */
1382
1383 static void 
1384 deleteThreadImmediately(StgTSO *tso);
1385
1386 StgInt
1387 forkProcess(HsStablePtr *entry)
1388 {
1389 #ifndef mingw32_TARGET_OS
1390   pid_t pid;
1391   StgTSO* t,*next;
1392   StgMainThread *m;
1393   SchedulerStatus rc;
1394
1395   IF_DEBUG(scheduler,sched_belch("forking!"));
1396   rts_lock(); // This not only acquires sched_mutex, it also
1397               // makes sure that no other threads are running
1398
1399   pid = fork();
1400
1401   if (pid) { /* parent */
1402
1403   /* just return the pid */
1404     rts_unlock();
1405     return pid;
1406     
1407   } else { /* child */
1408     
1409     
1410       // delete all threads
1411     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1412     
1413     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1414       next = t->link;
1415
1416         // don't allow threads to catch the ThreadKilled exception
1417       deleteThreadImmediately(t);
1418     }
1419     
1420       // wipe the main thread list
1421     while((m = main_threads) != NULL) {
1422       main_threads = m->link;
1423 #ifdef THREADED_RTS
1424       closeCondition(&m->bound_thread_cond);
1425 #endif
1426       stgFree(m);
1427     }
1428     
1429 #ifdef RTS_SUPPORTS_THREADS
1430     resetTaskManagerAfterFork();      // tell startTask() and friends that
1431     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1432     resetWorkerWakeupPipeAfterFork();
1433 #endif
1434     
1435     rc = rts_evalStableIO(entry, NULL);  // run the action
1436     rts_checkSchedStatus("forkProcess",rc);
1437     
1438     rts_unlock();
1439     
1440     hs_exit();                      // clean up and exit
1441     stg_exit(0);
1442   }
1443 #else /* mingw32 */
1444   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1445   return -1;
1446 #endif /* mingw32 */
1447 }
1448
1449 /* ---------------------------------------------------------------------------
1450  * deleteAllThreads():  kill all the live threads.
1451  *
1452  * This is used when we catch a user interrupt (^C), before performing
1453  * any necessary cleanups and running finalizers.
1454  *
1455  * Locks: sched_mutex held.
1456  * ------------------------------------------------------------------------- */
1457    
1458 void
1459 deleteAllThreads ( void )
1460 {
1461   StgTSO* t, *next;
1462   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1464       next = t->global_link;
1465       deleteThread(t);
1466   }      
1467   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1468   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1469   sleeping_queue = END_TSO_QUEUE;
1470 }
1471
1472 /* startThread and  insertThread are now in GranSim.c -- HWL */
1473
1474
1475 /* ---------------------------------------------------------------------------
1476  * Suspending & resuming Haskell threads.
1477  * 
1478  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1479  * its capability before calling the C function.  This allows another
1480  * task to pick up the capability and carry on running Haskell
1481  * threads.  It also means that if the C call blocks, it won't lock
1482  * the whole system.
1483  *
1484  * The Haskell thread making the C call is put to sleep for the
1485  * duration of the call, on the susepended_ccalling_threads queue.  We
1486  * give out a token to the task, which it can use to resume the thread
1487  * on return from the C function.
1488  * ------------------------------------------------------------------------- */
1489    
1490 StgInt
1491 suspendThread( StgRegTable *reg, 
1492                rtsBool concCall
1493 #if !defined(DEBUG)
1494                STG_UNUSED
1495 #endif
1496                )
1497 {
1498   nat tok;
1499   Capability *cap;
1500   int saved_errno = errno;
1501
1502   /* assume that *reg is a pointer to the StgRegTable part
1503    * of a Capability.
1504    */
1505   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1506
1507   ACQUIRE_LOCK(&sched_mutex);
1508
1509   IF_DEBUG(scheduler,
1510            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1511
1512   // XXX this might not be necessary --SDM
1513   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1514
1515   threadPaused(cap->r.rCurrentTSO);
1516   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1517   suspended_ccalling_threads = cap->r.rCurrentTSO;
1518
1519   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1520       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1521       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1522   } else {
1523       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1524   }
1525
1526   /* Use the thread ID as the token; it should be unique */
1527   tok = cap->r.rCurrentTSO->id;
1528
1529   /* Hand back capability */
1530   releaseCapability(cap);
1531   
1532 #if defined(RTS_SUPPORTS_THREADS)
1533   /* Preparing to leave the RTS, so ensure there's a native thread/task
1534      waiting to take over.
1535   */
1536   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1537 #endif
1538
1539   /* Other threads _might_ be available for execution; signal this */
1540   THREAD_RUNNABLE();
1541   RELEASE_LOCK(&sched_mutex);
1542   
1543   errno = saved_errno;
1544   return tok; 
1545 }
1546
1547 StgRegTable *
1548 resumeThread( StgInt tok,
1549               rtsBool concCall STG_UNUSED )
1550 {
1551   StgTSO *tso, **prev;
1552   Capability *cap;
1553   int saved_errno = errno;
1554
1555 #if defined(RTS_SUPPORTS_THREADS)
1556   /* Wait for permission to re-enter the RTS with the result. */
1557   ACQUIRE_LOCK(&sched_mutex);
1558   waitForReturnCapability(&sched_mutex, &cap);
1559
1560   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1561 #else
1562   grabCapability(&cap);
1563 #endif
1564
1565   /* Remove the thread off of the suspended list */
1566   prev = &suspended_ccalling_threads;
1567   for (tso = suspended_ccalling_threads; 
1568        tso != END_TSO_QUEUE; 
1569        prev = &tso->link, tso = tso->link) {
1570     if (tso->id == (StgThreadID)tok) {
1571       *prev = tso->link;
1572       break;
1573     }
1574   }
1575   if (tso == END_TSO_QUEUE) {
1576     barf("resumeThread: thread not found");
1577   }
1578   tso->link = END_TSO_QUEUE;
1579   
1580   if(tso->why_blocked == BlockedOnCCall) {
1581       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1582       tso->blocked_exceptions = NULL;
1583   }
1584   
1585   /* Reset blocking status */
1586   tso->why_blocked  = NotBlocked;
1587
1588   cap->r.rCurrentTSO = tso;
1589   RELEASE_LOCK(&sched_mutex);
1590   errno = saved_errno;
1591   return &cap->r;
1592 }
1593
1594
1595 /* ---------------------------------------------------------------------------
1596  * Static functions
1597  * ------------------------------------------------------------------------ */
1598 static void unblockThread(StgTSO *tso);
1599
1600 /* ---------------------------------------------------------------------------
1601  * Comparing Thread ids.
1602  *
1603  * This is used from STG land in the implementation of the
1604  * instances of Eq/Ord for ThreadIds.
1605  * ------------------------------------------------------------------------ */
1606
1607 int
1608 cmp_thread(StgPtr tso1, StgPtr tso2) 
1609
1610   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1611   StgThreadID id2 = ((StgTSO *)tso2)->id;
1612  
1613   if (id1 < id2) return (-1);
1614   if (id1 > id2) return 1;
1615   return 0;
1616 }
1617
1618 /* ---------------------------------------------------------------------------
1619  * Fetching the ThreadID from an StgTSO.
1620  *
1621  * This is used in the implementation of Show for ThreadIds.
1622  * ------------------------------------------------------------------------ */
1623 int
1624 rts_getThreadId(StgPtr tso) 
1625 {
1626   return ((StgTSO *)tso)->id;
1627 }
1628
1629 #ifdef DEBUG
1630 void
1631 labelThread(StgPtr tso, char *label)
1632 {
1633   int len;
1634   void *buf;
1635
1636   /* Caveat: Once set, you can only set the thread name to "" */
1637   len = strlen(label)+1;
1638   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1639   strncpy(buf,label,len);
1640   /* Update will free the old memory for us */
1641   updateThreadLabel(((StgTSO *)tso)->id,buf);
1642 }
1643 #endif /* DEBUG */
1644
1645 /* ---------------------------------------------------------------------------
1646    Create a new thread.
1647
1648    The new thread starts with the given stack size.  Before the
1649    scheduler can run, however, this thread needs to have a closure
1650    (and possibly some arguments) pushed on its stack.  See
1651    pushClosure() in Schedule.h.
1652
1653    createGenThread() and createIOThread() (in SchedAPI.h) are
1654    convenient packaged versions of this function.
1655
1656    currently pri (priority) is only used in a GRAN setup -- HWL
1657    ------------------------------------------------------------------------ */
1658 #if defined(GRAN)
1659 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1660 StgTSO *
1661 createThread(nat size, StgInt pri)
1662 #else
1663 StgTSO *
1664 createThread(nat size)
1665 #endif
1666 {
1667
1668     StgTSO *tso;
1669     nat stack_size;
1670
1671     /* First check whether we should create a thread at all */
1672 #if defined(PAR)
1673   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1674   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1675     threadsIgnored++;
1676     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1677           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1678     return END_TSO_QUEUE;
1679   }
1680   threadsCreated++;
1681 #endif
1682
1683 #if defined(GRAN)
1684   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1685 #endif
1686
1687   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1688
1689   /* catch ridiculously small stack sizes */
1690   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1691     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1692   }
1693
1694   stack_size = size - TSO_STRUCT_SIZEW;
1695
1696   tso = (StgTSO *)allocate(size);
1697   TICK_ALLOC_TSO(stack_size, 0);
1698
1699   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1700 #if defined(GRAN)
1701   SET_GRAN_HDR(tso, ThisPE);
1702 #endif
1703
1704   // Always start with the compiled code evaluator
1705   tso->what_next = ThreadRunGHC;
1706
1707   tso->id = next_thread_id++; 
1708   tso->why_blocked  = NotBlocked;
1709   tso->blocked_exceptions = NULL;
1710
1711   tso->saved_errno = 0;
1712   tso->main = NULL;
1713   
1714   tso->stack_size   = stack_size;
1715   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1716                               - TSO_STRUCT_SIZEW;
1717   tso->sp           = (P_)&(tso->stack) + stack_size;
1718
1719 #ifdef PROFILING
1720   tso->prof.CCCS = CCS_MAIN;
1721 #endif
1722
1723   /* put a stop frame on the stack */
1724   tso->sp -= sizeofW(StgStopFrame);
1725   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1726   // ToDo: check this
1727 #if defined(GRAN)
1728   tso->link = END_TSO_QUEUE;
1729   /* uses more flexible routine in GranSim */
1730   insertThread(tso, CurrentProc);
1731 #else
1732   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1733    * from its creation
1734    */
1735 #endif
1736
1737 #if defined(GRAN) 
1738   if (RtsFlags.GranFlags.GranSimStats.Full) 
1739     DumpGranEvent(GR_START,tso);
1740 #elif defined(PAR)
1741   if (RtsFlags.ParFlags.ParStats.Full) 
1742     DumpGranEvent(GR_STARTQ,tso);
1743   /* HACk to avoid SCHEDULE 
1744      LastTSO = tso; */
1745 #endif
1746
1747   /* Link the new thread on the global thread list.
1748    */
1749   tso->global_link = all_threads;
1750   all_threads = tso;
1751
1752 #if defined(DIST)
1753   tso->dist.priority = MandatoryPriority; //by default that is...
1754 #endif
1755
1756 #if defined(GRAN)
1757   tso->gran.pri = pri;
1758 # if defined(DEBUG)
1759   tso->gran.magic = TSO_MAGIC; // debugging only
1760 # endif
1761   tso->gran.sparkname   = 0;
1762   tso->gran.startedat   = CURRENT_TIME; 
1763   tso->gran.exported    = 0;
1764   tso->gran.basicblocks = 0;
1765   tso->gran.allocs      = 0;
1766   tso->gran.exectime    = 0;
1767   tso->gran.fetchtime   = 0;
1768   tso->gran.fetchcount  = 0;
1769   tso->gran.blocktime   = 0;
1770   tso->gran.blockcount  = 0;
1771   tso->gran.blockedat   = 0;
1772   tso->gran.globalsparks = 0;
1773   tso->gran.localsparks  = 0;
1774   if (RtsFlags.GranFlags.Light)
1775     tso->gran.clock  = Now; /* local clock */
1776   else
1777     tso->gran.clock  = 0;
1778
1779   IF_DEBUG(gran,printTSO(tso));
1780 #elif defined(PAR)
1781 # if defined(DEBUG)
1782   tso->par.magic = TSO_MAGIC; // debugging only
1783 # endif
1784   tso->par.sparkname   = 0;
1785   tso->par.startedat   = CURRENT_TIME; 
1786   tso->par.exported    = 0;
1787   tso->par.basicblocks = 0;
1788   tso->par.allocs      = 0;
1789   tso->par.exectime    = 0;
1790   tso->par.fetchtime   = 0;
1791   tso->par.fetchcount  = 0;
1792   tso->par.blocktime   = 0;
1793   tso->par.blockcount  = 0;
1794   tso->par.blockedat   = 0;
1795   tso->par.globalsparks = 0;
1796   tso->par.localsparks  = 0;
1797 #endif
1798
1799 #if defined(GRAN)
1800   globalGranStats.tot_threads_created++;
1801   globalGranStats.threads_created_on_PE[CurrentProc]++;
1802   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1803   globalGranStats.tot_sq_probes++;
1804 #elif defined(PAR)
1805   // collect parallel global statistics (currently done together with GC stats)
1806   if (RtsFlags.ParFlags.ParStats.Global &&
1807       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1808     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1809     globalParStats.tot_threads_created++;
1810   }
1811 #endif 
1812
1813 #if defined(GRAN)
1814   IF_GRAN_DEBUG(pri,
1815                 belch("==__ schedule: Created TSO %d (%p);",
1816                       CurrentProc, tso, tso->id));
1817 #elif defined(PAR)
1818     IF_PAR_DEBUG(verbose,
1819                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1820                        tso->id, tso, advisory_thread_count));
1821 #else
1822   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1823                                  tso->id, tso->stack_size));
1824 #endif    
1825   return tso;
1826 }
1827
1828 #if defined(PAR)
1829 /* RFP:
1830    all parallel thread creation calls should fall through the following routine.
1831 */
1832 StgTSO *
1833 createSparkThread(rtsSpark spark) 
1834 { StgTSO *tso;
1835   ASSERT(spark != (rtsSpark)NULL);
1836   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1837   { threadsIgnored++;
1838     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1839           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1840     return END_TSO_QUEUE;
1841   }
1842   else
1843   { threadsCreated++;
1844     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1845     if (tso==END_TSO_QUEUE)     
1846       barf("createSparkThread: Cannot create TSO");
1847 #if defined(DIST)
1848     tso->priority = AdvisoryPriority;
1849 #endif
1850     pushClosure(tso,spark);
1851     PUSH_ON_RUN_QUEUE(tso);
1852     advisory_thread_count++;    
1853   }
1854   return tso;
1855 }
1856 #endif
1857
1858 /*
1859   Turn a spark into a thread.
1860   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1861 */
1862 #if defined(PAR)
1863 StgTSO *
1864 activateSpark (rtsSpark spark) 
1865 {
1866   StgTSO *tso;
1867
1868   tso = createSparkThread(spark);
1869   if (RtsFlags.ParFlags.ParStats.Full) {   
1870     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1871     IF_PAR_DEBUG(verbose,
1872                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1873                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1874   }
1875   // ToDo: fwd info on local/global spark to thread -- HWL
1876   // tso->gran.exported =  spark->exported;
1877   // tso->gran.locked =   !spark->global;
1878   // tso->gran.sparkname = spark->name;
1879
1880   return tso;
1881 }
1882 #endif
1883
1884 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1885                                    Capability *initialCapability
1886                                    );
1887
1888
1889 /* ---------------------------------------------------------------------------
1890  * scheduleThread()
1891  *
1892  * scheduleThread puts a thread on the head of the runnable queue.
1893  * This will usually be done immediately after a thread is created.
1894  * The caller of scheduleThread must create the thread using e.g.
1895  * createThread and push an appropriate closure
1896  * on this thread's stack before the scheduler is invoked.
1897  * ------------------------------------------------------------------------ */
1898
1899 static void scheduleThread_ (StgTSO* tso);
1900
1901 void
1902 scheduleThread_(StgTSO *tso)
1903 {
1904   // Precondition: sched_mutex must be held.
1905   PUSH_ON_RUN_QUEUE(tso);
1906   THREAD_RUNNABLE();
1907 }
1908
1909 void
1910 scheduleThread(StgTSO* tso)
1911 {
1912   ACQUIRE_LOCK(&sched_mutex);
1913   scheduleThread_(tso);
1914   RELEASE_LOCK(&sched_mutex);
1915 }
1916
1917 #if defined(RTS_SUPPORTS_THREADS)
1918 static Condition bound_cond_cache;
1919 static int bound_cond_cache_full = 0;
1920 #endif
1921
1922
1923 SchedulerStatus
1924 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1925                    Capability *initialCapability)
1926 {
1927     // Precondition: sched_mutex must be held
1928     StgMainThread *m;
1929
1930     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1931     m->tso = tso;
1932     tso->main = m;
1933     m->ret = ret;
1934     m->stat = NoStatus;
1935     m->link = main_threads;
1936     m->prev = NULL;
1937     if (main_threads != NULL) {
1938         main_threads->prev = m;
1939     }
1940     main_threads = m;
1941
1942 #if defined(RTS_SUPPORTS_THREADS)
1943     // Allocating a new condition for each thread is expensive, so we
1944     // cache one.  This is a pretty feeble hack, but it helps speed up
1945     // consecutive call-ins quite a bit.
1946     if (bound_cond_cache_full) {
1947         m->bound_thread_cond = bound_cond_cache;
1948         bound_cond_cache_full = 0;
1949     } else {
1950         initCondition(&m->bound_thread_cond);
1951     }
1952 #endif
1953
1954     /* Put the thread on the main-threads list prior to scheduling the TSO.
1955        Failure to do so introduces a race condition in the MT case (as
1956        identified by Wolfgang Thaller), whereby the new task/OS thread 
1957        created by scheduleThread_() would complete prior to the thread
1958        that spawned it managed to put 'itself' on the main-threads list.
1959        The upshot of it all being that the worker thread wouldn't get to
1960        signal the completion of the its work item for the main thread to
1961        see (==> it got stuck waiting.)    -- sof 6/02.
1962     */
1963     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1964     
1965     PUSH_ON_RUN_QUEUE(tso);
1966     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1967     // bound and only runnable by *this* OS thread, so waking up other
1968     // workers will just slow things down.
1969
1970     return waitThread_(m, initialCapability);
1971 }
1972
1973 /* ---------------------------------------------------------------------------
1974  * initScheduler()
1975  *
1976  * Initialise the scheduler.  This resets all the queues - if the
1977  * queues contained any threads, they'll be garbage collected at the
1978  * next pass.
1979  *
1980  * ------------------------------------------------------------------------ */
1981
1982 void 
1983 initScheduler(void)
1984 {
1985 #if defined(GRAN)
1986   nat i;
1987
1988   for (i=0; i<=MAX_PROC; i++) {
1989     run_queue_hds[i]      = END_TSO_QUEUE;
1990     run_queue_tls[i]      = END_TSO_QUEUE;
1991     blocked_queue_hds[i]  = END_TSO_QUEUE;
1992     blocked_queue_tls[i]  = END_TSO_QUEUE;
1993     ccalling_threadss[i]  = END_TSO_QUEUE;
1994     sleeping_queue        = END_TSO_QUEUE;
1995   }
1996 #else
1997   run_queue_hd      = END_TSO_QUEUE;
1998   run_queue_tl      = END_TSO_QUEUE;
1999   blocked_queue_hd  = END_TSO_QUEUE;
2000   blocked_queue_tl  = END_TSO_QUEUE;
2001   sleeping_queue    = END_TSO_QUEUE;
2002 #endif 
2003
2004   suspended_ccalling_threads  = END_TSO_QUEUE;
2005
2006   main_threads = NULL;
2007   all_threads  = END_TSO_QUEUE;
2008
2009   context_switch = 0;
2010   interrupted    = 0;
2011
2012   RtsFlags.ConcFlags.ctxtSwitchTicks =
2013       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2014       
2015 #if defined(RTS_SUPPORTS_THREADS)
2016   /* Initialise the mutex and condition variables used by
2017    * the scheduler. */
2018   initMutex(&sched_mutex);
2019   initMutex(&term_mutex);
2020 #endif
2021   
2022   ACQUIRE_LOCK(&sched_mutex);
2023
2024   /* A capability holds the state a native thread needs in
2025    * order to execute STG code. At least one capability is
2026    * floating around (only SMP builds have more than one).
2027    */
2028   initCapabilities();
2029   
2030 #if defined(RTS_SUPPORTS_THREADS)
2031     /* start our haskell execution tasks */
2032     startTaskManager(0,taskStart);
2033 #endif
2034
2035 #if /* defined(SMP) ||*/ defined(PAR)
2036   initSparkPools();
2037 #endif
2038
2039   RELEASE_LOCK(&sched_mutex);
2040 }
2041
2042 void
2043 exitScheduler( void )
2044 {
2045 #if defined(RTS_SUPPORTS_THREADS)
2046   stopTaskManager();
2047 #endif
2048   shutting_down_scheduler = rtsTrue;
2049 }
2050
2051 /* ----------------------------------------------------------------------------
2052    Managing the per-task allocation areas.
2053    
2054    Each capability comes with an allocation area.  These are
2055    fixed-length block lists into which allocation can be done.
2056
2057    ToDo: no support for two-space collection at the moment???
2058    ------------------------------------------------------------------------- */
2059
2060 static
2061 SchedulerStatus
2062 waitThread_(StgMainThread* m, Capability *initialCapability)
2063 {
2064   SchedulerStatus stat;
2065
2066   // Precondition: sched_mutex must be held.
2067   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2068
2069 #if defined(GRAN)
2070   /* GranSim specific init */
2071   CurrentTSO = m->tso;                // the TSO to run
2072   procStatus[MainProc] = Busy;        // status of main PE
2073   CurrentProc = MainProc;             // PE to run it on
2074   schedule(m,initialCapability);
2075 #else
2076   schedule(m,initialCapability);
2077   ASSERT(m->stat != NoStatus);
2078 #endif
2079
2080   stat = m->stat;
2081
2082 #if defined(RTS_SUPPORTS_THREADS)
2083   // Free the condition variable, returning it to the cache if possible.
2084   if (!bound_cond_cache_full) {
2085       bound_cond_cache = m->bound_thread_cond;
2086       bound_cond_cache_full = 1;
2087   } else {
2088       closeCondition(&m->bound_thread_cond);
2089   }
2090 #endif
2091
2092   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2093   stgFree(m);
2094
2095   // Postcondition: sched_mutex still held
2096   return stat;
2097 }
2098
2099 /* ---------------------------------------------------------------------------
2100    Where are the roots that we know about?
2101
2102         - all the threads on the runnable queue
2103         - all the threads on the blocked queue
2104         - all the threads on the sleeping queue
2105         - all the thread currently executing a _ccall_GC
2106         - all the "main threads"
2107      
2108    ------------------------------------------------------------------------ */
2109
2110 /* This has to be protected either by the scheduler monitor, or by the
2111         garbage collection monitor (probably the latter).
2112         KH @ 25/10/99
2113 */
2114
2115 void
2116 GetRoots( evac_fn evac )
2117 {
2118 #if defined(GRAN)
2119   {
2120     nat i;
2121     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2122       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2123           evac((StgClosure **)&run_queue_hds[i]);
2124       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2125           evac((StgClosure **)&run_queue_tls[i]);
2126       
2127       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2128           evac((StgClosure **)&blocked_queue_hds[i]);
2129       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2130           evac((StgClosure **)&blocked_queue_tls[i]);
2131       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2132           evac((StgClosure **)&ccalling_threads[i]);
2133     }
2134   }
2135
2136   markEventQueue();
2137
2138 #else /* !GRAN */
2139   if (run_queue_hd != END_TSO_QUEUE) {
2140       ASSERT(run_queue_tl != END_TSO_QUEUE);
2141       evac((StgClosure **)&run_queue_hd);
2142       evac((StgClosure **)&run_queue_tl);
2143   }
2144   
2145   if (blocked_queue_hd != END_TSO_QUEUE) {
2146       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2147       evac((StgClosure **)&blocked_queue_hd);
2148       evac((StgClosure **)&blocked_queue_tl);
2149   }
2150   
2151   if (sleeping_queue != END_TSO_QUEUE) {
2152       evac((StgClosure **)&sleeping_queue);
2153   }
2154 #endif 
2155
2156   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2157       evac((StgClosure **)&suspended_ccalling_threads);
2158   }
2159
2160 #if defined(PAR) || defined(GRAN)
2161   markSparkQueue(evac);
2162 #endif
2163
2164 #if defined(RTS_USER_SIGNALS)
2165   // mark the signal handlers (signals should be already blocked)
2166   markSignalHandlers(evac);
2167 #endif
2168 }
2169
2170 /* -----------------------------------------------------------------------------
2171    performGC
2172
2173    This is the interface to the garbage collector from Haskell land.
2174    We provide this so that external C code can allocate and garbage
2175    collect when called from Haskell via _ccall_GC.
2176
2177    It might be useful to provide an interface whereby the programmer
2178    can specify more roots (ToDo).
2179    
2180    This needs to be protected by the GC condition variable above.  KH.
2181    -------------------------------------------------------------------------- */
2182
2183 static void (*extra_roots)(evac_fn);
2184
2185 void
2186 performGC(void)
2187 {
2188   /* Obligated to hold this lock upon entry */
2189   ACQUIRE_LOCK(&sched_mutex);
2190   GarbageCollect(GetRoots,rtsFalse);
2191   RELEASE_LOCK(&sched_mutex);
2192 }
2193
2194 void
2195 performMajorGC(void)
2196 {
2197   ACQUIRE_LOCK(&sched_mutex);
2198   GarbageCollect(GetRoots,rtsTrue);
2199   RELEASE_LOCK(&sched_mutex);
2200 }
2201
2202 static void
2203 AllRoots(evac_fn evac)
2204 {
2205     GetRoots(evac);             // the scheduler's roots
2206     extra_roots(evac);          // the user's roots
2207 }
2208
2209 void
2210 performGCWithRoots(void (*get_roots)(evac_fn))
2211 {
2212   ACQUIRE_LOCK(&sched_mutex);
2213   extra_roots = get_roots;
2214   GarbageCollect(AllRoots,rtsFalse);
2215   RELEASE_LOCK(&sched_mutex);
2216 }
2217
2218 /* -----------------------------------------------------------------------------
2219    Stack overflow
2220
2221    If the thread has reached its maximum stack size, then raise the
2222    StackOverflow exception in the offending thread.  Otherwise
2223    relocate the TSO into a larger chunk of memory and adjust its stack
2224    size appropriately.
2225    -------------------------------------------------------------------------- */
2226
2227 static StgTSO *
2228 threadStackOverflow(StgTSO *tso)
2229 {
2230   nat new_stack_size, new_tso_size, stack_words;
2231   StgPtr new_sp;
2232   StgTSO *dest;
2233
2234   IF_DEBUG(sanity,checkTSO(tso));
2235   if (tso->stack_size >= tso->max_stack_size) {
2236
2237     IF_DEBUG(gc,
2238              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2239                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2240              /* If we're debugging, just print out the top of the stack */
2241              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2242                                               tso->sp+64)));
2243
2244     /* Send this thread the StackOverflow exception */
2245     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2246     return tso;
2247   }
2248
2249   /* Try to double the current stack size.  If that takes us over the
2250    * maximum stack size for this thread, then use the maximum instead.
2251    * Finally round up so the TSO ends up as a whole number of blocks.
2252    */
2253   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2254   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2255                                        TSO_STRUCT_SIZE)/sizeof(W_);
2256   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2257   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2258
2259   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2260
2261   dest = (StgTSO *)allocate(new_tso_size);
2262   TICK_ALLOC_TSO(new_stack_size,0);
2263
2264   /* copy the TSO block and the old stack into the new area */
2265   memcpy(dest,tso,TSO_STRUCT_SIZE);
2266   stack_words = tso->stack + tso->stack_size - tso->sp;
2267   new_sp = (P_)dest + new_tso_size - stack_words;
2268   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2269
2270   /* relocate the stack pointers... */
2271   dest->sp         = new_sp;
2272   dest->stack_size = new_stack_size;
2273         
2274   /* Mark the old TSO as relocated.  We have to check for relocated
2275    * TSOs in the garbage collector and any primops that deal with TSOs.
2276    *
2277    * It's important to set the sp value to just beyond the end
2278    * of the stack, so we don't attempt to scavenge any part of the
2279    * dead TSO's stack.
2280    */
2281   tso->what_next = ThreadRelocated;
2282   tso->link = dest;
2283   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2284   tso->why_blocked = NotBlocked;
2285   dest->mut_link = NULL;
2286
2287   IF_PAR_DEBUG(verbose,
2288                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2289                      tso->id, tso, tso->stack_size);
2290                /* If we're debugging, just print out the top of the stack */
2291                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2292                                                 tso->sp+64)));
2293   
2294   IF_DEBUG(sanity,checkTSO(tso));
2295 #if 0
2296   IF_DEBUG(scheduler,printTSO(dest));
2297 #endif
2298
2299   return dest;
2300 }
2301
2302 /* ---------------------------------------------------------------------------
2303    Wake up a queue that was blocked on some resource.
2304    ------------------------------------------------------------------------ */
2305
2306 #if defined(GRAN)
2307 STATIC_INLINE void
2308 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2309 {
2310 }
2311 #elif defined(PAR)
2312 STATIC_INLINE void
2313 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2314 {
2315   /* write RESUME events to log file and
2316      update blocked and fetch time (depending on type of the orig closure) */
2317   if (RtsFlags.ParFlags.ParStats.Full) {
2318     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2319                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2320                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2321     if (EMPTY_RUN_QUEUE())
2322       emitSchedule = rtsTrue;
2323
2324     switch (get_itbl(node)->type) {
2325         case FETCH_ME_BQ:
2326           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2327           break;
2328         case RBH:
2329         case FETCH_ME:
2330         case BLACKHOLE_BQ:
2331           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2332           break;
2333 #ifdef DIST
2334         case MVAR:
2335           break;
2336 #endif    
2337         default:
2338           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2339         }
2340       }
2341 }
2342 #endif
2343
2344 #if defined(GRAN)
2345 static StgBlockingQueueElement *
2346 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2347 {
2348     StgTSO *tso;
2349     PEs node_loc, tso_loc;
2350
2351     node_loc = where_is(node); // should be lifted out of loop
2352     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2353     tso_loc = where_is((StgClosure *)tso);
2354     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2355       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2356       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2357       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2358       // insertThread(tso, node_loc);
2359       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2360                 ResumeThread,
2361                 tso, node, (rtsSpark*)NULL);
2362       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2363       // len_local++;
2364       // len++;
2365     } else { // TSO is remote (actually should be FMBQ)
2366       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2367                                   RtsFlags.GranFlags.Costs.gunblocktime +
2368                                   RtsFlags.GranFlags.Costs.latency;
2369       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2370                 UnblockThread,
2371                 tso, node, (rtsSpark*)NULL);
2372       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2373       // len++;
2374     }
2375     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2376     IF_GRAN_DEBUG(bq,
2377                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2378                           (node_loc==tso_loc ? "Local" : "Global"), 
2379                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2380     tso->block_info.closure = NULL;
2381     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2382                              tso->id, tso));
2383 }
2384 #elif defined(PAR)
2385 static StgBlockingQueueElement *
2386 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2387 {
2388     StgBlockingQueueElement *next;
2389
2390     switch (get_itbl(bqe)->type) {
2391     case TSO:
2392       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2393       /* if it's a TSO just push it onto the run_queue */
2394       next = bqe->link;
2395       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2396       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2397       THREAD_RUNNABLE();
2398       unblockCount(bqe, node);
2399       /* reset blocking status after dumping event */
2400       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2401       break;
2402
2403     case BLOCKED_FETCH:
2404       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2405       next = bqe->link;
2406       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2407       PendingFetches = (StgBlockedFetch *)bqe;
2408       break;
2409
2410 # if defined(DEBUG)
2411       /* can ignore this case in a non-debugging setup; 
2412          see comments on RBHSave closures above */
2413     case CONSTR:
2414       /* check that the closure is an RBHSave closure */
2415       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2416              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2417              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2418       break;
2419
2420     default:
2421       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2422            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2423            (StgClosure *)bqe);
2424 # endif
2425     }
2426   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2427   return next;
2428 }
2429
2430 #else /* !GRAN && !PAR */
2431 static StgTSO *
2432 unblockOneLocked(StgTSO *tso)
2433 {
2434   StgTSO *next;
2435
2436   ASSERT(get_itbl(tso)->type == TSO);
2437   ASSERT(tso->why_blocked != NotBlocked);
2438   tso->why_blocked = NotBlocked;
2439   next = tso->link;
2440   PUSH_ON_RUN_QUEUE(tso);
2441   THREAD_RUNNABLE();
2442   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2443   return next;
2444 }
2445 #endif
2446
2447 #if defined(GRAN) || defined(PAR)
2448 INLINE_ME StgBlockingQueueElement *
2449 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2450 {
2451   ACQUIRE_LOCK(&sched_mutex);
2452   bqe = unblockOneLocked(bqe, node);
2453   RELEASE_LOCK(&sched_mutex);
2454   return bqe;
2455 }
2456 #else
2457 INLINE_ME StgTSO *
2458 unblockOne(StgTSO *tso)
2459 {
2460   ACQUIRE_LOCK(&sched_mutex);
2461   tso = unblockOneLocked(tso);
2462   RELEASE_LOCK(&sched_mutex);
2463   return tso;
2464 }
2465 #endif
2466
2467 #if defined(GRAN)
2468 void 
2469 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2470 {
2471   StgBlockingQueueElement *bqe;
2472   PEs node_loc;
2473   nat len = 0; 
2474
2475   IF_GRAN_DEBUG(bq, 
2476                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2477                       node, CurrentProc, CurrentTime[CurrentProc], 
2478                       CurrentTSO->id, CurrentTSO));
2479
2480   node_loc = where_is(node);
2481
2482   ASSERT(q == END_BQ_QUEUE ||
2483          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2484          get_itbl(q)->type == CONSTR); // closure (type constructor)
2485   ASSERT(is_unique(node));
2486
2487   /* FAKE FETCH: magically copy the node to the tso's proc;
2488      no Fetch necessary because in reality the node should not have been 
2489      moved to the other PE in the first place
2490   */
2491   if (CurrentProc!=node_loc) {
2492     IF_GRAN_DEBUG(bq, 
2493                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2494                         node, node_loc, CurrentProc, CurrentTSO->id, 
2495                         // CurrentTSO, where_is(CurrentTSO),
2496                         node->header.gran.procs));
2497     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2498     IF_GRAN_DEBUG(bq, 
2499                   belch("## new bitmask of node %p is %#x",
2500                         node, node->header.gran.procs));
2501     if (RtsFlags.GranFlags.GranSimStats.Global) {
2502       globalGranStats.tot_fake_fetches++;
2503     }
2504   }
2505
2506   bqe = q;
2507   // ToDo: check: ASSERT(CurrentProc==node_loc);
2508   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2509     //next = bqe->link;
2510     /* 
2511        bqe points to the current element in the queue
2512        next points to the next element in the queue
2513     */
2514     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2515     //tso_loc = where_is(tso);
2516     len++;
2517     bqe = unblockOneLocked(bqe, node);
2518   }
2519
2520   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2521      the closure to make room for the anchor of the BQ */
2522   if (bqe!=END_BQ_QUEUE) {
2523     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2524     /*
2525     ASSERT((info_ptr==&RBH_Save_0_info) ||
2526            (info_ptr==&RBH_Save_1_info) ||
2527            (info_ptr==&RBH_Save_2_info));
2528     */
2529     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2530     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2531     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2532
2533     IF_GRAN_DEBUG(bq,
2534                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2535                         node, info_type(node)));
2536   }
2537
2538   /* statistics gathering */
2539   if (RtsFlags.GranFlags.GranSimStats.Global) {
2540     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2541     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2542     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2543     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2544   }
2545   IF_GRAN_DEBUG(bq,
2546                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2547                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2548 }
2549 #elif defined(PAR)
2550 void 
2551 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2552 {
2553   StgBlockingQueueElement *bqe;
2554
2555   ACQUIRE_LOCK(&sched_mutex);
2556
2557   IF_PAR_DEBUG(verbose, 
2558                belch("##-_ AwBQ for node %p on [%x]: ",
2559                      node, mytid));
2560 #ifdef DIST  
2561   //RFP
2562   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2563     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2564     return;
2565   }
2566 #endif
2567   
2568   ASSERT(q == END_BQ_QUEUE ||
2569          get_itbl(q)->type == TSO ||           
2570          get_itbl(q)->type == BLOCKED_FETCH || 
2571          get_itbl(q)->type == CONSTR); 
2572
2573   bqe = q;
2574   while (get_itbl(bqe)->type==TSO || 
2575          get_itbl(bqe)->type==BLOCKED_FETCH) {
2576     bqe = unblockOneLocked(bqe, node);
2577   }
2578   RELEASE_LOCK(&sched_mutex);
2579 }
2580
2581 #else   /* !GRAN && !PAR */
2582
2583 void
2584 awakenBlockedQueueNoLock(StgTSO *tso)
2585 {
2586   while (tso != END_TSO_QUEUE) {
2587     tso = unblockOneLocked(tso);
2588   }
2589 }
2590
2591 void
2592 awakenBlockedQueue(StgTSO *tso)
2593 {
2594   ACQUIRE_LOCK(&sched_mutex);
2595   while (tso != END_TSO_QUEUE) {
2596     tso = unblockOneLocked(tso);
2597   }
2598   RELEASE_LOCK(&sched_mutex);
2599 }
2600 #endif
2601
2602 /* ---------------------------------------------------------------------------
2603    Interrupt execution
2604    - usually called inside a signal handler so it mustn't do anything fancy.   
2605    ------------------------------------------------------------------------ */
2606
2607 void
2608 interruptStgRts(void)
2609 {
2610     interrupted    = 1;
2611     context_switch = 1;
2612 #ifdef RTS_SUPPORTS_THREADS
2613     wakeBlockedWorkerThread();
2614 #endif
2615 }
2616
2617 /* -----------------------------------------------------------------------------
2618    Unblock a thread
2619
2620    This is for use when we raise an exception in another thread, which
2621    may be blocked.
2622    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2623    -------------------------------------------------------------------------- */
2624
2625 #if defined(GRAN) || defined(PAR)
2626 /*
2627   NB: only the type of the blocking queue is different in GranSim and GUM
2628       the operations on the queue-elements are the same
2629       long live polymorphism!
2630
2631   Locks: sched_mutex is held upon entry and exit.
2632
2633 */
2634 static void
2635 unblockThread(StgTSO *tso)
2636 {
2637   StgBlockingQueueElement *t, **last;
2638
2639   switch (tso->why_blocked) {
2640
2641   case NotBlocked:
2642     return;  /* not blocked */
2643
2644   case BlockedOnMVar:
2645     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2646     {
2647       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2648       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2649
2650       last = (StgBlockingQueueElement **)&mvar->head;
2651       for (t = (StgBlockingQueueElement *)mvar->head; 
2652            t != END_BQ_QUEUE; 
2653            last = &t->link, last_tso = t, t = t->link) {
2654         if (t == (StgBlockingQueueElement *)tso) {
2655           *last = (StgBlockingQueueElement *)tso->link;
2656           if (mvar->tail == tso) {
2657             mvar->tail = (StgTSO *)last_tso;
2658           }
2659           goto done;
2660         }
2661       }
2662       barf("unblockThread (MVAR): TSO not found");
2663     }
2664
2665   case BlockedOnBlackHole:
2666     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2667     {
2668       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2669
2670       last = &bq->blocking_queue;
2671       for (t = bq->blocking_queue; 
2672            t != END_BQ_QUEUE; 
2673            last = &t->link, t = t->link) {
2674         if (t == (StgBlockingQueueElement *)tso) {
2675           *last = (StgBlockingQueueElement *)tso->link;
2676           goto done;
2677         }
2678       }
2679       barf("unblockThread (BLACKHOLE): TSO not found");
2680     }
2681
2682   case BlockedOnException:
2683     {
2684       StgTSO *target  = tso->block_info.tso;
2685
2686       ASSERT(get_itbl(target)->type == TSO);
2687
2688       if (target->what_next == ThreadRelocated) {
2689           target = target->link;
2690           ASSERT(get_itbl(target)->type == TSO);
2691       }
2692
2693       ASSERT(target->blocked_exceptions != NULL);
2694
2695       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2696       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2697            t != END_BQ_QUEUE; 
2698            last = &t->link, t = t->link) {
2699         ASSERT(get_itbl(t)->type == TSO);
2700         if (t == (StgBlockingQueueElement *)tso) {
2701           *last = (StgBlockingQueueElement *)tso->link;
2702           goto done;
2703         }
2704       }
2705       barf("unblockThread (Exception): TSO not found");
2706     }
2707
2708   case BlockedOnRead:
2709   case BlockedOnWrite:
2710 #if defined(mingw32_TARGET_OS)
2711   case BlockedOnDoProc:
2712 #endif
2713     {
2714       /* take TSO off blocked_queue */
2715       StgBlockingQueueElement *prev = NULL;
2716       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2717            prev = t, t = t->link) {
2718         if (t == (StgBlockingQueueElement *)tso) {
2719           if (prev == NULL) {
2720             blocked_queue_hd = (StgTSO *)t->link;
2721             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2722               blocked_queue_tl = END_TSO_QUEUE;
2723             }
2724           } else {
2725             prev->link = t->link;
2726             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2727               blocked_queue_tl = (StgTSO *)prev;
2728             }
2729           }
2730           goto done;
2731         }
2732       }
2733       barf("unblockThread (I/O): TSO not found");
2734     }
2735
2736   case BlockedOnDelay:
2737     {
2738       /* take TSO off sleeping_queue */
2739       StgBlockingQueueElement *prev = NULL;
2740       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2741            prev = t, t = t->link) {
2742         if (t == (StgBlockingQueueElement *)tso) {
2743           if (prev == NULL) {
2744             sleeping_queue = (StgTSO *)t->link;
2745           } else {
2746             prev->link = t->link;
2747           }
2748           goto done;
2749         }
2750       }
2751       barf("unblockThread (delay): TSO not found");
2752     }
2753
2754   default:
2755     barf("unblockThread");
2756   }
2757
2758  done:
2759   tso->link = END_TSO_QUEUE;
2760   tso->why_blocked = NotBlocked;
2761   tso->block_info.closure = NULL;
2762   PUSH_ON_RUN_QUEUE(tso);
2763 }
2764 #else
2765 static void
2766 unblockThread(StgTSO *tso)
2767 {
2768   StgTSO *t, **last;
2769   
2770   /* To avoid locking unnecessarily. */
2771   if (tso->why_blocked == NotBlocked) {
2772     return;
2773   }
2774
2775   switch (tso->why_blocked) {
2776
2777   case BlockedOnMVar:
2778     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2779     {
2780       StgTSO *last_tso = END_TSO_QUEUE;
2781       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2782
2783       last = &mvar->head;
2784       for (t = mvar->head; t != END_TSO_QUEUE; 
2785            last = &t->link, last_tso = t, t = t->link) {
2786         if (t == tso) {
2787           *last = tso->link;
2788           if (mvar->tail == tso) {
2789             mvar->tail = last_tso;
2790           }
2791           goto done;
2792         }
2793       }
2794       barf("unblockThread (MVAR): TSO not found");
2795     }
2796
2797   case BlockedOnBlackHole:
2798     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2799     {
2800       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2801
2802       last = &bq->blocking_queue;
2803       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2804            last = &t->link, t = t->link) {
2805         if (t == tso) {
2806           *last = tso->link;
2807           goto done;
2808         }
2809       }
2810       barf("unblockThread (BLACKHOLE): TSO not found");
2811     }
2812
2813   case BlockedOnException:
2814     {
2815       StgTSO *target  = tso->block_info.tso;
2816
2817       ASSERT(get_itbl(target)->type == TSO);
2818
2819       while (target->what_next == ThreadRelocated) {
2820           target = target->link;
2821           ASSERT(get_itbl(target)->type == TSO);
2822       }
2823       
2824       ASSERT(target->blocked_exceptions != NULL);
2825
2826       last = &target->blocked_exceptions;
2827       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2828            last = &t->link, t = t->link) {
2829         ASSERT(get_itbl(t)->type == TSO);
2830         if (t == tso) {
2831           *last = tso->link;
2832           goto done;
2833         }
2834       }
2835       barf("unblockThread (Exception): TSO not found");
2836     }
2837
2838   case BlockedOnRead:
2839   case BlockedOnWrite:
2840 #if defined(mingw32_TARGET_OS)
2841   case BlockedOnDoProc:
2842 #endif
2843     {
2844       StgTSO *prev = NULL;
2845       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2846            prev = t, t = t->link) {
2847         if (t == tso) {
2848           if (prev == NULL) {
2849             blocked_queue_hd = t->link;
2850             if (blocked_queue_tl == t) {
2851               blocked_queue_tl = END_TSO_QUEUE;
2852             }
2853           } else {
2854             prev->link = t->link;
2855             if (blocked_queue_tl == t) {
2856               blocked_queue_tl = prev;
2857             }
2858           }
2859           goto done;
2860         }
2861       }
2862       barf("unblockThread (I/O): TSO not found");
2863     }
2864
2865   case BlockedOnDelay:
2866     {
2867       StgTSO *prev = NULL;
2868       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2869            prev = t, t = t->link) {
2870         if (t == tso) {
2871           if (prev == NULL) {
2872             sleeping_queue = t->link;
2873           } else {
2874             prev->link = t->link;
2875           }
2876           goto done;
2877         }
2878       }
2879       barf("unblockThread (delay): TSO not found");
2880     }
2881
2882   default:
2883     barf("unblockThread");
2884   }
2885
2886  done:
2887   tso->link = END_TSO_QUEUE;
2888   tso->why_blocked = NotBlocked;
2889   tso->block_info.closure = NULL;
2890   PUSH_ON_RUN_QUEUE(tso);
2891 }
2892 #endif
2893
2894 /* -----------------------------------------------------------------------------
2895  * raiseAsync()
2896  *
2897  * The following function implements the magic for raising an
2898  * asynchronous exception in an existing thread.
2899  *
2900  * We first remove the thread from any queue on which it might be
2901  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2902  *
2903  * We strip the stack down to the innermost CATCH_FRAME, building
2904  * thunks in the heap for all the active computations, so they can 
2905  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2906  * an application of the handler to the exception, and push it on
2907  * the top of the stack.
2908  * 
2909  * How exactly do we save all the active computations?  We create an
2910  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2911  * AP_STACKs pushes everything from the corresponding update frame
2912  * upwards onto the stack.  (Actually, it pushes everything up to the
2913  * next update frame plus a pointer to the next AP_STACK object.
2914  * Entering the next AP_STACK object pushes more onto the stack until we
2915  * reach the last AP_STACK object - at which point the stack should look
2916  * exactly as it did when we killed the TSO and we can continue
2917  * execution by entering the closure on top of the stack.
2918  *
2919  * We can also kill a thread entirely - this happens if either (a) the 
2920  * exception passed to raiseAsync is NULL, or (b) there's no
2921  * CATCH_FRAME on the stack.  In either case, we strip the entire
2922  * stack and replace the thread with a zombie.
2923  *
2924  * Locks: sched_mutex held upon entry nor exit.
2925  *
2926  * -------------------------------------------------------------------------- */
2927  
2928 void 
2929 deleteThread(StgTSO *tso)
2930 {
2931   raiseAsync(tso,NULL);
2932 }
2933
2934 static void 
2935 deleteThreadImmediately(StgTSO *tso)
2936 { // for forkProcess only:
2937   // delete thread without giving it a chance to catch the KillThread exception
2938
2939   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2940       return;
2941   }
2942
2943   if (tso->why_blocked != BlockedOnCCall &&
2944       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2945     unblockThread(tso);
2946   }
2947
2948   tso->what_next = ThreadKilled;
2949 }
2950
2951 void
2952 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2953 {
2954   /* When raising async exs from contexts where sched_mutex isn't held;
2955      use raiseAsyncWithLock(). */
2956   ACQUIRE_LOCK(&sched_mutex);
2957   raiseAsync(tso,exception);
2958   RELEASE_LOCK(&sched_mutex);
2959 }
2960
2961 void
2962 raiseAsync(StgTSO *tso, StgClosure *exception)
2963 {
2964     StgRetInfoTable *info;
2965     StgPtr sp;
2966   
2967     // Thread already dead?
2968     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2969         return;
2970     }
2971
2972     IF_DEBUG(scheduler, 
2973              sched_belch("raising exception in thread %ld.", tso->id));
2974     
2975     // Remove it from any blocking queues
2976     unblockThread(tso);
2977
2978     sp = tso->sp;
2979     
2980     // The stack freezing code assumes there's a closure pointer on
2981     // the top of the stack, so we have to arrange that this is the case...
2982     //
2983     if (sp[0] == (W_)&stg_enter_info) {
2984         sp++;
2985     } else {
2986         sp--;
2987         sp[0] = (W_)&stg_dummy_ret_closure;
2988     }
2989
2990     while (1) {
2991         nat i;
2992
2993         // 1. Let the top of the stack be the "current closure"
2994         //
2995         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
2996         // CATCH_FRAME.
2997         //
2998         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
2999         // current closure applied to the chunk of stack up to (but not
3000         // including) the update frame.  This closure becomes the "current
3001         // closure".  Go back to step 2.
3002         //
3003         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3004         // top of the stack applied to the exception.
3005         // 
3006         // 5. If it's a STOP_FRAME, then kill the thread.
3007         
3008         StgPtr frame;
3009         
3010         frame = sp + 1;
3011         info = get_ret_itbl((StgClosure *)frame);
3012         
3013         while (info->i.type != UPDATE_FRAME
3014                && (info->i.type != CATCH_FRAME || exception == NULL)
3015                && info->i.type != STOP_FRAME) {
3016             frame += stack_frame_sizeW((StgClosure *)frame);
3017             info = get_ret_itbl((StgClosure *)frame);
3018         }
3019         
3020         switch (info->i.type) {
3021             
3022         case CATCH_FRAME:
3023             // If we find a CATCH_FRAME, and we've got an exception to raise,
3024             // then build the THUNK raise(exception), and leave it on
3025             // top of the CATCH_FRAME ready to enter.
3026             //
3027         {
3028 #ifdef PROFILING
3029             StgCatchFrame *cf = (StgCatchFrame *)frame;
3030 #endif
3031             StgClosure *raise;
3032             
3033             // we've got an exception to raise, so let's pass it to the
3034             // handler in this frame.
3035             //
3036             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3037             TICK_ALLOC_SE_THK(1,0);
3038             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3039             raise->payload[0] = exception;
3040             
3041             // throw away the stack from Sp up to the CATCH_FRAME.
3042             //
3043             sp = frame - 1;
3044             
3045             /* Ensure that async excpetions are blocked now, so we don't get
3046              * a surprise exception before we get around to executing the
3047              * handler.
3048              */
3049             if (tso->blocked_exceptions == NULL) {
3050                 tso->blocked_exceptions = END_TSO_QUEUE;
3051             }
3052             
3053             /* Put the newly-built THUNK on top of the stack, ready to execute
3054              * when the thread restarts.
3055              */
3056             sp[0] = (W_)raise;
3057             sp[-1] = (W_)&stg_enter_info;
3058             tso->sp = sp-1;
3059             tso->what_next = ThreadRunGHC;
3060             IF_DEBUG(sanity, checkTSO(tso));
3061             return;
3062         }
3063         
3064         case UPDATE_FRAME:
3065         {
3066             StgAP_STACK * ap;
3067             nat words;
3068             
3069             // First build an AP_STACK consisting of the stack chunk above the
3070             // current update frame, with the top word on the stack as the
3071             // fun field.
3072             //
3073             words = frame - sp - 1;
3074             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3075             
3076             ap->size = words;
3077             ap->fun  = (StgClosure *)sp[0];
3078             sp++;
3079             for(i=0; i < (nat)words; ++i) {
3080                 ap->payload[i] = (StgClosure *)*sp++;
3081             }
3082             
3083             SET_HDR(ap,&stg_AP_STACK_info,
3084                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3085             TICK_ALLOC_UP_THK(words+1,0);
3086             
3087             IF_DEBUG(scheduler,
3088                      fprintf(stderr,  "sched: Updating ");
3089                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3090                      fprintf(stderr,  " with ");
3091                      printObj((StgClosure *)ap);
3092                 );
3093
3094             // Replace the updatee with an indirection - happily
3095             // this will also wake up any threads currently
3096             // waiting on the result.
3097             //
3098             // Warning: if we're in a loop, more than one update frame on
3099             // the stack may point to the same object.  Be careful not to
3100             // overwrite an IND_OLDGEN in this case, because we'll screw
3101             // up the mutable lists.  To be on the safe side, don't
3102             // overwrite any kind of indirection at all.  See also
3103             // threadSqueezeStack in GC.c, where we have to make a similar
3104             // check.
3105             //
3106             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3107                 // revert the black hole
3108                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3109             }
3110             sp += sizeofW(StgUpdateFrame) - 1;
3111             sp[0] = (W_)ap; // push onto stack
3112             break;
3113         }
3114         
3115         case STOP_FRAME:
3116             // We've stripped the entire stack, the thread is now dead.
3117             sp += sizeofW(StgStopFrame);
3118             tso->what_next = ThreadKilled;
3119             tso->sp = sp;
3120             return;
3121             
3122         default:
3123             barf("raiseAsync");
3124         }
3125     }
3126     barf("raiseAsync");
3127 }
3128
3129 /* -----------------------------------------------------------------------------
3130    resurrectThreads is called after garbage collection on the list of
3131    threads found to be garbage.  Each of these threads will be woken
3132    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3133    on an MVar, or NonTermination if the thread was blocked on a Black
3134    Hole.
3135
3136    Locks: sched_mutex isn't held upon entry nor exit.
3137    -------------------------------------------------------------------------- */
3138
3139 void
3140 resurrectThreads( StgTSO *threads )
3141 {
3142   StgTSO *tso, *next;
3143
3144   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3145     next = tso->global_link;
3146     tso->global_link = all_threads;
3147     all_threads = tso;
3148     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3149
3150     switch (tso->why_blocked) {
3151     case BlockedOnMVar:
3152     case BlockedOnException:
3153       /* Called by GC - sched_mutex lock is currently held. */
3154       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3155       break;
3156     case BlockedOnBlackHole:
3157       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3158       break;
3159     case NotBlocked:
3160       /* This might happen if the thread was blocked on a black hole
3161        * belonging to a thread that we've just woken up (raiseAsync
3162        * can wake up threads, remember...).
3163        */
3164       continue;
3165     default:
3166       barf("resurrectThreads: thread blocked in a strange way");
3167     }
3168   }
3169 }
3170
3171 /* -----------------------------------------------------------------------------
3172  * Blackhole detection: if we reach a deadlock, test whether any
3173  * threads are blocked on themselves.  Any threads which are found to
3174  * be self-blocked get sent a NonTermination exception.
3175  *
3176  * This is only done in a deadlock situation in order to avoid
3177  * performance overhead in the normal case.
3178  *
3179  * Locks: sched_mutex is held upon entry and exit.
3180  * -------------------------------------------------------------------------- */
3181
3182 static void
3183 detectBlackHoles( void )
3184 {
3185     StgTSO *tso = all_threads;
3186     StgClosure *frame;
3187     StgClosure *blocked_on;
3188     StgRetInfoTable *info;
3189
3190     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3191
3192         while (tso->what_next == ThreadRelocated) {
3193             tso = tso->link;
3194             ASSERT(get_itbl(tso)->type == TSO);
3195         }
3196       
3197         if (tso->why_blocked != BlockedOnBlackHole) {
3198             continue;
3199         }
3200         blocked_on = tso->block_info.closure;
3201
3202         frame = (StgClosure *)tso->sp;
3203
3204         while(1) {
3205             info = get_ret_itbl(frame);
3206             switch (info->i.type) {
3207             case UPDATE_FRAME:
3208                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3209                     /* We are blocking on one of our own computations, so
3210                      * send this thread the NonTermination exception.  
3211                      */
3212                     IF_DEBUG(scheduler, 
3213                              sched_belch("thread %d is blocked on itself", tso->id));
3214                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3215                     goto done;
3216                 }
3217                 
3218                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3219                 continue;
3220
3221             case STOP_FRAME:
3222                 goto done;
3223
3224                 // normal stack frames; do nothing except advance the pointer
3225             default:
3226                 (StgPtr)frame += stack_frame_sizeW(frame);
3227             }
3228         }   
3229         done: ;
3230     }
3231 }
3232
3233 /* ----------------------------------------------------------------------------
3234  * Debugging: why is a thread blocked
3235  * [Also provides useful information when debugging threaded programs
3236  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3237    ------------------------------------------------------------------------- */
3238
3239 static
3240 void
3241 printThreadBlockage(StgTSO *tso)
3242 {
3243   switch (tso->why_blocked) {
3244   case BlockedOnRead:
3245     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3246     break;
3247   case BlockedOnWrite:
3248     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3249     break;
3250 #if defined(mingw32_TARGET_OS)
3251     case BlockedOnDoProc:
3252     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3253     break;
3254 #endif
3255   case BlockedOnDelay:
3256     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3257     break;
3258   case BlockedOnMVar:
3259     fprintf(stderr,"is blocked on an MVar");
3260     break;
3261   case BlockedOnException:
3262     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3263             tso->block_info.tso->id);
3264     break;
3265   case BlockedOnBlackHole:
3266     fprintf(stderr,"is blocked on a black hole");
3267     break;
3268   case NotBlocked:
3269     fprintf(stderr,"is not blocked");
3270     break;
3271 #if defined(PAR)
3272   case BlockedOnGA:
3273     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3274             tso->block_info.closure, info_type(tso->block_info.closure));
3275     break;
3276   case BlockedOnGA_NoSend:
3277     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3278             tso->block_info.closure, info_type(tso->block_info.closure));
3279     break;
3280 #endif
3281   case BlockedOnCCall:
3282     fprintf(stderr,"is blocked on an external call");
3283     break;
3284   case BlockedOnCCall_NoUnblockExc:
3285     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3286     break;
3287   default:
3288     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3289          tso->why_blocked, tso->id, tso);
3290   }
3291 }
3292
3293 static
3294 void
3295 printThreadStatus(StgTSO *tso)
3296 {
3297   switch (tso->what_next) {
3298   case ThreadKilled:
3299     fprintf(stderr,"has been killed");
3300     break;
3301   case ThreadComplete:
3302     fprintf(stderr,"has completed");
3303     break;
3304   default:
3305     printThreadBlockage(tso);
3306   }
3307 }
3308
3309 void
3310 printAllThreads(void)
3311 {
3312   StgTSO *t;
3313   void *label;
3314
3315 # if defined(GRAN)
3316   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3317   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3318                        time_string, rtsFalse/*no commas!*/);
3319
3320   fprintf(stderr, "all threads at [%s]:\n", time_string);
3321 # elif defined(PAR)
3322   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3323   ullong_format_string(CURRENT_TIME,
3324                        time_string, rtsFalse/*no commas!*/);
3325
3326   fprintf(stderr,"all threads at [%s]:\n", time_string);
3327 # else
3328   fprintf(stderr,"all threads:\n");
3329 # endif
3330
3331   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3332     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3333     label = lookupThreadLabel(t->id);
3334     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3335     printThreadStatus(t);
3336     fprintf(stderr,"\n");
3337   }
3338 }
3339     
3340 #ifdef DEBUG
3341
3342 /* 
3343    Print a whole blocking queue attached to node (debugging only).
3344 */
3345 # if defined(PAR)
3346 void 
3347 print_bq (StgClosure *node)
3348 {
3349   StgBlockingQueueElement *bqe;
3350   StgTSO *tso;
3351   rtsBool end;
3352
3353   fprintf(stderr,"## BQ of closure %p (%s): ",
3354           node, info_type(node));
3355
3356   /* should cover all closures that may have a blocking queue */
3357   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3358          get_itbl(node)->type == FETCH_ME_BQ ||
3359          get_itbl(node)->type == RBH ||
3360          get_itbl(node)->type == MVAR);
3361     
3362   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3363
3364   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3365 }
3366
3367 /* 
3368    Print a whole blocking queue starting with the element bqe.
3369 */
3370 void 
3371 print_bqe (StgBlockingQueueElement *bqe)
3372 {
3373   rtsBool end;
3374
3375   /* 
3376      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3377   */
3378   for (end = (bqe==END_BQ_QUEUE);
3379        !end; // iterate until bqe points to a CONSTR
3380        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3381        bqe = end ? END_BQ_QUEUE : bqe->link) {
3382     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3383     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3384     /* types of closures that may appear in a blocking queue */
3385     ASSERT(get_itbl(bqe)->type == TSO ||           
3386            get_itbl(bqe)->type == BLOCKED_FETCH || 
3387            get_itbl(bqe)->type == CONSTR); 
3388     /* only BQs of an RBH end with an RBH_Save closure */
3389     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3390
3391     switch (get_itbl(bqe)->type) {
3392     case TSO:
3393       fprintf(stderr," TSO %u (%x),",
3394               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3395       break;
3396     case BLOCKED_FETCH:
3397       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3398               ((StgBlockedFetch *)bqe)->node, 
3399               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3400               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3401               ((StgBlockedFetch *)bqe)->ga.weight);
3402       break;
3403     case CONSTR:
3404       fprintf(stderr," %s (IP %p),",
3405               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3406                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3407                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3408                "RBH_Save_?"), get_itbl(bqe));
3409       break;
3410     default:
3411       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3412            info_type((StgClosure *)bqe)); // , node, info_type(node));
3413       break;
3414     }
3415   } /* for */
3416   fputc('\n', stderr);
3417 }
3418 # elif defined(GRAN)
3419 void 
3420 print_bq (StgClosure *node)
3421 {
3422   StgBlockingQueueElement *bqe;
3423   PEs node_loc, tso_loc;
3424   rtsBool end;
3425
3426   /* should cover all closures that may have a blocking queue */
3427   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3428          get_itbl(node)->type == FETCH_ME_BQ ||
3429          get_itbl(node)->type == RBH);
3430     
3431   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3432   node_loc = where_is(node);
3433
3434   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3435           node, info_type(node), node_loc);
3436
3437   /* 
3438      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3439   */
3440   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3441        !end; // iterate until bqe points to a CONSTR
3442        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3443     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3444     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3445     /* types of closures that may appear in a blocking queue */
3446     ASSERT(get_itbl(bqe)->type == TSO ||           
3447            get_itbl(bqe)->type == CONSTR); 
3448     /* only BQs of an RBH end with an RBH_Save closure */
3449     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3450
3451     tso_loc = where_is((StgClosure *)bqe);
3452     switch (get_itbl(bqe)->type) {
3453     case TSO:
3454       fprintf(stderr," TSO %d (%p) on [PE %d],",
3455               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3456       break;
3457     case CONSTR:
3458       fprintf(stderr," %s (IP %p),",
3459               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3460                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3461                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3462                "RBH_Save_?"), get_itbl(bqe));
3463       break;
3464     default:
3465       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3466            info_type((StgClosure *)bqe), node, info_type(node));
3467       break;
3468     }
3469   } /* for */
3470   fputc('\n', stderr);
3471 }
3472 #else
3473 /* 
3474    Nice and easy: only TSOs on the blocking queue
3475 */
3476 void 
3477 print_bq (StgClosure *node)
3478 {
3479   StgTSO *tso;
3480
3481   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3482   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3483        tso != END_TSO_QUEUE; 
3484        tso=tso->link) {
3485     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3486     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3487     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3488   }
3489   fputc('\n', stderr);
3490 }
3491 # endif
3492
3493 #if defined(PAR)
3494 static nat
3495 run_queue_len(void)
3496 {
3497   nat i;
3498   StgTSO *tso;
3499
3500   for (i=0, tso=run_queue_hd; 
3501        tso != END_TSO_QUEUE;
3502        i++, tso=tso->link)
3503     /* nothing */
3504
3505   return i;
3506 }
3507 #endif
3508
3509 void
3510 sched_belch(char *s, ...)
3511 {
3512   va_list ap;
3513   va_start(ap,s);
3514 #ifdef RTS_SUPPORTS_THREADS
3515   fprintf(stderr, "sched (task %p): ", osThreadId());
3516 #elif defined(PAR)
3517   fprintf(stderr, "== ");
3518 #else
3519   fprintf(stderr, "sched: ");
3520 #endif
3521   vfprintf(stderr, s, ap);
3522   fprintf(stderr, "\n");
3523   fflush(stderr);
3524   va_end(ap);
3525 }
3526
3527 #endif /* DEBUG */