[project @ 2005-02-03 11:21:03 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* If this flag is set, we are running Haskell code.  Used to detect
172  * uses of 'foreign import unsafe' that should be 'safe'.
173  */
174 rtsBool in_haskell = rtsFalse;
175
176 /* Next thread ID to allocate.
177  * Locks required: thread_id_mutex
178  */
179 static StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the closure to enter)
191  *  + 1                       (stg_ap_v_ret)
192  *  + 1                       (spare slot req'd by stg_ap_v_ret)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
199
200
201 #if defined(GRAN)
202 StgTSO *CurrentTSO;
203 #endif
204
205 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
206  *  exists - earlier gccs apparently didn't.
207  *  -= chak
208  */
209 StgTSO dummy_tso;
210
211 static rtsBool ready_to_gc;
212
213 /*
214  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
215  * in an MT setting, needed to signal that a worker thread shouldn't hang around
216  * in the scheduler when it is out of work.
217  */
218 static rtsBool shutting_down_scheduler = rtsFalse;
219
220 void            addToBlockedQueue ( StgTSO *tso );
221
222 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
223        void     interruptStgRts   ( void );
224
225 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
226 static void     detectBlackHoles  ( void );
227 #endif
228
229 static void     raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically);
230
231 #if defined(RTS_SUPPORTS_THREADS)
232 /* ToDo: carefully document the invariants that go together
233  *       with these synchronisation objects.
234  */
235 Mutex     sched_mutex       = INIT_MUTEX_VAR;
236 Mutex     term_mutex        = INIT_MUTEX_VAR;
237
238 #endif /* RTS_SUPPORTS_THREADS */
239
240 #if defined(PAR)
241 StgTSO *LastTSO;
242 rtsTime TimeOfLastYield;
243 rtsBool emitSchedule = rtsTrue;
244 #endif
245
246 #if DEBUG
247 static char *whatNext_strs[] = {
248   "(unknown)",
249   "ThreadRunGHC",
250   "ThreadInterpret",
251   "ThreadKilled",
252   "ThreadRelocated",
253   "ThreadComplete"
254 };
255 #endif
256
257 #if defined(PAR)
258 StgTSO * createSparkThread(rtsSpark spark);
259 StgTSO * activateSpark (rtsSpark spark);  
260 #endif
261
262 /* ----------------------------------------------------------------------------
263  * Starting Tasks
264  * ------------------------------------------------------------------------- */
265
266 #if defined(RTS_SUPPORTS_THREADS)
267 static rtsBool startingWorkerThread = rtsFalse;
268
269 static void taskStart(void);
270 static void
271 taskStart(void)
272 {
273   ACQUIRE_LOCK(&sched_mutex);
274   startingWorkerThread = rtsFalse;
275   schedule(NULL,NULL);
276   RELEASE_LOCK(&sched_mutex);
277 }
278
279 void
280 startSchedulerTaskIfNecessary(void)
281 {
282   if(run_queue_hd != END_TSO_QUEUE
283     || blocked_queue_hd != END_TSO_QUEUE
284     || sleeping_queue != END_TSO_QUEUE)
285   {
286     if(!startingWorkerThread)
287     { // we don't want to start another worker thread
288       // just because the last one hasn't yet reached the
289       // "waiting for capability" state
290       startingWorkerThread = rtsTrue;
291       if(!startTask(taskStart))
292       {
293         startingWorkerThread = rtsFalse;
294       }
295     }
296   }
297 }
298 #endif
299
300 /* ---------------------------------------------------------------------------
301    Main scheduling loop.
302
303    We use round-robin scheduling, each thread returning to the
304    scheduler loop when one of these conditions is detected:
305
306       * out of heap space
307       * timer expires (thread yields)
308       * thread blocks
309       * thread ends
310       * stack overflow
311
312    Locking notes:  we acquire the scheduler lock once at the beginning
313    of the scheduler loop, and release it when
314     
315       * running a thread, or
316       * waiting for work, or
317       * waiting for a GC to complete.
318
319    GRAN version:
320      In a GranSim setup this loop iterates over the global event queue.
321      This revolves around the global event queue, which determines what 
322      to do next. Therefore, it's more complicated than either the 
323      concurrent or the parallel (GUM) setup.
324
325    GUM version:
326      GUM iterates over incoming messages.
327      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
328      and sends out a fish whenever it has nothing to do; in-between
329      doing the actual reductions (shared code below) it processes the
330      incoming messages and deals with delayed operations 
331      (see PendingFetches).
332      This is not the ugliest code you could imagine, but it's bloody close.
333
334    ------------------------------------------------------------------------ */
335 static void
336 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
337           Capability *initialCapability )
338 {
339   StgTSO *t;
340   Capability *cap;
341   StgThreadReturnCode ret;
342 #if defined(GRAN)
343   rtsEvent *event;
344 #elif defined(PAR)
345   StgSparkPool *pool;
346   rtsSpark spark;
347   StgTSO *tso;
348   GlobalTaskId pe;
349   rtsBool receivedFinish = rtsFalse;
350 # if defined(DEBUG)
351   nat tp_size, sp_size; // stats only
352 # endif
353 #endif
354   rtsBool was_interrupted = rtsFalse;
355   nat prev_what_next;
356   
357   // Pre-condition: sched_mutex is held.
358   // We might have a capability, passed in as initialCapability.
359   cap = initialCapability;
360
361 #if defined(RTS_SUPPORTS_THREADS)
362   //
363   // in the threaded case, the capability is either passed in via the
364   // initialCapability parameter, or initialized inside the scheduler
365   // loop 
366   //
367   IF_DEBUG(scheduler,
368            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
369                        mainThread, initialCapability);
370       );
371 #else
372   // simply initialise it in the non-threaded case
373   grabCapability(&cap);
374 #endif
375
376 #if defined(GRAN)
377   /* set up first event to get things going */
378   /* ToDo: assign costs for system setup and init MainTSO ! */
379   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
380             ContinueThread, 
381             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
382
383   IF_DEBUG(gran,
384            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
385            G_TSO(CurrentTSO, 5));
386
387   if (RtsFlags.GranFlags.Light) {
388     /* Save current time; GranSim Light only */
389     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
390   }      
391
392   event = get_next_event();
393
394   while (event!=(rtsEvent*)NULL) {
395     /* Choose the processor with the next event */
396     CurrentProc = event->proc;
397     CurrentTSO = event->tso;
398
399 #elif defined(PAR)
400
401   while (!receivedFinish) {    /* set by processMessages */
402                                /* when receiving PP_FINISH message         */ 
403
404 #else // everything except GRAN and PAR
405
406   while (1) {
407
408 #endif
409
410      IF_DEBUG(scheduler, printAllThreads());
411
412 #if defined(RTS_SUPPORTS_THREADS)
413       // Yield the capability to higher-priority tasks if necessary.
414       //
415       if (cap != NULL) {
416           yieldCapability(&cap);
417       }
418
419       // If we do not currently hold a capability, we wait for one
420       //
421       if (cap == NULL) {
422           waitForCapability(&sched_mutex, &cap,
423                             mainThread ? &mainThread->bound_thread_cond : NULL);
424       }
425
426       // We now have a capability...
427 #endif
428
429     // Check whether we have re-entered the RTS from Haskell without
430     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
431     // call).
432     if (in_haskell) {
433           errorBelch("schedule: re-entered unsafely.\n"
434                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
435           stg_exit(1);
436     }
437
438     //
439     // If we're interrupted (the user pressed ^C, or some other
440     // termination condition occurred), kill all the currently running
441     // threads.
442     //
443     if (interrupted) {
444         IF_DEBUG(scheduler, sched_belch("interrupted"));
445         interrupted = rtsFalse;
446         was_interrupted = rtsTrue;
447 #if defined(RTS_SUPPORTS_THREADS)
448         // In the threaded RTS, deadlock detection doesn't work,
449         // so just exit right away.
450         errorBelch("interrupted");
451         releaseCapability(cap);
452         RELEASE_LOCK(&sched_mutex);
453         shutdownHaskellAndExit(EXIT_SUCCESS);
454 #else
455         deleteAllThreads();
456 #endif
457     }
458
459 #if defined(RTS_USER_SIGNALS)
460     // check for signals each time around the scheduler
461     if (signals_pending()) {
462       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
463       startSignalHandlers();
464       ACQUIRE_LOCK(&sched_mutex);
465     }
466 #endif
467
468     //
469     // Check whether any waiting threads need to be woken up.  If the
470     // run queue is empty, and there are no other tasks running, we
471     // can wait indefinitely for something to happen.
472     //
473     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
474     {
475 #if defined(RTS_SUPPORTS_THREADS)
476         // We shouldn't be here...
477         barf("schedule: awaitEvent() in threaded RTS");
478 #endif
479         awaitEvent( EMPTY_RUN_QUEUE() );
480     }
481     // we can be interrupted while waiting for I/O...
482     if (interrupted) continue;
483
484     /* 
485      * Detect deadlock: when we have no threads to run, there are no
486      * threads waiting on I/O or sleeping, and all the other tasks are
487      * waiting for work, we must have a deadlock of some description.
488      *
489      * We first try to find threads blocked on themselves (ie. black
490      * holes), and generate NonTermination exceptions where necessary.
491      *
492      * If no threads are black holed, we have a deadlock situation, so
493      * inform all the main threads.
494      */
495 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
496     if (   EMPTY_THREAD_QUEUES() )
497     {
498         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
499
500         // Garbage collection can release some new threads due to
501         // either (a) finalizers or (b) threads resurrected because
502         // they are unreachable and will therefore be sent an
503         // exception.  Any threads thus released will be immediately
504         // runnable.
505         GarbageCollect(GetRoots,rtsTrue);
506         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
507
508 #if defined(RTS_USER_SIGNALS)
509         /* If we have user-installed signal handlers, then wait
510          * for signals to arrive rather then bombing out with a
511          * deadlock.
512          */
513         if ( anyUserHandlers() ) {
514             IF_DEBUG(scheduler, 
515                      sched_belch("still deadlocked, waiting for signals..."));
516
517             awaitUserSignals();
518
519             // we might be interrupted...
520             if (interrupted) { continue; }
521
522             if (signals_pending()) {
523                 RELEASE_LOCK(&sched_mutex);
524                 startSignalHandlers();
525                 ACQUIRE_LOCK(&sched_mutex);
526             }
527             ASSERT(!EMPTY_RUN_QUEUE());
528             goto not_deadlocked;
529         }
530 #endif
531
532         /* Probably a real deadlock.  Send the current main thread the
533          * Deadlock exception (or in the SMP build, send *all* main
534          * threads the deadlock exception, since none of them can make
535          * progress).
536          */
537         {
538             StgMainThread *m;
539             m = main_threads;
540             switch (m->tso->why_blocked) {
541             case BlockedOnBlackHole:
542             case BlockedOnException:
543             case BlockedOnMVar:
544                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
545                 break;
546             default:
547                 barf("deadlock: main thread blocked in a strange way");
548             }
549         }
550     }
551   not_deadlocked:
552
553 #elif defined(RTS_SUPPORTS_THREADS)
554     // ToDo: add deadlock detection in threaded RTS
555 #elif defined(PAR)
556     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
557 #endif
558
559 #if defined(RTS_SUPPORTS_THREADS) || defined(mingw32_HOST_OS)
560     /* win32: might be back here due to awaitEvent() being abandoned
561      * as a result of a console event having been delivered.
562      */
563     if ( EMPTY_RUN_QUEUE() ) {
564         continue; // nothing to do
565     }
566 #endif
567
568 #if defined(GRAN)
569     if (RtsFlags.GranFlags.Light)
570       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
571
572     /* adjust time based on time-stamp */
573     if (event->time > CurrentTime[CurrentProc] &&
574         event->evttype != ContinueThread)
575       CurrentTime[CurrentProc] = event->time;
576     
577     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
578     if (!RtsFlags.GranFlags.Light)
579       handleIdlePEs();
580
581     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
582
583     /* main event dispatcher in GranSim */
584     switch (event->evttype) {
585       /* Should just be continuing execution */
586     case ContinueThread:
587       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
588       /* ToDo: check assertion
589       ASSERT(run_queue_hd != (StgTSO*)NULL &&
590              run_queue_hd != END_TSO_QUEUE);
591       */
592       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
593       if (!RtsFlags.GranFlags.DoAsyncFetch &&
594           procStatus[CurrentProc]==Fetching) {
595         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
596               CurrentTSO->id, CurrentTSO, CurrentProc);
597         goto next_thread;
598       } 
599       /* Ignore ContinueThreads for completed threads */
600       if (CurrentTSO->what_next == ThreadComplete) {
601         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
602               CurrentTSO->id, CurrentTSO, CurrentProc);
603         goto next_thread;
604       } 
605       /* Ignore ContinueThreads for threads that are being migrated */
606       if (PROCS(CurrentTSO)==Nowhere) { 
607         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
608               CurrentTSO->id, CurrentTSO, CurrentProc);
609         goto next_thread;
610       }
611       /* The thread should be at the beginning of the run queue */
612       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
613         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
614               CurrentTSO->id, CurrentTSO, CurrentProc);
615         break; // run the thread anyway
616       }
617       /*
618       new_event(proc, proc, CurrentTime[proc],
619                 FindWork,
620                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
621       goto next_thread; 
622       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
623       break; // now actually run the thread; DaH Qu'vam yImuHbej 
624
625     case FetchNode:
626       do_the_fetchnode(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case GlobalBlock:
630       do_the_globalblock(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case FetchReply:
634       do_the_fetchreply(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case UnblockThread:   /* Move from the blocked queue to the tail of */
638       do_the_unblock(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case ResumeThread:  /* Move from the blocked queue to the tail of */
642       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
643       event->tso->gran.blocktime += 
644         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
645       do_the_startthread(event);
646       goto next_thread;             /* handle next event in event queue  */
647       
648     case StartThread:
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case MoveThread:
653       do_the_movethread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveSpark:
657       do_the_movespark(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case FindWork:
661       do_the_findwork(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     default:
665       barf("Illegal event type %u\n", event->evttype);
666     }  /* switch */
667     
668     /* This point was scheduler_loop in the old RTS */
669
670     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
671
672     TimeOfLastEvent = CurrentTime[CurrentProc];
673     TimeOfNextEvent = get_time_of_next_event();
674     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
675     // CurrentTSO = ThreadQueueHd;
676
677     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
678                          TimeOfNextEvent));
679
680     if (RtsFlags.GranFlags.Light) 
681       GranSimLight_leave_system(event, &ActiveTSO); 
682
683     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
684
685     IF_DEBUG(gran, 
686              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
687
688     /* in a GranSim setup the TSO stays on the run queue */
689     t = CurrentTSO;
690     /* Take a thread from the run queue. */
691     POP_RUN_QUEUE(t); // take_off_run_queue(t);
692
693     IF_DEBUG(gran, 
694              debugBelch("GRAN: About to run current thread, which is\n");
695              G_TSO(t,5));
696
697     context_switch = 0; // turned on via GranYield, checking events and time slice
698
699     IF_DEBUG(gran, 
700              DumpGranEvent(GR_SCHEDULE, t));
701
702     procStatus[CurrentProc] = Busy;
703
704 #elif defined(PAR)
705     if (PendingFetches != END_BF_QUEUE) {
706         processFetches();
707     }
708
709     /* ToDo: phps merge with spark activation above */
710     /* check whether we have local work and send requests if we have none */
711     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
712       /* :-[  no local threads => look out for local sparks */
713       /* the spark pool for the current PE */
714       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
715       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
716           pool->hd < pool->tl) {
717         /* 
718          * ToDo: add GC code check that we really have enough heap afterwards!!
719          * Old comment:
720          * If we're here (no runnable threads) and we have pending
721          * sparks, we must have a space problem.  Get enough space
722          * to turn one of those pending sparks into a
723          * thread... 
724          */
725
726         spark = findSpark(rtsFalse);                /* get a spark */
727         if (spark != (rtsSpark) NULL) {
728           tso = activateSpark(spark);       /* turn the spark into a thread */
729           IF_PAR_DEBUG(schedule,
730                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
731                              tso->id, tso, advisory_thread_count));
732
733           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
734             debugBelch("==^^ failed to activate spark\n");
735             goto next_thread;
736           }               /* otherwise fall through & pick-up new tso */
737         } else {
738           IF_PAR_DEBUG(verbose,
739                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
740                              spark_queue_len(pool)));
741           goto next_thread;
742         }
743       }
744
745       /* If we still have no work we need to send a FISH to get a spark
746          from another PE 
747       */
748       if (EMPTY_RUN_QUEUE()) {
749       /* =8-[  no local sparks => look for work on other PEs */
750         /*
751          * We really have absolutely no work.  Send out a fish
752          * (there may be some out there already), and wait for
753          * something to arrive.  We clearly can't run any threads
754          * until a SCHEDULE or RESUME arrives, and so that's what
755          * we're hoping to see.  (Of course, we still have to
756          * respond to other types of messages.)
757          */
758         TIME now = msTime() /*CURRENT_TIME*/;
759         IF_PAR_DEBUG(verbose, 
760                      debugBelch("--  now=%ld\n", now));
761         IF_PAR_DEBUG(verbose,
762                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
763                          (last_fish_arrived_at!=0 &&
764                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
765                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
766                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
767                              last_fish_arrived_at,
768                              RtsFlags.ParFlags.fishDelay, now);
769                      });
770         
771         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
772             (last_fish_arrived_at==0 ||
773              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
774           /* outstandingFishes is set in sendFish, processFish;
775              avoid flooding system with fishes via delay */
776           pe = choosePE();
777           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
778                    NEW_FISH_HUNGER);
779
780           // Global statistics: count no. of fishes
781           if (RtsFlags.ParFlags.ParStats.Global &&
782               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
783             globalParStats.tot_fish_mess++;
784           }
785         }
786       
787         receivedFinish = processMessages();
788         goto next_thread;
789       }
790     } else if (PacketsWaiting()) {  /* Look for incoming messages */
791       receivedFinish = processMessages();
792     }
793
794     /* Now we are sure that we have some work available */
795     ASSERT(run_queue_hd != END_TSO_QUEUE);
796
797     /* Take a thread from the run queue, if we have work */
798     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
799     IF_DEBUG(sanity,checkTSO(t));
800
801     /* ToDo: write something to the log-file
802     if (RTSflags.ParFlags.granSimStats && !sameThread)
803         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
804
805     CurrentTSO = t;
806     */
807     /* the spark pool for the current PE */
808     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
809
810     IF_DEBUG(scheduler, 
811              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
812                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
813
814 # if 1
815     if (0 && RtsFlags.ParFlags.ParStats.Full && 
816         t && LastTSO && t->id != LastTSO->id && 
817         LastTSO->why_blocked == NotBlocked && 
818         LastTSO->what_next != ThreadComplete) {
819       // if previously scheduled TSO not blocked we have to record the context switch
820       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
821                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
822     }
823
824     if (RtsFlags.ParFlags.ParStats.Full && 
825         (emitSchedule /* forced emit */ ||
826         (t && LastTSO && t->id != LastTSO->id))) {
827       /* 
828          we are running a different TSO, so write a schedule event to log file
829          NB: If we use fair scheduling we also have to write  a deschedule 
830              event for LastTSO; with unfair scheduling we know that the
831              previous tso has blocked whenever we switch to another tso, so
832              we don't need it in GUM for now
833       */
834       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
835                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
836       emitSchedule = rtsFalse;
837     }
838      
839 # endif
840 #else /* !GRAN && !PAR */
841   
842     // grab a thread from the run queue
843     ASSERT(run_queue_hd != END_TSO_QUEUE);
844     POP_RUN_QUEUE(t);
845
846     // Sanity check the thread we're about to run.  This can be
847     // expensive if there is lots of thread switching going on...
848     IF_DEBUG(sanity,checkTSO(t));
849 #endif
850
851 #ifdef THREADED_RTS
852     {
853       StgMainThread *m = t->main;
854       
855       if(m)
856       {
857         if(m == mainThread)
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### Running thread %d in bound thread", t->id));
861           // yes, the Haskell thread is bound to the current native thread
862         }
863         else
864         {
865           IF_DEBUG(scheduler,
866             sched_belch("### thread %d bound to another OS thread", t->id));
867           // no, bound to a different Haskell thread: pass to that thread
868           PUSH_ON_RUN_QUEUE(t);
869           passCapability(&m->bound_thread_cond);
870           continue;
871         }
872       }
873       else
874       {
875         if(mainThread != NULL)
876         // The thread we want to run is bound.
877         {
878           IF_DEBUG(scheduler,
879             sched_belch("### this OS thread cannot run thread %d", t->id));
880           // no, the current native thread is bound to a different
881           // Haskell thread, so pass it to any worker thread
882           PUSH_ON_RUN_QUEUE(t);
883           passCapabilityToWorker();
884           continue; 
885         }
886       }
887     }
888 #endif
889
890     cap->r.rCurrentTSO = t;
891     
892     /* context switches are now initiated by the timer signal, unless
893      * the user specified "context switch as often as possible", with
894      * +RTS -C0
895      */
896     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
897          && (run_queue_hd != END_TSO_QUEUE
898              || blocked_queue_hd != END_TSO_QUEUE
899              || sleeping_queue != END_TSO_QUEUE)))
900         context_switch = 1;
901
902 run_thread:
903
904     RELEASE_LOCK(&sched_mutex);
905
906     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
907                               (long)t->id, whatNext_strs[t->what_next]));
908
909 #ifdef PROFILING
910     startHeapProfTimer();
911 #endif
912
913     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
914     /* Run the current thread 
915      */
916     prev_what_next = t->what_next;
917
918     errno = t->saved_errno;
919     in_haskell = rtsTrue;
920
921     switch (prev_what_next) {
922
923     case ThreadKilled:
924     case ThreadComplete:
925         /* Thread already finished, return to scheduler. */
926         ret = ThreadFinished;
927         break;
928
929     case ThreadRunGHC:
930         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
931         break;
932
933     case ThreadInterpret:
934         ret = interpretBCO(cap);
935         break;
936
937     default:
938       barf("schedule: invalid what_next field");
939     }
940
941     in_haskell = rtsFalse;
942
943     // The TSO might have moved, so find the new location:
944     t = cap->r.rCurrentTSO;
945
946     // And save the current errno in this thread.
947     t->saved_errno = errno;
948
949     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
950     
951     /* Costs for the scheduler are assigned to CCS_SYSTEM */
952 #ifdef PROFILING
953     stopHeapProfTimer();
954     CCCS = CCS_SYSTEM;
955 #endif
956     
957     ACQUIRE_LOCK(&sched_mutex);
958     
959 #ifdef RTS_SUPPORTS_THREADS
960     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
961 #elif !defined(GRAN) && !defined(PAR)
962     IF_DEBUG(scheduler,debugBelch("sched: "););
963 #endif
964     
965 #if defined(PAR)
966     /* HACK 675: if the last thread didn't yield, make sure to print a 
967        SCHEDULE event to the log file when StgRunning the next thread, even
968        if it is the same one as before */
969     LastTSO = t; 
970     TimeOfLastYield = CURRENT_TIME;
971 #endif
972
973     switch (ret) {
974     case HeapOverflow:
975 #if defined(GRAN)
976       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
977       globalGranStats.tot_heapover++;
978 #elif defined(PAR)
979       globalParStats.tot_heapover++;
980 #endif
981
982       // did the task ask for a large block?
983       if (cap->r.rHpAlloc > BLOCK_SIZE) {
984           // if so, get one and push it on the front of the nursery.
985           bdescr *bd;
986           nat blocks;
987           
988           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
989
990           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
991                                    (long)t->id, whatNext_strs[t->what_next], blocks));
992
993           // don't do this if it would push us over the
994           // alloc_blocks_lim limit; we'll GC first.
995           if (alloc_blocks + blocks < alloc_blocks_lim) {
996
997               alloc_blocks += blocks;
998               bd = allocGroup( blocks );
999
1000               // link the new group into the list
1001               bd->link = cap->r.rCurrentNursery;
1002               bd->u.back = cap->r.rCurrentNursery->u.back;
1003               if (cap->r.rCurrentNursery->u.back != NULL) {
1004                   cap->r.rCurrentNursery->u.back->link = bd;
1005               } else {
1006                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1007                          g0s0->blocks == cap->r.rNursery);
1008                   cap->r.rNursery = g0s0->blocks = bd;
1009               }           
1010               cap->r.rCurrentNursery->u.back = bd;
1011
1012               // initialise it as a nursery block.  We initialise the
1013               // step, gen_no, and flags field of *every* sub-block in
1014               // this large block, because this is easier than making
1015               // sure that we always find the block head of a large
1016               // block whenever we call Bdescr() (eg. evacuate() and
1017               // isAlive() in the GC would both have to do this, at
1018               // least).
1019               { 
1020                   bdescr *x;
1021                   for (x = bd; x < bd + blocks; x++) {
1022                       x->step = g0s0;
1023                       x->gen_no = 0;
1024                       x->flags = 0;
1025                   }
1026               }
1027
1028               // don't forget to update the block count in g0s0.
1029               g0s0->n_blocks += blocks;
1030               // This assert can be a killer if the app is doing lots
1031               // of large block allocations.
1032               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1033
1034               // now update the nursery to point to the new block
1035               cap->r.rCurrentNursery = bd;
1036
1037               // we might be unlucky and have another thread get on the
1038               // run queue before us and steal the large block, but in that
1039               // case the thread will just end up requesting another large
1040               // block.
1041               PUSH_ON_RUN_QUEUE(t);
1042               break;
1043           }
1044       }
1045
1046       /* make all the running tasks block on a condition variable,
1047        * maybe set context_switch and wait till they all pile in,
1048        * then have them wait on a GC condition variable.
1049        */
1050       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1051                                (long)t->id, whatNext_strs[t->what_next]));
1052       threadPaused(t);
1053 #if defined(GRAN)
1054       ASSERT(!is_on_queue(t,CurrentProc));
1055 #elif defined(PAR)
1056       /* Currently we emit a DESCHEDULE event before GC in GUM.
1057          ToDo: either add separate event to distinguish SYSTEM time from rest
1058                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1059       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1060         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1061                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1062         emitSchedule = rtsTrue;
1063       }
1064 #endif
1065       
1066       ready_to_gc = rtsTrue;
1067       context_switch = 1;               /* stop other threads ASAP */
1068       PUSH_ON_RUN_QUEUE(t);
1069       /* actual GC is done at the end of the while loop */
1070       break;
1071       
1072     case StackOverflow:
1073 #if defined(GRAN)
1074       IF_DEBUG(gran, 
1075                DumpGranEvent(GR_DESCHEDULE, t));
1076       globalGranStats.tot_stackover++;
1077 #elif defined(PAR)
1078       // IF_DEBUG(par, 
1079       // DumpGranEvent(GR_DESCHEDULE, t);
1080       globalParStats.tot_stackover++;
1081 #endif
1082       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1083                                (long)t->id, whatNext_strs[t->what_next]));
1084       /* just adjust the stack for this thread, then pop it back
1085        * on the run queue.
1086        */
1087       threadPaused(t);
1088       { 
1089         /* enlarge the stack */
1090         StgTSO *new_t = threadStackOverflow(t);
1091         
1092         /* This TSO has moved, so update any pointers to it from the
1093          * main thread stack.  It better not be on any other queues...
1094          * (it shouldn't be).
1095          */
1096         if (t->main != NULL) {
1097             t->main->tso = new_t;
1098         }
1099         PUSH_ON_RUN_QUEUE(new_t);
1100       }
1101       break;
1102
1103     case ThreadYielding:
1104       // Reset the context switch flag.  We don't do this just before
1105       // running the thread, because that would mean we would lose ticks
1106       // during GC, which can lead to unfair scheduling (a thread hogs
1107       // the CPU because the tick always arrives during GC).  This way
1108       // penalises threads that do a lot of allocation, but that seems
1109       // better than the alternative.
1110       context_switch = 0;
1111
1112 #if defined(GRAN)
1113       IF_DEBUG(gran, 
1114                DumpGranEvent(GR_DESCHEDULE, t));
1115       globalGranStats.tot_yields++;
1116 #elif defined(PAR)
1117       // IF_DEBUG(par, 
1118       // DumpGranEvent(GR_DESCHEDULE, t);
1119       globalParStats.tot_yields++;
1120 #endif
1121       /* put the thread back on the run queue.  Then, if we're ready to
1122        * GC, check whether this is the last task to stop.  If so, wake
1123        * up the GC thread.  getThread will block during a GC until the
1124        * GC is finished.
1125        */
1126       IF_DEBUG(scheduler,
1127                if (t->what_next != prev_what_next) {
1128                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1129                          (long)t->id, whatNext_strs[t->what_next]);
1130                } else {
1131                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1132                          (long)t->id, whatNext_strs[t->what_next]);
1133                }
1134                );
1135
1136       IF_DEBUG(sanity,
1137                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1138                checkTSO(t));
1139       ASSERT(t->link == END_TSO_QUEUE);
1140
1141       // Shortcut if we're just switching evaluators: don't bother
1142       // doing stack squeezing (which can be expensive), just run the
1143       // thread.
1144       if (t->what_next != prev_what_next) {
1145           goto run_thread;
1146       }
1147
1148       threadPaused(t);
1149
1150 #if defined(GRAN)
1151       ASSERT(!is_on_queue(t,CurrentProc));
1152
1153       IF_DEBUG(sanity,
1154                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1155                checkThreadQsSanity(rtsTrue));
1156 #endif
1157
1158 #if defined(PAR)
1159       if (RtsFlags.ParFlags.doFairScheduling) { 
1160         /* this does round-robin scheduling; good for concurrency */
1161         APPEND_TO_RUN_QUEUE(t);
1162       } else {
1163         /* this does unfair scheduling; good for parallelism */
1164         PUSH_ON_RUN_QUEUE(t);
1165       }
1166 #else
1167       // this does round-robin scheduling; good for concurrency
1168       APPEND_TO_RUN_QUEUE(t);
1169 #endif
1170
1171 #if defined(GRAN)
1172       /* add a ContinueThread event to actually process the thread */
1173       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1174                 ContinueThread,
1175                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1176       IF_GRAN_DEBUG(bq, 
1177                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1178                G_EVENTQ(0);
1179                G_CURR_THREADQ(0));
1180 #endif /* GRAN */
1181       break;
1182
1183     case ThreadBlocked:
1184 #if defined(GRAN)
1185       IF_DEBUG(scheduler,
1186                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1187                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1188                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1189
1190       // ??? needed; should emit block before
1191       IF_DEBUG(gran, 
1192                DumpGranEvent(GR_DESCHEDULE, t)); 
1193       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1194       /*
1195         ngoq Dogh!
1196       ASSERT(procStatus[CurrentProc]==Busy || 
1197               ((procStatus[CurrentProc]==Fetching) && 
1198               (t->block_info.closure!=(StgClosure*)NULL)));
1199       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1200           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1201             procStatus[CurrentProc]==Fetching)) 
1202         procStatus[CurrentProc] = Idle;
1203       */
1204 #elif defined(PAR)
1205       IF_DEBUG(scheduler,
1206                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1207                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1208       IF_PAR_DEBUG(bq,
1209
1210                    if (t->block_info.closure!=(StgClosure*)NULL) 
1211                      print_bq(t->block_info.closure));
1212
1213       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1214       blockThread(t);
1215
1216       /* whatever we schedule next, we must log that schedule */
1217       emitSchedule = rtsTrue;
1218
1219 #else /* !GRAN */
1220       /* don't need to do anything.  Either the thread is blocked on
1221        * I/O, in which case we'll have called addToBlockedQueue
1222        * previously, or it's blocked on an MVar or Blackhole, in which
1223        * case it'll be on the relevant queue already.
1224        */
1225       ASSERT(t->why_blocked != NotBlocked);
1226       IF_DEBUG(scheduler,
1227                debugBelch("--<< thread %d (%s) stopped: ", 
1228                        t->id, whatNext_strs[t->what_next]);
1229                printThreadBlockage(t);
1230                debugBelch("\n"));
1231
1232       /* Only for dumping event to log file 
1233          ToDo: do I need this in GranSim, too?
1234       blockThread(t);
1235       */
1236 #endif
1237       threadPaused(t);
1238       break;
1239
1240     case ThreadFinished:
1241       /* Need to check whether this was a main thread, and if so, signal
1242        * the task that started it with the return value.  If we have no
1243        * more main threads, we probably need to stop all the tasks until
1244        * we get a new one.
1245        */
1246       /* We also end up here if the thread kills itself with an
1247        * uncaught exception, see Exception.hc.
1248        */
1249       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1250                                t->id, whatNext_strs[t->what_next]));
1251 #if defined(GRAN)
1252       endThread(t, CurrentProc); // clean-up the thread
1253 #elif defined(PAR)
1254       /* For now all are advisory -- HWL */
1255       //if(t->priority==AdvisoryPriority) ??
1256       advisory_thread_count--;
1257       
1258 # ifdef DIST
1259       if(t->dist.priority==RevalPriority)
1260         FinishReval(t);
1261 # endif
1262       
1263       if (RtsFlags.ParFlags.ParStats.Full &&
1264           !RtsFlags.ParFlags.ParStats.Suppressed) 
1265         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1266 #endif
1267
1268       //
1269       // Check whether the thread that just completed was a main
1270       // thread, and if so return with the result.  
1271       //
1272       // There is an assumption here that all thread completion goes
1273       // through this point; we need to make sure that if a thread
1274       // ends up in the ThreadKilled state, that it stays on the run
1275       // queue so it can be dealt with here.
1276       //
1277       if (
1278 #if defined(RTS_SUPPORTS_THREADS)
1279           mainThread != NULL
1280 #else
1281           mainThread->tso == t
1282 #endif
1283           )
1284       {
1285           // We are a bound thread: this must be our thread that just
1286           // completed.
1287           ASSERT(mainThread->tso == t);
1288
1289           if (t->what_next == ThreadComplete) {
1290               if (mainThread->ret) {
1291                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1292                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1293               }
1294               mainThread->stat = Success;
1295           } else {
1296               if (mainThread->ret) {
1297                   *(mainThread->ret) = NULL;
1298               }
1299               if (was_interrupted) {
1300                   mainThread->stat = Interrupted;
1301               } else {
1302                   mainThread->stat = Killed;
1303               }
1304           }
1305 #ifdef DEBUG
1306           removeThreadLabel((StgWord)mainThread->tso->id);
1307 #endif
1308           if (mainThread->prev == NULL) {
1309               main_threads = mainThread->link;
1310           } else {
1311               mainThread->prev->link = mainThread->link;
1312           }
1313           if (mainThread->link != NULL) {
1314               mainThread->link->prev = NULL;
1315           }
1316           releaseCapability(cap);
1317           return;
1318       }
1319
1320 #ifdef RTS_SUPPORTS_THREADS
1321       ASSERT(t->main == NULL);
1322 #else
1323       if (t->main != NULL) {
1324           // Must be a main thread that is not the topmost one.  Leave
1325           // it on the run queue until the stack has unwound to the
1326           // point where we can deal with this.  Leaving it on the run
1327           // queue also ensures that the garbage collector knows about
1328           // this thread and its return value (it gets dropped from the
1329           // all_threads list so there's no other way to find it).
1330           APPEND_TO_RUN_QUEUE(t);
1331       }
1332 #endif
1333       break;
1334
1335     default:
1336       barf("schedule: invalid thread return code %d", (int)ret);
1337     }
1338
1339 #ifdef PROFILING
1340     // When we have +RTS -i0 and we're heap profiling, do a census at
1341     // every GC.  This lets us get repeatable runs for debugging.
1342     if (performHeapProfile ||
1343         (RtsFlags.ProfFlags.profileInterval==0 &&
1344          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1345         GarbageCollect(GetRoots, rtsTrue);
1346         heapCensus();
1347         performHeapProfile = rtsFalse;
1348         ready_to_gc = rtsFalse; // we already GC'd
1349     }
1350 #endif
1351
1352     if (ready_to_gc) {
1353       /* Kick any transactions which are invalid back to their atomically frames.
1354        * When next scheduled they will try to commit, this commit will fail and
1355        * they will retry. */
1356       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1357         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1358           if (!stmValidateTransaction (t -> trec)) {
1359             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1360
1361             // strip the stack back to the ATOMICALLY_FRAME, aborting
1362             // the (nested) transaction, and saving the stack of any
1363             // partially-evaluated thunks on the heap.
1364             raiseAsync_(t, NULL, rtsTrue);
1365             
1366 #ifdef REG_R1
1367             ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1368 #endif
1369           }
1370         }
1371       }
1372
1373       /* everybody back, start the GC.
1374        * Could do it in this thread, or signal a condition var
1375        * to do it in another thread.  Either way, we need to
1376        * broadcast on gc_pending_cond afterward.
1377        */
1378 #if defined(RTS_SUPPORTS_THREADS)
1379       IF_DEBUG(scheduler,sched_belch("doing GC"));
1380 #endif
1381       GarbageCollect(GetRoots,rtsFalse);
1382       ready_to_gc = rtsFalse;
1383 #if defined(GRAN)
1384       /* add a ContinueThread event to continue execution of current thread */
1385       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1386                 ContinueThread,
1387                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1388       IF_GRAN_DEBUG(bq, 
1389                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1390                G_EVENTQ(0);
1391                G_CURR_THREADQ(0));
1392 #endif /* GRAN */
1393     }
1394
1395 #if defined(GRAN)
1396   next_thread:
1397     IF_GRAN_DEBUG(unused,
1398                   print_eventq(EventHd));
1399
1400     event = get_next_event();
1401 #elif defined(PAR)
1402   next_thread:
1403     /* ToDo: wait for next message to arrive rather than busy wait */
1404 #endif /* GRAN */
1405
1406   } /* end of while(1) */
1407
1408   IF_PAR_DEBUG(verbose,
1409                debugBelch("== Leaving schedule() after having received Finish\n"));
1410 }
1411
1412 /* ---------------------------------------------------------------------------
1413  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1414  * used by Control.Concurrent for error checking.
1415  * ------------------------------------------------------------------------- */
1416  
1417 StgBool
1418 rtsSupportsBoundThreads(void)
1419 {
1420 #ifdef THREADED_RTS
1421   return rtsTrue;
1422 #else
1423   return rtsFalse;
1424 #endif
1425 }
1426
1427 /* ---------------------------------------------------------------------------
1428  * isThreadBound(tso): check whether tso is bound to an OS thread.
1429  * ------------------------------------------------------------------------- */
1430  
1431 StgBool
1432 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1433 {
1434 #ifdef THREADED_RTS
1435   return (tso->main != NULL);
1436 #endif
1437   return rtsFalse;
1438 }
1439
1440 /* ---------------------------------------------------------------------------
1441  * Singleton fork(). Do not copy any running threads.
1442  * ------------------------------------------------------------------------- */
1443
1444 #ifndef mingw32_HOST_OS
1445 #define FORKPROCESS_PRIMOP_SUPPORTED
1446 #endif
1447
1448 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1449 static void 
1450 deleteThreadImmediately(StgTSO *tso);
1451 #endif
1452 StgInt
1453 forkProcess(HsStablePtr *entry
1454 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1455             STG_UNUSED
1456 #endif
1457            )
1458 {
1459 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1460   pid_t pid;
1461   StgTSO* t,*next;
1462   StgMainThread *m;
1463   SchedulerStatus rc;
1464
1465   IF_DEBUG(scheduler,sched_belch("forking!"));
1466   rts_lock(); // This not only acquires sched_mutex, it also
1467               // makes sure that no other threads are running
1468
1469   pid = fork();
1470
1471   if (pid) { /* parent */
1472
1473   /* just return the pid */
1474     rts_unlock();
1475     return pid;
1476     
1477   } else { /* child */
1478     
1479     
1480       // delete all threads
1481     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1482     
1483     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1484       next = t->link;
1485
1486         // don't allow threads to catch the ThreadKilled exception
1487       deleteThreadImmediately(t);
1488     }
1489     
1490       // wipe the main thread list
1491     while((m = main_threads) != NULL) {
1492       main_threads = m->link;
1493 # ifdef THREADED_RTS
1494       closeCondition(&m->bound_thread_cond);
1495 # endif
1496       stgFree(m);
1497     }
1498     
1499     rc = rts_evalStableIO(entry, NULL);  // run the action
1500     rts_checkSchedStatus("forkProcess",rc);
1501     
1502     rts_unlock();
1503     
1504     hs_exit();                      // clean up and exit
1505     stg_exit(0);
1506   }
1507 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1508   barf("forkProcess#: primop not supported, sorry!\n");
1509   return -1;
1510 #endif
1511 }
1512
1513 /* ---------------------------------------------------------------------------
1514  * deleteAllThreads():  kill all the live threads.
1515  *
1516  * This is used when we catch a user interrupt (^C), before performing
1517  * any necessary cleanups and running finalizers.
1518  *
1519  * Locks: sched_mutex held.
1520  * ------------------------------------------------------------------------- */
1521    
1522 void
1523 deleteAllThreads ( void )
1524 {
1525   StgTSO* t, *next;
1526   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1527   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1528       next = t->global_link;
1529       deleteThread(t);
1530   }      
1531
1532   // The run queue now contains a bunch of ThreadKilled threads.  We
1533   // must not throw these away: the main thread(s) will be in there
1534   // somewhere, and the main scheduler loop has to deal with it.
1535   // Also, the run queue is the only thing keeping these threads from
1536   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1537
1538   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1539   ASSERT(sleeping_queue == END_TSO_QUEUE);
1540 }
1541
1542 /* startThread and  insertThread are now in GranSim.c -- HWL */
1543
1544
1545 /* ---------------------------------------------------------------------------
1546  * Suspending & resuming Haskell threads.
1547  * 
1548  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1549  * its capability before calling the C function.  This allows another
1550  * task to pick up the capability and carry on running Haskell
1551  * threads.  It also means that if the C call blocks, it won't lock
1552  * the whole system.
1553  *
1554  * The Haskell thread making the C call is put to sleep for the
1555  * duration of the call, on the susepended_ccalling_threads queue.  We
1556  * give out a token to the task, which it can use to resume the thread
1557  * on return from the C function.
1558  * ------------------------------------------------------------------------- */
1559    
1560 StgInt
1561 suspendThread( StgRegTable *reg )
1562 {
1563   nat tok;
1564   Capability *cap;
1565   int saved_errno = errno;
1566
1567   /* assume that *reg is a pointer to the StgRegTable part
1568    * of a Capability.
1569    */
1570   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1571
1572   ACQUIRE_LOCK(&sched_mutex);
1573
1574   IF_DEBUG(scheduler,
1575            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1576
1577   // XXX this might not be necessary --SDM
1578   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1579
1580   threadPaused(cap->r.rCurrentTSO);
1581   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1582   suspended_ccalling_threads = cap->r.rCurrentTSO;
1583
1584   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1585       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1586       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1587   } else {
1588       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1589   }
1590
1591   /* Use the thread ID as the token; it should be unique */
1592   tok = cap->r.rCurrentTSO->id;
1593
1594   /* Hand back capability */
1595   releaseCapability(cap);
1596   
1597 #if defined(RTS_SUPPORTS_THREADS)
1598   /* Preparing to leave the RTS, so ensure there's a native thread/task
1599      waiting to take over.
1600   */
1601   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1602 #endif
1603
1604   RELEASE_LOCK(&sched_mutex);
1605   
1606   errno = saved_errno;
1607   in_haskell = rtsFalse;
1608   return tok; 
1609 }
1610
1611 StgRegTable *
1612 resumeThread( StgInt tok )
1613 {
1614   StgTSO *tso, **prev;
1615   Capability *cap;
1616   int saved_errno = errno;
1617
1618 #if defined(RTS_SUPPORTS_THREADS)
1619   /* Wait for permission to re-enter the RTS with the result. */
1620   ACQUIRE_LOCK(&sched_mutex);
1621   waitForReturnCapability(&sched_mutex, &cap);
1622
1623   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1624 #else
1625   grabCapability(&cap);
1626 #endif
1627
1628   /* Remove the thread off of the suspended list */
1629   prev = &suspended_ccalling_threads;
1630   for (tso = suspended_ccalling_threads; 
1631        tso != END_TSO_QUEUE; 
1632        prev = &tso->link, tso = tso->link) {
1633     if (tso->id == (StgThreadID)tok) {
1634       *prev = tso->link;
1635       break;
1636     }
1637   }
1638   if (tso == END_TSO_QUEUE) {
1639     barf("resumeThread: thread not found");
1640   }
1641   tso->link = END_TSO_QUEUE;
1642   
1643   if(tso->why_blocked == BlockedOnCCall) {
1644       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1645       tso->blocked_exceptions = NULL;
1646   }
1647   
1648   /* Reset blocking status */
1649   tso->why_blocked  = NotBlocked;
1650
1651   cap->r.rCurrentTSO = tso;
1652   RELEASE_LOCK(&sched_mutex);
1653   errno = saved_errno;
1654   in_haskell = rtsTrue;
1655   return &cap->r;
1656 }
1657
1658
1659 /* ---------------------------------------------------------------------------
1660  * Static functions
1661  * ------------------------------------------------------------------------ */
1662 static void unblockThread(StgTSO *tso);
1663
1664 /* ---------------------------------------------------------------------------
1665  * Comparing Thread ids.
1666  *
1667  * This is used from STG land in the implementation of the
1668  * instances of Eq/Ord for ThreadIds.
1669  * ------------------------------------------------------------------------ */
1670
1671 int
1672 cmp_thread(StgPtr tso1, StgPtr tso2) 
1673
1674   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1675   StgThreadID id2 = ((StgTSO *)tso2)->id;
1676  
1677   if (id1 < id2) return (-1);
1678   if (id1 > id2) return 1;
1679   return 0;
1680 }
1681
1682 /* ---------------------------------------------------------------------------
1683  * Fetching the ThreadID from an StgTSO.
1684  *
1685  * This is used in the implementation of Show for ThreadIds.
1686  * ------------------------------------------------------------------------ */
1687 int
1688 rts_getThreadId(StgPtr tso) 
1689 {
1690   return ((StgTSO *)tso)->id;
1691 }
1692
1693 #ifdef DEBUG
1694 void
1695 labelThread(StgPtr tso, char *label)
1696 {
1697   int len;
1698   void *buf;
1699
1700   /* Caveat: Once set, you can only set the thread name to "" */
1701   len = strlen(label)+1;
1702   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1703   strncpy(buf,label,len);
1704   /* Update will free the old memory for us */
1705   updateThreadLabel(((StgTSO *)tso)->id,buf);
1706 }
1707 #endif /* DEBUG */
1708
1709 /* ---------------------------------------------------------------------------
1710    Create a new thread.
1711
1712    The new thread starts with the given stack size.  Before the
1713    scheduler can run, however, this thread needs to have a closure
1714    (and possibly some arguments) pushed on its stack.  See
1715    pushClosure() in Schedule.h.
1716
1717    createGenThread() and createIOThread() (in SchedAPI.h) are
1718    convenient packaged versions of this function.
1719
1720    currently pri (priority) is only used in a GRAN setup -- HWL
1721    ------------------------------------------------------------------------ */
1722 #if defined(GRAN)
1723 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1724 StgTSO *
1725 createThread(nat size, StgInt pri)
1726 #else
1727 StgTSO *
1728 createThread(nat size)
1729 #endif
1730 {
1731
1732     StgTSO *tso;
1733     nat stack_size;
1734
1735     /* First check whether we should create a thread at all */
1736 #if defined(PAR)
1737   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1738   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1739     threadsIgnored++;
1740     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1741           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1742     return END_TSO_QUEUE;
1743   }
1744   threadsCreated++;
1745 #endif
1746
1747 #if defined(GRAN)
1748   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1749 #endif
1750
1751   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1752
1753   /* catch ridiculously small stack sizes */
1754   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1755     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1756   }
1757
1758   stack_size = size - TSO_STRUCT_SIZEW;
1759
1760   tso = (StgTSO *)allocate(size);
1761   TICK_ALLOC_TSO(stack_size, 0);
1762
1763   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1764 #if defined(GRAN)
1765   SET_GRAN_HDR(tso, ThisPE);
1766 #endif
1767
1768   // Always start with the compiled code evaluator
1769   tso->what_next = ThreadRunGHC;
1770
1771   tso->id = next_thread_id++; 
1772   tso->why_blocked  = NotBlocked;
1773   tso->blocked_exceptions = NULL;
1774
1775   tso->saved_errno = 0;
1776   tso->main = NULL;
1777   
1778   tso->stack_size   = stack_size;
1779   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1780                               - TSO_STRUCT_SIZEW;
1781   tso->sp           = (P_)&(tso->stack) + stack_size;
1782
1783   tso->trec = NO_TREC;
1784
1785 #ifdef PROFILING
1786   tso->prof.CCCS = CCS_MAIN;
1787 #endif
1788
1789   /* put a stop frame on the stack */
1790   tso->sp -= sizeofW(StgStopFrame);
1791   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1792   tso->link = END_TSO_QUEUE;
1793
1794   // ToDo: check this
1795 #if defined(GRAN)
1796   /* uses more flexible routine in GranSim */
1797   insertThread(tso, CurrentProc);
1798 #else
1799   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1800    * from its creation
1801    */
1802 #endif
1803
1804 #if defined(GRAN) 
1805   if (RtsFlags.GranFlags.GranSimStats.Full) 
1806     DumpGranEvent(GR_START,tso);
1807 #elif defined(PAR)
1808   if (RtsFlags.ParFlags.ParStats.Full) 
1809     DumpGranEvent(GR_STARTQ,tso);
1810   /* HACk to avoid SCHEDULE 
1811      LastTSO = tso; */
1812 #endif
1813
1814   /* Link the new thread on the global thread list.
1815    */
1816   tso->global_link = all_threads;
1817   all_threads = tso;
1818
1819 #if defined(DIST)
1820   tso->dist.priority = MandatoryPriority; //by default that is...
1821 #endif
1822
1823 #if defined(GRAN)
1824   tso->gran.pri = pri;
1825 # if defined(DEBUG)
1826   tso->gran.magic = TSO_MAGIC; // debugging only
1827 # endif
1828   tso->gran.sparkname   = 0;
1829   tso->gran.startedat   = CURRENT_TIME; 
1830   tso->gran.exported    = 0;
1831   tso->gran.basicblocks = 0;
1832   tso->gran.allocs      = 0;
1833   tso->gran.exectime    = 0;
1834   tso->gran.fetchtime   = 0;
1835   tso->gran.fetchcount  = 0;
1836   tso->gran.blocktime   = 0;
1837   tso->gran.blockcount  = 0;
1838   tso->gran.blockedat   = 0;
1839   tso->gran.globalsparks = 0;
1840   tso->gran.localsparks  = 0;
1841   if (RtsFlags.GranFlags.Light)
1842     tso->gran.clock  = Now; /* local clock */
1843   else
1844     tso->gran.clock  = 0;
1845
1846   IF_DEBUG(gran,printTSO(tso));
1847 #elif defined(PAR)
1848 # if defined(DEBUG)
1849   tso->par.magic = TSO_MAGIC; // debugging only
1850 # endif
1851   tso->par.sparkname   = 0;
1852   tso->par.startedat   = CURRENT_TIME; 
1853   tso->par.exported    = 0;
1854   tso->par.basicblocks = 0;
1855   tso->par.allocs      = 0;
1856   tso->par.exectime    = 0;
1857   tso->par.fetchtime   = 0;
1858   tso->par.fetchcount  = 0;
1859   tso->par.blocktime   = 0;
1860   tso->par.blockcount  = 0;
1861   tso->par.blockedat   = 0;
1862   tso->par.globalsparks = 0;
1863   tso->par.localsparks  = 0;
1864 #endif
1865
1866 #if defined(GRAN)
1867   globalGranStats.tot_threads_created++;
1868   globalGranStats.threads_created_on_PE[CurrentProc]++;
1869   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1870   globalGranStats.tot_sq_probes++;
1871 #elif defined(PAR)
1872   // collect parallel global statistics (currently done together with GC stats)
1873   if (RtsFlags.ParFlags.ParStats.Global &&
1874       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1875     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1876     globalParStats.tot_threads_created++;
1877   }
1878 #endif 
1879
1880 #if defined(GRAN)
1881   IF_GRAN_DEBUG(pri,
1882                 sched_belch("==__ schedule: Created TSO %d (%p);",
1883                       CurrentProc, tso, tso->id));
1884 #elif defined(PAR)
1885     IF_PAR_DEBUG(verbose,
1886                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1887                        (long)tso->id, tso, advisory_thread_count));
1888 #else
1889   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1890                                 (long)tso->id, (long)tso->stack_size));
1891 #endif    
1892   return tso;
1893 }
1894
1895 #if defined(PAR)
1896 /* RFP:
1897    all parallel thread creation calls should fall through the following routine.
1898 */
1899 StgTSO *
1900 createSparkThread(rtsSpark spark) 
1901 { StgTSO *tso;
1902   ASSERT(spark != (rtsSpark)NULL);
1903   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1904   { threadsIgnored++;
1905     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1906           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1907     return END_TSO_QUEUE;
1908   }
1909   else
1910   { threadsCreated++;
1911     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1912     if (tso==END_TSO_QUEUE)     
1913       barf("createSparkThread: Cannot create TSO");
1914 #if defined(DIST)
1915     tso->priority = AdvisoryPriority;
1916 #endif
1917     pushClosure(tso,spark);
1918     PUSH_ON_RUN_QUEUE(tso);
1919     advisory_thread_count++;    
1920   }
1921   return tso;
1922 }
1923 #endif
1924
1925 /*
1926   Turn a spark into a thread.
1927   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1928 */
1929 #if defined(PAR)
1930 StgTSO *
1931 activateSpark (rtsSpark spark) 
1932 {
1933   StgTSO *tso;
1934
1935   tso = createSparkThread(spark);
1936   if (RtsFlags.ParFlags.ParStats.Full) {   
1937     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1938     IF_PAR_DEBUG(verbose,
1939                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1940                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1941   }
1942   // ToDo: fwd info on local/global spark to thread -- HWL
1943   // tso->gran.exported =  spark->exported;
1944   // tso->gran.locked =   !spark->global;
1945   // tso->gran.sparkname = spark->name;
1946
1947   return tso;
1948 }
1949 #endif
1950
1951 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1952                                    Capability *initialCapability
1953                                    );
1954
1955
1956 /* ---------------------------------------------------------------------------
1957  * scheduleThread()
1958  *
1959  * scheduleThread puts a thread on the head of the runnable queue.
1960  * This will usually be done immediately after a thread is created.
1961  * The caller of scheduleThread must create the thread using e.g.
1962  * createThread and push an appropriate closure
1963  * on this thread's stack before the scheduler is invoked.
1964  * ------------------------------------------------------------------------ */
1965
1966 static void scheduleThread_ (StgTSO* tso);
1967
1968 void
1969 scheduleThread_(StgTSO *tso)
1970 {
1971   // The thread goes at the *end* of the run-queue, to avoid possible
1972   // starvation of any threads already on the queue.
1973   APPEND_TO_RUN_QUEUE(tso);
1974   threadRunnable();
1975 }
1976
1977 void
1978 scheduleThread(StgTSO* tso)
1979 {
1980   ACQUIRE_LOCK(&sched_mutex);
1981   scheduleThread_(tso);
1982   RELEASE_LOCK(&sched_mutex);
1983 }
1984
1985 #if defined(RTS_SUPPORTS_THREADS)
1986 static Condition bound_cond_cache;
1987 static int bound_cond_cache_full = 0;
1988 #endif
1989
1990
1991 SchedulerStatus
1992 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1993                    Capability *initialCapability)
1994 {
1995     // Precondition: sched_mutex must be held
1996     StgMainThread *m;
1997
1998     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1999     m->tso = tso;
2000     tso->main = m;
2001     m->ret = ret;
2002     m->stat = NoStatus;
2003     m->link = main_threads;
2004     m->prev = NULL;
2005     if (main_threads != NULL) {
2006         main_threads->prev = m;
2007     }
2008     main_threads = m;
2009
2010 #if defined(RTS_SUPPORTS_THREADS)
2011     // Allocating a new condition for each thread is expensive, so we
2012     // cache one.  This is a pretty feeble hack, but it helps speed up
2013     // consecutive call-ins quite a bit.
2014     if (bound_cond_cache_full) {
2015         m->bound_thread_cond = bound_cond_cache;
2016         bound_cond_cache_full = 0;
2017     } else {
2018         initCondition(&m->bound_thread_cond);
2019     }
2020 #endif
2021
2022     /* Put the thread on the main-threads list prior to scheduling the TSO.
2023        Failure to do so introduces a race condition in the MT case (as
2024        identified by Wolfgang Thaller), whereby the new task/OS thread 
2025        created by scheduleThread_() would complete prior to the thread
2026        that spawned it managed to put 'itself' on the main-threads list.
2027        The upshot of it all being that the worker thread wouldn't get to
2028        signal the completion of the its work item for the main thread to
2029        see (==> it got stuck waiting.)    -- sof 6/02.
2030     */
2031     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2032     
2033     APPEND_TO_RUN_QUEUE(tso);
2034     // NB. Don't call threadRunnable() here, because the thread is
2035     // bound and only runnable by *this* OS thread, so waking up other
2036     // workers will just slow things down.
2037
2038     return waitThread_(m, initialCapability);
2039 }
2040
2041 /* ---------------------------------------------------------------------------
2042  * initScheduler()
2043  *
2044  * Initialise the scheduler.  This resets all the queues - if the
2045  * queues contained any threads, they'll be garbage collected at the
2046  * next pass.
2047  *
2048  * ------------------------------------------------------------------------ */
2049
2050 void 
2051 initScheduler(void)
2052 {
2053 #if defined(GRAN)
2054   nat i;
2055
2056   for (i=0; i<=MAX_PROC; i++) {
2057     run_queue_hds[i]      = END_TSO_QUEUE;
2058     run_queue_tls[i]      = END_TSO_QUEUE;
2059     blocked_queue_hds[i]  = END_TSO_QUEUE;
2060     blocked_queue_tls[i]  = END_TSO_QUEUE;
2061     ccalling_threadss[i]  = END_TSO_QUEUE;
2062     sleeping_queue        = END_TSO_QUEUE;
2063   }
2064 #else
2065   run_queue_hd      = END_TSO_QUEUE;
2066   run_queue_tl      = END_TSO_QUEUE;
2067   blocked_queue_hd  = END_TSO_QUEUE;
2068   blocked_queue_tl  = END_TSO_QUEUE;
2069   sleeping_queue    = END_TSO_QUEUE;
2070 #endif 
2071
2072   suspended_ccalling_threads  = END_TSO_QUEUE;
2073
2074   main_threads = NULL;
2075   all_threads  = END_TSO_QUEUE;
2076
2077   context_switch = 0;
2078   interrupted    = 0;
2079
2080   RtsFlags.ConcFlags.ctxtSwitchTicks =
2081       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2082       
2083 #if defined(RTS_SUPPORTS_THREADS)
2084   /* Initialise the mutex and condition variables used by
2085    * the scheduler. */
2086   initMutex(&sched_mutex);
2087   initMutex(&term_mutex);
2088 #endif
2089   
2090   ACQUIRE_LOCK(&sched_mutex);
2091
2092   /* A capability holds the state a native thread needs in
2093    * order to execute STG code. At least one capability is
2094    * floating around (only SMP builds have more than one).
2095    */
2096   initCapabilities();
2097   
2098 #if defined(RTS_SUPPORTS_THREADS)
2099     /* start our haskell execution tasks */
2100     startTaskManager(0,taskStart);
2101 #endif
2102
2103 #if /* defined(SMP) ||*/ defined(PAR)
2104   initSparkPools();
2105 #endif
2106
2107   RELEASE_LOCK(&sched_mutex);
2108 }
2109
2110 void
2111 exitScheduler( void )
2112 {
2113 #if defined(RTS_SUPPORTS_THREADS)
2114   stopTaskManager();
2115 #endif
2116   shutting_down_scheduler = rtsTrue;
2117 }
2118
2119 /* ----------------------------------------------------------------------------
2120    Managing the per-task allocation areas.
2121    
2122    Each capability comes with an allocation area.  These are
2123    fixed-length block lists into which allocation can be done.
2124
2125    ToDo: no support for two-space collection at the moment???
2126    ------------------------------------------------------------------------- */
2127
2128 static
2129 SchedulerStatus
2130 waitThread_(StgMainThread* m, Capability *initialCapability)
2131 {
2132   SchedulerStatus stat;
2133
2134   // Precondition: sched_mutex must be held.
2135   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2136
2137 #if defined(GRAN)
2138   /* GranSim specific init */
2139   CurrentTSO = m->tso;                // the TSO to run
2140   procStatus[MainProc] = Busy;        // status of main PE
2141   CurrentProc = MainProc;             // PE to run it on
2142   schedule(m,initialCapability);
2143 #else
2144   schedule(m,initialCapability);
2145   ASSERT(m->stat != NoStatus);
2146 #endif
2147
2148   stat = m->stat;
2149
2150 #if defined(RTS_SUPPORTS_THREADS)
2151   // Free the condition variable, returning it to the cache if possible.
2152   if (!bound_cond_cache_full) {
2153       bound_cond_cache = m->bound_thread_cond;
2154       bound_cond_cache_full = 1;
2155   } else {
2156       closeCondition(&m->bound_thread_cond);
2157   }
2158 #endif
2159
2160   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2161   stgFree(m);
2162
2163   // Postcondition: sched_mutex still held
2164   return stat;
2165 }
2166
2167 /* ---------------------------------------------------------------------------
2168    Where are the roots that we know about?
2169
2170         - all the threads on the runnable queue
2171         - all the threads on the blocked queue
2172         - all the threads on the sleeping queue
2173         - all the thread currently executing a _ccall_GC
2174         - all the "main threads"
2175      
2176    ------------------------------------------------------------------------ */
2177
2178 /* This has to be protected either by the scheduler monitor, or by the
2179         garbage collection monitor (probably the latter).
2180         KH @ 25/10/99
2181 */
2182
2183 void
2184 GetRoots( evac_fn evac )
2185 {
2186 #if defined(GRAN)
2187   {
2188     nat i;
2189     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2190       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2191           evac((StgClosure **)&run_queue_hds[i]);
2192       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2193           evac((StgClosure **)&run_queue_tls[i]);
2194       
2195       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2196           evac((StgClosure **)&blocked_queue_hds[i]);
2197       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2198           evac((StgClosure **)&blocked_queue_tls[i]);
2199       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2200           evac((StgClosure **)&ccalling_threads[i]);
2201     }
2202   }
2203
2204   markEventQueue();
2205
2206 #else /* !GRAN */
2207   if (run_queue_hd != END_TSO_QUEUE) {
2208       ASSERT(run_queue_tl != END_TSO_QUEUE);
2209       evac((StgClosure **)&run_queue_hd);
2210       evac((StgClosure **)&run_queue_tl);
2211   }
2212   
2213   if (blocked_queue_hd != END_TSO_QUEUE) {
2214       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2215       evac((StgClosure **)&blocked_queue_hd);
2216       evac((StgClosure **)&blocked_queue_tl);
2217   }
2218   
2219   if (sleeping_queue != END_TSO_QUEUE) {
2220       evac((StgClosure **)&sleeping_queue);
2221   }
2222 #endif 
2223
2224   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2225       evac((StgClosure **)&suspended_ccalling_threads);
2226   }
2227
2228 #if defined(PAR) || defined(GRAN)
2229   markSparkQueue(evac);
2230 #endif
2231
2232 #if defined(RTS_USER_SIGNALS)
2233   // mark the signal handlers (signals should be already blocked)
2234   markSignalHandlers(evac);
2235 #endif
2236 }
2237
2238 /* -----------------------------------------------------------------------------
2239    performGC
2240
2241    This is the interface to the garbage collector from Haskell land.
2242    We provide this so that external C code can allocate and garbage
2243    collect when called from Haskell via _ccall_GC.
2244
2245    It might be useful to provide an interface whereby the programmer
2246    can specify more roots (ToDo).
2247    
2248    This needs to be protected by the GC condition variable above.  KH.
2249    -------------------------------------------------------------------------- */
2250
2251 static void (*extra_roots)(evac_fn);
2252
2253 void
2254 performGC(void)
2255 {
2256   /* Obligated to hold this lock upon entry */
2257   ACQUIRE_LOCK(&sched_mutex);
2258   GarbageCollect(GetRoots,rtsFalse);
2259   RELEASE_LOCK(&sched_mutex);
2260 }
2261
2262 void
2263 performMajorGC(void)
2264 {
2265   ACQUIRE_LOCK(&sched_mutex);
2266   GarbageCollect(GetRoots,rtsTrue);
2267   RELEASE_LOCK(&sched_mutex);
2268 }
2269
2270 static void
2271 AllRoots(evac_fn evac)
2272 {
2273     GetRoots(evac);             // the scheduler's roots
2274     extra_roots(evac);          // the user's roots
2275 }
2276
2277 void
2278 performGCWithRoots(void (*get_roots)(evac_fn))
2279 {
2280   ACQUIRE_LOCK(&sched_mutex);
2281   extra_roots = get_roots;
2282   GarbageCollect(AllRoots,rtsFalse);
2283   RELEASE_LOCK(&sched_mutex);
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287    Stack overflow
2288
2289    If the thread has reached its maximum stack size, then raise the
2290    StackOverflow exception in the offending thread.  Otherwise
2291    relocate the TSO into a larger chunk of memory and adjust its stack
2292    size appropriately.
2293    -------------------------------------------------------------------------- */
2294
2295 static StgTSO *
2296 threadStackOverflow(StgTSO *tso)
2297 {
2298   nat new_stack_size, new_tso_size, stack_words;
2299   StgPtr new_sp;
2300   StgTSO *dest;
2301
2302   IF_DEBUG(sanity,checkTSO(tso));
2303   if (tso->stack_size >= tso->max_stack_size) {
2304
2305     IF_DEBUG(gc,
2306              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2307                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2308              /* If we're debugging, just print out the top of the stack */
2309              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2310                                               tso->sp+64)));
2311
2312     /* Send this thread the StackOverflow exception */
2313     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2314     return tso;
2315   }
2316
2317   /* Try to double the current stack size.  If that takes us over the
2318    * maximum stack size for this thread, then use the maximum instead.
2319    * Finally round up so the TSO ends up as a whole number of blocks.
2320    */
2321   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2323                                        TSO_STRUCT_SIZE)/sizeof(W_);
2324   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2325   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2326
2327   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2328
2329   dest = (StgTSO *)allocate(new_tso_size);
2330   TICK_ALLOC_TSO(new_stack_size,0);
2331
2332   /* copy the TSO block and the old stack into the new area */
2333   memcpy(dest,tso,TSO_STRUCT_SIZE);
2334   stack_words = tso->stack + tso->stack_size - tso->sp;
2335   new_sp = (P_)dest + new_tso_size - stack_words;
2336   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2337
2338   /* relocate the stack pointers... */
2339   dest->sp         = new_sp;
2340   dest->stack_size = new_stack_size;
2341         
2342   /* Mark the old TSO as relocated.  We have to check for relocated
2343    * TSOs in the garbage collector and any primops that deal with TSOs.
2344    *
2345    * It's important to set the sp value to just beyond the end
2346    * of the stack, so we don't attempt to scavenge any part of the
2347    * dead TSO's stack.
2348    */
2349   tso->what_next = ThreadRelocated;
2350   tso->link = dest;
2351   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2352   tso->why_blocked = NotBlocked;
2353   dest->mut_link = NULL;
2354
2355   IF_PAR_DEBUG(verbose,
2356                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2357                      tso->id, tso, tso->stack_size);
2358                /* If we're debugging, just print out the top of the stack */
2359                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2360                                                 tso->sp+64)));
2361   
2362   IF_DEBUG(sanity,checkTSO(tso));
2363 #if 0
2364   IF_DEBUG(scheduler,printTSO(dest));
2365 #endif
2366
2367   return dest;
2368 }
2369
2370 /* ---------------------------------------------------------------------------
2371    Wake up a queue that was blocked on some resource.
2372    ------------------------------------------------------------------------ */
2373
2374 #if defined(GRAN)
2375 STATIC_INLINE void
2376 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2377 {
2378 }
2379 #elif defined(PAR)
2380 STATIC_INLINE void
2381 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2382 {
2383   /* write RESUME events to log file and
2384      update blocked and fetch time (depending on type of the orig closure) */
2385   if (RtsFlags.ParFlags.ParStats.Full) {
2386     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2387                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2388                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2389     if (EMPTY_RUN_QUEUE())
2390       emitSchedule = rtsTrue;
2391
2392     switch (get_itbl(node)->type) {
2393         case FETCH_ME_BQ:
2394           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2395           break;
2396         case RBH:
2397         case FETCH_ME:
2398         case BLACKHOLE_BQ:
2399           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2400           break;
2401 #ifdef DIST
2402         case MVAR:
2403           break;
2404 #endif    
2405         default:
2406           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2407         }
2408       }
2409 }
2410 #endif
2411
2412 #if defined(GRAN)
2413 static StgBlockingQueueElement *
2414 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2415 {
2416     StgTSO *tso;
2417     PEs node_loc, tso_loc;
2418
2419     node_loc = where_is(node); // should be lifted out of loop
2420     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2421     tso_loc = where_is((StgClosure *)tso);
2422     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2423       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2424       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2425       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2426       // insertThread(tso, node_loc);
2427       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2428                 ResumeThread,
2429                 tso, node, (rtsSpark*)NULL);
2430       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2431       // len_local++;
2432       // len++;
2433     } else { // TSO is remote (actually should be FMBQ)
2434       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2435                                   RtsFlags.GranFlags.Costs.gunblocktime +
2436                                   RtsFlags.GranFlags.Costs.latency;
2437       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2438                 UnblockThread,
2439                 tso, node, (rtsSpark*)NULL);
2440       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2441       // len++;
2442     }
2443     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2444     IF_GRAN_DEBUG(bq,
2445                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2446                           (node_loc==tso_loc ? "Local" : "Global"), 
2447                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2448     tso->block_info.closure = NULL;
2449     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2450                              tso->id, tso));
2451 }
2452 #elif defined(PAR)
2453 static StgBlockingQueueElement *
2454 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2455 {
2456     StgBlockingQueueElement *next;
2457
2458     switch (get_itbl(bqe)->type) {
2459     case TSO:
2460       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2461       /* if it's a TSO just push it onto the run_queue */
2462       next = bqe->link;
2463       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2464       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2465       threadRunnable();
2466       unblockCount(bqe, node);
2467       /* reset blocking status after dumping event */
2468       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2469       break;
2470
2471     case BLOCKED_FETCH:
2472       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2473       next = bqe->link;
2474       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2475       PendingFetches = (StgBlockedFetch *)bqe;
2476       break;
2477
2478 # if defined(DEBUG)
2479       /* can ignore this case in a non-debugging setup; 
2480          see comments on RBHSave closures above */
2481     case CONSTR:
2482       /* check that the closure is an RBHSave closure */
2483       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2484              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2485              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2486       break;
2487
2488     default:
2489       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2490            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2491            (StgClosure *)bqe);
2492 # endif
2493     }
2494   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2495   return next;
2496 }
2497
2498 #else /* !GRAN && !PAR */
2499 static StgTSO *
2500 unblockOneLocked(StgTSO *tso)
2501 {
2502   StgTSO *next;
2503
2504   ASSERT(get_itbl(tso)->type == TSO);
2505   ASSERT(tso->why_blocked != NotBlocked);
2506   tso->why_blocked = NotBlocked;
2507   next = tso->link;
2508   tso->link = END_TSO_QUEUE;
2509   APPEND_TO_RUN_QUEUE(tso);
2510   threadRunnable();
2511   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2512   return next;
2513 }
2514 #endif
2515
2516 #if defined(GRAN) || defined(PAR)
2517 INLINE_ME StgBlockingQueueElement *
2518 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2519 {
2520   ACQUIRE_LOCK(&sched_mutex);
2521   bqe = unblockOneLocked(bqe, node);
2522   RELEASE_LOCK(&sched_mutex);
2523   return bqe;
2524 }
2525 #else
2526 INLINE_ME StgTSO *
2527 unblockOne(StgTSO *tso)
2528 {
2529   ACQUIRE_LOCK(&sched_mutex);
2530   tso = unblockOneLocked(tso);
2531   RELEASE_LOCK(&sched_mutex);
2532   return tso;
2533 }
2534 #endif
2535
2536 #if defined(GRAN)
2537 void 
2538 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2539 {
2540   StgBlockingQueueElement *bqe;
2541   PEs node_loc;
2542   nat len = 0; 
2543
2544   IF_GRAN_DEBUG(bq, 
2545                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2546                       node, CurrentProc, CurrentTime[CurrentProc], 
2547                       CurrentTSO->id, CurrentTSO));
2548
2549   node_loc = where_is(node);
2550
2551   ASSERT(q == END_BQ_QUEUE ||
2552          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2553          get_itbl(q)->type == CONSTR); // closure (type constructor)
2554   ASSERT(is_unique(node));
2555
2556   /* FAKE FETCH: magically copy the node to the tso's proc;
2557      no Fetch necessary because in reality the node should not have been 
2558      moved to the other PE in the first place
2559   */
2560   if (CurrentProc!=node_loc) {
2561     IF_GRAN_DEBUG(bq, 
2562                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2563                         node, node_loc, CurrentProc, CurrentTSO->id, 
2564                         // CurrentTSO, where_is(CurrentTSO),
2565                         node->header.gran.procs));
2566     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2567     IF_GRAN_DEBUG(bq, 
2568                   debugBelch("## new bitmask of node %p is %#x\n",
2569                         node, node->header.gran.procs));
2570     if (RtsFlags.GranFlags.GranSimStats.Global) {
2571       globalGranStats.tot_fake_fetches++;
2572     }
2573   }
2574
2575   bqe = q;
2576   // ToDo: check: ASSERT(CurrentProc==node_loc);
2577   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2578     //next = bqe->link;
2579     /* 
2580        bqe points to the current element in the queue
2581        next points to the next element in the queue
2582     */
2583     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2584     //tso_loc = where_is(tso);
2585     len++;
2586     bqe = unblockOneLocked(bqe, node);
2587   }
2588
2589   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2590      the closure to make room for the anchor of the BQ */
2591   if (bqe!=END_BQ_QUEUE) {
2592     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2593     /*
2594     ASSERT((info_ptr==&RBH_Save_0_info) ||
2595            (info_ptr==&RBH_Save_1_info) ||
2596            (info_ptr==&RBH_Save_2_info));
2597     */
2598     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2599     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2600     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2601
2602     IF_GRAN_DEBUG(bq,
2603                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2604                         node, info_type(node)));
2605   }
2606
2607   /* statistics gathering */
2608   if (RtsFlags.GranFlags.GranSimStats.Global) {
2609     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2610     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2611     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2612     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2613   }
2614   IF_GRAN_DEBUG(bq,
2615                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2616                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2617 }
2618 #elif defined(PAR)
2619 void 
2620 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2621 {
2622   StgBlockingQueueElement *bqe;
2623
2624   ACQUIRE_LOCK(&sched_mutex);
2625
2626   IF_PAR_DEBUG(verbose, 
2627                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2628                      node, mytid));
2629 #ifdef DIST  
2630   //RFP
2631   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2632     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2633     return;
2634   }
2635 #endif
2636   
2637   ASSERT(q == END_BQ_QUEUE ||
2638          get_itbl(q)->type == TSO ||           
2639          get_itbl(q)->type == BLOCKED_FETCH || 
2640          get_itbl(q)->type == CONSTR); 
2641
2642   bqe = q;
2643   while (get_itbl(bqe)->type==TSO || 
2644          get_itbl(bqe)->type==BLOCKED_FETCH) {
2645     bqe = unblockOneLocked(bqe, node);
2646   }
2647   RELEASE_LOCK(&sched_mutex);
2648 }
2649
2650 #else   /* !GRAN && !PAR */
2651
2652 void
2653 awakenBlockedQueueNoLock(StgTSO *tso)
2654 {
2655   while (tso != END_TSO_QUEUE) {
2656     tso = unblockOneLocked(tso);
2657   }
2658 }
2659
2660 void
2661 awakenBlockedQueue(StgTSO *tso)
2662 {
2663   ACQUIRE_LOCK(&sched_mutex);
2664   while (tso != END_TSO_QUEUE) {
2665     tso = unblockOneLocked(tso);
2666   }
2667   RELEASE_LOCK(&sched_mutex);
2668 }
2669 #endif
2670
2671 /* ---------------------------------------------------------------------------
2672    Interrupt execution
2673    - usually called inside a signal handler so it mustn't do anything fancy.   
2674    ------------------------------------------------------------------------ */
2675
2676 void
2677 interruptStgRts(void)
2678 {
2679     interrupted    = 1;
2680     context_switch = 1;
2681 }
2682
2683 /* -----------------------------------------------------------------------------
2684    Unblock a thread
2685
2686    This is for use when we raise an exception in another thread, which
2687    may be blocked.
2688    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2689    -------------------------------------------------------------------------- */
2690
2691 #if defined(GRAN) || defined(PAR)
2692 /*
2693   NB: only the type of the blocking queue is different in GranSim and GUM
2694       the operations on the queue-elements are the same
2695       long live polymorphism!
2696
2697   Locks: sched_mutex is held upon entry and exit.
2698
2699 */
2700 static void
2701 unblockThread(StgTSO *tso)
2702 {
2703   StgBlockingQueueElement *t, **last;
2704
2705   switch (tso->why_blocked) {
2706
2707   case NotBlocked:
2708     return;  /* not blocked */
2709
2710   case BlockedOnSTM:
2711     // Be careful: nothing to do here!  We tell the scheduler that the thread
2712     // is runnable and we leave it to the stack-walking code to abort the 
2713     // transaction while unwinding the stack.  We should perhaps have a debugging
2714     // test to make sure that this really happens and that the 'zombie' transaction
2715     // does not get committed.
2716     goto done;
2717
2718   case BlockedOnMVar:
2719     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2720     {
2721       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2722       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2723
2724       last = (StgBlockingQueueElement **)&mvar->head;
2725       for (t = (StgBlockingQueueElement *)mvar->head; 
2726            t != END_BQ_QUEUE; 
2727            last = &t->link, last_tso = t, t = t->link) {
2728         if (t == (StgBlockingQueueElement *)tso) {
2729           *last = (StgBlockingQueueElement *)tso->link;
2730           if (mvar->tail == tso) {
2731             mvar->tail = (StgTSO *)last_tso;
2732           }
2733           goto done;
2734         }
2735       }
2736       barf("unblockThread (MVAR): TSO not found");
2737     }
2738
2739   case BlockedOnBlackHole:
2740     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2741     {
2742       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2743
2744       last = &bq->blocking_queue;
2745       for (t = bq->blocking_queue; 
2746            t != END_BQ_QUEUE; 
2747            last = &t->link, t = t->link) {
2748         if (t == (StgBlockingQueueElement *)tso) {
2749           *last = (StgBlockingQueueElement *)tso->link;
2750           goto done;
2751         }
2752       }
2753       barf("unblockThread (BLACKHOLE): TSO not found");
2754     }
2755
2756   case BlockedOnException:
2757     {
2758       StgTSO *target  = tso->block_info.tso;
2759
2760       ASSERT(get_itbl(target)->type == TSO);
2761
2762       if (target->what_next == ThreadRelocated) {
2763           target = target->link;
2764           ASSERT(get_itbl(target)->type == TSO);
2765       }
2766
2767       ASSERT(target->blocked_exceptions != NULL);
2768
2769       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2770       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2771            t != END_BQ_QUEUE; 
2772            last = &t->link, t = t->link) {
2773         ASSERT(get_itbl(t)->type == TSO);
2774         if (t == (StgBlockingQueueElement *)tso) {
2775           *last = (StgBlockingQueueElement *)tso->link;
2776           goto done;
2777         }
2778       }
2779       barf("unblockThread (Exception): TSO not found");
2780     }
2781
2782   case BlockedOnRead:
2783   case BlockedOnWrite:
2784 #if defined(mingw32_HOST_OS)
2785   case BlockedOnDoProc:
2786 #endif
2787     {
2788       /* take TSO off blocked_queue */
2789       StgBlockingQueueElement *prev = NULL;
2790       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2791            prev = t, t = t->link) {
2792         if (t == (StgBlockingQueueElement *)tso) {
2793           if (prev == NULL) {
2794             blocked_queue_hd = (StgTSO *)t->link;
2795             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2796               blocked_queue_tl = END_TSO_QUEUE;
2797             }
2798           } else {
2799             prev->link = t->link;
2800             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2801               blocked_queue_tl = (StgTSO *)prev;
2802             }
2803           }
2804           goto done;
2805         }
2806       }
2807       barf("unblockThread (I/O): TSO not found");
2808     }
2809
2810   case BlockedOnDelay:
2811     {
2812       /* take TSO off sleeping_queue */
2813       StgBlockingQueueElement *prev = NULL;
2814       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2815            prev = t, t = t->link) {
2816         if (t == (StgBlockingQueueElement *)tso) {
2817           if (prev == NULL) {
2818             sleeping_queue = (StgTSO *)t->link;
2819           } else {
2820             prev->link = t->link;
2821           }
2822           goto done;
2823         }
2824       }
2825       barf("unblockThread (delay): TSO not found");
2826     }
2827
2828   default:
2829     barf("unblockThread");
2830   }
2831
2832  done:
2833   tso->link = END_TSO_QUEUE;
2834   tso->why_blocked = NotBlocked;
2835   tso->block_info.closure = NULL;
2836   PUSH_ON_RUN_QUEUE(tso);
2837 }
2838 #else
2839 static void
2840 unblockThread(StgTSO *tso)
2841 {
2842   StgTSO *t, **last;
2843   
2844   /* To avoid locking unnecessarily. */
2845   if (tso->why_blocked == NotBlocked) {
2846     return;
2847   }
2848
2849   switch (tso->why_blocked) {
2850
2851   case BlockedOnSTM:
2852     // Be careful: nothing to do here!  We tell the scheduler that the thread
2853     // is runnable and we leave it to the stack-walking code to abort the 
2854     // transaction while unwinding the stack.  We should perhaps have a debugging
2855     // test to make sure that this really happens and that the 'zombie' transaction
2856     // does not get committed.
2857     goto done;
2858
2859   case BlockedOnMVar:
2860     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2861     {
2862       StgTSO *last_tso = END_TSO_QUEUE;
2863       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2864
2865       last = &mvar->head;
2866       for (t = mvar->head; t != END_TSO_QUEUE; 
2867            last = &t->link, last_tso = t, t = t->link) {
2868         if (t == tso) {
2869           *last = tso->link;
2870           if (mvar->tail == tso) {
2871             mvar->tail = last_tso;
2872           }
2873           goto done;
2874         }
2875       }
2876       barf("unblockThread (MVAR): TSO not found");
2877     }
2878
2879   case BlockedOnBlackHole:
2880     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2881     {
2882       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2883
2884       last = &bq->blocking_queue;
2885       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2886            last = &t->link, t = t->link) {
2887         if (t == tso) {
2888           *last = tso->link;
2889           goto done;
2890         }
2891       }
2892       barf("unblockThread (BLACKHOLE): TSO not found");
2893     }
2894
2895   case BlockedOnException:
2896     {
2897       StgTSO *target  = tso->block_info.tso;
2898
2899       ASSERT(get_itbl(target)->type == TSO);
2900
2901       while (target->what_next == ThreadRelocated) {
2902           target = target->link;
2903           ASSERT(get_itbl(target)->type == TSO);
2904       }
2905       
2906       ASSERT(target->blocked_exceptions != NULL);
2907
2908       last = &target->blocked_exceptions;
2909       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2910            last = &t->link, t = t->link) {
2911         ASSERT(get_itbl(t)->type == TSO);
2912         if (t == tso) {
2913           *last = tso->link;
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (Exception): TSO not found");
2918     }
2919
2920   case BlockedOnRead:
2921   case BlockedOnWrite:
2922 #if defined(mingw32_HOST_OS)
2923   case BlockedOnDoProc:
2924 #endif
2925     {
2926       StgTSO *prev = NULL;
2927       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2928            prev = t, t = t->link) {
2929         if (t == tso) {
2930           if (prev == NULL) {
2931             blocked_queue_hd = t->link;
2932             if (blocked_queue_tl == t) {
2933               blocked_queue_tl = END_TSO_QUEUE;
2934             }
2935           } else {
2936             prev->link = t->link;
2937             if (blocked_queue_tl == t) {
2938               blocked_queue_tl = prev;
2939             }
2940           }
2941           goto done;
2942         }
2943       }
2944       barf("unblockThread (I/O): TSO not found");
2945     }
2946
2947   case BlockedOnDelay:
2948     {
2949       StgTSO *prev = NULL;
2950       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2951            prev = t, t = t->link) {
2952         if (t == tso) {
2953           if (prev == NULL) {
2954             sleeping_queue = t->link;
2955           } else {
2956             prev->link = t->link;
2957           }
2958           goto done;
2959         }
2960       }
2961       barf("unblockThread (delay): TSO not found");
2962     }
2963
2964   default:
2965     barf("unblockThread");
2966   }
2967
2968  done:
2969   tso->link = END_TSO_QUEUE;
2970   tso->why_blocked = NotBlocked;
2971   tso->block_info.closure = NULL;
2972   APPEND_TO_RUN_QUEUE(tso);
2973 }
2974 #endif
2975
2976 /* -----------------------------------------------------------------------------
2977  * raiseAsync()
2978  *
2979  * The following function implements the magic for raising an
2980  * asynchronous exception in an existing thread.
2981  *
2982  * We first remove the thread from any queue on which it might be
2983  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2984  *
2985  * We strip the stack down to the innermost CATCH_FRAME, building
2986  * thunks in the heap for all the active computations, so they can 
2987  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2988  * an application of the handler to the exception, and push it on
2989  * the top of the stack.
2990  * 
2991  * How exactly do we save all the active computations?  We create an
2992  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2993  * AP_STACKs pushes everything from the corresponding update frame
2994  * upwards onto the stack.  (Actually, it pushes everything up to the
2995  * next update frame plus a pointer to the next AP_STACK object.
2996  * Entering the next AP_STACK object pushes more onto the stack until we
2997  * reach the last AP_STACK object - at which point the stack should look
2998  * exactly as it did when we killed the TSO and we can continue
2999  * execution by entering the closure on top of the stack.
3000  *
3001  * We can also kill a thread entirely - this happens if either (a) the 
3002  * exception passed to raiseAsync is NULL, or (b) there's no
3003  * CATCH_FRAME on the stack.  In either case, we strip the entire
3004  * stack and replace the thread with a zombie.
3005  *
3006  * Locks: sched_mutex held upon entry nor exit.
3007  *
3008  * -------------------------------------------------------------------------- */
3009  
3010 void 
3011 deleteThread(StgTSO *tso)
3012 {
3013   if (tso->why_blocked != BlockedOnCCall &&
3014       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3015       raiseAsync(tso,NULL);
3016   }
3017 }
3018
3019 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3020 static void 
3021 deleteThreadImmediately(StgTSO *tso)
3022 { // for forkProcess only:
3023   // delete thread without giving it a chance to catch the KillThread exception
3024
3025   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3026       return;
3027   }
3028
3029   if (tso->why_blocked != BlockedOnCCall &&
3030       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3031     unblockThread(tso);
3032   }
3033
3034   tso->what_next = ThreadKilled;
3035 }
3036 #endif
3037
3038 void
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3040 {
3041   /* When raising async exs from contexts where sched_mutex isn't held;
3042      use raiseAsyncWithLock(). */
3043   ACQUIRE_LOCK(&sched_mutex);
3044   raiseAsync(tso,exception);
3045   RELEASE_LOCK(&sched_mutex);
3046 }
3047
3048 void
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3050 {
3051     raiseAsync_(tso, exception, rtsFalse);
3052 }
3053
3054 static void
3055 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3056 {
3057     StgRetInfoTable *info;
3058     StgPtr sp;
3059   
3060     // Thread already dead?
3061     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3062         return;
3063     }
3064
3065     IF_DEBUG(scheduler, 
3066              sched_belch("raising exception in thread %ld.", (long)tso->id));
3067     
3068     // Remove it from any blocking queues
3069     unblockThread(tso);
3070
3071     sp = tso->sp;
3072     
3073     // The stack freezing code assumes there's a closure pointer on
3074     // the top of the stack, so we have to arrange that this is the case...
3075     //
3076     if (sp[0] == (W_)&stg_enter_info) {
3077         sp++;
3078     } else {
3079         sp--;
3080         sp[0] = (W_)&stg_dummy_ret_closure;
3081     }
3082
3083     while (1) {
3084         nat i;
3085
3086         // 1. Let the top of the stack be the "current closure"
3087         //
3088         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3089         // CATCH_FRAME.
3090         //
3091         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3092         // current closure applied to the chunk of stack up to (but not
3093         // including) the update frame.  This closure becomes the "current
3094         // closure".  Go back to step 2.
3095         //
3096         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3097         // top of the stack applied to the exception.
3098         // 
3099         // 5. If it's a STOP_FRAME, then kill the thread.
3100         // 
3101         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3102         // transaction
3103        
3104         
3105         StgPtr frame;
3106         
3107         frame = sp + 1;
3108         info = get_ret_itbl((StgClosure *)frame);
3109         
3110         while (info->i.type != UPDATE_FRAME
3111                && (info->i.type != CATCH_FRAME || exception == NULL)
3112                && info->i.type != STOP_FRAME
3113                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3114         {
3115             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3116               // IF we find an ATOMICALLY_FRAME then we abort the
3117               // current transaction and propagate the exception.  In
3118               // this case (unlike ordinary exceptions) we do not care
3119               // whether the transaction is valid or not because its
3120               // possible validity cannot have caused the exception
3121               // and will not be visible after the abort.
3122               IF_DEBUG(stm,
3123                        debugBelch("Found atomically block delivering async exception\n"));
3124               stmAbortTransaction(tso -> trec);
3125               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3126             }
3127             frame += stack_frame_sizeW((StgClosure *)frame);
3128             info = get_ret_itbl((StgClosure *)frame);
3129         }
3130         
3131         switch (info->i.type) {
3132             
3133         case ATOMICALLY_FRAME:
3134             ASSERT(stop_at_atomically);
3135             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3136             stmCondemnTransaction(tso -> trec);
3137 #ifdef REG_R1
3138             tso->sp = frame;
3139 #else
3140             // R1 is not a register: the return convention for IO in
3141             // this case puts the return value on the stack, so we
3142             // need to set up the stack to return to the atomically
3143             // frame properly...
3144             tso->sp = frame - 2;
3145             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3146             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3147 #endif
3148             tso->what_next = ThreadRunGHC;
3149             return;
3150
3151         case CATCH_FRAME:
3152             // If we find a CATCH_FRAME, and we've got an exception to raise,
3153             // then build the THUNK raise(exception), and leave it on
3154             // top of the CATCH_FRAME ready to enter.
3155             //
3156         {
3157 #ifdef PROFILING
3158             StgCatchFrame *cf = (StgCatchFrame *)frame;
3159 #endif
3160             StgClosure *raise;
3161             
3162             // we've got an exception to raise, so let's pass it to the
3163             // handler in this frame.
3164             //
3165             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3166             TICK_ALLOC_SE_THK(1,0);
3167             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3168             raise->payload[0] = exception;
3169             
3170             // throw away the stack from Sp up to the CATCH_FRAME.
3171             //
3172             sp = frame - 1;
3173             
3174             /* Ensure that async excpetions are blocked now, so we don't get
3175              * a surprise exception before we get around to executing the
3176              * handler.
3177              */
3178             if (tso->blocked_exceptions == NULL) {
3179                 tso->blocked_exceptions = END_TSO_QUEUE;
3180             }
3181             
3182             /* Put the newly-built THUNK on top of the stack, ready to execute
3183              * when the thread restarts.
3184              */
3185             sp[0] = (W_)raise;
3186             sp[-1] = (W_)&stg_enter_info;
3187             tso->sp = sp-1;
3188             tso->what_next = ThreadRunGHC;
3189             IF_DEBUG(sanity, checkTSO(tso));
3190             return;
3191         }
3192         
3193         case UPDATE_FRAME:
3194         {
3195             StgAP_STACK * ap;
3196             nat words;
3197             
3198             // First build an AP_STACK consisting of the stack chunk above the
3199             // current update frame, with the top word on the stack as the
3200             // fun field.
3201             //
3202             words = frame - sp - 1;
3203             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3204             
3205             ap->size = words;
3206             ap->fun  = (StgClosure *)sp[0];
3207             sp++;
3208             for(i=0; i < (nat)words; ++i) {
3209                 ap->payload[i] = (StgClosure *)*sp++;
3210             }
3211             
3212             SET_HDR(ap,&stg_AP_STACK_info,
3213                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3214             TICK_ALLOC_UP_THK(words+1,0);
3215             
3216             IF_DEBUG(scheduler,
3217                      debugBelch("sched: Updating ");
3218                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3219                      debugBelch(" with ");
3220                      printObj((StgClosure *)ap);
3221                 );
3222
3223             // Replace the updatee with an indirection - happily
3224             // this will also wake up any threads currently
3225             // waiting on the result.
3226             //
3227             // Warning: if we're in a loop, more than one update frame on
3228             // the stack may point to the same object.  Be careful not to
3229             // overwrite an IND_OLDGEN in this case, because we'll screw
3230             // up the mutable lists.  To be on the safe side, don't
3231             // overwrite any kind of indirection at all.  See also
3232             // threadSqueezeStack in GC.c, where we have to make a similar
3233             // check.
3234             //
3235             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3236                 // revert the black hole
3237                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3238                                (StgClosure *)ap);
3239             }
3240             sp += sizeofW(StgUpdateFrame) - 1;
3241             sp[0] = (W_)ap; // push onto stack
3242             break;
3243         }
3244         
3245         case STOP_FRAME:
3246             // We've stripped the entire stack, the thread is now dead.
3247             sp += sizeofW(StgStopFrame);
3248             tso->what_next = ThreadKilled;
3249             tso->sp = sp;
3250             return;
3251             
3252         default:
3253             barf("raiseAsync");
3254         }
3255     }
3256     barf("raiseAsync");
3257 }
3258
3259 /* -----------------------------------------------------------------------------
3260    raiseExceptionHelper
3261    
3262    This function is called by the raise# primitve, just so that we can
3263    move some of the tricky bits of raising an exception from C-- into
3264    C.  Who knows, it might be a useful re-useable thing here too.
3265    -------------------------------------------------------------------------- */
3266
3267 StgWord
3268 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3269 {
3270     StgClosure *raise_closure = NULL;
3271     StgPtr p, next;
3272     StgRetInfoTable *info;
3273     //
3274     // This closure represents the expression 'raise# E' where E
3275     // is the exception raise.  It is used to overwrite all the
3276     // thunks which are currently under evaluataion.
3277     //
3278
3279     //    
3280     // LDV profiling: stg_raise_info has THUNK as its closure
3281     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3282     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3283     // 1 does not cause any problem unless profiling is performed.
3284     // However, when LDV profiling goes on, we need to linearly scan
3285     // small object pool, where raise_closure is stored, so we should
3286     // use MIN_UPD_SIZE.
3287     //
3288     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3289     //                                 sizeofW(StgClosure)+1);
3290     //
3291
3292     //
3293     // Walk up the stack, looking for the catch frame.  On the way,
3294     // we update any closures pointed to from update frames with the
3295     // raise closure that we just built.
3296     //
3297     p = tso->sp;
3298     while(1) {
3299         info = get_ret_itbl((StgClosure *)p);
3300         next = p + stack_frame_sizeW((StgClosure *)p);
3301         switch (info->i.type) {
3302             
3303         case UPDATE_FRAME:
3304             // Only create raise_closure if we need to.
3305             if (raise_closure == NULL) {
3306                 raise_closure = 
3307                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3308                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3309                 raise_closure->payload[0] = exception;
3310             }
3311             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3312             p = next;
3313             continue;
3314
3315         case ATOMICALLY_FRAME:
3316             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3317             tso->sp = p;
3318             return ATOMICALLY_FRAME;
3319             
3320         case CATCH_FRAME:
3321             tso->sp = p;
3322             return CATCH_FRAME;
3323
3324         case CATCH_STM_FRAME:
3325             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3326             tso->sp = p;
3327             return CATCH_STM_FRAME;
3328             
3329         case STOP_FRAME:
3330             tso->sp = p;
3331             return STOP_FRAME;
3332
3333         case CATCH_RETRY_FRAME:
3334         default:
3335             p = next; 
3336             continue;
3337         }
3338     }
3339 }
3340
3341
3342 /* -----------------------------------------------------------------------------
3343    findRetryFrameHelper
3344
3345    This function is called by the retry# primitive.  It traverses the stack
3346    leaving tso->sp referring to the frame which should handle the retry.  
3347
3348    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3349    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3350
3351    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3352    despite the similar implementation.
3353
3354    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3355    not be created within memory transactions.
3356    -------------------------------------------------------------------------- */
3357
3358 StgWord
3359 findRetryFrameHelper (StgTSO *tso)
3360 {
3361   StgPtr           p, next;
3362   StgRetInfoTable *info;
3363
3364   p = tso -> sp;
3365   while (1) {
3366     info = get_ret_itbl((StgClosure *)p);
3367     next = p + stack_frame_sizeW((StgClosure *)p);
3368     switch (info->i.type) {
3369       
3370     case ATOMICALLY_FRAME:
3371       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3372       tso->sp = p;
3373       return ATOMICALLY_FRAME;
3374       
3375     case CATCH_RETRY_FRAME:
3376       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3377       tso->sp = p;
3378       return CATCH_RETRY_FRAME;
3379       
3380     case CATCH_STM_FRAME:
3381     default:
3382       ASSERT(info->i.type != CATCH_FRAME);
3383       ASSERT(info->i.type != STOP_FRAME);
3384       p = next; 
3385       continue;
3386     }
3387   }
3388 }
3389
3390 /* -----------------------------------------------------------------------------
3391    resurrectThreads is called after garbage collection on the list of
3392    threads found to be garbage.  Each of these threads will be woken
3393    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3394    on an MVar, or NonTermination if the thread was blocked on a Black
3395    Hole.
3396
3397    Locks: sched_mutex isn't held upon entry nor exit.
3398    -------------------------------------------------------------------------- */
3399
3400 void
3401 resurrectThreads( StgTSO *threads )
3402 {
3403   StgTSO *tso, *next;
3404
3405   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3406     next = tso->global_link;
3407     tso->global_link = all_threads;
3408     all_threads = tso;
3409     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3410
3411     switch (tso->why_blocked) {
3412     case BlockedOnMVar:
3413     case BlockedOnException:
3414       /* Called by GC - sched_mutex lock is currently held. */
3415       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3416       break;
3417     case BlockedOnBlackHole:
3418       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3419       break;
3420     case BlockedOnSTM:
3421       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3422       break;
3423     case NotBlocked:
3424       /* This might happen if the thread was blocked on a black hole
3425        * belonging to a thread that we've just woken up (raiseAsync
3426        * can wake up threads, remember...).
3427        */
3428       continue;
3429     default:
3430       barf("resurrectThreads: thread blocked in a strange way");
3431     }
3432   }
3433 }
3434
3435 /* ----------------------------------------------------------------------------
3436  * Debugging: why is a thread blocked
3437  * [Also provides useful information when debugging threaded programs
3438  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3439    ------------------------------------------------------------------------- */
3440
3441 static
3442 void
3443 printThreadBlockage(StgTSO *tso)
3444 {
3445   switch (tso->why_blocked) {
3446   case BlockedOnRead:
3447     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3448     break;
3449   case BlockedOnWrite:
3450     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3451     break;
3452 #if defined(mingw32_HOST_OS)
3453     case BlockedOnDoProc:
3454     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3455     break;
3456 #endif
3457   case BlockedOnDelay:
3458     debugBelch("is blocked until %d", tso->block_info.target);
3459     break;
3460   case BlockedOnMVar:
3461     debugBelch("is blocked on an MVar");
3462     break;
3463   case BlockedOnException:
3464     debugBelch("is blocked on delivering an exception to thread %d",
3465             tso->block_info.tso->id);
3466     break;
3467   case BlockedOnBlackHole:
3468     debugBelch("is blocked on a black hole");
3469     break;
3470   case NotBlocked:
3471     debugBelch("is not blocked");
3472     break;
3473 #if defined(PAR)
3474   case BlockedOnGA:
3475     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3476             tso->block_info.closure, info_type(tso->block_info.closure));
3477     break;
3478   case BlockedOnGA_NoSend:
3479     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3480             tso->block_info.closure, info_type(tso->block_info.closure));
3481     break;
3482 #endif
3483   case BlockedOnCCall:
3484     debugBelch("is blocked on an external call");
3485     break;
3486   case BlockedOnCCall_NoUnblockExc:
3487     debugBelch("is blocked on an external call (exceptions were already blocked)");
3488     break;
3489   case BlockedOnSTM:
3490     debugBelch("is blocked on an STM operation");
3491     break;
3492   default:
3493     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3494          tso->why_blocked, tso->id, tso);
3495   }
3496 }
3497
3498 static
3499 void
3500 printThreadStatus(StgTSO *tso)
3501 {
3502   switch (tso->what_next) {
3503   case ThreadKilled:
3504     debugBelch("has been killed");
3505     break;
3506   case ThreadComplete:
3507     debugBelch("has completed");
3508     break;
3509   default:
3510     printThreadBlockage(tso);
3511   }
3512 }
3513
3514 void
3515 printAllThreads(void)
3516 {
3517   StgTSO *t;
3518
3519 # if defined(GRAN)
3520   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3521   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3522                        time_string, rtsFalse/*no commas!*/);
3523
3524   debugBelch("all threads at [%s]:\n", time_string);
3525 # elif defined(PAR)
3526   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3527   ullong_format_string(CURRENT_TIME,
3528                        time_string, rtsFalse/*no commas!*/);
3529
3530   debugBelch("all threads at [%s]:\n", time_string);
3531 # else
3532   debugBelch("all threads:\n");
3533 # endif
3534
3535   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3536     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3537 #if defined(DEBUG)
3538     {
3539       void *label = lookupThreadLabel(t->id);
3540       if (label) debugBelch("[\"%s\"] ",(char *)label);
3541     }
3542 #endif
3543     printThreadStatus(t);
3544     debugBelch("\n");
3545   }
3546 }
3547     
3548 #ifdef DEBUG
3549
3550 /* 
3551    Print a whole blocking queue attached to node (debugging only).
3552 */
3553 # if defined(PAR)
3554 void 
3555 print_bq (StgClosure *node)
3556 {
3557   StgBlockingQueueElement *bqe;
3558   StgTSO *tso;
3559   rtsBool end;
3560
3561   debugBelch("## BQ of closure %p (%s): ",
3562           node, info_type(node));
3563
3564   /* should cover all closures that may have a blocking queue */
3565   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3566          get_itbl(node)->type == FETCH_ME_BQ ||
3567          get_itbl(node)->type == RBH ||
3568          get_itbl(node)->type == MVAR);
3569     
3570   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3571
3572   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3573 }
3574
3575 /* 
3576    Print a whole blocking queue starting with the element bqe.
3577 */
3578 void 
3579 print_bqe (StgBlockingQueueElement *bqe)
3580 {
3581   rtsBool end;
3582
3583   /* 
3584      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3585   */
3586   for (end = (bqe==END_BQ_QUEUE);
3587        !end; // iterate until bqe points to a CONSTR
3588        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3589        bqe = end ? END_BQ_QUEUE : bqe->link) {
3590     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3591     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3592     /* types of closures that may appear in a blocking queue */
3593     ASSERT(get_itbl(bqe)->type == TSO ||           
3594            get_itbl(bqe)->type == BLOCKED_FETCH || 
3595            get_itbl(bqe)->type == CONSTR); 
3596     /* only BQs of an RBH end with an RBH_Save closure */
3597     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3598
3599     switch (get_itbl(bqe)->type) {
3600     case TSO:
3601       debugBelch(" TSO %u (%x),",
3602               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3603       break;
3604     case BLOCKED_FETCH:
3605       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3606               ((StgBlockedFetch *)bqe)->node, 
3607               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3608               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3609               ((StgBlockedFetch *)bqe)->ga.weight);
3610       break;
3611     case CONSTR:
3612       debugBelch(" %s (IP %p),",
3613               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3614                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3615                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3616                "RBH_Save_?"), get_itbl(bqe));
3617       break;
3618     default:
3619       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3620            info_type((StgClosure *)bqe)); // , node, info_type(node));
3621       break;
3622     }
3623   } /* for */
3624   debugBelch("\n");
3625 }
3626 # elif defined(GRAN)
3627 void 
3628 print_bq (StgClosure *node)
3629 {
3630   StgBlockingQueueElement *bqe;
3631   PEs node_loc, tso_loc;
3632   rtsBool end;
3633
3634   /* should cover all closures that may have a blocking queue */
3635   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3636          get_itbl(node)->type == FETCH_ME_BQ ||
3637          get_itbl(node)->type == RBH);
3638     
3639   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3640   node_loc = where_is(node);
3641
3642   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3643           node, info_type(node), node_loc);
3644
3645   /* 
3646      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3647   */
3648   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3649        !end; // iterate until bqe points to a CONSTR
3650        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3651     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3652     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3653     /* types of closures that may appear in a blocking queue */
3654     ASSERT(get_itbl(bqe)->type == TSO ||           
3655            get_itbl(bqe)->type == CONSTR); 
3656     /* only BQs of an RBH end with an RBH_Save closure */
3657     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3658
3659     tso_loc = where_is((StgClosure *)bqe);
3660     switch (get_itbl(bqe)->type) {
3661     case TSO:
3662       debugBelch(" TSO %d (%p) on [PE %d],",
3663               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3664       break;
3665     case CONSTR:
3666       debugBelch(" %s (IP %p),",
3667               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3668                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3669                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3670                "RBH_Save_?"), get_itbl(bqe));
3671       break;
3672     default:
3673       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3674            info_type((StgClosure *)bqe), node, info_type(node));
3675       break;
3676     }
3677   } /* for */
3678   debugBelch("\n");
3679 }
3680 #else
3681 /* 
3682    Nice and easy: only TSOs on the blocking queue
3683 */
3684 void 
3685 print_bq (StgClosure *node)
3686 {
3687   StgTSO *tso;
3688
3689   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3690   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3691        tso != END_TSO_QUEUE; 
3692        tso=tso->link) {
3693     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3694     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3695     debugBelch(" TSO %d (%p),", tso->id, tso);
3696   }
3697   debugBelch("\n");
3698 }
3699 # endif
3700
3701 #if defined(PAR)
3702 static nat
3703 run_queue_len(void)
3704 {
3705   nat i;
3706   StgTSO *tso;
3707
3708   for (i=0, tso=run_queue_hd; 
3709        tso != END_TSO_QUEUE;
3710        i++, tso=tso->link)
3711     /* nothing */
3712
3713   return i;
3714 }
3715 #endif
3716
3717 void
3718 sched_belch(char *s, ...)
3719 {
3720   va_list ap;
3721   va_start(ap,s);
3722 #ifdef RTS_SUPPORTS_THREADS
3723   debugBelch("sched (task %p): ", osThreadId());
3724 #elif defined(PAR)
3725   debugBelch("== ");
3726 #else
3727   debugBelch("sched: ");
3728 #endif
3729   vdebugBelch(s, ap);
3730   debugBelch("\n");
3731   va_end(ap);
3732 }
3733
3734 #endif /* DEBUG */