[project @ 2004-11-18 09:56:07 by tharris]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PAR)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include "OSThreads.h"
80 #include  "Task.h"
81
82 #ifdef HAVE_SYS_TYPES_H
83 #include <sys/types.h>
84 #endif
85 #ifdef HAVE_UNISTD_H
86 #include <unistd.h>
87 #endif
88
89 #include <string.h>
90 #include <stdlib.h>
91 #include <stdarg.h>
92
93 #ifdef HAVE_ERRNO_H
94 #include <errno.h>
95 #endif
96
97 #ifdef THREADED_RTS
98 #define USED_IN_THREADED_RTS
99 #else
100 #define USED_IN_THREADED_RTS STG_UNUSED
101 #endif
102
103 #ifdef RTS_SUPPORTS_THREADS
104 #define USED_WHEN_RTS_SUPPORTS_THREADS
105 #else
106 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
107 #endif
108
109 /* Main thread queue.
110  * Locks required: sched_mutex.
111  */
112 StgMainThread *main_threads = NULL;
113
114 /* Thread queues.
115  * Locks required: sched_mutex.
116  */
117 #if defined(GRAN)
118
119 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
120 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
121
122 /* 
123    In GranSim we have a runnable and a blocked queue for each processor.
124    In order to minimise code changes new arrays run_queue_hds/tls
125    are created. run_queue_hd is then a short cut (macro) for
126    run_queue_hds[CurrentProc] (see GranSim.h).
127    -- HWL
128 */
129 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
130 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
131 StgTSO *ccalling_threadss[MAX_PROC];
132 /* We use the same global list of threads (all_threads) in GranSim as in
133    the std RTS (i.e. we are cheating). However, we don't use this list in
134    the GranSim specific code at the moment (so we are only potentially
135    cheating).  */
136
137 #else /* !GRAN */
138
139 StgTSO *run_queue_hd = NULL;
140 StgTSO *run_queue_tl = NULL;
141 StgTSO *blocked_queue_hd = NULL;
142 StgTSO *blocked_queue_tl = NULL;
143 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
144
145 #endif
146
147 /* Linked list of all threads.
148  * Used for detecting garbage collected threads.
149  */
150 StgTSO *all_threads = NULL;
151
152 /* When a thread performs a safe C call (_ccall_GC, using old
153  * terminology), it gets put on the suspended_ccalling_threads
154  * list. Used by the garbage collector.
155  */
156 static StgTSO *suspended_ccalling_threads;
157
158 static StgTSO *threadStackOverflow(StgTSO *tso);
159
160 /* KH: The following two flags are shared memory locations.  There is no need
161        to lock them, since they are only unset at the end of a scheduler
162        operation.
163 */
164
165 /* flag set by signal handler to precipitate a context switch */
166 int context_switch = 0;
167
168 /* if this flag is set as well, give up execution */
169 rtsBool interrupted = rtsFalse;
170
171 /* Next thread ID to allocate.
172  * Locks required: thread_id_mutex
173  */
174 static StgThreadID next_thread_id = 1;
175
176 /*
177  * Pointers to the state of the current thread.
178  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
179  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
180  */
181  
182 /* The smallest stack size that makes any sense is:
183  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
184  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
185  *  + 1                       (the closure to enter)
186  *  + 1                       (stg_ap_v_ret)
187  *  + 1                       (spare slot req'd by stg_ap_v_ret)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
194
195
196 #if defined(GRAN)
197 StgTSO *CurrentTSO;
198 #endif
199
200 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
201  *  exists - earlier gccs apparently didn't.
202  *  -= chak
203  */
204 StgTSO dummy_tso;
205
206 static rtsBool ready_to_gc;
207
208 /*
209  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
210  * in an MT setting, needed to signal that a worker thread shouldn't hang around
211  * in the scheduler when it is out of work.
212  */
213 static rtsBool shutting_down_scheduler = rtsFalse;
214
215 void            addToBlockedQueue ( StgTSO *tso );
216
217 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
218        void     interruptStgRts   ( void );
219
220 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
221 static void     detectBlackHoles  ( void );
222 #endif
223
224 #if defined(RTS_SUPPORTS_THREADS)
225 /* ToDo: carefully document the invariants that go together
226  *       with these synchronisation objects.
227  */
228 Mutex     sched_mutex       = INIT_MUTEX_VAR;
229 Mutex     term_mutex        = INIT_MUTEX_VAR;
230
231 #endif /* RTS_SUPPORTS_THREADS */
232
233 #if defined(PAR)
234 StgTSO *LastTSO;
235 rtsTime TimeOfLastYield;
236 rtsBool emitSchedule = rtsTrue;
237 #endif
238
239 #if DEBUG
240 static char *whatNext_strs[] = {
241   "(unknown)",
242   "ThreadRunGHC",
243   "ThreadInterpret",
244   "ThreadKilled",
245   "ThreadRelocated",
246   "ThreadComplete"
247 };
248 #endif
249
250 #if defined(PAR)
251 StgTSO * createSparkThread(rtsSpark spark);
252 StgTSO * activateSpark (rtsSpark spark);  
253 #endif
254
255 /* ----------------------------------------------------------------------------
256  * Starting Tasks
257  * ------------------------------------------------------------------------- */
258
259 #if defined(RTS_SUPPORTS_THREADS)
260 static rtsBool startingWorkerThread = rtsFalse;
261
262 static void taskStart(void);
263 static void
264 taskStart(void)
265 {
266   ACQUIRE_LOCK(&sched_mutex);
267   startingWorkerThread = rtsFalse;
268   schedule(NULL,NULL);
269   RELEASE_LOCK(&sched_mutex);
270 }
271
272 void
273 startSchedulerTaskIfNecessary(void)
274 {
275   if(run_queue_hd != END_TSO_QUEUE
276     || blocked_queue_hd != END_TSO_QUEUE
277     || sleeping_queue != END_TSO_QUEUE)
278   {
279     if(!startingWorkerThread)
280     { // we don't want to start another worker thread
281       // just because the last one hasn't yet reached the
282       // "waiting for capability" state
283       startingWorkerThread = rtsTrue;
284       if(!startTask(taskStart))
285       {
286         startingWorkerThread = rtsFalse;
287       }
288     }
289   }
290 }
291 #endif
292
293 /* ---------------------------------------------------------------------------
294    Main scheduling loop.
295
296    We use round-robin scheduling, each thread returning to the
297    scheduler loop when one of these conditions is detected:
298
299       * out of heap space
300       * timer expires (thread yields)
301       * thread blocks
302       * thread ends
303       * stack overflow
304
305    Locking notes:  we acquire the scheduler lock once at the beginning
306    of the scheduler loop, and release it when
307     
308       * running a thread, or
309       * waiting for work, or
310       * waiting for a GC to complete.
311
312    GRAN version:
313      In a GranSim setup this loop iterates over the global event queue.
314      This revolves around the global event queue, which determines what 
315      to do next. Therefore, it's more complicated than either the 
316      concurrent or the parallel (GUM) setup.
317
318    GUM version:
319      GUM iterates over incoming messages.
320      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
321      and sends out a fish whenever it has nothing to do; in-between
322      doing the actual reductions (shared code below) it processes the
323      incoming messages and deals with delayed operations 
324      (see PendingFetches).
325      This is not the ugliest code you could imagine, but it's bloody close.
326
327    ------------------------------------------------------------------------ */
328 static void
329 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
330           Capability *initialCapability )
331 {
332   StgTSO *t;
333   Capability *cap;
334   StgThreadReturnCode ret;
335 #if defined(GRAN)
336   rtsEvent *event;
337 #elif defined(PAR)
338   StgSparkPool *pool;
339   rtsSpark spark;
340   StgTSO *tso;
341   GlobalTaskId pe;
342   rtsBool receivedFinish = rtsFalse;
343 # if defined(DEBUG)
344   nat tp_size, sp_size; // stats only
345 # endif
346 #endif
347   rtsBool was_interrupted = rtsFalse;
348   nat prev_what_next;
349   
350   // Pre-condition: sched_mutex is held.
351   // We might have a capability, passed in as initialCapability.
352   cap = initialCapability;
353
354 #if defined(RTS_SUPPORTS_THREADS)
355   //
356   // in the threaded case, the capability is either passed in via the
357   // initialCapability parameter, or initialized inside the scheduler
358   // loop 
359   //
360   IF_DEBUG(scheduler,
361            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
362                        mainThread, initialCapability);
363       );
364 #else
365   // simply initialise it in the non-threaded case
366   grabCapability(&cap);
367 #endif
368
369 #if defined(GRAN)
370   /* set up first event to get things going */
371   /* ToDo: assign costs for system setup and init MainTSO ! */
372   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
373             ContinueThread, 
374             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
375
376   IF_DEBUG(gran,
377            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
378            G_TSO(CurrentTSO, 5));
379
380   if (RtsFlags.GranFlags.Light) {
381     /* Save current time; GranSim Light only */
382     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
383   }      
384
385   event = get_next_event();
386
387   while (event!=(rtsEvent*)NULL) {
388     /* Choose the processor with the next event */
389     CurrentProc = event->proc;
390     CurrentTSO = event->tso;
391
392 #elif defined(PAR)
393
394   while (!receivedFinish) {    /* set by processMessages */
395                                /* when receiving PP_FINISH message         */ 
396
397 #else // everything except GRAN and PAR
398
399   while (1) {
400
401 #endif
402
403      IF_DEBUG(scheduler, printAllThreads());
404
405 #if defined(RTS_SUPPORTS_THREADS)
406       // Yield the capability to higher-priority tasks if necessary.
407       //
408       if (cap != NULL) {
409           yieldCapability(&cap);
410       }
411
412       // If we do not currently hold a capability, we wait for one
413       //
414       if (cap == NULL) {
415           waitForCapability(&sched_mutex, &cap,
416                             mainThread ? &mainThread->bound_thread_cond : NULL);
417       }
418
419       // We now have a capability...
420 #endif
421
422     //
423     // If we're interrupted (the user pressed ^C, or some other
424     // termination condition occurred), kill all the currently running
425     // threads.
426     //
427     if (interrupted) {
428         IF_DEBUG(scheduler, sched_belch("interrupted"));
429         interrupted = rtsFalse;
430         was_interrupted = rtsTrue;
431 #if defined(RTS_SUPPORTS_THREADS)
432         // In the threaded RTS, deadlock detection doesn't work,
433         // so just exit right away.
434         errorBelch("interrupted");
435         releaseCapability(cap);
436         RELEASE_LOCK(&sched_mutex);
437         shutdownHaskellAndExit(EXIT_SUCCESS);
438 #else
439         deleteAllThreads();
440 #endif
441     }
442
443 #if defined(RTS_USER_SIGNALS)
444     // check for signals each time around the scheduler
445     if (signals_pending()) {
446       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
447       startSignalHandlers();
448       ACQUIRE_LOCK(&sched_mutex);
449     }
450 #endif
451
452     //
453     // Check whether any waiting threads need to be woken up.  If the
454     // run queue is empty, and there are no other tasks running, we
455     // can wait indefinitely for something to happen.
456     //
457     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
458     {
459 #if defined(RTS_SUPPORTS_THREADS)
460         // We shouldn't be here...
461         barf("schedule: awaitEvent() in threaded RTS");
462 #endif
463         awaitEvent( EMPTY_RUN_QUEUE() );
464     }
465     // we can be interrupted while waiting for I/O...
466     if (interrupted) continue;
467
468     /* 
469      * Detect deadlock: when we have no threads to run, there are no
470      * threads waiting on I/O or sleeping, and all the other tasks are
471      * waiting for work, we must have a deadlock of some description.
472      *
473      * We first try to find threads blocked on themselves (ie. black
474      * holes), and generate NonTermination exceptions where necessary.
475      *
476      * If no threads are black holed, we have a deadlock situation, so
477      * inform all the main threads.
478      */
479 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
480     if (   EMPTY_THREAD_QUEUES() )
481     {
482         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
483
484         // Garbage collection can release some new threads due to
485         // either (a) finalizers or (b) threads resurrected because
486         // they are unreachable and will therefore be sent an
487         // exception.  Any threads thus released will be immediately
488         // runnable.
489         GarbageCollect(GetRoots,rtsTrue);
490         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491
492 #if defined(RTS_USER_SIGNALS)
493         /* If we have user-installed signal handlers, then wait
494          * for signals to arrive rather then bombing out with a
495          * deadlock.
496          */
497         if ( anyUserHandlers() ) {
498             IF_DEBUG(scheduler, 
499                      sched_belch("still deadlocked, waiting for signals..."));
500
501             awaitUserSignals();
502
503             // we might be interrupted...
504             if (interrupted) { continue; }
505
506             if (signals_pending()) {
507                 RELEASE_LOCK(&sched_mutex);
508                 startSignalHandlers();
509                 ACQUIRE_LOCK(&sched_mutex);
510             }
511             ASSERT(!EMPTY_RUN_QUEUE());
512             goto not_deadlocked;
513         }
514 #endif
515
516         /* Probably a real deadlock.  Send the current main thread the
517          * Deadlock exception (or in the SMP build, send *all* main
518          * threads the deadlock exception, since none of them can make
519          * progress).
520          */
521         {
522             StgMainThread *m;
523             m = main_threads;
524             switch (m->tso->why_blocked) {
525             case BlockedOnBlackHole:
526             case BlockedOnException:
527             case BlockedOnMVar:
528                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
529                 break;
530             default:
531                 barf("deadlock: main thread blocked in a strange way");
532             }
533         }
534     }
535   not_deadlocked:
536
537 #elif defined(RTS_SUPPORTS_THREADS)
538     // ToDo: add deadlock detection in threaded RTS
539 #elif defined(PAR)
540     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
541 #endif
542
543 #if defined(RTS_SUPPORTS_THREADS)
544     if ( EMPTY_RUN_QUEUE() ) {
545         continue; // nothing to do
546     }
547 #endif
548
549 #if defined(GRAN)
550     if (RtsFlags.GranFlags.Light)
551       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
552
553     /* adjust time based on time-stamp */
554     if (event->time > CurrentTime[CurrentProc] &&
555         event->evttype != ContinueThread)
556       CurrentTime[CurrentProc] = event->time;
557     
558     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
559     if (!RtsFlags.GranFlags.Light)
560       handleIdlePEs();
561
562     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
563
564     /* main event dispatcher in GranSim */
565     switch (event->evttype) {
566       /* Should just be continuing execution */
567     case ContinueThread:
568       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
569       /* ToDo: check assertion
570       ASSERT(run_queue_hd != (StgTSO*)NULL &&
571              run_queue_hd != END_TSO_QUEUE);
572       */
573       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
574       if (!RtsFlags.GranFlags.DoAsyncFetch &&
575           procStatus[CurrentProc]==Fetching) {
576         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
577               CurrentTSO->id, CurrentTSO, CurrentProc);
578         goto next_thread;
579       } 
580       /* Ignore ContinueThreads for completed threads */
581       if (CurrentTSO->what_next == ThreadComplete) {
582         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
583               CurrentTSO->id, CurrentTSO, CurrentProc);
584         goto next_thread;
585       } 
586       /* Ignore ContinueThreads for threads that are being migrated */
587       if (PROCS(CurrentTSO)==Nowhere) { 
588         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
589               CurrentTSO->id, CurrentTSO, CurrentProc);
590         goto next_thread;
591       }
592       /* The thread should be at the beginning of the run queue */
593       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
594         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
595               CurrentTSO->id, CurrentTSO, CurrentProc);
596         break; // run the thread anyway
597       }
598       /*
599       new_event(proc, proc, CurrentTime[proc],
600                 FindWork,
601                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
602       goto next_thread; 
603       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
604       break; // now actually run the thread; DaH Qu'vam yImuHbej 
605
606     case FetchNode:
607       do_the_fetchnode(event);
608       goto next_thread;             /* handle next event in event queue  */
609       
610     case GlobalBlock:
611       do_the_globalblock(event);
612       goto next_thread;             /* handle next event in event queue  */
613       
614     case FetchReply:
615       do_the_fetchreply(event);
616       goto next_thread;             /* handle next event in event queue  */
617       
618     case UnblockThread:   /* Move from the blocked queue to the tail of */
619       do_the_unblock(event);
620       goto next_thread;             /* handle next event in event queue  */
621       
622     case ResumeThread:  /* Move from the blocked queue to the tail of */
623       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
624       event->tso->gran.blocktime += 
625         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
626       do_the_startthread(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case StartThread:
630       do_the_startthread(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case MoveThread:
634       do_the_movethread(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case MoveSpark:
638       do_the_movespark(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case FindWork:
642       do_the_findwork(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     default:
646       barf("Illegal event type %u\n", event->evttype);
647     }  /* switch */
648     
649     /* This point was scheduler_loop in the old RTS */
650
651     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
652
653     TimeOfLastEvent = CurrentTime[CurrentProc];
654     TimeOfNextEvent = get_time_of_next_event();
655     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
656     // CurrentTSO = ThreadQueueHd;
657
658     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
659                          TimeOfNextEvent));
660
661     if (RtsFlags.GranFlags.Light) 
662       GranSimLight_leave_system(event, &ActiveTSO); 
663
664     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
665
666     IF_DEBUG(gran, 
667              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
668
669     /* in a GranSim setup the TSO stays on the run queue */
670     t = CurrentTSO;
671     /* Take a thread from the run queue. */
672     POP_RUN_QUEUE(t); // take_off_run_queue(t);
673
674     IF_DEBUG(gran, 
675              debugBelch("GRAN: About to run current thread, which is\n");
676              G_TSO(t,5));
677
678     context_switch = 0; // turned on via GranYield, checking events and time slice
679
680     IF_DEBUG(gran, 
681              DumpGranEvent(GR_SCHEDULE, t));
682
683     procStatus[CurrentProc] = Busy;
684
685 #elif defined(PAR)
686     if (PendingFetches != END_BF_QUEUE) {
687         processFetches();
688     }
689
690     /* ToDo: phps merge with spark activation above */
691     /* check whether we have local work and send requests if we have none */
692     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
693       /* :-[  no local threads => look out for local sparks */
694       /* the spark pool for the current PE */
695       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
696       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
697           pool->hd < pool->tl) {
698         /* 
699          * ToDo: add GC code check that we really have enough heap afterwards!!
700          * Old comment:
701          * If we're here (no runnable threads) and we have pending
702          * sparks, we must have a space problem.  Get enough space
703          * to turn one of those pending sparks into a
704          * thread... 
705          */
706
707         spark = findSpark(rtsFalse);                /* get a spark */
708         if (spark != (rtsSpark) NULL) {
709           tso = activateSpark(spark);       /* turn the spark into a thread */
710           IF_PAR_DEBUG(schedule,
711                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
712                              tso->id, tso, advisory_thread_count));
713
714           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
715             debugBelch("==^^ failed to activate spark\n");
716             goto next_thread;
717           }               /* otherwise fall through & pick-up new tso */
718         } else {
719           IF_PAR_DEBUG(verbose,
720                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
721                              spark_queue_len(pool)));
722           goto next_thread;
723         }
724       }
725
726       /* If we still have no work we need to send a FISH to get a spark
727          from another PE 
728       */
729       if (EMPTY_RUN_QUEUE()) {
730       /* =8-[  no local sparks => look for work on other PEs */
731         /*
732          * We really have absolutely no work.  Send out a fish
733          * (there may be some out there already), and wait for
734          * something to arrive.  We clearly can't run any threads
735          * until a SCHEDULE or RESUME arrives, and so that's what
736          * we're hoping to see.  (Of course, we still have to
737          * respond to other types of messages.)
738          */
739         TIME now = msTime() /*CURRENT_TIME*/;
740         IF_PAR_DEBUG(verbose, 
741                      debugBelch("--  now=%ld\n", now));
742         IF_PAR_DEBUG(verbose,
743                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
744                          (last_fish_arrived_at!=0 &&
745                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
746                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
747                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
748                              last_fish_arrived_at,
749                              RtsFlags.ParFlags.fishDelay, now);
750                      });
751         
752         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
753             (last_fish_arrived_at==0 ||
754              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
755           /* outstandingFishes is set in sendFish, processFish;
756              avoid flooding system with fishes via delay */
757           pe = choosePE();
758           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
759                    NEW_FISH_HUNGER);
760
761           // Global statistics: count no. of fishes
762           if (RtsFlags.ParFlags.ParStats.Global &&
763               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
764             globalParStats.tot_fish_mess++;
765           }
766         }
767       
768         receivedFinish = processMessages();
769         goto next_thread;
770       }
771     } else if (PacketsWaiting()) {  /* Look for incoming messages */
772       receivedFinish = processMessages();
773     }
774
775     /* Now we are sure that we have some work available */
776     ASSERT(run_queue_hd != END_TSO_QUEUE);
777
778     /* Take a thread from the run queue, if we have work */
779     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
780     IF_DEBUG(sanity,checkTSO(t));
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, 
792              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
793                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
794
795 # if 1
796     if (0 && RtsFlags.ParFlags.ParStats.Full && 
797         t && LastTSO && t->id != LastTSO->id && 
798         LastTSO->why_blocked == NotBlocked && 
799         LastTSO->what_next != ThreadComplete) {
800       // if previously scheduled TSO not blocked we have to record the context switch
801       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
802                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
803     }
804
805     if (RtsFlags.ParFlags.ParStats.Full && 
806         (emitSchedule /* forced emit */ ||
807         (t && LastTSO && t->id != LastTSO->id))) {
808       /* 
809          we are running a different TSO, so write a schedule event to log file
810          NB: If we use fair scheduling we also have to write  a deschedule 
811              event for LastTSO; with unfair scheduling we know that the
812              previous tso has blocked whenever we switch to another tso, so
813              we don't need it in GUM for now
814       */
815       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817       emitSchedule = rtsFalse;
818     }
819      
820 # endif
821 #else /* !GRAN && !PAR */
822   
823     // grab a thread from the run queue
824     ASSERT(run_queue_hd != END_TSO_QUEUE);
825     POP_RUN_QUEUE(t);
826
827     // Sanity check the thread we're about to run.  This can be
828     // expensive if there is lots of thread switching going on...
829     IF_DEBUG(sanity,checkTSO(t));
830 #endif
831
832 #ifdef THREADED_RTS
833     {
834       StgMainThread *m = t->main;
835       
836       if(m)
837       {
838         if(m == mainThread)
839         {
840           IF_DEBUG(scheduler,
841             sched_belch("### Running thread %d in bound thread", t->id));
842           // yes, the Haskell thread is bound to the current native thread
843         }
844         else
845         {
846           IF_DEBUG(scheduler,
847             sched_belch("### thread %d bound to another OS thread", t->id));
848           // no, bound to a different Haskell thread: pass to that thread
849           PUSH_ON_RUN_QUEUE(t);
850           passCapability(&m->bound_thread_cond);
851           continue;
852         }
853       }
854       else
855       {
856         if(mainThread != NULL)
857         // The thread we want to run is bound.
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### this OS thread cannot run thread %d", t->id));
861           // no, the current native thread is bound to a different
862           // Haskell thread, so pass it to any worker thread
863           PUSH_ON_RUN_QUEUE(t);
864           passCapabilityToWorker();
865           continue; 
866         }
867       }
868     }
869 #endif
870
871     cap->r.rCurrentTSO = t;
872     
873     /* context switches are now initiated by the timer signal, unless
874      * the user specified "context switch as often as possible", with
875      * +RTS -C0
876      */
877     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
878          && (run_queue_hd != END_TSO_QUEUE
879              || blocked_queue_hd != END_TSO_QUEUE
880              || sleeping_queue != END_TSO_QUEUE)))
881         context_switch = 1;
882
883 run_thread:
884
885     RELEASE_LOCK(&sched_mutex);
886
887     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
888                               (long)t->id, whatNext_strs[t->what_next]));
889
890 #ifdef PROFILING
891     startHeapProfTimer();
892 #endif
893
894     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
895     /* Run the current thread 
896      */
897     prev_what_next = t->what_next;
898
899     errno = t->saved_errno;
900
901     switch (prev_what_next) {
902
903     case ThreadKilled:
904     case ThreadComplete:
905         /* Thread already finished, return to scheduler. */
906         ret = ThreadFinished;
907         break;
908
909     case ThreadRunGHC:
910         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
911         break;
912
913     case ThreadInterpret:
914         ret = interpretBCO(cap);
915         break;
916
917     default:
918       barf("schedule: invalid what_next field");
919     }
920
921     // The TSO might have moved, so find the new location:
922     t = cap->r.rCurrentTSO;
923
924     // And save the current errno in this thread.
925     t->saved_errno = errno;
926
927     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
928     
929     /* Costs for the scheduler are assigned to CCS_SYSTEM */
930 #ifdef PROFILING
931     stopHeapProfTimer();
932     CCCS = CCS_SYSTEM;
933 #endif
934     
935     ACQUIRE_LOCK(&sched_mutex);
936     
937 #ifdef RTS_SUPPORTS_THREADS
938     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
939 #elif !defined(GRAN) && !defined(PAR)
940     IF_DEBUG(scheduler,debugBelch("sched: "););
941 #endif
942     
943 #if defined(PAR)
944     /* HACK 675: if the last thread didn't yield, make sure to print a 
945        SCHEDULE event to the log file when StgRunning the next thread, even
946        if it is the same one as before */
947     LastTSO = t; 
948     TimeOfLastYield = CURRENT_TIME;
949 #endif
950
951     switch (ret) {
952     case HeapOverflow:
953 #if defined(GRAN)
954       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
955       globalGranStats.tot_heapover++;
956 #elif defined(PAR)
957       globalParStats.tot_heapover++;
958 #endif
959
960       // did the task ask for a large block?
961       if (cap->r.rHpAlloc > BLOCK_SIZE) {
962           // if so, get one and push it on the front of the nursery.
963           bdescr *bd;
964           nat blocks;
965           
966           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
967
968           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
969                                    (long)t->id, whatNext_strs[t->what_next], blocks));
970
971           // don't do this if it would push us over the
972           // alloc_blocks_lim limit; we'll GC first.
973           if (alloc_blocks + blocks < alloc_blocks_lim) {
974
975               alloc_blocks += blocks;
976               bd = allocGroup( blocks );
977
978               // link the new group into the list
979               bd->link = cap->r.rCurrentNursery;
980               bd->u.back = cap->r.rCurrentNursery->u.back;
981               if (cap->r.rCurrentNursery->u.back != NULL) {
982                   cap->r.rCurrentNursery->u.back->link = bd;
983               } else {
984                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
985                          g0s0->blocks == cap->r.rNursery);
986                   cap->r.rNursery = g0s0->blocks = bd;
987               }           
988               cap->r.rCurrentNursery->u.back = bd;
989
990               // initialise it as a nursery block.  We initialise the
991               // step, gen_no, and flags field of *every* sub-block in
992               // this large block, because this is easier than making
993               // sure that we always find the block head of a large
994               // block whenever we call Bdescr() (eg. evacuate() and
995               // isAlive() in the GC would both have to do this, at
996               // least).
997               { 
998                   bdescr *x;
999                   for (x = bd; x < bd + blocks; x++) {
1000                       x->step = g0s0;
1001                       x->gen_no = 0;
1002                       x->flags = 0;
1003                   }
1004               }
1005
1006               // don't forget to update the block count in g0s0.
1007               g0s0->n_blocks += blocks;
1008               // This assert can be a killer if the app is doing lots
1009               // of large block allocations.
1010               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1011
1012               // now update the nursery to point to the new block
1013               cap->r.rCurrentNursery = bd;
1014
1015               // we might be unlucky and have another thread get on the
1016               // run queue before us and steal the large block, but in that
1017               // case the thread will just end up requesting another large
1018               // block.
1019               PUSH_ON_RUN_QUEUE(t);
1020               break;
1021           }
1022       }
1023
1024       /* make all the running tasks block on a condition variable,
1025        * maybe set context_switch and wait till they all pile in,
1026        * then have them wait on a GC condition variable.
1027        */
1028       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1029                                (long)t->id, whatNext_strs[t->what_next]));
1030       threadPaused(t);
1031 #if defined(GRAN)
1032       ASSERT(!is_on_queue(t,CurrentProc));
1033 #elif defined(PAR)
1034       /* Currently we emit a DESCHEDULE event before GC in GUM.
1035          ToDo: either add separate event to distinguish SYSTEM time from rest
1036                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1037       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1038         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1039                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1040         emitSchedule = rtsTrue;
1041       }
1042 #endif
1043       
1044       ready_to_gc = rtsTrue;
1045       context_switch = 1;               /* stop other threads ASAP */
1046       PUSH_ON_RUN_QUEUE(t);
1047       /* actual GC is done at the end of the while loop */
1048       break;
1049       
1050     case StackOverflow:
1051 #if defined(GRAN)
1052       IF_DEBUG(gran, 
1053                DumpGranEvent(GR_DESCHEDULE, t));
1054       globalGranStats.tot_stackover++;
1055 #elif defined(PAR)
1056       // IF_DEBUG(par, 
1057       // DumpGranEvent(GR_DESCHEDULE, t);
1058       globalParStats.tot_stackover++;
1059 #endif
1060       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1061                                (long)t->id, whatNext_strs[t->what_next]));
1062       /* just adjust the stack for this thread, then pop it back
1063        * on the run queue.
1064        */
1065       threadPaused(t);
1066       { 
1067         /* enlarge the stack */
1068         StgTSO *new_t = threadStackOverflow(t);
1069         
1070         /* This TSO has moved, so update any pointers to it from the
1071          * main thread stack.  It better not be on any other queues...
1072          * (it shouldn't be).
1073          */
1074         if (t->main != NULL) {
1075             t->main->tso = new_t;
1076         }
1077         PUSH_ON_RUN_QUEUE(new_t);
1078       }
1079       break;
1080
1081     case ThreadYielding:
1082       // Reset the context switch flag.  We don't do this just before
1083       // running the thread, because that would mean we would lose ticks
1084       // during GC, which can lead to unfair scheduling (a thread hogs
1085       // the CPU because the tick always arrives during GC).  This way
1086       // penalises threads that do a lot of allocation, but that seems
1087       // better than the alternative.
1088       context_switch = 0;
1089
1090 #if defined(GRAN)
1091       IF_DEBUG(gran, 
1092                DumpGranEvent(GR_DESCHEDULE, t));
1093       globalGranStats.tot_yields++;
1094 #elif defined(PAR)
1095       // IF_DEBUG(par, 
1096       // DumpGranEvent(GR_DESCHEDULE, t);
1097       globalParStats.tot_yields++;
1098 #endif
1099       /* put the thread back on the run queue.  Then, if we're ready to
1100        * GC, check whether this is the last task to stop.  If so, wake
1101        * up the GC thread.  getThread will block during a GC until the
1102        * GC is finished.
1103        */
1104       IF_DEBUG(scheduler,
1105                if (t->what_next != prev_what_next) {
1106                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1107                          (long)t->id, whatNext_strs[t->what_next]);
1108                } else {
1109                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1110                          (long)t->id, whatNext_strs[t->what_next]);
1111                }
1112                );
1113
1114       IF_DEBUG(sanity,
1115                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1116                checkTSO(t));
1117       ASSERT(t->link == END_TSO_QUEUE);
1118
1119       // Shortcut if we're just switching evaluators: don't bother
1120       // doing stack squeezing (which can be expensive), just run the
1121       // thread.
1122       if (t->what_next != prev_what_next) {
1123           goto run_thread;
1124       }
1125
1126       threadPaused(t);
1127
1128 #if defined(GRAN)
1129       ASSERT(!is_on_queue(t,CurrentProc));
1130
1131       IF_DEBUG(sanity,
1132                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1133                checkThreadQsSanity(rtsTrue));
1134 #endif
1135
1136 #if defined(PAR)
1137       if (RtsFlags.ParFlags.doFairScheduling) { 
1138         /* this does round-robin scheduling; good for concurrency */
1139         APPEND_TO_RUN_QUEUE(t);
1140       } else {
1141         /* this does unfair scheduling; good for parallelism */
1142         PUSH_ON_RUN_QUEUE(t);
1143       }
1144 #else
1145       // this does round-robin scheduling; good for concurrency
1146       APPEND_TO_RUN_QUEUE(t);
1147 #endif
1148
1149 #if defined(GRAN)
1150       /* add a ContinueThread event to actually process the thread */
1151       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1152                 ContinueThread,
1153                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1154       IF_GRAN_DEBUG(bq, 
1155                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1156                G_EVENTQ(0);
1157                G_CURR_THREADQ(0));
1158 #endif /* GRAN */
1159       break;
1160
1161     case ThreadBlocked:
1162 #if defined(GRAN)
1163       IF_DEBUG(scheduler,
1164                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1165                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1166                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1167
1168       // ??? needed; should emit block before
1169       IF_DEBUG(gran, 
1170                DumpGranEvent(GR_DESCHEDULE, t)); 
1171       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1172       /*
1173         ngoq Dogh!
1174       ASSERT(procStatus[CurrentProc]==Busy || 
1175               ((procStatus[CurrentProc]==Fetching) && 
1176               (t->block_info.closure!=(StgClosure*)NULL)));
1177       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1178           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1179             procStatus[CurrentProc]==Fetching)) 
1180         procStatus[CurrentProc] = Idle;
1181       */
1182 #elif defined(PAR)
1183       IF_DEBUG(scheduler,
1184                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1185                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1186       IF_PAR_DEBUG(bq,
1187
1188                    if (t->block_info.closure!=(StgClosure*)NULL) 
1189                      print_bq(t->block_info.closure));
1190
1191       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1192       blockThread(t);
1193
1194       /* whatever we schedule next, we must log that schedule */
1195       emitSchedule = rtsTrue;
1196
1197 #else /* !GRAN */
1198       /* don't need to do anything.  Either the thread is blocked on
1199        * I/O, in which case we'll have called addToBlockedQueue
1200        * previously, or it's blocked on an MVar or Blackhole, in which
1201        * case it'll be on the relevant queue already.
1202        */
1203       IF_DEBUG(scheduler,
1204                debugBelch("--<< thread %d (%s) stopped: ", 
1205                        t->id, whatNext_strs[t->what_next]);
1206                printThreadBlockage(t);
1207                debugBelch("\n"));
1208
1209       /* Only for dumping event to log file 
1210          ToDo: do I need this in GranSim, too?
1211       blockThread(t);
1212       */
1213 #endif
1214       threadPaused(t);
1215       break;
1216
1217     case ThreadFinished:
1218       /* Need to check whether this was a main thread, and if so, signal
1219        * the task that started it with the return value.  If we have no
1220        * more main threads, we probably need to stop all the tasks until
1221        * we get a new one.
1222        */
1223       /* We also end up here if the thread kills itself with an
1224        * uncaught exception, see Exception.hc.
1225        */
1226       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1227                                t->id, whatNext_strs[t->what_next]));
1228 #if defined(GRAN)
1229       endThread(t, CurrentProc); // clean-up the thread
1230 #elif defined(PAR)
1231       /* For now all are advisory -- HWL */
1232       //if(t->priority==AdvisoryPriority) ??
1233       advisory_thread_count--;
1234       
1235 # ifdef DIST
1236       if(t->dist.priority==RevalPriority)
1237         FinishReval(t);
1238 # endif
1239       
1240       if (RtsFlags.ParFlags.ParStats.Full &&
1241           !RtsFlags.ParFlags.ParStats.Suppressed) 
1242         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1243 #endif
1244
1245       //
1246       // Check whether the thread that just completed was a main
1247       // thread, and if so return with the result.  
1248       //
1249       // There is an assumption here that all thread completion goes
1250       // through this point; we need to make sure that if a thread
1251       // ends up in the ThreadKilled state, that it stays on the run
1252       // queue so it can be dealt with here.
1253       //
1254       if (
1255 #if defined(RTS_SUPPORTS_THREADS)
1256           mainThread != NULL
1257 #else
1258           mainThread->tso == t
1259 #endif
1260           )
1261       {
1262           // We are a bound thread: this must be our thread that just
1263           // completed.
1264           ASSERT(mainThread->tso == t);
1265
1266           if (t->what_next == ThreadComplete) {
1267               if (mainThread->ret) {
1268                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1269                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1270               }
1271               mainThread->stat = Success;
1272           } else {
1273               if (mainThread->ret) {
1274                   *(mainThread->ret) = NULL;
1275               }
1276               if (was_interrupted) {
1277                   mainThread->stat = Interrupted;
1278               } else {
1279                   mainThread->stat = Killed;
1280               }
1281           }
1282 #ifdef DEBUG
1283           removeThreadLabel((StgWord)mainThread->tso->id);
1284 #endif
1285           if (mainThread->prev == NULL) {
1286               main_threads = mainThread->link;
1287           } else {
1288               mainThread->prev->link = mainThread->link;
1289           }
1290           if (mainThread->link != NULL) {
1291               mainThread->link->prev = NULL;
1292           }
1293           releaseCapability(cap);
1294           return;
1295       }
1296
1297 #ifdef RTS_SUPPORTS_THREADS
1298       ASSERT(t->main == NULL);
1299 #else
1300       if (t->main != NULL) {
1301           // Must be a main thread that is not the topmost one.  Leave
1302           // it on the run queue until the stack has unwound to the
1303           // point where we can deal with this.  Leaving it on the run
1304           // queue also ensures that the garbage collector knows about
1305           // this thread and its return value (it gets dropped from the
1306           // all_threads list so there's no other way to find it).
1307           APPEND_TO_RUN_QUEUE(t);
1308       }
1309 #endif
1310       break;
1311
1312     default:
1313       barf("schedule: invalid thread return code %d", (int)ret);
1314     }
1315
1316 #ifdef PROFILING
1317     // When we have +RTS -i0 and we're heap profiling, do a census at
1318     // every GC.  This lets us get repeatable runs for debugging.
1319     if (performHeapProfile ||
1320         (RtsFlags.ProfFlags.profileInterval==0 &&
1321          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1322         GarbageCollect(GetRoots, rtsTrue);
1323         heapCensus();
1324         performHeapProfile = rtsFalse;
1325         ready_to_gc = rtsFalse; // we already GC'd
1326     }
1327 #endif
1328
1329     if (ready_to_gc) {
1330       /* Kick any transactions which are invalid back to their atomically frames.
1331        * When next scheduled they will try to commit, this commit will fail and
1332        * they will retry. */
1333       for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1334         if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1335           if (!stmValidateTransaction (t -> trec)) {
1336             StgRetInfoTable *info;
1337             StgPtr sp = t -> sp;
1338
1339             IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1340
1341             if (sp[0] == (W_)&stg_enter_info) {
1342               sp++;
1343             } else {
1344               sp--;
1345               sp[0] = (W_)&stg_dummy_ret_closure;
1346             }
1347
1348             // Look up the stack for its atomically frame
1349             StgPtr frame;
1350             frame = sp + 1;
1351             info = get_ret_itbl((StgClosure *)frame);
1352               
1353             while (info->i.type != ATOMICALLY_FRAME &&
1354                    info->i.type != STOP_FRAME &&
1355                    info->i.type != UPDATE_FRAME) {
1356               if (info -> i.type == CATCH_RETRY_FRAME) {
1357                 IF_DEBUG(stm, sched_belch("Aborting transaction in catch-retry frame"));
1358                 stmAbortTransaction(t -> trec);
1359                 t -> trec = stmGetEnclosingTRec(t -> trec);
1360               }
1361               frame += stack_frame_sizeW((StgClosure *)frame);
1362               info = get_ret_itbl((StgClosure *)frame);
1363             }
1364             
1365             if (!info -> i.type == ATOMICALLY_FRAME) {
1366               barf("Could not find ATOMICALLY_FRAME for unvalidatable thread");
1367             }
1368
1369             // Cause the thread to enter its atomically frame again when
1370             // scheduled -- this will attempt stmCommitTransaction or stmReWait
1371             // which will fail triggering re-rexecution.
1372             t->sp = frame;
1373             t->what_next = ThreadRunGHC;
1374           }
1375         }
1376       }
1377
1378       /* everybody back, start the GC.
1379        * Could do it in this thread, or signal a condition var
1380        * to do it in another thread.  Either way, we need to
1381        * broadcast on gc_pending_cond afterward.
1382        */
1383 #if defined(RTS_SUPPORTS_THREADS)
1384       IF_DEBUG(scheduler,sched_belch("doing GC"));
1385 #endif
1386       GarbageCollect(GetRoots,rtsFalse);
1387       ready_to_gc = rtsFalse;
1388 #if defined(GRAN)
1389       /* add a ContinueThread event to continue execution of current thread */
1390       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1391                 ContinueThread,
1392                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1393       IF_GRAN_DEBUG(bq, 
1394                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1395                G_EVENTQ(0);
1396                G_CURR_THREADQ(0));
1397 #endif /* GRAN */
1398     }
1399
1400 #if defined(GRAN)
1401   next_thread:
1402     IF_GRAN_DEBUG(unused,
1403                   print_eventq(EventHd));
1404
1405     event = get_next_event();
1406 #elif defined(PAR)
1407   next_thread:
1408     /* ToDo: wait for next message to arrive rather than busy wait */
1409 #endif /* GRAN */
1410
1411   } /* end of while(1) */
1412
1413   IF_PAR_DEBUG(verbose,
1414                debugBelch("== Leaving schedule() after having received Finish\n"));
1415 }
1416
1417 /* ---------------------------------------------------------------------------
1418  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1419  * used by Control.Concurrent for error checking.
1420  * ------------------------------------------------------------------------- */
1421  
1422 StgBool
1423 rtsSupportsBoundThreads(void)
1424 {
1425 #ifdef THREADED_RTS
1426   return rtsTrue;
1427 #else
1428   return rtsFalse;
1429 #endif
1430 }
1431
1432 /* ---------------------------------------------------------------------------
1433  * isThreadBound(tso): check whether tso is bound to an OS thread.
1434  * ------------------------------------------------------------------------- */
1435  
1436 StgBool
1437 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1438 {
1439 #ifdef THREADED_RTS
1440   return (tso->main != NULL);
1441 #endif
1442   return rtsFalse;
1443 }
1444
1445 /* ---------------------------------------------------------------------------
1446  * Singleton fork(). Do not copy any running threads.
1447  * ------------------------------------------------------------------------- */
1448
1449 #ifndef mingw32_TARGET_OS
1450 #define FORKPROCESS_PRIMOP_SUPPORTED
1451 #endif
1452
1453 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1454 static void 
1455 deleteThreadImmediately(StgTSO *tso);
1456 #endif
1457 StgInt
1458 forkProcess(HsStablePtr *entry
1459 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1460             STG_UNUSED
1461 #endif
1462            )
1463 {
1464 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1465   pid_t pid;
1466   StgTSO* t,*next;
1467   StgMainThread *m;
1468   SchedulerStatus rc;
1469
1470   IF_DEBUG(scheduler,sched_belch("forking!"));
1471   rts_lock(); // This not only acquires sched_mutex, it also
1472               // makes sure that no other threads are running
1473
1474   pid = fork();
1475
1476   if (pid) { /* parent */
1477
1478   /* just return the pid */
1479     rts_unlock();
1480     return pid;
1481     
1482   } else { /* child */
1483     
1484     
1485       // delete all threads
1486     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1487     
1488     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1489       next = t->link;
1490
1491         // don't allow threads to catch the ThreadKilled exception
1492       deleteThreadImmediately(t);
1493     }
1494     
1495       // wipe the main thread list
1496     while((m = main_threads) != NULL) {
1497       main_threads = m->link;
1498 # ifdef THREADED_RTS
1499       closeCondition(&m->bound_thread_cond);
1500 # endif
1501       stgFree(m);
1502     }
1503     
1504     rc = rts_evalStableIO(entry, NULL);  // run the action
1505     rts_checkSchedStatus("forkProcess",rc);
1506     
1507     rts_unlock();
1508     
1509     hs_exit();                      // clean up and exit
1510     stg_exit(0);
1511   }
1512 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1513   barf("forkProcess#: primop not supported, sorry!\n");
1514   return -1;
1515 #endif
1516 }
1517
1518 /* ---------------------------------------------------------------------------
1519  * deleteAllThreads():  kill all the live threads.
1520  *
1521  * This is used when we catch a user interrupt (^C), before performing
1522  * any necessary cleanups and running finalizers.
1523  *
1524  * Locks: sched_mutex held.
1525  * ------------------------------------------------------------------------- */
1526    
1527 void
1528 deleteAllThreads ( void )
1529 {
1530   StgTSO* t, *next;
1531   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1532   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1533       next = t->global_link;
1534       deleteThread(t);
1535   }      
1536
1537   // The run queue now contains a bunch of ThreadKilled threads.  We
1538   // must not throw these away: the main thread(s) will be in there
1539   // somewhere, and the main scheduler loop has to deal with it.
1540   // Also, the run queue is the only thing keeping these threads from
1541   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1542
1543   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1544   ASSERT(sleeping_queue == END_TSO_QUEUE);
1545 }
1546
1547 /* startThread and  insertThread are now in GranSim.c -- HWL */
1548
1549
1550 /* ---------------------------------------------------------------------------
1551  * Suspending & resuming Haskell threads.
1552  * 
1553  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1554  * its capability before calling the C function.  This allows another
1555  * task to pick up the capability and carry on running Haskell
1556  * threads.  It also means that if the C call blocks, it won't lock
1557  * the whole system.
1558  *
1559  * The Haskell thread making the C call is put to sleep for the
1560  * duration of the call, on the susepended_ccalling_threads queue.  We
1561  * give out a token to the task, which it can use to resume the thread
1562  * on return from the C function.
1563  * ------------------------------------------------------------------------- */
1564    
1565 StgInt
1566 suspendThread( StgRegTable *reg )
1567 {
1568   nat tok;
1569   Capability *cap;
1570   int saved_errno = errno;
1571
1572   /* assume that *reg is a pointer to the StgRegTable part
1573    * of a Capability.
1574    */
1575   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1576
1577   ACQUIRE_LOCK(&sched_mutex);
1578
1579   IF_DEBUG(scheduler,
1580            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1581
1582   // XXX this might not be necessary --SDM
1583   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1584
1585   threadPaused(cap->r.rCurrentTSO);
1586   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1587   suspended_ccalling_threads = cap->r.rCurrentTSO;
1588
1589   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1590       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1591       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1592   } else {
1593       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1594   }
1595
1596   /* Use the thread ID as the token; it should be unique */
1597   tok = cap->r.rCurrentTSO->id;
1598
1599   /* Hand back capability */
1600   releaseCapability(cap);
1601   
1602 #if defined(RTS_SUPPORTS_THREADS)
1603   /* Preparing to leave the RTS, so ensure there's a native thread/task
1604      waiting to take over.
1605   */
1606   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1607 #endif
1608
1609   RELEASE_LOCK(&sched_mutex);
1610   
1611   errno = saved_errno;
1612   return tok; 
1613 }
1614
1615 StgRegTable *
1616 resumeThread( StgInt tok )
1617 {
1618   StgTSO *tso, **prev;
1619   Capability *cap;
1620   int saved_errno = errno;
1621
1622 #if defined(RTS_SUPPORTS_THREADS)
1623   /* Wait for permission to re-enter the RTS with the result. */
1624   ACQUIRE_LOCK(&sched_mutex);
1625   waitForReturnCapability(&sched_mutex, &cap);
1626
1627   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1628 #else
1629   grabCapability(&cap);
1630 #endif
1631
1632   /* Remove the thread off of the suspended list */
1633   prev = &suspended_ccalling_threads;
1634   for (tso = suspended_ccalling_threads; 
1635        tso != END_TSO_QUEUE; 
1636        prev = &tso->link, tso = tso->link) {
1637     if (tso->id == (StgThreadID)tok) {
1638       *prev = tso->link;
1639       break;
1640     }
1641   }
1642   if (tso == END_TSO_QUEUE) {
1643     barf("resumeThread: thread not found");
1644   }
1645   tso->link = END_TSO_QUEUE;
1646   
1647   if(tso->why_blocked == BlockedOnCCall) {
1648       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1649       tso->blocked_exceptions = NULL;
1650   }
1651   
1652   /* Reset blocking status */
1653   tso->why_blocked  = NotBlocked;
1654
1655   cap->r.rCurrentTSO = tso;
1656   RELEASE_LOCK(&sched_mutex);
1657   errno = saved_errno;
1658   return &cap->r;
1659 }
1660
1661
1662 /* ---------------------------------------------------------------------------
1663  * Static functions
1664  * ------------------------------------------------------------------------ */
1665 static void unblockThread(StgTSO *tso);
1666
1667 /* ---------------------------------------------------------------------------
1668  * Comparing Thread ids.
1669  *
1670  * This is used from STG land in the implementation of the
1671  * instances of Eq/Ord for ThreadIds.
1672  * ------------------------------------------------------------------------ */
1673
1674 int
1675 cmp_thread(StgPtr tso1, StgPtr tso2) 
1676
1677   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1678   StgThreadID id2 = ((StgTSO *)tso2)->id;
1679  
1680   if (id1 < id2) return (-1);
1681   if (id1 > id2) return 1;
1682   return 0;
1683 }
1684
1685 /* ---------------------------------------------------------------------------
1686  * Fetching the ThreadID from an StgTSO.
1687  *
1688  * This is used in the implementation of Show for ThreadIds.
1689  * ------------------------------------------------------------------------ */
1690 int
1691 rts_getThreadId(StgPtr tso) 
1692 {
1693   return ((StgTSO *)tso)->id;
1694 }
1695
1696 #ifdef DEBUG
1697 void
1698 labelThread(StgPtr tso, char *label)
1699 {
1700   int len;
1701   void *buf;
1702
1703   /* Caveat: Once set, you can only set the thread name to "" */
1704   len = strlen(label)+1;
1705   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1706   strncpy(buf,label,len);
1707   /* Update will free the old memory for us */
1708   updateThreadLabel(((StgTSO *)tso)->id,buf);
1709 }
1710 #endif /* DEBUG */
1711
1712 /* ---------------------------------------------------------------------------
1713    Create a new thread.
1714
1715    The new thread starts with the given stack size.  Before the
1716    scheduler can run, however, this thread needs to have a closure
1717    (and possibly some arguments) pushed on its stack.  See
1718    pushClosure() in Schedule.h.
1719
1720    createGenThread() and createIOThread() (in SchedAPI.h) are
1721    convenient packaged versions of this function.
1722
1723    currently pri (priority) is only used in a GRAN setup -- HWL
1724    ------------------------------------------------------------------------ */
1725 #if defined(GRAN)
1726 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1727 StgTSO *
1728 createThread(nat size, StgInt pri)
1729 #else
1730 StgTSO *
1731 createThread(nat size)
1732 #endif
1733 {
1734
1735     StgTSO *tso;
1736     nat stack_size;
1737
1738     /* First check whether we should create a thread at all */
1739 #if defined(PAR)
1740   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1741   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1742     threadsIgnored++;
1743     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1744           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1745     return END_TSO_QUEUE;
1746   }
1747   threadsCreated++;
1748 #endif
1749
1750 #if defined(GRAN)
1751   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1752 #endif
1753
1754   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1755
1756   /* catch ridiculously small stack sizes */
1757   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1758     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1759   }
1760
1761   stack_size = size - TSO_STRUCT_SIZEW;
1762
1763   tso = (StgTSO *)allocate(size);
1764   TICK_ALLOC_TSO(stack_size, 0);
1765
1766   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1767 #if defined(GRAN)
1768   SET_GRAN_HDR(tso, ThisPE);
1769 #endif
1770
1771   // Always start with the compiled code evaluator
1772   tso->what_next = ThreadRunGHC;
1773
1774   tso->id = next_thread_id++; 
1775   tso->why_blocked  = NotBlocked;
1776   tso->blocked_exceptions = NULL;
1777
1778   tso->saved_errno = 0;
1779   tso->main = NULL;
1780   
1781   tso->stack_size   = stack_size;
1782   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1783                               - TSO_STRUCT_SIZEW;
1784   tso->sp           = (P_)&(tso->stack) + stack_size;
1785
1786   tso->trec = NO_TREC;
1787
1788 #ifdef PROFILING
1789   tso->prof.CCCS = CCS_MAIN;
1790 #endif
1791
1792   /* put a stop frame on the stack */
1793   tso->sp -= sizeofW(StgStopFrame);
1794   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1795   tso->link = END_TSO_QUEUE;
1796
1797   // ToDo: check this
1798 #if defined(GRAN)
1799   /* uses more flexible routine in GranSim */
1800   insertThread(tso, CurrentProc);
1801 #else
1802   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1803    * from its creation
1804    */
1805 #endif
1806
1807 #if defined(GRAN) 
1808   if (RtsFlags.GranFlags.GranSimStats.Full) 
1809     DumpGranEvent(GR_START,tso);
1810 #elif defined(PAR)
1811   if (RtsFlags.ParFlags.ParStats.Full) 
1812     DumpGranEvent(GR_STARTQ,tso);
1813   /* HACk to avoid SCHEDULE 
1814      LastTSO = tso; */
1815 #endif
1816
1817   /* Link the new thread on the global thread list.
1818    */
1819   tso->global_link = all_threads;
1820   all_threads = tso;
1821
1822 #if defined(DIST)
1823   tso->dist.priority = MandatoryPriority; //by default that is...
1824 #endif
1825
1826 #if defined(GRAN)
1827   tso->gran.pri = pri;
1828 # if defined(DEBUG)
1829   tso->gran.magic = TSO_MAGIC; // debugging only
1830 # endif
1831   tso->gran.sparkname   = 0;
1832   tso->gran.startedat   = CURRENT_TIME; 
1833   tso->gran.exported    = 0;
1834   tso->gran.basicblocks = 0;
1835   tso->gran.allocs      = 0;
1836   tso->gran.exectime    = 0;
1837   tso->gran.fetchtime   = 0;
1838   tso->gran.fetchcount  = 0;
1839   tso->gran.blocktime   = 0;
1840   tso->gran.blockcount  = 0;
1841   tso->gran.blockedat   = 0;
1842   tso->gran.globalsparks = 0;
1843   tso->gran.localsparks  = 0;
1844   if (RtsFlags.GranFlags.Light)
1845     tso->gran.clock  = Now; /* local clock */
1846   else
1847     tso->gran.clock  = 0;
1848
1849   IF_DEBUG(gran,printTSO(tso));
1850 #elif defined(PAR)
1851 # if defined(DEBUG)
1852   tso->par.magic = TSO_MAGIC; // debugging only
1853 # endif
1854   tso->par.sparkname   = 0;
1855   tso->par.startedat   = CURRENT_TIME; 
1856   tso->par.exported    = 0;
1857   tso->par.basicblocks = 0;
1858   tso->par.allocs      = 0;
1859   tso->par.exectime    = 0;
1860   tso->par.fetchtime   = 0;
1861   tso->par.fetchcount  = 0;
1862   tso->par.blocktime   = 0;
1863   tso->par.blockcount  = 0;
1864   tso->par.blockedat   = 0;
1865   tso->par.globalsparks = 0;
1866   tso->par.localsparks  = 0;
1867 #endif
1868
1869 #if defined(GRAN)
1870   globalGranStats.tot_threads_created++;
1871   globalGranStats.threads_created_on_PE[CurrentProc]++;
1872   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1873   globalGranStats.tot_sq_probes++;
1874 #elif defined(PAR)
1875   // collect parallel global statistics (currently done together with GC stats)
1876   if (RtsFlags.ParFlags.ParStats.Global &&
1877       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1878     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1879     globalParStats.tot_threads_created++;
1880   }
1881 #endif 
1882
1883 #if defined(GRAN)
1884   IF_GRAN_DEBUG(pri,
1885                 sched_belch("==__ schedule: Created TSO %d (%p);",
1886                       CurrentProc, tso, tso->id));
1887 #elif defined(PAR)
1888     IF_PAR_DEBUG(verbose,
1889                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1890                        (long)tso->id, tso, advisory_thread_count));
1891 #else
1892   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1893                                 (long)tso->id, (long)tso->stack_size));
1894 #endif    
1895   return tso;
1896 }
1897
1898 #if defined(PAR)
1899 /* RFP:
1900    all parallel thread creation calls should fall through the following routine.
1901 */
1902 StgTSO *
1903 createSparkThread(rtsSpark spark) 
1904 { StgTSO *tso;
1905   ASSERT(spark != (rtsSpark)NULL);
1906   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1907   { threadsIgnored++;
1908     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1909           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1910     return END_TSO_QUEUE;
1911   }
1912   else
1913   { threadsCreated++;
1914     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1915     if (tso==END_TSO_QUEUE)     
1916       barf("createSparkThread: Cannot create TSO");
1917 #if defined(DIST)
1918     tso->priority = AdvisoryPriority;
1919 #endif
1920     pushClosure(tso,spark);
1921     PUSH_ON_RUN_QUEUE(tso);
1922     advisory_thread_count++;    
1923   }
1924   return tso;
1925 }
1926 #endif
1927
1928 /*
1929   Turn a spark into a thread.
1930   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1931 */
1932 #if defined(PAR)
1933 StgTSO *
1934 activateSpark (rtsSpark spark) 
1935 {
1936   StgTSO *tso;
1937
1938   tso = createSparkThread(spark);
1939   if (RtsFlags.ParFlags.ParStats.Full) {   
1940     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1941     IF_PAR_DEBUG(verbose,
1942                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1943                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1944   }
1945   // ToDo: fwd info on local/global spark to thread -- HWL
1946   // tso->gran.exported =  spark->exported;
1947   // tso->gran.locked =   !spark->global;
1948   // tso->gran.sparkname = spark->name;
1949
1950   return tso;
1951 }
1952 #endif
1953
1954 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1955                                    Capability *initialCapability
1956                                    );
1957
1958
1959 /* ---------------------------------------------------------------------------
1960  * scheduleThread()
1961  *
1962  * scheduleThread puts a thread on the head of the runnable queue.
1963  * This will usually be done immediately after a thread is created.
1964  * The caller of scheduleThread must create the thread using e.g.
1965  * createThread and push an appropriate closure
1966  * on this thread's stack before the scheduler is invoked.
1967  * ------------------------------------------------------------------------ */
1968
1969 static void scheduleThread_ (StgTSO* tso);
1970
1971 void
1972 scheduleThread_(StgTSO *tso)
1973 {
1974   // The thread goes at the *end* of the run-queue, to avoid possible
1975   // starvation of any threads already on the queue.
1976   APPEND_TO_RUN_QUEUE(tso);
1977   threadRunnable();
1978 }
1979
1980 void
1981 scheduleThread(StgTSO* tso)
1982 {
1983   ACQUIRE_LOCK(&sched_mutex);
1984   scheduleThread_(tso);
1985   RELEASE_LOCK(&sched_mutex);
1986 }
1987
1988 #if defined(RTS_SUPPORTS_THREADS)
1989 static Condition bound_cond_cache;
1990 static int bound_cond_cache_full = 0;
1991 #endif
1992
1993
1994 SchedulerStatus
1995 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1996                    Capability *initialCapability)
1997 {
1998     // Precondition: sched_mutex must be held
1999     StgMainThread *m;
2000
2001     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2002     m->tso = tso;
2003     tso->main = m;
2004     m->ret = ret;
2005     m->stat = NoStatus;
2006     m->link = main_threads;
2007     m->prev = NULL;
2008     if (main_threads != NULL) {
2009         main_threads->prev = m;
2010     }
2011     main_threads = m;
2012
2013 #if defined(RTS_SUPPORTS_THREADS)
2014     // Allocating a new condition for each thread is expensive, so we
2015     // cache one.  This is a pretty feeble hack, but it helps speed up
2016     // consecutive call-ins quite a bit.
2017     if (bound_cond_cache_full) {
2018         m->bound_thread_cond = bound_cond_cache;
2019         bound_cond_cache_full = 0;
2020     } else {
2021         initCondition(&m->bound_thread_cond);
2022     }
2023 #endif
2024
2025     /* Put the thread on the main-threads list prior to scheduling the TSO.
2026        Failure to do so introduces a race condition in the MT case (as
2027        identified by Wolfgang Thaller), whereby the new task/OS thread 
2028        created by scheduleThread_() would complete prior to the thread
2029        that spawned it managed to put 'itself' on the main-threads list.
2030        The upshot of it all being that the worker thread wouldn't get to
2031        signal the completion of the its work item for the main thread to
2032        see (==> it got stuck waiting.)    -- sof 6/02.
2033     */
2034     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2035     
2036     APPEND_TO_RUN_QUEUE(tso);
2037     // NB. Don't call threadRunnable() here, because the thread is
2038     // bound and only runnable by *this* OS thread, so waking up other
2039     // workers will just slow things down.
2040
2041     return waitThread_(m, initialCapability);
2042 }
2043
2044 /* ---------------------------------------------------------------------------
2045  * initScheduler()
2046  *
2047  * Initialise the scheduler.  This resets all the queues - if the
2048  * queues contained any threads, they'll be garbage collected at the
2049  * next pass.
2050  *
2051  * ------------------------------------------------------------------------ */
2052
2053 void 
2054 initScheduler(void)
2055 {
2056 #if defined(GRAN)
2057   nat i;
2058
2059   for (i=0; i<=MAX_PROC; i++) {
2060     run_queue_hds[i]      = END_TSO_QUEUE;
2061     run_queue_tls[i]      = END_TSO_QUEUE;
2062     blocked_queue_hds[i]  = END_TSO_QUEUE;
2063     blocked_queue_tls[i]  = END_TSO_QUEUE;
2064     ccalling_threadss[i]  = END_TSO_QUEUE;
2065     sleeping_queue        = END_TSO_QUEUE;
2066   }
2067 #else
2068   run_queue_hd      = END_TSO_QUEUE;
2069   run_queue_tl      = END_TSO_QUEUE;
2070   blocked_queue_hd  = END_TSO_QUEUE;
2071   blocked_queue_tl  = END_TSO_QUEUE;
2072   sleeping_queue    = END_TSO_QUEUE;
2073 #endif 
2074
2075   suspended_ccalling_threads  = END_TSO_QUEUE;
2076
2077   main_threads = NULL;
2078   all_threads  = END_TSO_QUEUE;
2079
2080   context_switch = 0;
2081   interrupted    = 0;
2082
2083   RtsFlags.ConcFlags.ctxtSwitchTicks =
2084       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2085       
2086 #if defined(RTS_SUPPORTS_THREADS)
2087   /* Initialise the mutex and condition variables used by
2088    * the scheduler. */
2089   initMutex(&sched_mutex);
2090   initMutex(&term_mutex);
2091 #endif
2092   
2093   ACQUIRE_LOCK(&sched_mutex);
2094
2095   /* A capability holds the state a native thread needs in
2096    * order to execute STG code. At least one capability is
2097    * floating around (only SMP builds have more than one).
2098    */
2099   initCapabilities();
2100   
2101 #if defined(RTS_SUPPORTS_THREADS)
2102     /* start our haskell execution tasks */
2103     startTaskManager(0,taskStart);
2104 #endif
2105
2106 #if /* defined(SMP) ||*/ defined(PAR)
2107   initSparkPools();
2108 #endif
2109
2110   RELEASE_LOCK(&sched_mutex);
2111 }
2112
2113 void
2114 exitScheduler( void )
2115 {
2116 #if defined(RTS_SUPPORTS_THREADS)
2117   stopTaskManager();
2118 #endif
2119   shutting_down_scheduler = rtsTrue;
2120 }
2121
2122 /* ----------------------------------------------------------------------------
2123    Managing the per-task allocation areas.
2124    
2125    Each capability comes with an allocation area.  These are
2126    fixed-length block lists into which allocation can be done.
2127
2128    ToDo: no support for two-space collection at the moment???
2129    ------------------------------------------------------------------------- */
2130
2131 static
2132 SchedulerStatus
2133 waitThread_(StgMainThread* m, Capability *initialCapability)
2134 {
2135   SchedulerStatus stat;
2136
2137   // Precondition: sched_mutex must be held.
2138   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2139
2140 #if defined(GRAN)
2141   /* GranSim specific init */
2142   CurrentTSO = m->tso;                // the TSO to run
2143   procStatus[MainProc] = Busy;        // status of main PE
2144   CurrentProc = MainProc;             // PE to run it on
2145   schedule(m,initialCapability);
2146 #else
2147   schedule(m,initialCapability);
2148   ASSERT(m->stat != NoStatus);
2149 #endif
2150
2151   stat = m->stat;
2152
2153 #if defined(RTS_SUPPORTS_THREADS)
2154   // Free the condition variable, returning it to the cache if possible.
2155   if (!bound_cond_cache_full) {
2156       bound_cond_cache = m->bound_thread_cond;
2157       bound_cond_cache_full = 1;
2158   } else {
2159       closeCondition(&m->bound_thread_cond);
2160   }
2161 #endif
2162
2163   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2164   stgFree(m);
2165
2166   // Postcondition: sched_mutex still held
2167   return stat;
2168 }
2169
2170 /* ---------------------------------------------------------------------------
2171    Where are the roots that we know about?
2172
2173         - all the threads on the runnable queue
2174         - all the threads on the blocked queue
2175         - all the threads on the sleeping queue
2176         - all the thread currently executing a _ccall_GC
2177         - all the "main threads"
2178      
2179    ------------------------------------------------------------------------ */
2180
2181 /* This has to be protected either by the scheduler monitor, or by the
2182         garbage collection monitor (probably the latter).
2183         KH @ 25/10/99
2184 */
2185
2186 void
2187 GetRoots( evac_fn evac )
2188 {
2189 #if defined(GRAN)
2190   {
2191     nat i;
2192     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2193       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2194           evac((StgClosure **)&run_queue_hds[i]);
2195       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2196           evac((StgClosure **)&run_queue_tls[i]);
2197       
2198       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2199           evac((StgClosure **)&blocked_queue_hds[i]);
2200       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2201           evac((StgClosure **)&blocked_queue_tls[i]);
2202       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2203           evac((StgClosure **)&ccalling_threads[i]);
2204     }
2205   }
2206
2207   markEventQueue();
2208
2209 #else /* !GRAN */
2210   if (run_queue_hd != END_TSO_QUEUE) {
2211       ASSERT(run_queue_tl != END_TSO_QUEUE);
2212       evac((StgClosure **)&run_queue_hd);
2213       evac((StgClosure **)&run_queue_tl);
2214   }
2215   
2216   if (blocked_queue_hd != END_TSO_QUEUE) {
2217       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2218       evac((StgClosure **)&blocked_queue_hd);
2219       evac((StgClosure **)&blocked_queue_tl);
2220   }
2221   
2222   if (sleeping_queue != END_TSO_QUEUE) {
2223       evac((StgClosure **)&sleeping_queue);
2224   }
2225 #endif 
2226
2227   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2228       evac((StgClosure **)&suspended_ccalling_threads);
2229   }
2230
2231 #if defined(PAR) || defined(GRAN)
2232   markSparkQueue(evac);
2233 #endif
2234
2235 #if defined(RTS_USER_SIGNALS)
2236   // mark the signal handlers (signals should be already blocked)
2237   markSignalHandlers(evac);
2238 #endif
2239 }
2240
2241 /* -----------------------------------------------------------------------------
2242    performGC
2243
2244    This is the interface to the garbage collector from Haskell land.
2245    We provide this so that external C code can allocate and garbage
2246    collect when called from Haskell via _ccall_GC.
2247
2248    It might be useful to provide an interface whereby the programmer
2249    can specify more roots (ToDo).
2250    
2251    This needs to be protected by the GC condition variable above.  KH.
2252    -------------------------------------------------------------------------- */
2253
2254 static void (*extra_roots)(evac_fn);
2255
2256 void
2257 performGC(void)
2258 {
2259   /* Obligated to hold this lock upon entry */
2260   ACQUIRE_LOCK(&sched_mutex);
2261   GarbageCollect(GetRoots,rtsFalse);
2262   RELEASE_LOCK(&sched_mutex);
2263 }
2264
2265 void
2266 performMajorGC(void)
2267 {
2268   ACQUIRE_LOCK(&sched_mutex);
2269   GarbageCollect(GetRoots,rtsTrue);
2270   RELEASE_LOCK(&sched_mutex);
2271 }
2272
2273 static void
2274 AllRoots(evac_fn evac)
2275 {
2276     GetRoots(evac);             // the scheduler's roots
2277     extra_roots(evac);          // the user's roots
2278 }
2279
2280 void
2281 performGCWithRoots(void (*get_roots)(evac_fn))
2282 {
2283   ACQUIRE_LOCK(&sched_mutex);
2284   extra_roots = get_roots;
2285   GarbageCollect(AllRoots,rtsFalse);
2286   RELEASE_LOCK(&sched_mutex);
2287 }
2288
2289 /* -----------------------------------------------------------------------------
2290    Stack overflow
2291
2292    If the thread has reached its maximum stack size, then raise the
2293    StackOverflow exception in the offending thread.  Otherwise
2294    relocate the TSO into a larger chunk of memory and adjust its stack
2295    size appropriately.
2296    -------------------------------------------------------------------------- */
2297
2298 static StgTSO *
2299 threadStackOverflow(StgTSO *tso)
2300 {
2301   nat new_stack_size, new_tso_size, stack_words;
2302   StgPtr new_sp;
2303   StgTSO *dest;
2304
2305   IF_DEBUG(sanity,checkTSO(tso));
2306   if (tso->stack_size >= tso->max_stack_size) {
2307
2308     IF_DEBUG(gc,
2309              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2310                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2311              /* If we're debugging, just print out the top of the stack */
2312              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2313                                               tso->sp+64)));
2314
2315     /* Send this thread the StackOverflow exception */
2316     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2317     return tso;
2318   }
2319
2320   /* Try to double the current stack size.  If that takes us over the
2321    * maximum stack size for this thread, then use the maximum instead.
2322    * Finally round up so the TSO ends up as a whole number of blocks.
2323    */
2324   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2325   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2326                                        TSO_STRUCT_SIZE)/sizeof(W_);
2327   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2328   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2329
2330   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2331
2332   dest = (StgTSO *)allocate(new_tso_size);
2333   TICK_ALLOC_TSO(new_stack_size,0);
2334
2335   /* copy the TSO block and the old stack into the new area */
2336   memcpy(dest,tso,TSO_STRUCT_SIZE);
2337   stack_words = tso->stack + tso->stack_size - tso->sp;
2338   new_sp = (P_)dest + new_tso_size - stack_words;
2339   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2340
2341   /* relocate the stack pointers... */
2342   dest->sp         = new_sp;
2343   dest->stack_size = new_stack_size;
2344         
2345   /* Mark the old TSO as relocated.  We have to check for relocated
2346    * TSOs in the garbage collector and any primops that deal with TSOs.
2347    *
2348    * It's important to set the sp value to just beyond the end
2349    * of the stack, so we don't attempt to scavenge any part of the
2350    * dead TSO's stack.
2351    */
2352   tso->what_next = ThreadRelocated;
2353   tso->link = dest;
2354   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2355   tso->why_blocked = NotBlocked;
2356   dest->mut_link = NULL;
2357
2358   IF_PAR_DEBUG(verbose,
2359                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2360                      tso->id, tso, tso->stack_size);
2361                /* If we're debugging, just print out the top of the stack */
2362                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2363                                                 tso->sp+64)));
2364   
2365   IF_DEBUG(sanity,checkTSO(tso));
2366 #if 0
2367   IF_DEBUG(scheduler,printTSO(dest));
2368 #endif
2369
2370   return dest;
2371 }
2372
2373 /* ---------------------------------------------------------------------------
2374    Wake up a queue that was blocked on some resource.
2375    ------------------------------------------------------------------------ */
2376
2377 #if defined(GRAN)
2378 STATIC_INLINE void
2379 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2380 {
2381 }
2382 #elif defined(PAR)
2383 STATIC_INLINE void
2384 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2385 {
2386   /* write RESUME events to log file and
2387      update blocked and fetch time (depending on type of the orig closure) */
2388   if (RtsFlags.ParFlags.ParStats.Full) {
2389     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2390                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2391                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2392     if (EMPTY_RUN_QUEUE())
2393       emitSchedule = rtsTrue;
2394
2395     switch (get_itbl(node)->type) {
2396         case FETCH_ME_BQ:
2397           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2398           break;
2399         case RBH:
2400         case FETCH_ME:
2401         case BLACKHOLE_BQ:
2402           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2403           break;
2404 #ifdef DIST
2405         case MVAR:
2406           break;
2407 #endif    
2408         default:
2409           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2410         }
2411       }
2412 }
2413 #endif
2414
2415 #if defined(GRAN)
2416 static StgBlockingQueueElement *
2417 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2418 {
2419     StgTSO *tso;
2420     PEs node_loc, tso_loc;
2421
2422     node_loc = where_is(node); // should be lifted out of loop
2423     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2424     tso_loc = where_is((StgClosure *)tso);
2425     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2426       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2427       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2428       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2429       // insertThread(tso, node_loc);
2430       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2431                 ResumeThread,
2432                 tso, node, (rtsSpark*)NULL);
2433       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2434       // len_local++;
2435       // len++;
2436     } else { // TSO is remote (actually should be FMBQ)
2437       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2438                                   RtsFlags.GranFlags.Costs.gunblocktime +
2439                                   RtsFlags.GranFlags.Costs.latency;
2440       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2441                 UnblockThread,
2442                 tso, node, (rtsSpark*)NULL);
2443       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2444       // len++;
2445     }
2446     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2447     IF_GRAN_DEBUG(bq,
2448                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2449                           (node_loc==tso_loc ? "Local" : "Global"), 
2450                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2451     tso->block_info.closure = NULL;
2452     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2453                              tso->id, tso));
2454 }
2455 #elif defined(PAR)
2456 static StgBlockingQueueElement *
2457 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2458 {
2459     StgBlockingQueueElement *next;
2460
2461     switch (get_itbl(bqe)->type) {
2462     case TSO:
2463       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2464       /* if it's a TSO just push it onto the run_queue */
2465       next = bqe->link;
2466       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2467       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2468       threadRunnable();
2469       unblockCount(bqe, node);
2470       /* reset blocking status after dumping event */
2471       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2472       break;
2473
2474     case BLOCKED_FETCH:
2475       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2476       next = bqe->link;
2477       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2478       PendingFetches = (StgBlockedFetch *)bqe;
2479       break;
2480
2481 # if defined(DEBUG)
2482       /* can ignore this case in a non-debugging setup; 
2483          see comments on RBHSave closures above */
2484     case CONSTR:
2485       /* check that the closure is an RBHSave closure */
2486       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2487              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2488              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2489       break;
2490
2491     default:
2492       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2493            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2494            (StgClosure *)bqe);
2495 # endif
2496     }
2497   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2498   return next;
2499 }
2500
2501 #else /* !GRAN && !PAR */
2502 static StgTSO *
2503 unblockOneLocked(StgTSO *tso)
2504 {
2505   StgTSO *next;
2506
2507   ASSERT(get_itbl(tso)->type == TSO);
2508   ASSERT(tso->why_blocked != NotBlocked);
2509   tso->why_blocked = NotBlocked;
2510   next = tso->link;
2511   tso->link = END_TSO_QUEUE;
2512   APPEND_TO_RUN_QUEUE(tso);
2513   threadRunnable();
2514   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2515   return next;
2516 }
2517 #endif
2518
2519 #if defined(GRAN) || defined(PAR)
2520 INLINE_ME StgBlockingQueueElement *
2521 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2522 {
2523   ACQUIRE_LOCK(&sched_mutex);
2524   bqe = unblockOneLocked(bqe, node);
2525   RELEASE_LOCK(&sched_mutex);
2526   return bqe;
2527 }
2528 #else
2529 INLINE_ME StgTSO *
2530 unblockOne(StgTSO *tso)
2531 {
2532   ACQUIRE_LOCK(&sched_mutex);
2533   tso = unblockOneLocked(tso);
2534   RELEASE_LOCK(&sched_mutex);
2535   return tso;
2536 }
2537 #endif
2538
2539 #if defined(GRAN)
2540 void 
2541 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2542 {
2543   StgBlockingQueueElement *bqe;
2544   PEs node_loc;
2545   nat len = 0; 
2546
2547   IF_GRAN_DEBUG(bq, 
2548                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2549                       node, CurrentProc, CurrentTime[CurrentProc], 
2550                       CurrentTSO->id, CurrentTSO));
2551
2552   node_loc = where_is(node);
2553
2554   ASSERT(q == END_BQ_QUEUE ||
2555          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2556          get_itbl(q)->type == CONSTR); // closure (type constructor)
2557   ASSERT(is_unique(node));
2558
2559   /* FAKE FETCH: magically copy the node to the tso's proc;
2560      no Fetch necessary because in reality the node should not have been 
2561      moved to the other PE in the first place
2562   */
2563   if (CurrentProc!=node_loc) {
2564     IF_GRAN_DEBUG(bq, 
2565                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2566                         node, node_loc, CurrentProc, CurrentTSO->id, 
2567                         // CurrentTSO, where_is(CurrentTSO),
2568                         node->header.gran.procs));
2569     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2570     IF_GRAN_DEBUG(bq, 
2571                   debugBelch("## new bitmask of node %p is %#x\n",
2572                         node, node->header.gran.procs));
2573     if (RtsFlags.GranFlags.GranSimStats.Global) {
2574       globalGranStats.tot_fake_fetches++;
2575     }
2576   }
2577
2578   bqe = q;
2579   // ToDo: check: ASSERT(CurrentProc==node_loc);
2580   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2581     //next = bqe->link;
2582     /* 
2583        bqe points to the current element in the queue
2584        next points to the next element in the queue
2585     */
2586     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2587     //tso_loc = where_is(tso);
2588     len++;
2589     bqe = unblockOneLocked(bqe, node);
2590   }
2591
2592   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2593      the closure to make room for the anchor of the BQ */
2594   if (bqe!=END_BQ_QUEUE) {
2595     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2596     /*
2597     ASSERT((info_ptr==&RBH_Save_0_info) ||
2598            (info_ptr==&RBH_Save_1_info) ||
2599            (info_ptr==&RBH_Save_2_info));
2600     */
2601     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2602     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2603     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2604
2605     IF_GRAN_DEBUG(bq,
2606                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2607                         node, info_type(node)));
2608   }
2609
2610   /* statistics gathering */
2611   if (RtsFlags.GranFlags.GranSimStats.Global) {
2612     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2613     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2614     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2615     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2616   }
2617   IF_GRAN_DEBUG(bq,
2618                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2619                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2620 }
2621 #elif defined(PAR)
2622 void 
2623 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2624 {
2625   StgBlockingQueueElement *bqe;
2626
2627   ACQUIRE_LOCK(&sched_mutex);
2628
2629   IF_PAR_DEBUG(verbose, 
2630                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2631                      node, mytid));
2632 #ifdef DIST  
2633   //RFP
2634   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2635     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2636     return;
2637   }
2638 #endif
2639   
2640   ASSERT(q == END_BQ_QUEUE ||
2641          get_itbl(q)->type == TSO ||           
2642          get_itbl(q)->type == BLOCKED_FETCH || 
2643          get_itbl(q)->type == CONSTR); 
2644
2645   bqe = q;
2646   while (get_itbl(bqe)->type==TSO || 
2647          get_itbl(bqe)->type==BLOCKED_FETCH) {
2648     bqe = unblockOneLocked(bqe, node);
2649   }
2650   RELEASE_LOCK(&sched_mutex);
2651 }
2652
2653 #else   /* !GRAN && !PAR */
2654
2655 void
2656 awakenBlockedQueueNoLock(StgTSO *tso)
2657 {
2658   while (tso != END_TSO_QUEUE) {
2659     tso = unblockOneLocked(tso);
2660   }
2661 }
2662
2663 void
2664 awakenBlockedQueue(StgTSO *tso)
2665 {
2666   ACQUIRE_LOCK(&sched_mutex);
2667   while (tso != END_TSO_QUEUE) {
2668     tso = unblockOneLocked(tso);
2669   }
2670   RELEASE_LOCK(&sched_mutex);
2671 }
2672 #endif
2673
2674 /* ---------------------------------------------------------------------------
2675    Interrupt execution
2676    - usually called inside a signal handler so it mustn't do anything fancy.   
2677    ------------------------------------------------------------------------ */
2678
2679 void
2680 interruptStgRts(void)
2681 {
2682     interrupted    = 1;
2683     context_switch = 1;
2684 }
2685
2686 /* -----------------------------------------------------------------------------
2687    Unblock a thread
2688
2689    This is for use when we raise an exception in another thread, which
2690    may be blocked.
2691    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2692    -------------------------------------------------------------------------- */
2693
2694 #if defined(GRAN) || defined(PAR)
2695 /*
2696   NB: only the type of the blocking queue is different in GranSim and GUM
2697       the operations on the queue-elements are the same
2698       long live polymorphism!
2699
2700   Locks: sched_mutex is held upon entry and exit.
2701
2702 */
2703 static void
2704 unblockThread(StgTSO *tso)
2705 {
2706   StgBlockingQueueElement *t, **last;
2707
2708   switch (tso->why_blocked) {
2709
2710   case NotBlocked:
2711     return;  /* not blocked */
2712
2713   case BlockedOnSTM:
2714     // Be careful: nothing to do here!  We tell the scheduler that the thread
2715     // is runnable and we leave it to the stack-walking code to abort the 
2716     // transaction while unwinding the stack.  We should perhaps have a debugging
2717     // test to make sure that this really happens and that the 'zombie' transaction
2718     // does not get committed.
2719     goto done;
2720
2721   case BlockedOnMVar:
2722     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2723     {
2724       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2725       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2726
2727       last = (StgBlockingQueueElement **)&mvar->head;
2728       for (t = (StgBlockingQueueElement *)mvar->head; 
2729            t != END_BQ_QUEUE; 
2730            last = &t->link, last_tso = t, t = t->link) {
2731         if (t == (StgBlockingQueueElement *)tso) {
2732           *last = (StgBlockingQueueElement *)tso->link;
2733           if (mvar->tail == tso) {
2734             mvar->tail = (StgTSO *)last_tso;
2735           }
2736           goto done;
2737         }
2738       }
2739       barf("unblockThread (MVAR): TSO not found");
2740     }
2741
2742   case BlockedOnBlackHole:
2743     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2744     {
2745       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2746
2747       last = &bq->blocking_queue;
2748       for (t = bq->blocking_queue; 
2749            t != END_BQ_QUEUE; 
2750            last = &t->link, t = t->link) {
2751         if (t == (StgBlockingQueueElement *)tso) {
2752           *last = (StgBlockingQueueElement *)tso->link;
2753           goto done;
2754         }
2755       }
2756       barf("unblockThread (BLACKHOLE): TSO not found");
2757     }
2758
2759   case BlockedOnException:
2760     {
2761       StgTSO *target  = tso->block_info.tso;
2762
2763       ASSERT(get_itbl(target)->type == TSO);
2764
2765       if (target->what_next == ThreadRelocated) {
2766           target = target->link;
2767           ASSERT(get_itbl(target)->type == TSO);
2768       }
2769
2770       ASSERT(target->blocked_exceptions != NULL);
2771
2772       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2773       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2774            t != END_BQ_QUEUE; 
2775            last = &t->link, t = t->link) {
2776         ASSERT(get_itbl(t)->type == TSO);
2777         if (t == (StgBlockingQueueElement *)tso) {
2778           *last = (StgBlockingQueueElement *)tso->link;
2779           goto done;
2780         }
2781       }
2782       barf("unblockThread (Exception): TSO not found");
2783     }
2784
2785   case BlockedOnRead:
2786   case BlockedOnWrite:
2787 #if defined(mingw32_TARGET_OS)
2788   case BlockedOnDoProc:
2789 #endif
2790     {
2791       /* take TSO off blocked_queue */
2792       StgBlockingQueueElement *prev = NULL;
2793       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2794            prev = t, t = t->link) {
2795         if (t == (StgBlockingQueueElement *)tso) {
2796           if (prev == NULL) {
2797             blocked_queue_hd = (StgTSO *)t->link;
2798             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2799               blocked_queue_tl = END_TSO_QUEUE;
2800             }
2801           } else {
2802             prev->link = t->link;
2803             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2804               blocked_queue_tl = (StgTSO *)prev;
2805             }
2806           }
2807           goto done;
2808         }
2809       }
2810       barf("unblockThread (I/O): TSO not found");
2811     }
2812
2813   case BlockedOnDelay:
2814     {
2815       /* take TSO off sleeping_queue */
2816       StgBlockingQueueElement *prev = NULL;
2817       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2818            prev = t, t = t->link) {
2819         if (t == (StgBlockingQueueElement *)tso) {
2820           if (prev == NULL) {
2821             sleeping_queue = (StgTSO *)t->link;
2822           } else {
2823             prev->link = t->link;
2824           }
2825           goto done;
2826         }
2827       }
2828       barf("unblockThread (delay): TSO not found");
2829     }
2830
2831   default:
2832     barf("unblockThread");
2833   }
2834
2835  done:
2836   tso->link = END_TSO_QUEUE;
2837   tso->why_blocked = NotBlocked;
2838   tso->block_info.closure = NULL;
2839   PUSH_ON_RUN_QUEUE(tso);
2840 }
2841 #else
2842 static void
2843 unblockThread(StgTSO *tso)
2844 {
2845   StgTSO *t, **last;
2846   
2847   /* To avoid locking unnecessarily. */
2848   if (tso->why_blocked == NotBlocked) {
2849     return;
2850   }
2851
2852   switch (tso->why_blocked) {
2853
2854   case BlockedOnSTM:
2855     // Be careful: nothing to do here!  We tell the scheduler that the thread
2856     // is runnable and we leave it to the stack-walking code to abort the 
2857     // transaction while unwinding the stack.  We should perhaps have a debugging
2858     // test to make sure that this really happens and that the 'zombie' transaction
2859     // does not get committed.
2860     goto done;
2861
2862   case BlockedOnMVar:
2863     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2864     {
2865       StgTSO *last_tso = END_TSO_QUEUE;
2866       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2867
2868       last = &mvar->head;
2869       for (t = mvar->head; t != END_TSO_QUEUE; 
2870            last = &t->link, last_tso = t, t = t->link) {
2871         if (t == tso) {
2872           *last = tso->link;
2873           if (mvar->tail == tso) {
2874             mvar->tail = last_tso;
2875           }
2876           goto done;
2877         }
2878       }
2879       barf("unblockThread (MVAR): TSO not found");
2880     }
2881
2882   case BlockedOnBlackHole:
2883     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2884     {
2885       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2886
2887       last = &bq->blocking_queue;
2888       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2889            last = &t->link, t = t->link) {
2890         if (t == tso) {
2891           *last = tso->link;
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (BLACKHOLE): TSO not found");
2896     }
2897
2898   case BlockedOnException:
2899     {
2900       StgTSO *target  = tso->block_info.tso;
2901
2902       ASSERT(get_itbl(target)->type == TSO);
2903
2904       while (target->what_next == ThreadRelocated) {
2905           target = target->link;
2906           ASSERT(get_itbl(target)->type == TSO);
2907       }
2908       
2909       ASSERT(target->blocked_exceptions != NULL);
2910
2911       last = &target->blocked_exceptions;
2912       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2913            last = &t->link, t = t->link) {
2914         ASSERT(get_itbl(t)->type == TSO);
2915         if (t == tso) {
2916           *last = tso->link;
2917           goto done;
2918         }
2919       }
2920       barf("unblockThread (Exception): TSO not found");
2921     }
2922
2923   case BlockedOnRead:
2924   case BlockedOnWrite:
2925 #if defined(mingw32_TARGET_OS)
2926   case BlockedOnDoProc:
2927 #endif
2928     {
2929       StgTSO *prev = NULL;
2930       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2931            prev = t, t = t->link) {
2932         if (t == tso) {
2933           if (prev == NULL) {
2934             blocked_queue_hd = t->link;
2935             if (blocked_queue_tl == t) {
2936               blocked_queue_tl = END_TSO_QUEUE;
2937             }
2938           } else {
2939             prev->link = t->link;
2940             if (blocked_queue_tl == t) {
2941               blocked_queue_tl = prev;
2942             }
2943           }
2944           goto done;
2945         }
2946       }
2947       barf("unblockThread (I/O): TSO not found");
2948     }
2949
2950   case BlockedOnDelay:
2951     {
2952       StgTSO *prev = NULL;
2953       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2954            prev = t, t = t->link) {
2955         if (t == tso) {
2956           if (prev == NULL) {
2957             sleeping_queue = t->link;
2958           } else {
2959             prev->link = t->link;
2960           }
2961           goto done;
2962         }
2963       }
2964       barf("unblockThread (delay): TSO not found");
2965     }
2966
2967   default:
2968     barf("unblockThread");
2969   }
2970
2971  done:
2972   tso->link = END_TSO_QUEUE;
2973   tso->why_blocked = NotBlocked;
2974   tso->block_info.closure = NULL;
2975   APPEND_TO_RUN_QUEUE(tso);
2976 }
2977 #endif
2978
2979 /* -----------------------------------------------------------------------------
2980  * raiseAsync()
2981  *
2982  * The following function implements the magic for raising an
2983  * asynchronous exception in an existing thread.
2984  *
2985  * We first remove the thread from any queue on which it might be
2986  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2987  *
2988  * We strip the stack down to the innermost CATCH_FRAME, building
2989  * thunks in the heap for all the active computations, so they can 
2990  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2991  * an application of the handler to the exception, and push it on
2992  * the top of the stack.
2993  * 
2994  * How exactly do we save all the active computations?  We create an
2995  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2996  * AP_STACKs pushes everything from the corresponding update frame
2997  * upwards onto the stack.  (Actually, it pushes everything up to the
2998  * next update frame plus a pointer to the next AP_STACK object.
2999  * Entering the next AP_STACK object pushes more onto the stack until we
3000  * reach the last AP_STACK object - at which point the stack should look
3001  * exactly as it did when we killed the TSO and we can continue
3002  * execution by entering the closure on top of the stack.
3003  *
3004  * We can also kill a thread entirely - this happens if either (a) the 
3005  * exception passed to raiseAsync is NULL, or (b) there's no
3006  * CATCH_FRAME on the stack.  In either case, we strip the entire
3007  * stack and replace the thread with a zombie.
3008  *
3009  * Locks: sched_mutex held upon entry nor exit.
3010  *
3011  * -------------------------------------------------------------------------- */
3012  
3013 void 
3014 deleteThread(StgTSO *tso)
3015 {
3016   raiseAsync(tso,NULL);
3017 }
3018
3019 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3020 static void 
3021 deleteThreadImmediately(StgTSO *tso)
3022 { // for forkProcess only:
3023   // delete thread without giving it a chance to catch the KillThread exception
3024
3025   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3026       return;
3027   }
3028
3029   if (tso->why_blocked != BlockedOnCCall &&
3030       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3031     unblockThread(tso);
3032   }
3033
3034   tso->what_next = ThreadKilled;
3035 }
3036 #endif
3037
3038 void
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3040 {
3041   /* When raising async exs from contexts where sched_mutex isn't held;
3042      use raiseAsyncWithLock(). */
3043   ACQUIRE_LOCK(&sched_mutex);
3044   raiseAsync(tso,exception);
3045   RELEASE_LOCK(&sched_mutex);
3046 }
3047
3048 void
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3050 {
3051     StgRetInfoTable *info;
3052     StgPtr sp;
3053   
3054     // Thread already dead?
3055     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3056         return;
3057     }
3058
3059     IF_DEBUG(scheduler, 
3060              sched_belch("raising exception in thread %ld.", (long)tso->id));
3061     
3062     // Remove it from any blocking queues
3063     unblockThread(tso);
3064
3065     sp = tso->sp;
3066     
3067     // The stack freezing code assumes there's a closure pointer on
3068     // the top of the stack, so we have to arrange that this is the case...
3069     //
3070     if (sp[0] == (W_)&stg_enter_info) {
3071         sp++;
3072     } else {
3073         sp--;
3074         sp[0] = (W_)&stg_dummy_ret_closure;
3075     }
3076
3077     while (1) {
3078         nat i;
3079
3080         // 1. Let the top of the stack be the "current closure"
3081         //
3082         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3083         // CATCH_FRAME.
3084         //
3085         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3086         // current closure applied to the chunk of stack up to (but not
3087         // including) the update frame.  This closure becomes the "current
3088         // closure".  Go back to step 2.
3089         //
3090         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3091         // top of the stack applied to the exception.
3092         // 
3093         // 5. If it's a STOP_FRAME, then kill the thread.
3094         // 
3095         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3096         // transaction
3097        
3098         
3099         StgPtr frame;
3100         
3101         frame = sp + 1;
3102         info = get_ret_itbl((StgClosure *)frame);
3103         
3104         while (info->i.type != UPDATE_FRAME
3105                && (info->i.type != CATCH_FRAME || exception == NULL)
3106                && info->i.type != STOP_FRAME) {
3107             if (info->i.type == ATOMICALLY_FRAME) {
3108               // IF we find an ATOMICALLY_FRAME then we abort the
3109               // current transaction and propagate the exception.  In
3110               // this case (unlike ordinary exceptions) we do not care
3111               // whether the transaction is valid or not because its
3112               // possible validity cannot have caused the exception
3113               // and will not be visible after the abort.
3114               IF_DEBUG(stm,
3115                        debugBelch("Found atomically block delivering async exception\n"));
3116               stmAbortTransaction(tso -> trec);
3117               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3118             }
3119             frame += stack_frame_sizeW((StgClosure *)frame);
3120             info = get_ret_itbl((StgClosure *)frame);
3121         }
3122         
3123         switch (info->i.type) {
3124             
3125         case CATCH_FRAME:
3126             // If we find a CATCH_FRAME, and we've got an exception to raise,
3127             // then build the THUNK raise(exception), and leave it on
3128             // top of the CATCH_FRAME ready to enter.
3129             //
3130         {
3131 #ifdef PROFILING
3132             StgCatchFrame *cf = (StgCatchFrame *)frame;
3133 #endif
3134             StgClosure *raise;
3135             
3136             // we've got an exception to raise, so let's pass it to the
3137             // handler in this frame.
3138             //
3139             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3140             TICK_ALLOC_SE_THK(1,0);
3141             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3142             raise->payload[0] = exception;
3143             
3144             // throw away the stack from Sp up to the CATCH_FRAME.
3145             //
3146             sp = frame - 1;
3147             
3148             /* Ensure that async excpetions are blocked now, so we don't get
3149              * a surprise exception before we get around to executing the
3150              * handler.
3151              */
3152             if (tso->blocked_exceptions == NULL) {
3153                 tso->blocked_exceptions = END_TSO_QUEUE;
3154             }
3155             
3156             /* Put the newly-built THUNK on top of the stack, ready to execute
3157              * when the thread restarts.
3158              */
3159             sp[0] = (W_)raise;
3160             sp[-1] = (W_)&stg_enter_info;
3161             tso->sp = sp-1;
3162             tso->what_next = ThreadRunGHC;
3163             IF_DEBUG(sanity, checkTSO(tso));
3164             return;
3165         }
3166         
3167         case UPDATE_FRAME:
3168         {
3169             StgAP_STACK * ap;
3170             nat words;
3171             
3172             // First build an AP_STACK consisting of the stack chunk above the
3173             // current update frame, with the top word on the stack as the
3174             // fun field.
3175             //
3176             words = frame - sp - 1;
3177             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3178             
3179             ap->size = words;
3180             ap->fun  = (StgClosure *)sp[0];
3181             sp++;
3182             for(i=0; i < (nat)words; ++i) {
3183                 ap->payload[i] = (StgClosure *)*sp++;
3184             }
3185             
3186             SET_HDR(ap,&stg_AP_STACK_info,
3187                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3188             TICK_ALLOC_UP_THK(words+1,0);
3189             
3190             IF_DEBUG(scheduler,
3191                      debugBelch("sched: Updating ");
3192                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3193                      debugBelch(" with ");
3194                      printObj((StgClosure *)ap);
3195                 );
3196
3197             // Replace the updatee with an indirection - happily
3198             // this will also wake up any threads currently
3199             // waiting on the result.
3200             //
3201             // Warning: if we're in a loop, more than one update frame on
3202             // the stack may point to the same object.  Be careful not to
3203             // overwrite an IND_OLDGEN in this case, because we'll screw
3204             // up the mutable lists.  To be on the safe side, don't
3205             // overwrite any kind of indirection at all.  See also
3206             // threadSqueezeStack in GC.c, where we have to make a similar
3207             // check.
3208             //
3209             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3210                 // revert the black hole
3211                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3212                                (StgClosure *)ap);
3213             }
3214             sp += sizeofW(StgUpdateFrame) - 1;
3215             sp[0] = (W_)ap; // push onto stack
3216             break;
3217         }
3218         
3219         case STOP_FRAME:
3220             // We've stripped the entire stack, the thread is now dead.
3221             sp += sizeofW(StgStopFrame);
3222             tso->what_next = ThreadKilled;
3223             tso->sp = sp;
3224             return;
3225             
3226         default:
3227             barf("raiseAsync");
3228         }
3229     }
3230     barf("raiseAsync");
3231 }
3232
3233 /* -----------------------------------------------------------------------------
3234    raiseExceptionHelper
3235    
3236    This function is called by the raise# primitve, just so that we can
3237    move some of the tricky bits of raising an exception from C-- into
3238    C.  Who knows, it might be a useful re-useable thing here too.
3239    -------------------------------------------------------------------------- */
3240
3241 StgWord
3242 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3243 {
3244     StgClosure *raise_closure = NULL;
3245     StgPtr p, next;
3246     StgRetInfoTable *info;
3247     //
3248     // This closure represents the expression 'raise# E' where E
3249     // is the exception raise.  It is used to overwrite all the
3250     // thunks which are currently under evaluataion.
3251     //
3252
3253     //    
3254     // LDV profiling: stg_raise_info has THUNK as its closure
3255     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3256     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3257     // 1 does not cause any problem unless profiling is performed.
3258     // However, when LDV profiling goes on, we need to linearly scan
3259     // small object pool, where raise_closure is stored, so we should
3260     // use MIN_UPD_SIZE.
3261     //
3262     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3263     //                                 sizeofW(StgClosure)+1);
3264     //
3265
3266     //
3267     // Walk up the stack, looking for the catch frame.  On the way,
3268     // we update any closures pointed to from update frames with the
3269     // raise closure that we just built.
3270     //
3271     p = tso->sp;
3272     while(1) {
3273         info = get_ret_itbl((StgClosure *)p);
3274         next = p + stack_frame_sizeW((StgClosure *)p);
3275         switch (info->i.type) {
3276             
3277         case UPDATE_FRAME:
3278             // Only create raise_closure if we need to.
3279             if (raise_closure == NULL) {
3280                 raise_closure = 
3281                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3282                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3283                 raise_closure->payload[0] = exception;
3284             }
3285             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3286             p = next;
3287             continue;
3288
3289         case ATOMICALLY_FRAME:
3290             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3291             tso->sp = p;
3292             return ATOMICALLY_FRAME;
3293             
3294         case CATCH_FRAME:
3295             tso->sp = p;
3296             return CATCH_FRAME;
3297
3298         case CATCH_STM_FRAME:
3299             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3300             tso->sp = p;
3301             return CATCH_STM_FRAME;
3302             
3303         case STOP_FRAME:
3304             tso->sp = p;
3305             return STOP_FRAME;
3306
3307         case CATCH_RETRY_FRAME:
3308         default:
3309             p = next; 
3310             continue;
3311         }
3312     }
3313 }
3314
3315
3316 /* -----------------------------------------------------------------------------
3317    findRetryFrameHelper
3318
3319    This function is called by the retry# primitive.  It traverses the stack
3320    leaving tso->sp referring to the frame which should handle the retry.  
3321
3322    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3323    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3324
3325    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3326    despite the similar implementation.
3327
3328    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3329    not be created within memory transactions.
3330    -------------------------------------------------------------------------- */
3331
3332 StgWord
3333 findRetryFrameHelper (StgTSO *tso)
3334 {
3335   StgPtr           p, next;
3336   StgRetInfoTable *info;
3337
3338   p = tso -> sp;
3339   while (1) {
3340     info = get_ret_itbl((StgClosure *)p);
3341     next = p + stack_frame_sizeW((StgClosure *)p);
3342     switch (info->i.type) {
3343       
3344     case ATOMICALLY_FRAME:
3345       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3346       tso->sp = p;
3347       return ATOMICALLY_FRAME;
3348       
3349     case CATCH_RETRY_FRAME:
3350       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3351       tso->sp = p;
3352       return CATCH_RETRY_FRAME;
3353       
3354     case CATCH_STM_FRAME:
3355     default:
3356       ASSERT(info->i.type != CATCH_FRAME);
3357       ASSERT(info->i.type != STOP_FRAME);
3358       p = next; 
3359       continue;
3360     }
3361   }
3362 }
3363    
3364 /* -----------------------------------------------------------------------------
3365    resurrectThreads is called after garbage collection on the list of
3366    threads found to be garbage.  Each of these threads will be woken
3367    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3368    on an MVar, or NonTermination if the thread was blocked on a Black
3369    Hole.
3370
3371    Locks: sched_mutex isn't held upon entry nor exit.
3372    -------------------------------------------------------------------------- */
3373
3374 void
3375 resurrectThreads( StgTSO *threads )
3376 {
3377   StgTSO *tso, *next;
3378
3379   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3380     next = tso->global_link;
3381     tso->global_link = all_threads;
3382     all_threads = tso;
3383     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3384
3385     switch (tso->why_blocked) {
3386     case BlockedOnMVar:
3387     case BlockedOnException:
3388       /* Called by GC - sched_mutex lock is currently held. */
3389       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3390       break;
3391     case BlockedOnBlackHole:
3392       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3393       break;
3394     case BlockedOnSTM:
3395       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3396       break;
3397     case NotBlocked:
3398       /* This might happen if the thread was blocked on a black hole
3399        * belonging to a thread that we've just woken up (raiseAsync
3400        * can wake up threads, remember...).
3401        */
3402       continue;
3403     default:
3404       barf("resurrectThreads: thread blocked in a strange way");
3405     }
3406   }
3407 }
3408
3409 /* ----------------------------------------------------------------------------
3410  * Debugging: why is a thread blocked
3411  * [Also provides useful information when debugging threaded programs
3412  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3413    ------------------------------------------------------------------------- */
3414
3415 static
3416 void
3417 printThreadBlockage(StgTSO *tso)
3418 {
3419   switch (tso->why_blocked) {
3420   case BlockedOnRead:
3421     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3422     break;
3423   case BlockedOnWrite:
3424     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3425     break;
3426 #if defined(mingw32_TARGET_OS)
3427     case BlockedOnDoProc:
3428     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3429     break;
3430 #endif
3431   case BlockedOnDelay:
3432     debugBelch("is blocked until %d", tso->block_info.target);
3433     break;
3434   case BlockedOnMVar:
3435     debugBelch("is blocked on an MVar");
3436     break;
3437   case BlockedOnException:
3438     debugBelch("is blocked on delivering an exception to thread %d",
3439             tso->block_info.tso->id);
3440     break;
3441   case BlockedOnBlackHole:
3442     debugBelch("is blocked on a black hole");
3443     break;
3444   case NotBlocked:
3445     debugBelch("is not blocked");
3446     break;
3447 #if defined(PAR)
3448   case BlockedOnGA:
3449     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3450             tso->block_info.closure, info_type(tso->block_info.closure));
3451     break;
3452   case BlockedOnGA_NoSend:
3453     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3454             tso->block_info.closure, info_type(tso->block_info.closure));
3455     break;
3456 #endif
3457   case BlockedOnCCall:
3458     debugBelch("is blocked on an external call");
3459     break;
3460   case BlockedOnCCall_NoUnblockExc:
3461     debugBelch("is blocked on an external call (exceptions were already blocked)");
3462     break;
3463   case BlockedOnSTM:
3464     debugBelch("is blocked on an STM operation");
3465     break;
3466   default:
3467     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3468          tso->why_blocked, tso->id, tso);
3469   }
3470 }
3471
3472 static
3473 void
3474 printThreadStatus(StgTSO *tso)
3475 {
3476   switch (tso->what_next) {
3477   case ThreadKilled:
3478     debugBelch("has been killed");
3479     break;
3480   case ThreadComplete:
3481     debugBelch("has completed");
3482     break;
3483   default:
3484     printThreadBlockage(tso);
3485   }
3486 }
3487
3488 void
3489 printAllThreads(void)
3490 {
3491   StgTSO *t;
3492
3493 # if defined(GRAN)
3494   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3495   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3496                        time_string, rtsFalse/*no commas!*/);
3497
3498   debugBelch("all threads at [%s]:\n", time_string);
3499 # elif defined(PAR)
3500   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3501   ullong_format_string(CURRENT_TIME,
3502                        time_string, rtsFalse/*no commas!*/);
3503
3504   debugBelch("all threads at [%s]:\n", time_string);
3505 # else
3506   debugBelch("all threads:\n");
3507 # endif
3508
3509   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3510     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3511 #if defined(DEBUG)
3512     {
3513       void *label = lookupThreadLabel(t->id);
3514       if (label) debugBelch("[\"%s\"] ",(char *)label);
3515     }
3516 #endif
3517     printThreadStatus(t);
3518     debugBelch("\n");
3519   }
3520 }
3521     
3522 #ifdef DEBUG
3523
3524 /* 
3525    Print a whole blocking queue attached to node (debugging only).
3526 */
3527 # if defined(PAR)
3528 void 
3529 print_bq (StgClosure *node)
3530 {
3531   StgBlockingQueueElement *bqe;
3532   StgTSO *tso;
3533   rtsBool end;
3534
3535   debugBelch("## BQ of closure %p (%s): ",
3536           node, info_type(node));
3537
3538   /* should cover all closures that may have a blocking queue */
3539   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3540          get_itbl(node)->type == FETCH_ME_BQ ||
3541          get_itbl(node)->type == RBH ||
3542          get_itbl(node)->type == MVAR);
3543     
3544   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3545
3546   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3547 }
3548
3549 /* 
3550    Print a whole blocking queue starting with the element bqe.
3551 */
3552 void 
3553 print_bqe (StgBlockingQueueElement *bqe)
3554 {
3555   rtsBool end;
3556
3557   /* 
3558      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3559   */
3560   for (end = (bqe==END_BQ_QUEUE);
3561        !end; // iterate until bqe points to a CONSTR
3562        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3563        bqe = end ? END_BQ_QUEUE : bqe->link) {
3564     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3565     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3566     /* types of closures that may appear in a blocking queue */
3567     ASSERT(get_itbl(bqe)->type == TSO ||           
3568            get_itbl(bqe)->type == BLOCKED_FETCH || 
3569            get_itbl(bqe)->type == CONSTR); 
3570     /* only BQs of an RBH end with an RBH_Save closure */
3571     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3572
3573     switch (get_itbl(bqe)->type) {
3574     case TSO:
3575       debugBelch(" TSO %u (%x),",
3576               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3577       break;
3578     case BLOCKED_FETCH:
3579       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3580               ((StgBlockedFetch *)bqe)->node, 
3581               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3582               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3583               ((StgBlockedFetch *)bqe)->ga.weight);
3584       break;
3585     case CONSTR:
3586       debugBelch(" %s (IP %p),",
3587               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3588                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3589                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3590                "RBH_Save_?"), get_itbl(bqe));
3591       break;
3592     default:
3593       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3594            info_type((StgClosure *)bqe)); // , node, info_type(node));
3595       break;
3596     }
3597   } /* for */
3598   debugBelch("\n");
3599 }
3600 # elif defined(GRAN)
3601 void 
3602 print_bq (StgClosure *node)
3603 {
3604   StgBlockingQueueElement *bqe;
3605   PEs node_loc, tso_loc;
3606   rtsBool end;
3607
3608   /* should cover all closures that may have a blocking queue */
3609   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3610          get_itbl(node)->type == FETCH_ME_BQ ||
3611          get_itbl(node)->type == RBH);
3612     
3613   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3614   node_loc = where_is(node);
3615
3616   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3617           node, info_type(node), node_loc);
3618
3619   /* 
3620      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3621   */
3622   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3623        !end; // iterate until bqe points to a CONSTR
3624        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3625     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3626     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3627     /* types of closures that may appear in a blocking queue */
3628     ASSERT(get_itbl(bqe)->type == TSO ||           
3629            get_itbl(bqe)->type == CONSTR); 
3630     /* only BQs of an RBH end with an RBH_Save closure */
3631     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3632
3633     tso_loc = where_is((StgClosure *)bqe);
3634     switch (get_itbl(bqe)->type) {
3635     case TSO:
3636       debugBelch(" TSO %d (%p) on [PE %d],",
3637               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3638       break;
3639     case CONSTR:
3640       debugBelch(" %s (IP %p),",
3641               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3642                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3643                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3644                "RBH_Save_?"), get_itbl(bqe));
3645       break;
3646     default:
3647       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3648            info_type((StgClosure *)bqe), node, info_type(node));
3649       break;
3650     }
3651   } /* for */
3652   debugBelch("\n");
3653 }
3654 #else
3655 /* 
3656    Nice and easy: only TSOs on the blocking queue
3657 */
3658 void 
3659 print_bq (StgClosure *node)
3660 {
3661   StgTSO *tso;
3662
3663   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3664   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3665        tso != END_TSO_QUEUE; 
3666        tso=tso->link) {
3667     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3668     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3669     debugBelch(" TSO %d (%p),", tso->id, tso);
3670   }
3671   debugBelch("\n");
3672 }
3673 # endif
3674
3675 #if defined(PAR)
3676 static nat
3677 run_queue_len(void)
3678 {
3679   nat i;
3680   StgTSO *tso;
3681
3682   for (i=0, tso=run_queue_hd; 
3683        tso != END_TSO_QUEUE;
3684        i++, tso=tso->link)
3685     /* nothing */
3686
3687   return i;
3688 }
3689 #endif
3690
3691 void
3692 sched_belch(char *s, ...)
3693 {
3694   va_list ap;
3695   va_start(ap,s);
3696 #ifdef RTS_SUPPORTS_THREADS
3697   debugBelch("sched (task %p): ", osThreadId());
3698 #elif defined(PAR)
3699   debugBelch("== ");
3700 #else
3701   debugBelch("sched: ");
3702 #endif
3703   vdebugBelch(s, ap);
3704   debugBelch("\n");
3705   va_end(ap);
3706 }
3707
3708 #endif /* DEBUG */