[project @ 2002-01-24 07:50:01 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.113 2002/01/24 07:50:02 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #endif
102 #if defined(GRAN) || defined(PAR)
103 # include "GranSimRts.h"
104 # include "GranSim.h"
105 # include "ParallelRts.h"
106 # include "Parallel.h"
107 # include "ParallelDebug.h"
108 # include "FetchMe.h"
109 # include "HLC.h"
110 #endif
111 #include "Sparks.h"
112
113 #include <stdarg.h>
114
115 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
116 //@subsection Variables and Data structures
117
118 /* Main threads:
119  *
120  * These are the threads which clients have requested that we run.  
121  *
122  * In an SMP build, we might have several concurrent clients all
123  * waiting for results, and each one will wait on a condition variable
124  * until the result is available.
125  *
126  * In non-SMP, clients are strictly nested: the first client calls
127  * into the RTS, which might call out again to C with a _ccall_GC, and
128  * eventually re-enter the RTS.
129  *
130  * Main threads information is kept in a linked list:
131  */
132 //@cindex StgMainThread
133 typedef struct StgMainThread_ {
134   StgTSO *         tso;
135   SchedulerStatus  stat;
136   StgClosure **    ret;
137 #ifdef SMP
138   pthread_cond_t wakeup;
139 #endif
140   struct StgMainThread_ *link;
141 } StgMainThread;
142
143 /* Main thread queue.
144  * Locks required: sched_mutex.
145  */
146 static StgMainThread *main_threads;
147
148 /* Thread queues.
149  * Locks required: sched_mutex.
150  */
151 #if defined(GRAN)
152
153 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
154 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
155
156 /* 
157    In GranSim we have a runable and a blocked queue for each processor.
158    In order to minimise code changes new arrays run_queue_hds/tls
159    are created. run_queue_hd is then a short cut (macro) for
160    run_queue_hds[CurrentProc] (see GranSim.h).
161    -- HWL
162 */
163 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
164 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
165 StgTSO *ccalling_threadss[MAX_PROC];
166 /* We use the same global list of threads (all_threads) in GranSim as in
167    the std RTS (i.e. we are cheating). However, we don't use this list in
168    the GranSim specific code at the moment (so we are only potentially
169    cheating).  */
170
171 #else /* !GRAN */
172
173 StgTSO *run_queue_hd, *run_queue_tl;
174 StgTSO *blocked_queue_hd, *blocked_queue_tl;
175 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads;
183
184 /* Threads suspended in _ccall_GC.
185  */
186 static StgTSO *suspended_ccalling_threads;
187
188 static StgTSO *threadStackOverflow(StgTSO *tso);
189
190 /* KH: The following two flags are shared memory locations.  There is no need
191        to lock them, since they are only unset at the end of a scheduler
192        operation.
193 */
194
195 /* flag set by signal handler to precipitate a context switch */
196 //@cindex context_switch
197 nat context_switch;
198
199 /* if this flag is set as well, give up execution */
200 //@cindex interrupted
201 rtsBool interrupted;
202
203 /* Next thread ID to allocate.
204  * Locks required: sched_mutex
205  */
206 //@cindex next_thread_id
207 StgThreadID next_thread_id = 1;
208
209 /*
210  * Pointers to the state of the current thread.
211  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
212  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
213  */
214  
215 /* The smallest stack size that makes any sense is:
216  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
217  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
218  *  + 1                       (the realworld token for an IO thread)
219  *  + 1                       (the closure to enter)
220  *
221  * A thread with this stack will bomb immediately with a stack
222  * overflow, which will increase its stack size.  
223  */
224
225 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
226
227 /* Free capability list.
228  * Locks required: sched_mutex.
229  */
230 #ifdef SMP
231 Capability *free_capabilities; /* Available capabilities for running threads */
232 nat n_free_capabilities;       /* total number of available capabilities */
233 #else
234 Capability MainCapability;     /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #if defined(PAR)
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           if (m->ret) *(m->ret) = NULL;
449           *prev = m->link;
450           if (was_interrupted) {
451             m->stat = Interrupted;
452           } else {
453             m->stat = Killed;
454           }
455           pthread_cond_broadcast(&m->wakeup);
456           break;
457         default:
458           break;
459         }
460       }
461     }
462
463 #else // not SMP
464
465 # if defined(PAR)
466     /* in GUM do this only on the Main PE */
467     if (IAmMainThread)
468 # endif
469     /* If our main thread has finished or been killed, return.
470      */
471     {
472       StgMainThread *m = main_threads;
473       if (m->tso->what_next == ThreadComplete
474           || m->tso->what_next == ThreadKilled) {
475         main_threads = main_threads->link;
476         if (m->tso->what_next == ThreadComplete) {
477           /* we finished successfully, fill in the return value */
478           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
479           m->stat = Success;
480           return;
481         } else {
482           if (m->ret) { *(m->ret) = NULL; };
483           if (was_interrupted) {
484             m->stat = Interrupted;
485           } else {
486             m->stat = Killed;
487           }
488           return;
489         }
490       }
491     }
492 #endif
493
494     /* Top up the run queue from our spark pool.  We try to make the
495      * number of threads in the run queue equal to the number of
496      * free capabilities.
497      *
498      * Disable spark support in SMP for now, non-essential & requires
499      * a little bit of work to make it compile cleanly. -- sof 1/02.
500      */
501 #if 0 /* defined(SMP) */
502     {
503       nat n = n_free_capabilities;
504       StgTSO *tso = run_queue_hd;
505
506       /* Count the run queue */
507       while (n > 0 && tso != END_TSO_QUEUE) {
508         tso = tso->link;
509         n--;
510       }
511
512       for (; n > 0; n--) {
513         StgClosure *spark;
514         spark = findSpark(rtsFalse);
515         if (spark == NULL) {
516           break; /* no more sparks in the pool */
517         } else {
518           /* I'd prefer this to be done in activateSpark -- HWL */
519           /* tricky - it needs to hold the scheduler lock and
520            * not try to re-acquire it -- SDM */
521           createSparkThread(spark);       
522           IF_DEBUG(scheduler,
523                    sched_belch("==^^ turning spark of closure %p into a thread",
524                                (StgClosure *)spark));
525         }
526       }
527       /* We need to wake up the other tasks if we just created some
528        * work for them.
529        */
530       if (n_free_capabilities - n > 1) {
531           pthread_cond_signal(&thread_ready_cond);
532       }
533     }
534 #endif // SMP
535
536     /* check for signals each time around the scheduler */
537 #ifndef mingw32_TARGET_OS
538     if (signals_pending()) {
539       startSignalHandlers();
540     }
541 #endif
542
543     /* Check whether any waiting threads need to be woken up.  If the
544      * run queue is empty, and there are no other tasks running, we
545      * can wait indefinitely for something to happen.
546      * ToDo: what if another client comes along & requests another
547      * main thread?
548      */
549     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
550       awaitEvent(
551            (run_queue_hd == END_TSO_QUEUE)
552 #ifdef SMP
553         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
554 #endif
555         );
556     }
557     /* we can be interrupted while waiting for I/O... */
558     if (interrupted) continue;
559
560     /* 
561      * Detect deadlock: when we have no threads to run, there are no
562      * threads waiting on I/O or sleeping, and all the other tasks are
563      * waiting for work, we must have a deadlock of some description.
564      *
565      * We first try to find threads blocked on themselves (ie. black
566      * holes), and generate NonTermination exceptions where necessary.
567      *
568      * If no threads are black holed, we have a deadlock situation, so
569      * inform all the main threads.
570      */
571 #ifndef PAR
572     if (blocked_queue_hd == END_TSO_QUEUE
573         && run_queue_hd == END_TSO_QUEUE
574         && sleeping_queue == END_TSO_QUEUE
575 #ifdef SMP
576         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
577 #endif
578         )
579     {
580         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
581         GarbageCollect(GetRoots,rtsTrue);
582         if (blocked_queue_hd == END_TSO_QUEUE
583             && run_queue_hd == END_TSO_QUEUE
584             && sleeping_queue == END_TSO_QUEUE) {
585
586             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
587             detectBlackHoles();
588
589             // No black holes, so probably a real deadlock.  Send the
590             // current main thread the Deadlock exception (or in the SMP
591             // build, send *all* main threads the deadlock exception,
592             // since none of them can make progress).
593             if (run_queue_hd == END_TSO_QUEUE) {
594                 StgMainThread *m;
595 #ifdef SMP
596                 for (m = main_threads; m != NULL; m = m->link) {
597                     switch (m->tso->why_blocked) {
598                     case BlockedOnBlackHole:
599                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
600                         break;
601                     case BlockedOnException:
602                     case BlockedOnMVar:
603                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
604                         break;
605                     default:
606                         barf("deadlock: main thread blocked in a strange way");
607                     }
608                 }
609 #else
610                 m = main_threads;
611                 switch (m->tso->why_blocked) {
612                 case BlockedOnBlackHole:
613                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
614                     break;
615                 case BlockedOnException:
616                 case BlockedOnMVar:
617                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
618                     break;
619                 default:
620                     barf("deadlock: main thread blocked in a strange way");
621                 }
622 #endif
623             }
624             ASSERT( run_queue_hd != END_TSO_QUEUE );
625         }
626     }
627 #elif defined(PAR)
628     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
629 #endif
630
631 #ifdef SMP
632     /* If there's a GC pending, don't do anything until it has
633      * completed.
634      */
635     if (ready_to_gc) {
636       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
637       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
638     }
639     
640     /* block until we've got a thread on the run queue and a free
641      * capability.
642      */
643     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
644       IF_DEBUG(scheduler, sched_belch("waiting for work"));
645       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
646       IF_DEBUG(scheduler, sched_belch("work now available"));
647     }
648 #endif
649
650 #if defined(GRAN)
651
652     if (RtsFlags.GranFlags.Light)
653       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
654
655     /* adjust time based on time-stamp */
656     if (event->time > CurrentTime[CurrentProc] &&
657         event->evttype != ContinueThread)
658       CurrentTime[CurrentProc] = event->time;
659     
660     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
661     if (!RtsFlags.GranFlags.Light)
662       handleIdlePEs();
663
664     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
665
666     /* main event dispatcher in GranSim */
667     switch (event->evttype) {
668       /* Should just be continuing execution */
669     case ContinueThread:
670       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
671       /* ToDo: check assertion
672       ASSERT(run_queue_hd != (StgTSO*)NULL &&
673              run_queue_hd != END_TSO_QUEUE);
674       */
675       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
676       if (!RtsFlags.GranFlags.DoAsyncFetch &&
677           procStatus[CurrentProc]==Fetching) {
678         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
679               CurrentTSO->id, CurrentTSO, CurrentProc);
680         goto next_thread;
681       } 
682       /* Ignore ContinueThreads for completed threads */
683       if (CurrentTSO->what_next == ThreadComplete) {
684         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
685               CurrentTSO->id, CurrentTSO, CurrentProc);
686         goto next_thread;
687       } 
688       /* Ignore ContinueThreads for threads that are being migrated */
689       if (PROCS(CurrentTSO)==Nowhere) { 
690         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
691               CurrentTSO->id, CurrentTSO, CurrentProc);
692         goto next_thread;
693       }
694       /* The thread should be at the beginning of the run queue */
695       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
696         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
697               CurrentTSO->id, CurrentTSO, CurrentProc);
698         break; // run the thread anyway
699       }
700       /*
701       new_event(proc, proc, CurrentTime[proc],
702                 FindWork,
703                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
704       goto next_thread; 
705       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
706       break; // now actually run the thread; DaH Qu'vam yImuHbej 
707
708     case FetchNode:
709       do_the_fetchnode(event);
710       goto next_thread;             /* handle next event in event queue  */
711       
712     case GlobalBlock:
713       do_the_globalblock(event);
714       goto next_thread;             /* handle next event in event queue  */
715       
716     case FetchReply:
717       do_the_fetchreply(event);
718       goto next_thread;             /* handle next event in event queue  */
719       
720     case UnblockThread:   /* Move from the blocked queue to the tail of */
721       do_the_unblock(event);
722       goto next_thread;             /* handle next event in event queue  */
723       
724     case ResumeThread:  /* Move from the blocked queue to the tail of */
725       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
726       event->tso->gran.blocktime += 
727         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
728       do_the_startthread(event);
729       goto next_thread;             /* handle next event in event queue  */
730       
731     case StartThread:
732       do_the_startthread(event);
733       goto next_thread;             /* handle next event in event queue  */
734       
735     case MoveThread:
736       do_the_movethread(event);
737       goto next_thread;             /* handle next event in event queue  */
738       
739     case MoveSpark:
740       do_the_movespark(event);
741       goto next_thread;             /* handle next event in event queue  */
742       
743     case FindWork:
744       do_the_findwork(event);
745       goto next_thread;             /* handle next event in event queue  */
746       
747     default:
748       barf("Illegal event type %u\n", event->evttype);
749     }  /* switch */
750     
751     /* This point was scheduler_loop in the old RTS */
752
753     IF_DEBUG(gran, belch("GRAN: after main switch"));
754
755     TimeOfLastEvent = CurrentTime[CurrentProc];
756     TimeOfNextEvent = get_time_of_next_event();
757     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
758     // CurrentTSO = ThreadQueueHd;
759
760     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
761                          TimeOfNextEvent));
762
763     if (RtsFlags.GranFlags.Light) 
764       GranSimLight_leave_system(event, &ActiveTSO); 
765
766     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
767
768     IF_DEBUG(gran, 
769              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
770
771     /* in a GranSim setup the TSO stays on the run queue */
772     t = CurrentTSO;
773     /* Take a thread from the run queue. */
774     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
775
776     IF_DEBUG(gran, 
777              fprintf(stderr, "GRAN: About to run current thread, which is\n");
778              G_TSO(t,5));
779
780     context_switch = 0; // turned on via GranYield, checking events and time slice
781
782     IF_DEBUG(gran, 
783              DumpGranEvent(GR_SCHEDULE, t));
784
785     procStatus[CurrentProc] = Busy;
786
787 #elif defined(PAR)
788     if (PendingFetches != END_BF_QUEUE) {
789         processFetches();
790     }
791
792     /* ToDo: phps merge with spark activation above */
793     /* check whether we have local work and send requests if we have none */
794     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
795       /* :-[  no local threads => look out for local sparks */
796       /* the spark pool for the current PE */
797       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
798       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
799           pool->hd < pool->tl) {
800         /* 
801          * ToDo: add GC code check that we really have enough heap afterwards!!
802          * Old comment:
803          * If we're here (no runnable threads) and we have pending
804          * sparks, we must have a space problem.  Get enough space
805          * to turn one of those pending sparks into a
806          * thread... 
807          */
808
809         spark = findSpark(rtsFalse);                /* get a spark */
810         if (spark != (rtsSpark) NULL) {
811           tso = activateSpark(spark);       /* turn the spark into a thread */
812           IF_PAR_DEBUG(schedule,
813                        belch("==== schedule: Created TSO %d (%p); %d threads active",
814                              tso->id, tso, advisory_thread_count));
815
816           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
817             belch("==^^ failed to activate spark");
818             goto next_thread;
819           }               /* otherwise fall through & pick-up new tso */
820         } else {
821           IF_PAR_DEBUG(verbose,
822                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
823                              spark_queue_len(pool)));
824           goto next_thread;
825         }
826       }
827
828       /* If we still have no work we need to send a FISH to get a spark
829          from another PE 
830       */
831       if (EMPTY_RUN_QUEUE()) {
832       /* =8-[  no local sparks => look for work on other PEs */
833         /*
834          * We really have absolutely no work.  Send out a fish
835          * (there may be some out there already), and wait for
836          * something to arrive.  We clearly can't run any threads
837          * until a SCHEDULE or RESUME arrives, and so that's what
838          * we're hoping to see.  (Of course, we still have to
839          * respond to other types of messages.)
840          */
841         TIME now = msTime() /*CURRENT_TIME*/;
842         IF_PAR_DEBUG(verbose, 
843                      belch("--  now=%ld", now));
844         IF_PAR_DEBUG(verbose,
845                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
846                          (last_fish_arrived_at!=0 &&
847                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
848                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
849                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
850                              last_fish_arrived_at,
851                              RtsFlags.ParFlags.fishDelay, now);
852                      });
853         
854         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
855             (last_fish_arrived_at==0 ||
856              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
857           /* outstandingFishes is set in sendFish, processFish;
858              avoid flooding system with fishes via delay */
859           pe = choosePE();
860           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
861                    NEW_FISH_HUNGER);
862
863           // Global statistics: count no. of fishes
864           if (RtsFlags.ParFlags.ParStats.Global &&
865               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
866             globalParStats.tot_fish_mess++;
867           }
868         }
869       
870         receivedFinish = processMessages();
871         goto next_thread;
872       }
873     } else if (PacketsWaiting()) {  /* Look for incoming messages */
874       receivedFinish = processMessages();
875     }
876
877     /* Now we are sure that we have some work available */
878     ASSERT(run_queue_hd != END_TSO_QUEUE);
879
880     /* Take a thread from the run queue, if we have work */
881     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
882     IF_DEBUG(sanity,checkTSO(t));
883
884     /* ToDo: write something to the log-file
885     if (RTSflags.ParFlags.granSimStats && !sameThread)
886         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
887
888     CurrentTSO = t;
889     */
890     /* the spark pool for the current PE */
891     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
892
893     IF_DEBUG(scheduler, 
894              belch("--=^ %d threads, %d sparks on [%#x]", 
895                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
896
897 #if 1
898     if (0 && RtsFlags.ParFlags.ParStats.Full && 
899         t && LastTSO && t->id != LastTSO->id && 
900         LastTSO->why_blocked == NotBlocked && 
901         LastTSO->what_next != ThreadComplete) {
902       // if previously scheduled TSO not blocked we have to record the context switch
903       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
904                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
905     }
906
907     if (RtsFlags.ParFlags.ParStats.Full && 
908         (emitSchedule /* forced emit */ ||
909         (t && LastTSO && t->id != LastTSO->id))) {
910       /* 
911          we are running a different TSO, so write a schedule event to log file
912          NB: If we use fair scheduling we also have to write  a deschedule 
913              event for LastTSO; with unfair scheduling we know that the
914              previous tso has blocked whenever we switch to another tso, so
915              we don't need it in GUM for now
916       */
917       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
918                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
919       emitSchedule = rtsFalse;
920     }
921      
922 #endif
923 #else /* !GRAN && !PAR */
924   
925     /* grab a thread from the run queue
926      */
927     ASSERT(run_queue_hd != END_TSO_QUEUE);
928     t = POP_RUN_QUEUE();
929
930     // Sanity check the thread we're about to run.  This can be
931     // expensive if there is lots of thread switching going on...
932     IF_DEBUG(sanity,checkTSO(t));
933
934 #endif
935     
936     /* grab a capability
937      */
938 #ifdef SMP
939     cap = free_capabilities;
940     free_capabilities = cap->link;
941     n_free_capabilities--;
942 #else
943     cap = &MainCapability;
944 #endif
945
946     cap->r.rCurrentTSO = t;
947     
948     /* context switches are now initiated by the timer signal, unless
949      * the user specified "context switch as often as possible", with
950      * +RTS -C0
951      */
952     if (
953 #ifdef PROFILING
954         RtsFlags.ProfFlags.profileInterval == 0 ||
955 #endif
956         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
957          && (run_queue_hd != END_TSO_QUEUE
958              || blocked_queue_hd != END_TSO_QUEUE
959              || sleeping_queue != END_TSO_QUEUE)))
960         context_switch = 1;
961     else
962         context_switch = 0;
963
964     RELEASE_LOCK(&sched_mutex);
965
966     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
967                               t->id, t, whatNext_strs[t->what_next]));
968
969 #ifdef PROFILING
970     startHeapProfTimer();
971 #endif
972
973     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
974     /* Run the current thread 
975      */
976     switch (cap->r.rCurrentTSO->what_next) {
977     case ThreadKilled:
978     case ThreadComplete:
979         /* Thread already finished, return to scheduler. */
980         ret = ThreadFinished;
981         break;
982     case ThreadEnterGHC:
983         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
984         break;
985     case ThreadRunGHC:
986         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
987         break;
988     case ThreadEnterInterp:
989         ret = interpretBCO(cap);
990         break;
991     default:
992       barf("schedule: invalid what_next field");
993     }
994     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
995     
996     /* Costs for the scheduler are assigned to CCS_SYSTEM */
997 #ifdef PROFILING
998     stopHeapProfTimer();
999     CCCS = CCS_SYSTEM;
1000 #endif
1001     
1002     ACQUIRE_LOCK(&sched_mutex);
1003
1004 #ifdef SMP
1005     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
1006 #elif !defined(GRAN) && !defined(PAR)
1007     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1008 #endif
1009     t = cap->r.rCurrentTSO;
1010     
1011 #if defined(PAR)
1012     /* HACK 675: if the last thread didn't yield, make sure to print a 
1013        SCHEDULE event to the log file when StgRunning the next thread, even
1014        if it is the same one as before */
1015     LastTSO = t; 
1016     TimeOfLastYield = CURRENT_TIME;
1017 #endif
1018
1019     switch (ret) {
1020     case HeapOverflow:
1021 #if defined(GRAN)
1022       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1023       globalGranStats.tot_heapover++;
1024 #elif defined(PAR)
1025       globalParStats.tot_heapover++;
1026 #endif
1027
1028       // did the task ask for a large block?
1029       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1030           // if so, get one and push it on the front of the nursery.
1031           bdescr *bd;
1032           nat blocks;
1033           
1034           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1035
1036           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1037                                    t->id, t,
1038                                    whatNext_strs[t->what_next], blocks));
1039
1040           // don't do this if it would push us over the
1041           // alloc_blocks_lim limit; we'll GC first.
1042           if (alloc_blocks + blocks < alloc_blocks_lim) {
1043
1044               alloc_blocks += blocks;
1045               bd = allocGroup( blocks );
1046
1047               // link the new group into the list
1048               bd->link = cap->r.rCurrentNursery;
1049               bd->u.back = cap->r.rCurrentNursery->u.back;
1050               if (cap->r.rCurrentNursery->u.back != NULL) {
1051                   cap->r.rCurrentNursery->u.back->link = bd;
1052               } else {
1053                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1054                          g0s0->blocks == cap->r.rNursery);
1055                   cap->r.rNursery = g0s0->blocks = bd;
1056               }           
1057               cap->r.rCurrentNursery->u.back = bd;
1058
1059               // initialise it as a nursery block
1060               bd->step = g0s0;
1061               bd->gen_no = 0;
1062               bd->flags = 0;
1063               bd->free = bd->start;
1064
1065               // don't forget to update the block count in g0s0.
1066               g0s0->n_blocks += blocks;
1067               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1068
1069               // now update the nursery to point to the new block
1070               cap->r.rCurrentNursery = bd;
1071
1072               // we might be unlucky and have another thread get on the
1073               // run queue before us and steal the large block, but in that
1074               // case the thread will just end up requesting another large
1075               // block.
1076               PUSH_ON_RUN_QUEUE(t);
1077               break;
1078           }
1079       }
1080
1081       /* make all the running tasks block on a condition variable,
1082        * maybe set context_switch and wait till they all pile in,
1083        * then have them wait on a GC condition variable.
1084        */
1085       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1086                                t->id, t, whatNext_strs[t->what_next]));
1087       threadPaused(t);
1088 #if defined(GRAN)
1089       ASSERT(!is_on_queue(t,CurrentProc));
1090 #elif defined(PAR)
1091       /* Currently we emit a DESCHEDULE event before GC in GUM.
1092          ToDo: either add separate event to distinguish SYSTEM time from rest
1093                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1094       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1095         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1096                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1097         emitSchedule = rtsTrue;
1098       }
1099 #endif
1100       
1101       ready_to_gc = rtsTrue;
1102       context_switch = 1;               /* stop other threads ASAP */
1103       PUSH_ON_RUN_QUEUE(t);
1104       /* actual GC is done at the end of the while loop */
1105       break;
1106       
1107     case StackOverflow:
1108 #if defined(GRAN)
1109       IF_DEBUG(gran, 
1110                DumpGranEvent(GR_DESCHEDULE, t));
1111       globalGranStats.tot_stackover++;
1112 #elif defined(PAR)
1113       // IF_DEBUG(par, 
1114       // DumpGranEvent(GR_DESCHEDULE, t);
1115       globalParStats.tot_stackover++;
1116 #endif
1117       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1118                                t->id, t, whatNext_strs[t->what_next]));
1119       /* just adjust the stack for this thread, then pop it back
1120        * on the run queue.
1121        */
1122       threadPaused(t);
1123       { 
1124         StgMainThread *m;
1125         /* enlarge the stack */
1126         StgTSO *new_t = threadStackOverflow(t);
1127         
1128         /* This TSO has moved, so update any pointers to it from the
1129          * main thread stack.  It better not be on any other queues...
1130          * (it shouldn't be).
1131          */
1132         for (m = main_threads; m != NULL; m = m->link) {
1133           if (m->tso == t) {
1134             m->tso = new_t;
1135           }
1136         }
1137         threadPaused(new_t);
1138         PUSH_ON_RUN_QUEUE(new_t);
1139       }
1140       break;
1141
1142     case ThreadYielding:
1143 #if defined(GRAN)
1144       IF_DEBUG(gran, 
1145                DumpGranEvent(GR_DESCHEDULE, t));
1146       globalGranStats.tot_yields++;
1147 #elif defined(PAR)
1148       // IF_DEBUG(par, 
1149       // DumpGranEvent(GR_DESCHEDULE, t);
1150       globalParStats.tot_yields++;
1151 #endif
1152       /* put the thread back on the run queue.  Then, if we're ready to
1153        * GC, check whether this is the last task to stop.  If so, wake
1154        * up the GC thread.  getThread will block during a GC until the
1155        * GC is finished.
1156        */
1157       IF_DEBUG(scheduler,
1158                if (t->what_next == ThreadEnterInterp) {
1159                    /* ToDo: or maybe a timer expired when we were in Hugs?
1160                     * or maybe someone hit ctrl-C
1161                     */
1162                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1163                          t->id, t, whatNext_strs[t->what_next]);
1164                } else {
1165                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1166                          t->id, t, whatNext_strs[t->what_next]);
1167                }
1168                );
1169
1170       threadPaused(t);
1171
1172       IF_DEBUG(sanity,
1173                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1174                checkTSO(t));
1175       ASSERT(t->link == END_TSO_QUEUE);
1176 #if defined(GRAN)
1177       ASSERT(!is_on_queue(t,CurrentProc));
1178
1179       IF_DEBUG(sanity,
1180                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1181                checkThreadQsSanity(rtsTrue));
1182 #endif
1183 #if defined(PAR)
1184       if (RtsFlags.ParFlags.doFairScheduling) { 
1185         /* this does round-robin scheduling; good for concurrency */
1186         APPEND_TO_RUN_QUEUE(t);
1187       } else {
1188         /* this does unfair scheduling; good for parallelism */
1189         PUSH_ON_RUN_QUEUE(t);
1190       }
1191 #else
1192       /* this does round-robin scheduling; good for concurrency */
1193       APPEND_TO_RUN_QUEUE(t);
1194 #endif
1195 #if defined(GRAN)
1196       /* add a ContinueThread event to actually process the thread */
1197       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1198                 ContinueThread,
1199                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1200       IF_GRAN_DEBUG(bq, 
1201                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1202                G_EVENTQ(0);
1203                G_CURR_THREADQ(0));
1204 #endif /* GRAN */
1205       break;
1206       
1207     case ThreadBlocked:
1208 #if defined(GRAN)
1209       IF_DEBUG(scheduler,
1210                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1211                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1212                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1213
1214       // ??? needed; should emit block before
1215       IF_DEBUG(gran, 
1216                DumpGranEvent(GR_DESCHEDULE, t)); 
1217       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1218       /*
1219         ngoq Dogh!
1220       ASSERT(procStatus[CurrentProc]==Busy || 
1221               ((procStatus[CurrentProc]==Fetching) && 
1222               (t->block_info.closure!=(StgClosure*)NULL)));
1223       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1224           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1225             procStatus[CurrentProc]==Fetching)) 
1226         procStatus[CurrentProc] = Idle;
1227       */
1228 #elif defined(PAR)
1229       IF_DEBUG(scheduler,
1230                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1231                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1232       IF_PAR_DEBUG(bq,
1233
1234                    if (t->block_info.closure!=(StgClosure*)NULL) 
1235                      print_bq(t->block_info.closure));
1236
1237       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1238       blockThread(t);
1239
1240       /* whatever we schedule next, we must log that schedule */
1241       emitSchedule = rtsTrue;
1242
1243 #else /* !GRAN */
1244       /* don't need to do anything.  Either the thread is blocked on
1245        * I/O, in which case we'll have called addToBlockedQueue
1246        * previously, or it's blocked on an MVar or Blackhole, in which
1247        * case it'll be on the relevant queue already.
1248        */
1249       IF_DEBUG(scheduler,
1250                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1251                printThreadBlockage(t);
1252                fprintf(stderr, "\n"));
1253
1254       /* Only for dumping event to log file 
1255          ToDo: do I need this in GranSim, too?
1256       blockThread(t);
1257       */
1258 #endif
1259       threadPaused(t);
1260       break;
1261       
1262     case ThreadFinished:
1263       /* Need to check whether this was a main thread, and if so, signal
1264        * the task that started it with the return value.  If we have no
1265        * more main threads, we probably need to stop all the tasks until
1266        * we get a new one.
1267        */
1268       /* We also end up here if the thread kills itself with an
1269        * uncaught exception, see Exception.hc.
1270        */
1271       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1272 #if defined(GRAN)
1273       endThread(t, CurrentProc); // clean-up the thread
1274 #elif defined(PAR)
1275       /* For now all are advisory -- HWL */
1276       //if(t->priority==AdvisoryPriority) ??
1277       advisory_thread_count--;
1278       
1279 # ifdef DIST
1280       if(t->dist.priority==RevalPriority)
1281         FinishReval(t);
1282 # endif
1283       
1284       if (RtsFlags.ParFlags.ParStats.Full &&
1285           !RtsFlags.ParFlags.ParStats.Suppressed) 
1286         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1287 #endif
1288       break;
1289       
1290     default:
1291       barf("schedule: invalid thread return code %d", (int)ret);
1292     }
1293     
1294 #ifdef SMP
1295     cap->link = free_capabilities;
1296     free_capabilities = cap;
1297     n_free_capabilities++;
1298 #endif
1299
1300 #ifdef PROFILING
1301     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1302         GarbageCollect(GetRoots, rtsTrue);
1303         heapCensus();
1304         performHeapProfile = rtsFalse;
1305         ready_to_gc = rtsFalse; // we already GC'd
1306     }
1307 #endif
1308
1309 #ifdef SMP
1310     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1311 #else
1312     if (ready_to_gc) 
1313 #endif
1314       {
1315       /* everybody back, start the GC.
1316        * Could do it in this thread, or signal a condition var
1317        * to do it in another thread.  Either way, we need to
1318        * broadcast on gc_pending_cond afterward.
1319        */
1320 #ifdef SMP
1321       IF_DEBUG(scheduler,sched_belch("doing GC"));
1322 #endif
1323       GarbageCollect(GetRoots,rtsFalse);
1324       ready_to_gc = rtsFalse;
1325 #ifdef SMP
1326       pthread_cond_broadcast(&gc_pending_cond);
1327 #endif
1328 #if defined(GRAN)
1329       /* add a ContinueThread event to continue execution of current thread */
1330       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1331                 ContinueThread,
1332                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1333       IF_GRAN_DEBUG(bq, 
1334                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1335                G_EVENTQ(0);
1336                G_CURR_THREADQ(0));
1337 #endif /* GRAN */
1338     }
1339
1340 #if defined(GRAN)
1341   next_thread:
1342     IF_GRAN_DEBUG(unused,
1343                   print_eventq(EventHd));
1344
1345     event = get_next_event();
1346 #elif defined(PAR)
1347   next_thread:
1348     /* ToDo: wait for next message to arrive rather than busy wait */
1349 #endif /* GRAN */
1350
1351   } /* end of while(1) */
1352
1353   IF_PAR_DEBUG(verbose,
1354                belch("== Leaving schedule() after having received Finish"));
1355 }
1356
1357 /* ---------------------------------------------------------------------------
1358  * deleteAllThreads():  kill all the live threads.
1359  *
1360  * This is used when we catch a user interrupt (^C), before performing
1361  * any necessary cleanups and running finalizers.
1362  * ------------------------------------------------------------------------- */
1363    
1364 void deleteAllThreads ( void )
1365 {
1366   StgTSO* t;
1367   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1368   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1369       deleteThread(t);
1370   }
1371   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1372       deleteThread(t);
1373   }
1374   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1375       deleteThread(t);
1376   }
1377   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1378   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1379   sleeping_queue = END_TSO_QUEUE;
1380 }
1381
1382 /* startThread and  insertThread are now in GranSim.c -- HWL */
1383
1384 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1385 //@subsection Suspend and Resume
1386
1387 /* ---------------------------------------------------------------------------
1388  * Suspending & resuming Haskell threads.
1389  * 
1390  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1391  * its capability before calling the C function.  This allows another
1392  * task to pick up the capability and carry on running Haskell
1393  * threads.  It also means that if the C call blocks, it won't lock
1394  * the whole system.
1395  *
1396  * The Haskell thread making the C call is put to sleep for the
1397  * duration of the call, on the susepended_ccalling_threads queue.  We
1398  * give out a token to the task, which it can use to resume the thread
1399  * on return from the C function.
1400  * ------------------------------------------------------------------------- */
1401    
1402 StgInt
1403 suspendThread( StgRegTable *reg )
1404 {
1405   nat tok;
1406   Capability *cap;
1407
1408   // assume that *reg is a pointer to the StgRegTable part of a Capability
1409   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1410
1411   ACQUIRE_LOCK(&sched_mutex);
1412
1413   IF_DEBUG(scheduler,
1414            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1415
1416   threadPaused(cap->r.rCurrentTSO);
1417   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1418   suspended_ccalling_threads = cap->r.rCurrentTSO;
1419
1420   /* Use the thread ID as the token; it should be unique */
1421   tok = cap->r.rCurrentTSO->id;
1422
1423 #ifdef SMP
1424   cap->link = free_capabilities;
1425   free_capabilities = cap;
1426   n_free_capabilities++;
1427 #endif
1428
1429   RELEASE_LOCK(&sched_mutex);
1430   return tok; 
1431 }
1432
1433 StgRegTable *
1434 resumeThread( StgInt tok )
1435 {
1436   StgTSO *tso, **prev;
1437   Capability *cap;
1438
1439   ACQUIRE_LOCK(&sched_mutex);
1440
1441   prev = &suspended_ccalling_threads;
1442   for (tso = suspended_ccalling_threads; 
1443        tso != END_TSO_QUEUE; 
1444        prev = &tso->link, tso = tso->link) {
1445     if (tso->id == (StgThreadID)tok) {
1446       *prev = tso->link;
1447       break;
1448     }
1449   }
1450   if (tso == END_TSO_QUEUE) {
1451     barf("resumeThread: thread not found");
1452   }
1453   tso->link = END_TSO_QUEUE;
1454
1455 #ifdef SMP
1456   while (free_capabilities == NULL) {
1457     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1458     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1459     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1460   }
1461   cap = free_capabilities;
1462   free_capabilities = cap->link;
1463   n_free_capabilities--;
1464 #else  
1465   cap = &MainCapability;
1466 #endif
1467
1468   cap->r.rCurrentTSO = tso;
1469
1470   RELEASE_LOCK(&sched_mutex);
1471   return &cap->r;
1472 }
1473
1474
1475 /* ---------------------------------------------------------------------------
1476  * Static functions
1477  * ------------------------------------------------------------------------ */
1478 static void unblockThread(StgTSO *tso);
1479
1480 /* ---------------------------------------------------------------------------
1481  * Comparing Thread ids.
1482  *
1483  * This is used from STG land in the implementation of the
1484  * instances of Eq/Ord for ThreadIds.
1485  * ------------------------------------------------------------------------ */
1486
1487 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1488
1489   StgThreadID id1 = tso1->id; 
1490   StgThreadID id2 = tso2->id;
1491  
1492   if (id1 < id2) return (-1);
1493   if (id1 > id2) return 1;
1494   return 0;
1495 }
1496
1497 /* ---------------------------------------------------------------------------
1498  * Fetching the ThreadID from an StgTSO.
1499  *
1500  * This is used in the implementation of Show for ThreadIds.
1501  * ------------------------------------------------------------------------ */
1502 int rts_getThreadId(const StgTSO *tso) 
1503 {
1504   return tso->id;
1505 }
1506
1507 /* ---------------------------------------------------------------------------
1508    Create a new thread.
1509
1510    The new thread starts with the given stack size.  Before the
1511    scheduler can run, however, this thread needs to have a closure
1512    (and possibly some arguments) pushed on its stack.  See
1513    pushClosure() in Schedule.h.
1514
1515    createGenThread() and createIOThread() (in SchedAPI.h) are
1516    convenient packaged versions of this function.
1517
1518    currently pri (priority) is only used in a GRAN setup -- HWL
1519    ------------------------------------------------------------------------ */
1520 //@cindex createThread
1521 #if defined(GRAN)
1522 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1523 StgTSO *
1524 createThread(nat stack_size, StgInt pri)
1525 {
1526   return createThread_(stack_size, rtsFalse, pri);
1527 }
1528
1529 static StgTSO *
1530 createThread_(nat size, rtsBool have_lock, StgInt pri)
1531 {
1532 #else
1533 StgTSO *
1534 createThread(nat stack_size)
1535 {
1536   return createThread_(stack_size, rtsFalse);
1537 }
1538
1539 static StgTSO *
1540 createThread_(nat size, rtsBool have_lock)
1541 {
1542 #endif
1543
1544     StgTSO *tso;
1545     nat stack_size;
1546
1547     /* First check whether we should create a thread at all */
1548 #if defined(PAR)
1549   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1550   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1551     threadsIgnored++;
1552     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1553           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1554     return END_TSO_QUEUE;
1555   }
1556   threadsCreated++;
1557 #endif
1558
1559 #if defined(GRAN)
1560   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1561 #endif
1562
1563   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1564
1565   /* catch ridiculously small stack sizes */
1566   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1567     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1568   }
1569
1570   stack_size = size - TSO_STRUCT_SIZEW;
1571
1572   tso = (StgTSO *)allocate(size);
1573   TICK_ALLOC_TSO(stack_size, 0);
1574
1575   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1576 #if defined(GRAN)
1577   SET_GRAN_HDR(tso, ThisPE);
1578 #endif
1579   tso->what_next     = ThreadEnterGHC;
1580
1581   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1582    * protect the increment operation on next_thread_id.
1583    * In future, we could use an atomic increment instead.
1584    */
1585   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1586   tso->id = next_thread_id++; 
1587   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1588
1589   tso->why_blocked  = NotBlocked;
1590   tso->blocked_exceptions = NULL;
1591
1592   tso->stack_size   = stack_size;
1593   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1594                               - TSO_STRUCT_SIZEW;
1595   tso->sp           = (P_)&(tso->stack) + stack_size;
1596
1597 #ifdef PROFILING
1598   tso->prof.CCCS = CCS_MAIN;
1599 #endif
1600
1601   /* put a stop frame on the stack */
1602   tso->sp -= sizeofW(StgStopFrame);
1603   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1604   tso->su = (StgUpdateFrame*)tso->sp;
1605
1606   // ToDo: check this
1607 #if defined(GRAN)
1608   tso->link = END_TSO_QUEUE;
1609   /* uses more flexible routine in GranSim */
1610   insertThread(tso, CurrentProc);
1611 #else
1612   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1613    * from its creation
1614    */
1615 #endif
1616
1617 #if defined(GRAN) 
1618   if (RtsFlags.GranFlags.GranSimStats.Full) 
1619     DumpGranEvent(GR_START,tso);
1620 #elif defined(PAR)
1621   if (RtsFlags.ParFlags.ParStats.Full) 
1622     DumpGranEvent(GR_STARTQ,tso);
1623   /* HACk to avoid SCHEDULE 
1624      LastTSO = tso; */
1625 #endif
1626
1627   /* Link the new thread on the global thread list.
1628    */
1629   tso->global_link = all_threads;
1630   all_threads = tso;
1631
1632 #if defined(DIST)
1633   tso->dist.priority = MandatoryPriority; //by default that is...
1634 #endif
1635
1636 #if defined(GRAN)
1637   tso->gran.pri = pri;
1638 # if defined(DEBUG)
1639   tso->gran.magic = TSO_MAGIC; // debugging only
1640 # endif
1641   tso->gran.sparkname   = 0;
1642   tso->gran.startedat   = CURRENT_TIME; 
1643   tso->gran.exported    = 0;
1644   tso->gran.basicblocks = 0;
1645   tso->gran.allocs      = 0;
1646   tso->gran.exectime    = 0;
1647   tso->gran.fetchtime   = 0;
1648   tso->gran.fetchcount  = 0;
1649   tso->gran.blocktime   = 0;
1650   tso->gran.blockcount  = 0;
1651   tso->gran.blockedat   = 0;
1652   tso->gran.globalsparks = 0;
1653   tso->gran.localsparks  = 0;
1654   if (RtsFlags.GranFlags.Light)
1655     tso->gran.clock  = Now; /* local clock */
1656   else
1657     tso->gran.clock  = 0;
1658
1659   IF_DEBUG(gran,printTSO(tso));
1660 #elif defined(PAR)
1661 # if defined(DEBUG)
1662   tso->par.magic = TSO_MAGIC; // debugging only
1663 # endif
1664   tso->par.sparkname   = 0;
1665   tso->par.startedat   = CURRENT_TIME; 
1666   tso->par.exported    = 0;
1667   tso->par.basicblocks = 0;
1668   tso->par.allocs      = 0;
1669   tso->par.exectime    = 0;
1670   tso->par.fetchtime   = 0;
1671   tso->par.fetchcount  = 0;
1672   tso->par.blocktime   = 0;
1673   tso->par.blockcount  = 0;
1674   tso->par.blockedat   = 0;
1675   tso->par.globalsparks = 0;
1676   tso->par.localsparks  = 0;
1677 #endif
1678
1679 #if defined(GRAN)
1680   globalGranStats.tot_threads_created++;
1681   globalGranStats.threads_created_on_PE[CurrentProc]++;
1682   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1683   globalGranStats.tot_sq_probes++;
1684 #elif defined(PAR)
1685   // collect parallel global statistics (currently done together with GC stats)
1686   if (RtsFlags.ParFlags.ParStats.Global &&
1687       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1688     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1689     globalParStats.tot_threads_created++;
1690   }
1691 #endif 
1692
1693 #if defined(GRAN)
1694   IF_GRAN_DEBUG(pri,
1695                 belch("==__ schedule: Created TSO %d (%p);",
1696                       CurrentProc, tso, tso->id));
1697 #elif defined(PAR)
1698     IF_PAR_DEBUG(verbose,
1699                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1700                        tso->id, tso, advisory_thread_count));
1701 #else
1702   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1703                                  tso->id, tso->stack_size));
1704 #endif    
1705   return tso;
1706 }
1707
1708 #if defined(PAR)
1709 /* RFP:
1710    all parallel thread creation calls should fall through the following routine.
1711 */
1712 StgTSO *
1713 createSparkThread(rtsSpark spark) 
1714 { StgTSO *tso;
1715   ASSERT(spark != (rtsSpark)NULL);
1716   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1717   { threadsIgnored++;
1718     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1719           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1720     return END_TSO_QUEUE;
1721   }
1722   else
1723   { threadsCreated++;
1724     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1725     if (tso==END_TSO_QUEUE)     
1726       barf("createSparkThread: Cannot create TSO");
1727 #if defined(DIST)
1728     tso->priority = AdvisoryPriority;
1729 #endif
1730     pushClosure(tso,spark);
1731     PUSH_ON_RUN_QUEUE(tso);
1732     advisory_thread_count++;    
1733   }
1734   return tso;
1735 }
1736 #endif
1737
1738 /*
1739   Turn a spark into a thread.
1740   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1741 */
1742 #if defined(PAR)
1743 //@cindex activateSpark
1744 StgTSO *
1745 activateSpark (rtsSpark spark) 
1746 {
1747   StgTSO *tso;
1748
1749   tso = createSparkThread(spark);
1750   if (RtsFlags.ParFlags.ParStats.Full) {   
1751     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1752     IF_PAR_DEBUG(verbose,
1753                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1754                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1755   }
1756   // ToDo: fwd info on local/global spark to thread -- HWL
1757   // tso->gran.exported =  spark->exported;
1758   // tso->gran.locked =   !spark->global;
1759   // tso->gran.sparkname = spark->name;
1760
1761   return tso;
1762 }
1763 #endif
1764
1765 /* ---------------------------------------------------------------------------
1766  * scheduleThread()
1767  *
1768  * scheduleThread puts a thread on the head of the runnable queue.
1769  * This will usually be done immediately after a thread is created.
1770  * The caller of scheduleThread must create the thread using e.g.
1771  * createThread and push an appropriate closure
1772  * on this thread's stack before the scheduler is invoked.
1773  * ------------------------------------------------------------------------ */
1774
1775 void
1776 scheduleThread(StgTSO *tso)
1777 {
1778   ACQUIRE_LOCK(&sched_mutex);
1779
1780   /* Put the new thread on the head of the runnable queue.  The caller
1781    * better push an appropriate closure on this thread's stack
1782    * beforehand.  In the SMP case, the thread may start running as
1783    * soon as we release the scheduler lock below.
1784    */
1785   PUSH_ON_RUN_QUEUE(tso);
1786   THREAD_RUNNABLE();
1787
1788 #if 0
1789   IF_DEBUG(scheduler,printTSO(tso));
1790 #endif
1791   RELEASE_LOCK(&sched_mutex);
1792 }
1793
1794 /* ---------------------------------------------------------------------------
1795  * startTasks()
1796  *
1797  * Start up Posix threads to run each of the scheduler tasks.
1798  * I believe the task ids are not needed in the system as defined.
1799  *  KH @ 25/10/99
1800  * ------------------------------------------------------------------------ */
1801
1802 #if defined(PAR) || defined(SMP)
1803 void
1804 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1805 {
1806   schedule();
1807 }
1808 #endif
1809
1810 /* ---------------------------------------------------------------------------
1811  * initScheduler()
1812  *
1813  * Initialise the scheduler.  This resets all the queues - if the
1814  * queues contained any threads, they'll be garbage collected at the
1815  * next pass.
1816  *
1817  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1818  * ------------------------------------------------------------------------ */
1819
1820 #ifdef SMP
1821 static void
1822 term_handler(int sig STG_UNUSED)
1823 {
1824   stat_workerStop();
1825   ACQUIRE_LOCK(&term_mutex);
1826   await_death--;
1827   RELEASE_LOCK(&term_mutex);
1828   pthread_exit(NULL);
1829 }
1830 #endif
1831
1832 static void
1833 initCapability( Capability *cap )
1834 {
1835     cap->f.stgChk0         = (F_)__stg_chk_0;
1836     cap->f.stgChk1         = (F_)__stg_chk_1;
1837     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1838     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1839 }
1840
1841 void 
1842 initScheduler(void)
1843 {
1844 #if defined(GRAN)
1845   nat i;
1846
1847   for (i=0; i<=MAX_PROC; i++) {
1848     run_queue_hds[i]      = END_TSO_QUEUE;
1849     run_queue_tls[i]      = END_TSO_QUEUE;
1850     blocked_queue_hds[i]  = END_TSO_QUEUE;
1851     blocked_queue_tls[i]  = END_TSO_QUEUE;
1852     ccalling_threadss[i]  = END_TSO_QUEUE;
1853     sleeping_queue        = END_TSO_QUEUE;
1854   }
1855 #else
1856   run_queue_hd      = END_TSO_QUEUE;
1857   run_queue_tl      = END_TSO_QUEUE;
1858   blocked_queue_hd  = END_TSO_QUEUE;
1859   blocked_queue_tl  = END_TSO_QUEUE;
1860   sleeping_queue    = END_TSO_QUEUE;
1861 #endif 
1862
1863   suspended_ccalling_threads  = END_TSO_QUEUE;
1864
1865   main_threads = NULL;
1866   all_threads  = END_TSO_QUEUE;
1867
1868   context_switch = 0;
1869   interrupted    = 0;
1870
1871   RtsFlags.ConcFlags.ctxtSwitchTicks =
1872       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1873
1874   /* Install the SIGHUP handler */
1875 #ifdef SMP
1876   {
1877     struct sigaction action,oact;
1878
1879     action.sa_handler = term_handler;
1880     sigemptyset(&action.sa_mask);
1881     action.sa_flags = 0;
1882     if (sigaction(SIGTERM, &action, &oact) != 0) {
1883       barf("can't install TERM handler");
1884     }
1885   }
1886 #endif
1887
1888 #ifdef SMP
1889   /* Allocate N Capabilities */
1890   {
1891     nat i;
1892     Capability *cap, *prev;
1893     cap  = NULL;
1894     prev = NULL;
1895     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1896       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1897       initCapability(cap);
1898       cap->link = prev;
1899       prev = cap;
1900     }
1901     free_capabilities = cap;
1902     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1903   }
1904   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1905                              n_free_capabilities););
1906 #else
1907   initCapability(&MainCapability);
1908 #endif
1909
1910 #if /* defined(SMP) ||*/ defined(PAR)
1911   initSparkPools();
1912 #endif
1913 }
1914
1915 #ifdef SMP
1916 void
1917 startTasks( void )
1918 {
1919   nat i;
1920   int r;
1921   pthread_t tid;
1922   
1923   /* make some space for saving all the thread ids */
1924   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1925                             "initScheduler:task_ids");
1926   
1927   /* and create all the threads */
1928   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1929     r = pthread_create(&tid,NULL,taskStart,NULL);
1930     if (r != 0) {
1931       barf("startTasks: Can't create new Posix thread");
1932     }
1933     task_ids[i].id = tid;
1934     task_ids[i].mut_time = 0.0;
1935     task_ids[i].mut_etime = 0.0;
1936     task_ids[i].gc_time = 0.0;
1937     task_ids[i].gc_etime = 0.0;
1938     task_ids[i].elapsedtimestart = stat_getElapsedTime();
1939     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1940   }
1941 }
1942 #endif
1943
1944 void
1945 exitScheduler( void )
1946 {
1947 #ifdef SMP
1948   nat i;
1949
1950   /* Don't want to use pthread_cancel, since we'd have to install
1951    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1952    * all our locks.
1953    */
1954 #if 0
1955   /* Cancel all our tasks */
1956   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1957     pthread_cancel(task_ids[i].id);
1958   }
1959   
1960   /* Wait for all the tasks to terminate */
1961   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1962     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1963                                task_ids[i].id));
1964     pthread_join(task_ids[i].id, NULL);
1965   }
1966 #endif
1967
1968   /* Send 'em all a SIGHUP.  That should shut 'em up.
1969    */
1970   await_death = RtsFlags.ParFlags.nNodes;
1971   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1972     pthread_kill(task_ids[i].id,SIGTERM);
1973   }
1974   while (await_death > 0) {
1975     sched_yield();
1976   }
1977 #endif
1978 }
1979
1980 /* -----------------------------------------------------------------------------
1981    Managing the per-task allocation areas.
1982    
1983    Each capability comes with an allocation area.  These are
1984    fixed-length block lists into which allocation can be done.
1985
1986    ToDo: no support for two-space collection at the moment???
1987    -------------------------------------------------------------------------- */
1988
1989 /* -----------------------------------------------------------------------------
1990  * waitThread is the external interface for running a new computation
1991  * and waiting for the result.
1992  *
1993  * In the non-SMP case, we create a new main thread, push it on the 
1994  * main-thread stack, and invoke the scheduler to run it.  The
1995  * scheduler will return when the top main thread on the stack has
1996  * completed or died, and fill in the necessary fields of the
1997  * main_thread structure.
1998  *
1999  * In the SMP case, we create a main thread as before, but we then
2000  * create a new condition variable and sleep on it.  When our new
2001  * main thread has completed, we'll be woken up and the status/result
2002  * will be in the main_thread struct.
2003  * -------------------------------------------------------------------------- */
2004
2005 int 
2006 howManyThreadsAvail ( void )
2007 {
2008    int i = 0;
2009    StgTSO* q;
2010    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2011       i++;
2012    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2013       i++;
2014    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2015       i++;
2016    return i;
2017 }
2018
2019 void
2020 finishAllThreads ( void )
2021 {
2022    do {
2023       while (run_queue_hd != END_TSO_QUEUE) {
2024          waitThread ( run_queue_hd, NULL );
2025       }
2026       while (blocked_queue_hd != END_TSO_QUEUE) {
2027          waitThread ( blocked_queue_hd, NULL );
2028       }
2029       while (sleeping_queue != END_TSO_QUEUE) {
2030          waitThread ( blocked_queue_hd, NULL );
2031       }
2032    } while 
2033       (blocked_queue_hd != END_TSO_QUEUE || 
2034        run_queue_hd     != END_TSO_QUEUE ||
2035        sleeping_queue   != END_TSO_QUEUE);
2036 }
2037
2038 SchedulerStatus
2039 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2040 {
2041   StgMainThread *m;
2042   SchedulerStatus stat;
2043
2044   ACQUIRE_LOCK(&sched_mutex);
2045   
2046   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2047
2048   m->tso = tso;
2049   m->ret = ret;
2050   m->stat = NoStatus;
2051 #ifdef SMP
2052   pthread_cond_init(&m->wakeup, NULL);
2053 #endif
2054
2055   m->link = main_threads;
2056   main_threads = m;
2057
2058   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2059                               m->tso->id));
2060
2061 #ifdef SMP
2062   do {
2063     pthread_cond_wait(&m->wakeup, &sched_mutex);
2064   } while (m->stat == NoStatus);
2065 #elif defined(GRAN)
2066   /* GranSim specific init */
2067   CurrentTSO = m->tso;                // the TSO to run
2068   procStatus[MainProc] = Busy;        // status of main PE
2069   CurrentProc = MainProc;             // PE to run it on
2070
2071   schedule();
2072 #else
2073   schedule();
2074   ASSERT(m->stat != NoStatus);
2075 #endif
2076
2077   stat = m->stat;
2078
2079 #ifdef SMP
2080   pthread_cond_destroy(&m->wakeup);
2081 #endif
2082
2083   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2084                               m->tso->id));
2085   free(m);
2086
2087   RELEASE_LOCK(&sched_mutex);
2088
2089   return stat;
2090 }
2091
2092 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2093 //@subsection Run queue code 
2094
2095 #if 0
2096 /* 
2097    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2098        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2099        implicit global variable that has to be correct when calling these
2100        fcts -- HWL 
2101 */
2102
2103 /* Put the new thread on the head of the runnable queue.
2104  * The caller of createThread better push an appropriate closure
2105  * on this thread's stack before the scheduler is invoked.
2106  */
2107 static /* inline */ void
2108 add_to_run_queue(tso)
2109 StgTSO* tso; 
2110 {
2111   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2112   tso->link = run_queue_hd;
2113   run_queue_hd = tso;
2114   if (run_queue_tl == END_TSO_QUEUE) {
2115     run_queue_tl = tso;
2116   }
2117 }
2118
2119 /* Put the new thread at the end of the runnable queue. */
2120 static /* inline */ void
2121 push_on_run_queue(tso)
2122 StgTSO* tso; 
2123 {
2124   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2125   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2126   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2127   if (run_queue_hd == END_TSO_QUEUE) {
2128     run_queue_hd = tso;
2129   } else {
2130     run_queue_tl->link = tso;
2131   }
2132   run_queue_tl = tso;
2133 }
2134
2135 /* 
2136    Should be inlined because it's used very often in schedule.  The tso
2137    argument is actually only needed in GranSim, where we want to have the
2138    possibility to schedule *any* TSO on the run queue, irrespective of the
2139    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2140    the run queue and dequeue the tso, adjusting the links in the queue. 
2141 */
2142 //@cindex take_off_run_queue
2143 static /* inline */ StgTSO*
2144 take_off_run_queue(StgTSO *tso) {
2145   StgTSO *t, *prev;
2146
2147   /* 
2148      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2149
2150      if tso is specified, unlink that tso from the run_queue (doesn't have
2151      to be at the beginning of the queue); GranSim only 
2152   */
2153   if (tso!=END_TSO_QUEUE) {
2154     /* find tso in queue */
2155     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2156          t!=END_TSO_QUEUE && t!=tso;
2157          prev=t, t=t->link) 
2158       /* nothing */ ;
2159     ASSERT(t==tso);
2160     /* now actually dequeue the tso */
2161     if (prev!=END_TSO_QUEUE) {
2162       ASSERT(run_queue_hd!=t);
2163       prev->link = t->link;
2164     } else {
2165       /* t is at beginning of thread queue */
2166       ASSERT(run_queue_hd==t);
2167       run_queue_hd = t->link;
2168     }
2169     /* t is at end of thread queue */
2170     if (t->link==END_TSO_QUEUE) {
2171       ASSERT(t==run_queue_tl);
2172       run_queue_tl = prev;
2173     } else {
2174       ASSERT(run_queue_tl!=t);
2175     }
2176     t->link = END_TSO_QUEUE;
2177   } else {
2178     /* take tso from the beginning of the queue; std concurrent code */
2179     t = run_queue_hd;
2180     if (t != END_TSO_QUEUE) {
2181       run_queue_hd = t->link;
2182       t->link = END_TSO_QUEUE;
2183       if (run_queue_hd == END_TSO_QUEUE) {
2184         run_queue_tl = END_TSO_QUEUE;
2185       }
2186     }
2187   }
2188   return t;
2189 }
2190
2191 #endif /* 0 */
2192
2193 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2194 //@subsection Garbage Collextion Routines
2195
2196 /* ---------------------------------------------------------------------------
2197    Where are the roots that we know about?
2198
2199         - all the threads on the runnable queue
2200         - all the threads on the blocked queue
2201         - all the threads on the sleeping queue
2202         - all the thread currently executing a _ccall_GC
2203         - all the "main threads"
2204      
2205    ------------------------------------------------------------------------ */
2206
2207 /* This has to be protected either by the scheduler monitor, or by the
2208         garbage collection monitor (probably the latter).
2209         KH @ 25/10/99
2210 */
2211
2212 void
2213 GetRoots(evac_fn evac)
2214 {
2215   StgMainThread *m;
2216
2217 #if defined(GRAN)
2218   {
2219     nat i;
2220     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2221       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2222           evac((StgClosure **)&run_queue_hds[i]);
2223       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2224           evac((StgClosure **)&run_queue_tls[i]);
2225       
2226       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2227           evac((StgClosure **)&blocked_queue_hds[i]);
2228       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2229           evac((StgClosure **)&blocked_queue_tls[i]);
2230       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2231           evac((StgClosure **)&ccalling_threads[i]);
2232     }
2233   }
2234
2235   markEventQueue();
2236
2237 #else /* !GRAN */
2238   if (run_queue_hd != END_TSO_QUEUE) {
2239       ASSERT(run_queue_tl != END_TSO_QUEUE);
2240       evac((StgClosure **)&run_queue_hd);
2241       evac((StgClosure **)&run_queue_tl);
2242   }
2243   
2244   if (blocked_queue_hd != END_TSO_QUEUE) {
2245       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2246       evac((StgClosure **)&blocked_queue_hd);
2247       evac((StgClosure **)&blocked_queue_tl);
2248   }
2249   
2250   if (sleeping_queue != END_TSO_QUEUE) {
2251       evac((StgClosure **)&sleeping_queue);
2252   }
2253 #endif 
2254
2255   for (m = main_threads; m != NULL; m = m->link) {
2256       evac((StgClosure **)&m->tso);
2257   }
2258   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2259       evac((StgClosure **)&suspended_ccalling_threads);
2260   }
2261
2262 #if defined(PAR) || defined(GRAN)
2263   markSparkQueue(evac);
2264 #endif
2265 }
2266
2267 /* -----------------------------------------------------------------------------
2268    performGC
2269
2270    This is the interface to the garbage collector from Haskell land.
2271    We provide this so that external C code can allocate and garbage
2272    collect when called from Haskell via _ccall_GC.
2273
2274    It might be useful to provide an interface whereby the programmer
2275    can specify more roots (ToDo).
2276    
2277    This needs to be protected by the GC condition variable above.  KH.
2278    -------------------------------------------------------------------------- */
2279
2280 void (*extra_roots)(evac_fn);
2281
2282 void
2283 performGC(void)
2284 {
2285   GarbageCollect(GetRoots,rtsFalse);
2286 }
2287
2288 void
2289 performMajorGC(void)
2290 {
2291   GarbageCollect(GetRoots,rtsTrue);
2292 }
2293
2294 static void
2295 AllRoots(evac_fn evac)
2296 {
2297     GetRoots(evac);             // the scheduler's roots
2298     extra_roots(evac);          // the user's roots
2299 }
2300
2301 void
2302 performGCWithRoots(void (*get_roots)(evac_fn))
2303 {
2304   extra_roots = get_roots;
2305   GarbageCollect(AllRoots,rtsFalse);
2306 }
2307
2308 /* -----------------------------------------------------------------------------
2309    Stack overflow
2310
2311    If the thread has reached its maximum stack size, then raise the
2312    StackOverflow exception in the offending thread.  Otherwise
2313    relocate the TSO into a larger chunk of memory and adjust its stack
2314    size appropriately.
2315    -------------------------------------------------------------------------- */
2316
2317 static StgTSO *
2318 threadStackOverflow(StgTSO *tso)
2319 {
2320   nat new_stack_size, new_tso_size, diff, stack_words;
2321   StgPtr new_sp;
2322   StgTSO *dest;
2323
2324   IF_DEBUG(sanity,checkTSO(tso));
2325   if (tso->stack_size >= tso->max_stack_size) {
2326
2327     IF_DEBUG(gc,
2328              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2329                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2330              /* If we're debugging, just print out the top of the stack */
2331              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2332                                               tso->sp+64)));
2333
2334     /* Send this thread the StackOverflow exception */
2335     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2336     return tso;
2337   }
2338
2339   /* Try to double the current stack size.  If that takes us over the
2340    * maximum stack size for this thread, then use the maximum instead.
2341    * Finally round up so the TSO ends up as a whole number of blocks.
2342    */
2343   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2344   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2345                                        TSO_STRUCT_SIZE)/sizeof(W_);
2346   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2347   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2348
2349   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2350
2351   dest = (StgTSO *)allocate(new_tso_size);
2352   TICK_ALLOC_TSO(new_stack_size,0);
2353
2354   /* copy the TSO block and the old stack into the new area */
2355   memcpy(dest,tso,TSO_STRUCT_SIZE);
2356   stack_words = tso->stack + tso->stack_size - tso->sp;
2357   new_sp = (P_)dest + new_tso_size - stack_words;
2358   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2359
2360   /* relocate the stack pointers... */
2361   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2362   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2363   dest->sp    = new_sp;
2364   dest->stack_size = new_stack_size;
2365         
2366   /* and relocate the update frame list */
2367   relocate_stack(dest, diff);
2368
2369   /* Mark the old TSO as relocated.  We have to check for relocated
2370    * TSOs in the garbage collector and any primops that deal with TSOs.
2371    *
2372    * It's important to set the sp and su values to just beyond the end
2373    * of the stack, so we don't attempt to scavenge any part of the
2374    * dead TSO's stack.
2375    */
2376   tso->what_next = ThreadRelocated;
2377   tso->link = dest;
2378   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2379   tso->su = (StgUpdateFrame *)tso->sp;
2380   tso->why_blocked = NotBlocked;
2381   dest->mut_link = NULL;
2382
2383   IF_PAR_DEBUG(verbose,
2384                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2385                      tso->id, tso, tso->stack_size);
2386                /* If we're debugging, just print out the top of the stack */
2387                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2388                                                 tso->sp+64)));
2389   
2390   IF_DEBUG(sanity,checkTSO(tso));
2391 #if 0
2392   IF_DEBUG(scheduler,printTSO(dest));
2393 #endif
2394
2395   return dest;
2396 }
2397
2398 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2399 //@subsection Blocking Queue Routines
2400
2401 /* ---------------------------------------------------------------------------
2402    Wake up a queue that was blocked on some resource.
2403    ------------------------------------------------------------------------ */
2404
2405 #if defined(GRAN)
2406 static inline void
2407 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2408 {
2409 }
2410 #elif defined(PAR)
2411 static inline void
2412 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2413 {
2414   /* write RESUME events to log file and
2415      update blocked and fetch time (depending on type of the orig closure) */
2416   if (RtsFlags.ParFlags.ParStats.Full) {
2417     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2418                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2419                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2420     if (EMPTY_RUN_QUEUE())
2421       emitSchedule = rtsTrue;
2422
2423     switch (get_itbl(node)->type) {
2424         case FETCH_ME_BQ:
2425           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2426           break;
2427         case RBH:
2428         case FETCH_ME:
2429         case BLACKHOLE_BQ:
2430           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2431           break;
2432 #ifdef DIST
2433         case MVAR:
2434           break;
2435 #endif    
2436         default:
2437           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2438         }
2439       }
2440 }
2441 #endif
2442
2443 #if defined(GRAN)
2444 static StgBlockingQueueElement *
2445 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2446 {
2447     StgTSO *tso;
2448     PEs node_loc, tso_loc;
2449
2450     node_loc = where_is(node); // should be lifted out of loop
2451     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2452     tso_loc = where_is((StgClosure *)tso);
2453     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2454       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2455       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2456       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2457       // insertThread(tso, node_loc);
2458       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2459                 ResumeThread,
2460                 tso, node, (rtsSpark*)NULL);
2461       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2462       // len_local++;
2463       // len++;
2464     } else { // TSO is remote (actually should be FMBQ)
2465       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2466                                   RtsFlags.GranFlags.Costs.gunblocktime +
2467                                   RtsFlags.GranFlags.Costs.latency;
2468       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2469                 UnblockThread,
2470                 tso, node, (rtsSpark*)NULL);
2471       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2472       // len++;
2473     }
2474     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2475     IF_GRAN_DEBUG(bq,
2476                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2477                           (node_loc==tso_loc ? "Local" : "Global"), 
2478                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2479     tso->block_info.closure = NULL;
2480     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2481                              tso->id, tso));
2482 }
2483 #elif defined(PAR)
2484 static StgBlockingQueueElement *
2485 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2486 {
2487     StgBlockingQueueElement *next;
2488
2489     switch (get_itbl(bqe)->type) {
2490     case TSO:
2491       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2492       /* if it's a TSO just push it onto the run_queue */
2493       next = bqe->link;
2494       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2495       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2496       THREAD_RUNNABLE();
2497       unblockCount(bqe, node);
2498       /* reset blocking status after dumping event */
2499       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2500       break;
2501
2502     case BLOCKED_FETCH:
2503       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2504       next = bqe->link;
2505       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2506       PendingFetches = (StgBlockedFetch *)bqe;
2507       break;
2508
2509 # if defined(DEBUG)
2510       /* can ignore this case in a non-debugging setup; 
2511          see comments on RBHSave closures above */
2512     case CONSTR:
2513       /* check that the closure is an RBHSave closure */
2514       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2515              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2516              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2517       break;
2518
2519     default:
2520       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2521            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2522            (StgClosure *)bqe);
2523 # endif
2524     }
2525   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2526   return next;
2527 }
2528
2529 #else /* !GRAN && !PAR */
2530 static StgTSO *
2531 unblockOneLocked(StgTSO *tso)
2532 {
2533   StgTSO *next;
2534
2535   ASSERT(get_itbl(tso)->type == TSO);
2536   ASSERT(tso->why_blocked != NotBlocked);
2537   tso->why_blocked = NotBlocked;
2538   next = tso->link;
2539   PUSH_ON_RUN_QUEUE(tso);
2540   THREAD_RUNNABLE();
2541   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2542   return next;
2543 }
2544 #endif
2545
2546 #if defined(GRAN) || defined(PAR)
2547 inline StgBlockingQueueElement *
2548 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2549 {
2550   ACQUIRE_LOCK(&sched_mutex);
2551   bqe = unblockOneLocked(bqe, node);
2552   RELEASE_LOCK(&sched_mutex);
2553   return bqe;
2554 }
2555 #else
2556 inline StgTSO *
2557 unblockOne(StgTSO *tso)
2558 {
2559   ACQUIRE_LOCK(&sched_mutex);
2560   tso = unblockOneLocked(tso);
2561   RELEASE_LOCK(&sched_mutex);
2562   return tso;
2563 }
2564 #endif
2565
2566 #if defined(GRAN)
2567 void 
2568 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2569 {
2570   StgBlockingQueueElement *bqe;
2571   PEs node_loc;
2572   nat len = 0; 
2573
2574   IF_GRAN_DEBUG(bq, 
2575                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2576                       node, CurrentProc, CurrentTime[CurrentProc], 
2577                       CurrentTSO->id, CurrentTSO));
2578
2579   node_loc = where_is(node);
2580
2581   ASSERT(q == END_BQ_QUEUE ||
2582          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2583          get_itbl(q)->type == CONSTR); // closure (type constructor)
2584   ASSERT(is_unique(node));
2585
2586   /* FAKE FETCH: magically copy the node to the tso's proc;
2587      no Fetch necessary because in reality the node should not have been 
2588      moved to the other PE in the first place
2589   */
2590   if (CurrentProc!=node_loc) {
2591     IF_GRAN_DEBUG(bq, 
2592                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2593                         node, node_loc, CurrentProc, CurrentTSO->id, 
2594                         // CurrentTSO, where_is(CurrentTSO),
2595                         node->header.gran.procs));
2596     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2597     IF_GRAN_DEBUG(bq, 
2598                   belch("## new bitmask of node %p is %#x",
2599                         node, node->header.gran.procs));
2600     if (RtsFlags.GranFlags.GranSimStats.Global) {
2601       globalGranStats.tot_fake_fetches++;
2602     }
2603   }
2604
2605   bqe = q;
2606   // ToDo: check: ASSERT(CurrentProc==node_loc);
2607   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2608     //next = bqe->link;
2609     /* 
2610        bqe points to the current element in the queue
2611        next points to the next element in the queue
2612     */
2613     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2614     //tso_loc = where_is(tso);
2615     len++;
2616     bqe = unblockOneLocked(bqe, node);
2617   }
2618
2619   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2620      the closure to make room for the anchor of the BQ */
2621   if (bqe!=END_BQ_QUEUE) {
2622     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2623     /*
2624     ASSERT((info_ptr==&RBH_Save_0_info) ||
2625            (info_ptr==&RBH_Save_1_info) ||
2626            (info_ptr==&RBH_Save_2_info));
2627     */
2628     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2629     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2630     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2631
2632     IF_GRAN_DEBUG(bq,
2633                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2634                         node, info_type(node)));
2635   }
2636
2637   /* statistics gathering */
2638   if (RtsFlags.GranFlags.GranSimStats.Global) {
2639     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2640     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2641     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2642     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2643   }
2644   IF_GRAN_DEBUG(bq,
2645                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2646                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2647 }
2648 #elif defined(PAR)
2649 void 
2650 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2651 {
2652   StgBlockingQueueElement *bqe;
2653
2654   ACQUIRE_LOCK(&sched_mutex);
2655
2656   IF_PAR_DEBUG(verbose, 
2657                belch("##-_ AwBQ for node %p on [%x]: ",
2658                      node, mytid));
2659 #ifdef DIST  
2660   //RFP
2661   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2662     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2663     return;
2664   }
2665 #endif
2666   
2667   ASSERT(q == END_BQ_QUEUE ||
2668          get_itbl(q)->type == TSO ||           
2669          get_itbl(q)->type == BLOCKED_FETCH || 
2670          get_itbl(q)->type == CONSTR); 
2671
2672   bqe = q;
2673   while (get_itbl(bqe)->type==TSO || 
2674          get_itbl(bqe)->type==BLOCKED_FETCH) {
2675     bqe = unblockOneLocked(bqe, node);
2676   }
2677   RELEASE_LOCK(&sched_mutex);
2678 }
2679
2680 #else   /* !GRAN && !PAR */
2681 void
2682 awakenBlockedQueue(StgTSO *tso)
2683 {
2684   ACQUIRE_LOCK(&sched_mutex);
2685   while (tso != END_TSO_QUEUE) {
2686     tso = unblockOneLocked(tso);
2687   }
2688   RELEASE_LOCK(&sched_mutex);
2689 }
2690 #endif
2691
2692 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2693 //@subsection Exception Handling Routines
2694
2695 /* ---------------------------------------------------------------------------
2696    Interrupt execution
2697    - usually called inside a signal handler so it mustn't do anything fancy.   
2698    ------------------------------------------------------------------------ */
2699
2700 void
2701 interruptStgRts(void)
2702 {
2703     interrupted    = 1;
2704     context_switch = 1;
2705 }
2706
2707 /* -----------------------------------------------------------------------------
2708    Unblock a thread
2709
2710    This is for use when we raise an exception in another thread, which
2711    may be blocked.
2712    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2713    -------------------------------------------------------------------------- */
2714
2715 #if defined(GRAN) || defined(PAR)
2716 /*
2717   NB: only the type of the blocking queue is different in GranSim and GUM
2718       the operations on the queue-elements are the same
2719       long live polymorphism!
2720 */
2721 static void
2722 unblockThread(StgTSO *tso)
2723 {
2724   StgBlockingQueueElement *t, **last;
2725
2726   ACQUIRE_LOCK(&sched_mutex);
2727   switch (tso->why_blocked) {
2728
2729   case NotBlocked:
2730     return;  /* not blocked */
2731
2732   case BlockedOnMVar:
2733     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2734     {
2735       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2736       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2737
2738       last = (StgBlockingQueueElement **)&mvar->head;
2739       for (t = (StgBlockingQueueElement *)mvar->head; 
2740            t != END_BQ_QUEUE; 
2741            last = &t->link, last_tso = t, t = t->link) {
2742         if (t == (StgBlockingQueueElement *)tso) {
2743           *last = (StgBlockingQueueElement *)tso->link;
2744           if (mvar->tail == tso) {
2745             mvar->tail = (StgTSO *)last_tso;
2746           }
2747           goto done;
2748         }
2749       }
2750       barf("unblockThread (MVAR): TSO not found");
2751     }
2752
2753   case BlockedOnBlackHole:
2754     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2755     {
2756       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2757
2758       last = &bq->blocking_queue;
2759       for (t = bq->blocking_queue; 
2760            t != END_BQ_QUEUE; 
2761            last = &t->link, t = t->link) {
2762         if (t == (StgBlockingQueueElement *)tso) {
2763           *last = (StgBlockingQueueElement *)tso->link;
2764           goto done;
2765         }
2766       }
2767       barf("unblockThread (BLACKHOLE): TSO not found");
2768     }
2769
2770   case BlockedOnException:
2771     {
2772       StgTSO *target  = tso->block_info.tso;
2773
2774       ASSERT(get_itbl(target)->type == TSO);
2775
2776       if (target->what_next == ThreadRelocated) {
2777           target = target->link;
2778           ASSERT(get_itbl(target)->type == TSO);
2779       }
2780
2781       ASSERT(target->blocked_exceptions != NULL);
2782
2783       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2784       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2785            t != END_BQ_QUEUE; 
2786            last = &t->link, t = t->link) {
2787         ASSERT(get_itbl(t)->type == TSO);
2788         if (t == (StgBlockingQueueElement *)tso) {
2789           *last = (StgBlockingQueueElement *)tso->link;
2790           goto done;
2791         }
2792       }
2793       barf("unblockThread (Exception): TSO not found");
2794     }
2795
2796   case BlockedOnRead:
2797   case BlockedOnWrite:
2798     {
2799       /* take TSO off blocked_queue */
2800       StgBlockingQueueElement *prev = NULL;
2801       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2802            prev = t, t = t->link) {
2803         if (t == (StgBlockingQueueElement *)tso) {
2804           if (prev == NULL) {
2805             blocked_queue_hd = (StgTSO *)t->link;
2806             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2807               blocked_queue_tl = END_TSO_QUEUE;
2808             }
2809           } else {
2810             prev->link = t->link;
2811             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2812               blocked_queue_tl = (StgTSO *)prev;
2813             }
2814           }
2815           goto done;
2816         }
2817       }
2818       barf("unblockThread (I/O): TSO not found");
2819     }
2820
2821   case BlockedOnDelay:
2822     {
2823       /* take TSO off sleeping_queue */
2824       StgBlockingQueueElement *prev = NULL;
2825       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2826            prev = t, t = t->link) {
2827         if (t == (StgBlockingQueueElement *)tso) {
2828           if (prev == NULL) {
2829             sleeping_queue = (StgTSO *)t->link;
2830           } else {
2831             prev->link = t->link;
2832           }
2833           goto done;
2834         }
2835       }
2836       barf("unblockThread (I/O): TSO not found");
2837     }
2838
2839   default:
2840     barf("unblockThread");
2841   }
2842
2843  done:
2844   tso->link = END_TSO_QUEUE;
2845   tso->why_blocked = NotBlocked;
2846   tso->block_info.closure = NULL;
2847   PUSH_ON_RUN_QUEUE(tso);
2848   RELEASE_LOCK(&sched_mutex);
2849 }
2850 #else
2851 static void
2852 unblockThread(StgTSO *tso)
2853 {
2854   StgTSO *t, **last;
2855
2856   ACQUIRE_LOCK(&sched_mutex);
2857   switch (tso->why_blocked) {
2858
2859   case NotBlocked:
2860     return;  /* not blocked */
2861
2862   case BlockedOnMVar:
2863     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2864     {
2865       StgTSO *last_tso = END_TSO_QUEUE;
2866       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2867
2868       last = &mvar->head;
2869       for (t = mvar->head; t != END_TSO_QUEUE; 
2870            last = &t->link, last_tso = t, t = t->link) {
2871         if (t == tso) {
2872           *last = tso->link;
2873           if (mvar->tail == tso) {
2874             mvar->tail = last_tso;
2875           }
2876           goto done;
2877         }
2878       }
2879       barf("unblockThread (MVAR): TSO not found");
2880     }
2881
2882   case BlockedOnBlackHole:
2883     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2884     {
2885       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2886
2887       last = &bq->blocking_queue;
2888       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2889            last = &t->link, t = t->link) {
2890         if (t == tso) {
2891           *last = tso->link;
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (BLACKHOLE): TSO not found");
2896     }
2897
2898   case BlockedOnException:
2899     {
2900       StgTSO *target  = tso->block_info.tso;
2901
2902       ASSERT(get_itbl(target)->type == TSO);
2903
2904       while (target->what_next == ThreadRelocated) {
2905           target = target->link;
2906           ASSERT(get_itbl(target)->type == TSO);
2907       }
2908       
2909       ASSERT(target->blocked_exceptions != NULL);
2910
2911       last = &target->blocked_exceptions;
2912       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2913            last = &t->link, t = t->link) {
2914         ASSERT(get_itbl(t)->type == TSO);
2915         if (t == tso) {
2916           *last = tso->link;
2917           goto done;
2918         }
2919       }
2920       barf("unblockThread (Exception): TSO not found");
2921     }
2922
2923   case BlockedOnRead:
2924   case BlockedOnWrite:
2925     {
2926       StgTSO *prev = NULL;
2927       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2928            prev = t, t = t->link) {
2929         if (t == tso) {
2930           if (prev == NULL) {
2931             blocked_queue_hd = t->link;
2932             if (blocked_queue_tl == t) {
2933               blocked_queue_tl = END_TSO_QUEUE;
2934             }
2935           } else {
2936             prev->link = t->link;
2937             if (blocked_queue_tl == t) {
2938               blocked_queue_tl = prev;
2939             }
2940           }
2941           goto done;
2942         }
2943       }
2944       barf("unblockThread (I/O): TSO not found");
2945     }
2946
2947   case BlockedOnDelay:
2948     {
2949       StgTSO *prev = NULL;
2950       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2951            prev = t, t = t->link) {
2952         if (t == tso) {
2953           if (prev == NULL) {
2954             sleeping_queue = t->link;
2955           } else {
2956             prev->link = t->link;
2957           }
2958           goto done;
2959         }
2960       }
2961       barf("unblockThread (I/O): TSO not found");
2962     }
2963
2964   default:
2965     barf("unblockThread");
2966   }
2967
2968  done:
2969   tso->link = END_TSO_QUEUE;
2970   tso->why_blocked = NotBlocked;
2971   tso->block_info.closure = NULL;
2972   PUSH_ON_RUN_QUEUE(tso);
2973   RELEASE_LOCK(&sched_mutex);
2974 }
2975 #endif
2976
2977 /* -----------------------------------------------------------------------------
2978  * raiseAsync()
2979  *
2980  * The following function implements the magic for raising an
2981  * asynchronous exception in an existing thread.
2982  *
2983  * We first remove the thread from any queue on which it might be
2984  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2985  *
2986  * We strip the stack down to the innermost CATCH_FRAME, building
2987  * thunks in the heap for all the active computations, so they can 
2988  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2989  * an application of the handler to the exception, and push it on
2990  * the top of the stack.
2991  * 
2992  * How exactly do we save all the active computations?  We create an
2993  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2994  * AP_UPDs pushes everything from the corresponding update frame
2995  * upwards onto the stack.  (Actually, it pushes everything up to the
2996  * next update frame plus a pointer to the next AP_UPD object.
2997  * Entering the next AP_UPD object pushes more onto the stack until we
2998  * reach the last AP_UPD object - at which point the stack should look
2999  * exactly as it did when we killed the TSO and we can continue
3000  * execution by entering the closure on top of the stack.
3001  *
3002  * We can also kill a thread entirely - this happens if either (a) the 
3003  * exception passed to raiseAsync is NULL, or (b) there's no
3004  * CATCH_FRAME on the stack.  In either case, we strip the entire
3005  * stack and replace the thread with a zombie.
3006  *
3007  * -------------------------------------------------------------------------- */
3008  
3009 void 
3010 deleteThread(StgTSO *tso)
3011 {
3012   raiseAsync(tso,NULL);
3013 }
3014
3015 void
3016 raiseAsync(StgTSO *tso, StgClosure *exception)
3017 {
3018   StgUpdateFrame* su = tso->su;
3019   StgPtr          sp = tso->sp;
3020   
3021   /* Thread already dead? */
3022   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3023     return;
3024   }
3025
3026   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3027
3028   /* Remove it from any blocking queues */
3029   unblockThread(tso);
3030
3031   /* The stack freezing code assumes there's a closure pointer on
3032    * the top of the stack.  This isn't always the case with compiled
3033    * code, so we have to push a dummy closure on the top which just
3034    * returns to the next return address on the stack.
3035    */
3036   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3037     *(--sp) = (W_)&stg_dummy_ret_closure;
3038   }
3039
3040   while (1) {
3041     nat words = ((P_)su - (P_)sp) - 1;
3042     nat i;
3043     StgAP_UPD * ap;
3044
3045     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3046      * then build PAP(handler,exception,realworld#), and leave it on
3047      * top of the stack ready to enter.
3048      */
3049     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3050       StgCatchFrame *cf = (StgCatchFrame *)su;
3051       /* we've got an exception to raise, so let's pass it to the
3052        * handler in this frame.
3053        */
3054       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3055       TICK_ALLOC_UPD_PAP(3,0);
3056       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3057               
3058       ap->n_args = 2;
3059       ap->fun = cf->handler;    /* :: Exception -> IO a */
3060       ap->payload[0] = exception;
3061       ap->payload[1] = ARG_TAG(0); /* realworld token */
3062
3063       /* throw away the stack from Sp up to and including the
3064        * CATCH_FRAME.
3065        */
3066       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3067       tso->su = cf->link;
3068
3069       /* Restore the blocked/unblocked state for asynchronous exceptions
3070        * at the CATCH_FRAME.  
3071        *
3072        * If exceptions were unblocked at the catch, arrange that they
3073        * are unblocked again after executing the handler by pushing an
3074        * unblockAsyncExceptions_ret stack frame.
3075        */
3076       if (!cf->exceptions_blocked) {
3077         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3078       }
3079       
3080       /* Ensure that async exceptions are blocked when running the handler.
3081        */
3082       if (tso->blocked_exceptions == NULL) {
3083         tso->blocked_exceptions = END_TSO_QUEUE;
3084       }
3085       
3086       /* Put the newly-built PAP on top of the stack, ready to execute
3087        * when the thread restarts.
3088        */
3089       sp[0] = (W_)ap;
3090       tso->sp = sp;
3091       tso->what_next = ThreadEnterGHC;
3092       IF_DEBUG(sanity, checkTSO(tso));
3093       return;
3094     }
3095
3096     /* First build an AP_UPD consisting of the stack chunk above the
3097      * current update frame, with the top word on the stack as the
3098      * fun field.
3099      */
3100     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3101     
3102     ASSERT(words >= 0);
3103     
3104     ap->n_args = words;
3105     ap->fun    = (StgClosure *)sp[0];
3106     sp++;
3107     for(i=0; i < (nat)words; ++i) {
3108       ap->payload[i] = (StgClosure *)*sp++;
3109     }
3110     
3111     switch (get_itbl(su)->type) {
3112       
3113     case UPDATE_FRAME:
3114       {
3115         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3116         TICK_ALLOC_UP_THK(words+1,0);
3117         
3118         IF_DEBUG(scheduler,
3119                  fprintf(stderr,  "scheduler: Updating ");
3120                  printPtr((P_)su->updatee); 
3121                  fprintf(stderr,  " with ");
3122                  printObj((StgClosure *)ap);
3123                  );
3124         
3125         /* Replace the updatee with an indirection - happily
3126          * this will also wake up any threads currently
3127          * waiting on the result.
3128          *
3129          * Warning: if we're in a loop, more than one update frame on
3130          * the stack may point to the same object.  Be careful not to
3131          * overwrite an IND_OLDGEN in this case, because we'll screw
3132          * up the mutable lists.  To be on the safe side, don't
3133          * overwrite any kind of indirection at all.  See also
3134          * threadSqueezeStack in GC.c, where we have to make a similar
3135          * check.
3136          */
3137         if (!closure_IND(su->updatee)) {
3138             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3139         }
3140         su = su->link;
3141         sp += sizeofW(StgUpdateFrame) -1;
3142         sp[0] = (W_)ap; /* push onto stack */
3143         break;
3144       }
3145
3146     case CATCH_FRAME:
3147       {
3148         StgCatchFrame *cf = (StgCatchFrame *)su;
3149         StgClosure* o;
3150         
3151         /* We want a PAP, not an AP_UPD.  Fortunately, the
3152          * layout's the same.
3153          */
3154         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3155         TICK_ALLOC_UPD_PAP(words+1,0);
3156         
3157         /* now build o = FUN(catch,ap,handler) */
3158         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3159         TICK_ALLOC_FUN(2,0);
3160         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3161         o->payload[0] = (StgClosure *)ap;
3162         o->payload[1] = cf->handler;
3163         
3164         IF_DEBUG(scheduler,
3165                  fprintf(stderr,  "scheduler: Built ");
3166                  printObj((StgClosure *)o);
3167                  );
3168         
3169         /* pop the old handler and put o on the stack */
3170         su = cf->link;
3171         sp += sizeofW(StgCatchFrame) - 1;
3172         sp[0] = (W_)o;
3173         break;
3174       }
3175       
3176     case SEQ_FRAME:
3177       {
3178         StgSeqFrame *sf = (StgSeqFrame *)su;
3179         StgClosure* o;
3180         
3181         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3182         TICK_ALLOC_UPD_PAP(words+1,0);
3183         
3184         /* now build o = FUN(seq,ap) */
3185         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3186         TICK_ALLOC_SE_THK(1,0);
3187         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3188         o->payload[0] = (StgClosure *)ap;
3189         
3190         IF_DEBUG(scheduler,
3191                  fprintf(stderr,  "scheduler: Built ");
3192                  printObj((StgClosure *)o);
3193                  );
3194         
3195         /* pop the old handler and put o on the stack */
3196         su = sf->link;
3197         sp += sizeofW(StgSeqFrame) - 1;
3198         sp[0] = (W_)o;
3199         break;
3200       }
3201       
3202     case STOP_FRAME:
3203       /* We've stripped the entire stack, the thread is now dead. */
3204       sp += sizeofW(StgStopFrame) - 1;
3205       sp[0] = (W_)exception;    /* save the exception */
3206       tso->what_next = ThreadKilled;
3207       tso->su = (StgUpdateFrame *)(sp+1);
3208       tso->sp = sp;
3209       return;
3210
3211     default:
3212       barf("raiseAsync");
3213     }
3214   }
3215   barf("raiseAsync");
3216 }
3217
3218 /* -----------------------------------------------------------------------------
3219    resurrectThreads is called after garbage collection on the list of
3220    threads found to be garbage.  Each of these threads will be woken
3221    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3222    on an MVar, or NonTermination if the thread was blocked on a Black
3223    Hole.
3224    -------------------------------------------------------------------------- */
3225
3226 void
3227 resurrectThreads( StgTSO *threads )
3228 {
3229   StgTSO *tso, *next;
3230
3231   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3232     next = tso->global_link;
3233     tso->global_link = all_threads;
3234     all_threads = tso;
3235     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3236
3237     switch (tso->why_blocked) {
3238     case BlockedOnMVar:
3239     case BlockedOnException:
3240       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3241       break;
3242     case BlockedOnBlackHole:
3243       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3244       break;
3245     case NotBlocked:
3246       /* This might happen if the thread was blocked on a black hole
3247        * belonging to a thread that we've just woken up (raiseAsync
3248        * can wake up threads, remember...).
3249        */
3250       continue;
3251     default:
3252       barf("resurrectThreads: thread blocked in a strange way");
3253     }
3254   }
3255 }
3256
3257 /* -----------------------------------------------------------------------------
3258  * Blackhole detection: if we reach a deadlock, test whether any
3259  * threads are blocked on themselves.  Any threads which are found to
3260  * be self-blocked get sent a NonTermination exception.
3261  *
3262  * This is only done in a deadlock situation in order to avoid
3263  * performance overhead in the normal case.
3264  * -------------------------------------------------------------------------- */
3265
3266 static void
3267 detectBlackHoles( void )
3268 {
3269     StgTSO *t = all_threads;
3270     StgUpdateFrame *frame;
3271     StgClosure *blocked_on;
3272
3273     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3274
3275         while (t->what_next == ThreadRelocated) {
3276             t = t->link;
3277             ASSERT(get_itbl(t)->type == TSO);
3278         }
3279       
3280         if (t->why_blocked != BlockedOnBlackHole) {
3281             continue;
3282         }
3283
3284         blocked_on = t->block_info.closure;
3285
3286         for (frame = t->su; ; frame = frame->link) {
3287             switch (get_itbl(frame)->type) {
3288
3289             case UPDATE_FRAME:
3290                 if (frame->updatee == blocked_on) {
3291                     /* We are blocking on one of our own computations, so
3292                      * send this thread the NonTermination exception.  
3293                      */
3294                     IF_DEBUG(scheduler, 
3295                              sched_belch("thread %d is blocked on itself", t->id));
3296                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3297                     goto done;
3298                 }
3299                 else {
3300                     continue;
3301                 }
3302
3303             case CATCH_FRAME:
3304             case SEQ_FRAME:
3305                 continue;
3306                 
3307             case STOP_FRAME:
3308                 break;
3309             }
3310             break;
3311         }
3312
3313     done: ;
3314     }   
3315 }
3316
3317 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3318 //@subsection Debugging Routines
3319
3320 /* -----------------------------------------------------------------------------
3321    Debugging: why is a thread blocked
3322    -------------------------------------------------------------------------- */
3323
3324 #ifdef DEBUG
3325
3326 void
3327 printThreadBlockage(StgTSO *tso)
3328 {
3329   switch (tso->why_blocked) {
3330   case BlockedOnRead:
3331     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3332     break;
3333   case BlockedOnWrite:
3334     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3335     break;
3336   case BlockedOnDelay:
3337     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3338     break;
3339   case BlockedOnMVar:
3340     fprintf(stderr,"is blocked on an MVar");
3341     break;
3342   case BlockedOnException:
3343     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3344             tso->block_info.tso->id);
3345     break;
3346   case BlockedOnBlackHole:
3347     fprintf(stderr,"is blocked on a black hole");
3348     break;
3349   case NotBlocked:
3350     fprintf(stderr,"is not blocked");
3351     break;
3352 #if defined(PAR)
3353   case BlockedOnGA:
3354     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3355             tso->block_info.closure, info_type(tso->block_info.closure));
3356     break;
3357   case BlockedOnGA_NoSend:
3358     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3359             tso->block_info.closure, info_type(tso->block_info.closure));
3360     break;
3361 #endif
3362   default:
3363     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3364          tso->why_blocked, tso->id, tso);
3365   }
3366 }
3367
3368 void
3369 printThreadStatus(StgTSO *tso)
3370 {
3371   switch (tso->what_next) {
3372   case ThreadKilled:
3373     fprintf(stderr,"has been killed");
3374     break;
3375   case ThreadComplete:
3376     fprintf(stderr,"has completed");
3377     break;
3378   default:
3379     printThreadBlockage(tso);
3380   }
3381 }
3382
3383 void
3384 printAllThreads(void)
3385 {
3386   StgTSO *t;
3387
3388 # if defined(GRAN)
3389   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3390   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3391                        time_string, rtsFalse/*no commas!*/);
3392
3393   sched_belch("all threads at [%s]:", time_string);
3394 # elif defined(PAR)
3395   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3396   ullong_format_string(CURRENT_TIME,
3397                        time_string, rtsFalse/*no commas!*/);
3398
3399   sched_belch("all threads at [%s]:", time_string);
3400 # else
3401   sched_belch("all threads:");
3402 # endif
3403
3404   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3405     fprintf(stderr, "\tthread %d ", t->id);
3406     printThreadStatus(t);
3407     fprintf(stderr,"\n");
3408   }
3409 }
3410     
3411 /* 
3412    Print a whole blocking queue attached to node (debugging only).
3413 */
3414 //@cindex print_bq
3415 # if defined(PAR)
3416 void 
3417 print_bq (StgClosure *node)
3418 {
3419   StgBlockingQueueElement *bqe;
3420   StgTSO *tso;
3421   rtsBool end;
3422
3423   fprintf(stderr,"## BQ of closure %p (%s): ",
3424           node, info_type(node));
3425
3426   /* should cover all closures that may have a blocking queue */
3427   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3428          get_itbl(node)->type == FETCH_ME_BQ ||
3429          get_itbl(node)->type == RBH ||
3430          get_itbl(node)->type == MVAR);
3431     
3432   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3433
3434   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3435 }
3436
3437 /* 
3438    Print a whole blocking queue starting with the element bqe.
3439 */
3440 void 
3441 print_bqe (StgBlockingQueueElement *bqe)
3442 {
3443   rtsBool end;
3444
3445   /* 
3446      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3447   */
3448   for (end = (bqe==END_BQ_QUEUE);
3449        !end; // iterate until bqe points to a CONSTR
3450        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3451        bqe = end ? END_BQ_QUEUE : bqe->link) {
3452     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3453     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3454     /* types of closures that may appear in a blocking queue */
3455     ASSERT(get_itbl(bqe)->type == TSO ||           
3456            get_itbl(bqe)->type == BLOCKED_FETCH || 
3457            get_itbl(bqe)->type == CONSTR); 
3458     /* only BQs of an RBH end with an RBH_Save closure */
3459     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3460
3461     switch (get_itbl(bqe)->type) {
3462     case TSO:
3463       fprintf(stderr," TSO %u (%x),",
3464               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3465       break;
3466     case BLOCKED_FETCH:
3467       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3468               ((StgBlockedFetch *)bqe)->node, 
3469               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3470               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3471               ((StgBlockedFetch *)bqe)->ga.weight);
3472       break;
3473     case CONSTR:
3474       fprintf(stderr," %s (IP %p),",
3475               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3476                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3477                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3478                "RBH_Save_?"), get_itbl(bqe));
3479       break;
3480     default:
3481       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3482            info_type((StgClosure *)bqe)); // , node, info_type(node));
3483       break;
3484     }
3485   } /* for */
3486   fputc('\n', stderr);
3487 }
3488 # elif defined(GRAN)
3489 void 
3490 print_bq (StgClosure *node)
3491 {
3492   StgBlockingQueueElement *bqe;
3493   PEs node_loc, tso_loc;
3494   rtsBool end;
3495
3496   /* should cover all closures that may have a blocking queue */
3497   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3498          get_itbl(node)->type == FETCH_ME_BQ ||
3499          get_itbl(node)->type == RBH);
3500     
3501   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3502   node_loc = where_is(node);
3503
3504   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3505           node, info_type(node), node_loc);
3506
3507   /* 
3508      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3509   */
3510   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3511        !end; // iterate until bqe points to a CONSTR
3512        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3513     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3514     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3515     /* types of closures that may appear in a blocking queue */
3516     ASSERT(get_itbl(bqe)->type == TSO ||           
3517            get_itbl(bqe)->type == CONSTR); 
3518     /* only BQs of an RBH end with an RBH_Save closure */
3519     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3520
3521     tso_loc = where_is((StgClosure *)bqe);
3522     switch (get_itbl(bqe)->type) {
3523     case TSO:
3524       fprintf(stderr," TSO %d (%p) on [PE %d],",
3525               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3526       break;
3527     case CONSTR:
3528       fprintf(stderr," %s (IP %p),",
3529               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3530                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3531                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3532                "RBH_Save_?"), get_itbl(bqe));
3533       break;
3534     default:
3535       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3536            info_type((StgClosure *)bqe), node, info_type(node));
3537       break;
3538     }
3539   } /* for */
3540   fputc('\n', stderr);
3541 }
3542 #else
3543 /* 
3544    Nice and easy: only TSOs on the blocking queue
3545 */
3546 void 
3547 print_bq (StgClosure *node)
3548 {
3549   StgTSO *tso;
3550
3551   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3552   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3553        tso != END_TSO_QUEUE; 
3554        tso=tso->link) {
3555     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3556     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3557     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3558   }
3559   fputc('\n', stderr);
3560 }
3561 # endif
3562
3563 #if defined(PAR)
3564 static nat
3565 run_queue_len(void)
3566 {
3567   nat i;
3568   StgTSO *tso;
3569
3570   for (i=0, tso=run_queue_hd; 
3571        tso != END_TSO_QUEUE;
3572        i++, tso=tso->link)
3573     /* nothing */
3574
3575   return i;
3576 }
3577 #endif
3578
3579 static void
3580 sched_belch(char *s, ...)
3581 {
3582   va_list ap;
3583   va_start(ap,s);
3584 #ifdef SMP
3585   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3586 #elif defined(PAR)
3587   fprintf(stderr, "== ");
3588 #else
3589   fprintf(stderr, "scheduler: ");
3590 #endif
3591   vfprintf(stderr, s, ap);
3592   fprintf(stderr, "\n");
3593 }
3594
3595 #endif /* DEBUG */
3596
3597
3598 //@node Index,  , Debugging Routines, Main scheduling code
3599 //@subsection Index
3600
3601 //@index
3602 //* MainRegTable::  @cindex\s-+MainRegTable
3603 //* StgMainThread::  @cindex\s-+StgMainThread
3604 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3605 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3606 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3607 //* context_switch::  @cindex\s-+context_switch
3608 //* createThread::  @cindex\s-+createThread
3609 //* free_capabilities::  @cindex\s-+free_capabilities
3610 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3611 //* initScheduler::  @cindex\s-+initScheduler
3612 //* interrupted::  @cindex\s-+interrupted
3613 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3614 //* next_thread_id::  @cindex\s-+next_thread_id
3615 //* print_bq::  @cindex\s-+print_bq
3616 //* run_queue_hd::  @cindex\s-+run_queue_hd
3617 //* run_queue_tl::  @cindex\s-+run_queue_tl
3618 //* sched_mutex::  @cindex\s-+sched_mutex
3619 //* schedule::  @cindex\s-+schedule
3620 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3621 //* task_ids::  @cindex\s-+task_ids
3622 //* term_mutex::  @cindex\s-+term_mutex
3623 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3624 //@end index