04e9f1f43df5c1d9f89d22550ad72f44b6be3ea1
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "Hooks.h"
48 #define COMPILING_SCHEDULER
49 #include "Schedule.h"
50 #include "StgMiscClosures.h"
51 #include "Storage.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "Timer.h"
59 #include "Prelude.h"
60 #include "ThreadLabels.h"
61 #include "LdvProfile.h"
62 #include "Updates.h"
63 #ifdef PROFILING
64 #include "Proftimer.h"
65 #include "ProfHeap.h"
66 #endif
67 #if defined(GRAN) || defined(PAR)
68 # include "GranSimRts.h"
69 # include "GranSim.h"
70 # include "ParallelRts.h"
71 # include "Parallel.h"
72 # include "ParallelDebug.h"
73 # include "FetchMe.h"
74 # include "HLC.h"
75 #endif
76 #include "Sparks.h"
77 #include "Capability.h"
78 #include "OSThreads.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 #ifdef THREADED_RTS
97 #define USED_IN_THREADED_RTS
98 #else
99 #define USED_IN_THREADED_RTS STG_UNUSED
100 #endif
101
102 #ifdef RTS_SUPPORTS_THREADS
103 #define USED_WHEN_RTS_SUPPORTS_THREADS
104 #else
105 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
106 #endif
107
108 /* Main thread queue.
109  * Locks required: sched_mutex.
110  */
111 StgMainThread *main_threads = NULL;
112
113 /* Thread queues.
114  * Locks required: sched_mutex.
115  */
116 #if defined(GRAN)
117
118 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
119 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
120
121 /* 
122    In GranSim we have a runnable and a blocked queue for each processor.
123    In order to minimise code changes new arrays run_queue_hds/tls
124    are created. run_queue_hd is then a short cut (macro) for
125    run_queue_hds[CurrentProc] (see GranSim.h).
126    -- HWL
127 */
128 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
129 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
130 StgTSO *ccalling_threadss[MAX_PROC];
131 /* We use the same global list of threads (all_threads) in GranSim as in
132    the std RTS (i.e. we are cheating). However, we don't use this list in
133    the GranSim specific code at the moment (so we are only potentially
134    cheating).  */
135
136 #else /* !GRAN */
137
138 StgTSO *run_queue_hd = NULL;
139 StgTSO *run_queue_tl = NULL;
140 StgTSO *blocked_queue_hd = NULL;
141 StgTSO *blocked_queue_tl = NULL;
142 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
143
144 #endif
145
146 /* Linked list of all threads.
147  * Used for detecting garbage collected threads.
148  */
149 StgTSO *all_threads = NULL;
150
151 /* When a thread performs a safe C call (_ccall_GC, using old
152  * terminology), it gets put on the suspended_ccalling_threads
153  * list. Used by the garbage collector.
154  */
155 static StgTSO *suspended_ccalling_threads;
156
157 static StgTSO *threadStackOverflow(StgTSO *tso);
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 nat context_switch = 0;
166
167 /* if this flag is set as well, give up execution */
168 rtsBool interrupted = rtsFalse;
169
170 /* Next thread ID to allocate.
171  * Locks required: thread_id_mutex
172  */
173 static StgThreadID next_thread_id = 1;
174
175 /*
176  * Pointers to the state of the current thread.
177  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
178  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
179  */
180  
181 /* The smallest stack size that makes any sense is:
182  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
183  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
184  *  + 1                       (the closure to enter)
185  *  + 1                       (stg_ap_v_ret)
186  *  + 1                       (spare slot req'd by stg_ap_v_ret)
187  *
188  * A thread with this stack will bomb immediately with a stack
189  * overflow, which will increase its stack size.  
190  */
191
192 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
193
194
195 #if defined(GRAN)
196 StgTSO *CurrentTSO;
197 #endif
198
199 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
200  *  exists - earlier gccs apparently didn't.
201  *  -= chak
202  */
203 StgTSO dummy_tso;
204
205 static rtsBool ready_to_gc;
206
207 /*
208  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
209  * in an MT setting, needed to signal that a worker thread shouldn't hang around
210  * in the scheduler when it is out of work.
211  */
212 static rtsBool shutting_down_scheduler = rtsFalse;
213
214 void            addToBlockedQueue ( StgTSO *tso );
215
216 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
217        void     interruptStgRts   ( void );
218
219 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
220 static void     detectBlackHoles  ( void );
221 #endif
222
223 #if defined(RTS_SUPPORTS_THREADS)
224 /* ToDo: carefully document the invariants that go together
225  *       with these synchronisation objects.
226  */
227 Mutex     sched_mutex       = INIT_MUTEX_VAR;
228 Mutex     term_mutex        = INIT_MUTEX_VAR;
229
230 #endif /* RTS_SUPPORTS_THREADS */
231
232 #if defined(PAR)
233 StgTSO *LastTSO;
234 rtsTime TimeOfLastYield;
235 rtsBool emitSchedule = rtsTrue;
236 #endif
237
238 #if DEBUG
239 static char *whatNext_strs[] = {
240   "(unknown)",
241   "ThreadRunGHC",
242   "ThreadInterpret",
243   "ThreadKilled",
244   "ThreadRelocated",
245   "ThreadComplete"
246 };
247 #endif
248
249 #if defined(PAR)
250 StgTSO * createSparkThread(rtsSpark spark);
251 StgTSO * activateSpark (rtsSpark spark);  
252 #endif
253
254 /* ----------------------------------------------------------------------------
255  * Starting Tasks
256  * ------------------------------------------------------------------------- */
257
258 #if defined(RTS_SUPPORTS_THREADS)
259 static rtsBool startingWorkerThread = rtsFalse;
260
261 static void taskStart(void);
262 static void
263 taskStart(void)
264 {
265   ACQUIRE_LOCK(&sched_mutex);
266   startingWorkerThread = rtsFalse;
267   schedule(NULL,NULL);
268   RELEASE_LOCK(&sched_mutex);
269 }
270
271 void
272 startSchedulerTaskIfNecessary(void)
273 {
274   if(run_queue_hd != END_TSO_QUEUE
275     || blocked_queue_hd != END_TSO_QUEUE
276     || sleeping_queue != END_TSO_QUEUE)
277   {
278     if(!startingWorkerThread)
279     { // we don't want to start another worker thread
280       // just because the last one hasn't yet reached the
281       // "waiting for capability" state
282       startingWorkerThread = rtsTrue;
283       if(!startTask(taskStart))
284       {
285         startingWorkerThread = rtsFalse;
286       }
287     }
288   }
289 }
290 #endif
291
292 /* ---------------------------------------------------------------------------
293    Main scheduling loop.
294
295    We use round-robin scheduling, each thread returning to the
296    scheduler loop when one of these conditions is detected:
297
298       * out of heap space
299       * timer expires (thread yields)
300       * thread blocks
301       * thread ends
302       * stack overflow
303
304    Locking notes:  we acquire the scheduler lock once at the beginning
305    of the scheduler loop, and release it when
306     
307       * running a thread, or
308       * waiting for work, or
309       * waiting for a GC to complete.
310
311    GRAN version:
312      In a GranSim setup this loop iterates over the global event queue.
313      This revolves around the global event queue, which determines what 
314      to do next. Therefore, it's more complicated than either the 
315      concurrent or the parallel (GUM) setup.
316
317    GUM version:
318      GUM iterates over incoming messages.
319      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
320      and sends out a fish whenever it has nothing to do; in-between
321      doing the actual reductions (shared code below) it processes the
322      incoming messages and deals with delayed operations 
323      (see PendingFetches).
324      This is not the ugliest code you could imagine, but it's bloody close.
325
326    ------------------------------------------------------------------------ */
327 static void
328 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
329           Capability *initialCapability )
330 {
331   StgTSO *t;
332   Capability *cap;
333   StgThreadReturnCode ret;
334 #if defined(GRAN)
335   rtsEvent *event;
336 #elif defined(PAR)
337   StgSparkPool *pool;
338   rtsSpark spark;
339   StgTSO *tso;
340   GlobalTaskId pe;
341   rtsBool receivedFinish = rtsFalse;
342 # if defined(DEBUG)
343   nat tp_size, sp_size; // stats only
344 # endif
345 #endif
346   rtsBool was_interrupted = rtsFalse;
347   nat prev_what_next;
348   
349   // Pre-condition: sched_mutex is held.
350   // We might have a capability, passed in as initialCapability.
351   cap = initialCapability;
352
353 #if defined(RTS_SUPPORTS_THREADS)
354   //
355   // in the threaded case, the capability is either passed in via the
356   // initialCapability parameter, or initialized inside the scheduler
357   // loop 
358   //
359   IF_DEBUG(scheduler,
360            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
361                        mainThread, initialCapability);
362       );
363 #else
364   // simply initialise it in the non-threaded case
365   grabCapability(&cap);
366 #endif
367
368 #if defined(GRAN)
369   /* set up first event to get things going */
370   /* ToDo: assign costs for system setup and init MainTSO ! */
371   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
372             ContinueThread, 
373             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
374
375   IF_DEBUG(gran,
376            debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377            G_TSO(CurrentTSO, 5));
378
379   if (RtsFlags.GranFlags.Light) {
380     /* Save current time; GranSim Light only */
381     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
382   }      
383
384   event = get_next_event();
385
386   while (event!=(rtsEvent*)NULL) {
387     /* Choose the processor with the next event */
388     CurrentProc = event->proc;
389     CurrentTSO = event->tso;
390
391 #elif defined(PAR)
392
393   while (!receivedFinish) {    /* set by processMessages */
394                                /* when receiving PP_FINISH message         */ 
395
396 #else // everything except GRAN and PAR
397
398   while (1) {
399
400 #endif
401
402      IF_DEBUG(scheduler, printAllThreads());
403
404 #if defined(RTS_SUPPORTS_THREADS)
405       // Yield the capability to higher-priority tasks if necessary.
406       //
407       if (cap != NULL) {
408           yieldCapability(&cap);
409       }
410
411       // If we do not currently hold a capability, we wait for one
412       //
413       if (cap == NULL) {
414           waitForCapability(&sched_mutex, &cap,
415                             mainThread ? &mainThread->bound_thread_cond : NULL);
416       }
417
418       // We now have a capability...
419 #endif
420
421     //
422     // If we're interrupted (the user pressed ^C, or some other
423     // termination condition occurred), kill all the currently running
424     // threads.
425     //
426     if (interrupted) {
427         IF_DEBUG(scheduler, sched_belch("interrupted"));
428         interrupted = rtsFalse;
429         was_interrupted = rtsTrue;
430 #if defined(RTS_SUPPORTS_THREADS)
431         // In the threaded RTS, deadlock detection doesn't work,
432         // so just exit right away.
433         errorBelch("interrupted");
434         releaseCapability(cap);
435         RELEASE_LOCK(&sched_mutex);
436         shutdownHaskellAndExit(EXIT_SUCCESS);
437 #else
438         deleteAllThreads();
439 #endif
440     }
441
442 #if defined(RTS_USER_SIGNALS)
443     // check for signals each time around the scheduler
444     if (signals_pending()) {
445       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
446       startSignalHandlers();
447       ACQUIRE_LOCK(&sched_mutex);
448     }
449 #endif
450
451     //
452     // Check whether any waiting threads need to be woken up.  If the
453     // run queue is empty, and there are no other tasks running, we
454     // can wait indefinitely for something to happen.
455     //
456     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
457 #if defined(RTS_SUPPORTS_THREADS)
458                 || EMPTY_RUN_QUEUE()
459 #endif
460         )
461     {
462       awaitEvent( EMPTY_RUN_QUEUE() );
463     }
464     // we can be interrupted while waiting for I/O...
465     if (interrupted) continue;
466
467     /* 
468      * Detect deadlock: when we have no threads to run, there are no
469      * threads waiting on I/O or sleeping, and all the other tasks are
470      * waiting for work, we must have a deadlock of some description.
471      *
472      * We first try to find threads blocked on themselves (ie. black
473      * holes), and generate NonTermination exceptions where necessary.
474      *
475      * If no threads are black holed, we have a deadlock situation, so
476      * inform all the main threads.
477      */
478 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
479     if (   EMPTY_THREAD_QUEUES() )
480     {
481         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
482         // Garbage collection can release some new threads due to
483         // either (a) finalizers or (b) threads resurrected because
484         // they are about to be send BlockedOnDeadMVar.  Any threads
485         // thus released will be immediately runnable.
486         GarbageCollect(GetRoots,rtsTrue);
487
488         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
489
490         IF_DEBUG(scheduler, 
491                  sched_belch("still deadlocked, checking for black holes..."));
492         detectBlackHoles();
493
494         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
495
496 #if defined(RTS_USER_SIGNALS)
497         /* If we have user-installed signal handlers, then wait
498          * for signals to arrive rather then bombing out with a
499          * deadlock.
500          */
501         if ( anyUserHandlers() ) {
502             IF_DEBUG(scheduler, 
503                      sched_belch("still deadlocked, waiting for signals..."));
504
505             awaitUserSignals();
506
507             // we might be interrupted...
508             if (interrupted) { continue; }
509
510             if (signals_pending()) {
511                 RELEASE_LOCK(&sched_mutex);
512                 startSignalHandlers();
513                 ACQUIRE_LOCK(&sched_mutex);
514             }
515             ASSERT(!EMPTY_RUN_QUEUE());
516             goto not_deadlocked;
517         }
518 #endif
519
520         /* Probably a real deadlock.  Send the current main thread the
521          * Deadlock exception (or in the SMP build, send *all* main
522          * threads the deadlock exception, since none of them can make
523          * progress).
524          */
525         {
526             StgMainThread *m;
527             m = main_threads;
528             switch (m->tso->why_blocked) {
529             case BlockedOnBlackHole:
530             case BlockedOnException:
531             case BlockedOnMVar:
532                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
533                 break;
534             default:
535                 barf("deadlock: main thread blocked in a strange way");
536             }
537         }
538     }
539   not_deadlocked:
540
541 #elif defined(RTS_SUPPORTS_THREADS)
542     // ToDo: add deadlock detection in threaded RTS
543 #elif defined(PAR)
544     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
545 #endif
546
547 #if defined(RTS_SUPPORTS_THREADS)
548     if ( EMPTY_RUN_QUEUE() ) {
549         continue; // nothing to do
550     }
551 #endif
552
553 #if defined(GRAN)
554     if (RtsFlags.GranFlags.Light)
555       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
556
557     /* adjust time based on time-stamp */
558     if (event->time > CurrentTime[CurrentProc] &&
559         event->evttype != ContinueThread)
560       CurrentTime[CurrentProc] = event->time;
561     
562     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
563     if (!RtsFlags.GranFlags.Light)
564       handleIdlePEs();
565
566     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
567
568     /* main event dispatcher in GranSim */
569     switch (event->evttype) {
570       /* Should just be continuing execution */
571     case ContinueThread:
572       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
573       /* ToDo: check assertion
574       ASSERT(run_queue_hd != (StgTSO*)NULL &&
575              run_queue_hd != END_TSO_QUEUE);
576       */
577       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
578       if (!RtsFlags.GranFlags.DoAsyncFetch &&
579           procStatus[CurrentProc]==Fetching) {
580         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
581               CurrentTSO->id, CurrentTSO, CurrentProc);
582         goto next_thread;
583       } 
584       /* Ignore ContinueThreads for completed threads */
585       if (CurrentTSO->what_next == ThreadComplete) {
586         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
587               CurrentTSO->id, CurrentTSO, CurrentProc);
588         goto next_thread;
589       } 
590       /* Ignore ContinueThreads for threads that are being migrated */
591       if (PROCS(CurrentTSO)==Nowhere) { 
592         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
593               CurrentTSO->id, CurrentTSO, CurrentProc);
594         goto next_thread;
595       }
596       /* The thread should be at the beginning of the run queue */
597       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
598         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
599               CurrentTSO->id, CurrentTSO, CurrentProc);
600         break; // run the thread anyway
601       }
602       /*
603       new_event(proc, proc, CurrentTime[proc],
604                 FindWork,
605                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
606       goto next_thread; 
607       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
608       break; // now actually run the thread; DaH Qu'vam yImuHbej 
609
610     case FetchNode:
611       do_the_fetchnode(event);
612       goto next_thread;             /* handle next event in event queue  */
613       
614     case GlobalBlock:
615       do_the_globalblock(event);
616       goto next_thread;             /* handle next event in event queue  */
617       
618     case FetchReply:
619       do_the_fetchreply(event);
620       goto next_thread;             /* handle next event in event queue  */
621       
622     case UnblockThread:   /* Move from the blocked queue to the tail of */
623       do_the_unblock(event);
624       goto next_thread;             /* handle next event in event queue  */
625       
626     case ResumeThread:  /* Move from the blocked queue to the tail of */
627       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
628       event->tso->gran.blocktime += 
629         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
630       do_the_startthread(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case StartThread:
634       do_the_startthread(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case MoveThread:
638       do_the_movethread(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case MoveSpark:
642       do_the_movespark(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     case FindWork:
646       do_the_findwork(event);
647       goto next_thread;             /* handle next event in event queue  */
648       
649     default:
650       barf("Illegal event type %u\n", event->evttype);
651     }  /* switch */
652     
653     /* This point was scheduler_loop in the old RTS */
654
655     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
656
657     TimeOfLastEvent = CurrentTime[CurrentProc];
658     TimeOfNextEvent = get_time_of_next_event();
659     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
660     // CurrentTSO = ThreadQueueHd;
661
662     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
663                          TimeOfNextEvent));
664
665     if (RtsFlags.GranFlags.Light) 
666       GranSimLight_leave_system(event, &ActiveTSO); 
667
668     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
669
670     IF_DEBUG(gran, 
671              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
672
673     /* in a GranSim setup the TSO stays on the run queue */
674     t = CurrentTSO;
675     /* Take a thread from the run queue. */
676     POP_RUN_QUEUE(t); // take_off_run_queue(t);
677
678     IF_DEBUG(gran, 
679              debugBelch("GRAN: About to run current thread, which is\n");
680              G_TSO(t,5));
681
682     context_switch = 0; // turned on via GranYield, checking events and time slice
683
684     IF_DEBUG(gran, 
685              DumpGranEvent(GR_SCHEDULE, t));
686
687     procStatus[CurrentProc] = Busy;
688
689 #elif defined(PAR)
690     if (PendingFetches != END_BF_QUEUE) {
691         processFetches();
692     }
693
694     /* ToDo: phps merge with spark activation above */
695     /* check whether we have local work and send requests if we have none */
696     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
697       /* :-[  no local threads => look out for local sparks */
698       /* the spark pool for the current PE */
699       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
700       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
701           pool->hd < pool->tl) {
702         /* 
703          * ToDo: add GC code check that we really have enough heap afterwards!!
704          * Old comment:
705          * If we're here (no runnable threads) and we have pending
706          * sparks, we must have a space problem.  Get enough space
707          * to turn one of those pending sparks into a
708          * thread... 
709          */
710
711         spark = findSpark(rtsFalse);                /* get a spark */
712         if (spark != (rtsSpark) NULL) {
713           tso = activateSpark(spark);       /* turn the spark into a thread */
714           IF_PAR_DEBUG(schedule,
715                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
716                              tso->id, tso, advisory_thread_count));
717
718           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
719             debugBelch("==^^ failed to activate spark\n");
720             goto next_thread;
721           }               /* otherwise fall through & pick-up new tso */
722         } else {
723           IF_PAR_DEBUG(verbose,
724                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
725                              spark_queue_len(pool)));
726           goto next_thread;
727         }
728       }
729
730       /* If we still have no work we need to send a FISH to get a spark
731          from another PE 
732       */
733       if (EMPTY_RUN_QUEUE()) {
734       /* =8-[  no local sparks => look for work on other PEs */
735         /*
736          * We really have absolutely no work.  Send out a fish
737          * (there may be some out there already), and wait for
738          * something to arrive.  We clearly can't run any threads
739          * until a SCHEDULE or RESUME arrives, and so that's what
740          * we're hoping to see.  (Of course, we still have to
741          * respond to other types of messages.)
742          */
743         TIME now = msTime() /*CURRENT_TIME*/;
744         IF_PAR_DEBUG(verbose, 
745                      debugBelch("--  now=%ld\n", now));
746         IF_PAR_DEBUG(verbose,
747                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
748                          (last_fish_arrived_at!=0 &&
749                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
750                        debugBelch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)\n",
751                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
752                              last_fish_arrived_at,
753                              RtsFlags.ParFlags.fishDelay, now);
754                      });
755         
756         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
757             (last_fish_arrived_at==0 ||
758              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
759           /* outstandingFishes is set in sendFish, processFish;
760              avoid flooding system with fishes via delay */
761           pe = choosePE();
762           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
763                    NEW_FISH_HUNGER);
764
765           // Global statistics: count no. of fishes
766           if (RtsFlags.ParFlags.ParStats.Global &&
767               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
768             globalParStats.tot_fish_mess++;
769           }
770         }
771       
772         receivedFinish = processMessages();
773         goto next_thread;
774       }
775     } else if (PacketsWaiting()) {  /* Look for incoming messages */
776       receivedFinish = processMessages();
777     }
778
779     /* Now we are sure that we have some work available */
780     ASSERT(run_queue_hd != END_TSO_QUEUE);
781
782     /* Take a thread from the run queue, if we have work */
783     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
784     IF_DEBUG(sanity,checkTSO(t));
785
786     /* ToDo: write something to the log-file
787     if (RTSflags.ParFlags.granSimStats && !sameThread)
788         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
789
790     CurrentTSO = t;
791     */
792     /* the spark pool for the current PE */
793     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
794
795     IF_DEBUG(scheduler, 
796              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
797                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
798
799 # if 1
800     if (0 && RtsFlags.ParFlags.ParStats.Full && 
801         t && LastTSO && t->id != LastTSO->id && 
802         LastTSO->why_blocked == NotBlocked && 
803         LastTSO->what_next != ThreadComplete) {
804       // if previously scheduled TSO not blocked we have to record the context switch
805       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
806                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
807     }
808
809     if (RtsFlags.ParFlags.ParStats.Full && 
810         (emitSchedule /* forced emit */ ||
811         (t && LastTSO && t->id != LastTSO->id))) {
812       /* 
813          we are running a different TSO, so write a schedule event to log file
814          NB: If we use fair scheduling we also have to write  a deschedule 
815              event for LastTSO; with unfair scheduling we know that the
816              previous tso has blocked whenever we switch to another tso, so
817              we don't need it in GUM for now
818       */
819       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
820                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
821       emitSchedule = rtsFalse;
822     }
823      
824 # endif
825 #else /* !GRAN && !PAR */
826   
827     // grab a thread from the run queue
828     ASSERT(run_queue_hd != END_TSO_QUEUE);
829     POP_RUN_QUEUE(t);
830
831     // Sanity check the thread we're about to run.  This can be
832     // expensive if there is lots of thread switching going on...
833     IF_DEBUG(sanity,checkTSO(t));
834 #endif
835
836 #ifdef THREADED_RTS
837     {
838       StgMainThread *m = t->main;
839       
840       if(m)
841       {
842         if(m == mainThread)
843         {
844           IF_DEBUG(scheduler,
845             sched_belch("### Running thread %d in bound thread", t->id));
846           // yes, the Haskell thread is bound to the current native thread
847         }
848         else
849         {
850           IF_DEBUG(scheduler,
851             sched_belch("### thread %d bound to another OS thread", t->id));
852           // no, bound to a different Haskell thread: pass to that thread
853           PUSH_ON_RUN_QUEUE(t);
854           passCapability(&m->bound_thread_cond);
855           continue;
856         }
857       }
858       else
859       {
860         if(mainThread != NULL)
861         // The thread we want to run is bound.
862         {
863           IF_DEBUG(scheduler,
864             sched_belch("### this OS thread cannot run thread %d", t->id));
865           // no, the current native thread is bound to a different
866           // Haskell thread, so pass it to any worker thread
867           PUSH_ON_RUN_QUEUE(t);
868           passCapabilityToWorker();
869           continue; 
870         }
871       }
872     }
873 #endif
874
875     cap->r.rCurrentTSO = t;
876     
877     /* context switches are now initiated by the timer signal, unless
878      * the user specified "context switch as often as possible", with
879      * +RTS -C0
880      */
881     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
882          && (run_queue_hd != END_TSO_QUEUE
883              || blocked_queue_hd != END_TSO_QUEUE
884              || sleeping_queue != END_TSO_QUEUE)))
885         context_switch = 1;
886
887 run_thread:
888
889     RELEASE_LOCK(&sched_mutex);
890
891     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
892                               (long)t->id, whatNext_strs[t->what_next]));
893
894 #ifdef PROFILING
895     startHeapProfTimer();
896 #endif
897
898     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
899     /* Run the current thread 
900      */
901     prev_what_next = t->what_next;
902
903     errno = t->saved_errno;
904
905     switch (prev_what_next) {
906
907     case ThreadKilled:
908     case ThreadComplete:
909         /* Thread already finished, return to scheduler. */
910         ret = ThreadFinished;
911         break;
912
913     case ThreadRunGHC:
914         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
915         break;
916
917     case ThreadInterpret:
918         ret = interpretBCO(cap);
919         break;
920
921     default:
922       barf("schedule: invalid what_next field");
923     }
924
925     // The TSO might have moved, so find the new location:
926     t = cap->r.rCurrentTSO;
927
928     // And save the current errno in this thread.
929     t->saved_errno = errno;
930
931     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
932     
933     /* Costs for the scheduler are assigned to CCS_SYSTEM */
934 #ifdef PROFILING
935     stopHeapProfTimer();
936     CCCS = CCS_SYSTEM;
937 #endif
938     
939     ACQUIRE_LOCK(&sched_mutex);
940     
941 #ifdef RTS_SUPPORTS_THREADS
942     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
943 #elif !defined(GRAN) && !defined(PAR)
944     IF_DEBUG(scheduler,debugBelch("sched: "););
945 #endif
946     
947 #if defined(PAR)
948     /* HACK 675: if the last thread didn't yield, make sure to print a 
949        SCHEDULE event to the log file when StgRunning the next thread, even
950        if it is the same one as before */
951     LastTSO = t; 
952     TimeOfLastYield = CURRENT_TIME;
953 #endif
954
955     switch (ret) {
956     case HeapOverflow:
957 #if defined(GRAN)
958       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
959       globalGranStats.tot_heapover++;
960 #elif defined(PAR)
961       globalParStats.tot_heapover++;
962 #endif
963
964       // did the task ask for a large block?
965       if (cap->r.rHpAlloc > BLOCK_SIZE) {
966           // if so, get one and push it on the front of the nursery.
967           bdescr *bd;
968           nat blocks;
969           
970           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
971
972           IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %d)\n", 
973                                    (long)t->id, whatNext_strs[t->what_next], blocks));
974
975           // don't do this if it would push us over the
976           // alloc_blocks_lim limit; we'll GC first.
977           if (alloc_blocks + blocks < alloc_blocks_lim) {
978
979               alloc_blocks += blocks;
980               bd = allocGroup( blocks );
981
982               // link the new group into the list
983               bd->link = cap->r.rCurrentNursery;
984               bd->u.back = cap->r.rCurrentNursery->u.back;
985               if (cap->r.rCurrentNursery->u.back != NULL) {
986                   cap->r.rCurrentNursery->u.back->link = bd;
987               } else {
988                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
989                          g0s0->blocks == cap->r.rNursery);
990                   cap->r.rNursery = g0s0->blocks = bd;
991               }           
992               cap->r.rCurrentNursery->u.back = bd;
993
994               // initialise it as a nursery block.  We initialise the
995               // step, gen_no, and flags field of *every* sub-block in
996               // this large block, because this is easier than making
997               // sure that we always find the block head of a large
998               // block whenever we call Bdescr() (eg. evacuate() and
999               // isAlive() in the GC would both have to do this, at
1000               // least).
1001               { 
1002                   bdescr *x;
1003                   for (x = bd; x < bd + blocks; x++) {
1004                       x->step = g0s0;
1005                       x->gen_no = 0;
1006                       x->flags = 0;
1007                   }
1008               }
1009
1010               // don't forget to update the block count in g0s0.
1011               g0s0->n_blocks += blocks;
1012               // This assert can be a killer if the app is doing lots
1013               // of large block allocations.
1014               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1015
1016               // now update the nursery to point to the new block
1017               cap->r.rCurrentNursery = bd;
1018
1019               // we might be unlucky and have another thread get on the
1020               // run queue before us and steal the large block, but in that
1021               // case the thread will just end up requesting another large
1022               // block.
1023               PUSH_ON_RUN_QUEUE(t);
1024               break;
1025           }
1026       }
1027
1028       /* make all the running tasks block on a condition variable,
1029        * maybe set context_switch and wait till they all pile in,
1030        * then have them wait on a GC condition variable.
1031        */
1032       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1033                                (long)t->id, whatNext_strs[t->what_next]));
1034       threadPaused(t);
1035 #if defined(GRAN)
1036       ASSERT(!is_on_queue(t,CurrentProc));
1037 #elif defined(PAR)
1038       /* Currently we emit a DESCHEDULE event before GC in GUM.
1039          ToDo: either add separate event to distinguish SYSTEM time from rest
1040                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1041       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1042         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1043                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1044         emitSchedule = rtsTrue;
1045       }
1046 #endif
1047       
1048       ready_to_gc = rtsTrue;
1049       context_switch = 1;               /* stop other threads ASAP */
1050       PUSH_ON_RUN_QUEUE(t);
1051       /* actual GC is done at the end of the while loop */
1052       break;
1053       
1054     case StackOverflow:
1055 #if defined(GRAN)
1056       IF_DEBUG(gran, 
1057                DumpGranEvent(GR_DESCHEDULE, t));
1058       globalGranStats.tot_stackover++;
1059 #elif defined(PAR)
1060       // IF_DEBUG(par, 
1061       // DumpGranEvent(GR_DESCHEDULE, t);
1062       globalParStats.tot_stackover++;
1063 #endif
1064       IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1065                                (long)t->id, whatNext_strs[t->what_next]));
1066       /* just adjust the stack for this thread, then pop it back
1067        * on the run queue.
1068        */
1069       threadPaused(t);
1070       { 
1071         /* enlarge the stack */
1072         StgTSO *new_t = threadStackOverflow(t);
1073         
1074         /* This TSO has moved, so update any pointers to it from the
1075          * main thread stack.  It better not be on any other queues...
1076          * (it shouldn't be).
1077          */
1078         if (t->main != NULL) {
1079             t->main->tso = new_t;
1080         }
1081         PUSH_ON_RUN_QUEUE(new_t);
1082       }
1083       break;
1084
1085     case ThreadYielding:
1086       // Reset the context switch flag.  We don't do this just before
1087       // running the thread, because that would mean we would lose ticks
1088       // during GC, which can lead to unfair scheduling (a thread hogs
1089       // the CPU because the tick always arrives during GC).  This way
1090       // penalises threads that do a lot of allocation, but that seems
1091       // better than the alternative.
1092       context_switch = 0;
1093
1094 #if defined(GRAN)
1095       IF_DEBUG(gran, 
1096                DumpGranEvent(GR_DESCHEDULE, t));
1097       globalGranStats.tot_yields++;
1098 #elif defined(PAR)
1099       // IF_DEBUG(par, 
1100       // DumpGranEvent(GR_DESCHEDULE, t);
1101       globalParStats.tot_yields++;
1102 #endif
1103       /* put the thread back on the run queue.  Then, if we're ready to
1104        * GC, check whether this is the last task to stop.  If so, wake
1105        * up the GC thread.  getThread will block during a GC until the
1106        * GC is finished.
1107        */
1108       IF_DEBUG(scheduler,
1109                if (t->what_next != prev_what_next) {
1110                    debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1111                          (long)t->id, whatNext_strs[t->what_next]);
1112                } else {
1113                    debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1114                          (long)t->id, whatNext_strs[t->what_next]);
1115                }
1116                );
1117
1118       IF_DEBUG(sanity,
1119                //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1120                checkTSO(t));
1121       ASSERT(t->link == END_TSO_QUEUE);
1122
1123       // Shortcut if we're just switching evaluators: don't bother
1124       // doing stack squeezing (which can be expensive), just run the
1125       // thread.
1126       if (t->what_next != prev_what_next) {
1127           goto run_thread;
1128       }
1129
1130       threadPaused(t);
1131
1132 #if defined(GRAN)
1133       ASSERT(!is_on_queue(t,CurrentProc));
1134
1135       IF_DEBUG(sanity,
1136                //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1137                checkThreadQsSanity(rtsTrue));
1138 #endif
1139
1140 #if defined(PAR)
1141       if (RtsFlags.ParFlags.doFairScheduling) { 
1142         /* this does round-robin scheduling; good for concurrency */
1143         APPEND_TO_RUN_QUEUE(t);
1144       } else {
1145         /* this does unfair scheduling; good for parallelism */
1146         PUSH_ON_RUN_QUEUE(t);
1147       }
1148 #else
1149       // this does round-robin scheduling; good for concurrency
1150       APPEND_TO_RUN_QUEUE(t);
1151 #endif
1152
1153 #if defined(GRAN)
1154       /* add a ContinueThread event to actually process the thread */
1155       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1156                 ContinueThread,
1157                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1158       IF_GRAN_DEBUG(bq, 
1159                debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1160                G_EVENTQ(0);
1161                G_CURR_THREADQ(0));
1162 #endif /* GRAN */
1163       break;
1164
1165     case ThreadBlocked:
1166 #if defined(GRAN)
1167       IF_DEBUG(scheduler,
1168                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1169                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1170                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1171
1172       // ??? needed; should emit block before
1173       IF_DEBUG(gran, 
1174                DumpGranEvent(GR_DESCHEDULE, t)); 
1175       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1176       /*
1177         ngoq Dogh!
1178       ASSERT(procStatus[CurrentProc]==Busy || 
1179               ((procStatus[CurrentProc]==Fetching) && 
1180               (t->block_info.closure!=(StgClosure*)NULL)));
1181       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1182           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1183             procStatus[CurrentProc]==Fetching)) 
1184         procStatus[CurrentProc] = Idle;
1185       */
1186 #elif defined(PAR)
1187       IF_DEBUG(scheduler,
1188                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1189                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1190       IF_PAR_DEBUG(bq,
1191
1192                    if (t->block_info.closure!=(StgClosure*)NULL) 
1193                      print_bq(t->block_info.closure));
1194
1195       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1196       blockThread(t);
1197
1198       /* whatever we schedule next, we must log that schedule */
1199       emitSchedule = rtsTrue;
1200
1201 #else /* !GRAN */
1202       /* don't need to do anything.  Either the thread is blocked on
1203        * I/O, in which case we'll have called addToBlockedQueue
1204        * previously, or it's blocked on an MVar or Blackhole, in which
1205        * case it'll be on the relevant queue already.
1206        */
1207       IF_DEBUG(scheduler,
1208                debugBelch("--<< thread %d (%s) stopped: ", 
1209                        t->id, whatNext_strs[t->what_next]);
1210                printThreadBlockage(t);
1211                debugBelch("\n"));
1212
1213       /* Only for dumping event to log file 
1214          ToDo: do I need this in GranSim, too?
1215       blockThread(t);
1216       */
1217 #endif
1218       threadPaused(t);
1219       break;
1220
1221     case ThreadFinished:
1222       /* Need to check whether this was a main thread, and if so, signal
1223        * the task that started it with the return value.  If we have no
1224        * more main threads, we probably need to stop all the tasks until
1225        * we get a new one.
1226        */
1227       /* We also end up here if the thread kills itself with an
1228        * uncaught exception, see Exception.hc.
1229        */
1230       IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1231                                t->id, whatNext_strs[t->what_next]));
1232 #if defined(GRAN)
1233       endThread(t, CurrentProc); // clean-up the thread
1234 #elif defined(PAR)
1235       /* For now all are advisory -- HWL */
1236       //if(t->priority==AdvisoryPriority) ??
1237       advisory_thread_count--;
1238       
1239 # ifdef DIST
1240       if(t->dist.priority==RevalPriority)
1241         FinishReval(t);
1242 # endif
1243       
1244       if (RtsFlags.ParFlags.ParStats.Full &&
1245           !RtsFlags.ParFlags.ParStats.Suppressed) 
1246         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1247 #endif
1248
1249       //
1250       // Check whether the thread that just completed was a main
1251       // thread, and if so return with the result.  
1252       //
1253       // There is an assumption here that all thread completion goes
1254       // through this point; we need to make sure that if a thread
1255       // ends up in the ThreadKilled state, that it stays on the run
1256       // queue so it can be dealt with here.
1257       //
1258       if (
1259 #if defined(RTS_SUPPORTS_THREADS)
1260           mainThread != NULL
1261 #else
1262           mainThread->tso == t
1263 #endif
1264           )
1265       {
1266           // We are a bound thread: this must be our thread that just
1267           // completed.
1268           ASSERT(mainThread->tso == t);
1269
1270           if (t->what_next == ThreadComplete) {
1271               if (mainThread->ret) {
1272                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1273                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1274               }
1275               mainThread->stat = Success;
1276           } else {
1277               if (mainThread->ret) {
1278                   *(mainThread->ret) = NULL;
1279               }
1280               if (was_interrupted) {
1281                   mainThread->stat = Interrupted;
1282               } else {
1283                   mainThread->stat = Killed;
1284               }
1285           }
1286 #ifdef DEBUG
1287           removeThreadLabel((StgWord)mainThread->tso->id);
1288 #endif
1289           if (mainThread->prev == NULL) {
1290               main_threads = mainThread->link;
1291           } else {
1292               mainThread->prev->link = mainThread->link;
1293           }
1294           if (mainThread->link != NULL) {
1295               mainThread->link->prev = NULL;
1296           }
1297           releaseCapability(cap);
1298           return;
1299       }
1300
1301 #ifdef RTS_SUPPORTS_THREADS
1302       ASSERT(t->main == NULL);
1303 #else
1304       if (t->main != NULL) {
1305           // Must be a main thread that is not the topmost one.  Leave
1306           // it on the run queue until the stack has unwound to the
1307           // point where we can deal with this.  Leaving it on the run
1308           // queue also ensures that the garbage collector knows about
1309           // this thread and its return value (it gets dropped from the
1310           // all_threads list so there's no other way to find it).
1311           APPEND_TO_RUN_QUEUE(t);
1312       }
1313 #endif
1314       break;
1315
1316     default:
1317       barf("schedule: invalid thread return code %d", (int)ret);
1318     }
1319
1320 #ifdef PROFILING
1321     // When we have +RTS -i0 and we're heap profiling, do a census at
1322     // every GC.  This lets us get repeatable runs for debugging.
1323     if (performHeapProfile ||
1324         (RtsFlags.ProfFlags.profileInterval==0 &&
1325          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1326         GarbageCollect(GetRoots, rtsTrue);
1327         heapCensus();
1328         performHeapProfile = rtsFalse;
1329         ready_to_gc = rtsFalse; // we already GC'd
1330     }
1331 #endif
1332
1333     if (ready_to_gc) {
1334       /* everybody back, start the GC.
1335        * Could do it in this thread, or signal a condition var
1336        * to do it in another thread.  Either way, we need to
1337        * broadcast on gc_pending_cond afterward.
1338        */
1339 #if defined(RTS_SUPPORTS_THREADS)
1340       IF_DEBUG(scheduler,sched_belch("doing GC"));
1341 #endif
1342       GarbageCollect(GetRoots,rtsFalse);
1343       ready_to_gc = rtsFalse;
1344 #if defined(GRAN)
1345       /* add a ContinueThread event to continue execution of current thread */
1346       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1347                 ContinueThread,
1348                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1349       IF_GRAN_DEBUG(bq, 
1350                debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1351                G_EVENTQ(0);
1352                G_CURR_THREADQ(0));
1353 #endif /* GRAN */
1354     }
1355
1356 #if defined(GRAN)
1357   next_thread:
1358     IF_GRAN_DEBUG(unused,
1359                   print_eventq(EventHd));
1360
1361     event = get_next_event();
1362 #elif defined(PAR)
1363   next_thread:
1364     /* ToDo: wait for next message to arrive rather than busy wait */
1365 #endif /* GRAN */
1366
1367   } /* end of while(1) */
1368
1369   IF_PAR_DEBUG(verbose,
1370                debugBelch("== Leaving schedule() after having received Finish\n"));
1371 }
1372
1373 /* ---------------------------------------------------------------------------
1374  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1375  * used by Control.Concurrent for error checking.
1376  * ------------------------------------------------------------------------- */
1377  
1378 StgBool
1379 rtsSupportsBoundThreads(void)
1380 {
1381 #ifdef THREADED_RTS
1382   return rtsTrue;
1383 #else
1384   return rtsFalse;
1385 #endif
1386 }
1387
1388 /* ---------------------------------------------------------------------------
1389  * isThreadBound(tso): check whether tso is bound to an OS thread.
1390  * ------------------------------------------------------------------------- */
1391  
1392 StgBool
1393 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1394 {
1395 #ifdef THREADED_RTS
1396   return (tso->main != NULL);
1397 #endif
1398   return rtsFalse;
1399 }
1400
1401 /* ---------------------------------------------------------------------------
1402  * Singleton fork(). Do not copy any running threads.
1403  * ------------------------------------------------------------------------- */
1404
1405 #ifndef mingw32_TARGET_OS
1406 #define FORKPROCESS_PRIMOP_SUPPORTED
1407 #endif
1408
1409 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1410 static void 
1411 deleteThreadImmediately(StgTSO *tso);
1412 #endif
1413 StgInt
1414 forkProcess(HsStablePtr *entry
1415 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1416             STG_UNUSED
1417 #endif
1418            )
1419 {
1420 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1421   pid_t pid;
1422   StgTSO* t,*next;
1423   StgMainThread *m;
1424   SchedulerStatus rc;
1425
1426   IF_DEBUG(scheduler,sched_belch("forking!"));
1427   rts_lock(); // This not only acquires sched_mutex, it also
1428               // makes sure that no other threads are running
1429
1430   pid = fork();
1431
1432   if (pid) { /* parent */
1433
1434   /* just return the pid */
1435     rts_unlock();
1436     return pid;
1437     
1438   } else { /* child */
1439     
1440     
1441       // delete all threads
1442     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1443     
1444     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1445       next = t->link;
1446
1447         // don't allow threads to catch the ThreadKilled exception
1448       deleteThreadImmediately(t);
1449     }
1450     
1451       // wipe the main thread list
1452     while((m = main_threads) != NULL) {
1453       main_threads = m->link;
1454 # ifdef THREADED_RTS
1455       closeCondition(&m->bound_thread_cond);
1456 # endif
1457       stgFree(m);
1458     }
1459     
1460 # ifdef RTS_SUPPORTS_THREADS
1461     resetTaskManagerAfterFork();      // tell startTask() and friends that
1462     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1463     resetWorkerWakeupPipeAfterFork();
1464 # endif
1465     
1466     rc = rts_evalStableIO(entry, NULL);  // run the action
1467     rts_checkSchedStatus("forkProcess",rc);
1468     
1469     rts_unlock();
1470     
1471     hs_exit();                      // clean up and exit
1472     stg_exit(0);
1473   }
1474 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1475   barf("forkProcess#: primop not supported, sorry!\n");
1476   return -1;
1477 #endif
1478 }
1479
1480 /* ---------------------------------------------------------------------------
1481  * deleteAllThreads():  kill all the live threads.
1482  *
1483  * This is used when we catch a user interrupt (^C), before performing
1484  * any necessary cleanups and running finalizers.
1485  *
1486  * Locks: sched_mutex held.
1487  * ------------------------------------------------------------------------- */
1488    
1489 void
1490 deleteAllThreads ( void )
1491 {
1492   StgTSO* t, *next;
1493   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1494   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1495       next = t->global_link;
1496       deleteThread(t);
1497   }      
1498
1499   // The run queue now contains a bunch of ThreadKilled threads.  We
1500   // must not throw these away: the main thread(s) will be in there
1501   // somewhere, and the main scheduler loop has to deal with it.
1502   // Also, the run queue is the only thing keeping these threads from
1503   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1504
1505   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1506   ASSERT(sleeping_queue == END_TSO_QUEUE);
1507 }
1508
1509 /* startThread and  insertThread are now in GranSim.c -- HWL */
1510
1511
1512 /* ---------------------------------------------------------------------------
1513  * Suspending & resuming Haskell threads.
1514  * 
1515  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1516  * its capability before calling the C function.  This allows another
1517  * task to pick up the capability and carry on running Haskell
1518  * threads.  It also means that if the C call blocks, it won't lock
1519  * the whole system.
1520  *
1521  * The Haskell thread making the C call is put to sleep for the
1522  * duration of the call, on the susepended_ccalling_threads queue.  We
1523  * give out a token to the task, which it can use to resume the thread
1524  * on return from the C function.
1525  * ------------------------------------------------------------------------- */
1526    
1527 StgInt
1528 suspendThread( StgRegTable *reg )
1529 {
1530   nat tok;
1531   Capability *cap;
1532   int saved_errno = errno;
1533
1534   /* assume that *reg is a pointer to the StgRegTable part
1535    * of a Capability.
1536    */
1537   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1538
1539   ACQUIRE_LOCK(&sched_mutex);
1540
1541   IF_DEBUG(scheduler,
1542            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1543
1544   // XXX this might not be necessary --SDM
1545   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1546
1547   threadPaused(cap->r.rCurrentTSO);
1548   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1549   suspended_ccalling_threads = cap->r.rCurrentTSO;
1550
1551   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1552       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1553       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1554   } else {
1555       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1556   }
1557
1558   /* Use the thread ID as the token; it should be unique */
1559   tok = cap->r.rCurrentTSO->id;
1560
1561   /* Hand back capability */
1562   releaseCapability(cap);
1563   
1564 #if defined(RTS_SUPPORTS_THREADS)
1565   /* Preparing to leave the RTS, so ensure there's a native thread/task
1566      waiting to take over.
1567   */
1568   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1569 #endif
1570
1571   /* Other threads _might_ be available for execution; signal this */
1572   THREAD_RUNNABLE();
1573   RELEASE_LOCK(&sched_mutex);
1574   
1575   errno = saved_errno;
1576   return tok; 
1577 }
1578
1579 StgRegTable *
1580 resumeThread( StgInt tok )
1581 {
1582   StgTSO *tso, **prev;
1583   Capability *cap;
1584   int saved_errno = errno;
1585
1586 #if defined(RTS_SUPPORTS_THREADS)
1587   /* Wait for permission to re-enter the RTS with the result. */
1588   ACQUIRE_LOCK(&sched_mutex);
1589   waitForReturnCapability(&sched_mutex, &cap);
1590
1591   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1592 #else
1593   grabCapability(&cap);
1594 #endif
1595
1596   /* Remove the thread off of the suspended list */
1597   prev = &suspended_ccalling_threads;
1598   for (tso = suspended_ccalling_threads; 
1599        tso != END_TSO_QUEUE; 
1600        prev = &tso->link, tso = tso->link) {
1601     if (tso->id == (StgThreadID)tok) {
1602       *prev = tso->link;
1603       break;
1604     }
1605   }
1606   if (tso == END_TSO_QUEUE) {
1607     barf("resumeThread: thread not found");
1608   }
1609   tso->link = END_TSO_QUEUE;
1610   
1611   if(tso->why_blocked == BlockedOnCCall) {
1612       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1613       tso->blocked_exceptions = NULL;
1614   }
1615   
1616   /* Reset blocking status */
1617   tso->why_blocked  = NotBlocked;
1618
1619   cap->r.rCurrentTSO = tso;
1620   RELEASE_LOCK(&sched_mutex);
1621   errno = saved_errno;
1622   return &cap->r;
1623 }
1624
1625
1626 /* ---------------------------------------------------------------------------
1627  * Static functions
1628  * ------------------------------------------------------------------------ */
1629 static void unblockThread(StgTSO *tso);
1630
1631 /* ---------------------------------------------------------------------------
1632  * Comparing Thread ids.
1633  *
1634  * This is used from STG land in the implementation of the
1635  * instances of Eq/Ord for ThreadIds.
1636  * ------------------------------------------------------------------------ */
1637
1638 int
1639 cmp_thread(StgPtr tso1, StgPtr tso2) 
1640
1641   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1642   StgThreadID id2 = ((StgTSO *)tso2)->id;
1643  
1644   if (id1 < id2) return (-1);
1645   if (id1 > id2) return 1;
1646   return 0;
1647 }
1648
1649 /* ---------------------------------------------------------------------------
1650  * Fetching the ThreadID from an StgTSO.
1651  *
1652  * This is used in the implementation of Show for ThreadIds.
1653  * ------------------------------------------------------------------------ */
1654 int
1655 rts_getThreadId(StgPtr tso) 
1656 {
1657   return ((StgTSO *)tso)->id;
1658 }
1659
1660 #ifdef DEBUG
1661 void
1662 labelThread(StgPtr tso, char *label)
1663 {
1664   int len;
1665   void *buf;
1666
1667   /* Caveat: Once set, you can only set the thread name to "" */
1668   len = strlen(label)+1;
1669   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1670   strncpy(buf,label,len);
1671   /* Update will free the old memory for us */
1672   updateThreadLabel(((StgTSO *)tso)->id,buf);
1673 }
1674 #endif /* DEBUG */
1675
1676 /* ---------------------------------------------------------------------------
1677    Create a new thread.
1678
1679    The new thread starts with the given stack size.  Before the
1680    scheduler can run, however, this thread needs to have a closure
1681    (and possibly some arguments) pushed on its stack.  See
1682    pushClosure() in Schedule.h.
1683
1684    createGenThread() and createIOThread() (in SchedAPI.h) are
1685    convenient packaged versions of this function.
1686
1687    currently pri (priority) is only used in a GRAN setup -- HWL
1688    ------------------------------------------------------------------------ */
1689 #if defined(GRAN)
1690 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1691 StgTSO *
1692 createThread(nat size, StgInt pri)
1693 #else
1694 StgTSO *
1695 createThread(nat size)
1696 #endif
1697 {
1698
1699     StgTSO *tso;
1700     nat stack_size;
1701
1702     /* First check whether we should create a thread at all */
1703 #if defined(PAR)
1704   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1705   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1706     threadsIgnored++;
1707     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
1708           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1709     return END_TSO_QUEUE;
1710   }
1711   threadsCreated++;
1712 #endif
1713
1714 #if defined(GRAN)
1715   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1716 #endif
1717
1718   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1719
1720   /* catch ridiculously small stack sizes */
1721   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1722     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1723   }
1724
1725   stack_size = size - TSO_STRUCT_SIZEW;
1726
1727   tso = (StgTSO *)allocate(size);
1728   TICK_ALLOC_TSO(stack_size, 0);
1729
1730   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1731 #if defined(GRAN)
1732   SET_GRAN_HDR(tso, ThisPE);
1733 #endif
1734
1735   // Always start with the compiled code evaluator
1736   tso->what_next = ThreadRunGHC;
1737
1738   tso->id = next_thread_id++; 
1739   tso->why_blocked  = NotBlocked;
1740   tso->blocked_exceptions = NULL;
1741
1742   tso->saved_errno = 0;
1743   tso->main = NULL;
1744   
1745   tso->stack_size   = stack_size;
1746   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1747                               - TSO_STRUCT_SIZEW;
1748   tso->sp           = (P_)&(tso->stack) + stack_size;
1749
1750 #ifdef PROFILING
1751   tso->prof.CCCS = CCS_MAIN;
1752 #endif
1753
1754   /* put a stop frame on the stack */
1755   tso->sp -= sizeofW(StgStopFrame);
1756   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1757   tso->link = END_TSO_QUEUE;
1758
1759   // ToDo: check this
1760 #if defined(GRAN)
1761   /* uses more flexible routine in GranSim */
1762   insertThread(tso, CurrentProc);
1763 #else
1764   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1765    * from its creation
1766    */
1767 #endif
1768
1769 #if defined(GRAN) 
1770   if (RtsFlags.GranFlags.GranSimStats.Full) 
1771     DumpGranEvent(GR_START,tso);
1772 #elif defined(PAR)
1773   if (RtsFlags.ParFlags.ParStats.Full) 
1774     DumpGranEvent(GR_STARTQ,tso);
1775   /* HACk to avoid SCHEDULE 
1776      LastTSO = tso; */
1777 #endif
1778
1779   /* Link the new thread on the global thread list.
1780    */
1781   tso->global_link = all_threads;
1782   all_threads = tso;
1783
1784 #if defined(DIST)
1785   tso->dist.priority = MandatoryPriority; //by default that is...
1786 #endif
1787
1788 #if defined(GRAN)
1789   tso->gran.pri = pri;
1790 # if defined(DEBUG)
1791   tso->gran.magic = TSO_MAGIC; // debugging only
1792 # endif
1793   tso->gran.sparkname   = 0;
1794   tso->gran.startedat   = CURRENT_TIME; 
1795   tso->gran.exported    = 0;
1796   tso->gran.basicblocks = 0;
1797   tso->gran.allocs      = 0;
1798   tso->gran.exectime    = 0;
1799   tso->gran.fetchtime   = 0;
1800   tso->gran.fetchcount  = 0;
1801   tso->gran.blocktime   = 0;
1802   tso->gran.blockcount  = 0;
1803   tso->gran.blockedat   = 0;
1804   tso->gran.globalsparks = 0;
1805   tso->gran.localsparks  = 0;
1806   if (RtsFlags.GranFlags.Light)
1807     tso->gran.clock  = Now; /* local clock */
1808   else
1809     tso->gran.clock  = 0;
1810
1811   IF_DEBUG(gran,printTSO(tso));
1812 #elif defined(PAR)
1813 # if defined(DEBUG)
1814   tso->par.magic = TSO_MAGIC; // debugging only
1815 # endif
1816   tso->par.sparkname   = 0;
1817   tso->par.startedat   = CURRENT_TIME; 
1818   tso->par.exported    = 0;
1819   tso->par.basicblocks = 0;
1820   tso->par.allocs      = 0;
1821   tso->par.exectime    = 0;
1822   tso->par.fetchtime   = 0;
1823   tso->par.fetchcount  = 0;
1824   tso->par.blocktime   = 0;
1825   tso->par.blockcount  = 0;
1826   tso->par.blockedat   = 0;
1827   tso->par.globalsparks = 0;
1828   tso->par.localsparks  = 0;
1829 #endif
1830
1831 #if defined(GRAN)
1832   globalGranStats.tot_threads_created++;
1833   globalGranStats.threads_created_on_PE[CurrentProc]++;
1834   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1835   globalGranStats.tot_sq_probes++;
1836 #elif defined(PAR)
1837   // collect parallel global statistics (currently done together with GC stats)
1838   if (RtsFlags.ParFlags.ParStats.Global &&
1839       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1840     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1841     globalParStats.tot_threads_created++;
1842   }
1843 #endif 
1844
1845 #if defined(GRAN)
1846   IF_GRAN_DEBUG(pri,
1847                 sched_belch("==__ schedule: Created TSO %d (%p);",
1848                       CurrentProc, tso, tso->id));
1849 #elif defined(PAR)
1850     IF_PAR_DEBUG(verbose,
1851                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
1852                        (long)tso->id, tso, advisory_thread_count));
1853 #else
1854   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1855                                 (long)tso->id, (long)tso->stack_size));
1856 #endif    
1857   return tso;
1858 }
1859
1860 #if defined(PAR)
1861 /* RFP:
1862    all parallel thread creation calls should fall through the following routine.
1863 */
1864 StgTSO *
1865 createSparkThread(rtsSpark spark) 
1866 { StgTSO *tso;
1867   ASSERT(spark != (rtsSpark)NULL);
1868   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1869   { threadsIgnored++;
1870     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1871           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1872     return END_TSO_QUEUE;
1873   }
1874   else
1875   { threadsCreated++;
1876     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1877     if (tso==END_TSO_QUEUE)     
1878       barf("createSparkThread: Cannot create TSO");
1879 #if defined(DIST)
1880     tso->priority = AdvisoryPriority;
1881 #endif
1882     pushClosure(tso,spark);
1883     PUSH_ON_RUN_QUEUE(tso);
1884     advisory_thread_count++;    
1885   }
1886   return tso;
1887 }
1888 #endif
1889
1890 /*
1891   Turn a spark into a thread.
1892   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1893 */
1894 #if defined(PAR)
1895 StgTSO *
1896 activateSpark (rtsSpark spark) 
1897 {
1898   StgTSO *tso;
1899
1900   tso = createSparkThread(spark);
1901   if (RtsFlags.ParFlags.ParStats.Full) {   
1902     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1903     IF_PAR_DEBUG(verbose,
1904                  debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
1905                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1906   }
1907   // ToDo: fwd info on local/global spark to thread -- HWL
1908   // tso->gran.exported =  spark->exported;
1909   // tso->gran.locked =   !spark->global;
1910   // tso->gran.sparkname = spark->name;
1911
1912   return tso;
1913 }
1914 #endif
1915
1916 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1917                                    Capability *initialCapability
1918                                    );
1919
1920
1921 /* ---------------------------------------------------------------------------
1922  * scheduleThread()
1923  *
1924  * scheduleThread puts a thread on the head of the runnable queue.
1925  * This will usually be done immediately after a thread is created.
1926  * The caller of scheduleThread must create the thread using e.g.
1927  * createThread and push an appropriate closure
1928  * on this thread's stack before the scheduler is invoked.
1929  * ------------------------------------------------------------------------ */
1930
1931 static void scheduleThread_ (StgTSO* tso);
1932
1933 void
1934 scheduleThread_(StgTSO *tso)
1935 {
1936   // Precondition: sched_mutex must be held.
1937   // The thread goes at the *end* of the run-queue, to avoid possible
1938   // starvation of any threads already on the queue.
1939   APPEND_TO_RUN_QUEUE(tso);
1940   THREAD_RUNNABLE();
1941 }
1942
1943 void
1944 scheduleThread(StgTSO* tso)
1945 {
1946   ACQUIRE_LOCK(&sched_mutex);
1947   scheduleThread_(tso);
1948   RELEASE_LOCK(&sched_mutex);
1949 }
1950
1951 #if defined(RTS_SUPPORTS_THREADS)
1952 static Condition bound_cond_cache;
1953 static int bound_cond_cache_full = 0;
1954 #endif
1955
1956
1957 SchedulerStatus
1958 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1959                    Capability *initialCapability)
1960 {
1961     // Precondition: sched_mutex must be held
1962     StgMainThread *m;
1963
1964     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1965     m->tso = tso;
1966     tso->main = m;
1967     m->ret = ret;
1968     m->stat = NoStatus;
1969     m->link = main_threads;
1970     m->prev = NULL;
1971     if (main_threads != NULL) {
1972         main_threads->prev = m;
1973     }
1974     main_threads = m;
1975
1976 #if defined(RTS_SUPPORTS_THREADS)
1977     // Allocating a new condition for each thread is expensive, so we
1978     // cache one.  This is a pretty feeble hack, but it helps speed up
1979     // consecutive call-ins quite a bit.
1980     if (bound_cond_cache_full) {
1981         m->bound_thread_cond = bound_cond_cache;
1982         bound_cond_cache_full = 0;
1983     } else {
1984         initCondition(&m->bound_thread_cond);
1985     }
1986 #endif
1987
1988     /* Put the thread on the main-threads list prior to scheduling the TSO.
1989        Failure to do so introduces a race condition in the MT case (as
1990        identified by Wolfgang Thaller), whereby the new task/OS thread 
1991        created by scheduleThread_() would complete prior to the thread
1992        that spawned it managed to put 'itself' on the main-threads list.
1993        The upshot of it all being that the worker thread wouldn't get to
1994        signal the completion of the its work item for the main thread to
1995        see (==> it got stuck waiting.)    -- sof 6/02.
1996     */
1997     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
1998     
1999     APPEND_TO_RUN_QUEUE(tso);
2000     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2001     // bound and only runnable by *this* OS thread, so waking up other
2002     // workers will just slow things down.
2003
2004     return waitThread_(m, initialCapability);
2005 }
2006
2007 /* ---------------------------------------------------------------------------
2008  * initScheduler()
2009  *
2010  * Initialise the scheduler.  This resets all the queues - if the
2011  * queues contained any threads, they'll be garbage collected at the
2012  * next pass.
2013  *
2014  * ------------------------------------------------------------------------ */
2015
2016 void 
2017 initScheduler(void)
2018 {
2019 #if defined(GRAN)
2020   nat i;
2021
2022   for (i=0; i<=MAX_PROC; i++) {
2023     run_queue_hds[i]      = END_TSO_QUEUE;
2024     run_queue_tls[i]      = END_TSO_QUEUE;
2025     blocked_queue_hds[i]  = END_TSO_QUEUE;
2026     blocked_queue_tls[i]  = END_TSO_QUEUE;
2027     ccalling_threadss[i]  = END_TSO_QUEUE;
2028     sleeping_queue        = END_TSO_QUEUE;
2029   }
2030 #else
2031   run_queue_hd      = END_TSO_QUEUE;
2032   run_queue_tl      = END_TSO_QUEUE;
2033   blocked_queue_hd  = END_TSO_QUEUE;
2034   blocked_queue_tl  = END_TSO_QUEUE;
2035   sleeping_queue    = END_TSO_QUEUE;
2036 #endif 
2037
2038   suspended_ccalling_threads  = END_TSO_QUEUE;
2039
2040   main_threads = NULL;
2041   all_threads  = END_TSO_QUEUE;
2042
2043   context_switch = 0;
2044   interrupted    = 0;
2045
2046   RtsFlags.ConcFlags.ctxtSwitchTicks =
2047       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2048       
2049 #if defined(RTS_SUPPORTS_THREADS)
2050   /* Initialise the mutex and condition variables used by
2051    * the scheduler. */
2052   initMutex(&sched_mutex);
2053   initMutex(&term_mutex);
2054 #endif
2055   
2056   ACQUIRE_LOCK(&sched_mutex);
2057
2058   /* A capability holds the state a native thread needs in
2059    * order to execute STG code. At least one capability is
2060    * floating around (only SMP builds have more than one).
2061    */
2062   initCapabilities();
2063   
2064 #if defined(RTS_SUPPORTS_THREADS)
2065     /* start our haskell execution tasks */
2066     startTaskManager(0,taskStart);
2067 #endif
2068
2069 #if /* defined(SMP) ||*/ defined(PAR)
2070   initSparkPools();
2071 #endif
2072
2073   RELEASE_LOCK(&sched_mutex);
2074 }
2075
2076 void
2077 exitScheduler( void )
2078 {
2079 #if defined(RTS_SUPPORTS_THREADS)
2080   stopTaskManager();
2081 #endif
2082   shutting_down_scheduler = rtsTrue;
2083 }
2084
2085 /* ----------------------------------------------------------------------------
2086    Managing the per-task allocation areas.
2087    
2088    Each capability comes with an allocation area.  These are
2089    fixed-length block lists into which allocation can be done.
2090
2091    ToDo: no support for two-space collection at the moment???
2092    ------------------------------------------------------------------------- */
2093
2094 static
2095 SchedulerStatus
2096 waitThread_(StgMainThread* m, Capability *initialCapability)
2097 {
2098   SchedulerStatus stat;
2099
2100   // Precondition: sched_mutex must be held.
2101   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2102
2103 #if defined(GRAN)
2104   /* GranSim specific init */
2105   CurrentTSO = m->tso;                // the TSO to run
2106   procStatus[MainProc] = Busy;        // status of main PE
2107   CurrentProc = MainProc;             // PE to run it on
2108   schedule(m,initialCapability);
2109 #else
2110   schedule(m,initialCapability);
2111   ASSERT(m->stat != NoStatus);
2112 #endif
2113
2114   stat = m->stat;
2115
2116 #if defined(RTS_SUPPORTS_THREADS)
2117   // Free the condition variable, returning it to the cache if possible.
2118   if (!bound_cond_cache_full) {
2119       bound_cond_cache = m->bound_thread_cond;
2120       bound_cond_cache_full = 1;
2121   } else {
2122       closeCondition(&m->bound_thread_cond);
2123   }
2124 #endif
2125
2126   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2127   stgFree(m);
2128
2129   // Postcondition: sched_mutex still held
2130   return stat;
2131 }
2132
2133 /* ---------------------------------------------------------------------------
2134    Where are the roots that we know about?
2135
2136         - all the threads on the runnable queue
2137         - all the threads on the blocked queue
2138         - all the threads on the sleeping queue
2139         - all the thread currently executing a _ccall_GC
2140         - all the "main threads"
2141      
2142    ------------------------------------------------------------------------ */
2143
2144 /* This has to be protected either by the scheduler monitor, or by the
2145         garbage collection monitor (probably the latter).
2146         KH @ 25/10/99
2147 */
2148
2149 void
2150 GetRoots( evac_fn evac )
2151 {
2152 #if defined(GRAN)
2153   {
2154     nat i;
2155     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2156       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2157           evac((StgClosure **)&run_queue_hds[i]);
2158       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2159           evac((StgClosure **)&run_queue_tls[i]);
2160       
2161       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2162           evac((StgClosure **)&blocked_queue_hds[i]);
2163       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2164           evac((StgClosure **)&blocked_queue_tls[i]);
2165       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2166           evac((StgClosure **)&ccalling_threads[i]);
2167     }
2168   }
2169
2170   markEventQueue();
2171
2172 #else /* !GRAN */
2173   if (run_queue_hd != END_TSO_QUEUE) {
2174       ASSERT(run_queue_tl != END_TSO_QUEUE);
2175       evac((StgClosure **)&run_queue_hd);
2176       evac((StgClosure **)&run_queue_tl);
2177   }
2178   
2179   if (blocked_queue_hd != END_TSO_QUEUE) {
2180       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2181       evac((StgClosure **)&blocked_queue_hd);
2182       evac((StgClosure **)&blocked_queue_tl);
2183   }
2184   
2185   if (sleeping_queue != END_TSO_QUEUE) {
2186       evac((StgClosure **)&sleeping_queue);
2187   }
2188 #endif 
2189
2190   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2191       evac((StgClosure **)&suspended_ccalling_threads);
2192   }
2193
2194 #if defined(PAR) || defined(GRAN)
2195   markSparkQueue(evac);
2196 #endif
2197
2198 #if defined(RTS_USER_SIGNALS)
2199   // mark the signal handlers (signals should be already blocked)
2200   markSignalHandlers(evac);
2201 #endif
2202 }
2203
2204 /* -----------------------------------------------------------------------------
2205    performGC
2206
2207    This is the interface to the garbage collector from Haskell land.
2208    We provide this so that external C code can allocate and garbage
2209    collect when called from Haskell via _ccall_GC.
2210
2211    It might be useful to provide an interface whereby the programmer
2212    can specify more roots (ToDo).
2213    
2214    This needs to be protected by the GC condition variable above.  KH.
2215    -------------------------------------------------------------------------- */
2216
2217 static void (*extra_roots)(evac_fn);
2218
2219 void
2220 performGC(void)
2221 {
2222   /* Obligated to hold this lock upon entry */
2223   ACQUIRE_LOCK(&sched_mutex);
2224   GarbageCollect(GetRoots,rtsFalse);
2225   RELEASE_LOCK(&sched_mutex);
2226 }
2227
2228 void
2229 performMajorGC(void)
2230 {
2231   ACQUIRE_LOCK(&sched_mutex);
2232   GarbageCollect(GetRoots,rtsTrue);
2233   RELEASE_LOCK(&sched_mutex);
2234 }
2235
2236 static void
2237 AllRoots(evac_fn evac)
2238 {
2239     GetRoots(evac);             // the scheduler's roots
2240     extra_roots(evac);          // the user's roots
2241 }
2242
2243 void
2244 performGCWithRoots(void (*get_roots)(evac_fn))
2245 {
2246   ACQUIRE_LOCK(&sched_mutex);
2247   extra_roots = get_roots;
2248   GarbageCollect(AllRoots,rtsFalse);
2249   RELEASE_LOCK(&sched_mutex);
2250 }
2251
2252 /* -----------------------------------------------------------------------------
2253    Stack overflow
2254
2255    If the thread has reached its maximum stack size, then raise the
2256    StackOverflow exception in the offending thread.  Otherwise
2257    relocate the TSO into a larger chunk of memory and adjust its stack
2258    size appropriately.
2259    -------------------------------------------------------------------------- */
2260
2261 static StgTSO *
2262 threadStackOverflow(StgTSO *tso)
2263 {
2264   nat new_stack_size, new_tso_size, stack_words;
2265   StgPtr new_sp;
2266   StgTSO *dest;
2267
2268   IF_DEBUG(sanity,checkTSO(tso));
2269   if (tso->stack_size >= tso->max_stack_size) {
2270
2271     IF_DEBUG(gc,
2272              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2273                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2274              /* If we're debugging, just print out the top of the stack */
2275              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2276                                               tso->sp+64)));
2277
2278     /* Send this thread the StackOverflow exception */
2279     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2280     return tso;
2281   }
2282
2283   /* Try to double the current stack size.  If that takes us over the
2284    * maximum stack size for this thread, then use the maximum instead.
2285    * Finally round up so the TSO ends up as a whole number of blocks.
2286    */
2287   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2288   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2289                                        TSO_STRUCT_SIZE)/sizeof(W_);
2290   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2291   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2292
2293   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2294
2295   dest = (StgTSO *)allocate(new_tso_size);
2296   TICK_ALLOC_TSO(new_stack_size,0);
2297
2298   /* copy the TSO block and the old stack into the new area */
2299   memcpy(dest,tso,TSO_STRUCT_SIZE);
2300   stack_words = tso->stack + tso->stack_size - tso->sp;
2301   new_sp = (P_)dest + new_tso_size - stack_words;
2302   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2303
2304   /* relocate the stack pointers... */
2305   dest->sp         = new_sp;
2306   dest->stack_size = new_stack_size;
2307         
2308   /* Mark the old TSO as relocated.  We have to check for relocated
2309    * TSOs in the garbage collector and any primops that deal with TSOs.
2310    *
2311    * It's important to set the sp value to just beyond the end
2312    * of the stack, so we don't attempt to scavenge any part of the
2313    * dead TSO's stack.
2314    */
2315   tso->what_next = ThreadRelocated;
2316   tso->link = dest;
2317   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2318   tso->why_blocked = NotBlocked;
2319   dest->mut_link = NULL;
2320
2321   IF_PAR_DEBUG(verbose,
2322                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2323                      tso->id, tso, tso->stack_size);
2324                /* If we're debugging, just print out the top of the stack */
2325                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2326                                                 tso->sp+64)));
2327   
2328   IF_DEBUG(sanity,checkTSO(tso));
2329 #if 0
2330   IF_DEBUG(scheduler,printTSO(dest));
2331 #endif
2332
2333   return dest;
2334 }
2335
2336 /* ---------------------------------------------------------------------------
2337    Wake up a queue that was blocked on some resource.
2338    ------------------------------------------------------------------------ */
2339
2340 #if defined(GRAN)
2341 STATIC_INLINE void
2342 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2343 {
2344 }
2345 #elif defined(PAR)
2346 STATIC_INLINE void
2347 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2348 {
2349   /* write RESUME events to log file and
2350      update blocked and fetch time (depending on type of the orig closure) */
2351   if (RtsFlags.ParFlags.ParStats.Full) {
2352     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2353                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2354                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2355     if (EMPTY_RUN_QUEUE())
2356       emitSchedule = rtsTrue;
2357
2358     switch (get_itbl(node)->type) {
2359         case FETCH_ME_BQ:
2360           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2361           break;
2362         case RBH:
2363         case FETCH_ME:
2364         case BLACKHOLE_BQ:
2365           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2366           break;
2367 #ifdef DIST
2368         case MVAR:
2369           break;
2370 #endif    
2371         default:
2372           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2373         }
2374       }
2375 }
2376 #endif
2377
2378 #if defined(GRAN)
2379 static StgBlockingQueueElement *
2380 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2381 {
2382     StgTSO *tso;
2383     PEs node_loc, tso_loc;
2384
2385     node_loc = where_is(node); // should be lifted out of loop
2386     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2387     tso_loc = where_is((StgClosure *)tso);
2388     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2389       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2390       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2391       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2392       // insertThread(tso, node_loc);
2393       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2394                 ResumeThread,
2395                 tso, node, (rtsSpark*)NULL);
2396       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2397       // len_local++;
2398       // len++;
2399     } else { // TSO is remote (actually should be FMBQ)
2400       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2401                                   RtsFlags.GranFlags.Costs.gunblocktime +
2402                                   RtsFlags.GranFlags.Costs.latency;
2403       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2404                 UnblockThread,
2405                 tso, node, (rtsSpark*)NULL);
2406       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2407       // len++;
2408     }
2409     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2410     IF_GRAN_DEBUG(bq,
2411                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2412                           (node_loc==tso_loc ? "Local" : "Global"), 
2413                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2414     tso->block_info.closure = NULL;
2415     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2416                              tso->id, tso));
2417 }
2418 #elif defined(PAR)
2419 static StgBlockingQueueElement *
2420 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2421 {
2422     StgBlockingQueueElement *next;
2423
2424     switch (get_itbl(bqe)->type) {
2425     case TSO:
2426       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2427       /* if it's a TSO just push it onto the run_queue */
2428       next = bqe->link;
2429       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2430       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2431       THREAD_RUNNABLE();
2432       unblockCount(bqe, node);
2433       /* reset blocking status after dumping event */
2434       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2435       break;
2436
2437     case BLOCKED_FETCH:
2438       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2439       next = bqe->link;
2440       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2441       PendingFetches = (StgBlockedFetch *)bqe;
2442       break;
2443
2444 # if defined(DEBUG)
2445       /* can ignore this case in a non-debugging setup; 
2446          see comments on RBHSave closures above */
2447     case CONSTR:
2448       /* check that the closure is an RBHSave closure */
2449       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2450              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2451              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2452       break;
2453
2454     default:
2455       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2456            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2457            (StgClosure *)bqe);
2458 # endif
2459     }
2460   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
2461   return next;
2462 }
2463
2464 #else /* !GRAN && !PAR */
2465 static StgTSO *
2466 unblockOneLocked(StgTSO *tso)
2467 {
2468   StgTSO *next;
2469
2470   ASSERT(get_itbl(tso)->type == TSO);
2471   ASSERT(tso->why_blocked != NotBlocked);
2472   tso->why_blocked = NotBlocked;
2473   next = tso->link;
2474   tso->link = END_TSO_QUEUE;
2475   APPEND_TO_RUN_QUEUE(tso);
2476   THREAD_RUNNABLE();
2477   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
2478   return next;
2479 }
2480 #endif
2481
2482 #if defined(GRAN) || defined(PAR)
2483 INLINE_ME StgBlockingQueueElement *
2484 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2485 {
2486   ACQUIRE_LOCK(&sched_mutex);
2487   bqe = unblockOneLocked(bqe, node);
2488   RELEASE_LOCK(&sched_mutex);
2489   return bqe;
2490 }
2491 #else
2492 INLINE_ME StgTSO *
2493 unblockOne(StgTSO *tso)
2494 {
2495   ACQUIRE_LOCK(&sched_mutex);
2496   tso = unblockOneLocked(tso);
2497   RELEASE_LOCK(&sched_mutex);
2498   return tso;
2499 }
2500 #endif
2501
2502 #if defined(GRAN)
2503 void 
2504 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2505 {
2506   StgBlockingQueueElement *bqe;
2507   PEs node_loc;
2508   nat len = 0; 
2509
2510   IF_GRAN_DEBUG(bq, 
2511                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
2512                       node, CurrentProc, CurrentTime[CurrentProc], 
2513                       CurrentTSO->id, CurrentTSO));
2514
2515   node_loc = where_is(node);
2516
2517   ASSERT(q == END_BQ_QUEUE ||
2518          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2519          get_itbl(q)->type == CONSTR); // closure (type constructor)
2520   ASSERT(is_unique(node));
2521
2522   /* FAKE FETCH: magically copy the node to the tso's proc;
2523      no Fetch necessary because in reality the node should not have been 
2524      moved to the other PE in the first place
2525   */
2526   if (CurrentProc!=node_loc) {
2527     IF_GRAN_DEBUG(bq, 
2528                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
2529                         node, node_loc, CurrentProc, CurrentTSO->id, 
2530                         // CurrentTSO, where_is(CurrentTSO),
2531                         node->header.gran.procs));
2532     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2533     IF_GRAN_DEBUG(bq, 
2534                   debugBelch("## new bitmask of node %p is %#x\n",
2535                         node, node->header.gran.procs));
2536     if (RtsFlags.GranFlags.GranSimStats.Global) {
2537       globalGranStats.tot_fake_fetches++;
2538     }
2539   }
2540
2541   bqe = q;
2542   // ToDo: check: ASSERT(CurrentProc==node_loc);
2543   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2544     //next = bqe->link;
2545     /* 
2546        bqe points to the current element in the queue
2547        next points to the next element in the queue
2548     */
2549     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2550     //tso_loc = where_is(tso);
2551     len++;
2552     bqe = unblockOneLocked(bqe, node);
2553   }
2554
2555   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2556      the closure to make room for the anchor of the BQ */
2557   if (bqe!=END_BQ_QUEUE) {
2558     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2559     /*
2560     ASSERT((info_ptr==&RBH_Save_0_info) ||
2561            (info_ptr==&RBH_Save_1_info) ||
2562            (info_ptr==&RBH_Save_2_info));
2563     */
2564     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2565     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2566     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2567
2568     IF_GRAN_DEBUG(bq,
2569                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
2570                         node, info_type(node)));
2571   }
2572
2573   /* statistics gathering */
2574   if (RtsFlags.GranFlags.GranSimStats.Global) {
2575     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2576     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2577     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2578     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2579   }
2580   IF_GRAN_DEBUG(bq,
2581                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
2582                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2583 }
2584 #elif defined(PAR)
2585 void 
2586 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2587 {
2588   StgBlockingQueueElement *bqe;
2589
2590   ACQUIRE_LOCK(&sched_mutex);
2591
2592   IF_PAR_DEBUG(verbose, 
2593                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
2594                      node, mytid));
2595 #ifdef DIST  
2596   //RFP
2597   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2598     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
2599     return;
2600   }
2601 #endif
2602   
2603   ASSERT(q == END_BQ_QUEUE ||
2604          get_itbl(q)->type == TSO ||           
2605          get_itbl(q)->type == BLOCKED_FETCH || 
2606          get_itbl(q)->type == CONSTR); 
2607
2608   bqe = q;
2609   while (get_itbl(bqe)->type==TSO || 
2610          get_itbl(bqe)->type==BLOCKED_FETCH) {
2611     bqe = unblockOneLocked(bqe, node);
2612   }
2613   RELEASE_LOCK(&sched_mutex);
2614 }
2615
2616 #else   /* !GRAN && !PAR */
2617
2618 void
2619 awakenBlockedQueueNoLock(StgTSO *tso)
2620 {
2621   while (tso != END_TSO_QUEUE) {
2622     tso = unblockOneLocked(tso);
2623   }
2624 }
2625
2626 void
2627 awakenBlockedQueue(StgTSO *tso)
2628 {
2629   ACQUIRE_LOCK(&sched_mutex);
2630   while (tso != END_TSO_QUEUE) {
2631     tso = unblockOneLocked(tso);
2632   }
2633   RELEASE_LOCK(&sched_mutex);
2634 }
2635 #endif
2636
2637 /* ---------------------------------------------------------------------------
2638    Interrupt execution
2639    - usually called inside a signal handler so it mustn't do anything fancy.   
2640    ------------------------------------------------------------------------ */
2641
2642 void
2643 interruptStgRts(void)
2644 {
2645     interrupted    = 1;
2646     context_switch = 1;
2647 #ifdef RTS_SUPPORTS_THREADS
2648     wakeBlockedWorkerThread();
2649 #endif
2650 }
2651
2652 /* -----------------------------------------------------------------------------
2653    Unblock a thread
2654
2655    This is for use when we raise an exception in another thread, which
2656    may be blocked.
2657    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2658    -------------------------------------------------------------------------- */
2659
2660 #if defined(GRAN) || defined(PAR)
2661 /*
2662   NB: only the type of the blocking queue is different in GranSim and GUM
2663       the operations on the queue-elements are the same
2664       long live polymorphism!
2665
2666   Locks: sched_mutex is held upon entry and exit.
2667
2668 */
2669 static void
2670 unblockThread(StgTSO *tso)
2671 {
2672   StgBlockingQueueElement *t, **last;
2673
2674   switch (tso->why_blocked) {
2675
2676   case NotBlocked:
2677     return;  /* not blocked */
2678
2679   case BlockedOnMVar:
2680     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2681     {
2682       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2683       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2684
2685       last = (StgBlockingQueueElement **)&mvar->head;
2686       for (t = (StgBlockingQueueElement *)mvar->head; 
2687            t != END_BQ_QUEUE; 
2688            last = &t->link, last_tso = t, t = t->link) {
2689         if (t == (StgBlockingQueueElement *)tso) {
2690           *last = (StgBlockingQueueElement *)tso->link;
2691           if (mvar->tail == tso) {
2692             mvar->tail = (StgTSO *)last_tso;
2693           }
2694           goto done;
2695         }
2696       }
2697       barf("unblockThread (MVAR): TSO not found");
2698     }
2699
2700   case BlockedOnBlackHole:
2701     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2702     {
2703       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2704
2705       last = &bq->blocking_queue;
2706       for (t = bq->blocking_queue; 
2707            t != END_BQ_QUEUE; 
2708            last = &t->link, t = t->link) {
2709         if (t == (StgBlockingQueueElement *)tso) {
2710           *last = (StgBlockingQueueElement *)tso->link;
2711           goto done;
2712         }
2713       }
2714       barf("unblockThread (BLACKHOLE): TSO not found");
2715     }
2716
2717   case BlockedOnException:
2718     {
2719       StgTSO *target  = tso->block_info.tso;
2720
2721       ASSERT(get_itbl(target)->type == TSO);
2722
2723       if (target->what_next == ThreadRelocated) {
2724           target = target->link;
2725           ASSERT(get_itbl(target)->type == TSO);
2726       }
2727
2728       ASSERT(target->blocked_exceptions != NULL);
2729
2730       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2731       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2732            t != END_BQ_QUEUE; 
2733            last = &t->link, t = t->link) {
2734         ASSERT(get_itbl(t)->type == TSO);
2735         if (t == (StgBlockingQueueElement *)tso) {
2736           *last = (StgBlockingQueueElement *)tso->link;
2737           goto done;
2738         }
2739       }
2740       barf("unblockThread (Exception): TSO not found");
2741     }
2742
2743   case BlockedOnRead:
2744   case BlockedOnWrite:
2745 #if defined(mingw32_TARGET_OS)
2746   case BlockedOnDoProc:
2747 #endif
2748     {
2749       /* take TSO off blocked_queue */
2750       StgBlockingQueueElement *prev = NULL;
2751       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2752            prev = t, t = t->link) {
2753         if (t == (StgBlockingQueueElement *)tso) {
2754           if (prev == NULL) {
2755             blocked_queue_hd = (StgTSO *)t->link;
2756             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2757               blocked_queue_tl = END_TSO_QUEUE;
2758             }
2759           } else {
2760             prev->link = t->link;
2761             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2762               blocked_queue_tl = (StgTSO *)prev;
2763             }
2764           }
2765           goto done;
2766         }
2767       }
2768       barf("unblockThread (I/O): TSO not found");
2769     }
2770
2771   case BlockedOnDelay:
2772     {
2773       /* take TSO off sleeping_queue */
2774       StgBlockingQueueElement *prev = NULL;
2775       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2776            prev = t, t = t->link) {
2777         if (t == (StgBlockingQueueElement *)tso) {
2778           if (prev == NULL) {
2779             sleeping_queue = (StgTSO *)t->link;
2780           } else {
2781             prev->link = t->link;
2782           }
2783           goto done;
2784         }
2785       }
2786       barf("unblockThread (delay): TSO not found");
2787     }
2788
2789   default:
2790     barf("unblockThread");
2791   }
2792
2793  done:
2794   tso->link = END_TSO_QUEUE;
2795   tso->why_blocked = NotBlocked;
2796   tso->block_info.closure = NULL;
2797   PUSH_ON_RUN_QUEUE(tso);
2798 }
2799 #else
2800 static void
2801 unblockThread(StgTSO *tso)
2802 {
2803   StgTSO *t, **last;
2804   
2805   /* To avoid locking unnecessarily. */
2806   if (tso->why_blocked == NotBlocked) {
2807     return;
2808   }
2809
2810   switch (tso->why_blocked) {
2811
2812   case BlockedOnMVar:
2813     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2814     {
2815       StgTSO *last_tso = END_TSO_QUEUE;
2816       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2817
2818       last = &mvar->head;
2819       for (t = mvar->head; t != END_TSO_QUEUE; 
2820            last = &t->link, last_tso = t, t = t->link) {
2821         if (t == tso) {
2822           *last = tso->link;
2823           if (mvar->tail == tso) {
2824             mvar->tail = last_tso;
2825           }
2826           goto done;
2827         }
2828       }
2829       barf("unblockThread (MVAR): TSO not found");
2830     }
2831
2832   case BlockedOnBlackHole:
2833     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2834     {
2835       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2836
2837       last = &bq->blocking_queue;
2838       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2839            last = &t->link, t = t->link) {
2840         if (t == tso) {
2841           *last = tso->link;
2842           goto done;
2843         }
2844       }
2845       barf("unblockThread (BLACKHOLE): TSO not found");
2846     }
2847
2848   case BlockedOnException:
2849     {
2850       StgTSO *target  = tso->block_info.tso;
2851
2852       ASSERT(get_itbl(target)->type == TSO);
2853
2854       while (target->what_next == ThreadRelocated) {
2855           target = target->link;
2856           ASSERT(get_itbl(target)->type == TSO);
2857       }
2858       
2859       ASSERT(target->blocked_exceptions != NULL);
2860
2861       last = &target->blocked_exceptions;
2862       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2863            last = &t->link, t = t->link) {
2864         ASSERT(get_itbl(t)->type == TSO);
2865         if (t == tso) {
2866           *last = tso->link;
2867           goto done;
2868         }
2869       }
2870       barf("unblockThread (Exception): TSO not found");
2871     }
2872
2873   case BlockedOnRead:
2874   case BlockedOnWrite:
2875 #if defined(mingw32_TARGET_OS)
2876   case BlockedOnDoProc:
2877 #endif
2878     {
2879       StgTSO *prev = NULL;
2880       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2881            prev = t, t = t->link) {
2882         if (t == tso) {
2883           if (prev == NULL) {
2884             blocked_queue_hd = t->link;
2885             if (blocked_queue_tl == t) {
2886               blocked_queue_tl = END_TSO_QUEUE;
2887             }
2888           } else {
2889             prev->link = t->link;
2890             if (blocked_queue_tl == t) {
2891               blocked_queue_tl = prev;
2892             }
2893           }
2894           goto done;
2895         }
2896       }
2897       barf("unblockThread (I/O): TSO not found");
2898     }
2899
2900   case BlockedOnDelay:
2901     {
2902       StgTSO *prev = NULL;
2903       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2904            prev = t, t = t->link) {
2905         if (t == tso) {
2906           if (prev == NULL) {
2907             sleeping_queue = t->link;
2908           } else {
2909             prev->link = t->link;
2910           }
2911           goto done;
2912         }
2913       }
2914       barf("unblockThread (delay): TSO not found");
2915     }
2916
2917   default:
2918     barf("unblockThread");
2919   }
2920
2921  done:
2922   tso->link = END_TSO_QUEUE;
2923   tso->why_blocked = NotBlocked;
2924   tso->block_info.closure = NULL;
2925   APPEND_TO_RUN_QUEUE(tso);
2926 }
2927 #endif
2928
2929 /* -----------------------------------------------------------------------------
2930  * raiseAsync()
2931  *
2932  * The following function implements the magic for raising an
2933  * asynchronous exception in an existing thread.
2934  *
2935  * We first remove the thread from any queue on which it might be
2936  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2937  *
2938  * We strip the stack down to the innermost CATCH_FRAME, building
2939  * thunks in the heap for all the active computations, so they can 
2940  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2941  * an application of the handler to the exception, and push it on
2942  * the top of the stack.
2943  * 
2944  * How exactly do we save all the active computations?  We create an
2945  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2946  * AP_STACKs pushes everything from the corresponding update frame
2947  * upwards onto the stack.  (Actually, it pushes everything up to the
2948  * next update frame plus a pointer to the next AP_STACK object.
2949  * Entering the next AP_STACK object pushes more onto the stack until we
2950  * reach the last AP_STACK object - at which point the stack should look
2951  * exactly as it did when we killed the TSO and we can continue
2952  * execution by entering the closure on top of the stack.
2953  *
2954  * We can also kill a thread entirely - this happens if either (a) the 
2955  * exception passed to raiseAsync is NULL, or (b) there's no
2956  * CATCH_FRAME on the stack.  In either case, we strip the entire
2957  * stack and replace the thread with a zombie.
2958  *
2959  * Locks: sched_mutex held upon entry nor exit.
2960  *
2961  * -------------------------------------------------------------------------- */
2962  
2963 void 
2964 deleteThread(StgTSO *tso)
2965 {
2966   raiseAsync(tso,NULL);
2967 }
2968
2969 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2970 static void 
2971 deleteThreadImmediately(StgTSO *tso)
2972 { // for forkProcess only:
2973   // delete thread without giving it a chance to catch the KillThread exception
2974
2975   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2976       return;
2977   }
2978
2979   if (tso->why_blocked != BlockedOnCCall &&
2980       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2981     unblockThread(tso);
2982   }
2983
2984   tso->what_next = ThreadKilled;
2985 }
2986 #endif
2987
2988 void
2989 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2990 {
2991   /* When raising async exs from contexts where sched_mutex isn't held;
2992      use raiseAsyncWithLock(). */
2993   ACQUIRE_LOCK(&sched_mutex);
2994   raiseAsync(tso,exception);
2995   RELEASE_LOCK(&sched_mutex);
2996 }
2997
2998 void
2999 raiseAsync(StgTSO *tso, StgClosure *exception)
3000 {
3001     StgRetInfoTable *info;
3002     StgPtr sp;
3003   
3004     // Thread already dead?
3005     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3006         return;
3007     }
3008
3009     IF_DEBUG(scheduler, 
3010              sched_belch("raising exception in thread %ld.", (long)tso->id));
3011     
3012     // Remove it from any blocking queues
3013     unblockThread(tso);
3014
3015     sp = tso->sp;
3016     
3017     // The stack freezing code assumes there's a closure pointer on
3018     // the top of the stack, so we have to arrange that this is the case...
3019     //
3020     if (sp[0] == (W_)&stg_enter_info) {
3021         sp++;
3022     } else {
3023         sp--;
3024         sp[0] = (W_)&stg_dummy_ret_closure;
3025     }
3026
3027     while (1) {
3028         nat i;
3029
3030         // 1. Let the top of the stack be the "current closure"
3031         //
3032         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3033         // CATCH_FRAME.
3034         //
3035         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3036         // current closure applied to the chunk of stack up to (but not
3037         // including) the update frame.  This closure becomes the "current
3038         // closure".  Go back to step 2.
3039         //
3040         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3041         // top of the stack applied to the exception.
3042         // 
3043         // 5. If it's a STOP_FRAME, then kill the thread.
3044         
3045         StgPtr frame;
3046         
3047         frame = sp + 1;
3048         info = get_ret_itbl((StgClosure *)frame);
3049         
3050         while (info->i.type != UPDATE_FRAME
3051                && (info->i.type != CATCH_FRAME || exception == NULL)
3052                && info->i.type != STOP_FRAME) {
3053             frame += stack_frame_sizeW((StgClosure *)frame);
3054             info = get_ret_itbl((StgClosure *)frame);
3055         }
3056         
3057         switch (info->i.type) {
3058             
3059         case CATCH_FRAME:
3060             // If we find a CATCH_FRAME, and we've got an exception to raise,
3061             // then build the THUNK raise(exception), and leave it on
3062             // top of the CATCH_FRAME ready to enter.
3063             //
3064         {
3065 #ifdef PROFILING
3066             StgCatchFrame *cf = (StgCatchFrame *)frame;
3067 #endif
3068             StgClosure *raise;
3069             
3070             // we've got an exception to raise, so let's pass it to the
3071             // handler in this frame.
3072             //
3073             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3074             TICK_ALLOC_SE_THK(1,0);
3075             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3076             raise->payload[0] = exception;
3077             
3078             // throw away the stack from Sp up to the CATCH_FRAME.
3079             //
3080             sp = frame - 1;
3081             
3082             /* Ensure that async excpetions are blocked now, so we don't get
3083              * a surprise exception before we get around to executing the
3084              * handler.
3085              */
3086             if (tso->blocked_exceptions == NULL) {
3087                 tso->blocked_exceptions = END_TSO_QUEUE;
3088             }
3089             
3090             /* Put the newly-built THUNK on top of the stack, ready to execute
3091              * when the thread restarts.
3092              */
3093             sp[0] = (W_)raise;
3094             sp[-1] = (W_)&stg_enter_info;
3095             tso->sp = sp-1;
3096             tso->what_next = ThreadRunGHC;
3097             IF_DEBUG(sanity, checkTSO(tso));
3098             return;
3099         }
3100         
3101         case UPDATE_FRAME:
3102         {
3103             StgAP_STACK * ap;
3104             nat words;
3105             
3106             // First build an AP_STACK consisting of the stack chunk above the
3107             // current update frame, with the top word on the stack as the
3108             // fun field.
3109             //
3110             words = frame - sp - 1;
3111             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3112             
3113             ap->size = words;
3114             ap->fun  = (StgClosure *)sp[0];
3115             sp++;
3116             for(i=0; i < (nat)words; ++i) {
3117                 ap->payload[i] = (StgClosure *)*sp++;
3118             }
3119             
3120             SET_HDR(ap,&stg_AP_STACK_info,
3121                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3122             TICK_ALLOC_UP_THK(words+1,0);
3123             
3124             IF_DEBUG(scheduler,
3125                      debugBelch("sched: Updating ");
3126                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3127                      debugBelch(" with ");
3128                      printObj((StgClosure *)ap);
3129                 );
3130
3131             // Replace the updatee with an indirection - happily
3132             // this will also wake up any threads currently
3133             // waiting on the result.
3134             //
3135             // Warning: if we're in a loop, more than one update frame on
3136             // the stack may point to the same object.  Be careful not to
3137             // overwrite an IND_OLDGEN in this case, because we'll screw
3138             // up the mutable lists.  To be on the safe side, don't
3139             // overwrite any kind of indirection at all.  See also
3140             // threadSqueezeStack in GC.c, where we have to make a similar
3141             // check.
3142             //
3143             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3144                 // revert the black hole
3145                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3146                                (StgClosure *)ap);
3147             }
3148             sp += sizeofW(StgUpdateFrame) - 1;
3149             sp[0] = (W_)ap; // push onto stack
3150             break;
3151         }
3152         
3153         case STOP_FRAME:
3154             // We've stripped the entire stack, the thread is now dead.
3155             sp += sizeofW(StgStopFrame);
3156             tso->what_next = ThreadKilled;
3157             tso->sp = sp;
3158             return;
3159             
3160         default:
3161             barf("raiseAsync");
3162         }
3163     }
3164     barf("raiseAsync");
3165 }
3166
3167 /* -----------------------------------------------------------------------------
3168    raiseExceptionHelper
3169    
3170    This function is called by the raise# primitve, just so that we can
3171    move some of the tricky bits of raising an exception from C-- into
3172    C.  Who knows, it might be a useful re-useable thing here too.
3173    -------------------------------------------------------------------------- */
3174
3175 StgWord
3176 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3177 {
3178     StgClosure *raise_closure = NULL;
3179     StgPtr p, next;
3180     StgRetInfoTable *info;
3181     //
3182     // This closure represents the expression 'raise# E' where E
3183     // is the exception raise.  It is used to overwrite all the
3184     // thunks which are currently under evaluataion.
3185     //
3186
3187     //    
3188     // LDV profiling: stg_raise_info has THUNK as its closure
3189     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3190     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3191     // 1 does not cause any problem unless profiling is performed.
3192     // However, when LDV profiling goes on, we need to linearly scan
3193     // small object pool, where raise_closure is stored, so we should
3194     // use MIN_UPD_SIZE.
3195     //
3196     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3197     //                                 sizeofW(StgClosure)+1);
3198     //
3199
3200     //
3201     // Walk up the stack, looking for the catch frame.  On the way,
3202     // we update any closures pointed to from update frames with the
3203     // raise closure that we just built.
3204     //
3205     p = tso->sp;
3206     while(1) {
3207         info = get_ret_itbl((StgClosure *)p);
3208         next = p + stack_frame_sizeW((StgClosure *)p);
3209         switch (info->i.type) {
3210             
3211         case UPDATE_FRAME:
3212             // Only create raise_closure if we need to.
3213             if (raise_closure == NULL) {
3214                 raise_closure = 
3215                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3216                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3217                 raise_closure->payload[0] = exception;
3218             }
3219             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3220             p = next;
3221             continue;
3222             
3223         case CATCH_FRAME:
3224             tso->sp = p;
3225             return CATCH_FRAME;
3226             
3227         case STOP_FRAME:
3228             tso->sp = p;
3229             return STOP_FRAME;
3230
3231         default:
3232             p = next; 
3233             continue;
3234         }
3235     }
3236 }
3237
3238 /* -----------------------------------------------------------------------------
3239    resurrectThreads is called after garbage collection on the list of
3240    threads found to be garbage.  Each of these threads will be woken
3241    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3242    on an MVar, or NonTermination if the thread was blocked on a Black
3243    Hole.
3244
3245    Locks: sched_mutex isn't held upon entry nor exit.
3246    -------------------------------------------------------------------------- */
3247
3248 void
3249 resurrectThreads( StgTSO *threads )
3250 {
3251   StgTSO *tso, *next;
3252
3253   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3254     next = tso->global_link;
3255     tso->global_link = all_threads;
3256     all_threads = tso;
3257     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3258
3259     switch (tso->why_blocked) {
3260     case BlockedOnMVar:
3261     case BlockedOnException:
3262       /* Called by GC - sched_mutex lock is currently held. */
3263       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3264       break;
3265     case BlockedOnBlackHole:
3266       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3267       break;
3268     case NotBlocked:
3269       /* This might happen if the thread was blocked on a black hole
3270        * belonging to a thread that we've just woken up (raiseAsync
3271        * can wake up threads, remember...).
3272        */
3273       continue;
3274     default:
3275       barf("resurrectThreads: thread blocked in a strange way");
3276     }
3277   }
3278 }
3279
3280 /* -----------------------------------------------------------------------------
3281  * Blackhole detection: if we reach a deadlock, test whether any
3282  * threads are blocked on themselves.  Any threads which are found to
3283  * be self-blocked get sent a NonTermination exception.
3284  *
3285  * This is only done in a deadlock situation in order to avoid
3286  * performance overhead in the normal case.
3287  *
3288  * Locks: sched_mutex is held upon entry and exit.
3289  * -------------------------------------------------------------------------- */
3290
3291 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
3292 static void
3293 detectBlackHoles( void )
3294 {
3295     StgTSO *tso = all_threads;
3296     StgPtr frame;
3297     StgClosure *blocked_on;
3298     StgRetInfoTable *info;
3299
3300     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3301
3302         while (tso->what_next == ThreadRelocated) {
3303             tso = tso->link;
3304             ASSERT(get_itbl(tso)->type == TSO);
3305         }
3306       
3307         if (tso->why_blocked != BlockedOnBlackHole) {
3308             continue;
3309         }
3310         blocked_on = tso->block_info.closure;
3311
3312         frame = tso->sp;
3313
3314         while(1) {
3315             info = get_ret_itbl((StgClosure *)frame);
3316             switch (info->i.type) {
3317             case UPDATE_FRAME:
3318                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3319                     /* We are blocking on one of our own computations, so
3320                      * send this thread the NonTermination exception.  
3321                      */
3322                     IF_DEBUG(scheduler, 
3323                              sched_belch("thread %d is blocked on itself", tso->id));
3324                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3325                     goto done;
3326                 }
3327                 
3328                 frame = (StgPtr)((StgUpdateFrame *)frame + 1);
3329                 continue;
3330
3331             case STOP_FRAME:
3332                 goto done;
3333
3334                 // normal stack frames; do nothing except advance the pointer
3335             default:
3336                 frame += stack_frame_sizeW((StgClosure *)frame);
3337             }
3338         }   
3339         done: ;
3340     }
3341 }
3342 #endif
3343
3344 /* ----------------------------------------------------------------------------
3345  * Debugging: why is a thread blocked
3346  * [Also provides useful information when debugging threaded programs
3347  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3348    ------------------------------------------------------------------------- */
3349
3350 static
3351 void
3352 printThreadBlockage(StgTSO *tso)
3353 {
3354   switch (tso->why_blocked) {
3355   case BlockedOnRead:
3356     debugBelch("is blocked on read from fd %d", tso->block_info.fd);
3357     break;
3358   case BlockedOnWrite:
3359     debugBelch("is blocked on write to fd %d", tso->block_info.fd);
3360     break;
3361 #if defined(mingw32_TARGET_OS)
3362     case BlockedOnDoProc:
3363     debugBelch("is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3364     break;
3365 #endif
3366   case BlockedOnDelay:
3367     debugBelch("is blocked until %d", tso->block_info.target);
3368     break;
3369   case BlockedOnMVar:
3370     debugBelch("is blocked on an MVar");
3371     break;
3372   case BlockedOnException:
3373     debugBelch("is blocked on delivering an exception to thread %d",
3374             tso->block_info.tso->id);
3375     break;
3376   case BlockedOnBlackHole:
3377     debugBelch("is blocked on a black hole");
3378     break;
3379   case NotBlocked:
3380     debugBelch("is not blocked");
3381     break;
3382 #if defined(PAR)
3383   case BlockedOnGA:
3384     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
3385             tso->block_info.closure, info_type(tso->block_info.closure));
3386     break;
3387   case BlockedOnGA_NoSend:
3388     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
3389             tso->block_info.closure, info_type(tso->block_info.closure));
3390     break;
3391 #endif
3392   case BlockedOnCCall:
3393     debugBelch("is blocked on an external call");
3394     break;
3395   case BlockedOnCCall_NoUnblockExc:
3396     debugBelch("is blocked on an external call (exceptions were already blocked)");
3397     break;
3398   default:
3399     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3400          tso->why_blocked, tso->id, tso);
3401   }
3402 }
3403
3404 static
3405 void
3406 printThreadStatus(StgTSO *tso)
3407 {
3408   switch (tso->what_next) {
3409   case ThreadKilled:
3410     debugBelch("has been killed");
3411     break;
3412   case ThreadComplete:
3413     debugBelch("has completed");
3414     break;
3415   default:
3416     printThreadBlockage(tso);
3417   }
3418 }
3419
3420 void
3421 printAllThreads(void)
3422 {
3423   StgTSO *t;
3424
3425 # if defined(GRAN)
3426   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3427   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3428                        time_string, rtsFalse/*no commas!*/);
3429
3430   debugBelch("all threads at [%s]:\n", time_string);
3431 # elif defined(PAR)
3432   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3433   ullong_format_string(CURRENT_TIME,
3434                        time_string, rtsFalse/*no commas!*/);
3435
3436   debugBelch("all threads at [%s]:\n", time_string);
3437 # else
3438   debugBelch("all threads:\n");
3439 # endif
3440
3441   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3442     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
3443 #if defined(DEBUG)
3444     {
3445       void *label = lookupThreadLabel(t->id);
3446       if (label) debugBelch("[\"%s\"] ",(char *)label);
3447     }
3448 #endif
3449     printThreadStatus(t);
3450     debugBelch("\n");
3451   }
3452 }
3453     
3454 #ifdef DEBUG
3455
3456 /* 
3457    Print a whole blocking queue attached to node (debugging only).
3458 */
3459 # if defined(PAR)
3460 void 
3461 print_bq (StgClosure *node)
3462 {
3463   StgBlockingQueueElement *bqe;
3464   StgTSO *tso;
3465   rtsBool end;
3466
3467   debugBelch("## BQ of closure %p (%s): ",
3468           node, info_type(node));
3469
3470   /* should cover all closures that may have a blocking queue */
3471   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3472          get_itbl(node)->type == FETCH_ME_BQ ||
3473          get_itbl(node)->type == RBH ||
3474          get_itbl(node)->type == MVAR);
3475     
3476   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3477
3478   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3479 }
3480
3481 /* 
3482    Print a whole blocking queue starting with the element bqe.
3483 */
3484 void 
3485 print_bqe (StgBlockingQueueElement *bqe)
3486 {
3487   rtsBool end;
3488
3489   /* 
3490      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3491   */
3492   for (end = (bqe==END_BQ_QUEUE);
3493        !end; // iterate until bqe points to a CONSTR
3494        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3495        bqe = end ? END_BQ_QUEUE : bqe->link) {
3496     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3497     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3498     /* types of closures that may appear in a blocking queue */
3499     ASSERT(get_itbl(bqe)->type == TSO ||           
3500            get_itbl(bqe)->type == BLOCKED_FETCH || 
3501            get_itbl(bqe)->type == CONSTR); 
3502     /* only BQs of an RBH end with an RBH_Save closure */
3503     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3504
3505     switch (get_itbl(bqe)->type) {
3506     case TSO:
3507       debugBelch(" TSO %u (%x),",
3508               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3509       break;
3510     case BLOCKED_FETCH:
3511       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
3512               ((StgBlockedFetch *)bqe)->node, 
3513               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3514               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3515               ((StgBlockedFetch *)bqe)->ga.weight);
3516       break;
3517     case CONSTR:
3518       debugBelch(" %s (IP %p),",
3519               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3520                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3521                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3522                "RBH_Save_?"), get_itbl(bqe));
3523       break;
3524     default:
3525       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3526            info_type((StgClosure *)bqe)); // , node, info_type(node));
3527       break;
3528     }
3529   } /* for */
3530   debugBelch("\n");
3531 }
3532 # elif defined(GRAN)
3533 void 
3534 print_bq (StgClosure *node)
3535 {
3536   StgBlockingQueueElement *bqe;
3537   PEs node_loc, tso_loc;
3538   rtsBool end;
3539
3540   /* should cover all closures that may have a blocking queue */
3541   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3542          get_itbl(node)->type == FETCH_ME_BQ ||
3543          get_itbl(node)->type == RBH);
3544     
3545   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3546   node_loc = where_is(node);
3547
3548   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
3549           node, info_type(node), node_loc);
3550
3551   /* 
3552      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3553   */
3554   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3555        !end; // iterate until bqe points to a CONSTR
3556        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3557     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3558     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3559     /* types of closures that may appear in a blocking queue */
3560     ASSERT(get_itbl(bqe)->type == TSO ||           
3561            get_itbl(bqe)->type == CONSTR); 
3562     /* only BQs of an RBH end with an RBH_Save closure */
3563     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3564
3565     tso_loc = where_is((StgClosure *)bqe);
3566     switch (get_itbl(bqe)->type) {
3567     case TSO:
3568       debugBelch(" TSO %d (%p) on [PE %d],",
3569               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3570       break;
3571     case CONSTR:
3572       debugBelch(" %s (IP %p),",
3573               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3574                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3575                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3576                "RBH_Save_?"), get_itbl(bqe));
3577       break;
3578     default:
3579       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3580            info_type((StgClosure *)bqe), node, info_type(node));
3581       break;
3582     }
3583   } /* for */
3584   debugBelch("\n");
3585 }
3586 #else
3587 /* 
3588    Nice and easy: only TSOs on the blocking queue
3589 */
3590 void 
3591 print_bq (StgClosure *node)
3592 {
3593   StgTSO *tso;
3594
3595   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3596   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3597        tso != END_TSO_QUEUE; 
3598        tso=tso->link) {
3599     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3600     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3601     debugBelch(" TSO %d (%p),", tso->id, tso);
3602   }
3603   debugBelch("\n");
3604 }
3605 # endif
3606
3607 #if defined(PAR)
3608 static nat
3609 run_queue_len(void)
3610 {
3611   nat i;
3612   StgTSO *tso;
3613
3614   for (i=0, tso=run_queue_hd; 
3615        tso != END_TSO_QUEUE;
3616        i++, tso=tso->link)
3617     /* nothing */
3618
3619   return i;
3620 }
3621 #endif
3622
3623 void
3624 sched_belch(char *s, ...)
3625 {
3626   va_list ap;
3627   va_start(ap,s);
3628 #ifdef RTS_SUPPORTS_THREADS
3629   debugBelch("sched (task %p): ", osThreadId());
3630 #elif defined(PAR)
3631   debugBelch("== ");
3632 #else
3633   debugBelch("sched: ");
3634 #endif
3635   vdebugBelch(s, ap);
3636   debugBelch("\n");
3637   va_end(ap);
3638 }
3639
3640 #endif /* DEBUG */