[project @ 2002-05-18 05:28:14 by ken]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.144 2002/05/18 05:28:15 ken Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: thread_id_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 /*
229  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
230  * in an MT setting, needed to signal that a worker thread shouldn't hang around
231  * in the scheduler when it is out of work.
232  */
233 static rtsBool shutting_down_scheduler = rtsFalse;
234
235 void            addToBlockedQueue ( StgTSO *tso );
236
237 static void     schedule          ( void );
238        void     interruptStgRts   ( void );
239
240 static void     detectBlackHoles  ( void );
241
242 #ifdef DEBUG
243 static void sched_belch(char *s, ...);
244 #endif
245
246 #if defined(RTS_SUPPORTS_THREADS)
247 /* ToDo: carefully document the invariants that go together
248  *       with these synchronisation objects.
249  */
250 Mutex     sched_mutex       = INIT_MUTEX_VAR;
251 Mutex     term_mutex        = INIT_MUTEX_VAR;
252
253 /*
254  * A heavyweight solution to the problem of protecting
255  * the thread_id from concurrent update.
256  */
257 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
258
259
260 # if defined(SMP)
261 static Condition gc_pending_cond = INIT_COND_VAR;
262 nat await_death;
263 # endif
264
265 #endif /* RTS_SUPPORTS_THREADS */
266
267 #if defined(PAR)
268 StgTSO *LastTSO;
269 rtsTime TimeOfLastYield;
270 rtsBool emitSchedule = rtsTrue;
271 #endif
272
273 #if DEBUG
274 char *whatNext_strs[] = {
275   "ThreadEnterGHC",
276   "ThreadRunGHC",
277   "ThreadEnterInterp",
278   "ThreadKilled",
279   "ThreadComplete"
280 };
281
282 char *threadReturnCode_strs[] = {
283   "HeapOverflow",                       /* might also be StackOverflow */
284   "StackOverflow",
285   "ThreadYielding",
286   "ThreadBlocked",
287   "ThreadFinished"
288 };
289 #endif
290
291 #if defined(PAR)
292 StgTSO * createSparkThread(rtsSpark spark);
293 StgTSO * activateSpark (rtsSpark spark);  
294 #endif
295
296 /*
297  * The thread state for the main thread.
298 // ToDo: check whether not needed any more
299 StgTSO   *MainTSO;
300  */
301
302 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
303 static void taskStart(void);
304 static void
305 taskStart(void)
306 {
307   schedule();
308 }
309 #endif
310
311
312
313
314 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
315 //@subsection Main scheduling loop
316
317 /* ---------------------------------------------------------------------------
318    Main scheduling loop.
319
320    We use round-robin scheduling, each thread returning to the
321    scheduler loop when one of these conditions is detected:
322
323       * out of heap space
324       * timer expires (thread yields)
325       * thread blocks
326       * thread ends
327       * stack overflow
328
329    Locking notes:  we acquire the scheduler lock once at the beginning
330    of the scheduler loop, and release it when
331     
332       * running a thread, or
333       * waiting for work, or
334       * waiting for a GC to complete.
335
336    GRAN version:
337      In a GranSim setup this loop iterates over the global event queue.
338      This revolves around the global event queue, which determines what 
339      to do next. Therefore, it's more complicated than either the 
340      concurrent or the parallel (GUM) setup.
341
342    GUM version:
343      GUM iterates over incoming messages.
344      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
345      and sends out a fish whenever it has nothing to do; in-between
346      doing the actual reductions (shared code below) it processes the
347      incoming messages and deals with delayed operations 
348      (see PendingFetches).
349      This is not the ugliest code you could imagine, but it's bloody close.
350
351    ------------------------------------------------------------------------ */
352 //@cindex schedule
353 static void
354 schedule( void )
355 {
356   StgTSO *t;
357   Capability *cap;
358   StgThreadReturnCode ret;
359 #if defined(GRAN)
360   rtsEvent *event;
361 #elif defined(PAR)
362   StgSparkPool *pool;
363   rtsSpark spark;
364   StgTSO *tso;
365   GlobalTaskId pe;
366   rtsBool receivedFinish = rtsFalse;
367 # if defined(DEBUG)
368   nat tp_size, sp_size; // stats only
369 # endif
370 #endif
371   rtsBool was_interrupted = rtsFalse;
372   
373   ACQUIRE_LOCK(&sched_mutex);
374  
375 #if defined(RTS_SUPPORTS_THREADS)
376   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
377 #else
378   /* simply initialise it in the non-threaded case */
379   grabCapability(&cap);
380 #endif
381
382 #if defined(GRAN)
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417 #if defined(RTS_SUPPORTS_THREADS)
418     /* Check to see whether there are any worker threads
419        waiting to deposit external call results. If so,
420        yield our capability */
421     yieldToReturningWorker(&sched_mutex, &cap);
422 #endif
423
424     /* If we're interrupted (the user pressed ^C, or some other
425      * termination condition occurred), kill all the currently running
426      * threads.
427      */
428     if (interrupted) {
429       IF_DEBUG(scheduler, sched_belch("interrupted"));
430       deleteAllThreads();
431       interrupted = rtsFalse;
432       was_interrupted = rtsTrue;
433     }
434
435     /* Go through the list of main threads and wake up any
436      * clients whose computations have finished.  ToDo: this
437      * should be done more efficiently without a linear scan
438      * of the main threads list, somehow...
439      */
440 #if defined(RTS_SUPPORTS_THREADS)
441     { 
442       StgMainThread *m, **prev;
443       prev = &main_threads;
444       for (m = main_threads; m != NULL; m = m->link) {
445         switch (m->tso->what_next) {
446         case ThreadComplete:
447           if (m->ret) {
448             *(m->ret) = (StgClosure *)m->tso->sp[0];
449           }
450           *prev = m->link;
451           m->stat = Success;
452           broadcastCondition(&m->wakeup);
453 #ifdef DEBUG
454           free(m->tso->label);
455           m->tso->label = NULL;
456 #endif
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467 #ifdef DEBUG
468           free(m->tso->label);
469           m->tso->label = NULL;
470 #endif
471           break;
472         default:
473           break;
474         }
475       }
476     }
477
478 #else /* not threaded */
479
480 # if defined(PAR)
481     /* in GUM do this only on the Main PE */
482     if (IAmMainThread)
483 # endif
484     /* If our main thread has finished or been killed, return.
485      */
486     {
487       StgMainThread *m = main_threads;
488       if (m->tso->what_next == ThreadComplete
489           || m->tso->what_next == ThreadKilled) {
490 #ifdef DEBUG
491         free(m->tso->label);
492         m->tso->label = NULL;
493 #endif
494         main_threads = main_threads->link;
495         if (m->tso->what_next == ThreadComplete) {
496           /* we finished successfully, fill in the return value */
497           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
498           m->stat = Success;
499           return;
500         } else {
501           if (m->ret) { *(m->ret) = NULL; };
502           if (was_interrupted) {
503             m->stat = Interrupted;
504           } else {
505             m->stat = Killed;
506           }
507           return;
508         }
509       }
510     }
511 #endif
512
513     /* Top up the run queue from our spark pool.  We try to make the
514      * number of threads in the run queue equal to the number of
515      * free capabilities.
516      *
517      * Disable spark support in SMP for now, non-essential & requires
518      * a little bit of work to make it compile cleanly. -- sof 1/02.
519      */
520 #if 0 /* defined(SMP) */
521     {
522       nat n = getFreeCapabilities();
523       StgTSO *tso = run_queue_hd;
524
525       /* Count the run queue */
526       while (n > 0 && tso != END_TSO_QUEUE) {
527         tso = tso->link;
528         n--;
529       }
530
531       for (; n > 0; n--) {
532         StgClosure *spark;
533         spark = findSpark(rtsFalse);
534         if (spark == NULL) {
535           break; /* no more sparks in the pool */
536         } else {
537           /* I'd prefer this to be done in activateSpark -- HWL */
538           /* tricky - it needs to hold the scheduler lock and
539            * not try to re-acquire it -- SDM */
540           createSparkThread(spark);       
541           IF_DEBUG(scheduler,
542                    sched_belch("==^^ turning spark of closure %p into a thread",
543                                (StgClosure *)spark));
544         }
545       }
546       /* We need to wake up the other tasks if we just created some
547        * work for them.
548        */
549       if (getFreeCapabilities() - n > 1) {
550           signalCondition( &thread_ready_cond );
551       }
552     }
553 #endif // SMP
554
555     /* check for signals each time around the scheduler */
556 #ifndef mingw32_TARGET_OS
557     if (signals_pending()) {
558       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
559       startSignalHandlers();
560       ACQUIRE_LOCK(&sched_mutex);
561     }
562 #endif
563
564     /* Check whether any waiting threads need to be woken up.  If the
565      * run queue is empty, and there are no other tasks running, we
566      * can wait indefinitely for something to happen.
567      * ToDo: what if another client comes along & requests another
568      * main thread?
569      */
570     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
571       awaitEvent( EMPTY_RUN_QUEUE()
572 #if defined(SMP)
573         && allFreeCapabilities()
574 #endif
575         );
576     }
577     /* we can be interrupted while waiting for I/O... */
578     if (interrupted) continue;
579
580     /* 
581      * Detect deadlock: when we have no threads to run, there are no
582      * threads waiting on I/O or sleeping, and all the other tasks are
583      * waiting for work, we must have a deadlock of some description.
584      *
585      * We first try to find threads blocked on themselves (ie. black
586      * holes), and generate NonTermination exceptions where necessary.
587      *
588      * If no threads are black holed, we have a deadlock situation, so
589      * inform all the main threads.
590      */
591 #ifndef PAR
592     if (   EMPTY_THREAD_QUEUES()
593 #if defined(RTS_SUPPORTS_THREADS)
594         && EMPTY_QUEUE(suspended_ccalling_threads)
595 #endif
596 #ifdef SMP
597         && allFreeCapabilities()
598 #endif
599         )
600     {
601         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
602 #if defined(THREADED_RTS)
603         /* and SMP mode ..? */
604         releaseCapability(cap);
605 #endif
606         // Garbage collection can release some new threads due to
607         // either (a) finalizers or (b) threads resurrected because
608         // they are about to be send BlockedOnDeadMVar.  Any threads
609         // thus released will be immediately runnable.
610         GarbageCollect(GetRoots,rtsTrue);
611
612         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
613
614         IF_DEBUG(scheduler, 
615                  sched_belch("still deadlocked, checking for black holes..."));
616         detectBlackHoles();
617
618         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
619
620 #ifndef mingw32_TARGET_OS
621         /* If we have user-installed signal handlers, then wait
622          * for signals to arrive rather then bombing out with a
623          * deadlock.
624          */
625 #if defined(RTS_SUPPORTS_THREADS)
626         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
627                       a signal with no runnable threads (or I/O
628                       suspended ones) leads nowhere quick.
629                       For now, simply shut down when we reach this
630                       condition.
631                       
632                       ToDo: define precisely under what conditions
633                       the Scheduler should shut down in an MT setting.
634                    */
635 #else
636         if ( anyUserHandlers() ) {
637 #endif
638             IF_DEBUG(scheduler, 
639                      sched_belch("still deadlocked, waiting for signals..."));
640
641             awaitUserSignals();
642
643             // we might be interrupted...
644             if (interrupted) { continue; }
645
646             if (signals_pending()) {
647                 RELEASE_LOCK(&sched_mutex);
648                 startSignalHandlers();
649                 ACQUIRE_LOCK(&sched_mutex);
650             }
651             ASSERT(!EMPTY_RUN_QUEUE());
652             goto not_deadlocked;
653         }
654 #endif
655
656         /* Probably a real deadlock.  Send the current main thread the
657          * Deadlock exception (or in the SMP build, send *all* main
658          * threads the deadlock exception, since none of them can make
659          * progress).
660          */
661         {
662             StgMainThread *m;
663 #if defined(RTS_SUPPORTS_THREADS)
664             for (m = main_threads; m != NULL; m = m->link) {
665                 switch (m->tso->why_blocked) {
666                 case BlockedOnBlackHole:
667                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
668                     break;
669                 case BlockedOnException:
670                 case BlockedOnMVar:
671                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
672                     break;
673                 default:
674                     barf("deadlock: main thread blocked in a strange way");
675                 }
676             }
677 #else
678             m = main_threads;
679             switch (m->tso->why_blocked) {
680             case BlockedOnBlackHole:
681                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
682                 break;
683             case BlockedOnException:
684             case BlockedOnMVar:
685                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
686                 break;
687             default:
688                 barf("deadlock: main thread blocked in a strange way");
689             }
690 #endif
691         }
692
693 #if defined(RTS_SUPPORTS_THREADS)
694         /* ToDo: revisit conditions (and mechanism) for shutting
695            down a multi-threaded world  */
696         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
697         RELEASE_LOCK(&sched_mutex);
698         shutdownHaskell();
699         return;
700 #endif
701     }
702   not_deadlocked:
703
704 #elif defined(PAR)
705     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
706 #endif
707
708 #if defined(SMP)
709     /* If there's a GC pending, don't do anything until it has
710      * completed.
711      */
712     if (ready_to_gc) {
713       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
714       waitCondition( &gc_pending_cond, &sched_mutex );
715     }
716 #endif    
717
718 #if defined(RTS_SUPPORTS_THREADS)
719     /* block until we've got a thread on the run queue and a free
720      * capability.
721      *
722      */
723     if ( EMPTY_RUN_QUEUE() ) {
724       /* Give up our capability */
725       releaseCapability(cap);
726
727       /* If we're in the process of shutting down (& running the
728        * a batch of finalisers), don't wait around.
729        */
730       if ( shutting_down_scheduler ) {
731         RELEASE_LOCK(&sched_mutex);
732         return;
733       }
734       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
735       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
736       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
737     }
738 #endif
739
740 #if defined(GRAN)
741     if (RtsFlags.GranFlags.Light)
742       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
743
744     /* adjust time based on time-stamp */
745     if (event->time > CurrentTime[CurrentProc] &&
746         event->evttype != ContinueThread)
747       CurrentTime[CurrentProc] = event->time;
748     
749     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
750     if (!RtsFlags.GranFlags.Light)
751       handleIdlePEs();
752
753     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
754
755     /* main event dispatcher in GranSim */
756     switch (event->evttype) {
757       /* Should just be continuing execution */
758     case ContinueThread:
759       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
760       /* ToDo: check assertion
761       ASSERT(run_queue_hd != (StgTSO*)NULL &&
762              run_queue_hd != END_TSO_QUEUE);
763       */
764       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
765       if (!RtsFlags.GranFlags.DoAsyncFetch &&
766           procStatus[CurrentProc]==Fetching) {
767         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
768               CurrentTSO->id, CurrentTSO, CurrentProc);
769         goto next_thread;
770       } 
771       /* Ignore ContinueThreads for completed threads */
772       if (CurrentTSO->what_next == ThreadComplete) {
773         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
774               CurrentTSO->id, CurrentTSO, CurrentProc);
775         goto next_thread;
776       } 
777       /* Ignore ContinueThreads for threads that are being migrated */
778       if (PROCS(CurrentTSO)==Nowhere) { 
779         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
780               CurrentTSO->id, CurrentTSO, CurrentProc);
781         goto next_thread;
782       }
783       /* The thread should be at the beginning of the run queue */
784       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
785         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
786               CurrentTSO->id, CurrentTSO, CurrentProc);
787         break; // run the thread anyway
788       }
789       /*
790       new_event(proc, proc, CurrentTime[proc],
791                 FindWork,
792                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
793       goto next_thread; 
794       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
795       break; // now actually run the thread; DaH Qu'vam yImuHbej 
796
797     case FetchNode:
798       do_the_fetchnode(event);
799       goto next_thread;             /* handle next event in event queue  */
800       
801     case GlobalBlock:
802       do_the_globalblock(event);
803       goto next_thread;             /* handle next event in event queue  */
804       
805     case FetchReply:
806       do_the_fetchreply(event);
807       goto next_thread;             /* handle next event in event queue  */
808       
809     case UnblockThread:   /* Move from the blocked queue to the tail of */
810       do_the_unblock(event);
811       goto next_thread;             /* handle next event in event queue  */
812       
813     case ResumeThread:  /* Move from the blocked queue to the tail of */
814       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
815       event->tso->gran.blocktime += 
816         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
817       do_the_startthread(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case StartThread:
821       do_the_startthread(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case MoveThread:
825       do_the_movethread(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     case MoveSpark:
829       do_the_movespark(event);
830       goto next_thread;             /* handle next event in event queue  */
831       
832     case FindWork:
833       do_the_findwork(event);
834       goto next_thread;             /* handle next event in event queue  */
835       
836     default:
837       barf("Illegal event type %u\n", event->evttype);
838     }  /* switch */
839     
840     /* This point was scheduler_loop in the old RTS */
841
842     IF_DEBUG(gran, belch("GRAN: after main switch"));
843
844     TimeOfLastEvent = CurrentTime[CurrentProc];
845     TimeOfNextEvent = get_time_of_next_event();
846     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
847     // CurrentTSO = ThreadQueueHd;
848
849     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
850                          TimeOfNextEvent));
851
852     if (RtsFlags.GranFlags.Light) 
853       GranSimLight_leave_system(event, &ActiveTSO); 
854
855     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
856
857     IF_DEBUG(gran, 
858              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
859
860     /* in a GranSim setup the TSO stays on the run queue */
861     t = CurrentTSO;
862     /* Take a thread from the run queue. */
863     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
864
865     IF_DEBUG(gran, 
866              fprintf(stderr, "GRAN: About to run current thread, which is\n");
867              G_TSO(t,5));
868
869     context_switch = 0; // turned on via GranYield, checking events and time slice
870
871     IF_DEBUG(gran, 
872              DumpGranEvent(GR_SCHEDULE, t));
873
874     procStatus[CurrentProc] = Busy;
875
876 #elif defined(PAR)
877     if (PendingFetches != END_BF_QUEUE) {
878         processFetches();
879     }
880
881     /* ToDo: phps merge with spark activation above */
882     /* check whether we have local work and send requests if we have none */
883     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
884       /* :-[  no local threads => look out for local sparks */
885       /* the spark pool for the current PE */
886       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
887       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
888           pool->hd < pool->tl) {
889         /* 
890          * ToDo: add GC code check that we really have enough heap afterwards!!
891          * Old comment:
892          * If we're here (no runnable threads) and we have pending
893          * sparks, we must have a space problem.  Get enough space
894          * to turn one of those pending sparks into a
895          * thread... 
896          */
897
898         spark = findSpark(rtsFalse);                /* get a spark */
899         if (spark != (rtsSpark) NULL) {
900           tso = activateSpark(spark);       /* turn the spark into a thread */
901           IF_PAR_DEBUG(schedule,
902                        belch("==== schedule: Created TSO %d (%p); %d threads active",
903                              tso->id, tso, advisory_thread_count));
904
905           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
906             belch("==^^ failed to activate spark");
907             goto next_thread;
908           }               /* otherwise fall through & pick-up new tso */
909         } else {
910           IF_PAR_DEBUG(verbose,
911                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
912                              spark_queue_len(pool)));
913           goto next_thread;
914         }
915       }
916
917       /* If we still have no work we need to send a FISH to get a spark
918          from another PE 
919       */
920       if (EMPTY_RUN_QUEUE()) {
921       /* =8-[  no local sparks => look for work on other PEs */
922         /*
923          * We really have absolutely no work.  Send out a fish
924          * (there may be some out there already), and wait for
925          * something to arrive.  We clearly can't run any threads
926          * until a SCHEDULE or RESUME arrives, and so that's what
927          * we're hoping to see.  (Of course, we still have to
928          * respond to other types of messages.)
929          */
930         TIME now = msTime() /*CURRENT_TIME*/;
931         IF_PAR_DEBUG(verbose, 
932                      belch("--  now=%ld", now));
933         IF_PAR_DEBUG(verbose,
934                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
935                          (last_fish_arrived_at!=0 &&
936                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
937                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
938                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
939                              last_fish_arrived_at,
940                              RtsFlags.ParFlags.fishDelay, now);
941                      });
942         
943         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
944             (last_fish_arrived_at==0 ||
945              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
946           /* outstandingFishes is set in sendFish, processFish;
947              avoid flooding system with fishes via delay */
948           pe = choosePE();
949           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
950                    NEW_FISH_HUNGER);
951
952           // Global statistics: count no. of fishes
953           if (RtsFlags.ParFlags.ParStats.Global &&
954               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
955             globalParStats.tot_fish_mess++;
956           }
957         }
958       
959         receivedFinish = processMessages();
960         goto next_thread;
961       }
962     } else if (PacketsWaiting()) {  /* Look for incoming messages */
963       receivedFinish = processMessages();
964     }
965
966     /* Now we are sure that we have some work available */
967     ASSERT(run_queue_hd != END_TSO_QUEUE);
968
969     /* Take a thread from the run queue, if we have work */
970     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
971     IF_DEBUG(sanity,checkTSO(t));
972
973     /* ToDo: write something to the log-file
974     if (RTSflags.ParFlags.granSimStats && !sameThread)
975         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
976
977     CurrentTSO = t;
978     */
979     /* the spark pool for the current PE */
980     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
981
982     IF_DEBUG(scheduler, 
983              belch("--=^ %d threads, %d sparks on [%#x]", 
984                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
985
986 # if 1
987     if (0 && RtsFlags.ParFlags.ParStats.Full && 
988         t && LastTSO && t->id != LastTSO->id && 
989         LastTSO->why_blocked == NotBlocked && 
990         LastTSO->what_next != ThreadComplete) {
991       // if previously scheduled TSO not blocked we have to record the context switch
992       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
993                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
994     }
995
996     if (RtsFlags.ParFlags.ParStats.Full && 
997         (emitSchedule /* forced emit */ ||
998         (t && LastTSO && t->id != LastTSO->id))) {
999       /* 
1000          we are running a different TSO, so write a schedule event to log file
1001          NB: If we use fair scheduling we also have to write  a deschedule 
1002              event for LastTSO; with unfair scheduling we know that the
1003              previous tso has blocked whenever we switch to another tso, so
1004              we don't need it in GUM for now
1005       */
1006       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1007                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1008       emitSchedule = rtsFalse;
1009     }
1010      
1011 # endif
1012 #else /* !GRAN && !PAR */
1013   
1014     /* grab a thread from the run queue */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016     t = POP_RUN_QUEUE();
1017     // Sanity check the thread we're about to run.  This can be
1018     // expensive if there is lots of thread switching going on...
1019     IF_DEBUG(sanity,checkTSO(t));
1020 #endif
1021     
1022     cap->r.rCurrentTSO = t;
1023     
1024     /* context switches are now initiated by the timer signal, unless
1025      * the user specified "context switch as often as possible", with
1026      * +RTS -C0
1027      */
1028     if (
1029 #ifdef PROFILING
1030         RtsFlags.ProfFlags.profileInterval == 0 ||
1031 #endif
1032         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1033          && (run_queue_hd != END_TSO_QUEUE
1034              || blocked_queue_hd != END_TSO_QUEUE
1035              || sleeping_queue != END_TSO_QUEUE)))
1036         context_switch = 1;
1037     else
1038         context_switch = 0;
1039
1040     RELEASE_LOCK(&sched_mutex);
1041
1042     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1043                               t->id, t, whatNext_strs[t->what_next]));
1044
1045 #ifdef PROFILING
1046     startHeapProfTimer();
1047 #endif
1048
1049     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1050     /* Run the current thread 
1051      */
1052     switch (cap->r.rCurrentTSO->what_next) {
1053     case ThreadKilled:
1054     case ThreadComplete:
1055         /* Thread already finished, return to scheduler. */
1056         ret = ThreadFinished;
1057         break;
1058     case ThreadEnterGHC:
1059         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1060         break;
1061     case ThreadRunGHC:
1062         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1063         break;
1064     case ThreadEnterInterp:
1065         ret = interpretBCO(cap);
1066         break;
1067     default:
1068       barf("schedule: invalid what_next field");
1069     }
1070     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071     
1072     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1073 #ifdef PROFILING
1074     stopHeapProfTimer();
1075     CCCS = CCS_SYSTEM;
1076 #endif
1077     
1078     ACQUIRE_LOCK(&sched_mutex);
1079
1080 #ifdef SMP
1081     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1082 #elif !defined(GRAN) && !defined(PAR)
1083     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1084 #endif
1085     t = cap->r.rCurrentTSO;
1086     
1087 #if defined(PAR)
1088     /* HACK 675: if the last thread didn't yield, make sure to print a 
1089        SCHEDULE event to the log file when StgRunning the next thread, even
1090        if it is the same one as before */
1091     LastTSO = t; 
1092     TimeOfLastYield = CURRENT_TIME;
1093 #endif
1094
1095     switch (ret) {
1096     case HeapOverflow:
1097 #if defined(GRAN)
1098       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1099       globalGranStats.tot_heapover++;
1100 #elif defined(PAR)
1101       globalParStats.tot_heapover++;
1102 #endif
1103
1104       // did the task ask for a large block?
1105       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1106           // if so, get one and push it on the front of the nursery.
1107           bdescr *bd;
1108           nat blocks;
1109           
1110           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1111
1112           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1113                                    t->id, t,
1114                                    whatNext_strs[t->what_next], blocks));
1115
1116           // don't do this if it would push us over the
1117           // alloc_blocks_lim limit; we'll GC first.
1118           if (alloc_blocks + blocks < alloc_blocks_lim) {
1119
1120               alloc_blocks += blocks;
1121               bd = allocGroup( blocks );
1122
1123               // link the new group into the list
1124               bd->link = cap->r.rCurrentNursery;
1125               bd->u.back = cap->r.rCurrentNursery->u.back;
1126               if (cap->r.rCurrentNursery->u.back != NULL) {
1127                   cap->r.rCurrentNursery->u.back->link = bd;
1128               } else {
1129                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1130                          g0s0->blocks == cap->r.rNursery);
1131                   cap->r.rNursery = g0s0->blocks = bd;
1132               }           
1133               cap->r.rCurrentNursery->u.back = bd;
1134
1135               // initialise it as a nursery block
1136               bd->step = g0s0;
1137               bd->gen_no = 0;
1138               bd->flags = 0;
1139               bd->free = bd->start;
1140
1141               // don't forget to update the block count in g0s0.
1142               g0s0->n_blocks += blocks;
1143               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1144
1145               // now update the nursery to point to the new block
1146               cap->r.rCurrentNursery = bd;
1147
1148               // we might be unlucky and have another thread get on the
1149               // run queue before us and steal the large block, but in that
1150               // case the thread will just end up requesting another large
1151               // block.
1152               PUSH_ON_RUN_QUEUE(t);
1153               break;
1154           }
1155       }
1156
1157       /* make all the running tasks block on a condition variable,
1158        * maybe set context_switch and wait till they all pile in,
1159        * then have them wait on a GC condition variable.
1160        */
1161       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1162                                t->id, t, whatNext_strs[t->what_next]));
1163       threadPaused(t);
1164 #if defined(GRAN)
1165       ASSERT(!is_on_queue(t,CurrentProc));
1166 #elif defined(PAR)
1167       /* Currently we emit a DESCHEDULE event before GC in GUM.
1168          ToDo: either add separate event to distinguish SYSTEM time from rest
1169                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1170       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1171         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1172                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1173         emitSchedule = rtsTrue;
1174       }
1175 #endif
1176       
1177       ready_to_gc = rtsTrue;
1178       context_switch = 1;               /* stop other threads ASAP */
1179       PUSH_ON_RUN_QUEUE(t);
1180       /* actual GC is done at the end of the while loop */
1181       break;
1182       
1183     case StackOverflow:
1184 #if defined(GRAN)
1185       IF_DEBUG(gran, 
1186                DumpGranEvent(GR_DESCHEDULE, t));
1187       globalGranStats.tot_stackover++;
1188 #elif defined(PAR)
1189       // IF_DEBUG(par, 
1190       // DumpGranEvent(GR_DESCHEDULE, t);
1191       globalParStats.tot_stackover++;
1192 #endif
1193       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1194                                t->id, t, whatNext_strs[t->what_next]));
1195       /* just adjust the stack for this thread, then pop it back
1196        * on the run queue.
1197        */
1198       threadPaused(t);
1199       { 
1200         StgMainThread *m;
1201         /* enlarge the stack */
1202         StgTSO *new_t = threadStackOverflow(t);
1203         
1204         /* This TSO has moved, so update any pointers to it from the
1205          * main thread stack.  It better not be on any other queues...
1206          * (it shouldn't be).
1207          */
1208         for (m = main_threads; m != NULL; m = m->link) {
1209           if (m->tso == t) {
1210             m->tso = new_t;
1211           }
1212         }
1213         threadPaused(new_t);
1214         PUSH_ON_RUN_QUEUE(new_t);
1215       }
1216       break;
1217
1218     case ThreadYielding:
1219 #if defined(GRAN)
1220       IF_DEBUG(gran, 
1221                DumpGranEvent(GR_DESCHEDULE, t));
1222       globalGranStats.tot_yields++;
1223 #elif defined(PAR)
1224       // IF_DEBUG(par, 
1225       // DumpGranEvent(GR_DESCHEDULE, t);
1226       globalParStats.tot_yields++;
1227 #endif
1228       /* put the thread back on the run queue.  Then, if we're ready to
1229        * GC, check whether this is the last task to stop.  If so, wake
1230        * up the GC thread.  getThread will block during a GC until the
1231        * GC is finished.
1232        */
1233       IF_DEBUG(scheduler,
1234                if (t->what_next == ThreadEnterInterp) {
1235                    /* ToDo: or maybe a timer expired when we were in Hugs?
1236                     * or maybe someone hit ctrl-C
1237                     */
1238                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1239                          t->id, t, whatNext_strs[t->what_next]);
1240                } else {
1241                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1242                          t->id, t, whatNext_strs[t->what_next]);
1243                }
1244                );
1245
1246       threadPaused(t);
1247
1248       IF_DEBUG(sanity,
1249                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1250                checkTSO(t));
1251       ASSERT(t->link == END_TSO_QUEUE);
1252 #if defined(GRAN)
1253       ASSERT(!is_on_queue(t,CurrentProc));
1254
1255       IF_DEBUG(sanity,
1256                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1257                checkThreadQsSanity(rtsTrue));
1258 #endif
1259 #if defined(PAR)
1260       if (RtsFlags.ParFlags.doFairScheduling) { 
1261         /* this does round-robin scheduling; good for concurrency */
1262         APPEND_TO_RUN_QUEUE(t);
1263       } else {
1264         /* this does unfair scheduling; good for parallelism */
1265         PUSH_ON_RUN_QUEUE(t);
1266       }
1267 #else
1268       /* this does round-robin scheduling; good for concurrency */
1269       APPEND_TO_RUN_QUEUE(t);
1270 #endif
1271 #if defined(GRAN)
1272       /* add a ContinueThread event to actually process the thread */
1273       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1274                 ContinueThread,
1275                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1276       IF_GRAN_DEBUG(bq, 
1277                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1278                G_EVENTQ(0);
1279                G_CURR_THREADQ(0));
1280 #endif /* GRAN */
1281       break;
1282       
1283     case ThreadBlocked:
1284 #if defined(GRAN)
1285       IF_DEBUG(scheduler,
1286                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1287                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1288                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1289
1290       // ??? needed; should emit block before
1291       IF_DEBUG(gran, 
1292                DumpGranEvent(GR_DESCHEDULE, t)); 
1293       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1294       /*
1295         ngoq Dogh!
1296       ASSERT(procStatus[CurrentProc]==Busy || 
1297               ((procStatus[CurrentProc]==Fetching) && 
1298               (t->block_info.closure!=(StgClosure*)NULL)));
1299       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1300           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1301             procStatus[CurrentProc]==Fetching)) 
1302         procStatus[CurrentProc] = Idle;
1303       */
1304 #elif defined(PAR)
1305       IF_DEBUG(scheduler,
1306                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1307                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1308       IF_PAR_DEBUG(bq,
1309
1310                    if (t->block_info.closure!=(StgClosure*)NULL) 
1311                      print_bq(t->block_info.closure));
1312
1313       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1314       blockThread(t);
1315
1316       /* whatever we schedule next, we must log that schedule */
1317       emitSchedule = rtsTrue;
1318
1319 #else /* !GRAN */
1320       /* don't need to do anything.  Either the thread is blocked on
1321        * I/O, in which case we'll have called addToBlockedQueue
1322        * previously, or it's blocked on an MVar or Blackhole, in which
1323        * case it'll be on the relevant queue already.
1324        */
1325       IF_DEBUG(scheduler,
1326                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1327                printThreadBlockage(t);
1328                fprintf(stderr, "\n"));
1329
1330       /* Only for dumping event to log file 
1331          ToDo: do I need this in GranSim, too?
1332       blockThread(t);
1333       */
1334 #endif
1335       threadPaused(t);
1336       break;
1337       
1338     case ThreadFinished:
1339       /* Need to check whether this was a main thread, and if so, signal
1340        * the task that started it with the return value.  If we have no
1341        * more main threads, we probably need to stop all the tasks until
1342        * we get a new one.
1343        */
1344       /* We also end up here if the thread kills itself with an
1345        * uncaught exception, see Exception.hc.
1346        */
1347       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1348 #if defined(GRAN)
1349       endThread(t, CurrentProc); // clean-up the thread
1350 #elif defined(PAR)
1351       /* For now all are advisory -- HWL */
1352       //if(t->priority==AdvisoryPriority) ??
1353       advisory_thread_count--;
1354       
1355 # ifdef DIST
1356       if(t->dist.priority==RevalPriority)
1357         FinishReval(t);
1358 # endif
1359       
1360       if (RtsFlags.ParFlags.ParStats.Full &&
1361           !RtsFlags.ParFlags.ParStats.Suppressed) 
1362         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1363 #endif
1364       break;
1365       
1366     default:
1367       barf("schedule: invalid thread return code %d", (int)ret);
1368     }
1369
1370 #ifdef PROFILING
1371     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1372         GarbageCollect(GetRoots, rtsTrue);
1373         heapCensus();
1374         performHeapProfile = rtsFalse;
1375         ready_to_gc = rtsFalse; // we already GC'd
1376     }
1377 #endif
1378
1379     if (ready_to_gc 
1380 #ifdef SMP
1381         && allFreeCapabilities() 
1382 #endif
1383         ) {
1384       /* everybody back, start the GC.
1385        * Could do it in this thread, or signal a condition var
1386        * to do it in another thread.  Either way, we need to
1387        * broadcast on gc_pending_cond afterward.
1388        */
1389 #if defined(RTS_SUPPORTS_THREADS)
1390       IF_DEBUG(scheduler,sched_belch("doing GC"));
1391 #endif
1392       GarbageCollect(GetRoots,rtsFalse);
1393       ready_to_gc = rtsFalse;
1394 #ifdef SMP
1395       broadcastCondition(&gc_pending_cond);
1396 #endif
1397 #if defined(GRAN)
1398       /* add a ContinueThread event to continue execution of current thread */
1399       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1400                 ContinueThread,
1401                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1402       IF_GRAN_DEBUG(bq, 
1403                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1404                G_EVENTQ(0);
1405                G_CURR_THREADQ(0));
1406 #endif /* GRAN */
1407     }
1408
1409 #if defined(GRAN)
1410   next_thread:
1411     IF_GRAN_DEBUG(unused,
1412                   print_eventq(EventHd));
1413
1414     event = get_next_event();
1415 #elif defined(PAR)
1416   next_thread:
1417     /* ToDo: wait for next message to arrive rather than busy wait */
1418 #endif /* GRAN */
1419
1420   } /* end of while(1) */
1421
1422   IF_PAR_DEBUG(verbose,
1423                belch("== Leaving schedule() after having received Finish"));
1424 }
1425
1426 /* ---------------------------------------------------------------------------
1427  * Singleton fork(). Do not copy any running threads.
1428  * ------------------------------------------------------------------------- */
1429
1430 StgInt forkProcess(StgTSO* tso) {
1431
1432 #ifndef mingw32_TARGET_OS
1433   pid_t pid;
1434   StgTSO* t,*next;
1435
1436   IF_DEBUG(scheduler,sched_belch("forking!"));
1437
1438   pid = fork();
1439   if (pid) { /* parent */
1440
1441   /* just return the pid */
1442     
1443   } else { /* child */
1444   /* wipe all other threads */
1445   run_queue_hd = tso;
1446   tso->link = END_TSO_QUEUE;
1447
1448   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1449      us is picky about finding the threat still in its queue when
1450      handling the deleteThread() */
1451
1452   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1453     next = t->link;
1454     if (t->id != tso->id) {
1455       deleteThread(t);
1456     }
1457   }
1458   }
1459   return pid;
1460 #else /* mingw32 */
1461   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1462   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1463   return -1;
1464 #endif /* mingw32 */
1465 }
1466
1467 /* ---------------------------------------------------------------------------
1468  * deleteAllThreads():  kill all the live threads.
1469  *
1470  * This is used when we catch a user interrupt (^C), before performing
1471  * any necessary cleanups and running finalizers.
1472  *
1473  * Locks: sched_mutex held.
1474  * ------------------------------------------------------------------------- */
1475    
1476 void deleteAllThreads ( void )
1477 {
1478   StgTSO* t, *next;
1479   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1480   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1481       next = t->global_link;
1482       deleteThread(t);
1483   }      
1484   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1485   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1486   sleeping_queue = END_TSO_QUEUE;
1487 }
1488
1489 /* startThread and  insertThread are now in GranSim.c -- HWL */
1490
1491
1492 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1493 //@subsection Suspend and Resume
1494
1495 /* ---------------------------------------------------------------------------
1496  * Suspending & resuming Haskell threads.
1497  * 
1498  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1499  * its capability before calling the C function.  This allows another
1500  * task to pick up the capability and carry on running Haskell
1501  * threads.  It also means that if the C call blocks, it won't lock
1502  * the whole system.
1503  *
1504  * The Haskell thread making the C call is put to sleep for the
1505  * duration of the call, on the susepended_ccalling_threads queue.  We
1506  * give out a token to the task, which it can use to resume the thread
1507  * on return from the C function.
1508  * ------------------------------------------------------------------------- */
1509    
1510 StgInt
1511 suspendThread( StgRegTable *reg, 
1512                rtsBool concCall
1513 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1514                STG_UNUSED
1515 #endif
1516                )
1517 {
1518   nat tok;
1519   Capability *cap;
1520
1521   /* assume that *reg is a pointer to the StgRegTable part
1522    * of a Capability.
1523    */
1524   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1525
1526   ACQUIRE_LOCK(&sched_mutex);
1527
1528   IF_DEBUG(scheduler,
1529            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1530
1531   threadPaused(cap->r.rCurrentTSO);
1532   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1533   suspended_ccalling_threads = cap->r.rCurrentTSO;
1534
1535 #if defined(RTS_SUPPORTS_THREADS)
1536   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1537 #endif
1538
1539   /* Use the thread ID as the token; it should be unique */
1540   tok = cap->r.rCurrentTSO->id;
1541
1542   /* Hand back capability */
1543   releaseCapability(cap);
1544   
1545 #if defined(RTS_SUPPORTS_THREADS)
1546   /* Preparing to leave the RTS, so ensure there's a native thread/task
1547      waiting to take over.
1548      
1549      ToDo: optimise this and only create a new task if there's a need
1550      for one (i.e., if there's only one Concurrent Haskell thread alive,
1551      there's no need to create a new task).
1552   */
1553   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1554   if (concCall) {
1555     startTask(taskStart);
1556   }
1557 #endif
1558
1559   /* Other threads _might_ be available for execution; signal this */
1560   THREAD_RUNNABLE();
1561   RELEASE_LOCK(&sched_mutex);
1562   return tok; 
1563 }
1564
1565 StgRegTable *
1566 resumeThread( StgInt tok,
1567               rtsBool concCall
1568 #if !defined(RTS_SUPPORTS_THREADS)
1569                STG_UNUSED
1570 #endif
1571               )
1572 {
1573   StgTSO *tso, **prev;
1574   Capability *cap;
1575
1576 #if defined(RTS_SUPPORTS_THREADS)
1577   /* Wait for permission to re-enter the RTS with the result. */
1578   if ( concCall ) {
1579     ACQUIRE_LOCK(&sched_mutex);
1580     grabReturnCapability(&sched_mutex, &cap);
1581   } else {
1582     grabCapability(&cap);
1583   }
1584 #else
1585   grabCapability(&cap);
1586 #endif
1587
1588   /* Remove the thread off of the suspended list */
1589   prev = &suspended_ccalling_threads;
1590   for (tso = suspended_ccalling_threads; 
1591        tso != END_TSO_QUEUE; 
1592        prev = &tso->link, tso = tso->link) {
1593     if (tso->id == (StgThreadID)tok) {
1594       *prev = tso->link;
1595       break;
1596     }
1597   }
1598   if (tso == END_TSO_QUEUE) {
1599     barf("resumeThread: thread not found");
1600   }
1601   tso->link = END_TSO_QUEUE;
1602   /* Reset blocking status */
1603   tso->why_blocked  = NotBlocked;
1604
1605   cap->r.rCurrentTSO = tso;
1606   RELEASE_LOCK(&sched_mutex);
1607   return &cap->r;
1608 }
1609
1610
1611 /* ---------------------------------------------------------------------------
1612  * Static functions
1613  * ------------------------------------------------------------------------ */
1614 static void unblockThread(StgTSO *tso);
1615
1616 /* ---------------------------------------------------------------------------
1617  * Comparing Thread ids.
1618  *
1619  * This is used from STG land in the implementation of the
1620  * instances of Eq/Ord for ThreadIds.
1621  * ------------------------------------------------------------------------ */
1622
1623 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1624
1625   StgThreadID id1 = tso1->id; 
1626   StgThreadID id2 = tso2->id;
1627  
1628   if (id1 < id2) return (-1);
1629   if (id1 > id2) return 1;
1630   return 0;
1631 }
1632
1633 /* ---------------------------------------------------------------------------
1634  * Fetching the ThreadID from an StgTSO.
1635  *
1636  * This is used in the implementation of Show for ThreadIds.
1637  * ------------------------------------------------------------------------ */
1638 int rts_getThreadId(const StgTSO *tso) 
1639 {
1640   return tso->id;
1641 }
1642
1643 #ifdef DEBUG
1644 void labelThread(StgTSO *tso, char *label)
1645 {
1646   int len;
1647   void *buf;
1648
1649   /* Caveat: Once set, you can only set the thread name to "" */
1650   len = strlen(label)+1;
1651   buf = realloc(tso->label,len);
1652   if (buf == NULL) {
1653     fprintf(stderr,"insufficient memory for labelThread!\n");
1654     free(tso->label);
1655     tso->label = NULL;
1656   } else
1657     strncpy(buf,label,len);
1658   tso->label = buf;
1659 }
1660 #endif /* DEBUG */
1661
1662 /* ---------------------------------------------------------------------------
1663    Create a new thread.
1664
1665    The new thread starts with the given stack size.  Before the
1666    scheduler can run, however, this thread needs to have a closure
1667    (and possibly some arguments) pushed on its stack.  See
1668    pushClosure() in Schedule.h.
1669
1670    createGenThread() and createIOThread() (in SchedAPI.h) are
1671    convenient packaged versions of this function.
1672
1673    currently pri (priority) is only used in a GRAN setup -- HWL
1674    ------------------------------------------------------------------------ */
1675 //@cindex createThread
1676 #if defined(GRAN)
1677 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1678 StgTSO *
1679 createThread(nat size, StgInt pri)
1680 #else
1681 StgTSO *
1682 createThread(nat size)
1683 #endif
1684 {
1685
1686     StgTSO *tso;
1687     nat stack_size;
1688
1689     /* First check whether we should create a thread at all */
1690 #if defined(PAR)
1691   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1692   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1693     threadsIgnored++;
1694     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1695           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1696     return END_TSO_QUEUE;
1697   }
1698   threadsCreated++;
1699 #endif
1700
1701 #if defined(GRAN)
1702   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1703 #endif
1704
1705   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1706
1707   /* catch ridiculously small stack sizes */
1708   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1709     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1710   }
1711
1712   stack_size = size - TSO_STRUCT_SIZEW;
1713
1714   tso = (StgTSO *)allocate(size);
1715   TICK_ALLOC_TSO(stack_size, 0);
1716
1717   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1718 #if defined(GRAN)
1719   SET_GRAN_HDR(tso, ThisPE);
1720 #endif
1721   tso->what_next     = ThreadEnterGHC;
1722
1723 #ifdef DEBUG
1724   tso->label = NULL;
1725 #endif
1726
1727   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1728    * protect the increment operation on next_thread_id.
1729    * In future, we could use an atomic increment instead.
1730    */
1731   ACQUIRE_LOCK(&thread_id_mutex);
1732   tso->id = next_thread_id++; 
1733   RELEASE_LOCK(&thread_id_mutex);
1734
1735   tso->why_blocked  = NotBlocked;
1736   tso->blocked_exceptions = NULL;
1737
1738   tso->stack_size   = stack_size;
1739   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1740                               - TSO_STRUCT_SIZEW;
1741   tso->sp           = (P_)&(tso->stack) + stack_size;
1742
1743 #ifdef PROFILING
1744   tso->prof.CCCS = CCS_MAIN;
1745 #endif
1746
1747   /* put a stop frame on the stack */
1748   tso->sp -= sizeofW(StgStopFrame);
1749   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1750   tso->su = (StgUpdateFrame*)tso->sp;
1751
1752   // ToDo: check this
1753 #if defined(GRAN)
1754   tso->link = END_TSO_QUEUE;
1755   /* uses more flexible routine in GranSim */
1756   insertThread(tso, CurrentProc);
1757 #else
1758   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1759    * from its creation
1760    */
1761 #endif
1762
1763 #if defined(GRAN) 
1764   if (RtsFlags.GranFlags.GranSimStats.Full) 
1765     DumpGranEvent(GR_START,tso);
1766 #elif defined(PAR)
1767   if (RtsFlags.ParFlags.ParStats.Full) 
1768     DumpGranEvent(GR_STARTQ,tso);
1769   /* HACk to avoid SCHEDULE 
1770      LastTSO = tso; */
1771 #endif
1772
1773   /* Link the new thread on the global thread list.
1774    */
1775   tso->global_link = all_threads;
1776   all_threads = tso;
1777
1778 #if defined(DIST)
1779   tso->dist.priority = MandatoryPriority; //by default that is...
1780 #endif
1781
1782 #if defined(GRAN)
1783   tso->gran.pri = pri;
1784 # if defined(DEBUG)
1785   tso->gran.magic = TSO_MAGIC; // debugging only
1786 # endif
1787   tso->gran.sparkname   = 0;
1788   tso->gran.startedat   = CURRENT_TIME; 
1789   tso->gran.exported    = 0;
1790   tso->gran.basicblocks = 0;
1791   tso->gran.allocs      = 0;
1792   tso->gran.exectime    = 0;
1793   tso->gran.fetchtime   = 0;
1794   tso->gran.fetchcount  = 0;
1795   tso->gran.blocktime   = 0;
1796   tso->gran.blockcount  = 0;
1797   tso->gran.blockedat   = 0;
1798   tso->gran.globalsparks = 0;
1799   tso->gran.localsparks  = 0;
1800   if (RtsFlags.GranFlags.Light)
1801     tso->gran.clock  = Now; /* local clock */
1802   else
1803     tso->gran.clock  = 0;
1804
1805   IF_DEBUG(gran,printTSO(tso));
1806 #elif defined(PAR)
1807 # if defined(DEBUG)
1808   tso->par.magic = TSO_MAGIC; // debugging only
1809 # endif
1810   tso->par.sparkname   = 0;
1811   tso->par.startedat   = CURRENT_TIME; 
1812   tso->par.exported    = 0;
1813   tso->par.basicblocks = 0;
1814   tso->par.allocs      = 0;
1815   tso->par.exectime    = 0;
1816   tso->par.fetchtime   = 0;
1817   tso->par.fetchcount  = 0;
1818   tso->par.blocktime   = 0;
1819   tso->par.blockcount  = 0;
1820   tso->par.blockedat   = 0;
1821   tso->par.globalsparks = 0;
1822   tso->par.localsparks  = 0;
1823 #endif
1824
1825 #if defined(GRAN)
1826   globalGranStats.tot_threads_created++;
1827   globalGranStats.threads_created_on_PE[CurrentProc]++;
1828   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1829   globalGranStats.tot_sq_probes++;
1830 #elif defined(PAR)
1831   // collect parallel global statistics (currently done together with GC stats)
1832   if (RtsFlags.ParFlags.ParStats.Global &&
1833       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1834     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1835     globalParStats.tot_threads_created++;
1836   }
1837 #endif 
1838
1839 #if defined(GRAN)
1840   IF_GRAN_DEBUG(pri,
1841                 belch("==__ schedule: Created TSO %d (%p);",
1842                       CurrentProc, tso, tso->id));
1843 #elif defined(PAR)
1844     IF_PAR_DEBUG(verbose,
1845                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1846                        tso->id, tso, advisory_thread_count));
1847 #else
1848   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1849                                  tso->id, tso->stack_size));
1850 #endif    
1851   return tso;
1852 }
1853
1854 #if defined(PAR)
1855 /* RFP:
1856    all parallel thread creation calls should fall through the following routine.
1857 */
1858 StgTSO *
1859 createSparkThread(rtsSpark spark) 
1860 { StgTSO *tso;
1861   ASSERT(spark != (rtsSpark)NULL);
1862   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1863   { threadsIgnored++;
1864     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1865           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1866     return END_TSO_QUEUE;
1867   }
1868   else
1869   { threadsCreated++;
1870     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1871     if (tso==END_TSO_QUEUE)     
1872       barf("createSparkThread: Cannot create TSO");
1873 #if defined(DIST)
1874     tso->priority = AdvisoryPriority;
1875 #endif
1876     pushClosure(tso,spark);
1877     PUSH_ON_RUN_QUEUE(tso);
1878     advisory_thread_count++;    
1879   }
1880   return tso;
1881 }
1882 #endif
1883
1884 /*
1885   Turn a spark into a thread.
1886   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1887 */
1888 #if defined(PAR)
1889 //@cindex activateSpark
1890 StgTSO *
1891 activateSpark (rtsSpark spark) 
1892 {
1893   StgTSO *tso;
1894
1895   tso = createSparkThread(spark);
1896   if (RtsFlags.ParFlags.ParStats.Full) {   
1897     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1898     IF_PAR_DEBUG(verbose,
1899                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1900                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1901   }
1902   // ToDo: fwd info on local/global spark to thread -- HWL
1903   // tso->gran.exported =  spark->exported;
1904   // tso->gran.locked =   !spark->global;
1905   // tso->gran.sparkname = spark->name;
1906
1907   return tso;
1908 }
1909 #endif
1910
1911 /* ---------------------------------------------------------------------------
1912  * scheduleThread()
1913  *
1914  * scheduleThread puts a thread on the head of the runnable queue.
1915  * This will usually be done immediately after a thread is created.
1916  * The caller of scheduleThread must create the thread using e.g.
1917  * createThread and push an appropriate closure
1918  * on this thread's stack before the scheduler is invoked.
1919  * ------------------------------------------------------------------------ */
1920
1921 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1922
1923 void
1924 scheduleThread_(StgTSO *tso
1925                , rtsBool createTask
1926 #if !defined(THREADED_RTS)
1927                  STG_UNUSED
1928 #endif
1929               )
1930 {
1931   ACQUIRE_LOCK(&sched_mutex);
1932
1933   /* Put the new thread on the head of the runnable queue.  The caller
1934    * better push an appropriate closure on this thread's stack
1935    * beforehand.  In the SMP case, the thread may start running as
1936    * soon as we release the scheduler lock below.
1937    */
1938   PUSH_ON_RUN_QUEUE(tso);
1939 #if defined(THREADED_RTS)
1940   /* If main() is scheduling a thread, don't bother creating a 
1941    * new task.
1942    */
1943   if ( createTask ) {
1944     startTask(taskStart);
1945   }
1946 #endif
1947   THREAD_RUNNABLE();
1948
1949 #if 0
1950   IF_DEBUG(scheduler,printTSO(tso));
1951 #endif
1952   RELEASE_LOCK(&sched_mutex);
1953 }
1954
1955 void scheduleThread(StgTSO* tso)
1956 {
1957   return scheduleThread_(tso, rtsFalse);
1958 }
1959
1960 void scheduleExtThread(StgTSO* tso)
1961 {
1962   return scheduleThread_(tso, rtsTrue);
1963 }
1964
1965 /* ---------------------------------------------------------------------------
1966  * initScheduler()
1967  *
1968  * Initialise the scheduler.  This resets all the queues - if the
1969  * queues contained any threads, they'll be garbage collected at the
1970  * next pass.
1971  *
1972  * ------------------------------------------------------------------------ */
1973
1974 #ifdef SMP
1975 static void
1976 term_handler(int sig STG_UNUSED)
1977 {
1978   stat_workerStop();
1979   ACQUIRE_LOCK(&term_mutex);
1980   await_death--;
1981   RELEASE_LOCK(&term_mutex);
1982   shutdownThread();
1983 }
1984 #endif
1985
1986 void 
1987 initScheduler(void)
1988 {
1989 #if defined(GRAN)
1990   nat i;
1991
1992   for (i=0; i<=MAX_PROC; i++) {
1993     run_queue_hds[i]      = END_TSO_QUEUE;
1994     run_queue_tls[i]      = END_TSO_QUEUE;
1995     blocked_queue_hds[i]  = END_TSO_QUEUE;
1996     blocked_queue_tls[i]  = END_TSO_QUEUE;
1997     ccalling_threadss[i]  = END_TSO_QUEUE;
1998     sleeping_queue        = END_TSO_QUEUE;
1999   }
2000 #else
2001   run_queue_hd      = END_TSO_QUEUE;
2002   run_queue_tl      = END_TSO_QUEUE;
2003   blocked_queue_hd  = END_TSO_QUEUE;
2004   blocked_queue_tl  = END_TSO_QUEUE;
2005   sleeping_queue    = END_TSO_QUEUE;
2006 #endif 
2007
2008   suspended_ccalling_threads  = END_TSO_QUEUE;
2009
2010   main_threads = NULL;
2011   all_threads  = END_TSO_QUEUE;
2012
2013   context_switch = 0;
2014   interrupted    = 0;
2015
2016   RtsFlags.ConcFlags.ctxtSwitchTicks =
2017       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2018       
2019 #if defined(RTS_SUPPORTS_THREADS)
2020   /* Initialise the mutex and condition variables used by
2021    * the scheduler. */
2022   initMutex(&sched_mutex);
2023   initMutex(&term_mutex);
2024   initMutex(&thread_id_mutex);
2025
2026   initCondition(&thread_ready_cond);
2027 #endif
2028   
2029 #if defined(SMP)
2030   initCondition(&gc_pending_cond);
2031 #endif
2032
2033 #if defined(RTS_SUPPORTS_THREADS)
2034   ACQUIRE_LOCK(&sched_mutex);
2035 #endif
2036
2037   /* Install the SIGHUP handler */
2038 #if defined(SMP)
2039   {
2040     struct sigaction action,oact;
2041
2042     action.sa_handler = term_handler;
2043     sigemptyset(&action.sa_mask);
2044     action.sa_flags = 0;
2045     if (sigaction(SIGTERM, &action, &oact) != 0) {
2046       barf("can't install TERM handler");
2047     }
2048   }
2049 #endif
2050
2051   /* A capability holds the state a native thread needs in
2052    * order to execute STG code. At least one capability is
2053    * floating around (only SMP builds have more than one).
2054    */
2055   initCapabilities();
2056   
2057 #if defined(RTS_SUPPORTS_THREADS)
2058     /* start our haskell execution tasks */
2059 # if defined(SMP)
2060     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2061 # else
2062     startTaskManager(0,taskStart);
2063 # endif
2064 #endif
2065
2066 #if /* defined(SMP) ||*/ defined(PAR)
2067   initSparkPools();
2068 #endif
2069
2070 #if defined(RTS_SUPPORTS_THREADS)
2071   RELEASE_LOCK(&sched_mutex);
2072 #endif
2073
2074 }
2075
2076 void
2077 exitScheduler( void )
2078 {
2079 #if defined(RTS_SUPPORTS_THREADS)
2080   stopTaskManager();
2081 #endif
2082   shutting_down_scheduler = rtsTrue;
2083 }
2084
2085 /* -----------------------------------------------------------------------------
2086    Managing the per-task allocation areas.
2087    
2088    Each capability comes with an allocation area.  These are
2089    fixed-length block lists into which allocation can be done.
2090
2091    ToDo: no support for two-space collection at the moment???
2092    -------------------------------------------------------------------------- */
2093
2094 /* -----------------------------------------------------------------------------
2095  * waitThread is the external interface for running a new computation
2096  * and waiting for the result.
2097  *
2098  * In the non-SMP case, we create a new main thread, push it on the 
2099  * main-thread stack, and invoke the scheduler to run it.  The
2100  * scheduler will return when the top main thread on the stack has
2101  * completed or died, and fill in the necessary fields of the
2102  * main_thread structure.
2103  *
2104  * In the SMP case, we create a main thread as before, but we then
2105  * create a new condition variable and sleep on it.  When our new
2106  * main thread has completed, we'll be woken up and the status/result
2107  * will be in the main_thread struct.
2108  * -------------------------------------------------------------------------- */
2109
2110 int 
2111 howManyThreadsAvail ( void )
2112 {
2113    int i = 0;
2114    StgTSO* q;
2115    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2116       i++;
2117    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2118       i++;
2119    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2120       i++;
2121    return i;
2122 }
2123
2124 void
2125 finishAllThreads ( void )
2126 {
2127    do {
2128       while (run_queue_hd != END_TSO_QUEUE) {
2129          waitThread ( run_queue_hd, NULL);
2130       }
2131       while (blocked_queue_hd != END_TSO_QUEUE) {
2132          waitThread ( blocked_queue_hd, NULL);
2133       }
2134       while (sleeping_queue != END_TSO_QUEUE) {
2135          waitThread ( blocked_queue_hd, NULL);
2136       }
2137    } while 
2138       (blocked_queue_hd != END_TSO_QUEUE || 
2139        run_queue_hd     != END_TSO_QUEUE ||
2140        sleeping_queue   != END_TSO_QUEUE);
2141 }
2142
2143 SchedulerStatus
2144 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2145
2146   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2147 #if defined(THREADED_RTS)
2148   return waitThread_(tso,ret, rtsFalse);
2149 #else
2150   return waitThread_(tso,ret);
2151 #endif
2152 }
2153
2154 SchedulerStatus
2155 waitThread_(StgTSO *tso,
2156             /*out*/StgClosure **ret
2157 #if defined(THREADED_RTS)
2158             , rtsBool blockWaiting
2159 #endif
2160            )
2161 {
2162   StgMainThread *m;
2163   SchedulerStatus stat;
2164
2165   ACQUIRE_LOCK(&sched_mutex);
2166   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2167   
2168   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2169
2170   m->tso = tso;
2171   m->ret = ret;
2172   m->stat = NoStatus;
2173 #if defined(RTS_SUPPORTS_THREADS)
2174   initCondition(&m->wakeup);
2175 #endif
2176
2177   m->link = main_threads;
2178   main_threads = m;
2179
2180   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2181
2182 #if defined(RTS_SUPPORTS_THREADS)
2183
2184 # if defined(THREADED_RTS)
2185   if (!blockWaiting) {
2186     /* In the threaded case, the OS thread that called main()
2187      * gets to enter the RTS directly without going via another
2188      * task/thread.
2189      */
2190     RELEASE_LOCK(&sched_mutex);
2191     schedule();
2192     ASSERT(m->stat != NoStatus);
2193   } else 
2194 # endif
2195   {
2196     do {
2197       waitCondition(&m->wakeup, &sched_mutex);
2198     } while (m->stat == NoStatus);
2199   }
2200 #elif defined(GRAN)
2201   /* GranSim specific init */
2202   CurrentTSO = m->tso;                // the TSO to run
2203   procStatus[MainProc] = Busy;        // status of main PE
2204   CurrentProc = MainProc;             // PE to run it on
2205
2206   schedule();
2207 #else
2208   RELEASE_LOCK(&sched_mutex);
2209   schedule();
2210   ASSERT(m->stat != NoStatus);
2211 #endif
2212
2213   stat = m->stat;
2214
2215 #if defined(RTS_SUPPORTS_THREADS)
2216   closeCondition(&m->wakeup);
2217 #endif
2218
2219   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2220                               m->tso->id));
2221   free(m);
2222
2223 #if defined(THREADED_RTS)
2224   if (blockWaiting) 
2225 #endif
2226     RELEASE_LOCK(&sched_mutex);
2227
2228   return stat;
2229 }
2230
2231 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2232 //@subsection Run queue code 
2233
2234 #if 0
2235 /* 
2236    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2237        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2238        implicit global variable that has to be correct when calling these
2239        fcts -- HWL 
2240 */
2241
2242 /* Put the new thread on the head of the runnable queue.
2243  * The caller of createThread better push an appropriate closure
2244  * on this thread's stack before the scheduler is invoked.
2245  */
2246 static /* inline */ void
2247 add_to_run_queue(tso)
2248 StgTSO* tso; 
2249 {
2250   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2251   tso->link = run_queue_hd;
2252   run_queue_hd = tso;
2253   if (run_queue_tl == END_TSO_QUEUE) {
2254     run_queue_tl = tso;
2255   }
2256 }
2257
2258 /* Put the new thread at the end of the runnable queue. */
2259 static /* inline */ void
2260 push_on_run_queue(tso)
2261 StgTSO* tso; 
2262 {
2263   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2264   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2265   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2266   if (run_queue_hd == END_TSO_QUEUE) {
2267     run_queue_hd = tso;
2268   } else {
2269     run_queue_tl->link = tso;
2270   }
2271   run_queue_tl = tso;
2272 }
2273
2274 /* 
2275    Should be inlined because it's used very often in schedule.  The tso
2276    argument is actually only needed in GranSim, where we want to have the
2277    possibility to schedule *any* TSO on the run queue, irrespective of the
2278    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2279    the run queue and dequeue the tso, adjusting the links in the queue. 
2280 */
2281 //@cindex take_off_run_queue
2282 static /* inline */ StgTSO*
2283 take_off_run_queue(StgTSO *tso) {
2284   StgTSO *t, *prev;
2285
2286   /* 
2287      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2288
2289      if tso is specified, unlink that tso from the run_queue (doesn't have
2290      to be at the beginning of the queue); GranSim only 
2291   */
2292   if (tso!=END_TSO_QUEUE) {
2293     /* find tso in queue */
2294     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2295          t!=END_TSO_QUEUE && t!=tso;
2296          prev=t, t=t->link) 
2297       /* nothing */ ;
2298     ASSERT(t==tso);
2299     /* now actually dequeue the tso */
2300     if (prev!=END_TSO_QUEUE) {
2301       ASSERT(run_queue_hd!=t);
2302       prev->link = t->link;
2303     } else {
2304       /* t is at beginning of thread queue */
2305       ASSERT(run_queue_hd==t);
2306       run_queue_hd = t->link;
2307     }
2308     /* t is at end of thread queue */
2309     if (t->link==END_TSO_QUEUE) {
2310       ASSERT(t==run_queue_tl);
2311       run_queue_tl = prev;
2312     } else {
2313       ASSERT(run_queue_tl!=t);
2314     }
2315     t->link = END_TSO_QUEUE;
2316   } else {
2317     /* take tso from the beginning of the queue; std concurrent code */
2318     t = run_queue_hd;
2319     if (t != END_TSO_QUEUE) {
2320       run_queue_hd = t->link;
2321       t->link = END_TSO_QUEUE;
2322       if (run_queue_hd == END_TSO_QUEUE) {
2323         run_queue_tl = END_TSO_QUEUE;
2324       }
2325     }
2326   }
2327   return t;
2328 }
2329
2330 #endif /* 0 */
2331
2332 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2333 //@subsection Garbage Collextion Routines
2334
2335 /* ---------------------------------------------------------------------------
2336    Where are the roots that we know about?
2337
2338         - all the threads on the runnable queue
2339         - all the threads on the blocked queue
2340         - all the threads on the sleeping queue
2341         - all the thread currently executing a _ccall_GC
2342         - all the "main threads"
2343      
2344    ------------------------------------------------------------------------ */
2345
2346 /* This has to be protected either by the scheduler monitor, or by the
2347         garbage collection monitor (probably the latter).
2348         KH @ 25/10/99
2349 */
2350
2351 void
2352 GetRoots(evac_fn evac)
2353 {
2354 #if defined(GRAN)
2355   {
2356     nat i;
2357     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2358       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2359           evac((StgClosure **)&run_queue_hds[i]);
2360       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2361           evac((StgClosure **)&run_queue_tls[i]);
2362       
2363       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2364           evac((StgClosure **)&blocked_queue_hds[i]);
2365       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2366           evac((StgClosure **)&blocked_queue_tls[i]);
2367       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2368           evac((StgClosure **)&ccalling_threads[i]);
2369     }
2370   }
2371
2372   markEventQueue();
2373
2374 #else /* !GRAN */
2375   if (run_queue_hd != END_TSO_QUEUE) {
2376       ASSERT(run_queue_tl != END_TSO_QUEUE);
2377       evac((StgClosure **)&run_queue_hd);
2378       evac((StgClosure **)&run_queue_tl);
2379   }
2380   
2381   if (blocked_queue_hd != END_TSO_QUEUE) {
2382       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2383       evac((StgClosure **)&blocked_queue_hd);
2384       evac((StgClosure **)&blocked_queue_tl);
2385   }
2386   
2387   if (sleeping_queue != END_TSO_QUEUE) {
2388       evac((StgClosure **)&sleeping_queue);
2389   }
2390 #endif 
2391
2392   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2393       evac((StgClosure **)&suspended_ccalling_threads);
2394   }
2395
2396 #if defined(PAR) || defined(GRAN)
2397   markSparkQueue(evac);
2398 #endif
2399 }
2400
2401 /* -----------------------------------------------------------------------------
2402    performGC
2403
2404    This is the interface to the garbage collector from Haskell land.
2405    We provide this so that external C code can allocate and garbage
2406    collect when called from Haskell via _ccall_GC.
2407
2408    It might be useful to provide an interface whereby the programmer
2409    can specify more roots (ToDo).
2410    
2411    This needs to be protected by the GC condition variable above.  KH.
2412    -------------------------------------------------------------------------- */
2413
2414 void (*extra_roots)(evac_fn);
2415
2416 void
2417 performGC(void)
2418 {
2419   /* Obligated to hold this lock upon entry */
2420   ACQUIRE_LOCK(&sched_mutex);
2421   GarbageCollect(GetRoots,rtsFalse);
2422   RELEASE_LOCK(&sched_mutex);
2423 }
2424
2425 void
2426 performMajorGC(void)
2427 {
2428   ACQUIRE_LOCK(&sched_mutex);
2429   GarbageCollect(GetRoots,rtsTrue);
2430   RELEASE_LOCK(&sched_mutex);
2431 }
2432
2433 static void
2434 AllRoots(evac_fn evac)
2435 {
2436     GetRoots(evac);             // the scheduler's roots
2437     extra_roots(evac);          // the user's roots
2438 }
2439
2440 void
2441 performGCWithRoots(void (*get_roots)(evac_fn))
2442 {
2443   ACQUIRE_LOCK(&sched_mutex);
2444   extra_roots = get_roots;
2445   GarbageCollect(AllRoots,rtsFalse);
2446   RELEASE_LOCK(&sched_mutex);
2447 }
2448
2449 /* -----------------------------------------------------------------------------
2450    Stack overflow
2451
2452    If the thread has reached its maximum stack size, then raise the
2453    StackOverflow exception in the offending thread.  Otherwise
2454    relocate the TSO into a larger chunk of memory and adjust its stack
2455    size appropriately.
2456    -------------------------------------------------------------------------- */
2457
2458 static StgTSO *
2459 threadStackOverflow(StgTSO *tso)
2460 {
2461   nat new_stack_size, new_tso_size, diff, stack_words;
2462   StgPtr new_sp;
2463   StgTSO *dest;
2464
2465   IF_DEBUG(sanity,checkTSO(tso));
2466   if (tso->stack_size >= tso->max_stack_size) {
2467
2468     IF_DEBUG(gc,
2469              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2470                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2471              /* If we're debugging, just print out the top of the stack */
2472              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2473                                               tso->sp+64)));
2474
2475     /* Send this thread the StackOverflow exception */
2476     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2477     return tso;
2478   }
2479
2480   /* Try to double the current stack size.  If that takes us over the
2481    * maximum stack size for this thread, then use the maximum instead.
2482    * Finally round up so the TSO ends up as a whole number of blocks.
2483    */
2484   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2485   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2486                                        TSO_STRUCT_SIZE)/sizeof(W_);
2487   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2488   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2489
2490   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2491
2492   dest = (StgTSO *)allocate(new_tso_size);
2493   TICK_ALLOC_TSO(new_stack_size,0);
2494
2495   /* copy the TSO block and the old stack into the new area */
2496   memcpy(dest,tso,TSO_STRUCT_SIZE);
2497   stack_words = tso->stack + tso->stack_size - tso->sp;
2498   new_sp = (P_)dest + new_tso_size - stack_words;
2499   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2500
2501   /* relocate the stack pointers... */
2502   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2503   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2504   dest->sp    = new_sp;
2505   dest->stack_size = new_stack_size;
2506         
2507   /* and relocate the update frame list */
2508   relocate_stack(dest, diff);
2509
2510   /* Mark the old TSO as relocated.  We have to check for relocated
2511    * TSOs in the garbage collector and any primops that deal with TSOs.
2512    *
2513    * It's important to set the sp and su values to just beyond the end
2514    * of the stack, so we don't attempt to scavenge any part of the
2515    * dead TSO's stack.
2516    */
2517   tso->what_next = ThreadRelocated;
2518   tso->link = dest;
2519   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2520   tso->su = (StgUpdateFrame *)tso->sp;
2521   tso->why_blocked = NotBlocked;
2522   dest->mut_link = NULL;
2523
2524   IF_PAR_DEBUG(verbose,
2525                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2526                      tso->id, tso, tso->stack_size);
2527                /* If we're debugging, just print out the top of the stack */
2528                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2529                                                 tso->sp+64)));
2530   
2531   IF_DEBUG(sanity,checkTSO(tso));
2532 #if 0
2533   IF_DEBUG(scheduler,printTSO(dest));
2534 #endif
2535
2536   return dest;
2537 }
2538
2539 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2540 //@subsection Blocking Queue Routines
2541
2542 /* ---------------------------------------------------------------------------
2543    Wake up a queue that was blocked on some resource.
2544    ------------------------------------------------------------------------ */
2545
2546 #if defined(GRAN)
2547 static inline void
2548 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2549 {
2550 }
2551 #elif defined(PAR)
2552 static inline void
2553 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2554 {
2555   /* write RESUME events to log file and
2556      update blocked and fetch time (depending on type of the orig closure) */
2557   if (RtsFlags.ParFlags.ParStats.Full) {
2558     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2559                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2560                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2561     if (EMPTY_RUN_QUEUE())
2562       emitSchedule = rtsTrue;
2563
2564     switch (get_itbl(node)->type) {
2565         case FETCH_ME_BQ:
2566           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2567           break;
2568         case RBH:
2569         case FETCH_ME:
2570         case BLACKHOLE_BQ:
2571           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2572           break;
2573 #ifdef DIST
2574         case MVAR:
2575           break;
2576 #endif    
2577         default:
2578           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2579         }
2580       }
2581 }
2582 #endif
2583
2584 #if defined(GRAN)
2585 static StgBlockingQueueElement *
2586 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2587 {
2588     StgTSO *tso;
2589     PEs node_loc, tso_loc;
2590
2591     node_loc = where_is(node); // should be lifted out of loop
2592     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2593     tso_loc = where_is((StgClosure *)tso);
2594     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2595       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2596       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2597       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2598       // insertThread(tso, node_loc);
2599       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2600                 ResumeThread,
2601                 tso, node, (rtsSpark*)NULL);
2602       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2603       // len_local++;
2604       // len++;
2605     } else { // TSO is remote (actually should be FMBQ)
2606       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2607                                   RtsFlags.GranFlags.Costs.gunblocktime +
2608                                   RtsFlags.GranFlags.Costs.latency;
2609       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2610                 UnblockThread,
2611                 tso, node, (rtsSpark*)NULL);
2612       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2613       // len++;
2614     }
2615     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2616     IF_GRAN_DEBUG(bq,
2617                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2618                           (node_loc==tso_loc ? "Local" : "Global"), 
2619                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2620     tso->block_info.closure = NULL;
2621     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2622                              tso->id, tso));
2623 }
2624 #elif defined(PAR)
2625 static StgBlockingQueueElement *
2626 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2627 {
2628     StgBlockingQueueElement *next;
2629
2630     switch (get_itbl(bqe)->type) {
2631     case TSO:
2632       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2633       /* if it's a TSO just push it onto the run_queue */
2634       next = bqe->link;
2635       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2636       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2637       THREAD_RUNNABLE();
2638       unblockCount(bqe, node);
2639       /* reset blocking status after dumping event */
2640       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2641       break;
2642
2643     case BLOCKED_FETCH:
2644       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2645       next = bqe->link;
2646       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2647       PendingFetches = (StgBlockedFetch *)bqe;
2648       break;
2649
2650 # if defined(DEBUG)
2651       /* can ignore this case in a non-debugging setup; 
2652          see comments on RBHSave closures above */
2653     case CONSTR:
2654       /* check that the closure is an RBHSave closure */
2655       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2656              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2657              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2658       break;
2659
2660     default:
2661       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2662            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2663            (StgClosure *)bqe);
2664 # endif
2665     }
2666   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2667   return next;
2668 }
2669
2670 #else /* !GRAN && !PAR */
2671 static StgTSO *
2672 unblockOneLocked(StgTSO *tso)
2673 {
2674   StgTSO *next;
2675
2676   ASSERT(get_itbl(tso)->type == TSO);
2677   ASSERT(tso->why_blocked != NotBlocked);
2678   tso->why_blocked = NotBlocked;
2679   next = tso->link;
2680   PUSH_ON_RUN_QUEUE(tso);
2681   THREAD_RUNNABLE();
2682   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2683   return next;
2684 }
2685 #endif
2686
2687 #if defined(GRAN) || defined(PAR)
2688 inline StgBlockingQueueElement *
2689 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2690 {
2691   ACQUIRE_LOCK(&sched_mutex);
2692   bqe = unblockOneLocked(bqe, node);
2693   RELEASE_LOCK(&sched_mutex);
2694   return bqe;
2695 }
2696 #else
2697 inline StgTSO *
2698 unblockOne(StgTSO *tso)
2699 {
2700   ACQUIRE_LOCK(&sched_mutex);
2701   tso = unblockOneLocked(tso);
2702   RELEASE_LOCK(&sched_mutex);
2703   return tso;
2704 }
2705 #endif
2706
2707 #if defined(GRAN)
2708 void 
2709 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2710 {
2711   StgBlockingQueueElement *bqe;
2712   PEs node_loc;
2713   nat len = 0; 
2714
2715   IF_GRAN_DEBUG(bq, 
2716                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2717                       node, CurrentProc, CurrentTime[CurrentProc], 
2718                       CurrentTSO->id, CurrentTSO));
2719
2720   node_loc = where_is(node);
2721
2722   ASSERT(q == END_BQ_QUEUE ||
2723          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2724          get_itbl(q)->type == CONSTR); // closure (type constructor)
2725   ASSERT(is_unique(node));
2726
2727   /* FAKE FETCH: magically copy the node to the tso's proc;
2728      no Fetch necessary because in reality the node should not have been 
2729      moved to the other PE in the first place
2730   */
2731   if (CurrentProc!=node_loc) {
2732     IF_GRAN_DEBUG(bq, 
2733                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2734                         node, node_loc, CurrentProc, CurrentTSO->id, 
2735                         // CurrentTSO, where_is(CurrentTSO),
2736                         node->header.gran.procs));
2737     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2738     IF_GRAN_DEBUG(bq, 
2739                   belch("## new bitmask of node %p is %#x",
2740                         node, node->header.gran.procs));
2741     if (RtsFlags.GranFlags.GranSimStats.Global) {
2742       globalGranStats.tot_fake_fetches++;
2743     }
2744   }
2745
2746   bqe = q;
2747   // ToDo: check: ASSERT(CurrentProc==node_loc);
2748   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2749     //next = bqe->link;
2750     /* 
2751        bqe points to the current element in the queue
2752        next points to the next element in the queue
2753     */
2754     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2755     //tso_loc = where_is(tso);
2756     len++;
2757     bqe = unblockOneLocked(bqe, node);
2758   }
2759
2760   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2761      the closure to make room for the anchor of the BQ */
2762   if (bqe!=END_BQ_QUEUE) {
2763     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2764     /*
2765     ASSERT((info_ptr==&RBH_Save_0_info) ||
2766            (info_ptr==&RBH_Save_1_info) ||
2767            (info_ptr==&RBH_Save_2_info));
2768     */
2769     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2770     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2771     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2772
2773     IF_GRAN_DEBUG(bq,
2774                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2775                         node, info_type(node)));
2776   }
2777
2778   /* statistics gathering */
2779   if (RtsFlags.GranFlags.GranSimStats.Global) {
2780     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2781     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2782     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2783     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2784   }
2785   IF_GRAN_DEBUG(bq,
2786                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2787                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2788 }
2789 #elif defined(PAR)
2790 void 
2791 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2792 {
2793   StgBlockingQueueElement *bqe;
2794
2795   ACQUIRE_LOCK(&sched_mutex);
2796
2797   IF_PAR_DEBUG(verbose, 
2798                belch("##-_ AwBQ for node %p on [%x]: ",
2799                      node, mytid));
2800 #ifdef DIST  
2801   //RFP
2802   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2803     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2804     return;
2805   }
2806 #endif
2807   
2808   ASSERT(q == END_BQ_QUEUE ||
2809          get_itbl(q)->type == TSO ||           
2810          get_itbl(q)->type == BLOCKED_FETCH || 
2811          get_itbl(q)->type == CONSTR); 
2812
2813   bqe = q;
2814   while (get_itbl(bqe)->type==TSO || 
2815          get_itbl(bqe)->type==BLOCKED_FETCH) {
2816     bqe = unblockOneLocked(bqe, node);
2817   }
2818   RELEASE_LOCK(&sched_mutex);
2819 }
2820
2821 #else   /* !GRAN && !PAR */
2822 void
2823 awakenBlockedQueue(StgTSO *tso)
2824 {
2825   ACQUIRE_LOCK(&sched_mutex);
2826   while (tso != END_TSO_QUEUE) {
2827     tso = unblockOneLocked(tso);
2828   }
2829   RELEASE_LOCK(&sched_mutex);
2830 }
2831 #endif
2832
2833 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2834 //@subsection Exception Handling Routines
2835
2836 /* ---------------------------------------------------------------------------
2837    Interrupt execution
2838    - usually called inside a signal handler so it mustn't do anything fancy.   
2839    ------------------------------------------------------------------------ */
2840
2841 void
2842 interruptStgRts(void)
2843 {
2844     interrupted    = 1;
2845     context_switch = 1;
2846 }
2847
2848 /* -----------------------------------------------------------------------------
2849    Unblock a thread
2850
2851    This is for use when we raise an exception in another thread, which
2852    may be blocked.
2853    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2854    -------------------------------------------------------------------------- */
2855
2856 #if defined(GRAN) || defined(PAR)
2857 /*
2858   NB: only the type of the blocking queue is different in GranSim and GUM
2859       the operations on the queue-elements are the same
2860       long live polymorphism!
2861
2862   Locks: sched_mutex is held upon entry and exit.
2863
2864 */
2865 static void
2866 unblockThread(StgTSO *tso)
2867 {
2868   StgBlockingQueueElement *t, **last;
2869
2870   switch (tso->why_blocked) {
2871
2872   case NotBlocked:
2873     return;  /* not blocked */
2874
2875   case BlockedOnMVar:
2876     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2877     {
2878       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2879       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2880
2881       last = (StgBlockingQueueElement **)&mvar->head;
2882       for (t = (StgBlockingQueueElement *)mvar->head; 
2883            t != END_BQ_QUEUE; 
2884            last = &t->link, last_tso = t, t = t->link) {
2885         if (t == (StgBlockingQueueElement *)tso) {
2886           *last = (StgBlockingQueueElement *)tso->link;
2887           if (mvar->tail == tso) {
2888             mvar->tail = (StgTSO *)last_tso;
2889           }
2890           goto done;
2891         }
2892       }
2893       barf("unblockThread (MVAR): TSO not found");
2894     }
2895
2896   case BlockedOnBlackHole:
2897     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2898     {
2899       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2900
2901       last = &bq->blocking_queue;
2902       for (t = bq->blocking_queue; 
2903            t != END_BQ_QUEUE; 
2904            last = &t->link, t = t->link) {
2905         if (t == (StgBlockingQueueElement *)tso) {
2906           *last = (StgBlockingQueueElement *)tso->link;
2907           goto done;
2908         }
2909       }
2910       barf("unblockThread (BLACKHOLE): TSO not found");
2911     }
2912
2913   case BlockedOnException:
2914     {
2915       StgTSO *target  = tso->block_info.tso;
2916
2917       ASSERT(get_itbl(target)->type == TSO);
2918
2919       if (target->what_next == ThreadRelocated) {
2920           target = target->link;
2921           ASSERT(get_itbl(target)->type == TSO);
2922       }
2923
2924       ASSERT(target->blocked_exceptions != NULL);
2925
2926       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2927       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2928            t != END_BQ_QUEUE; 
2929            last = &t->link, t = t->link) {
2930         ASSERT(get_itbl(t)->type == TSO);
2931         if (t == (StgBlockingQueueElement *)tso) {
2932           *last = (StgBlockingQueueElement *)tso->link;
2933           goto done;
2934         }
2935       }
2936       barf("unblockThread (Exception): TSO not found");
2937     }
2938
2939   case BlockedOnRead:
2940   case BlockedOnWrite:
2941     {
2942       /* take TSO off blocked_queue */
2943       StgBlockingQueueElement *prev = NULL;
2944       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2945            prev = t, t = t->link) {
2946         if (t == (StgBlockingQueueElement *)tso) {
2947           if (prev == NULL) {
2948             blocked_queue_hd = (StgTSO *)t->link;
2949             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2950               blocked_queue_tl = END_TSO_QUEUE;
2951             }
2952           } else {
2953             prev->link = t->link;
2954             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2955               blocked_queue_tl = (StgTSO *)prev;
2956             }
2957           }
2958           goto done;
2959         }
2960       }
2961       barf("unblockThread (I/O): TSO not found");
2962     }
2963
2964   case BlockedOnDelay:
2965     {
2966       /* take TSO off sleeping_queue */
2967       StgBlockingQueueElement *prev = NULL;
2968       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2969            prev = t, t = t->link) {
2970         if (t == (StgBlockingQueueElement *)tso) {
2971           if (prev == NULL) {
2972             sleeping_queue = (StgTSO *)t->link;
2973           } else {
2974             prev->link = t->link;
2975           }
2976           goto done;
2977         }
2978       }
2979       barf("unblockThread (I/O): TSO not found");
2980     }
2981
2982   default:
2983     barf("unblockThread");
2984   }
2985
2986  done:
2987   tso->link = END_TSO_QUEUE;
2988   tso->why_blocked = NotBlocked;
2989   tso->block_info.closure = NULL;
2990   PUSH_ON_RUN_QUEUE(tso);
2991 }
2992 #else
2993 static void
2994 unblockThread(StgTSO *tso)
2995 {
2996   StgTSO *t, **last;
2997   
2998   /* To avoid locking unnecessarily. */
2999   if (tso->why_blocked == NotBlocked) {
3000     return;
3001   }
3002
3003   switch (tso->why_blocked) {
3004
3005   case BlockedOnMVar:
3006     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3007     {
3008       StgTSO *last_tso = END_TSO_QUEUE;
3009       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3010
3011       last = &mvar->head;
3012       for (t = mvar->head; t != END_TSO_QUEUE; 
3013            last = &t->link, last_tso = t, t = t->link) {
3014         if (t == tso) {
3015           *last = tso->link;
3016           if (mvar->tail == tso) {
3017             mvar->tail = last_tso;
3018           }
3019           goto done;
3020         }
3021       }
3022       barf("unblockThread (MVAR): TSO not found");
3023     }
3024
3025   case BlockedOnBlackHole:
3026     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3027     {
3028       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3029
3030       last = &bq->blocking_queue;
3031       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3032            last = &t->link, t = t->link) {
3033         if (t == tso) {
3034           *last = tso->link;
3035           goto done;
3036         }
3037       }
3038       barf("unblockThread (BLACKHOLE): TSO not found");
3039     }
3040
3041   case BlockedOnException:
3042     {
3043       StgTSO *target  = tso->block_info.tso;
3044
3045       ASSERT(get_itbl(target)->type == TSO);
3046
3047       while (target->what_next == ThreadRelocated) {
3048           target = target->link;
3049           ASSERT(get_itbl(target)->type == TSO);
3050       }
3051       
3052       ASSERT(target->blocked_exceptions != NULL);
3053
3054       last = &target->blocked_exceptions;
3055       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3056            last = &t->link, t = t->link) {
3057         ASSERT(get_itbl(t)->type == TSO);
3058         if (t == tso) {
3059           *last = tso->link;
3060           goto done;
3061         }
3062       }
3063       barf("unblockThread (Exception): TSO not found");
3064     }
3065
3066   case BlockedOnRead:
3067   case BlockedOnWrite:
3068     {
3069       StgTSO *prev = NULL;
3070       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3071            prev = t, t = t->link) {
3072         if (t == tso) {
3073           if (prev == NULL) {
3074             blocked_queue_hd = t->link;
3075             if (blocked_queue_tl == t) {
3076               blocked_queue_tl = END_TSO_QUEUE;
3077             }
3078           } else {
3079             prev->link = t->link;
3080             if (blocked_queue_tl == t) {
3081               blocked_queue_tl = prev;
3082             }
3083           }
3084           goto done;
3085         }
3086       }
3087       barf("unblockThread (I/O): TSO not found");
3088     }
3089
3090   case BlockedOnDelay:
3091     {
3092       StgTSO *prev = NULL;
3093       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3094            prev = t, t = t->link) {
3095         if (t == tso) {
3096           if (prev == NULL) {
3097             sleeping_queue = t->link;
3098           } else {
3099             prev->link = t->link;
3100           }
3101           goto done;
3102         }
3103       }
3104       barf("unblockThread (I/O): TSO not found");
3105     }
3106
3107   default:
3108     barf("unblockThread");
3109   }
3110
3111  done:
3112   tso->link = END_TSO_QUEUE;
3113   tso->why_blocked = NotBlocked;
3114   tso->block_info.closure = NULL;
3115   PUSH_ON_RUN_QUEUE(tso);
3116 }
3117 #endif
3118
3119 /* -----------------------------------------------------------------------------
3120  * raiseAsync()
3121  *
3122  * The following function implements the magic for raising an
3123  * asynchronous exception in an existing thread.
3124  *
3125  * We first remove the thread from any queue on which it might be
3126  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3127  *
3128  * We strip the stack down to the innermost CATCH_FRAME, building
3129  * thunks in the heap for all the active computations, so they can 
3130  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3131  * an application of the handler to the exception, and push it on
3132  * the top of the stack.
3133  * 
3134  * How exactly do we save all the active computations?  We create an
3135  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3136  * AP_UPDs pushes everything from the corresponding update frame
3137  * upwards onto the stack.  (Actually, it pushes everything up to the
3138  * next update frame plus a pointer to the next AP_UPD object.
3139  * Entering the next AP_UPD object pushes more onto the stack until we
3140  * reach the last AP_UPD object - at which point the stack should look
3141  * exactly as it did when we killed the TSO and we can continue
3142  * execution by entering the closure on top of the stack.
3143  *
3144  * We can also kill a thread entirely - this happens if either (a) the 
3145  * exception passed to raiseAsync is NULL, or (b) there's no
3146  * CATCH_FRAME on the stack.  In either case, we strip the entire
3147  * stack and replace the thread with a zombie.
3148  *
3149  * Locks: sched_mutex held upon entry nor exit.
3150  *
3151  * -------------------------------------------------------------------------- */
3152  
3153 void 
3154 deleteThread(StgTSO *tso)
3155 {
3156   raiseAsync(tso,NULL);
3157 }
3158
3159 void
3160 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3161 {
3162   /* When raising async exs from contexts where sched_mutex isn't held;
3163      use raiseAsyncWithLock(). */
3164   ACQUIRE_LOCK(&sched_mutex);
3165   raiseAsync(tso,exception);
3166   RELEASE_LOCK(&sched_mutex);
3167 }
3168
3169 void
3170 raiseAsync(StgTSO *tso, StgClosure *exception)
3171 {
3172   StgUpdateFrame* su = tso->su;
3173   StgPtr          sp = tso->sp;
3174   
3175   /* Thread already dead? */
3176   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3177     return;
3178   }
3179
3180   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3181
3182   /* Remove it from any blocking queues */
3183   unblockThread(tso);
3184
3185   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3186   /* The stack freezing code assumes there's a closure pointer on
3187    * the top of the stack.  This isn't always the case with compiled
3188    * code, so we have to push a dummy closure on the top which just
3189    * returns to the next return address on the stack.
3190    */
3191   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3192     *(--sp) = (W_)&stg_dummy_ret_closure;
3193   }
3194
3195   while (1) {
3196     nat words = ((P_)su - (P_)sp) - 1;
3197     nat i;
3198     StgAP_UPD * ap;
3199
3200     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3201      * then build the THUNK raise(exception), and leave it on
3202      * top of the CATCH_FRAME ready to enter.
3203      */
3204     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3205 #ifdef PROFILING
3206       StgCatchFrame *cf = (StgCatchFrame *)su;
3207 #endif
3208       StgClosure *raise;
3209
3210       /* we've got an exception to raise, so let's pass it to the
3211        * handler in this frame.
3212        */
3213       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3214       TICK_ALLOC_SE_THK(1,0);
3215       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3216       raise->payload[0] = exception;
3217
3218       /* throw away the stack from Sp up to the CATCH_FRAME.
3219        */
3220       sp = (P_)su - 1;
3221
3222       /* Ensure that async excpetions are blocked now, so we don't get
3223        * a surprise exception before we get around to executing the
3224        * handler.
3225        */
3226       if (tso->blocked_exceptions == NULL) {
3227           tso->blocked_exceptions = END_TSO_QUEUE;
3228       }
3229
3230       /* Put the newly-built THUNK on top of the stack, ready to execute
3231        * when the thread restarts.
3232        */
3233       sp[0] = (W_)raise;
3234       tso->sp = sp;
3235       tso->su = su;
3236       tso->what_next = ThreadEnterGHC;
3237       IF_DEBUG(sanity, checkTSO(tso));
3238       return;
3239     }
3240
3241     /* First build an AP_UPD consisting of the stack chunk above the
3242      * current update frame, with the top word on the stack as the
3243      * fun field.
3244      */
3245     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3246     
3247     ASSERT(words >= 0);
3248     
3249     ap->n_args = words;
3250     ap->fun    = (StgClosure *)sp[0];
3251     sp++;
3252     for(i=0; i < (nat)words; ++i) {
3253       ap->payload[i] = (StgClosure *)*sp++;
3254     }
3255     
3256     switch (get_itbl(su)->type) {
3257       
3258     case UPDATE_FRAME:
3259       {
3260         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3261         TICK_ALLOC_UP_THK(words+1,0);
3262         
3263         IF_DEBUG(scheduler,
3264                  fprintf(stderr,  "scheduler: Updating ");
3265                  printPtr((P_)su->updatee); 
3266                  fprintf(stderr,  " with ");
3267                  printObj((StgClosure *)ap);
3268                  );
3269         
3270         /* Replace the updatee with an indirection - happily
3271          * this will also wake up any threads currently
3272          * waiting on the result.
3273          *
3274          * Warning: if we're in a loop, more than one update frame on
3275          * the stack may point to the same object.  Be careful not to
3276          * overwrite an IND_OLDGEN in this case, because we'll screw
3277          * up the mutable lists.  To be on the safe side, don't
3278          * overwrite any kind of indirection at all.  See also
3279          * threadSqueezeStack in GC.c, where we have to make a similar
3280          * check.
3281          */
3282         if (!closure_IND(su->updatee)) {
3283             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3284         }
3285         su = su->link;
3286         sp += sizeofW(StgUpdateFrame) -1;
3287         sp[0] = (W_)ap; /* push onto stack */
3288         break;
3289       }
3290
3291     case CATCH_FRAME:
3292       {
3293         StgCatchFrame *cf = (StgCatchFrame *)su;
3294         StgClosure* o;
3295         
3296         /* We want a PAP, not an AP_UPD.  Fortunately, the
3297          * layout's the same.
3298          */
3299         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3300         TICK_ALLOC_UPD_PAP(words+1,0);
3301         
3302         /* now build o = FUN(catch,ap,handler) */
3303         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3304         TICK_ALLOC_FUN(2,0);
3305         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3306         o->payload[0] = (StgClosure *)ap;
3307         o->payload[1] = cf->handler;
3308         
3309         IF_DEBUG(scheduler,
3310                  fprintf(stderr,  "scheduler: Built ");
3311                  printObj((StgClosure *)o);
3312                  );
3313         
3314         /* pop the old handler and put o on the stack */
3315         su = cf->link;
3316         sp += sizeofW(StgCatchFrame) - 1;
3317         sp[0] = (W_)o;
3318         break;
3319       }
3320       
3321     case SEQ_FRAME:
3322       {
3323         StgSeqFrame *sf = (StgSeqFrame *)su;
3324         StgClosure* o;
3325         
3326         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3327         TICK_ALLOC_UPD_PAP(words+1,0);
3328         
3329         /* now build o = FUN(seq,ap) */
3330         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3331         TICK_ALLOC_SE_THK(1,0);
3332         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3333         o->payload[0] = (StgClosure *)ap;
3334         
3335         IF_DEBUG(scheduler,
3336                  fprintf(stderr,  "scheduler: Built ");
3337                  printObj((StgClosure *)o);
3338                  );
3339         
3340         /* pop the old handler and put o on the stack */
3341         su = sf->link;
3342         sp += sizeofW(StgSeqFrame) - 1;
3343         sp[0] = (W_)o;
3344         break;
3345       }
3346       
3347     case STOP_FRAME:
3348       /* We've stripped the entire stack, the thread is now dead. */
3349       sp += sizeofW(StgStopFrame) - 1;
3350       sp[0] = (W_)exception;    /* save the exception */
3351       tso->what_next = ThreadKilled;
3352       tso->su = (StgUpdateFrame *)(sp+1);
3353       tso->sp = sp;
3354       return;
3355
3356     default:
3357       barf("raiseAsync");
3358     }
3359   }
3360   barf("raiseAsync");
3361 }
3362
3363 /* -----------------------------------------------------------------------------
3364    resurrectThreads is called after garbage collection on the list of
3365    threads found to be garbage.  Each of these threads will be woken
3366    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3367    on an MVar, or NonTermination if the thread was blocked on a Black
3368    Hole.
3369
3370    Locks: sched_mutex isn't held upon entry nor exit.
3371    -------------------------------------------------------------------------- */
3372
3373 void
3374 resurrectThreads( StgTSO *threads )
3375 {
3376   StgTSO *tso, *next;
3377
3378   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3379     next = tso->global_link;
3380     tso->global_link = all_threads;
3381     all_threads = tso;
3382     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3383
3384     switch (tso->why_blocked) {
3385     case BlockedOnMVar:
3386     case BlockedOnException:
3387       /* Called by GC - sched_mutex lock is currently held. */
3388       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3389       break;
3390     case BlockedOnBlackHole:
3391       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3392       break;
3393     case NotBlocked:
3394       /* This might happen if the thread was blocked on a black hole
3395        * belonging to a thread that we've just woken up (raiseAsync
3396        * can wake up threads, remember...).
3397        */
3398       continue;
3399     default:
3400       barf("resurrectThreads: thread blocked in a strange way");
3401     }
3402   }
3403 }
3404
3405 /* -----------------------------------------------------------------------------
3406  * Blackhole detection: if we reach a deadlock, test whether any
3407  * threads are blocked on themselves.  Any threads which are found to
3408  * be self-blocked get sent a NonTermination exception.
3409  *
3410  * This is only done in a deadlock situation in order to avoid
3411  * performance overhead in the normal case.
3412  *
3413  * Locks: sched_mutex is held upon entry and exit.
3414  * -------------------------------------------------------------------------- */
3415
3416 static void
3417 detectBlackHoles( void )
3418 {
3419     StgTSO *t = all_threads;
3420     StgUpdateFrame *frame;
3421     StgClosure *blocked_on;
3422
3423     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3424
3425         while (t->what_next == ThreadRelocated) {
3426             t = t->link;
3427             ASSERT(get_itbl(t)->type == TSO);
3428         }
3429       
3430         if (t->why_blocked != BlockedOnBlackHole) {
3431             continue;
3432         }
3433
3434         blocked_on = t->block_info.closure;
3435
3436         for (frame = t->su; ; frame = frame->link) {
3437             switch (get_itbl(frame)->type) {
3438
3439             case UPDATE_FRAME:
3440                 if (frame->updatee == blocked_on) {
3441                     /* We are blocking on one of our own computations, so
3442                      * send this thread the NonTermination exception.  
3443                      */
3444                     IF_DEBUG(scheduler, 
3445                              sched_belch("thread %d is blocked on itself", t->id));
3446                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3447                     goto done;
3448                 }
3449                 else {
3450                     continue;
3451                 }
3452
3453             case CATCH_FRAME:
3454             case SEQ_FRAME:
3455                 continue;
3456                 
3457             case STOP_FRAME:
3458                 break;
3459             }
3460             break;
3461         }
3462
3463     done: ;
3464     }   
3465 }
3466
3467 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3468 //@subsection Debugging Routines
3469
3470 /* -----------------------------------------------------------------------------
3471    Debugging: why is a thread blocked
3472    -------------------------------------------------------------------------- */
3473
3474 #ifdef DEBUG
3475
3476 void
3477 printThreadBlockage(StgTSO *tso)
3478 {
3479   switch (tso->why_blocked) {
3480   case BlockedOnRead:
3481     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3482     break;
3483   case BlockedOnWrite:
3484     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3485     break;
3486   case BlockedOnDelay:
3487     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3488     break;
3489   case BlockedOnMVar:
3490     fprintf(stderr,"is blocked on an MVar");
3491     break;
3492   case BlockedOnException:
3493     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3494             tso->block_info.tso->id);
3495     break;
3496   case BlockedOnBlackHole:
3497     fprintf(stderr,"is blocked on a black hole");
3498     break;
3499   case NotBlocked:
3500     fprintf(stderr,"is not blocked");
3501     break;
3502 #if defined(PAR)
3503   case BlockedOnGA:
3504     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3505             tso->block_info.closure, info_type(tso->block_info.closure));
3506     break;
3507   case BlockedOnGA_NoSend:
3508     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3509             tso->block_info.closure, info_type(tso->block_info.closure));
3510     break;
3511 #endif
3512 #if defined(RTS_SUPPORTS_THREADS)
3513   case BlockedOnCCall:
3514     fprintf(stderr,"is blocked on an external call");
3515     break;
3516 #endif
3517   default:
3518     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3519          tso->why_blocked, tso->id, tso);
3520   }
3521 }
3522
3523 void
3524 printThreadStatus(StgTSO *tso)
3525 {
3526   switch (tso->what_next) {
3527   case ThreadKilled:
3528     fprintf(stderr,"has been killed");
3529     break;
3530   case ThreadComplete:
3531     fprintf(stderr,"has completed");
3532     break;
3533   default:
3534     printThreadBlockage(tso);
3535   }
3536 }
3537
3538 void
3539 printAllThreads(void)
3540 {
3541   StgTSO *t;
3542
3543 # if defined(GRAN)
3544   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3545   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3546                        time_string, rtsFalse/*no commas!*/);
3547
3548   sched_belch("all threads at [%s]:", time_string);
3549 # elif defined(PAR)
3550   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3551   ullong_format_string(CURRENT_TIME,
3552                        time_string, rtsFalse/*no commas!*/);
3553
3554   sched_belch("all threads at [%s]:", time_string);
3555 # else
3556   sched_belch("all threads:");
3557 # endif
3558
3559   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3560     fprintf(stderr, "\tthread %d ", t->id);
3561     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3562     printThreadStatus(t);
3563     fprintf(stderr,"\n");
3564   }
3565 }
3566     
3567 /* 
3568    Print a whole blocking queue attached to node (debugging only).
3569 */
3570 //@cindex print_bq
3571 # if defined(PAR)
3572 void 
3573 print_bq (StgClosure *node)
3574 {
3575   StgBlockingQueueElement *bqe;
3576   StgTSO *tso;
3577   rtsBool end;
3578
3579   fprintf(stderr,"## BQ of closure %p (%s): ",
3580           node, info_type(node));
3581
3582   /* should cover all closures that may have a blocking queue */
3583   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3584          get_itbl(node)->type == FETCH_ME_BQ ||
3585          get_itbl(node)->type == RBH ||
3586          get_itbl(node)->type == MVAR);
3587     
3588   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3589
3590   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3591 }
3592
3593 /* 
3594    Print a whole blocking queue starting with the element bqe.
3595 */
3596 void 
3597 print_bqe (StgBlockingQueueElement *bqe)
3598 {
3599   rtsBool end;
3600
3601   /* 
3602      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3603   */
3604   for (end = (bqe==END_BQ_QUEUE);
3605        !end; // iterate until bqe points to a CONSTR
3606        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3607        bqe = end ? END_BQ_QUEUE : bqe->link) {
3608     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3609     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3610     /* types of closures that may appear in a blocking queue */
3611     ASSERT(get_itbl(bqe)->type == TSO ||           
3612            get_itbl(bqe)->type == BLOCKED_FETCH || 
3613            get_itbl(bqe)->type == CONSTR); 
3614     /* only BQs of an RBH end with an RBH_Save closure */
3615     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3616
3617     switch (get_itbl(bqe)->type) {
3618     case TSO:
3619       fprintf(stderr," TSO %u (%x),",
3620               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3621       break;
3622     case BLOCKED_FETCH:
3623       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3624               ((StgBlockedFetch *)bqe)->node, 
3625               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3626               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3627               ((StgBlockedFetch *)bqe)->ga.weight);
3628       break;
3629     case CONSTR:
3630       fprintf(stderr," %s (IP %p),",
3631               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3632                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3633                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3634                "RBH_Save_?"), get_itbl(bqe));
3635       break;
3636     default:
3637       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3638            info_type((StgClosure *)bqe)); // , node, info_type(node));
3639       break;
3640     }
3641   } /* for */
3642   fputc('\n', stderr);
3643 }
3644 # elif defined(GRAN)
3645 void 
3646 print_bq (StgClosure *node)
3647 {
3648   StgBlockingQueueElement *bqe;
3649   PEs node_loc, tso_loc;
3650   rtsBool end;
3651
3652   /* should cover all closures that may have a blocking queue */
3653   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3654          get_itbl(node)->type == FETCH_ME_BQ ||
3655          get_itbl(node)->type == RBH);
3656     
3657   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3658   node_loc = where_is(node);
3659
3660   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3661           node, info_type(node), node_loc);
3662
3663   /* 
3664      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3665   */
3666   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3667        !end; // iterate until bqe points to a CONSTR
3668        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3669     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3670     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3671     /* types of closures that may appear in a blocking queue */
3672     ASSERT(get_itbl(bqe)->type == TSO ||           
3673            get_itbl(bqe)->type == CONSTR); 
3674     /* only BQs of an RBH end with an RBH_Save closure */
3675     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3676
3677     tso_loc = where_is((StgClosure *)bqe);
3678     switch (get_itbl(bqe)->type) {
3679     case TSO:
3680       fprintf(stderr," TSO %d (%p) on [PE %d],",
3681               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3682       break;
3683     case CONSTR:
3684       fprintf(stderr," %s (IP %p),",
3685               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3686                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3687                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3688                "RBH_Save_?"), get_itbl(bqe));
3689       break;
3690     default:
3691       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3692            info_type((StgClosure *)bqe), node, info_type(node));
3693       break;
3694     }
3695   } /* for */
3696   fputc('\n', stderr);
3697 }
3698 #else
3699 /* 
3700    Nice and easy: only TSOs on the blocking queue
3701 */
3702 void 
3703 print_bq (StgClosure *node)
3704 {
3705   StgTSO *tso;
3706
3707   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3708   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3709        tso != END_TSO_QUEUE; 
3710        tso=tso->link) {
3711     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3712     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3713     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3714   }
3715   fputc('\n', stderr);
3716 }
3717 # endif
3718
3719 #if defined(PAR)
3720 static nat
3721 run_queue_len(void)
3722 {
3723   nat i;
3724   StgTSO *tso;
3725
3726   for (i=0, tso=run_queue_hd; 
3727        tso != END_TSO_QUEUE;
3728        i++, tso=tso->link)
3729     /* nothing */
3730
3731   return i;
3732 }
3733 #endif
3734
3735 static void
3736 sched_belch(char *s, ...)
3737 {
3738   va_list ap;
3739   va_start(ap,s);
3740 #ifdef SMP
3741   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3742 #elif defined(PAR)
3743   fprintf(stderr, "== ");
3744 #else
3745   fprintf(stderr, "scheduler: ");
3746 #endif
3747   vfprintf(stderr, s, ap);
3748   fprintf(stderr, "\n");
3749   va_end(ap);
3750 }
3751
3752 #endif /* DEBUG */
3753
3754
3755 //@node Index,  , Debugging Routines, Main scheduling code
3756 //@subsection Index
3757
3758 //@index
3759 //* StgMainThread::  @cindex\s-+StgMainThread
3760 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3761 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3762 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3763 //* context_switch::  @cindex\s-+context_switch
3764 //* createThread::  @cindex\s-+createThread
3765 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3766 //* initScheduler::  @cindex\s-+initScheduler
3767 //* interrupted::  @cindex\s-+interrupted
3768 //* next_thread_id::  @cindex\s-+next_thread_id
3769 //* print_bq::  @cindex\s-+print_bq
3770 //* run_queue_hd::  @cindex\s-+run_queue_hd
3771 //* run_queue_tl::  @cindex\s-+run_queue_tl
3772 //* sched_mutex::  @cindex\s-+sched_mutex
3773 //* schedule::  @cindex\s-+schedule
3774 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3775 //* term_mutex::  @cindex\s-+term_mutex
3776 //@end index