[project @ 2002-06-19 20:45:14 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.145 2002/06/19 20:45:15 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: thread_id_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 /*
229  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
230  * in an MT setting, needed to signal that a worker thread shouldn't hang around
231  * in the scheduler when it is out of work.
232  */
233 static rtsBool shutting_down_scheduler = rtsFalse;
234
235 void            addToBlockedQueue ( StgTSO *tso );
236
237 static void     schedule          ( void );
238        void     interruptStgRts   ( void );
239
240 static void     detectBlackHoles  ( void );
241
242 #ifdef DEBUG
243 static void sched_belch(char *s, ...);
244 #endif
245
246 #if defined(RTS_SUPPORTS_THREADS)
247 /* ToDo: carefully document the invariants that go together
248  *       with these synchronisation objects.
249  */
250 Mutex     sched_mutex       = INIT_MUTEX_VAR;
251 Mutex     term_mutex        = INIT_MUTEX_VAR;
252
253 /*
254  * A heavyweight solution to the problem of protecting
255  * the thread_id from concurrent update.
256  */
257 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
258
259
260 # if defined(SMP)
261 static Condition gc_pending_cond = INIT_COND_VAR;
262 nat await_death;
263 # endif
264
265 #endif /* RTS_SUPPORTS_THREADS */
266
267 #if defined(PAR)
268 StgTSO *LastTSO;
269 rtsTime TimeOfLastYield;
270 rtsBool emitSchedule = rtsTrue;
271 #endif
272
273 #if DEBUG
274 char *whatNext_strs[] = {
275   "ThreadEnterGHC",
276   "ThreadRunGHC",
277   "ThreadEnterInterp",
278   "ThreadKilled",
279   "ThreadComplete"
280 };
281
282 char *threadReturnCode_strs[] = {
283   "HeapOverflow",                       /* might also be StackOverflow */
284   "StackOverflow",
285   "ThreadYielding",
286   "ThreadBlocked",
287   "ThreadFinished"
288 };
289 #endif
290
291 #if defined(PAR)
292 StgTSO * createSparkThread(rtsSpark spark);
293 StgTSO * activateSpark (rtsSpark spark);  
294 #endif
295
296 /*
297  * The thread state for the main thread.
298 // ToDo: check whether not needed any more
299 StgTSO   *MainTSO;
300  */
301
302 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
303 static void taskStart(void);
304 static void
305 taskStart(void)
306 {
307   schedule();
308 }
309 #endif
310
311
312
313
314 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
315 //@subsection Main scheduling loop
316
317 /* ---------------------------------------------------------------------------
318    Main scheduling loop.
319
320    We use round-robin scheduling, each thread returning to the
321    scheduler loop when one of these conditions is detected:
322
323       * out of heap space
324       * timer expires (thread yields)
325       * thread blocks
326       * thread ends
327       * stack overflow
328
329    Locking notes:  we acquire the scheduler lock once at the beginning
330    of the scheduler loop, and release it when
331     
332       * running a thread, or
333       * waiting for work, or
334       * waiting for a GC to complete.
335
336    GRAN version:
337      In a GranSim setup this loop iterates over the global event queue.
338      This revolves around the global event queue, which determines what 
339      to do next. Therefore, it's more complicated than either the 
340      concurrent or the parallel (GUM) setup.
341
342    GUM version:
343      GUM iterates over incoming messages.
344      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
345      and sends out a fish whenever it has nothing to do; in-between
346      doing the actual reductions (shared code below) it processes the
347      incoming messages and deals with delayed operations 
348      (see PendingFetches).
349      This is not the ugliest code you could imagine, but it's bloody close.
350
351    ------------------------------------------------------------------------ */
352 //@cindex schedule
353 static void
354 schedule( void )
355 {
356   StgTSO *t;
357   Capability *cap;
358   StgThreadReturnCode ret;
359 #if defined(GRAN)
360   rtsEvent *event;
361 #elif defined(PAR)
362   StgSparkPool *pool;
363   rtsSpark spark;
364   StgTSO *tso;
365   GlobalTaskId pe;
366   rtsBool receivedFinish = rtsFalse;
367 # if defined(DEBUG)
368   nat tp_size, sp_size; // stats only
369 # endif
370 #endif
371   rtsBool was_interrupted = rtsFalse;
372   
373   ACQUIRE_LOCK(&sched_mutex);
374  
375 #if defined(RTS_SUPPORTS_THREADS)
376   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
377 #else
378   /* simply initialise it in the non-threaded case */
379   grabCapability(&cap);
380 #endif
381
382 #if defined(GRAN)
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417 #if defined(RTS_SUPPORTS_THREADS)
418     /* Check to see whether there are any worker threads
419        waiting to deposit external call results. If so,
420        yield our capability */
421     yieldToReturningWorker(&sched_mutex, &cap);
422 #endif
423
424     /* If we're interrupted (the user pressed ^C, or some other
425      * termination condition occurred), kill all the currently running
426      * threads.
427      */
428     if (interrupted) {
429       IF_DEBUG(scheduler, sched_belch("interrupted"));
430       deleteAllThreads();
431       interrupted = rtsFalse;
432       was_interrupted = rtsTrue;
433     }
434
435     /* Go through the list of main threads and wake up any
436      * clients whose computations have finished.  ToDo: this
437      * should be done more efficiently without a linear scan
438      * of the main threads list, somehow...
439      */
440 #if defined(RTS_SUPPORTS_THREADS)
441     { 
442       StgMainThread *m, **prev;
443       prev = &main_threads;
444       for (m = main_threads; m != NULL; m = m->link) {
445         switch (m->tso->what_next) {
446         case ThreadComplete:
447           if (m->ret) {
448             *(m->ret) = (StgClosure *)m->tso->sp[0];
449           }
450           *prev = m->link;
451           m->stat = Success;
452           broadcastCondition(&m->wakeup);
453 #ifdef DEBUG
454           free(m->tso->label);
455           m->tso->label = NULL;
456 #endif
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467 #ifdef DEBUG
468           free(m->tso->label);
469           m->tso->label = NULL;
470 #endif
471           break;
472         default:
473           break;
474         }
475       }
476     }
477
478 #else /* not threaded */
479
480 # if defined(PAR)
481     /* in GUM do this only on the Main PE */
482     if (IAmMainThread)
483 # endif
484     /* If our main thread has finished or been killed, return.
485      */
486     {
487       StgMainThread *m = main_threads;
488       if (m->tso->what_next == ThreadComplete
489           || m->tso->what_next == ThreadKilled) {
490 #ifdef DEBUG
491         free(m->tso->label);
492         m->tso->label = NULL;
493 #endif
494         main_threads = main_threads->link;
495         if (m->tso->what_next == ThreadComplete) {
496           /* we finished successfully, fill in the return value */
497           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
498           m->stat = Success;
499           return;
500         } else {
501           if (m->ret) { *(m->ret) = NULL; };
502           if (was_interrupted) {
503             m->stat = Interrupted;
504           } else {
505             m->stat = Killed;
506           }
507           return;
508         }
509       }
510     }
511 #endif
512
513     /* Top up the run queue from our spark pool.  We try to make the
514      * number of threads in the run queue equal to the number of
515      * free capabilities.
516      *
517      * Disable spark support in SMP for now, non-essential & requires
518      * a little bit of work to make it compile cleanly. -- sof 1/02.
519      */
520 #if 0 /* defined(SMP) */
521     {
522       nat n = getFreeCapabilities();
523       StgTSO *tso = run_queue_hd;
524
525       /* Count the run queue */
526       while (n > 0 && tso != END_TSO_QUEUE) {
527         tso = tso->link;
528         n--;
529       }
530
531       for (; n > 0; n--) {
532         StgClosure *spark;
533         spark = findSpark(rtsFalse);
534         if (spark == NULL) {
535           break; /* no more sparks in the pool */
536         } else {
537           /* I'd prefer this to be done in activateSpark -- HWL */
538           /* tricky - it needs to hold the scheduler lock and
539            * not try to re-acquire it -- SDM */
540           createSparkThread(spark);       
541           IF_DEBUG(scheduler,
542                    sched_belch("==^^ turning spark of closure %p into a thread",
543                                (StgClosure *)spark));
544         }
545       }
546       /* We need to wake up the other tasks if we just created some
547        * work for them.
548        */
549       if (getFreeCapabilities() - n > 1) {
550           signalCondition( &thread_ready_cond );
551       }
552     }
553 #endif // SMP
554
555     /* check for signals each time around the scheduler */
556 #ifndef mingw32_TARGET_OS
557     if (signals_pending()) {
558       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
559       startSignalHandlers();
560       ACQUIRE_LOCK(&sched_mutex);
561     }
562 #endif
563
564     /* Check whether any waiting threads need to be woken up.  If the
565      * run queue is empty, and there are no other tasks running, we
566      * can wait indefinitely for something to happen.
567      * ToDo: what if another client comes along & requests another
568      * main thread?
569      */
570     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
571       awaitEvent( EMPTY_RUN_QUEUE()
572 #if defined(SMP)
573         && allFreeCapabilities()
574 #endif
575         );
576     }
577     /* we can be interrupted while waiting for I/O... */
578     if (interrupted) continue;
579
580     /* 
581      * Detect deadlock: when we have no threads to run, there are no
582      * threads waiting on I/O or sleeping, and all the other tasks are
583      * waiting for work, we must have a deadlock of some description.
584      *
585      * We first try to find threads blocked on themselves (ie. black
586      * holes), and generate NonTermination exceptions where necessary.
587      *
588      * If no threads are black holed, we have a deadlock situation, so
589      * inform all the main threads.
590      */
591 #ifndef PAR
592     if (   EMPTY_THREAD_QUEUES()
593 #if defined(RTS_SUPPORTS_THREADS)
594         && EMPTY_QUEUE(suspended_ccalling_threads)
595 #endif
596 #ifdef SMP
597         && allFreeCapabilities()
598 #endif
599         )
600     {
601         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
602 #if defined(THREADED_RTS)
603         /* and SMP mode ..? */
604         releaseCapability(cap);
605 #endif
606         // Garbage collection can release some new threads due to
607         // either (a) finalizers or (b) threads resurrected because
608         // they are about to be send BlockedOnDeadMVar.  Any threads
609         // thus released will be immediately runnable.
610         GarbageCollect(GetRoots,rtsTrue);
611
612         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
613
614         IF_DEBUG(scheduler, 
615                  sched_belch("still deadlocked, checking for black holes..."));
616         detectBlackHoles();
617
618         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
619
620 #ifndef mingw32_TARGET_OS
621         /* If we have user-installed signal handlers, then wait
622          * for signals to arrive rather then bombing out with a
623          * deadlock.
624          */
625 #if defined(RTS_SUPPORTS_THREADS)
626         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
627                       a signal with no runnable threads (or I/O
628                       suspended ones) leads nowhere quick.
629                       For now, simply shut down when we reach this
630                       condition.
631                       
632                       ToDo: define precisely under what conditions
633                       the Scheduler should shut down in an MT setting.
634                    */
635 #else
636         if ( anyUserHandlers() ) {
637 #endif
638             IF_DEBUG(scheduler, 
639                      sched_belch("still deadlocked, waiting for signals..."));
640
641             awaitUserSignals();
642
643             // we might be interrupted...
644             if (interrupted) { continue; }
645
646             if (signals_pending()) {
647                 RELEASE_LOCK(&sched_mutex);
648                 startSignalHandlers();
649                 ACQUIRE_LOCK(&sched_mutex);
650             }
651             ASSERT(!EMPTY_RUN_QUEUE());
652             goto not_deadlocked;
653         }
654 #endif
655
656         /* Probably a real deadlock.  Send the current main thread the
657          * Deadlock exception (or in the SMP build, send *all* main
658          * threads the deadlock exception, since none of them can make
659          * progress).
660          */
661         {
662             StgMainThread *m;
663 #if defined(RTS_SUPPORTS_THREADS)
664             for (m = main_threads; m != NULL; m = m->link) {
665                 switch (m->tso->why_blocked) {
666                 case BlockedOnBlackHole:
667                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
668                     break;
669                 case BlockedOnException:
670                 case BlockedOnMVar:
671                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
672                     break;
673                 default:
674                     barf("deadlock: main thread blocked in a strange way");
675                 }
676             }
677 #else
678             m = main_threads;
679             switch (m->tso->why_blocked) {
680             case BlockedOnBlackHole:
681                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
682                 break;
683             case BlockedOnException:
684             case BlockedOnMVar:
685                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
686                 break;
687             default:
688                 barf("deadlock: main thread blocked in a strange way");
689             }
690 #endif
691         }
692
693 #if defined(RTS_SUPPORTS_THREADS)
694         /* ToDo: revisit conditions (and mechanism) for shutting
695            down a multi-threaded world  */
696         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
697         RELEASE_LOCK(&sched_mutex);
698         shutdownHaskell();
699         return;
700 #endif
701     }
702   not_deadlocked:
703
704 #elif defined(PAR)
705     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
706 #endif
707
708 #if defined(SMP)
709     /* If there's a GC pending, don't do anything until it has
710      * completed.
711      */
712     if (ready_to_gc) {
713       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
714       waitCondition( &gc_pending_cond, &sched_mutex );
715     }
716 #endif    
717
718 #if defined(RTS_SUPPORTS_THREADS)
719     /* block until we've got a thread on the run queue and a free
720      * capability.
721      *
722      */
723     if ( EMPTY_RUN_QUEUE() ) {
724       /* Give up our capability */
725       releaseCapability(cap);
726
727       /* If we're in the process of shutting down (& running the
728        * a batch of finalisers), don't wait around.
729        */
730       if ( shutting_down_scheduler ) {
731         RELEASE_LOCK(&sched_mutex);
732         return;
733       }
734       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
735       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
736       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
737     }
738 #endif
739
740 #if defined(GRAN)
741     if (RtsFlags.GranFlags.Light)
742       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
743
744     /* adjust time based on time-stamp */
745     if (event->time > CurrentTime[CurrentProc] &&
746         event->evttype != ContinueThread)
747       CurrentTime[CurrentProc] = event->time;
748     
749     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
750     if (!RtsFlags.GranFlags.Light)
751       handleIdlePEs();
752
753     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
754
755     /* main event dispatcher in GranSim */
756     switch (event->evttype) {
757       /* Should just be continuing execution */
758     case ContinueThread:
759       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
760       /* ToDo: check assertion
761       ASSERT(run_queue_hd != (StgTSO*)NULL &&
762              run_queue_hd != END_TSO_QUEUE);
763       */
764       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
765       if (!RtsFlags.GranFlags.DoAsyncFetch &&
766           procStatus[CurrentProc]==Fetching) {
767         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
768               CurrentTSO->id, CurrentTSO, CurrentProc);
769         goto next_thread;
770       } 
771       /* Ignore ContinueThreads for completed threads */
772       if (CurrentTSO->what_next == ThreadComplete) {
773         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
774               CurrentTSO->id, CurrentTSO, CurrentProc);
775         goto next_thread;
776       } 
777       /* Ignore ContinueThreads for threads that are being migrated */
778       if (PROCS(CurrentTSO)==Nowhere) { 
779         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
780               CurrentTSO->id, CurrentTSO, CurrentProc);
781         goto next_thread;
782       }
783       /* The thread should be at the beginning of the run queue */
784       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
785         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
786               CurrentTSO->id, CurrentTSO, CurrentProc);
787         break; // run the thread anyway
788       }
789       /*
790       new_event(proc, proc, CurrentTime[proc],
791                 FindWork,
792                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
793       goto next_thread; 
794       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
795       break; // now actually run the thread; DaH Qu'vam yImuHbej 
796
797     case FetchNode:
798       do_the_fetchnode(event);
799       goto next_thread;             /* handle next event in event queue  */
800       
801     case GlobalBlock:
802       do_the_globalblock(event);
803       goto next_thread;             /* handle next event in event queue  */
804       
805     case FetchReply:
806       do_the_fetchreply(event);
807       goto next_thread;             /* handle next event in event queue  */
808       
809     case UnblockThread:   /* Move from the blocked queue to the tail of */
810       do_the_unblock(event);
811       goto next_thread;             /* handle next event in event queue  */
812       
813     case ResumeThread:  /* Move from the blocked queue to the tail of */
814       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
815       event->tso->gran.blocktime += 
816         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
817       do_the_startthread(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case StartThread:
821       do_the_startthread(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case MoveThread:
825       do_the_movethread(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     case MoveSpark:
829       do_the_movespark(event);
830       goto next_thread;             /* handle next event in event queue  */
831       
832     case FindWork:
833       do_the_findwork(event);
834       goto next_thread;             /* handle next event in event queue  */
835       
836     default:
837       barf("Illegal event type %u\n", event->evttype);
838     }  /* switch */
839     
840     /* This point was scheduler_loop in the old RTS */
841
842     IF_DEBUG(gran, belch("GRAN: after main switch"));
843
844     TimeOfLastEvent = CurrentTime[CurrentProc];
845     TimeOfNextEvent = get_time_of_next_event();
846     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
847     // CurrentTSO = ThreadQueueHd;
848
849     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
850                          TimeOfNextEvent));
851
852     if (RtsFlags.GranFlags.Light) 
853       GranSimLight_leave_system(event, &ActiveTSO); 
854
855     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
856
857     IF_DEBUG(gran, 
858              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
859
860     /* in a GranSim setup the TSO stays on the run queue */
861     t = CurrentTSO;
862     /* Take a thread from the run queue. */
863     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
864
865     IF_DEBUG(gran, 
866              fprintf(stderr, "GRAN: About to run current thread, which is\n");
867              G_TSO(t,5));
868
869     context_switch = 0; // turned on via GranYield, checking events and time slice
870
871     IF_DEBUG(gran, 
872              DumpGranEvent(GR_SCHEDULE, t));
873
874     procStatus[CurrentProc] = Busy;
875
876 #elif defined(PAR)
877     if (PendingFetches != END_BF_QUEUE) {
878         processFetches();
879     }
880
881     /* ToDo: phps merge with spark activation above */
882     /* check whether we have local work and send requests if we have none */
883     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
884       /* :-[  no local threads => look out for local sparks */
885       /* the spark pool for the current PE */
886       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
887       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
888           pool->hd < pool->tl) {
889         /* 
890          * ToDo: add GC code check that we really have enough heap afterwards!!
891          * Old comment:
892          * If we're here (no runnable threads) and we have pending
893          * sparks, we must have a space problem.  Get enough space
894          * to turn one of those pending sparks into a
895          * thread... 
896          */
897
898         spark = findSpark(rtsFalse);                /* get a spark */
899         if (spark != (rtsSpark) NULL) {
900           tso = activateSpark(spark);       /* turn the spark into a thread */
901           IF_PAR_DEBUG(schedule,
902                        belch("==== schedule: Created TSO %d (%p); %d threads active",
903                              tso->id, tso, advisory_thread_count));
904
905           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
906             belch("==^^ failed to activate spark");
907             goto next_thread;
908           }               /* otherwise fall through & pick-up new tso */
909         } else {
910           IF_PAR_DEBUG(verbose,
911                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
912                              spark_queue_len(pool)));
913           goto next_thread;
914         }
915       }
916
917       /* If we still have no work we need to send a FISH to get a spark
918          from another PE 
919       */
920       if (EMPTY_RUN_QUEUE()) {
921       /* =8-[  no local sparks => look for work on other PEs */
922         /*
923          * We really have absolutely no work.  Send out a fish
924          * (there may be some out there already), and wait for
925          * something to arrive.  We clearly can't run any threads
926          * until a SCHEDULE or RESUME arrives, and so that's what
927          * we're hoping to see.  (Of course, we still have to
928          * respond to other types of messages.)
929          */
930         TIME now = msTime() /*CURRENT_TIME*/;
931         IF_PAR_DEBUG(verbose, 
932                      belch("--  now=%ld", now));
933         IF_PAR_DEBUG(verbose,
934                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
935                          (last_fish_arrived_at!=0 &&
936                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
937                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
938                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
939                              last_fish_arrived_at,
940                              RtsFlags.ParFlags.fishDelay, now);
941                      });
942         
943         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
944             (last_fish_arrived_at==0 ||
945              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
946           /* outstandingFishes is set in sendFish, processFish;
947              avoid flooding system with fishes via delay */
948           pe = choosePE();
949           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
950                    NEW_FISH_HUNGER);
951
952           // Global statistics: count no. of fishes
953           if (RtsFlags.ParFlags.ParStats.Global &&
954               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
955             globalParStats.tot_fish_mess++;
956           }
957         }
958       
959         receivedFinish = processMessages();
960         goto next_thread;
961       }
962     } else if (PacketsWaiting()) {  /* Look for incoming messages */
963       receivedFinish = processMessages();
964     }
965
966     /* Now we are sure that we have some work available */
967     ASSERT(run_queue_hd != END_TSO_QUEUE);
968
969     /* Take a thread from the run queue, if we have work */
970     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
971     IF_DEBUG(sanity,checkTSO(t));
972
973     /* ToDo: write something to the log-file
974     if (RTSflags.ParFlags.granSimStats && !sameThread)
975         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
976
977     CurrentTSO = t;
978     */
979     /* the spark pool for the current PE */
980     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
981
982     IF_DEBUG(scheduler, 
983              belch("--=^ %d threads, %d sparks on [%#x]", 
984                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
985
986 # if 1
987     if (0 && RtsFlags.ParFlags.ParStats.Full && 
988         t && LastTSO && t->id != LastTSO->id && 
989         LastTSO->why_blocked == NotBlocked && 
990         LastTSO->what_next != ThreadComplete) {
991       // if previously scheduled TSO not blocked we have to record the context switch
992       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
993                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
994     }
995
996     if (RtsFlags.ParFlags.ParStats.Full && 
997         (emitSchedule /* forced emit */ ||
998         (t && LastTSO && t->id != LastTSO->id))) {
999       /* 
1000          we are running a different TSO, so write a schedule event to log file
1001          NB: If we use fair scheduling we also have to write  a deschedule 
1002              event for LastTSO; with unfair scheduling we know that the
1003              previous tso has blocked whenever we switch to another tso, so
1004              we don't need it in GUM for now
1005       */
1006       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1007                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1008       emitSchedule = rtsFalse;
1009     }
1010      
1011 # endif
1012 #else /* !GRAN && !PAR */
1013   
1014     /* grab a thread from the run queue */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016     t = POP_RUN_QUEUE();
1017     // Sanity check the thread we're about to run.  This can be
1018     // expensive if there is lots of thread switching going on...
1019     IF_DEBUG(sanity,checkTSO(t));
1020 #endif
1021     
1022     cap->r.rCurrentTSO = t;
1023     
1024     /* context switches are now initiated by the timer signal, unless
1025      * the user specified "context switch as often as possible", with
1026      * +RTS -C0
1027      */
1028     if (
1029 #ifdef PROFILING
1030         RtsFlags.ProfFlags.profileInterval == 0 ||
1031 #endif
1032         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1033          && (run_queue_hd != END_TSO_QUEUE
1034              || blocked_queue_hd != END_TSO_QUEUE
1035              || sleeping_queue != END_TSO_QUEUE)))
1036         context_switch = 1;
1037     else
1038         context_switch = 0;
1039
1040     RELEASE_LOCK(&sched_mutex);
1041
1042     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1043                               t->id, t, whatNext_strs[t->what_next]));
1044
1045 #ifdef PROFILING
1046     startHeapProfTimer();
1047 #endif
1048
1049     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1050     /* Run the current thread 
1051      */
1052     switch (cap->r.rCurrentTSO->what_next) {
1053     case ThreadKilled:
1054     case ThreadComplete:
1055         /* Thread already finished, return to scheduler. */
1056         ret = ThreadFinished;
1057         break;
1058     case ThreadEnterGHC:
1059         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1060         break;
1061     case ThreadRunGHC:
1062         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1063         break;
1064     case ThreadEnterInterp:
1065         ret = interpretBCO(cap);
1066         break;
1067     default:
1068       barf("schedule: invalid what_next field");
1069     }
1070     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071     
1072     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1073 #ifdef PROFILING
1074     stopHeapProfTimer();
1075     CCCS = CCS_SYSTEM;
1076 #endif
1077     
1078     ACQUIRE_LOCK(&sched_mutex);
1079
1080 #ifdef SMP
1081     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1082 #elif !defined(GRAN) && !defined(PAR)
1083     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1084 #endif
1085     t = cap->r.rCurrentTSO;
1086     
1087 #if defined(PAR)
1088     /* HACK 675: if the last thread didn't yield, make sure to print a 
1089        SCHEDULE event to the log file when StgRunning the next thread, even
1090        if it is the same one as before */
1091     LastTSO = t; 
1092     TimeOfLastYield = CURRENT_TIME;
1093 #endif
1094
1095     switch (ret) {
1096     case HeapOverflow:
1097 #if defined(GRAN)
1098       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1099       globalGranStats.tot_heapover++;
1100 #elif defined(PAR)
1101       globalParStats.tot_heapover++;
1102 #endif
1103
1104       // did the task ask for a large block?
1105       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1106           // if so, get one and push it on the front of the nursery.
1107           bdescr *bd;
1108           nat blocks;
1109           
1110           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1111
1112           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1113                                    t->id, t,
1114                                    whatNext_strs[t->what_next], blocks));
1115
1116           // don't do this if it would push us over the
1117           // alloc_blocks_lim limit; we'll GC first.
1118           if (alloc_blocks + blocks < alloc_blocks_lim) {
1119
1120               alloc_blocks += blocks;
1121               bd = allocGroup( blocks );
1122
1123               // link the new group into the list
1124               bd->link = cap->r.rCurrentNursery;
1125               bd->u.back = cap->r.rCurrentNursery->u.back;
1126               if (cap->r.rCurrentNursery->u.back != NULL) {
1127                   cap->r.rCurrentNursery->u.back->link = bd;
1128               } else {
1129                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1130                          g0s0->blocks == cap->r.rNursery);
1131                   cap->r.rNursery = g0s0->blocks = bd;
1132               }           
1133               cap->r.rCurrentNursery->u.back = bd;
1134
1135               // initialise it as a nursery block
1136               bd->step = g0s0;
1137               bd->gen_no = 0;
1138               bd->flags = 0;
1139               bd->free = bd->start;
1140
1141               // don't forget to update the block count in g0s0.
1142               g0s0->n_blocks += blocks;
1143               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1144
1145               // now update the nursery to point to the new block
1146               cap->r.rCurrentNursery = bd;
1147
1148               // we might be unlucky and have another thread get on the
1149               // run queue before us and steal the large block, but in that
1150               // case the thread will just end up requesting another large
1151               // block.
1152               PUSH_ON_RUN_QUEUE(t);
1153               break;
1154           }
1155       }
1156
1157       /* make all the running tasks block on a condition variable,
1158        * maybe set context_switch and wait till they all pile in,
1159        * then have them wait on a GC condition variable.
1160        */
1161       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1162                                t->id, t, whatNext_strs[t->what_next]));
1163       threadPaused(t);
1164 #if defined(GRAN)
1165       ASSERT(!is_on_queue(t,CurrentProc));
1166 #elif defined(PAR)
1167       /* Currently we emit a DESCHEDULE event before GC in GUM.
1168          ToDo: either add separate event to distinguish SYSTEM time from rest
1169                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1170       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1171         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1172                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1173         emitSchedule = rtsTrue;
1174       }
1175 #endif
1176       
1177       ready_to_gc = rtsTrue;
1178       context_switch = 1;               /* stop other threads ASAP */
1179       PUSH_ON_RUN_QUEUE(t);
1180       /* actual GC is done at the end of the while loop */
1181       break;
1182       
1183     case StackOverflow:
1184 #if defined(GRAN)
1185       IF_DEBUG(gran, 
1186                DumpGranEvent(GR_DESCHEDULE, t));
1187       globalGranStats.tot_stackover++;
1188 #elif defined(PAR)
1189       // IF_DEBUG(par, 
1190       // DumpGranEvent(GR_DESCHEDULE, t);
1191       globalParStats.tot_stackover++;
1192 #endif
1193       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1194                                t->id, t, whatNext_strs[t->what_next]));
1195       /* just adjust the stack for this thread, then pop it back
1196        * on the run queue.
1197        */
1198       threadPaused(t);
1199       { 
1200         StgMainThread *m;
1201         /* enlarge the stack */
1202         StgTSO *new_t = threadStackOverflow(t);
1203         
1204         /* This TSO has moved, so update any pointers to it from the
1205          * main thread stack.  It better not be on any other queues...
1206          * (it shouldn't be).
1207          */
1208         for (m = main_threads; m != NULL; m = m->link) {
1209           if (m->tso == t) {
1210             m->tso = new_t;
1211           }
1212         }
1213         threadPaused(new_t);
1214         PUSH_ON_RUN_QUEUE(new_t);
1215       }
1216       break;
1217
1218     case ThreadYielding:
1219 #if defined(GRAN)
1220       IF_DEBUG(gran, 
1221                DumpGranEvent(GR_DESCHEDULE, t));
1222       globalGranStats.tot_yields++;
1223 #elif defined(PAR)
1224       // IF_DEBUG(par, 
1225       // DumpGranEvent(GR_DESCHEDULE, t);
1226       globalParStats.tot_yields++;
1227 #endif
1228       /* put the thread back on the run queue.  Then, if we're ready to
1229        * GC, check whether this is the last task to stop.  If so, wake
1230        * up the GC thread.  getThread will block during a GC until the
1231        * GC is finished.
1232        */
1233       IF_DEBUG(scheduler,
1234                if (t->what_next == ThreadEnterInterp) {
1235                    /* ToDo: or maybe a timer expired when we were in Hugs?
1236                     * or maybe someone hit ctrl-C
1237                     */
1238                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1239                          t->id, t, whatNext_strs[t->what_next]);
1240                } else {
1241                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1242                          t->id, t, whatNext_strs[t->what_next]);
1243                }
1244                );
1245
1246       threadPaused(t);
1247
1248       IF_DEBUG(sanity,
1249                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1250                checkTSO(t));
1251       ASSERT(t->link == END_TSO_QUEUE);
1252 #if defined(GRAN)
1253       ASSERT(!is_on_queue(t,CurrentProc));
1254
1255       IF_DEBUG(sanity,
1256                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1257                checkThreadQsSanity(rtsTrue));
1258 #endif
1259 #if defined(PAR)
1260       if (RtsFlags.ParFlags.doFairScheduling) { 
1261         /* this does round-robin scheduling; good for concurrency */
1262         APPEND_TO_RUN_QUEUE(t);
1263       } else {
1264         /* this does unfair scheduling; good for parallelism */
1265         PUSH_ON_RUN_QUEUE(t);
1266       }
1267 #else
1268       /* this does round-robin scheduling; good for concurrency */
1269       APPEND_TO_RUN_QUEUE(t);
1270 #endif
1271 #if defined(GRAN)
1272       /* add a ContinueThread event to actually process the thread */
1273       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1274                 ContinueThread,
1275                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1276       IF_GRAN_DEBUG(bq, 
1277                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1278                G_EVENTQ(0);
1279                G_CURR_THREADQ(0));
1280 #endif /* GRAN */
1281       break;
1282       
1283     case ThreadBlocked:
1284 #if defined(GRAN)
1285       IF_DEBUG(scheduler,
1286                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1287                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1288                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1289
1290       // ??? needed; should emit block before
1291       IF_DEBUG(gran, 
1292                DumpGranEvent(GR_DESCHEDULE, t)); 
1293       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1294       /*
1295         ngoq Dogh!
1296       ASSERT(procStatus[CurrentProc]==Busy || 
1297               ((procStatus[CurrentProc]==Fetching) && 
1298               (t->block_info.closure!=(StgClosure*)NULL)));
1299       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1300           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1301             procStatus[CurrentProc]==Fetching)) 
1302         procStatus[CurrentProc] = Idle;
1303       */
1304 #elif defined(PAR)
1305       IF_DEBUG(scheduler,
1306                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1307                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1308       IF_PAR_DEBUG(bq,
1309
1310                    if (t->block_info.closure!=(StgClosure*)NULL) 
1311                      print_bq(t->block_info.closure));
1312
1313       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1314       blockThread(t);
1315
1316       /* whatever we schedule next, we must log that schedule */
1317       emitSchedule = rtsTrue;
1318
1319 #else /* !GRAN */
1320       /* don't need to do anything.  Either the thread is blocked on
1321        * I/O, in which case we'll have called addToBlockedQueue
1322        * previously, or it's blocked on an MVar or Blackhole, in which
1323        * case it'll be on the relevant queue already.
1324        */
1325       IF_DEBUG(scheduler,
1326                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1327                printThreadBlockage(t);
1328                fprintf(stderr, "\n"));
1329
1330       /* Only for dumping event to log file 
1331          ToDo: do I need this in GranSim, too?
1332       blockThread(t);
1333       */
1334 #endif
1335       threadPaused(t);
1336       break;
1337       
1338     case ThreadFinished:
1339       /* Need to check whether this was a main thread, and if so, signal
1340        * the task that started it with the return value.  If we have no
1341        * more main threads, we probably need to stop all the tasks until
1342        * we get a new one.
1343        */
1344       /* We also end up here if the thread kills itself with an
1345        * uncaught exception, see Exception.hc.
1346        */
1347       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1348 #if defined(GRAN)
1349       endThread(t, CurrentProc); // clean-up the thread
1350 #elif defined(PAR)
1351       /* For now all are advisory -- HWL */
1352       //if(t->priority==AdvisoryPriority) ??
1353       advisory_thread_count--;
1354       
1355 # ifdef DIST
1356       if(t->dist.priority==RevalPriority)
1357         FinishReval(t);
1358 # endif
1359       
1360       if (RtsFlags.ParFlags.ParStats.Full &&
1361           !RtsFlags.ParFlags.ParStats.Suppressed) 
1362         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1363 #endif
1364       break;
1365       
1366     default:
1367       barf("schedule: invalid thread return code %d", (int)ret);
1368     }
1369
1370 #ifdef PROFILING
1371     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1372         GarbageCollect(GetRoots, rtsTrue);
1373         heapCensus();
1374         performHeapProfile = rtsFalse;
1375         ready_to_gc = rtsFalse; // we already GC'd
1376     }
1377 #endif
1378
1379     if (ready_to_gc 
1380 #ifdef SMP
1381         && allFreeCapabilities() 
1382 #endif
1383         ) {
1384       /* everybody back, start the GC.
1385        * Could do it in this thread, or signal a condition var
1386        * to do it in another thread.  Either way, we need to
1387        * broadcast on gc_pending_cond afterward.
1388        */
1389 #if defined(RTS_SUPPORTS_THREADS)
1390       IF_DEBUG(scheduler,sched_belch("doing GC"));
1391 #endif
1392       GarbageCollect(GetRoots,rtsFalse);
1393       ready_to_gc = rtsFalse;
1394 #ifdef SMP
1395       broadcastCondition(&gc_pending_cond);
1396 #endif
1397 #if defined(GRAN)
1398       /* add a ContinueThread event to continue execution of current thread */
1399       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1400                 ContinueThread,
1401                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1402       IF_GRAN_DEBUG(bq, 
1403                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1404                G_EVENTQ(0);
1405                G_CURR_THREADQ(0));
1406 #endif /* GRAN */
1407     }
1408
1409 #if defined(GRAN)
1410   next_thread:
1411     IF_GRAN_DEBUG(unused,
1412                   print_eventq(EventHd));
1413
1414     event = get_next_event();
1415 #elif defined(PAR)
1416   next_thread:
1417     /* ToDo: wait for next message to arrive rather than busy wait */
1418 #endif /* GRAN */
1419
1420   } /* end of while(1) */
1421
1422   IF_PAR_DEBUG(verbose,
1423                belch("== Leaving schedule() after having received Finish"));
1424 }
1425
1426 /* ---------------------------------------------------------------------------
1427  * Singleton fork(). Do not copy any running threads.
1428  * ------------------------------------------------------------------------- */
1429
1430 StgInt forkProcess(StgTSO* tso) {
1431
1432 #ifndef mingw32_TARGET_OS
1433   pid_t pid;
1434   StgTSO* t,*next;
1435
1436   IF_DEBUG(scheduler,sched_belch("forking!"));
1437
1438   pid = fork();
1439   if (pid) { /* parent */
1440
1441   /* just return the pid */
1442     
1443   } else { /* child */
1444   /* wipe all other threads */
1445   run_queue_hd = tso;
1446   tso->link = END_TSO_QUEUE;
1447
1448   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1449      us is picky about finding the threat still in its queue when
1450      handling the deleteThread() */
1451
1452   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1453     next = t->link;
1454     if (t->id != tso->id) {
1455       deleteThread(t);
1456     }
1457   }
1458   }
1459   return pid;
1460 #else /* mingw32 */
1461   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1462   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1463   return -1;
1464 #endif /* mingw32 */
1465 }
1466
1467 /* ---------------------------------------------------------------------------
1468  * deleteAllThreads():  kill all the live threads.
1469  *
1470  * This is used when we catch a user interrupt (^C), before performing
1471  * any necessary cleanups and running finalizers.
1472  *
1473  * Locks: sched_mutex held.
1474  * ------------------------------------------------------------------------- */
1475    
1476 void deleteAllThreads ( void )
1477 {
1478   StgTSO* t, *next;
1479   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1480   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1481       next = t->global_link;
1482       deleteThread(t);
1483   }      
1484   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1485   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1486   sleeping_queue = END_TSO_QUEUE;
1487 }
1488
1489 /* startThread and  insertThread are now in GranSim.c -- HWL */
1490
1491
1492 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1493 //@subsection Suspend and Resume
1494
1495 /* ---------------------------------------------------------------------------
1496  * Suspending & resuming Haskell threads.
1497  * 
1498  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1499  * its capability before calling the C function.  This allows another
1500  * task to pick up the capability and carry on running Haskell
1501  * threads.  It also means that if the C call blocks, it won't lock
1502  * the whole system.
1503  *
1504  * The Haskell thread making the C call is put to sleep for the
1505  * duration of the call, on the susepended_ccalling_threads queue.  We
1506  * give out a token to the task, which it can use to resume the thread
1507  * on return from the C function.
1508  * ------------------------------------------------------------------------- */
1509    
1510 StgInt
1511 suspendThread( StgRegTable *reg, 
1512                rtsBool concCall
1513 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1514                STG_UNUSED
1515 #endif
1516                )
1517 {
1518   nat tok;
1519   Capability *cap;
1520
1521   /* assume that *reg is a pointer to the StgRegTable part
1522    * of a Capability.
1523    */
1524   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1525
1526   ACQUIRE_LOCK(&sched_mutex);
1527
1528   IF_DEBUG(scheduler,
1529            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1530
1531   threadPaused(cap->r.rCurrentTSO);
1532   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1533   suspended_ccalling_threads = cap->r.rCurrentTSO;
1534
1535 #if defined(RTS_SUPPORTS_THREADS)
1536   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1537 #endif
1538
1539   /* Use the thread ID as the token; it should be unique */
1540   tok = cap->r.rCurrentTSO->id;
1541
1542   /* Hand back capability */
1543   releaseCapability(cap);
1544   
1545 #if defined(RTS_SUPPORTS_THREADS)
1546   /* Preparing to leave the RTS, so ensure there's a native thread/task
1547      waiting to take over.
1548      
1549      ToDo: optimise this and only create a new task if there's a need
1550      for one (i.e., if there's only one Concurrent Haskell thread alive,
1551      there's no need to create a new task).
1552   */
1553   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1554   if (concCall) {
1555     startTask(taskStart);
1556   }
1557 #endif
1558
1559   /* Other threads _might_ be available for execution; signal this */
1560   THREAD_RUNNABLE();
1561   RELEASE_LOCK(&sched_mutex);
1562   return tok; 
1563 }
1564
1565 StgRegTable *
1566 resumeThread( StgInt tok,
1567               rtsBool concCall
1568 #if !defined(RTS_SUPPORTS_THREADS)
1569                STG_UNUSED
1570 #endif
1571               )
1572 {
1573   StgTSO *tso, **prev;
1574   Capability *cap;
1575
1576 #if defined(RTS_SUPPORTS_THREADS)
1577   /* Wait for permission to re-enter the RTS with the result. */
1578   if ( concCall ) {
1579     ACQUIRE_LOCK(&sched_mutex);
1580     grabReturnCapability(&sched_mutex, &cap);
1581   } else {
1582     grabCapability(&cap);
1583   }
1584 #else
1585   grabCapability(&cap);
1586 #endif
1587
1588   /* Remove the thread off of the suspended list */
1589   prev = &suspended_ccalling_threads;
1590   for (tso = suspended_ccalling_threads; 
1591        tso != END_TSO_QUEUE; 
1592        prev = &tso->link, tso = tso->link) {
1593     if (tso->id == (StgThreadID)tok) {
1594       *prev = tso->link;
1595       break;
1596     }
1597   }
1598   if (tso == END_TSO_QUEUE) {
1599     barf("resumeThread: thread not found");
1600   }
1601   tso->link = END_TSO_QUEUE;
1602   /* Reset blocking status */
1603   tso->why_blocked  = NotBlocked;
1604
1605   cap->r.rCurrentTSO = tso;
1606   RELEASE_LOCK(&sched_mutex);
1607   return &cap->r;
1608 }
1609
1610
1611 /* ---------------------------------------------------------------------------
1612  * Static functions
1613  * ------------------------------------------------------------------------ */
1614 static void unblockThread(StgTSO *tso);
1615
1616 /* ---------------------------------------------------------------------------
1617  * Comparing Thread ids.
1618  *
1619  * This is used from STG land in the implementation of the
1620  * instances of Eq/Ord for ThreadIds.
1621  * ------------------------------------------------------------------------ */
1622
1623 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1624
1625   StgThreadID id1 = tso1->id; 
1626   StgThreadID id2 = tso2->id;
1627  
1628   if (id1 < id2) return (-1);
1629   if (id1 > id2) return 1;
1630   return 0;
1631 }
1632
1633 /* ---------------------------------------------------------------------------
1634  * Fetching the ThreadID from an StgTSO.
1635  *
1636  * This is used in the implementation of Show for ThreadIds.
1637  * ------------------------------------------------------------------------ */
1638 int rts_getThreadId(const StgTSO *tso) 
1639 {
1640   return tso->id;
1641 }
1642
1643 #ifdef DEBUG
1644 void labelThread(StgTSO *tso, char *label)
1645 {
1646   int len;
1647   void *buf;
1648
1649   /* Caveat: Once set, you can only set the thread name to "" */
1650   len = strlen(label)+1;
1651   buf = realloc(tso->label,len);
1652   if (buf == NULL) {
1653     fprintf(stderr,"insufficient memory for labelThread!\n");
1654     free(tso->label);
1655     tso->label = NULL;
1656   } else
1657     strncpy(buf,label,len);
1658   tso->label = buf;
1659 }
1660 #endif /* DEBUG */
1661
1662 /* ---------------------------------------------------------------------------
1663    Create a new thread.
1664
1665    The new thread starts with the given stack size.  Before the
1666    scheduler can run, however, this thread needs to have a closure
1667    (and possibly some arguments) pushed on its stack.  See
1668    pushClosure() in Schedule.h.
1669
1670    createGenThread() and createIOThread() (in SchedAPI.h) are
1671    convenient packaged versions of this function.
1672
1673    currently pri (priority) is only used in a GRAN setup -- HWL
1674    ------------------------------------------------------------------------ */
1675 //@cindex createThread
1676 #if defined(GRAN)
1677 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1678 StgTSO *
1679 createThread(nat size, StgInt pri)
1680 #else
1681 StgTSO *
1682 createThread(nat size)
1683 #endif
1684 {
1685
1686     StgTSO *tso;
1687     nat stack_size;
1688
1689     /* First check whether we should create a thread at all */
1690 #if defined(PAR)
1691   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1692   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1693     threadsIgnored++;
1694     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1695           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1696     return END_TSO_QUEUE;
1697   }
1698   threadsCreated++;
1699 #endif
1700
1701 #if defined(GRAN)
1702   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1703 #endif
1704
1705   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1706
1707   /* catch ridiculously small stack sizes */
1708   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1709     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1710   }
1711
1712   stack_size = size - TSO_STRUCT_SIZEW;
1713
1714   tso = (StgTSO *)allocate(size);
1715   TICK_ALLOC_TSO(stack_size, 0);
1716
1717   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1718 #if defined(GRAN)
1719   SET_GRAN_HDR(tso, ThisPE);
1720 #endif
1721   tso->what_next     = ThreadEnterGHC;
1722
1723 #ifdef DEBUG
1724   tso->label = NULL;
1725 #endif
1726
1727   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1728    * protect the increment operation on next_thread_id.
1729    * In future, we could use an atomic increment instead.
1730    */
1731   ACQUIRE_LOCK(&thread_id_mutex);
1732   tso->id = next_thread_id++; 
1733   RELEASE_LOCK(&thread_id_mutex);
1734
1735   tso->why_blocked  = NotBlocked;
1736   tso->blocked_exceptions = NULL;
1737
1738   tso->stack_size   = stack_size;
1739   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1740                               - TSO_STRUCT_SIZEW;
1741   tso->sp           = (P_)&(tso->stack) + stack_size;
1742
1743 #ifdef PROFILING
1744   tso->prof.CCCS = CCS_MAIN;
1745 #endif
1746
1747   /* put a stop frame on the stack */
1748   tso->sp -= sizeofW(StgStopFrame);
1749   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1750   tso->su = (StgUpdateFrame*)tso->sp;
1751
1752   // ToDo: check this
1753 #if defined(GRAN)
1754   tso->link = END_TSO_QUEUE;
1755   /* uses more flexible routine in GranSim */
1756   insertThread(tso, CurrentProc);
1757 #else
1758   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1759    * from its creation
1760    */
1761 #endif
1762
1763 #if defined(GRAN) 
1764   if (RtsFlags.GranFlags.GranSimStats.Full) 
1765     DumpGranEvent(GR_START,tso);
1766 #elif defined(PAR)
1767   if (RtsFlags.ParFlags.ParStats.Full) 
1768     DumpGranEvent(GR_STARTQ,tso);
1769   /* HACk to avoid SCHEDULE 
1770      LastTSO = tso; */
1771 #endif
1772
1773   /* Link the new thread on the global thread list.
1774    */
1775   tso->global_link = all_threads;
1776   all_threads = tso;
1777
1778 #if defined(DIST)
1779   tso->dist.priority = MandatoryPriority; //by default that is...
1780 #endif
1781
1782 #if defined(GRAN)
1783   tso->gran.pri = pri;
1784 # if defined(DEBUG)
1785   tso->gran.magic = TSO_MAGIC; // debugging only
1786 # endif
1787   tso->gran.sparkname   = 0;
1788   tso->gran.startedat   = CURRENT_TIME; 
1789   tso->gran.exported    = 0;
1790   tso->gran.basicblocks = 0;
1791   tso->gran.allocs      = 0;
1792   tso->gran.exectime    = 0;
1793   tso->gran.fetchtime   = 0;
1794   tso->gran.fetchcount  = 0;
1795   tso->gran.blocktime   = 0;
1796   tso->gran.blockcount  = 0;
1797   tso->gran.blockedat   = 0;
1798   tso->gran.globalsparks = 0;
1799   tso->gran.localsparks  = 0;
1800   if (RtsFlags.GranFlags.Light)
1801     tso->gran.clock  = Now; /* local clock */
1802   else
1803     tso->gran.clock  = 0;
1804
1805   IF_DEBUG(gran,printTSO(tso));
1806 #elif defined(PAR)
1807 # if defined(DEBUG)
1808   tso->par.magic = TSO_MAGIC; // debugging only
1809 # endif
1810   tso->par.sparkname   = 0;
1811   tso->par.startedat   = CURRENT_TIME; 
1812   tso->par.exported    = 0;
1813   tso->par.basicblocks = 0;
1814   tso->par.allocs      = 0;
1815   tso->par.exectime    = 0;
1816   tso->par.fetchtime   = 0;
1817   tso->par.fetchcount  = 0;
1818   tso->par.blocktime   = 0;
1819   tso->par.blockcount  = 0;
1820   tso->par.blockedat   = 0;
1821   tso->par.globalsparks = 0;
1822   tso->par.localsparks  = 0;
1823 #endif
1824
1825 #if defined(GRAN)
1826   globalGranStats.tot_threads_created++;
1827   globalGranStats.threads_created_on_PE[CurrentProc]++;
1828   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1829   globalGranStats.tot_sq_probes++;
1830 #elif defined(PAR)
1831   // collect parallel global statistics (currently done together with GC stats)
1832   if (RtsFlags.ParFlags.ParStats.Global &&
1833       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1834     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1835     globalParStats.tot_threads_created++;
1836   }
1837 #endif 
1838
1839 #if defined(GRAN)
1840   IF_GRAN_DEBUG(pri,
1841                 belch("==__ schedule: Created TSO %d (%p);",
1842                       CurrentProc, tso, tso->id));
1843 #elif defined(PAR)
1844     IF_PAR_DEBUG(verbose,
1845                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1846                        tso->id, tso, advisory_thread_count));
1847 #else
1848   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1849                                  tso->id, tso->stack_size));
1850 #endif    
1851   return tso;
1852 }
1853
1854 #if defined(PAR)
1855 /* RFP:
1856    all parallel thread creation calls should fall through the following routine.
1857 */
1858 StgTSO *
1859 createSparkThread(rtsSpark spark) 
1860 { StgTSO *tso;
1861   ASSERT(spark != (rtsSpark)NULL);
1862   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1863   { threadsIgnored++;
1864     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1865           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1866     return END_TSO_QUEUE;
1867   }
1868   else
1869   { threadsCreated++;
1870     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1871     if (tso==END_TSO_QUEUE)     
1872       barf("createSparkThread: Cannot create TSO");
1873 #if defined(DIST)
1874     tso->priority = AdvisoryPriority;
1875 #endif
1876     pushClosure(tso,spark);
1877     PUSH_ON_RUN_QUEUE(tso);
1878     advisory_thread_count++;    
1879   }
1880   return tso;
1881 }
1882 #endif
1883
1884 /*
1885   Turn a spark into a thread.
1886   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1887 */
1888 #if defined(PAR)
1889 //@cindex activateSpark
1890 StgTSO *
1891 activateSpark (rtsSpark spark) 
1892 {
1893   StgTSO *tso;
1894
1895   tso = createSparkThread(spark);
1896   if (RtsFlags.ParFlags.ParStats.Full) {   
1897     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1898     IF_PAR_DEBUG(verbose,
1899                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1900                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1901   }
1902   // ToDo: fwd info on local/global spark to thread -- HWL
1903   // tso->gran.exported =  spark->exported;
1904   // tso->gran.locked =   !spark->global;
1905   // tso->gran.sparkname = spark->name;
1906
1907   return tso;
1908 }
1909 #endif
1910
1911 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
1912 #if defined(THREADED_RTS)
1913                                    , rtsBool blockWaiting
1914 #endif
1915                                    );
1916
1917
1918 /* ---------------------------------------------------------------------------
1919  * scheduleThread()
1920  *
1921  * scheduleThread puts a thread on the head of the runnable queue.
1922  * This will usually be done immediately after a thread is created.
1923  * The caller of scheduleThread must create the thread using e.g.
1924  * createThread and push an appropriate closure
1925  * on this thread's stack before the scheduler is invoked.
1926  * ------------------------------------------------------------------------ */
1927
1928 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1929
1930 void
1931 scheduleThread_(StgTSO *tso
1932                , rtsBool createTask
1933 #if !defined(THREADED_RTS)
1934                  STG_UNUSED
1935 #endif
1936               )
1937 {
1938   ACQUIRE_LOCK(&sched_mutex);
1939
1940   /* Put the new thread on the head of the runnable queue.  The caller
1941    * better push an appropriate closure on this thread's stack
1942    * beforehand.  In the SMP case, the thread may start running as
1943    * soon as we release the scheduler lock below.
1944    */
1945   PUSH_ON_RUN_QUEUE(tso);
1946 #if defined(THREADED_RTS)
1947   /* If main() is scheduling a thread, don't bother creating a 
1948    * new task.
1949    */
1950   if ( createTask ) {
1951     startTask(taskStart);
1952   }
1953 #endif
1954   THREAD_RUNNABLE();
1955
1956 #if 0
1957   IF_DEBUG(scheduler,printTSO(tso));
1958 #endif
1959   RELEASE_LOCK(&sched_mutex);
1960 }
1961
1962 void scheduleThread(StgTSO* tso)
1963 {
1964   scheduleThread_(tso, rtsFalse);
1965 }
1966
1967 SchedulerStatus
1968 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
1969 {
1970   StgMainThread *m;
1971
1972   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1973   m->tso = tso;
1974   m->ret = ret;
1975   m->stat = NoStatus;
1976 #if defined(RTS_SUPPORTS_THREADS)
1977   initCondition(&m->wakeup);
1978 #endif
1979
1980   /* Put the thread on the main-threads list prior to scheduling the TSO.
1981      Failure to do so introduces a race condition in the MT case (as
1982      identified by Wolfgang Thaller), whereby the new task/OS thread 
1983      created by scheduleThread_() would complete prior to the thread
1984      that spawned it managed to put 'itself' on the main-threads list.
1985      The upshot of it all being that the worker thread wouldn't get to
1986      signal the completion of the its work item for the main thread to
1987      see (==> it got stuck waiting.)    -- sof 6/02.
1988   */
1989   ACQUIRE_LOCK(&sched_mutex);
1990   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
1991   
1992   m->link = main_threads;
1993   main_threads = m;
1994
1995   /* Inefficient (scheduleThread_() acquires it again right away),
1996    * but obviously correct.
1997    */
1998   RELEASE_LOCK(&sched_mutex);
1999
2000   scheduleThread_(tso, rtsTrue);
2001 #if defined(THREADED_RTS)
2002   return waitThread_(m, rtsTrue);
2003 #else
2004   return waitThread_(m);
2005 #endif
2006 }
2007
2008 /* ---------------------------------------------------------------------------
2009  * initScheduler()
2010  *
2011  * Initialise the scheduler.  This resets all the queues - if the
2012  * queues contained any threads, they'll be garbage collected at the
2013  * next pass.
2014  *
2015  * ------------------------------------------------------------------------ */
2016
2017 #ifdef SMP
2018 static void
2019 term_handler(int sig STG_UNUSED)
2020 {
2021   stat_workerStop();
2022   ACQUIRE_LOCK(&term_mutex);
2023   await_death--;
2024   RELEASE_LOCK(&term_mutex);
2025   shutdownThread();
2026 }
2027 #endif
2028
2029 void 
2030 initScheduler(void)
2031 {
2032 #if defined(GRAN)
2033   nat i;
2034
2035   for (i=0; i<=MAX_PROC; i++) {
2036     run_queue_hds[i]      = END_TSO_QUEUE;
2037     run_queue_tls[i]      = END_TSO_QUEUE;
2038     blocked_queue_hds[i]  = END_TSO_QUEUE;
2039     blocked_queue_tls[i]  = END_TSO_QUEUE;
2040     ccalling_threadss[i]  = END_TSO_QUEUE;
2041     sleeping_queue        = END_TSO_QUEUE;
2042   }
2043 #else
2044   run_queue_hd      = END_TSO_QUEUE;
2045   run_queue_tl      = END_TSO_QUEUE;
2046   blocked_queue_hd  = END_TSO_QUEUE;
2047   blocked_queue_tl  = END_TSO_QUEUE;
2048   sleeping_queue    = END_TSO_QUEUE;
2049 #endif 
2050
2051   suspended_ccalling_threads  = END_TSO_QUEUE;
2052
2053   main_threads = NULL;
2054   all_threads  = END_TSO_QUEUE;
2055
2056   context_switch = 0;
2057   interrupted    = 0;
2058
2059   RtsFlags.ConcFlags.ctxtSwitchTicks =
2060       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2061       
2062 #if defined(RTS_SUPPORTS_THREADS)
2063   /* Initialise the mutex and condition variables used by
2064    * the scheduler. */
2065   initMutex(&sched_mutex);
2066   initMutex(&term_mutex);
2067   initMutex(&thread_id_mutex);
2068
2069   initCondition(&thread_ready_cond);
2070 #endif
2071   
2072 #if defined(SMP)
2073   initCondition(&gc_pending_cond);
2074 #endif
2075
2076 #if defined(RTS_SUPPORTS_THREADS)
2077   ACQUIRE_LOCK(&sched_mutex);
2078 #endif
2079
2080   /* Install the SIGHUP handler */
2081 #if defined(SMP)
2082   {
2083     struct sigaction action,oact;
2084
2085     action.sa_handler = term_handler;
2086     sigemptyset(&action.sa_mask);
2087     action.sa_flags = 0;
2088     if (sigaction(SIGTERM, &action, &oact) != 0) {
2089       barf("can't install TERM handler");
2090     }
2091   }
2092 #endif
2093
2094   /* A capability holds the state a native thread needs in
2095    * order to execute STG code. At least one capability is
2096    * floating around (only SMP builds have more than one).
2097    */
2098   initCapabilities();
2099   
2100 #if defined(RTS_SUPPORTS_THREADS)
2101     /* start our haskell execution tasks */
2102 # if defined(SMP)
2103     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2104 # else
2105     startTaskManager(0,taskStart);
2106 # endif
2107 #endif
2108
2109 #if /* defined(SMP) ||*/ defined(PAR)
2110   initSparkPools();
2111 #endif
2112
2113 #if defined(RTS_SUPPORTS_THREADS)
2114   RELEASE_LOCK(&sched_mutex);
2115 #endif
2116
2117 }
2118
2119 void
2120 exitScheduler( void )
2121 {
2122 #if defined(RTS_SUPPORTS_THREADS)
2123   stopTaskManager();
2124 #endif
2125   shutting_down_scheduler = rtsTrue;
2126 }
2127
2128 /* -----------------------------------------------------------------------------
2129    Managing the per-task allocation areas.
2130    
2131    Each capability comes with an allocation area.  These are
2132    fixed-length block lists into which allocation can be done.
2133
2134    ToDo: no support for two-space collection at the moment???
2135    -------------------------------------------------------------------------- */
2136
2137 /* -----------------------------------------------------------------------------
2138  * waitThread is the external interface for running a new computation
2139  * and waiting for the result.
2140  *
2141  * In the non-SMP case, we create a new main thread, push it on the 
2142  * main-thread stack, and invoke the scheduler to run it.  The
2143  * scheduler will return when the top main thread on the stack has
2144  * completed or died, and fill in the necessary fields of the
2145  * main_thread structure.
2146  *
2147  * In the SMP case, we create a main thread as before, but we then
2148  * create a new condition variable and sleep on it.  When our new
2149  * main thread has completed, we'll be woken up and the status/result
2150  * will be in the main_thread struct.
2151  * -------------------------------------------------------------------------- */
2152
2153 int 
2154 howManyThreadsAvail ( void )
2155 {
2156    int i = 0;
2157    StgTSO* q;
2158    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2159       i++;
2160    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2161       i++;
2162    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2163       i++;
2164    return i;
2165 }
2166
2167 void
2168 finishAllThreads ( void )
2169 {
2170    do {
2171       while (run_queue_hd != END_TSO_QUEUE) {
2172          waitThread ( run_queue_hd, NULL);
2173       }
2174       while (blocked_queue_hd != END_TSO_QUEUE) {
2175          waitThread ( blocked_queue_hd, NULL);
2176       }
2177       while (sleeping_queue != END_TSO_QUEUE) {
2178          waitThread ( blocked_queue_hd, NULL);
2179       }
2180    } while 
2181       (blocked_queue_hd != END_TSO_QUEUE || 
2182        run_queue_hd     != END_TSO_QUEUE ||
2183        sleeping_queue   != END_TSO_QUEUE);
2184 }
2185
2186 SchedulerStatus
2187 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2188
2189   StgMainThread *m;
2190
2191   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2192   m->tso = tso;
2193   m->ret = ret;
2194   m->stat = NoStatus;
2195 #if defined(RTS_SUPPORTS_THREADS)
2196   initCondition(&m->wakeup);
2197 #endif
2198
2199   /* see scheduleWaitThread() comment */
2200   ACQUIRE_LOCK(&sched_mutex);
2201   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2202   m->link = main_threads;
2203   main_threads = m;
2204   RELEASE_LOCK(&sched_mutex);
2205
2206   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2207 #if defined(THREADED_RTS)
2208   return waitThread_(m, rtsFalse);
2209 #else
2210   return waitThread_(m);
2211 #endif
2212 }
2213
2214 static
2215 SchedulerStatus
2216 waitThread_(StgMainThread* m
2217 #if defined(THREADED_RTS)
2218             , rtsBool blockWaiting
2219 #endif
2220            )
2221 {
2222   SchedulerStatus stat;
2223
2224   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2225
2226 #if defined(RTS_SUPPORTS_THREADS)
2227
2228 # if defined(THREADED_RTS)
2229   if (!blockWaiting) {
2230     /* In the threaded case, the OS thread that called main()
2231      * gets to enter the RTS directly without going via another
2232      * task/thread.
2233      */
2234     schedule();
2235     ASSERT(m->stat != NoStatus);
2236   } else 
2237 # endif
2238   {
2239     ACQUIRE_LOCK(&sched_mutex);
2240     do {
2241       waitCondition(&m->wakeup, &sched_mutex);
2242     } while (m->stat == NoStatus);
2243   }
2244 #elif defined(GRAN)
2245   /* GranSim specific init */
2246   CurrentTSO = m->tso;                // the TSO to run
2247   procStatus[MainProc] = Busy;        // status of main PE
2248   CurrentProc = MainProc;             // PE to run it on
2249
2250   schedule();
2251 #else
2252   RELEASE_LOCK(&sched_mutex);
2253   schedule();
2254   ASSERT(m->stat != NoStatus);
2255 #endif
2256
2257   stat = m->stat;
2258
2259 #if defined(RTS_SUPPORTS_THREADS)
2260   closeCondition(&m->wakeup);
2261 #endif
2262
2263   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2264                               m->tso->id));
2265   free(m);
2266
2267 #if defined(THREADED_RTS)
2268   if (blockWaiting) 
2269 #endif
2270     RELEASE_LOCK(&sched_mutex);
2271
2272   return stat;
2273 }
2274
2275 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2276 //@subsection Run queue code 
2277
2278 #if 0
2279 /* 
2280    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2281        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2282        implicit global variable that has to be correct when calling these
2283        fcts -- HWL 
2284 */
2285
2286 /* Put the new thread on the head of the runnable queue.
2287  * The caller of createThread better push an appropriate closure
2288  * on this thread's stack before the scheduler is invoked.
2289  */
2290 static /* inline */ void
2291 add_to_run_queue(tso)
2292 StgTSO* tso; 
2293 {
2294   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2295   tso->link = run_queue_hd;
2296   run_queue_hd = tso;
2297   if (run_queue_tl == END_TSO_QUEUE) {
2298     run_queue_tl = tso;
2299   }
2300 }
2301
2302 /* Put the new thread at the end of the runnable queue. */
2303 static /* inline */ void
2304 push_on_run_queue(tso)
2305 StgTSO* tso; 
2306 {
2307   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2308   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2309   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2310   if (run_queue_hd == END_TSO_QUEUE) {
2311     run_queue_hd = tso;
2312   } else {
2313     run_queue_tl->link = tso;
2314   }
2315   run_queue_tl = tso;
2316 }
2317
2318 /* 
2319    Should be inlined because it's used very often in schedule.  The tso
2320    argument is actually only needed in GranSim, where we want to have the
2321    possibility to schedule *any* TSO on the run queue, irrespective of the
2322    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2323    the run queue and dequeue the tso, adjusting the links in the queue. 
2324 */
2325 //@cindex take_off_run_queue
2326 static /* inline */ StgTSO*
2327 take_off_run_queue(StgTSO *tso) {
2328   StgTSO *t, *prev;
2329
2330   /* 
2331      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2332
2333      if tso is specified, unlink that tso from the run_queue (doesn't have
2334      to be at the beginning of the queue); GranSim only 
2335   */
2336   if (tso!=END_TSO_QUEUE) {
2337     /* find tso in queue */
2338     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2339          t!=END_TSO_QUEUE && t!=tso;
2340          prev=t, t=t->link) 
2341       /* nothing */ ;
2342     ASSERT(t==tso);
2343     /* now actually dequeue the tso */
2344     if (prev!=END_TSO_QUEUE) {
2345       ASSERT(run_queue_hd!=t);
2346       prev->link = t->link;
2347     } else {
2348       /* t is at beginning of thread queue */
2349       ASSERT(run_queue_hd==t);
2350       run_queue_hd = t->link;
2351     }
2352     /* t is at end of thread queue */
2353     if (t->link==END_TSO_QUEUE) {
2354       ASSERT(t==run_queue_tl);
2355       run_queue_tl = prev;
2356     } else {
2357       ASSERT(run_queue_tl!=t);
2358     }
2359     t->link = END_TSO_QUEUE;
2360   } else {
2361     /* take tso from the beginning of the queue; std concurrent code */
2362     t = run_queue_hd;
2363     if (t != END_TSO_QUEUE) {
2364       run_queue_hd = t->link;
2365       t->link = END_TSO_QUEUE;
2366       if (run_queue_hd == END_TSO_QUEUE) {
2367         run_queue_tl = END_TSO_QUEUE;
2368       }
2369     }
2370   }
2371   return t;
2372 }
2373
2374 #endif /* 0 */
2375
2376 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2377 //@subsection Garbage Collextion Routines
2378
2379 /* ---------------------------------------------------------------------------
2380    Where are the roots that we know about?
2381
2382         - all the threads on the runnable queue
2383         - all the threads on the blocked queue
2384         - all the threads on the sleeping queue
2385         - all the thread currently executing a _ccall_GC
2386         - all the "main threads"
2387      
2388    ------------------------------------------------------------------------ */
2389
2390 /* This has to be protected either by the scheduler monitor, or by the
2391         garbage collection monitor (probably the latter).
2392         KH @ 25/10/99
2393 */
2394
2395 void
2396 GetRoots(evac_fn evac)
2397 {
2398 #if defined(GRAN)
2399   {
2400     nat i;
2401     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2402       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2403           evac((StgClosure **)&run_queue_hds[i]);
2404       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2405           evac((StgClosure **)&run_queue_tls[i]);
2406       
2407       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2408           evac((StgClosure **)&blocked_queue_hds[i]);
2409       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2410           evac((StgClosure **)&blocked_queue_tls[i]);
2411       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2412           evac((StgClosure **)&ccalling_threads[i]);
2413     }
2414   }
2415
2416   markEventQueue();
2417
2418 #else /* !GRAN */
2419   if (run_queue_hd != END_TSO_QUEUE) {
2420       ASSERT(run_queue_tl != END_TSO_QUEUE);
2421       evac((StgClosure **)&run_queue_hd);
2422       evac((StgClosure **)&run_queue_tl);
2423   }
2424   
2425   if (blocked_queue_hd != END_TSO_QUEUE) {
2426       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2427       evac((StgClosure **)&blocked_queue_hd);
2428       evac((StgClosure **)&blocked_queue_tl);
2429   }
2430   
2431   if (sleeping_queue != END_TSO_QUEUE) {
2432       evac((StgClosure **)&sleeping_queue);
2433   }
2434 #endif 
2435
2436   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2437       evac((StgClosure **)&suspended_ccalling_threads);
2438   }
2439
2440 #if defined(PAR) || defined(GRAN)
2441   markSparkQueue(evac);
2442 #endif
2443 }
2444
2445 /* -----------------------------------------------------------------------------
2446    performGC
2447
2448    This is the interface to the garbage collector from Haskell land.
2449    We provide this so that external C code can allocate and garbage
2450    collect when called from Haskell via _ccall_GC.
2451
2452    It might be useful to provide an interface whereby the programmer
2453    can specify more roots (ToDo).
2454    
2455    This needs to be protected by the GC condition variable above.  KH.
2456    -------------------------------------------------------------------------- */
2457
2458 void (*extra_roots)(evac_fn);
2459
2460 void
2461 performGC(void)
2462 {
2463   /* Obligated to hold this lock upon entry */
2464   ACQUIRE_LOCK(&sched_mutex);
2465   GarbageCollect(GetRoots,rtsFalse);
2466   RELEASE_LOCK(&sched_mutex);
2467 }
2468
2469 void
2470 performMajorGC(void)
2471 {
2472   ACQUIRE_LOCK(&sched_mutex);
2473   GarbageCollect(GetRoots,rtsTrue);
2474   RELEASE_LOCK(&sched_mutex);
2475 }
2476
2477 static void
2478 AllRoots(evac_fn evac)
2479 {
2480     GetRoots(evac);             // the scheduler's roots
2481     extra_roots(evac);          // the user's roots
2482 }
2483
2484 void
2485 performGCWithRoots(void (*get_roots)(evac_fn))
2486 {
2487   ACQUIRE_LOCK(&sched_mutex);
2488   extra_roots = get_roots;
2489   GarbageCollect(AllRoots,rtsFalse);
2490   RELEASE_LOCK(&sched_mutex);
2491 }
2492
2493 /* -----------------------------------------------------------------------------
2494    Stack overflow
2495
2496    If the thread has reached its maximum stack size, then raise the
2497    StackOverflow exception in the offending thread.  Otherwise
2498    relocate the TSO into a larger chunk of memory and adjust its stack
2499    size appropriately.
2500    -------------------------------------------------------------------------- */
2501
2502 static StgTSO *
2503 threadStackOverflow(StgTSO *tso)
2504 {
2505   nat new_stack_size, new_tso_size, diff, stack_words;
2506   StgPtr new_sp;
2507   StgTSO *dest;
2508
2509   IF_DEBUG(sanity,checkTSO(tso));
2510   if (tso->stack_size >= tso->max_stack_size) {
2511
2512     IF_DEBUG(gc,
2513              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2514                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2515              /* If we're debugging, just print out the top of the stack */
2516              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2517                                               tso->sp+64)));
2518
2519     /* Send this thread the StackOverflow exception */
2520     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2521     return tso;
2522   }
2523
2524   /* Try to double the current stack size.  If that takes us over the
2525    * maximum stack size for this thread, then use the maximum instead.
2526    * Finally round up so the TSO ends up as a whole number of blocks.
2527    */
2528   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2529   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2530                                        TSO_STRUCT_SIZE)/sizeof(W_);
2531   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2532   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2533
2534   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2535
2536   dest = (StgTSO *)allocate(new_tso_size);
2537   TICK_ALLOC_TSO(new_stack_size,0);
2538
2539   /* copy the TSO block and the old stack into the new area */
2540   memcpy(dest,tso,TSO_STRUCT_SIZE);
2541   stack_words = tso->stack + tso->stack_size - tso->sp;
2542   new_sp = (P_)dest + new_tso_size - stack_words;
2543   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2544
2545   /* relocate the stack pointers... */
2546   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2547   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2548   dest->sp    = new_sp;
2549   dest->stack_size = new_stack_size;
2550         
2551   /* and relocate the update frame list */
2552   relocate_stack(dest, diff);
2553
2554   /* Mark the old TSO as relocated.  We have to check for relocated
2555    * TSOs in the garbage collector and any primops that deal with TSOs.
2556    *
2557    * It's important to set the sp and su values to just beyond the end
2558    * of the stack, so we don't attempt to scavenge any part of the
2559    * dead TSO's stack.
2560    */
2561   tso->what_next = ThreadRelocated;
2562   tso->link = dest;
2563   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2564   tso->su = (StgUpdateFrame *)tso->sp;
2565   tso->why_blocked = NotBlocked;
2566   dest->mut_link = NULL;
2567
2568   IF_PAR_DEBUG(verbose,
2569                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2570                      tso->id, tso, tso->stack_size);
2571                /* If we're debugging, just print out the top of the stack */
2572                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2573                                                 tso->sp+64)));
2574   
2575   IF_DEBUG(sanity,checkTSO(tso));
2576 #if 0
2577   IF_DEBUG(scheduler,printTSO(dest));
2578 #endif
2579
2580   return dest;
2581 }
2582
2583 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2584 //@subsection Blocking Queue Routines
2585
2586 /* ---------------------------------------------------------------------------
2587    Wake up a queue that was blocked on some resource.
2588    ------------------------------------------------------------------------ */
2589
2590 #if defined(GRAN)
2591 static inline void
2592 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2593 {
2594 }
2595 #elif defined(PAR)
2596 static inline void
2597 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2598 {
2599   /* write RESUME events to log file and
2600      update blocked and fetch time (depending on type of the orig closure) */
2601   if (RtsFlags.ParFlags.ParStats.Full) {
2602     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2603                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2604                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2605     if (EMPTY_RUN_QUEUE())
2606       emitSchedule = rtsTrue;
2607
2608     switch (get_itbl(node)->type) {
2609         case FETCH_ME_BQ:
2610           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2611           break;
2612         case RBH:
2613         case FETCH_ME:
2614         case BLACKHOLE_BQ:
2615           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2616           break;
2617 #ifdef DIST
2618         case MVAR:
2619           break;
2620 #endif    
2621         default:
2622           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2623         }
2624       }
2625 }
2626 #endif
2627
2628 #if defined(GRAN)
2629 static StgBlockingQueueElement *
2630 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2631 {
2632     StgTSO *tso;
2633     PEs node_loc, tso_loc;
2634
2635     node_loc = where_is(node); // should be lifted out of loop
2636     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2637     tso_loc = where_is((StgClosure *)tso);
2638     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2639       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2640       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2641       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2642       // insertThread(tso, node_loc);
2643       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2644                 ResumeThread,
2645                 tso, node, (rtsSpark*)NULL);
2646       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2647       // len_local++;
2648       // len++;
2649     } else { // TSO is remote (actually should be FMBQ)
2650       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2651                                   RtsFlags.GranFlags.Costs.gunblocktime +
2652                                   RtsFlags.GranFlags.Costs.latency;
2653       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2654                 UnblockThread,
2655                 tso, node, (rtsSpark*)NULL);
2656       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2657       // len++;
2658     }
2659     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2660     IF_GRAN_DEBUG(bq,
2661                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2662                           (node_loc==tso_loc ? "Local" : "Global"), 
2663                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2664     tso->block_info.closure = NULL;
2665     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2666                              tso->id, tso));
2667 }
2668 #elif defined(PAR)
2669 static StgBlockingQueueElement *
2670 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2671 {
2672     StgBlockingQueueElement *next;
2673
2674     switch (get_itbl(bqe)->type) {
2675     case TSO:
2676       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2677       /* if it's a TSO just push it onto the run_queue */
2678       next = bqe->link;
2679       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2680       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2681       THREAD_RUNNABLE();
2682       unblockCount(bqe, node);
2683       /* reset blocking status after dumping event */
2684       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2685       break;
2686
2687     case BLOCKED_FETCH:
2688       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2689       next = bqe->link;
2690       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2691       PendingFetches = (StgBlockedFetch *)bqe;
2692       break;
2693
2694 # if defined(DEBUG)
2695       /* can ignore this case in a non-debugging setup; 
2696          see comments on RBHSave closures above */
2697     case CONSTR:
2698       /* check that the closure is an RBHSave closure */
2699       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2700              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2701              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2702       break;
2703
2704     default:
2705       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2706            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2707            (StgClosure *)bqe);
2708 # endif
2709     }
2710   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2711   return next;
2712 }
2713
2714 #else /* !GRAN && !PAR */
2715 static StgTSO *
2716 unblockOneLocked(StgTSO *tso)
2717 {
2718   StgTSO *next;
2719
2720   ASSERT(get_itbl(tso)->type == TSO);
2721   ASSERT(tso->why_blocked != NotBlocked);
2722   tso->why_blocked = NotBlocked;
2723   next = tso->link;
2724   PUSH_ON_RUN_QUEUE(tso);
2725   THREAD_RUNNABLE();
2726   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2727   return next;
2728 }
2729 #endif
2730
2731 #if defined(GRAN) || defined(PAR)
2732 inline StgBlockingQueueElement *
2733 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2734 {
2735   ACQUIRE_LOCK(&sched_mutex);
2736   bqe = unblockOneLocked(bqe, node);
2737   RELEASE_LOCK(&sched_mutex);
2738   return bqe;
2739 }
2740 #else
2741 inline StgTSO *
2742 unblockOne(StgTSO *tso)
2743 {
2744   ACQUIRE_LOCK(&sched_mutex);
2745   tso = unblockOneLocked(tso);
2746   RELEASE_LOCK(&sched_mutex);
2747   return tso;
2748 }
2749 #endif
2750
2751 #if defined(GRAN)
2752 void 
2753 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2754 {
2755   StgBlockingQueueElement *bqe;
2756   PEs node_loc;
2757   nat len = 0; 
2758
2759   IF_GRAN_DEBUG(bq, 
2760                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2761                       node, CurrentProc, CurrentTime[CurrentProc], 
2762                       CurrentTSO->id, CurrentTSO));
2763
2764   node_loc = where_is(node);
2765
2766   ASSERT(q == END_BQ_QUEUE ||
2767          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2768          get_itbl(q)->type == CONSTR); // closure (type constructor)
2769   ASSERT(is_unique(node));
2770
2771   /* FAKE FETCH: magically copy the node to the tso's proc;
2772      no Fetch necessary because in reality the node should not have been 
2773      moved to the other PE in the first place
2774   */
2775   if (CurrentProc!=node_loc) {
2776     IF_GRAN_DEBUG(bq, 
2777                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2778                         node, node_loc, CurrentProc, CurrentTSO->id, 
2779                         // CurrentTSO, where_is(CurrentTSO),
2780                         node->header.gran.procs));
2781     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2782     IF_GRAN_DEBUG(bq, 
2783                   belch("## new bitmask of node %p is %#x",
2784                         node, node->header.gran.procs));
2785     if (RtsFlags.GranFlags.GranSimStats.Global) {
2786       globalGranStats.tot_fake_fetches++;
2787     }
2788   }
2789
2790   bqe = q;
2791   // ToDo: check: ASSERT(CurrentProc==node_loc);
2792   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2793     //next = bqe->link;
2794     /* 
2795        bqe points to the current element in the queue
2796        next points to the next element in the queue
2797     */
2798     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2799     //tso_loc = where_is(tso);
2800     len++;
2801     bqe = unblockOneLocked(bqe, node);
2802   }
2803
2804   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2805      the closure to make room for the anchor of the BQ */
2806   if (bqe!=END_BQ_QUEUE) {
2807     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2808     /*
2809     ASSERT((info_ptr==&RBH_Save_0_info) ||
2810            (info_ptr==&RBH_Save_1_info) ||
2811            (info_ptr==&RBH_Save_2_info));
2812     */
2813     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2814     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2815     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2816
2817     IF_GRAN_DEBUG(bq,
2818                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2819                         node, info_type(node)));
2820   }
2821
2822   /* statistics gathering */
2823   if (RtsFlags.GranFlags.GranSimStats.Global) {
2824     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2825     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2826     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2827     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2828   }
2829   IF_GRAN_DEBUG(bq,
2830                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2831                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2832 }
2833 #elif defined(PAR)
2834 void 
2835 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2836 {
2837   StgBlockingQueueElement *bqe;
2838
2839   ACQUIRE_LOCK(&sched_mutex);
2840
2841   IF_PAR_DEBUG(verbose, 
2842                belch("##-_ AwBQ for node %p on [%x]: ",
2843                      node, mytid));
2844 #ifdef DIST  
2845   //RFP
2846   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2847     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2848     return;
2849   }
2850 #endif
2851   
2852   ASSERT(q == END_BQ_QUEUE ||
2853          get_itbl(q)->type == TSO ||           
2854          get_itbl(q)->type == BLOCKED_FETCH || 
2855          get_itbl(q)->type == CONSTR); 
2856
2857   bqe = q;
2858   while (get_itbl(bqe)->type==TSO || 
2859          get_itbl(bqe)->type==BLOCKED_FETCH) {
2860     bqe = unblockOneLocked(bqe, node);
2861   }
2862   RELEASE_LOCK(&sched_mutex);
2863 }
2864
2865 #else   /* !GRAN && !PAR */
2866 void
2867 awakenBlockedQueue(StgTSO *tso)
2868 {
2869   ACQUIRE_LOCK(&sched_mutex);
2870   while (tso != END_TSO_QUEUE) {
2871     tso = unblockOneLocked(tso);
2872   }
2873   RELEASE_LOCK(&sched_mutex);
2874 }
2875 #endif
2876
2877 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2878 //@subsection Exception Handling Routines
2879
2880 /* ---------------------------------------------------------------------------
2881    Interrupt execution
2882    - usually called inside a signal handler so it mustn't do anything fancy.   
2883    ------------------------------------------------------------------------ */
2884
2885 void
2886 interruptStgRts(void)
2887 {
2888     interrupted    = 1;
2889     context_switch = 1;
2890 }
2891
2892 /* -----------------------------------------------------------------------------
2893    Unblock a thread
2894
2895    This is for use when we raise an exception in another thread, which
2896    may be blocked.
2897    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2898    -------------------------------------------------------------------------- */
2899
2900 #if defined(GRAN) || defined(PAR)
2901 /*
2902   NB: only the type of the blocking queue is different in GranSim and GUM
2903       the operations on the queue-elements are the same
2904       long live polymorphism!
2905
2906   Locks: sched_mutex is held upon entry and exit.
2907
2908 */
2909 static void
2910 unblockThread(StgTSO *tso)
2911 {
2912   StgBlockingQueueElement *t, **last;
2913
2914   switch (tso->why_blocked) {
2915
2916   case NotBlocked:
2917     return;  /* not blocked */
2918
2919   case BlockedOnMVar:
2920     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2921     {
2922       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2923       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2924
2925       last = (StgBlockingQueueElement **)&mvar->head;
2926       for (t = (StgBlockingQueueElement *)mvar->head; 
2927            t != END_BQ_QUEUE; 
2928            last = &t->link, last_tso = t, t = t->link) {
2929         if (t == (StgBlockingQueueElement *)tso) {
2930           *last = (StgBlockingQueueElement *)tso->link;
2931           if (mvar->tail == tso) {
2932             mvar->tail = (StgTSO *)last_tso;
2933           }
2934           goto done;
2935         }
2936       }
2937       barf("unblockThread (MVAR): TSO not found");
2938     }
2939
2940   case BlockedOnBlackHole:
2941     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2942     {
2943       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2944
2945       last = &bq->blocking_queue;
2946       for (t = bq->blocking_queue; 
2947            t != END_BQ_QUEUE; 
2948            last = &t->link, t = t->link) {
2949         if (t == (StgBlockingQueueElement *)tso) {
2950           *last = (StgBlockingQueueElement *)tso->link;
2951           goto done;
2952         }
2953       }
2954       barf("unblockThread (BLACKHOLE): TSO not found");
2955     }
2956
2957   case BlockedOnException:
2958     {
2959       StgTSO *target  = tso->block_info.tso;
2960
2961       ASSERT(get_itbl(target)->type == TSO);
2962
2963       if (target->what_next == ThreadRelocated) {
2964           target = target->link;
2965           ASSERT(get_itbl(target)->type == TSO);
2966       }
2967
2968       ASSERT(target->blocked_exceptions != NULL);
2969
2970       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2971       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2972            t != END_BQ_QUEUE; 
2973            last = &t->link, t = t->link) {
2974         ASSERT(get_itbl(t)->type == TSO);
2975         if (t == (StgBlockingQueueElement *)tso) {
2976           *last = (StgBlockingQueueElement *)tso->link;
2977           goto done;
2978         }
2979       }
2980       barf("unblockThread (Exception): TSO not found");
2981     }
2982
2983   case BlockedOnRead:
2984   case BlockedOnWrite:
2985     {
2986       /* take TSO off blocked_queue */
2987       StgBlockingQueueElement *prev = NULL;
2988       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2989            prev = t, t = t->link) {
2990         if (t == (StgBlockingQueueElement *)tso) {
2991           if (prev == NULL) {
2992             blocked_queue_hd = (StgTSO *)t->link;
2993             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2994               blocked_queue_tl = END_TSO_QUEUE;
2995             }
2996           } else {
2997             prev->link = t->link;
2998             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2999               blocked_queue_tl = (StgTSO *)prev;
3000             }
3001           }
3002           goto done;
3003         }
3004       }
3005       barf("unblockThread (I/O): TSO not found");
3006     }
3007
3008   case BlockedOnDelay:
3009     {
3010       /* take TSO off sleeping_queue */
3011       StgBlockingQueueElement *prev = NULL;
3012       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3013            prev = t, t = t->link) {
3014         if (t == (StgBlockingQueueElement *)tso) {
3015           if (prev == NULL) {
3016             sleeping_queue = (StgTSO *)t->link;
3017           } else {
3018             prev->link = t->link;
3019           }
3020           goto done;
3021         }
3022       }
3023       barf("unblockThread (I/O): TSO not found");
3024     }
3025
3026   default:
3027     barf("unblockThread");
3028   }
3029
3030  done:
3031   tso->link = END_TSO_QUEUE;
3032   tso->why_blocked = NotBlocked;
3033   tso->block_info.closure = NULL;
3034   PUSH_ON_RUN_QUEUE(tso);
3035 }
3036 #else
3037 static void
3038 unblockThread(StgTSO *tso)
3039 {
3040   StgTSO *t, **last;
3041   
3042   /* To avoid locking unnecessarily. */
3043   if (tso->why_blocked == NotBlocked) {
3044     return;
3045   }
3046
3047   switch (tso->why_blocked) {
3048
3049   case BlockedOnMVar:
3050     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3051     {
3052       StgTSO *last_tso = END_TSO_QUEUE;
3053       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3054
3055       last = &mvar->head;
3056       for (t = mvar->head; t != END_TSO_QUEUE; 
3057            last = &t->link, last_tso = t, t = t->link) {
3058         if (t == tso) {
3059           *last = tso->link;
3060           if (mvar->tail == tso) {
3061             mvar->tail = last_tso;
3062           }
3063           goto done;
3064         }
3065       }
3066       barf("unblockThread (MVAR): TSO not found");
3067     }
3068
3069   case BlockedOnBlackHole:
3070     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3071     {
3072       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3073
3074       last = &bq->blocking_queue;
3075       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3076            last = &t->link, t = t->link) {
3077         if (t == tso) {
3078           *last = tso->link;
3079           goto done;
3080         }
3081       }
3082       barf("unblockThread (BLACKHOLE): TSO not found");
3083     }
3084
3085   case BlockedOnException:
3086     {
3087       StgTSO *target  = tso->block_info.tso;
3088
3089       ASSERT(get_itbl(target)->type == TSO);
3090
3091       while (target->what_next == ThreadRelocated) {
3092           target = target->link;
3093           ASSERT(get_itbl(target)->type == TSO);
3094       }
3095       
3096       ASSERT(target->blocked_exceptions != NULL);
3097
3098       last = &target->blocked_exceptions;
3099       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3100            last = &t->link, t = t->link) {
3101         ASSERT(get_itbl(t)->type == TSO);
3102         if (t == tso) {
3103           *last = tso->link;
3104           goto done;
3105         }
3106       }
3107       barf("unblockThread (Exception): TSO not found");
3108     }
3109
3110   case BlockedOnRead:
3111   case BlockedOnWrite:
3112     {
3113       StgTSO *prev = NULL;
3114       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3115            prev = t, t = t->link) {
3116         if (t == tso) {
3117           if (prev == NULL) {
3118             blocked_queue_hd = t->link;
3119             if (blocked_queue_tl == t) {
3120               blocked_queue_tl = END_TSO_QUEUE;
3121             }
3122           } else {
3123             prev->link = t->link;
3124             if (blocked_queue_tl == t) {
3125               blocked_queue_tl = prev;
3126             }
3127           }
3128           goto done;
3129         }
3130       }
3131       barf("unblockThread (I/O): TSO not found");
3132     }
3133
3134   case BlockedOnDelay:
3135     {
3136       StgTSO *prev = NULL;
3137       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3138            prev = t, t = t->link) {
3139         if (t == tso) {
3140           if (prev == NULL) {
3141             sleeping_queue = t->link;
3142           } else {
3143             prev->link = t->link;
3144           }
3145           goto done;
3146         }
3147       }
3148       barf("unblockThread (I/O): TSO not found");
3149     }
3150
3151   default:
3152     barf("unblockThread");
3153   }
3154
3155  done:
3156   tso->link = END_TSO_QUEUE;
3157   tso->why_blocked = NotBlocked;
3158   tso->block_info.closure = NULL;
3159   PUSH_ON_RUN_QUEUE(tso);
3160 }
3161 #endif
3162
3163 /* -----------------------------------------------------------------------------
3164  * raiseAsync()
3165  *
3166  * The following function implements the magic for raising an
3167  * asynchronous exception in an existing thread.
3168  *
3169  * We first remove the thread from any queue on which it might be
3170  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3171  *
3172  * We strip the stack down to the innermost CATCH_FRAME, building
3173  * thunks in the heap for all the active computations, so they can 
3174  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3175  * an application of the handler to the exception, and push it on
3176  * the top of the stack.
3177  * 
3178  * How exactly do we save all the active computations?  We create an
3179  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3180  * AP_UPDs pushes everything from the corresponding update frame
3181  * upwards onto the stack.  (Actually, it pushes everything up to the
3182  * next update frame plus a pointer to the next AP_UPD object.
3183  * Entering the next AP_UPD object pushes more onto the stack until we
3184  * reach the last AP_UPD object - at which point the stack should look
3185  * exactly as it did when we killed the TSO and we can continue
3186  * execution by entering the closure on top of the stack.
3187  *
3188  * We can also kill a thread entirely - this happens if either (a) the 
3189  * exception passed to raiseAsync is NULL, or (b) there's no
3190  * CATCH_FRAME on the stack.  In either case, we strip the entire
3191  * stack and replace the thread with a zombie.
3192  *
3193  * Locks: sched_mutex held upon entry nor exit.
3194  *
3195  * -------------------------------------------------------------------------- */
3196  
3197 void 
3198 deleteThread(StgTSO *tso)
3199 {
3200   raiseAsync(tso,NULL);
3201 }
3202
3203 void
3204 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3205 {
3206   /* When raising async exs from contexts where sched_mutex isn't held;
3207      use raiseAsyncWithLock(). */
3208   ACQUIRE_LOCK(&sched_mutex);
3209   raiseAsync(tso,exception);
3210   RELEASE_LOCK(&sched_mutex);
3211 }
3212
3213 void
3214 raiseAsync(StgTSO *tso, StgClosure *exception)
3215 {
3216   StgUpdateFrame* su = tso->su;
3217   StgPtr          sp = tso->sp;
3218   
3219   /* Thread already dead? */
3220   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3221     return;
3222   }
3223
3224   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3225
3226   /* Remove it from any blocking queues */
3227   unblockThread(tso);
3228
3229   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3230   /* The stack freezing code assumes there's a closure pointer on
3231    * the top of the stack.  This isn't always the case with compiled
3232    * code, so we have to push a dummy closure on the top which just
3233    * returns to the next return address on the stack.
3234    */
3235   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3236     *(--sp) = (W_)&stg_dummy_ret_closure;
3237   }
3238
3239   while (1) {
3240     nat words = ((P_)su - (P_)sp) - 1;
3241     nat i;
3242     StgAP_UPD * ap;
3243
3244     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3245      * then build the THUNK raise(exception), and leave it on
3246      * top of the CATCH_FRAME ready to enter.
3247      */
3248     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3249 #ifdef PROFILING
3250       StgCatchFrame *cf = (StgCatchFrame *)su;
3251 #endif
3252       StgClosure *raise;
3253
3254       /* we've got an exception to raise, so let's pass it to the
3255        * handler in this frame.
3256        */
3257       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3258       TICK_ALLOC_SE_THK(1,0);
3259       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3260       raise->payload[0] = exception;
3261
3262       /* throw away the stack from Sp up to the CATCH_FRAME.
3263        */
3264       sp = (P_)su - 1;
3265
3266       /* Ensure that async excpetions are blocked now, so we don't get
3267        * a surprise exception before we get around to executing the
3268        * handler.
3269        */
3270       if (tso->blocked_exceptions == NULL) {
3271           tso->blocked_exceptions = END_TSO_QUEUE;
3272       }
3273
3274       /* Put the newly-built THUNK on top of the stack, ready to execute
3275        * when the thread restarts.
3276        */
3277       sp[0] = (W_)raise;
3278       tso->sp = sp;
3279       tso->su = su;
3280       tso->what_next = ThreadEnterGHC;
3281       IF_DEBUG(sanity, checkTSO(tso));
3282       return;
3283     }
3284
3285     /* First build an AP_UPD consisting of the stack chunk above the
3286      * current update frame, with the top word on the stack as the
3287      * fun field.
3288      */
3289     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3290     
3291     ASSERT(words >= 0);
3292     
3293     ap->n_args = words;
3294     ap->fun    = (StgClosure *)sp[0];
3295     sp++;
3296     for(i=0; i < (nat)words; ++i) {
3297       ap->payload[i] = (StgClosure *)*sp++;
3298     }
3299     
3300     switch (get_itbl(su)->type) {
3301       
3302     case UPDATE_FRAME:
3303       {
3304         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3305         TICK_ALLOC_UP_THK(words+1,0);
3306         
3307         IF_DEBUG(scheduler,
3308                  fprintf(stderr,  "scheduler: Updating ");
3309                  printPtr((P_)su->updatee); 
3310                  fprintf(stderr,  " with ");
3311                  printObj((StgClosure *)ap);
3312                  );
3313         
3314         /* Replace the updatee with an indirection - happily
3315          * this will also wake up any threads currently
3316          * waiting on the result.
3317          *
3318          * Warning: if we're in a loop, more than one update frame on
3319          * the stack may point to the same object.  Be careful not to
3320          * overwrite an IND_OLDGEN in this case, because we'll screw
3321          * up the mutable lists.  To be on the safe side, don't
3322          * overwrite any kind of indirection at all.  See also
3323          * threadSqueezeStack in GC.c, where we have to make a similar
3324          * check.
3325          */
3326         if (!closure_IND(su->updatee)) {
3327             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3328         }
3329         su = su->link;
3330         sp += sizeofW(StgUpdateFrame) -1;
3331         sp[0] = (W_)ap; /* push onto stack */
3332         break;
3333       }
3334
3335     case CATCH_FRAME:
3336       {
3337         StgCatchFrame *cf = (StgCatchFrame *)su;
3338         StgClosure* o;
3339         
3340         /* We want a PAP, not an AP_UPD.  Fortunately, the
3341          * layout's the same.
3342          */
3343         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3344         TICK_ALLOC_UPD_PAP(words+1,0);
3345         
3346         /* now build o = FUN(catch,ap,handler) */
3347         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3348         TICK_ALLOC_FUN(2,0);
3349         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3350         o->payload[0] = (StgClosure *)ap;
3351         o->payload[1] = cf->handler;
3352         
3353         IF_DEBUG(scheduler,
3354                  fprintf(stderr,  "scheduler: Built ");
3355                  printObj((StgClosure *)o);
3356                  );
3357         
3358         /* pop the old handler and put o on the stack */
3359         su = cf->link;
3360         sp += sizeofW(StgCatchFrame) - 1;
3361         sp[0] = (W_)o;
3362         break;
3363       }
3364       
3365     case SEQ_FRAME:
3366       {
3367         StgSeqFrame *sf = (StgSeqFrame *)su;
3368         StgClosure* o;
3369         
3370         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3371         TICK_ALLOC_UPD_PAP(words+1,0);
3372         
3373         /* now build o = FUN(seq,ap) */
3374         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3375         TICK_ALLOC_SE_THK(1,0);
3376         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3377         o->payload[0] = (StgClosure *)ap;
3378         
3379         IF_DEBUG(scheduler,
3380                  fprintf(stderr,  "scheduler: Built ");
3381                  printObj((StgClosure *)o);
3382                  );
3383         
3384         /* pop the old handler and put o on the stack */
3385         su = sf->link;
3386         sp += sizeofW(StgSeqFrame) - 1;
3387         sp[0] = (W_)o;
3388         break;
3389       }
3390       
3391     case STOP_FRAME:
3392       /* We've stripped the entire stack, the thread is now dead. */
3393       sp += sizeofW(StgStopFrame) - 1;
3394       sp[0] = (W_)exception;    /* save the exception */
3395       tso->what_next = ThreadKilled;
3396       tso->su = (StgUpdateFrame *)(sp+1);
3397       tso->sp = sp;
3398       return;
3399
3400     default:
3401       barf("raiseAsync");
3402     }
3403   }
3404   barf("raiseAsync");
3405 }
3406
3407 /* -----------------------------------------------------------------------------
3408    resurrectThreads is called after garbage collection on the list of
3409    threads found to be garbage.  Each of these threads will be woken
3410    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3411    on an MVar, or NonTermination if the thread was blocked on a Black
3412    Hole.
3413
3414    Locks: sched_mutex isn't held upon entry nor exit.
3415    -------------------------------------------------------------------------- */
3416
3417 void
3418 resurrectThreads( StgTSO *threads )
3419 {
3420   StgTSO *tso, *next;
3421
3422   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3423     next = tso->global_link;
3424     tso->global_link = all_threads;
3425     all_threads = tso;
3426     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3427
3428     switch (tso->why_blocked) {
3429     case BlockedOnMVar:
3430     case BlockedOnException:
3431       /* Called by GC - sched_mutex lock is currently held. */
3432       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3433       break;
3434     case BlockedOnBlackHole:
3435       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3436       break;
3437     case NotBlocked:
3438       /* This might happen if the thread was blocked on a black hole
3439        * belonging to a thread that we've just woken up (raiseAsync
3440        * can wake up threads, remember...).
3441        */
3442       continue;
3443     default:
3444       barf("resurrectThreads: thread blocked in a strange way");
3445     }
3446   }
3447 }
3448
3449 /* -----------------------------------------------------------------------------
3450  * Blackhole detection: if we reach a deadlock, test whether any
3451  * threads are blocked on themselves.  Any threads which are found to
3452  * be self-blocked get sent a NonTermination exception.
3453  *
3454  * This is only done in a deadlock situation in order to avoid
3455  * performance overhead in the normal case.
3456  *
3457  * Locks: sched_mutex is held upon entry and exit.
3458  * -------------------------------------------------------------------------- */
3459
3460 static void
3461 detectBlackHoles( void )
3462 {
3463     StgTSO *t = all_threads;
3464     StgUpdateFrame *frame;
3465     StgClosure *blocked_on;
3466
3467     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3468
3469         while (t->what_next == ThreadRelocated) {
3470             t = t->link;
3471             ASSERT(get_itbl(t)->type == TSO);
3472         }
3473       
3474         if (t->why_blocked != BlockedOnBlackHole) {
3475             continue;
3476         }
3477
3478         blocked_on = t->block_info.closure;
3479
3480         for (frame = t->su; ; frame = frame->link) {
3481             switch (get_itbl(frame)->type) {
3482
3483             case UPDATE_FRAME:
3484                 if (frame->updatee == blocked_on) {
3485                     /* We are blocking on one of our own computations, so
3486                      * send this thread the NonTermination exception.  
3487                      */
3488                     IF_DEBUG(scheduler, 
3489                              sched_belch("thread %d is blocked on itself", t->id));
3490                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3491                     goto done;
3492                 }
3493                 else {
3494                     continue;
3495                 }
3496
3497             case CATCH_FRAME:
3498             case SEQ_FRAME:
3499                 continue;
3500                 
3501             case STOP_FRAME:
3502                 break;
3503             }
3504             break;
3505         }
3506
3507     done: ;
3508     }   
3509 }
3510
3511 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3512 //@subsection Debugging Routines
3513
3514 /* -----------------------------------------------------------------------------
3515    Debugging: why is a thread blocked
3516    -------------------------------------------------------------------------- */
3517
3518 #ifdef DEBUG
3519
3520 void
3521 printThreadBlockage(StgTSO *tso)
3522 {
3523   switch (tso->why_blocked) {
3524   case BlockedOnRead:
3525     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3526     break;
3527   case BlockedOnWrite:
3528     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3529     break;
3530   case BlockedOnDelay:
3531     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3532     break;
3533   case BlockedOnMVar:
3534     fprintf(stderr,"is blocked on an MVar");
3535     break;
3536   case BlockedOnException:
3537     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3538             tso->block_info.tso->id);
3539     break;
3540   case BlockedOnBlackHole:
3541     fprintf(stderr,"is blocked on a black hole");
3542     break;
3543   case NotBlocked:
3544     fprintf(stderr,"is not blocked");
3545     break;
3546 #if defined(PAR)
3547   case BlockedOnGA:
3548     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3549             tso->block_info.closure, info_type(tso->block_info.closure));
3550     break;
3551   case BlockedOnGA_NoSend:
3552     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3553             tso->block_info.closure, info_type(tso->block_info.closure));
3554     break;
3555 #endif
3556 #if defined(RTS_SUPPORTS_THREADS)
3557   case BlockedOnCCall:
3558     fprintf(stderr,"is blocked on an external call");
3559     break;
3560 #endif
3561   default:
3562     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3563          tso->why_blocked, tso->id, tso);
3564   }
3565 }
3566
3567 void
3568 printThreadStatus(StgTSO *tso)
3569 {
3570   switch (tso->what_next) {
3571   case ThreadKilled:
3572     fprintf(stderr,"has been killed");
3573     break;
3574   case ThreadComplete:
3575     fprintf(stderr,"has completed");
3576     break;
3577   default:
3578     printThreadBlockage(tso);
3579   }
3580 }
3581
3582 void
3583 printAllThreads(void)
3584 {
3585   StgTSO *t;
3586
3587 # if defined(GRAN)
3588   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3589   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3590                        time_string, rtsFalse/*no commas!*/);
3591
3592   sched_belch("all threads at [%s]:", time_string);
3593 # elif defined(PAR)
3594   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3595   ullong_format_string(CURRENT_TIME,
3596                        time_string, rtsFalse/*no commas!*/);
3597
3598   sched_belch("all threads at [%s]:", time_string);
3599 # else
3600   sched_belch("all threads:");
3601 # endif
3602
3603   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3604     fprintf(stderr, "\tthread %d ", t->id);
3605     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3606     printThreadStatus(t);
3607     fprintf(stderr,"\n");
3608   }
3609 }
3610     
3611 /* 
3612    Print a whole blocking queue attached to node (debugging only).
3613 */
3614 //@cindex print_bq
3615 # if defined(PAR)
3616 void 
3617 print_bq (StgClosure *node)
3618 {
3619   StgBlockingQueueElement *bqe;
3620   StgTSO *tso;
3621   rtsBool end;
3622
3623   fprintf(stderr,"## BQ of closure %p (%s): ",
3624           node, info_type(node));
3625
3626   /* should cover all closures that may have a blocking queue */
3627   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3628          get_itbl(node)->type == FETCH_ME_BQ ||
3629          get_itbl(node)->type == RBH ||
3630          get_itbl(node)->type == MVAR);
3631     
3632   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3633
3634   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3635 }
3636
3637 /* 
3638    Print a whole blocking queue starting with the element bqe.
3639 */
3640 void 
3641 print_bqe (StgBlockingQueueElement *bqe)
3642 {
3643   rtsBool end;
3644
3645   /* 
3646      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3647   */
3648   for (end = (bqe==END_BQ_QUEUE);
3649        !end; // iterate until bqe points to a CONSTR
3650        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3651        bqe = end ? END_BQ_QUEUE : bqe->link) {
3652     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3653     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3654     /* types of closures that may appear in a blocking queue */
3655     ASSERT(get_itbl(bqe)->type == TSO ||           
3656            get_itbl(bqe)->type == BLOCKED_FETCH || 
3657            get_itbl(bqe)->type == CONSTR); 
3658     /* only BQs of an RBH end with an RBH_Save closure */
3659     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3660
3661     switch (get_itbl(bqe)->type) {
3662     case TSO:
3663       fprintf(stderr," TSO %u (%x),",
3664               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3665       break;
3666     case BLOCKED_FETCH:
3667       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3668               ((StgBlockedFetch *)bqe)->node, 
3669               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3670               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3671               ((StgBlockedFetch *)bqe)->ga.weight);
3672       break;
3673     case CONSTR:
3674       fprintf(stderr," %s (IP %p),",
3675               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3676                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3677                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3678                "RBH_Save_?"), get_itbl(bqe));
3679       break;
3680     default:
3681       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3682            info_type((StgClosure *)bqe)); // , node, info_type(node));
3683       break;
3684     }
3685   } /* for */
3686   fputc('\n', stderr);
3687 }
3688 # elif defined(GRAN)
3689 void 
3690 print_bq (StgClosure *node)
3691 {
3692   StgBlockingQueueElement *bqe;
3693   PEs node_loc, tso_loc;
3694   rtsBool end;
3695
3696   /* should cover all closures that may have a blocking queue */
3697   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3698          get_itbl(node)->type == FETCH_ME_BQ ||
3699          get_itbl(node)->type == RBH);
3700     
3701   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3702   node_loc = where_is(node);
3703
3704   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3705           node, info_type(node), node_loc);
3706
3707   /* 
3708      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3709   */
3710   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3711        !end; // iterate until bqe points to a CONSTR
3712        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3713     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3714     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3715     /* types of closures that may appear in a blocking queue */
3716     ASSERT(get_itbl(bqe)->type == TSO ||           
3717            get_itbl(bqe)->type == CONSTR); 
3718     /* only BQs of an RBH end with an RBH_Save closure */
3719     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3720
3721     tso_loc = where_is((StgClosure *)bqe);
3722     switch (get_itbl(bqe)->type) {
3723     case TSO:
3724       fprintf(stderr," TSO %d (%p) on [PE %d],",
3725               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3726       break;
3727     case CONSTR:
3728       fprintf(stderr," %s (IP %p),",
3729               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3730                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3731                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3732                "RBH_Save_?"), get_itbl(bqe));
3733       break;
3734     default:
3735       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3736            info_type((StgClosure *)bqe), node, info_type(node));
3737       break;
3738     }
3739   } /* for */
3740   fputc('\n', stderr);
3741 }
3742 #else
3743 /* 
3744    Nice and easy: only TSOs on the blocking queue
3745 */
3746 void 
3747 print_bq (StgClosure *node)
3748 {
3749   StgTSO *tso;
3750
3751   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3752   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3753        tso != END_TSO_QUEUE; 
3754        tso=tso->link) {
3755     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3756     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3757     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3758   }
3759   fputc('\n', stderr);
3760 }
3761 # endif
3762
3763 #if defined(PAR)
3764 static nat
3765 run_queue_len(void)
3766 {
3767   nat i;
3768   StgTSO *tso;
3769
3770   for (i=0, tso=run_queue_hd; 
3771        tso != END_TSO_QUEUE;
3772        i++, tso=tso->link)
3773     /* nothing */
3774
3775   return i;
3776 }
3777 #endif
3778
3779 static void
3780 sched_belch(char *s, ...)
3781 {
3782   va_list ap;
3783   va_start(ap,s);
3784 #ifdef SMP
3785   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3786 #elif defined(PAR)
3787   fprintf(stderr, "== ");
3788 #else
3789   fprintf(stderr, "scheduler: ");
3790 #endif
3791   vfprintf(stderr, s, ap);
3792   fprintf(stderr, "\n");
3793   va_end(ap);
3794 }
3795
3796 #endif /* DEBUG */
3797
3798
3799 //@node Index,  , Debugging Routines, Main scheduling code
3800 //@subsection Index
3801
3802 //@index
3803 //* StgMainThread::  @cindex\s-+StgMainThread
3804 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3805 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3806 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3807 //* context_switch::  @cindex\s-+context_switch
3808 //* createThread::  @cindex\s-+createThread
3809 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3810 //* initScheduler::  @cindex\s-+initScheduler
3811 //* interrupted::  @cindex\s-+interrupted
3812 //* next_thread_id::  @cindex\s-+next_thread_id
3813 //* print_bq::  @cindex\s-+print_bq
3814 //* run_queue_hd::  @cindex\s-+run_queue_hd
3815 //* run_queue_tl::  @cindex\s-+run_queue_tl
3816 //* sched_mutex::  @cindex\s-+sched_mutex
3817 //* schedule::  @cindex\s-+schedule
3818 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3819 //* term_mutex::  @cindex\s-+term_mutex
3820 //@end index