[project @ 2002-06-26 08:18:38 by stolz]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.146 2002/06/26 08:18:42 stolz Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <stdarg.h>
126
127 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
128 //@subsection Variables and Data structures
129
130 /* Main thread queue.
131  * Locks required: sched_mutex.
132  */
133 StgMainThread *main_threads;
134
135 /* Thread queues.
136  * Locks required: sched_mutex.
137  */
138 #if defined(GRAN)
139
140 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
141 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
142
143 /* 
144    In GranSim we have a runnable and a blocked queue for each processor.
145    In order to minimise code changes new arrays run_queue_hds/tls
146    are created. run_queue_hd is then a short cut (macro) for
147    run_queue_hds[CurrentProc] (see GranSim.h).
148    -- HWL
149 */
150 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
151 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
152 StgTSO *ccalling_threadss[MAX_PROC];
153 /* We use the same global list of threads (all_threads) in GranSim as in
154    the std RTS (i.e. we are cheating). However, we don't use this list in
155    the GranSim specific code at the moment (so we are only potentially
156    cheating).  */
157
158 #else /* !GRAN */
159
160 StgTSO *run_queue_hd, *run_queue_tl;
161 StgTSO *blocked_queue_hd, *blocked_queue_tl;
162 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
163
164 #endif
165
166 /* Linked list of all threads.
167  * Used for detecting garbage collected threads.
168  */
169 StgTSO *all_threads;
170
171 /* When a thread performs a safe C call (_ccall_GC, using old
172  * terminology), it gets put on the suspended_ccalling_threads
173  * list. Used by the garbage collector.
174  */
175 static StgTSO *suspended_ccalling_threads;
176
177 static StgTSO *threadStackOverflow(StgTSO *tso);
178
179 /* KH: The following two flags are shared memory locations.  There is no need
180        to lock them, since they are only unset at the end of a scheduler
181        operation.
182 */
183
184 /* flag set by signal handler to precipitate a context switch */
185 //@cindex context_switch
186 nat context_switch;
187
188 /* if this flag is set as well, give up execution */
189 //@cindex interrupted
190 rtsBool interrupted;
191
192 /* Next thread ID to allocate.
193  * Locks required: thread_id_mutex
194  */
195 //@cindex next_thread_id
196 StgThreadID next_thread_id = 1;
197
198 /*
199  * Pointers to the state of the current thread.
200  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
201  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
202  */
203  
204 /* The smallest stack size that makes any sense is:
205  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
206  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
207  *  + 1                       (the realworld token for an IO thread)
208  *  + 1                       (the closure to enter)
209  *
210  * A thread with this stack will bomb immediately with a stack
211  * overflow, which will increase its stack size.  
212  */
213
214 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
215
216
217 #if defined(GRAN)
218 StgTSO *CurrentTSO;
219 #endif
220
221 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
222  *  exists - earlier gccs apparently didn't.
223  *  -= chak
224  */
225 StgTSO dummy_tso;
226
227 rtsBool ready_to_gc;
228
229 /*
230  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
231  * in an MT setting, needed to signal that a worker thread shouldn't hang around
232  * in the scheduler when it is out of work.
233  */
234 static rtsBool shutting_down_scheduler = rtsFalse;
235
236 void            addToBlockedQueue ( StgTSO *tso );
237
238 static void     schedule          ( void );
239        void     interruptStgRts   ( void );
240
241 static void     detectBlackHoles  ( void );
242
243 #ifdef DEBUG
244 static void sched_belch(char *s, ...);
245 #endif
246
247 #if defined(RTS_SUPPORTS_THREADS)
248 /* ToDo: carefully document the invariants that go together
249  *       with these synchronisation objects.
250  */
251 Mutex     sched_mutex       = INIT_MUTEX_VAR;
252 Mutex     term_mutex        = INIT_MUTEX_VAR;
253
254 /*
255  * A heavyweight solution to the problem of protecting
256  * the thread_id from concurrent update.
257  */
258 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
259
260
261 # if defined(SMP)
262 static Condition gc_pending_cond = INIT_COND_VAR;
263 nat await_death;
264 # endif
265
266 #endif /* RTS_SUPPORTS_THREADS */
267
268 #if defined(PAR)
269 StgTSO *LastTSO;
270 rtsTime TimeOfLastYield;
271 rtsBool emitSchedule = rtsTrue;
272 #endif
273
274 #if DEBUG
275 char *whatNext_strs[] = {
276   "ThreadEnterGHC",
277   "ThreadRunGHC",
278   "ThreadEnterInterp",
279   "ThreadKilled",
280   "ThreadComplete"
281 };
282
283 char *threadReturnCode_strs[] = {
284   "HeapOverflow",                       /* might also be StackOverflow */
285   "StackOverflow",
286   "ThreadYielding",
287   "ThreadBlocked",
288   "ThreadFinished"
289 };
290 #endif
291
292 #if defined(PAR)
293 StgTSO * createSparkThread(rtsSpark spark);
294 StgTSO * activateSpark (rtsSpark spark);  
295 #endif
296
297 /*
298  * The thread state for the main thread.
299 // ToDo: check whether not needed any more
300 StgTSO   *MainTSO;
301  */
302
303 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
304 static void taskStart(void);
305 static void
306 taskStart(void)
307 {
308   schedule();
309 }
310 #endif
311
312
313
314
315 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
316 //@subsection Main scheduling loop
317
318 /* ---------------------------------------------------------------------------
319    Main scheduling loop.
320
321    We use round-robin scheduling, each thread returning to the
322    scheduler loop when one of these conditions is detected:
323
324       * out of heap space
325       * timer expires (thread yields)
326       * thread blocks
327       * thread ends
328       * stack overflow
329
330    Locking notes:  we acquire the scheduler lock once at the beginning
331    of the scheduler loop, and release it when
332     
333       * running a thread, or
334       * waiting for work, or
335       * waiting for a GC to complete.
336
337    GRAN version:
338      In a GranSim setup this loop iterates over the global event queue.
339      This revolves around the global event queue, which determines what 
340      to do next. Therefore, it's more complicated than either the 
341      concurrent or the parallel (GUM) setup.
342
343    GUM version:
344      GUM iterates over incoming messages.
345      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
346      and sends out a fish whenever it has nothing to do; in-between
347      doing the actual reductions (shared code below) it processes the
348      incoming messages and deals with delayed operations 
349      (see PendingFetches).
350      This is not the ugliest code you could imagine, but it's bloody close.
351
352    ------------------------------------------------------------------------ */
353 //@cindex schedule
354 static void
355 schedule( void )
356 {
357   StgTSO *t;
358   Capability *cap;
359   StgThreadReturnCode ret;
360 #if defined(GRAN)
361   rtsEvent *event;
362 #elif defined(PAR)
363   StgSparkPool *pool;
364   rtsSpark spark;
365   StgTSO *tso;
366   GlobalTaskId pe;
367   rtsBool receivedFinish = rtsFalse;
368 # if defined(DEBUG)
369   nat tp_size, sp_size; // stats only
370 # endif
371 #endif
372   rtsBool was_interrupted = rtsFalse;
373   
374   ACQUIRE_LOCK(&sched_mutex);
375  
376 #if defined(RTS_SUPPORTS_THREADS)
377   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
378 #else
379   /* simply initialise it in the non-threaded case */
380   grabCapability(&cap);
381 #endif
382
383 #if defined(GRAN)
384   /* set up first event to get things going */
385   /* ToDo: assign costs for system setup and init MainTSO ! */
386   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387             ContinueThread, 
388             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
389
390   IF_DEBUG(gran,
391            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392            G_TSO(CurrentTSO, 5));
393
394   if (RtsFlags.GranFlags.Light) {
395     /* Save current time; GranSim Light only */
396     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
397   }      
398
399   event = get_next_event();
400
401   while (event!=(rtsEvent*)NULL) {
402     /* Choose the processor with the next event */
403     CurrentProc = event->proc;
404     CurrentTSO = event->tso;
405
406 #elif defined(PAR)
407
408   while (!receivedFinish) {    /* set by processMessages */
409                                /* when receiving PP_FINISH message         */ 
410 #else
411
412   while (1) {
413
414 #endif
415
416     IF_DEBUG(scheduler, printAllThreads());
417
418 #if defined(RTS_SUPPORTS_THREADS)
419     /* Check to see whether there are any worker threads
420        waiting to deposit external call results. If so,
421        yield our capability */
422     yieldToReturningWorker(&sched_mutex, &cap);
423 #endif
424
425     /* If we're interrupted (the user pressed ^C, or some other
426      * termination condition occurred), kill all the currently running
427      * threads.
428      */
429     if (interrupted) {
430       IF_DEBUG(scheduler, sched_belch("interrupted"));
431       deleteAllThreads();
432       interrupted = rtsFalse;
433       was_interrupted = rtsTrue;
434     }
435
436     /* Go through the list of main threads and wake up any
437      * clients whose computations have finished.  ToDo: this
438      * should be done more efficiently without a linear scan
439      * of the main threads list, somehow...
440      */
441 #if defined(RTS_SUPPORTS_THREADS)
442     { 
443       StgMainThread *m, **prev;
444       prev = &main_threads;
445       for (m = main_threads; m != NULL; m = m->link) {
446         switch (m->tso->what_next) {
447         case ThreadComplete:
448           if (m->ret) {
449             *(m->ret) = (StgClosure *)m->tso->sp[0];
450           }
451           *prev = m->link;
452           m->stat = Success;
453           broadcastCondition(&m->wakeup);
454 #ifdef DEBUG
455           removeThreadLabel(m->tso);
456 #endif
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467 #ifdef DEBUG
468           removeThreadLabel(m->tso);
469 #endif
470           break;
471         default:
472           break;
473         }
474       }
475     }
476
477 #else /* not threaded */
478
479 # if defined(PAR)
480     /* in GUM do this only on the Main PE */
481     if (IAmMainThread)
482 # endif
483     /* If our main thread has finished or been killed, return.
484      */
485     {
486       StgMainThread *m = main_threads;
487       if (m->tso->what_next == ThreadComplete
488           || m->tso->what_next == ThreadKilled) {
489 #ifdef DEBUG
490         removeThreadLabel((StgWord)m->tso);
491 #endif
492         main_threads = main_threads->link;
493         if (m->tso->what_next == ThreadComplete) {
494           /* we finished successfully, fill in the return value */
495           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
496           m->stat = Success;
497           return;
498         } else {
499           if (m->ret) { *(m->ret) = NULL; };
500           if (was_interrupted) {
501             m->stat = Interrupted;
502           } else {
503             m->stat = Killed;
504           }
505           return;
506         }
507       }
508     }
509 #endif
510
511     /* Top up the run queue from our spark pool.  We try to make the
512      * number of threads in the run queue equal to the number of
513      * free capabilities.
514      *
515      * Disable spark support in SMP for now, non-essential & requires
516      * a little bit of work to make it compile cleanly. -- sof 1/02.
517      */
518 #if 0 /* defined(SMP) */
519     {
520       nat n = getFreeCapabilities();
521       StgTSO *tso = run_queue_hd;
522
523       /* Count the run queue */
524       while (n > 0 && tso != END_TSO_QUEUE) {
525         tso = tso->link;
526         n--;
527       }
528
529       for (; n > 0; n--) {
530         StgClosure *spark;
531         spark = findSpark(rtsFalse);
532         if (spark == NULL) {
533           break; /* no more sparks in the pool */
534         } else {
535           /* I'd prefer this to be done in activateSpark -- HWL */
536           /* tricky - it needs to hold the scheduler lock and
537            * not try to re-acquire it -- SDM */
538           createSparkThread(spark);       
539           IF_DEBUG(scheduler,
540                    sched_belch("==^^ turning spark of closure %p into a thread",
541                                (StgClosure *)spark));
542         }
543       }
544       /* We need to wake up the other tasks if we just created some
545        * work for them.
546        */
547       if (getFreeCapabilities() - n > 1) {
548           signalCondition( &thread_ready_cond );
549       }
550     }
551 #endif // SMP
552
553     /* check for signals each time around the scheduler */
554 #ifndef mingw32_TARGET_OS
555     if (signals_pending()) {
556       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
557       startSignalHandlers();
558       ACQUIRE_LOCK(&sched_mutex);
559     }
560 #endif
561
562     /* Check whether any waiting threads need to be woken up.  If the
563      * run queue is empty, and there are no other tasks running, we
564      * can wait indefinitely for something to happen.
565      * ToDo: what if another client comes along & requests another
566      * main thread?
567      */
568     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
569       awaitEvent( EMPTY_RUN_QUEUE()
570 #if defined(SMP)
571         && allFreeCapabilities()
572 #endif
573         );
574     }
575     /* we can be interrupted while waiting for I/O... */
576     if (interrupted) continue;
577
578     /* 
579      * Detect deadlock: when we have no threads to run, there are no
580      * threads waiting on I/O or sleeping, and all the other tasks are
581      * waiting for work, we must have a deadlock of some description.
582      *
583      * We first try to find threads blocked on themselves (ie. black
584      * holes), and generate NonTermination exceptions where necessary.
585      *
586      * If no threads are black holed, we have a deadlock situation, so
587      * inform all the main threads.
588      */
589 #ifndef PAR
590     if (   EMPTY_THREAD_QUEUES()
591 #if defined(RTS_SUPPORTS_THREADS)
592         && EMPTY_QUEUE(suspended_ccalling_threads)
593 #endif
594 #ifdef SMP
595         && allFreeCapabilities()
596 #endif
597         )
598     {
599         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
600 #if defined(THREADED_RTS)
601         /* and SMP mode ..? */
602         releaseCapability(cap);
603 #endif
604         // Garbage collection can release some new threads due to
605         // either (a) finalizers or (b) threads resurrected because
606         // they are about to be send BlockedOnDeadMVar.  Any threads
607         // thus released will be immediately runnable.
608         GarbageCollect(GetRoots,rtsTrue);
609
610         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
611
612         IF_DEBUG(scheduler, 
613                  sched_belch("still deadlocked, checking for black holes..."));
614         detectBlackHoles();
615
616         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
617
618 #ifndef mingw32_TARGET_OS
619         /* If we have user-installed signal handlers, then wait
620          * for signals to arrive rather then bombing out with a
621          * deadlock.
622          */
623 #if defined(RTS_SUPPORTS_THREADS)
624         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
625                       a signal with no runnable threads (or I/O
626                       suspended ones) leads nowhere quick.
627                       For now, simply shut down when we reach this
628                       condition.
629                       
630                       ToDo: define precisely under what conditions
631                       the Scheduler should shut down in an MT setting.
632                    */
633 #else
634         if ( anyUserHandlers() ) {
635 #endif
636             IF_DEBUG(scheduler, 
637                      sched_belch("still deadlocked, waiting for signals..."));
638
639             awaitUserSignals();
640
641             // we might be interrupted...
642             if (interrupted) { continue; }
643
644             if (signals_pending()) {
645                 RELEASE_LOCK(&sched_mutex);
646                 startSignalHandlers();
647                 ACQUIRE_LOCK(&sched_mutex);
648             }
649             ASSERT(!EMPTY_RUN_QUEUE());
650             goto not_deadlocked;
651         }
652 #endif
653
654         /* Probably a real deadlock.  Send the current main thread the
655          * Deadlock exception (or in the SMP build, send *all* main
656          * threads the deadlock exception, since none of them can make
657          * progress).
658          */
659         {
660             StgMainThread *m;
661 #if defined(RTS_SUPPORTS_THREADS)
662             for (m = main_threads; m != NULL; m = m->link) {
663                 switch (m->tso->why_blocked) {
664                 case BlockedOnBlackHole:
665                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
666                     break;
667                 case BlockedOnException:
668                 case BlockedOnMVar:
669                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
670                     break;
671                 default:
672                     barf("deadlock: main thread blocked in a strange way");
673                 }
674             }
675 #else
676             m = main_threads;
677             switch (m->tso->why_blocked) {
678             case BlockedOnBlackHole:
679                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
680                 break;
681             case BlockedOnException:
682             case BlockedOnMVar:
683                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
684                 break;
685             default:
686                 barf("deadlock: main thread blocked in a strange way");
687             }
688 #endif
689         }
690
691 #if defined(RTS_SUPPORTS_THREADS)
692         /* ToDo: revisit conditions (and mechanism) for shutting
693            down a multi-threaded world  */
694         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
695         RELEASE_LOCK(&sched_mutex);
696         shutdownHaskell();
697         return;
698 #endif
699     }
700   not_deadlocked:
701
702 #elif defined(PAR)
703     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
704 #endif
705
706 #if defined(SMP)
707     /* If there's a GC pending, don't do anything until it has
708      * completed.
709      */
710     if (ready_to_gc) {
711       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
712       waitCondition( &gc_pending_cond, &sched_mutex );
713     }
714 #endif    
715
716 #if defined(RTS_SUPPORTS_THREADS)
717     /* block until we've got a thread on the run queue and a free
718      * capability.
719      *
720      */
721     if ( EMPTY_RUN_QUEUE() ) {
722       /* Give up our capability */
723       releaseCapability(cap);
724
725       /* If we're in the process of shutting down (& running the
726        * a batch of finalisers), don't wait around.
727        */
728       if ( shutting_down_scheduler ) {
729         RELEASE_LOCK(&sched_mutex);
730         return;
731       }
732       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
733       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
734       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
735     }
736 #endif
737
738 #if defined(GRAN)
739     if (RtsFlags.GranFlags.Light)
740       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
741
742     /* adjust time based on time-stamp */
743     if (event->time > CurrentTime[CurrentProc] &&
744         event->evttype != ContinueThread)
745       CurrentTime[CurrentProc] = event->time;
746     
747     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
748     if (!RtsFlags.GranFlags.Light)
749       handleIdlePEs();
750
751     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
752
753     /* main event dispatcher in GranSim */
754     switch (event->evttype) {
755       /* Should just be continuing execution */
756     case ContinueThread:
757       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
758       /* ToDo: check assertion
759       ASSERT(run_queue_hd != (StgTSO*)NULL &&
760              run_queue_hd != END_TSO_QUEUE);
761       */
762       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
763       if (!RtsFlags.GranFlags.DoAsyncFetch &&
764           procStatus[CurrentProc]==Fetching) {
765         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
766               CurrentTSO->id, CurrentTSO, CurrentProc);
767         goto next_thread;
768       } 
769       /* Ignore ContinueThreads for completed threads */
770       if (CurrentTSO->what_next == ThreadComplete) {
771         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
772               CurrentTSO->id, CurrentTSO, CurrentProc);
773         goto next_thread;
774       } 
775       /* Ignore ContinueThreads for threads that are being migrated */
776       if (PROCS(CurrentTSO)==Nowhere) { 
777         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
778               CurrentTSO->id, CurrentTSO, CurrentProc);
779         goto next_thread;
780       }
781       /* The thread should be at the beginning of the run queue */
782       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
783         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
784               CurrentTSO->id, CurrentTSO, CurrentProc);
785         break; // run the thread anyway
786       }
787       /*
788       new_event(proc, proc, CurrentTime[proc],
789                 FindWork,
790                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
791       goto next_thread; 
792       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
793       break; // now actually run the thread; DaH Qu'vam yImuHbej 
794
795     case FetchNode:
796       do_the_fetchnode(event);
797       goto next_thread;             /* handle next event in event queue  */
798       
799     case GlobalBlock:
800       do_the_globalblock(event);
801       goto next_thread;             /* handle next event in event queue  */
802       
803     case FetchReply:
804       do_the_fetchreply(event);
805       goto next_thread;             /* handle next event in event queue  */
806       
807     case UnblockThread:   /* Move from the blocked queue to the tail of */
808       do_the_unblock(event);
809       goto next_thread;             /* handle next event in event queue  */
810       
811     case ResumeThread:  /* Move from the blocked queue to the tail of */
812       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
813       event->tso->gran.blocktime += 
814         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
815       do_the_startthread(event);
816       goto next_thread;             /* handle next event in event queue  */
817       
818     case StartThread:
819       do_the_startthread(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case MoveThread:
823       do_the_movethread(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case MoveSpark:
827       do_the_movespark(event);
828       goto next_thread;             /* handle next event in event queue  */
829       
830     case FindWork:
831       do_the_findwork(event);
832       goto next_thread;             /* handle next event in event queue  */
833       
834     default:
835       barf("Illegal event type %u\n", event->evttype);
836     }  /* switch */
837     
838     /* This point was scheduler_loop in the old RTS */
839
840     IF_DEBUG(gran, belch("GRAN: after main switch"));
841
842     TimeOfLastEvent = CurrentTime[CurrentProc];
843     TimeOfNextEvent = get_time_of_next_event();
844     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
845     // CurrentTSO = ThreadQueueHd;
846
847     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
848                          TimeOfNextEvent));
849
850     if (RtsFlags.GranFlags.Light) 
851       GranSimLight_leave_system(event, &ActiveTSO); 
852
853     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
854
855     IF_DEBUG(gran, 
856              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
857
858     /* in a GranSim setup the TSO stays on the run queue */
859     t = CurrentTSO;
860     /* Take a thread from the run queue. */
861     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
862
863     IF_DEBUG(gran, 
864              fprintf(stderr, "GRAN: About to run current thread, which is\n");
865              G_TSO(t,5));
866
867     context_switch = 0; // turned on via GranYield, checking events and time slice
868
869     IF_DEBUG(gran, 
870              DumpGranEvent(GR_SCHEDULE, t));
871
872     procStatus[CurrentProc] = Busy;
873
874 #elif defined(PAR)
875     if (PendingFetches != END_BF_QUEUE) {
876         processFetches();
877     }
878
879     /* ToDo: phps merge with spark activation above */
880     /* check whether we have local work and send requests if we have none */
881     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
882       /* :-[  no local threads => look out for local sparks */
883       /* the spark pool for the current PE */
884       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
885       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
886           pool->hd < pool->tl) {
887         /* 
888          * ToDo: add GC code check that we really have enough heap afterwards!!
889          * Old comment:
890          * If we're here (no runnable threads) and we have pending
891          * sparks, we must have a space problem.  Get enough space
892          * to turn one of those pending sparks into a
893          * thread... 
894          */
895
896         spark = findSpark(rtsFalse);                /* get a spark */
897         if (spark != (rtsSpark) NULL) {
898           tso = activateSpark(spark);       /* turn the spark into a thread */
899           IF_PAR_DEBUG(schedule,
900                        belch("==== schedule: Created TSO %d (%p); %d threads active",
901                              tso->id, tso, advisory_thread_count));
902
903           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
904             belch("==^^ failed to activate spark");
905             goto next_thread;
906           }               /* otherwise fall through & pick-up new tso */
907         } else {
908           IF_PAR_DEBUG(verbose,
909                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
910                              spark_queue_len(pool)));
911           goto next_thread;
912         }
913       }
914
915       /* If we still have no work we need to send a FISH to get a spark
916          from another PE 
917       */
918       if (EMPTY_RUN_QUEUE()) {
919       /* =8-[  no local sparks => look for work on other PEs */
920         /*
921          * We really have absolutely no work.  Send out a fish
922          * (there may be some out there already), and wait for
923          * something to arrive.  We clearly can't run any threads
924          * until a SCHEDULE or RESUME arrives, and so that's what
925          * we're hoping to see.  (Of course, we still have to
926          * respond to other types of messages.)
927          */
928         TIME now = msTime() /*CURRENT_TIME*/;
929         IF_PAR_DEBUG(verbose, 
930                      belch("--  now=%ld", now));
931         IF_PAR_DEBUG(verbose,
932                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
933                          (last_fish_arrived_at!=0 &&
934                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
935                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
936                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
937                              last_fish_arrived_at,
938                              RtsFlags.ParFlags.fishDelay, now);
939                      });
940         
941         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
942             (last_fish_arrived_at==0 ||
943              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
944           /* outstandingFishes is set in sendFish, processFish;
945              avoid flooding system with fishes via delay */
946           pe = choosePE();
947           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
948                    NEW_FISH_HUNGER);
949
950           // Global statistics: count no. of fishes
951           if (RtsFlags.ParFlags.ParStats.Global &&
952               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
953             globalParStats.tot_fish_mess++;
954           }
955         }
956       
957         receivedFinish = processMessages();
958         goto next_thread;
959       }
960     } else if (PacketsWaiting()) {  /* Look for incoming messages */
961       receivedFinish = processMessages();
962     }
963
964     /* Now we are sure that we have some work available */
965     ASSERT(run_queue_hd != END_TSO_QUEUE);
966
967     /* Take a thread from the run queue, if we have work */
968     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
969     IF_DEBUG(sanity,checkTSO(t));
970
971     /* ToDo: write something to the log-file
972     if (RTSflags.ParFlags.granSimStats && !sameThread)
973         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
974
975     CurrentTSO = t;
976     */
977     /* the spark pool for the current PE */
978     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
979
980     IF_DEBUG(scheduler, 
981              belch("--=^ %d threads, %d sparks on [%#x]", 
982                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
983
984 # if 1
985     if (0 && RtsFlags.ParFlags.ParStats.Full && 
986         t && LastTSO && t->id != LastTSO->id && 
987         LastTSO->why_blocked == NotBlocked && 
988         LastTSO->what_next != ThreadComplete) {
989       // if previously scheduled TSO not blocked we have to record the context switch
990       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
991                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
992     }
993
994     if (RtsFlags.ParFlags.ParStats.Full && 
995         (emitSchedule /* forced emit */ ||
996         (t && LastTSO && t->id != LastTSO->id))) {
997       /* 
998          we are running a different TSO, so write a schedule event to log file
999          NB: If we use fair scheduling we also have to write  a deschedule 
1000              event for LastTSO; with unfair scheduling we know that the
1001              previous tso has blocked whenever we switch to another tso, so
1002              we don't need it in GUM for now
1003       */
1004       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1005                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1006       emitSchedule = rtsFalse;
1007     }
1008      
1009 # endif
1010 #else /* !GRAN && !PAR */
1011   
1012     /* grab a thread from the run queue */
1013     ASSERT(run_queue_hd != END_TSO_QUEUE);
1014     t = POP_RUN_QUEUE();
1015     // Sanity check the thread we're about to run.  This can be
1016     // expensive if there is lots of thread switching going on...
1017     IF_DEBUG(sanity,checkTSO(t));
1018 #endif
1019     
1020     cap->r.rCurrentTSO = t;
1021     
1022     /* context switches are now initiated by the timer signal, unless
1023      * the user specified "context switch as often as possible", with
1024      * +RTS -C0
1025      */
1026     if (
1027 #ifdef PROFILING
1028         RtsFlags.ProfFlags.profileInterval == 0 ||
1029 #endif
1030         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1031          && (run_queue_hd != END_TSO_QUEUE
1032              || blocked_queue_hd != END_TSO_QUEUE
1033              || sleeping_queue != END_TSO_QUEUE)))
1034         context_switch = 1;
1035     else
1036         context_switch = 0;
1037
1038     RELEASE_LOCK(&sched_mutex);
1039
1040     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1041                               t->id, t, whatNext_strs[t->what_next]));
1042
1043 #ifdef PROFILING
1044     startHeapProfTimer();
1045 #endif
1046
1047     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1048     /* Run the current thread 
1049      */
1050     switch (cap->r.rCurrentTSO->what_next) {
1051     case ThreadKilled:
1052     case ThreadComplete:
1053         /* Thread already finished, return to scheduler. */
1054         ret = ThreadFinished;
1055         break;
1056     case ThreadEnterGHC:
1057         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1058         break;
1059     case ThreadRunGHC:
1060         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1061         break;
1062     case ThreadEnterInterp:
1063         ret = interpretBCO(cap);
1064         break;
1065     default:
1066       barf("schedule: invalid what_next field");
1067     }
1068     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1069     
1070     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1071 #ifdef PROFILING
1072     stopHeapProfTimer();
1073     CCCS = CCS_SYSTEM;
1074 #endif
1075     
1076     ACQUIRE_LOCK(&sched_mutex);
1077
1078 #ifdef SMP
1079     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1080 #elif !defined(GRAN) && !defined(PAR)
1081     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1082 #endif
1083     t = cap->r.rCurrentTSO;
1084     
1085 #if defined(PAR)
1086     /* HACK 675: if the last thread didn't yield, make sure to print a 
1087        SCHEDULE event to the log file when StgRunning the next thread, even
1088        if it is the same one as before */
1089     LastTSO = t; 
1090     TimeOfLastYield = CURRENT_TIME;
1091 #endif
1092
1093     switch (ret) {
1094     case HeapOverflow:
1095 #if defined(GRAN)
1096       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1097       globalGranStats.tot_heapover++;
1098 #elif defined(PAR)
1099       globalParStats.tot_heapover++;
1100 #endif
1101
1102       // did the task ask for a large block?
1103       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1104           // if so, get one and push it on the front of the nursery.
1105           bdescr *bd;
1106           nat blocks;
1107           
1108           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1109
1110           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1111                                    t->id, t,
1112                                    whatNext_strs[t->what_next], blocks));
1113
1114           // don't do this if it would push us over the
1115           // alloc_blocks_lim limit; we'll GC first.
1116           if (alloc_blocks + blocks < alloc_blocks_lim) {
1117
1118               alloc_blocks += blocks;
1119               bd = allocGroup( blocks );
1120
1121               // link the new group into the list
1122               bd->link = cap->r.rCurrentNursery;
1123               bd->u.back = cap->r.rCurrentNursery->u.back;
1124               if (cap->r.rCurrentNursery->u.back != NULL) {
1125                   cap->r.rCurrentNursery->u.back->link = bd;
1126               } else {
1127                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1128                          g0s0->blocks == cap->r.rNursery);
1129                   cap->r.rNursery = g0s0->blocks = bd;
1130               }           
1131               cap->r.rCurrentNursery->u.back = bd;
1132
1133               // initialise it as a nursery block
1134               bd->step = g0s0;
1135               bd->gen_no = 0;
1136               bd->flags = 0;
1137               bd->free = bd->start;
1138
1139               // don't forget to update the block count in g0s0.
1140               g0s0->n_blocks += blocks;
1141               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1142
1143               // now update the nursery to point to the new block
1144               cap->r.rCurrentNursery = bd;
1145
1146               // we might be unlucky and have another thread get on the
1147               // run queue before us and steal the large block, but in that
1148               // case the thread will just end up requesting another large
1149               // block.
1150               PUSH_ON_RUN_QUEUE(t);
1151               break;
1152           }
1153       }
1154
1155       /* make all the running tasks block on a condition variable,
1156        * maybe set context_switch and wait till they all pile in,
1157        * then have them wait on a GC condition variable.
1158        */
1159       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1160                                t->id, t, whatNext_strs[t->what_next]));
1161       threadPaused(t);
1162 #if defined(GRAN)
1163       ASSERT(!is_on_queue(t,CurrentProc));
1164 #elif defined(PAR)
1165       /* Currently we emit a DESCHEDULE event before GC in GUM.
1166          ToDo: either add separate event to distinguish SYSTEM time from rest
1167                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1168       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1169         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1170                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1171         emitSchedule = rtsTrue;
1172       }
1173 #endif
1174       
1175       ready_to_gc = rtsTrue;
1176       context_switch = 1;               /* stop other threads ASAP */
1177       PUSH_ON_RUN_QUEUE(t);
1178       /* actual GC is done at the end of the while loop */
1179       break;
1180       
1181     case StackOverflow:
1182 #if defined(GRAN)
1183       IF_DEBUG(gran, 
1184                DumpGranEvent(GR_DESCHEDULE, t));
1185       globalGranStats.tot_stackover++;
1186 #elif defined(PAR)
1187       // IF_DEBUG(par, 
1188       // DumpGranEvent(GR_DESCHEDULE, t);
1189       globalParStats.tot_stackover++;
1190 #endif
1191       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1192                                t->id, t, whatNext_strs[t->what_next]));
1193       /* just adjust the stack for this thread, then pop it back
1194        * on the run queue.
1195        */
1196       threadPaused(t);
1197       { 
1198         StgMainThread *m;
1199         /* enlarge the stack */
1200         StgTSO *new_t = threadStackOverflow(t);
1201         
1202         /* This TSO has moved, so update any pointers to it from the
1203          * main thread stack.  It better not be on any other queues...
1204          * (it shouldn't be).
1205          */
1206         for (m = main_threads; m != NULL; m = m->link) {
1207           if (m->tso == t) {
1208             m->tso = new_t;
1209           }
1210         }
1211         threadPaused(new_t);
1212         PUSH_ON_RUN_QUEUE(new_t);
1213       }
1214       break;
1215
1216     case ThreadYielding:
1217 #if defined(GRAN)
1218       IF_DEBUG(gran, 
1219                DumpGranEvent(GR_DESCHEDULE, t));
1220       globalGranStats.tot_yields++;
1221 #elif defined(PAR)
1222       // IF_DEBUG(par, 
1223       // DumpGranEvent(GR_DESCHEDULE, t);
1224       globalParStats.tot_yields++;
1225 #endif
1226       /* put the thread back on the run queue.  Then, if we're ready to
1227        * GC, check whether this is the last task to stop.  If so, wake
1228        * up the GC thread.  getThread will block during a GC until the
1229        * GC is finished.
1230        */
1231       IF_DEBUG(scheduler,
1232                if (t->what_next == ThreadEnterInterp) {
1233                    /* ToDo: or maybe a timer expired when we were in Hugs?
1234                     * or maybe someone hit ctrl-C
1235                     */
1236                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1237                          t->id, t, whatNext_strs[t->what_next]);
1238                } else {
1239                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1240                          t->id, t, whatNext_strs[t->what_next]);
1241                }
1242                );
1243
1244       threadPaused(t);
1245
1246       IF_DEBUG(sanity,
1247                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1248                checkTSO(t));
1249       ASSERT(t->link == END_TSO_QUEUE);
1250 #if defined(GRAN)
1251       ASSERT(!is_on_queue(t,CurrentProc));
1252
1253       IF_DEBUG(sanity,
1254                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1255                checkThreadQsSanity(rtsTrue));
1256 #endif
1257 #if defined(PAR)
1258       if (RtsFlags.ParFlags.doFairScheduling) { 
1259         /* this does round-robin scheduling; good for concurrency */
1260         APPEND_TO_RUN_QUEUE(t);
1261       } else {
1262         /* this does unfair scheduling; good for parallelism */
1263         PUSH_ON_RUN_QUEUE(t);
1264       }
1265 #else
1266       /* this does round-robin scheduling; good for concurrency */
1267       APPEND_TO_RUN_QUEUE(t);
1268 #endif
1269 #if defined(GRAN)
1270       /* add a ContinueThread event to actually process the thread */
1271       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1272                 ContinueThread,
1273                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1274       IF_GRAN_DEBUG(bq, 
1275                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1276                G_EVENTQ(0);
1277                G_CURR_THREADQ(0));
1278 #endif /* GRAN */
1279       break;
1280       
1281     case ThreadBlocked:
1282 #if defined(GRAN)
1283       IF_DEBUG(scheduler,
1284                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1285                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1286                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1287
1288       // ??? needed; should emit block before
1289       IF_DEBUG(gran, 
1290                DumpGranEvent(GR_DESCHEDULE, t)); 
1291       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1292       /*
1293         ngoq Dogh!
1294       ASSERT(procStatus[CurrentProc]==Busy || 
1295               ((procStatus[CurrentProc]==Fetching) && 
1296               (t->block_info.closure!=(StgClosure*)NULL)));
1297       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1298           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1299             procStatus[CurrentProc]==Fetching)) 
1300         procStatus[CurrentProc] = Idle;
1301       */
1302 #elif defined(PAR)
1303       IF_DEBUG(scheduler,
1304                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1305                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1306       IF_PAR_DEBUG(bq,
1307
1308                    if (t->block_info.closure!=(StgClosure*)NULL) 
1309                      print_bq(t->block_info.closure));
1310
1311       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1312       blockThread(t);
1313
1314       /* whatever we schedule next, we must log that schedule */
1315       emitSchedule = rtsTrue;
1316
1317 #else /* !GRAN */
1318       /* don't need to do anything.  Either the thread is blocked on
1319        * I/O, in which case we'll have called addToBlockedQueue
1320        * previously, or it's blocked on an MVar or Blackhole, in which
1321        * case it'll be on the relevant queue already.
1322        */
1323       IF_DEBUG(scheduler,
1324                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1325                printThreadBlockage(t);
1326                fprintf(stderr, "\n"));
1327
1328       /* Only for dumping event to log file 
1329          ToDo: do I need this in GranSim, too?
1330       blockThread(t);
1331       */
1332 #endif
1333       threadPaused(t);
1334       break;
1335       
1336     case ThreadFinished:
1337       /* Need to check whether this was a main thread, and if so, signal
1338        * the task that started it with the return value.  If we have no
1339        * more main threads, we probably need to stop all the tasks until
1340        * we get a new one.
1341        */
1342       /* We also end up here if the thread kills itself with an
1343        * uncaught exception, see Exception.hc.
1344        */
1345       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1346 #if defined(GRAN)
1347       endThread(t, CurrentProc); // clean-up the thread
1348 #elif defined(PAR)
1349       /* For now all are advisory -- HWL */
1350       //if(t->priority==AdvisoryPriority) ??
1351       advisory_thread_count--;
1352       
1353 # ifdef DIST
1354       if(t->dist.priority==RevalPriority)
1355         FinishReval(t);
1356 # endif
1357       
1358       if (RtsFlags.ParFlags.ParStats.Full &&
1359           !RtsFlags.ParFlags.ParStats.Suppressed) 
1360         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1361 #endif
1362       break;
1363       
1364     default:
1365       barf("schedule: invalid thread return code %d", (int)ret);
1366     }
1367
1368 #ifdef PROFILING
1369     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1370         GarbageCollect(GetRoots, rtsTrue);
1371         heapCensus();
1372         performHeapProfile = rtsFalse;
1373         ready_to_gc = rtsFalse; // we already GC'd
1374     }
1375 #endif
1376
1377     if (ready_to_gc 
1378 #ifdef SMP
1379         && allFreeCapabilities() 
1380 #endif
1381         ) {
1382       /* everybody back, start the GC.
1383        * Could do it in this thread, or signal a condition var
1384        * to do it in another thread.  Either way, we need to
1385        * broadcast on gc_pending_cond afterward.
1386        */
1387 #if defined(RTS_SUPPORTS_THREADS)
1388       IF_DEBUG(scheduler,sched_belch("doing GC"));
1389 #endif
1390       GarbageCollect(GetRoots,rtsFalse);
1391       ready_to_gc = rtsFalse;
1392 #ifdef SMP
1393       broadcastCondition(&gc_pending_cond);
1394 #endif
1395 #if defined(GRAN)
1396       /* add a ContinueThread event to continue execution of current thread */
1397       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1398                 ContinueThread,
1399                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1400       IF_GRAN_DEBUG(bq, 
1401                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1402                G_EVENTQ(0);
1403                G_CURR_THREADQ(0));
1404 #endif /* GRAN */
1405     }
1406
1407 #if defined(GRAN)
1408   next_thread:
1409     IF_GRAN_DEBUG(unused,
1410                   print_eventq(EventHd));
1411
1412     event = get_next_event();
1413 #elif defined(PAR)
1414   next_thread:
1415     /* ToDo: wait for next message to arrive rather than busy wait */
1416 #endif /* GRAN */
1417
1418   } /* end of while(1) */
1419
1420   IF_PAR_DEBUG(verbose,
1421                belch("== Leaving schedule() after having received Finish"));
1422 }
1423
1424 /* ---------------------------------------------------------------------------
1425  * Singleton fork(). Do not copy any running threads.
1426  * ------------------------------------------------------------------------- */
1427
1428 StgInt forkProcess(StgTSO* tso) {
1429
1430 #ifndef mingw32_TARGET_OS
1431   pid_t pid;
1432   StgTSO* t,*next;
1433
1434   IF_DEBUG(scheduler,sched_belch("forking!"));
1435
1436   pid = fork();
1437   if (pid) { /* parent */
1438
1439   /* just return the pid */
1440     
1441   } else { /* child */
1442   /* wipe all other threads */
1443   run_queue_hd = tso;
1444   tso->link = END_TSO_QUEUE;
1445
1446   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1447      us is picky about finding the threat still in its queue when
1448      handling the deleteThread() */
1449
1450   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1451     next = t->link;
1452     if (t->id != tso->id) {
1453       deleteThread(t);
1454     }
1455   }
1456   }
1457   return pid;
1458 #else /* mingw32 */
1459   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1460   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1461   return -1;
1462 #endif /* mingw32 */
1463 }
1464
1465 /* ---------------------------------------------------------------------------
1466  * deleteAllThreads():  kill all the live threads.
1467  *
1468  * This is used when we catch a user interrupt (^C), before performing
1469  * any necessary cleanups and running finalizers.
1470  *
1471  * Locks: sched_mutex held.
1472  * ------------------------------------------------------------------------- */
1473    
1474 void deleteAllThreads ( void )
1475 {
1476   StgTSO* t, *next;
1477   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1478   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1479       next = t->global_link;
1480       deleteThread(t);
1481   }      
1482   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1483   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1484   sleeping_queue = END_TSO_QUEUE;
1485 }
1486
1487 /* startThread and  insertThread are now in GranSim.c -- HWL */
1488
1489
1490 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1491 //@subsection Suspend and Resume
1492
1493 /* ---------------------------------------------------------------------------
1494  * Suspending & resuming Haskell threads.
1495  * 
1496  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1497  * its capability before calling the C function.  This allows another
1498  * task to pick up the capability and carry on running Haskell
1499  * threads.  It also means that if the C call blocks, it won't lock
1500  * the whole system.
1501  *
1502  * The Haskell thread making the C call is put to sleep for the
1503  * duration of the call, on the susepended_ccalling_threads queue.  We
1504  * give out a token to the task, which it can use to resume the thread
1505  * on return from the C function.
1506  * ------------------------------------------------------------------------- */
1507    
1508 StgInt
1509 suspendThread( StgRegTable *reg, 
1510                rtsBool concCall
1511 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1512                STG_UNUSED
1513 #endif
1514                )
1515 {
1516   nat tok;
1517   Capability *cap;
1518
1519   /* assume that *reg is a pointer to the StgRegTable part
1520    * of a Capability.
1521    */
1522   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1523
1524   ACQUIRE_LOCK(&sched_mutex);
1525
1526   IF_DEBUG(scheduler,
1527            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1528
1529   threadPaused(cap->r.rCurrentTSO);
1530   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1531   suspended_ccalling_threads = cap->r.rCurrentTSO;
1532
1533 #if defined(RTS_SUPPORTS_THREADS)
1534   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1535 #endif
1536
1537   /* Use the thread ID as the token; it should be unique */
1538   tok = cap->r.rCurrentTSO->id;
1539
1540   /* Hand back capability */
1541   releaseCapability(cap);
1542   
1543 #if defined(RTS_SUPPORTS_THREADS)
1544   /* Preparing to leave the RTS, so ensure there's a native thread/task
1545      waiting to take over.
1546      
1547      ToDo: optimise this and only create a new task if there's a need
1548      for one (i.e., if there's only one Concurrent Haskell thread alive,
1549      there's no need to create a new task).
1550   */
1551   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1552   if (concCall) {
1553     startTask(taskStart);
1554   }
1555 #endif
1556
1557   /* Other threads _might_ be available for execution; signal this */
1558   THREAD_RUNNABLE();
1559   RELEASE_LOCK(&sched_mutex);
1560   return tok; 
1561 }
1562
1563 StgRegTable *
1564 resumeThread( StgInt tok,
1565               rtsBool concCall
1566 #if !defined(RTS_SUPPORTS_THREADS)
1567                STG_UNUSED
1568 #endif
1569               )
1570 {
1571   StgTSO *tso, **prev;
1572   Capability *cap;
1573
1574 #if defined(RTS_SUPPORTS_THREADS)
1575   /* Wait for permission to re-enter the RTS with the result. */
1576   if ( concCall ) {
1577     ACQUIRE_LOCK(&sched_mutex);
1578     grabReturnCapability(&sched_mutex, &cap);
1579   } else {
1580     grabCapability(&cap);
1581   }
1582 #else
1583   grabCapability(&cap);
1584 #endif
1585
1586   /* Remove the thread off of the suspended list */
1587   prev = &suspended_ccalling_threads;
1588   for (tso = suspended_ccalling_threads; 
1589        tso != END_TSO_QUEUE; 
1590        prev = &tso->link, tso = tso->link) {
1591     if (tso->id == (StgThreadID)tok) {
1592       *prev = tso->link;
1593       break;
1594     }
1595   }
1596   if (tso == END_TSO_QUEUE) {
1597     barf("resumeThread: thread not found");
1598   }
1599   tso->link = END_TSO_QUEUE;
1600   /* Reset blocking status */
1601   tso->why_blocked  = NotBlocked;
1602
1603   cap->r.rCurrentTSO = tso;
1604   RELEASE_LOCK(&sched_mutex);
1605   return &cap->r;
1606 }
1607
1608
1609 /* ---------------------------------------------------------------------------
1610  * Static functions
1611  * ------------------------------------------------------------------------ */
1612 static void unblockThread(StgTSO *tso);
1613
1614 /* ---------------------------------------------------------------------------
1615  * Comparing Thread ids.
1616  *
1617  * This is used from STG land in the implementation of the
1618  * instances of Eq/Ord for ThreadIds.
1619  * ------------------------------------------------------------------------ */
1620
1621 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1622
1623   StgThreadID id1 = tso1->id; 
1624   StgThreadID id2 = tso2->id;
1625  
1626   if (id1 < id2) return (-1);
1627   if (id1 > id2) return 1;
1628   return 0;
1629 }
1630
1631 /* ---------------------------------------------------------------------------
1632  * Fetching the ThreadID from an StgTSO.
1633  *
1634  * This is used in the implementation of Show for ThreadIds.
1635  * ------------------------------------------------------------------------ */
1636 int rts_getThreadId(const StgTSO *tso) 
1637 {
1638   return tso->id;
1639 }
1640
1641 #ifdef DEBUG
1642 void labelThread(StgTSO *tso, char *label)
1643 {
1644   int len;
1645   void *buf;
1646
1647   /* Caveat: Once set, you can only set the thread name to "" */
1648   len = strlen(label)+1;
1649   buf = malloc(len);
1650   if (buf == NULL) {
1651     fprintf(stderr,"insufficient memory for labelThread!\n");
1652   } else
1653     strncpy(buf,label,len);
1654   /* Update will free the old memory for us */
1655   updateThreadLabel((StgWord)tso,buf);
1656 }
1657 #endif /* DEBUG */
1658
1659 /* ---------------------------------------------------------------------------
1660    Create a new thread.
1661
1662    The new thread starts with the given stack size.  Before the
1663    scheduler can run, however, this thread needs to have a closure
1664    (and possibly some arguments) pushed on its stack.  See
1665    pushClosure() in Schedule.h.
1666
1667    createGenThread() and createIOThread() (in SchedAPI.h) are
1668    convenient packaged versions of this function.
1669
1670    currently pri (priority) is only used in a GRAN setup -- HWL
1671    ------------------------------------------------------------------------ */
1672 //@cindex createThread
1673 #if defined(GRAN)
1674 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1675 StgTSO *
1676 createThread(nat size, StgInt pri)
1677 #else
1678 StgTSO *
1679 createThread(nat size)
1680 #endif
1681 {
1682
1683     StgTSO *tso;
1684     nat stack_size;
1685
1686     /* First check whether we should create a thread at all */
1687 #if defined(PAR)
1688   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1689   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1690     threadsIgnored++;
1691     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1692           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1693     return END_TSO_QUEUE;
1694   }
1695   threadsCreated++;
1696 #endif
1697
1698 #if defined(GRAN)
1699   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1700 #endif
1701
1702   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1703
1704   /* catch ridiculously small stack sizes */
1705   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1706     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1707   }
1708
1709   stack_size = size - TSO_STRUCT_SIZEW;
1710
1711   tso = (StgTSO *)allocate(size);
1712   TICK_ALLOC_TSO(stack_size, 0);
1713
1714   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1715 #if defined(GRAN)
1716   SET_GRAN_HDR(tso, ThisPE);
1717 #endif
1718   tso->what_next     = ThreadEnterGHC;
1719
1720   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1721    * protect the increment operation on next_thread_id.
1722    * In future, we could use an atomic increment instead.
1723    */
1724   ACQUIRE_LOCK(&thread_id_mutex);
1725   tso->id = next_thread_id++; 
1726   RELEASE_LOCK(&thread_id_mutex);
1727
1728   tso->why_blocked  = NotBlocked;
1729   tso->blocked_exceptions = NULL;
1730
1731   tso->stack_size   = stack_size;
1732   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1733                               - TSO_STRUCT_SIZEW;
1734   tso->sp           = (P_)&(tso->stack) + stack_size;
1735
1736 #ifdef PROFILING
1737   tso->prof.CCCS = CCS_MAIN;
1738 #endif
1739
1740   /* put a stop frame on the stack */
1741   tso->sp -= sizeofW(StgStopFrame);
1742   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1743   tso->su = (StgUpdateFrame*)tso->sp;
1744
1745   // ToDo: check this
1746 #if defined(GRAN)
1747   tso->link = END_TSO_QUEUE;
1748   /* uses more flexible routine in GranSim */
1749   insertThread(tso, CurrentProc);
1750 #else
1751   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1752    * from its creation
1753    */
1754 #endif
1755
1756 #if defined(GRAN) 
1757   if (RtsFlags.GranFlags.GranSimStats.Full) 
1758     DumpGranEvent(GR_START,tso);
1759 #elif defined(PAR)
1760   if (RtsFlags.ParFlags.ParStats.Full) 
1761     DumpGranEvent(GR_STARTQ,tso);
1762   /* HACk to avoid SCHEDULE 
1763      LastTSO = tso; */
1764 #endif
1765
1766   /* Link the new thread on the global thread list.
1767    */
1768   tso->global_link = all_threads;
1769   all_threads = tso;
1770
1771 #if defined(DIST)
1772   tso->dist.priority = MandatoryPriority; //by default that is...
1773 #endif
1774
1775 #if defined(GRAN)
1776   tso->gran.pri = pri;
1777 # if defined(DEBUG)
1778   tso->gran.magic = TSO_MAGIC; // debugging only
1779 # endif
1780   tso->gran.sparkname   = 0;
1781   tso->gran.startedat   = CURRENT_TIME; 
1782   tso->gran.exported    = 0;
1783   tso->gran.basicblocks = 0;
1784   tso->gran.allocs      = 0;
1785   tso->gran.exectime    = 0;
1786   tso->gran.fetchtime   = 0;
1787   tso->gran.fetchcount  = 0;
1788   tso->gran.blocktime   = 0;
1789   tso->gran.blockcount  = 0;
1790   tso->gran.blockedat   = 0;
1791   tso->gran.globalsparks = 0;
1792   tso->gran.localsparks  = 0;
1793   if (RtsFlags.GranFlags.Light)
1794     tso->gran.clock  = Now; /* local clock */
1795   else
1796     tso->gran.clock  = 0;
1797
1798   IF_DEBUG(gran,printTSO(tso));
1799 #elif defined(PAR)
1800 # if defined(DEBUG)
1801   tso->par.magic = TSO_MAGIC; // debugging only
1802 # endif
1803   tso->par.sparkname   = 0;
1804   tso->par.startedat   = CURRENT_TIME; 
1805   tso->par.exported    = 0;
1806   tso->par.basicblocks = 0;
1807   tso->par.allocs      = 0;
1808   tso->par.exectime    = 0;
1809   tso->par.fetchtime   = 0;
1810   tso->par.fetchcount  = 0;
1811   tso->par.blocktime   = 0;
1812   tso->par.blockcount  = 0;
1813   tso->par.blockedat   = 0;
1814   tso->par.globalsparks = 0;
1815   tso->par.localsparks  = 0;
1816 #endif
1817
1818 #if defined(GRAN)
1819   globalGranStats.tot_threads_created++;
1820   globalGranStats.threads_created_on_PE[CurrentProc]++;
1821   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1822   globalGranStats.tot_sq_probes++;
1823 #elif defined(PAR)
1824   // collect parallel global statistics (currently done together with GC stats)
1825   if (RtsFlags.ParFlags.ParStats.Global &&
1826       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1827     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1828     globalParStats.tot_threads_created++;
1829   }
1830 #endif 
1831
1832 #if defined(GRAN)
1833   IF_GRAN_DEBUG(pri,
1834                 belch("==__ schedule: Created TSO %d (%p);",
1835                       CurrentProc, tso, tso->id));
1836 #elif defined(PAR)
1837     IF_PAR_DEBUG(verbose,
1838                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1839                        tso->id, tso, advisory_thread_count));
1840 #else
1841   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1842                                  tso->id, tso->stack_size));
1843 #endif    
1844   return tso;
1845 }
1846
1847 #if defined(PAR)
1848 /* RFP:
1849    all parallel thread creation calls should fall through the following routine.
1850 */
1851 StgTSO *
1852 createSparkThread(rtsSpark spark) 
1853 { StgTSO *tso;
1854   ASSERT(spark != (rtsSpark)NULL);
1855   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1856   { threadsIgnored++;
1857     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1858           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1859     return END_TSO_QUEUE;
1860   }
1861   else
1862   { threadsCreated++;
1863     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1864     if (tso==END_TSO_QUEUE)     
1865       barf("createSparkThread: Cannot create TSO");
1866 #if defined(DIST)
1867     tso->priority = AdvisoryPriority;
1868 #endif
1869     pushClosure(tso,spark);
1870     PUSH_ON_RUN_QUEUE(tso);
1871     advisory_thread_count++;    
1872   }
1873   return tso;
1874 }
1875 #endif
1876
1877 /*
1878   Turn a spark into a thread.
1879   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1880 */
1881 #if defined(PAR)
1882 //@cindex activateSpark
1883 StgTSO *
1884 activateSpark (rtsSpark spark) 
1885 {
1886   StgTSO *tso;
1887
1888   tso = createSparkThread(spark);
1889   if (RtsFlags.ParFlags.ParStats.Full) {   
1890     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1891     IF_PAR_DEBUG(verbose,
1892                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1893                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1894   }
1895   // ToDo: fwd info on local/global spark to thread -- HWL
1896   // tso->gran.exported =  spark->exported;
1897   // tso->gran.locked =   !spark->global;
1898   // tso->gran.sparkname = spark->name;
1899
1900   return tso;
1901 }
1902 #endif
1903
1904 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
1905 #if defined(THREADED_RTS)
1906                                    , rtsBool blockWaiting
1907 #endif
1908                                    );
1909
1910
1911 /* ---------------------------------------------------------------------------
1912  * scheduleThread()
1913  *
1914  * scheduleThread puts a thread on the head of the runnable queue.
1915  * This will usually be done immediately after a thread is created.
1916  * The caller of scheduleThread must create the thread using e.g.
1917  * createThread and push an appropriate closure
1918  * on this thread's stack before the scheduler is invoked.
1919  * ------------------------------------------------------------------------ */
1920
1921 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1922
1923 void
1924 scheduleThread_(StgTSO *tso
1925                , rtsBool createTask
1926 #if !defined(THREADED_RTS)
1927                  STG_UNUSED
1928 #endif
1929               )
1930 {
1931   ACQUIRE_LOCK(&sched_mutex);
1932
1933   /* Put the new thread on the head of the runnable queue.  The caller
1934    * better push an appropriate closure on this thread's stack
1935    * beforehand.  In the SMP case, the thread may start running as
1936    * soon as we release the scheduler lock below.
1937    */
1938   PUSH_ON_RUN_QUEUE(tso);
1939 #if defined(THREADED_RTS)
1940   /* If main() is scheduling a thread, don't bother creating a 
1941    * new task.
1942    */
1943   if ( createTask ) {
1944     startTask(taskStart);
1945   }
1946 #endif
1947   THREAD_RUNNABLE();
1948
1949 #if 0
1950   IF_DEBUG(scheduler,printTSO(tso));
1951 #endif
1952   RELEASE_LOCK(&sched_mutex);
1953 }
1954
1955 void scheduleThread(StgTSO* tso)
1956 {
1957   scheduleThread_(tso, rtsFalse);
1958 }
1959
1960 SchedulerStatus
1961 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
1962 {
1963   StgMainThread *m;
1964
1965   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1966   m->tso = tso;
1967   m->ret = ret;
1968   m->stat = NoStatus;
1969 #if defined(RTS_SUPPORTS_THREADS)
1970   initCondition(&m->wakeup);
1971 #endif
1972
1973   /* Put the thread on the main-threads list prior to scheduling the TSO.
1974      Failure to do so introduces a race condition in the MT case (as
1975      identified by Wolfgang Thaller), whereby the new task/OS thread 
1976      created by scheduleThread_() would complete prior to the thread
1977      that spawned it managed to put 'itself' on the main-threads list.
1978      The upshot of it all being that the worker thread wouldn't get to
1979      signal the completion of the its work item for the main thread to
1980      see (==> it got stuck waiting.)    -- sof 6/02.
1981   */
1982   ACQUIRE_LOCK(&sched_mutex);
1983   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
1984   
1985   m->link = main_threads;
1986   main_threads = m;
1987
1988   /* Inefficient (scheduleThread_() acquires it again right away),
1989    * but obviously correct.
1990    */
1991   RELEASE_LOCK(&sched_mutex);
1992
1993   scheduleThread_(tso, rtsTrue);
1994 #if defined(THREADED_RTS)
1995   return waitThread_(m, rtsTrue);
1996 #else
1997   return waitThread_(m);
1998 #endif
1999 }
2000
2001 /* ---------------------------------------------------------------------------
2002  * initScheduler()
2003  *
2004  * Initialise the scheduler.  This resets all the queues - if the
2005  * queues contained any threads, they'll be garbage collected at the
2006  * next pass.
2007  *
2008  * ------------------------------------------------------------------------ */
2009
2010 #ifdef SMP
2011 static void
2012 term_handler(int sig STG_UNUSED)
2013 {
2014   stat_workerStop();
2015   ACQUIRE_LOCK(&term_mutex);
2016   await_death--;
2017   RELEASE_LOCK(&term_mutex);
2018   shutdownThread();
2019 }
2020 #endif
2021
2022 void 
2023 initScheduler(void)
2024 {
2025 #if defined(GRAN)
2026   nat i;
2027
2028   for (i=0; i<=MAX_PROC; i++) {
2029     run_queue_hds[i]      = END_TSO_QUEUE;
2030     run_queue_tls[i]      = END_TSO_QUEUE;
2031     blocked_queue_hds[i]  = END_TSO_QUEUE;
2032     blocked_queue_tls[i]  = END_TSO_QUEUE;
2033     ccalling_threadss[i]  = END_TSO_QUEUE;
2034     sleeping_queue        = END_TSO_QUEUE;
2035   }
2036 #else
2037   run_queue_hd      = END_TSO_QUEUE;
2038   run_queue_tl      = END_TSO_QUEUE;
2039   blocked_queue_hd  = END_TSO_QUEUE;
2040   blocked_queue_tl  = END_TSO_QUEUE;
2041   sleeping_queue    = END_TSO_QUEUE;
2042 #endif 
2043
2044   suspended_ccalling_threads  = END_TSO_QUEUE;
2045
2046   main_threads = NULL;
2047   all_threads  = END_TSO_QUEUE;
2048
2049   context_switch = 0;
2050   interrupted    = 0;
2051
2052   RtsFlags.ConcFlags.ctxtSwitchTicks =
2053       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2054       
2055 #if defined(RTS_SUPPORTS_THREADS)
2056   /* Initialise the mutex and condition variables used by
2057    * the scheduler. */
2058   initMutex(&sched_mutex);
2059   initMutex(&term_mutex);
2060   initMutex(&thread_id_mutex);
2061
2062   initCondition(&thread_ready_cond);
2063 #endif
2064   
2065 #if defined(SMP)
2066   initCondition(&gc_pending_cond);
2067 #endif
2068
2069 #if defined(RTS_SUPPORTS_THREADS)
2070   ACQUIRE_LOCK(&sched_mutex);
2071 #endif
2072
2073   /* Install the SIGHUP handler */
2074 #if defined(SMP)
2075   {
2076     struct sigaction action,oact;
2077
2078     action.sa_handler = term_handler;
2079     sigemptyset(&action.sa_mask);
2080     action.sa_flags = 0;
2081     if (sigaction(SIGTERM, &action, &oact) != 0) {
2082       barf("can't install TERM handler");
2083     }
2084   }
2085 #endif
2086
2087   /* A capability holds the state a native thread needs in
2088    * order to execute STG code. At least one capability is
2089    * floating around (only SMP builds have more than one).
2090    */
2091   initCapabilities();
2092   
2093 #if defined(RTS_SUPPORTS_THREADS)
2094     /* start our haskell execution tasks */
2095 # if defined(SMP)
2096     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2097 # else
2098     startTaskManager(0,taskStart);
2099 # endif
2100 #endif
2101
2102 #if /* defined(SMP) ||*/ defined(PAR)
2103   initSparkPools();
2104 #endif
2105
2106 #if defined(RTS_SUPPORTS_THREADS)
2107   RELEASE_LOCK(&sched_mutex);
2108 #endif
2109
2110 }
2111
2112 void
2113 exitScheduler( void )
2114 {
2115 #if defined(RTS_SUPPORTS_THREADS)
2116   stopTaskManager();
2117 #endif
2118   shutting_down_scheduler = rtsTrue;
2119 }
2120
2121 /* -----------------------------------------------------------------------------
2122    Managing the per-task allocation areas.
2123    
2124    Each capability comes with an allocation area.  These are
2125    fixed-length block lists into which allocation can be done.
2126
2127    ToDo: no support for two-space collection at the moment???
2128    -------------------------------------------------------------------------- */
2129
2130 /* -----------------------------------------------------------------------------
2131  * waitThread is the external interface for running a new computation
2132  * and waiting for the result.
2133  *
2134  * In the non-SMP case, we create a new main thread, push it on the 
2135  * main-thread stack, and invoke the scheduler to run it.  The
2136  * scheduler will return when the top main thread on the stack has
2137  * completed or died, and fill in the necessary fields of the
2138  * main_thread structure.
2139  *
2140  * In the SMP case, we create a main thread as before, but we then
2141  * create a new condition variable and sleep on it.  When our new
2142  * main thread has completed, we'll be woken up and the status/result
2143  * will be in the main_thread struct.
2144  * -------------------------------------------------------------------------- */
2145
2146 int 
2147 howManyThreadsAvail ( void )
2148 {
2149    int i = 0;
2150    StgTSO* q;
2151    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2152       i++;
2153    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2154       i++;
2155    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2156       i++;
2157    return i;
2158 }
2159
2160 void
2161 finishAllThreads ( void )
2162 {
2163    do {
2164       while (run_queue_hd != END_TSO_QUEUE) {
2165          waitThread ( run_queue_hd, NULL);
2166       }
2167       while (blocked_queue_hd != END_TSO_QUEUE) {
2168          waitThread ( blocked_queue_hd, NULL);
2169       }
2170       while (sleeping_queue != END_TSO_QUEUE) {
2171          waitThread ( blocked_queue_hd, NULL);
2172       }
2173    } while 
2174       (blocked_queue_hd != END_TSO_QUEUE || 
2175        run_queue_hd     != END_TSO_QUEUE ||
2176        sleeping_queue   != END_TSO_QUEUE);
2177 }
2178
2179 SchedulerStatus
2180 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2181
2182   StgMainThread *m;
2183
2184   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2185   m->tso = tso;
2186   m->ret = ret;
2187   m->stat = NoStatus;
2188 #if defined(RTS_SUPPORTS_THREADS)
2189   initCondition(&m->wakeup);
2190 #endif
2191
2192   /* see scheduleWaitThread() comment */
2193   ACQUIRE_LOCK(&sched_mutex);
2194   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2195   m->link = main_threads;
2196   main_threads = m;
2197   RELEASE_LOCK(&sched_mutex);
2198
2199   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2200 #if defined(THREADED_RTS)
2201   return waitThread_(m, rtsFalse);
2202 #else
2203   return waitThread_(m);
2204 #endif
2205 }
2206
2207 static
2208 SchedulerStatus
2209 waitThread_(StgMainThread* m
2210 #if defined(THREADED_RTS)
2211             , rtsBool blockWaiting
2212 #endif
2213            )
2214 {
2215   SchedulerStatus stat;
2216
2217   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2218
2219 #if defined(RTS_SUPPORTS_THREADS)
2220
2221 # if defined(THREADED_RTS)
2222   if (!blockWaiting) {
2223     /* In the threaded case, the OS thread that called main()
2224      * gets to enter the RTS directly without going via another
2225      * task/thread.
2226      */
2227     schedule();
2228     ASSERT(m->stat != NoStatus);
2229   } else 
2230 # endif
2231   {
2232     ACQUIRE_LOCK(&sched_mutex);
2233     do {
2234       waitCondition(&m->wakeup, &sched_mutex);
2235     } while (m->stat == NoStatus);
2236   }
2237 #elif defined(GRAN)
2238   /* GranSim specific init */
2239   CurrentTSO = m->tso;                // the TSO to run
2240   procStatus[MainProc] = Busy;        // status of main PE
2241   CurrentProc = MainProc;             // PE to run it on
2242
2243   schedule();
2244 #else
2245   RELEASE_LOCK(&sched_mutex);
2246   schedule();
2247   ASSERT(m->stat != NoStatus);
2248 #endif
2249
2250   stat = m->stat;
2251
2252 #if defined(RTS_SUPPORTS_THREADS)
2253   closeCondition(&m->wakeup);
2254 #endif
2255
2256   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2257                               m->tso->id));
2258   free(m);
2259
2260 #if defined(THREADED_RTS)
2261   if (blockWaiting) 
2262 #endif
2263     RELEASE_LOCK(&sched_mutex);
2264
2265   return stat;
2266 }
2267
2268 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2269 //@subsection Run queue code 
2270
2271 #if 0
2272 /* 
2273    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2274        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2275        implicit global variable that has to be correct when calling these
2276        fcts -- HWL 
2277 */
2278
2279 /* Put the new thread on the head of the runnable queue.
2280  * The caller of createThread better push an appropriate closure
2281  * on this thread's stack before the scheduler is invoked.
2282  */
2283 static /* inline */ void
2284 add_to_run_queue(tso)
2285 StgTSO* tso; 
2286 {
2287   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2288   tso->link = run_queue_hd;
2289   run_queue_hd = tso;
2290   if (run_queue_tl == END_TSO_QUEUE) {
2291     run_queue_tl = tso;
2292   }
2293 }
2294
2295 /* Put the new thread at the end of the runnable queue. */
2296 static /* inline */ void
2297 push_on_run_queue(tso)
2298 StgTSO* tso; 
2299 {
2300   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2301   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2302   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2303   if (run_queue_hd == END_TSO_QUEUE) {
2304     run_queue_hd = tso;
2305   } else {
2306     run_queue_tl->link = tso;
2307   }
2308   run_queue_tl = tso;
2309 }
2310
2311 /* 
2312    Should be inlined because it's used very often in schedule.  The tso
2313    argument is actually only needed in GranSim, where we want to have the
2314    possibility to schedule *any* TSO on the run queue, irrespective of the
2315    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2316    the run queue and dequeue the tso, adjusting the links in the queue. 
2317 */
2318 //@cindex take_off_run_queue
2319 static /* inline */ StgTSO*
2320 take_off_run_queue(StgTSO *tso) {
2321   StgTSO *t, *prev;
2322
2323   /* 
2324      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2325
2326      if tso is specified, unlink that tso from the run_queue (doesn't have
2327      to be at the beginning of the queue); GranSim only 
2328   */
2329   if (tso!=END_TSO_QUEUE) {
2330     /* find tso in queue */
2331     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2332          t!=END_TSO_QUEUE && t!=tso;
2333          prev=t, t=t->link) 
2334       /* nothing */ ;
2335     ASSERT(t==tso);
2336     /* now actually dequeue the tso */
2337     if (prev!=END_TSO_QUEUE) {
2338       ASSERT(run_queue_hd!=t);
2339       prev->link = t->link;
2340     } else {
2341       /* t is at beginning of thread queue */
2342       ASSERT(run_queue_hd==t);
2343       run_queue_hd = t->link;
2344     }
2345     /* t is at end of thread queue */
2346     if (t->link==END_TSO_QUEUE) {
2347       ASSERT(t==run_queue_tl);
2348       run_queue_tl = prev;
2349     } else {
2350       ASSERT(run_queue_tl!=t);
2351     }
2352     t->link = END_TSO_QUEUE;
2353   } else {
2354     /* take tso from the beginning of the queue; std concurrent code */
2355     t = run_queue_hd;
2356     if (t != END_TSO_QUEUE) {
2357       run_queue_hd = t->link;
2358       t->link = END_TSO_QUEUE;
2359       if (run_queue_hd == END_TSO_QUEUE) {
2360         run_queue_tl = END_TSO_QUEUE;
2361       }
2362     }
2363   }
2364   return t;
2365 }
2366
2367 #endif /* 0 */
2368
2369 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2370 //@subsection Garbage Collextion Routines
2371
2372 /* ---------------------------------------------------------------------------
2373    Where are the roots that we know about?
2374
2375         - all the threads on the runnable queue
2376         - all the threads on the blocked queue
2377         - all the threads on the sleeping queue
2378         - all the thread currently executing a _ccall_GC
2379         - all the "main threads"
2380      
2381    ------------------------------------------------------------------------ */
2382
2383 /* This has to be protected either by the scheduler monitor, or by the
2384         garbage collection monitor (probably the latter).
2385         KH @ 25/10/99
2386 */
2387
2388 void
2389 GetRoots(evac_fn evac)
2390 {
2391 #if defined(GRAN)
2392   {
2393     nat i;
2394     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2395       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2396           evac((StgClosure **)&run_queue_hds[i]);
2397       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2398           evac((StgClosure **)&run_queue_tls[i]);
2399       
2400       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2401           evac((StgClosure **)&blocked_queue_hds[i]);
2402       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2403           evac((StgClosure **)&blocked_queue_tls[i]);
2404       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2405           evac((StgClosure **)&ccalling_threads[i]);
2406     }
2407   }
2408
2409   markEventQueue();
2410
2411 #else /* !GRAN */
2412   if (run_queue_hd != END_TSO_QUEUE) {
2413       ASSERT(run_queue_tl != END_TSO_QUEUE);
2414       evac((StgClosure **)&run_queue_hd);
2415       evac((StgClosure **)&run_queue_tl);
2416   }
2417   
2418   if (blocked_queue_hd != END_TSO_QUEUE) {
2419       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2420       evac((StgClosure **)&blocked_queue_hd);
2421       evac((StgClosure **)&blocked_queue_tl);
2422   }
2423   
2424   if (sleeping_queue != END_TSO_QUEUE) {
2425       evac((StgClosure **)&sleeping_queue);
2426   }
2427 #endif 
2428
2429   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2430       evac((StgClosure **)&suspended_ccalling_threads);
2431   }
2432
2433 #if defined(PAR) || defined(GRAN)
2434   markSparkQueue(evac);
2435 #endif
2436 }
2437
2438 /* -----------------------------------------------------------------------------
2439    performGC
2440
2441    This is the interface to the garbage collector from Haskell land.
2442    We provide this so that external C code can allocate and garbage
2443    collect when called from Haskell via _ccall_GC.
2444
2445    It might be useful to provide an interface whereby the programmer
2446    can specify more roots (ToDo).
2447    
2448    This needs to be protected by the GC condition variable above.  KH.
2449    -------------------------------------------------------------------------- */
2450
2451 void (*extra_roots)(evac_fn);
2452
2453 void
2454 performGC(void)
2455 {
2456   /* Obligated to hold this lock upon entry */
2457   ACQUIRE_LOCK(&sched_mutex);
2458   GarbageCollect(GetRoots,rtsFalse);
2459   RELEASE_LOCK(&sched_mutex);
2460 }
2461
2462 void
2463 performMajorGC(void)
2464 {
2465   ACQUIRE_LOCK(&sched_mutex);
2466   GarbageCollect(GetRoots,rtsTrue);
2467   RELEASE_LOCK(&sched_mutex);
2468 }
2469
2470 static void
2471 AllRoots(evac_fn evac)
2472 {
2473     GetRoots(evac);             // the scheduler's roots
2474     extra_roots(evac);          // the user's roots
2475 }
2476
2477 void
2478 performGCWithRoots(void (*get_roots)(evac_fn))
2479 {
2480   ACQUIRE_LOCK(&sched_mutex);
2481   extra_roots = get_roots;
2482   GarbageCollect(AllRoots,rtsFalse);
2483   RELEASE_LOCK(&sched_mutex);
2484 }
2485
2486 /* -----------------------------------------------------------------------------
2487    Stack overflow
2488
2489    If the thread has reached its maximum stack size, then raise the
2490    StackOverflow exception in the offending thread.  Otherwise
2491    relocate the TSO into a larger chunk of memory and adjust its stack
2492    size appropriately.
2493    -------------------------------------------------------------------------- */
2494
2495 static StgTSO *
2496 threadStackOverflow(StgTSO *tso)
2497 {
2498   nat new_stack_size, new_tso_size, diff, stack_words;
2499   StgPtr new_sp;
2500   StgTSO *dest;
2501
2502   IF_DEBUG(sanity,checkTSO(tso));
2503   if (tso->stack_size >= tso->max_stack_size) {
2504
2505     IF_DEBUG(gc,
2506              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2507                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2508              /* If we're debugging, just print out the top of the stack */
2509              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2510                                               tso->sp+64)));
2511
2512     /* Send this thread the StackOverflow exception */
2513     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2514     return tso;
2515   }
2516
2517   /* Try to double the current stack size.  If that takes us over the
2518    * maximum stack size for this thread, then use the maximum instead.
2519    * Finally round up so the TSO ends up as a whole number of blocks.
2520    */
2521   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2522   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2523                                        TSO_STRUCT_SIZE)/sizeof(W_);
2524   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2525   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2526
2527   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2528
2529   dest = (StgTSO *)allocate(new_tso_size);
2530   TICK_ALLOC_TSO(new_stack_size,0);
2531
2532   /* copy the TSO block and the old stack into the new area */
2533   memcpy(dest,tso,TSO_STRUCT_SIZE);
2534   stack_words = tso->stack + tso->stack_size - tso->sp;
2535   new_sp = (P_)dest + new_tso_size - stack_words;
2536   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2537
2538   /* relocate the stack pointers... */
2539   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2540   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2541   dest->sp    = new_sp;
2542   dest->stack_size = new_stack_size;
2543         
2544   /* and relocate the update frame list */
2545   relocate_stack(dest, diff);
2546
2547   /* Mark the old TSO as relocated.  We have to check for relocated
2548    * TSOs in the garbage collector and any primops that deal with TSOs.
2549    *
2550    * It's important to set the sp and su values to just beyond the end
2551    * of the stack, so we don't attempt to scavenge any part of the
2552    * dead TSO's stack.
2553    */
2554   tso->what_next = ThreadRelocated;
2555   tso->link = dest;
2556   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2557   tso->su = (StgUpdateFrame *)tso->sp;
2558   tso->why_blocked = NotBlocked;
2559   dest->mut_link = NULL;
2560
2561   IF_PAR_DEBUG(verbose,
2562                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2563                      tso->id, tso, tso->stack_size);
2564                /* If we're debugging, just print out the top of the stack */
2565                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2566                                                 tso->sp+64)));
2567   
2568   IF_DEBUG(sanity,checkTSO(tso));
2569 #if 0
2570   IF_DEBUG(scheduler,printTSO(dest));
2571 #endif
2572
2573   return dest;
2574 }
2575
2576 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2577 //@subsection Blocking Queue Routines
2578
2579 /* ---------------------------------------------------------------------------
2580    Wake up a queue that was blocked on some resource.
2581    ------------------------------------------------------------------------ */
2582
2583 #if defined(GRAN)
2584 static inline void
2585 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2586 {
2587 }
2588 #elif defined(PAR)
2589 static inline void
2590 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2591 {
2592   /* write RESUME events to log file and
2593      update blocked and fetch time (depending on type of the orig closure) */
2594   if (RtsFlags.ParFlags.ParStats.Full) {
2595     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2596                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2597                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2598     if (EMPTY_RUN_QUEUE())
2599       emitSchedule = rtsTrue;
2600
2601     switch (get_itbl(node)->type) {
2602         case FETCH_ME_BQ:
2603           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2604           break;
2605         case RBH:
2606         case FETCH_ME:
2607         case BLACKHOLE_BQ:
2608           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2609           break;
2610 #ifdef DIST
2611         case MVAR:
2612           break;
2613 #endif    
2614         default:
2615           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2616         }
2617       }
2618 }
2619 #endif
2620
2621 #if defined(GRAN)
2622 static StgBlockingQueueElement *
2623 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2624 {
2625     StgTSO *tso;
2626     PEs node_loc, tso_loc;
2627
2628     node_loc = where_is(node); // should be lifted out of loop
2629     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2630     tso_loc = where_is((StgClosure *)tso);
2631     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2632       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2633       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2634       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2635       // insertThread(tso, node_loc);
2636       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2637                 ResumeThread,
2638                 tso, node, (rtsSpark*)NULL);
2639       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2640       // len_local++;
2641       // len++;
2642     } else { // TSO is remote (actually should be FMBQ)
2643       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2644                                   RtsFlags.GranFlags.Costs.gunblocktime +
2645                                   RtsFlags.GranFlags.Costs.latency;
2646       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2647                 UnblockThread,
2648                 tso, node, (rtsSpark*)NULL);
2649       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2650       // len++;
2651     }
2652     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2653     IF_GRAN_DEBUG(bq,
2654                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2655                           (node_loc==tso_loc ? "Local" : "Global"), 
2656                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2657     tso->block_info.closure = NULL;
2658     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2659                              tso->id, tso));
2660 }
2661 #elif defined(PAR)
2662 static StgBlockingQueueElement *
2663 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2664 {
2665     StgBlockingQueueElement *next;
2666
2667     switch (get_itbl(bqe)->type) {
2668     case TSO:
2669       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2670       /* if it's a TSO just push it onto the run_queue */
2671       next = bqe->link;
2672       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2673       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2674       THREAD_RUNNABLE();
2675       unblockCount(bqe, node);
2676       /* reset blocking status after dumping event */
2677       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2678       break;
2679
2680     case BLOCKED_FETCH:
2681       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2682       next = bqe->link;
2683       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2684       PendingFetches = (StgBlockedFetch *)bqe;
2685       break;
2686
2687 # if defined(DEBUG)
2688       /* can ignore this case in a non-debugging setup; 
2689          see comments on RBHSave closures above */
2690     case CONSTR:
2691       /* check that the closure is an RBHSave closure */
2692       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2693              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2694              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2695       break;
2696
2697     default:
2698       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2699            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2700            (StgClosure *)bqe);
2701 # endif
2702     }
2703   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2704   return next;
2705 }
2706
2707 #else /* !GRAN && !PAR */
2708 static StgTSO *
2709 unblockOneLocked(StgTSO *tso)
2710 {
2711   StgTSO *next;
2712
2713   ASSERT(get_itbl(tso)->type == TSO);
2714   ASSERT(tso->why_blocked != NotBlocked);
2715   tso->why_blocked = NotBlocked;
2716   next = tso->link;
2717   PUSH_ON_RUN_QUEUE(tso);
2718   THREAD_RUNNABLE();
2719   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2720   return next;
2721 }
2722 #endif
2723
2724 #if defined(GRAN) || defined(PAR)
2725 inline StgBlockingQueueElement *
2726 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2727 {
2728   ACQUIRE_LOCK(&sched_mutex);
2729   bqe = unblockOneLocked(bqe, node);
2730   RELEASE_LOCK(&sched_mutex);
2731   return bqe;
2732 }
2733 #else
2734 inline StgTSO *
2735 unblockOne(StgTSO *tso)
2736 {
2737   ACQUIRE_LOCK(&sched_mutex);
2738   tso = unblockOneLocked(tso);
2739   RELEASE_LOCK(&sched_mutex);
2740   return tso;
2741 }
2742 #endif
2743
2744 #if defined(GRAN)
2745 void 
2746 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2747 {
2748   StgBlockingQueueElement *bqe;
2749   PEs node_loc;
2750   nat len = 0; 
2751
2752   IF_GRAN_DEBUG(bq, 
2753                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2754                       node, CurrentProc, CurrentTime[CurrentProc], 
2755                       CurrentTSO->id, CurrentTSO));
2756
2757   node_loc = where_is(node);
2758
2759   ASSERT(q == END_BQ_QUEUE ||
2760          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2761          get_itbl(q)->type == CONSTR); // closure (type constructor)
2762   ASSERT(is_unique(node));
2763
2764   /* FAKE FETCH: magically copy the node to the tso's proc;
2765      no Fetch necessary because in reality the node should not have been 
2766      moved to the other PE in the first place
2767   */
2768   if (CurrentProc!=node_loc) {
2769     IF_GRAN_DEBUG(bq, 
2770                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2771                         node, node_loc, CurrentProc, CurrentTSO->id, 
2772                         // CurrentTSO, where_is(CurrentTSO),
2773                         node->header.gran.procs));
2774     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2775     IF_GRAN_DEBUG(bq, 
2776                   belch("## new bitmask of node %p is %#x",
2777                         node, node->header.gran.procs));
2778     if (RtsFlags.GranFlags.GranSimStats.Global) {
2779       globalGranStats.tot_fake_fetches++;
2780     }
2781   }
2782
2783   bqe = q;
2784   // ToDo: check: ASSERT(CurrentProc==node_loc);
2785   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2786     //next = bqe->link;
2787     /* 
2788        bqe points to the current element in the queue
2789        next points to the next element in the queue
2790     */
2791     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2792     //tso_loc = where_is(tso);
2793     len++;
2794     bqe = unblockOneLocked(bqe, node);
2795   }
2796
2797   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2798      the closure to make room for the anchor of the BQ */
2799   if (bqe!=END_BQ_QUEUE) {
2800     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2801     /*
2802     ASSERT((info_ptr==&RBH_Save_0_info) ||
2803            (info_ptr==&RBH_Save_1_info) ||
2804            (info_ptr==&RBH_Save_2_info));
2805     */
2806     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2807     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2808     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2809
2810     IF_GRAN_DEBUG(bq,
2811                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2812                         node, info_type(node)));
2813   }
2814
2815   /* statistics gathering */
2816   if (RtsFlags.GranFlags.GranSimStats.Global) {
2817     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2818     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2819     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2820     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2821   }
2822   IF_GRAN_DEBUG(bq,
2823                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2824                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2825 }
2826 #elif defined(PAR)
2827 void 
2828 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2829 {
2830   StgBlockingQueueElement *bqe;
2831
2832   ACQUIRE_LOCK(&sched_mutex);
2833
2834   IF_PAR_DEBUG(verbose, 
2835                belch("##-_ AwBQ for node %p on [%x]: ",
2836                      node, mytid));
2837 #ifdef DIST  
2838   //RFP
2839   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2840     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2841     return;
2842   }
2843 #endif
2844   
2845   ASSERT(q == END_BQ_QUEUE ||
2846          get_itbl(q)->type == TSO ||           
2847          get_itbl(q)->type == BLOCKED_FETCH || 
2848          get_itbl(q)->type == CONSTR); 
2849
2850   bqe = q;
2851   while (get_itbl(bqe)->type==TSO || 
2852          get_itbl(bqe)->type==BLOCKED_FETCH) {
2853     bqe = unblockOneLocked(bqe, node);
2854   }
2855   RELEASE_LOCK(&sched_mutex);
2856 }
2857
2858 #else   /* !GRAN && !PAR */
2859 void
2860 awakenBlockedQueue(StgTSO *tso)
2861 {
2862   ACQUIRE_LOCK(&sched_mutex);
2863   while (tso != END_TSO_QUEUE) {
2864     tso = unblockOneLocked(tso);
2865   }
2866   RELEASE_LOCK(&sched_mutex);
2867 }
2868 #endif
2869
2870 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2871 //@subsection Exception Handling Routines
2872
2873 /* ---------------------------------------------------------------------------
2874    Interrupt execution
2875    - usually called inside a signal handler so it mustn't do anything fancy.   
2876    ------------------------------------------------------------------------ */
2877
2878 void
2879 interruptStgRts(void)
2880 {
2881     interrupted    = 1;
2882     context_switch = 1;
2883 }
2884
2885 /* -----------------------------------------------------------------------------
2886    Unblock a thread
2887
2888    This is for use when we raise an exception in another thread, which
2889    may be blocked.
2890    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2891    -------------------------------------------------------------------------- */
2892
2893 #if defined(GRAN) || defined(PAR)
2894 /*
2895   NB: only the type of the blocking queue is different in GranSim and GUM
2896       the operations on the queue-elements are the same
2897       long live polymorphism!
2898
2899   Locks: sched_mutex is held upon entry and exit.
2900
2901 */
2902 static void
2903 unblockThread(StgTSO *tso)
2904 {
2905   StgBlockingQueueElement *t, **last;
2906
2907   switch (tso->why_blocked) {
2908
2909   case NotBlocked:
2910     return;  /* not blocked */
2911
2912   case BlockedOnMVar:
2913     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2914     {
2915       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2916       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2917
2918       last = (StgBlockingQueueElement **)&mvar->head;
2919       for (t = (StgBlockingQueueElement *)mvar->head; 
2920            t != END_BQ_QUEUE; 
2921            last = &t->link, last_tso = t, t = t->link) {
2922         if (t == (StgBlockingQueueElement *)tso) {
2923           *last = (StgBlockingQueueElement *)tso->link;
2924           if (mvar->tail == tso) {
2925             mvar->tail = (StgTSO *)last_tso;
2926           }
2927           goto done;
2928         }
2929       }
2930       barf("unblockThread (MVAR): TSO not found");
2931     }
2932
2933   case BlockedOnBlackHole:
2934     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2935     {
2936       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2937
2938       last = &bq->blocking_queue;
2939       for (t = bq->blocking_queue; 
2940            t != END_BQ_QUEUE; 
2941            last = &t->link, t = t->link) {
2942         if (t == (StgBlockingQueueElement *)tso) {
2943           *last = (StgBlockingQueueElement *)tso->link;
2944           goto done;
2945         }
2946       }
2947       barf("unblockThread (BLACKHOLE): TSO not found");
2948     }
2949
2950   case BlockedOnException:
2951     {
2952       StgTSO *target  = tso->block_info.tso;
2953
2954       ASSERT(get_itbl(target)->type == TSO);
2955
2956       if (target->what_next == ThreadRelocated) {
2957           target = target->link;
2958           ASSERT(get_itbl(target)->type == TSO);
2959       }
2960
2961       ASSERT(target->blocked_exceptions != NULL);
2962
2963       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2964       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2965            t != END_BQ_QUEUE; 
2966            last = &t->link, t = t->link) {
2967         ASSERT(get_itbl(t)->type == TSO);
2968         if (t == (StgBlockingQueueElement *)tso) {
2969           *last = (StgBlockingQueueElement *)tso->link;
2970           goto done;
2971         }
2972       }
2973       barf("unblockThread (Exception): TSO not found");
2974     }
2975
2976   case BlockedOnRead:
2977   case BlockedOnWrite:
2978     {
2979       /* take TSO off blocked_queue */
2980       StgBlockingQueueElement *prev = NULL;
2981       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2982            prev = t, t = t->link) {
2983         if (t == (StgBlockingQueueElement *)tso) {
2984           if (prev == NULL) {
2985             blocked_queue_hd = (StgTSO *)t->link;
2986             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2987               blocked_queue_tl = END_TSO_QUEUE;
2988             }
2989           } else {
2990             prev->link = t->link;
2991             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2992               blocked_queue_tl = (StgTSO *)prev;
2993             }
2994           }
2995           goto done;
2996         }
2997       }
2998       barf("unblockThread (I/O): TSO not found");
2999     }
3000
3001   case BlockedOnDelay:
3002     {
3003       /* take TSO off sleeping_queue */
3004       StgBlockingQueueElement *prev = NULL;
3005       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3006            prev = t, t = t->link) {
3007         if (t == (StgBlockingQueueElement *)tso) {
3008           if (prev == NULL) {
3009             sleeping_queue = (StgTSO *)t->link;
3010           } else {
3011             prev->link = t->link;
3012           }
3013           goto done;
3014         }
3015       }
3016       barf("unblockThread (I/O): TSO not found");
3017     }
3018
3019   default:
3020     barf("unblockThread");
3021   }
3022
3023  done:
3024   tso->link = END_TSO_QUEUE;
3025   tso->why_blocked = NotBlocked;
3026   tso->block_info.closure = NULL;
3027   PUSH_ON_RUN_QUEUE(tso);
3028 }
3029 #else
3030 static void
3031 unblockThread(StgTSO *tso)
3032 {
3033   StgTSO *t, **last;
3034   
3035   /* To avoid locking unnecessarily. */
3036   if (tso->why_blocked == NotBlocked) {
3037     return;
3038   }
3039
3040   switch (tso->why_blocked) {
3041
3042   case BlockedOnMVar:
3043     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3044     {
3045       StgTSO *last_tso = END_TSO_QUEUE;
3046       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3047
3048       last = &mvar->head;
3049       for (t = mvar->head; t != END_TSO_QUEUE; 
3050            last = &t->link, last_tso = t, t = t->link) {
3051         if (t == tso) {
3052           *last = tso->link;
3053           if (mvar->tail == tso) {
3054             mvar->tail = last_tso;
3055           }
3056           goto done;
3057         }
3058       }
3059       barf("unblockThread (MVAR): TSO not found");
3060     }
3061
3062   case BlockedOnBlackHole:
3063     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3064     {
3065       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3066
3067       last = &bq->blocking_queue;
3068       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3069            last = &t->link, t = t->link) {
3070         if (t == tso) {
3071           *last = tso->link;
3072           goto done;
3073         }
3074       }
3075       barf("unblockThread (BLACKHOLE): TSO not found");
3076     }
3077
3078   case BlockedOnException:
3079     {
3080       StgTSO *target  = tso->block_info.tso;
3081
3082       ASSERT(get_itbl(target)->type == TSO);
3083
3084       while (target->what_next == ThreadRelocated) {
3085           target = target->link;
3086           ASSERT(get_itbl(target)->type == TSO);
3087       }
3088       
3089       ASSERT(target->blocked_exceptions != NULL);
3090
3091       last = &target->blocked_exceptions;
3092       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3093            last = &t->link, t = t->link) {
3094         ASSERT(get_itbl(t)->type == TSO);
3095         if (t == tso) {
3096           *last = tso->link;
3097           goto done;
3098         }
3099       }
3100       barf("unblockThread (Exception): TSO not found");
3101     }
3102
3103   case BlockedOnRead:
3104   case BlockedOnWrite:
3105     {
3106       StgTSO *prev = NULL;
3107       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3108            prev = t, t = t->link) {
3109         if (t == tso) {
3110           if (prev == NULL) {
3111             blocked_queue_hd = t->link;
3112             if (blocked_queue_tl == t) {
3113               blocked_queue_tl = END_TSO_QUEUE;
3114             }
3115           } else {
3116             prev->link = t->link;
3117             if (blocked_queue_tl == t) {
3118               blocked_queue_tl = prev;
3119             }
3120           }
3121           goto done;
3122         }
3123       }
3124       barf("unblockThread (I/O): TSO not found");
3125     }
3126
3127   case BlockedOnDelay:
3128     {
3129       StgTSO *prev = NULL;
3130       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3131            prev = t, t = t->link) {
3132         if (t == tso) {
3133           if (prev == NULL) {
3134             sleeping_queue = t->link;
3135           } else {
3136             prev->link = t->link;
3137           }
3138           goto done;
3139         }
3140       }
3141       barf("unblockThread (I/O): TSO not found");
3142     }
3143
3144   default:
3145     barf("unblockThread");
3146   }
3147
3148  done:
3149   tso->link = END_TSO_QUEUE;
3150   tso->why_blocked = NotBlocked;
3151   tso->block_info.closure = NULL;
3152   PUSH_ON_RUN_QUEUE(tso);
3153 }
3154 #endif
3155
3156 /* -----------------------------------------------------------------------------
3157  * raiseAsync()
3158  *
3159  * The following function implements the magic for raising an
3160  * asynchronous exception in an existing thread.
3161  *
3162  * We first remove the thread from any queue on which it might be
3163  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3164  *
3165  * We strip the stack down to the innermost CATCH_FRAME, building
3166  * thunks in the heap for all the active computations, so they can 
3167  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3168  * an application of the handler to the exception, and push it on
3169  * the top of the stack.
3170  * 
3171  * How exactly do we save all the active computations?  We create an
3172  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3173  * AP_UPDs pushes everything from the corresponding update frame
3174  * upwards onto the stack.  (Actually, it pushes everything up to the
3175  * next update frame plus a pointer to the next AP_UPD object.
3176  * Entering the next AP_UPD object pushes more onto the stack until we
3177  * reach the last AP_UPD object - at which point the stack should look
3178  * exactly as it did when we killed the TSO and we can continue
3179  * execution by entering the closure on top of the stack.
3180  *
3181  * We can also kill a thread entirely - this happens if either (a) the 
3182  * exception passed to raiseAsync is NULL, or (b) there's no
3183  * CATCH_FRAME on the stack.  In either case, we strip the entire
3184  * stack and replace the thread with a zombie.
3185  *
3186  * Locks: sched_mutex held upon entry nor exit.
3187  *
3188  * -------------------------------------------------------------------------- */
3189  
3190 void 
3191 deleteThread(StgTSO *tso)
3192 {
3193   raiseAsync(tso,NULL);
3194 }
3195
3196 void
3197 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3198 {
3199   /* When raising async exs from contexts where sched_mutex isn't held;
3200      use raiseAsyncWithLock(). */
3201   ACQUIRE_LOCK(&sched_mutex);
3202   raiseAsync(tso,exception);
3203   RELEASE_LOCK(&sched_mutex);
3204 }
3205
3206 void
3207 raiseAsync(StgTSO *tso, StgClosure *exception)
3208 {
3209   StgUpdateFrame* su = tso->su;
3210   StgPtr          sp = tso->sp;
3211   
3212   /* Thread already dead? */
3213   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3214     return;
3215   }
3216
3217   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3218
3219   /* Remove it from any blocking queues */
3220   unblockThread(tso);
3221
3222   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3223   /* The stack freezing code assumes there's a closure pointer on
3224    * the top of the stack.  This isn't always the case with compiled
3225    * code, so we have to push a dummy closure on the top which just
3226    * returns to the next return address on the stack.
3227    */
3228   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3229     *(--sp) = (W_)&stg_dummy_ret_closure;
3230   }
3231
3232   while (1) {
3233     nat words = ((P_)su - (P_)sp) - 1;
3234     nat i;
3235     StgAP_UPD * ap;
3236
3237     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3238      * then build the THUNK raise(exception), and leave it on
3239      * top of the CATCH_FRAME ready to enter.
3240      */
3241     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3242 #ifdef PROFILING
3243       StgCatchFrame *cf = (StgCatchFrame *)su;
3244 #endif
3245       StgClosure *raise;
3246
3247       /* we've got an exception to raise, so let's pass it to the
3248        * handler in this frame.
3249        */
3250       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3251       TICK_ALLOC_SE_THK(1,0);
3252       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3253       raise->payload[0] = exception;
3254
3255       /* throw away the stack from Sp up to the CATCH_FRAME.
3256        */
3257       sp = (P_)su - 1;
3258
3259       /* Ensure that async excpetions are blocked now, so we don't get
3260        * a surprise exception before we get around to executing the
3261        * handler.
3262        */
3263       if (tso->blocked_exceptions == NULL) {
3264           tso->blocked_exceptions = END_TSO_QUEUE;
3265       }
3266
3267       /* Put the newly-built THUNK on top of the stack, ready to execute
3268        * when the thread restarts.
3269        */
3270       sp[0] = (W_)raise;
3271       tso->sp = sp;
3272       tso->su = su;
3273       tso->what_next = ThreadEnterGHC;
3274       IF_DEBUG(sanity, checkTSO(tso));
3275       return;
3276     }
3277
3278     /* First build an AP_UPD consisting of the stack chunk above the
3279      * current update frame, with the top word on the stack as the
3280      * fun field.
3281      */
3282     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3283     
3284     ASSERT(words >= 0);
3285     
3286     ap->n_args = words;
3287     ap->fun    = (StgClosure *)sp[0];
3288     sp++;
3289     for(i=0; i < (nat)words; ++i) {
3290       ap->payload[i] = (StgClosure *)*sp++;
3291     }
3292     
3293     switch (get_itbl(su)->type) {
3294       
3295     case UPDATE_FRAME:
3296       {
3297         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3298         TICK_ALLOC_UP_THK(words+1,0);
3299         
3300         IF_DEBUG(scheduler,
3301                  fprintf(stderr,  "scheduler: Updating ");
3302                  printPtr((P_)su->updatee); 
3303                  fprintf(stderr,  " with ");
3304                  printObj((StgClosure *)ap);
3305                  );
3306         
3307         /* Replace the updatee with an indirection - happily
3308          * this will also wake up any threads currently
3309          * waiting on the result.
3310          *
3311          * Warning: if we're in a loop, more than one update frame on
3312          * the stack may point to the same object.  Be careful not to
3313          * overwrite an IND_OLDGEN in this case, because we'll screw
3314          * up the mutable lists.  To be on the safe side, don't
3315          * overwrite any kind of indirection at all.  See also
3316          * threadSqueezeStack in GC.c, where we have to make a similar
3317          * check.
3318          */
3319         if (!closure_IND(su->updatee)) {
3320             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3321         }
3322         su = su->link;
3323         sp += sizeofW(StgUpdateFrame) -1;
3324         sp[0] = (W_)ap; /* push onto stack */
3325         break;
3326       }
3327
3328     case CATCH_FRAME:
3329       {
3330         StgCatchFrame *cf = (StgCatchFrame *)su;
3331         StgClosure* o;
3332         
3333         /* We want a PAP, not an AP_UPD.  Fortunately, the
3334          * layout's the same.
3335          */
3336         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3337         TICK_ALLOC_UPD_PAP(words+1,0);
3338         
3339         /* now build o = FUN(catch,ap,handler) */
3340         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3341         TICK_ALLOC_FUN(2,0);
3342         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3343         o->payload[0] = (StgClosure *)ap;
3344         o->payload[1] = cf->handler;
3345         
3346         IF_DEBUG(scheduler,
3347                  fprintf(stderr,  "scheduler: Built ");
3348                  printObj((StgClosure *)o);
3349                  );
3350         
3351         /* pop the old handler and put o on the stack */
3352         su = cf->link;
3353         sp += sizeofW(StgCatchFrame) - 1;
3354         sp[0] = (W_)o;
3355         break;
3356       }
3357       
3358     case SEQ_FRAME:
3359       {
3360         StgSeqFrame *sf = (StgSeqFrame *)su;
3361         StgClosure* o;
3362         
3363         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3364         TICK_ALLOC_UPD_PAP(words+1,0);
3365         
3366         /* now build o = FUN(seq,ap) */
3367         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3368         TICK_ALLOC_SE_THK(1,0);
3369         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3370         o->payload[0] = (StgClosure *)ap;
3371         
3372         IF_DEBUG(scheduler,
3373                  fprintf(stderr,  "scheduler: Built ");
3374                  printObj((StgClosure *)o);
3375                  );
3376         
3377         /* pop the old handler and put o on the stack */
3378         su = sf->link;
3379         sp += sizeofW(StgSeqFrame) - 1;
3380         sp[0] = (W_)o;
3381         break;
3382       }
3383       
3384     case STOP_FRAME:
3385       /* We've stripped the entire stack, the thread is now dead. */
3386       sp += sizeofW(StgStopFrame) - 1;
3387       sp[0] = (W_)exception;    /* save the exception */
3388       tso->what_next = ThreadKilled;
3389       tso->su = (StgUpdateFrame *)(sp+1);
3390       tso->sp = sp;
3391       return;
3392
3393     default:
3394       barf("raiseAsync");
3395     }
3396   }
3397   barf("raiseAsync");
3398 }
3399
3400 /* -----------------------------------------------------------------------------
3401    resurrectThreads is called after garbage collection on the list of
3402    threads found to be garbage.  Each of these threads will be woken
3403    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3404    on an MVar, or NonTermination if the thread was blocked on a Black
3405    Hole.
3406
3407    Locks: sched_mutex isn't held upon entry nor exit.
3408    -------------------------------------------------------------------------- */
3409
3410 void
3411 resurrectThreads( StgTSO *threads )
3412 {
3413   StgTSO *tso, *next;
3414
3415   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3416     next = tso->global_link;
3417     tso->global_link = all_threads;
3418     all_threads = tso;
3419     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3420
3421     switch (tso->why_blocked) {
3422     case BlockedOnMVar:
3423     case BlockedOnException:
3424       /* Called by GC - sched_mutex lock is currently held. */
3425       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3426       break;
3427     case BlockedOnBlackHole:
3428       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3429       break;
3430     case NotBlocked:
3431       /* This might happen if the thread was blocked on a black hole
3432        * belonging to a thread that we've just woken up (raiseAsync
3433        * can wake up threads, remember...).
3434        */
3435       continue;
3436     default:
3437       barf("resurrectThreads: thread blocked in a strange way");
3438     }
3439   }
3440 }
3441
3442 /* -----------------------------------------------------------------------------
3443  * Blackhole detection: if we reach a deadlock, test whether any
3444  * threads are blocked on themselves.  Any threads which are found to
3445  * be self-blocked get sent a NonTermination exception.
3446  *
3447  * This is only done in a deadlock situation in order to avoid
3448  * performance overhead in the normal case.
3449  *
3450  * Locks: sched_mutex is held upon entry and exit.
3451  * -------------------------------------------------------------------------- */
3452
3453 static void
3454 detectBlackHoles( void )
3455 {
3456     StgTSO *t = all_threads;
3457     StgUpdateFrame *frame;
3458     StgClosure *blocked_on;
3459
3460     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3461
3462         while (t->what_next == ThreadRelocated) {
3463             t = t->link;
3464             ASSERT(get_itbl(t)->type == TSO);
3465         }
3466       
3467         if (t->why_blocked != BlockedOnBlackHole) {
3468             continue;
3469         }
3470
3471         blocked_on = t->block_info.closure;
3472
3473         for (frame = t->su; ; frame = frame->link) {
3474             switch (get_itbl(frame)->type) {
3475
3476             case UPDATE_FRAME:
3477                 if (frame->updatee == blocked_on) {
3478                     /* We are blocking on one of our own computations, so
3479                      * send this thread the NonTermination exception.  
3480                      */
3481                     IF_DEBUG(scheduler, 
3482                              sched_belch("thread %d is blocked on itself", t->id));
3483                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3484                     goto done;
3485                 }
3486                 else {
3487                     continue;
3488                 }
3489
3490             case CATCH_FRAME:
3491             case SEQ_FRAME:
3492                 continue;
3493                 
3494             case STOP_FRAME:
3495                 break;
3496             }
3497             break;
3498         }
3499
3500     done: ;
3501     }   
3502 }
3503
3504 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3505 //@subsection Debugging Routines
3506
3507 /* -----------------------------------------------------------------------------
3508    Debugging: why is a thread blocked
3509    -------------------------------------------------------------------------- */
3510
3511 #ifdef DEBUG
3512
3513 void
3514 printThreadBlockage(StgTSO *tso)
3515 {
3516   switch (tso->why_blocked) {
3517   case BlockedOnRead:
3518     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3519     break;
3520   case BlockedOnWrite:
3521     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3522     break;
3523   case BlockedOnDelay:
3524     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3525     break;
3526   case BlockedOnMVar:
3527     fprintf(stderr,"is blocked on an MVar");
3528     break;
3529   case BlockedOnException:
3530     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3531             tso->block_info.tso->id);
3532     break;
3533   case BlockedOnBlackHole:
3534     fprintf(stderr,"is blocked on a black hole");
3535     break;
3536   case NotBlocked:
3537     fprintf(stderr,"is not blocked");
3538     break;
3539 #if defined(PAR)
3540   case BlockedOnGA:
3541     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3542             tso->block_info.closure, info_type(tso->block_info.closure));
3543     break;
3544   case BlockedOnGA_NoSend:
3545     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3546             tso->block_info.closure, info_type(tso->block_info.closure));
3547     break;
3548 #endif
3549 #if defined(RTS_SUPPORTS_THREADS)
3550   case BlockedOnCCall:
3551     fprintf(stderr,"is blocked on an external call");
3552     break;
3553 #endif
3554   default:
3555     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3556          tso->why_blocked, tso->id, tso);
3557   }
3558 }
3559
3560 void
3561 printThreadStatus(StgTSO *tso)
3562 {
3563   switch (tso->what_next) {
3564   case ThreadKilled:
3565     fprintf(stderr,"has been killed");
3566     break;
3567   case ThreadComplete:
3568     fprintf(stderr,"has completed");
3569     break;
3570   default:
3571     printThreadBlockage(tso);
3572   }
3573 }
3574
3575 void
3576 printAllThreads(void)
3577 {
3578   StgTSO *t;
3579   void *label;
3580
3581 # if defined(GRAN)
3582   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3583   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3584                        time_string, rtsFalse/*no commas!*/);
3585
3586   sched_belch("all threads at [%s]:", time_string);
3587 # elif defined(PAR)
3588   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3589   ullong_format_string(CURRENT_TIME,
3590                        time_string, rtsFalse/*no commas!*/);
3591
3592   sched_belch("all threads at [%s]:", time_string);
3593 # else
3594   sched_belch("all threads:");
3595 # endif
3596
3597   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3598     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3599     label = lookupThreadLabel((StgWord)t);
3600     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3601     printThreadStatus(t);
3602     fprintf(stderr,"\n");
3603   }
3604 }
3605     
3606 /* 
3607    Print a whole blocking queue attached to node (debugging only).
3608 */
3609 //@cindex print_bq
3610 # if defined(PAR)
3611 void 
3612 print_bq (StgClosure *node)
3613 {
3614   StgBlockingQueueElement *bqe;
3615   StgTSO *tso;
3616   rtsBool end;
3617
3618   fprintf(stderr,"## BQ of closure %p (%s): ",
3619           node, info_type(node));
3620
3621   /* should cover all closures that may have a blocking queue */
3622   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3623          get_itbl(node)->type == FETCH_ME_BQ ||
3624          get_itbl(node)->type == RBH ||
3625          get_itbl(node)->type == MVAR);
3626     
3627   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3628
3629   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3630 }
3631
3632 /* 
3633    Print a whole blocking queue starting with the element bqe.
3634 */
3635 void 
3636 print_bqe (StgBlockingQueueElement *bqe)
3637 {
3638   rtsBool end;
3639
3640   /* 
3641      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3642   */
3643   for (end = (bqe==END_BQ_QUEUE);
3644        !end; // iterate until bqe points to a CONSTR
3645        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3646        bqe = end ? END_BQ_QUEUE : bqe->link) {
3647     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3648     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3649     /* types of closures that may appear in a blocking queue */
3650     ASSERT(get_itbl(bqe)->type == TSO ||           
3651            get_itbl(bqe)->type == BLOCKED_FETCH || 
3652            get_itbl(bqe)->type == CONSTR); 
3653     /* only BQs of an RBH end with an RBH_Save closure */
3654     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3655
3656     switch (get_itbl(bqe)->type) {
3657     case TSO:
3658       fprintf(stderr," TSO %u (%x),",
3659               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3660       break;
3661     case BLOCKED_FETCH:
3662       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3663               ((StgBlockedFetch *)bqe)->node, 
3664               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3665               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3666               ((StgBlockedFetch *)bqe)->ga.weight);
3667       break;
3668     case CONSTR:
3669       fprintf(stderr," %s (IP %p),",
3670               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3671                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3672                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3673                "RBH_Save_?"), get_itbl(bqe));
3674       break;
3675     default:
3676       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3677            info_type((StgClosure *)bqe)); // , node, info_type(node));
3678       break;
3679     }
3680   } /* for */
3681   fputc('\n', stderr);
3682 }
3683 # elif defined(GRAN)
3684 void 
3685 print_bq (StgClosure *node)
3686 {
3687   StgBlockingQueueElement *bqe;
3688   PEs node_loc, tso_loc;
3689   rtsBool end;
3690
3691   /* should cover all closures that may have a blocking queue */
3692   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3693          get_itbl(node)->type == FETCH_ME_BQ ||
3694          get_itbl(node)->type == RBH);
3695     
3696   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3697   node_loc = where_is(node);
3698
3699   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3700           node, info_type(node), node_loc);
3701
3702   /* 
3703      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3704   */
3705   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3706        !end; // iterate until bqe points to a CONSTR
3707        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3708     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3709     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3710     /* types of closures that may appear in a blocking queue */
3711     ASSERT(get_itbl(bqe)->type == TSO ||           
3712            get_itbl(bqe)->type == CONSTR); 
3713     /* only BQs of an RBH end with an RBH_Save closure */
3714     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3715
3716     tso_loc = where_is((StgClosure *)bqe);
3717     switch (get_itbl(bqe)->type) {
3718     case TSO:
3719       fprintf(stderr," TSO %d (%p) on [PE %d],",
3720               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3721       break;
3722     case CONSTR:
3723       fprintf(stderr," %s (IP %p),",
3724               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3725                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3726                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3727                "RBH_Save_?"), get_itbl(bqe));
3728       break;
3729     default:
3730       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3731            info_type((StgClosure *)bqe), node, info_type(node));
3732       break;
3733     }
3734   } /* for */
3735   fputc('\n', stderr);
3736 }
3737 #else
3738 /* 
3739    Nice and easy: only TSOs on the blocking queue
3740 */
3741 void 
3742 print_bq (StgClosure *node)
3743 {
3744   StgTSO *tso;
3745
3746   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3747   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3748        tso != END_TSO_QUEUE; 
3749        tso=tso->link) {
3750     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3751     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3752     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3753   }
3754   fputc('\n', stderr);
3755 }
3756 # endif
3757
3758 #if defined(PAR)
3759 static nat
3760 run_queue_len(void)
3761 {
3762   nat i;
3763   StgTSO *tso;
3764
3765   for (i=0, tso=run_queue_hd; 
3766        tso != END_TSO_QUEUE;
3767        i++, tso=tso->link)
3768     /* nothing */
3769
3770   return i;
3771 }
3772 #endif
3773
3774 static void
3775 sched_belch(char *s, ...)
3776 {
3777   va_list ap;
3778   va_start(ap,s);
3779 #ifdef SMP
3780   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3781 #elif defined(PAR)
3782   fprintf(stderr, "== ");
3783 #else
3784   fprintf(stderr, "scheduler: ");
3785 #endif
3786   vfprintf(stderr, s, ap);
3787   fprintf(stderr, "\n");
3788   va_end(ap);
3789 }
3790
3791 #endif /* DEBUG */
3792
3793
3794 //@node Index,  , Debugging Routines, Main scheduling code
3795 //@subsection Index
3796
3797 //@index
3798 //* StgMainThread::  @cindex\s-+StgMainThread
3799 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3800 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3801 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3802 //* context_switch::  @cindex\s-+context_switch
3803 //* createThread::  @cindex\s-+createThread
3804 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3805 //* initScheduler::  @cindex\s-+initScheduler
3806 //* interrupted::  @cindex\s-+interrupted
3807 //* next_thread_id::  @cindex\s-+next_thread_id
3808 //* print_bq::  @cindex\s-+print_bq
3809 //* run_queue_hd::  @cindex\s-+run_queue_hd
3810 //* run_queue_tl::  @cindex\s-+run_queue_tl
3811 //* sched_mutex::  @cindex\s-+sched_mutex
3812 //* schedule::  @cindex\s-+schedule
3813 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3814 //* term_mutex::  @cindex\s-+term_mutex
3815 //@end index