[project @ 2001-11-08 12:46:31 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.105 2001/11/08 12:46:31 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 Capability *free_capabilities; /* Available capabilities for running threads */
229 nat n_free_capabilities;       /* total number of available capabilities */
230 #else
231 Capability MainCapability;     /* for non-SMP, we have one global capability */
232 #endif
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 /* All our current task ids, saved in case we need to kill them later.
247  */
248 #ifdef SMP
249 //@cindex task_ids
250 task_info *task_ids;
251 #endif
252
253 void            addToBlockedQueue ( StgTSO *tso );
254
255 static void     schedule          ( void );
256        void     interruptStgRts   ( void );
257 #if defined(GRAN)
258 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
259 #else
260 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
261 #endif
262
263 static void     detectBlackHoles  ( void );
264
265 #ifdef DEBUG
266 static void sched_belch(char *s, ...);
267 #endif
268
269 #ifdef SMP
270 //@cindex sched_mutex
271 //@cindex term_mutex
272 //@cindex thread_ready_cond
273 //@cindex gc_pending_cond
274 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
275 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
276 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
277 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
278
279 nat await_death;
280 #endif
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 char *whatNext_strs[] = {
290   "ThreadEnterGHC",
291   "ThreadRunGHC",
292   "ThreadEnterInterp",
293   "ThreadKilled",
294   "ThreadComplete"
295 };
296
297 char *threadReturnCode_strs[] = {
298   "HeapOverflow",                       /* might also be StackOverflow */
299   "StackOverflow",
300   "ThreadYielding",
301   "ThreadBlocked",
302   "ThreadFinished"
303 };
304 #endif
305
306 #ifdef PAR
307 StgTSO * createSparkThread(rtsSpark spark);
308 StgTSO * activateSpark (rtsSpark spark);  
309 #endif
310
311 /*
312  * The thread state for the main thread.
313 // ToDo: check whether not needed any more
314 StgTSO   *MainTSO;
315  */
316
317 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
318 //@subsection Main scheduling loop
319
320 /* ---------------------------------------------------------------------------
321    Main scheduling loop.
322
323    We use round-robin scheduling, each thread returning to the
324    scheduler loop when one of these conditions is detected:
325
326       * out of heap space
327       * timer expires (thread yields)
328       * thread blocks
329       * thread ends
330       * stack overflow
331
332    Locking notes:  we acquire the scheduler lock once at the beginning
333    of the scheduler loop, and release it when
334     
335       * running a thread, or
336       * waiting for work, or
337       * waiting for a GC to complete.
338
339    GRAN version:
340      In a GranSim setup this loop iterates over the global event queue.
341      This revolves around the global event queue, which determines what 
342      to do next. Therefore, it's more complicated than either the 
343      concurrent or the parallel (GUM) setup.
344
345    GUM version:
346      GUM iterates over incoming messages.
347      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
348      and sends out a fish whenever it has nothing to do; in-between
349      doing the actual reductions (shared code below) it processes the
350      incoming messages and deals with delayed operations 
351      (see PendingFetches).
352      This is not the ugliest code you could imagine, but it's bloody close.
353
354    ------------------------------------------------------------------------ */
355 //@cindex schedule
356 static void
357 schedule( void )
358 {
359   StgTSO *t;
360   Capability *cap;
361   StgThreadReturnCode ret;
362 #if defined(GRAN)
363   rtsEvent *event;
364 #elif defined(PAR)
365   StgSparkPool *pool;
366   rtsSpark spark;
367   StgTSO *tso;
368   GlobalTaskId pe;
369   rtsBool receivedFinish = rtsFalse;
370 # if defined(DEBUG)
371   nat tp_size, sp_size; // stats only
372 # endif
373 #endif
374   rtsBool was_interrupted = rtsFalse;
375   
376   ACQUIRE_LOCK(&sched_mutex);
377
378 #if defined(GRAN)
379
380   /* set up first event to get things going */
381   /* ToDo: assign costs for system setup and init MainTSO ! */
382   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
383             ContinueThread, 
384             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
385
386   IF_DEBUG(gran,
387            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
388            G_TSO(CurrentTSO, 5));
389
390   if (RtsFlags.GranFlags.Light) {
391     /* Save current time; GranSim Light only */
392     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
393   }      
394
395   event = get_next_event();
396
397   while (event!=(rtsEvent*)NULL) {
398     /* Choose the processor with the next event */
399     CurrentProc = event->proc;
400     CurrentTSO = event->tso;
401
402 #elif defined(PAR)
403
404   while (!receivedFinish) {    /* set by processMessages */
405                                /* when receiving PP_FINISH message         */ 
406 #else
407
408   while (1) {
409
410 #endif
411
412     IF_DEBUG(scheduler, printAllThreads());
413
414     /* If we're interrupted (the user pressed ^C, or some other
415      * termination condition occurred), kill all the currently running
416      * threads.
417      */
418     if (interrupted) {
419       IF_DEBUG(scheduler, sched_belch("interrupted"));
420       deleteAllThreads();
421       interrupted = rtsFalse;
422       was_interrupted = rtsTrue;
423     }
424
425     /* Go through the list of main threads and wake up any
426      * clients whose computations have finished.  ToDo: this
427      * should be done more efficiently without a linear scan
428      * of the main threads list, somehow...
429      */
430 #ifdef SMP
431     { 
432       StgMainThread *m, **prev;
433       prev = &main_threads;
434       for (m = main_threads; m != NULL; m = m->link) {
435         switch (m->tso->what_next) {
436         case ThreadComplete:
437           if (m->ret) {
438             *(m->ret) = (StgClosure *)m->tso->sp[0];
439           }
440           *prev = m->link;
441           m->stat = Success;
442           pthread_cond_broadcast(&m->wakeup);
443           break;
444         case ThreadKilled:
445           if (m->ret) *(m->ret) = NULL;
446           *prev = m->link;
447           if (was_interrupted) {
448             m->stat = Interrupted;
449           } else {
450             m->stat = Killed;
451           }
452           pthread_cond_broadcast(&m->wakeup);
453           break;
454         default:
455           break;
456         }
457       }
458     }
459
460 #else // not SMP
461
462 # if defined(PAR)
463     /* in GUM do this only on the Main PE */
464     if (IAmMainThread)
465 # endif
466     /* If our main thread has finished or been killed, return.
467      */
468     {
469       StgMainThread *m = main_threads;
470       if (m->tso->what_next == ThreadComplete
471           || m->tso->what_next == ThreadKilled) {
472         main_threads = main_threads->link;
473         if (m->tso->what_next == ThreadComplete) {
474           /* we finished successfully, fill in the return value */
475           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
476           m->stat = Success;
477           return;
478         } else {
479           if (m->ret) { *(m->ret) = NULL; };
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif // SMP
529
530     /* check for signals each time around the scheduler */
531 #ifndef mingw32_TARGET_OS
532     if (signals_pending()) {
533       startSignalHandlers();
534     }
535 #endif
536
537     /* Check whether any waiting threads need to be woken up.  If the
538      * run queue is empty, and there are no other tasks running, we
539      * can wait indefinitely for something to happen.
540      * ToDo: what if another client comes along & requests another
541      * main thread?
542      */
543     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
544       awaitEvent(
545            (run_queue_hd == END_TSO_QUEUE)
546 #ifdef SMP
547         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
548 #endif
549         );
550     }
551     /* we can be interrupted while waiting for I/O... */
552     if (interrupted) continue;
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifndef PAR
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569 #ifdef SMP
570         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
571 #endif
572         )
573     {
574         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575         GarbageCollect(GetRoots,rtsTrue);
576         if (blocked_queue_hd == END_TSO_QUEUE
577             && run_queue_hd == END_TSO_QUEUE
578             && sleeping_queue == END_TSO_QUEUE) {
579             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
580             detectBlackHoles();
581             if (run_queue_hd == END_TSO_QUEUE) {
582                 StgMainThread *m = main_threads;
583 #ifdef SMP
584                 for (; m != NULL; m = m->link) {
585                     deleteThread(m->tso);
586                     m->ret = NULL;
587                     m->stat = Deadlock;
588                     pthread_cond_broadcast(&m->wakeup);
589                 }
590                 main_threads = NULL;
591 #else
592                 deleteThread(m->tso);
593                 m->ret = NULL;
594                 m->stat = Deadlock;
595                 main_threads = m->link;
596                 return;
597 #endif
598             }
599         }
600     }
601 #elif defined(PAR)
602     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
603 #endif
604
605 #ifdef SMP
606     /* If there's a GC pending, don't do anything until it has
607      * completed.
608      */
609     if (ready_to_gc) {
610       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
611       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
612     }
613     
614     /* block until we've got a thread on the run queue and a free
615      * capability.
616      */
617     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
618       IF_DEBUG(scheduler, sched_belch("waiting for work"));
619       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
620       IF_DEBUG(scheduler, sched_belch("work now available"));
621     }
622 #endif
623
624 #if defined(GRAN)
625
626     if (RtsFlags.GranFlags.Light)
627       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
628
629     /* adjust time based on time-stamp */
630     if (event->time > CurrentTime[CurrentProc] &&
631         event->evttype != ContinueThread)
632       CurrentTime[CurrentProc] = event->time;
633     
634     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
635     if (!RtsFlags.GranFlags.Light)
636       handleIdlePEs();
637
638     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
639
640     /* main event dispatcher in GranSim */
641     switch (event->evttype) {
642       /* Should just be continuing execution */
643     case ContinueThread:
644       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
645       /* ToDo: check assertion
646       ASSERT(run_queue_hd != (StgTSO*)NULL &&
647              run_queue_hd != END_TSO_QUEUE);
648       */
649       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
650       if (!RtsFlags.GranFlags.DoAsyncFetch &&
651           procStatus[CurrentProc]==Fetching) {
652         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
653               CurrentTSO->id, CurrentTSO, CurrentProc);
654         goto next_thread;
655       } 
656       /* Ignore ContinueThreads for completed threads */
657       if (CurrentTSO->what_next == ThreadComplete) {
658         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
659               CurrentTSO->id, CurrentTSO, CurrentProc);
660         goto next_thread;
661       } 
662       /* Ignore ContinueThreads for threads that are being migrated */
663       if (PROCS(CurrentTSO)==Nowhere) { 
664         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
665               CurrentTSO->id, CurrentTSO, CurrentProc);
666         goto next_thread;
667       }
668       /* The thread should be at the beginning of the run queue */
669       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
670         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
671               CurrentTSO->id, CurrentTSO, CurrentProc);
672         break; // run the thread anyway
673       }
674       /*
675       new_event(proc, proc, CurrentTime[proc],
676                 FindWork,
677                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
678       goto next_thread; 
679       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
680       break; // now actually run the thread; DaH Qu'vam yImuHbej 
681
682     case FetchNode:
683       do_the_fetchnode(event);
684       goto next_thread;             /* handle next event in event queue  */
685       
686     case GlobalBlock:
687       do_the_globalblock(event);
688       goto next_thread;             /* handle next event in event queue  */
689       
690     case FetchReply:
691       do_the_fetchreply(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case UnblockThread:   /* Move from the blocked queue to the tail of */
695       do_the_unblock(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case ResumeThread:  /* Move from the blocked queue to the tail of */
699       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
700       event->tso->gran.blocktime += 
701         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
702       do_the_startthread(event);
703       goto next_thread;             /* handle next event in event queue  */
704       
705     case StartThread:
706       do_the_startthread(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case MoveThread:
710       do_the_movethread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case MoveSpark:
714       do_the_movespark(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case FindWork:
718       do_the_findwork(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     default:
722       barf("Illegal event type %u\n", event->evttype);
723     }  /* switch */
724     
725     /* This point was scheduler_loop in the old RTS */
726
727     IF_DEBUG(gran, belch("GRAN: after main switch"));
728
729     TimeOfLastEvent = CurrentTime[CurrentProc];
730     TimeOfNextEvent = get_time_of_next_event();
731     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
732     // CurrentTSO = ThreadQueueHd;
733
734     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
735                          TimeOfNextEvent));
736
737     if (RtsFlags.GranFlags.Light) 
738       GranSimLight_leave_system(event, &ActiveTSO); 
739
740     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
741
742     IF_DEBUG(gran, 
743              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
744
745     /* in a GranSim setup the TSO stays on the run queue */
746     t = CurrentTSO;
747     /* Take a thread from the run queue. */
748     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
749
750     IF_DEBUG(gran, 
751              fprintf(stderr, "GRAN: About to run current thread, which is\n");
752              G_TSO(t,5));
753
754     context_switch = 0; // turned on via GranYield, checking events and time slice
755
756     IF_DEBUG(gran, 
757              DumpGranEvent(GR_SCHEDULE, t));
758
759     procStatus[CurrentProc] = Busy;
760
761 #elif defined(PAR)
762     if (PendingFetches != END_BF_QUEUE) {
763         processFetches();
764     }
765
766     /* ToDo: phps merge with spark activation above */
767     /* check whether we have local work and send requests if we have none */
768     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
769       /* :-[  no local threads => look out for local sparks */
770       /* the spark pool for the current PE */
771       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
772       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
773           pool->hd < pool->tl) {
774         /* 
775          * ToDo: add GC code check that we really have enough heap afterwards!!
776          * Old comment:
777          * If we're here (no runnable threads) and we have pending
778          * sparks, we must have a space problem.  Get enough space
779          * to turn one of those pending sparks into a
780          * thread... 
781          */
782
783         spark = findSpark(rtsFalse);                /* get a spark */
784         if (spark != (rtsSpark) NULL) {
785           tso = activateSpark(spark);       /* turn the spark into a thread */
786           IF_PAR_DEBUG(schedule,
787                        belch("==== schedule: Created TSO %d (%p); %d threads active",
788                              tso->id, tso, advisory_thread_count));
789
790           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
791             belch("==^^ failed to activate spark");
792             goto next_thread;
793           }               /* otherwise fall through & pick-up new tso */
794         } else {
795           IF_PAR_DEBUG(verbose,
796                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
797                              spark_queue_len(pool)));
798           goto next_thread;
799         }
800       }
801
802       /* If we still have no work we need to send a FISH to get a spark
803          from another PE 
804       */
805       if (EMPTY_RUN_QUEUE()) {
806       /* =8-[  no local sparks => look for work on other PEs */
807         /*
808          * We really have absolutely no work.  Send out a fish
809          * (there may be some out there already), and wait for
810          * something to arrive.  We clearly can't run any threads
811          * until a SCHEDULE or RESUME arrives, and so that's what
812          * we're hoping to see.  (Of course, we still have to
813          * respond to other types of messages.)
814          */
815         TIME now = msTime() /*CURRENT_TIME*/;
816         IF_PAR_DEBUG(verbose, 
817                      belch("--  now=%ld", now));
818         IF_PAR_DEBUG(verbose,
819                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
820                          (last_fish_arrived_at!=0 &&
821                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
822                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
823                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
824                              last_fish_arrived_at,
825                              RtsFlags.ParFlags.fishDelay, now);
826                      });
827         
828         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
829             (last_fish_arrived_at==0 ||
830              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
831           /* outstandingFishes is set in sendFish, processFish;
832              avoid flooding system with fishes via delay */
833           pe = choosePE();
834           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
835                    NEW_FISH_HUNGER);
836
837           // Global statistics: count no. of fishes
838           if (RtsFlags.ParFlags.ParStats.Global &&
839               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
840             globalParStats.tot_fish_mess++;
841           }
842         }
843       
844         receivedFinish = processMessages();
845         goto next_thread;
846       }
847     } else if (PacketsWaiting()) {  /* Look for incoming messages */
848       receivedFinish = processMessages();
849     }
850
851     /* Now we are sure that we have some work available */
852     ASSERT(run_queue_hd != END_TSO_QUEUE);
853
854     /* Take a thread from the run queue, if we have work */
855     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
856     IF_DEBUG(sanity,checkTSO(t));
857
858     /* ToDo: write something to the log-file
859     if (RTSflags.ParFlags.granSimStats && !sameThread)
860         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
861
862     CurrentTSO = t;
863     */
864     /* the spark pool for the current PE */
865     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
866
867     IF_DEBUG(scheduler, 
868              belch("--=^ %d threads, %d sparks on [%#x]", 
869                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
870
871 #if 1
872     if (0 && RtsFlags.ParFlags.ParStats.Full && 
873         t && LastTSO && t->id != LastTSO->id && 
874         LastTSO->why_blocked == NotBlocked && 
875         LastTSO->what_next != ThreadComplete) {
876       // if previously scheduled TSO not blocked we have to record the context switch
877       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
878                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
879     }
880
881     if (RtsFlags.ParFlags.ParStats.Full && 
882         (emitSchedule /* forced emit */ ||
883         (t && LastTSO && t->id != LastTSO->id))) {
884       /* 
885          we are running a different TSO, so write a schedule event to log file
886          NB: If we use fair scheduling we also have to write  a deschedule 
887              event for LastTSO; with unfair scheduling we know that the
888              previous tso has blocked whenever we switch to another tso, so
889              we don't need it in GUM for now
890       */
891       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
892                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
893       emitSchedule = rtsFalse;
894     }
895      
896 #endif
897 #else /* !GRAN && !PAR */
898   
899     /* grab a thread from the run queue
900      */
901     ASSERT(run_queue_hd != END_TSO_QUEUE);
902     t = POP_RUN_QUEUE();
903
904     // Sanity check the thread we're about to run.  This can be
905     // expensive if there is lots of thread switching going on...
906     IF_DEBUG(sanity,checkTSO(t));
907
908 #endif
909     
910     /* grab a capability
911      */
912 #ifdef SMP
913     cap = free_capabilities;
914     free_capabilities = cap->link;
915     n_free_capabilities--;
916 #else
917     cap = &MainCapability;
918 #endif
919
920     cap->r.rCurrentTSO = t;
921     
922     /* context switches are now initiated by the timer signal, unless
923      * the user specified "context switch as often as possible", with
924      * +RTS -C0
925      */
926     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
927         && (run_queue_hd != END_TSO_QUEUE
928             || blocked_queue_hd != END_TSO_QUEUE
929             || sleeping_queue != END_TSO_QUEUE))
930         context_switch = 1;
931     else
932         context_switch = 0;
933
934     RELEASE_LOCK(&sched_mutex);
935
936     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
937                               t->id, t, whatNext_strs[t->what_next]));
938
939     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
940     /* Run the current thread 
941      */
942     switch (cap->r.rCurrentTSO->what_next) {
943     case ThreadKilled:
944     case ThreadComplete:
945         /* Thread already finished, return to scheduler. */
946         ret = ThreadFinished;
947         break;
948     case ThreadEnterGHC:
949         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
950         break;
951     case ThreadRunGHC:
952         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
953         break;
954     case ThreadEnterInterp:
955         ret = interpretBCO(cap);
956         break;
957     default:
958       barf("schedule: invalid what_next field");
959     }
960     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
961     
962     /* Costs for the scheduler are assigned to CCS_SYSTEM */
963 #ifdef PROFILING
964     CCCS = CCS_SYSTEM;
965 #endif
966     
967     ACQUIRE_LOCK(&sched_mutex);
968
969 #ifdef SMP
970     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
971 #elif !defined(GRAN) && !defined(PAR)
972     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
973 #endif
974     t = cap->r.rCurrentTSO;
975     
976 #if defined(PAR)
977     /* HACK 675: if the last thread didn't yield, make sure to print a 
978        SCHEDULE event to the log file when StgRunning the next thread, even
979        if it is the same one as before */
980     LastTSO = t; 
981     TimeOfLastYield = CURRENT_TIME;
982 #endif
983
984     switch (ret) {
985     case HeapOverflow:
986 #if defined(GRAN)
987       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
988       globalGranStats.tot_heapover++;
989 #elif defined(PAR)
990       globalParStats.tot_heapover++;
991 #endif
992
993       // did the task ask for a large block?
994       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
995           // if so, get one and push it on the front of the nursery.
996           bdescr *bd;
997           nat blocks;
998           
999           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1000
1001           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1002                                    t->id, t,
1003                                    whatNext_strs[t->what_next], blocks));
1004
1005           // don't do this if it would push us over the
1006           // alloc_blocks_lim limit; we'll GC first.
1007           if (alloc_blocks + blocks < alloc_blocks_lim) {
1008
1009               alloc_blocks += blocks;
1010               bd = allocGroup( blocks );
1011
1012               // link the new group into the list
1013               bd->link = cap->r.rCurrentNursery;
1014               bd->u.back = cap->r.rCurrentNursery->u.back;
1015               if (cap->r.rCurrentNursery->u.back != NULL) {
1016                   cap->r.rCurrentNursery->u.back->link = bd;
1017               } else {
1018                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1019                          g0s0->blocks == cap->r.rNursery);
1020                   cap->r.rNursery = g0s0->blocks = bd;
1021               }           
1022               cap->r.rCurrentNursery->u.back = bd;
1023
1024               // initialise it as a nursery block
1025               bd->step = g0s0;
1026               bd->gen_no = 0;
1027               bd->flags = 0;
1028               bd->free = bd->start;
1029
1030               // don't forget to update the block count in g0s0.
1031               g0s0->n_blocks += blocks;
1032               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1033
1034               // now update the nursery to point to the new block
1035               cap->r.rCurrentNursery = bd;
1036
1037               // we might be unlucky and have another thread get on the
1038               // run queue before us and steal the large block, but in that
1039               // case the thread will just end up requesting another large
1040               // block.
1041               PUSH_ON_RUN_QUEUE(t);
1042               break;
1043           }
1044       }
1045
1046       /* make all the running tasks block on a condition variable,
1047        * maybe set context_switch and wait till they all pile in,
1048        * then have them wait on a GC condition variable.
1049        */
1050       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1051                                t->id, t, whatNext_strs[t->what_next]));
1052       threadPaused(t);
1053 #if defined(GRAN)
1054       ASSERT(!is_on_queue(t,CurrentProc));
1055 #elif defined(PAR)
1056       /* Currently we emit a DESCHEDULE event before GC in GUM.
1057          ToDo: either add separate event to distinguish SYSTEM time from rest
1058                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1059       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1060         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1061                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1062         emitSchedule = rtsTrue;
1063       }
1064 #endif
1065       
1066       ready_to_gc = rtsTrue;
1067       context_switch = 1;               /* stop other threads ASAP */
1068       PUSH_ON_RUN_QUEUE(t);
1069       /* actual GC is done at the end of the while loop */
1070       break;
1071       
1072     case StackOverflow:
1073 #if defined(GRAN)
1074       IF_DEBUG(gran, 
1075                DumpGranEvent(GR_DESCHEDULE, t));
1076       globalGranStats.tot_stackover++;
1077 #elif defined(PAR)
1078       // IF_DEBUG(par, 
1079       // DumpGranEvent(GR_DESCHEDULE, t);
1080       globalParStats.tot_stackover++;
1081 #endif
1082       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1083                                t->id, t, whatNext_strs[t->what_next]));
1084       /* just adjust the stack for this thread, then pop it back
1085        * on the run queue.
1086        */
1087       threadPaused(t);
1088       { 
1089         StgMainThread *m;
1090         /* enlarge the stack */
1091         StgTSO *new_t = threadStackOverflow(t);
1092         
1093         /* This TSO has moved, so update any pointers to it from the
1094          * main thread stack.  It better not be on any other queues...
1095          * (it shouldn't be).
1096          */
1097         for (m = main_threads; m != NULL; m = m->link) {
1098           if (m->tso == t) {
1099             m->tso = new_t;
1100           }
1101         }
1102         threadPaused(new_t);
1103         PUSH_ON_RUN_QUEUE(new_t);
1104       }
1105       break;
1106
1107     case ThreadYielding:
1108 #if defined(GRAN)
1109       IF_DEBUG(gran, 
1110                DumpGranEvent(GR_DESCHEDULE, t));
1111       globalGranStats.tot_yields++;
1112 #elif defined(PAR)
1113       // IF_DEBUG(par, 
1114       // DumpGranEvent(GR_DESCHEDULE, t);
1115       globalParStats.tot_yields++;
1116 #endif
1117       /* put the thread back on the run queue.  Then, if we're ready to
1118        * GC, check whether this is the last task to stop.  If so, wake
1119        * up the GC thread.  getThread will block during a GC until the
1120        * GC is finished.
1121        */
1122       IF_DEBUG(scheduler,
1123                if (t->what_next == ThreadEnterInterp) {
1124                    /* ToDo: or maybe a timer expired when we were in Hugs?
1125                     * or maybe someone hit ctrl-C
1126                     */
1127                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1128                          t->id, t, whatNext_strs[t->what_next]);
1129                } else {
1130                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1131                          t->id, t, whatNext_strs[t->what_next]);
1132                }
1133                );
1134
1135       threadPaused(t);
1136
1137       IF_DEBUG(sanity,
1138                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1139                checkTSO(t));
1140       ASSERT(t->link == END_TSO_QUEUE);
1141 #if defined(GRAN)
1142       ASSERT(!is_on_queue(t,CurrentProc));
1143
1144       IF_DEBUG(sanity,
1145                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1146                checkThreadQsSanity(rtsTrue));
1147 #endif
1148 #if defined(PAR)
1149       if (RtsFlags.ParFlags.doFairScheduling) { 
1150         /* this does round-robin scheduling; good for concurrency */
1151         APPEND_TO_RUN_QUEUE(t);
1152       } else {
1153         /* this does unfair scheduling; good for parallelism */
1154         PUSH_ON_RUN_QUEUE(t);
1155       }
1156 #else
1157       /* this does round-robin scheduling; good for concurrency */
1158       APPEND_TO_RUN_QUEUE(t);
1159 #endif
1160 #if defined(GRAN)
1161       /* add a ContinueThread event to actually process the thread */
1162       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1163                 ContinueThread,
1164                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1165       IF_GRAN_DEBUG(bq, 
1166                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1167                G_EVENTQ(0);
1168                G_CURR_THREADQ(0));
1169 #endif /* GRAN */
1170       break;
1171       
1172     case ThreadBlocked:
1173 #if defined(GRAN)
1174       IF_DEBUG(scheduler,
1175                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1176                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1177                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1178
1179       // ??? needed; should emit block before
1180       IF_DEBUG(gran, 
1181                DumpGranEvent(GR_DESCHEDULE, t)); 
1182       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1183       /*
1184         ngoq Dogh!
1185       ASSERT(procStatus[CurrentProc]==Busy || 
1186               ((procStatus[CurrentProc]==Fetching) && 
1187               (t->block_info.closure!=(StgClosure*)NULL)));
1188       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1189           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1190             procStatus[CurrentProc]==Fetching)) 
1191         procStatus[CurrentProc] = Idle;
1192       */
1193 #elif defined(PAR)
1194       IF_DEBUG(scheduler,
1195                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1196                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1197       IF_PAR_DEBUG(bq,
1198
1199                    if (t->block_info.closure!=(StgClosure*)NULL) 
1200                      print_bq(t->block_info.closure));
1201
1202       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1203       blockThread(t);
1204
1205       /* whatever we schedule next, we must log that schedule */
1206       emitSchedule = rtsTrue;
1207
1208 #else /* !GRAN */
1209       /* don't need to do anything.  Either the thread is blocked on
1210        * I/O, in which case we'll have called addToBlockedQueue
1211        * previously, or it's blocked on an MVar or Blackhole, in which
1212        * case it'll be on the relevant queue already.
1213        */
1214       IF_DEBUG(scheduler,
1215                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1216                printThreadBlockage(t);
1217                fprintf(stderr, "\n"));
1218
1219       /* Only for dumping event to log file 
1220          ToDo: do I need this in GranSim, too?
1221       blockThread(t);
1222       */
1223 #endif
1224       threadPaused(t);
1225       break;
1226       
1227     case ThreadFinished:
1228       /* Need to check whether this was a main thread, and if so, signal
1229        * the task that started it with the return value.  If we have no
1230        * more main threads, we probably need to stop all the tasks until
1231        * we get a new one.
1232        */
1233       /* We also end up here if the thread kills itself with an
1234        * uncaught exception, see Exception.hc.
1235        */
1236       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1237 #if defined(GRAN)
1238       endThread(t, CurrentProc); // clean-up the thread
1239 #elif defined(PAR)
1240       /* For now all are advisory -- HWL */
1241       //if(t->priority==AdvisoryPriority) ??
1242       advisory_thread_count--;
1243       
1244 # ifdef DIST
1245       if(t->dist.priority==RevalPriority)
1246         FinishReval(t);
1247 # endif
1248       
1249       if (RtsFlags.ParFlags.ParStats.Full &&
1250           !RtsFlags.ParFlags.ParStats.Suppressed) 
1251         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1252 #endif
1253       break;
1254       
1255     default:
1256       barf("schedule: invalid thread return code %d", (int)ret);
1257     }
1258     
1259 #ifdef SMP
1260     cap->link = free_capabilities;
1261     free_capabilities = cap;
1262     n_free_capabilities++;
1263 #endif
1264
1265 #ifdef SMP
1266     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1267 #else
1268     if (ready_to_gc) 
1269 #endif
1270       {
1271       /* everybody back, start the GC.
1272        * Could do it in this thread, or signal a condition var
1273        * to do it in another thread.  Either way, we need to
1274        * broadcast on gc_pending_cond afterward.
1275        */
1276 #ifdef SMP
1277       IF_DEBUG(scheduler,sched_belch("doing GC"));
1278 #endif
1279       GarbageCollect(GetRoots,rtsFalse);
1280       ready_to_gc = rtsFalse;
1281 #ifdef SMP
1282       pthread_cond_broadcast(&gc_pending_cond);
1283 #endif
1284 #if defined(GRAN)
1285       /* add a ContinueThread event to continue execution of current thread */
1286       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1287                 ContinueThread,
1288                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1289       IF_GRAN_DEBUG(bq, 
1290                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1291                G_EVENTQ(0);
1292                G_CURR_THREADQ(0));
1293 #endif /* GRAN */
1294     }
1295
1296 #if defined(GRAN)
1297   next_thread:
1298     IF_GRAN_DEBUG(unused,
1299                   print_eventq(EventHd));
1300
1301     event = get_next_event();
1302 #elif defined(PAR)
1303   next_thread:
1304     /* ToDo: wait for next message to arrive rather than busy wait */
1305 #endif /* GRAN */
1306
1307   } /* end of while(1) */
1308
1309   IF_PAR_DEBUG(verbose,
1310                belch("== Leaving schedule() after having received Finish"));
1311 }
1312
1313 /* ---------------------------------------------------------------------------
1314  * deleteAllThreads():  kill all the live threads.
1315  *
1316  * This is used when we catch a user interrupt (^C), before performing
1317  * any necessary cleanups and running finalizers.
1318  * ------------------------------------------------------------------------- */
1319    
1320 void deleteAllThreads ( void )
1321 {
1322   StgTSO* t;
1323   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1324   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1325       deleteThread(t);
1326   }
1327   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1328       deleteThread(t);
1329   }
1330   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1331       deleteThread(t);
1332   }
1333   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1334   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1335   sleeping_queue = END_TSO_QUEUE;
1336 }
1337
1338 /* startThread and  insertThread are now in GranSim.c -- HWL */
1339
1340 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1341 //@subsection Suspend and Resume
1342
1343 /* ---------------------------------------------------------------------------
1344  * Suspending & resuming Haskell threads.
1345  * 
1346  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1347  * its capability before calling the C function.  This allows another
1348  * task to pick up the capability and carry on running Haskell
1349  * threads.  It also means that if the C call blocks, it won't lock
1350  * the whole system.
1351  *
1352  * The Haskell thread making the C call is put to sleep for the
1353  * duration of the call, on the susepended_ccalling_threads queue.  We
1354  * give out a token to the task, which it can use to resume the thread
1355  * on return from the C function.
1356  * ------------------------------------------------------------------------- */
1357    
1358 StgInt
1359 suspendThread( Capability *cap )
1360 {
1361   nat tok;
1362
1363   ACQUIRE_LOCK(&sched_mutex);
1364
1365   IF_DEBUG(scheduler,
1366            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1367
1368   threadPaused(cap->r.rCurrentTSO);
1369   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1370   suspended_ccalling_threads = cap->r.rCurrentTSO;
1371
1372   /* Use the thread ID as the token; it should be unique */
1373   tok = cap->r.rCurrentTSO->id;
1374
1375 #ifdef SMP
1376   cap->link = free_capabilities;
1377   free_capabilities = cap;
1378   n_free_capabilities++;
1379 #endif
1380
1381   RELEASE_LOCK(&sched_mutex);
1382   return tok; 
1383 }
1384
1385 Capability *
1386 resumeThread( StgInt tok )
1387 {
1388   StgTSO *tso, **prev;
1389   Capability *cap;
1390
1391   ACQUIRE_LOCK(&sched_mutex);
1392
1393   prev = &suspended_ccalling_threads;
1394   for (tso = suspended_ccalling_threads; 
1395        tso != END_TSO_QUEUE; 
1396        prev = &tso->link, tso = tso->link) {
1397     if (tso->id == (StgThreadID)tok) {
1398       *prev = tso->link;
1399       break;
1400     }
1401   }
1402   if (tso == END_TSO_QUEUE) {
1403     barf("resumeThread: thread not found");
1404   }
1405   tso->link = END_TSO_QUEUE;
1406
1407 #ifdef SMP
1408   while (free_capabilities == NULL) {
1409     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1410     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1411     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1412   }
1413   cap = free_capabilities;
1414   free_capabilities = cap->link;
1415   n_free_capabilities--;
1416 #else  
1417   cap = &MainCapability;
1418 #endif
1419
1420   cap->r.rCurrentTSO = tso;
1421
1422   RELEASE_LOCK(&sched_mutex);
1423   return cap;
1424 }
1425
1426
1427 /* ---------------------------------------------------------------------------
1428  * Static functions
1429  * ------------------------------------------------------------------------ */
1430 static void unblockThread(StgTSO *tso);
1431
1432 /* ---------------------------------------------------------------------------
1433  * Comparing Thread ids.
1434  *
1435  * This is used from STG land in the implementation of the
1436  * instances of Eq/Ord for ThreadIds.
1437  * ------------------------------------------------------------------------ */
1438
1439 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1440
1441   StgThreadID id1 = tso1->id; 
1442   StgThreadID id2 = tso2->id;
1443  
1444   if (id1 < id2) return (-1);
1445   if (id1 > id2) return 1;
1446   return 0;
1447 }
1448
1449 /* ---------------------------------------------------------------------------
1450  * Fetching the ThreadID from an StgTSO.
1451  *
1452  * This is used in the implementation of Show for ThreadIds.
1453  * ------------------------------------------------------------------------ */
1454 int rts_getThreadId(const StgTSO *tso) 
1455 {
1456   return tso->id;
1457 }
1458
1459 /* ---------------------------------------------------------------------------
1460    Create a new thread.
1461
1462    The new thread starts with the given stack size.  Before the
1463    scheduler can run, however, this thread needs to have a closure
1464    (and possibly some arguments) pushed on its stack.  See
1465    pushClosure() in Schedule.h.
1466
1467    createGenThread() and createIOThread() (in SchedAPI.h) are
1468    convenient packaged versions of this function.
1469
1470    currently pri (priority) is only used in a GRAN setup -- HWL
1471    ------------------------------------------------------------------------ */
1472 //@cindex createThread
1473 #if defined(GRAN)
1474 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1475 StgTSO *
1476 createThread(nat stack_size, StgInt pri)
1477 {
1478   return createThread_(stack_size, rtsFalse, pri);
1479 }
1480
1481 static StgTSO *
1482 createThread_(nat size, rtsBool have_lock, StgInt pri)
1483 {
1484 #else
1485 StgTSO *
1486 createThread(nat stack_size)
1487 {
1488   return createThread_(stack_size, rtsFalse);
1489 }
1490
1491 static StgTSO *
1492 createThread_(nat size, rtsBool have_lock)
1493 {
1494 #endif
1495
1496     StgTSO *tso;
1497     nat stack_size;
1498
1499     /* First check whether we should create a thread at all */
1500 #if defined(PAR)
1501   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1502   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1503     threadsIgnored++;
1504     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1505           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1506     return END_TSO_QUEUE;
1507   }
1508   threadsCreated++;
1509 #endif
1510
1511 #if defined(GRAN)
1512   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1513 #endif
1514
1515   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1516
1517   /* catch ridiculously small stack sizes */
1518   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1519     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1520   }
1521
1522   stack_size = size - TSO_STRUCT_SIZEW;
1523
1524   tso = (StgTSO *)allocate(size);
1525   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1526
1527   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1528 #if defined(GRAN)
1529   SET_GRAN_HDR(tso, ThisPE);
1530 #endif
1531   tso->what_next     = ThreadEnterGHC;
1532
1533   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1534    * protect the increment operation on next_thread_id.
1535    * In future, we could use an atomic increment instead.
1536    */
1537   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1538   tso->id = next_thread_id++; 
1539   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1540
1541   tso->why_blocked  = NotBlocked;
1542   tso->blocked_exceptions = NULL;
1543
1544   tso->stack_size   = stack_size;
1545   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1546                               - TSO_STRUCT_SIZEW;
1547   tso->sp           = (P_)&(tso->stack) + stack_size;
1548
1549 #ifdef PROFILING
1550   tso->prof.CCCS = CCS_MAIN;
1551 #endif
1552
1553   /* put a stop frame on the stack */
1554   tso->sp -= sizeofW(StgStopFrame);
1555   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1556   tso->su = (StgUpdateFrame*)tso->sp;
1557
1558   // ToDo: check this
1559 #if defined(GRAN)
1560   tso->link = END_TSO_QUEUE;
1561   /* uses more flexible routine in GranSim */
1562   insertThread(tso, CurrentProc);
1563 #else
1564   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1565    * from its creation
1566    */
1567 #endif
1568
1569 #if defined(GRAN) 
1570   if (RtsFlags.GranFlags.GranSimStats.Full) 
1571     DumpGranEvent(GR_START,tso);
1572 #elif defined(PAR)
1573   if (RtsFlags.ParFlags.ParStats.Full) 
1574     DumpGranEvent(GR_STARTQ,tso);
1575   /* HACk to avoid SCHEDULE 
1576      LastTSO = tso; */
1577 #endif
1578
1579   /* Link the new thread on the global thread list.
1580    */
1581   tso->global_link = all_threads;
1582   all_threads = tso;
1583
1584 #if defined(DIST)
1585   tso->dist.priority = MandatoryPriority; //by default that is...
1586 #endif
1587
1588 #if defined(GRAN)
1589   tso->gran.pri = pri;
1590 # if defined(DEBUG)
1591   tso->gran.magic = TSO_MAGIC; // debugging only
1592 # endif
1593   tso->gran.sparkname   = 0;
1594   tso->gran.startedat   = CURRENT_TIME; 
1595   tso->gran.exported    = 0;
1596   tso->gran.basicblocks = 0;
1597   tso->gran.allocs      = 0;
1598   tso->gran.exectime    = 0;
1599   tso->gran.fetchtime   = 0;
1600   tso->gran.fetchcount  = 0;
1601   tso->gran.blocktime   = 0;
1602   tso->gran.blockcount  = 0;
1603   tso->gran.blockedat   = 0;
1604   tso->gran.globalsparks = 0;
1605   tso->gran.localsparks  = 0;
1606   if (RtsFlags.GranFlags.Light)
1607     tso->gran.clock  = Now; /* local clock */
1608   else
1609     tso->gran.clock  = 0;
1610
1611   IF_DEBUG(gran,printTSO(tso));
1612 #elif defined(PAR)
1613 # if defined(DEBUG)
1614   tso->par.magic = TSO_MAGIC; // debugging only
1615 # endif
1616   tso->par.sparkname   = 0;
1617   tso->par.startedat   = CURRENT_TIME; 
1618   tso->par.exported    = 0;
1619   tso->par.basicblocks = 0;
1620   tso->par.allocs      = 0;
1621   tso->par.exectime    = 0;
1622   tso->par.fetchtime   = 0;
1623   tso->par.fetchcount  = 0;
1624   tso->par.blocktime   = 0;
1625   tso->par.blockcount  = 0;
1626   tso->par.blockedat   = 0;
1627   tso->par.globalsparks = 0;
1628   tso->par.localsparks  = 0;
1629 #endif
1630
1631 #if defined(GRAN)
1632   globalGranStats.tot_threads_created++;
1633   globalGranStats.threads_created_on_PE[CurrentProc]++;
1634   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1635   globalGranStats.tot_sq_probes++;
1636 #elif defined(PAR)
1637   // collect parallel global statistics (currently done together with GC stats)
1638   if (RtsFlags.ParFlags.ParStats.Global &&
1639       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1640     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1641     globalParStats.tot_threads_created++;
1642   }
1643 #endif 
1644
1645 #if defined(GRAN)
1646   IF_GRAN_DEBUG(pri,
1647                 belch("==__ schedule: Created TSO %d (%p);",
1648                       CurrentProc, tso, tso->id));
1649 #elif defined(PAR)
1650     IF_PAR_DEBUG(verbose,
1651                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1652                        tso->id, tso, advisory_thread_count));
1653 #else
1654   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1655                                  tso->id, tso->stack_size));
1656 #endif    
1657   return tso;
1658 }
1659
1660 #if defined(PAR)
1661 /* RFP:
1662    all parallel thread creation calls should fall through the following routine.
1663 */
1664 StgTSO *
1665 createSparkThread(rtsSpark spark) 
1666 { StgTSO *tso;
1667   ASSERT(spark != (rtsSpark)NULL);
1668   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1669   { threadsIgnored++;
1670     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1671           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1672     return END_TSO_QUEUE;
1673   }
1674   else
1675   { threadsCreated++;
1676     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1677     if (tso==END_TSO_QUEUE)     
1678       barf("createSparkThread: Cannot create TSO");
1679 #if defined(DIST)
1680     tso->priority = AdvisoryPriority;
1681 #endif
1682     pushClosure(tso,spark);
1683     PUSH_ON_RUN_QUEUE(tso);
1684     advisory_thread_count++;    
1685   }
1686   return tso;
1687 }
1688 #endif
1689
1690 /*
1691   Turn a spark into a thread.
1692   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1693 */
1694 #if defined(PAR)
1695 //@cindex activateSpark
1696 StgTSO *
1697 activateSpark (rtsSpark spark) 
1698 {
1699   StgTSO *tso;
1700
1701   tso = createSparkThread(spark);
1702   if (RtsFlags.ParFlags.ParStats.Full) {   
1703     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1704     IF_PAR_DEBUG(verbose,
1705                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1706                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1707   }
1708   // ToDo: fwd info on local/global spark to thread -- HWL
1709   // tso->gran.exported =  spark->exported;
1710   // tso->gran.locked =   !spark->global;
1711   // tso->gran.sparkname = spark->name;
1712
1713   return tso;
1714 }
1715 #endif
1716
1717 /* ---------------------------------------------------------------------------
1718  * scheduleThread()
1719  *
1720  * scheduleThread puts a thread on the head of the runnable queue.
1721  * This will usually be done immediately after a thread is created.
1722  * The caller of scheduleThread must create the thread using e.g.
1723  * createThread and push an appropriate closure
1724  * on this thread's stack before the scheduler is invoked.
1725  * ------------------------------------------------------------------------ */
1726
1727 void
1728 scheduleThread(StgTSO *tso)
1729 {
1730   if (tso==END_TSO_QUEUE){    
1731     schedule();
1732     return;
1733   }
1734
1735   ACQUIRE_LOCK(&sched_mutex);
1736
1737   /* Put the new thread on the head of the runnable queue.  The caller
1738    * better push an appropriate closure on this thread's stack
1739    * beforehand.  In the SMP case, the thread may start running as
1740    * soon as we release the scheduler lock below.
1741    */
1742   PUSH_ON_RUN_QUEUE(tso);
1743   THREAD_RUNNABLE();
1744
1745 #if 0
1746   IF_DEBUG(scheduler,printTSO(tso));
1747 #endif
1748   RELEASE_LOCK(&sched_mutex);
1749 }
1750
1751 /* ---------------------------------------------------------------------------
1752  * startTasks()
1753  *
1754  * Start up Posix threads to run each of the scheduler tasks.
1755  * I believe the task ids are not needed in the system as defined.
1756  *  KH @ 25/10/99
1757  * ------------------------------------------------------------------------ */
1758
1759 #if defined(PAR) || defined(SMP)
1760 void
1761 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1762 {
1763   scheduleThread(END_TSO_QUEUE);
1764 }
1765 #endif
1766
1767 /* ---------------------------------------------------------------------------
1768  * initScheduler()
1769  *
1770  * Initialise the scheduler.  This resets all the queues - if the
1771  * queues contained any threads, they'll be garbage collected at the
1772  * next pass.
1773  *
1774  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1775  * ------------------------------------------------------------------------ */
1776
1777 #ifdef SMP
1778 static void
1779 term_handler(int sig STG_UNUSED)
1780 {
1781   stat_workerStop();
1782   ACQUIRE_LOCK(&term_mutex);
1783   await_death--;
1784   RELEASE_LOCK(&term_mutex);
1785   pthread_exit(NULL);
1786 }
1787 #endif
1788
1789 static void
1790 initCapability( Capability *cap )
1791 {
1792     cap->f.stgChk0         = (F_)__stg_chk_0;
1793     cap->f.stgChk1         = (F_)__stg_chk_1;
1794     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1795     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1796 }
1797
1798 void 
1799 initScheduler(void)
1800 {
1801 #if defined(GRAN)
1802   nat i;
1803
1804   for (i=0; i<=MAX_PROC; i++) {
1805     run_queue_hds[i]      = END_TSO_QUEUE;
1806     run_queue_tls[i]      = END_TSO_QUEUE;
1807     blocked_queue_hds[i]  = END_TSO_QUEUE;
1808     blocked_queue_tls[i]  = END_TSO_QUEUE;
1809     ccalling_threadss[i]  = END_TSO_QUEUE;
1810     sleeping_queue        = END_TSO_QUEUE;
1811   }
1812 #else
1813   run_queue_hd      = END_TSO_QUEUE;
1814   run_queue_tl      = END_TSO_QUEUE;
1815   blocked_queue_hd  = END_TSO_QUEUE;
1816   blocked_queue_tl  = END_TSO_QUEUE;
1817   sleeping_queue    = END_TSO_QUEUE;
1818 #endif 
1819
1820   suspended_ccalling_threads  = END_TSO_QUEUE;
1821
1822   main_threads = NULL;
1823   all_threads  = END_TSO_QUEUE;
1824
1825   context_switch = 0;
1826   interrupted    = 0;
1827
1828   RtsFlags.ConcFlags.ctxtSwitchTicks =
1829       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1830
1831   /* Install the SIGHUP handler */
1832 #ifdef SMP
1833   {
1834     struct sigaction action,oact;
1835
1836     action.sa_handler = term_handler;
1837     sigemptyset(&action.sa_mask);
1838     action.sa_flags = 0;
1839     if (sigaction(SIGTERM, &action, &oact) != 0) {
1840       barf("can't install TERM handler");
1841     }
1842   }
1843 #endif
1844
1845 #ifdef SMP
1846   /* Allocate N Capabilities */
1847   {
1848     nat i;
1849     Capability *cap, *prev;
1850     cap  = NULL;
1851     prev = NULL;
1852     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1853       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1854       initCapability(cap);
1855       cap->link = prev;
1856       prev = cap;
1857     }
1858     free_capabilities = cap;
1859     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1860   }
1861   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1862                              n_free_capabilities););
1863 #else
1864   initCapability(&MainCapability);
1865 #endif
1866
1867 #if defined(SMP) || defined(PAR)
1868   initSparkPools();
1869 #endif
1870 }
1871
1872 #ifdef SMP
1873 void
1874 startTasks( void )
1875 {
1876   nat i;
1877   int r;
1878   pthread_t tid;
1879   
1880   /* make some space for saving all the thread ids */
1881   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1882                             "initScheduler:task_ids");
1883   
1884   /* and create all the threads */
1885   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1886     r = pthread_create(&tid,NULL,taskStart,NULL);
1887     if (r != 0) {
1888       barf("startTasks: Can't create new Posix thread");
1889     }
1890     task_ids[i].id = tid;
1891     task_ids[i].mut_time = 0.0;
1892     task_ids[i].mut_etime = 0.0;
1893     task_ids[i].gc_time = 0.0;
1894     task_ids[i].gc_etime = 0.0;
1895     task_ids[i].elapsedtimestart = elapsedtime();
1896     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1897   }
1898 }
1899 #endif
1900
1901 void
1902 exitScheduler( void )
1903 {
1904 #ifdef SMP
1905   nat i;
1906
1907   /* Don't want to use pthread_cancel, since we'd have to install
1908    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1909    * all our locks.
1910    */
1911 #if 0
1912   /* Cancel all our tasks */
1913   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1914     pthread_cancel(task_ids[i].id);
1915   }
1916   
1917   /* Wait for all the tasks to terminate */
1918   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1919     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1920                                task_ids[i].id));
1921     pthread_join(task_ids[i].id, NULL);
1922   }
1923 #endif
1924
1925   /* Send 'em all a SIGHUP.  That should shut 'em up.
1926    */
1927   await_death = RtsFlags.ParFlags.nNodes;
1928   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1929     pthread_kill(task_ids[i].id,SIGTERM);
1930   }
1931   while (await_death > 0) {
1932     sched_yield();
1933   }
1934 #endif
1935 }
1936
1937 /* -----------------------------------------------------------------------------
1938    Managing the per-task allocation areas.
1939    
1940    Each capability comes with an allocation area.  These are
1941    fixed-length block lists into which allocation can be done.
1942
1943    ToDo: no support for two-space collection at the moment???
1944    -------------------------------------------------------------------------- */
1945
1946 /* -----------------------------------------------------------------------------
1947  * waitThread is the external interface for running a new computation
1948  * and waiting for the result.
1949  *
1950  * In the non-SMP case, we create a new main thread, push it on the 
1951  * main-thread stack, and invoke the scheduler to run it.  The
1952  * scheduler will return when the top main thread on the stack has
1953  * completed or died, and fill in the necessary fields of the
1954  * main_thread structure.
1955  *
1956  * In the SMP case, we create a main thread as before, but we then
1957  * create a new condition variable and sleep on it.  When our new
1958  * main thread has completed, we'll be woken up and the status/result
1959  * will be in the main_thread struct.
1960  * -------------------------------------------------------------------------- */
1961
1962 int 
1963 howManyThreadsAvail ( void )
1964 {
1965    int i = 0;
1966    StgTSO* q;
1967    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1968       i++;
1969    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1970       i++;
1971    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1972       i++;
1973    return i;
1974 }
1975
1976 void
1977 finishAllThreads ( void )
1978 {
1979    do {
1980       while (run_queue_hd != END_TSO_QUEUE) {
1981          waitThread ( run_queue_hd, NULL );
1982       }
1983       while (blocked_queue_hd != END_TSO_QUEUE) {
1984          waitThread ( blocked_queue_hd, NULL );
1985       }
1986       while (sleeping_queue != END_TSO_QUEUE) {
1987          waitThread ( blocked_queue_hd, NULL );
1988       }
1989    } while 
1990       (blocked_queue_hd != END_TSO_QUEUE || 
1991        run_queue_hd     != END_TSO_QUEUE ||
1992        sleeping_queue   != END_TSO_QUEUE);
1993 }
1994
1995 SchedulerStatus
1996 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1997 {
1998   StgMainThread *m;
1999   SchedulerStatus stat;
2000
2001   ACQUIRE_LOCK(&sched_mutex);
2002   
2003   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2004
2005   m->tso = tso;
2006   m->ret = ret;
2007   m->stat = NoStatus;
2008 #ifdef SMP
2009   pthread_cond_init(&m->wakeup, NULL);
2010 #endif
2011
2012   m->link = main_threads;
2013   main_threads = m;
2014
2015   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2016                               m->tso->id));
2017
2018 #ifdef SMP
2019   do {
2020     pthread_cond_wait(&m->wakeup, &sched_mutex);
2021   } while (m->stat == NoStatus);
2022 #elif defined(GRAN)
2023   /* GranSim specific init */
2024   CurrentTSO = m->tso;                // the TSO to run
2025   procStatus[MainProc] = Busy;        // status of main PE
2026   CurrentProc = MainProc;             // PE to run it on
2027
2028   schedule();
2029 #else
2030   schedule();
2031   ASSERT(m->stat != NoStatus);
2032 #endif
2033
2034   stat = m->stat;
2035
2036 #ifdef SMP
2037   pthread_cond_destroy(&m->wakeup);
2038 #endif
2039
2040   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2041                               m->tso->id));
2042   free(m);
2043
2044   RELEASE_LOCK(&sched_mutex);
2045
2046   return stat;
2047 }
2048
2049 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2050 //@subsection Run queue code 
2051
2052 #if 0
2053 /* 
2054    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2055        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2056        implicit global variable that has to be correct when calling these
2057        fcts -- HWL 
2058 */
2059
2060 /* Put the new thread on the head of the runnable queue.
2061  * The caller of createThread better push an appropriate closure
2062  * on this thread's stack before the scheduler is invoked.
2063  */
2064 static /* inline */ void
2065 add_to_run_queue(tso)
2066 StgTSO* tso; 
2067 {
2068   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2069   tso->link = run_queue_hd;
2070   run_queue_hd = tso;
2071   if (run_queue_tl == END_TSO_QUEUE) {
2072     run_queue_tl = tso;
2073   }
2074 }
2075
2076 /* Put the new thread at the end of the runnable queue. */
2077 static /* inline */ void
2078 push_on_run_queue(tso)
2079 StgTSO* tso; 
2080 {
2081   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2082   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2083   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2084   if (run_queue_hd == END_TSO_QUEUE) {
2085     run_queue_hd = tso;
2086   } else {
2087     run_queue_tl->link = tso;
2088   }
2089   run_queue_tl = tso;
2090 }
2091
2092 /* 
2093    Should be inlined because it's used very often in schedule.  The tso
2094    argument is actually only needed in GranSim, where we want to have the
2095    possibility to schedule *any* TSO on the run queue, irrespective of the
2096    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2097    the run queue and dequeue the tso, adjusting the links in the queue. 
2098 */
2099 //@cindex take_off_run_queue
2100 static /* inline */ StgTSO*
2101 take_off_run_queue(StgTSO *tso) {
2102   StgTSO *t, *prev;
2103
2104   /* 
2105      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2106
2107      if tso is specified, unlink that tso from the run_queue (doesn't have
2108      to be at the beginning of the queue); GranSim only 
2109   */
2110   if (tso!=END_TSO_QUEUE) {
2111     /* find tso in queue */
2112     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2113          t!=END_TSO_QUEUE && t!=tso;
2114          prev=t, t=t->link) 
2115       /* nothing */ ;
2116     ASSERT(t==tso);
2117     /* now actually dequeue the tso */
2118     if (prev!=END_TSO_QUEUE) {
2119       ASSERT(run_queue_hd!=t);
2120       prev->link = t->link;
2121     } else {
2122       /* t is at beginning of thread queue */
2123       ASSERT(run_queue_hd==t);
2124       run_queue_hd = t->link;
2125     }
2126     /* t is at end of thread queue */
2127     if (t->link==END_TSO_QUEUE) {
2128       ASSERT(t==run_queue_tl);
2129       run_queue_tl = prev;
2130     } else {
2131       ASSERT(run_queue_tl!=t);
2132     }
2133     t->link = END_TSO_QUEUE;
2134   } else {
2135     /* take tso from the beginning of the queue; std concurrent code */
2136     t = run_queue_hd;
2137     if (t != END_TSO_QUEUE) {
2138       run_queue_hd = t->link;
2139       t->link = END_TSO_QUEUE;
2140       if (run_queue_hd == END_TSO_QUEUE) {
2141         run_queue_tl = END_TSO_QUEUE;
2142       }
2143     }
2144   }
2145   return t;
2146 }
2147
2148 #endif /* 0 */
2149
2150 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2151 //@subsection Garbage Collextion Routines
2152
2153 /* ---------------------------------------------------------------------------
2154    Where are the roots that we know about?
2155
2156         - all the threads on the runnable queue
2157         - all the threads on the blocked queue
2158         - all the threads on the sleeping queue
2159         - all the thread currently executing a _ccall_GC
2160         - all the "main threads"
2161      
2162    ------------------------------------------------------------------------ */
2163
2164 /* This has to be protected either by the scheduler monitor, or by the
2165         garbage collection monitor (probably the latter).
2166         KH @ 25/10/99
2167 */
2168
2169 static void
2170 GetRoots(evac_fn evac)
2171 {
2172   StgMainThread *m;
2173
2174 #if defined(GRAN)
2175   {
2176     nat i;
2177     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2178       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2179           evac((StgClosure **)&run_queue_hds[i]);
2180       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2181           evac((StgClosure **)&run_queue_tls[i]);
2182       
2183       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2184           evac((StgClosure **)&blocked_queue_hds[i]);
2185       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2186           evac((StgClosure **)&blocked_queue_tls[i]);
2187       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2188           evac((StgClosure **)&ccalling_threads[i]);
2189     }
2190   }
2191
2192   markEventQueue();
2193
2194 #else /* !GRAN */
2195   if (run_queue_hd != END_TSO_QUEUE) {
2196       ASSERT(run_queue_tl != END_TSO_QUEUE);
2197       evac((StgClosure **)&run_queue_hd);
2198       evac((StgClosure **)&run_queue_tl);
2199   }
2200   
2201   if (blocked_queue_hd != END_TSO_QUEUE) {
2202       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2203       evac((StgClosure **)&blocked_queue_hd);
2204       evac((StgClosure **)&blocked_queue_tl);
2205   }
2206   
2207   if (sleeping_queue != END_TSO_QUEUE) {
2208       evac((StgClosure **)&sleeping_queue);
2209   }
2210 #endif 
2211
2212   for (m = main_threads; m != NULL; m = m->link) {
2213       evac((StgClosure **)&m->tso);
2214   }
2215   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2216       evac((StgClosure **)&suspended_ccalling_threads);
2217   }
2218
2219 #if defined(SMP) || defined(PAR) || defined(GRAN)
2220   markSparkQueue(evac);
2221 #endif
2222 }
2223
2224 /* -----------------------------------------------------------------------------
2225    performGC
2226
2227    This is the interface to the garbage collector from Haskell land.
2228    We provide this so that external C code can allocate and garbage
2229    collect when called from Haskell via _ccall_GC.
2230
2231    It might be useful to provide an interface whereby the programmer
2232    can specify more roots (ToDo).
2233    
2234    This needs to be protected by the GC condition variable above.  KH.
2235    -------------------------------------------------------------------------- */
2236
2237 void (*extra_roots)(evac_fn);
2238
2239 void
2240 performGC(void)
2241 {
2242   GarbageCollect(GetRoots,rtsFalse);
2243 }
2244
2245 void
2246 performMajorGC(void)
2247 {
2248   GarbageCollect(GetRoots,rtsTrue);
2249 }
2250
2251 static void
2252 AllRoots(evac_fn evac)
2253 {
2254     GetRoots(evac);             // the scheduler's roots
2255     extra_roots(evac);          // the user's roots
2256 }
2257
2258 void
2259 performGCWithRoots(void (*get_roots)(evac_fn))
2260 {
2261   extra_roots = get_roots;
2262   GarbageCollect(AllRoots,rtsFalse);
2263 }
2264
2265 /* -----------------------------------------------------------------------------
2266    Stack overflow
2267
2268    If the thread has reached its maximum stack size, then raise the
2269    StackOverflow exception in the offending thread.  Otherwise
2270    relocate the TSO into a larger chunk of memory and adjust its stack
2271    size appropriately.
2272    -------------------------------------------------------------------------- */
2273
2274 static StgTSO *
2275 threadStackOverflow(StgTSO *tso)
2276 {
2277   nat new_stack_size, new_tso_size, diff, stack_words;
2278   StgPtr new_sp;
2279   StgTSO *dest;
2280
2281   IF_DEBUG(sanity,checkTSO(tso));
2282   if (tso->stack_size >= tso->max_stack_size) {
2283
2284     IF_DEBUG(gc,
2285              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2286                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2287              /* If we're debugging, just print out the top of the stack */
2288              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2289                                               tso->sp+64)));
2290
2291     /* Send this thread the StackOverflow exception */
2292     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2293     return tso;
2294   }
2295
2296   /* Try to double the current stack size.  If that takes us over the
2297    * maximum stack size for this thread, then use the maximum instead.
2298    * Finally round up so the TSO ends up as a whole number of blocks.
2299    */
2300   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2301   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2302                                        TSO_STRUCT_SIZE)/sizeof(W_);
2303   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2304   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2305
2306   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2307
2308   dest = (StgTSO *)allocate(new_tso_size);
2309   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2310
2311   /* copy the TSO block and the old stack into the new area */
2312   memcpy(dest,tso,TSO_STRUCT_SIZE);
2313   stack_words = tso->stack + tso->stack_size - tso->sp;
2314   new_sp = (P_)dest + new_tso_size - stack_words;
2315   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2316
2317   /* relocate the stack pointers... */
2318   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2319   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2320   dest->sp    = new_sp;
2321   dest->stack_size = new_stack_size;
2322         
2323   /* and relocate the update frame list */
2324   relocate_stack(dest, diff);
2325
2326   /* Mark the old TSO as relocated.  We have to check for relocated
2327    * TSOs in the garbage collector and any primops that deal with TSOs.
2328    *
2329    * It's important to set the sp and su values to just beyond the end
2330    * of the stack, so we don't attempt to scavenge any part of the
2331    * dead TSO's stack.
2332    */
2333   tso->what_next = ThreadRelocated;
2334   tso->link = dest;
2335   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2336   tso->su = (StgUpdateFrame *)tso->sp;
2337   tso->why_blocked = NotBlocked;
2338   dest->mut_link = NULL;
2339
2340   IF_PAR_DEBUG(verbose,
2341                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2342                      tso->id, tso, tso->stack_size);
2343                /* If we're debugging, just print out the top of the stack */
2344                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2345                                                 tso->sp+64)));
2346   
2347   IF_DEBUG(sanity,checkTSO(tso));
2348 #if 0
2349   IF_DEBUG(scheduler,printTSO(dest));
2350 #endif
2351
2352   return dest;
2353 }
2354
2355 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2356 //@subsection Blocking Queue Routines
2357
2358 /* ---------------------------------------------------------------------------
2359    Wake up a queue that was blocked on some resource.
2360    ------------------------------------------------------------------------ */
2361
2362 #if defined(GRAN)
2363 static inline void
2364 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2365 {
2366 }
2367 #elif defined(PAR)
2368 static inline void
2369 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2370 {
2371   /* write RESUME events to log file and
2372      update blocked and fetch time (depending on type of the orig closure) */
2373   if (RtsFlags.ParFlags.ParStats.Full) {
2374     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2375                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2376                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2377     if (EMPTY_RUN_QUEUE())
2378       emitSchedule = rtsTrue;
2379
2380     switch (get_itbl(node)->type) {
2381         case FETCH_ME_BQ:
2382           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2383           break;
2384         case RBH:
2385         case FETCH_ME:
2386         case BLACKHOLE_BQ:
2387           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2388           break;
2389 #ifdef DIST
2390         case MVAR:
2391           break;
2392 #endif    
2393         default:
2394           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2395         }
2396       }
2397 }
2398 #endif
2399
2400 #if defined(GRAN)
2401 static StgBlockingQueueElement *
2402 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2403 {
2404     StgTSO *tso;
2405     PEs node_loc, tso_loc;
2406
2407     node_loc = where_is(node); // should be lifted out of loop
2408     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2409     tso_loc = where_is((StgClosure *)tso);
2410     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2411       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2412       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2413       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2414       // insertThread(tso, node_loc);
2415       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2416                 ResumeThread,
2417                 tso, node, (rtsSpark*)NULL);
2418       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2419       // len_local++;
2420       // len++;
2421     } else { // TSO is remote (actually should be FMBQ)
2422       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2423                                   RtsFlags.GranFlags.Costs.gunblocktime +
2424                                   RtsFlags.GranFlags.Costs.latency;
2425       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2426                 UnblockThread,
2427                 tso, node, (rtsSpark*)NULL);
2428       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2429       // len++;
2430     }
2431     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2432     IF_GRAN_DEBUG(bq,
2433                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2434                           (node_loc==tso_loc ? "Local" : "Global"), 
2435                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2436     tso->block_info.closure = NULL;
2437     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2438                              tso->id, tso));
2439 }
2440 #elif defined(PAR)
2441 static StgBlockingQueueElement *
2442 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2443 {
2444     StgBlockingQueueElement *next;
2445
2446     switch (get_itbl(bqe)->type) {
2447     case TSO:
2448       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2449       /* if it's a TSO just push it onto the run_queue */
2450       next = bqe->link;
2451       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2452       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2453       THREAD_RUNNABLE();
2454       unblockCount(bqe, node);
2455       /* reset blocking status after dumping event */
2456       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2457       break;
2458
2459     case BLOCKED_FETCH:
2460       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2461       next = bqe->link;
2462       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2463       PendingFetches = (StgBlockedFetch *)bqe;
2464       break;
2465
2466 # if defined(DEBUG)
2467       /* can ignore this case in a non-debugging setup; 
2468          see comments on RBHSave closures above */
2469     case CONSTR:
2470       /* check that the closure is an RBHSave closure */
2471       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2472              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2473              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2474       break;
2475
2476     default:
2477       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2478            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2479            (StgClosure *)bqe);
2480 # endif
2481     }
2482   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2483   return next;
2484 }
2485
2486 #else /* !GRAN && !PAR */
2487 static StgTSO *
2488 unblockOneLocked(StgTSO *tso)
2489 {
2490   StgTSO *next;
2491
2492   ASSERT(get_itbl(tso)->type == TSO);
2493   ASSERT(tso->why_blocked != NotBlocked);
2494   tso->why_blocked = NotBlocked;
2495   next = tso->link;
2496   PUSH_ON_RUN_QUEUE(tso);
2497   THREAD_RUNNABLE();
2498   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2499   return next;
2500 }
2501 #endif
2502
2503 #if defined(GRAN) || defined(PAR)
2504 inline StgBlockingQueueElement *
2505 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2506 {
2507   ACQUIRE_LOCK(&sched_mutex);
2508   bqe = unblockOneLocked(bqe, node);
2509   RELEASE_LOCK(&sched_mutex);
2510   return bqe;
2511 }
2512 #else
2513 inline StgTSO *
2514 unblockOne(StgTSO *tso)
2515 {
2516   ACQUIRE_LOCK(&sched_mutex);
2517   tso = unblockOneLocked(tso);
2518   RELEASE_LOCK(&sched_mutex);
2519   return tso;
2520 }
2521 #endif
2522
2523 #if defined(GRAN)
2524 void 
2525 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2526 {
2527   StgBlockingQueueElement *bqe;
2528   PEs node_loc;
2529   nat len = 0; 
2530
2531   IF_GRAN_DEBUG(bq, 
2532                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2533                       node, CurrentProc, CurrentTime[CurrentProc], 
2534                       CurrentTSO->id, CurrentTSO));
2535
2536   node_loc = where_is(node);
2537
2538   ASSERT(q == END_BQ_QUEUE ||
2539          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2540          get_itbl(q)->type == CONSTR); // closure (type constructor)
2541   ASSERT(is_unique(node));
2542
2543   /* FAKE FETCH: magically copy the node to the tso's proc;
2544      no Fetch necessary because in reality the node should not have been 
2545      moved to the other PE in the first place
2546   */
2547   if (CurrentProc!=node_loc) {
2548     IF_GRAN_DEBUG(bq, 
2549                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2550                         node, node_loc, CurrentProc, CurrentTSO->id, 
2551                         // CurrentTSO, where_is(CurrentTSO),
2552                         node->header.gran.procs));
2553     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2554     IF_GRAN_DEBUG(bq, 
2555                   belch("## new bitmask of node %p is %#x",
2556                         node, node->header.gran.procs));
2557     if (RtsFlags.GranFlags.GranSimStats.Global) {
2558       globalGranStats.tot_fake_fetches++;
2559     }
2560   }
2561
2562   bqe = q;
2563   // ToDo: check: ASSERT(CurrentProc==node_loc);
2564   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2565     //next = bqe->link;
2566     /* 
2567        bqe points to the current element in the queue
2568        next points to the next element in the queue
2569     */
2570     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2571     //tso_loc = where_is(tso);
2572     len++;
2573     bqe = unblockOneLocked(bqe, node);
2574   }
2575
2576   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2577      the closure to make room for the anchor of the BQ */
2578   if (bqe!=END_BQ_QUEUE) {
2579     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2580     /*
2581     ASSERT((info_ptr==&RBH_Save_0_info) ||
2582            (info_ptr==&RBH_Save_1_info) ||
2583            (info_ptr==&RBH_Save_2_info));
2584     */
2585     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2586     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2587     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2588
2589     IF_GRAN_DEBUG(bq,
2590                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2591                         node, info_type(node)));
2592   }
2593
2594   /* statistics gathering */
2595   if (RtsFlags.GranFlags.GranSimStats.Global) {
2596     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2597     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2598     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2599     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2600   }
2601   IF_GRAN_DEBUG(bq,
2602                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2603                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2604 }
2605 #elif defined(PAR)
2606 void 
2607 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2608 {
2609   StgBlockingQueueElement *bqe;
2610
2611   ACQUIRE_LOCK(&sched_mutex);
2612
2613   IF_PAR_DEBUG(verbose, 
2614                belch("##-_ AwBQ for node %p on [%x]: ",
2615                      node, mytid));
2616 #ifdef DIST  
2617   //RFP
2618   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2619     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2620     return;
2621   }
2622 #endif
2623   
2624   ASSERT(q == END_BQ_QUEUE ||
2625          get_itbl(q)->type == TSO ||           
2626          get_itbl(q)->type == BLOCKED_FETCH || 
2627          get_itbl(q)->type == CONSTR); 
2628
2629   bqe = q;
2630   while (get_itbl(bqe)->type==TSO || 
2631          get_itbl(bqe)->type==BLOCKED_FETCH) {
2632     bqe = unblockOneLocked(bqe, node);
2633   }
2634   RELEASE_LOCK(&sched_mutex);
2635 }
2636
2637 #else   /* !GRAN && !PAR */
2638 void
2639 awakenBlockedQueue(StgTSO *tso)
2640 {
2641   ACQUIRE_LOCK(&sched_mutex);
2642   while (tso != END_TSO_QUEUE) {
2643     tso = unblockOneLocked(tso);
2644   }
2645   RELEASE_LOCK(&sched_mutex);
2646 }
2647 #endif
2648
2649 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2650 //@subsection Exception Handling Routines
2651
2652 /* ---------------------------------------------------------------------------
2653    Interrupt execution
2654    - usually called inside a signal handler so it mustn't do anything fancy.   
2655    ------------------------------------------------------------------------ */
2656
2657 void
2658 interruptStgRts(void)
2659 {
2660     interrupted    = 1;
2661     context_switch = 1;
2662 }
2663
2664 /* -----------------------------------------------------------------------------
2665    Unblock a thread
2666
2667    This is for use when we raise an exception in another thread, which
2668    may be blocked.
2669    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2670    -------------------------------------------------------------------------- */
2671
2672 #if defined(GRAN) || defined(PAR)
2673 /*
2674   NB: only the type of the blocking queue is different in GranSim and GUM
2675       the operations on the queue-elements are the same
2676       long live polymorphism!
2677 */
2678 static void
2679 unblockThread(StgTSO *tso)
2680 {
2681   StgBlockingQueueElement *t, **last;
2682
2683   ACQUIRE_LOCK(&sched_mutex);
2684   switch (tso->why_blocked) {
2685
2686   case NotBlocked:
2687     return;  /* not blocked */
2688
2689   case BlockedOnMVar:
2690     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2691     {
2692       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2693       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2694
2695       last = (StgBlockingQueueElement **)&mvar->head;
2696       for (t = (StgBlockingQueueElement *)mvar->head; 
2697            t != END_BQ_QUEUE; 
2698            last = &t->link, last_tso = t, t = t->link) {
2699         if (t == (StgBlockingQueueElement *)tso) {
2700           *last = (StgBlockingQueueElement *)tso->link;
2701           if (mvar->tail == tso) {
2702             mvar->tail = (StgTSO *)last_tso;
2703           }
2704           goto done;
2705         }
2706       }
2707       barf("unblockThread (MVAR): TSO not found");
2708     }
2709
2710   case BlockedOnBlackHole:
2711     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2712     {
2713       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2714
2715       last = &bq->blocking_queue;
2716       for (t = bq->blocking_queue; 
2717            t != END_BQ_QUEUE; 
2718            last = &t->link, t = t->link) {
2719         if (t == (StgBlockingQueueElement *)tso) {
2720           *last = (StgBlockingQueueElement *)tso->link;
2721           goto done;
2722         }
2723       }
2724       barf("unblockThread (BLACKHOLE): TSO not found");
2725     }
2726
2727   case BlockedOnException:
2728     {
2729       StgTSO *target  = tso->block_info.tso;
2730
2731       ASSERT(get_itbl(target)->type == TSO);
2732
2733       if (target->what_next == ThreadRelocated) {
2734           target = target->link;
2735           ASSERT(get_itbl(target)->type == TSO);
2736       }
2737
2738       ASSERT(target->blocked_exceptions != NULL);
2739
2740       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2741       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2742            t != END_BQ_QUEUE; 
2743            last = &t->link, t = t->link) {
2744         ASSERT(get_itbl(t)->type == TSO);
2745         if (t == (StgBlockingQueueElement *)tso) {
2746           *last = (StgBlockingQueueElement *)tso->link;
2747           goto done;
2748         }
2749       }
2750       barf("unblockThread (Exception): TSO not found");
2751     }
2752
2753   case BlockedOnRead:
2754   case BlockedOnWrite:
2755     {
2756       /* take TSO off blocked_queue */
2757       StgBlockingQueueElement *prev = NULL;
2758       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2759            prev = t, t = t->link) {
2760         if (t == (StgBlockingQueueElement *)tso) {
2761           if (prev == NULL) {
2762             blocked_queue_hd = (StgTSO *)t->link;
2763             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2764               blocked_queue_tl = END_TSO_QUEUE;
2765             }
2766           } else {
2767             prev->link = t->link;
2768             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2769               blocked_queue_tl = (StgTSO *)prev;
2770             }
2771           }
2772           goto done;
2773         }
2774       }
2775       barf("unblockThread (I/O): TSO not found");
2776     }
2777
2778   case BlockedOnDelay:
2779     {
2780       /* take TSO off sleeping_queue */
2781       StgBlockingQueueElement *prev = NULL;
2782       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2783            prev = t, t = t->link) {
2784         if (t == (StgBlockingQueueElement *)tso) {
2785           if (prev == NULL) {
2786             sleeping_queue = (StgTSO *)t->link;
2787           } else {
2788             prev->link = t->link;
2789           }
2790           goto done;
2791         }
2792       }
2793       barf("unblockThread (I/O): TSO not found");
2794     }
2795
2796   default:
2797     barf("unblockThread");
2798   }
2799
2800  done:
2801   tso->link = END_TSO_QUEUE;
2802   tso->why_blocked = NotBlocked;
2803   tso->block_info.closure = NULL;
2804   PUSH_ON_RUN_QUEUE(tso);
2805   RELEASE_LOCK(&sched_mutex);
2806 }
2807 #else
2808 static void
2809 unblockThread(StgTSO *tso)
2810 {
2811   StgTSO *t, **last;
2812
2813   ACQUIRE_LOCK(&sched_mutex);
2814   switch (tso->why_blocked) {
2815
2816   case NotBlocked:
2817     return;  /* not blocked */
2818
2819   case BlockedOnMVar:
2820     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2821     {
2822       StgTSO *last_tso = END_TSO_QUEUE;
2823       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2824
2825       last = &mvar->head;
2826       for (t = mvar->head; t != END_TSO_QUEUE; 
2827            last = &t->link, last_tso = t, t = t->link) {
2828         if (t == tso) {
2829           *last = tso->link;
2830           if (mvar->tail == tso) {
2831             mvar->tail = last_tso;
2832           }
2833           goto done;
2834         }
2835       }
2836       barf("unblockThread (MVAR): TSO not found");
2837     }
2838
2839   case BlockedOnBlackHole:
2840     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2841     {
2842       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2843
2844       last = &bq->blocking_queue;
2845       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2846            last = &t->link, t = t->link) {
2847         if (t == tso) {
2848           *last = tso->link;
2849           goto done;
2850         }
2851       }
2852       barf("unblockThread (BLACKHOLE): TSO not found");
2853     }
2854
2855   case BlockedOnException:
2856     {
2857       StgTSO *target  = tso->block_info.tso;
2858
2859       ASSERT(get_itbl(target)->type == TSO);
2860
2861       while (target->what_next == ThreadRelocated) {
2862           target = target->link;
2863           ASSERT(get_itbl(target)->type == TSO);
2864       }
2865       
2866       ASSERT(target->blocked_exceptions != NULL);
2867
2868       last = &target->blocked_exceptions;
2869       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2870            last = &t->link, t = t->link) {
2871         ASSERT(get_itbl(t)->type == TSO);
2872         if (t == tso) {
2873           *last = tso->link;
2874           goto done;
2875         }
2876       }
2877       barf("unblockThread (Exception): TSO not found");
2878     }
2879
2880   case BlockedOnRead:
2881   case BlockedOnWrite:
2882     {
2883       StgTSO *prev = NULL;
2884       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2885            prev = t, t = t->link) {
2886         if (t == tso) {
2887           if (prev == NULL) {
2888             blocked_queue_hd = t->link;
2889             if (blocked_queue_tl == t) {
2890               blocked_queue_tl = END_TSO_QUEUE;
2891             }
2892           } else {
2893             prev->link = t->link;
2894             if (blocked_queue_tl == t) {
2895               blocked_queue_tl = prev;
2896             }
2897           }
2898           goto done;
2899         }
2900       }
2901       barf("unblockThread (I/O): TSO not found");
2902     }
2903
2904   case BlockedOnDelay:
2905     {
2906       StgTSO *prev = NULL;
2907       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2908            prev = t, t = t->link) {
2909         if (t == tso) {
2910           if (prev == NULL) {
2911             sleeping_queue = t->link;
2912           } else {
2913             prev->link = t->link;
2914           }
2915           goto done;
2916         }
2917       }
2918       barf("unblockThread (I/O): TSO not found");
2919     }
2920
2921   default:
2922     barf("unblockThread");
2923   }
2924
2925  done:
2926   tso->link = END_TSO_QUEUE;
2927   tso->why_blocked = NotBlocked;
2928   tso->block_info.closure = NULL;
2929   PUSH_ON_RUN_QUEUE(tso);
2930   RELEASE_LOCK(&sched_mutex);
2931 }
2932 #endif
2933
2934 /* -----------------------------------------------------------------------------
2935  * raiseAsync()
2936  *
2937  * The following function implements the magic for raising an
2938  * asynchronous exception in an existing thread.
2939  *
2940  * We first remove the thread from any queue on which it might be
2941  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2942  *
2943  * We strip the stack down to the innermost CATCH_FRAME, building
2944  * thunks in the heap for all the active computations, so they can 
2945  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2946  * an application of the handler to the exception, and push it on
2947  * the top of the stack.
2948  * 
2949  * How exactly do we save all the active computations?  We create an
2950  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2951  * AP_UPDs pushes everything from the corresponding update frame
2952  * upwards onto the stack.  (Actually, it pushes everything up to the
2953  * next update frame plus a pointer to the next AP_UPD object.
2954  * Entering the next AP_UPD object pushes more onto the stack until we
2955  * reach the last AP_UPD object - at which point the stack should look
2956  * exactly as it did when we killed the TSO and we can continue
2957  * execution by entering the closure on top of the stack.
2958  *
2959  * We can also kill a thread entirely - this happens if either (a) the 
2960  * exception passed to raiseAsync is NULL, or (b) there's no
2961  * CATCH_FRAME on the stack.  In either case, we strip the entire
2962  * stack and replace the thread with a zombie.
2963  *
2964  * -------------------------------------------------------------------------- */
2965  
2966 void 
2967 deleteThread(StgTSO *tso)
2968 {
2969   raiseAsync(tso,NULL);
2970 }
2971
2972 void
2973 raiseAsync(StgTSO *tso, StgClosure *exception)
2974 {
2975   StgUpdateFrame* su = tso->su;
2976   StgPtr          sp = tso->sp;
2977   
2978   /* Thread already dead? */
2979   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2980     return;
2981   }
2982
2983   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2984
2985   /* Remove it from any blocking queues */
2986   unblockThread(tso);
2987
2988   /* The stack freezing code assumes there's a closure pointer on
2989    * the top of the stack.  This isn't always the case with compiled
2990    * code, so we have to push a dummy closure on the top which just
2991    * returns to the next return address on the stack.
2992    */
2993   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2994     *(--sp) = (W_)&stg_dummy_ret_closure;
2995   }
2996
2997   while (1) {
2998     nat words = ((P_)su - (P_)sp) - 1;
2999     nat i;
3000     StgAP_UPD * ap;
3001
3002     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3003      * then build PAP(handler,exception,realworld#), and leave it on
3004      * top of the stack ready to enter.
3005      */
3006     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3007       StgCatchFrame *cf = (StgCatchFrame *)su;
3008       /* we've got an exception to raise, so let's pass it to the
3009        * handler in this frame.
3010        */
3011       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3012       TICK_ALLOC_UPD_PAP(3,0);
3013       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3014               
3015       ap->n_args = 2;
3016       ap->fun = cf->handler;    /* :: Exception -> IO a */
3017       ap->payload[0] = exception;
3018       ap->payload[1] = ARG_TAG(0); /* realworld token */
3019
3020       /* throw away the stack from Sp up to and including the
3021        * CATCH_FRAME.
3022        */
3023       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3024       tso->su = cf->link;
3025
3026       /* Restore the blocked/unblocked state for asynchronous exceptions
3027        * at the CATCH_FRAME.  
3028        *
3029        * If exceptions were unblocked at the catch, arrange that they
3030        * are unblocked again after executing the handler by pushing an
3031        * unblockAsyncExceptions_ret stack frame.
3032        */
3033       if (!cf->exceptions_blocked) {
3034         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3035       }
3036       
3037       /* Ensure that async exceptions are blocked when running the handler.
3038        */
3039       if (tso->blocked_exceptions == NULL) {
3040         tso->blocked_exceptions = END_TSO_QUEUE;
3041       }
3042       
3043       /* Put the newly-built PAP on top of the stack, ready to execute
3044        * when the thread restarts.
3045        */
3046       sp[0] = (W_)ap;
3047       tso->sp = sp;
3048       tso->what_next = ThreadEnterGHC;
3049       IF_DEBUG(sanity, checkTSO(tso));
3050       return;
3051     }
3052
3053     /* First build an AP_UPD consisting of the stack chunk above the
3054      * current update frame, with the top word on the stack as the
3055      * fun field.
3056      */
3057     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3058     
3059     ASSERT(words >= 0);
3060     
3061     ap->n_args = words;
3062     ap->fun    = (StgClosure *)sp[0];
3063     sp++;
3064     for(i=0; i < (nat)words; ++i) {
3065       ap->payload[i] = (StgClosure *)*sp++;
3066     }
3067     
3068     switch (get_itbl(su)->type) {
3069       
3070     case UPDATE_FRAME:
3071       {
3072         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3073         TICK_ALLOC_UP_THK(words+1,0);
3074         
3075         IF_DEBUG(scheduler,
3076                  fprintf(stderr,  "scheduler: Updating ");
3077                  printPtr((P_)su->updatee); 
3078                  fprintf(stderr,  " with ");
3079                  printObj((StgClosure *)ap);
3080                  );
3081         
3082         /* Replace the updatee with an indirection - happily
3083          * this will also wake up any threads currently
3084          * waiting on the result.
3085          *
3086          * Warning: if we're in a loop, more than one update frame on
3087          * the stack may point to the same object.  Be careful not to
3088          * overwrite an IND_OLDGEN in this case, because we'll screw
3089          * up the mutable lists.  To be on the safe side, don't
3090          * overwrite any kind of indirection at all.  See also
3091          * threadSqueezeStack in GC.c, where we have to make a similar
3092          * check.
3093          */
3094         if (!closure_IND(su->updatee)) {
3095             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3096         }
3097         su = su->link;
3098         sp += sizeofW(StgUpdateFrame) -1;
3099         sp[0] = (W_)ap; /* push onto stack */
3100         break;
3101       }
3102
3103     case CATCH_FRAME:
3104       {
3105         StgCatchFrame *cf = (StgCatchFrame *)su;
3106         StgClosure* o;
3107         
3108         /* We want a PAP, not an AP_UPD.  Fortunately, the
3109          * layout's the same.
3110          */
3111         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3112         TICK_ALLOC_UPD_PAP(words+1,0);
3113         
3114         /* now build o = FUN(catch,ap,handler) */
3115         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3116         TICK_ALLOC_FUN(2,0);
3117         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3118         o->payload[0] = (StgClosure *)ap;
3119         o->payload[1] = cf->handler;
3120         
3121         IF_DEBUG(scheduler,
3122                  fprintf(stderr,  "scheduler: Built ");
3123                  printObj((StgClosure *)o);
3124                  );
3125         
3126         /* pop the old handler and put o on the stack */
3127         su = cf->link;
3128         sp += sizeofW(StgCatchFrame) - 1;
3129         sp[0] = (W_)o;
3130         break;
3131       }
3132       
3133     case SEQ_FRAME:
3134       {
3135         StgSeqFrame *sf = (StgSeqFrame *)su;
3136         StgClosure* o;
3137         
3138         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3139         TICK_ALLOC_UPD_PAP(words+1,0);
3140         
3141         /* now build o = FUN(seq,ap) */
3142         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3143         TICK_ALLOC_SE_THK(1,0);
3144         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3145         o->payload[0] = (StgClosure *)ap;
3146         
3147         IF_DEBUG(scheduler,
3148                  fprintf(stderr,  "scheduler: Built ");
3149                  printObj((StgClosure *)o);
3150                  );
3151         
3152         /* pop the old handler and put o on the stack */
3153         su = sf->link;
3154         sp += sizeofW(StgSeqFrame) - 1;
3155         sp[0] = (W_)o;
3156         break;
3157       }
3158       
3159     case STOP_FRAME:
3160       /* We've stripped the entire stack, the thread is now dead. */
3161       sp += sizeofW(StgStopFrame) - 1;
3162       sp[0] = (W_)exception;    /* save the exception */
3163       tso->what_next = ThreadKilled;
3164       tso->su = (StgUpdateFrame *)(sp+1);
3165       tso->sp = sp;
3166       return;
3167
3168     default:
3169       barf("raiseAsync");
3170     }
3171   }
3172   barf("raiseAsync");
3173 }
3174
3175 /* -----------------------------------------------------------------------------
3176    resurrectThreads is called after garbage collection on the list of
3177    threads found to be garbage.  Each of these threads will be woken
3178    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3179    on an MVar, or NonTermination if the thread was blocked on a Black
3180    Hole.
3181    -------------------------------------------------------------------------- */
3182
3183 void
3184 resurrectThreads( StgTSO *threads )
3185 {
3186   StgTSO *tso, *next;
3187
3188   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3189     next = tso->global_link;
3190     tso->global_link = all_threads;
3191     all_threads = tso;
3192     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3193
3194     switch (tso->why_blocked) {
3195     case BlockedOnMVar:
3196     case BlockedOnException:
3197       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3198       break;
3199     case BlockedOnBlackHole:
3200       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3201       break;
3202     case NotBlocked:
3203       /* This might happen if the thread was blocked on a black hole
3204        * belonging to a thread that we've just woken up (raiseAsync
3205        * can wake up threads, remember...).
3206        */
3207       continue;
3208     default:
3209       barf("resurrectThreads: thread blocked in a strange way");
3210     }
3211   }
3212 }
3213
3214 /* -----------------------------------------------------------------------------
3215  * Blackhole detection: if we reach a deadlock, test whether any
3216  * threads are blocked on themselves.  Any threads which are found to
3217  * be self-blocked get sent a NonTermination exception.
3218  *
3219  * This is only done in a deadlock situation in order to avoid
3220  * performance overhead in the normal case.
3221  * -------------------------------------------------------------------------- */
3222
3223 static void
3224 detectBlackHoles( void )
3225 {
3226     StgTSO *t = all_threads;
3227     StgUpdateFrame *frame;
3228     StgClosure *blocked_on;
3229
3230     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3231
3232         while (t->what_next == ThreadRelocated) {
3233             t = t->link;
3234             ASSERT(get_itbl(t)->type == TSO);
3235         }
3236       
3237         if (t->why_blocked != BlockedOnBlackHole) {
3238             continue;
3239         }
3240
3241         blocked_on = t->block_info.closure;
3242
3243         for (frame = t->su; ; frame = frame->link) {
3244             switch (get_itbl(frame)->type) {
3245
3246             case UPDATE_FRAME:
3247                 if (frame->updatee == blocked_on) {
3248                     /* We are blocking on one of our own computations, so
3249                      * send this thread the NonTermination exception.  
3250                      */
3251                     IF_DEBUG(scheduler, 
3252                              sched_belch("thread %d is blocked on itself", t->id));
3253                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3254                     goto done;
3255                 }
3256                 else {
3257                     continue;
3258                 }
3259
3260             case CATCH_FRAME:
3261             case SEQ_FRAME:
3262                 continue;
3263                 
3264             case STOP_FRAME:
3265                 break;
3266             }
3267             break;
3268         }
3269
3270     done: ;
3271     }   
3272 }
3273
3274 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3275 //@subsection Debugging Routines
3276
3277 /* -----------------------------------------------------------------------------
3278    Debugging: why is a thread blocked
3279    -------------------------------------------------------------------------- */
3280
3281 #ifdef DEBUG
3282
3283 void
3284 printThreadBlockage(StgTSO *tso)
3285 {
3286   switch (tso->why_blocked) {
3287   case BlockedOnRead:
3288     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3289     break;
3290   case BlockedOnWrite:
3291     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3292     break;
3293   case BlockedOnDelay:
3294     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3295     break;
3296   case BlockedOnMVar:
3297     fprintf(stderr,"is blocked on an MVar");
3298     break;
3299   case BlockedOnException:
3300     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3301             tso->block_info.tso->id);
3302     break;
3303   case BlockedOnBlackHole:
3304     fprintf(stderr,"is blocked on a black hole");
3305     break;
3306   case NotBlocked:
3307     fprintf(stderr,"is not blocked");
3308     break;
3309 #if defined(PAR)
3310   case BlockedOnGA:
3311     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3312             tso->block_info.closure, info_type(tso->block_info.closure));
3313     break;
3314   case BlockedOnGA_NoSend:
3315     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3316             tso->block_info.closure, info_type(tso->block_info.closure));
3317     break;
3318 #endif
3319   default:
3320     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3321          tso->why_blocked, tso->id, tso);
3322   }
3323 }
3324
3325 void
3326 printThreadStatus(StgTSO *tso)
3327 {
3328   switch (tso->what_next) {
3329   case ThreadKilled:
3330     fprintf(stderr,"has been killed");
3331     break;
3332   case ThreadComplete:
3333     fprintf(stderr,"has completed");
3334     break;
3335   default:
3336     printThreadBlockage(tso);
3337   }
3338 }
3339
3340 void
3341 printAllThreads(void)
3342 {
3343   StgTSO *t;
3344
3345 # if defined(GRAN)
3346   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3347   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3348                        time_string, rtsFalse/*no commas!*/);
3349
3350   sched_belch("all threads at [%s]:", time_string);
3351 # elif defined(PAR)
3352   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3353   ullong_format_string(CURRENT_TIME,
3354                        time_string, rtsFalse/*no commas!*/);
3355
3356   sched_belch("all threads at [%s]:", time_string);
3357 # else
3358   sched_belch("all threads:");
3359 # endif
3360
3361   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3362     fprintf(stderr, "\tthread %d ", t->id);
3363     printThreadStatus(t);
3364     fprintf(stderr,"\n");
3365   }
3366 }
3367     
3368 /* 
3369    Print a whole blocking queue attached to node (debugging only).
3370 */
3371 //@cindex print_bq
3372 # if defined(PAR)
3373 void 
3374 print_bq (StgClosure *node)
3375 {
3376   StgBlockingQueueElement *bqe;
3377   StgTSO *tso;
3378   rtsBool end;
3379
3380   fprintf(stderr,"## BQ of closure %p (%s): ",
3381           node, info_type(node));
3382
3383   /* should cover all closures that may have a blocking queue */
3384   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3385          get_itbl(node)->type == FETCH_ME_BQ ||
3386          get_itbl(node)->type == RBH ||
3387          get_itbl(node)->type == MVAR);
3388     
3389   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3390
3391   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3392 }
3393
3394 /* 
3395    Print a whole blocking queue starting with the element bqe.
3396 */
3397 void 
3398 print_bqe (StgBlockingQueueElement *bqe)
3399 {
3400   rtsBool end;
3401
3402   /* 
3403      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3404   */
3405   for (end = (bqe==END_BQ_QUEUE);
3406        !end; // iterate until bqe points to a CONSTR
3407        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3408        bqe = end ? END_BQ_QUEUE : bqe->link) {
3409     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3410     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3411     /* types of closures that may appear in a blocking queue */
3412     ASSERT(get_itbl(bqe)->type == TSO ||           
3413            get_itbl(bqe)->type == BLOCKED_FETCH || 
3414            get_itbl(bqe)->type == CONSTR); 
3415     /* only BQs of an RBH end with an RBH_Save closure */
3416     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3417
3418     switch (get_itbl(bqe)->type) {
3419     case TSO:
3420       fprintf(stderr," TSO %u (%x),",
3421               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3422       break;
3423     case BLOCKED_FETCH:
3424       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3425               ((StgBlockedFetch *)bqe)->node, 
3426               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3427               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3428               ((StgBlockedFetch *)bqe)->ga.weight);
3429       break;
3430     case CONSTR:
3431       fprintf(stderr," %s (IP %p),",
3432               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3433                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3434                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3435                "RBH_Save_?"), get_itbl(bqe));
3436       break;
3437     default:
3438       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3439            info_type((StgClosure *)bqe)); // , node, info_type(node));
3440       break;
3441     }
3442   } /* for */
3443   fputc('\n', stderr);
3444 }
3445 # elif defined(GRAN)
3446 void 
3447 print_bq (StgClosure *node)
3448 {
3449   StgBlockingQueueElement *bqe;
3450   PEs node_loc, tso_loc;
3451   rtsBool end;
3452
3453   /* should cover all closures that may have a blocking queue */
3454   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3455          get_itbl(node)->type == FETCH_ME_BQ ||
3456          get_itbl(node)->type == RBH);
3457     
3458   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3459   node_loc = where_is(node);
3460
3461   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3462           node, info_type(node), node_loc);
3463
3464   /* 
3465      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3466   */
3467   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3468        !end; // iterate until bqe points to a CONSTR
3469        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3470     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3471     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3472     /* types of closures that may appear in a blocking queue */
3473     ASSERT(get_itbl(bqe)->type == TSO ||           
3474            get_itbl(bqe)->type == CONSTR); 
3475     /* only BQs of an RBH end with an RBH_Save closure */
3476     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3477
3478     tso_loc = where_is((StgClosure *)bqe);
3479     switch (get_itbl(bqe)->type) {
3480     case TSO:
3481       fprintf(stderr," TSO %d (%p) on [PE %d],",
3482               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3483       break;
3484     case CONSTR:
3485       fprintf(stderr," %s (IP %p),",
3486               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3487                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3488                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3489                "RBH_Save_?"), get_itbl(bqe));
3490       break;
3491     default:
3492       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3493            info_type((StgClosure *)bqe), node, info_type(node));
3494       break;
3495     }
3496   } /* for */
3497   fputc('\n', stderr);
3498 }
3499 #else
3500 /* 
3501    Nice and easy: only TSOs on the blocking queue
3502 */
3503 void 
3504 print_bq (StgClosure *node)
3505 {
3506   StgTSO *tso;
3507
3508   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3509   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3510        tso != END_TSO_QUEUE; 
3511        tso=tso->link) {
3512     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3513     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3514     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3515   }
3516   fputc('\n', stderr);
3517 }
3518 # endif
3519
3520 #if defined(PAR)
3521 static nat
3522 run_queue_len(void)
3523 {
3524   nat i;
3525   StgTSO *tso;
3526
3527   for (i=0, tso=run_queue_hd; 
3528        tso != END_TSO_QUEUE;
3529        i++, tso=tso->link)
3530     /* nothing */
3531
3532   return i;
3533 }
3534 #endif
3535
3536 static void
3537 sched_belch(char *s, ...)
3538 {
3539   va_list ap;
3540   va_start(ap,s);
3541 #ifdef SMP
3542   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3543 #elif defined(PAR)
3544   fprintf(stderr, "== ");
3545 #else
3546   fprintf(stderr, "scheduler: ");
3547 #endif
3548   vfprintf(stderr, s, ap);
3549   fprintf(stderr, "\n");
3550 }
3551
3552 #endif /* DEBUG */
3553
3554
3555 //@node Index,  , Debugging Routines, Main scheduling code
3556 //@subsection Index
3557
3558 //@index
3559 //* MainRegTable::  @cindex\s-+MainRegTable
3560 //* StgMainThread::  @cindex\s-+StgMainThread
3561 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3562 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3563 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3564 //* context_switch::  @cindex\s-+context_switch
3565 //* createThread::  @cindex\s-+createThread
3566 //* free_capabilities::  @cindex\s-+free_capabilities
3567 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3568 //* initScheduler::  @cindex\s-+initScheduler
3569 //* interrupted::  @cindex\s-+interrupted
3570 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3571 //* next_thread_id::  @cindex\s-+next_thread_id
3572 //* print_bq::  @cindex\s-+print_bq
3573 //* run_queue_hd::  @cindex\s-+run_queue_hd
3574 //* run_queue_tl::  @cindex\s-+run_queue_tl
3575 //* sched_mutex::  @cindex\s-+sched_mutex
3576 //* schedule::  @cindex\s-+schedule
3577 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3578 //* task_ids::  @cindex\s-+task_ids
3579 //* term_mutex::  @cindex\s-+term_mutex
3580 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3581 //@end index