b1b9fdac67eaed22726405be49397cbcedef31b4
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "Timer.h"
59 #include "Prelude.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
62 #include "Updates.h"
63 #ifdef PROFILING
64 #include "Proftimer.h"
65 #include "ProfHeap.h"
66 #endif
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
69 # include "GranSim.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
73 # include "FetchMe.h"
74 # include "HLC.h"
75 #endif
76 #include "Sparks.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 #ifdef THREADED_RTS
97 #define USED_IN_THREADED_RTS
98 #else
99 #define USED_IN_THREADED_RTS STG_UNUSED
100 #endif
101
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #else
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
106 #endif
107
108 /* Main thread queue.
109  * Locks required: sched_mutex.
110  */
111 StgMainThread *main_threads = NULL;
112
113 /* Thread queues.
114  * Locks required: sched_mutex.
115  */
116 #if defined(GRAN)
117
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
120
121 /* 
122    In GranSim we have a runnable and a blocked queue for each processor.
123    In order to minimise code changes new arrays run_queue_hds/tls
124    are created. run_queue_hd is then a short cut (macro) for
125    run_queue_hds[CurrentProc] (see GranSim.h).
126    -- HWL
127 */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132    the std RTS (i.e. we are cheating). However, we don't use this list in
133    the GranSim specific code at the moment (so we are only potentially
134    cheating).  */
135
136 #else /* !GRAN */
137
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
143
144 #endif
145
146 /* Linked list of all threads.
147  * Used for detecting garbage collected threads.
148  */
149 StgTSO *all_threads = NULL;
150
151 /* When a thread performs a safe C call (_ccall_GC, using old
152  * terminology), it gets put on the suspended_ccalling_threads
153  * list. Used by the garbage collector.
154  */
155 static StgTSO *suspended_ccalling_threads;
156
157 static StgTSO *threadStackOverflow(StgTSO *tso);
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 nat context_switch = 0;
166
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
169
170 /* Next thread ID to allocate.
171  * Locks required: thread_id_mutex
172  */
173 static StgThreadID next_thread_id = 1;
174
175 /*
176  * Pointers to the state of the current thread.
177  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
179  */
180  
181 /* The smallest stack size that makes any sense is:
182  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
183  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
184  *  + 1                       (the closure to enter)
185  *  + 1                       (stg_ap_v_ret)
186  *  + 1                       (spare slot req'd by stg_ap_v_ret)
187  *
188  * A thread with this stack will bomb immediately with a stack
189  * overflow, which will increase its stack size.  
190  */
191
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
193
194
195 #if defined(GRAN)
196 StgTSO *CurrentTSO;
197 #endif
198
199 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
200  *  exists - earlier gccs apparently didn't.
201  *  -= chak
202  */
203 StgTSO dummy_tso;
204
205 static rtsBool ready_to_gc;
206
207 /*
208  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209  * in an MT setting, needed to signal that a worker thread shouldn't hang around
210  * in the scheduler when it is out of work.
211  */
212 static rtsBool shutting_down_scheduler = rtsFalse;
213
214 void            addToBlockedQueue ( StgTSO *tso );
215
216 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
217        void     interruptStgRts   ( void );
218
219 static void     detectBlackHoles  ( void );
220
221 #if defined(RTS_SUPPORTS_THREADS)
222 /* ToDo: carefully document the invariants that go together
223  *       with these synchronisation objects.
224  */
225 Mutex     sched_mutex       = INIT_MUTEX_VAR;
226 Mutex     term_mutex        = INIT_MUTEX_VAR;
227
228 #endif /* RTS_SUPPORTS_THREADS */
229
230 #if defined(PAR)
231 StgTSO *LastTSO;
232 rtsTime TimeOfLastYield;
233 rtsBool emitSchedule = rtsTrue;
234 #endif
235
236 #if DEBUG
237 static char *whatNext_strs[] = {
238   "(unknown)",
239   "ThreadRunGHC",
240   "ThreadInterpret",
241   "ThreadKilled",
242   "ThreadRelocated",
243   "ThreadComplete"
244 };
245 #endif
246
247 #if defined(PAR)
248 StgTSO * createSparkThread(rtsSpark spark);
249 StgTSO * activateSpark (rtsSpark spark);  
250 #endif
251
252 /* ----------------------------------------------------------------------------
253  * Starting Tasks
254  * ------------------------------------------------------------------------- */
255
256 #if defined(RTS_SUPPORTS_THREADS)
257 static rtsBool startingWorkerThread = rtsFalse;
258
259 static void taskStart(void);
260 static void
261 taskStart(void)
262 {
263   ACQUIRE_LOCK(&sched_mutex);
264   startingWorkerThread = rtsFalse;
265   schedule(NULL,NULL);
266   RELEASE_LOCK(&sched_mutex);
267 }
268
269 void
270 startSchedulerTaskIfNecessary(void)
271 {
272   if(run_queue_hd != END_TSO_QUEUE
273     || blocked_queue_hd != END_TSO_QUEUE
274     || sleeping_queue != END_TSO_QUEUE)
275   {
276     if(!startingWorkerThread)
277     { // we don't want to start another worker thread
278       // just because the last one hasn't yet reached the
279       // "waiting for capability" state
280       startingWorkerThread = rtsTrue;
281       if(!startTask(taskStart))
282       {
283         startingWorkerThread = rtsFalse;
284       }
285     }
286   }
287 }
288 #endif
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    Locking notes:  we acquire the scheduler lock once at the beginning
303    of the scheduler loop, and release it when
304     
305       * running a thread, or
306       * waiting for work, or
307       * waiting for a GC to complete.
308
309    GRAN version:
310      In a GranSim setup this loop iterates over the global event queue.
311      This revolves around the global event queue, which determines what 
312      to do next. Therefore, it's more complicated than either the 
313      concurrent or the parallel (GUM) setup.
314
315    GUM version:
316      GUM iterates over incoming messages.
317      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318      and sends out a fish whenever it has nothing to do; in-between
319      doing the actual reductions (shared code below) it processes the
320      incoming messages and deals with delayed operations 
321      (see PendingFetches).
322      This is not the ugliest code you could imagine, but it's bloody close.
323
324    ------------------------------------------------------------------------ */
325 static void
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327           Capability *initialCapability )
328 {
329   StgTSO *t;
330   Capability *cap;
331   StgThreadReturnCode ret;
332 #if defined(GRAN)
333   rtsEvent *event;
334 #elif defined(PAR)
335   StgSparkPool *pool;
336   rtsSpark spark;
337   StgTSO *tso;
338   GlobalTaskId pe;
339   rtsBool receivedFinish = rtsFalse;
340 # if defined(DEBUG)
341   nat tp_size, sp_size; // stats only
342 # endif
343 #endif
344   rtsBool was_interrupted = rtsFalse;
345   nat prev_what_next;
346   
347   // Pre-condition: sched_mutex is held.
348   // We might have a capability, passed in as initialCapability.
349   cap = initialCapability;
350
351 #if defined(RTS_SUPPORTS_THREADS)
352   //
353   // in the threaded case, the capability is either passed in via the
354   // initialCapability parameter, or initialized inside the scheduler
355   // loop 
356   //
357   IF_DEBUG(scheduler,
358            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
359                        mainThread, initialCapability);
360       );
361 #else
362   // simply initialise it in the non-threaded case
363   grabCapability(&cap);
364 #endif
365
366 #if defined(GRAN)
367   /* set up first event to get things going */
368   /* ToDo: assign costs for system setup and init MainTSO ! */
369   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
370             ContinueThread, 
371             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
372
373   IF_DEBUG(gran,
374            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
375            G_TSO(CurrentTSO, 5));
376
377   if (RtsFlags.GranFlags.Light) {
378     /* Save current time; GranSim Light only */
379     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
380   }      
381
382   event = get_next_event();
383
384   while (event!=(rtsEvent*)NULL) {
385     /* Choose the processor with the next event */
386     CurrentProc = event->proc;
387     CurrentTSO = event->tso;
388
389 #elif defined(PAR)
390
391   while (!receivedFinish) {    /* set by processMessages */
392                                /* when receiving PP_FINISH message         */ 
393
394 #else // everything except GRAN and PAR
395
396   while (1) {
397
398 #endif
399
400      IF_DEBUG(scheduler, printAllThreads());
401
402 #if defined(RTS_SUPPORTS_THREADS)
403       // Yield the capability to higher-priority tasks if necessary.
404       //
405       if (cap != NULL) {
406           yieldCapability(&cap);
407       }
408
409       // If we do not currently hold a capability, we wait for one
410       //
411       if (cap == NULL) {
412           waitForCapability(&sched_mutex, &cap,
413                             mainThread ? &mainThread->bound_thread_cond : NULL);
414       }
415
416       // We now have a capability...
417 #endif
418
419     //
420     // If we're interrupted (the user pressed ^C, or some other
421     // termination condition occurred), kill all the currently running
422     // threads.
423     //
424     if (interrupted) {
425         IF_DEBUG(scheduler, sched_belch("interrupted"));
426         interrupted = rtsFalse;
427         was_interrupted = rtsTrue;
428 #if defined(RTS_SUPPORTS_THREADS)
429         // In the threaded RTS, deadlock detection doesn't work,
430         // so just exit right away.
431         prog_belch("interrupted");
432         releaseCapability(cap);
433         RELEASE_LOCK(&sched_mutex);
434         shutdownHaskellAndExit(EXIT_SUCCESS);
435 #else
436         deleteAllThreads();
437 #endif
438     }
439
440 #if defined(RTS_USER_SIGNALS)
441     // check for signals each time around the scheduler
442     if (signals_pending()) {
443       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
444       startSignalHandlers();
445       ACQUIRE_LOCK(&sched_mutex);
446     }
447 #endif
448
449     //
450     // Check whether any waiting threads need to be woken up.  If the
451     // run queue is empty, and there are no other tasks running, we
452     // can wait indefinitely for something to happen.
453     //
454     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
455 #if defined(RTS_SUPPORTS_THREADS)
456                 || EMPTY_RUN_QUEUE()
457 #endif
458         )
459     {
460       awaitEvent( EMPTY_RUN_QUEUE() );
461     }
462     // we can be interrupted while waiting for I/O...
463     if (interrupted) continue;
464
465     /* 
466      * Detect deadlock: when we have no threads to run, there are no
467      * threads waiting on I/O or sleeping, and all the other tasks are
468      * waiting for work, we must have a deadlock of some description.
469      *
470      * We first try to find threads blocked on themselves (ie. black
471      * holes), and generate NonTermination exceptions where necessary.
472      *
473      * If no threads are black holed, we have a deadlock situation, so
474      * inform all the main threads.
475      */
476 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
477     if (   EMPTY_THREAD_QUEUES() )
478     {
479         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
480         // Garbage collection can release some new threads due to
481         // either (a) finalizers or (b) threads resurrected because
482         // they are about to be send BlockedOnDeadMVar.  Any threads
483         // thus released will be immediately runnable.
484         GarbageCollect(GetRoots,rtsTrue);
485
486         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
487
488         IF_DEBUG(scheduler, 
489                  sched_belch("still deadlocked, checking for black holes..."));
490         detectBlackHoles();
491
492         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
493
494 #if defined(RTS_USER_SIGNALS)
495         /* If we have user-installed signal handlers, then wait
496          * for signals to arrive rather then bombing out with a
497          * deadlock.
498          */
499         if ( anyUserHandlers() ) {
500             IF_DEBUG(scheduler, 
501                      sched_belch("still deadlocked, waiting for signals..."));
502
503             awaitUserSignals();
504
505             // we might be interrupted...
506             if (interrupted) { continue; }
507
508             if (signals_pending()) {
509                 RELEASE_LOCK(&sched_mutex);
510                 startSignalHandlers();
511                 ACQUIRE_LOCK(&sched_mutex);
512             }
513             ASSERT(!EMPTY_RUN_QUEUE());
514             goto not_deadlocked;
515         }
516 #endif
517
518         /* Probably a real deadlock.  Send the current main thread the
519          * Deadlock exception (or in the SMP build, send *all* main
520          * threads the deadlock exception, since none of them can make
521          * progress).
522          */
523         {
524             StgMainThread *m;
525             m = main_threads;
526             switch (m->tso->why_blocked) {
527             case BlockedOnBlackHole:
528             case BlockedOnException:
529             case BlockedOnMVar:
530                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
531                 break;
532             default:
533                 barf("deadlock: main thread blocked in a strange way");
534             }
535         }
536     }
537   not_deadlocked:
538
539 #elif defined(RTS_SUPPORTS_THREADS)
540     // ToDo: add deadlock detection in threaded RTS
541 #elif defined(PAR)
542     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
543 #endif
544
545 #if defined(RTS_SUPPORTS_THREADS)
546     if ( EMPTY_RUN_QUEUE() ) {
547         continue; // nothing to do
548     }
549 #endif
550
551 #if defined(GRAN)
552     if (RtsFlags.GranFlags.Light)
553       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
554
555     /* adjust time based on time-stamp */
556     if (event->time > CurrentTime[CurrentProc] &&
557         event->evttype != ContinueThread)
558       CurrentTime[CurrentProc] = event->time;
559     
560     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
561     if (!RtsFlags.GranFlags.Light)
562       handleIdlePEs();
563
564     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
565
566     /* main event dispatcher in GranSim */
567     switch (event->evttype) {
568       /* Should just be continuing execution */
569     case ContinueThread:
570       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
571       /* ToDo: check assertion
572       ASSERT(run_queue_hd != (StgTSO*)NULL &&
573              run_queue_hd != END_TSO_QUEUE);
574       */
575       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
576       if (!RtsFlags.GranFlags.DoAsyncFetch &&
577           procStatus[CurrentProc]==Fetching) {
578         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
579               CurrentTSO->id, CurrentTSO, CurrentProc);
580         goto next_thread;
581       } 
582       /* Ignore ContinueThreads for completed threads */
583       if (CurrentTSO->what_next == ThreadComplete) {
584         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
585               CurrentTSO->id, CurrentTSO, CurrentProc);
586         goto next_thread;
587       } 
588       /* Ignore ContinueThreads for threads that are being migrated */
589       if (PROCS(CurrentTSO)==Nowhere) { 
590         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
591               CurrentTSO->id, CurrentTSO, CurrentProc);
592         goto next_thread;
593       }
594       /* The thread should be at the beginning of the run queue */
595       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
596         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
597               CurrentTSO->id, CurrentTSO, CurrentProc);
598         break; // run the thread anyway
599       }
600       /*
601       new_event(proc, proc, CurrentTime[proc],
602                 FindWork,
603                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
604       goto next_thread; 
605       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
606       break; // now actually run the thread; DaH Qu'vam yImuHbej 
607
608     case FetchNode:
609       do_the_fetchnode(event);
610       goto next_thread;             /* handle next event in event queue  */
611       
612     case GlobalBlock:
613       do_the_globalblock(event);
614       goto next_thread;             /* handle next event in event queue  */
615       
616     case FetchReply:
617       do_the_fetchreply(event);
618       goto next_thread;             /* handle next event in event queue  */
619       
620     case UnblockThread:   /* Move from the blocked queue to the tail of */
621       do_the_unblock(event);
622       goto next_thread;             /* handle next event in event queue  */
623       
624     case ResumeThread:  /* Move from the blocked queue to the tail of */
625       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
626       event->tso->gran.blocktime += 
627         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
628       do_the_startthread(event);
629       goto next_thread;             /* handle next event in event queue  */
630       
631     case StartThread:
632       do_the_startthread(event);
633       goto next_thread;             /* handle next event in event queue  */
634       
635     case MoveThread:
636       do_the_movethread(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case MoveSpark:
640       do_the_movespark(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FindWork:
644       do_the_findwork(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     default:
648       barf("Illegal event type %u\n", event->evttype);
649     }  /* switch */
650     
651     /* This point was scheduler_loop in the old RTS */
652
653     IF_DEBUG(gran, belch("GRAN: after main switch"));
654
655     TimeOfLastEvent = CurrentTime[CurrentProc];
656     TimeOfNextEvent = get_time_of_next_event();
657     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
658     // CurrentTSO = ThreadQueueHd;
659
660     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
661                          TimeOfNextEvent));
662
663     if (RtsFlags.GranFlags.Light) 
664       GranSimLight_leave_system(event, &ActiveTSO); 
665
666     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
667
668     IF_DEBUG(gran, 
669              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
670
671     /* in a GranSim setup the TSO stays on the run queue */
672     t = CurrentTSO;
673     /* Take a thread from the run queue. */
674     POP_RUN_QUEUE(t); // take_off_run_queue(t);
675
676     IF_DEBUG(gran, 
677              fprintf(stderr, "GRAN: About to run current thread, which is\n");
678              G_TSO(t,5));
679
680     context_switch = 0; // turned on via GranYield, checking events and time slice
681
682     IF_DEBUG(gran, 
683              DumpGranEvent(GR_SCHEDULE, t));
684
685     procStatus[CurrentProc] = Busy;
686
687 #elif defined(PAR)
688     if (PendingFetches != END_BF_QUEUE) {
689         processFetches();
690     }
691
692     /* ToDo: phps merge with spark activation above */
693     /* check whether we have local work and send requests if we have none */
694     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
695       /* :-[  no local threads => look out for local sparks */
696       /* the spark pool for the current PE */
697       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
698       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
699           pool->hd < pool->tl) {
700         /* 
701          * ToDo: add GC code check that we really have enough heap afterwards!!
702          * Old comment:
703          * If we're here (no runnable threads) and we have pending
704          * sparks, we must have a space problem.  Get enough space
705          * to turn one of those pending sparks into a
706          * thread... 
707          */
708
709         spark = findSpark(rtsFalse);                /* get a spark */
710         if (spark != (rtsSpark) NULL) {
711           tso = activateSpark(spark);       /* turn the spark into a thread */
712           IF_PAR_DEBUG(schedule,
713                        belch("==== schedule: Created TSO %d (%p); %d threads active",
714                              tso->id, tso, advisory_thread_count));
715
716           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
717             belch("==^^ failed to activate spark");
718             goto next_thread;
719           }               /* otherwise fall through & pick-up new tso */
720         } else {
721           IF_PAR_DEBUG(verbose,
722                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
723                              spark_queue_len(pool)));
724           goto next_thread;
725         }
726       }
727
728       /* If we still have no work we need to send a FISH to get a spark
729          from another PE 
730       */
731       if (EMPTY_RUN_QUEUE()) {
732       /* =8-[  no local sparks => look for work on other PEs */
733         /*
734          * We really have absolutely no work.  Send out a fish
735          * (there may be some out there already), and wait for
736          * something to arrive.  We clearly can't run any threads
737          * until a SCHEDULE or RESUME arrives, and so that's what
738          * we're hoping to see.  (Of course, we still have to
739          * respond to other types of messages.)
740          */
741         TIME now = msTime() /*CURRENT_TIME*/;
742         IF_PAR_DEBUG(verbose, 
743                      belch("--  now=%ld", now));
744         IF_PAR_DEBUG(verbose,
745                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
746                          (last_fish_arrived_at!=0 &&
747                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
748                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
749                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
750                              last_fish_arrived_at,
751                              RtsFlags.ParFlags.fishDelay, now);
752                      });
753         
754         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
755             (last_fish_arrived_at==0 ||
756              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
757           /* outstandingFishes is set in sendFish, processFish;
758              avoid flooding system with fishes via delay */
759           pe = choosePE();
760           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
761                    NEW_FISH_HUNGER);
762
763           // Global statistics: count no. of fishes
764           if (RtsFlags.ParFlags.ParStats.Global &&
765               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
766             globalParStats.tot_fish_mess++;
767           }
768         }
769       
770         receivedFinish = processMessages();
771         goto next_thread;
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       receivedFinish = processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779
780     /* Take a thread from the run queue, if we have work */
781     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
782     IF_DEBUG(sanity,checkTSO(t));
783
784     /* ToDo: write something to the log-file
785     if (RTSflags.ParFlags.granSimStats && !sameThread)
786         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
787
788     CurrentTSO = t;
789     */
790     /* the spark pool for the current PE */
791     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
792
793     IF_DEBUG(scheduler, 
794              belch("--=^ %d threads, %d sparks on [%#x]", 
795                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
796
797 # if 1
798     if (0 && RtsFlags.ParFlags.ParStats.Full && 
799         t && LastTSO && t->id != LastTSO->id && 
800         LastTSO->why_blocked == NotBlocked && 
801         LastTSO->what_next != ThreadComplete) {
802       // if previously scheduled TSO not blocked we have to record the context switch
803       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
804                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
805     }
806
807     if (RtsFlags.ParFlags.ParStats.Full && 
808         (emitSchedule /* forced emit */ ||
809         (t && LastTSO && t->id != LastTSO->id))) {
810       /* 
811          we are running a different TSO, so write a schedule event to log file
812          NB: If we use fair scheduling we also have to write  a deschedule 
813              event for LastTSO; with unfair scheduling we know that the
814              previous tso has blocked whenever we switch to another tso, so
815              we don't need it in GUM for now
816       */
817       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
818                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
819       emitSchedule = rtsFalse;
820     }
821      
822 # endif
823 #else /* !GRAN && !PAR */
824   
825     // grab a thread from the run queue
826     ASSERT(run_queue_hd != END_TSO_QUEUE);
827     POP_RUN_QUEUE(t);
828
829     // Sanity check the thread we're about to run.  This can be
830     // expensive if there is lots of thread switching going on...
831     IF_DEBUG(sanity,checkTSO(t));
832 #endif
833
834 #ifdef THREADED_RTS
835     {
836       StgMainThread *m = t->main;
837       
838       if(m)
839       {
840         if(m == mainThread)
841         {
842           IF_DEBUG(scheduler,
843             sched_belch("### Running thread %d in bound thread", t->id));
844           // yes, the Haskell thread is bound to the current native thread
845         }
846         else
847         {
848           IF_DEBUG(scheduler,
849             sched_belch("### thread %d bound to another OS thread", t->id));
850           // no, bound to a different Haskell thread: pass to that thread
851           PUSH_ON_RUN_QUEUE(t);
852           passCapability(&m->bound_thread_cond);
853           continue;
854         }
855       }
856       else
857       {
858         if(mainThread != NULL)
859         // The thread we want to run is bound.
860         {
861           IF_DEBUG(scheduler,
862             sched_belch("### this OS thread cannot run thread %d", t->id));
863           // no, the current native thread is bound to a different
864           // Haskell thread, so pass it to any worker thread
865           PUSH_ON_RUN_QUEUE(t);
866           passCapabilityToWorker();
867           continue; 
868         }
869       }
870     }
871 #endif
872
873     cap->r.rCurrentTSO = t;
874     
875     /* context switches are now initiated by the timer signal, unless
876      * the user specified "context switch as often as possible", with
877      * +RTS -C0
878      */
879     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
880          && (run_queue_hd != END_TSO_QUEUE
881              || blocked_queue_hd != END_TSO_QUEUE
882              || sleeping_queue != END_TSO_QUEUE)))
883         context_switch = 1;
884
885 run_thread:
886
887     RELEASE_LOCK(&sched_mutex);
888
889     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
890                               t->id, whatNext_strs[t->what_next]));
891
892 #ifdef PROFILING
893     startHeapProfTimer();
894 #endif
895
896     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
897     /* Run the current thread 
898      */
899     prev_what_next = t->what_next;
900
901     errno = t->saved_errno;
902
903     switch (prev_what_next) {
904
905     case ThreadKilled:
906     case ThreadComplete:
907         /* Thread already finished, return to scheduler. */
908         ret = ThreadFinished;
909         break;
910
911     case ThreadRunGHC:
912         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
913         break;
914
915     case ThreadInterpret:
916         ret = interpretBCO(cap);
917         break;
918
919     default:
920       barf("schedule: invalid what_next field");
921     }
922
923     // The TSO might have moved, so find the new location:
924     t = cap->r.rCurrentTSO;
925
926     // And save the current errno in this thread.
927     t->saved_errno = errno;
928
929     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
930     
931     /* Costs for the scheduler are assigned to CCS_SYSTEM */
932 #ifdef PROFILING
933     stopHeapProfTimer();
934     CCCS = CCS_SYSTEM;
935 #endif
936     
937     ACQUIRE_LOCK(&sched_mutex);
938     
939 #ifdef RTS_SUPPORTS_THREADS
940     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
941 #elif !defined(GRAN) && !defined(PAR)
942     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
943 #endif
944     
945 #if defined(PAR)
946     /* HACK 675: if the last thread didn't yield, make sure to print a 
947        SCHEDULE event to the log file when StgRunning the next thread, even
948        if it is the same one as before */
949     LastTSO = t; 
950     TimeOfLastYield = CURRENT_TIME;
951 #endif
952
953     switch (ret) {
954     case HeapOverflow:
955 #if defined(GRAN)
956       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
957       globalGranStats.tot_heapover++;
958 #elif defined(PAR)
959       globalParStats.tot_heapover++;
960 #endif
961
962       // did the task ask for a large block?
963       if (cap->r.rHpAlloc > BLOCK_SIZE) {
964           // if so, get one and push it on the front of the nursery.
965           bdescr *bd;
966           nat blocks;
967           
968           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
969
970           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
971                                    t->id, whatNext_strs[t->what_next], blocks));
972
973           // don't do this if it would push us over the
974           // alloc_blocks_lim limit; we'll GC first.
975           if (alloc_blocks + blocks < alloc_blocks_lim) {
976
977               alloc_blocks += blocks;
978               bd = allocGroup( blocks );
979
980               // link the new group into the list
981               bd->link = cap->r.rCurrentNursery;
982               bd->u.back = cap->r.rCurrentNursery->u.back;
983               if (cap->r.rCurrentNursery->u.back != NULL) {
984                   cap->r.rCurrentNursery->u.back->link = bd;
985               } else {
986                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
987                          g0s0->blocks == cap->r.rNursery);
988                   cap->r.rNursery = g0s0->blocks = bd;
989               }           
990               cap->r.rCurrentNursery->u.back = bd;
991
992               // initialise it as a nursery block.  We initialise the
993               // step, gen_no, and flags field of *every* sub-block in
994               // this large block, because this is easier than making
995               // sure that we always find the block head of a large
996               // block whenever we call Bdescr() (eg. evacuate() and
997               // isAlive() in the GC would both have to do this, at
998               // least).
999               { 
1000                   bdescr *x;
1001                   for (x = bd; x < bd + blocks; x++) {
1002                       x->step = g0s0;
1003                       x->gen_no = 0;
1004                       x->flags = 0;
1005                   }
1006               }
1007
1008               // don't forget to update the block count in g0s0.
1009               g0s0->n_blocks += blocks;
1010               // This assert can be a killer if the app is doing lots
1011               // of large block allocations.
1012               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1013
1014               // now update the nursery to point to the new block
1015               cap->r.rCurrentNursery = bd;
1016
1017               // we might be unlucky and have another thread get on the
1018               // run queue before us and steal the large block, but in that
1019               // case the thread will just end up requesting another large
1020               // block.
1021               PUSH_ON_RUN_QUEUE(t);
1022               break;
1023           }
1024       }
1025
1026       /* make all the running tasks block on a condition variable,
1027        * maybe set context_switch and wait till they all pile in,
1028        * then have them wait on a GC condition variable.
1029        */
1030       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1031                                t->id, whatNext_strs[t->what_next]));
1032       threadPaused(t);
1033 #if defined(GRAN)
1034       ASSERT(!is_on_queue(t,CurrentProc));
1035 #elif defined(PAR)
1036       /* Currently we emit a DESCHEDULE event before GC in GUM.
1037          ToDo: either add separate event to distinguish SYSTEM time from rest
1038                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1039       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1040         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1041                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1042         emitSchedule = rtsTrue;
1043       }
1044 #endif
1045       
1046       ready_to_gc = rtsTrue;
1047       context_switch = 1;               /* stop other threads ASAP */
1048       PUSH_ON_RUN_QUEUE(t);
1049       /* actual GC is done at the end of the while loop */
1050       break;
1051       
1052     case StackOverflow:
1053 #if defined(GRAN)
1054       IF_DEBUG(gran, 
1055                DumpGranEvent(GR_DESCHEDULE, t));
1056       globalGranStats.tot_stackover++;
1057 #elif defined(PAR)
1058       // IF_DEBUG(par, 
1059       // DumpGranEvent(GR_DESCHEDULE, t);
1060       globalParStats.tot_stackover++;
1061 #endif
1062       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1063                                t->id, whatNext_strs[t->what_next]));
1064       /* just adjust the stack for this thread, then pop it back
1065        * on the run queue.
1066        */
1067       threadPaused(t);
1068       { 
1069         /* enlarge the stack */
1070         StgTSO *new_t = threadStackOverflow(t);
1071         
1072         /* This TSO has moved, so update any pointers to it from the
1073          * main thread stack.  It better not be on any other queues...
1074          * (it shouldn't be).
1075          */
1076         if (t->main != NULL) {
1077             t->main->tso = new_t;
1078         }
1079         PUSH_ON_RUN_QUEUE(new_t);
1080       }
1081       break;
1082
1083     case ThreadYielding:
1084       // Reset the context switch flag.  We don't do this just before
1085       // running the thread, because that would mean we would lose ticks
1086       // during GC, which can lead to unfair scheduling (a thread hogs
1087       // the CPU because the tick always arrives during GC).  This way
1088       // penalises threads that do a lot of allocation, but that seems
1089       // better than the alternative.
1090       context_switch = 0;
1091
1092 #if defined(GRAN)
1093       IF_DEBUG(gran, 
1094                DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_yields++;
1096 #elif defined(PAR)
1097       // IF_DEBUG(par, 
1098       // DumpGranEvent(GR_DESCHEDULE, t);
1099       globalParStats.tot_yields++;
1100 #endif
1101       /* put the thread back on the run queue.  Then, if we're ready to
1102        * GC, check whether this is the last task to stop.  If so, wake
1103        * up the GC thread.  getThread will block during a GC until the
1104        * GC is finished.
1105        */
1106       IF_DEBUG(scheduler,
1107                if (t->what_next != prev_what_next) {
1108                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1109                          t->id, whatNext_strs[t->what_next]);
1110                } else {
1111                    belch("--<< thread %ld (%s) stopped, yielding", 
1112                          t->id, whatNext_strs[t->what_next]);
1113                }
1114                );
1115
1116       IF_DEBUG(sanity,
1117                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1118                checkTSO(t));
1119       ASSERT(t->link == END_TSO_QUEUE);
1120
1121       // Shortcut if we're just switching evaluators: don't bother
1122       // doing stack squeezing (which can be expensive), just run the
1123       // thread.
1124       if (t->what_next != prev_what_next) {
1125           goto run_thread;
1126       }
1127
1128       threadPaused(t);
1129
1130 #if defined(GRAN)
1131       ASSERT(!is_on_queue(t,CurrentProc));
1132
1133       IF_DEBUG(sanity,
1134                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1135                checkThreadQsSanity(rtsTrue));
1136 #endif
1137
1138 #if defined(PAR)
1139       if (RtsFlags.ParFlags.doFairScheduling) { 
1140         /* this does round-robin scheduling; good for concurrency */
1141         APPEND_TO_RUN_QUEUE(t);
1142       } else {
1143         /* this does unfair scheduling; good for parallelism */
1144         PUSH_ON_RUN_QUEUE(t);
1145       }
1146 #else
1147       // this does round-robin scheduling; good for concurrency
1148       APPEND_TO_RUN_QUEUE(t);
1149 #endif
1150
1151 #if defined(GRAN)
1152       /* add a ContinueThread event to actually process the thread */
1153       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1154                 ContinueThread,
1155                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1156       IF_GRAN_DEBUG(bq, 
1157                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1158                G_EVENTQ(0);
1159                G_CURR_THREADQ(0));
1160 #endif /* GRAN */
1161       break;
1162
1163     case ThreadBlocked:
1164 #if defined(GRAN)
1165       IF_DEBUG(scheduler,
1166                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1167                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1168                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1169
1170       // ??? needed; should emit block before
1171       IF_DEBUG(gran, 
1172                DumpGranEvent(GR_DESCHEDULE, t)); 
1173       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1174       /*
1175         ngoq Dogh!
1176       ASSERT(procStatus[CurrentProc]==Busy || 
1177               ((procStatus[CurrentProc]==Fetching) && 
1178               (t->block_info.closure!=(StgClosure*)NULL)));
1179       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1180           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1181             procStatus[CurrentProc]==Fetching)) 
1182         procStatus[CurrentProc] = Idle;
1183       */
1184 #elif defined(PAR)
1185       IF_DEBUG(scheduler,
1186                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1187                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1188       IF_PAR_DEBUG(bq,
1189
1190                    if (t->block_info.closure!=(StgClosure*)NULL) 
1191                      print_bq(t->block_info.closure));
1192
1193       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1194       blockThread(t);
1195
1196       /* whatever we schedule next, we must log that schedule */
1197       emitSchedule = rtsTrue;
1198
1199 #else /* !GRAN */
1200       /* don't need to do anything.  Either the thread is blocked on
1201        * I/O, in which case we'll have called addToBlockedQueue
1202        * previously, or it's blocked on an MVar or Blackhole, in which
1203        * case it'll be on the relevant queue already.
1204        */
1205       IF_DEBUG(scheduler,
1206                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1207                        t->id, whatNext_strs[t->what_next]);
1208                printThreadBlockage(t);
1209                fprintf(stderr, "\n"));
1210       fflush(stderr);
1211
1212       /* Only for dumping event to log file 
1213          ToDo: do I need this in GranSim, too?
1214       blockThread(t);
1215       */
1216 #endif
1217       threadPaused(t);
1218       break;
1219
1220     case ThreadFinished:
1221       /* Need to check whether this was a main thread, and if so, signal
1222        * the task that started it with the return value.  If we have no
1223        * more main threads, we probably need to stop all the tasks until
1224        * we get a new one.
1225        */
1226       /* We also end up here if the thread kills itself with an
1227        * uncaught exception, see Exception.hc.
1228        */
1229       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1230                                t->id, whatNext_strs[t->what_next]));
1231 #if defined(GRAN)
1232       endThread(t, CurrentProc); // clean-up the thread
1233 #elif defined(PAR)
1234       /* For now all are advisory -- HWL */
1235       //if(t->priority==AdvisoryPriority) ??
1236       advisory_thread_count--;
1237       
1238 # ifdef DIST
1239       if(t->dist.priority==RevalPriority)
1240         FinishReval(t);
1241 # endif
1242       
1243       if (RtsFlags.ParFlags.ParStats.Full &&
1244           !RtsFlags.ParFlags.ParStats.Suppressed) 
1245         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1246 #endif
1247
1248       //
1249       // Check whether the thread that just completed was a main
1250       // thread, and if so return with the result.  
1251       //
1252       // There is an assumption here that all thread completion goes
1253       // through this point; we need to make sure that if a thread
1254       // ends up in the ThreadKilled state, that it stays on the run
1255       // queue so it can be dealt with here.
1256       //
1257       if (
1258 #if defined(RTS_SUPPORTS_THREADS)
1259           mainThread != NULL
1260 #else
1261           mainThread->tso == t
1262 #endif
1263           )
1264       {
1265           // We are a bound thread: this must be our thread that just
1266           // completed.
1267           ASSERT(mainThread->tso == t);
1268
1269           if (t->what_next == ThreadComplete) {
1270               if (mainThread->ret) {
1271                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1272                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1273               }
1274               mainThread->stat = Success;
1275           } else {
1276               if (mainThread->ret) {
1277                   *(mainThread->ret) = NULL;
1278               }
1279               if (was_interrupted) {
1280                   mainThread->stat = Interrupted;
1281               } else {
1282                   mainThread->stat = Killed;
1283               }
1284           }
1285 #ifdef DEBUG
1286           removeThreadLabel((StgWord)mainThread->tso->id);
1287 #endif
1288           if (mainThread->prev == NULL) {
1289               main_threads = mainThread->link;
1290           } else {
1291               mainThread->prev->link = mainThread->link;
1292           }
1293           if (mainThread->link != NULL) {
1294               mainThread->link->prev = NULL;
1295           }
1296           releaseCapability(cap);
1297           return;
1298       }
1299
1300 #ifdef RTS_SUPPORTS_THREADS
1301       ASSERT(t->main == NULL);
1302 #else
1303       if (t->main != NULL) {
1304           // Must be a main thread that is not the topmost one.  Leave
1305           // it on the run queue until the stack has unwound to the
1306           // point where we can deal with this.  Leaving it on the run
1307           // queue also ensures that the garbage collector knows about
1308           // this thread and its return value (it gets dropped from the
1309           // all_threads list so there's no other way to find it).
1310           APPEND_TO_RUN_QUEUE(t);
1311       }
1312 #endif
1313       break;
1314
1315     default:
1316       barf("schedule: invalid thread return code %d", (int)ret);
1317     }
1318
1319 #ifdef PROFILING
1320     // When we have +RTS -i0 and we're heap profiling, do a census at
1321     // every GC.  This lets us get repeatable runs for debugging.
1322     if (performHeapProfile ||
1323         (RtsFlags.ProfFlags.profileInterval==0 &&
1324          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1325         GarbageCollect(GetRoots, rtsTrue);
1326         heapCensus();
1327         performHeapProfile = rtsFalse;
1328         ready_to_gc = rtsFalse; // we already GC'd
1329     }
1330 #endif
1331
1332     if (ready_to_gc) {
1333       /* everybody back, start the GC.
1334        * Could do it in this thread, or signal a condition var
1335        * to do it in another thread.  Either way, we need to
1336        * broadcast on gc_pending_cond afterward.
1337        */
1338 #if defined(RTS_SUPPORTS_THREADS)
1339       IF_DEBUG(scheduler,sched_belch("doing GC"));
1340 #endif
1341       GarbageCollect(GetRoots,rtsFalse);
1342       ready_to_gc = rtsFalse;
1343 #if defined(GRAN)
1344       /* add a ContinueThread event to continue execution of current thread */
1345       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1346                 ContinueThread,
1347                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1348       IF_GRAN_DEBUG(bq, 
1349                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1350                G_EVENTQ(0);
1351                G_CURR_THREADQ(0));
1352 #endif /* GRAN */
1353     }
1354
1355 #if defined(GRAN)
1356   next_thread:
1357     IF_GRAN_DEBUG(unused,
1358                   print_eventq(EventHd));
1359
1360     event = get_next_event();
1361 #elif defined(PAR)
1362   next_thread:
1363     /* ToDo: wait for next message to arrive rather than busy wait */
1364 #endif /* GRAN */
1365
1366   } /* end of while(1) */
1367
1368   IF_PAR_DEBUG(verbose,
1369                belch("== Leaving schedule() after having received Finish"));
1370 }
1371
1372 /* ---------------------------------------------------------------------------
1373  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1374  * used by Control.Concurrent for error checking.
1375  * ------------------------------------------------------------------------- */
1376  
1377 StgBool
1378 rtsSupportsBoundThreads(void)
1379 {
1380 #ifdef THREADED_RTS
1381   return rtsTrue;
1382 #else
1383   return rtsFalse;
1384 #endif
1385 }
1386
1387 /* ---------------------------------------------------------------------------
1388  * isThreadBound(tso): check whether tso is bound to an OS thread.
1389  * ------------------------------------------------------------------------- */
1390  
1391 StgBool
1392 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1393 {
1394 #ifdef THREADED_RTS
1395   return (tso->main != NULL);
1396 #endif
1397   return rtsFalse;
1398 }
1399
1400 /* ---------------------------------------------------------------------------
1401  * Singleton fork(). Do not copy any running threads.
1402  * ------------------------------------------------------------------------- */
1403
1404 #ifndef mingw32_TARGET_OS
1405 #define FORKPROCESS_PRIMOP_SUPPORTED
1406 #endif
1407
1408 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1409 static void 
1410 deleteThreadImmediately(StgTSO *tso);
1411 #endif
1412 StgInt
1413 forkProcess(HsStablePtr *entry
1414 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1415             STG_UNUSED
1416 #endif
1417            )
1418 {
1419 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1420   pid_t pid;
1421   StgTSO* t,*next;
1422   StgMainThread *m;
1423   SchedulerStatus rc;
1424
1425   IF_DEBUG(scheduler,sched_belch("forking!"));
1426   rts_lock(); // This not only acquires sched_mutex, it also
1427               // makes sure that no other threads are running
1428
1429   pid = fork();
1430
1431   if (pid) { /* parent */
1432
1433   /* just return the pid */
1434     rts_unlock();
1435     return pid;
1436     
1437   } else { /* child */
1438     
1439     
1440       // delete all threads
1441     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1442     
1443     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1444       next = t->link;
1445
1446         // don't allow threads to catch the ThreadKilled exception
1447       deleteThreadImmediately(t);
1448     }
1449     
1450       // wipe the main thread list
1451     while((m = main_threads) != NULL) {
1452       main_threads = m->link;
1453 # ifdef THREADED_RTS
1454       closeCondition(&m->bound_thread_cond);
1455 # endif
1456       stgFree(m);
1457     }
1458     
1459 # ifdef RTS_SUPPORTS_THREADS
1460     resetTaskManagerAfterFork();      // tell startTask() and friends that
1461     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1462     resetWorkerWakeupPipeAfterFork();
1463 # endif
1464     
1465     rc = rts_evalStableIO(entry, NULL);  // run the action
1466     rts_checkSchedStatus("forkProcess",rc);
1467     
1468     rts_unlock();
1469     
1470     hs_exit();                      // clean up and exit
1471     stg_exit(0);
1472   }
1473 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1474   barf("forkProcess#: primop not supported, sorry!\n");
1475   return -1;
1476 #endif
1477 }
1478
1479 /* ---------------------------------------------------------------------------
1480  * deleteAllThreads():  kill all the live threads.
1481  *
1482  * This is used when we catch a user interrupt (^C), before performing
1483  * any necessary cleanups and running finalizers.
1484  *
1485  * Locks: sched_mutex held.
1486  * ------------------------------------------------------------------------- */
1487    
1488 void
1489 deleteAllThreads ( void )
1490 {
1491   StgTSO* t, *next;
1492   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1493   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1494       next = t->global_link;
1495       deleteThread(t);
1496   }      
1497
1498   // The run queue now contains a bunch of ThreadKilled threads.  We
1499   // must not throw these away: the main thread(s) will be in there
1500   // somewhere, and the main scheduler loop has to deal with it.
1501   // Also, the run queue is the only thing keeping these threads from
1502   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1503
1504   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1505   ASSERT(sleeping_queue == END_TSO_QUEUE);
1506 }
1507
1508 /* startThread and  insertThread are now in GranSim.c -- HWL */
1509
1510
1511 /* ---------------------------------------------------------------------------
1512  * Suspending & resuming Haskell threads.
1513  * 
1514  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1515  * its capability before calling the C function.  This allows another
1516  * task to pick up the capability and carry on running Haskell
1517  * threads.  It also means that if the C call blocks, it won't lock
1518  * the whole system.
1519  *
1520  * The Haskell thread making the C call is put to sleep for the
1521  * duration of the call, on the susepended_ccalling_threads queue.  We
1522  * give out a token to the task, which it can use to resume the thread
1523  * on return from the C function.
1524  * ------------------------------------------------------------------------- */
1525    
1526 StgInt
1527 suspendThread( StgRegTable *reg )
1528 {
1529   nat tok;
1530   Capability *cap;
1531   int saved_errno = errno;
1532
1533   /* assume that *reg is a pointer to the StgRegTable part
1534    * of a Capability.
1535    */
1536   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1537
1538   ACQUIRE_LOCK(&sched_mutex);
1539
1540   IF_DEBUG(scheduler,
1541            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1542
1543   // XXX this might not be necessary --SDM
1544   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1545
1546   threadPaused(cap->r.rCurrentTSO);
1547   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1548   suspended_ccalling_threads = cap->r.rCurrentTSO;
1549
1550   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1551       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1552       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1553   } else {
1554       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1555   }
1556
1557   /* Use the thread ID as the token; it should be unique */
1558   tok = cap->r.rCurrentTSO->id;
1559
1560   /* Hand back capability */
1561   releaseCapability(cap);
1562   
1563 #if defined(RTS_SUPPORTS_THREADS)
1564   /* Preparing to leave the RTS, so ensure there's a native thread/task
1565      waiting to take over.
1566   */
1567   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1568 #endif
1569
1570   /* Other threads _might_ be available for execution; signal this */
1571   THREAD_RUNNABLE();
1572   RELEASE_LOCK(&sched_mutex);
1573   
1574   errno = saved_errno;
1575   return tok; 
1576 }
1577
1578 StgRegTable *
1579 resumeThread( StgInt tok )
1580 {
1581   StgTSO *tso, **prev;
1582   Capability *cap;
1583   int saved_errno = errno;
1584
1585 #if defined(RTS_SUPPORTS_THREADS)
1586   /* Wait for permission to re-enter the RTS with the result. */
1587   ACQUIRE_LOCK(&sched_mutex);
1588   waitForReturnCapability(&sched_mutex, &cap);
1589
1590   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1591 #else
1592   grabCapability(&cap);
1593 #endif
1594
1595   /* Remove the thread off of the suspended list */
1596   prev = &suspended_ccalling_threads;
1597   for (tso = suspended_ccalling_threads; 
1598        tso != END_TSO_QUEUE; 
1599        prev = &tso->link, tso = tso->link) {
1600     if (tso->id == (StgThreadID)tok) {
1601       *prev = tso->link;
1602       break;
1603     }
1604   }
1605   if (tso == END_TSO_QUEUE) {
1606     barf("resumeThread: thread not found");
1607   }
1608   tso->link = END_TSO_QUEUE;
1609   
1610   if(tso->why_blocked == BlockedOnCCall) {
1611       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1612       tso->blocked_exceptions = NULL;
1613   }
1614   
1615   /* Reset blocking status */
1616   tso->why_blocked  = NotBlocked;
1617
1618   cap->r.rCurrentTSO = tso;
1619   RELEASE_LOCK(&sched_mutex);
1620   errno = saved_errno;
1621   return &cap->r;
1622 }
1623
1624
1625 /* ---------------------------------------------------------------------------
1626  * Static functions
1627  * ------------------------------------------------------------------------ */
1628 static void unblockThread(StgTSO *tso);
1629
1630 /* ---------------------------------------------------------------------------
1631  * Comparing Thread ids.
1632  *
1633  * This is used from STG land in the implementation of the
1634  * instances of Eq/Ord for ThreadIds.
1635  * ------------------------------------------------------------------------ */
1636
1637 int
1638 cmp_thread(StgPtr tso1, StgPtr tso2) 
1639
1640   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1641   StgThreadID id2 = ((StgTSO *)tso2)->id;
1642  
1643   if (id1 < id2) return (-1);
1644   if (id1 > id2) return 1;
1645   return 0;
1646 }
1647
1648 /* ---------------------------------------------------------------------------
1649  * Fetching the ThreadID from an StgTSO.
1650  *
1651  * This is used in the implementation of Show for ThreadIds.
1652  * ------------------------------------------------------------------------ */
1653 int
1654 rts_getThreadId(StgPtr tso) 
1655 {
1656   return ((StgTSO *)tso)->id;
1657 }
1658
1659 #ifdef DEBUG
1660 void
1661 labelThread(StgPtr tso, char *label)
1662 {
1663   int len;
1664   void *buf;
1665
1666   /* Caveat: Once set, you can only set the thread name to "" */
1667   len = strlen(label)+1;
1668   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1669   strncpy(buf,label,len);
1670   /* Update will free the old memory for us */
1671   updateThreadLabel(((StgTSO *)tso)->id,buf);
1672 }
1673 #endif /* DEBUG */
1674
1675 /* ---------------------------------------------------------------------------
1676    Create a new thread.
1677
1678    The new thread starts with the given stack size.  Before the
1679    scheduler can run, however, this thread needs to have a closure
1680    (and possibly some arguments) pushed on its stack.  See
1681    pushClosure() in Schedule.h.
1682
1683    createGenThread() and createIOThread() (in SchedAPI.h) are
1684    convenient packaged versions of this function.
1685
1686    currently pri (priority) is only used in a GRAN setup -- HWL
1687    ------------------------------------------------------------------------ */
1688 #if defined(GRAN)
1689 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1690 StgTSO *
1691 createThread(nat size, StgInt pri)
1692 #else
1693 StgTSO *
1694 createThread(nat size)
1695 #endif
1696 {
1697
1698     StgTSO *tso;
1699     nat stack_size;
1700
1701     /* First check whether we should create a thread at all */
1702 #if defined(PAR)
1703   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1704   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1705     threadsIgnored++;
1706     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1707           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1708     return END_TSO_QUEUE;
1709   }
1710   threadsCreated++;
1711 #endif
1712
1713 #if defined(GRAN)
1714   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1715 #endif
1716
1717   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1718
1719   /* catch ridiculously small stack sizes */
1720   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1721     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1722   }
1723
1724   stack_size = size - TSO_STRUCT_SIZEW;
1725
1726   tso = (StgTSO *)allocate(size);
1727   TICK_ALLOC_TSO(stack_size, 0);
1728
1729   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1730 #if defined(GRAN)
1731   SET_GRAN_HDR(tso, ThisPE);
1732 #endif
1733
1734   // Always start with the compiled code evaluator
1735   tso->what_next = ThreadRunGHC;
1736
1737   tso->id = next_thread_id++; 
1738   tso->why_blocked  = NotBlocked;
1739   tso->blocked_exceptions = NULL;
1740
1741   tso->saved_errno = 0;
1742   tso->main = NULL;
1743   
1744   tso->stack_size   = stack_size;
1745   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1746                               - TSO_STRUCT_SIZEW;
1747   tso->sp           = (P_)&(tso->stack) + stack_size;
1748
1749 #ifdef PROFILING
1750   tso->prof.CCCS = CCS_MAIN;
1751 #endif
1752
1753   /* put a stop frame on the stack */
1754   tso->sp -= sizeofW(StgStopFrame);
1755   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1756   tso->link = END_TSO_QUEUE;
1757
1758   // ToDo: check this
1759 #if defined(GRAN)
1760   /* uses more flexible routine in GranSim */
1761   insertThread(tso, CurrentProc);
1762 #else
1763   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1764    * from its creation
1765    */
1766 #endif
1767
1768 #if defined(GRAN) 
1769   if (RtsFlags.GranFlags.GranSimStats.Full) 
1770     DumpGranEvent(GR_START,tso);
1771 #elif defined(PAR)
1772   if (RtsFlags.ParFlags.ParStats.Full) 
1773     DumpGranEvent(GR_STARTQ,tso);
1774   /* HACk to avoid SCHEDULE 
1775      LastTSO = tso; */
1776 #endif
1777
1778   /* Link the new thread on the global thread list.
1779    */
1780   tso->global_link = all_threads;
1781   all_threads = tso;
1782
1783 #if defined(DIST)
1784   tso->dist.priority = MandatoryPriority; //by default that is...
1785 #endif
1786
1787 #if defined(GRAN)
1788   tso->gran.pri = pri;
1789 # if defined(DEBUG)
1790   tso->gran.magic = TSO_MAGIC; // debugging only
1791 # endif
1792   tso->gran.sparkname   = 0;
1793   tso->gran.startedat   = CURRENT_TIME; 
1794   tso->gran.exported    = 0;
1795   tso->gran.basicblocks = 0;
1796   tso->gran.allocs      = 0;
1797   tso->gran.exectime    = 0;
1798   tso->gran.fetchtime   = 0;
1799   tso->gran.fetchcount  = 0;
1800   tso->gran.blocktime   = 0;
1801   tso->gran.blockcount  = 0;
1802   tso->gran.blockedat   = 0;
1803   tso->gran.globalsparks = 0;
1804   tso->gran.localsparks  = 0;
1805   if (RtsFlags.GranFlags.Light)
1806     tso->gran.clock  = Now; /* local clock */
1807   else
1808     tso->gran.clock  = 0;
1809
1810   IF_DEBUG(gran,printTSO(tso));
1811 #elif defined(PAR)
1812 # if defined(DEBUG)
1813   tso->par.magic = TSO_MAGIC; // debugging only
1814 # endif
1815   tso->par.sparkname   = 0;
1816   tso->par.startedat   = CURRENT_TIME; 
1817   tso->par.exported    = 0;
1818   tso->par.basicblocks = 0;
1819   tso->par.allocs      = 0;
1820   tso->par.exectime    = 0;
1821   tso->par.fetchtime   = 0;
1822   tso->par.fetchcount  = 0;
1823   tso->par.blocktime   = 0;
1824   tso->par.blockcount  = 0;
1825   tso->par.blockedat   = 0;
1826   tso->par.globalsparks = 0;
1827   tso->par.localsparks  = 0;
1828 #endif
1829
1830 #if defined(GRAN)
1831   globalGranStats.tot_threads_created++;
1832   globalGranStats.threads_created_on_PE[CurrentProc]++;
1833   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1834   globalGranStats.tot_sq_probes++;
1835 #elif defined(PAR)
1836   // collect parallel global statistics (currently done together with GC stats)
1837   if (RtsFlags.ParFlags.ParStats.Global &&
1838       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1839     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1840     globalParStats.tot_threads_created++;
1841   }
1842 #endif 
1843
1844 #if defined(GRAN)
1845   IF_GRAN_DEBUG(pri,
1846                 belch("==__ schedule: Created TSO %d (%p);",
1847                       CurrentProc, tso, tso->id));
1848 #elif defined(PAR)
1849     IF_PAR_DEBUG(verbose,
1850                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1851                        tso->id, tso, advisory_thread_count));
1852 #else
1853   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1854                                  tso->id, tso->stack_size));
1855 #endif    
1856   return tso;
1857 }
1858
1859 #if defined(PAR)
1860 /* RFP:
1861    all parallel thread creation calls should fall through the following routine.
1862 */
1863 StgTSO *
1864 createSparkThread(rtsSpark spark) 
1865 { StgTSO *tso;
1866   ASSERT(spark != (rtsSpark)NULL);
1867   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1868   { threadsIgnored++;
1869     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1870           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1871     return END_TSO_QUEUE;
1872   }
1873   else
1874   { threadsCreated++;
1875     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1876     if (tso==END_TSO_QUEUE)     
1877       barf("createSparkThread: Cannot create TSO");
1878 #if defined(DIST)
1879     tso->priority = AdvisoryPriority;
1880 #endif
1881     pushClosure(tso,spark);
1882     PUSH_ON_RUN_QUEUE(tso);
1883     advisory_thread_count++;    
1884   }
1885   return tso;
1886 }
1887 #endif
1888
1889 /*
1890   Turn a spark into a thread.
1891   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1892 */
1893 #if defined(PAR)
1894 StgTSO *
1895 activateSpark (rtsSpark spark) 
1896 {
1897   StgTSO *tso;
1898
1899   tso = createSparkThread(spark);
1900   if (RtsFlags.ParFlags.ParStats.Full) {   
1901     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1902     IF_PAR_DEBUG(verbose,
1903                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1904                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1905   }
1906   // ToDo: fwd info on local/global spark to thread -- HWL
1907   // tso->gran.exported =  spark->exported;
1908   // tso->gran.locked =   !spark->global;
1909   // tso->gran.sparkname = spark->name;
1910
1911   return tso;
1912 }
1913 #endif
1914
1915 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1916                                    Capability *initialCapability
1917                                    );
1918
1919
1920 /* ---------------------------------------------------------------------------
1921  * scheduleThread()
1922  *
1923  * scheduleThread puts a thread on the head of the runnable queue.
1924  * This will usually be done immediately after a thread is created.
1925  * The caller of scheduleThread must create the thread using e.g.
1926  * createThread and push an appropriate closure
1927  * on this thread's stack before the scheduler is invoked.
1928  * ------------------------------------------------------------------------ */
1929
1930 static void scheduleThread_ (StgTSO* tso);
1931
1932 void
1933 scheduleThread_(StgTSO *tso)
1934 {
1935   // Precondition: sched_mutex must be held.
1936   // The thread goes at the *end* of the run-queue, to avoid possible
1937   // starvation of any threads already on the queue.
1938   APPEND_TO_RUN_QUEUE(tso);
1939   THREAD_RUNNABLE();
1940 }
1941
1942 void
1943 scheduleThread(StgTSO* tso)
1944 {
1945   ACQUIRE_LOCK(&sched_mutex);
1946   scheduleThread_(tso);
1947   RELEASE_LOCK(&sched_mutex);
1948 }
1949
1950 #if defined(RTS_SUPPORTS_THREADS)
1951 static Condition bound_cond_cache;
1952 static int bound_cond_cache_full = 0;
1953 #endif
1954
1955
1956 SchedulerStatus
1957 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1958                    Capability *initialCapability)
1959 {
1960     // Precondition: sched_mutex must be held
1961     StgMainThread *m;
1962
1963     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1964     m->tso = tso;
1965     tso->main = m;
1966     m->ret = ret;
1967     m->stat = NoStatus;
1968     m->link = main_threads;
1969     m->prev = NULL;
1970     if (main_threads != NULL) {
1971         main_threads->prev = m;
1972     }
1973     main_threads = m;
1974
1975 #if defined(RTS_SUPPORTS_THREADS)
1976     // Allocating a new condition for each thread is expensive, so we
1977     // cache one.  This is a pretty feeble hack, but it helps speed up
1978     // consecutive call-ins quite a bit.
1979     if (bound_cond_cache_full) {
1980         m->bound_thread_cond = bound_cond_cache;
1981         bound_cond_cache_full = 0;
1982     } else {
1983         initCondition(&m->bound_thread_cond);
1984     }
1985 #endif
1986
1987     /* Put the thread on the main-threads list prior to scheduling the TSO.
1988        Failure to do so introduces a race condition in the MT case (as
1989        identified by Wolfgang Thaller), whereby the new task/OS thread 
1990        created by scheduleThread_() would complete prior to the thread
1991        that spawned it managed to put 'itself' on the main-threads list.
1992        The upshot of it all being that the worker thread wouldn't get to
1993        signal the completion of the its work item for the main thread to
1994        see (==> it got stuck waiting.)    -- sof 6/02.
1995     */
1996     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1997     
1998     APPEND_TO_RUN_QUEUE(tso);
1999     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2000     // bound and only runnable by *this* OS thread, so waking up other
2001     // workers will just slow things down.
2002
2003     return waitThread_(m, initialCapability);
2004 }
2005
2006 /* ---------------------------------------------------------------------------
2007  * initScheduler()
2008  *
2009  * Initialise the scheduler.  This resets all the queues - if the
2010  * queues contained any threads, they'll be garbage collected at the
2011  * next pass.
2012  *
2013  * ------------------------------------------------------------------------ */
2014
2015 void 
2016 initScheduler(void)
2017 {
2018 #if defined(GRAN)
2019   nat i;
2020
2021   for (i=0; i<=MAX_PROC; i++) {
2022     run_queue_hds[i]      = END_TSO_QUEUE;
2023     run_queue_tls[i]      = END_TSO_QUEUE;
2024     blocked_queue_hds[i]  = END_TSO_QUEUE;
2025     blocked_queue_tls[i]  = END_TSO_QUEUE;
2026     ccalling_threadss[i]  = END_TSO_QUEUE;
2027     sleeping_queue        = END_TSO_QUEUE;
2028   }
2029 #else
2030   run_queue_hd      = END_TSO_QUEUE;
2031   run_queue_tl      = END_TSO_QUEUE;
2032   blocked_queue_hd  = END_TSO_QUEUE;
2033   blocked_queue_tl  = END_TSO_QUEUE;
2034   sleeping_queue    = END_TSO_QUEUE;
2035 #endif 
2036
2037   suspended_ccalling_threads  = END_TSO_QUEUE;
2038
2039   main_threads = NULL;
2040   all_threads  = END_TSO_QUEUE;
2041
2042   context_switch = 0;
2043   interrupted    = 0;
2044
2045   RtsFlags.ConcFlags.ctxtSwitchTicks =
2046       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2047       
2048 #if defined(RTS_SUPPORTS_THREADS)
2049   /* Initialise the mutex and condition variables used by
2050    * the scheduler. */
2051   initMutex(&sched_mutex);
2052   initMutex(&term_mutex);
2053 #endif
2054   
2055   ACQUIRE_LOCK(&sched_mutex);
2056
2057   /* A capability holds the state a native thread needs in
2058    * order to execute STG code. At least one capability is
2059    * floating around (only SMP builds have more than one).
2060    */
2061   initCapabilities();
2062   
2063 #if defined(RTS_SUPPORTS_THREADS)
2064     /* start our haskell execution tasks */
2065     startTaskManager(0,taskStart);
2066 #endif
2067
2068 #if /* defined(SMP) ||*/ defined(PAR)
2069   initSparkPools();
2070 #endif
2071
2072   RELEASE_LOCK(&sched_mutex);
2073 }
2074
2075 void
2076 exitScheduler( void )
2077 {
2078 #if defined(RTS_SUPPORTS_THREADS)
2079   stopTaskManager();
2080 #endif
2081   shutting_down_scheduler = rtsTrue;
2082 }
2083
2084 /* ----------------------------------------------------------------------------
2085    Managing the per-task allocation areas.
2086    
2087    Each capability comes with an allocation area.  These are
2088    fixed-length block lists into which allocation can be done.
2089
2090    ToDo: no support for two-space collection at the moment???
2091    ------------------------------------------------------------------------- */
2092
2093 static
2094 SchedulerStatus
2095 waitThread_(StgMainThread* m, Capability *initialCapability)
2096 {
2097   SchedulerStatus stat;
2098
2099   // Precondition: sched_mutex must be held.
2100   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2101
2102 #if defined(GRAN)
2103   /* GranSim specific init */
2104   CurrentTSO = m->tso;                // the TSO to run
2105   procStatus[MainProc] = Busy;        // status of main PE
2106   CurrentProc = MainProc;             // PE to run it on
2107   schedule(m,initialCapability);
2108 #else
2109   schedule(m,initialCapability);
2110   ASSERT(m->stat != NoStatus);
2111 #endif
2112
2113   stat = m->stat;
2114
2115 #if defined(RTS_SUPPORTS_THREADS)
2116   // Free the condition variable, returning it to the cache if possible.
2117   if (!bound_cond_cache_full) {
2118       bound_cond_cache = m->bound_thread_cond;
2119       bound_cond_cache_full = 1;
2120   } else {
2121       closeCondition(&m->bound_thread_cond);
2122   }
2123 #endif
2124
2125   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2126   stgFree(m);
2127
2128   // Postcondition: sched_mutex still held
2129   return stat;
2130 }
2131
2132 /* ---------------------------------------------------------------------------
2133    Where are the roots that we know about?
2134
2135         - all the threads on the runnable queue
2136         - all the threads on the blocked queue
2137         - all the threads on the sleeping queue
2138         - all the thread currently executing a _ccall_GC
2139         - all the "main threads"
2140      
2141    ------------------------------------------------------------------------ */
2142
2143 /* This has to be protected either by the scheduler monitor, or by the
2144         garbage collection monitor (probably the latter).
2145         KH @ 25/10/99
2146 */
2147
2148 void
2149 GetRoots( evac_fn evac )
2150 {
2151 #if defined(GRAN)
2152   {
2153     nat i;
2154     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2155       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2156           evac((StgClosure **)&run_queue_hds[i]);
2157       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2158           evac((StgClosure **)&run_queue_tls[i]);
2159       
2160       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2161           evac((StgClosure **)&blocked_queue_hds[i]);
2162       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2163           evac((StgClosure **)&blocked_queue_tls[i]);
2164       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2165           evac((StgClosure **)&ccalling_threads[i]);
2166     }
2167   }
2168
2169   markEventQueue();
2170
2171 #else /* !GRAN */
2172   if (run_queue_hd != END_TSO_QUEUE) {
2173       ASSERT(run_queue_tl != END_TSO_QUEUE);
2174       evac((StgClosure **)&run_queue_hd);
2175       evac((StgClosure **)&run_queue_tl);
2176   }
2177   
2178   if (blocked_queue_hd != END_TSO_QUEUE) {
2179       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2180       evac((StgClosure **)&blocked_queue_hd);
2181       evac((StgClosure **)&blocked_queue_tl);
2182   }
2183   
2184   if (sleeping_queue != END_TSO_QUEUE) {
2185       evac((StgClosure **)&sleeping_queue);
2186   }
2187 #endif 
2188
2189   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2190       evac((StgClosure **)&suspended_ccalling_threads);
2191   }
2192
2193 #if defined(PAR) || defined(GRAN)
2194   markSparkQueue(evac);
2195 #endif
2196
2197 #if defined(RTS_USER_SIGNALS)
2198   // mark the signal handlers (signals should be already blocked)
2199   markSignalHandlers(evac);
2200 #endif
2201 }
2202
2203 /* -----------------------------------------------------------------------------
2204    performGC
2205
2206    This is the interface to the garbage collector from Haskell land.
2207    We provide this so that external C code can allocate and garbage
2208    collect when called from Haskell via _ccall_GC.
2209
2210    It might be useful to provide an interface whereby the programmer
2211    can specify more roots (ToDo).
2212    
2213    This needs to be protected by the GC condition variable above.  KH.
2214    -------------------------------------------------------------------------- */
2215
2216 static void (*extra_roots)(evac_fn);
2217
2218 void
2219 performGC(void)
2220 {
2221   /* Obligated to hold this lock upon entry */
2222   ACQUIRE_LOCK(&sched_mutex);
2223   GarbageCollect(GetRoots,rtsFalse);
2224   RELEASE_LOCK(&sched_mutex);
2225 }
2226
2227 void
2228 performMajorGC(void)
2229 {
2230   ACQUIRE_LOCK(&sched_mutex);
2231   GarbageCollect(GetRoots,rtsTrue);
2232   RELEASE_LOCK(&sched_mutex);
2233 }
2234
2235 static void
2236 AllRoots(evac_fn evac)
2237 {
2238     GetRoots(evac);             // the scheduler's roots
2239     extra_roots(evac);          // the user's roots
2240 }
2241
2242 void
2243 performGCWithRoots(void (*get_roots)(evac_fn))
2244 {
2245   ACQUIRE_LOCK(&sched_mutex);
2246   extra_roots = get_roots;
2247   GarbageCollect(AllRoots,rtsFalse);
2248   RELEASE_LOCK(&sched_mutex);
2249 }
2250
2251 /* -----------------------------------------------------------------------------
2252    Stack overflow
2253
2254    If the thread has reached its maximum stack size, then raise the
2255    StackOverflow exception in the offending thread.  Otherwise
2256    relocate the TSO into a larger chunk of memory and adjust its stack
2257    size appropriately.
2258    -------------------------------------------------------------------------- */
2259
2260 static StgTSO *
2261 threadStackOverflow(StgTSO *tso)
2262 {
2263   nat new_stack_size, new_tso_size, stack_words;
2264   StgPtr new_sp;
2265   StgTSO *dest;
2266
2267   IF_DEBUG(sanity,checkTSO(tso));
2268   if (tso->stack_size >= tso->max_stack_size) {
2269
2270     IF_DEBUG(gc,
2271              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2272                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2273              /* If we're debugging, just print out the top of the stack */
2274              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2275                                               tso->sp+64)));
2276
2277     /* Send this thread the StackOverflow exception */
2278     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2279     return tso;
2280   }
2281
2282   /* Try to double the current stack size.  If that takes us over the
2283    * maximum stack size for this thread, then use the maximum instead.
2284    * Finally round up so the TSO ends up as a whole number of blocks.
2285    */
2286   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2287   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2288                                        TSO_STRUCT_SIZE)/sizeof(W_);
2289   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2290   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2291
2292   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2293
2294   dest = (StgTSO *)allocate(new_tso_size);
2295   TICK_ALLOC_TSO(new_stack_size,0);
2296
2297   /* copy the TSO block and the old stack into the new area */
2298   memcpy(dest,tso,TSO_STRUCT_SIZE);
2299   stack_words = tso->stack + tso->stack_size - tso->sp;
2300   new_sp = (P_)dest + new_tso_size - stack_words;
2301   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2302
2303   /* relocate the stack pointers... */
2304   dest->sp         = new_sp;
2305   dest->stack_size = new_stack_size;
2306         
2307   /* Mark the old TSO as relocated.  We have to check for relocated
2308    * TSOs in the garbage collector and any primops that deal with TSOs.
2309    *
2310    * It's important to set the sp value to just beyond the end
2311    * of the stack, so we don't attempt to scavenge any part of the
2312    * dead TSO's stack.
2313    */
2314   tso->what_next = ThreadRelocated;
2315   tso->link = dest;
2316   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2317   tso->why_blocked = NotBlocked;
2318   dest->mut_link = NULL;
2319
2320   IF_PAR_DEBUG(verbose,
2321                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2322                      tso->id, tso, tso->stack_size);
2323                /* If we're debugging, just print out the top of the stack */
2324                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2325                                                 tso->sp+64)));
2326   
2327   IF_DEBUG(sanity,checkTSO(tso));
2328 #if 0
2329   IF_DEBUG(scheduler,printTSO(dest));
2330 #endif
2331
2332   return dest;
2333 }
2334
2335 /* ---------------------------------------------------------------------------
2336    Wake up a queue that was blocked on some resource.
2337    ------------------------------------------------------------------------ */
2338
2339 #if defined(GRAN)
2340 STATIC_INLINE void
2341 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2342 {
2343 }
2344 #elif defined(PAR)
2345 STATIC_INLINE void
2346 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2347 {
2348   /* write RESUME events to log file and
2349      update blocked and fetch time (depending on type of the orig closure) */
2350   if (RtsFlags.ParFlags.ParStats.Full) {
2351     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2352                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2353                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2354     if (EMPTY_RUN_QUEUE())
2355       emitSchedule = rtsTrue;
2356
2357     switch (get_itbl(node)->type) {
2358         case FETCH_ME_BQ:
2359           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2360           break;
2361         case RBH:
2362         case FETCH_ME:
2363         case BLACKHOLE_BQ:
2364           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2365           break;
2366 #ifdef DIST
2367         case MVAR:
2368           break;
2369 #endif    
2370         default:
2371           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2372         }
2373       }
2374 }
2375 #endif
2376
2377 #if defined(GRAN)
2378 static StgBlockingQueueElement *
2379 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2380 {
2381     StgTSO *tso;
2382     PEs node_loc, tso_loc;
2383
2384     node_loc = where_is(node); // should be lifted out of loop
2385     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2386     tso_loc = where_is((StgClosure *)tso);
2387     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2388       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2389       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2390       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2391       // insertThread(tso, node_loc);
2392       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2393                 ResumeThread,
2394                 tso, node, (rtsSpark*)NULL);
2395       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2396       // len_local++;
2397       // len++;
2398     } else { // TSO is remote (actually should be FMBQ)
2399       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2400                                   RtsFlags.GranFlags.Costs.gunblocktime +
2401                                   RtsFlags.GranFlags.Costs.latency;
2402       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2403                 UnblockThread,
2404                 tso, node, (rtsSpark*)NULL);
2405       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2406       // len++;
2407     }
2408     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2409     IF_GRAN_DEBUG(bq,
2410                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2411                           (node_loc==tso_loc ? "Local" : "Global"), 
2412                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2413     tso->block_info.closure = NULL;
2414     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2415                              tso->id, tso));
2416 }
2417 #elif defined(PAR)
2418 static StgBlockingQueueElement *
2419 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2420 {
2421     StgBlockingQueueElement *next;
2422
2423     switch (get_itbl(bqe)->type) {
2424     case TSO:
2425       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2426       /* if it's a TSO just push it onto the run_queue */
2427       next = bqe->link;
2428       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2429       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2430       THREAD_RUNNABLE();
2431       unblockCount(bqe, node);
2432       /* reset blocking status after dumping event */
2433       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2434       break;
2435
2436     case BLOCKED_FETCH:
2437       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2438       next = bqe->link;
2439       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2440       PendingFetches = (StgBlockedFetch *)bqe;
2441       break;
2442
2443 # if defined(DEBUG)
2444       /* can ignore this case in a non-debugging setup; 
2445          see comments on RBHSave closures above */
2446     case CONSTR:
2447       /* check that the closure is an RBHSave closure */
2448       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2449              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2450              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2451       break;
2452
2453     default:
2454       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2455            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2456            (StgClosure *)bqe);
2457 # endif
2458     }
2459   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2460   return next;
2461 }
2462
2463 #else /* !GRAN && !PAR */
2464 static StgTSO *
2465 unblockOneLocked(StgTSO *tso)
2466 {
2467   StgTSO *next;
2468
2469   ASSERT(get_itbl(tso)->type == TSO);
2470   ASSERT(tso->why_blocked != NotBlocked);
2471   tso->why_blocked = NotBlocked;
2472   next = tso->link;
2473   APPEND_TO_RUN_QUEUE(tso);
2474   THREAD_RUNNABLE();
2475   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2476   return next;
2477 }
2478 #endif
2479
2480 #if defined(GRAN) || defined(PAR)
2481 INLINE_ME StgBlockingQueueElement *
2482 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2483 {
2484   ACQUIRE_LOCK(&sched_mutex);
2485   bqe = unblockOneLocked(bqe, node);
2486   RELEASE_LOCK(&sched_mutex);
2487   return bqe;
2488 }
2489 #else
2490 INLINE_ME StgTSO *
2491 unblockOne(StgTSO *tso)
2492 {
2493   ACQUIRE_LOCK(&sched_mutex);
2494   tso = unblockOneLocked(tso);
2495   RELEASE_LOCK(&sched_mutex);
2496   return tso;
2497 }
2498 #endif
2499
2500 #if defined(GRAN)
2501 void 
2502 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2503 {
2504   StgBlockingQueueElement *bqe;
2505   PEs node_loc;
2506   nat len = 0; 
2507
2508   IF_GRAN_DEBUG(bq, 
2509                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2510                       node, CurrentProc, CurrentTime[CurrentProc], 
2511                       CurrentTSO->id, CurrentTSO));
2512
2513   node_loc = where_is(node);
2514
2515   ASSERT(q == END_BQ_QUEUE ||
2516          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2517          get_itbl(q)->type == CONSTR); // closure (type constructor)
2518   ASSERT(is_unique(node));
2519
2520   /* FAKE FETCH: magically copy the node to the tso's proc;
2521      no Fetch necessary because in reality the node should not have been 
2522      moved to the other PE in the first place
2523   */
2524   if (CurrentProc!=node_loc) {
2525     IF_GRAN_DEBUG(bq, 
2526                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2527                         node, node_loc, CurrentProc, CurrentTSO->id, 
2528                         // CurrentTSO, where_is(CurrentTSO),
2529                         node->header.gran.procs));
2530     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2531     IF_GRAN_DEBUG(bq, 
2532                   belch("## new bitmask of node %p is %#x",
2533                         node, node->header.gran.procs));
2534     if (RtsFlags.GranFlags.GranSimStats.Global) {
2535       globalGranStats.tot_fake_fetches++;
2536     }
2537   }
2538
2539   bqe = q;
2540   // ToDo: check: ASSERT(CurrentProc==node_loc);
2541   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2542     //next = bqe->link;
2543     /* 
2544        bqe points to the current element in the queue
2545        next points to the next element in the queue
2546     */
2547     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2548     //tso_loc = where_is(tso);
2549     len++;
2550     bqe = unblockOneLocked(bqe, node);
2551   }
2552
2553   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2554      the closure to make room for the anchor of the BQ */
2555   if (bqe!=END_BQ_QUEUE) {
2556     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2557     /*
2558     ASSERT((info_ptr==&RBH_Save_0_info) ||
2559            (info_ptr==&RBH_Save_1_info) ||
2560            (info_ptr==&RBH_Save_2_info));
2561     */
2562     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2563     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2564     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2565
2566     IF_GRAN_DEBUG(bq,
2567                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2568                         node, info_type(node)));
2569   }
2570
2571   /* statistics gathering */
2572   if (RtsFlags.GranFlags.GranSimStats.Global) {
2573     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2574     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2575     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2576     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2577   }
2578   IF_GRAN_DEBUG(bq,
2579                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2580                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2581 }
2582 #elif defined(PAR)
2583 void 
2584 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2585 {
2586   StgBlockingQueueElement *bqe;
2587
2588   ACQUIRE_LOCK(&sched_mutex);
2589
2590   IF_PAR_DEBUG(verbose, 
2591                belch("##-_ AwBQ for node %p on [%x]: ",
2592                      node, mytid));
2593 #ifdef DIST  
2594   //RFP
2595   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2596     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2597     return;
2598   }
2599 #endif
2600   
2601   ASSERT(q == END_BQ_QUEUE ||
2602          get_itbl(q)->type == TSO ||           
2603          get_itbl(q)->type == BLOCKED_FETCH || 
2604          get_itbl(q)->type == CONSTR); 
2605
2606   bqe = q;
2607   while (get_itbl(bqe)->type==TSO || 
2608          get_itbl(bqe)->type==BLOCKED_FETCH) {
2609     bqe = unblockOneLocked(bqe, node);
2610   }
2611   RELEASE_LOCK(&sched_mutex);
2612 }
2613
2614 #else   /* !GRAN && !PAR */
2615
2616 void
2617 awakenBlockedQueueNoLock(StgTSO *tso)
2618 {
2619   while (tso != END_TSO_QUEUE) {
2620     tso = unblockOneLocked(tso);
2621   }
2622 }
2623
2624 void
2625 awakenBlockedQueue(StgTSO *tso)
2626 {
2627   ACQUIRE_LOCK(&sched_mutex);
2628   while (tso != END_TSO_QUEUE) {
2629     tso = unblockOneLocked(tso);
2630   }
2631   RELEASE_LOCK(&sched_mutex);
2632 }
2633 #endif
2634
2635 /* ---------------------------------------------------------------------------
2636    Interrupt execution
2637    - usually called inside a signal handler so it mustn't do anything fancy.   
2638    ------------------------------------------------------------------------ */
2639
2640 void
2641 interruptStgRts(void)
2642 {
2643     interrupted    = 1;
2644     context_switch = 1;
2645 #ifdef RTS_SUPPORTS_THREADS
2646     wakeBlockedWorkerThread();
2647 #endif
2648 }
2649
2650 /* -----------------------------------------------------------------------------
2651    Unblock a thread
2652
2653    This is for use when we raise an exception in another thread, which
2654    may be blocked.
2655    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2656    -------------------------------------------------------------------------- */
2657
2658 #if defined(GRAN) || defined(PAR)
2659 /*
2660   NB: only the type of the blocking queue is different in GranSim and GUM
2661       the operations on the queue-elements are the same
2662       long live polymorphism!
2663
2664   Locks: sched_mutex is held upon entry and exit.
2665
2666 */
2667 static void
2668 unblockThread(StgTSO *tso)
2669 {
2670   StgBlockingQueueElement *t, **last;
2671
2672   switch (tso->why_blocked) {
2673
2674   case NotBlocked:
2675     return;  /* not blocked */
2676
2677   case BlockedOnMVar:
2678     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2679     {
2680       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2681       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2682
2683       last = (StgBlockingQueueElement **)&mvar->head;
2684       for (t = (StgBlockingQueueElement *)mvar->head; 
2685            t != END_BQ_QUEUE; 
2686            last = &t->link, last_tso = t, t = t->link) {
2687         if (t == (StgBlockingQueueElement *)tso) {
2688           *last = (StgBlockingQueueElement *)tso->link;
2689           if (mvar->tail == tso) {
2690             mvar->tail = (StgTSO *)last_tso;
2691           }
2692           goto done;
2693         }
2694       }
2695       barf("unblockThread (MVAR): TSO not found");
2696     }
2697
2698   case BlockedOnBlackHole:
2699     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2700     {
2701       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2702
2703       last = &bq->blocking_queue;
2704       for (t = bq->blocking_queue; 
2705            t != END_BQ_QUEUE; 
2706            last = &t->link, t = t->link) {
2707         if (t == (StgBlockingQueueElement *)tso) {
2708           *last = (StgBlockingQueueElement *)tso->link;
2709           goto done;
2710         }
2711       }
2712       barf("unblockThread (BLACKHOLE): TSO not found");
2713     }
2714
2715   case BlockedOnException:
2716     {
2717       StgTSO *target  = tso->block_info.tso;
2718
2719       ASSERT(get_itbl(target)->type == TSO);
2720
2721       if (target->what_next == ThreadRelocated) {
2722           target = target->link;
2723           ASSERT(get_itbl(target)->type == TSO);
2724       }
2725
2726       ASSERT(target->blocked_exceptions != NULL);
2727
2728       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2729       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2730            t != END_BQ_QUEUE; 
2731            last = &t->link, t = t->link) {
2732         ASSERT(get_itbl(t)->type == TSO);
2733         if (t == (StgBlockingQueueElement *)tso) {
2734           *last = (StgBlockingQueueElement *)tso->link;
2735           goto done;
2736         }
2737       }
2738       barf("unblockThread (Exception): TSO not found");
2739     }
2740
2741   case BlockedOnRead:
2742   case BlockedOnWrite:
2743 #if defined(mingw32_TARGET_OS)
2744   case BlockedOnDoProc:
2745 #endif
2746     {
2747       /* take TSO off blocked_queue */
2748       StgBlockingQueueElement *prev = NULL;
2749       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2750            prev = t, t = t->link) {
2751         if (t == (StgBlockingQueueElement *)tso) {
2752           if (prev == NULL) {
2753             blocked_queue_hd = (StgTSO *)t->link;
2754             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2755               blocked_queue_tl = END_TSO_QUEUE;
2756             }
2757           } else {
2758             prev->link = t->link;
2759             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2760               blocked_queue_tl = (StgTSO *)prev;
2761             }
2762           }
2763           goto done;
2764         }
2765       }
2766       barf("unblockThread (I/O): TSO not found");
2767     }
2768
2769   case BlockedOnDelay:
2770     {
2771       /* take TSO off sleeping_queue */
2772       StgBlockingQueueElement *prev = NULL;
2773       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2774            prev = t, t = t->link) {
2775         if (t == (StgBlockingQueueElement *)tso) {
2776           if (prev == NULL) {
2777             sleeping_queue = (StgTSO *)t->link;
2778           } else {
2779             prev->link = t->link;
2780           }
2781           goto done;
2782         }
2783       }
2784       barf("unblockThread (delay): TSO not found");
2785     }
2786
2787   default:
2788     barf("unblockThread");
2789   }
2790
2791  done:
2792   tso->link = END_TSO_QUEUE;
2793   tso->why_blocked = NotBlocked;
2794   tso->block_info.closure = NULL;
2795   PUSH_ON_RUN_QUEUE(tso);
2796 }
2797 #else
2798 static void
2799 unblockThread(StgTSO *tso)
2800 {
2801   StgTSO *t, **last;
2802   
2803   /* To avoid locking unnecessarily. */
2804   if (tso->why_blocked == NotBlocked) {
2805     return;
2806   }
2807
2808   switch (tso->why_blocked) {
2809
2810   case BlockedOnMVar:
2811     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2812     {
2813       StgTSO *last_tso = END_TSO_QUEUE;
2814       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2815
2816       last = &mvar->head;
2817       for (t = mvar->head; t != END_TSO_QUEUE; 
2818            last = &t->link, last_tso = t, t = t->link) {
2819         if (t == tso) {
2820           *last = tso->link;
2821           if (mvar->tail == tso) {
2822             mvar->tail = last_tso;
2823           }
2824           goto done;
2825         }
2826       }
2827       barf("unblockThread (MVAR): TSO not found");
2828     }
2829
2830   case BlockedOnBlackHole:
2831     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2832     {
2833       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2834
2835       last = &bq->blocking_queue;
2836       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2837            last = &t->link, t = t->link) {
2838         if (t == tso) {
2839           *last = tso->link;
2840           goto done;
2841         }
2842       }
2843       barf("unblockThread (BLACKHOLE): TSO not found");
2844     }
2845
2846   case BlockedOnException:
2847     {
2848       StgTSO *target  = tso->block_info.tso;
2849
2850       ASSERT(get_itbl(target)->type == TSO);
2851
2852       while (target->what_next == ThreadRelocated) {
2853           target = target->link;
2854           ASSERT(get_itbl(target)->type == TSO);
2855       }
2856       
2857       ASSERT(target->blocked_exceptions != NULL);
2858
2859       last = &target->blocked_exceptions;
2860       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2861            last = &t->link, t = t->link) {
2862         ASSERT(get_itbl(t)->type == TSO);
2863         if (t == tso) {
2864           *last = tso->link;
2865           goto done;
2866         }
2867       }
2868       barf("unblockThread (Exception): TSO not found");
2869     }
2870
2871   case BlockedOnRead:
2872   case BlockedOnWrite:
2873 #if defined(mingw32_TARGET_OS)
2874   case BlockedOnDoProc:
2875 #endif
2876     {
2877       StgTSO *prev = NULL;
2878       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2879            prev = t, t = t->link) {
2880         if (t == tso) {
2881           if (prev == NULL) {
2882             blocked_queue_hd = t->link;
2883             if (blocked_queue_tl == t) {
2884               blocked_queue_tl = END_TSO_QUEUE;
2885             }
2886           } else {
2887             prev->link = t->link;
2888             if (blocked_queue_tl == t) {
2889               blocked_queue_tl = prev;
2890             }
2891           }
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (I/O): TSO not found");
2896     }
2897
2898   case BlockedOnDelay:
2899     {
2900       StgTSO *prev = NULL;
2901       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2902            prev = t, t = t->link) {
2903         if (t == tso) {
2904           if (prev == NULL) {
2905             sleeping_queue = t->link;
2906           } else {
2907             prev->link = t->link;
2908           }
2909           goto done;
2910         }
2911       }
2912       barf("unblockThread (delay): TSO not found");
2913     }
2914
2915   default:
2916     barf("unblockThread");
2917   }
2918
2919  done:
2920   tso->link = END_TSO_QUEUE;
2921   tso->why_blocked = NotBlocked;
2922   tso->block_info.closure = NULL;
2923   APPEND_TO_RUN_QUEUE(tso);
2924 }
2925 #endif
2926
2927 /* -----------------------------------------------------------------------------
2928  * raiseAsync()
2929  *
2930  * The following function implements the magic for raising an
2931  * asynchronous exception in an existing thread.
2932  *
2933  * We first remove the thread from any queue on which it might be
2934  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2935  *
2936  * We strip the stack down to the innermost CATCH_FRAME, building
2937  * thunks in the heap for all the active computations, so they can 
2938  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2939  * an application of the handler to the exception, and push it on
2940  * the top of the stack.
2941  * 
2942  * How exactly do we save all the active computations?  We create an
2943  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2944  * AP_STACKs pushes everything from the corresponding update frame
2945  * upwards onto the stack.  (Actually, it pushes everything up to the
2946  * next update frame plus a pointer to the next AP_STACK object.
2947  * Entering the next AP_STACK object pushes more onto the stack until we
2948  * reach the last AP_STACK object - at which point the stack should look
2949  * exactly as it did when we killed the TSO and we can continue
2950  * execution by entering the closure on top of the stack.
2951  *
2952  * We can also kill a thread entirely - this happens if either (a) the 
2953  * exception passed to raiseAsync is NULL, or (b) there's no
2954  * CATCH_FRAME on the stack.  In either case, we strip the entire
2955  * stack and replace the thread with a zombie.
2956  *
2957  * Locks: sched_mutex held upon entry nor exit.
2958  *
2959  * -------------------------------------------------------------------------- */
2960  
2961 void 
2962 deleteThread(StgTSO *tso)
2963 {
2964   raiseAsync(tso,NULL);
2965 }
2966
2967 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2968 static void 
2969 deleteThreadImmediately(StgTSO *tso)
2970 { // for forkProcess only:
2971   // delete thread without giving it a chance to catch the KillThread exception
2972
2973   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2974       return;
2975   }
2976
2977   if (tso->why_blocked != BlockedOnCCall &&
2978       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2979     unblockThread(tso);
2980   }
2981
2982   tso->what_next = ThreadKilled;
2983 }
2984 #endif
2985
2986 void
2987 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2988 {
2989   /* When raising async exs from contexts where sched_mutex isn't held;
2990      use raiseAsyncWithLock(). */
2991   ACQUIRE_LOCK(&sched_mutex);
2992   raiseAsync(tso,exception);
2993   RELEASE_LOCK(&sched_mutex);
2994 }
2995
2996 void
2997 raiseAsync(StgTSO *tso, StgClosure *exception)
2998 {
2999     StgRetInfoTable *info;
3000     StgPtr sp;
3001   
3002     // Thread already dead?
3003     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3004         return;
3005     }
3006
3007     IF_DEBUG(scheduler, 
3008              sched_belch("raising exception in thread %ld.", tso->id));
3009     
3010     // Remove it from any blocking queues
3011     unblockThread(tso);
3012
3013     sp = tso->sp;
3014     
3015     // The stack freezing code assumes there's a closure pointer on
3016     // the top of the stack, so we have to arrange that this is the case...
3017     //
3018     if (sp[0] == (W_)&stg_enter_info) {
3019         sp++;
3020     } else {
3021         sp--;
3022         sp[0] = (W_)&stg_dummy_ret_closure;
3023     }
3024
3025     while (1) {
3026         nat i;
3027
3028         // 1. Let the top of the stack be the "current closure"
3029         //
3030         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3031         // CATCH_FRAME.
3032         //
3033         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3034         // current closure applied to the chunk of stack up to (but not
3035         // including) the update frame.  This closure becomes the "current
3036         // closure".  Go back to step 2.
3037         //
3038         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3039         // top of the stack applied to the exception.
3040         // 
3041         // 5. If it's a STOP_FRAME, then kill the thread.
3042         
3043         StgPtr frame;
3044         
3045         frame = sp + 1;
3046         info = get_ret_itbl((StgClosure *)frame);
3047         
3048         while (info->i.type != UPDATE_FRAME
3049                && (info->i.type != CATCH_FRAME || exception == NULL)
3050                && info->i.type != STOP_FRAME) {
3051             frame += stack_frame_sizeW((StgClosure *)frame);
3052             info = get_ret_itbl((StgClosure *)frame);
3053         }
3054         
3055         switch (info->i.type) {
3056             
3057         case CATCH_FRAME:
3058             // If we find a CATCH_FRAME, and we've got an exception to raise,
3059             // then build the THUNK raise(exception), and leave it on
3060             // top of the CATCH_FRAME ready to enter.
3061             //
3062         {
3063 #ifdef PROFILING
3064             StgCatchFrame *cf = (StgCatchFrame *)frame;
3065 #endif
3066             StgClosure *raise;
3067             
3068             // we've got an exception to raise, so let's pass it to the
3069             // handler in this frame.
3070             //
3071             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3072             TICK_ALLOC_SE_THK(1,0);
3073             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3074             raise->payload[0] = exception;
3075             
3076             // throw away the stack from Sp up to the CATCH_FRAME.
3077             //
3078             sp = frame - 1;
3079             
3080             /* Ensure that async excpetions are blocked now, so we don't get
3081              * a surprise exception before we get around to executing the
3082              * handler.
3083              */
3084             if (tso->blocked_exceptions == NULL) {
3085                 tso->blocked_exceptions = END_TSO_QUEUE;
3086             }
3087             
3088             /* Put the newly-built THUNK on top of the stack, ready to execute
3089              * when the thread restarts.
3090              */
3091             sp[0] = (W_)raise;
3092             sp[-1] = (W_)&stg_enter_info;
3093             tso->sp = sp-1;
3094             tso->what_next = ThreadRunGHC;
3095             IF_DEBUG(sanity, checkTSO(tso));
3096             return;
3097         }
3098         
3099         case UPDATE_FRAME:
3100         {
3101             StgAP_STACK * ap;
3102             nat words;
3103             
3104             // First build an AP_STACK consisting of the stack chunk above the
3105             // current update frame, with the top word on the stack as the
3106             // fun field.
3107             //
3108             words = frame - sp - 1;
3109             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3110             
3111             ap->size = words;
3112             ap->fun  = (StgClosure *)sp[0];
3113             sp++;
3114             for(i=0; i < (nat)words; ++i) {
3115                 ap->payload[i] = (StgClosure *)*sp++;
3116             }
3117             
3118             SET_HDR(ap,&stg_AP_STACK_info,
3119                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3120             TICK_ALLOC_UP_THK(words+1,0);
3121             
3122             IF_DEBUG(scheduler,
3123                      fprintf(stderr,  "sched: Updating ");
3124                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3125                      fprintf(stderr,  " with ");
3126                      printObj((StgClosure *)ap);
3127                 );
3128
3129             // Replace the updatee with an indirection - happily
3130             // this will also wake up any threads currently
3131             // waiting on the result.
3132             //
3133             // Warning: if we're in a loop, more than one update frame on
3134             // the stack may point to the same object.  Be careful not to
3135             // overwrite an IND_OLDGEN in this case, because we'll screw
3136             // up the mutable lists.  To be on the safe side, don't
3137             // overwrite any kind of indirection at all.  See also
3138             // threadSqueezeStack in GC.c, where we have to make a similar
3139             // check.
3140             //
3141             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3142                 // revert the black hole
3143                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3144                                (StgClosure *)ap);
3145             }
3146             sp += sizeofW(StgUpdateFrame) - 1;
3147             sp[0] = (W_)ap; // push onto stack
3148             break;
3149         }
3150         
3151         case STOP_FRAME:
3152             // We've stripped the entire stack, the thread is now dead.
3153             sp += sizeofW(StgStopFrame);
3154             tso->what_next = ThreadKilled;
3155             tso->sp = sp;
3156             return;
3157             
3158         default:
3159             barf("raiseAsync");
3160         }
3161     }
3162     barf("raiseAsync");
3163 }
3164
3165 /* -----------------------------------------------------------------------------
3166    raiseExceptionHelper
3167    
3168    This function is called by the raise# primitve, just so that we can
3169    move some of the tricky bits of raising an exception from C-- into
3170    C.  Who knows, it might be a useful re-useable thing here too.
3171    -------------------------------------------------------------------------- */
3172
3173 StgWord
3174 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3175 {
3176     StgClosure *raise_closure = NULL;
3177     StgPtr p, next;
3178     StgRetInfoTable *info;
3179     //
3180     // This closure represents the expression 'raise# E' where E
3181     // is the exception raise.  It is used to overwrite all the
3182     // thunks which are currently under evaluataion.
3183     //
3184
3185     //    
3186     // LDV profiling: stg_raise_info has THUNK as its closure
3187     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3188     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3189     // 1 does not cause any problem unless profiling is performed.
3190     // However, when LDV profiling goes on, we need to linearly scan
3191     // small object pool, where raise_closure is stored, so we should
3192     // use MIN_UPD_SIZE.
3193     //
3194     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3195     //                                 sizeofW(StgClosure)+1);
3196     //
3197
3198     //
3199     // Walk up the stack, looking for the catch frame.  On the way,
3200     // we update any closures pointed to from update frames with the
3201     // raise closure that we just built.
3202     //
3203     p = tso->sp;
3204     while(1) {
3205         info = get_ret_itbl((StgClosure *)p);
3206         next = p + stack_frame_sizeW((StgClosure *)p);
3207         switch (info->i.type) {
3208             
3209         case UPDATE_FRAME:
3210             // Only create raise_closure if we need to.
3211             if (raise_closure == NULL) {
3212                 raise_closure = 
3213                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3214                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3215                 raise_closure->payload[0] = exception;
3216             }
3217             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3218             p = next;
3219             continue;
3220             
3221         case CATCH_FRAME:
3222             tso->sp = p;
3223             return CATCH_FRAME;
3224             
3225         case STOP_FRAME:
3226             tso->sp = p;
3227             return STOP_FRAME;
3228
3229         default:
3230             p = next; 
3231             continue;
3232         }
3233     }
3234 }
3235
3236 /* -----------------------------------------------------------------------------
3237    resurrectThreads is called after garbage collection on the list of
3238    threads found to be garbage.  Each of these threads will be woken
3239    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3240    on an MVar, or NonTermination if the thread was blocked on a Black
3241    Hole.
3242
3243    Locks: sched_mutex isn't held upon entry nor exit.
3244    -------------------------------------------------------------------------- */
3245
3246 void
3247 resurrectThreads( StgTSO *threads )
3248 {
3249   StgTSO *tso, *next;
3250
3251   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3252     next = tso->global_link;
3253     tso->global_link = all_threads;
3254     all_threads = tso;
3255     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3256
3257     switch (tso->why_blocked) {
3258     case BlockedOnMVar:
3259     case BlockedOnException:
3260       /* Called by GC - sched_mutex lock is currently held. */
3261       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3262       break;
3263     case BlockedOnBlackHole:
3264       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3265       break;
3266     case NotBlocked:
3267       /* This might happen if the thread was blocked on a black hole
3268        * belonging to a thread that we've just woken up (raiseAsync
3269        * can wake up threads, remember...).
3270        */
3271       continue;
3272     default:
3273       barf("resurrectThreads: thread blocked in a strange way");
3274     }
3275   }
3276 }
3277
3278 /* -----------------------------------------------------------------------------
3279  * Blackhole detection: if we reach a deadlock, test whether any
3280  * threads are blocked on themselves.  Any threads which are found to
3281  * be self-blocked get sent a NonTermination exception.
3282  *
3283  * This is only done in a deadlock situation in order to avoid
3284  * performance overhead in the normal case.
3285  *
3286  * Locks: sched_mutex is held upon entry and exit.
3287  * -------------------------------------------------------------------------- */
3288
3289 static void
3290 detectBlackHoles( void )
3291 {
3292     StgTSO *tso = all_threads;
3293     StgClosure *frame;
3294     StgClosure *blocked_on;
3295     StgRetInfoTable *info;
3296
3297     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3298
3299         while (tso->what_next == ThreadRelocated) {
3300             tso = tso->link;
3301             ASSERT(get_itbl(tso)->type == TSO);
3302         }
3303       
3304         if (tso->why_blocked != BlockedOnBlackHole) {
3305             continue;
3306         }
3307         blocked_on = tso->block_info.closure;
3308
3309         frame = (StgClosure *)tso->sp;
3310
3311         while(1) {
3312             info = get_ret_itbl(frame);
3313             switch (info->i.type) {
3314             case UPDATE_FRAME:
3315                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3316                     /* We are blocking on one of our own computations, so
3317                      * send this thread the NonTermination exception.  
3318                      */
3319                     IF_DEBUG(scheduler, 
3320                              sched_belch("thread %d is blocked on itself", tso->id));
3321                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3322                     goto done;
3323                 }
3324                 
3325                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3326                 continue;
3327
3328             case STOP_FRAME:
3329                 goto done;
3330
3331                 // normal stack frames; do nothing except advance the pointer
3332             default:
3333                 (StgPtr)frame += stack_frame_sizeW(frame);
3334             }
3335         }   
3336         done: ;
3337     }
3338 }
3339
3340 /* ----------------------------------------------------------------------------
3341  * Debugging: why is a thread blocked
3342  * [Also provides useful information when debugging threaded programs
3343  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3344    ------------------------------------------------------------------------- */
3345
3346 static
3347 void
3348 printThreadBlockage(StgTSO *tso)
3349 {
3350   switch (tso->why_blocked) {
3351   case BlockedOnRead:
3352     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3353     break;
3354   case BlockedOnWrite:
3355     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3356     break;
3357 #if defined(mingw32_TARGET_OS)
3358     case BlockedOnDoProc:
3359     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3360     break;
3361 #endif
3362   case BlockedOnDelay:
3363     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3364     break;
3365   case BlockedOnMVar:
3366     fprintf(stderr,"is blocked on an MVar");
3367     break;
3368   case BlockedOnException:
3369     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3370             tso->block_info.tso->id);
3371     break;
3372   case BlockedOnBlackHole:
3373     fprintf(stderr,"is blocked on a black hole");
3374     break;
3375   case NotBlocked:
3376     fprintf(stderr,"is not blocked");
3377     break;
3378 #if defined(PAR)
3379   case BlockedOnGA:
3380     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3381             tso->block_info.closure, info_type(tso->block_info.closure));
3382     break;
3383   case BlockedOnGA_NoSend:
3384     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3385             tso->block_info.closure, info_type(tso->block_info.closure));
3386     break;
3387 #endif
3388   case BlockedOnCCall:
3389     fprintf(stderr,"is blocked on an external call");
3390     break;
3391   case BlockedOnCCall_NoUnblockExc:
3392     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3393     break;
3394   default:
3395     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3396          tso->why_blocked, tso->id, tso);
3397   }
3398 }
3399
3400 static
3401 void
3402 printThreadStatus(StgTSO *tso)
3403 {
3404   switch (tso->what_next) {
3405   case ThreadKilled:
3406     fprintf(stderr,"has been killed");
3407     break;
3408   case ThreadComplete:
3409     fprintf(stderr,"has completed");
3410     break;
3411   default:
3412     printThreadBlockage(tso);
3413   }
3414 }
3415
3416 void
3417 printAllThreads(void)
3418 {
3419   StgTSO *t;
3420   void *label;
3421
3422 # if defined(GRAN)
3423   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3424   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3425                        time_string, rtsFalse/*no commas!*/);
3426
3427   fprintf(stderr, "all threads at [%s]:\n", time_string);
3428 # elif defined(PAR)
3429   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3430   ullong_format_string(CURRENT_TIME,
3431                        time_string, rtsFalse/*no commas!*/);
3432
3433   fprintf(stderr,"all threads at [%s]:\n", time_string);
3434 # else
3435   fprintf(stderr,"all threads:\n");
3436 # endif
3437
3438   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3439     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3440     label = lookupThreadLabel(t->id);
3441     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3442     printThreadStatus(t);
3443     fprintf(stderr,"\n");
3444   }
3445 }
3446     
3447 #ifdef DEBUG
3448
3449 /* 
3450    Print a whole blocking queue attached to node (debugging only).
3451 */
3452 # if defined(PAR)
3453 void 
3454 print_bq (StgClosure *node)
3455 {
3456   StgBlockingQueueElement *bqe;
3457   StgTSO *tso;
3458   rtsBool end;
3459
3460   fprintf(stderr,"## BQ of closure %p (%s): ",
3461           node, info_type(node));
3462
3463   /* should cover all closures that may have a blocking queue */
3464   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3465          get_itbl(node)->type == FETCH_ME_BQ ||
3466          get_itbl(node)->type == RBH ||
3467          get_itbl(node)->type == MVAR);
3468     
3469   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3470
3471   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3472 }
3473
3474 /* 
3475    Print a whole blocking queue starting with the element bqe.
3476 */
3477 void 
3478 print_bqe (StgBlockingQueueElement *bqe)
3479 {
3480   rtsBool end;
3481
3482   /* 
3483      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3484   */
3485   for (end = (bqe==END_BQ_QUEUE);
3486        !end; // iterate until bqe points to a CONSTR
3487        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3488        bqe = end ? END_BQ_QUEUE : bqe->link) {
3489     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3490     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3491     /* types of closures that may appear in a blocking queue */
3492     ASSERT(get_itbl(bqe)->type == TSO ||           
3493            get_itbl(bqe)->type == BLOCKED_FETCH || 
3494            get_itbl(bqe)->type == CONSTR); 
3495     /* only BQs of an RBH end with an RBH_Save closure */
3496     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3497
3498     switch (get_itbl(bqe)->type) {
3499     case TSO:
3500       fprintf(stderr," TSO %u (%x),",
3501               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3502       break;
3503     case BLOCKED_FETCH:
3504       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3505               ((StgBlockedFetch *)bqe)->node, 
3506               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3507               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3508               ((StgBlockedFetch *)bqe)->ga.weight);
3509       break;
3510     case CONSTR:
3511       fprintf(stderr," %s (IP %p),",
3512               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3513                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3514                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3515                "RBH_Save_?"), get_itbl(bqe));
3516       break;
3517     default:
3518       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3519            info_type((StgClosure *)bqe)); // , node, info_type(node));
3520       break;
3521     }
3522   } /* for */
3523   fputc('\n', stderr);
3524 }
3525 # elif defined(GRAN)
3526 void 
3527 print_bq (StgClosure *node)
3528 {
3529   StgBlockingQueueElement *bqe;
3530   PEs node_loc, tso_loc;
3531   rtsBool end;
3532
3533   /* should cover all closures that may have a blocking queue */
3534   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3535          get_itbl(node)->type == FETCH_ME_BQ ||
3536          get_itbl(node)->type == RBH);
3537     
3538   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3539   node_loc = where_is(node);
3540
3541   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3542           node, info_type(node), node_loc);
3543
3544   /* 
3545      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3546   */
3547   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3548        !end; // iterate until bqe points to a CONSTR
3549        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3550     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3551     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3552     /* types of closures that may appear in a blocking queue */
3553     ASSERT(get_itbl(bqe)->type == TSO ||           
3554            get_itbl(bqe)->type == CONSTR); 
3555     /* only BQs of an RBH end with an RBH_Save closure */
3556     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3557
3558     tso_loc = where_is((StgClosure *)bqe);
3559     switch (get_itbl(bqe)->type) {
3560     case TSO:
3561       fprintf(stderr," TSO %d (%p) on [PE %d],",
3562               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3563       break;
3564     case CONSTR:
3565       fprintf(stderr," %s (IP %p),",
3566               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3567                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3568                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3569                "RBH_Save_?"), get_itbl(bqe));
3570       break;
3571     default:
3572       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3573            info_type((StgClosure *)bqe), node, info_type(node));
3574       break;
3575     }
3576   } /* for */
3577   fputc('\n', stderr);
3578 }
3579 #else
3580 /* 
3581    Nice and easy: only TSOs on the blocking queue
3582 */
3583 void 
3584 print_bq (StgClosure *node)
3585 {
3586   StgTSO *tso;
3587
3588   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3589   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3590        tso != END_TSO_QUEUE; 
3591        tso=tso->link) {
3592     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3593     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3594     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3595   }
3596   fputc('\n', stderr);
3597 }
3598 # endif
3599
3600 #if defined(PAR)
3601 static nat
3602 run_queue_len(void)
3603 {
3604   nat i;
3605   StgTSO *tso;
3606
3607   for (i=0, tso=run_queue_hd; 
3608        tso != END_TSO_QUEUE;
3609        i++, tso=tso->link)
3610     /* nothing */
3611
3612   return i;
3613 }
3614 #endif
3615
3616 void
3617 sched_belch(char *s, ...)
3618 {
3619   va_list ap;
3620   va_start(ap,s);
3621 #ifdef RTS_SUPPORTS_THREADS
3622   fprintf(stderr, "sched (task %p): ", osThreadId());
3623 #elif defined(PAR)
3624   fprintf(stderr, "== ");
3625 #else
3626   fprintf(stderr, "sched: ");
3627 #endif
3628   vfprintf(stderr, s, ap);
3629   fprintf(stderr, "\n");
3630   fflush(stderr);
3631   va_end(ap);
3632 }
3633
3634 #endif /* DEBUG */