[project @ 2001-11-26 16:54:21 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.108 2001/11/26 16:54:22 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #include "RetainerProfile.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113
114 #include <stdarg.h>
115
116 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
117 //@subsection Variables and Data structures
118
119 /* Main threads:
120  *
121  * These are the threads which clients have requested that we run.  
122  *
123  * In an SMP build, we might have several concurrent clients all
124  * waiting for results, and each one will wait on a condition variable
125  * until the result is available.
126  *
127  * In non-SMP, clients are strictly nested: the first client calls
128  * into the RTS, which might call out again to C with a _ccall_GC, and
129  * eventually re-enter the RTS.
130  *
131  * Main threads information is kept in a linked list:
132  */
133 //@cindex StgMainThread
134 typedef struct StgMainThread_ {
135   StgTSO *         tso;
136   SchedulerStatus  stat;
137   StgClosure **    ret;
138 #ifdef SMP
139   pthread_cond_t wakeup;
140 #endif
141   struct StgMainThread_ *link;
142 } StgMainThread;
143
144 /* Main thread queue.
145  * Locks required: sched_mutex.
146  */
147 static StgMainThread *main_threads;
148
149 /* Thread queues.
150  * Locks required: sched_mutex.
151  */
152 #if defined(GRAN)
153
154 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
155 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
156
157 /* 
158    In GranSim we have a runable and a blocked queue for each processor.
159    In order to minimise code changes new arrays run_queue_hds/tls
160    are created. run_queue_hd is then a short cut (macro) for
161    run_queue_hds[CurrentProc] (see GranSim.h).
162    -- HWL
163 */
164 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
165 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
166 StgTSO *ccalling_threadss[MAX_PROC];
167 /* We use the same global list of threads (all_threads) in GranSim as in
168    the std RTS (i.e. we are cheating). However, we don't use this list in
169    the GranSim specific code at the moment (so we are only potentially
170    cheating).  */
171
172 #else /* !GRAN */
173
174 StgTSO *run_queue_hd, *run_queue_tl;
175 StgTSO *blocked_queue_hd, *blocked_queue_tl;
176 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
177
178 #endif
179
180 /* Linked list of all threads.
181  * Used for detecting garbage collected threads.
182  */
183 StgTSO *all_threads;
184
185 /* Threads suspended in _ccall_GC.
186  */
187 static StgTSO *suspended_ccalling_threads;
188
189 static StgTSO *threadStackOverflow(StgTSO *tso);
190
191 /* KH: The following two flags are shared memory locations.  There is no need
192        to lock them, since they are only unset at the end of a scheduler
193        operation.
194 */
195
196 /* flag set by signal handler to precipitate a context switch */
197 //@cindex context_switch
198 nat context_switch;
199
200 /* if this flag is set as well, give up execution */
201 //@cindex interrupted
202 rtsBool interrupted;
203
204 /* Next thread ID to allocate.
205  * Locks required: sched_mutex
206  */
207 //@cindex next_thread_id
208 StgThreadID next_thread_id = 1;
209
210 /*
211  * Pointers to the state of the current thread.
212  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
213  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
214  */
215  
216 /* The smallest stack size that makes any sense is:
217  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
218  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
219  *  + 1                       (the realworld token for an IO thread)
220  *  + 1                       (the closure to enter)
221  *
222  * A thread with this stack will bomb immediately with a stack
223  * overflow, which will increase its stack size.  
224  */
225
226 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
227
228 /* Free capability list.
229  * Locks required: sched_mutex.
230  */
231 #ifdef SMP
232 Capability *free_capabilities; /* Available capabilities for running threads */
233 nat n_free_capabilities;       /* total number of available capabilities */
234 #else
235 Capability MainCapability;     /* for non-SMP, we have one global capability */
236 #endif
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 rtsBool ready_to_gc;
249
250 /* All our current task ids, saved in case we need to kill them later.
251  */
252 #ifdef SMP
253 //@cindex task_ids
254 task_info *task_ids;
255 #endif
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( void );
260        void     interruptStgRts   ( void );
261 #if defined(GRAN)
262 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
263 #else
264 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
265 #endif
266
267 static void     detectBlackHoles  ( void );
268
269 #ifdef DEBUG
270 static void sched_belch(char *s, ...);
271 #endif
272
273 #ifdef SMP
274 //@cindex sched_mutex
275 //@cindex term_mutex
276 //@cindex thread_ready_cond
277 //@cindex gc_pending_cond
278 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
279 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
280 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
281 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
282
283 nat await_death;
284 #endif
285
286 #if defined(PAR)
287 StgTSO *LastTSO;
288 rtsTime TimeOfLastYield;
289 rtsBool emitSchedule = rtsTrue;
290 #endif
291
292 #if DEBUG
293 char *whatNext_strs[] = {
294   "ThreadEnterGHC",
295   "ThreadRunGHC",
296   "ThreadEnterInterp",
297   "ThreadKilled",
298   "ThreadComplete"
299 };
300
301 char *threadReturnCode_strs[] = {
302   "HeapOverflow",                       /* might also be StackOverflow */
303   "StackOverflow",
304   "ThreadYielding",
305   "ThreadBlocked",
306   "ThreadFinished"
307 };
308 #endif
309
310 #ifdef PAR
311 StgTSO * createSparkThread(rtsSpark spark);
312 StgTSO * activateSpark (rtsSpark spark);  
313 #endif
314
315 /*
316  * The thread state for the main thread.
317 // ToDo: check whether not needed any more
318 StgTSO   *MainTSO;
319  */
320
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
323
324 /* ---------------------------------------------------------------------------
325    Main scheduling loop.
326
327    We use round-robin scheduling, each thread returning to the
328    scheduler loop when one of these conditions is detected:
329
330       * out of heap space
331       * timer expires (thread yields)
332       * thread blocks
333       * thread ends
334       * stack overflow
335
336    Locking notes:  we acquire the scheduler lock once at the beginning
337    of the scheduler loop, and release it when
338     
339       * running a thread, or
340       * waiting for work, or
341       * waiting for a GC to complete.
342
343    GRAN version:
344      In a GranSim setup this loop iterates over the global event queue.
345      This revolves around the global event queue, which determines what 
346      to do next. Therefore, it's more complicated than either the 
347      concurrent or the parallel (GUM) setup.
348
349    GUM version:
350      GUM iterates over incoming messages.
351      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352      and sends out a fish whenever it has nothing to do; in-between
353      doing the actual reductions (shared code below) it processes the
354      incoming messages and deals with delayed operations 
355      (see PendingFetches).
356      This is not the ugliest code you could imagine, but it's bloody close.
357
358    ------------------------------------------------------------------------ */
359 //@cindex schedule
360 static void
361 schedule( void )
362 {
363   StgTSO *t;
364   Capability *cap;
365   StgThreadReturnCode ret;
366 #if defined(GRAN)
367   rtsEvent *event;
368 #elif defined(PAR)
369   StgSparkPool *pool;
370   rtsSpark spark;
371   StgTSO *tso;
372   GlobalTaskId pe;
373   rtsBool receivedFinish = rtsFalse;
374 # if defined(DEBUG)
375   nat tp_size, sp_size; // stats only
376 # endif
377 #endif
378   rtsBool was_interrupted = rtsFalse;
379   
380   ACQUIRE_LOCK(&sched_mutex);
381
382 #if defined(GRAN)
383
384   /* set up first event to get things going */
385   /* ToDo: assign costs for system setup and init MainTSO ! */
386   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387             ContinueThread, 
388             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
389
390   IF_DEBUG(gran,
391            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392            G_TSO(CurrentTSO, 5));
393
394   if (RtsFlags.GranFlags.Light) {
395     /* Save current time; GranSim Light only */
396     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
397   }      
398
399   event = get_next_event();
400
401   while (event!=(rtsEvent*)NULL) {
402     /* Choose the processor with the next event */
403     CurrentProc = event->proc;
404     CurrentTSO = event->tso;
405
406 #elif defined(PAR)
407
408   while (!receivedFinish) {    /* set by processMessages */
409                                /* when receiving PP_FINISH message         */ 
410 #else
411
412   while (1) {
413
414 #endif
415
416     IF_DEBUG(scheduler, printAllThreads());
417
418     /* If we're interrupted (the user pressed ^C, or some other
419      * termination condition occurred), kill all the currently running
420      * threads.
421      */
422     if (interrupted) {
423       IF_DEBUG(scheduler, sched_belch("interrupted"));
424       deleteAllThreads();
425       interrupted = rtsFalse;
426       was_interrupted = rtsTrue;
427     }
428
429     /* Go through the list of main threads and wake up any
430      * clients whose computations have finished.  ToDo: this
431      * should be done more efficiently without a linear scan
432      * of the main threads list, somehow...
433      */
434 #ifdef SMP
435     { 
436       StgMainThread *m, **prev;
437       prev = &main_threads;
438       for (m = main_threads; m != NULL; m = m->link) {
439         switch (m->tso->what_next) {
440         case ThreadComplete:
441           if (m->ret) {
442             *(m->ret) = (StgClosure *)m->tso->sp[0];
443           }
444           *prev = m->link;
445           m->stat = Success;
446           pthread_cond_broadcast(&m->wakeup);
447           break;
448         case ThreadKilled:
449           if (m->ret) *(m->ret) = NULL;
450           *prev = m->link;
451           if (was_interrupted) {
452             m->stat = Interrupted;
453           } else {
454             m->stat = Killed;
455           }
456           pthread_cond_broadcast(&m->wakeup);
457           break;
458         default:
459           break;
460         }
461       }
462     }
463
464 #else // not SMP
465
466 # if defined(PAR)
467     /* in GUM do this only on the Main PE */
468     if (IAmMainThread)
469 # endif
470     /* If our main thread has finished or been killed, return.
471      */
472     {
473       StgMainThread *m = main_threads;
474       if (m->tso->what_next == ThreadComplete
475           || m->tso->what_next == ThreadKilled) {
476         main_threads = main_threads->link;
477         if (m->tso->what_next == ThreadComplete) {
478           /* we finished successfully, fill in the return value */
479           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
480           m->stat = Success;
481           return;
482         } else {
483           if (m->ret) { *(m->ret) = NULL; };
484           if (was_interrupted) {
485             m->stat = Interrupted;
486           } else {
487             m->stat = Killed;
488           }
489           return;
490         }
491       }
492     }
493 #endif
494
495     /* Top up the run queue from our spark pool.  We try to make the
496      * number of threads in the run queue equal to the number of
497      * free capabilities.
498      */
499 #if defined(SMP)
500     {
501       nat n = n_free_capabilities;
502       StgTSO *tso = run_queue_hd;
503
504       /* Count the run queue */
505       while (n > 0 && tso != END_TSO_QUEUE) {
506         tso = tso->link;
507         n--;
508       }
509
510       for (; n > 0; n--) {
511         StgClosure *spark;
512         spark = findSpark(rtsFalse);
513         if (spark == NULL) {
514           break; /* no more sparks in the pool */
515         } else {
516           /* I'd prefer this to be done in activateSpark -- HWL */
517           /* tricky - it needs to hold the scheduler lock and
518            * not try to re-acquire it -- SDM */
519           createSparkThread(spark);       
520           IF_DEBUG(scheduler,
521                    sched_belch("==^^ turning spark of closure %p into a thread",
522                                (StgClosure *)spark));
523         }
524       }
525       /* We need to wake up the other tasks if we just created some
526        * work for them.
527        */
528       if (n_free_capabilities - n > 1) {
529           pthread_cond_signal(&thread_ready_cond);
530       }
531     }
532 #endif // SMP
533
534     /* check for signals each time around the scheduler */
535 #ifndef mingw32_TARGET_OS
536     if (signals_pending()) {
537       startSignalHandlers();
538     }
539 #endif
540
541     /* Check whether any waiting threads need to be woken up.  If the
542      * run queue is empty, and there are no other tasks running, we
543      * can wait indefinitely for something to happen.
544      * ToDo: what if another client comes along & requests another
545      * main thread?
546      */
547     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
548       awaitEvent(
549            (run_queue_hd == END_TSO_QUEUE)
550 #ifdef SMP
551         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
552 #endif
553         );
554     }
555     /* we can be interrupted while waiting for I/O... */
556     if (interrupted) continue;
557
558     /* 
559      * Detect deadlock: when we have no threads to run, there are no
560      * threads waiting on I/O or sleeping, and all the other tasks are
561      * waiting for work, we must have a deadlock of some description.
562      *
563      * We first try to find threads blocked on themselves (ie. black
564      * holes), and generate NonTermination exceptions where necessary.
565      *
566      * If no threads are black holed, we have a deadlock situation, so
567      * inform all the main threads.
568      */
569 #ifndef PAR
570     if (blocked_queue_hd == END_TSO_QUEUE
571         && run_queue_hd == END_TSO_QUEUE
572         && sleeping_queue == END_TSO_QUEUE
573 #ifdef SMP
574         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
575 #endif
576         )
577     {
578         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
579         GarbageCollect(GetRoots,rtsTrue);
580         if (blocked_queue_hd == END_TSO_QUEUE
581             && run_queue_hd == END_TSO_QUEUE
582             && sleeping_queue == END_TSO_QUEUE) {
583             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
584             detectBlackHoles();
585             if (run_queue_hd == END_TSO_QUEUE) {
586                 StgMainThread *m = main_threads;
587 #ifdef SMP
588                 for (; m != NULL; m = m->link) {
589                     deleteThread(m->tso);
590                     m->ret = NULL;
591                     m->stat = Deadlock;
592                     pthread_cond_broadcast(&m->wakeup);
593                 }
594                 main_threads = NULL;
595 #else
596                 deleteThread(m->tso);
597                 m->ret = NULL;
598                 m->stat = Deadlock;
599                 main_threads = m->link;
600                 return;
601 #endif
602             }
603         }
604     }
605 #elif defined(PAR)
606     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
607 #endif
608
609 #ifdef SMP
610     /* If there's a GC pending, don't do anything until it has
611      * completed.
612      */
613     if (ready_to_gc) {
614       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
615       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
616     }
617     
618     /* block until we've got a thread on the run queue and a free
619      * capability.
620      */
621     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
622       IF_DEBUG(scheduler, sched_belch("waiting for work"));
623       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
624       IF_DEBUG(scheduler, sched_belch("work now available"));
625     }
626 #endif
627
628 #if defined(GRAN)
629
630     if (RtsFlags.GranFlags.Light)
631       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
632
633     /* adjust time based on time-stamp */
634     if (event->time > CurrentTime[CurrentProc] &&
635         event->evttype != ContinueThread)
636       CurrentTime[CurrentProc] = event->time;
637     
638     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
639     if (!RtsFlags.GranFlags.Light)
640       handleIdlePEs();
641
642     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
643
644     /* main event dispatcher in GranSim */
645     switch (event->evttype) {
646       /* Should just be continuing execution */
647     case ContinueThread:
648       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
649       /* ToDo: check assertion
650       ASSERT(run_queue_hd != (StgTSO*)NULL &&
651              run_queue_hd != END_TSO_QUEUE);
652       */
653       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
654       if (!RtsFlags.GranFlags.DoAsyncFetch &&
655           procStatus[CurrentProc]==Fetching) {
656         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
657               CurrentTSO->id, CurrentTSO, CurrentProc);
658         goto next_thread;
659       } 
660       /* Ignore ContinueThreads for completed threads */
661       if (CurrentTSO->what_next == ThreadComplete) {
662         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
663               CurrentTSO->id, CurrentTSO, CurrentProc);
664         goto next_thread;
665       } 
666       /* Ignore ContinueThreads for threads that are being migrated */
667       if (PROCS(CurrentTSO)==Nowhere) { 
668         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
669               CurrentTSO->id, CurrentTSO, CurrentProc);
670         goto next_thread;
671       }
672       /* The thread should be at the beginning of the run queue */
673       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
674         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
675               CurrentTSO->id, CurrentTSO, CurrentProc);
676         break; // run the thread anyway
677       }
678       /*
679       new_event(proc, proc, CurrentTime[proc],
680                 FindWork,
681                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
682       goto next_thread; 
683       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
684       break; // now actually run the thread; DaH Qu'vam yImuHbej 
685
686     case FetchNode:
687       do_the_fetchnode(event);
688       goto next_thread;             /* handle next event in event queue  */
689       
690     case GlobalBlock:
691       do_the_globalblock(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case FetchReply:
695       do_the_fetchreply(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case UnblockThread:   /* Move from the blocked queue to the tail of */
699       do_the_unblock(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case ResumeThread:  /* Move from the blocked queue to the tail of */
703       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
704       event->tso->gran.blocktime += 
705         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
706       do_the_startthread(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case StartThread:
710       do_the_startthread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case MoveThread:
714       do_the_movethread(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case MoveSpark:
718       do_the_movespark(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     case FindWork:
722       do_the_findwork(event);
723       goto next_thread;             /* handle next event in event queue  */
724       
725     default:
726       barf("Illegal event type %u\n", event->evttype);
727     }  /* switch */
728     
729     /* This point was scheduler_loop in the old RTS */
730
731     IF_DEBUG(gran, belch("GRAN: after main switch"));
732
733     TimeOfLastEvent = CurrentTime[CurrentProc];
734     TimeOfNextEvent = get_time_of_next_event();
735     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
736     // CurrentTSO = ThreadQueueHd;
737
738     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
739                          TimeOfNextEvent));
740
741     if (RtsFlags.GranFlags.Light) 
742       GranSimLight_leave_system(event, &ActiveTSO); 
743
744     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
745
746     IF_DEBUG(gran, 
747              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
748
749     /* in a GranSim setup the TSO stays on the run queue */
750     t = CurrentTSO;
751     /* Take a thread from the run queue. */
752     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
753
754     IF_DEBUG(gran, 
755              fprintf(stderr, "GRAN: About to run current thread, which is\n");
756              G_TSO(t,5));
757
758     context_switch = 0; // turned on via GranYield, checking events and time slice
759
760     IF_DEBUG(gran, 
761              DumpGranEvent(GR_SCHEDULE, t));
762
763     procStatus[CurrentProc] = Busy;
764
765 #elif defined(PAR)
766     if (PendingFetches != END_BF_QUEUE) {
767         processFetches();
768     }
769
770     /* ToDo: phps merge with spark activation above */
771     /* check whether we have local work and send requests if we have none */
772     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
773       /* :-[  no local threads => look out for local sparks */
774       /* the spark pool for the current PE */
775       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
776       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
777           pool->hd < pool->tl) {
778         /* 
779          * ToDo: add GC code check that we really have enough heap afterwards!!
780          * Old comment:
781          * If we're here (no runnable threads) and we have pending
782          * sparks, we must have a space problem.  Get enough space
783          * to turn one of those pending sparks into a
784          * thread... 
785          */
786
787         spark = findSpark(rtsFalse);                /* get a spark */
788         if (spark != (rtsSpark) NULL) {
789           tso = activateSpark(spark);       /* turn the spark into a thread */
790           IF_PAR_DEBUG(schedule,
791                        belch("==== schedule: Created TSO %d (%p); %d threads active",
792                              tso->id, tso, advisory_thread_count));
793
794           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
795             belch("==^^ failed to activate spark");
796             goto next_thread;
797           }               /* otherwise fall through & pick-up new tso */
798         } else {
799           IF_PAR_DEBUG(verbose,
800                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
801                              spark_queue_len(pool)));
802           goto next_thread;
803         }
804       }
805
806       /* If we still have no work we need to send a FISH to get a spark
807          from another PE 
808       */
809       if (EMPTY_RUN_QUEUE()) {
810       /* =8-[  no local sparks => look for work on other PEs */
811         /*
812          * We really have absolutely no work.  Send out a fish
813          * (there may be some out there already), and wait for
814          * something to arrive.  We clearly can't run any threads
815          * until a SCHEDULE or RESUME arrives, and so that's what
816          * we're hoping to see.  (Of course, we still have to
817          * respond to other types of messages.)
818          */
819         TIME now = msTime() /*CURRENT_TIME*/;
820         IF_PAR_DEBUG(verbose, 
821                      belch("--  now=%ld", now));
822         IF_PAR_DEBUG(verbose,
823                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
824                          (last_fish_arrived_at!=0 &&
825                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
826                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
827                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
828                              last_fish_arrived_at,
829                              RtsFlags.ParFlags.fishDelay, now);
830                      });
831         
832         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
833             (last_fish_arrived_at==0 ||
834              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
835           /* outstandingFishes is set in sendFish, processFish;
836              avoid flooding system with fishes via delay */
837           pe = choosePE();
838           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
839                    NEW_FISH_HUNGER);
840
841           // Global statistics: count no. of fishes
842           if (RtsFlags.ParFlags.ParStats.Global &&
843               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
844             globalParStats.tot_fish_mess++;
845           }
846         }
847       
848         receivedFinish = processMessages();
849         goto next_thread;
850       }
851     } else if (PacketsWaiting()) {  /* Look for incoming messages */
852       receivedFinish = processMessages();
853     }
854
855     /* Now we are sure that we have some work available */
856     ASSERT(run_queue_hd != END_TSO_QUEUE);
857
858     /* Take a thread from the run queue, if we have work */
859     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
860     IF_DEBUG(sanity,checkTSO(t));
861
862     /* ToDo: write something to the log-file
863     if (RTSflags.ParFlags.granSimStats && !sameThread)
864         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
865
866     CurrentTSO = t;
867     */
868     /* the spark pool for the current PE */
869     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
870
871     IF_DEBUG(scheduler, 
872              belch("--=^ %d threads, %d sparks on [%#x]", 
873                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
874
875 #if 1
876     if (0 && RtsFlags.ParFlags.ParStats.Full && 
877         t && LastTSO && t->id != LastTSO->id && 
878         LastTSO->why_blocked == NotBlocked && 
879         LastTSO->what_next != ThreadComplete) {
880       // if previously scheduled TSO not blocked we have to record the context switch
881       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
882                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
883     }
884
885     if (RtsFlags.ParFlags.ParStats.Full && 
886         (emitSchedule /* forced emit */ ||
887         (t && LastTSO && t->id != LastTSO->id))) {
888       /* 
889          we are running a different TSO, so write a schedule event to log file
890          NB: If we use fair scheduling we also have to write  a deschedule 
891              event for LastTSO; with unfair scheduling we know that the
892              previous tso has blocked whenever we switch to another tso, so
893              we don't need it in GUM for now
894       */
895       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
896                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
897       emitSchedule = rtsFalse;
898     }
899      
900 #endif
901 #else /* !GRAN && !PAR */
902   
903     /* grab a thread from the run queue
904      */
905     ASSERT(run_queue_hd != END_TSO_QUEUE);
906     t = POP_RUN_QUEUE();
907
908     // Sanity check the thread we're about to run.  This can be
909     // expensive if there is lots of thread switching going on...
910     IF_DEBUG(sanity,checkTSO(t));
911
912 #endif
913     
914     /* grab a capability
915      */
916 #ifdef SMP
917     cap = free_capabilities;
918     free_capabilities = cap->link;
919     n_free_capabilities--;
920 #else
921     cap = &MainCapability;
922 #endif
923
924     cap->r.rCurrentTSO = t;
925     
926     /* context switches are now initiated by the timer signal, unless
927      * the user specified "context switch as often as possible", with
928      * +RTS -C0
929      */
930     if (
931 #ifdef PROFILING
932         RtsFlags.ProfFlags.profileInterval == 0 ||
933 #endif
934         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
935          && (run_queue_hd != END_TSO_QUEUE
936              || blocked_queue_hd != END_TSO_QUEUE
937              || sleeping_queue != END_TSO_QUEUE)))
938         context_switch = 1;
939     else
940         context_switch = 0;
941
942     RELEASE_LOCK(&sched_mutex);
943
944     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
945                               t->id, t, whatNext_strs[t->what_next]));
946
947 #ifdef PROFILING
948     startHeapProfTimer();
949 #endif
950
951     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
952     /* Run the current thread 
953      */
954     switch (cap->r.rCurrentTSO->what_next) {
955     case ThreadKilled:
956     case ThreadComplete:
957         /* Thread already finished, return to scheduler. */
958         ret = ThreadFinished;
959         break;
960     case ThreadEnterGHC:
961         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
962         break;
963     case ThreadRunGHC:
964         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
965         break;
966     case ThreadEnterInterp:
967         ret = interpretBCO(cap);
968         break;
969     default:
970       barf("schedule: invalid what_next field");
971     }
972     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
973     
974     /* Costs for the scheduler are assigned to CCS_SYSTEM */
975 #ifdef PROFILING
976     stopHeapProfTimer();
977     CCCS = CCS_SYSTEM;
978 #endif
979     
980     ACQUIRE_LOCK(&sched_mutex);
981
982 #ifdef SMP
983     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
984 #elif !defined(GRAN) && !defined(PAR)
985     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
986 #endif
987     t = cap->r.rCurrentTSO;
988     
989 #if defined(PAR)
990     /* HACK 675: if the last thread didn't yield, make sure to print a 
991        SCHEDULE event to the log file when StgRunning the next thread, even
992        if it is the same one as before */
993     LastTSO = t; 
994     TimeOfLastYield = CURRENT_TIME;
995 #endif
996
997     switch (ret) {
998     case HeapOverflow:
999 #if defined(GRAN)
1000       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1001       globalGranStats.tot_heapover++;
1002 #elif defined(PAR)
1003       globalParStats.tot_heapover++;
1004 #endif
1005
1006       // did the task ask for a large block?
1007       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1008           // if so, get one and push it on the front of the nursery.
1009           bdescr *bd;
1010           nat blocks;
1011           
1012           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1013
1014           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1015                                    t->id, t,
1016                                    whatNext_strs[t->what_next], blocks));
1017
1018           // don't do this if it would push us over the
1019           // alloc_blocks_lim limit; we'll GC first.
1020           if (alloc_blocks + blocks < alloc_blocks_lim) {
1021
1022               alloc_blocks += blocks;
1023               bd = allocGroup( blocks );
1024
1025               // link the new group into the list
1026               bd->link = cap->r.rCurrentNursery;
1027               bd->u.back = cap->r.rCurrentNursery->u.back;
1028               if (cap->r.rCurrentNursery->u.back != NULL) {
1029                   cap->r.rCurrentNursery->u.back->link = bd;
1030               } else {
1031                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1032                          g0s0->blocks == cap->r.rNursery);
1033                   cap->r.rNursery = g0s0->blocks = bd;
1034               }           
1035               cap->r.rCurrentNursery->u.back = bd;
1036
1037               // initialise it as a nursery block
1038               bd->step = g0s0;
1039               bd->gen_no = 0;
1040               bd->flags = 0;
1041               bd->free = bd->start;
1042
1043               // don't forget to update the block count in g0s0.
1044               g0s0->n_blocks += blocks;
1045               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1046
1047               // now update the nursery to point to the new block
1048               cap->r.rCurrentNursery = bd;
1049
1050               // we might be unlucky and have another thread get on the
1051               // run queue before us and steal the large block, but in that
1052               // case the thread will just end up requesting another large
1053               // block.
1054               PUSH_ON_RUN_QUEUE(t);
1055               break;
1056           }
1057       }
1058
1059       /* make all the running tasks block on a condition variable,
1060        * maybe set context_switch and wait till they all pile in,
1061        * then have them wait on a GC condition variable.
1062        */
1063       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1064                                t->id, t, whatNext_strs[t->what_next]));
1065       threadPaused(t);
1066 #if defined(GRAN)
1067       ASSERT(!is_on_queue(t,CurrentProc));
1068 #elif defined(PAR)
1069       /* Currently we emit a DESCHEDULE event before GC in GUM.
1070          ToDo: either add separate event to distinguish SYSTEM time from rest
1071                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1072       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1073         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1074                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1075         emitSchedule = rtsTrue;
1076       }
1077 #endif
1078       
1079       ready_to_gc = rtsTrue;
1080       context_switch = 1;               /* stop other threads ASAP */
1081       PUSH_ON_RUN_QUEUE(t);
1082       /* actual GC is done at the end of the while loop */
1083       break;
1084       
1085     case StackOverflow:
1086 #if defined(GRAN)
1087       IF_DEBUG(gran, 
1088                DumpGranEvent(GR_DESCHEDULE, t));
1089       globalGranStats.tot_stackover++;
1090 #elif defined(PAR)
1091       // IF_DEBUG(par, 
1092       // DumpGranEvent(GR_DESCHEDULE, t);
1093       globalParStats.tot_stackover++;
1094 #endif
1095       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1096                                t->id, t, whatNext_strs[t->what_next]));
1097       /* just adjust the stack for this thread, then pop it back
1098        * on the run queue.
1099        */
1100       threadPaused(t);
1101       { 
1102         StgMainThread *m;
1103         /* enlarge the stack */
1104         StgTSO *new_t = threadStackOverflow(t);
1105         
1106         /* This TSO has moved, so update any pointers to it from the
1107          * main thread stack.  It better not be on any other queues...
1108          * (it shouldn't be).
1109          */
1110         for (m = main_threads; m != NULL; m = m->link) {
1111           if (m->tso == t) {
1112             m->tso = new_t;
1113           }
1114         }
1115         threadPaused(new_t);
1116         PUSH_ON_RUN_QUEUE(new_t);
1117       }
1118       break;
1119
1120     case ThreadYielding:
1121 #if defined(GRAN)
1122       IF_DEBUG(gran, 
1123                DumpGranEvent(GR_DESCHEDULE, t));
1124       globalGranStats.tot_yields++;
1125 #elif defined(PAR)
1126       // IF_DEBUG(par, 
1127       // DumpGranEvent(GR_DESCHEDULE, t);
1128       globalParStats.tot_yields++;
1129 #endif
1130       /* put the thread back on the run queue.  Then, if we're ready to
1131        * GC, check whether this is the last task to stop.  If so, wake
1132        * up the GC thread.  getThread will block during a GC until the
1133        * GC is finished.
1134        */
1135       IF_DEBUG(scheduler,
1136                if (t->what_next == ThreadEnterInterp) {
1137                    /* ToDo: or maybe a timer expired when we were in Hugs?
1138                     * or maybe someone hit ctrl-C
1139                     */
1140                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1141                          t->id, t, whatNext_strs[t->what_next]);
1142                } else {
1143                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1144                          t->id, t, whatNext_strs[t->what_next]);
1145                }
1146                );
1147
1148       threadPaused(t);
1149
1150       IF_DEBUG(sanity,
1151                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1152                checkTSO(t));
1153       ASSERT(t->link == END_TSO_QUEUE);
1154 #if defined(GRAN)
1155       ASSERT(!is_on_queue(t,CurrentProc));
1156
1157       IF_DEBUG(sanity,
1158                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1159                checkThreadQsSanity(rtsTrue));
1160 #endif
1161 #if defined(PAR)
1162       if (RtsFlags.ParFlags.doFairScheduling) { 
1163         /* this does round-robin scheduling; good for concurrency */
1164         APPEND_TO_RUN_QUEUE(t);
1165       } else {
1166         /* this does unfair scheduling; good for parallelism */
1167         PUSH_ON_RUN_QUEUE(t);
1168       }
1169 #else
1170       /* this does round-robin scheduling; good for concurrency */
1171       APPEND_TO_RUN_QUEUE(t);
1172 #endif
1173 #if defined(GRAN)
1174       /* add a ContinueThread event to actually process the thread */
1175       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1176                 ContinueThread,
1177                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1178       IF_GRAN_DEBUG(bq, 
1179                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1180                G_EVENTQ(0);
1181                G_CURR_THREADQ(0));
1182 #endif /* GRAN */
1183       break;
1184       
1185     case ThreadBlocked:
1186 #if defined(GRAN)
1187       IF_DEBUG(scheduler,
1188                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1189                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1190                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1191
1192       // ??? needed; should emit block before
1193       IF_DEBUG(gran, 
1194                DumpGranEvent(GR_DESCHEDULE, t)); 
1195       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1196       /*
1197         ngoq Dogh!
1198       ASSERT(procStatus[CurrentProc]==Busy || 
1199               ((procStatus[CurrentProc]==Fetching) && 
1200               (t->block_info.closure!=(StgClosure*)NULL)));
1201       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1202           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1203             procStatus[CurrentProc]==Fetching)) 
1204         procStatus[CurrentProc] = Idle;
1205       */
1206 #elif defined(PAR)
1207       IF_DEBUG(scheduler,
1208                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1209                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1210       IF_PAR_DEBUG(bq,
1211
1212                    if (t->block_info.closure!=(StgClosure*)NULL) 
1213                      print_bq(t->block_info.closure));
1214
1215       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1216       blockThread(t);
1217
1218       /* whatever we schedule next, we must log that schedule */
1219       emitSchedule = rtsTrue;
1220
1221 #else /* !GRAN */
1222       /* don't need to do anything.  Either the thread is blocked on
1223        * I/O, in which case we'll have called addToBlockedQueue
1224        * previously, or it's blocked on an MVar or Blackhole, in which
1225        * case it'll be on the relevant queue already.
1226        */
1227       IF_DEBUG(scheduler,
1228                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1229                printThreadBlockage(t);
1230                fprintf(stderr, "\n"));
1231
1232       /* Only for dumping event to log file 
1233          ToDo: do I need this in GranSim, too?
1234       blockThread(t);
1235       */
1236 #endif
1237       threadPaused(t);
1238       break;
1239       
1240     case ThreadFinished:
1241       /* Need to check whether this was a main thread, and if so, signal
1242        * the task that started it with the return value.  If we have no
1243        * more main threads, we probably need to stop all the tasks until
1244        * we get a new one.
1245        */
1246       /* We also end up here if the thread kills itself with an
1247        * uncaught exception, see Exception.hc.
1248        */
1249       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1250 #if defined(GRAN)
1251       endThread(t, CurrentProc); // clean-up the thread
1252 #elif defined(PAR)
1253       /* For now all are advisory -- HWL */
1254       //if(t->priority==AdvisoryPriority) ??
1255       advisory_thread_count--;
1256       
1257 # ifdef DIST
1258       if(t->dist.priority==RevalPriority)
1259         FinishReval(t);
1260 # endif
1261       
1262       if (RtsFlags.ParFlags.ParStats.Full &&
1263           !RtsFlags.ParFlags.ParStats.Suppressed) 
1264         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1265 #endif
1266       break;
1267       
1268     default:
1269       barf("schedule: invalid thread return code %d", (int)ret);
1270     }
1271     
1272 #ifdef SMP
1273     cap->link = free_capabilities;
1274     free_capabilities = cap;
1275     n_free_capabilities++;
1276 #endif
1277
1278 #ifdef PROFILING
1279     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1280         GarbageCollect(GetRoots, rtsTrue);
1281         heapCensus();
1282         performHeapProfile = rtsFalse;
1283         ready_to_gc = rtsFalse; // we already GC'd
1284     }
1285 #endif
1286
1287 #ifdef SMP
1288     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1289 #else
1290     if (ready_to_gc) 
1291 #endif
1292       {
1293       /* everybody back, start the GC.
1294        * Could do it in this thread, or signal a condition var
1295        * to do it in another thread.  Either way, we need to
1296        * broadcast on gc_pending_cond afterward.
1297        */
1298 #ifdef SMP
1299       IF_DEBUG(scheduler,sched_belch("doing GC"));
1300 #endif
1301       GarbageCollect(GetRoots,rtsFalse);
1302       ready_to_gc = rtsFalse;
1303 #ifdef SMP
1304       pthread_cond_broadcast(&gc_pending_cond);
1305 #endif
1306 #if defined(GRAN)
1307       /* add a ContinueThread event to continue execution of current thread */
1308       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1309                 ContinueThread,
1310                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1311       IF_GRAN_DEBUG(bq, 
1312                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1313                G_EVENTQ(0);
1314                G_CURR_THREADQ(0));
1315 #endif /* GRAN */
1316     }
1317
1318 #if defined(GRAN)
1319   next_thread:
1320     IF_GRAN_DEBUG(unused,
1321                   print_eventq(EventHd));
1322
1323     event = get_next_event();
1324 #elif defined(PAR)
1325   next_thread:
1326     /* ToDo: wait for next message to arrive rather than busy wait */
1327 #endif /* GRAN */
1328
1329   } /* end of while(1) */
1330
1331   IF_PAR_DEBUG(verbose,
1332                belch("== Leaving schedule() after having received Finish"));
1333 }
1334
1335 /* ---------------------------------------------------------------------------
1336  * deleteAllThreads():  kill all the live threads.
1337  *
1338  * This is used when we catch a user interrupt (^C), before performing
1339  * any necessary cleanups and running finalizers.
1340  * ------------------------------------------------------------------------- */
1341    
1342 void deleteAllThreads ( void )
1343 {
1344   StgTSO* t;
1345   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1346   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1347       deleteThread(t);
1348   }
1349   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1350       deleteThread(t);
1351   }
1352   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1353       deleteThread(t);
1354   }
1355   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1356   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1357   sleeping_queue = END_TSO_QUEUE;
1358 }
1359
1360 /* startThread and  insertThread are now in GranSim.c -- HWL */
1361
1362 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1363 //@subsection Suspend and Resume
1364
1365 /* ---------------------------------------------------------------------------
1366  * Suspending & resuming Haskell threads.
1367  * 
1368  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1369  * its capability before calling the C function.  This allows another
1370  * task to pick up the capability and carry on running Haskell
1371  * threads.  It also means that if the C call blocks, it won't lock
1372  * the whole system.
1373  *
1374  * The Haskell thread making the C call is put to sleep for the
1375  * duration of the call, on the susepended_ccalling_threads queue.  We
1376  * give out a token to the task, which it can use to resume the thread
1377  * on return from the C function.
1378  * ------------------------------------------------------------------------- */
1379    
1380 StgInt
1381 suspendThread( StgRegTable *reg )
1382 {
1383   nat tok;
1384   Capability *cap;
1385
1386   // assume that *reg is a pointer to the StgRegTable part of a Capability
1387   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1388
1389   ACQUIRE_LOCK(&sched_mutex);
1390
1391   IF_DEBUG(scheduler,
1392            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1393
1394   threadPaused(cap->r.rCurrentTSO);
1395   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1396   suspended_ccalling_threads = cap->r.rCurrentTSO;
1397
1398   /* Use the thread ID as the token; it should be unique */
1399   tok = cap->r.rCurrentTSO->id;
1400
1401 #ifdef SMP
1402   cap->link = free_capabilities;
1403   free_capabilities = cap;
1404   n_free_capabilities++;
1405 #endif
1406
1407   RELEASE_LOCK(&sched_mutex);
1408   return tok; 
1409 }
1410
1411 StgRegTable *
1412 resumeThread( StgInt tok )
1413 {
1414   StgTSO *tso, **prev;
1415   Capability *cap;
1416
1417   ACQUIRE_LOCK(&sched_mutex);
1418
1419   prev = &suspended_ccalling_threads;
1420   for (tso = suspended_ccalling_threads; 
1421        tso != END_TSO_QUEUE; 
1422        prev = &tso->link, tso = tso->link) {
1423     if (tso->id == (StgThreadID)tok) {
1424       *prev = tso->link;
1425       break;
1426     }
1427   }
1428   if (tso == END_TSO_QUEUE) {
1429     barf("resumeThread: thread not found");
1430   }
1431   tso->link = END_TSO_QUEUE;
1432
1433 #ifdef SMP
1434   while (free_capabilities == NULL) {
1435     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1436     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1437     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1438   }
1439   cap = free_capabilities;
1440   free_capabilities = cap->link;
1441   n_free_capabilities--;
1442 #else  
1443   cap = &MainCapability;
1444 #endif
1445
1446   cap->r.rCurrentTSO = tso;
1447
1448   RELEASE_LOCK(&sched_mutex);
1449   return &cap->r;
1450 }
1451
1452
1453 /* ---------------------------------------------------------------------------
1454  * Static functions
1455  * ------------------------------------------------------------------------ */
1456 static void unblockThread(StgTSO *tso);
1457
1458 /* ---------------------------------------------------------------------------
1459  * Comparing Thread ids.
1460  *
1461  * This is used from STG land in the implementation of the
1462  * instances of Eq/Ord for ThreadIds.
1463  * ------------------------------------------------------------------------ */
1464
1465 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1466
1467   StgThreadID id1 = tso1->id; 
1468   StgThreadID id2 = tso2->id;
1469  
1470   if (id1 < id2) return (-1);
1471   if (id1 > id2) return 1;
1472   return 0;
1473 }
1474
1475 /* ---------------------------------------------------------------------------
1476  * Fetching the ThreadID from an StgTSO.
1477  *
1478  * This is used in the implementation of Show for ThreadIds.
1479  * ------------------------------------------------------------------------ */
1480 int rts_getThreadId(const StgTSO *tso) 
1481 {
1482   return tso->id;
1483 }
1484
1485 /* ---------------------------------------------------------------------------
1486    Create a new thread.
1487
1488    The new thread starts with the given stack size.  Before the
1489    scheduler can run, however, this thread needs to have a closure
1490    (and possibly some arguments) pushed on its stack.  See
1491    pushClosure() in Schedule.h.
1492
1493    createGenThread() and createIOThread() (in SchedAPI.h) are
1494    convenient packaged versions of this function.
1495
1496    currently pri (priority) is only used in a GRAN setup -- HWL
1497    ------------------------------------------------------------------------ */
1498 //@cindex createThread
1499 #if defined(GRAN)
1500 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1501 StgTSO *
1502 createThread(nat stack_size, StgInt pri)
1503 {
1504   return createThread_(stack_size, rtsFalse, pri);
1505 }
1506
1507 static StgTSO *
1508 createThread_(nat size, rtsBool have_lock, StgInt pri)
1509 {
1510 #else
1511 StgTSO *
1512 createThread(nat stack_size)
1513 {
1514   return createThread_(stack_size, rtsFalse);
1515 }
1516
1517 static StgTSO *
1518 createThread_(nat size, rtsBool have_lock)
1519 {
1520 #endif
1521
1522     StgTSO *tso;
1523     nat stack_size;
1524
1525     /* First check whether we should create a thread at all */
1526 #if defined(PAR)
1527   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1528   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1529     threadsIgnored++;
1530     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1531           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1532     return END_TSO_QUEUE;
1533   }
1534   threadsCreated++;
1535 #endif
1536
1537 #if defined(GRAN)
1538   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1539 #endif
1540
1541   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1542
1543   /* catch ridiculously small stack sizes */
1544   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1545     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1546   }
1547
1548   stack_size = size - TSO_STRUCT_SIZEW;
1549
1550   tso = (StgTSO *)allocate(size);
1551   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1552
1553   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1554 #if defined(GRAN)
1555   SET_GRAN_HDR(tso, ThisPE);
1556 #endif
1557   tso->what_next     = ThreadEnterGHC;
1558
1559   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1560    * protect the increment operation on next_thread_id.
1561    * In future, we could use an atomic increment instead.
1562    */
1563   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1564   tso->id = next_thread_id++; 
1565   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1566
1567   tso->why_blocked  = NotBlocked;
1568   tso->blocked_exceptions = NULL;
1569
1570   tso->stack_size   = stack_size;
1571   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1572                               - TSO_STRUCT_SIZEW;
1573   tso->sp           = (P_)&(tso->stack) + stack_size;
1574
1575 #ifdef PROFILING
1576   tso->prof.CCCS = CCS_MAIN;
1577 #endif
1578
1579   /* put a stop frame on the stack */
1580   tso->sp -= sizeofW(StgStopFrame);
1581   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1582   tso->su = (StgUpdateFrame*)tso->sp;
1583
1584   // ToDo: check this
1585 #if defined(GRAN)
1586   tso->link = END_TSO_QUEUE;
1587   /* uses more flexible routine in GranSim */
1588   insertThread(tso, CurrentProc);
1589 #else
1590   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1591    * from its creation
1592    */
1593 #endif
1594
1595 #if defined(GRAN) 
1596   if (RtsFlags.GranFlags.GranSimStats.Full) 
1597     DumpGranEvent(GR_START,tso);
1598 #elif defined(PAR)
1599   if (RtsFlags.ParFlags.ParStats.Full) 
1600     DumpGranEvent(GR_STARTQ,tso);
1601   /* HACk to avoid SCHEDULE 
1602      LastTSO = tso; */
1603 #endif
1604
1605   /* Link the new thread on the global thread list.
1606    */
1607   tso->global_link = all_threads;
1608   all_threads = tso;
1609
1610 #if defined(DIST)
1611   tso->dist.priority = MandatoryPriority; //by default that is...
1612 #endif
1613
1614 #if defined(GRAN)
1615   tso->gran.pri = pri;
1616 # if defined(DEBUG)
1617   tso->gran.magic = TSO_MAGIC; // debugging only
1618 # endif
1619   tso->gran.sparkname   = 0;
1620   tso->gran.startedat   = CURRENT_TIME; 
1621   tso->gran.exported    = 0;
1622   tso->gran.basicblocks = 0;
1623   tso->gran.allocs      = 0;
1624   tso->gran.exectime    = 0;
1625   tso->gran.fetchtime   = 0;
1626   tso->gran.fetchcount  = 0;
1627   tso->gran.blocktime   = 0;
1628   tso->gran.blockcount  = 0;
1629   tso->gran.blockedat   = 0;
1630   tso->gran.globalsparks = 0;
1631   tso->gran.localsparks  = 0;
1632   if (RtsFlags.GranFlags.Light)
1633     tso->gran.clock  = Now; /* local clock */
1634   else
1635     tso->gran.clock  = 0;
1636
1637   IF_DEBUG(gran,printTSO(tso));
1638 #elif defined(PAR)
1639 # if defined(DEBUG)
1640   tso->par.magic = TSO_MAGIC; // debugging only
1641 # endif
1642   tso->par.sparkname   = 0;
1643   tso->par.startedat   = CURRENT_TIME; 
1644   tso->par.exported    = 0;
1645   tso->par.basicblocks = 0;
1646   tso->par.allocs      = 0;
1647   tso->par.exectime    = 0;
1648   tso->par.fetchtime   = 0;
1649   tso->par.fetchcount  = 0;
1650   tso->par.blocktime   = 0;
1651   tso->par.blockcount  = 0;
1652   tso->par.blockedat   = 0;
1653   tso->par.globalsparks = 0;
1654   tso->par.localsparks  = 0;
1655 #endif
1656
1657 #if defined(GRAN)
1658   globalGranStats.tot_threads_created++;
1659   globalGranStats.threads_created_on_PE[CurrentProc]++;
1660   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1661   globalGranStats.tot_sq_probes++;
1662 #elif defined(PAR)
1663   // collect parallel global statistics (currently done together with GC stats)
1664   if (RtsFlags.ParFlags.ParStats.Global &&
1665       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1666     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1667     globalParStats.tot_threads_created++;
1668   }
1669 #endif 
1670
1671 #if defined(GRAN)
1672   IF_GRAN_DEBUG(pri,
1673                 belch("==__ schedule: Created TSO %d (%p);",
1674                       CurrentProc, tso, tso->id));
1675 #elif defined(PAR)
1676     IF_PAR_DEBUG(verbose,
1677                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1678                        tso->id, tso, advisory_thread_count));
1679 #else
1680   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1681                                  tso->id, tso->stack_size));
1682 #endif    
1683   return tso;
1684 }
1685
1686 #if defined(PAR)
1687 /* RFP:
1688    all parallel thread creation calls should fall through the following routine.
1689 */
1690 StgTSO *
1691 createSparkThread(rtsSpark spark) 
1692 { StgTSO *tso;
1693   ASSERT(spark != (rtsSpark)NULL);
1694   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1695   { threadsIgnored++;
1696     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1697           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1698     return END_TSO_QUEUE;
1699   }
1700   else
1701   { threadsCreated++;
1702     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1703     if (tso==END_TSO_QUEUE)     
1704       barf("createSparkThread: Cannot create TSO");
1705 #if defined(DIST)
1706     tso->priority = AdvisoryPriority;
1707 #endif
1708     pushClosure(tso,spark);
1709     PUSH_ON_RUN_QUEUE(tso);
1710     advisory_thread_count++;    
1711   }
1712   return tso;
1713 }
1714 #endif
1715
1716 /*
1717   Turn a spark into a thread.
1718   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1719 */
1720 #if defined(PAR)
1721 //@cindex activateSpark
1722 StgTSO *
1723 activateSpark (rtsSpark spark) 
1724 {
1725   StgTSO *tso;
1726
1727   tso = createSparkThread(spark);
1728   if (RtsFlags.ParFlags.ParStats.Full) {   
1729     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1730     IF_PAR_DEBUG(verbose,
1731                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1732                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1733   }
1734   // ToDo: fwd info on local/global spark to thread -- HWL
1735   // tso->gran.exported =  spark->exported;
1736   // tso->gran.locked =   !spark->global;
1737   // tso->gran.sparkname = spark->name;
1738
1739   return tso;
1740 }
1741 #endif
1742
1743 /* ---------------------------------------------------------------------------
1744  * scheduleThread()
1745  *
1746  * scheduleThread puts a thread on the head of the runnable queue.
1747  * This will usually be done immediately after a thread is created.
1748  * The caller of scheduleThread must create the thread using e.g.
1749  * createThread and push an appropriate closure
1750  * on this thread's stack before the scheduler is invoked.
1751  * ------------------------------------------------------------------------ */
1752
1753 void
1754 scheduleThread(StgTSO *tso)
1755 {
1756   if (tso==END_TSO_QUEUE){    
1757     schedule();
1758     return;
1759   }
1760
1761   ACQUIRE_LOCK(&sched_mutex);
1762
1763   /* Put the new thread on the head of the runnable queue.  The caller
1764    * better push an appropriate closure on this thread's stack
1765    * beforehand.  In the SMP case, the thread may start running as
1766    * soon as we release the scheduler lock below.
1767    */
1768   PUSH_ON_RUN_QUEUE(tso);
1769   THREAD_RUNNABLE();
1770
1771 #if 0
1772   IF_DEBUG(scheduler,printTSO(tso));
1773 #endif
1774   RELEASE_LOCK(&sched_mutex);
1775 }
1776
1777 /* ---------------------------------------------------------------------------
1778  * startTasks()
1779  *
1780  * Start up Posix threads to run each of the scheduler tasks.
1781  * I believe the task ids are not needed in the system as defined.
1782  *  KH @ 25/10/99
1783  * ------------------------------------------------------------------------ */
1784
1785 #if defined(PAR) || defined(SMP)
1786 void
1787 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1788 {
1789   scheduleThread(END_TSO_QUEUE);
1790 }
1791 #endif
1792
1793 /* ---------------------------------------------------------------------------
1794  * initScheduler()
1795  *
1796  * Initialise the scheduler.  This resets all the queues - if the
1797  * queues contained any threads, they'll be garbage collected at the
1798  * next pass.
1799  *
1800  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1801  * ------------------------------------------------------------------------ */
1802
1803 #ifdef SMP
1804 static void
1805 term_handler(int sig STG_UNUSED)
1806 {
1807   stat_workerStop();
1808   ACQUIRE_LOCK(&term_mutex);
1809   await_death--;
1810   RELEASE_LOCK(&term_mutex);
1811   pthread_exit(NULL);
1812 }
1813 #endif
1814
1815 static void
1816 initCapability( Capability *cap )
1817 {
1818     cap->f.stgChk0         = (F_)__stg_chk_0;
1819     cap->f.stgChk1         = (F_)__stg_chk_1;
1820     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1821     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1822 }
1823
1824 void 
1825 initScheduler(void)
1826 {
1827 #if defined(GRAN)
1828   nat i;
1829
1830   for (i=0; i<=MAX_PROC; i++) {
1831     run_queue_hds[i]      = END_TSO_QUEUE;
1832     run_queue_tls[i]      = END_TSO_QUEUE;
1833     blocked_queue_hds[i]  = END_TSO_QUEUE;
1834     blocked_queue_tls[i]  = END_TSO_QUEUE;
1835     ccalling_threadss[i]  = END_TSO_QUEUE;
1836     sleeping_queue        = END_TSO_QUEUE;
1837   }
1838 #else
1839   run_queue_hd      = END_TSO_QUEUE;
1840   run_queue_tl      = END_TSO_QUEUE;
1841   blocked_queue_hd  = END_TSO_QUEUE;
1842   blocked_queue_tl  = END_TSO_QUEUE;
1843   sleeping_queue    = END_TSO_QUEUE;
1844 #endif 
1845
1846   suspended_ccalling_threads  = END_TSO_QUEUE;
1847
1848   main_threads = NULL;
1849   all_threads  = END_TSO_QUEUE;
1850
1851   context_switch = 0;
1852   interrupted    = 0;
1853
1854   RtsFlags.ConcFlags.ctxtSwitchTicks =
1855       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1856
1857   /* Install the SIGHUP handler */
1858 #ifdef SMP
1859   {
1860     struct sigaction action,oact;
1861
1862     action.sa_handler = term_handler;
1863     sigemptyset(&action.sa_mask);
1864     action.sa_flags = 0;
1865     if (sigaction(SIGTERM, &action, &oact) != 0) {
1866       barf("can't install TERM handler");
1867     }
1868   }
1869 #endif
1870
1871 #ifdef SMP
1872   /* Allocate N Capabilities */
1873   {
1874     nat i;
1875     Capability *cap, *prev;
1876     cap  = NULL;
1877     prev = NULL;
1878     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1879       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1880       initCapability(cap);
1881       cap->link = prev;
1882       prev = cap;
1883     }
1884     free_capabilities = cap;
1885     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1886   }
1887   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1888                              n_free_capabilities););
1889 #else
1890   initCapability(&MainCapability);
1891 #endif
1892
1893 #if defined(SMP) || defined(PAR)
1894   initSparkPools();
1895 #endif
1896 }
1897
1898 #ifdef SMP
1899 void
1900 startTasks( void )
1901 {
1902   nat i;
1903   int r;
1904   pthread_t tid;
1905   
1906   /* make some space for saving all the thread ids */
1907   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1908                             "initScheduler:task_ids");
1909   
1910   /* and create all the threads */
1911   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1912     r = pthread_create(&tid,NULL,taskStart,NULL);
1913     if (r != 0) {
1914       barf("startTasks: Can't create new Posix thread");
1915     }
1916     task_ids[i].id = tid;
1917     task_ids[i].mut_time = 0.0;
1918     task_ids[i].mut_etime = 0.0;
1919     task_ids[i].gc_time = 0.0;
1920     task_ids[i].gc_etime = 0.0;
1921     task_ids[i].elapsedtimestart = elapsedtime();
1922     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1923   }
1924 }
1925 #endif
1926
1927 void
1928 exitScheduler( void )
1929 {
1930 #ifdef SMP
1931   nat i;
1932
1933   /* Don't want to use pthread_cancel, since we'd have to install
1934    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1935    * all our locks.
1936    */
1937 #if 0
1938   /* Cancel all our tasks */
1939   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1940     pthread_cancel(task_ids[i].id);
1941   }
1942   
1943   /* Wait for all the tasks to terminate */
1944   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1945     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1946                                task_ids[i].id));
1947     pthread_join(task_ids[i].id, NULL);
1948   }
1949 #endif
1950
1951   /* Send 'em all a SIGHUP.  That should shut 'em up.
1952    */
1953   await_death = RtsFlags.ParFlags.nNodes;
1954   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1955     pthread_kill(task_ids[i].id,SIGTERM);
1956   }
1957   while (await_death > 0) {
1958     sched_yield();
1959   }
1960 #endif
1961 }
1962
1963 /* -----------------------------------------------------------------------------
1964    Managing the per-task allocation areas.
1965    
1966    Each capability comes with an allocation area.  These are
1967    fixed-length block lists into which allocation can be done.
1968
1969    ToDo: no support for two-space collection at the moment???
1970    -------------------------------------------------------------------------- */
1971
1972 /* -----------------------------------------------------------------------------
1973  * waitThread is the external interface for running a new computation
1974  * and waiting for the result.
1975  *
1976  * In the non-SMP case, we create a new main thread, push it on the 
1977  * main-thread stack, and invoke the scheduler to run it.  The
1978  * scheduler will return when the top main thread on the stack has
1979  * completed or died, and fill in the necessary fields of the
1980  * main_thread structure.
1981  *
1982  * In the SMP case, we create a main thread as before, but we then
1983  * create a new condition variable and sleep on it.  When our new
1984  * main thread has completed, we'll be woken up and the status/result
1985  * will be in the main_thread struct.
1986  * -------------------------------------------------------------------------- */
1987
1988 int 
1989 howManyThreadsAvail ( void )
1990 {
1991    int i = 0;
1992    StgTSO* q;
1993    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1994       i++;
1995    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1996       i++;
1997    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1998       i++;
1999    return i;
2000 }
2001
2002 void
2003 finishAllThreads ( void )
2004 {
2005    do {
2006       while (run_queue_hd != END_TSO_QUEUE) {
2007          waitThread ( run_queue_hd, NULL );
2008       }
2009       while (blocked_queue_hd != END_TSO_QUEUE) {
2010          waitThread ( blocked_queue_hd, NULL );
2011       }
2012       while (sleeping_queue != END_TSO_QUEUE) {
2013          waitThread ( blocked_queue_hd, NULL );
2014       }
2015    } while 
2016       (blocked_queue_hd != END_TSO_QUEUE || 
2017        run_queue_hd     != END_TSO_QUEUE ||
2018        sleeping_queue   != END_TSO_QUEUE);
2019 }
2020
2021 SchedulerStatus
2022 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2023 {
2024   StgMainThread *m;
2025   SchedulerStatus stat;
2026
2027   ACQUIRE_LOCK(&sched_mutex);
2028   
2029   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2030
2031   m->tso = tso;
2032   m->ret = ret;
2033   m->stat = NoStatus;
2034 #ifdef SMP
2035   pthread_cond_init(&m->wakeup, NULL);
2036 #endif
2037
2038   m->link = main_threads;
2039   main_threads = m;
2040
2041   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2042                               m->tso->id));
2043
2044 #ifdef SMP
2045   do {
2046     pthread_cond_wait(&m->wakeup, &sched_mutex);
2047   } while (m->stat == NoStatus);
2048 #elif defined(GRAN)
2049   /* GranSim specific init */
2050   CurrentTSO = m->tso;                // the TSO to run
2051   procStatus[MainProc] = Busy;        // status of main PE
2052   CurrentProc = MainProc;             // PE to run it on
2053
2054   schedule();
2055 #else
2056   schedule();
2057   ASSERT(m->stat != NoStatus);
2058 #endif
2059
2060   stat = m->stat;
2061
2062 #ifdef SMP
2063   pthread_cond_destroy(&m->wakeup);
2064 #endif
2065
2066   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2067                               m->tso->id));
2068   free(m);
2069
2070   RELEASE_LOCK(&sched_mutex);
2071
2072   return stat;
2073 }
2074
2075 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2076 //@subsection Run queue code 
2077
2078 #if 0
2079 /* 
2080    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2081        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2082        implicit global variable that has to be correct when calling these
2083        fcts -- HWL 
2084 */
2085
2086 /* Put the new thread on the head of the runnable queue.
2087  * The caller of createThread better push an appropriate closure
2088  * on this thread's stack before the scheduler is invoked.
2089  */
2090 static /* inline */ void
2091 add_to_run_queue(tso)
2092 StgTSO* tso; 
2093 {
2094   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2095   tso->link = run_queue_hd;
2096   run_queue_hd = tso;
2097   if (run_queue_tl == END_TSO_QUEUE) {
2098     run_queue_tl = tso;
2099   }
2100 }
2101
2102 /* Put the new thread at the end of the runnable queue. */
2103 static /* inline */ void
2104 push_on_run_queue(tso)
2105 StgTSO* tso; 
2106 {
2107   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2108   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2109   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2110   if (run_queue_hd == END_TSO_QUEUE) {
2111     run_queue_hd = tso;
2112   } else {
2113     run_queue_tl->link = tso;
2114   }
2115   run_queue_tl = tso;
2116 }
2117
2118 /* 
2119    Should be inlined because it's used very often in schedule.  The tso
2120    argument is actually only needed in GranSim, where we want to have the
2121    possibility to schedule *any* TSO on the run queue, irrespective of the
2122    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2123    the run queue and dequeue the tso, adjusting the links in the queue. 
2124 */
2125 //@cindex take_off_run_queue
2126 static /* inline */ StgTSO*
2127 take_off_run_queue(StgTSO *tso) {
2128   StgTSO *t, *prev;
2129
2130   /* 
2131      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2132
2133      if tso is specified, unlink that tso from the run_queue (doesn't have
2134      to be at the beginning of the queue); GranSim only 
2135   */
2136   if (tso!=END_TSO_QUEUE) {
2137     /* find tso in queue */
2138     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2139          t!=END_TSO_QUEUE && t!=tso;
2140          prev=t, t=t->link) 
2141       /* nothing */ ;
2142     ASSERT(t==tso);
2143     /* now actually dequeue the tso */
2144     if (prev!=END_TSO_QUEUE) {
2145       ASSERT(run_queue_hd!=t);
2146       prev->link = t->link;
2147     } else {
2148       /* t is at beginning of thread queue */
2149       ASSERT(run_queue_hd==t);
2150       run_queue_hd = t->link;
2151     }
2152     /* t is at end of thread queue */
2153     if (t->link==END_TSO_QUEUE) {
2154       ASSERT(t==run_queue_tl);
2155       run_queue_tl = prev;
2156     } else {
2157       ASSERT(run_queue_tl!=t);
2158     }
2159     t->link = END_TSO_QUEUE;
2160   } else {
2161     /* take tso from the beginning of the queue; std concurrent code */
2162     t = run_queue_hd;
2163     if (t != END_TSO_QUEUE) {
2164       run_queue_hd = t->link;
2165       t->link = END_TSO_QUEUE;
2166       if (run_queue_hd == END_TSO_QUEUE) {
2167         run_queue_tl = END_TSO_QUEUE;
2168       }
2169     }
2170   }
2171   return t;
2172 }
2173
2174 #endif /* 0 */
2175
2176 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2177 //@subsection Garbage Collextion Routines
2178
2179 /* ---------------------------------------------------------------------------
2180    Where are the roots that we know about?
2181
2182         - all the threads on the runnable queue
2183         - all the threads on the blocked queue
2184         - all the threads on the sleeping queue
2185         - all the thread currently executing a _ccall_GC
2186         - all the "main threads"
2187      
2188    ------------------------------------------------------------------------ */
2189
2190 /* This has to be protected either by the scheduler monitor, or by the
2191         garbage collection monitor (probably the latter).
2192         KH @ 25/10/99
2193 */
2194
2195 void
2196 GetRoots(evac_fn evac)
2197 {
2198   StgMainThread *m;
2199
2200 #if defined(GRAN)
2201   {
2202     nat i;
2203     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2204       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2205           evac((StgClosure **)&run_queue_hds[i]);
2206       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2207           evac((StgClosure **)&run_queue_tls[i]);
2208       
2209       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2210           evac((StgClosure **)&blocked_queue_hds[i]);
2211       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2212           evac((StgClosure **)&blocked_queue_tls[i]);
2213       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2214           evac((StgClosure **)&ccalling_threads[i]);
2215     }
2216   }
2217
2218   markEventQueue();
2219
2220 #else /* !GRAN */
2221   if (run_queue_hd != END_TSO_QUEUE) {
2222       ASSERT(run_queue_tl != END_TSO_QUEUE);
2223       evac((StgClosure **)&run_queue_hd);
2224       evac((StgClosure **)&run_queue_tl);
2225   }
2226   
2227   if (blocked_queue_hd != END_TSO_QUEUE) {
2228       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2229       evac((StgClosure **)&blocked_queue_hd);
2230       evac((StgClosure **)&blocked_queue_tl);
2231   }
2232   
2233   if (sleeping_queue != END_TSO_QUEUE) {
2234       evac((StgClosure **)&sleeping_queue);
2235   }
2236 #endif 
2237
2238   for (m = main_threads; m != NULL; m = m->link) {
2239       evac((StgClosure **)&m->tso);
2240   }
2241   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2242       evac((StgClosure **)&suspended_ccalling_threads);
2243   }
2244
2245 #if defined(SMP) || defined(PAR) || defined(GRAN)
2246   markSparkQueue(evac);
2247 #endif
2248 }
2249
2250 /* -----------------------------------------------------------------------------
2251    performGC
2252
2253    This is the interface to the garbage collector from Haskell land.
2254    We provide this so that external C code can allocate and garbage
2255    collect when called from Haskell via _ccall_GC.
2256
2257    It might be useful to provide an interface whereby the programmer
2258    can specify more roots (ToDo).
2259    
2260    This needs to be protected by the GC condition variable above.  KH.
2261    -------------------------------------------------------------------------- */
2262
2263 void (*extra_roots)(evac_fn);
2264
2265 void
2266 performGC(void)
2267 {
2268   GarbageCollect(GetRoots,rtsFalse);
2269 }
2270
2271 void
2272 performMajorGC(void)
2273 {
2274   GarbageCollect(GetRoots,rtsTrue);
2275 }
2276
2277 static void
2278 AllRoots(evac_fn evac)
2279 {
2280     GetRoots(evac);             // the scheduler's roots
2281     extra_roots(evac);          // the user's roots
2282 }
2283
2284 void
2285 performGCWithRoots(void (*get_roots)(evac_fn))
2286 {
2287   extra_roots = get_roots;
2288   GarbageCollect(AllRoots,rtsFalse);
2289 }
2290
2291 /* -----------------------------------------------------------------------------
2292    Stack overflow
2293
2294    If the thread has reached its maximum stack size, then raise the
2295    StackOverflow exception in the offending thread.  Otherwise
2296    relocate the TSO into a larger chunk of memory and adjust its stack
2297    size appropriately.
2298    -------------------------------------------------------------------------- */
2299
2300 static StgTSO *
2301 threadStackOverflow(StgTSO *tso)
2302 {
2303   nat new_stack_size, new_tso_size, diff, stack_words;
2304   StgPtr new_sp;
2305   StgTSO *dest;
2306
2307   IF_DEBUG(sanity,checkTSO(tso));
2308   if (tso->stack_size >= tso->max_stack_size) {
2309
2310     IF_DEBUG(gc,
2311              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2312                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2313              /* If we're debugging, just print out the top of the stack */
2314              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2315                                               tso->sp+64)));
2316
2317     /* Send this thread the StackOverflow exception */
2318     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2319     return tso;
2320   }
2321
2322   /* Try to double the current stack size.  If that takes us over the
2323    * maximum stack size for this thread, then use the maximum instead.
2324    * Finally round up so the TSO ends up as a whole number of blocks.
2325    */
2326   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2327   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2328                                        TSO_STRUCT_SIZE)/sizeof(W_);
2329   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2330   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2331
2332   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2333
2334   dest = (StgTSO *)allocate(new_tso_size);
2335   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2336
2337   /* copy the TSO block and the old stack into the new area */
2338   memcpy(dest,tso,TSO_STRUCT_SIZE);
2339   stack_words = tso->stack + tso->stack_size - tso->sp;
2340   new_sp = (P_)dest + new_tso_size - stack_words;
2341   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2342
2343   /* relocate the stack pointers... */
2344   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2345   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2346   dest->sp    = new_sp;
2347   dest->stack_size = new_stack_size;
2348         
2349   /* and relocate the update frame list */
2350   relocate_stack(dest, diff);
2351
2352   /* Mark the old TSO as relocated.  We have to check for relocated
2353    * TSOs in the garbage collector and any primops that deal with TSOs.
2354    *
2355    * It's important to set the sp and su values to just beyond the end
2356    * of the stack, so we don't attempt to scavenge any part of the
2357    * dead TSO's stack.
2358    */
2359   tso->what_next = ThreadRelocated;
2360   tso->link = dest;
2361   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2362   tso->su = (StgUpdateFrame *)tso->sp;
2363   tso->why_blocked = NotBlocked;
2364   dest->mut_link = NULL;
2365
2366   IF_PAR_DEBUG(verbose,
2367                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2368                      tso->id, tso, tso->stack_size);
2369                /* If we're debugging, just print out the top of the stack */
2370                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2371                                                 tso->sp+64)));
2372   
2373   IF_DEBUG(sanity,checkTSO(tso));
2374 #if 0
2375   IF_DEBUG(scheduler,printTSO(dest));
2376 #endif
2377
2378   return dest;
2379 }
2380
2381 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2382 //@subsection Blocking Queue Routines
2383
2384 /* ---------------------------------------------------------------------------
2385    Wake up a queue that was blocked on some resource.
2386    ------------------------------------------------------------------------ */
2387
2388 #if defined(GRAN)
2389 static inline void
2390 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2391 {
2392 }
2393 #elif defined(PAR)
2394 static inline void
2395 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2396 {
2397   /* write RESUME events to log file and
2398      update blocked and fetch time (depending on type of the orig closure) */
2399   if (RtsFlags.ParFlags.ParStats.Full) {
2400     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2401                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2402                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2403     if (EMPTY_RUN_QUEUE())
2404       emitSchedule = rtsTrue;
2405
2406     switch (get_itbl(node)->type) {
2407         case FETCH_ME_BQ:
2408           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2409           break;
2410         case RBH:
2411         case FETCH_ME:
2412         case BLACKHOLE_BQ:
2413           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2414           break;
2415 #ifdef DIST
2416         case MVAR:
2417           break;
2418 #endif    
2419         default:
2420           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2421         }
2422       }
2423 }
2424 #endif
2425
2426 #if defined(GRAN)
2427 static StgBlockingQueueElement *
2428 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2429 {
2430     StgTSO *tso;
2431     PEs node_loc, tso_loc;
2432
2433     node_loc = where_is(node); // should be lifted out of loop
2434     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2435     tso_loc = where_is((StgClosure *)tso);
2436     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2437       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2438       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2439       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2440       // insertThread(tso, node_loc);
2441       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2442                 ResumeThread,
2443                 tso, node, (rtsSpark*)NULL);
2444       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2445       // len_local++;
2446       // len++;
2447     } else { // TSO is remote (actually should be FMBQ)
2448       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2449                                   RtsFlags.GranFlags.Costs.gunblocktime +
2450                                   RtsFlags.GranFlags.Costs.latency;
2451       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2452                 UnblockThread,
2453                 tso, node, (rtsSpark*)NULL);
2454       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2455       // len++;
2456     }
2457     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2458     IF_GRAN_DEBUG(bq,
2459                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2460                           (node_loc==tso_loc ? "Local" : "Global"), 
2461                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2462     tso->block_info.closure = NULL;
2463     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2464                              tso->id, tso));
2465 }
2466 #elif defined(PAR)
2467 static StgBlockingQueueElement *
2468 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2469 {
2470     StgBlockingQueueElement *next;
2471
2472     switch (get_itbl(bqe)->type) {
2473     case TSO:
2474       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2475       /* if it's a TSO just push it onto the run_queue */
2476       next = bqe->link;
2477       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2478       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2479       THREAD_RUNNABLE();
2480       unblockCount(bqe, node);
2481       /* reset blocking status after dumping event */
2482       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2483       break;
2484
2485     case BLOCKED_FETCH:
2486       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2487       next = bqe->link;
2488       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2489       PendingFetches = (StgBlockedFetch *)bqe;
2490       break;
2491
2492 # if defined(DEBUG)
2493       /* can ignore this case in a non-debugging setup; 
2494          see comments on RBHSave closures above */
2495     case CONSTR:
2496       /* check that the closure is an RBHSave closure */
2497       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2498              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2499              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2500       break;
2501
2502     default:
2503       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2504            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2505            (StgClosure *)bqe);
2506 # endif
2507     }
2508   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2509   return next;
2510 }
2511
2512 #else /* !GRAN && !PAR */
2513 static StgTSO *
2514 unblockOneLocked(StgTSO *tso)
2515 {
2516   StgTSO *next;
2517
2518   ASSERT(get_itbl(tso)->type == TSO);
2519   ASSERT(tso->why_blocked != NotBlocked);
2520   tso->why_blocked = NotBlocked;
2521   next = tso->link;
2522   PUSH_ON_RUN_QUEUE(tso);
2523   THREAD_RUNNABLE();
2524   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2525   return next;
2526 }
2527 #endif
2528
2529 #if defined(GRAN) || defined(PAR)
2530 inline StgBlockingQueueElement *
2531 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2532 {
2533   ACQUIRE_LOCK(&sched_mutex);
2534   bqe = unblockOneLocked(bqe, node);
2535   RELEASE_LOCK(&sched_mutex);
2536   return bqe;
2537 }
2538 #else
2539 inline StgTSO *
2540 unblockOne(StgTSO *tso)
2541 {
2542   ACQUIRE_LOCK(&sched_mutex);
2543   tso = unblockOneLocked(tso);
2544   RELEASE_LOCK(&sched_mutex);
2545   return tso;
2546 }
2547 #endif
2548
2549 #if defined(GRAN)
2550 void 
2551 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2552 {
2553   StgBlockingQueueElement *bqe;
2554   PEs node_loc;
2555   nat len = 0; 
2556
2557   IF_GRAN_DEBUG(bq, 
2558                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2559                       node, CurrentProc, CurrentTime[CurrentProc], 
2560                       CurrentTSO->id, CurrentTSO));
2561
2562   node_loc = where_is(node);
2563
2564   ASSERT(q == END_BQ_QUEUE ||
2565          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2566          get_itbl(q)->type == CONSTR); // closure (type constructor)
2567   ASSERT(is_unique(node));
2568
2569   /* FAKE FETCH: magically copy the node to the tso's proc;
2570      no Fetch necessary because in reality the node should not have been 
2571      moved to the other PE in the first place
2572   */
2573   if (CurrentProc!=node_loc) {
2574     IF_GRAN_DEBUG(bq, 
2575                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2576                         node, node_loc, CurrentProc, CurrentTSO->id, 
2577                         // CurrentTSO, where_is(CurrentTSO),
2578                         node->header.gran.procs));
2579     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2580     IF_GRAN_DEBUG(bq, 
2581                   belch("## new bitmask of node %p is %#x",
2582                         node, node->header.gran.procs));
2583     if (RtsFlags.GranFlags.GranSimStats.Global) {
2584       globalGranStats.tot_fake_fetches++;
2585     }
2586   }
2587
2588   bqe = q;
2589   // ToDo: check: ASSERT(CurrentProc==node_loc);
2590   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2591     //next = bqe->link;
2592     /* 
2593        bqe points to the current element in the queue
2594        next points to the next element in the queue
2595     */
2596     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2597     //tso_loc = where_is(tso);
2598     len++;
2599     bqe = unblockOneLocked(bqe, node);
2600   }
2601
2602   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2603      the closure to make room for the anchor of the BQ */
2604   if (bqe!=END_BQ_QUEUE) {
2605     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2606     /*
2607     ASSERT((info_ptr==&RBH_Save_0_info) ||
2608            (info_ptr==&RBH_Save_1_info) ||
2609            (info_ptr==&RBH_Save_2_info));
2610     */
2611     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2612     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2613     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2614
2615     IF_GRAN_DEBUG(bq,
2616                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2617                         node, info_type(node)));
2618   }
2619
2620   /* statistics gathering */
2621   if (RtsFlags.GranFlags.GranSimStats.Global) {
2622     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2623     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2624     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2625     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2626   }
2627   IF_GRAN_DEBUG(bq,
2628                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2629                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2630 }
2631 #elif defined(PAR)
2632 void 
2633 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2634 {
2635   StgBlockingQueueElement *bqe;
2636
2637   ACQUIRE_LOCK(&sched_mutex);
2638
2639   IF_PAR_DEBUG(verbose, 
2640                belch("##-_ AwBQ for node %p on [%x]: ",
2641                      node, mytid));
2642 #ifdef DIST  
2643   //RFP
2644   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2645     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2646     return;
2647   }
2648 #endif
2649   
2650   ASSERT(q == END_BQ_QUEUE ||
2651          get_itbl(q)->type == TSO ||           
2652          get_itbl(q)->type == BLOCKED_FETCH || 
2653          get_itbl(q)->type == CONSTR); 
2654
2655   bqe = q;
2656   while (get_itbl(bqe)->type==TSO || 
2657          get_itbl(bqe)->type==BLOCKED_FETCH) {
2658     bqe = unblockOneLocked(bqe, node);
2659   }
2660   RELEASE_LOCK(&sched_mutex);
2661 }
2662
2663 #else   /* !GRAN && !PAR */
2664 void
2665 awakenBlockedQueue(StgTSO *tso)
2666 {
2667   ACQUIRE_LOCK(&sched_mutex);
2668   while (tso != END_TSO_QUEUE) {
2669     tso = unblockOneLocked(tso);
2670   }
2671   RELEASE_LOCK(&sched_mutex);
2672 }
2673 #endif
2674
2675 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2676 //@subsection Exception Handling Routines
2677
2678 /* ---------------------------------------------------------------------------
2679    Interrupt execution
2680    - usually called inside a signal handler so it mustn't do anything fancy.   
2681    ------------------------------------------------------------------------ */
2682
2683 void
2684 interruptStgRts(void)
2685 {
2686     interrupted    = 1;
2687     context_switch = 1;
2688 }
2689
2690 /* -----------------------------------------------------------------------------
2691    Unblock a thread
2692
2693    This is for use when we raise an exception in another thread, which
2694    may be blocked.
2695    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2696    -------------------------------------------------------------------------- */
2697
2698 #if defined(GRAN) || defined(PAR)
2699 /*
2700   NB: only the type of the blocking queue is different in GranSim and GUM
2701       the operations on the queue-elements are the same
2702       long live polymorphism!
2703 */
2704 static void
2705 unblockThread(StgTSO *tso)
2706 {
2707   StgBlockingQueueElement *t, **last;
2708
2709   ACQUIRE_LOCK(&sched_mutex);
2710   switch (tso->why_blocked) {
2711
2712   case NotBlocked:
2713     return;  /* not blocked */
2714
2715   case BlockedOnMVar:
2716     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2717     {
2718       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2719       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2720
2721       last = (StgBlockingQueueElement **)&mvar->head;
2722       for (t = (StgBlockingQueueElement *)mvar->head; 
2723            t != END_BQ_QUEUE; 
2724            last = &t->link, last_tso = t, t = t->link) {
2725         if (t == (StgBlockingQueueElement *)tso) {
2726           *last = (StgBlockingQueueElement *)tso->link;
2727           if (mvar->tail == tso) {
2728             mvar->tail = (StgTSO *)last_tso;
2729           }
2730           goto done;
2731         }
2732       }
2733       barf("unblockThread (MVAR): TSO not found");
2734     }
2735
2736   case BlockedOnBlackHole:
2737     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2738     {
2739       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2740
2741       last = &bq->blocking_queue;
2742       for (t = bq->blocking_queue; 
2743            t != END_BQ_QUEUE; 
2744            last = &t->link, t = t->link) {
2745         if (t == (StgBlockingQueueElement *)tso) {
2746           *last = (StgBlockingQueueElement *)tso->link;
2747           goto done;
2748         }
2749       }
2750       barf("unblockThread (BLACKHOLE): TSO not found");
2751     }
2752
2753   case BlockedOnException:
2754     {
2755       StgTSO *target  = tso->block_info.tso;
2756
2757       ASSERT(get_itbl(target)->type == TSO);
2758
2759       if (target->what_next == ThreadRelocated) {
2760           target = target->link;
2761           ASSERT(get_itbl(target)->type == TSO);
2762       }
2763
2764       ASSERT(target->blocked_exceptions != NULL);
2765
2766       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2767       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2768            t != END_BQ_QUEUE; 
2769            last = &t->link, t = t->link) {
2770         ASSERT(get_itbl(t)->type == TSO);
2771         if (t == (StgBlockingQueueElement *)tso) {
2772           *last = (StgBlockingQueueElement *)tso->link;
2773           goto done;
2774         }
2775       }
2776       barf("unblockThread (Exception): TSO not found");
2777     }
2778
2779   case BlockedOnRead:
2780   case BlockedOnWrite:
2781     {
2782       /* take TSO off blocked_queue */
2783       StgBlockingQueueElement *prev = NULL;
2784       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2785            prev = t, t = t->link) {
2786         if (t == (StgBlockingQueueElement *)tso) {
2787           if (prev == NULL) {
2788             blocked_queue_hd = (StgTSO *)t->link;
2789             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2790               blocked_queue_tl = END_TSO_QUEUE;
2791             }
2792           } else {
2793             prev->link = t->link;
2794             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2795               blocked_queue_tl = (StgTSO *)prev;
2796             }
2797           }
2798           goto done;
2799         }
2800       }
2801       barf("unblockThread (I/O): TSO not found");
2802     }
2803
2804   case BlockedOnDelay:
2805     {
2806       /* take TSO off sleeping_queue */
2807       StgBlockingQueueElement *prev = NULL;
2808       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2809            prev = t, t = t->link) {
2810         if (t == (StgBlockingQueueElement *)tso) {
2811           if (prev == NULL) {
2812             sleeping_queue = (StgTSO *)t->link;
2813           } else {
2814             prev->link = t->link;
2815           }
2816           goto done;
2817         }
2818       }
2819       barf("unblockThread (I/O): TSO not found");
2820     }
2821
2822   default:
2823     barf("unblockThread");
2824   }
2825
2826  done:
2827   tso->link = END_TSO_QUEUE;
2828   tso->why_blocked = NotBlocked;
2829   tso->block_info.closure = NULL;
2830   PUSH_ON_RUN_QUEUE(tso);
2831   RELEASE_LOCK(&sched_mutex);
2832 }
2833 #else
2834 static void
2835 unblockThread(StgTSO *tso)
2836 {
2837   StgTSO *t, **last;
2838
2839   ACQUIRE_LOCK(&sched_mutex);
2840   switch (tso->why_blocked) {
2841
2842   case NotBlocked:
2843     return;  /* not blocked */
2844
2845   case BlockedOnMVar:
2846     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2847     {
2848       StgTSO *last_tso = END_TSO_QUEUE;
2849       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2850
2851       last = &mvar->head;
2852       for (t = mvar->head; t != END_TSO_QUEUE; 
2853            last = &t->link, last_tso = t, t = t->link) {
2854         if (t == tso) {
2855           *last = tso->link;
2856           if (mvar->tail == tso) {
2857             mvar->tail = last_tso;
2858           }
2859           goto done;
2860         }
2861       }
2862       barf("unblockThread (MVAR): TSO not found");
2863     }
2864
2865   case BlockedOnBlackHole:
2866     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2867     {
2868       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2869
2870       last = &bq->blocking_queue;
2871       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2872            last = &t->link, t = t->link) {
2873         if (t == tso) {
2874           *last = tso->link;
2875           goto done;
2876         }
2877       }
2878       barf("unblockThread (BLACKHOLE): TSO not found");
2879     }
2880
2881   case BlockedOnException:
2882     {
2883       StgTSO *target  = tso->block_info.tso;
2884
2885       ASSERT(get_itbl(target)->type == TSO);
2886
2887       while (target->what_next == ThreadRelocated) {
2888           target = target->link;
2889           ASSERT(get_itbl(target)->type == TSO);
2890       }
2891       
2892       ASSERT(target->blocked_exceptions != NULL);
2893
2894       last = &target->blocked_exceptions;
2895       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2896            last = &t->link, t = t->link) {
2897         ASSERT(get_itbl(t)->type == TSO);
2898         if (t == tso) {
2899           *last = tso->link;
2900           goto done;
2901         }
2902       }
2903       barf("unblockThread (Exception): TSO not found");
2904     }
2905
2906   case BlockedOnRead:
2907   case BlockedOnWrite:
2908     {
2909       StgTSO *prev = NULL;
2910       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2911            prev = t, t = t->link) {
2912         if (t == tso) {
2913           if (prev == NULL) {
2914             blocked_queue_hd = t->link;
2915             if (blocked_queue_tl == t) {
2916               blocked_queue_tl = END_TSO_QUEUE;
2917             }
2918           } else {
2919             prev->link = t->link;
2920             if (blocked_queue_tl == t) {
2921               blocked_queue_tl = prev;
2922             }
2923           }
2924           goto done;
2925         }
2926       }
2927       barf("unblockThread (I/O): TSO not found");
2928     }
2929
2930   case BlockedOnDelay:
2931     {
2932       StgTSO *prev = NULL;
2933       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2934            prev = t, t = t->link) {
2935         if (t == tso) {
2936           if (prev == NULL) {
2937             sleeping_queue = t->link;
2938           } else {
2939             prev->link = t->link;
2940           }
2941           goto done;
2942         }
2943       }
2944       barf("unblockThread (I/O): TSO not found");
2945     }
2946
2947   default:
2948     barf("unblockThread");
2949   }
2950
2951  done:
2952   tso->link = END_TSO_QUEUE;
2953   tso->why_blocked = NotBlocked;
2954   tso->block_info.closure = NULL;
2955   PUSH_ON_RUN_QUEUE(tso);
2956   RELEASE_LOCK(&sched_mutex);
2957 }
2958 #endif
2959
2960 /* -----------------------------------------------------------------------------
2961  * raiseAsync()
2962  *
2963  * The following function implements the magic for raising an
2964  * asynchronous exception in an existing thread.
2965  *
2966  * We first remove the thread from any queue on which it might be
2967  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2968  *
2969  * We strip the stack down to the innermost CATCH_FRAME, building
2970  * thunks in the heap for all the active computations, so they can 
2971  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2972  * an application of the handler to the exception, and push it on
2973  * the top of the stack.
2974  * 
2975  * How exactly do we save all the active computations?  We create an
2976  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2977  * AP_UPDs pushes everything from the corresponding update frame
2978  * upwards onto the stack.  (Actually, it pushes everything up to the
2979  * next update frame plus a pointer to the next AP_UPD object.
2980  * Entering the next AP_UPD object pushes more onto the stack until we
2981  * reach the last AP_UPD object - at which point the stack should look
2982  * exactly as it did when we killed the TSO and we can continue
2983  * execution by entering the closure on top of the stack.
2984  *
2985  * We can also kill a thread entirely - this happens if either (a) the 
2986  * exception passed to raiseAsync is NULL, or (b) there's no
2987  * CATCH_FRAME on the stack.  In either case, we strip the entire
2988  * stack and replace the thread with a zombie.
2989  *
2990  * -------------------------------------------------------------------------- */
2991  
2992 void 
2993 deleteThread(StgTSO *tso)
2994 {
2995   raiseAsync(tso,NULL);
2996 }
2997
2998 void
2999 raiseAsync(StgTSO *tso, StgClosure *exception)
3000 {
3001   StgUpdateFrame* su = tso->su;
3002   StgPtr          sp = tso->sp;
3003   
3004   /* Thread already dead? */
3005   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3006     return;
3007   }
3008
3009   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3010
3011   /* Remove it from any blocking queues */
3012   unblockThread(tso);
3013
3014   /* The stack freezing code assumes there's a closure pointer on
3015    * the top of the stack.  This isn't always the case with compiled
3016    * code, so we have to push a dummy closure on the top which just
3017    * returns to the next return address on the stack.
3018    */
3019   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3020     *(--sp) = (W_)&stg_dummy_ret_closure;
3021   }
3022
3023   while (1) {
3024     nat words = ((P_)su - (P_)sp) - 1;
3025     nat i;
3026     StgAP_UPD * ap;
3027
3028     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3029      * then build PAP(handler,exception,realworld#), and leave it on
3030      * top of the stack ready to enter.
3031      */
3032     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3033       StgCatchFrame *cf = (StgCatchFrame *)su;
3034       /* we've got an exception to raise, so let's pass it to the
3035        * handler in this frame.
3036        */
3037       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3038       TICK_ALLOC_UPD_PAP(3,0);
3039       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3040               
3041       ap->n_args = 2;
3042       ap->fun = cf->handler;    /* :: Exception -> IO a */
3043       ap->payload[0] = exception;
3044       ap->payload[1] = ARG_TAG(0); /* realworld token */
3045
3046       /* throw away the stack from Sp up to and including the
3047        * CATCH_FRAME.
3048        */
3049       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3050       tso->su = cf->link;
3051
3052       /* Restore the blocked/unblocked state for asynchronous exceptions
3053        * at the CATCH_FRAME.  
3054        *
3055        * If exceptions were unblocked at the catch, arrange that they
3056        * are unblocked again after executing the handler by pushing an
3057        * unblockAsyncExceptions_ret stack frame.
3058        */
3059       if (!cf->exceptions_blocked) {
3060         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3061       }
3062       
3063       /* Ensure that async exceptions are blocked when running the handler.
3064        */
3065       if (tso->blocked_exceptions == NULL) {
3066         tso->blocked_exceptions = END_TSO_QUEUE;
3067       }
3068       
3069       /* Put the newly-built PAP on top of the stack, ready to execute
3070        * when the thread restarts.
3071        */
3072       sp[0] = (W_)ap;
3073       tso->sp = sp;
3074       tso->what_next = ThreadEnterGHC;
3075       IF_DEBUG(sanity, checkTSO(tso));
3076       return;
3077     }
3078
3079     /* First build an AP_UPD consisting of the stack chunk above the
3080      * current update frame, with the top word on the stack as the
3081      * fun field.
3082      */
3083     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3084     
3085     ASSERT(words >= 0);
3086     
3087     ap->n_args = words;
3088     ap->fun    = (StgClosure *)sp[0];
3089     sp++;
3090     for(i=0; i < (nat)words; ++i) {
3091       ap->payload[i] = (StgClosure *)*sp++;
3092     }
3093     
3094     switch (get_itbl(su)->type) {
3095       
3096     case UPDATE_FRAME:
3097       {
3098         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3099         TICK_ALLOC_UP_THK(words+1,0);
3100         
3101         IF_DEBUG(scheduler,
3102                  fprintf(stderr,  "scheduler: Updating ");
3103                  printPtr((P_)su->updatee); 
3104                  fprintf(stderr,  " with ");
3105                  printObj((StgClosure *)ap);
3106                  );
3107         
3108         /* Replace the updatee with an indirection - happily
3109          * this will also wake up any threads currently
3110          * waiting on the result.
3111          *
3112          * Warning: if we're in a loop, more than one update frame on
3113          * the stack may point to the same object.  Be careful not to
3114          * overwrite an IND_OLDGEN in this case, because we'll screw
3115          * up the mutable lists.  To be on the safe side, don't
3116          * overwrite any kind of indirection at all.  See also
3117          * threadSqueezeStack in GC.c, where we have to make a similar
3118          * check.
3119          */
3120         if (!closure_IND(su->updatee)) {
3121             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3122         }
3123         su = su->link;
3124         sp += sizeofW(StgUpdateFrame) -1;
3125         sp[0] = (W_)ap; /* push onto stack */
3126         break;
3127       }
3128
3129     case CATCH_FRAME:
3130       {
3131         StgCatchFrame *cf = (StgCatchFrame *)su;
3132         StgClosure* o;
3133         
3134         /* We want a PAP, not an AP_UPD.  Fortunately, the
3135          * layout's the same.
3136          */
3137         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3138         TICK_ALLOC_UPD_PAP(words+1,0);
3139         
3140         /* now build o = FUN(catch,ap,handler) */
3141         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3142         TICK_ALLOC_FUN(2,0);
3143         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3144         o->payload[0] = (StgClosure *)ap;
3145         o->payload[1] = cf->handler;
3146         
3147         IF_DEBUG(scheduler,
3148                  fprintf(stderr,  "scheduler: Built ");
3149                  printObj((StgClosure *)o);
3150                  );
3151         
3152         /* pop the old handler and put o on the stack */
3153         su = cf->link;
3154         sp += sizeofW(StgCatchFrame) - 1;
3155         sp[0] = (W_)o;
3156         break;
3157       }
3158       
3159     case SEQ_FRAME:
3160       {
3161         StgSeqFrame *sf = (StgSeqFrame *)su;
3162         StgClosure* o;
3163         
3164         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3165         TICK_ALLOC_UPD_PAP(words+1,0);
3166         
3167         /* now build o = FUN(seq,ap) */
3168         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3169         TICK_ALLOC_SE_THK(1,0);
3170         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3171         o->payload[0] = (StgClosure *)ap;
3172         
3173         IF_DEBUG(scheduler,
3174                  fprintf(stderr,  "scheduler: Built ");
3175                  printObj((StgClosure *)o);
3176                  );
3177         
3178         /* pop the old handler and put o on the stack */
3179         su = sf->link;
3180         sp += sizeofW(StgSeqFrame) - 1;
3181         sp[0] = (W_)o;
3182         break;
3183       }
3184       
3185     case STOP_FRAME:
3186       /* We've stripped the entire stack, the thread is now dead. */
3187       sp += sizeofW(StgStopFrame) - 1;
3188       sp[0] = (W_)exception;    /* save the exception */
3189       tso->what_next = ThreadKilled;
3190       tso->su = (StgUpdateFrame *)(sp+1);
3191       tso->sp = sp;
3192       return;
3193
3194     default:
3195       barf("raiseAsync");
3196     }
3197   }
3198   barf("raiseAsync");
3199 }
3200
3201 /* -----------------------------------------------------------------------------
3202    resurrectThreads is called after garbage collection on the list of
3203    threads found to be garbage.  Each of these threads will be woken
3204    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3205    on an MVar, or NonTermination if the thread was blocked on a Black
3206    Hole.
3207    -------------------------------------------------------------------------- */
3208
3209 void
3210 resurrectThreads( StgTSO *threads )
3211 {
3212   StgTSO *tso, *next;
3213
3214   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3215     next = tso->global_link;
3216     tso->global_link = all_threads;
3217     all_threads = tso;
3218     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3219
3220     switch (tso->why_blocked) {
3221     case BlockedOnMVar:
3222     case BlockedOnException:
3223       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3224       break;
3225     case BlockedOnBlackHole:
3226       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3227       break;
3228     case NotBlocked:
3229       /* This might happen if the thread was blocked on a black hole
3230        * belonging to a thread that we've just woken up (raiseAsync
3231        * can wake up threads, remember...).
3232        */
3233       continue;
3234     default:
3235       barf("resurrectThreads: thread blocked in a strange way");
3236     }
3237   }
3238 }
3239
3240 /* -----------------------------------------------------------------------------
3241  * Blackhole detection: if we reach a deadlock, test whether any
3242  * threads are blocked on themselves.  Any threads which are found to
3243  * be self-blocked get sent a NonTermination exception.
3244  *
3245  * This is only done in a deadlock situation in order to avoid
3246  * performance overhead in the normal case.
3247  * -------------------------------------------------------------------------- */
3248
3249 static void
3250 detectBlackHoles( void )
3251 {
3252     StgTSO *t = all_threads;
3253     StgUpdateFrame *frame;
3254     StgClosure *blocked_on;
3255
3256     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3257
3258         while (t->what_next == ThreadRelocated) {
3259             t = t->link;
3260             ASSERT(get_itbl(t)->type == TSO);
3261         }
3262       
3263         if (t->why_blocked != BlockedOnBlackHole) {
3264             continue;
3265         }
3266
3267         blocked_on = t->block_info.closure;
3268
3269         for (frame = t->su; ; frame = frame->link) {
3270             switch (get_itbl(frame)->type) {
3271
3272             case UPDATE_FRAME:
3273                 if (frame->updatee == blocked_on) {
3274                     /* We are blocking on one of our own computations, so
3275                      * send this thread the NonTermination exception.  
3276                      */
3277                     IF_DEBUG(scheduler, 
3278                              sched_belch("thread %d is blocked on itself", t->id));
3279                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3280                     goto done;
3281                 }
3282                 else {
3283                     continue;
3284                 }
3285
3286             case CATCH_FRAME:
3287             case SEQ_FRAME:
3288                 continue;
3289                 
3290             case STOP_FRAME:
3291                 break;
3292             }
3293             break;
3294         }
3295
3296     done: ;
3297     }   
3298 }
3299
3300 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3301 //@subsection Debugging Routines
3302
3303 /* -----------------------------------------------------------------------------
3304    Debugging: why is a thread blocked
3305    -------------------------------------------------------------------------- */
3306
3307 #ifdef DEBUG
3308
3309 void
3310 printThreadBlockage(StgTSO *tso)
3311 {
3312   switch (tso->why_blocked) {
3313   case BlockedOnRead:
3314     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3315     break;
3316   case BlockedOnWrite:
3317     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3318     break;
3319   case BlockedOnDelay:
3320     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3321     break;
3322   case BlockedOnMVar:
3323     fprintf(stderr,"is blocked on an MVar");
3324     break;
3325   case BlockedOnException:
3326     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3327             tso->block_info.tso->id);
3328     break;
3329   case BlockedOnBlackHole:
3330     fprintf(stderr,"is blocked on a black hole");
3331     break;
3332   case NotBlocked:
3333     fprintf(stderr,"is not blocked");
3334     break;
3335 #if defined(PAR)
3336   case BlockedOnGA:
3337     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3338             tso->block_info.closure, info_type(tso->block_info.closure));
3339     break;
3340   case BlockedOnGA_NoSend:
3341     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3342             tso->block_info.closure, info_type(tso->block_info.closure));
3343     break;
3344 #endif
3345   default:
3346     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3347          tso->why_blocked, tso->id, tso);
3348   }
3349 }
3350
3351 void
3352 printThreadStatus(StgTSO *tso)
3353 {
3354   switch (tso->what_next) {
3355   case ThreadKilled:
3356     fprintf(stderr,"has been killed");
3357     break;
3358   case ThreadComplete:
3359     fprintf(stderr,"has completed");
3360     break;
3361   default:
3362     printThreadBlockage(tso);
3363   }
3364 }
3365
3366 void
3367 printAllThreads(void)
3368 {
3369   StgTSO *t;
3370
3371 # if defined(GRAN)
3372   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3373   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3374                        time_string, rtsFalse/*no commas!*/);
3375
3376   sched_belch("all threads at [%s]:", time_string);
3377 # elif defined(PAR)
3378   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3379   ullong_format_string(CURRENT_TIME,
3380                        time_string, rtsFalse/*no commas!*/);
3381
3382   sched_belch("all threads at [%s]:", time_string);
3383 # else
3384   sched_belch("all threads:");
3385 # endif
3386
3387   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3388     fprintf(stderr, "\tthread %d ", t->id);
3389     printThreadStatus(t);
3390     fprintf(stderr,"\n");
3391   }
3392 }
3393     
3394 /* 
3395    Print a whole blocking queue attached to node (debugging only).
3396 */
3397 //@cindex print_bq
3398 # if defined(PAR)
3399 void 
3400 print_bq (StgClosure *node)
3401 {
3402   StgBlockingQueueElement *bqe;
3403   StgTSO *tso;
3404   rtsBool end;
3405
3406   fprintf(stderr,"## BQ of closure %p (%s): ",
3407           node, info_type(node));
3408
3409   /* should cover all closures that may have a blocking queue */
3410   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3411          get_itbl(node)->type == FETCH_ME_BQ ||
3412          get_itbl(node)->type == RBH ||
3413          get_itbl(node)->type == MVAR);
3414     
3415   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3416
3417   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3418 }
3419
3420 /* 
3421    Print a whole blocking queue starting with the element bqe.
3422 */
3423 void 
3424 print_bqe (StgBlockingQueueElement *bqe)
3425 {
3426   rtsBool end;
3427
3428   /* 
3429      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3430   */
3431   for (end = (bqe==END_BQ_QUEUE);
3432        !end; // iterate until bqe points to a CONSTR
3433        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3434        bqe = end ? END_BQ_QUEUE : bqe->link) {
3435     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3436     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3437     /* types of closures that may appear in a blocking queue */
3438     ASSERT(get_itbl(bqe)->type == TSO ||           
3439            get_itbl(bqe)->type == BLOCKED_FETCH || 
3440            get_itbl(bqe)->type == CONSTR); 
3441     /* only BQs of an RBH end with an RBH_Save closure */
3442     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3443
3444     switch (get_itbl(bqe)->type) {
3445     case TSO:
3446       fprintf(stderr," TSO %u (%x),",
3447               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3448       break;
3449     case BLOCKED_FETCH:
3450       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3451               ((StgBlockedFetch *)bqe)->node, 
3452               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3453               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3454               ((StgBlockedFetch *)bqe)->ga.weight);
3455       break;
3456     case CONSTR:
3457       fprintf(stderr," %s (IP %p),",
3458               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3459                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3460                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3461                "RBH_Save_?"), get_itbl(bqe));
3462       break;
3463     default:
3464       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3465            info_type((StgClosure *)bqe)); // , node, info_type(node));
3466       break;
3467     }
3468   } /* for */
3469   fputc('\n', stderr);
3470 }
3471 # elif defined(GRAN)
3472 void 
3473 print_bq (StgClosure *node)
3474 {
3475   StgBlockingQueueElement *bqe;
3476   PEs node_loc, tso_loc;
3477   rtsBool end;
3478
3479   /* should cover all closures that may have a blocking queue */
3480   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3481          get_itbl(node)->type == FETCH_ME_BQ ||
3482          get_itbl(node)->type == RBH);
3483     
3484   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3485   node_loc = where_is(node);
3486
3487   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3488           node, info_type(node), node_loc);
3489
3490   /* 
3491      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3492   */
3493   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3494        !end; // iterate until bqe points to a CONSTR
3495        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3496     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3497     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3498     /* types of closures that may appear in a blocking queue */
3499     ASSERT(get_itbl(bqe)->type == TSO ||           
3500            get_itbl(bqe)->type == CONSTR); 
3501     /* only BQs of an RBH end with an RBH_Save closure */
3502     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3503
3504     tso_loc = where_is((StgClosure *)bqe);
3505     switch (get_itbl(bqe)->type) {
3506     case TSO:
3507       fprintf(stderr," TSO %d (%p) on [PE %d],",
3508               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3509       break;
3510     case CONSTR:
3511       fprintf(stderr," %s (IP %p),",
3512               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3513                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3514                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3515                "RBH_Save_?"), get_itbl(bqe));
3516       break;
3517     default:
3518       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3519            info_type((StgClosure *)bqe), node, info_type(node));
3520       break;
3521     }
3522   } /* for */
3523   fputc('\n', stderr);
3524 }
3525 #else
3526 /* 
3527    Nice and easy: only TSOs on the blocking queue
3528 */
3529 void 
3530 print_bq (StgClosure *node)
3531 {
3532   StgTSO *tso;
3533
3534   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3535   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3536        tso != END_TSO_QUEUE; 
3537        tso=tso->link) {
3538     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3539     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3540     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3541   }
3542   fputc('\n', stderr);
3543 }
3544 # endif
3545
3546 #if defined(PAR)
3547 static nat
3548 run_queue_len(void)
3549 {
3550   nat i;
3551   StgTSO *tso;
3552
3553   for (i=0, tso=run_queue_hd; 
3554        tso != END_TSO_QUEUE;
3555        i++, tso=tso->link)
3556     /* nothing */
3557
3558   return i;
3559 }
3560 #endif
3561
3562 static void
3563 sched_belch(char *s, ...)
3564 {
3565   va_list ap;
3566   va_start(ap,s);
3567 #ifdef SMP
3568   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3569 #elif defined(PAR)
3570   fprintf(stderr, "== ");
3571 #else
3572   fprintf(stderr, "scheduler: ");
3573 #endif
3574   vfprintf(stderr, s, ap);
3575   fprintf(stderr, "\n");
3576 }
3577
3578 #endif /* DEBUG */
3579
3580
3581 //@node Index,  , Debugging Routines, Main scheduling code
3582 //@subsection Index
3583
3584 //@index
3585 //* MainRegTable::  @cindex\s-+MainRegTable
3586 //* StgMainThread::  @cindex\s-+StgMainThread
3587 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3588 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3589 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3590 //* context_switch::  @cindex\s-+context_switch
3591 //* createThread::  @cindex\s-+createThread
3592 //* free_capabilities::  @cindex\s-+free_capabilities
3593 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3594 //* initScheduler::  @cindex\s-+initScheduler
3595 //* interrupted::  @cindex\s-+interrupted
3596 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3597 //* next_thread_id::  @cindex\s-+next_thread_id
3598 //* print_bq::  @cindex\s-+print_bq
3599 //* run_queue_hd::  @cindex\s-+run_queue_hd
3600 //* run_queue_tl::  @cindex\s-+run_queue_tl
3601 //* sched_mutex::  @cindex\s-+sched_mutex
3602 //* schedule::  @cindex\s-+schedule
3603 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3604 //* task_ids::  @cindex\s-+task_ids
3605 //* term_mutex::  @cindex\s-+term_mutex
3606 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3607 //@end index