[project @ 2002-01-22 13:54:22 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.111 2002/01/22 13:54:22 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #endif
102 #if defined(GRAN) || defined(PAR)
103 # include "GranSimRts.h"
104 # include "GranSim.h"
105 # include "ParallelRts.h"
106 # include "Parallel.h"
107 # include "ParallelDebug.h"
108 # include "FetchMe.h"
109 # include "HLC.h"
110 #endif
111 #include "Sparks.h"
112
113 #include <stdarg.h>
114
115 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
116 //@subsection Variables and Data structures
117
118 /* Main threads:
119  *
120  * These are the threads which clients have requested that we run.  
121  *
122  * In an SMP build, we might have several concurrent clients all
123  * waiting for results, and each one will wait on a condition variable
124  * until the result is available.
125  *
126  * In non-SMP, clients are strictly nested: the first client calls
127  * into the RTS, which might call out again to C with a _ccall_GC, and
128  * eventually re-enter the RTS.
129  *
130  * Main threads information is kept in a linked list:
131  */
132 //@cindex StgMainThread
133 typedef struct StgMainThread_ {
134   StgTSO *         tso;
135   SchedulerStatus  stat;
136   StgClosure **    ret;
137 #ifdef SMP
138   pthread_cond_t wakeup;
139 #endif
140   struct StgMainThread_ *link;
141 } StgMainThread;
142
143 /* Main thread queue.
144  * Locks required: sched_mutex.
145  */
146 static StgMainThread *main_threads;
147
148 /* Thread queues.
149  * Locks required: sched_mutex.
150  */
151 #if defined(GRAN)
152
153 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
154 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
155
156 /* 
157    In GranSim we have a runable and a blocked queue for each processor.
158    In order to minimise code changes new arrays run_queue_hds/tls
159    are created. run_queue_hd is then a short cut (macro) for
160    run_queue_hds[CurrentProc] (see GranSim.h).
161    -- HWL
162 */
163 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
164 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
165 StgTSO *ccalling_threadss[MAX_PROC];
166 /* We use the same global list of threads (all_threads) in GranSim as in
167    the std RTS (i.e. we are cheating). However, we don't use this list in
168    the GranSim specific code at the moment (so we are only potentially
169    cheating).  */
170
171 #else /* !GRAN */
172
173 StgTSO *run_queue_hd, *run_queue_tl;
174 StgTSO *blocked_queue_hd, *blocked_queue_tl;
175 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads;
183
184 /* Threads suspended in _ccall_GC.
185  */
186 static StgTSO *suspended_ccalling_threads;
187
188 static StgTSO *threadStackOverflow(StgTSO *tso);
189
190 /* KH: The following two flags are shared memory locations.  There is no need
191        to lock them, since they are only unset at the end of a scheduler
192        operation.
193 */
194
195 /* flag set by signal handler to precipitate a context switch */
196 //@cindex context_switch
197 nat context_switch;
198
199 /* if this flag is set as well, give up execution */
200 //@cindex interrupted
201 rtsBool interrupted;
202
203 /* Next thread ID to allocate.
204  * Locks required: sched_mutex
205  */
206 //@cindex next_thread_id
207 StgThreadID next_thread_id = 1;
208
209 /*
210  * Pointers to the state of the current thread.
211  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
212  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
213  */
214  
215 /* The smallest stack size that makes any sense is:
216  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
217  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
218  *  + 1                       (the realworld token for an IO thread)
219  *  + 1                       (the closure to enter)
220  *
221  * A thread with this stack will bomb immediately with a stack
222  * overflow, which will increase its stack size.  
223  */
224
225 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
226
227 /* Free capability list.
228  * Locks required: sched_mutex.
229  */
230 #ifdef SMP
231 Capability *free_capabilities; /* Available capabilities for running threads */
232 nat n_free_capabilities;       /* total number of available capabilities */
233 #else
234 Capability MainCapability;     /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           if (m->ret) *(m->ret) = NULL;
449           *prev = m->link;
450           if (was_interrupted) {
451             m->stat = Interrupted;
452           } else {
453             m->stat = Killed;
454           }
455           pthread_cond_broadcast(&m->wakeup);
456           break;
457         default:
458           break;
459         }
460       }
461     }
462
463 #else // not SMP
464
465 # if defined(PAR)
466     /* in GUM do this only on the Main PE */
467     if (IAmMainThread)
468 # endif
469     /* If our main thread has finished or been killed, return.
470      */
471     {
472       StgMainThread *m = main_threads;
473       if (m->tso->what_next == ThreadComplete
474           || m->tso->what_next == ThreadKilled) {
475         main_threads = main_threads->link;
476         if (m->tso->what_next == ThreadComplete) {
477           /* we finished successfully, fill in the return value */
478           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
479           m->stat = Success;
480           return;
481         } else {
482           if (m->ret) { *(m->ret) = NULL; };
483           if (was_interrupted) {
484             m->stat = Interrupted;
485           } else {
486             m->stat = Killed;
487           }
488           return;
489         }
490       }
491     }
492 #endif
493
494     /* Top up the run queue from our spark pool.  We try to make the
495      * number of threads in the run queue equal to the number of
496      * free capabilities.
497      */
498 #if defined(SMP)
499     {
500       nat n = n_free_capabilities;
501       StgTSO *tso = run_queue_hd;
502
503       /* Count the run queue */
504       while (n > 0 && tso != END_TSO_QUEUE) {
505         tso = tso->link;
506         n--;
507       }
508
509       for (; n > 0; n--) {
510         StgClosure *spark;
511         spark = findSpark(rtsFalse);
512         if (spark == NULL) {
513           break; /* no more sparks in the pool */
514         } else {
515           /* I'd prefer this to be done in activateSpark -- HWL */
516           /* tricky - it needs to hold the scheduler lock and
517            * not try to re-acquire it -- SDM */
518           createSparkThread(spark);       
519           IF_DEBUG(scheduler,
520                    sched_belch("==^^ turning spark of closure %p into a thread",
521                                (StgClosure *)spark));
522         }
523       }
524       /* We need to wake up the other tasks if we just created some
525        * work for them.
526        */
527       if (n_free_capabilities - n > 1) {
528           pthread_cond_signal(&thread_ready_cond);
529       }
530     }
531 #endif // SMP
532
533     /* check for signals each time around the scheduler */
534 #ifndef mingw32_TARGET_OS
535     if (signals_pending()) {
536       startSignalHandlers();
537     }
538 #endif
539
540     /* Check whether any waiting threads need to be woken up.  If the
541      * run queue is empty, and there are no other tasks running, we
542      * can wait indefinitely for something to happen.
543      * ToDo: what if another client comes along & requests another
544      * main thread?
545      */
546     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
547       awaitEvent(
548            (run_queue_hd == END_TSO_QUEUE)
549 #ifdef SMP
550         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
551 #endif
552         );
553     }
554     /* we can be interrupted while waiting for I/O... */
555     if (interrupted) continue;
556
557     /* 
558      * Detect deadlock: when we have no threads to run, there are no
559      * threads waiting on I/O or sleeping, and all the other tasks are
560      * waiting for work, we must have a deadlock of some description.
561      *
562      * We first try to find threads blocked on themselves (ie. black
563      * holes), and generate NonTermination exceptions where necessary.
564      *
565      * If no threads are black holed, we have a deadlock situation, so
566      * inform all the main threads.
567      */
568 #ifndef PAR
569     if (blocked_queue_hd == END_TSO_QUEUE
570         && run_queue_hd == END_TSO_QUEUE
571         && sleeping_queue == END_TSO_QUEUE
572 #ifdef SMP
573         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
574 #endif
575         )
576     {
577         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
578         GarbageCollect(GetRoots,rtsTrue);
579         if (blocked_queue_hd == END_TSO_QUEUE
580             && run_queue_hd == END_TSO_QUEUE
581             && sleeping_queue == END_TSO_QUEUE) {
582
583             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
584             detectBlackHoles();
585
586             // No black holes, so probably a real deadlock.  Send the
587             // current main thread the Deadlock exception (or in the SMP
588             // build, send *all* main threads the deadlock exception,
589             // since none of them can make progress).
590             if (run_queue_hd == END_TSO_QUEUE) {
591                 StgMainThread *m;
592 #ifdef SMP
593                 for (m = main_threads; m != NULL; m = m->link) {
594                     switch (m->tso->why_blocked) {
595                     case BlockedOnBlackHole:
596                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
597                         break;
598                     case BlockedOnException:
599                     case BlockedOnMVar:
600                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
601                         break;
602                     default:
603                         barf("deadlock: main thread blocked in a strange way");
604                     }
605                 }
606 #else
607                 m = main_threads;
608                 switch (m->tso->why_blocked) {
609                 case BlockedOnBlackHole:
610                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
611                     break;
612                 case BlockedOnException:
613                 case BlockedOnMVar:
614                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
615                     break;
616                 default:
617                     barf("deadlock: main thread blocked in a strange way");
618                 }
619 #endif
620             }
621             ASSERT( run_queue_hd != END_TSO_QUEUE );
622         }
623     }
624 #elif defined(PAR)
625     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
626 #endif
627
628 #ifdef SMP
629     /* If there's a GC pending, don't do anything until it has
630      * completed.
631      */
632     if (ready_to_gc) {
633       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
634       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
635     }
636     
637     /* block until we've got a thread on the run queue and a free
638      * capability.
639      */
640     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
641       IF_DEBUG(scheduler, sched_belch("waiting for work"));
642       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
643       IF_DEBUG(scheduler, sched_belch("work now available"));
644     }
645 #endif
646
647 #if defined(GRAN)
648
649     if (RtsFlags.GranFlags.Light)
650       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
651
652     /* adjust time based on time-stamp */
653     if (event->time > CurrentTime[CurrentProc] &&
654         event->evttype != ContinueThread)
655       CurrentTime[CurrentProc] = event->time;
656     
657     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
658     if (!RtsFlags.GranFlags.Light)
659       handleIdlePEs();
660
661     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
662
663     /* main event dispatcher in GranSim */
664     switch (event->evttype) {
665       /* Should just be continuing execution */
666     case ContinueThread:
667       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
668       /* ToDo: check assertion
669       ASSERT(run_queue_hd != (StgTSO*)NULL &&
670              run_queue_hd != END_TSO_QUEUE);
671       */
672       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
673       if (!RtsFlags.GranFlags.DoAsyncFetch &&
674           procStatus[CurrentProc]==Fetching) {
675         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
676               CurrentTSO->id, CurrentTSO, CurrentProc);
677         goto next_thread;
678       } 
679       /* Ignore ContinueThreads for completed threads */
680       if (CurrentTSO->what_next == ThreadComplete) {
681         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
682               CurrentTSO->id, CurrentTSO, CurrentProc);
683         goto next_thread;
684       } 
685       /* Ignore ContinueThreads for threads that are being migrated */
686       if (PROCS(CurrentTSO)==Nowhere) { 
687         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
688               CurrentTSO->id, CurrentTSO, CurrentProc);
689         goto next_thread;
690       }
691       /* The thread should be at the beginning of the run queue */
692       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
693         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
694               CurrentTSO->id, CurrentTSO, CurrentProc);
695         break; // run the thread anyway
696       }
697       /*
698       new_event(proc, proc, CurrentTime[proc],
699                 FindWork,
700                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
701       goto next_thread; 
702       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
703       break; // now actually run the thread; DaH Qu'vam yImuHbej 
704
705     case FetchNode:
706       do_the_fetchnode(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case GlobalBlock:
710       do_the_globalblock(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case FetchReply:
714       do_the_fetchreply(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case UnblockThread:   /* Move from the blocked queue to the tail of */
718       do_the_unblock(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     case ResumeThread:  /* Move from the blocked queue to the tail of */
722       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
723       event->tso->gran.blocktime += 
724         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
725       do_the_startthread(event);
726       goto next_thread;             /* handle next event in event queue  */
727       
728     case StartThread:
729       do_the_startthread(event);
730       goto next_thread;             /* handle next event in event queue  */
731       
732     case MoveThread:
733       do_the_movethread(event);
734       goto next_thread;             /* handle next event in event queue  */
735       
736     case MoveSpark:
737       do_the_movespark(event);
738       goto next_thread;             /* handle next event in event queue  */
739       
740     case FindWork:
741       do_the_findwork(event);
742       goto next_thread;             /* handle next event in event queue  */
743       
744     default:
745       barf("Illegal event type %u\n", event->evttype);
746     }  /* switch */
747     
748     /* This point was scheduler_loop in the old RTS */
749
750     IF_DEBUG(gran, belch("GRAN: after main switch"));
751
752     TimeOfLastEvent = CurrentTime[CurrentProc];
753     TimeOfNextEvent = get_time_of_next_event();
754     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
755     // CurrentTSO = ThreadQueueHd;
756
757     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
758                          TimeOfNextEvent));
759
760     if (RtsFlags.GranFlags.Light) 
761       GranSimLight_leave_system(event, &ActiveTSO); 
762
763     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
764
765     IF_DEBUG(gran, 
766              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
767
768     /* in a GranSim setup the TSO stays on the run queue */
769     t = CurrentTSO;
770     /* Take a thread from the run queue. */
771     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
772
773     IF_DEBUG(gran, 
774              fprintf(stderr, "GRAN: About to run current thread, which is\n");
775              G_TSO(t,5));
776
777     context_switch = 0; // turned on via GranYield, checking events and time slice
778
779     IF_DEBUG(gran, 
780              DumpGranEvent(GR_SCHEDULE, t));
781
782     procStatus[CurrentProc] = Busy;
783
784 #elif defined(PAR)
785     if (PendingFetches != END_BF_QUEUE) {
786         processFetches();
787     }
788
789     /* ToDo: phps merge with spark activation above */
790     /* check whether we have local work and send requests if we have none */
791     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
792       /* :-[  no local threads => look out for local sparks */
793       /* the spark pool for the current PE */
794       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
796           pool->hd < pool->tl) {
797         /* 
798          * ToDo: add GC code check that we really have enough heap afterwards!!
799          * Old comment:
800          * If we're here (no runnable threads) and we have pending
801          * sparks, we must have a space problem.  Get enough space
802          * to turn one of those pending sparks into a
803          * thread... 
804          */
805
806         spark = findSpark(rtsFalse);                /* get a spark */
807         if (spark != (rtsSpark) NULL) {
808           tso = activateSpark(spark);       /* turn the spark into a thread */
809           IF_PAR_DEBUG(schedule,
810                        belch("==== schedule: Created TSO %d (%p); %d threads active",
811                              tso->id, tso, advisory_thread_count));
812
813           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
814             belch("==^^ failed to activate spark");
815             goto next_thread;
816           }               /* otherwise fall through & pick-up new tso */
817         } else {
818           IF_PAR_DEBUG(verbose,
819                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
820                              spark_queue_len(pool)));
821           goto next_thread;
822         }
823       }
824
825       /* If we still have no work we need to send a FISH to get a spark
826          from another PE 
827       */
828       if (EMPTY_RUN_QUEUE()) {
829       /* =8-[  no local sparks => look for work on other PEs */
830         /*
831          * We really have absolutely no work.  Send out a fish
832          * (there may be some out there already), and wait for
833          * something to arrive.  We clearly can't run any threads
834          * until a SCHEDULE or RESUME arrives, and so that's what
835          * we're hoping to see.  (Of course, we still have to
836          * respond to other types of messages.)
837          */
838         TIME now = msTime() /*CURRENT_TIME*/;
839         IF_PAR_DEBUG(verbose, 
840                      belch("--  now=%ld", now));
841         IF_PAR_DEBUG(verbose,
842                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
843                          (last_fish_arrived_at!=0 &&
844                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
845                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
846                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
847                              last_fish_arrived_at,
848                              RtsFlags.ParFlags.fishDelay, now);
849                      });
850         
851         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
852             (last_fish_arrived_at==0 ||
853              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
854           /* outstandingFishes is set in sendFish, processFish;
855              avoid flooding system with fishes via delay */
856           pe = choosePE();
857           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
858                    NEW_FISH_HUNGER);
859
860           // Global statistics: count no. of fishes
861           if (RtsFlags.ParFlags.ParStats.Global &&
862               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
863             globalParStats.tot_fish_mess++;
864           }
865         }
866       
867         receivedFinish = processMessages();
868         goto next_thread;
869       }
870     } else if (PacketsWaiting()) {  /* Look for incoming messages */
871       receivedFinish = processMessages();
872     }
873
874     /* Now we are sure that we have some work available */
875     ASSERT(run_queue_hd != END_TSO_QUEUE);
876
877     /* Take a thread from the run queue, if we have work */
878     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
879     IF_DEBUG(sanity,checkTSO(t));
880
881     /* ToDo: write something to the log-file
882     if (RTSflags.ParFlags.granSimStats && !sameThread)
883         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
884
885     CurrentTSO = t;
886     */
887     /* the spark pool for the current PE */
888     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
889
890     IF_DEBUG(scheduler, 
891              belch("--=^ %d threads, %d sparks on [%#x]", 
892                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
893
894 #if 1
895     if (0 && RtsFlags.ParFlags.ParStats.Full && 
896         t && LastTSO && t->id != LastTSO->id && 
897         LastTSO->why_blocked == NotBlocked && 
898         LastTSO->what_next != ThreadComplete) {
899       // if previously scheduled TSO not blocked we have to record the context switch
900       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
901                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
902     }
903
904     if (RtsFlags.ParFlags.ParStats.Full && 
905         (emitSchedule /* forced emit */ ||
906         (t && LastTSO && t->id != LastTSO->id))) {
907       /* 
908          we are running a different TSO, so write a schedule event to log file
909          NB: If we use fair scheduling we also have to write  a deschedule 
910              event for LastTSO; with unfair scheduling we know that the
911              previous tso has blocked whenever we switch to another tso, so
912              we don't need it in GUM for now
913       */
914       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
915                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
916       emitSchedule = rtsFalse;
917     }
918      
919 #endif
920 #else /* !GRAN && !PAR */
921   
922     /* grab a thread from the run queue
923      */
924     ASSERT(run_queue_hd != END_TSO_QUEUE);
925     t = POP_RUN_QUEUE();
926
927     // Sanity check the thread we're about to run.  This can be
928     // expensive if there is lots of thread switching going on...
929     IF_DEBUG(sanity,checkTSO(t));
930
931 #endif
932     
933     /* grab a capability
934      */
935 #ifdef SMP
936     cap = free_capabilities;
937     free_capabilities = cap->link;
938     n_free_capabilities--;
939 #else
940     cap = &MainCapability;
941 #endif
942
943     cap->r.rCurrentTSO = t;
944     
945     /* context switches are now initiated by the timer signal, unless
946      * the user specified "context switch as often as possible", with
947      * +RTS -C0
948      */
949     if (
950 #ifdef PROFILING
951         RtsFlags.ProfFlags.profileInterval == 0 ||
952 #endif
953         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
954          && (run_queue_hd != END_TSO_QUEUE
955              || blocked_queue_hd != END_TSO_QUEUE
956              || sleeping_queue != END_TSO_QUEUE)))
957         context_switch = 1;
958     else
959         context_switch = 0;
960
961     RELEASE_LOCK(&sched_mutex);
962
963     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
964                               t->id, t, whatNext_strs[t->what_next]));
965
966 #ifdef PROFILING
967     startHeapProfTimer();
968 #endif
969
970     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
971     /* Run the current thread 
972      */
973     switch (cap->r.rCurrentTSO->what_next) {
974     case ThreadKilled:
975     case ThreadComplete:
976         /* Thread already finished, return to scheduler. */
977         ret = ThreadFinished;
978         break;
979     case ThreadEnterGHC:
980         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
981         break;
982     case ThreadRunGHC:
983         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
984         break;
985     case ThreadEnterInterp:
986         ret = interpretBCO(cap);
987         break;
988     default:
989       barf("schedule: invalid what_next field");
990     }
991     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
992     
993     /* Costs for the scheduler are assigned to CCS_SYSTEM */
994 #ifdef PROFILING
995     stopHeapProfTimer();
996     CCCS = CCS_SYSTEM;
997 #endif
998     
999     ACQUIRE_LOCK(&sched_mutex);
1000
1001 #ifdef SMP
1002     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
1003 #elif !defined(GRAN) && !defined(PAR)
1004     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1005 #endif
1006     t = cap->r.rCurrentTSO;
1007     
1008 #if defined(PAR)
1009     /* HACK 675: if the last thread didn't yield, make sure to print a 
1010        SCHEDULE event to the log file when StgRunning the next thread, even
1011        if it is the same one as before */
1012     LastTSO = t; 
1013     TimeOfLastYield = CURRENT_TIME;
1014 #endif
1015
1016     switch (ret) {
1017     case HeapOverflow:
1018 #if defined(GRAN)
1019       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1020       globalGranStats.tot_heapover++;
1021 #elif defined(PAR)
1022       globalParStats.tot_heapover++;
1023 #endif
1024
1025       // did the task ask for a large block?
1026       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1027           // if so, get one and push it on the front of the nursery.
1028           bdescr *bd;
1029           nat blocks;
1030           
1031           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1032
1033           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1034                                    t->id, t,
1035                                    whatNext_strs[t->what_next], blocks));
1036
1037           // don't do this if it would push us over the
1038           // alloc_blocks_lim limit; we'll GC first.
1039           if (alloc_blocks + blocks < alloc_blocks_lim) {
1040
1041               alloc_blocks += blocks;
1042               bd = allocGroup( blocks );
1043
1044               // link the new group into the list
1045               bd->link = cap->r.rCurrentNursery;
1046               bd->u.back = cap->r.rCurrentNursery->u.back;
1047               if (cap->r.rCurrentNursery->u.back != NULL) {
1048                   cap->r.rCurrentNursery->u.back->link = bd;
1049               } else {
1050                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1051                          g0s0->blocks == cap->r.rNursery);
1052                   cap->r.rNursery = g0s0->blocks = bd;
1053               }           
1054               cap->r.rCurrentNursery->u.back = bd;
1055
1056               // initialise it as a nursery block
1057               bd->step = g0s0;
1058               bd->gen_no = 0;
1059               bd->flags = 0;
1060               bd->free = bd->start;
1061
1062               // don't forget to update the block count in g0s0.
1063               g0s0->n_blocks += blocks;
1064               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1065
1066               // now update the nursery to point to the new block
1067               cap->r.rCurrentNursery = bd;
1068
1069               // we might be unlucky and have another thread get on the
1070               // run queue before us and steal the large block, but in that
1071               // case the thread will just end up requesting another large
1072               // block.
1073               PUSH_ON_RUN_QUEUE(t);
1074               break;
1075           }
1076       }
1077
1078       /* make all the running tasks block on a condition variable,
1079        * maybe set context_switch and wait till they all pile in,
1080        * then have them wait on a GC condition variable.
1081        */
1082       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1083                                t->id, t, whatNext_strs[t->what_next]));
1084       threadPaused(t);
1085 #if defined(GRAN)
1086       ASSERT(!is_on_queue(t,CurrentProc));
1087 #elif defined(PAR)
1088       /* Currently we emit a DESCHEDULE event before GC in GUM.
1089          ToDo: either add separate event to distinguish SYSTEM time from rest
1090                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1091       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1092         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1093                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1094         emitSchedule = rtsTrue;
1095       }
1096 #endif
1097       
1098       ready_to_gc = rtsTrue;
1099       context_switch = 1;               /* stop other threads ASAP */
1100       PUSH_ON_RUN_QUEUE(t);
1101       /* actual GC is done at the end of the while loop */
1102       break;
1103       
1104     case StackOverflow:
1105 #if defined(GRAN)
1106       IF_DEBUG(gran, 
1107                DumpGranEvent(GR_DESCHEDULE, t));
1108       globalGranStats.tot_stackover++;
1109 #elif defined(PAR)
1110       // IF_DEBUG(par, 
1111       // DumpGranEvent(GR_DESCHEDULE, t);
1112       globalParStats.tot_stackover++;
1113 #endif
1114       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1115                                t->id, t, whatNext_strs[t->what_next]));
1116       /* just adjust the stack for this thread, then pop it back
1117        * on the run queue.
1118        */
1119       threadPaused(t);
1120       { 
1121         StgMainThread *m;
1122         /* enlarge the stack */
1123         StgTSO *new_t = threadStackOverflow(t);
1124         
1125         /* This TSO has moved, so update any pointers to it from the
1126          * main thread stack.  It better not be on any other queues...
1127          * (it shouldn't be).
1128          */
1129         for (m = main_threads; m != NULL; m = m->link) {
1130           if (m->tso == t) {
1131             m->tso = new_t;
1132           }
1133         }
1134         threadPaused(new_t);
1135         PUSH_ON_RUN_QUEUE(new_t);
1136       }
1137       break;
1138
1139     case ThreadYielding:
1140 #if defined(GRAN)
1141       IF_DEBUG(gran, 
1142                DumpGranEvent(GR_DESCHEDULE, t));
1143       globalGranStats.tot_yields++;
1144 #elif defined(PAR)
1145       // IF_DEBUG(par, 
1146       // DumpGranEvent(GR_DESCHEDULE, t);
1147       globalParStats.tot_yields++;
1148 #endif
1149       /* put the thread back on the run queue.  Then, if we're ready to
1150        * GC, check whether this is the last task to stop.  If so, wake
1151        * up the GC thread.  getThread will block during a GC until the
1152        * GC is finished.
1153        */
1154       IF_DEBUG(scheduler,
1155                if (t->what_next == ThreadEnterInterp) {
1156                    /* ToDo: or maybe a timer expired when we were in Hugs?
1157                     * or maybe someone hit ctrl-C
1158                     */
1159                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1160                          t->id, t, whatNext_strs[t->what_next]);
1161                } else {
1162                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1163                          t->id, t, whatNext_strs[t->what_next]);
1164                }
1165                );
1166
1167       threadPaused(t);
1168
1169       IF_DEBUG(sanity,
1170                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1171                checkTSO(t));
1172       ASSERT(t->link == END_TSO_QUEUE);
1173 #if defined(GRAN)
1174       ASSERT(!is_on_queue(t,CurrentProc));
1175
1176       IF_DEBUG(sanity,
1177                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1178                checkThreadQsSanity(rtsTrue));
1179 #endif
1180 #if defined(PAR)
1181       if (RtsFlags.ParFlags.doFairScheduling) { 
1182         /* this does round-robin scheduling; good for concurrency */
1183         APPEND_TO_RUN_QUEUE(t);
1184       } else {
1185         /* this does unfair scheduling; good for parallelism */
1186         PUSH_ON_RUN_QUEUE(t);
1187       }
1188 #else
1189       /* this does round-robin scheduling; good for concurrency */
1190       APPEND_TO_RUN_QUEUE(t);
1191 #endif
1192 #if defined(GRAN)
1193       /* add a ContinueThread event to actually process the thread */
1194       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1195                 ContinueThread,
1196                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1197       IF_GRAN_DEBUG(bq, 
1198                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1199                G_EVENTQ(0);
1200                G_CURR_THREADQ(0));
1201 #endif /* GRAN */
1202       break;
1203       
1204     case ThreadBlocked:
1205 #if defined(GRAN)
1206       IF_DEBUG(scheduler,
1207                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1208                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1209                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1210
1211       // ??? needed; should emit block before
1212       IF_DEBUG(gran, 
1213                DumpGranEvent(GR_DESCHEDULE, t)); 
1214       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1215       /*
1216         ngoq Dogh!
1217       ASSERT(procStatus[CurrentProc]==Busy || 
1218               ((procStatus[CurrentProc]==Fetching) && 
1219               (t->block_info.closure!=(StgClosure*)NULL)));
1220       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1221           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1222             procStatus[CurrentProc]==Fetching)) 
1223         procStatus[CurrentProc] = Idle;
1224       */
1225 #elif defined(PAR)
1226       IF_DEBUG(scheduler,
1227                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1228                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1229       IF_PAR_DEBUG(bq,
1230
1231                    if (t->block_info.closure!=(StgClosure*)NULL) 
1232                      print_bq(t->block_info.closure));
1233
1234       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1235       blockThread(t);
1236
1237       /* whatever we schedule next, we must log that schedule */
1238       emitSchedule = rtsTrue;
1239
1240 #else /* !GRAN */
1241       /* don't need to do anything.  Either the thread is blocked on
1242        * I/O, in which case we'll have called addToBlockedQueue
1243        * previously, or it's blocked on an MVar or Blackhole, in which
1244        * case it'll be on the relevant queue already.
1245        */
1246       IF_DEBUG(scheduler,
1247                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1248                printThreadBlockage(t);
1249                fprintf(stderr, "\n"));
1250
1251       /* Only for dumping event to log file 
1252          ToDo: do I need this in GranSim, too?
1253       blockThread(t);
1254       */
1255 #endif
1256       threadPaused(t);
1257       break;
1258       
1259     case ThreadFinished:
1260       /* Need to check whether this was a main thread, and if so, signal
1261        * the task that started it with the return value.  If we have no
1262        * more main threads, we probably need to stop all the tasks until
1263        * we get a new one.
1264        */
1265       /* We also end up here if the thread kills itself with an
1266        * uncaught exception, see Exception.hc.
1267        */
1268       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1269 #if defined(GRAN)
1270       endThread(t, CurrentProc); // clean-up the thread
1271 #elif defined(PAR)
1272       /* For now all are advisory -- HWL */
1273       //if(t->priority==AdvisoryPriority) ??
1274       advisory_thread_count--;
1275       
1276 # ifdef DIST
1277       if(t->dist.priority==RevalPriority)
1278         FinishReval(t);
1279 # endif
1280       
1281       if (RtsFlags.ParFlags.ParStats.Full &&
1282           !RtsFlags.ParFlags.ParStats.Suppressed) 
1283         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1284 #endif
1285       break;
1286       
1287     default:
1288       barf("schedule: invalid thread return code %d", (int)ret);
1289     }
1290     
1291 #ifdef SMP
1292     cap->link = free_capabilities;
1293     free_capabilities = cap;
1294     n_free_capabilities++;
1295 #endif
1296
1297 #ifdef PROFILING
1298     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1299         GarbageCollect(GetRoots, rtsTrue);
1300         heapCensus();
1301         performHeapProfile = rtsFalse;
1302         ready_to_gc = rtsFalse; // we already GC'd
1303     }
1304 #endif
1305
1306 #ifdef SMP
1307     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1308 #else
1309     if (ready_to_gc) 
1310 #endif
1311       {
1312       /* everybody back, start the GC.
1313        * Could do it in this thread, or signal a condition var
1314        * to do it in another thread.  Either way, we need to
1315        * broadcast on gc_pending_cond afterward.
1316        */
1317 #ifdef SMP
1318       IF_DEBUG(scheduler,sched_belch("doing GC"));
1319 #endif
1320       GarbageCollect(GetRoots,rtsFalse);
1321       ready_to_gc = rtsFalse;
1322 #ifdef SMP
1323       pthread_cond_broadcast(&gc_pending_cond);
1324 #endif
1325 #if defined(GRAN)
1326       /* add a ContinueThread event to continue execution of current thread */
1327       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1328                 ContinueThread,
1329                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1330       IF_GRAN_DEBUG(bq, 
1331                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1332                G_EVENTQ(0);
1333                G_CURR_THREADQ(0));
1334 #endif /* GRAN */
1335     }
1336
1337 #if defined(GRAN)
1338   next_thread:
1339     IF_GRAN_DEBUG(unused,
1340                   print_eventq(EventHd));
1341
1342     event = get_next_event();
1343 #elif defined(PAR)
1344   next_thread:
1345     /* ToDo: wait for next message to arrive rather than busy wait */
1346 #endif /* GRAN */
1347
1348   } /* end of while(1) */
1349
1350   IF_PAR_DEBUG(verbose,
1351                belch("== Leaving schedule() after having received Finish"));
1352 }
1353
1354 /* ---------------------------------------------------------------------------
1355  * deleteAllThreads():  kill all the live threads.
1356  *
1357  * This is used when we catch a user interrupt (^C), before performing
1358  * any necessary cleanups and running finalizers.
1359  * ------------------------------------------------------------------------- */
1360    
1361 void deleteAllThreads ( void )
1362 {
1363   StgTSO* t;
1364   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1365   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1366       deleteThread(t);
1367   }
1368   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1369       deleteThread(t);
1370   }
1371   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1372       deleteThread(t);
1373   }
1374   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1375   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1376   sleeping_queue = END_TSO_QUEUE;
1377 }
1378
1379 /* startThread and  insertThread are now in GranSim.c -- HWL */
1380
1381 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1382 //@subsection Suspend and Resume
1383
1384 /* ---------------------------------------------------------------------------
1385  * Suspending & resuming Haskell threads.
1386  * 
1387  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1388  * its capability before calling the C function.  This allows another
1389  * task to pick up the capability and carry on running Haskell
1390  * threads.  It also means that if the C call blocks, it won't lock
1391  * the whole system.
1392  *
1393  * The Haskell thread making the C call is put to sleep for the
1394  * duration of the call, on the susepended_ccalling_threads queue.  We
1395  * give out a token to the task, which it can use to resume the thread
1396  * on return from the C function.
1397  * ------------------------------------------------------------------------- */
1398    
1399 StgInt
1400 suspendThread( StgRegTable *reg )
1401 {
1402   nat tok;
1403   Capability *cap;
1404
1405   // assume that *reg is a pointer to the StgRegTable part of a Capability
1406   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1407
1408   ACQUIRE_LOCK(&sched_mutex);
1409
1410   IF_DEBUG(scheduler,
1411            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1412
1413   threadPaused(cap->r.rCurrentTSO);
1414   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1415   suspended_ccalling_threads = cap->r.rCurrentTSO;
1416
1417   /* Use the thread ID as the token; it should be unique */
1418   tok = cap->r.rCurrentTSO->id;
1419
1420 #ifdef SMP
1421   cap->link = free_capabilities;
1422   free_capabilities = cap;
1423   n_free_capabilities++;
1424 #endif
1425
1426   RELEASE_LOCK(&sched_mutex);
1427   return tok; 
1428 }
1429
1430 StgRegTable *
1431 resumeThread( StgInt tok )
1432 {
1433   StgTSO *tso, **prev;
1434   Capability *cap;
1435
1436   ACQUIRE_LOCK(&sched_mutex);
1437
1438   prev = &suspended_ccalling_threads;
1439   for (tso = suspended_ccalling_threads; 
1440        tso != END_TSO_QUEUE; 
1441        prev = &tso->link, tso = tso->link) {
1442     if (tso->id == (StgThreadID)tok) {
1443       *prev = tso->link;
1444       break;
1445     }
1446   }
1447   if (tso == END_TSO_QUEUE) {
1448     barf("resumeThread: thread not found");
1449   }
1450   tso->link = END_TSO_QUEUE;
1451
1452 #ifdef SMP
1453   while (free_capabilities == NULL) {
1454     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1455     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1456     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1457   }
1458   cap = free_capabilities;
1459   free_capabilities = cap->link;
1460   n_free_capabilities--;
1461 #else  
1462   cap = &MainCapability;
1463 #endif
1464
1465   cap->r.rCurrentTSO = tso;
1466
1467   RELEASE_LOCK(&sched_mutex);
1468   return &cap->r;
1469 }
1470
1471
1472 /* ---------------------------------------------------------------------------
1473  * Static functions
1474  * ------------------------------------------------------------------------ */
1475 static void unblockThread(StgTSO *tso);
1476
1477 /* ---------------------------------------------------------------------------
1478  * Comparing Thread ids.
1479  *
1480  * This is used from STG land in the implementation of the
1481  * instances of Eq/Ord for ThreadIds.
1482  * ------------------------------------------------------------------------ */
1483
1484 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1485
1486   StgThreadID id1 = tso1->id; 
1487   StgThreadID id2 = tso2->id;
1488  
1489   if (id1 < id2) return (-1);
1490   if (id1 > id2) return 1;
1491   return 0;
1492 }
1493
1494 /* ---------------------------------------------------------------------------
1495  * Fetching the ThreadID from an StgTSO.
1496  *
1497  * This is used in the implementation of Show for ThreadIds.
1498  * ------------------------------------------------------------------------ */
1499 int rts_getThreadId(const StgTSO *tso) 
1500 {
1501   return tso->id;
1502 }
1503
1504 /* ---------------------------------------------------------------------------
1505    Create a new thread.
1506
1507    The new thread starts with the given stack size.  Before the
1508    scheduler can run, however, this thread needs to have a closure
1509    (and possibly some arguments) pushed on its stack.  See
1510    pushClosure() in Schedule.h.
1511
1512    createGenThread() and createIOThread() (in SchedAPI.h) are
1513    convenient packaged versions of this function.
1514
1515    currently pri (priority) is only used in a GRAN setup -- HWL
1516    ------------------------------------------------------------------------ */
1517 //@cindex createThread
1518 #if defined(GRAN)
1519 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1520 StgTSO *
1521 createThread(nat stack_size, StgInt pri)
1522 {
1523   return createThread_(stack_size, rtsFalse, pri);
1524 }
1525
1526 static StgTSO *
1527 createThread_(nat size, rtsBool have_lock, StgInt pri)
1528 {
1529 #else
1530 StgTSO *
1531 createThread(nat stack_size)
1532 {
1533   return createThread_(stack_size, rtsFalse);
1534 }
1535
1536 static StgTSO *
1537 createThread_(nat size, rtsBool have_lock)
1538 {
1539 #endif
1540
1541     StgTSO *tso;
1542     nat stack_size;
1543
1544     /* First check whether we should create a thread at all */
1545 #if defined(PAR)
1546   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1547   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1548     threadsIgnored++;
1549     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1550           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1551     return END_TSO_QUEUE;
1552   }
1553   threadsCreated++;
1554 #endif
1555
1556 #if defined(GRAN)
1557   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1558 #endif
1559
1560   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1561
1562   /* catch ridiculously small stack sizes */
1563   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1564     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1565   }
1566
1567   stack_size = size - TSO_STRUCT_SIZEW;
1568
1569   tso = (StgTSO *)allocate(size);
1570   TICK_ALLOC_TSO(stack_size, 0);
1571
1572   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1573 #if defined(GRAN)
1574   SET_GRAN_HDR(tso, ThisPE);
1575 #endif
1576   tso->what_next     = ThreadEnterGHC;
1577
1578   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1579    * protect the increment operation on next_thread_id.
1580    * In future, we could use an atomic increment instead.
1581    */
1582   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1583   tso->id = next_thread_id++; 
1584   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1585
1586   tso->why_blocked  = NotBlocked;
1587   tso->blocked_exceptions = NULL;
1588
1589   tso->stack_size   = stack_size;
1590   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1591                               - TSO_STRUCT_SIZEW;
1592   tso->sp           = (P_)&(tso->stack) + stack_size;
1593
1594 #ifdef PROFILING
1595   tso->prof.CCCS = CCS_MAIN;
1596 #endif
1597
1598   /* put a stop frame on the stack */
1599   tso->sp -= sizeofW(StgStopFrame);
1600   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1601   tso->su = (StgUpdateFrame*)tso->sp;
1602
1603   // ToDo: check this
1604 #if defined(GRAN)
1605   tso->link = END_TSO_QUEUE;
1606   /* uses more flexible routine in GranSim */
1607   insertThread(tso, CurrentProc);
1608 #else
1609   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1610    * from its creation
1611    */
1612 #endif
1613
1614 #if defined(GRAN) 
1615   if (RtsFlags.GranFlags.GranSimStats.Full) 
1616     DumpGranEvent(GR_START,tso);
1617 #elif defined(PAR)
1618   if (RtsFlags.ParFlags.ParStats.Full) 
1619     DumpGranEvent(GR_STARTQ,tso);
1620   /* HACk to avoid SCHEDULE 
1621      LastTSO = tso; */
1622 #endif
1623
1624   /* Link the new thread on the global thread list.
1625    */
1626   tso->global_link = all_threads;
1627   all_threads = tso;
1628
1629 #if defined(DIST)
1630   tso->dist.priority = MandatoryPriority; //by default that is...
1631 #endif
1632
1633 #if defined(GRAN)
1634   tso->gran.pri = pri;
1635 # if defined(DEBUG)
1636   tso->gran.magic = TSO_MAGIC; // debugging only
1637 # endif
1638   tso->gran.sparkname   = 0;
1639   tso->gran.startedat   = CURRENT_TIME; 
1640   tso->gran.exported    = 0;
1641   tso->gran.basicblocks = 0;
1642   tso->gran.allocs      = 0;
1643   tso->gran.exectime    = 0;
1644   tso->gran.fetchtime   = 0;
1645   tso->gran.fetchcount  = 0;
1646   tso->gran.blocktime   = 0;
1647   tso->gran.blockcount  = 0;
1648   tso->gran.blockedat   = 0;
1649   tso->gran.globalsparks = 0;
1650   tso->gran.localsparks  = 0;
1651   if (RtsFlags.GranFlags.Light)
1652     tso->gran.clock  = Now; /* local clock */
1653   else
1654     tso->gran.clock  = 0;
1655
1656   IF_DEBUG(gran,printTSO(tso));
1657 #elif defined(PAR)
1658 # if defined(DEBUG)
1659   tso->par.magic = TSO_MAGIC; // debugging only
1660 # endif
1661   tso->par.sparkname   = 0;
1662   tso->par.startedat   = CURRENT_TIME; 
1663   tso->par.exported    = 0;
1664   tso->par.basicblocks = 0;
1665   tso->par.allocs      = 0;
1666   tso->par.exectime    = 0;
1667   tso->par.fetchtime   = 0;
1668   tso->par.fetchcount  = 0;
1669   tso->par.blocktime   = 0;
1670   tso->par.blockcount  = 0;
1671   tso->par.blockedat   = 0;
1672   tso->par.globalsparks = 0;
1673   tso->par.localsparks  = 0;
1674 #endif
1675
1676 #if defined(GRAN)
1677   globalGranStats.tot_threads_created++;
1678   globalGranStats.threads_created_on_PE[CurrentProc]++;
1679   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1680   globalGranStats.tot_sq_probes++;
1681 #elif defined(PAR)
1682   // collect parallel global statistics (currently done together with GC stats)
1683   if (RtsFlags.ParFlags.ParStats.Global &&
1684       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1685     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1686     globalParStats.tot_threads_created++;
1687   }
1688 #endif 
1689
1690 #if defined(GRAN)
1691   IF_GRAN_DEBUG(pri,
1692                 belch("==__ schedule: Created TSO %d (%p);",
1693                       CurrentProc, tso, tso->id));
1694 #elif defined(PAR)
1695     IF_PAR_DEBUG(verbose,
1696                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1697                        tso->id, tso, advisory_thread_count));
1698 #else
1699   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1700                                  tso->id, tso->stack_size));
1701 #endif    
1702   return tso;
1703 }
1704
1705 #if defined(PAR)
1706 /* RFP:
1707    all parallel thread creation calls should fall through the following routine.
1708 */
1709 StgTSO *
1710 createSparkThread(rtsSpark spark) 
1711 { StgTSO *tso;
1712   ASSERT(spark != (rtsSpark)NULL);
1713   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1714   { threadsIgnored++;
1715     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1716           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1717     return END_TSO_QUEUE;
1718   }
1719   else
1720   { threadsCreated++;
1721     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1722     if (tso==END_TSO_QUEUE)     
1723       barf("createSparkThread: Cannot create TSO");
1724 #if defined(DIST)
1725     tso->priority = AdvisoryPriority;
1726 #endif
1727     pushClosure(tso,spark);
1728     PUSH_ON_RUN_QUEUE(tso);
1729     advisory_thread_count++;    
1730   }
1731   return tso;
1732 }
1733 #endif
1734
1735 /*
1736   Turn a spark into a thread.
1737   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1738 */
1739 #if defined(PAR)
1740 //@cindex activateSpark
1741 StgTSO *
1742 activateSpark (rtsSpark spark) 
1743 {
1744   StgTSO *tso;
1745
1746   tso = createSparkThread(spark);
1747   if (RtsFlags.ParFlags.ParStats.Full) {   
1748     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1749     IF_PAR_DEBUG(verbose,
1750                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1751                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1752   }
1753   // ToDo: fwd info on local/global spark to thread -- HWL
1754   // tso->gran.exported =  spark->exported;
1755   // tso->gran.locked =   !spark->global;
1756   // tso->gran.sparkname = spark->name;
1757
1758   return tso;
1759 }
1760 #endif
1761
1762 /* ---------------------------------------------------------------------------
1763  * scheduleThread()
1764  *
1765  * scheduleThread puts a thread on the head of the runnable queue.
1766  * This will usually be done immediately after a thread is created.
1767  * The caller of scheduleThread must create the thread using e.g.
1768  * createThread and push an appropriate closure
1769  * on this thread's stack before the scheduler is invoked.
1770  * ------------------------------------------------------------------------ */
1771
1772 void
1773 scheduleThread(StgTSO *tso)
1774 {
1775   ACQUIRE_LOCK(&sched_mutex);
1776
1777   /* Put the new thread on the head of the runnable queue.  The caller
1778    * better push an appropriate closure on this thread's stack
1779    * beforehand.  In the SMP case, the thread may start running as
1780    * soon as we release the scheduler lock below.
1781    */
1782   PUSH_ON_RUN_QUEUE(tso);
1783   THREAD_RUNNABLE();
1784
1785 #if 0
1786   IF_DEBUG(scheduler,printTSO(tso));
1787 #endif
1788   RELEASE_LOCK(&sched_mutex);
1789 }
1790
1791 /* ---------------------------------------------------------------------------
1792  * startTasks()
1793  *
1794  * Start up Posix threads to run each of the scheduler tasks.
1795  * I believe the task ids are not needed in the system as defined.
1796  *  KH @ 25/10/99
1797  * ------------------------------------------------------------------------ */
1798
1799 #if defined(PAR) || defined(SMP)
1800 void
1801 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1802 {
1803   schedule();
1804 }
1805 #endif
1806
1807 /* ---------------------------------------------------------------------------
1808  * initScheduler()
1809  *
1810  * Initialise the scheduler.  This resets all the queues - if the
1811  * queues contained any threads, they'll be garbage collected at the
1812  * next pass.
1813  *
1814  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1815  * ------------------------------------------------------------------------ */
1816
1817 #ifdef SMP
1818 static void
1819 term_handler(int sig STG_UNUSED)
1820 {
1821   stat_workerStop();
1822   ACQUIRE_LOCK(&term_mutex);
1823   await_death--;
1824   RELEASE_LOCK(&term_mutex);
1825   pthread_exit(NULL);
1826 }
1827 #endif
1828
1829 static void
1830 initCapability( Capability *cap )
1831 {
1832     cap->f.stgChk0         = (F_)__stg_chk_0;
1833     cap->f.stgChk1         = (F_)__stg_chk_1;
1834     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1835     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1836 }
1837
1838 void 
1839 initScheduler(void)
1840 {
1841 #if defined(GRAN)
1842   nat i;
1843
1844   for (i=0; i<=MAX_PROC; i++) {
1845     run_queue_hds[i]      = END_TSO_QUEUE;
1846     run_queue_tls[i]      = END_TSO_QUEUE;
1847     blocked_queue_hds[i]  = END_TSO_QUEUE;
1848     blocked_queue_tls[i]  = END_TSO_QUEUE;
1849     ccalling_threadss[i]  = END_TSO_QUEUE;
1850     sleeping_queue        = END_TSO_QUEUE;
1851   }
1852 #else
1853   run_queue_hd      = END_TSO_QUEUE;
1854   run_queue_tl      = END_TSO_QUEUE;
1855   blocked_queue_hd  = END_TSO_QUEUE;
1856   blocked_queue_tl  = END_TSO_QUEUE;
1857   sleeping_queue    = END_TSO_QUEUE;
1858 #endif 
1859
1860   suspended_ccalling_threads  = END_TSO_QUEUE;
1861
1862   main_threads = NULL;
1863   all_threads  = END_TSO_QUEUE;
1864
1865   context_switch = 0;
1866   interrupted    = 0;
1867
1868   RtsFlags.ConcFlags.ctxtSwitchTicks =
1869       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1870
1871   /* Install the SIGHUP handler */
1872 #ifdef SMP
1873   {
1874     struct sigaction action,oact;
1875
1876     action.sa_handler = term_handler;
1877     sigemptyset(&action.sa_mask);
1878     action.sa_flags = 0;
1879     if (sigaction(SIGTERM, &action, &oact) != 0) {
1880       barf("can't install TERM handler");
1881     }
1882   }
1883 #endif
1884
1885 #ifdef SMP
1886   /* Allocate N Capabilities */
1887   {
1888     nat i;
1889     Capability *cap, *prev;
1890     cap  = NULL;
1891     prev = NULL;
1892     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1893       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1894       initCapability(cap);
1895       cap->link = prev;
1896       prev = cap;
1897     }
1898     free_capabilities = cap;
1899     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1900   }
1901   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1902                              n_free_capabilities););
1903 #else
1904   initCapability(&MainCapability);
1905 #endif
1906
1907 #if defined(SMP) || defined(PAR)
1908   initSparkPools();
1909 #endif
1910 }
1911
1912 #ifdef SMP
1913 void
1914 startTasks( void )
1915 {
1916   nat i;
1917   int r;
1918   pthread_t tid;
1919   
1920   /* make some space for saving all the thread ids */
1921   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1922                             "initScheduler:task_ids");
1923   
1924   /* and create all the threads */
1925   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1926     r = pthread_create(&tid,NULL,taskStart,NULL);
1927     if (r != 0) {
1928       barf("startTasks: Can't create new Posix thread");
1929     }
1930     task_ids[i].id = tid;
1931     task_ids[i].mut_time = 0.0;
1932     task_ids[i].mut_etime = 0.0;
1933     task_ids[i].gc_time = 0.0;
1934     task_ids[i].gc_etime = 0.0;
1935     task_ids[i].elapsedtimestart = elapsedtime();
1936     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1937   }
1938 }
1939 #endif
1940
1941 void
1942 exitScheduler( void )
1943 {
1944 #ifdef SMP
1945   nat i;
1946
1947   /* Don't want to use pthread_cancel, since we'd have to install
1948    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1949    * all our locks.
1950    */
1951 #if 0
1952   /* Cancel all our tasks */
1953   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1954     pthread_cancel(task_ids[i].id);
1955   }
1956   
1957   /* Wait for all the tasks to terminate */
1958   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1959     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1960                                task_ids[i].id));
1961     pthread_join(task_ids[i].id, NULL);
1962   }
1963 #endif
1964
1965   /* Send 'em all a SIGHUP.  That should shut 'em up.
1966    */
1967   await_death = RtsFlags.ParFlags.nNodes;
1968   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1969     pthread_kill(task_ids[i].id,SIGTERM);
1970   }
1971   while (await_death > 0) {
1972     sched_yield();
1973   }
1974 #endif
1975 }
1976
1977 /* -----------------------------------------------------------------------------
1978    Managing the per-task allocation areas.
1979    
1980    Each capability comes with an allocation area.  These are
1981    fixed-length block lists into which allocation can be done.
1982
1983    ToDo: no support for two-space collection at the moment???
1984    -------------------------------------------------------------------------- */
1985
1986 /* -----------------------------------------------------------------------------
1987  * waitThread is the external interface for running a new computation
1988  * and waiting for the result.
1989  *
1990  * In the non-SMP case, we create a new main thread, push it on the 
1991  * main-thread stack, and invoke the scheduler to run it.  The
1992  * scheduler will return when the top main thread on the stack has
1993  * completed or died, and fill in the necessary fields of the
1994  * main_thread structure.
1995  *
1996  * In the SMP case, we create a main thread as before, but we then
1997  * create a new condition variable and sleep on it.  When our new
1998  * main thread has completed, we'll be woken up and the status/result
1999  * will be in the main_thread struct.
2000  * -------------------------------------------------------------------------- */
2001
2002 int 
2003 howManyThreadsAvail ( void )
2004 {
2005    int i = 0;
2006    StgTSO* q;
2007    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2008       i++;
2009    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2010       i++;
2011    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2012       i++;
2013    return i;
2014 }
2015
2016 void
2017 finishAllThreads ( void )
2018 {
2019    do {
2020       while (run_queue_hd != END_TSO_QUEUE) {
2021          waitThread ( run_queue_hd, NULL );
2022       }
2023       while (blocked_queue_hd != END_TSO_QUEUE) {
2024          waitThread ( blocked_queue_hd, NULL );
2025       }
2026       while (sleeping_queue != END_TSO_QUEUE) {
2027          waitThread ( blocked_queue_hd, NULL );
2028       }
2029    } while 
2030       (blocked_queue_hd != END_TSO_QUEUE || 
2031        run_queue_hd     != END_TSO_QUEUE ||
2032        sleeping_queue   != END_TSO_QUEUE);
2033 }
2034
2035 SchedulerStatus
2036 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2037 {
2038   StgMainThread *m;
2039   SchedulerStatus stat;
2040
2041   ACQUIRE_LOCK(&sched_mutex);
2042   
2043   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2044
2045   m->tso = tso;
2046   m->ret = ret;
2047   m->stat = NoStatus;
2048 #ifdef SMP
2049   pthread_cond_init(&m->wakeup, NULL);
2050 #endif
2051
2052   m->link = main_threads;
2053   main_threads = m;
2054
2055   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2056                               m->tso->id));
2057
2058 #ifdef SMP
2059   do {
2060     pthread_cond_wait(&m->wakeup, &sched_mutex);
2061   } while (m->stat == NoStatus);
2062 #elif defined(GRAN)
2063   /* GranSim specific init */
2064   CurrentTSO = m->tso;                // the TSO to run
2065   procStatus[MainProc] = Busy;        // status of main PE
2066   CurrentProc = MainProc;             // PE to run it on
2067
2068   schedule();
2069 #else
2070   schedule();
2071   ASSERT(m->stat != NoStatus);
2072 #endif
2073
2074   stat = m->stat;
2075
2076 #ifdef SMP
2077   pthread_cond_destroy(&m->wakeup);
2078 #endif
2079
2080   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2081                               m->tso->id));
2082   free(m);
2083
2084   RELEASE_LOCK(&sched_mutex);
2085
2086   return stat;
2087 }
2088
2089 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2090 //@subsection Run queue code 
2091
2092 #if 0
2093 /* 
2094    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2095        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2096        implicit global variable that has to be correct when calling these
2097        fcts -- HWL 
2098 */
2099
2100 /* Put the new thread on the head of the runnable queue.
2101  * The caller of createThread better push an appropriate closure
2102  * on this thread's stack before the scheduler is invoked.
2103  */
2104 static /* inline */ void
2105 add_to_run_queue(tso)
2106 StgTSO* tso; 
2107 {
2108   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2109   tso->link = run_queue_hd;
2110   run_queue_hd = tso;
2111   if (run_queue_tl == END_TSO_QUEUE) {
2112     run_queue_tl = tso;
2113   }
2114 }
2115
2116 /* Put the new thread at the end of the runnable queue. */
2117 static /* inline */ void
2118 push_on_run_queue(tso)
2119 StgTSO* tso; 
2120 {
2121   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2122   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2123   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2124   if (run_queue_hd == END_TSO_QUEUE) {
2125     run_queue_hd = tso;
2126   } else {
2127     run_queue_tl->link = tso;
2128   }
2129   run_queue_tl = tso;
2130 }
2131
2132 /* 
2133    Should be inlined because it's used very often in schedule.  The tso
2134    argument is actually only needed in GranSim, where we want to have the
2135    possibility to schedule *any* TSO on the run queue, irrespective of the
2136    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2137    the run queue and dequeue the tso, adjusting the links in the queue. 
2138 */
2139 //@cindex take_off_run_queue
2140 static /* inline */ StgTSO*
2141 take_off_run_queue(StgTSO *tso) {
2142   StgTSO *t, *prev;
2143
2144   /* 
2145      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2146
2147      if tso is specified, unlink that tso from the run_queue (doesn't have
2148      to be at the beginning of the queue); GranSim only 
2149   */
2150   if (tso!=END_TSO_QUEUE) {
2151     /* find tso in queue */
2152     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2153          t!=END_TSO_QUEUE && t!=tso;
2154          prev=t, t=t->link) 
2155       /* nothing */ ;
2156     ASSERT(t==tso);
2157     /* now actually dequeue the tso */
2158     if (prev!=END_TSO_QUEUE) {
2159       ASSERT(run_queue_hd!=t);
2160       prev->link = t->link;
2161     } else {
2162       /* t is at beginning of thread queue */
2163       ASSERT(run_queue_hd==t);
2164       run_queue_hd = t->link;
2165     }
2166     /* t is at end of thread queue */
2167     if (t->link==END_TSO_QUEUE) {
2168       ASSERT(t==run_queue_tl);
2169       run_queue_tl = prev;
2170     } else {
2171       ASSERT(run_queue_tl!=t);
2172     }
2173     t->link = END_TSO_QUEUE;
2174   } else {
2175     /* take tso from the beginning of the queue; std concurrent code */
2176     t = run_queue_hd;
2177     if (t != END_TSO_QUEUE) {
2178       run_queue_hd = t->link;
2179       t->link = END_TSO_QUEUE;
2180       if (run_queue_hd == END_TSO_QUEUE) {
2181         run_queue_tl = END_TSO_QUEUE;
2182       }
2183     }
2184   }
2185   return t;
2186 }
2187
2188 #endif /* 0 */
2189
2190 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2191 //@subsection Garbage Collextion Routines
2192
2193 /* ---------------------------------------------------------------------------
2194    Where are the roots that we know about?
2195
2196         - all the threads on the runnable queue
2197         - all the threads on the blocked queue
2198         - all the threads on the sleeping queue
2199         - all the thread currently executing a _ccall_GC
2200         - all the "main threads"
2201      
2202    ------------------------------------------------------------------------ */
2203
2204 /* This has to be protected either by the scheduler monitor, or by the
2205         garbage collection monitor (probably the latter).
2206         KH @ 25/10/99
2207 */
2208
2209 void
2210 GetRoots(evac_fn evac)
2211 {
2212   StgMainThread *m;
2213
2214 #if defined(GRAN)
2215   {
2216     nat i;
2217     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2218       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2219           evac((StgClosure **)&run_queue_hds[i]);
2220       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2221           evac((StgClosure **)&run_queue_tls[i]);
2222       
2223       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2224           evac((StgClosure **)&blocked_queue_hds[i]);
2225       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2226           evac((StgClosure **)&blocked_queue_tls[i]);
2227       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2228           evac((StgClosure **)&ccalling_threads[i]);
2229     }
2230   }
2231
2232   markEventQueue();
2233
2234 #else /* !GRAN */
2235   if (run_queue_hd != END_TSO_QUEUE) {
2236       ASSERT(run_queue_tl != END_TSO_QUEUE);
2237       evac((StgClosure **)&run_queue_hd);
2238       evac((StgClosure **)&run_queue_tl);
2239   }
2240   
2241   if (blocked_queue_hd != END_TSO_QUEUE) {
2242       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2243       evac((StgClosure **)&blocked_queue_hd);
2244       evac((StgClosure **)&blocked_queue_tl);
2245   }
2246   
2247   if (sleeping_queue != END_TSO_QUEUE) {
2248       evac((StgClosure **)&sleeping_queue);
2249   }
2250 #endif 
2251
2252   for (m = main_threads; m != NULL; m = m->link) {
2253       evac((StgClosure **)&m->tso);
2254   }
2255   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2256       evac((StgClosure **)&suspended_ccalling_threads);
2257   }
2258
2259 #if defined(SMP) || defined(PAR) || defined(GRAN)
2260   markSparkQueue(evac);
2261 #endif
2262 }
2263
2264 /* -----------------------------------------------------------------------------
2265    performGC
2266
2267    This is the interface to the garbage collector from Haskell land.
2268    We provide this so that external C code can allocate and garbage
2269    collect when called from Haskell via _ccall_GC.
2270
2271    It might be useful to provide an interface whereby the programmer
2272    can specify more roots (ToDo).
2273    
2274    This needs to be protected by the GC condition variable above.  KH.
2275    -------------------------------------------------------------------------- */
2276
2277 void (*extra_roots)(evac_fn);
2278
2279 void
2280 performGC(void)
2281 {
2282   GarbageCollect(GetRoots,rtsFalse);
2283 }
2284
2285 void
2286 performMajorGC(void)
2287 {
2288   GarbageCollect(GetRoots,rtsTrue);
2289 }
2290
2291 static void
2292 AllRoots(evac_fn evac)
2293 {
2294     GetRoots(evac);             // the scheduler's roots
2295     extra_roots(evac);          // the user's roots
2296 }
2297
2298 void
2299 performGCWithRoots(void (*get_roots)(evac_fn))
2300 {
2301   extra_roots = get_roots;
2302   GarbageCollect(AllRoots,rtsFalse);
2303 }
2304
2305 /* -----------------------------------------------------------------------------
2306    Stack overflow
2307
2308    If the thread has reached its maximum stack size, then raise the
2309    StackOverflow exception in the offending thread.  Otherwise
2310    relocate the TSO into a larger chunk of memory and adjust its stack
2311    size appropriately.
2312    -------------------------------------------------------------------------- */
2313
2314 static StgTSO *
2315 threadStackOverflow(StgTSO *tso)
2316 {
2317   nat new_stack_size, new_tso_size, diff, stack_words;
2318   StgPtr new_sp;
2319   StgTSO *dest;
2320
2321   IF_DEBUG(sanity,checkTSO(tso));
2322   if (tso->stack_size >= tso->max_stack_size) {
2323
2324     IF_DEBUG(gc,
2325              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2326                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2327              /* If we're debugging, just print out the top of the stack */
2328              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2329                                               tso->sp+64)));
2330
2331     /* Send this thread the StackOverflow exception */
2332     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2333     return tso;
2334   }
2335
2336   /* Try to double the current stack size.  If that takes us over the
2337    * maximum stack size for this thread, then use the maximum instead.
2338    * Finally round up so the TSO ends up as a whole number of blocks.
2339    */
2340   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2341   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2342                                        TSO_STRUCT_SIZE)/sizeof(W_);
2343   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2344   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2345
2346   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2347
2348   dest = (StgTSO *)allocate(new_tso_size);
2349   TICK_ALLOC_TSO(new_stack_size,0);
2350
2351   /* copy the TSO block and the old stack into the new area */
2352   memcpy(dest,tso,TSO_STRUCT_SIZE);
2353   stack_words = tso->stack + tso->stack_size - tso->sp;
2354   new_sp = (P_)dest + new_tso_size - stack_words;
2355   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2356
2357   /* relocate the stack pointers... */
2358   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2359   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2360   dest->sp    = new_sp;
2361   dest->stack_size = new_stack_size;
2362         
2363   /* and relocate the update frame list */
2364   relocate_stack(dest, diff);
2365
2366   /* Mark the old TSO as relocated.  We have to check for relocated
2367    * TSOs in the garbage collector and any primops that deal with TSOs.
2368    *
2369    * It's important to set the sp and su values to just beyond the end
2370    * of the stack, so we don't attempt to scavenge any part of the
2371    * dead TSO's stack.
2372    */
2373   tso->what_next = ThreadRelocated;
2374   tso->link = dest;
2375   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2376   tso->su = (StgUpdateFrame *)tso->sp;
2377   tso->why_blocked = NotBlocked;
2378   dest->mut_link = NULL;
2379
2380   IF_PAR_DEBUG(verbose,
2381                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2382                      tso->id, tso, tso->stack_size);
2383                /* If we're debugging, just print out the top of the stack */
2384                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2385                                                 tso->sp+64)));
2386   
2387   IF_DEBUG(sanity,checkTSO(tso));
2388 #if 0
2389   IF_DEBUG(scheduler,printTSO(dest));
2390 #endif
2391
2392   return dest;
2393 }
2394
2395 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2396 //@subsection Blocking Queue Routines
2397
2398 /* ---------------------------------------------------------------------------
2399    Wake up a queue that was blocked on some resource.
2400    ------------------------------------------------------------------------ */
2401
2402 #if defined(GRAN)
2403 static inline void
2404 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2405 {
2406 }
2407 #elif defined(PAR)
2408 static inline void
2409 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2410 {
2411   /* write RESUME events to log file and
2412      update blocked and fetch time (depending on type of the orig closure) */
2413   if (RtsFlags.ParFlags.ParStats.Full) {
2414     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2415                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2416                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2417     if (EMPTY_RUN_QUEUE())
2418       emitSchedule = rtsTrue;
2419
2420     switch (get_itbl(node)->type) {
2421         case FETCH_ME_BQ:
2422           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2423           break;
2424         case RBH:
2425         case FETCH_ME:
2426         case BLACKHOLE_BQ:
2427           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2428           break;
2429 #ifdef DIST
2430         case MVAR:
2431           break;
2432 #endif    
2433         default:
2434           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2435         }
2436       }
2437 }
2438 #endif
2439
2440 #if defined(GRAN)
2441 static StgBlockingQueueElement *
2442 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2443 {
2444     StgTSO *tso;
2445     PEs node_loc, tso_loc;
2446
2447     node_loc = where_is(node); // should be lifted out of loop
2448     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2449     tso_loc = where_is((StgClosure *)tso);
2450     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2451       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2452       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2453       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2454       // insertThread(tso, node_loc);
2455       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2456                 ResumeThread,
2457                 tso, node, (rtsSpark*)NULL);
2458       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2459       // len_local++;
2460       // len++;
2461     } else { // TSO is remote (actually should be FMBQ)
2462       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2463                                   RtsFlags.GranFlags.Costs.gunblocktime +
2464                                   RtsFlags.GranFlags.Costs.latency;
2465       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2466                 UnblockThread,
2467                 tso, node, (rtsSpark*)NULL);
2468       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2469       // len++;
2470     }
2471     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2472     IF_GRAN_DEBUG(bq,
2473                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2474                           (node_loc==tso_loc ? "Local" : "Global"), 
2475                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2476     tso->block_info.closure = NULL;
2477     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2478                              tso->id, tso));
2479 }
2480 #elif defined(PAR)
2481 static StgBlockingQueueElement *
2482 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2483 {
2484     StgBlockingQueueElement *next;
2485
2486     switch (get_itbl(bqe)->type) {
2487     case TSO:
2488       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2489       /* if it's a TSO just push it onto the run_queue */
2490       next = bqe->link;
2491       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2492       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2493       THREAD_RUNNABLE();
2494       unblockCount(bqe, node);
2495       /* reset blocking status after dumping event */
2496       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2497       break;
2498
2499     case BLOCKED_FETCH:
2500       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2501       next = bqe->link;
2502       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2503       PendingFetches = (StgBlockedFetch *)bqe;
2504       break;
2505
2506 # if defined(DEBUG)
2507       /* can ignore this case in a non-debugging setup; 
2508          see comments on RBHSave closures above */
2509     case CONSTR:
2510       /* check that the closure is an RBHSave closure */
2511       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2512              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2513              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2514       break;
2515
2516     default:
2517       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2518            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2519            (StgClosure *)bqe);
2520 # endif
2521     }
2522   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2523   return next;
2524 }
2525
2526 #else /* !GRAN && !PAR */
2527 static StgTSO *
2528 unblockOneLocked(StgTSO *tso)
2529 {
2530   StgTSO *next;
2531
2532   ASSERT(get_itbl(tso)->type == TSO);
2533   ASSERT(tso->why_blocked != NotBlocked);
2534   tso->why_blocked = NotBlocked;
2535   next = tso->link;
2536   PUSH_ON_RUN_QUEUE(tso);
2537   THREAD_RUNNABLE();
2538   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2539   return next;
2540 }
2541 #endif
2542
2543 #if defined(GRAN) || defined(PAR)
2544 inline StgBlockingQueueElement *
2545 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2546 {
2547   ACQUIRE_LOCK(&sched_mutex);
2548   bqe = unblockOneLocked(bqe, node);
2549   RELEASE_LOCK(&sched_mutex);
2550   return bqe;
2551 }
2552 #else
2553 inline StgTSO *
2554 unblockOne(StgTSO *tso)
2555 {
2556   ACQUIRE_LOCK(&sched_mutex);
2557   tso = unblockOneLocked(tso);
2558   RELEASE_LOCK(&sched_mutex);
2559   return tso;
2560 }
2561 #endif
2562
2563 #if defined(GRAN)
2564 void 
2565 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2566 {
2567   StgBlockingQueueElement *bqe;
2568   PEs node_loc;
2569   nat len = 0; 
2570
2571   IF_GRAN_DEBUG(bq, 
2572                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2573                       node, CurrentProc, CurrentTime[CurrentProc], 
2574                       CurrentTSO->id, CurrentTSO));
2575
2576   node_loc = where_is(node);
2577
2578   ASSERT(q == END_BQ_QUEUE ||
2579          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2580          get_itbl(q)->type == CONSTR); // closure (type constructor)
2581   ASSERT(is_unique(node));
2582
2583   /* FAKE FETCH: magically copy the node to the tso's proc;
2584      no Fetch necessary because in reality the node should not have been 
2585      moved to the other PE in the first place
2586   */
2587   if (CurrentProc!=node_loc) {
2588     IF_GRAN_DEBUG(bq, 
2589                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2590                         node, node_loc, CurrentProc, CurrentTSO->id, 
2591                         // CurrentTSO, where_is(CurrentTSO),
2592                         node->header.gran.procs));
2593     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2594     IF_GRAN_DEBUG(bq, 
2595                   belch("## new bitmask of node %p is %#x",
2596                         node, node->header.gran.procs));
2597     if (RtsFlags.GranFlags.GranSimStats.Global) {
2598       globalGranStats.tot_fake_fetches++;
2599     }
2600   }
2601
2602   bqe = q;
2603   // ToDo: check: ASSERT(CurrentProc==node_loc);
2604   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2605     //next = bqe->link;
2606     /* 
2607        bqe points to the current element in the queue
2608        next points to the next element in the queue
2609     */
2610     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2611     //tso_loc = where_is(tso);
2612     len++;
2613     bqe = unblockOneLocked(bqe, node);
2614   }
2615
2616   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2617      the closure to make room for the anchor of the BQ */
2618   if (bqe!=END_BQ_QUEUE) {
2619     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2620     /*
2621     ASSERT((info_ptr==&RBH_Save_0_info) ||
2622            (info_ptr==&RBH_Save_1_info) ||
2623            (info_ptr==&RBH_Save_2_info));
2624     */
2625     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2626     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2627     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2628
2629     IF_GRAN_DEBUG(bq,
2630                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2631                         node, info_type(node)));
2632   }
2633
2634   /* statistics gathering */
2635   if (RtsFlags.GranFlags.GranSimStats.Global) {
2636     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2637     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2638     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2639     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2640   }
2641   IF_GRAN_DEBUG(bq,
2642                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2643                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2644 }
2645 #elif defined(PAR)
2646 void 
2647 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2648 {
2649   StgBlockingQueueElement *bqe;
2650
2651   ACQUIRE_LOCK(&sched_mutex);
2652
2653   IF_PAR_DEBUG(verbose, 
2654                belch("##-_ AwBQ for node %p on [%x]: ",
2655                      node, mytid));
2656 #ifdef DIST  
2657   //RFP
2658   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2659     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2660     return;
2661   }
2662 #endif
2663   
2664   ASSERT(q == END_BQ_QUEUE ||
2665          get_itbl(q)->type == TSO ||           
2666          get_itbl(q)->type == BLOCKED_FETCH || 
2667          get_itbl(q)->type == CONSTR); 
2668
2669   bqe = q;
2670   while (get_itbl(bqe)->type==TSO || 
2671          get_itbl(bqe)->type==BLOCKED_FETCH) {
2672     bqe = unblockOneLocked(bqe, node);
2673   }
2674   RELEASE_LOCK(&sched_mutex);
2675 }
2676
2677 #else   /* !GRAN && !PAR */
2678 void
2679 awakenBlockedQueue(StgTSO *tso)
2680 {
2681   ACQUIRE_LOCK(&sched_mutex);
2682   while (tso != END_TSO_QUEUE) {
2683     tso = unblockOneLocked(tso);
2684   }
2685   RELEASE_LOCK(&sched_mutex);
2686 }
2687 #endif
2688
2689 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2690 //@subsection Exception Handling Routines
2691
2692 /* ---------------------------------------------------------------------------
2693    Interrupt execution
2694    - usually called inside a signal handler so it mustn't do anything fancy.   
2695    ------------------------------------------------------------------------ */
2696
2697 void
2698 interruptStgRts(void)
2699 {
2700     interrupted    = 1;
2701     context_switch = 1;
2702 }
2703
2704 /* -----------------------------------------------------------------------------
2705    Unblock a thread
2706
2707    This is for use when we raise an exception in another thread, which
2708    may be blocked.
2709    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2710    -------------------------------------------------------------------------- */
2711
2712 #if defined(GRAN) || defined(PAR)
2713 /*
2714   NB: only the type of the blocking queue is different in GranSim and GUM
2715       the operations on the queue-elements are the same
2716       long live polymorphism!
2717 */
2718 static void
2719 unblockThread(StgTSO *tso)
2720 {
2721   StgBlockingQueueElement *t, **last;
2722
2723   ACQUIRE_LOCK(&sched_mutex);
2724   switch (tso->why_blocked) {
2725
2726   case NotBlocked:
2727     return;  /* not blocked */
2728
2729   case BlockedOnMVar:
2730     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2731     {
2732       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2733       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2734
2735       last = (StgBlockingQueueElement **)&mvar->head;
2736       for (t = (StgBlockingQueueElement *)mvar->head; 
2737            t != END_BQ_QUEUE; 
2738            last = &t->link, last_tso = t, t = t->link) {
2739         if (t == (StgBlockingQueueElement *)tso) {
2740           *last = (StgBlockingQueueElement *)tso->link;
2741           if (mvar->tail == tso) {
2742             mvar->tail = (StgTSO *)last_tso;
2743           }
2744           goto done;
2745         }
2746       }
2747       barf("unblockThread (MVAR): TSO not found");
2748     }
2749
2750   case BlockedOnBlackHole:
2751     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2752     {
2753       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2754
2755       last = &bq->blocking_queue;
2756       for (t = bq->blocking_queue; 
2757            t != END_BQ_QUEUE; 
2758            last = &t->link, t = t->link) {
2759         if (t == (StgBlockingQueueElement *)tso) {
2760           *last = (StgBlockingQueueElement *)tso->link;
2761           goto done;
2762         }
2763       }
2764       barf("unblockThread (BLACKHOLE): TSO not found");
2765     }
2766
2767   case BlockedOnException:
2768     {
2769       StgTSO *target  = tso->block_info.tso;
2770
2771       ASSERT(get_itbl(target)->type == TSO);
2772
2773       if (target->what_next == ThreadRelocated) {
2774           target = target->link;
2775           ASSERT(get_itbl(target)->type == TSO);
2776       }
2777
2778       ASSERT(target->blocked_exceptions != NULL);
2779
2780       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2781       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2782            t != END_BQ_QUEUE; 
2783            last = &t->link, t = t->link) {
2784         ASSERT(get_itbl(t)->type == TSO);
2785         if (t == (StgBlockingQueueElement *)tso) {
2786           *last = (StgBlockingQueueElement *)tso->link;
2787           goto done;
2788         }
2789       }
2790       barf("unblockThread (Exception): TSO not found");
2791     }
2792
2793   case BlockedOnRead:
2794   case BlockedOnWrite:
2795     {
2796       /* take TSO off blocked_queue */
2797       StgBlockingQueueElement *prev = NULL;
2798       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2799            prev = t, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           if (prev == NULL) {
2802             blocked_queue_hd = (StgTSO *)t->link;
2803             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2804               blocked_queue_tl = END_TSO_QUEUE;
2805             }
2806           } else {
2807             prev->link = t->link;
2808             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2809               blocked_queue_tl = (StgTSO *)prev;
2810             }
2811           }
2812           goto done;
2813         }
2814       }
2815       barf("unblockThread (I/O): TSO not found");
2816     }
2817
2818   case BlockedOnDelay:
2819     {
2820       /* take TSO off sleeping_queue */
2821       StgBlockingQueueElement *prev = NULL;
2822       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2823            prev = t, t = t->link) {
2824         if (t == (StgBlockingQueueElement *)tso) {
2825           if (prev == NULL) {
2826             sleeping_queue = (StgTSO *)t->link;
2827           } else {
2828             prev->link = t->link;
2829           }
2830           goto done;
2831         }
2832       }
2833       barf("unblockThread (I/O): TSO not found");
2834     }
2835
2836   default:
2837     barf("unblockThread");
2838   }
2839
2840  done:
2841   tso->link = END_TSO_QUEUE;
2842   tso->why_blocked = NotBlocked;
2843   tso->block_info.closure = NULL;
2844   PUSH_ON_RUN_QUEUE(tso);
2845   RELEASE_LOCK(&sched_mutex);
2846 }
2847 #else
2848 static void
2849 unblockThread(StgTSO *tso)
2850 {
2851   StgTSO *t, **last;
2852
2853   ACQUIRE_LOCK(&sched_mutex);
2854   switch (tso->why_blocked) {
2855
2856   case NotBlocked:
2857     return;  /* not blocked */
2858
2859   case BlockedOnMVar:
2860     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2861     {
2862       StgTSO *last_tso = END_TSO_QUEUE;
2863       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2864
2865       last = &mvar->head;
2866       for (t = mvar->head; t != END_TSO_QUEUE; 
2867            last = &t->link, last_tso = t, t = t->link) {
2868         if (t == tso) {
2869           *last = tso->link;
2870           if (mvar->tail == tso) {
2871             mvar->tail = last_tso;
2872           }
2873           goto done;
2874         }
2875       }
2876       barf("unblockThread (MVAR): TSO not found");
2877     }
2878
2879   case BlockedOnBlackHole:
2880     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2881     {
2882       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2883
2884       last = &bq->blocking_queue;
2885       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2886            last = &t->link, t = t->link) {
2887         if (t == tso) {
2888           *last = tso->link;
2889           goto done;
2890         }
2891       }
2892       barf("unblockThread (BLACKHOLE): TSO not found");
2893     }
2894
2895   case BlockedOnException:
2896     {
2897       StgTSO *target  = tso->block_info.tso;
2898
2899       ASSERT(get_itbl(target)->type == TSO);
2900
2901       while (target->what_next == ThreadRelocated) {
2902           target = target->link;
2903           ASSERT(get_itbl(target)->type == TSO);
2904       }
2905       
2906       ASSERT(target->blocked_exceptions != NULL);
2907
2908       last = &target->blocked_exceptions;
2909       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2910            last = &t->link, t = t->link) {
2911         ASSERT(get_itbl(t)->type == TSO);
2912         if (t == tso) {
2913           *last = tso->link;
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (Exception): TSO not found");
2918     }
2919
2920   case BlockedOnRead:
2921   case BlockedOnWrite:
2922     {
2923       StgTSO *prev = NULL;
2924       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2925            prev = t, t = t->link) {
2926         if (t == tso) {
2927           if (prev == NULL) {
2928             blocked_queue_hd = t->link;
2929             if (blocked_queue_tl == t) {
2930               blocked_queue_tl = END_TSO_QUEUE;
2931             }
2932           } else {
2933             prev->link = t->link;
2934             if (blocked_queue_tl == t) {
2935               blocked_queue_tl = prev;
2936             }
2937           }
2938           goto done;
2939         }
2940       }
2941       barf("unblockThread (I/O): TSO not found");
2942     }
2943
2944   case BlockedOnDelay:
2945     {
2946       StgTSO *prev = NULL;
2947       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2948            prev = t, t = t->link) {
2949         if (t == tso) {
2950           if (prev == NULL) {
2951             sleeping_queue = t->link;
2952           } else {
2953             prev->link = t->link;
2954           }
2955           goto done;
2956         }
2957       }
2958       barf("unblockThread (I/O): TSO not found");
2959     }
2960
2961   default:
2962     barf("unblockThread");
2963   }
2964
2965  done:
2966   tso->link = END_TSO_QUEUE;
2967   tso->why_blocked = NotBlocked;
2968   tso->block_info.closure = NULL;
2969   PUSH_ON_RUN_QUEUE(tso);
2970   RELEASE_LOCK(&sched_mutex);
2971 }
2972 #endif
2973
2974 /* -----------------------------------------------------------------------------
2975  * raiseAsync()
2976  *
2977  * The following function implements the magic for raising an
2978  * asynchronous exception in an existing thread.
2979  *
2980  * We first remove the thread from any queue on which it might be
2981  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2982  *
2983  * We strip the stack down to the innermost CATCH_FRAME, building
2984  * thunks in the heap for all the active computations, so they can 
2985  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2986  * an application of the handler to the exception, and push it on
2987  * the top of the stack.
2988  * 
2989  * How exactly do we save all the active computations?  We create an
2990  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2991  * AP_UPDs pushes everything from the corresponding update frame
2992  * upwards onto the stack.  (Actually, it pushes everything up to the
2993  * next update frame plus a pointer to the next AP_UPD object.
2994  * Entering the next AP_UPD object pushes more onto the stack until we
2995  * reach the last AP_UPD object - at which point the stack should look
2996  * exactly as it did when we killed the TSO and we can continue
2997  * execution by entering the closure on top of the stack.
2998  *
2999  * We can also kill a thread entirely - this happens if either (a) the 
3000  * exception passed to raiseAsync is NULL, or (b) there's no
3001  * CATCH_FRAME on the stack.  In either case, we strip the entire
3002  * stack and replace the thread with a zombie.
3003  *
3004  * -------------------------------------------------------------------------- */
3005  
3006 void 
3007 deleteThread(StgTSO *tso)
3008 {
3009   raiseAsync(tso,NULL);
3010 }
3011
3012 void
3013 raiseAsync(StgTSO *tso, StgClosure *exception)
3014 {
3015   StgUpdateFrame* su = tso->su;
3016   StgPtr          sp = tso->sp;
3017   
3018   /* Thread already dead? */
3019   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3020     return;
3021   }
3022
3023   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3024
3025   /* Remove it from any blocking queues */
3026   unblockThread(tso);
3027
3028   /* The stack freezing code assumes there's a closure pointer on
3029    * the top of the stack.  This isn't always the case with compiled
3030    * code, so we have to push a dummy closure on the top which just
3031    * returns to the next return address on the stack.
3032    */
3033   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3034     *(--sp) = (W_)&stg_dummy_ret_closure;
3035   }
3036
3037   while (1) {
3038     nat words = ((P_)su - (P_)sp) - 1;
3039     nat i;
3040     StgAP_UPD * ap;
3041
3042     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3043      * then build PAP(handler,exception,realworld#), and leave it on
3044      * top of the stack ready to enter.
3045      */
3046     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3047       StgCatchFrame *cf = (StgCatchFrame *)su;
3048       /* we've got an exception to raise, so let's pass it to the
3049        * handler in this frame.
3050        */
3051       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3052       TICK_ALLOC_UPD_PAP(3,0);
3053       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3054               
3055       ap->n_args = 2;
3056       ap->fun = cf->handler;    /* :: Exception -> IO a */
3057       ap->payload[0] = exception;
3058       ap->payload[1] = ARG_TAG(0); /* realworld token */
3059
3060       /* throw away the stack from Sp up to and including the
3061        * CATCH_FRAME.
3062        */
3063       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3064       tso->su = cf->link;
3065
3066       /* Restore the blocked/unblocked state for asynchronous exceptions
3067        * at the CATCH_FRAME.  
3068        *
3069        * If exceptions were unblocked at the catch, arrange that they
3070        * are unblocked again after executing the handler by pushing an
3071        * unblockAsyncExceptions_ret stack frame.
3072        */
3073       if (!cf->exceptions_blocked) {
3074         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3075       }
3076       
3077       /* Ensure that async exceptions are blocked when running the handler.
3078        */
3079       if (tso->blocked_exceptions == NULL) {
3080         tso->blocked_exceptions = END_TSO_QUEUE;
3081       }
3082       
3083       /* Put the newly-built PAP on top of the stack, ready to execute
3084        * when the thread restarts.
3085        */
3086       sp[0] = (W_)ap;
3087       tso->sp = sp;
3088       tso->what_next = ThreadEnterGHC;
3089       IF_DEBUG(sanity, checkTSO(tso));
3090       return;
3091     }
3092
3093     /* First build an AP_UPD consisting of the stack chunk above the
3094      * current update frame, with the top word on the stack as the
3095      * fun field.
3096      */
3097     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3098     
3099     ASSERT(words >= 0);
3100     
3101     ap->n_args = words;
3102     ap->fun    = (StgClosure *)sp[0];
3103     sp++;
3104     for(i=0; i < (nat)words; ++i) {
3105       ap->payload[i] = (StgClosure *)*sp++;
3106     }
3107     
3108     switch (get_itbl(su)->type) {
3109       
3110     case UPDATE_FRAME:
3111       {
3112         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3113         TICK_ALLOC_UP_THK(words+1,0);
3114         
3115         IF_DEBUG(scheduler,
3116                  fprintf(stderr,  "scheduler: Updating ");
3117                  printPtr((P_)su->updatee); 
3118                  fprintf(stderr,  " with ");
3119                  printObj((StgClosure *)ap);
3120                  );
3121         
3122         /* Replace the updatee with an indirection - happily
3123          * this will also wake up any threads currently
3124          * waiting on the result.
3125          *
3126          * Warning: if we're in a loop, more than one update frame on
3127          * the stack may point to the same object.  Be careful not to
3128          * overwrite an IND_OLDGEN in this case, because we'll screw
3129          * up the mutable lists.  To be on the safe side, don't
3130          * overwrite any kind of indirection at all.  See also
3131          * threadSqueezeStack in GC.c, where we have to make a similar
3132          * check.
3133          */
3134         if (!closure_IND(su->updatee)) {
3135             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3136         }
3137         su = su->link;
3138         sp += sizeofW(StgUpdateFrame) -1;
3139         sp[0] = (W_)ap; /* push onto stack */
3140         break;
3141       }
3142
3143     case CATCH_FRAME:
3144       {
3145         StgCatchFrame *cf = (StgCatchFrame *)su;
3146         StgClosure* o;
3147         
3148         /* We want a PAP, not an AP_UPD.  Fortunately, the
3149          * layout's the same.
3150          */
3151         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3152         TICK_ALLOC_UPD_PAP(words+1,0);
3153         
3154         /* now build o = FUN(catch,ap,handler) */
3155         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3156         TICK_ALLOC_FUN(2,0);
3157         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3158         o->payload[0] = (StgClosure *)ap;
3159         o->payload[1] = cf->handler;
3160         
3161         IF_DEBUG(scheduler,
3162                  fprintf(stderr,  "scheduler: Built ");
3163                  printObj((StgClosure *)o);
3164                  );
3165         
3166         /* pop the old handler and put o on the stack */
3167         su = cf->link;
3168         sp += sizeofW(StgCatchFrame) - 1;
3169         sp[0] = (W_)o;
3170         break;
3171       }
3172       
3173     case SEQ_FRAME:
3174       {
3175         StgSeqFrame *sf = (StgSeqFrame *)su;
3176         StgClosure* o;
3177         
3178         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3179         TICK_ALLOC_UPD_PAP(words+1,0);
3180         
3181         /* now build o = FUN(seq,ap) */
3182         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3183         TICK_ALLOC_SE_THK(1,0);
3184         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3185         o->payload[0] = (StgClosure *)ap;
3186         
3187         IF_DEBUG(scheduler,
3188                  fprintf(stderr,  "scheduler: Built ");
3189                  printObj((StgClosure *)o);
3190                  );
3191         
3192         /* pop the old handler and put o on the stack */
3193         su = sf->link;
3194         sp += sizeofW(StgSeqFrame) - 1;
3195         sp[0] = (W_)o;
3196         break;
3197       }
3198       
3199     case STOP_FRAME:
3200       /* We've stripped the entire stack, the thread is now dead. */
3201       sp += sizeofW(StgStopFrame) - 1;
3202       sp[0] = (W_)exception;    /* save the exception */
3203       tso->what_next = ThreadKilled;
3204       tso->su = (StgUpdateFrame *)(sp+1);
3205       tso->sp = sp;
3206       return;
3207
3208     default:
3209       barf("raiseAsync");
3210     }
3211   }
3212   barf("raiseAsync");
3213 }
3214
3215 /* -----------------------------------------------------------------------------
3216    resurrectThreads is called after garbage collection on the list of
3217    threads found to be garbage.  Each of these threads will be woken
3218    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3219    on an MVar, or NonTermination if the thread was blocked on a Black
3220    Hole.
3221    -------------------------------------------------------------------------- */
3222
3223 void
3224 resurrectThreads( StgTSO *threads )
3225 {
3226   StgTSO *tso, *next;
3227
3228   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3229     next = tso->global_link;
3230     tso->global_link = all_threads;
3231     all_threads = tso;
3232     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3233
3234     switch (tso->why_blocked) {
3235     case BlockedOnMVar:
3236     case BlockedOnException:
3237       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3238       break;
3239     case BlockedOnBlackHole:
3240       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3241       break;
3242     case NotBlocked:
3243       /* This might happen if the thread was blocked on a black hole
3244        * belonging to a thread that we've just woken up (raiseAsync
3245        * can wake up threads, remember...).
3246        */
3247       continue;
3248     default:
3249       barf("resurrectThreads: thread blocked in a strange way");
3250     }
3251   }
3252 }
3253
3254 /* -----------------------------------------------------------------------------
3255  * Blackhole detection: if we reach a deadlock, test whether any
3256  * threads are blocked on themselves.  Any threads which are found to
3257  * be self-blocked get sent a NonTermination exception.
3258  *
3259  * This is only done in a deadlock situation in order to avoid
3260  * performance overhead in the normal case.
3261  * -------------------------------------------------------------------------- */
3262
3263 static void
3264 detectBlackHoles( void )
3265 {
3266     StgTSO *t = all_threads;
3267     StgUpdateFrame *frame;
3268     StgClosure *blocked_on;
3269
3270     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3271
3272         while (t->what_next == ThreadRelocated) {
3273             t = t->link;
3274             ASSERT(get_itbl(t)->type == TSO);
3275         }
3276       
3277         if (t->why_blocked != BlockedOnBlackHole) {
3278             continue;
3279         }
3280
3281         blocked_on = t->block_info.closure;
3282
3283         for (frame = t->su; ; frame = frame->link) {
3284             switch (get_itbl(frame)->type) {
3285
3286             case UPDATE_FRAME:
3287                 if (frame->updatee == blocked_on) {
3288                     /* We are blocking on one of our own computations, so
3289                      * send this thread the NonTermination exception.  
3290                      */
3291                     IF_DEBUG(scheduler, 
3292                              sched_belch("thread %d is blocked on itself", t->id));
3293                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3294                     goto done;
3295                 }
3296                 else {
3297                     continue;
3298                 }
3299
3300             case CATCH_FRAME:
3301             case SEQ_FRAME:
3302                 continue;
3303                 
3304             case STOP_FRAME:
3305                 break;
3306             }
3307             break;
3308         }
3309
3310     done: ;
3311     }   
3312 }
3313
3314 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3315 //@subsection Debugging Routines
3316
3317 /* -----------------------------------------------------------------------------
3318    Debugging: why is a thread blocked
3319    -------------------------------------------------------------------------- */
3320
3321 #ifdef DEBUG
3322
3323 void
3324 printThreadBlockage(StgTSO *tso)
3325 {
3326   switch (tso->why_blocked) {
3327   case BlockedOnRead:
3328     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3329     break;
3330   case BlockedOnWrite:
3331     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3332     break;
3333   case BlockedOnDelay:
3334     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3335     break;
3336   case BlockedOnMVar:
3337     fprintf(stderr,"is blocked on an MVar");
3338     break;
3339   case BlockedOnException:
3340     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3341             tso->block_info.tso->id);
3342     break;
3343   case BlockedOnBlackHole:
3344     fprintf(stderr,"is blocked on a black hole");
3345     break;
3346   case NotBlocked:
3347     fprintf(stderr,"is not blocked");
3348     break;
3349 #if defined(PAR)
3350   case BlockedOnGA:
3351     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3352             tso->block_info.closure, info_type(tso->block_info.closure));
3353     break;
3354   case BlockedOnGA_NoSend:
3355     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3356             tso->block_info.closure, info_type(tso->block_info.closure));
3357     break;
3358 #endif
3359   default:
3360     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3361          tso->why_blocked, tso->id, tso);
3362   }
3363 }
3364
3365 void
3366 printThreadStatus(StgTSO *tso)
3367 {
3368   switch (tso->what_next) {
3369   case ThreadKilled:
3370     fprintf(stderr,"has been killed");
3371     break;
3372   case ThreadComplete:
3373     fprintf(stderr,"has completed");
3374     break;
3375   default:
3376     printThreadBlockage(tso);
3377   }
3378 }
3379
3380 void
3381 printAllThreads(void)
3382 {
3383   StgTSO *t;
3384
3385 # if defined(GRAN)
3386   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3387   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3388                        time_string, rtsFalse/*no commas!*/);
3389
3390   sched_belch("all threads at [%s]:", time_string);
3391 # elif defined(PAR)
3392   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3393   ullong_format_string(CURRENT_TIME,
3394                        time_string, rtsFalse/*no commas!*/);
3395
3396   sched_belch("all threads at [%s]:", time_string);
3397 # else
3398   sched_belch("all threads:");
3399 # endif
3400
3401   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3402     fprintf(stderr, "\tthread %d ", t->id);
3403     printThreadStatus(t);
3404     fprintf(stderr,"\n");
3405   }
3406 }
3407     
3408 /* 
3409    Print a whole blocking queue attached to node (debugging only).
3410 */
3411 //@cindex print_bq
3412 # if defined(PAR)
3413 void 
3414 print_bq (StgClosure *node)
3415 {
3416   StgBlockingQueueElement *bqe;
3417   StgTSO *tso;
3418   rtsBool end;
3419
3420   fprintf(stderr,"## BQ of closure %p (%s): ",
3421           node, info_type(node));
3422
3423   /* should cover all closures that may have a blocking queue */
3424   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3425          get_itbl(node)->type == FETCH_ME_BQ ||
3426          get_itbl(node)->type == RBH ||
3427          get_itbl(node)->type == MVAR);
3428     
3429   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3430
3431   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3432 }
3433
3434 /* 
3435    Print a whole blocking queue starting with the element bqe.
3436 */
3437 void 
3438 print_bqe (StgBlockingQueueElement *bqe)
3439 {
3440   rtsBool end;
3441
3442   /* 
3443      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3444   */
3445   for (end = (bqe==END_BQ_QUEUE);
3446        !end; // iterate until bqe points to a CONSTR
3447        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3448        bqe = end ? END_BQ_QUEUE : bqe->link) {
3449     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3450     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3451     /* types of closures that may appear in a blocking queue */
3452     ASSERT(get_itbl(bqe)->type == TSO ||           
3453            get_itbl(bqe)->type == BLOCKED_FETCH || 
3454            get_itbl(bqe)->type == CONSTR); 
3455     /* only BQs of an RBH end with an RBH_Save closure */
3456     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3457
3458     switch (get_itbl(bqe)->type) {
3459     case TSO:
3460       fprintf(stderr," TSO %u (%x),",
3461               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3462       break;
3463     case BLOCKED_FETCH:
3464       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3465               ((StgBlockedFetch *)bqe)->node, 
3466               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3467               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3468               ((StgBlockedFetch *)bqe)->ga.weight);
3469       break;
3470     case CONSTR:
3471       fprintf(stderr," %s (IP %p),",
3472               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3473                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3474                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3475                "RBH_Save_?"), get_itbl(bqe));
3476       break;
3477     default:
3478       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3479            info_type((StgClosure *)bqe)); // , node, info_type(node));
3480       break;
3481     }
3482   } /* for */
3483   fputc('\n', stderr);
3484 }
3485 # elif defined(GRAN)
3486 void 
3487 print_bq (StgClosure *node)
3488 {
3489   StgBlockingQueueElement *bqe;
3490   PEs node_loc, tso_loc;
3491   rtsBool end;
3492
3493   /* should cover all closures that may have a blocking queue */
3494   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3495          get_itbl(node)->type == FETCH_ME_BQ ||
3496          get_itbl(node)->type == RBH);
3497     
3498   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3499   node_loc = where_is(node);
3500
3501   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3502           node, info_type(node), node_loc);
3503
3504   /* 
3505      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3506   */
3507   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3508        !end; // iterate until bqe points to a CONSTR
3509        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3510     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3511     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3512     /* types of closures that may appear in a blocking queue */
3513     ASSERT(get_itbl(bqe)->type == TSO ||           
3514            get_itbl(bqe)->type == CONSTR); 
3515     /* only BQs of an RBH end with an RBH_Save closure */
3516     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3517
3518     tso_loc = where_is((StgClosure *)bqe);
3519     switch (get_itbl(bqe)->type) {
3520     case TSO:
3521       fprintf(stderr," TSO %d (%p) on [PE %d],",
3522               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3523       break;
3524     case CONSTR:
3525       fprintf(stderr," %s (IP %p),",
3526               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3527                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3528                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3529                "RBH_Save_?"), get_itbl(bqe));
3530       break;
3531     default:
3532       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3533            info_type((StgClosure *)bqe), node, info_type(node));
3534       break;
3535     }
3536   } /* for */
3537   fputc('\n', stderr);
3538 }
3539 #else
3540 /* 
3541    Nice and easy: only TSOs on the blocking queue
3542 */
3543 void 
3544 print_bq (StgClosure *node)
3545 {
3546   StgTSO *tso;
3547
3548   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3549   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3550        tso != END_TSO_QUEUE; 
3551        tso=tso->link) {
3552     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3553     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3554     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3555   }
3556   fputc('\n', stderr);
3557 }
3558 # endif
3559
3560 #if defined(PAR)
3561 static nat
3562 run_queue_len(void)
3563 {
3564   nat i;
3565   StgTSO *tso;
3566
3567   for (i=0, tso=run_queue_hd; 
3568        tso != END_TSO_QUEUE;
3569        i++, tso=tso->link)
3570     /* nothing */
3571
3572   return i;
3573 }
3574 #endif
3575
3576 static void
3577 sched_belch(char *s, ...)
3578 {
3579   va_list ap;
3580   va_start(ap,s);
3581 #ifdef SMP
3582   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3583 #elif defined(PAR)
3584   fprintf(stderr, "== ");
3585 #else
3586   fprintf(stderr, "scheduler: ");
3587 #endif
3588   vfprintf(stderr, s, ap);
3589   fprintf(stderr, "\n");
3590 }
3591
3592 #endif /* DEBUG */
3593
3594
3595 //@node Index,  , Debugging Routines, Main scheduling code
3596 //@subsection Index
3597
3598 //@index
3599 //* MainRegTable::  @cindex\s-+MainRegTable
3600 //* StgMainThread::  @cindex\s-+StgMainThread
3601 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3602 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3603 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3604 //* context_switch::  @cindex\s-+context_switch
3605 //* createThread::  @cindex\s-+createThread
3606 //* free_capabilities::  @cindex\s-+free_capabilities
3607 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3608 //* initScheduler::  @cindex\s-+initScheduler
3609 //* interrupted::  @cindex\s-+interrupted
3610 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3611 //* next_thread_id::  @cindex\s-+next_thread_id
3612 //* print_bq::  @cindex\s-+print_bq
3613 //* run_queue_hd::  @cindex\s-+run_queue_hd
3614 //* run_queue_tl::  @cindex\s-+run_queue_tl
3615 //* sched_mutex::  @cindex\s-+sched_mutex
3616 //* schedule::  @cindex\s-+schedule
3617 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3618 //* task_ids::  @cindex\s-+task_ids
3619 //* term_mutex::  @cindex\s-+term_mutex
3620 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3621 //@end index