[project @ 2002-04-10 11:43:43 by stolz]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.136 2002/04/10 11:43:45 stolz Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   /* Check to see whether there are any worker threads
368      waiting to deposit external call results. If so,
369      yield our capability */
370   yieldToReturningWorker(&sched_mutex, cap);
371
372   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
373 #endif
374
375 #if defined(GRAN)
376   /* set up first event to get things going */
377   /* ToDo: assign costs for system setup and init MainTSO ! */
378   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
379             ContinueThread, 
380             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
381
382   IF_DEBUG(gran,
383            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
384            G_TSO(CurrentTSO, 5));
385
386   if (RtsFlags.GranFlags.Light) {
387     /* Save current time; GranSim Light only */
388     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
389   }      
390
391   event = get_next_event();
392
393   while (event!=(rtsEvent*)NULL) {
394     /* Choose the processor with the next event */
395     CurrentProc = event->proc;
396     CurrentTSO = event->tso;
397
398 #elif defined(PAR)
399
400   while (!receivedFinish) {    /* set by processMessages */
401                                /* when receiving PP_FINISH message         */ 
402 #else
403
404   while (1) {
405
406 #endif
407
408     IF_DEBUG(scheduler, printAllThreads());
409
410     /* If we're interrupted (the user pressed ^C, or some other
411      * termination condition occurred), kill all the currently running
412      * threads.
413      */
414     if (interrupted) {
415       IF_DEBUG(scheduler, sched_belch("interrupted"));
416       deleteAllThreads();
417       interrupted = rtsFalse;
418       was_interrupted = rtsTrue;
419     }
420
421     /* Go through the list of main threads and wake up any
422      * clients whose computations have finished.  ToDo: this
423      * should be done more efficiently without a linear scan
424      * of the main threads list, somehow...
425      */
426 #if defined(RTS_SUPPORTS_THREADS)
427     { 
428       StgMainThread *m, **prev;
429       prev = &main_threads;
430       for (m = main_threads; m != NULL; m = m->link) {
431         switch (m->tso->what_next) {
432         case ThreadComplete:
433           if (m->ret) {
434             *(m->ret) = (StgClosure *)m->tso->sp[0];
435           }
436           *prev = m->link;
437           m->stat = Success;
438           broadcastCondition(&m->wakeup);
439 #ifdef DEBUG
440           free(m->tso->label);
441 #endif
442           break;
443         case ThreadKilled:
444           if (m->ret) *(m->ret) = NULL;
445           *prev = m->link;
446           if (was_interrupted) {
447             m->stat = Interrupted;
448           } else {
449             m->stat = Killed;
450           }
451           broadcastCondition(&m->wakeup);
452 #ifdef DEBUG
453           free(m->tso->label);
454 #endif
455           break;
456         default:
457           break;
458         }
459       }
460     }
461
462 #else /* not threaded */
463
464 # if defined(PAR)
465     /* in GUM do this only on the Main PE */
466     if (IAmMainThread)
467 # endif
468     /* If our main thread has finished or been killed, return.
469      */
470     {
471       StgMainThread *m = main_threads;
472       if (m->tso->what_next == ThreadComplete
473           || m->tso->what_next == ThreadKilled) {
474 #ifdef DEBUG
475         free(m->tso->label);
476 #endif
477         main_threads = main_threads->link;
478         if (m->tso->what_next == ThreadComplete) {
479           /* we finished successfully, fill in the return value */
480           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
481           m->stat = Success;
482           return;
483         } else {
484           if (m->ret) { *(m->ret) = NULL; };
485           if (was_interrupted) {
486             m->stat = Interrupted;
487           } else {
488             m->stat = Killed;
489           }
490           return;
491         }
492       }
493     }
494 #endif
495
496     /* Top up the run queue from our spark pool.  We try to make the
497      * number of threads in the run queue equal to the number of
498      * free capabilities.
499      *
500      * Disable spark support in SMP for now, non-essential & requires
501      * a little bit of work to make it compile cleanly. -- sof 1/02.
502      */
503 #if 0 /* defined(SMP) */
504     {
505       nat n = getFreeCapabilities();
506       StgTSO *tso = run_queue_hd;
507
508       /* Count the run queue */
509       while (n > 0 && tso != END_TSO_QUEUE) {
510         tso = tso->link;
511         n--;
512       }
513
514       for (; n > 0; n--) {
515         StgClosure *spark;
516         spark = findSpark(rtsFalse);
517         if (spark == NULL) {
518           break; /* no more sparks in the pool */
519         } else {
520           /* I'd prefer this to be done in activateSpark -- HWL */
521           /* tricky - it needs to hold the scheduler lock and
522            * not try to re-acquire it -- SDM */
523           createSparkThread(spark);       
524           IF_DEBUG(scheduler,
525                    sched_belch("==^^ turning spark of closure %p into a thread",
526                                (StgClosure *)spark));
527         }
528       }
529       /* We need to wake up the other tasks if we just created some
530        * work for them.
531        */
532       if (getFreeCapabilities() - n > 1) {
533           signalCondition( &thread_ready_cond );
534       }
535     }
536 #endif // SMP
537
538     /* check for signals each time around the scheduler */
539 #ifndef mingw32_TARGET_OS
540     if (signals_pending()) {
541       startSignalHandlers();
542     }
543 #endif
544
545     /* Check whether any waiting threads need to be woken up.  If the
546      * run queue is empty, and there are no other tasks running, we
547      * can wait indefinitely for something to happen.
548      * ToDo: what if another client comes along & requests another
549      * main thread?
550      */
551     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
552       awaitEvent( EMPTY_RUN_QUEUE()
553 #if defined(SMP)
554         && allFreeCapabilities()
555 #endif
556         );
557     }
558     /* we can be interrupted while waiting for I/O... */
559     if (interrupted) continue;
560
561     /* 
562      * Detect deadlock: when we have no threads to run, there are no
563      * threads waiting on I/O or sleeping, and all the other tasks are
564      * waiting for work, we must have a deadlock of some description.
565      *
566      * We first try to find threads blocked on themselves (ie. black
567      * holes), and generate NonTermination exceptions where necessary.
568      *
569      * If no threads are black holed, we have a deadlock situation, so
570      * inform all the main threads.
571      */
572 #ifndef PAR
573     if (   EMPTY_THREAD_QUEUES()
574 #if defined(RTS_SUPPORTS_THREADS)
575         && EMPTY_QUEUE(suspended_ccalling_threads)
576 #endif
577 #ifdef SMP
578         && allFreeCapabilities()
579 #endif
580         )
581     {
582         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
583 #if defined(THREADED_RTS)
584         /* and SMP mode ..? */
585         releaseCapability(cap);
586 #endif
587         // Garbage collection can release some new threads due to
588         // either (a) finalizers or (b) threads resurrected because
589         // they are about to be send BlockedOnDeadMVar.  Any threads
590         // thus released will be immediately runnable.
591         GarbageCollect(GetRoots,rtsTrue);
592
593         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
594
595         IF_DEBUG(scheduler, 
596                  sched_belch("still deadlocked, checking for black holes..."));
597         detectBlackHoles();
598
599         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
600
601 #ifndef mingw32_TARGET_OS
602         /* If we have user-installed signal handlers, then wait
603          * for signals to arrive rather then bombing out with a
604          * deadlock.
605          */
606         if ( anyUserHandlers() ) {
607             IF_DEBUG(scheduler, 
608                      sched_belch("still deadlocked, waiting for signals..."));
609
610             awaitUserSignals();
611
612             // we might be interrupted...
613             if (interrupted) { continue; }
614
615             if (signals_pending()) {
616                 startSignalHandlers();
617             }
618             ASSERT(!EMPTY_RUN_QUEUE());
619             goto not_deadlocked;
620         }
621 #endif
622
623         /* Probably a real deadlock.  Send the current main thread the
624          * Deadlock exception (or in the SMP build, send *all* main
625          * threads the deadlock exception, since none of them can make
626          * progress).
627          */
628         {
629             StgMainThread *m;
630 #if defined(RTS_SUPPORTS_THREADS)
631             for (m = main_threads; m != NULL; m = m->link) {
632                 switch (m->tso->why_blocked) {
633                 case BlockedOnBlackHole:
634                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
635                     break;
636                 case BlockedOnException:
637                 case BlockedOnMVar:
638                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
639                     break;
640                 default:
641                     barf("deadlock: main thread blocked in a strange way");
642                 }
643             }
644 #else
645             m = main_threads;
646             switch (m->tso->why_blocked) {
647             case BlockedOnBlackHole:
648                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
649                 break;
650             case BlockedOnException:
651             case BlockedOnMVar:
652                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
653                 break;
654             default:
655                 barf("deadlock: main thread blocked in a strange way");
656             }
657 #endif
658         }
659
660 #if defined(RTS_SUPPORTS_THREADS)
661         /* ToDo: revisit conditions (and mechanism) for shutting
662            down a multi-threaded world  */
663         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
664         shutdownHaskellAndExit(0);
665 #endif
666     }
667   not_deadlocked:
668
669 #elif defined(PAR)
670     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
671 #endif
672
673 #if defined(SMP)
674     /* If there's a GC pending, don't do anything until it has
675      * completed.
676      */
677     if (ready_to_gc) {
678       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
679       waitCondition( &gc_pending_cond, &sched_mutex );
680     }
681 #endif    
682
683 #if defined(RTS_SUPPORTS_THREADS)
684     /* block until we've got a thread on the run queue and a free
685      * capability.
686      *
687      */
688     if ( EMPTY_RUN_QUEUE() ) {
689       /* Give up our capability */
690       releaseCapability(cap);
691       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
692       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
693       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
694 #if 0
695       while ( EMPTY_RUN_QUEUE() ) {
696         waitForWorkCapability(&sched_mutex, &cap);
697         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
698       }
699 #endif
700     }
701 #endif
702
703 #if defined(GRAN)
704     if (RtsFlags.GranFlags.Light)
705       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
706
707     /* adjust time based on time-stamp */
708     if (event->time > CurrentTime[CurrentProc] &&
709         event->evttype != ContinueThread)
710       CurrentTime[CurrentProc] = event->time;
711     
712     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
713     if (!RtsFlags.GranFlags.Light)
714       handleIdlePEs();
715
716     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
717
718     /* main event dispatcher in GranSim */
719     switch (event->evttype) {
720       /* Should just be continuing execution */
721     case ContinueThread:
722       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
723       /* ToDo: check assertion
724       ASSERT(run_queue_hd != (StgTSO*)NULL &&
725              run_queue_hd != END_TSO_QUEUE);
726       */
727       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
728       if (!RtsFlags.GranFlags.DoAsyncFetch &&
729           procStatus[CurrentProc]==Fetching) {
730         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
731               CurrentTSO->id, CurrentTSO, CurrentProc);
732         goto next_thread;
733       } 
734       /* Ignore ContinueThreads for completed threads */
735       if (CurrentTSO->what_next == ThreadComplete) {
736         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
737               CurrentTSO->id, CurrentTSO, CurrentProc);
738         goto next_thread;
739       } 
740       /* Ignore ContinueThreads for threads that are being migrated */
741       if (PROCS(CurrentTSO)==Nowhere) { 
742         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
743               CurrentTSO->id, CurrentTSO, CurrentProc);
744         goto next_thread;
745       }
746       /* The thread should be at the beginning of the run queue */
747       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
748         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
749               CurrentTSO->id, CurrentTSO, CurrentProc);
750         break; // run the thread anyway
751       }
752       /*
753       new_event(proc, proc, CurrentTime[proc],
754                 FindWork,
755                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
756       goto next_thread; 
757       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
758       break; // now actually run the thread; DaH Qu'vam yImuHbej 
759
760     case FetchNode:
761       do_the_fetchnode(event);
762       goto next_thread;             /* handle next event in event queue  */
763       
764     case GlobalBlock:
765       do_the_globalblock(event);
766       goto next_thread;             /* handle next event in event queue  */
767       
768     case FetchReply:
769       do_the_fetchreply(event);
770       goto next_thread;             /* handle next event in event queue  */
771       
772     case UnblockThread:   /* Move from the blocked queue to the tail of */
773       do_the_unblock(event);
774       goto next_thread;             /* handle next event in event queue  */
775       
776     case ResumeThread:  /* Move from the blocked queue to the tail of */
777       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
778       event->tso->gran.blocktime += 
779         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
780       do_the_startthread(event);
781       goto next_thread;             /* handle next event in event queue  */
782       
783     case StartThread:
784       do_the_startthread(event);
785       goto next_thread;             /* handle next event in event queue  */
786       
787     case MoveThread:
788       do_the_movethread(event);
789       goto next_thread;             /* handle next event in event queue  */
790       
791     case MoveSpark:
792       do_the_movespark(event);
793       goto next_thread;             /* handle next event in event queue  */
794       
795     case FindWork:
796       do_the_findwork(event);
797       goto next_thread;             /* handle next event in event queue  */
798       
799     default:
800       barf("Illegal event type %u\n", event->evttype);
801     }  /* switch */
802     
803     /* This point was scheduler_loop in the old RTS */
804
805     IF_DEBUG(gran, belch("GRAN: after main switch"));
806
807     TimeOfLastEvent = CurrentTime[CurrentProc];
808     TimeOfNextEvent = get_time_of_next_event();
809     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
810     // CurrentTSO = ThreadQueueHd;
811
812     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
813                          TimeOfNextEvent));
814
815     if (RtsFlags.GranFlags.Light) 
816       GranSimLight_leave_system(event, &ActiveTSO); 
817
818     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
819
820     IF_DEBUG(gran, 
821              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
822
823     /* in a GranSim setup the TSO stays on the run queue */
824     t = CurrentTSO;
825     /* Take a thread from the run queue. */
826     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
827
828     IF_DEBUG(gran, 
829              fprintf(stderr, "GRAN: About to run current thread, which is\n");
830              G_TSO(t,5));
831
832     context_switch = 0; // turned on via GranYield, checking events and time slice
833
834     IF_DEBUG(gran, 
835              DumpGranEvent(GR_SCHEDULE, t));
836
837     procStatus[CurrentProc] = Busy;
838
839 #elif defined(PAR)
840     if (PendingFetches != END_BF_QUEUE) {
841         processFetches();
842     }
843
844     /* ToDo: phps merge with spark activation above */
845     /* check whether we have local work and send requests if we have none */
846     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
847       /* :-[  no local threads => look out for local sparks */
848       /* the spark pool for the current PE */
849       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
850       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
851           pool->hd < pool->tl) {
852         /* 
853          * ToDo: add GC code check that we really have enough heap afterwards!!
854          * Old comment:
855          * If we're here (no runnable threads) and we have pending
856          * sparks, we must have a space problem.  Get enough space
857          * to turn one of those pending sparks into a
858          * thread... 
859          */
860
861         spark = findSpark(rtsFalse);                /* get a spark */
862         if (spark != (rtsSpark) NULL) {
863           tso = activateSpark(spark);       /* turn the spark into a thread */
864           IF_PAR_DEBUG(schedule,
865                        belch("==== schedule: Created TSO %d (%p); %d threads active",
866                              tso->id, tso, advisory_thread_count));
867
868           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
869             belch("==^^ failed to activate spark");
870             goto next_thread;
871           }               /* otherwise fall through & pick-up new tso */
872         } else {
873           IF_PAR_DEBUG(verbose,
874                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
875                              spark_queue_len(pool)));
876           goto next_thread;
877         }
878       }
879
880       /* If we still have no work we need to send a FISH to get a spark
881          from another PE 
882       */
883       if (EMPTY_RUN_QUEUE()) {
884       /* =8-[  no local sparks => look for work on other PEs */
885         /*
886          * We really have absolutely no work.  Send out a fish
887          * (there may be some out there already), and wait for
888          * something to arrive.  We clearly can't run any threads
889          * until a SCHEDULE or RESUME arrives, and so that's what
890          * we're hoping to see.  (Of course, we still have to
891          * respond to other types of messages.)
892          */
893         TIME now = msTime() /*CURRENT_TIME*/;
894         IF_PAR_DEBUG(verbose, 
895                      belch("--  now=%ld", now));
896         IF_PAR_DEBUG(verbose,
897                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
898                          (last_fish_arrived_at!=0 &&
899                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
900                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
901                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
902                              last_fish_arrived_at,
903                              RtsFlags.ParFlags.fishDelay, now);
904                      });
905         
906         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
907             (last_fish_arrived_at==0 ||
908              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
909           /* outstandingFishes is set in sendFish, processFish;
910              avoid flooding system with fishes via delay */
911           pe = choosePE();
912           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
913                    NEW_FISH_HUNGER);
914
915           // Global statistics: count no. of fishes
916           if (RtsFlags.ParFlags.ParStats.Global &&
917               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
918             globalParStats.tot_fish_mess++;
919           }
920         }
921       
922         receivedFinish = processMessages();
923         goto next_thread;
924       }
925     } else if (PacketsWaiting()) {  /* Look for incoming messages */
926       receivedFinish = processMessages();
927     }
928
929     /* Now we are sure that we have some work available */
930     ASSERT(run_queue_hd != END_TSO_QUEUE);
931
932     /* Take a thread from the run queue, if we have work */
933     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
934     IF_DEBUG(sanity,checkTSO(t));
935
936     /* ToDo: write something to the log-file
937     if (RTSflags.ParFlags.granSimStats && !sameThread)
938         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
939
940     CurrentTSO = t;
941     */
942     /* the spark pool for the current PE */
943     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
944
945     IF_DEBUG(scheduler, 
946              belch("--=^ %d threads, %d sparks on [%#x]", 
947                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
948
949 # if 1
950     if (0 && RtsFlags.ParFlags.ParStats.Full && 
951         t && LastTSO && t->id != LastTSO->id && 
952         LastTSO->why_blocked == NotBlocked && 
953         LastTSO->what_next != ThreadComplete) {
954       // if previously scheduled TSO not blocked we have to record the context switch
955       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
956                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
957     }
958
959     if (RtsFlags.ParFlags.ParStats.Full && 
960         (emitSchedule /* forced emit */ ||
961         (t && LastTSO && t->id != LastTSO->id))) {
962       /* 
963          we are running a different TSO, so write a schedule event to log file
964          NB: If we use fair scheduling we also have to write  a deschedule 
965              event for LastTSO; with unfair scheduling we know that the
966              previous tso has blocked whenever we switch to another tso, so
967              we don't need it in GUM for now
968       */
969       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
970                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
971       emitSchedule = rtsFalse;
972     }
973      
974 # endif
975 #else /* !GRAN && !PAR */
976   
977     /* grab a thread from the run queue */
978     ASSERT(run_queue_hd != END_TSO_QUEUE);
979     t = POP_RUN_QUEUE();
980     // Sanity check the thread we're about to run.  This can be
981     // expensive if there is lots of thread switching going on...
982     IF_DEBUG(sanity,checkTSO(t));
983 #endif
984     
985     grabCapability(&cap);
986     cap->r.rCurrentTSO = t;
987     
988     /* context switches are now initiated by the timer signal, unless
989      * the user specified "context switch as often as possible", with
990      * +RTS -C0
991      */
992     if (
993 #ifdef PROFILING
994         RtsFlags.ProfFlags.profileInterval == 0 ||
995 #endif
996         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
997          && (run_queue_hd != END_TSO_QUEUE
998              || blocked_queue_hd != END_TSO_QUEUE
999              || sleeping_queue != END_TSO_QUEUE)))
1000         context_switch = 1;
1001     else
1002         context_switch = 0;
1003
1004     RELEASE_LOCK(&sched_mutex);
1005
1006     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1007                               t->id, t, whatNext_strs[t->what_next]));
1008
1009 #ifdef PROFILING
1010     startHeapProfTimer();
1011 #endif
1012
1013     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1014     /* Run the current thread 
1015      */
1016     switch (cap->r.rCurrentTSO->what_next) {
1017     case ThreadKilled:
1018     case ThreadComplete:
1019         /* Thread already finished, return to scheduler. */
1020         ret = ThreadFinished;
1021         break;
1022     case ThreadEnterGHC:
1023         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1024         break;
1025     case ThreadRunGHC:
1026         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1027         break;
1028     case ThreadEnterInterp:
1029         ret = interpretBCO(cap);
1030         break;
1031     default:
1032       barf("schedule: invalid what_next field");
1033     }
1034     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1035     
1036     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1037 #ifdef PROFILING
1038     stopHeapProfTimer();
1039     CCCS = CCS_SYSTEM;
1040 #endif
1041     
1042     ACQUIRE_LOCK(&sched_mutex);
1043
1044 #ifdef SMP
1045     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1046 #elif !defined(GRAN) && !defined(PAR)
1047     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1048 #endif
1049     t = cap->r.rCurrentTSO;
1050     
1051 #if defined(PAR)
1052     /* HACK 675: if the last thread didn't yield, make sure to print a 
1053        SCHEDULE event to the log file when StgRunning the next thread, even
1054        if it is the same one as before */
1055     LastTSO = t; 
1056     TimeOfLastYield = CURRENT_TIME;
1057 #endif
1058
1059     switch (ret) {
1060     case HeapOverflow:
1061 #if defined(GRAN)
1062       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1063       globalGranStats.tot_heapover++;
1064 #elif defined(PAR)
1065       globalParStats.tot_heapover++;
1066 #endif
1067
1068       // did the task ask for a large block?
1069       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1070           // if so, get one and push it on the front of the nursery.
1071           bdescr *bd;
1072           nat blocks;
1073           
1074           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1075
1076           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1077                                    t->id, t,
1078                                    whatNext_strs[t->what_next], blocks));
1079
1080           // don't do this if it would push us over the
1081           // alloc_blocks_lim limit; we'll GC first.
1082           if (alloc_blocks + blocks < alloc_blocks_lim) {
1083
1084               alloc_blocks += blocks;
1085               bd = allocGroup( blocks );
1086
1087               // link the new group into the list
1088               bd->link = cap->r.rCurrentNursery;
1089               bd->u.back = cap->r.rCurrentNursery->u.back;
1090               if (cap->r.rCurrentNursery->u.back != NULL) {
1091                   cap->r.rCurrentNursery->u.back->link = bd;
1092               } else {
1093                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1094                          g0s0->blocks == cap->r.rNursery);
1095                   cap->r.rNursery = g0s0->blocks = bd;
1096               }           
1097               cap->r.rCurrentNursery->u.back = bd;
1098
1099               // initialise it as a nursery block
1100               bd->step = g0s0;
1101               bd->gen_no = 0;
1102               bd->flags = 0;
1103               bd->free = bd->start;
1104
1105               // don't forget to update the block count in g0s0.
1106               g0s0->n_blocks += blocks;
1107               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1108
1109               // now update the nursery to point to the new block
1110               cap->r.rCurrentNursery = bd;
1111
1112               // we might be unlucky and have another thread get on the
1113               // run queue before us and steal the large block, but in that
1114               // case the thread will just end up requesting another large
1115               // block.
1116               PUSH_ON_RUN_QUEUE(t);
1117               break;
1118           }
1119       }
1120
1121       /* make all the running tasks block on a condition variable,
1122        * maybe set context_switch and wait till they all pile in,
1123        * then have them wait on a GC condition variable.
1124        */
1125       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1126                                t->id, t, whatNext_strs[t->what_next]));
1127       threadPaused(t);
1128 #if defined(GRAN)
1129       ASSERT(!is_on_queue(t,CurrentProc));
1130 #elif defined(PAR)
1131       /* Currently we emit a DESCHEDULE event before GC in GUM.
1132          ToDo: either add separate event to distinguish SYSTEM time from rest
1133                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1134       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1135         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1136                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1137         emitSchedule = rtsTrue;
1138       }
1139 #endif
1140       
1141       ready_to_gc = rtsTrue;
1142       context_switch = 1;               /* stop other threads ASAP */
1143       PUSH_ON_RUN_QUEUE(t);
1144       /* actual GC is done at the end of the while loop */
1145       break;
1146       
1147     case StackOverflow:
1148 #if defined(GRAN)
1149       IF_DEBUG(gran, 
1150                DumpGranEvent(GR_DESCHEDULE, t));
1151       globalGranStats.tot_stackover++;
1152 #elif defined(PAR)
1153       // IF_DEBUG(par, 
1154       // DumpGranEvent(GR_DESCHEDULE, t);
1155       globalParStats.tot_stackover++;
1156 #endif
1157       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1158                                t->id, t, whatNext_strs[t->what_next]));
1159       /* just adjust the stack for this thread, then pop it back
1160        * on the run queue.
1161        */
1162       threadPaused(t);
1163       { 
1164         StgMainThread *m;
1165         /* enlarge the stack */
1166         StgTSO *new_t = threadStackOverflow(t);
1167         
1168         /* This TSO has moved, so update any pointers to it from the
1169          * main thread stack.  It better not be on any other queues...
1170          * (it shouldn't be).
1171          */
1172         for (m = main_threads; m != NULL; m = m->link) {
1173           if (m->tso == t) {
1174             m->tso = new_t;
1175           }
1176         }
1177         threadPaused(new_t);
1178         PUSH_ON_RUN_QUEUE(new_t);
1179       }
1180       break;
1181
1182     case ThreadYielding:
1183 #if defined(GRAN)
1184       IF_DEBUG(gran, 
1185                DumpGranEvent(GR_DESCHEDULE, t));
1186       globalGranStats.tot_yields++;
1187 #elif defined(PAR)
1188       // IF_DEBUG(par, 
1189       // DumpGranEvent(GR_DESCHEDULE, t);
1190       globalParStats.tot_yields++;
1191 #endif
1192       /* put the thread back on the run queue.  Then, if we're ready to
1193        * GC, check whether this is the last task to stop.  If so, wake
1194        * up the GC thread.  getThread will block during a GC until the
1195        * GC is finished.
1196        */
1197       IF_DEBUG(scheduler,
1198                if (t->what_next == ThreadEnterInterp) {
1199                    /* ToDo: or maybe a timer expired when we were in Hugs?
1200                     * or maybe someone hit ctrl-C
1201                     */
1202                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1203                          t->id, t, whatNext_strs[t->what_next]);
1204                } else {
1205                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1206                          t->id, t, whatNext_strs[t->what_next]);
1207                }
1208                );
1209
1210       threadPaused(t);
1211
1212       IF_DEBUG(sanity,
1213                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1214                checkTSO(t));
1215       ASSERT(t->link == END_TSO_QUEUE);
1216 #if defined(GRAN)
1217       ASSERT(!is_on_queue(t,CurrentProc));
1218
1219       IF_DEBUG(sanity,
1220                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1221                checkThreadQsSanity(rtsTrue));
1222 #endif
1223 #if defined(PAR)
1224       if (RtsFlags.ParFlags.doFairScheduling) { 
1225         /* this does round-robin scheduling; good for concurrency */
1226         APPEND_TO_RUN_QUEUE(t);
1227       } else {
1228         /* this does unfair scheduling; good for parallelism */
1229         PUSH_ON_RUN_QUEUE(t);
1230       }
1231 #else
1232       /* this does round-robin scheduling; good for concurrency */
1233       APPEND_TO_RUN_QUEUE(t);
1234 #endif
1235 #if defined(GRAN)
1236       /* add a ContinueThread event to actually process the thread */
1237       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1238                 ContinueThread,
1239                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1240       IF_GRAN_DEBUG(bq, 
1241                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1242                G_EVENTQ(0);
1243                G_CURR_THREADQ(0));
1244 #endif /* GRAN */
1245       break;
1246       
1247     case ThreadBlocked:
1248 #if defined(GRAN)
1249       IF_DEBUG(scheduler,
1250                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1251                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1252                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1253
1254       // ??? needed; should emit block before
1255       IF_DEBUG(gran, 
1256                DumpGranEvent(GR_DESCHEDULE, t)); 
1257       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1258       /*
1259         ngoq Dogh!
1260       ASSERT(procStatus[CurrentProc]==Busy || 
1261               ((procStatus[CurrentProc]==Fetching) && 
1262               (t->block_info.closure!=(StgClosure*)NULL)));
1263       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1264           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1265             procStatus[CurrentProc]==Fetching)) 
1266         procStatus[CurrentProc] = Idle;
1267       */
1268 #elif defined(PAR)
1269       IF_DEBUG(scheduler,
1270                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1271                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1272       IF_PAR_DEBUG(bq,
1273
1274                    if (t->block_info.closure!=(StgClosure*)NULL) 
1275                      print_bq(t->block_info.closure));
1276
1277       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1278       blockThread(t);
1279
1280       /* whatever we schedule next, we must log that schedule */
1281       emitSchedule = rtsTrue;
1282
1283 #else /* !GRAN */
1284       /* don't need to do anything.  Either the thread is blocked on
1285        * I/O, in which case we'll have called addToBlockedQueue
1286        * previously, or it's blocked on an MVar or Blackhole, in which
1287        * case it'll be on the relevant queue already.
1288        */
1289       IF_DEBUG(scheduler,
1290                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1291                printThreadBlockage(t);
1292                fprintf(stderr, "\n"));
1293
1294       /* Only for dumping event to log file 
1295          ToDo: do I need this in GranSim, too?
1296       blockThread(t);
1297       */
1298 #endif
1299       threadPaused(t);
1300       break;
1301       
1302     case ThreadFinished:
1303       /* Need to check whether this was a main thread, and if so, signal
1304        * the task that started it with the return value.  If we have no
1305        * more main threads, we probably need to stop all the tasks until
1306        * we get a new one.
1307        */
1308       /* We also end up here if the thread kills itself with an
1309        * uncaught exception, see Exception.hc.
1310        */
1311       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1312 #if defined(GRAN)
1313       endThread(t, CurrentProc); // clean-up the thread
1314 #elif defined(PAR)
1315       /* For now all are advisory -- HWL */
1316       //if(t->priority==AdvisoryPriority) ??
1317       advisory_thread_count--;
1318       
1319 # ifdef DIST
1320       if(t->dist.priority==RevalPriority)
1321         FinishReval(t);
1322 # endif
1323       
1324       if (RtsFlags.ParFlags.ParStats.Full &&
1325           !RtsFlags.ParFlags.ParStats.Suppressed) 
1326         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1327 #endif
1328       break;
1329       
1330     default:
1331       barf("schedule: invalid thread return code %d", (int)ret);
1332     }
1333     
1334 #if defined(RTS_SUPPORTS_THREADS)
1335     /* I don't understand what this re-grab is doing -- sof */
1336     grabCapability(&cap);
1337 #endif
1338
1339 #ifdef PROFILING
1340     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1341         GarbageCollect(GetRoots, rtsTrue);
1342         heapCensus();
1343         performHeapProfile = rtsFalse;
1344         ready_to_gc = rtsFalse; // we already GC'd
1345     }
1346 #endif
1347
1348     if (ready_to_gc 
1349 #ifdef SMP
1350         && allFreeCapabilities() 
1351 #endif
1352         ) {
1353       /* everybody back, start the GC.
1354        * Could do it in this thread, or signal a condition var
1355        * to do it in another thread.  Either way, we need to
1356        * broadcast on gc_pending_cond afterward.
1357        */
1358 #if defined(RTS_SUPPORTS_THREADS)
1359       IF_DEBUG(scheduler,sched_belch("doing GC"));
1360 #endif
1361       GarbageCollect(GetRoots,rtsFalse);
1362       ready_to_gc = rtsFalse;
1363 #ifdef SMP
1364       broadcastCondition(&gc_pending_cond);
1365 #endif
1366 #if defined(GRAN)
1367       /* add a ContinueThread event to continue execution of current thread */
1368       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1369                 ContinueThread,
1370                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1371       IF_GRAN_DEBUG(bq, 
1372                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1373                G_EVENTQ(0);
1374                G_CURR_THREADQ(0));
1375 #endif /* GRAN */
1376     }
1377
1378 #if defined(GRAN)
1379   next_thread:
1380     IF_GRAN_DEBUG(unused,
1381                   print_eventq(EventHd));
1382
1383     event = get_next_event();
1384 #elif defined(PAR)
1385   next_thread:
1386     /* ToDo: wait for next message to arrive rather than busy wait */
1387 #endif /* GRAN */
1388
1389   } /* end of while(1) */
1390
1391   IF_PAR_DEBUG(verbose,
1392                belch("== Leaving schedule() after having received Finish"));
1393 }
1394
1395 /* ---------------------------------------------------------------------------
1396  * Singleton fork(). Do not copy any running threads.
1397  * ------------------------------------------------------------------------- */
1398
1399 StgInt forkProcess(StgTSO* tso) {
1400
1401 #ifndef mingw32_TARGET_OS
1402   pid_t pid;
1403   StgTSO* t,*next;
1404
1405   IF_DEBUG(scheduler,sched_belch("forking!"));
1406
1407   pid = fork();
1408   if (pid) { /* parent */
1409
1410   /* just return the pid */
1411     
1412   } else { /* child */
1413   /* wipe all other threads */
1414   run_queue_hd = tso;
1415   tso->link = END_TSO_QUEUE;
1416
1417   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1418      us is picky about finding the threat still in its queue when
1419      handling the deleteThread() */
1420
1421   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1422     next = t->link;
1423     if (t->id != tso->id) {
1424       deleteThread(t);
1425     }
1426   }
1427   }
1428   return pid;
1429 #else /* mingw32 */
1430   barf("forkProcess#: primop not implemented for mingw32, sorry!");
1431   return -1;
1432 #endif /* mingw32 */
1433 }
1434
1435 /* ---------------------------------------------------------------------------
1436  * deleteAllThreads():  kill all the live threads.
1437  *
1438  * This is used when we catch a user interrupt (^C), before performing
1439  * any necessary cleanups and running finalizers.
1440  * ------------------------------------------------------------------------- */
1441    
1442 void deleteAllThreads ( void )
1443 {
1444   StgTSO* t, *next;
1445   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1446   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1447       next = t->global_link;
1448       deleteThread(t);
1449   }      
1450   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1451   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1452   sleeping_queue = END_TSO_QUEUE;
1453 }
1454
1455 /* startThread and  insertThread are now in GranSim.c -- HWL */
1456
1457
1458 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1459 //@subsection Suspend and Resume
1460
1461 /* ---------------------------------------------------------------------------
1462  * Suspending & resuming Haskell threads.
1463  * 
1464  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1465  * its capability before calling the C function.  This allows another
1466  * task to pick up the capability and carry on running Haskell
1467  * threads.  It also means that if the C call blocks, it won't lock
1468  * the whole system.
1469  *
1470  * The Haskell thread making the C call is put to sleep for the
1471  * duration of the call, on the susepended_ccalling_threads queue.  We
1472  * give out a token to the task, which it can use to resume the thread
1473  * on return from the C function.
1474  * ------------------------------------------------------------------------- */
1475    
1476 StgInt
1477 suspendThread( StgRegTable *reg, 
1478                rtsBool concCall
1479 #if !defined(RTS_SUPPORTS_THREADS)
1480                STG_UNUSED
1481 #endif
1482                )
1483 {
1484   nat tok;
1485   Capability *cap;
1486
1487   /* assume that *reg is a pointer to the StgRegTable part
1488    * of a Capability.
1489    */
1490   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1491
1492   ACQUIRE_LOCK(&sched_mutex);
1493
1494   IF_DEBUG(scheduler,
1495            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1496
1497   threadPaused(cap->r.rCurrentTSO);
1498   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1499   suspended_ccalling_threads = cap->r.rCurrentTSO;
1500
1501 #if defined(RTS_SUPPORTS_THREADS)
1502   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1503 #endif
1504
1505   /* Use the thread ID as the token; it should be unique */
1506   tok = cap->r.rCurrentTSO->id;
1507
1508   /* Hand back capability */
1509   releaseCapability(cap);
1510   
1511 #if defined(RTS_SUPPORTS_THREADS)
1512   /* Preparing to leave the RTS, so ensure there's a native thread/task
1513      waiting to take over.
1514      
1515      ToDo: optimise this and only create a new task if there's a need
1516      for one (i.e., if there's only one Concurrent Haskell thread alive,
1517      there's no need to create a new task).
1518   */
1519   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1520   if (concCall) {
1521     startTask(taskStart);
1522   }
1523 #endif
1524
1525   /* Other threads _might_ be available for execution; signal this */
1526   THREAD_RUNNABLE();
1527   RELEASE_LOCK(&sched_mutex);
1528   return tok; 
1529 }
1530
1531 StgRegTable *
1532 resumeThread( StgInt tok,
1533               rtsBool concCall
1534 #if !defined(RTS_SUPPORTS_THREADS)
1535                STG_UNUSED
1536 #endif
1537               )
1538 {
1539   StgTSO *tso, **prev;
1540   Capability *cap;
1541
1542 #if defined(RTS_SUPPORTS_THREADS)
1543   /* Wait for permission to re-enter the RTS with the result. */
1544   if ( concCall ) {
1545     grabReturnCapability(&sched_mutex, &cap);
1546   } else {
1547     grabCapability(&cap);
1548   }
1549 #else
1550   grabCapability(&cap);
1551 #endif
1552
1553   /* Remove the thread off of the suspended list */
1554   prev = &suspended_ccalling_threads;
1555   for (tso = suspended_ccalling_threads; 
1556        tso != END_TSO_QUEUE; 
1557        prev = &tso->link, tso = tso->link) {
1558     if (tso->id == (StgThreadID)tok) {
1559       *prev = tso->link;
1560       break;
1561     }
1562   }
1563   if (tso == END_TSO_QUEUE) {
1564     barf("resumeThread: thread not found");
1565   }
1566   tso->link = END_TSO_QUEUE;
1567   /* Reset blocking status */
1568   tso->why_blocked  = NotBlocked;
1569
1570   RELEASE_LOCK(&sched_mutex);
1571
1572   cap->r.rCurrentTSO = tso;
1573   return &cap->r;
1574 }
1575
1576
1577 /* ---------------------------------------------------------------------------
1578  * Static functions
1579  * ------------------------------------------------------------------------ */
1580 static void unblockThread(StgTSO *tso);
1581
1582 /* ---------------------------------------------------------------------------
1583  * Comparing Thread ids.
1584  *
1585  * This is used from STG land in the implementation of the
1586  * instances of Eq/Ord for ThreadIds.
1587  * ------------------------------------------------------------------------ */
1588
1589 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1590
1591   StgThreadID id1 = tso1->id; 
1592   StgThreadID id2 = tso2->id;
1593  
1594   if (id1 < id2) return (-1);
1595   if (id1 > id2) return 1;
1596   return 0;
1597 }
1598
1599 /* ---------------------------------------------------------------------------
1600  * Fetching the ThreadID from an StgTSO.
1601  *
1602  * This is used in the implementation of Show for ThreadIds.
1603  * ------------------------------------------------------------------------ */
1604 int rts_getThreadId(const StgTSO *tso) 
1605 {
1606   return tso->id;
1607 }
1608
1609 #ifdef DEBUG
1610 void labelThread(StgTSO *tso, char *label)
1611 {
1612   int len;
1613   void *buf;
1614
1615   /* Caveat: Once set, you can only set the thread name to "" */
1616   len = strlen(label)+1;
1617   buf = realloc(tso->label,len);
1618   if (buf == NULL) {
1619     fprintf(stderr,"insufficient memory for labelThread!\n");
1620     free(tso->label);
1621   } else
1622     strncpy(buf,label,len);
1623   tso->label = buf;
1624 }
1625 #endif /* DEBUG */
1626
1627 /* ---------------------------------------------------------------------------
1628    Create a new thread.
1629
1630    The new thread starts with the given stack size.  Before the
1631    scheduler can run, however, this thread needs to have a closure
1632    (and possibly some arguments) pushed on its stack.  See
1633    pushClosure() in Schedule.h.
1634
1635    createGenThread() and createIOThread() (in SchedAPI.h) are
1636    convenient packaged versions of this function.
1637
1638    currently pri (priority) is only used in a GRAN setup -- HWL
1639    ------------------------------------------------------------------------ */
1640 //@cindex createThread
1641 #if defined(GRAN)
1642 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1643 StgTSO *
1644 createThread(nat stack_size, StgInt pri)
1645 {
1646   return createThread_(stack_size, rtsFalse, pri);
1647 }
1648
1649 static StgTSO *
1650 createThread_(nat size, rtsBool have_lock, StgInt pri)
1651 {
1652 #else
1653 StgTSO *
1654 createThread(nat stack_size)
1655 {
1656   return createThread_(stack_size, rtsFalse);
1657 }
1658
1659 static StgTSO *
1660 createThread_(nat size, rtsBool have_lock)
1661 {
1662 #endif
1663
1664     StgTSO *tso;
1665     nat stack_size;
1666
1667     /* First check whether we should create a thread at all */
1668 #if defined(PAR)
1669   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1670   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1671     threadsIgnored++;
1672     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1673           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1674     return END_TSO_QUEUE;
1675   }
1676   threadsCreated++;
1677 #endif
1678
1679 #if defined(GRAN)
1680   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1681 #endif
1682
1683   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1684
1685   /* catch ridiculously small stack sizes */
1686   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1687     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1688   }
1689
1690   stack_size = size - TSO_STRUCT_SIZEW;
1691
1692   tso = (StgTSO *)allocate(size);
1693   TICK_ALLOC_TSO(stack_size, 0);
1694
1695   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1696 #if defined(GRAN)
1697   SET_GRAN_HDR(tso, ThisPE);
1698 #endif
1699   tso->what_next     = ThreadEnterGHC;
1700
1701 #ifdef DEBUG
1702   tso->label = NULL;
1703 #endif
1704
1705   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1706    * protect the increment operation on next_thread_id.
1707    * In future, we could use an atomic increment instead.
1708    */
1709   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1710   tso->id = next_thread_id++; 
1711   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1712
1713   tso->why_blocked  = NotBlocked;
1714   tso->blocked_exceptions = NULL;
1715
1716   tso->stack_size   = stack_size;
1717   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1718                               - TSO_STRUCT_SIZEW;
1719   tso->sp           = (P_)&(tso->stack) + stack_size;
1720
1721 #ifdef PROFILING
1722   tso->prof.CCCS = CCS_MAIN;
1723 #endif
1724
1725   /* put a stop frame on the stack */
1726   tso->sp -= sizeofW(StgStopFrame);
1727   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1728   tso->su = (StgUpdateFrame*)tso->sp;
1729
1730   // ToDo: check this
1731 #if defined(GRAN)
1732   tso->link = END_TSO_QUEUE;
1733   /* uses more flexible routine in GranSim */
1734   insertThread(tso, CurrentProc);
1735 #else
1736   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1737    * from its creation
1738    */
1739 #endif
1740
1741 #if defined(GRAN) 
1742   if (RtsFlags.GranFlags.GranSimStats.Full) 
1743     DumpGranEvent(GR_START,tso);
1744 #elif defined(PAR)
1745   if (RtsFlags.ParFlags.ParStats.Full) 
1746     DumpGranEvent(GR_STARTQ,tso);
1747   /* HACk to avoid SCHEDULE 
1748      LastTSO = tso; */
1749 #endif
1750
1751   /* Link the new thread on the global thread list.
1752    */
1753   tso->global_link = all_threads;
1754   all_threads = tso;
1755
1756 #if defined(DIST)
1757   tso->dist.priority = MandatoryPriority; //by default that is...
1758 #endif
1759
1760 #if defined(GRAN)
1761   tso->gran.pri = pri;
1762 # if defined(DEBUG)
1763   tso->gran.magic = TSO_MAGIC; // debugging only
1764 # endif
1765   tso->gran.sparkname   = 0;
1766   tso->gran.startedat   = CURRENT_TIME; 
1767   tso->gran.exported    = 0;
1768   tso->gran.basicblocks = 0;
1769   tso->gran.allocs      = 0;
1770   tso->gran.exectime    = 0;
1771   tso->gran.fetchtime   = 0;
1772   tso->gran.fetchcount  = 0;
1773   tso->gran.blocktime   = 0;
1774   tso->gran.blockcount  = 0;
1775   tso->gran.blockedat   = 0;
1776   tso->gran.globalsparks = 0;
1777   tso->gran.localsparks  = 0;
1778   if (RtsFlags.GranFlags.Light)
1779     tso->gran.clock  = Now; /* local clock */
1780   else
1781     tso->gran.clock  = 0;
1782
1783   IF_DEBUG(gran,printTSO(tso));
1784 #elif defined(PAR)
1785 # if defined(DEBUG)
1786   tso->par.magic = TSO_MAGIC; // debugging only
1787 # endif
1788   tso->par.sparkname   = 0;
1789   tso->par.startedat   = CURRENT_TIME; 
1790   tso->par.exported    = 0;
1791   tso->par.basicblocks = 0;
1792   tso->par.allocs      = 0;
1793   tso->par.exectime    = 0;
1794   tso->par.fetchtime   = 0;
1795   tso->par.fetchcount  = 0;
1796   tso->par.blocktime   = 0;
1797   tso->par.blockcount  = 0;
1798   tso->par.blockedat   = 0;
1799   tso->par.globalsparks = 0;
1800   tso->par.localsparks  = 0;
1801 #endif
1802
1803 #if defined(GRAN)
1804   globalGranStats.tot_threads_created++;
1805   globalGranStats.threads_created_on_PE[CurrentProc]++;
1806   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1807   globalGranStats.tot_sq_probes++;
1808 #elif defined(PAR)
1809   // collect parallel global statistics (currently done together with GC stats)
1810   if (RtsFlags.ParFlags.ParStats.Global &&
1811       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1812     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1813     globalParStats.tot_threads_created++;
1814   }
1815 #endif 
1816
1817 #if defined(GRAN)
1818   IF_GRAN_DEBUG(pri,
1819                 belch("==__ schedule: Created TSO %d (%p);",
1820                       CurrentProc, tso, tso->id));
1821 #elif defined(PAR)
1822     IF_PAR_DEBUG(verbose,
1823                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1824                        tso->id, tso, advisory_thread_count));
1825 #else
1826   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1827                                  tso->id, tso->stack_size));
1828 #endif    
1829   return tso;
1830 }
1831
1832 #if defined(PAR)
1833 /* RFP:
1834    all parallel thread creation calls should fall through the following routine.
1835 */
1836 StgTSO *
1837 createSparkThread(rtsSpark spark) 
1838 { StgTSO *tso;
1839   ASSERT(spark != (rtsSpark)NULL);
1840   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1841   { threadsIgnored++;
1842     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1843           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1844     return END_TSO_QUEUE;
1845   }
1846   else
1847   { threadsCreated++;
1848     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1849     if (tso==END_TSO_QUEUE)     
1850       barf("createSparkThread: Cannot create TSO");
1851 #if defined(DIST)
1852     tso->priority = AdvisoryPriority;
1853 #endif
1854     pushClosure(tso,spark);
1855     PUSH_ON_RUN_QUEUE(tso);
1856     advisory_thread_count++;    
1857   }
1858   return tso;
1859 }
1860 #endif
1861
1862 /*
1863   Turn a spark into a thread.
1864   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1865 */
1866 #if defined(PAR)
1867 //@cindex activateSpark
1868 StgTSO *
1869 activateSpark (rtsSpark spark) 
1870 {
1871   StgTSO *tso;
1872
1873   tso = createSparkThread(spark);
1874   if (RtsFlags.ParFlags.ParStats.Full) {   
1875     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1876     IF_PAR_DEBUG(verbose,
1877                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1878                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1879   }
1880   // ToDo: fwd info on local/global spark to thread -- HWL
1881   // tso->gran.exported =  spark->exported;
1882   // tso->gran.locked =   !spark->global;
1883   // tso->gran.sparkname = spark->name;
1884
1885   return tso;
1886 }
1887 #endif
1888
1889 /* ---------------------------------------------------------------------------
1890  * scheduleThread()
1891  *
1892  * scheduleThread puts a thread on the head of the runnable queue.
1893  * This will usually be done immediately after a thread is created.
1894  * The caller of scheduleThread must create the thread using e.g.
1895  * createThread and push an appropriate closure
1896  * on this thread's stack before the scheduler is invoked.
1897  * ------------------------------------------------------------------------ */
1898
1899 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1900
1901 void
1902 scheduleThread_(StgTSO *tso
1903                , rtsBool createTask
1904 #if !defined(THREADED_RTS)
1905                  STG_UNUSED
1906 #endif
1907               )
1908 {
1909   ACQUIRE_LOCK(&sched_mutex);
1910
1911   /* Put the new thread on the head of the runnable queue.  The caller
1912    * better push an appropriate closure on this thread's stack
1913    * beforehand.  In the SMP case, the thread may start running as
1914    * soon as we release the scheduler lock below.
1915    */
1916   PUSH_ON_RUN_QUEUE(tso);
1917 #if defined(THREADED_RTS)
1918   /* If main() is scheduling a thread, don't bother creating a 
1919    * new task.
1920    */
1921   if ( createTask ) {
1922     startTask(taskStart);
1923   }
1924 #endif
1925   THREAD_RUNNABLE();
1926
1927 #if 0
1928   IF_DEBUG(scheduler,printTSO(tso));
1929 #endif
1930   RELEASE_LOCK(&sched_mutex);
1931 }
1932
1933 void scheduleThread(StgTSO* tso)
1934 {
1935   return scheduleThread_(tso, rtsFalse);
1936 }
1937
1938 void scheduleExtThread(StgTSO* tso)
1939 {
1940   return scheduleThread_(tso, rtsTrue);
1941 }
1942
1943 /* ---------------------------------------------------------------------------
1944  * initScheduler()
1945  *
1946  * Initialise the scheduler.  This resets all the queues - if the
1947  * queues contained any threads, they'll be garbage collected at the
1948  * next pass.
1949  *
1950  * ------------------------------------------------------------------------ */
1951
1952 #ifdef SMP
1953 static void
1954 term_handler(int sig STG_UNUSED)
1955 {
1956   stat_workerStop();
1957   ACQUIRE_LOCK(&term_mutex);
1958   await_death--;
1959   RELEASE_LOCK(&term_mutex);
1960   shutdownThread();
1961 }
1962 #endif
1963
1964 void 
1965 initScheduler(void)
1966 {
1967 #if defined(GRAN)
1968   nat i;
1969
1970   for (i=0; i<=MAX_PROC; i++) {
1971     run_queue_hds[i]      = END_TSO_QUEUE;
1972     run_queue_tls[i]      = END_TSO_QUEUE;
1973     blocked_queue_hds[i]  = END_TSO_QUEUE;
1974     blocked_queue_tls[i]  = END_TSO_QUEUE;
1975     ccalling_threadss[i]  = END_TSO_QUEUE;
1976     sleeping_queue        = END_TSO_QUEUE;
1977   }
1978 #else
1979   run_queue_hd      = END_TSO_QUEUE;
1980   run_queue_tl      = END_TSO_QUEUE;
1981   blocked_queue_hd  = END_TSO_QUEUE;
1982   blocked_queue_tl  = END_TSO_QUEUE;
1983   sleeping_queue    = END_TSO_QUEUE;
1984 #endif 
1985
1986   suspended_ccalling_threads  = END_TSO_QUEUE;
1987
1988   main_threads = NULL;
1989   all_threads  = END_TSO_QUEUE;
1990
1991   context_switch = 0;
1992   interrupted    = 0;
1993
1994   RtsFlags.ConcFlags.ctxtSwitchTicks =
1995       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1996       
1997 #if defined(RTS_SUPPORTS_THREADS)
1998   /* Initialise the mutex and condition variables used by
1999    * the scheduler. */
2000   initMutex(&sched_mutex);
2001   initMutex(&term_mutex);
2002
2003   initCondition(&thread_ready_cond);
2004 #endif
2005   
2006 #if defined(SMP)
2007   initCondition(&gc_pending_cond);
2008 #endif
2009
2010 #if defined(RTS_SUPPORTS_THREADS)
2011   ACQUIRE_LOCK(&sched_mutex);
2012 #endif
2013
2014   /* Install the SIGHUP handler */
2015 #if defined(SMP)
2016   {
2017     struct sigaction action,oact;
2018
2019     action.sa_handler = term_handler;
2020     sigemptyset(&action.sa_mask);
2021     action.sa_flags = 0;
2022     if (sigaction(SIGTERM, &action, &oact) != 0) {
2023       barf("can't install TERM handler");
2024     }
2025   }
2026 #endif
2027
2028   /* A capability holds the state a native thread needs in
2029    * order to execute STG code. At least one capability is
2030    * floating around (only SMP builds have more than one).
2031    */
2032   initCapabilities();
2033   
2034 #if defined(RTS_SUPPORTS_THREADS)
2035     /* start our haskell execution tasks */
2036 # if defined(SMP)
2037     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2038 # else
2039     startTaskManager(0,taskStart);
2040 # endif
2041 #endif
2042
2043 #if /* defined(SMP) ||*/ defined(PAR)
2044   initSparkPools();
2045 #endif
2046
2047 #if defined(RTS_SUPPORTS_THREADS)
2048   RELEASE_LOCK(&sched_mutex);
2049 #endif
2050
2051 }
2052
2053 void
2054 exitScheduler( void )
2055 {
2056 #if defined(RTS_SUPPORTS_THREADS)
2057   stopTaskManager();
2058 #endif
2059 }
2060
2061 /* -----------------------------------------------------------------------------
2062    Managing the per-task allocation areas.
2063    
2064    Each capability comes with an allocation area.  These are
2065    fixed-length block lists into which allocation can be done.
2066
2067    ToDo: no support for two-space collection at the moment???
2068    -------------------------------------------------------------------------- */
2069
2070 /* -----------------------------------------------------------------------------
2071  * waitThread is the external interface for running a new computation
2072  * and waiting for the result.
2073  *
2074  * In the non-SMP case, we create a new main thread, push it on the 
2075  * main-thread stack, and invoke the scheduler to run it.  The
2076  * scheduler will return when the top main thread on the stack has
2077  * completed or died, and fill in the necessary fields of the
2078  * main_thread structure.
2079  *
2080  * In the SMP case, we create a main thread as before, but we then
2081  * create a new condition variable and sleep on it.  When our new
2082  * main thread has completed, we'll be woken up and the status/result
2083  * will be in the main_thread struct.
2084  * -------------------------------------------------------------------------- */
2085
2086 int 
2087 howManyThreadsAvail ( void )
2088 {
2089    int i = 0;
2090    StgTSO* q;
2091    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2092       i++;
2093    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2094       i++;
2095    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2096       i++;
2097    return i;
2098 }
2099
2100 void
2101 finishAllThreads ( void )
2102 {
2103    do {
2104       while (run_queue_hd != END_TSO_QUEUE) {
2105          waitThread ( run_queue_hd, NULL);
2106       }
2107       while (blocked_queue_hd != END_TSO_QUEUE) {
2108          waitThread ( blocked_queue_hd, NULL);
2109       }
2110       while (sleeping_queue != END_TSO_QUEUE) {
2111          waitThread ( blocked_queue_hd, NULL);
2112       }
2113    } while 
2114       (blocked_queue_hd != END_TSO_QUEUE || 
2115        run_queue_hd     != END_TSO_QUEUE ||
2116        sleeping_queue   != END_TSO_QUEUE);
2117 }
2118
2119 SchedulerStatus
2120 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2121
2122 #if defined(THREADED_RTS)
2123   return waitThread_(tso,ret, rtsFalse);
2124 #else
2125   return waitThread_(tso,ret);
2126 #endif
2127 }
2128
2129 SchedulerStatus
2130 waitThread_(StgTSO *tso,
2131             /*out*/StgClosure **ret
2132 #if defined(THREADED_RTS)
2133             , rtsBool blockWaiting
2134 #endif
2135            )
2136 {
2137   StgMainThread *m;
2138   SchedulerStatus stat;
2139
2140   ACQUIRE_LOCK(&sched_mutex);
2141   
2142   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2143
2144   m->tso = tso;
2145   m->ret = ret;
2146   m->stat = NoStatus;
2147 #if defined(RTS_SUPPORTS_THREADS)
2148   initCondition(&m->wakeup);
2149 #endif
2150
2151   m->link = main_threads;
2152   main_threads = m;
2153
2154   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2155
2156 #if defined(RTS_SUPPORTS_THREADS)
2157
2158 # if defined(THREADED_RTS)
2159   if (!blockWaiting) {
2160     /* In the threaded case, the OS thread that called main()
2161      * gets to enter the RTS directly without going via another
2162      * task/thread.
2163      */
2164     RELEASE_LOCK(&sched_mutex);
2165     schedule();
2166     ASSERT(m->stat != NoStatus);
2167   } else 
2168 # endif
2169   {
2170     IF_DEBUG(scheduler, sched_belch("sfoo"));
2171     do {
2172       waitCondition(&m->wakeup, &sched_mutex);
2173     } while (m->stat == NoStatus);
2174   }
2175 #elif defined(GRAN)
2176   /* GranSim specific init */
2177   CurrentTSO = m->tso;                // the TSO to run
2178   procStatus[MainProc] = Busy;        // status of main PE
2179   CurrentProc = MainProc;             // PE to run it on
2180
2181   schedule();
2182 #else
2183   RELEASE_LOCK(&sched_mutex);
2184   schedule();
2185   ASSERT(m->stat != NoStatus);
2186 #endif
2187
2188   stat = m->stat;
2189
2190 #if defined(RTS_SUPPORTS_THREADS)
2191   closeCondition(&m->wakeup);
2192 #endif
2193
2194   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2195                               m->tso->id));
2196   free(m);
2197
2198 #if defined(THREADED_RTS)
2199   if (blockWaiting) 
2200 #endif
2201     RELEASE_LOCK(&sched_mutex);
2202
2203   return stat;
2204 }
2205
2206 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2207 //@subsection Run queue code 
2208
2209 #if 0
2210 /* 
2211    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2212        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2213        implicit global variable that has to be correct when calling these
2214        fcts -- HWL 
2215 */
2216
2217 /* Put the new thread on the head of the runnable queue.
2218  * The caller of createThread better push an appropriate closure
2219  * on this thread's stack before the scheduler is invoked.
2220  */
2221 static /* inline */ void
2222 add_to_run_queue(tso)
2223 StgTSO* tso; 
2224 {
2225   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2226   tso->link = run_queue_hd;
2227   run_queue_hd = tso;
2228   if (run_queue_tl == END_TSO_QUEUE) {
2229     run_queue_tl = tso;
2230   }
2231 }
2232
2233 /* Put the new thread at the end of the runnable queue. */
2234 static /* inline */ void
2235 push_on_run_queue(tso)
2236 StgTSO* tso; 
2237 {
2238   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2239   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2240   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2241   if (run_queue_hd == END_TSO_QUEUE) {
2242     run_queue_hd = tso;
2243   } else {
2244     run_queue_tl->link = tso;
2245   }
2246   run_queue_tl = tso;
2247 }
2248
2249 /* 
2250    Should be inlined because it's used very often in schedule.  The tso
2251    argument is actually only needed in GranSim, where we want to have the
2252    possibility to schedule *any* TSO on the run queue, irrespective of the
2253    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2254    the run queue and dequeue the tso, adjusting the links in the queue. 
2255 */
2256 //@cindex take_off_run_queue
2257 static /* inline */ StgTSO*
2258 take_off_run_queue(StgTSO *tso) {
2259   StgTSO *t, *prev;
2260
2261   /* 
2262      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2263
2264      if tso is specified, unlink that tso from the run_queue (doesn't have
2265      to be at the beginning of the queue); GranSim only 
2266   */
2267   if (tso!=END_TSO_QUEUE) {
2268     /* find tso in queue */
2269     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2270          t!=END_TSO_QUEUE && t!=tso;
2271          prev=t, t=t->link) 
2272       /* nothing */ ;
2273     ASSERT(t==tso);
2274     /* now actually dequeue the tso */
2275     if (prev!=END_TSO_QUEUE) {
2276       ASSERT(run_queue_hd!=t);
2277       prev->link = t->link;
2278     } else {
2279       /* t is at beginning of thread queue */
2280       ASSERT(run_queue_hd==t);
2281       run_queue_hd = t->link;
2282     }
2283     /* t is at end of thread queue */
2284     if (t->link==END_TSO_QUEUE) {
2285       ASSERT(t==run_queue_tl);
2286       run_queue_tl = prev;
2287     } else {
2288       ASSERT(run_queue_tl!=t);
2289     }
2290     t->link = END_TSO_QUEUE;
2291   } else {
2292     /* take tso from the beginning of the queue; std concurrent code */
2293     t = run_queue_hd;
2294     if (t != END_TSO_QUEUE) {
2295       run_queue_hd = t->link;
2296       t->link = END_TSO_QUEUE;
2297       if (run_queue_hd == END_TSO_QUEUE) {
2298         run_queue_tl = END_TSO_QUEUE;
2299       }
2300     }
2301   }
2302   return t;
2303 }
2304
2305 #endif /* 0 */
2306
2307 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2308 //@subsection Garbage Collextion Routines
2309
2310 /* ---------------------------------------------------------------------------
2311    Where are the roots that we know about?
2312
2313         - all the threads on the runnable queue
2314         - all the threads on the blocked queue
2315         - all the threads on the sleeping queue
2316         - all the thread currently executing a _ccall_GC
2317         - all the "main threads"
2318      
2319    ------------------------------------------------------------------------ */
2320
2321 /* This has to be protected either by the scheduler monitor, or by the
2322         garbage collection monitor (probably the latter).
2323         KH @ 25/10/99
2324 */
2325
2326 void
2327 GetRoots(evac_fn evac)
2328 {
2329 #if defined(GRAN)
2330   {
2331     nat i;
2332     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2333       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2334           evac((StgClosure **)&run_queue_hds[i]);
2335       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2336           evac((StgClosure **)&run_queue_tls[i]);
2337       
2338       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2339           evac((StgClosure **)&blocked_queue_hds[i]);
2340       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2341           evac((StgClosure **)&blocked_queue_tls[i]);
2342       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2343           evac((StgClosure **)&ccalling_threads[i]);
2344     }
2345   }
2346
2347   markEventQueue();
2348
2349 #else /* !GRAN */
2350   if (run_queue_hd != END_TSO_QUEUE) {
2351       ASSERT(run_queue_tl != END_TSO_QUEUE);
2352       evac((StgClosure **)&run_queue_hd);
2353       evac((StgClosure **)&run_queue_tl);
2354   }
2355   
2356   if (blocked_queue_hd != END_TSO_QUEUE) {
2357       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2358       evac((StgClosure **)&blocked_queue_hd);
2359       evac((StgClosure **)&blocked_queue_tl);
2360   }
2361   
2362   if (sleeping_queue != END_TSO_QUEUE) {
2363       evac((StgClosure **)&sleeping_queue);
2364   }
2365 #endif 
2366
2367   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2368       evac((StgClosure **)&suspended_ccalling_threads);
2369   }
2370
2371 #if defined(PAR) || defined(GRAN)
2372   markSparkQueue(evac);
2373 #endif
2374 }
2375
2376 /* -----------------------------------------------------------------------------
2377    performGC
2378
2379    This is the interface to the garbage collector from Haskell land.
2380    We provide this so that external C code can allocate and garbage
2381    collect when called from Haskell via _ccall_GC.
2382
2383    It might be useful to provide an interface whereby the programmer
2384    can specify more roots (ToDo).
2385    
2386    This needs to be protected by the GC condition variable above.  KH.
2387    -------------------------------------------------------------------------- */
2388
2389 void (*extra_roots)(evac_fn);
2390
2391 void
2392 performGC(void)
2393 {
2394   /* Obligated to hold this lock upon entry */
2395   ACQUIRE_LOCK(&sched_mutex);
2396   GarbageCollect(GetRoots,rtsFalse);
2397   RELEASE_LOCK(&sched_mutex);
2398 }
2399
2400 void
2401 performMajorGC(void)
2402 {
2403   ACQUIRE_LOCK(&sched_mutex);
2404   GarbageCollect(GetRoots,rtsTrue);
2405   RELEASE_LOCK(&sched_mutex);
2406 }
2407
2408 static void
2409 AllRoots(evac_fn evac)
2410 {
2411     GetRoots(evac);             // the scheduler's roots
2412     extra_roots(evac);          // the user's roots
2413 }
2414
2415 void
2416 performGCWithRoots(void (*get_roots)(evac_fn))
2417 {
2418   ACQUIRE_LOCK(&sched_mutex);
2419   extra_roots = get_roots;
2420   GarbageCollect(AllRoots,rtsFalse);
2421   RELEASE_LOCK(&sched_mutex);
2422 }
2423
2424 /* -----------------------------------------------------------------------------
2425    Stack overflow
2426
2427    If the thread has reached its maximum stack size, then raise the
2428    StackOverflow exception in the offending thread.  Otherwise
2429    relocate the TSO into a larger chunk of memory and adjust its stack
2430    size appropriately.
2431    -------------------------------------------------------------------------- */
2432
2433 static StgTSO *
2434 threadStackOverflow(StgTSO *tso)
2435 {
2436   nat new_stack_size, new_tso_size, diff, stack_words;
2437   StgPtr new_sp;
2438   StgTSO *dest;
2439
2440   IF_DEBUG(sanity,checkTSO(tso));
2441   if (tso->stack_size >= tso->max_stack_size) {
2442
2443     IF_DEBUG(gc,
2444              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2445                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2446              /* If we're debugging, just print out the top of the stack */
2447              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2448                                               tso->sp+64)));
2449
2450     /* Send this thread the StackOverflow exception */
2451     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2452     return tso;
2453   }
2454
2455   /* Try to double the current stack size.  If that takes us over the
2456    * maximum stack size for this thread, then use the maximum instead.
2457    * Finally round up so the TSO ends up as a whole number of blocks.
2458    */
2459   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2460   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2461                                        TSO_STRUCT_SIZE)/sizeof(W_);
2462   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2463   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2464
2465   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2466
2467   dest = (StgTSO *)allocate(new_tso_size);
2468   TICK_ALLOC_TSO(new_stack_size,0);
2469
2470   /* copy the TSO block and the old stack into the new area */
2471   memcpy(dest,tso,TSO_STRUCT_SIZE);
2472   stack_words = tso->stack + tso->stack_size - tso->sp;
2473   new_sp = (P_)dest + new_tso_size - stack_words;
2474   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2475
2476   /* relocate the stack pointers... */
2477   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2478   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2479   dest->sp    = new_sp;
2480   dest->stack_size = new_stack_size;
2481         
2482   /* and relocate the update frame list */
2483   relocate_stack(dest, diff);
2484
2485   /* Mark the old TSO as relocated.  We have to check for relocated
2486    * TSOs in the garbage collector and any primops that deal with TSOs.
2487    *
2488    * It's important to set the sp and su values to just beyond the end
2489    * of the stack, so we don't attempt to scavenge any part of the
2490    * dead TSO's stack.
2491    */
2492   tso->what_next = ThreadRelocated;
2493   tso->link = dest;
2494   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2495   tso->su = (StgUpdateFrame *)tso->sp;
2496   tso->why_blocked = NotBlocked;
2497   dest->mut_link = NULL;
2498
2499   IF_PAR_DEBUG(verbose,
2500                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2501                      tso->id, tso, tso->stack_size);
2502                /* If we're debugging, just print out the top of the stack */
2503                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2504                                                 tso->sp+64)));
2505   
2506   IF_DEBUG(sanity,checkTSO(tso));
2507 #if 0
2508   IF_DEBUG(scheduler,printTSO(dest));
2509 #endif
2510
2511   return dest;
2512 }
2513
2514 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2515 //@subsection Blocking Queue Routines
2516
2517 /* ---------------------------------------------------------------------------
2518    Wake up a queue that was blocked on some resource.
2519    ------------------------------------------------------------------------ */
2520
2521 #if defined(GRAN)
2522 static inline void
2523 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2524 {
2525 }
2526 #elif defined(PAR)
2527 static inline void
2528 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2529 {
2530   /* write RESUME events to log file and
2531      update blocked and fetch time (depending on type of the orig closure) */
2532   if (RtsFlags.ParFlags.ParStats.Full) {
2533     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2534                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2535                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2536     if (EMPTY_RUN_QUEUE())
2537       emitSchedule = rtsTrue;
2538
2539     switch (get_itbl(node)->type) {
2540         case FETCH_ME_BQ:
2541           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2542           break;
2543         case RBH:
2544         case FETCH_ME:
2545         case BLACKHOLE_BQ:
2546           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2547           break;
2548 #ifdef DIST
2549         case MVAR:
2550           break;
2551 #endif    
2552         default:
2553           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2554         }
2555       }
2556 }
2557 #endif
2558
2559 #if defined(GRAN)
2560 static StgBlockingQueueElement *
2561 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2562 {
2563     StgTSO *tso;
2564     PEs node_loc, tso_loc;
2565
2566     node_loc = where_is(node); // should be lifted out of loop
2567     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2568     tso_loc = where_is((StgClosure *)tso);
2569     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2570       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2571       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2572       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2573       // insertThread(tso, node_loc);
2574       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2575                 ResumeThread,
2576                 tso, node, (rtsSpark*)NULL);
2577       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2578       // len_local++;
2579       // len++;
2580     } else { // TSO is remote (actually should be FMBQ)
2581       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2582                                   RtsFlags.GranFlags.Costs.gunblocktime +
2583                                   RtsFlags.GranFlags.Costs.latency;
2584       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2585                 UnblockThread,
2586                 tso, node, (rtsSpark*)NULL);
2587       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2588       // len++;
2589     }
2590     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2591     IF_GRAN_DEBUG(bq,
2592                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2593                           (node_loc==tso_loc ? "Local" : "Global"), 
2594                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2595     tso->block_info.closure = NULL;
2596     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2597                              tso->id, tso));
2598 }
2599 #elif defined(PAR)
2600 static StgBlockingQueueElement *
2601 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2602 {
2603     StgBlockingQueueElement *next;
2604
2605     switch (get_itbl(bqe)->type) {
2606     case TSO:
2607       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2608       /* if it's a TSO just push it onto the run_queue */
2609       next = bqe->link;
2610       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2611       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2612       THREAD_RUNNABLE();
2613       unblockCount(bqe, node);
2614       /* reset blocking status after dumping event */
2615       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2616       break;
2617
2618     case BLOCKED_FETCH:
2619       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2620       next = bqe->link;
2621       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2622       PendingFetches = (StgBlockedFetch *)bqe;
2623       break;
2624
2625 # if defined(DEBUG)
2626       /* can ignore this case in a non-debugging setup; 
2627          see comments on RBHSave closures above */
2628     case CONSTR:
2629       /* check that the closure is an RBHSave closure */
2630       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2631              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2632              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2633       break;
2634
2635     default:
2636       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2637            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2638            (StgClosure *)bqe);
2639 # endif
2640     }
2641   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2642   return next;
2643 }
2644
2645 #else /* !GRAN && !PAR */
2646 static StgTSO *
2647 unblockOneLocked(StgTSO *tso)
2648 {
2649   StgTSO *next;
2650
2651   ASSERT(get_itbl(tso)->type == TSO);
2652   ASSERT(tso->why_blocked != NotBlocked);
2653   tso->why_blocked = NotBlocked;
2654   next = tso->link;
2655   PUSH_ON_RUN_QUEUE(tso);
2656   THREAD_RUNNABLE();
2657   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2658   return next;
2659 }
2660 #endif
2661
2662 #if defined(GRAN) || defined(PAR)
2663 inline StgBlockingQueueElement *
2664 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2665 {
2666   ACQUIRE_LOCK(&sched_mutex);
2667   bqe = unblockOneLocked(bqe, node);
2668   RELEASE_LOCK(&sched_mutex);
2669   return bqe;
2670 }
2671 #else
2672 inline StgTSO *
2673 unblockOne(StgTSO *tso)
2674 {
2675   ACQUIRE_LOCK(&sched_mutex);
2676   tso = unblockOneLocked(tso);
2677   RELEASE_LOCK(&sched_mutex);
2678   return tso;
2679 }
2680 #endif
2681
2682 #if defined(GRAN)
2683 void 
2684 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2685 {
2686   StgBlockingQueueElement *bqe;
2687   PEs node_loc;
2688   nat len = 0; 
2689
2690   IF_GRAN_DEBUG(bq, 
2691                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2692                       node, CurrentProc, CurrentTime[CurrentProc], 
2693                       CurrentTSO->id, CurrentTSO));
2694
2695   node_loc = where_is(node);
2696
2697   ASSERT(q == END_BQ_QUEUE ||
2698          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2699          get_itbl(q)->type == CONSTR); // closure (type constructor)
2700   ASSERT(is_unique(node));
2701
2702   /* FAKE FETCH: magically copy the node to the tso's proc;
2703      no Fetch necessary because in reality the node should not have been 
2704      moved to the other PE in the first place
2705   */
2706   if (CurrentProc!=node_loc) {
2707     IF_GRAN_DEBUG(bq, 
2708                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2709                         node, node_loc, CurrentProc, CurrentTSO->id, 
2710                         // CurrentTSO, where_is(CurrentTSO),
2711                         node->header.gran.procs));
2712     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2713     IF_GRAN_DEBUG(bq, 
2714                   belch("## new bitmask of node %p is %#x",
2715                         node, node->header.gran.procs));
2716     if (RtsFlags.GranFlags.GranSimStats.Global) {
2717       globalGranStats.tot_fake_fetches++;
2718     }
2719   }
2720
2721   bqe = q;
2722   // ToDo: check: ASSERT(CurrentProc==node_loc);
2723   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2724     //next = bqe->link;
2725     /* 
2726        bqe points to the current element in the queue
2727        next points to the next element in the queue
2728     */
2729     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2730     //tso_loc = where_is(tso);
2731     len++;
2732     bqe = unblockOneLocked(bqe, node);
2733   }
2734
2735   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2736      the closure to make room for the anchor of the BQ */
2737   if (bqe!=END_BQ_QUEUE) {
2738     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2739     /*
2740     ASSERT((info_ptr==&RBH_Save_0_info) ||
2741            (info_ptr==&RBH_Save_1_info) ||
2742            (info_ptr==&RBH_Save_2_info));
2743     */
2744     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2745     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2746     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2747
2748     IF_GRAN_DEBUG(bq,
2749                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2750                         node, info_type(node)));
2751   }
2752
2753   /* statistics gathering */
2754   if (RtsFlags.GranFlags.GranSimStats.Global) {
2755     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2756     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2757     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2758     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2759   }
2760   IF_GRAN_DEBUG(bq,
2761                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2762                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2763 }
2764 #elif defined(PAR)
2765 void 
2766 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2767 {
2768   StgBlockingQueueElement *bqe;
2769
2770   ACQUIRE_LOCK(&sched_mutex);
2771
2772   IF_PAR_DEBUG(verbose, 
2773                belch("##-_ AwBQ for node %p on [%x]: ",
2774                      node, mytid));
2775 #ifdef DIST  
2776   //RFP
2777   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2778     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2779     return;
2780   }
2781 #endif
2782   
2783   ASSERT(q == END_BQ_QUEUE ||
2784          get_itbl(q)->type == TSO ||           
2785          get_itbl(q)->type == BLOCKED_FETCH || 
2786          get_itbl(q)->type == CONSTR); 
2787
2788   bqe = q;
2789   while (get_itbl(bqe)->type==TSO || 
2790          get_itbl(bqe)->type==BLOCKED_FETCH) {
2791     bqe = unblockOneLocked(bqe, node);
2792   }
2793   RELEASE_LOCK(&sched_mutex);
2794 }
2795
2796 #else   /* !GRAN && !PAR */
2797 void
2798 awakenBlockedQueue(StgTSO *tso)
2799 {
2800   ACQUIRE_LOCK(&sched_mutex);
2801   while (tso != END_TSO_QUEUE) {
2802     tso = unblockOneLocked(tso);
2803   }
2804   RELEASE_LOCK(&sched_mutex);
2805 }
2806 #endif
2807
2808 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2809 //@subsection Exception Handling Routines
2810
2811 /* ---------------------------------------------------------------------------
2812    Interrupt execution
2813    - usually called inside a signal handler so it mustn't do anything fancy.   
2814    ------------------------------------------------------------------------ */
2815
2816 void
2817 interruptStgRts(void)
2818 {
2819     interrupted    = 1;
2820     context_switch = 1;
2821 }
2822
2823 /* -----------------------------------------------------------------------------
2824    Unblock a thread
2825
2826    This is for use when we raise an exception in another thread, which
2827    may be blocked.
2828    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2829    -------------------------------------------------------------------------- */
2830
2831 #if defined(GRAN) || defined(PAR)
2832 /*
2833   NB: only the type of the blocking queue is different in GranSim and GUM
2834       the operations on the queue-elements are the same
2835       long live polymorphism!
2836 */
2837 static void
2838 unblockThread(StgTSO *tso)
2839 {
2840   StgBlockingQueueElement *t, **last;
2841
2842   ACQUIRE_LOCK(&sched_mutex);
2843   switch (tso->why_blocked) {
2844
2845   case NotBlocked:
2846     return;  /* not blocked */
2847
2848   case BlockedOnMVar:
2849     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2850     {
2851       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2852       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2853
2854       last = (StgBlockingQueueElement **)&mvar->head;
2855       for (t = (StgBlockingQueueElement *)mvar->head; 
2856            t != END_BQ_QUEUE; 
2857            last = &t->link, last_tso = t, t = t->link) {
2858         if (t == (StgBlockingQueueElement *)tso) {
2859           *last = (StgBlockingQueueElement *)tso->link;
2860           if (mvar->tail == tso) {
2861             mvar->tail = (StgTSO *)last_tso;
2862           }
2863           goto done;
2864         }
2865       }
2866       barf("unblockThread (MVAR): TSO not found");
2867     }
2868
2869   case BlockedOnBlackHole:
2870     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2871     {
2872       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2873
2874       last = &bq->blocking_queue;
2875       for (t = bq->blocking_queue; 
2876            t != END_BQ_QUEUE; 
2877            last = &t->link, t = t->link) {
2878         if (t == (StgBlockingQueueElement *)tso) {
2879           *last = (StgBlockingQueueElement *)tso->link;
2880           goto done;
2881         }
2882       }
2883       barf("unblockThread (BLACKHOLE): TSO not found");
2884     }
2885
2886   case BlockedOnException:
2887     {
2888       StgTSO *target  = tso->block_info.tso;
2889
2890       ASSERT(get_itbl(target)->type == TSO);
2891
2892       if (target->what_next == ThreadRelocated) {
2893           target = target->link;
2894           ASSERT(get_itbl(target)->type == TSO);
2895       }
2896
2897       ASSERT(target->blocked_exceptions != NULL);
2898
2899       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2900       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2901            t != END_BQ_QUEUE; 
2902            last = &t->link, t = t->link) {
2903         ASSERT(get_itbl(t)->type == TSO);
2904         if (t == (StgBlockingQueueElement *)tso) {
2905           *last = (StgBlockingQueueElement *)tso->link;
2906           goto done;
2907         }
2908       }
2909       barf("unblockThread (Exception): TSO not found");
2910     }
2911
2912   case BlockedOnRead:
2913   case BlockedOnWrite:
2914     {
2915       /* take TSO off blocked_queue */
2916       StgBlockingQueueElement *prev = NULL;
2917       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2918            prev = t, t = t->link) {
2919         if (t == (StgBlockingQueueElement *)tso) {
2920           if (prev == NULL) {
2921             blocked_queue_hd = (StgTSO *)t->link;
2922             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2923               blocked_queue_tl = END_TSO_QUEUE;
2924             }
2925           } else {
2926             prev->link = t->link;
2927             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2928               blocked_queue_tl = (StgTSO *)prev;
2929             }
2930           }
2931           goto done;
2932         }
2933       }
2934       barf("unblockThread (I/O): TSO not found");
2935     }
2936
2937   case BlockedOnDelay:
2938     {
2939       /* take TSO off sleeping_queue */
2940       StgBlockingQueueElement *prev = NULL;
2941       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2942            prev = t, t = t->link) {
2943         if (t == (StgBlockingQueueElement *)tso) {
2944           if (prev == NULL) {
2945             sleeping_queue = (StgTSO *)t->link;
2946           } else {
2947             prev->link = t->link;
2948           }
2949           goto done;
2950         }
2951       }
2952       barf("unblockThread (I/O): TSO not found");
2953     }
2954
2955   default:
2956     barf("unblockThread");
2957   }
2958
2959  done:
2960   tso->link = END_TSO_QUEUE;
2961   tso->why_blocked = NotBlocked;
2962   tso->block_info.closure = NULL;
2963   PUSH_ON_RUN_QUEUE(tso);
2964   RELEASE_LOCK(&sched_mutex);
2965 }
2966 #else
2967 static void
2968 unblockThread(StgTSO *tso)
2969 {
2970   StgTSO *t, **last;
2971
2972   ACQUIRE_LOCK(&sched_mutex);
2973   switch (tso->why_blocked) {
2974
2975   case NotBlocked:
2976     return;  /* not blocked */
2977
2978   case BlockedOnMVar:
2979     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2980     {
2981       StgTSO *last_tso = END_TSO_QUEUE;
2982       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2983
2984       last = &mvar->head;
2985       for (t = mvar->head; t != END_TSO_QUEUE; 
2986            last = &t->link, last_tso = t, t = t->link) {
2987         if (t == tso) {
2988           *last = tso->link;
2989           if (mvar->tail == tso) {
2990             mvar->tail = last_tso;
2991           }
2992           goto done;
2993         }
2994       }
2995       barf("unblockThread (MVAR): TSO not found");
2996     }
2997
2998   case BlockedOnBlackHole:
2999     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3000     {
3001       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3002
3003       last = &bq->blocking_queue;
3004       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3005            last = &t->link, t = t->link) {
3006         if (t == tso) {
3007           *last = tso->link;
3008           goto done;
3009         }
3010       }
3011       barf("unblockThread (BLACKHOLE): TSO not found");
3012     }
3013
3014   case BlockedOnException:
3015     {
3016       StgTSO *target  = tso->block_info.tso;
3017
3018       ASSERT(get_itbl(target)->type == TSO);
3019
3020       while (target->what_next == ThreadRelocated) {
3021           target = target->link;
3022           ASSERT(get_itbl(target)->type == TSO);
3023       }
3024       
3025       ASSERT(target->blocked_exceptions != NULL);
3026
3027       last = &target->blocked_exceptions;
3028       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3029            last = &t->link, t = t->link) {
3030         ASSERT(get_itbl(t)->type == TSO);
3031         if (t == tso) {
3032           *last = tso->link;
3033           goto done;
3034         }
3035       }
3036       barf("unblockThread (Exception): TSO not found");
3037     }
3038
3039   case BlockedOnRead:
3040   case BlockedOnWrite:
3041     {
3042       StgTSO *prev = NULL;
3043       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3044            prev = t, t = t->link) {
3045         if (t == tso) {
3046           if (prev == NULL) {
3047             blocked_queue_hd = t->link;
3048             if (blocked_queue_tl == t) {
3049               blocked_queue_tl = END_TSO_QUEUE;
3050             }
3051           } else {
3052             prev->link = t->link;
3053             if (blocked_queue_tl == t) {
3054               blocked_queue_tl = prev;
3055             }
3056           }
3057           goto done;
3058         }
3059       }
3060       barf("unblockThread (I/O): TSO not found");
3061     }
3062
3063   case BlockedOnDelay:
3064     {
3065       StgTSO *prev = NULL;
3066       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3067            prev = t, t = t->link) {
3068         if (t == tso) {
3069           if (prev == NULL) {
3070             sleeping_queue = t->link;
3071           } else {
3072             prev->link = t->link;
3073           }
3074           goto done;
3075         }
3076       }
3077       barf("unblockThread (I/O): TSO not found");
3078     }
3079
3080   default:
3081     barf("unblockThread");
3082   }
3083
3084  done:
3085   tso->link = END_TSO_QUEUE;
3086   tso->why_blocked = NotBlocked;
3087   tso->block_info.closure = NULL;
3088   PUSH_ON_RUN_QUEUE(tso);
3089   RELEASE_LOCK(&sched_mutex);
3090 }
3091 #endif
3092
3093 /* -----------------------------------------------------------------------------
3094  * raiseAsync()
3095  *
3096  * The following function implements the magic for raising an
3097  * asynchronous exception in an existing thread.
3098  *
3099  * We first remove the thread from any queue on which it might be
3100  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3101  *
3102  * We strip the stack down to the innermost CATCH_FRAME, building
3103  * thunks in the heap for all the active computations, so they can 
3104  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3105  * an application of the handler to the exception, and push it on
3106  * the top of the stack.
3107  * 
3108  * How exactly do we save all the active computations?  We create an
3109  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3110  * AP_UPDs pushes everything from the corresponding update frame
3111  * upwards onto the stack.  (Actually, it pushes everything up to the
3112  * next update frame plus a pointer to the next AP_UPD object.
3113  * Entering the next AP_UPD object pushes more onto the stack until we
3114  * reach the last AP_UPD object - at which point the stack should look
3115  * exactly as it did when we killed the TSO and we can continue
3116  * execution by entering the closure on top of the stack.
3117  *
3118  * We can also kill a thread entirely - this happens if either (a) the 
3119  * exception passed to raiseAsync is NULL, or (b) there's no
3120  * CATCH_FRAME on the stack.  In either case, we strip the entire
3121  * stack and replace the thread with a zombie.
3122  *
3123  * -------------------------------------------------------------------------- */
3124  
3125 void 
3126 deleteThread(StgTSO *tso)
3127 {
3128   raiseAsync(tso,NULL);
3129 }
3130
3131 void
3132 raiseAsync(StgTSO *tso, StgClosure *exception)
3133 {
3134   StgUpdateFrame* su = tso->su;
3135   StgPtr          sp = tso->sp;
3136   
3137   /* Thread already dead? */
3138   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3139     return;
3140   }
3141
3142   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3143
3144   /* Remove it from any blocking queues */
3145   unblockThread(tso);
3146
3147   /* The stack freezing code assumes there's a closure pointer on
3148    * the top of the stack.  This isn't always the case with compiled
3149    * code, so we have to push a dummy closure on the top which just
3150    * returns to the next return address on the stack.
3151    */
3152   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3153     *(--sp) = (W_)&stg_dummy_ret_closure;
3154   }
3155
3156   while (1) {
3157     nat words = ((P_)su - (P_)sp) - 1;
3158     nat i;
3159     StgAP_UPD * ap;
3160
3161     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3162      * then build the THUNK raise(exception), and leave it on
3163      * top of the CATCH_FRAME ready to enter.
3164      */
3165     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3166 #ifdef PROFILING
3167       StgCatchFrame *cf = (StgCatchFrame *)su;
3168 #endif
3169       StgClosure *raise;
3170
3171       /* we've got an exception to raise, so let's pass it to the
3172        * handler in this frame.
3173        */
3174       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3175       TICK_ALLOC_SE_THK(1,0);
3176       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3177       raise->payload[0] = exception;
3178
3179       /* throw away the stack from Sp up to the CATCH_FRAME.
3180        */
3181       sp = (P_)su - 1;
3182
3183       /* Ensure that async excpetions are blocked now, so we don't get
3184        * a surprise exception before we get around to executing the
3185        * handler.
3186        */
3187       if (tso->blocked_exceptions == NULL) {
3188           tso->blocked_exceptions = END_TSO_QUEUE;
3189       }
3190
3191       /* Put the newly-built THUNK on top of the stack, ready to execute
3192        * when the thread restarts.
3193        */
3194       sp[0] = (W_)raise;
3195       tso->sp = sp;
3196       tso->su = su;
3197       tso->what_next = ThreadEnterGHC;
3198       IF_DEBUG(sanity, checkTSO(tso));
3199       return;
3200     }
3201
3202     /* First build an AP_UPD consisting of the stack chunk above the
3203      * current update frame, with the top word on the stack as the
3204      * fun field.
3205      */
3206     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3207     
3208     ASSERT(words >= 0);
3209     
3210     ap->n_args = words;
3211     ap->fun    = (StgClosure *)sp[0];
3212     sp++;
3213     for(i=0; i < (nat)words; ++i) {
3214       ap->payload[i] = (StgClosure *)*sp++;
3215     }
3216     
3217     switch (get_itbl(su)->type) {
3218       
3219     case UPDATE_FRAME:
3220       {
3221         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3222         TICK_ALLOC_UP_THK(words+1,0);
3223         
3224         IF_DEBUG(scheduler,
3225                  fprintf(stderr,  "scheduler: Updating ");
3226                  printPtr((P_)su->updatee); 
3227                  fprintf(stderr,  " with ");
3228                  printObj((StgClosure *)ap);
3229                  );
3230         
3231         /* Replace the updatee with an indirection - happily
3232          * this will also wake up any threads currently
3233          * waiting on the result.
3234          *
3235          * Warning: if we're in a loop, more than one update frame on
3236          * the stack may point to the same object.  Be careful not to
3237          * overwrite an IND_OLDGEN in this case, because we'll screw
3238          * up the mutable lists.  To be on the safe side, don't
3239          * overwrite any kind of indirection at all.  See also
3240          * threadSqueezeStack in GC.c, where we have to make a similar
3241          * check.
3242          */
3243         if (!closure_IND(su->updatee)) {
3244             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3245         }
3246         su = su->link;
3247         sp += sizeofW(StgUpdateFrame) -1;
3248         sp[0] = (W_)ap; /* push onto stack */
3249         break;
3250       }
3251
3252     case CATCH_FRAME:
3253       {
3254         StgCatchFrame *cf = (StgCatchFrame *)su;
3255         StgClosure* o;
3256         
3257         /* We want a PAP, not an AP_UPD.  Fortunately, the
3258          * layout's the same.
3259          */
3260         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3261         TICK_ALLOC_UPD_PAP(words+1,0);
3262         
3263         /* now build o = FUN(catch,ap,handler) */
3264         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3265         TICK_ALLOC_FUN(2,0);
3266         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3267         o->payload[0] = (StgClosure *)ap;
3268         o->payload[1] = cf->handler;
3269         
3270         IF_DEBUG(scheduler,
3271                  fprintf(stderr,  "scheduler: Built ");
3272                  printObj((StgClosure *)o);
3273                  );
3274         
3275         /* pop the old handler and put o on the stack */
3276         su = cf->link;
3277         sp += sizeofW(StgCatchFrame) - 1;
3278         sp[0] = (W_)o;
3279         break;
3280       }
3281       
3282     case SEQ_FRAME:
3283       {
3284         StgSeqFrame *sf = (StgSeqFrame *)su;
3285         StgClosure* o;
3286         
3287         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3288         TICK_ALLOC_UPD_PAP(words+1,0);
3289         
3290         /* now build o = FUN(seq,ap) */
3291         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3292         TICK_ALLOC_SE_THK(1,0);
3293         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3294         o->payload[0] = (StgClosure *)ap;
3295         
3296         IF_DEBUG(scheduler,
3297                  fprintf(stderr,  "scheduler: Built ");
3298                  printObj((StgClosure *)o);
3299                  );
3300         
3301         /* pop the old handler and put o on the stack */
3302         su = sf->link;
3303         sp += sizeofW(StgSeqFrame) - 1;
3304         sp[0] = (W_)o;
3305         break;
3306       }
3307       
3308     case STOP_FRAME:
3309       /* We've stripped the entire stack, the thread is now dead. */
3310       sp += sizeofW(StgStopFrame) - 1;
3311       sp[0] = (W_)exception;    /* save the exception */
3312       tso->what_next = ThreadKilled;
3313       tso->su = (StgUpdateFrame *)(sp+1);
3314       tso->sp = sp;
3315       return;
3316
3317     default:
3318       barf("raiseAsync");
3319     }
3320   }
3321   barf("raiseAsync");
3322 }
3323
3324 /* -----------------------------------------------------------------------------
3325    resurrectThreads is called after garbage collection on the list of
3326    threads found to be garbage.  Each of these threads will be woken
3327    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3328    on an MVar, or NonTermination if the thread was blocked on a Black
3329    Hole.
3330    -------------------------------------------------------------------------- */
3331
3332 void
3333 resurrectThreads( StgTSO *threads )
3334 {
3335   StgTSO *tso, *next;
3336
3337   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3338     next = tso->global_link;
3339     tso->global_link = all_threads;
3340     all_threads = tso;
3341     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3342
3343     switch (tso->why_blocked) {
3344     case BlockedOnMVar:
3345     case BlockedOnException:
3346       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3347       break;
3348     case BlockedOnBlackHole:
3349       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3350       break;
3351     case NotBlocked:
3352       /* This might happen if the thread was blocked on a black hole
3353        * belonging to a thread that we've just woken up (raiseAsync
3354        * can wake up threads, remember...).
3355        */
3356       continue;
3357     default:
3358       barf("resurrectThreads: thread blocked in a strange way");
3359     }
3360   }
3361 }
3362
3363 /* -----------------------------------------------------------------------------
3364  * Blackhole detection: if we reach a deadlock, test whether any
3365  * threads are blocked on themselves.  Any threads which are found to
3366  * be self-blocked get sent a NonTermination exception.
3367  *
3368  * This is only done in a deadlock situation in order to avoid
3369  * performance overhead in the normal case.
3370  * -------------------------------------------------------------------------- */
3371
3372 static void
3373 detectBlackHoles( void )
3374 {
3375     StgTSO *t = all_threads;
3376     StgUpdateFrame *frame;
3377     StgClosure *blocked_on;
3378
3379     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3380
3381         while (t->what_next == ThreadRelocated) {
3382             t = t->link;
3383             ASSERT(get_itbl(t)->type == TSO);
3384         }
3385       
3386         if (t->why_blocked != BlockedOnBlackHole) {
3387             continue;
3388         }
3389
3390         blocked_on = t->block_info.closure;
3391
3392         for (frame = t->su; ; frame = frame->link) {
3393             switch (get_itbl(frame)->type) {
3394
3395             case UPDATE_FRAME:
3396                 if (frame->updatee == blocked_on) {
3397                     /* We are blocking on one of our own computations, so
3398                      * send this thread the NonTermination exception.  
3399                      */
3400                     IF_DEBUG(scheduler, 
3401                              sched_belch("thread %d is blocked on itself", t->id));
3402                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3403                     goto done;
3404                 }
3405                 else {
3406                     continue;
3407                 }
3408
3409             case CATCH_FRAME:
3410             case SEQ_FRAME:
3411                 continue;
3412                 
3413             case STOP_FRAME:
3414                 break;
3415             }
3416             break;
3417         }
3418
3419     done: ;
3420     }   
3421 }
3422
3423 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3424 //@subsection Debugging Routines
3425
3426 /* -----------------------------------------------------------------------------
3427    Debugging: why is a thread blocked
3428    -------------------------------------------------------------------------- */
3429
3430 #ifdef DEBUG
3431
3432 void
3433 printThreadBlockage(StgTSO *tso)
3434 {
3435   switch (tso->why_blocked) {
3436   case BlockedOnRead:
3437     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3438     break;
3439   case BlockedOnWrite:
3440     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3441     break;
3442   case BlockedOnDelay:
3443     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3444     break;
3445   case BlockedOnMVar:
3446     fprintf(stderr,"is blocked on an MVar");
3447     break;
3448   case BlockedOnException:
3449     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3450             tso->block_info.tso->id);
3451     break;
3452   case BlockedOnBlackHole:
3453     fprintf(stderr,"is blocked on a black hole");
3454     break;
3455   case NotBlocked:
3456     fprintf(stderr,"is not blocked");
3457     break;
3458 #if defined(PAR)
3459   case BlockedOnGA:
3460     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3461             tso->block_info.closure, info_type(tso->block_info.closure));
3462     break;
3463   case BlockedOnGA_NoSend:
3464     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3465             tso->block_info.closure, info_type(tso->block_info.closure));
3466     break;
3467 #endif
3468 #if defined(RTS_SUPPORTS_THREADS)
3469   case BlockedOnCCall:
3470     fprintf(stderr,"is blocked on an external call");
3471     break;
3472 #endif
3473   default:
3474     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3475          tso->why_blocked, tso->id, tso);
3476   }
3477 }
3478
3479 void
3480 printThreadStatus(StgTSO *tso)
3481 {
3482   switch (tso->what_next) {
3483   case ThreadKilled:
3484     fprintf(stderr,"has been killed");
3485     break;
3486   case ThreadComplete:
3487     fprintf(stderr,"has completed");
3488     break;
3489   default:
3490     printThreadBlockage(tso);
3491   }
3492 }
3493
3494 void
3495 printAllThreads(void)
3496 {
3497   StgTSO *t;
3498
3499 # if defined(GRAN)
3500   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3501   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3502                        time_string, rtsFalse/*no commas!*/);
3503
3504   sched_belch("all threads at [%s]:", time_string);
3505 # elif defined(PAR)
3506   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3507   ullong_format_string(CURRENT_TIME,
3508                        time_string, rtsFalse/*no commas!*/);
3509
3510   sched_belch("all threads at [%s]:", time_string);
3511 # else
3512   sched_belch("all threads:");
3513 # endif
3514
3515   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3516     fprintf(stderr, "\tthread %d ", t->id);
3517     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3518     printThreadStatus(t);
3519     fprintf(stderr,"\n");
3520   }
3521 }
3522     
3523 /* 
3524    Print a whole blocking queue attached to node (debugging only).
3525 */
3526 //@cindex print_bq
3527 # if defined(PAR)
3528 void 
3529 print_bq (StgClosure *node)
3530 {
3531   StgBlockingQueueElement *bqe;
3532   StgTSO *tso;
3533   rtsBool end;
3534
3535   fprintf(stderr,"## BQ of closure %p (%s): ",
3536           node, info_type(node));
3537
3538   /* should cover all closures that may have a blocking queue */
3539   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3540          get_itbl(node)->type == FETCH_ME_BQ ||
3541          get_itbl(node)->type == RBH ||
3542          get_itbl(node)->type == MVAR);
3543     
3544   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3545
3546   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3547 }
3548
3549 /* 
3550    Print a whole blocking queue starting with the element bqe.
3551 */
3552 void 
3553 print_bqe (StgBlockingQueueElement *bqe)
3554 {
3555   rtsBool end;
3556
3557   /* 
3558      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3559   */
3560   for (end = (bqe==END_BQ_QUEUE);
3561        !end; // iterate until bqe points to a CONSTR
3562        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3563        bqe = end ? END_BQ_QUEUE : bqe->link) {
3564     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3565     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3566     /* types of closures that may appear in a blocking queue */
3567     ASSERT(get_itbl(bqe)->type == TSO ||           
3568            get_itbl(bqe)->type == BLOCKED_FETCH || 
3569            get_itbl(bqe)->type == CONSTR); 
3570     /* only BQs of an RBH end with an RBH_Save closure */
3571     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3572
3573     switch (get_itbl(bqe)->type) {
3574     case TSO:
3575       fprintf(stderr," TSO %u (%x),",
3576               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3577       break;
3578     case BLOCKED_FETCH:
3579       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3580               ((StgBlockedFetch *)bqe)->node, 
3581               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3582               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3583               ((StgBlockedFetch *)bqe)->ga.weight);
3584       break;
3585     case CONSTR:
3586       fprintf(stderr," %s (IP %p),",
3587               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3588                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3589                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3590                "RBH_Save_?"), get_itbl(bqe));
3591       break;
3592     default:
3593       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3594            info_type((StgClosure *)bqe)); // , node, info_type(node));
3595       break;
3596     }
3597   } /* for */
3598   fputc('\n', stderr);
3599 }
3600 # elif defined(GRAN)
3601 void 
3602 print_bq (StgClosure *node)
3603 {
3604   StgBlockingQueueElement *bqe;
3605   PEs node_loc, tso_loc;
3606   rtsBool end;
3607
3608   /* should cover all closures that may have a blocking queue */
3609   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3610          get_itbl(node)->type == FETCH_ME_BQ ||
3611          get_itbl(node)->type == RBH);
3612     
3613   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3614   node_loc = where_is(node);
3615
3616   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3617           node, info_type(node), node_loc);
3618
3619   /* 
3620      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3621   */
3622   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3623        !end; // iterate until bqe points to a CONSTR
3624        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3625     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3626     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3627     /* types of closures that may appear in a blocking queue */
3628     ASSERT(get_itbl(bqe)->type == TSO ||           
3629            get_itbl(bqe)->type == CONSTR); 
3630     /* only BQs of an RBH end with an RBH_Save closure */
3631     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3632
3633     tso_loc = where_is((StgClosure *)bqe);
3634     switch (get_itbl(bqe)->type) {
3635     case TSO:
3636       fprintf(stderr," TSO %d (%p) on [PE %d],",
3637               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3638       break;
3639     case CONSTR:
3640       fprintf(stderr," %s (IP %p),",
3641               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3642                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3643                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3644                "RBH_Save_?"), get_itbl(bqe));
3645       break;
3646     default:
3647       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3648            info_type((StgClosure *)bqe), node, info_type(node));
3649       break;
3650     }
3651   } /* for */
3652   fputc('\n', stderr);
3653 }
3654 #else
3655 /* 
3656    Nice and easy: only TSOs on the blocking queue
3657 */
3658 void 
3659 print_bq (StgClosure *node)
3660 {
3661   StgTSO *tso;
3662
3663   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3664   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3665        tso != END_TSO_QUEUE; 
3666        tso=tso->link) {
3667     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3668     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3669     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3670   }
3671   fputc('\n', stderr);
3672 }
3673 # endif
3674
3675 #if defined(PAR)
3676 static nat
3677 run_queue_len(void)
3678 {
3679   nat i;
3680   StgTSO *tso;
3681
3682   for (i=0, tso=run_queue_hd; 
3683        tso != END_TSO_QUEUE;
3684        i++, tso=tso->link)
3685     /* nothing */
3686
3687   return i;
3688 }
3689 #endif
3690
3691 static void
3692 sched_belch(char *s, ...)
3693 {
3694   va_list ap;
3695   va_start(ap,s);
3696 #ifdef SMP
3697   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3698 #elif defined(PAR)
3699   fprintf(stderr, "== ");
3700 #else
3701   fprintf(stderr, "scheduler: ");
3702 #endif
3703   vfprintf(stderr, s, ap);
3704   fprintf(stderr, "\n");
3705 }
3706
3707 #endif /* DEBUG */
3708
3709
3710 //@node Index,  , Debugging Routines, Main scheduling code
3711 //@subsection Index
3712
3713 //@index
3714 //* StgMainThread::  @cindex\s-+StgMainThread
3715 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3716 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3717 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3718 //* context_switch::  @cindex\s-+context_switch
3719 //* createThread::  @cindex\s-+createThread
3720 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3721 //* initScheduler::  @cindex\s-+initScheduler
3722 //* interrupted::  @cindex\s-+interrupted
3723 //* next_thread_id::  @cindex\s-+next_thread_id
3724 //* print_bq::  @cindex\s-+print_bq
3725 //* run_queue_hd::  @cindex\s-+run_queue_hd
3726 //* run_queue_tl::  @cindex\s-+run_queue_tl
3727 //* sched_mutex::  @cindex\s-+sched_mutex
3728 //* schedule::  @cindex\s-+schedule
3729 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3730 //* term_mutex::  @cindex\s-+term_mutex
3731 //@end index