[project @ 2004-03-13 00:56:45 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.194 2004/03/13 00:56:45 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       startTask(taskStart);
280     }
281   }
282 }
283 #endif
284
285 /* ---------------------------------------------------------------------------
286    Main scheduling loop.
287
288    We use round-robin scheduling, each thread returning to the
289    scheduler loop when one of these conditions is detected:
290
291       * out of heap space
292       * timer expires (thread yields)
293       * thread blocks
294       * thread ends
295       * stack overflow
296
297    Locking notes:  we acquire the scheduler lock once at the beginning
298    of the scheduler loop, and release it when
299     
300       * running a thread, or
301       * waiting for work, or
302       * waiting for a GC to complete.
303
304    GRAN version:
305      In a GranSim setup this loop iterates over the global event queue.
306      This revolves around the global event queue, which determines what 
307      to do next. Therefore, it's more complicated than either the 
308      concurrent or the parallel (GUM) setup.
309
310    GUM version:
311      GUM iterates over incoming messages.
312      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
313      and sends out a fish whenever it has nothing to do; in-between
314      doing the actual reductions (shared code below) it processes the
315      incoming messages and deals with delayed operations 
316      (see PendingFetches).
317      This is not the ugliest code you could imagine, but it's bloody close.
318
319    ------------------------------------------------------------------------ */
320 static void
321 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
322           Capability *initialCapability )
323 {
324   StgTSO *t;
325   Capability *cap;
326   StgThreadReturnCode ret;
327 #if defined(GRAN)
328   rtsEvent *event;
329 #elif defined(PAR)
330   StgSparkPool *pool;
331   rtsSpark spark;
332   StgTSO *tso;
333   GlobalTaskId pe;
334   rtsBool receivedFinish = rtsFalse;
335 # if defined(DEBUG)
336   nat tp_size, sp_size; // stats only
337 # endif
338 #endif
339   rtsBool was_interrupted = rtsFalse;
340   StgTSOWhatNext prev_what_next;
341   
342   // Pre-condition: sched_mutex is held.
343   // We might have a capability, passed in as initialCapability.
344   cap = initialCapability;
345
346 #if defined(RTS_SUPPORTS_THREADS)
347   //
348   // in the threaded case, the capability is either passed in via the
349   // initialCapability parameter, or initialized inside the scheduler
350   // loop 
351   //
352   IF_DEBUG(scheduler,
353            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
354                        mainThread, initialCapability);
355       );
356 #else
357   // simply initialise it in the non-threaded case
358   grabCapability(&cap);
359 #endif
360
361 #if defined(GRAN)
362   /* set up first event to get things going */
363   /* ToDo: assign costs for system setup and init MainTSO ! */
364   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
365             ContinueThread, 
366             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
367
368   IF_DEBUG(gran,
369            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
370            G_TSO(CurrentTSO, 5));
371
372   if (RtsFlags.GranFlags.Light) {
373     /* Save current time; GranSim Light only */
374     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
375   }      
376
377   event = get_next_event();
378
379   while (event!=(rtsEvent*)NULL) {
380     /* Choose the processor with the next event */
381     CurrentProc = event->proc;
382     CurrentTSO = event->tso;
383
384 #elif defined(PAR)
385
386   while (!receivedFinish) {    /* set by processMessages */
387                                /* when receiving PP_FINISH message         */ 
388
389 #else // everything except GRAN and PAR
390
391   while (1) {
392
393 #endif
394
395      IF_DEBUG(scheduler, printAllThreads());
396
397 #if defined(RTS_SUPPORTS_THREADS)
398       // Yield the capability to higher-priority tasks if necessary.
399       //
400       if (cap != NULL) {
401           yieldCapability(&cap);
402       }
403
404       // If we do not currently hold a capability, we wait for one
405       //
406       if (cap == NULL) {
407           waitForCapability(&sched_mutex, &cap,
408                             mainThread ? &mainThread->bound_thread_cond : NULL);
409       }
410
411       // We now have a capability...
412 #endif
413
414     //
415     // If we're interrupted (the user pressed ^C, or some other
416     // termination condition occurred), kill all the currently running
417     // threads.
418     //
419     if (interrupted) {
420         IF_DEBUG(scheduler, sched_belch("interrupted"));
421         interrupted = rtsFalse;
422         was_interrupted = rtsTrue;
423 #if defined(RTS_SUPPORTS_THREADS)
424         // In the threaded RTS, deadlock detection doesn't work,
425         // so just exit right away.
426         prog_belch("interrupted");
427         releaseCapability(cap);
428         RELEASE_LOCK(&sched_mutex);
429         shutdownHaskellAndExit(EXIT_SUCCESS);
430 #else
431         deleteAllThreads();
432 #endif
433     }
434
435 #if defined(RTS_USER_SIGNALS)
436     // check for signals each time around the scheduler
437     if (signals_pending()) {
438       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
439       startSignalHandlers();
440       ACQUIRE_LOCK(&sched_mutex);
441     }
442 #endif
443
444     //
445     // Check whether any waiting threads need to be woken up.  If the
446     // run queue is empty, and there are no other tasks running, we
447     // can wait indefinitely for something to happen.
448     //
449     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
450 #if defined(RTS_SUPPORTS_THREADS)
451                 || EMPTY_RUN_QUEUE()
452 #endif
453         )
454     {
455       awaitEvent( EMPTY_RUN_QUEUE() );
456     }
457     // we can be interrupted while waiting for I/O...
458     if (interrupted) continue;
459
460     /* 
461      * Detect deadlock: when we have no threads to run, there are no
462      * threads waiting on I/O or sleeping, and all the other tasks are
463      * waiting for work, we must have a deadlock of some description.
464      *
465      * We first try to find threads blocked on themselves (ie. black
466      * holes), and generate NonTermination exceptions where necessary.
467      *
468      * If no threads are black holed, we have a deadlock situation, so
469      * inform all the main threads.
470      */
471 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
472     if (   EMPTY_THREAD_QUEUES() )
473     {
474         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
475         // Garbage collection can release some new threads due to
476         // either (a) finalizers or (b) threads resurrected because
477         // they are about to be send BlockedOnDeadMVar.  Any threads
478         // thus released will be immediately runnable.
479         GarbageCollect(GetRoots,rtsTrue);
480
481         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
482
483         IF_DEBUG(scheduler, 
484                  sched_belch("still deadlocked, checking for black holes..."));
485         detectBlackHoles();
486
487         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
488
489 #if defined(RTS_USER_SIGNALS)
490         /* If we have user-installed signal handlers, then wait
491          * for signals to arrive rather then bombing out with a
492          * deadlock.
493          */
494         if ( anyUserHandlers() ) {
495             IF_DEBUG(scheduler, 
496                      sched_belch("still deadlocked, waiting for signals..."));
497
498             awaitUserSignals();
499
500             // we might be interrupted...
501             if (interrupted) { continue; }
502
503             if (signals_pending()) {
504                 RELEASE_LOCK(&sched_mutex);
505                 startSignalHandlers();
506                 ACQUIRE_LOCK(&sched_mutex);
507             }
508             ASSERT(!EMPTY_RUN_QUEUE());
509             goto not_deadlocked;
510         }
511 #endif
512
513         /* Probably a real deadlock.  Send the current main thread the
514          * Deadlock exception (or in the SMP build, send *all* main
515          * threads the deadlock exception, since none of them can make
516          * progress).
517          */
518         {
519             StgMainThread *m;
520             m = main_threads;
521             switch (m->tso->why_blocked) {
522             case BlockedOnBlackHole:
523             case BlockedOnException:
524             case BlockedOnMVar:
525                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
526                 break;
527             default:
528                 barf("deadlock: main thread blocked in a strange way");
529             }
530         }
531     }
532   not_deadlocked:
533
534 #elif defined(RTS_SUPPORTS_THREADS)
535     // ToDo: add deadlock detection in threaded RTS
536 #elif defined(PAR)
537     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
538 #endif
539
540 #if defined(RTS_SUPPORTS_THREADS)
541     if ( EMPTY_RUN_QUEUE() ) {
542         continue; // nothing to do
543     }
544 #endif
545
546 #if defined(GRAN)
547     if (RtsFlags.GranFlags.Light)
548       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
549
550     /* adjust time based on time-stamp */
551     if (event->time > CurrentTime[CurrentProc] &&
552         event->evttype != ContinueThread)
553       CurrentTime[CurrentProc] = event->time;
554     
555     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
556     if (!RtsFlags.GranFlags.Light)
557       handleIdlePEs();
558
559     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
560
561     /* main event dispatcher in GranSim */
562     switch (event->evttype) {
563       /* Should just be continuing execution */
564     case ContinueThread:
565       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
566       /* ToDo: check assertion
567       ASSERT(run_queue_hd != (StgTSO*)NULL &&
568              run_queue_hd != END_TSO_QUEUE);
569       */
570       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
571       if (!RtsFlags.GranFlags.DoAsyncFetch &&
572           procStatus[CurrentProc]==Fetching) {
573         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
574               CurrentTSO->id, CurrentTSO, CurrentProc);
575         goto next_thread;
576       } 
577       /* Ignore ContinueThreads for completed threads */
578       if (CurrentTSO->what_next == ThreadComplete) {
579         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
580               CurrentTSO->id, CurrentTSO, CurrentProc);
581         goto next_thread;
582       } 
583       /* Ignore ContinueThreads for threads that are being migrated */
584       if (PROCS(CurrentTSO)==Nowhere) { 
585         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
586               CurrentTSO->id, CurrentTSO, CurrentProc);
587         goto next_thread;
588       }
589       /* The thread should be at the beginning of the run queue */
590       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
591         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
592               CurrentTSO->id, CurrentTSO, CurrentProc);
593         break; // run the thread anyway
594       }
595       /*
596       new_event(proc, proc, CurrentTime[proc],
597                 FindWork,
598                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
599       goto next_thread; 
600       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
601       break; // now actually run the thread; DaH Qu'vam yImuHbej 
602
603     case FetchNode:
604       do_the_fetchnode(event);
605       goto next_thread;             /* handle next event in event queue  */
606       
607     case GlobalBlock:
608       do_the_globalblock(event);
609       goto next_thread;             /* handle next event in event queue  */
610       
611     case FetchReply:
612       do_the_fetchreply(event);
613       goto next_thread;             /* handle next event in event queue  */
614       
615     case UnblockThread:   /* Move from the blocked queue to the tail of */
616       do_the_unblock(event);
617       goto next_thread;             /* handle next event in event queue  */
618       
619     case ResumeThread:  /* Move from the blocked queue to the tail of */
620       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
621       event->tso->gran.blocktime += 
622         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
623       do_the_startthread(event);
624       goto next_thread;             /* handle next event in event queue  */
625       
626     case StartThread:
627       do_the_startthread(event);
628       goto next_thread;             /* handle next event in event queue  */
629       
630     case MoveThread:
631       do_the_movethread(event);
632       goto next_thread;             /* handle next event in event queue  */
633       
634     case MoveSpark:
635       do_the_movespark(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case FindWork:
639       do_the_findwork(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     default:
643       barf("Illegal event type %u\n", event->evttype);
644     }  /* switch */
645     
646     /* This point was scheduler_loop in the old RTS */
647
648     IF_DEBUG(gran, belch("GRAN: after main switch"));
649
650     TimeOfLastEvent = CurrentTime[CurrentProc];
651     TimeOfNextEvent = get_time_of_next_event();
652     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
653     // CurrentTSO = ThreadQueueHd;
654
655     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
656                          TimeOfNextEvent));
657
658     if (RtsFlags.GranFlags.Light) 
659       GranSimLight_leave_system(event, &ActiveTSO); 
660
661     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
662
663     IF_DEBUG(gran, 
664              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
665
666     /* in a GranSim setup the TSO stays on the run queue */
667     t = CurrentTSO;
668     /* Take a thread from the run queue. */
669     POP_RUN_QUEUE(t); // take_off_run_queue(t);
670
671     IF_DEBUG(gran, 
672              fprintf(stderr, "GRAN: About to run current thread, which is\n");
673              G_TSO(t,5));
674
675     context_switch = 0; // turned on via GranYield, checking events and time slice
676
677     IF_DEBUG(gran, 
678              DumpGranEvent(GR_SCHEDULE, t));
679
680     procStatus[CurrentProc] = Busy;
681
682 #elif defined(PAR)
683     if (PendingFetches != END_BF_QUEUE) {
684         processFetches();
685     }
686
687     /* ToDo: phps merge with spark activation above */
688     /* check whether we have local work and send requests if we have none */
689     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
690       /* :-[  no local threads => look out for local sparks */
691       /* the spark pool for the current PE */
692       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
693       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
694           pool->hd < pool->tl) {
695         /* 
696          * ToDo: add GC code check that we really have enough heap afterwards!!
697          * Old comment:
698          * If we're here (no runnable threads) and we have pending
699          * sparks, we must have a space problem.  Get enough space
700          * to turn one of those pending sparks into a
701          * thread... 
702          */
703
704         spark = findSpark(rtsFalse);                /* get a spark */
705         if (spark != (rtsSpark) NULL) {
706           tso = activateSpark(spark);       /* turn the spark into a thread */
707           IF_PAR_DEBUG(schedule,
708                        belch("==== schedule: Created TSO %d (%p); %d threads active",
709                              tso->id, tso, advisory_thread_count));
710
711           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
712             belch("==^^ failed to activate spark");
713             goto next_thread;
714           }               /* otherwise fall through & pick-up new tso */
715         } else {
716           IF_PAR_DEBUG(verbose,
717                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
718                              spark_queue_len(pool)));
719           goto next_thread;
720         }
721       }
722
723       /* If we still have no work we need to send a FISH to get a spark
724          from another PE 
725       */
726       if (EMPTY_RUN_QUEUE()) {
727       /* =8-[  no local sparks => look for work on other PEs */
728         /*
729          * We really have absolutely no work.  Send out a fish
730          * (there may be some out there already), and wait for
731          * something to arrive.  We clearly can't run any threads
732          * until a SCHEDULE or RESUME arrives, and so that's what
733          * we're hoping to see.  (Of course, we still have to
734          * respond to other types of messages.)
735          */
736         TIME now = msTime() /*CURRENT_TIME*/;
737         IF_PAR_DEBUG(verbose, 
738                      belch("--  now=%ld", now));
739         IF_PAR_DEBUG(verbose,
740                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
741                          (last_fish_arrived_at!=0 &&
742                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
743                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
744                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
745                              last_fish_arrived_at,
746                              RtsFlags.ParFlags.fishDelay, now);
747                      });
748         
749         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
750             (last_fish_arrived_at==0 ||
751              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
752           /* outstandingFishes is set in sendFish, processFish;
753              avoid flooding system with fishes via delay */
754           pe = choosePE();
755           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
756                    NEW_FISH_HUNGER);
757
758           // Global statistics: count no. of fishes
759           if (RtsFlags.ParFlags.ParStats.Global &&
760               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
761             globalParStats.tot_fish_mess++;
762           }
763         }
764       
765         receivedFinish = processMessages();
766         goto next_thread;
767       }
768     } else if (PacketsWaiting()) {  /* Look for incoming messages */
769       receivedFinish = processMessages();
770     }
771
772     /* Now we are sure that we have some work available */
773     ASSERT(run_queue_hd != END_TSO_QUEUE);
774
775     /* Take a thread from the run queue, if we have work */
776     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
777     IF_DEBUG(sanity,checkTSO(t));
778
779     /* ToDo: write something to the log-file
780     if (RTSflags.ParFlags.granSimStats && !sameThread)
781         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
782
783     CurrentTSO = t;
784     */
785     /* the spark pool for the current PE */
786     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
787
788     IF_DEBUG(scheduler, 
789              belch("--=^ %d threads, %d sparks on [%#x]", 
790                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
791
792 # if 1
793     if (0 && RtsFlags.ParFlags.ParStats.Full && 
794         t && LastTSO && t->id != LastTSO->id && 
795         LastTSO->why_blocked == NotBlocked && 
796         LastTSO->what_next != ThreadComplete) {
797       // if previously scheduled TSO not blocked we have to record the context switch
798       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
799                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
800     }
801
802     if (RtsFlags.ParFlags.ParStats.Full && 
803         (emitSchedule /* forced emit */ ||
804         (t && LastTSO && t->id != LastTSO->id))) {
805       /* 
806          we are running a different TSO, so write a schedule event to log file
807          NB: If we use fair scheduling we also have to write  a deschedule 
808              event for LastTSO; with unfair scheduling we know that the
809              previous tso has blocked whenever we switch to another tso, so
810              we don't need it in GUM for now
811       */
812       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
813                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
814       emitSchedule = rtsFalse;
815     }
816      
817 # endif
818 #else /* !GRAN && !PAR */
819   
820     // grab a thread from the run queue
821     ASSERT(run_queue_hd != END_TSO_QUEUE);
822     POP_RUN_QUEUE(t);
823
824     // Sanity check the thread we're about to run.  This can be
825     // expensive if there is lots of thread switching going on...
826     IF_DEBUG(sanity,checkTSO(t));
827 #endif
828
829 #ifdef THREADED_RTS
830     {
831       StgMainThread *m = t->main;
832       
833       if(m)
834       {
835         if(m == mainThread)
836         {
837           IF_DEBUG(scheduler,
838             sched_belch("### Running thread %d in bound thread", t->id));
839           // yes, the Haskell thread is bound to the current native thread
840         }
841         else
842         {
843           IF_DEBUG(scheduler,
844             sched_belch("### thread %d bound to another OS thread", t->id));
845           // no, bound to a different Haskell thread: pass to that thread
846           PUSH_ON_RUN_QUEUE(t);
847           passCapability(&m->bound_thread_cond);
848           continue;
849         }
850       }
851       else
852       {
853         if(mainThread != NULL)
854         // The thread we want to run is bound.
855         {
856           IF_DEBUG(scheduler,
857             sched_belch("### this OS thread cannot run thread %d", t->id));
858           // no, the current native thread is bound to a different
859           // Haskell thread, so pass it to any worker thread
860           PUSH_ON_RUN_QUEUE(t);
861           passCapabilityToWorker();
862           continue; 
863         }
864       }
865     }
866 #endif
867
868     cap->r.rCurrentTSO = t;
869     
870     /* context switches are now initiated by the timer signal, unless
871      * the user specified "context switch as often as possible", with
872      * +RTS -C0
873      */
874     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
875          && (run_queue_hd != END_TSO_QUEUE
876              || blocked_queue_hd != END_TSO_QUEUE
877              || sleeping_queue != END_TSO_QUEUE)))
878         context_switch = 1;
879     else
880         context_switch = 0;
881
882 run_thread:
883
884     RELEASE_LOCK(&sched_mutex);
885
886     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
887                               t->id, whatNext_strs[t->what_next]));
888
889 #ifdef PROFILING
890     startHeapProfTimer();
891 #endif
892
893     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
894     /* Run the current thread 
895      */
896     prev_what_next = t->what_next;
897     switch (prev_what_next) {
898     case ThreadKilled:
899     case ThreadComplete:
900         /* Thread already finished, return to scheduler. */
901         ret = ThreadFinished;
902         break;
903     case ThreadRunGHC:
904         errno = t->saved_errno;
905         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
906         t->saved_errno = errno;
907         break;
908     case ThreadInterpret:
909         ret = interpretBCO(cap);
910         break;
911     default:
912       barf("schedule: invalid what_next field");
913     }
914     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
915     
916     /* Costs for the scheduler are assigned to CCS_SYSTEM */
917 #ifdef PROFILING
918     stopHeapProfTimer();
919     CCCS = CCS_SYSTEM;
920 #endif
921     
922     ACQUIRE_LOCK(&sched_mutex);
923     
924 #ifdef RTS_SUPPORTS_THREADS
925     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
926 #elif !defined(GRAN) && !defined(PAR)
927     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
928 #endif
929     t = cap->r.rCurrentTSO;
930     
931 #if defined(PAR)
932     /* HACK 675: if the last thread didn't yield, make sure to print a 
933        SCHEDULE event to the log file when StgRunning the next thread, even
934        if it is the same one as before */
935     LastTSO = t; 
936     TimeOfLastYield = CURRENT_TIME;
937 #endif
938
939     switch (ret) {
940     case HeapOverflow:
941 #if defined(GRAN)
942       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
943       globalGranStats.tot_heapover++;
944 #elif defined(PAR)
945       globalParStats.tot_heapover++;
946 #endif
947
948       // did the task ask for a large block?
949       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
950           // if so, get one and push it on the front of the nursery.
951           bdescr *bd;
952           nat blocks;
953           
954           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
955
956           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
957                                    t->id, whatNext_strs[t->what_next], blocks));
958
959           // don't do this if it would push us over the
960           // alloc_blocks_lim limit; we'll GC first.
961           if (alloc_blocks + blocks < alloc_blocks_lim) {
962
963               alloc_blocks += blocks;
964               bd = allocGroup( blocks );
965
966               // link the new group into the list
967               bd->link = cap->r.rCurrentNursery;
968               bd->u.back = cap->r.rCurrentNursery->u.back;
969               if (cap->r.rCurrentNursery->u.back != NULL) {
970                   cap->r.rCurrentNursery->u.back->link = bd;
971               } else {
972                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
973                          g0s0->blocks == cap->r.rNursery);
974                   cap->r.rNursery = g0s0->blocks = bd;
975               }           
976               cap->r.rCurrentNursery->u.back = bd;
977
978               // initialise it as a nursery block.  We initialise the
979               // step, gen_no, and flags field of *every* sub-block in
980               // this large block, because this is easier than making
981               // sure that we always find the block head of a large
982               // block whenever we call Bdescr() (eg. evacuate() and
983               // isAlive() in the GC would both have to do this, at
984               // least).
985               { 
986                   bdescr *x;
987                   for (x = bd; x < bd + blocks; x++) {
988                       x->step = g0s0;
989                       x->gen_no = 0;
990                       x->flags = 0;
991                   }
992               }
993
994               // don't forget to update the block count in g0s0.
995               g0s0->n_blocks += blocks;
996               // This assert can be a killer if the app is doing lots
997               // of large block allocations.
998               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
999
1000               // now update the nursery to point to the new block
1001               cap->r.rCurrentNursery = bd;
1002
1003               // we might be unlucky and have another thread get on the
1004               // run queue before us and steal the large block, but in that
1005               // case the thread will just end up requesting another large
1006               // block.
1007               PUSH_ON_RUN_QUEUE(t);
1008               break;
1009           }
1010       }
1011
1012       /* make all the running tasks block on a condition variable,
1013        * maybe set context_switch and wait till they all pile in,
1014        * then have them wait on a GC condition variable.
1015        */
1016       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1017                                t->id, whatNext_strs[t->what_next]));
1018       threadPaused(t);
1019 #if defined(GRAN)
1020       ASSERT(!is_on_queue(t,CurrentProc));
1021 #elif defined(PAR)
1022       /* Currently we emit a DESCHEDULE event before GC in GUM.
1023          ToDo: either add separate event to distinguish SYSTEM time from rest
1024                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1025       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1026         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1027                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1028         emitSchedule = rtsTrue;
1029       }
1030 #endif
1031       
1032       ready_to_gc = rtsTrue;
1033       context_switch = 1;               /* stop other threads ASAP */
1034       PUSH_ON_RUN_QUEUE(t);
1035       /* actual GC is done at the end of the while loop */
1036       break;
1037       
1038     case StackOverflow:
1039 #if defined(GRAN)
1040       IF_DEBUG(gran, 
1041                DumpGranEvent(GR_DESCHEDULE, t));
1042       globalGranStats.tot_stackover++;
1043 #elif defined(PAR)
1044       // IF_DEBUG(par, 
1045       // DumpGranEvent(GR_DESCHEDULE, t);
1046       globalParStats.tot_stackover++;
1047 #endif
1048       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1049                                t->id, whatNext_strs[t->what_next]));
1050       /* just adjust the stack for this thread, then pop it back
1051        * on the run queue.
1052        */
1053       threadPaused(t);
1054       { 
1055         /* enlarge the stack */
1056         StgTSO *new_t = threadStackOverflow(t);
1057         
1058         /* This TSO has moved, so update any pointers to it from the
1059          * main thread stack.  It better not be on any other queues...
1060          * (it shouldn't be).
1061          */
1062         if (t->main != NULL) {
1063             t->main->tso = new_t;
1064         }
1065         threadPaused(new_t);
1066         PUSH_ON_RUN_QUEUE(new_t);
1067       }
1068       break;
1069
1070     case ThreadYielding:
1071 #if defined(GRAN)
1072       IF_DEBUG(gran, 
1073                DumpGranEvent(GR_DESCHEDULE, t));
1074       globalGranStats.tot_yields++;
1075 #elif defined(PAR)
1076       // IF_DEBUG(par, 
1077       // DumpGranEvent(GR_DESCHEDULE, t);
1078       globalParStats.tot_yields++;
1079 #endif
1080       /* put the thread back on the run queue.  Then, if we're ready to
1081        * GC, check whether this is the last task to stop.  If so, wake
1082        * up the GC thread.  getThread will block during a GC until the
1083        * GC is finished.
1084        */
1085       IF_DEBUG(scheduler,
1086                if (t->what_next != prev_what_next) {
1087                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1088                          t->id, whatNext_strs[t->what_next]);
1089                } else {
1090                    belch("--<< thread %ld (%s) stopped, yielding", 
1091                          t->id, whatNext_strs[t->what_next]);
1092                }
1093                );
1094
1095       IF_DEBUG(sanity,
1096                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1097                checkTSO(t));
1098       ASSERT(t->link == END_TSO_QUEUE);
1099
1100       // Shortcut if we're just switching evaluators: don't bother
1101       // doing stack squeezing (which can be expensive), just run the
1102       // thread.
1103       if (t->what_next != prev_what_next) {
1104           goto run_thread;
1105       }
1106
1107       threadPaused(t);
1108
1109 #if defined(GRAN)
1110       ASSERT(!is_on_queue(t,CurrentProc));
1111
1112       IF_DEBUG(sanity,
1113                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1114                checkThreadQsSanity(rtsTrue));
1115 #endif
1116
1117 #if defined(PAR)
1118       if (RtsFlags.ParFlags.doFairScheduling) { 
1119         /* this does round-robin scheduling; good for concurrency */
1120         APPEND_TO_RUN_QUEUE(t);
1121       } else {
1122         /* this does unfair scheduling; good for parallelism */
1123         PUSH_ON_RUN_QUEUE(t);
1124       }
1125 #else
1126       // this does round-robin scheduling; good for concurrency
1127       APPEND_TO_RUN_QUEUE(t);
1128 #endif
1129
1130 #if defined(GRAN)
1131       /* add a ContinueThread event to actually process the thread */
1132       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1133                 ContinueThread,
1134                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1135       IF_GRAN_DEBUG(bq, 
1136                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1137                G_EVENTQ(0);
1138                G_CURR_THREADQ(0));
1139 #endif /* GRAN */
1140       break;
1141
1142     case ThreadBlocked:
1143 #if defined(GRAN)
1144       IF_DEBUG(scheduler,
1145                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1146                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1147                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1148
1149       // ??? needed; should emit block before
1150       IF_DEBUG(gran, 
1151                DumpGranEvent(GR_DESCHEDULE, t)); 
1152       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1153       /*
1154         ngoq Dogh!
1155       ASSERT(procStatus[CurrentProc]==Busy || 
1156               ((procStatus[CurrentProc]==Fetching) && 
1157               (t->block_info.closure!=(StgClosure*)NULL)));
1158       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1159           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1160             procStatus[CurrentProc]==Fetching)) 
1161         procStatus[CurrentProc] = Idle;
1162       */
1163 #elif defined(PAR)
1164       IF_DEBUG(scheduler,
1165                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1166                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1167       IF_PAR_DEBUG(bq,
1168
1169                    if (t->block_info.closure!=(StgClosure*)NULL) 
1170                      print_bq(t->block_info.closure));
1171
1172       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1173       blockThread(t);
1174
1175       /* whatever we schedule next, we must log that schedule */
1176       emitSchedule = rtsTrue;
1177
1178 #else /* !GRAN */
1179       /* don't need to do anything.  Either the thread is blocked on
1180        * I/O, in which case we'll have called addToBlockedQueue
1181        * previously, or it's blocked on an MVar or Blackhole, in which
1182        * case it'll be on the relevant queue already.
1183        */
1184       IF_DEBUG(scheduler,
1185                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1186                        t->id, whatNext_strs[t->what_next]);
1187                printThreadBlockage(t);
1188                fprintf(stderr, "\n"));
1189       fflush(stderr);
1190
1191       /* Only for dumping event to log file 
1192          ToDo: do I need this in GranSim, too?
1193       blockThread(t);
1194       */
1195 #endif
1196       threadPaused(t);
1197       break;
1198
1199     case ThreadFinished:
1200       /* Need to check whether this was a main thread, and if so, signal
1201        * the task that started it with the return value.  If we have no
1202        * more main threads, we probably need to stop all the tasks until
1203        * we get a new one.
1204        */
1205       /* We also end up here if the thread kills itself with an
1206        * uncaught exception, see Exception.hc.
1207        */
1208       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1209                                t->id, whatNext_strs[t->what_next]));
1210 #if defined(GRAN)
1211       endThread(t, CurrentProc); // clean-up the thread
1212 #elif defined(PAR)
1213       /* For now all are advisory -- HWL */
1214       //if(t->priority==AdvisoryPriority) ??
1215       advisory_thread_count--;
1216       
1217 # ifdef DIST
1218       if(t->dist.priority==RevalPriority)
1219         FinishReval(t);
1220 # endif
1221       
1222       if (RtsFlags.ParFlags.ParStats.Full &&
1223           !RtsFlags.ParFlags.ParStats.Suppressed) 
1224         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1225 #endif
1226
1227       //
1228       // Check whether the thread that just completed was a main
1229       // thread, and if so return with the result.  
1230       //
1231       // There is an assumption here that all thread completion goes
1232       // through this point; we need to make sure that if a thread
1233       // ends up in the ThreadKilled state, that it stays on the run
1234       // queue so it can be dealt with here.
1235       //
1236       if (
1237 #if defined(RTS_SUPPORTS_THREADS)
1238           mainThread != NULL
1239 #else
1240           mainThread->tso == t
1241 #endif
1242           )
1243       {
1244           // We are a bound thread: this must be our thread that just
1245           // completed.
1246           ASSERT(mainThread->tso == t);
1247
1248           if (t->what_next == ThreadComplete) {
1249               if (mainThread->ret) {
1250                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1251                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1252               }
1253               mainThread->stat = Success;
1254           } else {
1255               if (mainThread->ret) {
1256                   *(mainThread->ret) = NULL;
1257               }
1258               if (was_interrupted) {
1259                   mainThread->stat = Interrupted;
1260               } else {
1261                   mainThread->stat = Killed;
1262               }
1263           }
1264 #ifdef DEBUG
1265           removeThreadLabel((StgWord)mainThread->tso->id);
1266 #endif
1267           if (mainThread->prev == NULL) {
1268               main_threads = mainThread->link;
1269           } else {
1270               mainThread->prev->link = mainThread->link;
1271           }
1272           if (mainThread->link != NULL) {
1273               mainThread->link->prev = NULL;
1274           }
1275           releaseCapability(cap);
1276           return;
1277       }
1278
1279 #ifdef RTS_SUPPORTS_THREADS
1280       ASSERT(t->main == NULL);
1281 #else
1282       if (t->main != NULL) {
1283           // Must be a main thread that is not the topmost one.  Leave
1284           // it on the run queue until the stack has unwound to the
1285           // point where we can deal with this.  Leaving it on the run
1286           // queue also ensures that the garbage collector knows about
1287           // this thread and its return value (it gets dropped from the
1288           // all_threads list so there's no other way to find it).
1289           APPEND_TO_RUN_QUEUE(t);
1290       }
1291 #endif
1292       break;
1293
1294     default:
1295       barf("schedule: invalid thread return code %d", (int)ret);
1296     }
1297
1298 #ifdef PROFILING
1299     // When we have +RTS -i0 and we're heap profiling, do a census at
1300     // every GC.  This lets us get repeatable runs for debugging.
1301     if (performHeapProfile ||
1302         (RtsFlags.ProfFlags.profileInterval==0 &&
1303          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1304         GarbageCollect(GetRoots, rtsTrue);
1305         heapCensus();
1306         performHeapProfile = rtsFalse;
1307         ready_to_gc = rtsFalse; // we already GC'd
1308     }
1309 #endif
1310
1311     if (ready_to_gc) {
1312       /* everybody back, start the GC.
1313        * Could do it in this thread, or signal a condition var
1314        * to do it in another thread.  Either way, we need to
1315        * broadcast on gc_pending_cond afterward.
1316        */
1317 #if defined(RTS_SUPPORTS_THREADS)
1318       IF_DEBUG(scheduler,sched_belch("doing GC"));
1319 #endif
1320       GarbageCollect(GetRoots,rtsFalse);
1321       ready_to_gc = rtsFalse;
1322 #if defined(GRAN)
1323       /* add a ContinueThread event to continue execution of current thread */
1324       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1325                 ContinueThread,
1326                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1327       IF_GRAN_DEBUG(bq, 
1328                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1329                G_EVENTQ(0);
1330                G_CURR_THREADQ(0));
1331 #endif /* GRAN */
1332     }
1333
1334 #if defined(GRAN)
1335   next_thread:
1336     IF_GRAN_DEBUG(unused,
1337                   print_eventq(EventHd));
1338
1339     event = get_next_event();
1340 #elif defined(PAR)
1341   next_thread:
1342     /* ToDo: wait for next message to arrive rather than busy wait */
1343 #endif /* GRAN */
1344
1345   } /* end of while(1) */
1346
1347   IF_PAR_DEBUG(verbose,
1348                belch("== Leaving schedule() after having received Finish"));
1349 }
1350
1351 /* ---------------------------------------------------------------------------
1352  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1353  * used by Control.Concurrent for error checking.
1354  * ------------------------------------------------------------------------- */
1355  
1356 StgBool
1357 rtsSupportsBoundThreads(void)
1358 {
1359 #ifdef THREADED_RTS
1360   return rtsTrue;
1361 #else
1362   return rtsFalse;
1363 #endif
1364 }
1365
1366 /* ---------------------------------------------------------------------------
1367  * isThreadBound(tso): check whether tso is bound to an OS thread.
1368  * ------------------------------------------------------------------------- */
1369  
1370 StgBool
1371 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1372 {
1373 #ifdef THREADED_RTS
1374   return (tso->main != NULL);
1375 #endif
1376   return rtsFalse;
1377 }
1378
1379 /* ---------------------------------------------------------------------------
1380  * Singleton fork(). Do not copy any running threads.
1381  * ------------------------------------------------------------------------- */
1382
1383 #ifndef mingw32_TARGET_OS
1384 #define FORKPROCESS_PRIMOP_SUPPORTED
1385 #endif
1386
1387 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1388 static void 
1389 deleteThreadImmediately(StgTSO *tso);
1390 #endif
1391 StgInt
1392 forkProcess(HsStablePtr *entry
1393 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1394             STG_UNUSED
1395 #endif
1396            )
1397 {
1398 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1399   pid_t pid;
1400   StgTSO* t,*next;
1401   StgMainThread *m;
1402   SchedulerStatus rc;
1403
1404   IF_DEBUG(scheduler,sched_belch("forking!"));
1405   rts_lock(); // This not only acquires sched_mutex, it also
1406               // makes sure that no other threads are running
1407
1408   pid = fork();
1409
1410   if (pid) { /* parent */
1411
1412   /* just return the pid */
1413     rts_unlock();
1414     return pid;
1415     
1416   } else { /* child */
1417     
1418     
1419       // delete all threads
1420     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1421     
1422     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1423       next = t->link;
1424
1425         // don't allow threads to catch the ThreadKilled exception
1426       deleteThreadImmediately(t);
1427     }
1428     
1429       // wipe the main thread list
1430     while((m = main_threads) != NULL) {
1431       main_threads = m->link;
1432 # ifdef THREADED_RTS
1433       closeCondition(&m->bound_thread_cond);
1434 # endif
1435       stgFree(m);
1436     }
1437     
1438 # ifdef RTS_SUPPORTS_THREADS
1439     resetTaskManagerAfterFork();      // tell startTask() and friends that
1440     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1441     resetWorkerWakeupPipeAfterFork();
1442 # endif
1443     
1444     rc = rts_evalStableIO(entry, NULL);  // run the action
1445     rts_checkSchedStatus("forkProcess",rc);
1446     
1447     rts_unlock();
1448     
1449     hs_exit();                      // clean up and exit
1450     stg_exit(0);
1451   }
1452 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1453   barf("forkProcess#: primop not supported, sorry!\n");
1454   return -1;
1455 #endif
1456 }
1457
1458 /* ---------------------------------------------------------------------------
1459  * deleteAllThreads():  kill all the live threads.
1460  *
1461  * This is used when we catch a user interrupt (^C), before performing
1462  * any necessary cleanups and running finalizers.
1463  *
1464  * Locks: sched_mutex held.
1465  * ------------------------------------------------------------------------- */
1466    
1467 void
1468 deleteAllThreads ( void )
1469 {
1470   StgTSO* t, *next;
1471   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1472   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1473       next = t->global_link;
1474       deleteThread(t);
1475   }      
1476   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1477   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1478   sleeping_queue = END_TSO_QUEUE;
1479 }
1480
1481 /* startThread and  insertThread are now in GranSim.c -- HWL */
1482
1483
1484 /* ---------------------------------------------------------------------------
1485  * Suspending & resuming Haskell threads.
1486  * 
1487  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1488  * its capability before calling the C function.  This allows another
1489  * task to pick up the capability and carry on running Haskell
1490  * threads.  It also means that if the C call blocks, it won't lock
1491  * the whole system.
1492  *
1493  * The Haskell thread making the C call is put to sleep for the
1494  * duration of the call, on the susepended_ccalling_threads queue.  We
1495  * give out a token to the task, which it can use to resume the thread
1496  * on return from the C function.
1497  * ------------------------------------------------------------------------- */
1498    
1499 StgInt
1500 suspendThread( StgRegTable *reg, 
1501                rtsBool concCall
1502 #if !defined(DEBUG)
1503                STG_UNUSED
1504 #endif
1505                )
1506 {
1507   nat tok;
1508   Capability *cap;
1509   int saved_errno = errno;
1510
1511   /* assume that *reg is a pointer to the StgRegTable part
1512    * of a Capability.
1513    */
1514   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1515
1516   ACQUIRE_LOCK(&sched_mutex);
1517
1518   IF_DEBUG(scheduler,
1519            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1520
1521   // XXX this might not be necessary --SDM
1522   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1523
1524   threadPaused(cap->r.rCurrentTSO);
1525   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1526   suspended_ccalling_threads = cap->r.rCurrentTSO;
1527
1528   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1529       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1530       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1531   } else {
1532       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1533   }
1534
1535   /* Use the thread ID as the token; it should be unique */
1536   tok = cap->r.rCurrentTSO->id;
1537
1538   /* Hand back capability */
1539   releaseCapability(cap);
1540   
1541 #if defined(RTS_SUPPORTS_THREADS)
1542   /* Preparing to leave the RTS, so ensure there's a native thread/task
1543      waiting to take over.
1544   */
1545   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1546 #endif
1547
1548   /* Other threads _might_ be available for execution; signal this */
1549   THREAD_RUNNABLE();
1550   RELEASE_LOCK(&sched_mutex);
1551   
1552   errno = saved_errno;
1553   return tok; 
1554 }
1555
1556 StgRegTable *
1557 resumeThread( StgInt tok,
1558               rtsBool concCall STG_UNUSED )
1559 {
1560   StgTSO *tso, **prev;
1561   Capability *cap;
1562   int saved_errno = errno;
1563
1564 #if defined(RTS_SUPPORTS_THREADS)
1565   /* Wait for permission to re-enter the RTS with the result. */
1566   ACQUIRE_LOCK(&sched_mutex);
1567   waitForReturnCapability(&sched_mutex, &cap);
1568
1569   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1570 #else
1571   grabCapability(&cap);
1572 #endif
1573
1574   /* Remove the thread off of the suspended list */
1575   prev = &suspended_ccalling_threads;
1576   for (tso = suspended_ccalling_threads; 
1577        tso != END_TSO_QUEUE; 
1578        prev = &tso->link, tso = tso->link) {
1579     if (tso->id == (StgThreadID)tok) {
1580       *prev = tso->link;
1581       break;
1582     }
1583   }
1584   if (tso == END_TSO_QUEUE) {
1585     barf("resumeThread: thread not found");
1586   }
1587   tso->link = END_TSO_QUEUE;
1588   
1589   if(tso->why_blocked == BlockedOnCCall) {
1590       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1591       tso->blocked_exceptions = NULL;
1592   }
1593   
1594   /* Reset blocking status */
1595   tso->why_blocked  = NotBlocked;
1596
1597   cap->r.rCurrentTSO = tso;
1598   RELEASE_LOCK(&sched_mutex);
1599   errno = saved_errno;
1600   return &cap->r;
1601 }
1602
1603
1604 /* ---------------------------------------------------------------------------
1605  * Static functions
1606  * ------------------------------------------------------------------------ */
1607 static void unblockThread(StgTSO *tso);
1608
1609 /* ---------------------------------------------------------------------------
1610  * Comparing Thread ids.
1611  *
1612  * This is used from STG land in the implementation of the
1613  * instances of Eq/Ord for ThreadIds.
1614  * ------------------------------------------------------------------------ */
1615
1616 int
1617 cmp_thread(StgPtr tso1, StgPtr tso2) 
1618
1619   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1620   StgThreadID id2 = ((StgTSO *)tso2)->id;
1621  
1622   if (id1 < id2) return (-1);
1623   if (id1 > id2) return 1;
1624   return 0;
1625 }
1626
1627 /* ---------------------------------------------------------------------------
1628  * Fetching the ThreadID from an StgTSO.
1629  *
1630  * This is used in the implementation of Show for ThreadIds.
1631  * ------------------------------------------------------------------------ */
1632 int
1633 rts_getThreadId(StgPtr tso) 
1634 {
1635   return ((StgTSO *)tso)->id;
1636 }
1637
1638 #ifdef DEBUG
1639 void
1640 labelThread(StgPtr tso, char *label)
1641 {
1642   int len;
1643   void *buf;
1644
1645   /* Caveat: Once set, you can only set the thread name to "" */
1646   len = strlen(label)+1;
1647   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1648   strncpy(buf,label,len);
1649   /* Update will free the old memory for us */
1650   updateThreadLabel(((StgTSO *)tso)->id,buf);
1651 }
1652 #endif /* DEBUG */
1653
1654 /* ---------------------------------------------------------------------------
1655    Create a new thread.
1656
1657    The new thread starts with the given stack size.  Before the
1658    scheduler can run, however, this thread needs to have a closure
1659    (and possibly some arguments) pushed on its stack.  See
1660    pushClosure() in Schedule.h.
1661
1662    createGenThread() and createIOThread() (in SchedAPI.h) are
1663    convenient packaged versions of this function.
1664
1665    currently pri (priority) is only used in a GRAN setup -- HWL
1666    ------------------------------------------------------------------------ */
1667 #if defined(GRAN)
1668 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1669 StgTSO *
1670 createThread(nat size, StgInt pri)
1671 #else
1672 StgTSO *
1673 createThread(nat size)
1674 #endif
1675 {
1676
1677     StgTSO *tso;
1678     nat stack_size;
1679
1680     /* First check whether we should create a thread at all */
1681 #if defined(PAR)
1682   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1683   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1684     threadsIgnored++;
1685     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1686           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1687     return END_TSO_QUEUE;
1688   }
1689   threadsCreated++;
1690 #endif
1691
1692 #if defined(GRAN)
1693   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1694 #endif
1695
1696   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1697
1698   /* catch ridiculously small stack sizes */
1699   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1700     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1701   }
1702
1703   stack_size = size - TSO_STRUCT_SIZEW;
1704
1705   tso = (StgTSO *)allocate(size);
1706   TICK_ALLOC_TSO(stack_size, 0);
1707
1708   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1709 #if defined(GRAN)
1710   SET_GRAN_HDR(tso, ThisPE);
1711 #endif
1712
1713   // Always start with the compiled code evaluator
1714   tso->what_next = ThreadRunGHC;
1715
1716   tso->id = next_thread_id++; 
1717   tso->why_blocked  = NotBlocked;
1718   tso->blocked_exceptions = NULL;
1719
1720   tso->saved_errno = 0;
1721   tso->main = NULL;
1722   
1723   tso->stack_size   = stack_size;
1724   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1725                               - TSO_STRUCT_SIZEW;
1726   tso->sp           = (P_)&(tso->stack) + stack_size;
1727
1728 #ifdef PROFILING
1729   tso->prof.CCCS = CCS_MAIN;
1730 #endif
1731
1732   /* put a stop frame on the stack */
1733   tso->sp -= sizeofW(StgStopFrame);
1734   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1735   // ToDo: check this
1736 #if defined(GRAN)
1737   tso->link = END_TSO_QUEUE;
1738   /* uses more flexible routine in GranSim */
1739   insertThread(tso, CurrentProc);
1740 #else
1741   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1742    * from its creation
1743    */
1744 #endif
1745
1746 #if defined(GRAN) 
1747   if (RtsFlags.GranFlags.GranSimStats.Full) 
1748     DumpGranEvent(GR_START,tso);
1749 #elif defined(PAR)
1750   if (RtsFlags.ParFlags.ParStats.Full) 
1751     DumpGranEvent(GR_STARTQ,tso);
1752   /* HACk to avoid SCHEDULE 
1753      LastTSO = tso; */
1754 #endif
1755
1756   /* Link the new thread on the global thread list.
1757    */
1758   tso->global_link = all_threads;
1759   all_threads = tso;
1760
1761 #if defined(DIST)
1762   tso->dist.priority = MandatoryPriority; //by default that is...
1763 #endif
1764
1765 #if defined(GRAN)
1766   tso->gran.pri = pri;
1767 # if defined(DEBUG)
1768   tso->gran.magic = TSO_MAGIC; // debugging only
1769 # endif
1770   tso->gran.sparkname   = 0;
1771   tso->gran.startedat   = CURRENT_TIME; 
1772   tso->gran.exported    = 0;
1773   tso->gran.basicblocks = 0;
1774   tso->gran.allocs      = 0;
1775   tso->gran.exectime    = 0;
1776   tso->gran.fetchtime   = 0;
1777   tso->gran.fetchcount  = 0;
1778   tso->gran.blocktime   = 0;
1779   tso->gran.blockcount  = 0;
1780   tso->gran.blockedat   = 0;
1781   tso->gran.globalsparks = 0;
1782   tso->gran.localsparks  = 0;
1783   if (RtsFlags.GranFlags.Light)
1784     tso->gran.clock  = Now; /* local clock */
1785   else
1786     tso->gran.clock  = 0;
1787
1788   IF_DEBUG(gran,printTSO(tso));
1789 #elif defined(PAR)
1790 # if defined(DEBUG)
1791   tso->par.magic = TSO_MAGIC; // debugging only
1792 # endif
1793   tso->par.sparkname   = 0;
1794   tso->par.startedat   = CURRENT_TIME; 
1795   tso->par.exported    = 0;
1796   tso->par.basicblocks = 0;
1797   tso->par.allocs      = 0;
1798   tso->par.exectime    = 0;
1799   tso->par.fetchtime   = 0;
1800   tso->par.fetchcount  = 0;
1801   tso->par.blocktime   = 0;
1802   tso->par.blockcount  = 0;
1803   tso->par.blockedat   = 0;
1804   tso->par.globalsparks = 0;
1805   tso->par.localsparks  = 0;
1806 #endif
1807
1808 #if defined(GRAN)
1809   globalGranStats.tot_threads_created++;
1810   globalGranStats.threads_created_on_PE[CurrentProc]++;
1811   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1812   globalGranStats.tot_sq_probes++;
1813 #elif defined(PAR)
1814   // collect parallel global statistics (currently done together with GC stats)
1815   if (RtsFlags.ParFlags.ParStats.Global &&
1816       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1817     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1818     globalParStats.tot_threads_created++;
1819   }
1820 #endif 
1821
1822 #if defined(GRAN)
1823   IF_GRAN_DEBUG(pri,
1824                 belch("==__ schedule: Created TSO %d (%p);",
1825                       CurrentProc, tso, tso->id));
1826 #elif defined(PAR)
1827     IF_PAR_DEBUG(verbose,
1828                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1829                        tso->id, tso, advisory_thread_count));
1830 #else
1831   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1832                                  tso->id, tso->stack_size));
1833 #endif    
1834   return tso;
1835 }
1836
1837 #if defined(PAR)
1838 /* RFP:
1839    all parallel thread creation calls should fall through the following routine.
1840 */
1841 StgTSO *
1842 createSparkThread(rtsSpark spark) 
1843 { StgTSO *tso;
1844   ASSERT(spark != (rtsSpark)NULL);
1845   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1846   { threadsIgnored++;
1847     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1848           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1849     return END_TSO_QUEUE;
1850   }
1851   else
1852   { threadsCreated++;
1853     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1854     if (tso==END_TSO_QUEUE)     
1855       barf("createSparkThread: Cannot create TSO");
1856 #if defined(DIST)
1857     tso->priority = AdvisoryPriority;
1858 #endif
1859     pushClosure(tso,spark);
1860     PUSH_ON_RUN_QUEUE(tso);
1861     advisory_thread_count++;    
1862   }
1863   return tso;
1864 }
1865 #endif
1866
1867 /*
1868   Turn a spark into a thread.
1869   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1870 */
1871 #if defined(PAR)
1872 StgTSO *
1873 activateSpark (rtsSpark spark) 
1874 {
1875   StgTSO *tso;
1876
1877   tso = createSparkThread(spark);
1878   if (RtsFlags.ParFlags.ParStats.Full) {   
1879     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1880     IF_PAR_DEBUG(verbose,
1881                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1882                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1883   }
1884   // ToDo: fwd info on local/global spark to thread -- HWL
1885   // tso->gran.exported =  spark->exported;
1886   // tso->gran.locked =   !spark->global;
1887   // tso->gran.sparkname = spark->name;
1888
1889   return tso;
1890 }
1891 #endif
1892
1893 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1894                                    Capability *initialCapability
1895                                    );
1896
1897
1898 /* ---------------------------------------------------------------------------
1899  * scheduleThread()
1900  *
1901  * scheduleThread puts a thread on the head of the runnable queue.
1902  * This will usually be done immediately after a thread is created.
1903  * The caller of scheduleThread must create the thread using e.g.
1904  * createThread and push an appropriate closure
1905  * on this thread's stack before the scheduler is invoked.
1906  * ------------------------------------------------------------------------ */
1907
1908 static void scheduleThread_ (StgTSO* tso);
1909
1910 void
1911 scheduleThread_(StgTSO *tso)
1912 {
1913   // Precondition: sched_mutex must be held.
1914   PUSH_ON_RUN_QUEUE(tso);
1915   THREAD_RUNNABLE();
1916 }
1917
1918 void
1919 scheduleThread(StgTSO* tso)
1920 {
1921   ACQUIRE_LOCK(&sched_mutex);
1922   scheduleThread_(tso);
1923   RELEASE_LOCK(&sched_mutex);
1924 }
1925
1926 #if defined(RTS_SUPPORTS_THREADS)
1927 static Condition bound_cond_cache;
1928 static int bound_cond_cache_full = 0;
1929 #endif
1930
1931
1932 SchedulerStatus
1933 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1934                    Capability *initialCapability)
1935 {
1936     // Precondition: sched_mutex must be held
1937     StgMainThread *m;
1938
1939     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1940     m->tso = tso;
1941     tso->main = m;
1942     m->ret = ret;
1943     m->stat = NoStatus;
1944     m->link = main_threads;
1945     m->prev = NULL;
1946     if (main_threads != NULL) {
1947         main_threads->prev = m;
1948     }
1949     main_threads = m;
1950
1951 #if defined(RTS_SUPPORTS_THREADS)
1952     // Allocating a new condition for each thread is expensive, so we
1953     // cache one.  This is a pretty feeble hack, but it helps speed up
1954     // consecutive call-ins quite a bit.
1955     if (bound_cond_cache_full) {
1956         m->bound_thread_cond = bound_cond_cache;
1957         bound_cond_cache_full = 0;
1958     } else {
1959         initCondition(&m->bound_thread_cond);
1960     }
1961 #endif
1962
1963     /* Put the thread on the main-threads list prior to scheduling the TSO.
1964        Failure to do so introduces a race condition in the MT case (as
1965        identified by Wolfgang Thaller), whereby the new task/OS thread 
1966        created by scheduleThread_() would complete prior to the thread
1967        that spawned it managed to put 'itself' on the main-threads list.
1968        The upshot of it all being that the worker thread wouldn't get to
1969        signal the completion of the its work item for the main thread to
1970        see (==> it got stuck waiting.)    -- sof 6/02.
1971     */
1972     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1973     
1974     PUSH_ON_RUN_QUEUE(tso);
1975     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
1976     // bound and only runnable by *this* OS thread, so waking up other
1977     // workers will just slow things down.
1978
1979     return waitThread_(m, initialCapability);
1980 }
1981
1982 /* ---------------------------------------------------------------------------
1983  * initScheduler()
1984  *
1985  * Initialise the scheduler.  This resets all the queues - if the
1986  * queues contained any threads, they'll be garbage collected at the
1987  * next pass.
1988  *
1989  * ------------------------------------------------------------------------ */
1990
1991 void 
1992 initScheduler(void)
1993 {
1994 #if defined(GRAN)
1995   nat i;
1996
1997   for (i=0; i<=MAX_PROC; i++) {
1998     run_queue_hds[i]      = END_TSO_QUEUE;
1999     run_queue_tls[i]      = END_TSO_QUEUE;
2000     blocked_queue_hds[i]  = END_TSO_QUEUE;
2001     blocked_queue_tls[i]  = END_TSO_QUEUE;
2002     ccalling_threadss[i]  = END_TSO_QUEUE;
2003     sleeping_queue        = END_TSO_QUEUE;
2004   }
2005 #else
2006   run_queue_hd      = END_TSO_QUEUE;
2007   run_queue_tl      = END_TSO_QUEUE;
2008   blocked_queue_hd  = END_TSO_QUEUE;
2009   blocked_queue_tl  = END_TSO_QUEUE;
2010   sleeping_queue    = END_TSO_QUEUE;
2011 #endif 
2012
2013   suspended_ccalling_threads  = END_TSO_QUEUE;
2014
2015   main_threads = NULL;
2016   all_threads  = END_TSO_QUEUE;
2017
2018   context_switch = 0;
2019   interrupted    = 0;
2020
2021   RtsFlags.ConcFlags.ctxtSwitchTicks =
2022       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2023       
2024 #if defined(RTS_SUPPORTS_THREADS)
2025   /* Initialise the mutex and condition variables used by
2026    * the scheduler. */
2027   initMutex(&sched_mutex);
2028   initMutex(&term_mutex);
2029 #endif
2030   
2031   ACQUIRE_LOCK(&sched_mutex);
2032
2033   /* A capability holds the state a native thread needs in
2034    * order to execute STG code. At least one capability is
2035    * floating around (only SMP builds have more than one).
2036    */
2037   initCapabilities();
2038   
2039 #if defined(RTS_SUPPORTS_THREADS)
2040     /* start our haskell execution tasks */
2041     startTaskManager(0,taskStart);
2042 #endif
2043
2044 #if /* defined(SMP) ||*/ defined(PAR)
2045   initSparkPools();
2046 #endif
2047
2048   RELEASE_LOCK(&sched_mutex);
2049 }
2050
2051 void
2052 exitScheduler( void )
2053 {
2054 #if defined(RTS_SUPPORTS_THREADS)
2055   stopTaskManager();
2056 #endif
2057   shutting_down_scheduler = rtsTrue;
2058 }
2059
2060 /* ----------------------------------------------------------------------------
2061    Managing the per-task allocation areas.
2062    
2063    Each capability comes with an allocation area.  These are
2064    fixed-length block lists into which allocation can be done.
2065
2066    ToDo: no support for two-space collection at the moment???
2067    ------------------------------------------------------------------------- */
2068
2069 static
2070 SchedulerStatus
2071 waitThread_(StgMainThread* m, Capability *initialCapability)
2072 {
2073   SchedulerStatus stat;
2074
2075   // Precondition: sched_mutex must be held.
2076   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2077
2078 #if defined(GRAN)
2079   /* GranSim specific init */
2080   CurrentTSO = m->tso;                // the TSO to run
2081   procStatus[MainProc] = Busy;        // status of main PE
2082   CurrentProc = MainProc;             // PE to run it on
2083   schedule(m,initialCapability);
2084 #else
2085   schedule(m,initialCapability);
2086   ASSERT(m->stat != NoStatus);
2087 #endif
2088
2089   stat = m->stat;
2090
2091 #if defined(RTS_SUPPORTS_THREADS)
2092   // Free the condition variable, returning it to the cache if possible.
2093   if (!bound_cond_cache_full) {
2094       bound_cond_cache = m->bound_thread_cond;
2095       bound_cond_cache_full = 1;
2096   } else {
2097       closeCondition(&m->bound_thread_cond);
2098   }
2099 #endif
2100
2101   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2102   stgFree(m);
2103
2104   // Postcondition: sched_mutex still held
2105   return stat;
2106 }
2107
2108 /* ---------------------------------------------------------------------------
2109    Where are the roots that we know about?
2110
2111         - all the threads on the runnable queue
2112         - all the threads on the blocked queue
2113         - all the threads on the sleeping queue
2114         - all the thread currently executing a _ccall_GC
2115         - all the "main threads"
2116      
2117    ------------------------------------------------------------------------ */
2118
2119 /* This has to be protected either by the scheduler monitor, or by the
2120         garbage collection monitor (probably the latter).
2121         KH @ 25/10/99
2122 */
2123
2124 void
2125 GetRoots( evac_fn evac )
2126 {
2127 #if defined(GRAN)
2128   {
2129     nat i;
2130     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2131       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2132           evac((StgClosure **)&run_queue_hds[i]);
2133       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2134           evac((StgClosure **)&run_queue_tls[i]);
2135       
2136       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2137           evac((StgClosure **)&blocked_queue_hds[i]);
2138       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2139           evac((StgClosure **)&blocked_queue_tls[i]);
2140       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2141           evac((StgClosure **)&ccalling_threads[i]);
2142     }
2143   }
2144
2145   markEventQueue();
2146
2147 #else /* !GRAN */
2148   if (run_queue_hd != END_TSO_QUEUE) {
2149       ASSERT(run_queue_tl != END_TSO_QUEUE);
2150       evac((StgClosure **)&run_queue_hd);
2151       evac((StgClosure **)&run_queue_tl);
2152   }
2153   
2154   if (blocked_queue_hd != END_TSO_QUEUE) {
2155       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2156       evac((StgClosure **)&blocked_queue_hd);
2157       evac((StgClosure **)&blocked_queue_tl);
2158   }
2159   
2160   if (sleeping_queue != END_TSO_QUEUE) {
2161       evac((StgClosure **)&sleeping_queue);
2162   }
2163 #endif 
2164
2165   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2166       evac((StgClosure **)&suspended_ccalling_threads);
2167   }
2168
2169 #if defined(PAR) || defined(GRAN)
2170   markSparkQueue(evac);
2171 #endif
2172
2173 #if defined(RTS_USER_SIGNALS)
2174   // mark the signal handlers (signals should be already blocked)
2175   markSignalHandlers(evac);
2176 #endif
2177 }
2178
2179 /* -----------------------------------------------------------------------------
2180    performGC
2181
2182    This is the interface to the garbage collector from Haskell land.
2183    We provide this so that external C code can allocate and garbage
2184    collect when called from Haskell via _ccall_GC.
2185
2186    It might be useful to provide an interface whereby the programmer
2187    can specify more roots (ToDo).
2188    
2189    This needs to be protected by the GC condition variable above.  KH.
2190    -------------------------------------------------------------------------- */
2191
2192 static void (*extra_roots)(evac_fn);
2193
2194 void
2195 performGC(void)
2196 {
2197   /* Obligated to hold this lock upon entry */
2198   ACQUIRE_LOCK(&sched_mutex);
2199   GarbageCollect(GetRoots,rtsFalse);
2200   RELEASE_LOCK(&sched_mutex);
2201 }
2202
2203 void
2204 performMajorGC(void)
2205 {
2206   ACQUIRE_LOCK(&sched_mutex);
2207   GarbageCollect(GetRoots,rtsTrue);
2208   RELEASE_LOCK(&sched_mutex);
2209 }
2210
2211 static void
2212 AllRoots(evac_fn evac)
2213 {
2214     GetRoots(evac);             // the scheduler's roots
2215     extra_roots(evac);          // the user's roots
2216 }
2217
2218 void
2219 performGCWithRoots(void (*get_roots)(evac_fn))
2220 {
2221   ACQUIRE_LOCK(&sched_mutex);
2222   extra_roots = get_roots;
2223   GarbageCollect(AllRoots,rtsFalse);
2224   RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 /* -----------------------------------------------------------------------------
2228    Stack overflow
2229
2230    If the thread has reached its maximum stack size, then raise the
2231    StackOverflow exception in the offending thread.  Otherwise
2232    relocate the TSO into a larger chunk of memory and adjust its stack
2233    size appropriately.
2234    -------------------------------------------------------------------------- */
2235
2236 static StgTSO *
2237 threadStackOverflow(StgTSO *tso)
2238 {
2239   nat new_stack_size, new_tso_size, stack_words;
2240   StgPtr new_sp;
2241   StgTSO *dest;
2242
2243   IF_DEBUG(sanity,checkTSO(tso));
2244   if (tso->stack_size >= tso->max_stack_size) {
2245
2246     IF_DEBUG(gc,
2247              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2248                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2249              /* If we're debugging, just print out the top of the stack */
2250              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2251                                               tso->sp+64)));
2252
2253     /* Send this thread the StackOverflow exception */
2254     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2255     return tso;
2256   }
2257
2258   /* Try to double the current stack size.  If that takes us over the
2259    * maximum stack size for this thread, then use the maximum instead.
2260    * Finally round up so the TSO ends up as a whole number of blocks.
2261    */
2262   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2263   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2264                                        TSO_STRUCT_SIZE)/sizeof(W_);
2265   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2266   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2267
2268   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2269
2270   dest = (StgTSO *)allocate(new_tso_size);
2271   TICK_ALLOC_TSO(new_stack_size,0);
2272
2273   /* copy the TSO block and the old stack into the new area */
2274   memcpy(dest,tso,TSO_STRUCT_SIZE);
2275   stack_words = tso->stack + tso->stack_size - tso->sp;
2276   new_sp = (P_)dest + new_tso_size - stack_words;
2277   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2278
2279   /* relocate the stack pointers... */
2280   dest->sp         = new_sp;
2281   dest->stack_size = new_stack_size;
2282         
2283   /* Mark the old TSO as relocated.  We have to check for relocated
2284    * TSOs in the garbage collector and any primops that deal with TSOs.
2285    *
2286    * It's important to set the sp value to just beyond the end
2287    * of the stack, so we don't attempt to scavenge any part of the
2288    * dead TSO's stack.
2289    */
2290   tso->what_next = ThreadRelocated;
2291   tso->link = dest;
2292   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2293   tso->why_blocked = NotBlocked;
2294   dest->mut_link = NULL;
2295
2296   IF_PAR_DEBUG(verbose,
2297                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2298                      tso->id, tso, tso->stack_size);
2299                /* If we're debugging, just print out the top of the stack */
2300                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2301                                                 tso->sp+64)));
2302   
2303   IF_DEBUG(sanity,checkTSO(tso));
2304 #if 0
2305   IF_DEBUG(scheduler,printTSO(dest));
2306 #endif
2307
2308   return dest;
2309 }
2310
2311 /* ---------------------------------------------------------------------------
2312    Wake up a queue that was blocked on some resource.
2313    ------------------------------------------------------------------------ */
2314
2315 #if defined(GRAN)
2316 STATIC_INLINE void
2317 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2318 {
2319 }
2320 #elif defined(PAR)
2321 STATIC_INLINE void
2322 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2323 {
2324   /* write RESUME events to log file and
2325      update blocked and fetch time (depending on type of the orig closure) */
2326   if (RtsFlags.ParFlags.ParStats.Full) {
2327     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2328                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2329                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2330     if (EMPTY_RUN_QUEUE())
2331       emitSchedule = rtsTrue;
2332
2333     switch (get_itbl(node)->type) {
2334         case FETCH_ME_BQ:
2335           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2336           break;
2337         case RBH:
2338         case FETCH_ME:
2339         case BLACKHOLE_BQ:
2340           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2341           break;
2342 #ifdef DIST
2343         case MVAR:
2344           break;
2345 #endif    
2346         default:
2347           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2348         }
2349       }
2350 }
2351 #endif
2352
2353 #if defined(GRAN)
2354 static StgBlockingQueueElement *
2355 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2356 {
2357     StgTSO *tso;
2358     PEs node_loc, tso_loc;
2359
2360     node_loc = where_is(node); // should be lifted out of loop
2361     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2362     tso_loc = where_is((StgClosure *)tso);
2363     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2364       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2365       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2366       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2367       // insertThread(tso, node_loc);
2368       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2369                 ResumeThread,
2370                 tso, node, (rtsSpark*)NULL);
2371       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2372       // len_local++;
2373       // len++;
2374     } else { // TSO is remote (actually should be FMBQ)
2375       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2376                                   RtsFlags.GranFlags.Costs.gunblocktime +
2377                                   RtsFlags.GranFlags.Costs.latency;
2378       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2379                 UnblockThread,
2380                 tso, node, (rtsSpark*)NULL);
2381       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2382       // len++;
2383     }
2384     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2385     IF_GRAN_DEBUG(bq,
2386                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2387                           (node_loc==tso_loc ? "Local" : "Global"), 
2388                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2389     tso->block_info.closure = NULL;
2390     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2391                              tso->id, tso));
2392 }
2393 #elif defined(PAR)
2394 static StgBlockingQueueElement *
2395 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2396 {
2397     StgBlockingQueueElement *next;
2398
2399     switch (get_itbl(bqe)->type) {
2400     case TSO:
2401       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2402       /* if it's a TSO just push it onto the run_queue */
2403       next = bqe->link;
2404       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2405       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2406       THREAD_RUNNABLE();
2407       unblockCount(bqe, node);
2408       /* reset blocking status after dumping event */
2409       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2410       break;
2411
2412     case BLOCKED_FETCH:
2413       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2414       next = bqe->link;
2415       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2416       PendingFetches = (StgBlockedFetch *)bqe;
2417       break;
2418
2419 # if defined(DEBUG)
2420       /* can ignore this case in a non-debugging setup; 
2421          see comments on RBHSave closures above */
2422     case CONSTR:
2423       /* check that the closure is an RBHSave closure */
2424       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2425              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2426              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2427       break;
2428
2429     default:
2430       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2431            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2432            (StgClosure *)bqe);
2433 # endif
2434     }
2435   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2436   return next;
2437 }
2438
2439 #else /* !GRAN && !PAR */
2440 static StgTSO *
2441 unblockOneLocked(StgTSO *tso)
2442 {
2443   StgTSO *next;
2444
2445   ASSERT(get_itbl(tso)->type == TSO);
2446   ASSERT(tso->why_blocked != NotBlocked);
2447   tso->why_blocked = NotBlocked;
2448   next = tso->link;
2449   PUSH_ON_RUN_QUEUE(tso);
2450   THREAD_RUNNABLE();
2451   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2452   return next;
2453 }
2454 #endif
2455
2456 #if defined(GRAN) || defined(PAR)
2457 INLINE_ME StgBlockingQueueElement *
2458 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2459 {
2460   ACQUIRE_LOCK(&sched_mutex);
2461   bqe = unblockOneLocked(bqe, node);
2462   RELEASE_LOCK(&sched_mutex);
2463   return bqe;
2464 }
2465 #else
2466 INLINE_ME StgTSO *
2467 unblockOne(StgTSO *tso)
2468 {
2469   ACQUIRE_LOCK(&sched_mutex);
2470   tso = unblockOneLocked(tso);
2471   RELEASE_LOCK(&sched_mutex);
2472   return tso;
2473 }
2474 #endif
2475
2476 #if defined(GRAN)
2477 void 
2478 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2479 {
2480   StgBlockingQueueElement *bqe;
2481   PEs node_loc;
2482   nat len = 0; 
2483
2484   IF_GRAN_DEBUG(bq, 
2485                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2486                       node, CurrentProc, CurrentTime[CurrentProc], 
2487                       CurrentTSO->id, CurrentTSO));
2488
2489   node_loc = where_is(node);
2490
2491   ASSERT(q == END_BQ_QUEUE ||
2492          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2493          get_itbl(q)->type == CONSTR); // closure (type constructor)
2494   ASSERT(is_unique(node));
2495
2496   /* FAKE FETCH: magically copy the node to the tso's proc;
2497      no Fetch necessary because in reality the node should not have been 
2498      moved to the other PE in the first place
2499   */
2500   if (CurrentProc!=node_loc) {
2501     IF_GRAN_DEBUG(bq, 
2502                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2503                         node, node_loc, CurrentProc, CurrentTSO->id, 
2504                         // CurrentTSO, where_is(CurrentTSO),
2505                         node->header.gran.procs));
2506     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2507     IF_GRAN_DEBUG(bq, 
2508                   belch("## new bitmask of node %p is %#x",
2509                         node, node->header.gran.procs));
2510     if (RtsFlags.GranFlags.GranSimStats.Global) {
2511       globalGranStats.tot_fake_fetches++;
2512     }
2513   }
2514
2515   bqe = q;
2516   // ToDo: check: ASSERT(CurrentProc==node_loc);
2517   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2518     //next = bqe->link;
2519     /* 
2520        bqe points to the current element in the queue
2521        next points to the next element in the queue
2522     */
2523     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2524     //tso_loc = where_is(tso);
2525     len++;
2526     bqe = unblockOneLocked(bqe, node);
2527   }
2528
2529   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2530      the closure to make room for the anchor of the BQ */
2531   if (bqe!=END_BQ_QUEUE) {
2532     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2533     /*
2534     ASSERT((info_ptr==&RBH_Save_0_info) ||
2535            (info_ptr==&RBH_Save_1_info) ||
2536            (info_ptr==&RBH_Save_2_info));
2537     */
2538     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2539     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2540     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2541
2542     IF_GRAN_DEBUG(bq,
2543                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2544                         node, info_type(node)));
2545   }
2546
2547   /* statistics gathering */
2548   if (RtsFlags.GranFlags.GranSimStats.Global) {
2549     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2550     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2551     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2552     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2553   }
2554   IF_GRAN_DEBUG(bq,
2555                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2556                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2557 }
2558 #elif defined(PAR)
2559 void 
2560 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2561 {
2562   StgBlockingQueueElement *bqe;
2563
2564   ACQUIRE_LOCK(&sched_mutex);
2565
2566   IF_PAR_DEBUG(verbose, 
2567                belch("##-_ AwBQ for node %p on [%x]: ",
2568                      node, mytid));
2569 #ifdef DIST  
2570   //RFP
2571   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2572     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2573     return;
2574   }
2575 #endif
2576   
2577   ASSERT(q == END_BQ_QUEUE ||
2578          get_itbl(q)->type == TSO ||           
2579          get_itbl(q)->type == BLOCKED_FETCH || 
2580          get_itbl(q)->type == CONSTR); 
2581
2582   bqe = q;
2583   while (get_itbl(bqe)->type==TSO || 
2584          get_itbl(bqe)->type==BLOCKED_FETCH) {
2585     bqe = unblockOneLocked(bqe, node);
2586   }
2587   RELEASE_LOCK(&sched_mutex);
2588 }
2589
2590 #else   /* !GRAN && !PAR */
2591
2592 void
2593 awakenBlockedQueueNoLock(StgTSO *tso)
2594 {
2595   while (tso != END_TSO_QUEUE) {
2596     tso = unblockOneLocked(tso);
2597   }
2598 }
2599
2600 void
2601 awakenBlockedQueue(StgTSO *tso)
2602 {
2603   ACQUIRE_LOCK(&sched_mutex);
2604   while (tso != END_TSO_QUEUE) {
2605     tso = unblockOneLocked(tso);
2606   }
2607   RELEASE_LOCK(&sched_mutex);
2608 }
2609 #endif
2610
2611 /* ---------------------------------------------------------------------------
2612    Interrupt execution
2613    - usually called inside a signal handler so it mustn't do anything fancy.   
2614    ------------------------------------------------------------------------ */
2615
2616 void
2617 interruptStgRts(void)
2618 {
2619     interrupted    = 1;
2620     context_switch = 1;
2621 #ifdef RTS_SUPPORTS_THREADS
2622     wakeBlockedWorkerThread();
2623 #endif
2624 }
2625
2626 /* -----------------------------------------------------------------------------
2627    Unblock a thread
2628
2629    This is for use when we raise an exception in another thread, which
2630    may be blocked.
2631    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2632    -------------------------------------------------------------------------- */
2633
2634 #if defined(GRAN) || defined(PAR)
2635 /*
2636   NB: only the type of the blocking queue is different in GranSim and GUM
2637       the operations on the queue-elements are the same
2638       long live polymorphism!
2639
2640   Locks: sched_mutex is held upon entry and exit.
2641
2642 */
2643 static void
2644 unblockThread(StgTSO *tso)
2645 {
2646   StgBlockingQueueElement *t, **last;
2647
2648   switch (tso->why_blocked) {
2649
2650   case NotBlocked:
2651     return;  /* not blocked */
2652
2653   case BlockedOnMVar:
2654     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2655     {
2656       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2657       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2658
2659       last = (StgBlockingQueueElement **)&mvar->head;
2660       for (t = (StgBlockingQueueElement *)mvar->head; 
2661            t != END_BQ_QUEUE; 
2662            last = &t->link, last_tso = t, t = t->link) {
2663         if (t == (StgBlockingQueueElement *)tso) {
2664           *last = (StgBlockingQueueElement *)tso->link;
2665           if (mvar->tail == tso) {
2666             mvar->tail = (StgTSO *)last_tso;
2667           }
2668           goto done;
2669         }
2670       }
2671       barf("unblockThread (MVAR): TSO not found");
2672     }
2673
2674   case BlockedOnBlackHole:
2675     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2676     {
2677       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2678
2679       last = &bq->blocking_queue;
2680       for (t = bq->blocking_queue; 
2681            t != END_BQ_QUEUE; 
2682            last = &t->link, t = t->link) {
2683         if (t == (StgBlockingQueueElement *)tso) {
2684           *last = (StgBlockingQueueElement *)tso->link;
2685           goto done;
2686         }
2687       }
2688       barf("unblockThread (BLACKHOLE): TSO not found");
2689     }
2690
2691   case BlockedOnException:
2692     {
2693       StgTSO *target  = tso->block_info.tso;
2694
2695       ASSERT(get_itbl(target)->type == TSO);
2696
2697       if (target->what_next == ThreadRelocated) {
2698           target = target->link;
2699           ASSERT(get_itbl(target)->type == TSO);
2700       }
2701
2702       ASSERT(target->blocked_exceptions != NULL);
2703
2704       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2705       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2706            t != END_BQ_QUEUE; 
2707            last = &t->link, t = t->link) {
2708         ASSERT(get_itbl(t)->type == TSO);
2709         if (t == (StgBlockingQueueElement *)tso) {
2710           *last = (StgBlockingQueueElement *)tso->link;
2711           goto done;
2712         }
2713       }
2714       barf("unblockThread (Exception): TSO not found");
2715     }
2716
2717   case BlockedOnRead:
2718   case BlockedOnWrite:
2719 #if defined(mingw32_TARGET_OS)
2720   case BlockedOnDoProc:
2721 #endif
2722     {
2723       /* take TSO off blocked_queue */
2724       StgBlockingQueueElement *prev = NULL;
2725       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2726            prev = t, t = t->link) {
2727         if (t == (StgBlockingQueueElement *)tso) {
2728           if (prev == NULL) {
2729             blocked_queue_hd = (StgTSO *)t->link;
2730             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2731               blocked_queue_tl = END_TSO_QUEUE;
2732             }
2733           } else {
2734             prev->link = t->link;
2735             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2736               blocked_queue_tl = (StgTSO *)prev;
2737             }
2738           }
2739           goto done;
2740         }
2741       }
2742       barf("unblockThread (I/O): TSO not found");
2743     }
2744
2745   case BlockedOnDelay:
2746     {
2747       /* take TSO off sleeping_queue */
2748       StgBlockingQueueElement *prev = NULL;
2749       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2750            prev = t, t = t->link) {
2751         if (t == (StgBlockingQueueElement *)tso) {
2752           if (prev == NULL) {
2753             sleeping_queue = (StgTSO *)t->link;
2754           } else {
2755             prev->link = t->link;
2756           }
2757           goto done;
2758         }
2759       }
2760       barf("unblockThread (delay): TSO not found");
2761     }
2762
2763   default:
2764     barf("unblockThread");
2765   }
2766
2767  done:
2768   tso->link = END_TSO_QUEUE;
2769   tso->why_blocked = NotBlocked;
2770   tso->block_info.closure = NULL;
2771   PUSH_ON_RUN_QUEUE(tso);
2772 }
2773 #else
2774 static void
2775 unblockThread(StgTSO *tso)
2776 {
2777   StgTSO *t, **last;
2778   
2779   /* To avoid locking unnecessarily. */
2780   if (tso->why_blocked == NotBlocked) {
2781     return;
2782   }
2783
2784   switch (tso->why_blocked) {
2785
2786   case BlockedOnMVar:
2787     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2788     {
2789       StgTSO *last_tso = END_TSO_QUEUE;
2790       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2791
2792       last = &mvar->head;
2793       for (t = mvar->head; t != END_TSO_QUEUE; 
2794            last = &t->link, last_tso = t, t = t->link) {
2795         if (t == tso) {
2796           *last = tso->link;
2797           if (mvar->tail == tso) {
2798             mvar->tail = last_tso;
2799           }
2800           goto done;
2801         }
2802       }
2803       barf("unblockThread (MVAR): TSO not found");
2804     }
2805
2806   case BlockedOnBlackHole:
2807     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2808     {
2809       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2810
2811       last = &bq->blocking_queue;
2812       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2813            last = &t->link, t = t->link) {
2814         if (t == tso) {
2815           *last = tso->link;
2816           goto done;
2817         }
2818       }
2819       barf("unblockThread (BLACKHOLE): TSO not found");
2820     }
2821
2822   case BlockedOnException:
2823     {
2824       StgTSO *target  = tso->block_info.tso;
2825
2826       ASSERT(get_itbl(target)->type == TSO);
2827
2828       while (target->what_next == ThreadRelocated) {
2829           target = target->link;
2830           ASSERT(get_itbl(target)->type == TSO);
2831       }
2832       
2833       ASSERT(target->blocked_exceptions != NULL);
2834
2835       last = &target->blocked_exceptions;
2836       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2837            last = &t->link, t = t->link) {
2838         ASSERT(get_itbl(t)->type == TSO);
2839         if (t == tso) {
2840           *last = tso->link;
2841           goto done;
2842         }
2843       }
2844       barf("unblockThread (Exception): TSO not found");
2845     }
2846
2847   case BlockedOnRead:
2848   case BlockedOnWrite:
2849 #if defined(mingw32_TARGET_OS)
2850   case BlockedOnDoProc:
2851 #endif
2852     {
2853       StgTSO *prev = NULL;
2854       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2855            prev = t, t = t->link) {
2856         if (t == tso) {
2857           if (prev == NULL) {
2858             blocked_queue_hd = t->link;
2859             if (blocked_queue_tl == t) {
2860               blocked_queue_tl = END_TSO_QUEUE;
2861             }
2862           } else {
2863             prev->link = t->link;
2864             if (blocked_queue_tl == t) {
2865               blocked_queue_tl = prev;
2866             }
2867           }
2868           goto done;
2869         }
2870       }
2871       barf("unblockThread (I/O): TSO not found");
2872     }
2873
2874   case BlockedOnDelay:
2875     {
2876       StgTSO *prev = NULL;
2877       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2878            prev = t, t = t->link) {
2879         if (t == tso) {
2880           if (prev == NULL) {
2881             sleeping_queue = t->link;
2882           } else {
2883             prev->link = t->link;
2884           }
2885           goto done;
2886         }
2887       }
2888       barf("unblockThread (delay): TSO not found");
2889     }
2890
2891   default:
2892     barf("unblockThread");
2893   }
2894
2895  done:
2896   tso->link = END_TSO_QUEUE;
2897   tso->why_blocked = NotBlocked;
2898   tso->block_info.closure = NULL;
2899   PUSH_ON_RUN_QUEUE(tso);
2900 }
2901 #endif
2902
2903 /* -----------------------------------------------------------------------------
2904  * raiseAsync()
2905  *
2906  * The following function implements the magic for raising an
2907  * asynchronous exception in an existing thread.
2908  *
2909  * We first remove the thread from any queue on which it might be
2910  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2911  *
2912  * We strip the stack down to the innermost CATCH_FRAME, building
2913  * thunks in the heap for all the active computations, so they can 
2914  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2915  * an application of the handler to the exception, and push it on
2916  * the top of the stack.
2917  * 
2918  * How exactly do we save all the active computations?  We create an
2919  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2920  * AP_STACKs pushes everything from the corresponding update frame
2921  * upwards onto the stack.  (Actually, it pushes everything up to the
2922  * next update frame plus a pointer to the next AP_STACK object.
2923  * Entering the next AP_STACK object pushes more onto the stack until we
2924  * reach the last AP_STACK object - at which point the stack should look
2925  * exactly as it did when we killed the TSO and we can continue
2926  * execution by entering the closure on top of the stack.
2927  *
2928  * We can also kill a thread entirely - this happens if either (a) the 
2929  * exception passed to raiseAsync is NULL, or (b) there's no
2930  * CATCH_FRAME on the stack.  In either case, we strip the entire
2931  * stack and replace the thread with a zombie.
2932  *
2933  * Locks: sched_mutex held upon entry nor exit.
2934  *
2935  * -------------------------------------------------------------------------- */
2936  
2937 void 
2938 deleteThread(StgTSO *tso)
2939 {
2940   raiseAsync(tso,NULL);
2941 }
2942
2943 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2944 static void 
2945 deleteThreadImmediately(StgTSO *tso)
2946 { // for forkProcess only:
2947   // delete thread without giving it a chance to catch the KillThread exception
2948
2949   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2950       return;
2951   }
2952
2953   if (tso->why_blocked != BlockedOnCCall &&
2954       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2955     unblockThread(tso);
2956   }
2957
2958   tso->what_next = ThreadKilled;
2959 }
2960 #endif
2961
2962 void
2963 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2964 {
2965   /* When raising async exs from contexts where sched_mutex isn't held;
2966      use raiseAsyncWithLock(). */
2967   ACQUIRE_LOCK(&sched_mutex);
2968   raiseAsync(tso,exception);
2969   RELEASE_LOCK(&sched_mutex);
2970 }
2971
2972 void
2973 raiseAsync(StgTSO *tso, StgClosure *exception)
2974 {
2975     StgRetInfoTable *info;
2976     StgPtr sp;
2977   
2978     // Thread already dead?
2979     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2980         return;
2981     }
2982
2983     IF_DEBUG(scheduler, 
2984              sched_belch("raising exception in thread %ld.", tso->id));
2985     
2986     // Remove it from any blocking queues
2987     unblockThread(tso);
2988
2989     sp = tso->sp;
2990     
2991     // The stack freezing code assumes there's a closure pointer on
2992     // the top of the stack, so we have to arrange that this is the case...
2993     //
2994     if (sp[0] == (W_)&stg_enter_info) {
2995         sp++;
2996     } else {
2997         sp--;
2998         sp[0] = (W_)&stg_dummy_ret_closure;
2999     }
3000
3001     while (1) {
3002         nat i;
3003
3004         // 1. Let the top of the stack be the "current closure"
3005         //
3006         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3007         // CATCH_FRAME.
3008         //
3009         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3010         // current closure applied to the chunk of stack up to (but not
3011         // including) the update frame.  This closure becomes the "current
3012         // closure".  Go back to step 2.
3013         //
3014         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3015         // top of the stack applied to the exception.
3016         // 
3017         // 5. If it's a STOP_FRAME, then kill the thread.
3018         
3019         StgPtr frame;
3020         
3021         frame = sp + 1;
3022         info = get_ret_itbl((StgClosure *)frame);
3023         
3024         while (info->i.type != UPDATE_FRAME
3025                && (info->i.type != CATCH_FRAME || exception == NULL)
3026                && info->i.type != STOP_FRAME) {
3027             frame += stack_frame_sizeW((StgClosure *)frame);
3028             info = get_ret_itbl((StgClosure *)frame);
3029         }
3030         
3031         switch (info->i.type) {
3032             
3033         case CATCH_FRAME:
3034             // If we find a CATCH_FRAME, and we've got an exception to raise,
3035             // then build the THUNK raise(exception), and leave it on
3036             // top of the CATCH_FRAME ready to enter.
3037             //
3038         {
3039 #ifdef PROFILING
3040             StgCatchFrame *cf = (StgCatchFrame *)frame;
3041 #endif
3042             StgClosure *raise;
3043             
3044             // we've got an exception to raise, so let's pass it to the
3045             // handler in this frame.
3046             //
3047             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3048             TICK_ALLOC_SE_THK(1,0);
3049             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3050             raise->payload[0] = exception;
3051             
3052             // throw away the stack from Sp up to the CATCH_FRAME.
3053             //
3054             sp = frame - 1;
3055             
3056             /* Ensure that async excpetions are blocked now, so we don't get
3057              * a surprise exception before we get around to executing the
3058              * handler.
3059              */
3060             if (tso->blocked_exceptions == NULL) {
3061                 tso->blocked_exceptions = END_TSO_QUEUE;
3062             }
3063             
3064             /* Put the newly-built THUNK on top of the stack, ready to execute
3065              * when the thread restarts.
3066              */
3067             sp[0] = (W_)raise;
3068             sp[-1] = (W_)&stg_enter_info;
3069             tso->sp = sp-1;
3070             tso->what_next = ThreadRunGHC;
3071             IF_DEBUG(sanity, checkTSO(tso));
3072             return;
3073         }
3074         
3075         case UPDATE_FRAME:
3076         {
3077             StgAP_STACK * ap;
3078             nat words;
3079             
3080             // First build an AP_STACK consisting of the stack chunk above the
3081             // current update frame, with the top word on the stack as the
3082             // fun field.
3083             //
3084             words = frame - sp - 1;
3085             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3086             
3087             ap->size = words;
3088             ap->fun  = (StgClosure *)sp[0];
3089             sp++;
3090             for(i=0; i < (nat)words; ++i) {
3091                 ap->payload[i] = (StgClosure *)*sp++;
3092             }
3093             
3094             SET_HDR(ap,&stg_AP_STACK_info,
3095                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3096             TICK_ALLOC_UP_THK(words+1,0);
3097             
3098             IF_DEBUG(scheduler,
3099                      fprintf(stderr,  "sched: Updating ");
3100                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3101                      fprintf(stderr,  " with ");
3102                      printObj((StgClosure *)ap);
3103                 );
3104
3105             // Replace the updatee with an indirection - happily
3106             // this will also wake up any threads currently
3107             // waiting on the result.
3108             //
3109             // Warning: if we're in a loop, more than one update frame on
3110             // the stack may point to the same object.  Be careful not to
3111             // overwrite an IND_OLDGEN in this case, because we'll screw
3112             // up the mutable lists.  To be on the safe side, don't
3113             // overwrite any kind of indirection at all.  See also
3114             // threadSqueezeStack in GC.c, where we have to make a similar
3115             // check.
3116             //
3117             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3118                 // revert the black hole
3119                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3120             }
3121             sp += sizeofW(StgUpdateFrame) - 1;
3122             sp[0] = (W_)ap; // push onto stack
3123             break;
3124         }
3125         
3126         case STOP_FRAME:
3127             // We've stripped the entire stack, the thread is now dead.
3128             sp += sizeofW(StgStopFrame);
3129             tso->what_next = ThreadKilled;
3130             tso->sp = sp;
3131             return;
3132             
3133         default:
3134             barf("raiseAsync");
3135         }
3136     }
3137     barf("raiseAsync");
3138 }
3139
3140 /* -----------------------------------------------------------------------------
3141    resurrectThreads is called after garbage collection on the list of
3142    threads found to be garbage.  Each of these threads will be woken
3143    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3144    on an MVar, or NonTermination if the thread was blocked on a Black
3145    Hole.
3146
3147    Locks: sched_mutex isn't held upon entry nor exit.
3148    -------------------------------------------------------------------------- */
3149
3150 void
3151 resurrectThreads( StgTSO *threads )
3152 {
3153   StgTSO *tso, *next;
3154
3155   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3156     next = tso->global_link;
3157     tso->global_link = all_threads;
3158     all_threads = tso;
3159     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3160
3161     switch (tso->why_blocked) {
3162     case BlockedOnMVar:
3163     case BlockedOnException:
3164       /* Called by GC - sched_mutex lock is currently held. */
3165       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3166       break;
3167     case BlockedOnBlackHole:
3168       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3169       break;
3170     case NotBlocked:
3171       /* This might happen if the thread was blocked on a black hole
3172        * belonging to a thread that we've just woken up (raiseAsync
3173        * can wake up threads, remember...).
3174        */
3175       continue;
3176     default:
3177       barf("resurrectThreads: thread blocked in a strange way");
3178     }
3179   }
3180 }
3181
3182 /* -----------------------------------------------------------------------------
3183  * Blackhole detection: if we reach a deadlock, test whether any
3184  * threads are blocked on themselves.  Any threads which are found to
3185  * be self-blocked get sent a NonTermination exception.
3186  *
3187  * This is only done in a deadlock situation in order to avoid
3188  * performance overhead in the normal case.
3189  *
3190  * Locks: sched_mutex is held upon entry and exit.
3191  * -------------------------------------------------------------------------- */
3192
3193 static void
3194 detectBlackHoles( void )
3195 {
3196     StgTSO *tso = all_threads;
3197     StgClosure *frame;
3198     StgClosure *blocked_on;
3199     StgRetInfoTable *info;
3200
3201     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3202
3203         while (tso->what_next == ThreadRelocated) {
3204             tso = tso->link;
3205             ASSERT(get_itbl(tso)->type == TSO);
3206         }
3207       
3208         if (tso->why_blocked != BlockedOnBlackHole) {
3209             continue;
3210         }
3211         blocked_on = tso->block_info.closure;
3212
3213         frame = (StgClosure *)tso->sp;
3214
3215         while(1) {
3216             info = get_ret_itbl(frame);
3217             switch (info->i.type) {
3218             case UPDATE_FRAME:
3219                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3220                     /* We are blocking on one of our own computations, so
3221                      * send this thread the NonTermination exception.  
3222                      */
3223                     IF_DEBUG(scheduler, 
3224                              sched_belch("thread %d is blocked on itself", tso->id));
3225                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3226                     goto done;
3227                 }
3228                 
3229                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3230                 continue;
3231
3232             case STOP_FRAME:
3233                 goto done;
3234
3235                 // normal stack frames; do nothing except advance the pointer
3236             default:
3237                 (StgPtr)frame += stack_frame_sizeW(frame);
3238             }
3239         }   
3240         done: ;
3241     }
3242 }
3243
3244 /* ----------------------------------------------------------------------------
3245  * Debugging: why is a thread blocked
3246  * [Also provides useful information when debugging threaded programs
3247  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3248    ------------------------------------------------------------------------- */
3249
3250 static
3251 void
3252 printThreadBlockage(StgTSO *tso)
3253 {
3254   switch (tso->why_blocked) {
3255   case BlockedOnRead:
3256     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3257     break;
3258   case BlockedOnWrite:
3259     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3260     break;
3261 #if defined(mingw32_TARGET_OS)
3262     case BlockedOnDoProc:
3263     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3264     break;
3265 #endif
3266   case BlockedOnDelay:
3267     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3268     break;
3269   case BlockedOnMVar:
3270     fprintf(stderr,"is blocked on an MVar");
3271     break;
3272   case BlockedOnException:
3273     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3274             tso->block_info.tso->id);
3275     break;
3276   case BlockedOnBlackHole:
3277     fprintf(stderr,"is blocked on a black hole");
3278     break;
3279   case NotBlocked:
3280     fprintf(stderr,"is not blocked");
3281     break;
3282 #if defined(PAR)
3283   case BlockedOnGA:
3284     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3285             tso->block_info.closure, info_type(tso->block_info.closure));
3286     break;
3287   case BlockedOnGA_NoSend:
3288     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3289             tso->block_info.closure, info_type(tso->block_info.closure));
3290     break;
3291 #endif
3292   case BlockedOnCCall:
3293     fprintf(stderr,"is blocked on an external call");
3294     break;
3295   case BlockedOnCCall_NoUnblockExc:
3296     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3297     break;
3298   default:
3299     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3300          tso->why_blocked, tso->id, tso);
3301   }
3302 }
3303
3304 static
3305 void
3306 printThreadStatus(StgTSO *tso)
3307 {
3308   switch (tso->what_next) {
3309   case ThreadKilled:
3310     fprintf(stderr,"has been killed");
3311     break;
3312   case ThreadComplete:
3313     fprintf(stderr,"has completed");
3314     break;
3315   default:
3316     printThreadBlockage(tso);
3317   }
3318 }
3319
3320 void
3321 printAllThreads(void)
3322 {
3323   StgTSO *t;
3324   void *label;
3325
3326 # if defined(GRAN)
3327   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3328   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3329                        time_string, rtsFalse/*no commas!*/);
3330
3331   fprintf(stderr, "all threads at [%s]:\n", time_string);
3332 # elif defined(PAR)
3333   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3334   ullong_format_string(CURRENT_TIME,
3335                        time_string, rtsFalse/*no commas!*/);
3336
3337   fprintf(stderr,"all threads at [%s]:\n", time_string);
3338 # else
3339   fprintf(stderr,"all threads:\n");
3340 # endif
3341
3342   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3343     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3344     label = lookupThreadLabel(t->id);
3345     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3346     printThreadStatus(t);
3347     fprintf(stderr,"\n");
3348   }
3349 }
3350     
3351 #ifdef DEBUG
3352
3353 /* 
3354    Print a whole blocking queue attached to node (debugging only).
3355 */
3356 # if defined(PAR)
3357 void 
3358 print_bq (StgClosure *node)
3359 {
3360   StgBlockingQueueElement *bqe;
3361   StgTSO *tso;
3362   rtsBool end;
3363
3364   fprintf(stderr,"## BQ of closure %p (%s): ",
3365           node, info_type(node));
3366
3367   /* should cover all closures that may have a blocking queue */
3368   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3369          get_itbl(node)->type == FETCH_ME_BQ ||
3370          get_itbl(node)->type == RBH ||
3371          get_itbl(node)->type == MVAR);
3372     
3373   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3374
3375   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3376 }
3377
3378 /* 
3379    Print a whole blocking queue starting with the element bqe.
3380 */
3381 void 
3382 print_bqe (StgBlockingQueueElement *bqe)
3383 {
3384   rtsBool end;
3385
3386   /* 
3387      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3388   */
3389   for (end = (bqe==END_BQ_QUEUE);
3390        !end; // iterate until bqe points to a CONSTR
3391        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3392        bqe = end ? END_BQ_QUEUE : bqe->link) {
3393     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3394     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3395     /* types of closures that may appear in a blocking queue */
3396     ASSERT(get_itbl(bqe)->type == TSO ||           
3397            get_itbl(bqe)->type == BLOCKED_FETCH || 
3398            get_itbl(bqe)->type == CONSTR); 
3399     /* only BQs of an RBH end with an RBH_Save closure */
3400     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3401
3402     switch (get_itbl(bqe)->type) {
3403     case TSO:
3404       fprintf(stderr," TSO %u (%x),",
3405               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3406       break;
3407     case BLOCKED_FETCH:
3408       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3409               ((StgBlockedFetch *)bqe)->node, 
3410               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3411               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3412               ((StgBlockedFetch *)bqe)->ga.weight);
3413       break;
3414     case CONSTR:
3415       fprintf(stderr," %s (IP %p),",
3416               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3417                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3418                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3419                "RBH_Save_?"), get_itbl(bqe));
3420       break;
3421     default:
3422       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3423            info_type((StgClosure *)bqe)); // , node, info_type(node));
3424       break;
3425     }
3426   } /* for */
3427   fputc('\n', stderr);
3428 }
3429 # elif defined(GRAN)
3430 void 
3431 print_bq (StgClosure *node)
3432 {
3433   StgBlockingQueueElement *bqe;
3434   PEs node_loc, tso_loc;
3435   rtsBool end;
3436
3437   /* should cover all closures that may have a blocking queue */
3438   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3439          get_itbl(node)->type == FETCH_ME_BQ ||
3440          get_itbl(node)->type == RBH);
3441     
3442   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3443   node_loc = where_is(node);
3444
3445   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3446           node, info_type(node), node_loc);
3447
3448   /* 
3449      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3450   */
3451   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3452        !end; // iterate until bqe points to a CONSTR
3453        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3454     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3455     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3456     /* types of closures that may appear in a blocking queue */
3457     ASSERT(get_itbl(bqe)->type == TSO ||           
3458            get_itbl(bqe)->type == CONSTR); 
3459     /* only BQs of an RBH end with an RBH_Save closure */
3460     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3461
3462     tso_loc = where_is((StgClosure *)bqe);
3463     switch (get_itbl(bqe)->type) {
3464     case TSO:
3465       fprintf(stderr," TSO %d (%p) on [PE %d],",
3466               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3467       break;
3468     case CONSTR:
3469       fprintf(stderr," %s (IP %p),",
3470               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3471                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3472                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3473                "RBH_Save_?"), get_itbl(bqe));
3474       break;
3475     default:
3476       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3477            info_type((StgClosure *)bqe), node, info_type(node));
3478       break;
3479     }
3480   } /* for */
3481   fputc('\n', stderr);
3482 }
3483 #else
3484 /* 
3485    Nice and easy: only TSOs on the blocking queue
3486 */
3487 void 
3488 print_bq (StgClosure *node)
3489 {
3490   StgTSO *tso;
3491
3492   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3493   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3494        tso != END_TSO_QUEUE; 
3495        tso=tso->link) {
3496     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3497     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3498     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3499   }
3500   fputc('\n', stderr);
3501 }
3502 # endif
3503
3504 #if defined(PAR)
3505 static nat
3506 run_queue_len(void)
3507 {
3508   nat i;
3509   StgTSO *tso;
3510
3511   for (i=0, tso=run_queue_hd; 
3512        tso != END_TSO_QUEUE;
3513        i++, tso=tso->link)
3514     /* nothing */
3515
3516   return i;
3517 }
3518 #endif
3519
3520 void
3521 sched_belch(char *s, ...)
3522 {
3523   va_list ap;
3524   va_start(ap,s);
3525 #ifdef RTS_SUPPORTS_THREADS
3526   fprintf(stderr, "sched (task %p): ", osThreadId());
3527 #elif defined(PAR)
3528   fprintf(stderr, "== ");
3529 #else
3530   fprintf(stderr, "sched: ");
3531 #endif
3532   vfprintf(stderr, s, ap);
3533   fprintf(stderr, "\n");
3534   fflush(stderr);
3535   va_end(ap);
3536 }
3537
3538 #endif /* DEBUG */