[project @ 2001-11-22 14:25:11 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.107 2001/11/22 14:25:12 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #ifdef PROFILING
99 #include "Proftimer.h"
100 #include "ProfHeap.h"
101 #include "RetainerProfile.h"
102 #include "LdvProfile.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114
115 #include <stdarg.h>
116
117 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
118 //@subsection Variables and Data structures
119
120 /* Main threads:
121  *
122  * These are the threads which clients have requested that we run.  
123  *
124  * In an SMP build, we might have several concurrent clients all
125  * waiting for results, and each one will wait on a condition variable
126  * until the result is available.
127  *
128  * In non-SMP, clients are strictly nested: the first client calls
129  * into the RTS, which might call out again to C with a _ccall_GC, and
130  * eventually re-enter the RTS.
131  *
132  * Main threads information is kept in a linked list:
133  */
134 //@cindex StgMainThread
135 typedef struct StgMainThread_ {
136   StgTSO *         tso;
137   SchedulerStatus  stat;
138   StgClosure **    ret;
139 #ifdef SMP
140   pthread_cond_t wakeup;
141 #endif
142   struct StgMainThread_ *link;
143 } StgMainThread;
144
145 /* Main thread queue.
146  * Locks required: sched_mutex.
147  */
148 static StgMainThread *main_threads;
149
150 /* Thread queues.
151  * Locks required: sched_mutex.
152  */
153 #if defined(GRAN)
154
155 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
156 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
157
158 /* 
159    In GranSim we have a runable and a blocked queue for each processor.
160    In order to minimise code changes new arrays run_queue_hds/tls
161    are created. run_queue_hd is then a short cut (macro) for
162    run_queue_hds[CurrentProc] (see GranSim.h).
163    -- HWL
164 */
165 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
166 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
167 StgTSO *ccalling_threadss[MAX_PROC];
168 /* We use the same global list of threads (all_threads) in GranSim as in
169    the std RTS (i.e. we are cheating). However, we don't use this list in
170    the GranSim specific code at the moment (so we are only potentially
171    cheating).  */
172
173 #else /* !GRAN */
174
175 StgTSO *run_queue_hd, *run_queue_tl;
176 StgTSO *blocked_queue_hd, *blocked_queue_tl;
177 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
178
179 #endif
180
181 /* Linked list of all threads.
182  * Used for detecting garbage collected threads.
183  */
184 StgTSO *all_threads;
185
186 /* Threads suspended in _ccall_GC.
187  */
188 static StgTSO *suspended_ccalling_threads;
189
190 static StgTSO *threadStackOverflow(StgTSO *tso);
191
192 /* KH: The following two flags are shared memory locations.  There is no need
193        to lock them, since they are only unset at the end of a scheduler
194        operation.
195 */
196
197 /* flag set by signal handler to precipitate a context switch */
198 //@cindex context_switch
199 nat context_switch;
200
201 /* if this flag is set as well, give up execution */
202 //@cindex interrupted
203 rtsBool interrupted;
204
205 /* Next thread ID to allocate.
206  * Locks required: sched_mutex
207  */
208 //@cindex next_thread_id
209 StgThreadID next_thread_id = 1;
210
211 /*
212  * Pointers to the state of the current thread.
213  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
214  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
215  */
216  
217 /* The smallest stack size that makes any sense is:
218  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
219  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
220  *  + 1                       (the realworld token for an IO thread)
221  *  + 1                       (the closure to enter)
222  *
223  * A thread with this stack will bomb immediately with a stack
224  * overflow, which will increase its stack size.  
225  */
226
227 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
228
229 /* Free capability list.
230  * Locks required: sched_mutex.
231  */
232 #ifdef SMP
233 Capability *free_capabilities; /* Available capabilities for running threads */
234 nat n_free_capabilities;       /* total number of available capabilities */
235 #else
236 Capability MainCapability;     /* for non-SMP, we have one global capability */
237 #endif
238
239 #if defined(GRAN)
240 StgTSO *CurrentTSO;
241 #endif
242
243 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
244  *  exists - earlier gccs apparently didn't.
245  *  -= chak
246  */
247 StgTSO dummy_tso;
248
249 rtsBool ready_to_gc;
250
251 /* All our current task ids, saved in case we need to kill them later.
252  */
253 #ifdef SMP
254 //@cindex task_ids
255 task_info *task_ids;
256 #endif
257
258 void            addToBlockedQueue ( StgTSO *tso );
259
260 static void     schedule          ( void );
261        void     interruptStgRts   ( void );
262 #if defined(GRAN)
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
264 #else
265 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
266 #endif
267
268 static void     detectBlackHoles  ( void );
269
270 #ifdef DEBUG
271 static void sched_belch(char *s, ...);
272 #endif
273
274 #ifdef SMP
275 //@cindex sched_mutex
276 //@cindex term_mutex
277 //@cindex thread_ready_cond
278 //@cindex gc_pending_cond
279 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
280 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
281 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
282 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
283
284 nat await_death;
285 #endif
286
287 #if defined(PAR)
288 StgTSO *LastTSO;
289 rtsTime TimeOfLastYield;
290 rtsBool emitSchedule = rtsTrue;
291 #endif
292
293 #if DEBUG
294 char *whatNext_strs[] = {
295   "ThreadEnterGHC",
296   "ThreadRunGHC",
297   "ThreadEnterInterp",
298   "ThreadKilled",
299   "ThreadComplete"
300 };
301
302 char *threadReturnCode_strs[] = {
303   "HeapOverflow",                       /* might also be StackOverflow */
304   "StackOverflow",
305   "ThreadYielding",
306   "ThreadBlocked",
307   "ThreadFinished"
308 };
309 #endif
310
311 #ifdef PAR
312 StgTSO * createSparkThread(rtsSpark spark);
313 StgTSO * activateSpark (rtsSpark spark);  
314 #endif
315
316 /*
317  * The thread state for the main thread.
318 // ToDo: check whether not needed any more
319 StgTSO   *MainTSO;
320  */
321
322 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
323 //@subsection Main scheduling loop
324
325 /* ---------------------------------------------------------------------------
326    Main scheduling loop.
327
328    We use round-robin scheduling, each thread returning to the
329    scheduler loop when one of these conditions is detected:
330
331       * out of heap space
332       * timer expires (thread yields)
333       * thread blocks
334       * thread ends
335       * stack overflow
336
337    Locking notes:  we acquire the scheduler lock once at the beginning
338    of the scheduler loop, and release it when
339     
340       * running a thread, or
341       * waiting for work, or
342       * waiting for a GC to complete.
343
344    GRAN version:
345      In a GranSim setup this loop iterates over the global event queue.
346      This revolves around the global event queue, which determines what 
347      to do next. Therefore, it's more complicated than either the 
348      concurrent or the parallel (GUM) setup.
349
350    GUM version:
351      GUM iterates over incoming messages.
352      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
353      and sends out a fish whenever it has nothing to do; in-between
354      doing the actual reductions (shared code below) it processes the
355      incoming messages and deals with delayed operations 
356      (see PendingFetches).
357      This is not the ugliest code you could imagine, but it's bloody close.
358
359    ------------------------------------------------------------------------ */
360 //@cindex schedule
361 static void
362 schedule( void )
363 {
364   StgTSO *t;
365   Capability *cap;
366   StgThreadReturnCode ret;
367 #if defined(GRAN)
368   rtsEvent *event;
369 #elif defined(PAR)
370   StgSparkPool *pool;
371   rtsSpark spark;
372   StgTSO *tso;
373   GlobalTaskId pe;
374   rtsBool receivedFinish = rtsFalse;
375 # if defined(DEBUG)
376   nat tp_size, sp_size; // stats only
377 # endif
378 #endif
379   rtsBool was_interrupted = rtsFalse;
380   
381   ACQUIRE_LOCK(&sched_mutex);
382
383 #if defined(GRAN)
384
385   /* set up first event to get things going */
386   /* ToDo: assign costs for system setup and init MainTSO ! */
387   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
388             ContinueThread, 
389             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
390
391   IF_DEBUG(gran,
392            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
393            G_TSO(CurrentTSO, 5));
394
395   if (RtsFlags.GranFlags.Light) {
396     /* Save current time; GranSim Light only */
397     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
398   }      
399
400   event = get_next_event();
401
402   while (event!=(rtsEvent*)NULL) {
403     /* Choose the processor with the next event */
404     CurrentProc = event->proc;
405     CurrentTSO = event->tso;
406
407 #elif defined(PAR)
408
409   while (!receivedFinish) {    /* set by processMessages */
410                                /* when receiving PP_FINISH message         */ 
411 #else
412
413   while (1) {
414
415 #endif
416
417     IF_DEBUG(scheduler, printAllThreads());
418
419     /* If we're interrupted (the user pressed ^C, or some other
420      * termination condition occurred), kill all the currently running
421      * threads.
422      */
423     if (interrupted) {
424       IF_DEBUG(scheduler, sched_belch("interrupted"));
425       deleteAllThreads();
426       interrupted = rtsFalse;
427       was_interrupted = rtsTrue;
428     }
429
430     /* Go through the list of main threads and wake up any
431      * clients whose computations have finished.  ToDo: this
432      * should be done more efficiently without a linear scan
433      * of the main threads list, somehow...
434      */
435 #ifdef SMP
436     { 
437       StgMainThread *m, **prev;
438       prev = &main_threads;
439       for (m = main_threads; m != NULL; m = m->link) {
440         switch (m->tso->what_next) {
441         case ThreadComplete:
442           if (m->ret) {
443             *(m->ret) = (StgClosure *)m->tso->sp[0];
444           }
445           *prev = m->link;
446           m->stat = Success;
447           pthread_cond_broadcast(&m->wakeup);
448           break;
449         case ThreadKilled:
450           if (m->ret) *(m->ret) = NULL;
451           *prev = m->link;
452           if (was_interrupted) {
453             m->stat = Interrupted;
454           } else {
455             m->stat = Killed;
456           }
457           pthread_cond_broadcast(&m->wakeup);
458           break;
459         default:
460           break;
461         }
462       }
463     }
464
465 #else // not SMP
466
467 # if defined(PAR)
468     /* in GUM do this only on the Main PE */
469     if (IAmMainThread)
470 # endif
471     /* If our main thread has finished or been killed, return.
472      */
473     {
474       StgMainThread *m = main_threads;
475       if (m->tso->what_next == ThreadComplete
476           || m->tso->what_next == ThreadKilled) {
477         main_threads = main_threads->link;
478         if (m->tso->what_next == ThreadComplete) {
479           /* we finished successfully, fill in the return value */
480           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
481           m->stat = Success;
482           return;
483         } else {
484           if (m->ret) { *(m->ret) = NULL; };
485           if (was_interrupted) {
486             m->stat = Interrupted;
487           } else {
488             m->stat = Killed;
489           }
490           return;
491         }
492       }
493     }
494 #endif
495
496     /* Top up the run queue from our spark pool.  We try to make the
497      * number of threads in the run queue equal to the number of
498      * free capabilities.
499      */
500 #if defined(SMP)
501     {
502       nat n = n_free_capabilities;
503       StgTSO *tso = run_queue_hd;
504
505       /* Count the run queue */
506       while (n > 0 && tso != END_TSO_QUEUE) {
507         tso = tso->link;
508         n--;
509       }
510
511       for (; n > 0; n--) {
512         StgClosure *spark;
513         spark = findSpark(rtsFalse);
514         if (spark == NULL) {
515           break; /* no more sparks in the pool */
516         } else {
517           /* I'd prefer this to be done in activateSpark -- HWL */
518           /* tricky - it needs to hold the scheduler lock and
519            * not try to re-acquire it -- SDM */
520           createSparkThread(spark);       
521           IF_DEBUG(scheduler,
522                    sched_belch("==^^ turning spark of closure %p into a thread",
523                                (StgClosure *)spark));
524         }
525       }
526       /* We need to wake up the other tasks if we just created some
527        * work for them.
528        */
529       if (n_free_capabilities - n > 1) {
530           pthread_cond_signal(&thread_ready_cond);
531       }
532     }
533 #endif // SMP
534
535     /* check for signals each time around the scheduler */
536 #ifndef mingw32_TARGET_OS
537     if (signals_pending()) {
538       startSignalHandlers();
539     }
540 #endif
541
542     /* Check whether any waiting threads need to be woken up.  If the
543      * run queue is empty, and there are no other tasks running, we
544      * can wait indefinitely for something to happen.
545      * ToDo: what if another client comes along & requests another
546      * main thread?
547      */
548     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
549       awaitEvent(
550            (run_queue_hd == END_TSO_QUEUE)
551 #ifdef SMP
552         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
553 #endif
554         );
555     }
556     /* we can be interrupted while waiting for I/O... */
557     if (interrupted) continue;
558
559     /* 
560      * Detect deadlock: when we have no threads to run, there are no
561      * threads waiting on I/O or sleeping, and all the other tasks are
562      * waiting for work, we must have a deadlock of some description.
563      *
564      * We first try to find threads blocked on themselves (ie. black
565      * holes), and generate NonTermination exceptions where necessary.
566      *
567      * If no threads are black holed, we have a deadlock situation, so
568      * inform all the main threads.
569      */
570 #ifndef PAR
571     if (blocked_queue_hd == END_TSO_QUEUE
572         && run_queue_hd == END_TSO_QUEUE
573         && sleeping_queue == END_TSO_QUEUE
574 #ifdef SMP
575         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
576 #endif
577         )
578     {
579         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
580         GarbageCollect(GetRoots,rtsTrue);
581         if (blocked_queue_hd == END_TSO_QUEUE
582             && run_queue_hd == END_TSO_QUEUE
583             && sleeping_queue == END_TSO_QUEUE) {
584             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
585             detectBlackHoles();
586             if (run_queue_hd == END_TSO_QUEUE) {
587                 StgMainThread *m = main_threads;
588 #ifdef SMP
589                 for (; m != NULL; m = m->link) {
590                     deleteThread(m->tso);
591                     m->ret = NULL;
592                     m->stat = Deadlock;
593                     pthread_cond_broadcast(&m->wakeup);
594                 }
595                 main_threads = NULL;
596 #else
597                 deleteThread(m->tso);
598                 m->ret = NULL;
599                 m->stat = Deadlock;
600                 main_threads = m->link;
601                 return;
602 #endif
603             }
604         }
605     }
606 #elif defined(PAR)
607     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
608 #endif
609
610 #ifdef SMP
611     /* If there's a GC pending, don't do anything until it has
612      * completed.
613      */
614     if (ready_to_gc) {
615       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
616       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
617     }
618     
619     /* block until we've got a thread on the run queue and a free
620      * capability.
621      */
622     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
623       IF_DEBUG(scheduler, sched_belch("waiting for work"));
624       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
625       IF_DEBUG(scheduler, sched_belch("work now available"));
626     }
627 #endif
628
629 #if defined(GRAN)
630
631     if (RtsFlags.GranFlags.Light)
632       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
633
634     /* adjust time based on time-stamp */
635     if (event->time > CurrentTime[CurrentProc] &&
636         event->evttype != ContinueThread)
637       CurrentTime[CurrentProc] = event->time;
638     
639     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
640     if (!RtsFlags.GranFlags.Light)
641       handleIdlePEs();
642
643     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
644
645     /* main event dispatcher in GranSim */
646     switch (event->evttype) {
647       /* Should just be continuing execution */
648     case ContinueThread:
649       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
650       /* ToDo: check assertion
651       ASSERT(run_queue_hd != (StgTSO*)NULL &&
652              run_queue_hd != END_TSO_QUEUE);
653       */
654       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
655       if (!RtsFlags.GranFlags.DoAsyncFetch &&
656           procStatus[CurrentProc]==Fetching) {
657         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
658               CurrentTSO->id, CurrentTSO, CurrentProc);
659         goto next_thread;
660       } 
661       /* Ignore ContinueThreads for completed threads */
662       if (CurrentTSO->what_next == ThreadComplete) {
663         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
664               CurrentTSO->id, CurrentTSO, CurrentProc);
665         goto next_thread;
666       } 
667       /* Ignore ContinueThreads for threads that are being migrated */
668       if (PROCS(CurrentTSO)==Nowhere) { 
669         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
670               CurrentTSO->id, CurrentTSO, CurrentProc);
671         goto next_thread;
672       }
673       /* The thread should be at the beginning of the run queue */
674       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
675         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
676               CurrentTSO->id, CurrentTSO, CurrentProc);
677         break; // run the thread anyway
678       }
679       /*
680       new_event(proc, proc, CurrentTime[proc],
681                 FindWork,
682                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
683       goto next_thread; 
684       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
685       break; // now actually run the thread; DaH Qu'vam yImuHbej 
686
687     case FetchNode:
688       do_the_fetchnode(event);
689       goto next_thread;             /* handle next event in event queue  */
690       
691     case GlobalBlock:
692       do_the_globalblock(event);
693       goto next_thread;             /* handle next event in event queue  */
694       
695     case FetchReply:
696       do_the_fetchreply(event);
697       goto next_thread;             /* handle next event in event queue  */
698       
699     case UnblockThread:   /* Move from the blocked queue to the tail of */
700       do_the_unblock(event);
701       goto next_thread;             /* handle next event in event queue  */
702       
703     case ResumeThread:  /* Move from the blocked queue to the tail of */
704       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
705       event->tso->gran.blocktime += 
706         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
707       do_the_startthread(event);
708       goto next_thread;             /* handle next event in event queue  */
709       
710     case StartThread:
711       do_the_startthread(event);
712       goto next_thread;             /* handle next event in event queue  */
713       
714     case MoveThread:
715       do_the_movethread(event);
716       goto next_thread;             /* handle next event in event queue  */
717       
718     case MoveSpark:
719       do_the_movespark(event);
720       goto next_thread;             /* handle next event in event queue  */
721       
722     case FindWork:
723       do_the_findwork(event);
724       goto next_thread;             /* handle next event in event queue  */
725       
726     default:
727       barf("Illegal event type %u\n", event->evttype);
728     }  /* switch */
729     
730     /* This point was scheduler_loop in the old RTS */
731
732     IF_DEBUG(gran, belch("GRAN: after main switch"));
733
734     TimeOfLastEvent = CurrentTime[CurrentProc];
735     TimeOfNextEvent = get_time_of_next_event();
736     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
737     // CurrentTSO = ThreadQueueHd;
738
739     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
740                          TimeOfNextEvent));
741
742     if (RtsFlags.GranFlags.Light) 
743       GranSimLight_leave_system(event, &ActiveTSO); 
744
745     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
746
747     IF_DEBUG(gran, 
748              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
749
750     /* in a GranSim setup the TSO stays on the run queue */
751     t = CurrentTSO;
752     /* Take a thread from the run queue. */
753     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
754
755     IF_DEBUG(gran, 
756              fprintf(stderr, "GRAN: About to run current thread, which is\n");
757              G_TSO(t,5));
758
759     context_switch = 0; // turned on via GranYield, checking events and time slice
760
761     IF_DEBUG(gran, 
762              DumpGranEvent(GR_SCHEDULE, t));
763
764     procStatus[CurrentProc] = Busy;
765
766 #elif defined(PAR)
767     if (PendingFetches != END_BF_QUEUE) {
768         processFetches();
769     }
770
771     /* ToDo: phps merge with spark activation above */
772     /* check whether we have local work and send requests if we have none */
773     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
774       /* :-[  no local threads => look out for local sparks */
775       /* the spark pool for the current PE */
776       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
777       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
778           pool->hd < pool->tl) {
779         /* 
780          * ToDo: add GC code check that we really have enough heap afterwards!!
781          * Old comment:
782          * If we're here (no runnable threads) and we have pending
783          * sparks, we must have a space problem.  Get enough space
784          * to turn one of those pending sparks into a
785          * thread... 
786          */
787
788         spark = findSpark(rtsFalse);                /* get a spark */
789         if (spark != (rtsSpark) NULL) {
790           tso = activateSpark(spark);       /* turn the spark into a thread */
791           IF_PAR_DEBUG(schedule,
792                        belch("==== schedule: Created TSO %d (%p); %d threads active",
793                              tso->id, tso, advisory_thread_count));
794
795           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
796             belch("==^^ failed to activate spark");
797             goto next_thread;
798           }               /* otherwise fall through & pick-up new tso */
799         } else {
800           IF_PAR_DEBUG(verbose,
801                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
802                              spark_queue_len(pool)));
803           goto next_thread;
804         }
805       }
806
807       /* If we still have no work we need to send a FISH to get a spark
808          from another PE 
809       */
810       if (EMPTY_RUN_QUEUE()) {
811       /* =8-[  no local sparks => look for work on other PEs */
812         /*
813          * We really have absolutely no work.  Send out a fish
814          * (there may be some out there already), and wait for
815          * something to arrive.  We clearly can't run any threads
816          * until a SCHEDULE or RESUME arrives, and so that's what
817          * we're hoping to see.  (Of course, we still have to
818          * respond to other types of messages.)
819          */
820         TIME now = msTime() /*CURRENT_TIME*/;
821         IF_PAR_DEBUG(verbose, 
822                      belch("--  now=%ld", now));
823         IF_PAR_DEBUG(verbose,
824                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
825                          (last_fish_arrived_at!=0 &&
826                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
827                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
828                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
829                              last_fish_arrived_at,
830                              RtsFlags.ParFlags.fishDelay, now);
831                      });
832         
833         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
834             (last_fish_arrived_at==0 ||
835              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
836           /* outstandingFishes is set in sendFish, processFish;
837              avoid flooding system with fishes via delay */
838           pe = choosePE();
839           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
840                    NEW_FISH_HUNGER);
841
842           // Global statistics: count no. of fishes
843           if (RtsFlags.ParFlags.ParStats.Global &&
844               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
845             globalParStats.tot_fish_mess++;
846           }
847         }
848       
849         receivedFinish = processMessages();
850         goto next_thread;
851       }
852     } else if (PacketsWaiting()) {  /* Look for incoming messages */
853       receivedFinish = processMessages();
854     }
855
856     /* Now we are sure that we have some work available */
857     ASSERT(run_queue_hd != END_TSO_QUEUE);
858
859     /* Take a thread from the run queue, if we have work */
860     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
861     IF_DEBUG(sanity,checkTSO(t));
862
863     /* ToDo: write something to the log-file
864     if (RTSflags.ParFlags.granSimStats && !sameThread)
865         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
866
867     CurrentTSO = t;
868     */
869     /* the spark pool for the current PE */
870     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
871
872     IF_DEBUG(scheduler, 
873              belch("--=^ %d threads, %d sparks on [%#x]", 
874                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
875
876 #if 1
877     if (0 && RtsFlags.ParFlags.ParStats.Full && 
878         t && LastTSO && t->id != LastTSO->id && 
879         LastTSO->why_blocked == NotBlocked && 
880         LastTSO->what_next != ThreadComplete) {
881       // if previously scheduled TSO not blocked we have to record the context switch
882       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
883                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
884     }
885
886     if (RtsFlags.ParFlags.ParStats.Full && 
887         (emitSchedule /* forced emit */ ||
888         (t && LastTSO && t->id != LastTSO->id))) {
889       /* 
890          we are running a different TSO, so write a schedule event to log file
891          NB: If we use fair scheduling we also have to write  a deschedule 
892              event for LastTSO; with unfair scheduling we know that the
893              previous tso has blocked whenever we switch to another tso, so
894              we don't need it in GUM for now
895       */
896       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
897                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
898       emitSchedule = rtsFalse;
899     }
900      
901 #endif
902 #else /* !GRAN && !PAR */
903   
904     /* grab a thread from the run queue
905      */
906     ASSERT(run_queue_hd != END_TSO_QUEUE);
907     t = POP_RUN_QUEUE();
908
909     // Sanity check the thread we're about to run.  This can be
910     // expensive if there is lots of thread switching going on...
911     IF_DEBUG(sanity,checkTSO(t));
912
913 #endif
914     
915     /* grab a capability
916      */
917 #ifdef SMP
918     cap = free_capabilities;
919     free_capabilities = cap->link;
920     n_free_capabilities--;
921 #else
922     cap = &MainCapability;
923 #endif
924
925     cap->r.rCurrentTSO = t;
926     
927     /* context switches are now initiated by the timer signal, unless
928      * the user specified "context switch as often as possible", with
929      * +RTS -C0
930      */
931     if (
932 #ifdef PROFILING
933         RtsFlags.ProfFlags.profileInterval == 0 ||
934 #endif
935         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
936          && (run_queue_hd != END_TSO_QUEUE
937              || blocked_queue_hd != END_TSO_QUEUE
938              || sleeping_queue != END_TSO_QUEUE)))
939         context_switch = 1;
940     else
941         context_switch = 0;
942
943     RELEASE_LOCK(&sched_mutex);
944
945     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
946                               t->id, t, whatNext_strs[t->what_next]));
947
948 #ifdef PROFILING
949     startHeapProfTimer();
950 #endif
951
952     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
953     /* Run the current thread 
954      */
955     switch (cap->r.rCurrentTSO->what_next) {
956     case ThreadKilled:
957     case ThreadComplete:
958         /* Thread already finished, return to scheduler. */
959         ret = ThreadFinished;
960         break;
961     case ThreadEnterGHC:
962         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
963         break;
964     case ThreadRunGHC:
965         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
966         break;
967     case ThreadEnterInterp:
968         ret = interpretBCO(cap);
969         break;
970     default:
971       barf("schedule: invalid what_next field");
972     }
973     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
974     
975     /* Costs for the scheduler are assigned to CCS_SYSTEM */
976 #ifdef PROFILING
977     stopHeapProfTimer();
978     CCCS = CCS_SYSTEM;
979 #endif
980     
981     ACQUIRE_LOCK(&sched_mutex);
982
983 #ifdef SMP
984     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
985 #elif !defined(GRAN) && !defined(PAR)
986     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
987 #endif
988     t = cap->r.rCurrentTSO;
989     
990 #if defined(PAR)
991     /* HACK 675: if the last thread didn't yield, make sure to print a 
992        SCHEDULE event to the log file when StgRunning the next thread, even
993        if it is the same one as before */
994     LastTSO = t; 
995     TimeOfLastYield = CURRENT_TIME;
996 #endif
997
998     switch (ret) {
999     case HeapOverflow:
1000 #if defined(GRAN)
1001       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1002       globalGranStats.tot_heapover++;
1003 #elif defined(PAR)
1004       globalParStats.tot_heapover++;
1005 #endif
1006
1007       // did the task ask for a large block?
1008       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1009           // if so, get one and push it on the front of the nursery.
1010           bdescr *bd;
1011           nat blocks;
1012           
1013           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1014
1015           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1016                                    t->id, t,
1017                                    whatNext_strs[t->what_next], blocks));
1018
1019           // don't do this if it would push us over the
1020           // alloc_blocks_lim limit; we'll GC first.
1021           if (alloc_blocks + blocks < alloc_blocks_lim) {
1022
1023               alloc_blocks += blocks;
1024               bd = allocGroup( blocks );
1025
1026               // link the new group into the list
1027               bd->link = cap->r.rCurrentNursery;
1028               bd->u.back = cap->r.rCurrentNursery->u.back;
1029               if (cap->r.rCurrentNursery->u.back != NULL) {
1030                   cap->r.rCurrentNursery->u.back->link = bd;
1031               } else {
1032                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1033                          g0s0->blocks == cap->r.rNursery);
1034                   cap->r.rNursery = g0s0->blocks = bd;
1035               }           
1036               cap->r.rCurrentNursery->u.back = bd;
1037
1038               // initialise it as a nursery block
1039               bd->step = g0s0;
1040               bd->gen_no = 0;
1041               bd->flags = 0;
1042               bd->free = bd->start;
1043
1044               // don't forget to update the block count in g0s0.
1045               g0s0->n_blocks += blocks;
1046               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1047
1048               // now update the nursery to point to the new block
1049               cap->r.rCurrentNursery = bd;
1050
1051               // we might be unlucky and have another thread get on the
1052               // run queue before us and steal the large block, but in that
1053               // case the thread will just end up requesting another large
1054               // block.
1055               PUSH_ON_RUN_QUEUE(t);
1056               break;
1057           }
1058       }
1059
1060       /* make all the running tasks block on a condition variable,
1061        * maybe set context_switch and wait till they all pile in,
1062        * then have them wait on a GC condition variable.
1063        */
1064       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1065                                t->id, t, whatNext_strs[t->what_next]));
1066       threadPaused(t);
1067 #if defined(GRAN)
1068       ASSERT(!is_on_queue(t,CurrentProc));
1069 #elif defined(PAR)
1070       /* Currently we emit a DESCHEDULE event before GC in GUM.
1071          ToDo: either add separate event to distinguish SYSTEM time from rest
1072                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1073       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1074         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1075                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1076         emitSchedule = rtsTrue;
1077       }
1078 #endif
1079       
1080       ready_to_gc = rtsTrue;
1081       context_switch = 1;               /* stop other threads ASAP */
1082       PUSH_ON_RUN_QUEUE(t);
1083       /* actual GC is done at the end of the while loop */
1084       break;
1085       
1086     case StackOverflow:
1087 #if defined(GRAN)
1088       IF_DEBUG(gran, 
1089                DumpGranEvent(GR_DESCHEDULE, t));
1090       globalGranStats.tot_stackover++;
1091 #elif defined(PAR)
1092       // IF_DEBUG(par, 
1093       // DumpGranEvent(GR_DESCHEDULE, t);
1094       globalParStats.tot_stackover++;
1095 #endif
1096       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1097                                t->id, t, whatNext_strs[t->what_next]));
1098       /* just adjust the stack for this thread, then pop it back
1099        * on the run queue.
1100        */
1101       threadPaused(t);
1102       { 
1103         StgMainThread *m;
1104         /* enlarge the stack */
1105         StgTSO *new_t = threadStackOverflow(t);
1106         
1107         /* This TSO has moved, so update any pointers to it from the
1108          * main thread stack.  It better not be on any other queues...
1109          * (it shouldn't be).
1110          */
1111         for (m = main_threads; m != NULL; m = m->link) {
1112           if (m->tso == t) {
1113             m->tso = new_t;
1114           }
1115         }
1116         threadPaused(new_t);
1117         PUSH_ON_RUN_QUEUE(new_t);
1118       }
1119       break;
1120
1121     case ThreadYielding:
1122 #if defined(GRAN)
1123       IF_DEBUG(gran, 
1124                DumpGranEvent(GR_DESCHEDULE, t));
1125       globalGranStats.tot_yields++;
1126 #elif defined(PAR)
1127       // IF_DEBUG(par, 
1128       // DumpGranEvent(GR_DESCHEDULE, t);
1129       globalParStats.tot_yields++;
1130 #endif
1131       /* put the thread back on the run queue.  Then, if we're ready to
1132        * GC, check whether this is the last task to stop.  If so, wake
1133        * up the GC thread.  getThread will block during a GC until the
1134        * GC is finished.
1135        */
1136       IF_DEBUG(scheduler,
1137                if (t->what_next == ThreadEnterInterp) {
1138                    /* ToDo: or maybe a timer expired when we were in Hugs?
1139                     * or maybe someone hit ctrl-C
1140                     */
1141                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1142                          t->id, t, whatNext_strs[t->what_next]);
1143                } else {
1144                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1145                          t->id, t, whatNext_strs[t->what_next]);
1146                }
1147                );
1148
1149       threadPaused(t);
1150
1151       IF_DEBUG(sanity,
1152                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1153                checkTSO(t));
1154       ASSERT(t->link == END_TSO_QUEUE);
1155 #if defined(GRAN)
1156       ASSERT(!is_on_queue(t,CurrentProc));
1157
1158       IF_DEBUG(sanity,
1159                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1160                checkThreadQsSanity(rtsTrue));
1161 #endif
1162 #if defined(PAR)
1163       if (RtsFlags.ParFlags.doFairScheduling) { 
1164         /* this does round-robin scheduling; good for concurrency */
1165         APPEND_TO_RUN_QUEUE(t);
1166       } else {
1167         /* this does unfair scheduling; good for parallelism */
1168         PUSH_ON_RUN_QUEUE(t);
1169       }
1170 #else
1171       /* this does round-robin scheduling; good for concurrency */
1172       APPEND_TO_RUN_QUEUE(t);
1173 #endif
1174 #if defined(GRAN)
1175       /* add a ContinueThread event to actually process the thread */
1176       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1177                 ContinueThread,
1178                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1179       IF_GRAN_DEBUG(bq, 
1180                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1181                G_EVENTQ(0);
1182                G_CURR_THREADQ(0));
1183 #endif /* GRAN */
1184       break;
1185       
1186     case ThreadBlocked:
1187 #if defined(GRAN)
1188       IF_DEBUG(scheduler,
1189                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1190                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1191                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1192
1193       // ??? needed; should emit block before
1194       IF_DEBUG(gran, 
1195                DumpGranEvent(GR_DESCHEDULE, t)); 
1196       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1197       /*
1198         ngoq Dogh!
1199       ASSERT(procStatus[CurrentProc]==Busy || 
1200               ((procStatus[CurrentProc]==Fetching) && 
1201               (t->block_info.closure!=(StgClosure*)NULL)));
1202       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1203           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1204             procStatus[CurrentProc]==Fetching)) 
1205         procStatus[CurrentProc] = Idle;
1206       */
1207 #elif defined(PAR)
1208       IF_DEBUG(scheduler,
1209                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1210                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1211       IF_PAR_DEBUG(bq,
1212
1213                    if (t->block_info.closure!=(StgClosure*)NULL) 
1214                      print_bq(t->block_info.closure));
1215
1216       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1217       blockThread(t);
1218
1219       /* whatever we schedule next, we must log that schedule */
1220       emitSchedule = rtsTrue;
1221
1222 #else /* !GRAN */
1223       /* don't need to do anything.  Either the thread is blocked on
1224        * I/O, in which case we'll have called addToBlockedQueue
1225        * previously, or it's blocked on an MVar or Blackhole, in which
1226        * case it'll be on the relevant queue already.
1227        */
1228       IF_DEBUG(scheduler,
1229                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1230                printThreadBlockage(t);
1231                fprintf(stderr, "\n"));
1232
1233       /* Only for dumping event to log file 
1234          ToDo: do I need this in GranSim, too?
1235       blockThread(t);
1236       */
1237 #endif
1238       threadPaused(t);
1239       break;
1240       
1241     case ThreadFinished:
1242       /* Need to check whether this was a main thread, and if so, signal
1243        * the task that started it with the return value.  If we have no
1244        * more main threads, we probably need to stop all the tasks until
1245        * we get a new one.
1246        */
1247       /* We also end up here if the thread kills itself with an
1248        * uncaught exception, see Exception.hc.
1249        */
1250       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1251 #if defined(GRAN)
1252       endThread(t, CurrentProc); // clean-up the thread
1253 #elif defined(PAR)
1254       /* For now all are advisory -- HWL */
1255       //if(t->priority==AdvisoryPriority) ??
1256       advisory_thread_count--;
1257       
1258 # ifdef DIST
1259       if(t->dist.priority==RevalPriority)
1260         FinishReval(t);
1261 # endif
1262       
1263       if (RtsFlags.ParFlags.ParStats.Full &&
1264           !RtsFlags.ParFlags.ParStats.Suppressed) 
1265         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1266 #endif
1267       break;
1268       
1269     default:
1270       barf("schedule: invalid thread return code %d", (int)ret);
1271     }
1272     
1273 #ifdef SMP
1274     cap->link = free_capabilities;
1275     free_capabilities = cap;
1276     n_free_capabilities++;
1277 #endif
1278
1279 #ifdef PROFILING
1280     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1281         if (RtsFlags.ProfFlags.doHeapProfile == HEAP_BY_RETAINER) { 
1282             //
1283             // Note: currently retainer profiling is performed after
1284             // a major garbage collection.
1285             //
1286             GarbageCollect(GetRoots, rtsTrue);
1287             retainerProfile();
1288         } else if (RtsFlags.ProfFlags.doHeapProfile == HEAP_BY_LDV) {
1289             //
1290             // We have LdvCensus() preceded by a major garbage
1291             // collection because we don't want *genuinely* dead
1292             // closures to be involved in LDV profiling. Another good
1293             // reason is to produce consistent profiling results
1294             // regardless of the interval at which GCs are performed.
1295             // In other words, we want LDV profiling results to be
1296             // completely independent of the GC interval.
1297             //
1298             GarbageCollect(GetRoots, rtsTrue);
1299             LdvCensus();
1300         } else {
1301             //
1302             // Normal creator-based heap profile
1303             //
1304             GarbageCollect(GetRoots, rtsTrue);
1305             heapCensus();
1306         }
1307         performHeapProfile = rtsFalse;
1308         ready_to_gc = rtsFalse; // we already GC'd
1309     }
1310 #endif
1311
1312 #ifdef SMP
1313     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1314 #else
1315     if (ready_to_gc) 
1316 #endif
1317       {
1318       /* everybody back, start the GC.
1319        * Could do it in this thread, or signal a condition var
1320        * to do it in another thread.  Either way, we need to
1321        * broadcast on gc_pending_cond afterward.
1322        */
1323 #ifdef SMP
1324       IF_DEBUG(scheduler,sched_belch("doing GC"));
1325 #endif
1326       GarbageCollect(GetRoots,rtsFalse);
1327       ready_to_gc = rtsFalse;
1328 #ifdef SMP
1329       pthread_cond_broadcast(&gc_pending_cond);
1330 #endif
1331 #if defined(GRAN)
1332       /* add a ContinueThread event to continue execution of current thread */
1333       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1334                 ContinueThread,
1335                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1336       IF_GRAN_DEBUG(bq, 
1337                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1338                G_EVENTQ(0);
1339                G_CURR_THREADQ(0));
1340 #endif /* GRAN */
1341     }
1342
1343 #if defined(GRAN)
1344   next_thread:
1345     IF_GRAN_DEBUG(unused,
1346                   print_eventq(EventHd));
1347
1348     event = get_next_event();
1349 #elif defined(PAR)
1350   next_thread:
1351     /* ToDo: wait for next message to arrive rather than busy wait */
1352 #endif /* GRAN */
1353
1354   } /* end of while(1) */
1355
1356   IF_PAR_DEBUG(verbose,
1357                belch("== Leaving schedule() after having received Finish"));
1358 }
1359
1360 /* ---------------------------------------------------------------------------
1361  * deleteAllThreads():  kill all the live threads.
1362  *
1363  * This is used when we catch a user interrupt (^C), before performing
1364  * any necessary cleanups and running finalizers.
1365  * ------------------------------------------------------------------------- */
1366    
1367 void deleteAllThreads ( void )
1368 {
1369   StgTSO* t;
1370   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1371   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1372       deleteThread(t);
1373   }
1374   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1375       deleteThread(t);
1376   }
1377   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1378       deleteThread(t);
1379   }
1380   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1381   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1382   sleeping_queue = END_TSO_QUEUE;
1383 }
1384
1385 /* startThread and  insertThread are now in GranSim.c -- HWL */
1386
1387 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1388 //@subsection Suspend and Resume
1389
1390 /* ---------------------------------------------------------------------------
1391  * Suspending & resuming Haskell threads.
1392  * 
1393  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1394  * its capability before calling the C function.  This allows another
1395  * task to pick up the capability and carry on running Haskell
1396  * threads.  It also means that if the C call blocks, it won't lock
1397  * the whole system.
1398  *
1399  * The Haskell thread making the C call is put to sleep for the
1400  * duration of the call, on the susepended_ccalling_threads queue.  We
1401  * give out a token to the task, which it can use to resume the thread
1402  * on return from the C function.
1403  * ------------------------------------------------------------------------- */
1404    
1405 StgInt
1406 suspendThread( StgRegTable *reg )
1407 {
1408   nat tok;
1409   Capability *cap;
1410
1411   // assume that *reg is a pointer to the StgRegTable part of a Capability
1412   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1413
1414   ACQUIRE_LOCK(&sched_mutex);
1415
1416   IF_DEBUG(scheduler,
1417            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1418
1419   threadPaused(cap->r.rCurrentTSO);
1420   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1421   suspended_ccalling_threads = cap->r.rCurrentTSO;
1422
1423   /* Use the thread ID as the token; it should be unique */
1424   tok = cap->r.rCurrentTSO->id;
1425
1426 #ifdef SMP
1427   cap->link = free_capabilities;
1428   free_capabilities = cap;
1429   n_free_capabilities++;
1430 #endif
1431
1432   RELEASE_LOCK(&sched_mutex);
1433   return tok; 
1434 }
1435
1436 StgRegTable *
1437 resumeThread( StgInt tok )
1438 {
1439   StgTSO *tso, **prev;
1440   Capability *cap;
1441
1442   ACQUIRE_LOCK(&sched_mutex);
1443
1444   prev = &suspended_ccalling_threads;
1445   for (tso = suspended_ccalling_threads; 
1446        tso != END_TSO_QUEUE; 
1447        prev = &tso->link, tso = tso->link) {
1448     if (tso->id == (StgThreadID)tok) {
1449       *prev = tso->link;
1450       break;
1451     }
1452   }
1453   if (tso == END_TSO_QUEUE) {
1454     barf("resumeThread: thread not found");
1455   }
1456   tso->link = END_TSO_QUEUE;
1457
1458 #ifdef SMP
1459   while (free_capabilities == NULL) {
1460     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1461     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1462     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1463   }
1464   cap = free_capabilities;
1465   free_capabilities = cap->link;
1466   n_free_capabilities--;
1467 #else  
1468   cap = &MainCapability;
1469 #endif
1470
1471   cap->r.rCurrentTSO = tso;
1472
1473   RELEASE_LOCK(&sched_mutex);
1474   return &cap->r;
1475 }
1476
1477
1478 /* ---------------------------------------------------------------------------
1479  * Static functions
1480  * ------------------------------------------------------------------------ */
1481 static void unblockThread(StgTSO *tso);
1482
1483 /* ---------------------------------------------------------------------------
1484  * Comparing Thread ids.
1485  *
1486  * This is used from STG land in the implementation of the
1487  * instances of Eq/Ord for ThreadIds.
1488  * ------------------------------------------------------------------------ */
1489
1490 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1491
1492   StgThreadID id1 = tso1->id; 
1493   StgThreadID id2 = tso2->id;
1494  
1495   if (id1 < id2) return (-1);
1496   if (id1 > id2) return 1;
1497   return 0;
1498 }
1499
1500 /* ---------------------------------------------------------------------------
1501  * Fetching the ThreadID from an StgTSO.
1502  *
1503  * This is used in the implementation of Show for ThreadIds.
1504  * ------------------------------------------------------------------------ */
1505 int rts_getThreadId(const StgTSO *tso) 
1506 {
1507   return tso->id;
1508 }
1509
1510 /* ---------------------------------------------------------------------------
1511    Create a new thread.
1512
1513    The new thread starts with the given stack size.  Before the
1514    scheduler can run, however, this thread needs to have a closure
1515    (and possibly some arguments) pushed on its stack.  See
1516    pushClosure() in Schedule.h.
1517
1518    createGenThread() and createIOThread() (in SchedAPI.h) are
1519    convenient packaged versions of this function.
1520
1521    currently pri (priority) is only used in a GRAN setup -- HWL
1522    ------------------------------------------------------------------------ */
1523 //@cindex createThread
1524 #if defined(GRAN)
1525 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1526 StgTSO *
1527 createThread(nat stack_size, StgInt pri)
1528 {
1529   return createThread_(stack_size, rtsFalse, pri);
1530 }
1531
1532 static StgTSO *
1533 createThread_(nat size, rtsBool have_lock, StgInt pri)
1534 {
1535 #else
1536 StgTSO *
1537 createThread(nat stack_size)
1538 {
1539   return createThread_(stack_size, rtsFalse);
1540 }
1541
1542 static StgTSO *
1543 createThread_(nat size, rtsBool have_lock)
1544 {
1545 #endif
1546
1547     StgTSO *tso;
1548     nat stack_size;
1549
1550     /* First check whether we should create a thread at all */
1551 #if defined(PAR)
1552   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1553   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1554     threadsIgnored++;
1555     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1556           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1557     return END_TSO_QUEUE;
1558   }
1559   threadsCreated++;
1560 #endif
1561
1562 #if defined(GRAN)
1563   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1564 #endif
1565
1566   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1567
1568   /* catch ridiculously small stack sizes */
1569   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1570     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1571   }
1572
1573   stack_size = size - TSO_STRUCT_SIZEW;
1574
1575   tso = (StgTSO *)allocate(size);
1576   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1577
1578   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1579 #if defined(GRAN)
1580   SET_GRAN_HDR(tso, ThisPE);
1581 #endif
1582   tso->what_next     = ThreadEnterGHC;
1583
1584   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1585    * protect the increment operation on next_thread_id.
1586    * In future, we could use an atomic increment instead.
1587    */
1588   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1589   tso->id = next_thread_id++; 
1590   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1591
1592   tso->why_blocked  = NotBlocked;
1593   tso->blocked_exceptions = NULL;
1594
1595   tso->stack_size   = stack_size;
1596   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1597                               - TSO_STRUCT_SIZEW;
1598   tso->sp           = (P_)&(tso->stack) + stack_size;
1599
1600 #ifdef PROFILING
1601   tso->prof.CCCS = CCS_MAIN;
1602 #endif
1603
1604   /* put a stop frame on the stack */
1605   tso->sp -= sizeofW(StgStopFrame);
1606   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1607   tso->su = (StgUpdateFrame*)tso->sp;
1608
1609   // ToDo: check this
1610 #if defined(GRAN)
1611   tso->link = END_TSO_QUEUE;
1612   /* uses more flexible routine in GranSim */
1613   insertThread(tso, CurrentProc);
1614 #else
1615   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1616    * from its creation
1617    */
1618 #endif
1619
1620 #if defined(GRAN) 
1621   if (RtsFlags.GranFlags.GranSimStats.Full) 
1622     DumpGranEvent(GR_START,tso);
1623 #elif defined(PAR)
1624   if (RtsFlags.ParFlags.ParStats.Full) 
1625     DumpGranEvent(GR_STARTQ,tso);
1626   /* HACk to avoid SCHEDULE 
1627      LastTSO = tso; */
1628 #endif
1629
1630   /* Link the new thread on the global thread list.
1631    */
1632   tso->global_link = all_threads;
1633   all_threads = tso;
1634
1635 #if defined(DIST)
1636   tso->dist.priority = MandatoryPriority; //by default that is...
1637 #endif
1638
1639 #if defined(GRAN)
1640   tso->gran.pri = pri;
1641 # if defined(DEBUG)
1642   tso->gran.magic = TSO_MAGIC; // debugging only
1643 # endif
1644   tso->gran.sparkname   = 0;
1645   tso->gran.startedat   = CURRENT_TIME; 
1646   tso->gran.exported    = 0;
1647   tso->gran.basicblocks = 0;
1648   tso->gran.allocs      = 0;
1649   tso->gran.exectime    = 0;
1650   tso->gran.fetchtime   = 0;
1651   tso->gran.fetchcount  = 0;
1652   tso->gran.blocktime   = 0;
1653   tso->gran.blockcount  = 0;
1654   tso->gran.blockedat   = 0;
1655   tso->gran.globalsparks = 0;
1656   tso->gran.localsparks  = 0;
1657   if (RtsFlags.GranFlags.Light)
1658     tso->gran.clock  = Now; /* local clock */
1659   else
1660     tso->gran.clock  = 0;
1661
1662   IF_DEBUG(gran,printTSO(tso));
1663 #elif defined(PAR)
1664 # if defined(DEBUG)
1665   tso->par.magic = TSO_MAGIC; // debugging only
1666 # endif
1667   tso->par.sparkname   = 0;
1668   tso->par.startedat   = CURRENT_TIME; 
1669   tso->par.exported    = 0;
1670   tso->par.basicblocks = 0;
1671   tso->par.allocs      = 0;
1672   tso->par.exectime    = 0;
1673   tso->par.fetchtime   = 0;
1674   tso->par.fetchcount  = 0;
1675   tso->par.blocktime   = 0;
1676   tso->par.blockcount  = 0;
1677   tso->par.blockedat   = 0;
1678   tso->par.globalsparks = 0;
1679   tso->par.localsparks  = 0;
1680 #endif
1681
1682 #if defined(GRAN)
1683   globalGranStats.tot_threads_created++;
1684   globalGranStats.threads_created_on_PE[CurrentProc]++;
1685   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1686   globalGranStats.tot_sq_probes++;
1687 #elif defined(PAR)
1688   // collect parallel global statistics (currently done together with GC stats)
1689   if (RtsFlags.ParFlags.ParStats.Global &&
1690       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1691     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1692     globalParStats.tot_threads_created++;
1693   }
1694 #endif 
1695
1696 #if defined(GRAN)
1697   IF_GRAN_DEBUG(pri,
1698                 belch("==__ schedule: Created TSO %d (%p);",
1699                       CurrentProc, tso, tso->id));
1700 #elif defined(PAR)
1701     IF_PAR_DEBUG(verbose,
1702                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1703                        tso->id, tso, advisory_thread_count));
1704 #else
1705   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1706                                  tso->id, tso->stack_size));
1707 #endif    
1708   return tso;
1709 }
1710
1711 #if defined(PAR)
1712 /* RFP:
1713    all parallel thread creation calls should fall through the following routine.
1714 */
1715 StgTSO *
1716 createSparkThread(rtsSpark spark) 
1717 { StgTSO *tso;
1718   ASSERT(spark != (rtsSpark)NULL);
1719   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1720   { threadsIgnored++;
1721     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1722           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1723     return END_TSO_QUEUE;
1724   }
1725   else
1726   { threadsCreated++;
1727     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1728     if (tso==END_TSO_QUEUE)     
1729       barf("createSparkThread: Cannot create TSO");
1730 #if defined(DIST)
1731     tso->priority = AdvisoryPriority;
1732 #endif
1733     pushClosure(tso,spark);
1734     PUSH_ON_RUN_QUEUE(tso);
1735     advisory_thread_count++;    
1736   }
1737   return tso;
1738 }
1739 #endif
1740
1741 /*
1742   Turn a spark into a thread.
1743   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1744 */
1745 #if defined(PAR)
1746 //@cindex activateSpark
1747 StgTSO *
1748 activateSpark (rtsSpark spark) 
1749 {
1750   StgTSO *tso;
1751
1752   tso = createSparkThread(spark);
1753   if (RtsFlags.ParFlags.ParStats.Full) {   
1754     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1755     IF_PAR_DEBUG(verbose,
1756                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1757                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1758   }
1759   // ToDo: fwd info on local/global spark to thread -- HWL
1760   // tso->gran.exported =  spark->exported;
1761   // tso->gran.locked =   !spark->global;
1762   // tso->gran.sparkname = spark->name;
1763
1764   return tso;
1765 }
1766 #endif
1767
1768 /* ---------------------------------------------------------------------------
1769  * scheduleThread()
1770  *
1771  * scheduleThread puts a thread on the head of the runnable queue.
1772  * This will usually be done immediately after a thread is created.
1773  * The caller of scheduleThread must create the thread using e.g.
1774  * createThread and push an appropriate closure
1775  * on this thread's stack before the scheduler is invoked.
1776  * ------------------------------------------------------------------------ */
1777
1778 void
1779 scheduleThread(StgTSO *tso)
1780 {
1781   if (tso==END_TSO_QUEUE){    
1782     schedule();
1783     return;
1784   }
1785
1786   ACQUIRE_LOCK(&sched_mutex);
1787
1788   /* Put the new thread on the head of the runnable queue.  The caller
1789    * better push an appropriate closure on this thread's stack
1790    * beforehand.  In the SMP case, the thread may start running as
1791    * soon as we release the scheduler lock below.
1792    */
1793   PUSH_ON_RUN_QUEUE(tso);
1794   THREAD_RUNNABLE();
1795
1796 #if 0
1797   IF_DEBUG(scheduler,printTSO(tso));
1798 #endif
1799   RELEASE_LOCK(&sched_mutex);
1800 }
1801
1802 /* ---------------------------------------------------------------------------
1803  * startTasks()
1804  *
1805  * Start up Posix threads to run each of the scheduler tasks.
1806  * I believe the task ids are not needed in the system as defined.
1807  *  KH @ 25/10/99
1808  * ------------------------------------------------------------------------ */
1809
1810 #if defined(PAR) || defined(SMP)
1811 void
1812 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1813 {
1814   scheduleThread(END_TSO_QUEUE);
1815 }
1816 #endif
1817
1818 /* ---------------------------------------------------------------------------
1819  * initScheduler()
1820  *
1821  * Initialise the scheduler.  This resets all the queues - if the
1822  * queues contained any threads, they'll be garbage collected at the
1823  * next pass.
1824  *
1825  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1826  * ------------------------------------------------------------------------ */
1827
1828 #ifdef SMP
1829 static void
1830 term_handler(int sig STG_UNUSED)
1831 {
1832   stat_workerStop();
1833   ACQUIRE_LOCK(&term_mutex);
1834   await_death--;
1835   RELEASE_LOCK(&term_mutex);
1836   pthread_exit(NULL);
1837 }
1838 #endif
1839
1840 static void
1841 initCapability( Capability *cap )
1842 {
1843     cap->f.stgChk0         = (F_)__stg_chk_0;
1844     cap->f.stgChk1         = (F_)__stg_chk_1;
1845     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1846     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1847 }
1848
1849 void 
1850 initScheduler(void)
1851 {
1852 #if defined(GRAN)
1853   nat i;
1854
1855   for (i=0; i<=MAX_PROC; i++) {
1856     run_queue_hds[i]      = END_TSO_QUEUE;
1857     run_queue_tls[i]      = END_TSO_QUEUE;
1858     blocked_queue_hds[i]  = END_TSO_QUEUE;
1859     blocked_queue_tls[i]  = END_TSO_QUEUE;
1860     ccalling_threadss[i]  = END_TSO_QUEUE;
1861     sleeping_queue        = END_TSO_QUEUE;
1862   }
1863 #else
1864   run_queue_hd      = END_TSO_QUEUE;
1865   run_queue_tl      = END_TSO_QUEUE;
1866   blocked_queue_hd  = END_TSO_QUEUE;
1867   blocked_queue_tl  = END_TSO_QUEUE;
1868   sleeping_queue    = END_TSO_QUEUE;
1869 #endif 
1870
1871   suspended_ccalling_threads  = END_TSO_QUEUE;
1872
1873   main_threads = NULL;
1874   all_threads  = END_TSO_QUEUE;
1875
1876   context_switch = 0;
1877   interrupted    = 0;
1878
1879   RtsFlags.ConcFlags.ctxtSwitchTicks =
1880       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1881
1882   /* Install the SIGHUP handler */
1883 #ifdef SMP
1884   {
1885     struct sigaction action,oact;
1886
1887     action.sa_handler = term_handler;
1888     sigemptyset(&action.sa_mask);
1889     action.sa_flags = 0;
1890     if (sigaction(SIGTERM, &action, &oact) != 0) {
1891       barf("can't install TERM handler");
1892     }
1893   }
1894 #endif
1895
1896 #ifdef SMP
1897   /* Allocate N Capabilities */
1898   {
1899     nat i;
1900     Capability *cap, *prev;
1901     cap  = NULL;
1902     prev = NULL;
1903     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1904       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1905       initCapability(cap);
1906       cap->link = prev;
1907       prev = cap;
1908     }
1909     free_capabilities = cap;
1910     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1911   }
1912   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1913                              n_free_capabilities););
1914 #else
1915   initCapability(&MainCapability);
1916 #endif
1917
1918 #if defined(SMP) || defined(PAR)
1919   initSparkPools();
1920 #endif
1921 }
1922
1923 #ifdef SMP
1924 void
1925 startTasks( void )
1926 {
1927   nat i;
1928   int r;
1929   pthread_t tid;
1930   
1931   /* make some space for saving all the thread ids */
1932   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1933                             "initScheduler:task_ids");
1934   
1935   /* and create all the threads */
1936   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1937     r = pthread_create(&tid,NULL,taskStart,NULL);
1938     if (r != 0) {
1939       barf("startTasks: Can't create new Posix thread");
1940     }
1941     task_ids[i].id = tid;
1942     task_ids[i].mut_time = 0.0;
1943     task_ids[i].mut_etime = 0.0;
1944     task_ids[i].gc_time = 0.0;
1945     task_ids[i].gc_etime = 0.0;
1946     task_ids[i].elapsedtimestart = elapsedtime();
1947     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1948   }
1949 }
1950 #endif
1951
1952 void
1953 exitScheduler( void )
1954 {
1955 #ifdef SMP
1956   nat i;
1957
1958   /* Don't want to use pthread_cancel, since we'd have to install
1959    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1960    * all our locks.
1961    */
1962 #if 0
1963   /* Cancel all our tasks */
1964   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1965     pthread_cancel(task_ids[i].id);
1966   }
1967   
1968   /* Wait for all the tasks to terminate */
1969   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1970     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1971                                task_ids[i].id));
1972     pthread_join(task_ids[i].id, NULL);
1973   }
1974 #endif
1975
1976   /* Send 'em all a SIGHUP.  That should shut 'em up.
1977    */
1978   await_death = RtsFlags.ParFlags.nNodes;
1979   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1980     pthread_kill(task_ids[i].id,SIGTERM);
1981   }
1982   while (await_death > 0) {
1983     sched_yield();
1984   }
1985 #endif
1986 }
1987
1988 /* -----------------------------------------------------------------------------
1989    Managing the per-task allocation areas.
1990    
1991    Each capability comes with an allocation area.  These are
1992    fixed-length block lists into which allocation can be done.
1993
1994    ToDo: no support for two-space collection at the moment???
1995    -------------------------------------------------------------------------- */
1996
1997 /* -----------------------------------------------------------------------------
1998  * waitThread is the external interface for running a new computation
1999  * and waiting for the result.
2000  *
2001  * In the non-SMP case, we create a new main thread, push it on the 
2002  * main-thread stack, and invoke the scheduler to run it.  The
2003  * scheduler will return when the top main thread on the stack has
2004  * completed or died, and fill in the necessary fields of the
2005  * main_thread structure.
2006  *
2007  * In the SMP case, we create a main thread as before, but we then
2008  * create a new condition variable and sleep on it.  When our new
2009  * main thread has completed, we'll be woken up and the status/result
2010  * will be in the main_thread struct.
2011  * -------------------------------------------------------------------------- */
2012
2013 int 
2014 howManyThreadsAvail ( void )
2015 {
2016    int i = 0;
2017    StgTSO* q;
2018    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2019       i++;
2020    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2021       i++;
2022    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2023       i++;
2024    return i;
2025 }
2026
2027 void
2028 finishAllThreads ( void )
2029 {
2030    do {
2031       while (run_queue_hd != END_TSO_QUEUE) {
2032          waitThread ( run_queue_hd, NULL );
2033       }
2034       while (blocked_queue_hd != END_TSO_QUEUE) {
2035          waitThread ( blocked_queue_hd, NULL );
2036       }
2037       while (sleeping_queue != END_TSO_QUEUE) {
2038          waitThread ( blocked_queue_hd, NULL );
2039       }
2040    } while 
2041       (blocked_queue_hd != END_TSO_QUEUE || 
2042        run_queue_hd     != END_TSO_QUEUE ||
2043        sleeping_queue   != END_TSO_QUEUE);
2044 }
2045
2046 SchedulerStatus
2047 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2048 {
2049   StgMainThread *m;
2050   SchedulerStatus stat;
2051
2052   ACQUIRE_LOCK(&sched_mutex);
2053   
2054   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2055
2056   m->tso = tso;
2057   m->ret = ret;
2058   m->stat = NoStatus;
2059 #ifdef SMP
2060   pthread_cond_init(&m->wakeup, NULL);
2061 #endif
2062
2063   m->link = main_threads;
2064   main_threads = m;
2065
2066   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2067                               m->tso->id));
2068
2069 #ifdef SMP
2070   do {
2071     pthread_cond_wait(&m->wakeup, &sched_mutex);
2072   } while (m->stat == NoStatus);
2073 #elif defined(GRAN)
2074   /* GranSim specific init */
2075   CurrentTSO = m->tso;                // the TSO to run
2076   procStatus[MainProc] = Busy;        // status of main PE
2077   CurrentProc = MainProc;             // PE to run it on
2078
2079   schedule();
2080 #else
2081   schedule();
2082   ASSERT(m->stat != NoStatus);
2083 #endif
2084
2085   stat = m->stat;
2086
2087 #ifdef SMP
2088   pthread_cond_destroy(&m->wakeup);
2089 #endif
2090
2091   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2092                               m->tso->id));
2093   free(m);
2094
2095   RELEASE_LOCK(&sched_mutex);
2096
2097   return stat;
2098 }
2099
2100 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2101 //@subsection Run queue code 
2102
2103 #if 0
2104 /* 
2105    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2106        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2107        implicit global variable that has to be correct when calling these
2108        fcts -- HWL 
2109 */
2110
2111 /* Put the new thread on the head of the runnable queue.
2112  * The caller of createThread better push an appropriate closure
2113  * on this thread's stack before the scheduler is invoked.
2114  */
2115 static /* inline */ void
2116 add_to_run_queue(tso)
2117 StgTSO* tso; 
2118 {
2119   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2120   tso->link = run_queue_hd;
2121   run_queue_hd = tso;
2122   if (run_queue_tl == END_TSO_QUEUE) {
2123     run_queue_tl = tso;
2124   }
2125 }
2126
2127 /* Put the new thread at the end of the runnable queue. */
2128 static /* inline */ void
2129 push_on_run_queue(tso)
2130 StgTSO* tso; 
2131 {
2132   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2133   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2134   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2135   if (run_queue_hd == END_TSO_QUEUE) {
2136     run_queue_hd = tso;
2137   } else {
2138     run_queue_tl->link = tso;
2139   }
2140   run_queue_tl = tso;
2141 }
2142
2143 /* 
2144    Should be inlined because it's used very often in schedule.  The tso
2145    argument is actually only needed in GranSim, where we want to have the
2146    possibility to schedule *any* TSO on the run queue, irrespective of the
2147    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2148    the run queue and dequeue the tso, adjusting the links in the queue. 
2149 */
2150 //@cindex take_off_run_queue
2151 static /* inline */ StgTSO*
2152 take_off_run_queue(StgTSO *tso) {
2153   StgTSO *t, *prev;
2154
2155   /* 
2156      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2157
2158      if tso is specified, unlink that tso from the run_queue (doesn't have
2159      to be at the beginning of the queue); GranSim only 
2160   */
2161   if (tso!=END_TSO_QUEUE) {
2162     /* find tso in queue */
2163     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2164          t!=END_TSO_QUEUE && t!=tso;
2165          prev=t, t=t->link) 
2166       /* nothing */ ;
2167     ASSERT(t==tso);
2168     /* now actually dequeue the tso */
2169     if (prev!=END_TSO_QUEUE) {
2170       ASSERT(run_queue_hd!=t);
2171       prev->link = t->link;
2172     } else {
2173       /* t is at beginning of thread queue */
2174       ASSERT(run_queue_hd==t);
2175       run_queue_hd = t->link;
2176     }
2177     /* t is at end of thread queue */
2178     if (t->link==END_TSO_QUEUE) {
2179       ASSERT(t==run_queue_tl);
2180       run_queue_tl = prev;
2181     } else {
2182       ASSERT(run_queue_tl!=t);
2183     }
2184     t->link = END_TSO_QUEUE;
2185   } else {
2186     /* take tso from the beginning of the queue; std concurrent code */
2187     t = run_queue_hd;
2188     if (t != END_TSO_QUEUE) {
2189       run_queue_hd = t->link;
2190       t->link = END_TSO_QUEUE;
2191       if (run_queue_hd == END_TSO_QUEUE) {
2192         run_queue_tl = END_TSO_QUEUE;
2193       }
2194     }
2195   }
2196   return t;
2197 }
2198
2199 #endif /* 0 */
2200
2201 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2202 //@subsection Garbage Collextion Routines
2203
2204 /* ---------------------------------------------------------------------------
2205    Where are the roots that we know about?
2206
2207         - all the threads on the runnable queue
2208         - all the threads on the blocked queue
2209         - all the threads on the sleeping queue
2210         - all the thread currently executing a _ccall_GC
2211         - all the "main threads"
2212      
2213    ------------------------------------------------------------------------ */
2214
2215 /* This has to be protected either by the scheduler monitor, or by the
2216         garbage collection monitor (probably the latter).
2217         KH @ 25/10/99
2218 */
2219
2220 void
2221 GetRoots(evac_fn evac)
2222 {
2223   StgMainThread *m;
2224
2225 #if defined(GRAN)
2226   {
2227     nat i;
2228     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2229       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2230           evac((StgClosure **)&run_queue_hds[i]);
2231       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2232           evac((StgClosure **)&run_queue_tls[i]);
2233       
2234       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2235           evac((StgClosure **)&blocked_queue_hds[i]);
2236       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2237           evac((StgClosure **)&blocked_queue_tls[i]);
2238       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2239           evac((StgClosure **)&ccalling_threads[i]);
2240     }
2241   }
2242
2243   markEventQueue();
2244
2245 #else /* !GRAN */
2246   if (run_queue_hd != END_TSO_QUEUE) {
2247       ASSERT(run_queue_tl != END_TSO_QUEUE);
2248       evac((StgClosure **)&run_queue_hd);
2249       evac((StgClosure **)&run_queue_tl);
2250   }
2251   
2252   if (blocked_queue_hd != END_TSO_QUEUE) {
2253       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2254       evac((StgClosure **)&blocked_queue_hd);
2255       evac((StgClosure **)&blocked_queue_tl);
2256   }
2257   
2258   if (sleeping_queue != END_TSO_QUEUE) {
2259       evac((StgClosure **)&sleeping_queue);
2260   }
2261 #endif 
2262
2263   for (m = main_threads; m != NULL; m = m->link) {
2264       evac((StgClosure **)&m->tso);
2265   }
2266   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2267       evac((StgClosure **)&suspended_ccalling_threads);
2268   }
2269
2270 #if defined(SMP) || defined(PAR) || defined(GRAN)
2271   markSparkQueue(evac);
2272 #endif
2273 }
2274
2275 /* -----------------------------------------------------------------------------
2276    performGC
2277
2278    This is the interface to the garbage collector from Haskell land.
2279    We provide this so that external C code can allocate and garbage
2280    collect when called from Haskell via _ccall_GC.
2281
2282    It might be useful to provide an interface whereby the programmer
2283    can specify more roots (ToDo).
2284    
2285    This needs to be protected by the GC condition variable above.  KH.
2286    -------------------------------------------------------------------------- */
2287
2288 void (*extra_roots)(evac_fn);
2289
2290 void
2291 performGC(void)
2292 {
2293   GarbageCollect(GetRoots,rtsFalse);
2294 }
2295
2296 void
2297 performMajorGC(void)
2298 {
2299   GarbageCollect(GetRoots,rtsTrue);
2300 }
2301
2302 static void
2303 AllRoots(evac_fn evac)
2304 {
2305     GetRoots(evac);             // the scheduler's roots
2306     extra_roots(evac);          // the user's roots
2307 }
2308
2309 void
2310 performGCWithRoots(void (*get_roots)(evac_fn))
2311 {
2312   extra_roots = get_roots;
2313   GarbageCollect(AllRoots,rtsFalse);
2314 }
2315
2316 /* -----------------------------------------------------------------------------
2317    Stack overflow
2318
2319    If the thread has reached its maximum stack size, then raise the
2320    StackOverflow exception in the offending thread.  Otherwise
2321    relocate the TSO into a larger chunk of memory and adjust its stack
2322    size appropriately.
2323    -------------------------------------------------------------------------- */
2324
2325 static StgTSO *
2326 threadStackOverflow(StgTSO *tso)
2327 {
2328   nat new_stack_size, new_tso_size, diff, stack_words;
2329   StgPtr new_sp;
2330   StgTSO *dest;
2331
2332   IF_DEBUG(sanity,checkTSO(tso));
2333   if (tso->stack_size >= tso->max_stack_size) {
2334
2335     IF_DEBUG(gc,
2336              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2337                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2338              /* If we're debugging, just print out the top of the stack */
2339              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2340                                               tso->sp+64)));
2341
2342     /* Send this thread the StackOverflow exception */
2343     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2344     return tso;
2345   }
2346
2347   /* Try to double the current stack size.  If that takes us over the
2348    * maximum stack size for this thread, then use the maximum instead.
2349    * Finally round up so the TSO ends up as a whole number of blocks.
2350    */
2351   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2352   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2353                                        TSO_STRUCT_SIZE)/sizeof(W_);
2354   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2355   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2356
2357   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2358
2359   dest = (StgTSO *)allocate(new_tso_size);
2360   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2361
2362   /* copy the TSO block and the old stack into the new area */
2363   memcpy(dest,tso,TSO_STRUCT_SIZE);
2364   stack_words = tso->stack + tso->stack_size - tso->sp;
2365   new_sp = (P_)dest + new_tso_size - stack_words;
2366   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2367
2368   /* relocate the stack pointers... */
2369   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2370   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2371   dest->sp    = new_sp;
2372   dest->stack_size = new_stack_size;
2373         
2374   /* and relocate the update frame list */
2375   relocate_stack(dest, diff);
2376
2377   /* Mark the old TSO as relocated.  We have to check for relocated
2378    * TSOs in the garbage collector and any primops that deal with TSOs.
2379    *
2380    * It's important to set the sp and su values to just beyond the end
2381    * of the stack, so we don't attempt to scavenge any part of the
2382    * dead TSO's stack.
2383    */
2384   tso->what_next = ThreadRelocated;
2385   tso->link = dest;
2386   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2387   tso->su = (StgUpdateFrame *)tso->sp;
2388   tso->why_blocked = NotBlocked;
2389   dest->mut_link = NULL;
2390
2391   IF_PAR_DEBUG(verbose,
2392                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2393                      tso->id, tso, tso->stack_size);
2394                /* If we're debugging, just print out the top of the stack */
2395                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2396                                                 tso->sp+64)));
2397   
2398   IF_DEBUG(sanity,checkTSO(tso));
2399 #if 0
2400   IF_DEBUG(scheduler,printTSO(dest));
2401 #endif
2402
2403   return dest;
2404 }
2405
2406 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2407 //@subsection Blocking Queue Routines
2408
2409 /* ---------------------------------------------------------------------------
2410    Wake up a queue that was blocked on some resource.
2411    ------------------------------------------------------------------------ */
2412
2413 #if defined(GRAN)
2414 static inline void
2415 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2416 {
2417 }
2418 #elif defined(PAR)
2419 static inline void
2420 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2421 {
2422   /* write RESUME events to log file and
2423      update blocked and fetch time (depending on type of the orig closure) */
2424   if (RtsFlags.ParFlags.ParStats.Full) {
2425     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2426                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2427                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2428     if (EMPTY_RUN_QUEUE())
2429       emitSchedule = rtsTrue;
2430
2431     switch (get_itbl(node)->type) {
2432         case FETCH_ME_BQ:
2433           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2434           break;
2435         case RBH:
2436         case FETCH_ME:
2437         case BLACKHOLE_BQ:
2438           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2439           break;
2440 #ifdef DIST
2441         case MVAR:
2442           break;
2443 #endif    
2444         default:
2445           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2446         }
2447       }
2448 }
2449 #endif
2450
2451 #if defined(GRAN)
2452 static StgBlockingQueueElement *
2453 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2454 {
2455     StgTSO *tso;
2456     PEs node_loc, tso_loc;
2457
2458     node_loc = where_is(node); // should be lifted out of loop
2459     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2460     tso_loc = where_is((StgClosure *)tso);
2461     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2462       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2463       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2464       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2465       // insertThread(tso, node_loc);
2466       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2467                 ResumeThread,
2468                 tso, node, (rtsSpark*)NULL);
2469       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2470       // len_local++;
2471       // len++;
2472     } else { // TSO is remote (actually should be FMBQ)
2473       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2474                                   RtsFlags.GranFlags.Costs.gunblocktime +
2475                                   RtsFlags.GranFlags.Costs.latency;
2476       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2477                 UnblockThread,
2478                 tso, node, (rtsSpark*)NULL);
2479       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2480       // len++;
2481     }
2482     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2483     IF_GRAN_DEBUG(bq,
2484                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2485                           (node_loc==tso_loc ? "Local" : "Global"), 
2486                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2487     tso->block_info.closure = NULL;
2488     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2489                              tso->id, tso));
2490 }
2491 #elif defined(PAR)
2492 static StgBlockingQueueElement *
2493 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2494 {
2495     StgBlockingQueueElement *next;
2496
2497     switch (get_itbl(bqe)->type) {
2498     case TSO:
2499       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2500       /* if it's a TSO just push it onto the run_queue */
2501       next = bqe->link;
2502       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2503       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2504       THREAD_RUNNABLE();
2505       unblockCount(bqe, node);
2506       /* reset blocking status after dumping event */
2507       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2508       break;
2509
2510     case BLOCKED_FETCH:
2511       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2512       next = bqe->link;
2513       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2514       PendingFetches = (StgBlockedFetch *)bqe;
2515       break;
2516
2517 # if defined(DEBUG)
2518       /* can ignore this case in a non-debugging setup; 
2519          see comments on RBHSave closures above */
2520     case CONSTR:
2521       /* check that the closure is an RBHSave closure */
2522       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2523              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2524              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2525       break;
2526
2527     default:
2528       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2529            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2530            (StgClosure *)bqe);
2531 # endif
2532     }
2533   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2534   return next;
2535 }
2536
2537 #else /* !GRAN && !PAR */
2538 static StgTSO *
2539 unblockOneLocked(StgTSO *tso)
2540 {
2541   StgTSO *next;
2542
2543   ASSERT(get_itbl(tso)->type == TSO);
2544   ASSERT(tso->why_blocked != NotBlocked);
2545   tso->why_blocked = NotBlocked;
2546   next = tso->link;
2547   PUSH_ON_RUN_QUEUE(tso);
2548   THREAD_RUNNABLE();
2549   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2550   return next;
2551 }
2552 #endif
2553
2554 #if defined(GRAN) || defined(PAR)
2555 inline StgBlockingQueueElement *
2556 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2557 {
2558   ACQUIRE_LOCK(&sched_mutex);
2559   bqe = unblockOneLocked(bqe, node);
2560   RELEASE_LOCK(&sched_mutex);
2561   return bqe;
2562 }
2563 #else
2564 inline StgTSO *
2565 unblockOne(StgTSO *tso)
2566 {
2567   ACQUIRE_LOCK(&sched_mutex);
2568   tso = unblockOneLocked(tso);
2569   RELEASE_LOCK(&sched_mutex);
2570   return tso;
2571 }
2572 #endif
2573
2574 #if defined(GRAN)
2575 void 
2576 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2577 {
2578   StgBlockingQueueElement *bqe;
2579   PEs node_loc;
2580   nat len = 0; 
2581
2582   IF_GRAN_DEBUG(bq, 
2583                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2584                       node, CurrentProc, CurrentTime[CurrentProc], 
2585                       CurrentTSO->id, CurrentTSO));
2586
2587   node_loc = where_is(node);
2588
2589   ASSERT(q == END_BQ_QUEUE ||
2590          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2591          get_itbl(q)->type == CONSTR); // closure (type constructor)
2592   ASSERT(is_unique(node));
2593
2594   /* FAKE FETCH: magically copy the node to the tso's proc;
2595      no Fetch necessary because in reality the node should not have been 
2596      moved to the other PE in the first place
2597   */
2598   if (CurrentProc!=node_loc) {
2599     IF_GRAN_DEBUG(bq, 
2600                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2601                         node, node_loc, CurrentProc, CurrentTSO->id, 
2602                         // CurrentTSO, where_is(CurrentTSO),
2603                         node->header.gran.procs));
2604     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2605     IF_GRAN_DEBUG(bq, 
2606                   belch("## new bitmask of node %p is %#x",
2607                         node, node->header.gran.procs));
2608     if (RtsFlags.GranFlags.GranSimStats.Global) {
2609       globalGranStats.tot_fake_fetches++;
2610     }
2611   }
2612
2613   bqe = q;
2614   // ToDo: check: ASSERT(CurrentProc==node_loc);
2615   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2616     //next = bqe->link;
2617     /* 
2618        bqe points to the current element in the queue
2619        next points to the next element in the queue
2620     */
2621     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2622     //tso_loc = where_is(tso);
2623     len++;
2624     bqe = unblockOneLocked(bqe, node);
2625   }
2626
2627   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2628      the closure to make room for the anchor of the BQ */
2629   if (bqe!=END_BQ_QUEUE) {
2630     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2631     /*
2632     ASSERT((info_ptr==&RBH_Save_0_info) ||
2633            (info_ptr==&RBH_Save_1_info) ||
2634            (info_ptr==&RBH_Save_2_info));
2635     */
2636     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2637     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2638     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2639
2640     IF_GRAN_DEBUG(bq,
2641                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2642                         node, info_type(node)));
2643   }
2644
2645   /* statistics gathering */
2646   if (RtsFlags.GranFlags.GranSimStats.Global) {
2647     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2648     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2649     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2650     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2651   }
2652   IF_GRAN_DEBUG(bq,
2653                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2654                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2655 }
2656 #elif defined(PAR)
2657 void 
2658 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2659 {
2660   StgBlockingQueueElement *bqe;
2661
2662   ACQUIRE_LOCK(&sched_mutex);
2663
2664   IF_PAR_DEBUG(verbose, 
2665                belch("##-_ AwBQ for node %p on [%x]: ",
2666                      node, mytid));
2667 #ifdef DIST  
2668   //RFP
2669   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2670     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2671     return;
2672   }
2673 #endif
2674   
2675   ASSERT(q == END_BQ_QUEUE ||
2676          get_itbl(q)->type == TSO ||           
2677          get_itbl(q)->type == BLOCKED_FETCH || 
2678          get_itbl(q)->type == CONSTR); 
2679
2680   bqe = q;
2681   while (get_itbl(bqe)->type==TSO || 
2682          get_itbl(bqe)->type==BLOCKED_FETCH) {
2683     bqe = unblockOneLocked(bqe, node);
2684   }
2685   RELEASE_LOCK(&sched_mutex);
2686 }
2687
2688 #else   /* !GRAN && !PAR */
2689 void
2690 awakenBlockedQueue(StgTSO *tso)
2691 {
2692   ACQUIRE_LOCK(&sched_mutex);
2693   while (tso != END_TSO_QUEUE) {
2694     tso = unblockOneLocked(tso);
2695   }
2696   RELEASE_LOCK(&sched_mutex);
2697 }
2698 #endif
2699
2700 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2701 //@subsection Exception Handling Routines
2702
2703 /* ---------------------------------------------------------------------------
2704    Interrupt execution
2705    - usually called inside a signal handler so it mustn't do anything fancy.   
2706    ------------------------------------------------------------------------ */
2707
2708 void
2709 interruptStgRts(void)
2710 {
2711     interrupted    = 1;
2712     context_switch = 1;
2713 }
2714
2715 /* -----------------------------------------------------------------------------
2716    Unblock a thread
2717
2718    This is for use when we raise an exception in another thread, which
2719    may be blocked.
2720    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2721    -------------------------------------------------------------------------- */
2722
2723 #if defined(GRAN) || defined(PAR)
2724 /*
2725   NB: only the type of the blocking queue is different in GranSim and GUM
2726       the operations on the queue-elements are the same
2727       long live polymorphism!
2728 */
2729 static void
2730 unblockThread(StgTSO *tso)
2731 {
2732   StgBlockingQueueElement *t, **last;
2733
2734   ACQUIRE_LOCK(&sched_mutex);
2735   switch (tso->why_blocked) {
2736
2737   case NotBlocked:
2738     return;  /* not blocked */
2739
2740   case BlockedOnMVar:
2741     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2742     {
2743       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2744       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2745
2746       last = (StgBlockingQueueElement **)&mvar->head;
2747       for (t = (StgBlockingQueueElement *)mvar->head; 
2748            t != END_BQ_QUEUE; 
2749            last = &t->link, last_tso = t, t = t->link) {
2750         if (t == (StgBlockingQueueElement *)tso) {
2751           *last = (StgBlockingQueueElement *)tso->link;
2752           if (mvar->tail == tso) {
2753             mvar->tail = (StgTSO *)last_tso;
2754           }
2755           goto done;
2756         }
2757       }
2758       barf("unblockThread (MVAR): TSO not found");
2759     }
2760
2761   case BlockedOnBlackHole:
2762     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2763     {
2764       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2765
2766       last = &bq->blocking_queue;
2767       for (t = bq->blocking_queue; 
2768            t != END_BQ_QUEUE; 
2769            last = &t->link, t = t->link) {
2770         if (t == (StgBlockingQueueElement *)tso) {
2771           *last = (StgBlockingQueueElement *)tso->link;
2772           goto done;
2773         }
2774       }
2775       barf("unblockThread (BLACKHOLE): TSO not found");
2776     }
2777
2778   case BlockedOnException:
2779     {
2780       StgTSO *target  = tso->block_info.tso;
2781
2782       ASSERT(get_itbl(target)->type == TSO);
2783
2784       if (target->what_next == ThreadRelocated) {
2785           target = target->link;
2786           ASSERT(get_itbl(target)->type == TSO);
2787       }
2788
2789       ASSERT(target->blocked_exceptions != NULL);
2790
2791       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2792       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2793            t != END_BQ_QUEUE; 
2794            last = &t->link, t = t->link) {
2795         ASSERT(get_itbl(t)->type == TSO);
2796         if (t == (StgBlockingQueueElement *)tso) {
2797           *last = (StgBlockingQueueElement *)tso->link;
2798           goto done;
2799         }
2800       }
2801       barf("unblockThread (Exception): TSO not found");
2802     }
2803
2804   case BlockedOnRead:
2805   case BlockedOnWrite:
2806     {
2807       /* take TSO off blocked_queue */
2808       StgBlockingQueueElement *prev = NULL;
2809       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2810            prev = t, t = t->link) {
2811         if (t == (StgBlockingQueueElement *)tso) {
2812           if (prev == NULL) {
2813             blocked_queue_hd = (StgTSO *)t->link;
2814             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2815               blocked_queue_tl = END_TSO_QUEUE;
2816             }
2817           } else {
2818             prev->link = t->link;
2819             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2820               blocked_queue_tl = (StgTSO *)prev;
2821             }
2822           }
2823           goto done;
2824         }
2825       }
2826       barf("unblockThread (I/O): TSO not found");
2827     }
2828
2829   case BlockedOnDelay:
2830     {
2831       /* take TSO off sleeping_queue */
2832       StgBlockingQueueElement *prev = NULL;
2833       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2834            prev = t, t = t->link) {
2835         if (t == (StgBlockingQueueElement *)tso) {
2836           if (prev == NULL) {
2837             sleeping_queue = (StgTSO *)t->link;
2838           } else {
2839             prev->link = t->link;
2840           }
2841           goto done;
2842         }
2843       }
2844       barf("unblockThread (I/O): TSO not found");
2845     }
2846
2847   default:
2848     barf("unblockThread");
2849   }
2850
2851  done:
2852   tso->link = END_TSO_QUEUE;
2853   tso->why_blocked = NotBlocked;
2854   tso->block_info.closure = NULL;
2855   PUSH_ON_RUN_QUEUE(tso);
2856   RELEASE_LOCK(&sched_mutex);
2857 }
2858 #else
2859 static void
2860 unblockThread(StgTSO *tso)
2861 {
2862   StgTSO *t, **last;
2863
2864   ACQUIRE_LOCK(&sched_mutex);
2865   switch (tso->why_blocked) {
2866
2867   case NotBlocked:
2868     return;  /* not blocked */
2869
2870   case BlockedOnMVar:
2871     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2872     {
2873       StgTSO *last_tso = END_TSO_QUEUE;
2874       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2875
2876       last = &mvar->head;
2877       for (t = mvar->head; t != END_TSO_QUEUE; 
2878            last = &t->link, last_tso = t, t = t->link) {
2879         if (t == tso) {
2880           *last = tso->link;
2881           if (mvar->tail == tso) {
2882             mvar->tail = last_tso;
2883           }
2884           goto done;
2885         }
2886       }
2887       barf("unblockThread (MVAR): TSO not found");
2888     }
2889
2890   case BlockedOnBlackHole:
2891     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2892     {
2893       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2894
2895       last = &bq->blocking_queue;
2896       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2897            last = &t->link, t = t->link) {
2898         if (t == tso) {
2899           *last = tso->link;
2900           goto done;
2901         }
2902       }
2903       barf("unblockThread (BLACKHOLE): TSO not found");
2904     }
2905
2906   case BlockedOnException:
2907     {
2908       StgTSO *target  = tso->block_info.tso;
2909
2910       ASSERT(get_itbl(target)->type == TSO);
2911
2912       while (target->what_next == ThreadRelocated) {
2913           target = target->link;
2914           ASSERT(get_itbl(target)->type == TSO);
2915       }
2916       
2917       ASSERT(target->blocked_exceptions != NULL);
2918
2919       last = &target->blocked_exceptions;
2920       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2921            last = &t->link, t = t->link) {
2922         ASSERT(get_itbl(t)->type == TSO);
2923         if (t == tso) {
2924           *last = tso->link;
2925           goto done;
2926         }
2927       }
2928       barf("unblockThread (Exception): TSO not found");
2929     }
2930
2931   case BlockedOnRead:
2932   case BlockedOnWrite:
2933     {
2934       StgTSO *prev = NULL;
2935       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2936            prev = t, t = t->link) {
2937         if (t == tso) {
2938           if (prev == NULL) {
2939             blocked_queue_hd = t->link;
2940             if (blocked_queue_tl == t) {
2941               blocked_queue_tl = END_TSO_QUEUE;
2942             }
2943           } else {
2944             prev->link = t->link;
2945             if (blocked_queue_tl == t) {
2946               blocked_queue_tl = prev;
2947             }
2948           }
2949           goto done;
2950         }
2951       }
2952       barf("unblockThread (I/O): TSO not found");
2953     }
2954
2955   case BlockedOnDelay:
2956     {
2957       StgTSO *prev = NULL;
2958       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2959            prev = t, t = t->link) {
2960         if (t == tso) {
2961           if (prev == NULL) {
2962             sleeping_queue = t->link;
2963           } else {
2964             prev->link = t->link;
2965           }
2966           goto done;
2967         }
2968       }
2969       barf("unblockThread (I/O): TSO not found");
2970     }
2971
2972   default:
2973     barf("unblockThread");
2974   }
2975
2976  done:
2977   tso->link = END_TSO_QUEUE;
2978   tso->why_blocked = NotBlocked;
2979   tso->block_info.closure = NULL;
2980   PUSH_ON_RUN_QUEUE(tso);
2981   RELEASE_LOCK(&sched_mutex);
2982 }
2983 #endif
2984
2985 /* -----------------------------------------------------------------------------
2986  * raiseAsync()
2987  *
2988  * The following function implements the magic for raising an
2989  * asynchronous exception in an existing thread.
2990  *
2991  * We first remove the thread from any queue on which it might be
2992  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2993  *
2994  * We strip the stack down to the innermost CATCH_FRAME, building
2995  * thunks in the heap for all the active computations, so they can 
2996  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2997  * an application of the handler to the exception, and push it on
2998  * the top of the stack.
2999  * 
3000  * How exactly do we save all the active computations?  We create an
3001  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3002  * AP_UPDs pushes everything from the corresponding update frame
3003  * upwards onto the stack.  (Actually, it pushes everything up to the
3004  * next update frame plus a pointer to the next AP_UPD object.
3005  * Entering the next AP_UPD object pushes more onto the stack until we
3006  * reach the last AP_UPD object - at which point the stack should look
3007  * exactly as it did when we killed the TSO and we can continue
3008  * execution by entering the closure on top of the stack.
3009  *
3010  * We can also kill a thread entirely - this happens if either (a) the 
3011  * exception passed to raiseAsync is NULL, or (b) there's no
3012  * CATCH_FRAME on the stack.  In either case, we strip the entire
3013  * stack and replace the thread with a zombie.
3014  *
3015  * -------------------------------------------------------------------------- */
3016  
3017 void 
3018 deleteThread(StgTSO *tso)
3019 {
3020   raiseAsync(tso,NULL);
3021 }
3022
3023 void
3024 raiseAsync(StgTSO *tso, StgClosure *exception)
3025 {
3026   StgUpdateFrame* su = tso->su;
3027   StgPtr          sp = tso->sp;
3028   
3029   /* Thread already dead? */
3030   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3031     return;
3032   }
3033
3034   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3035
3036   /* Remove it from any blocking queues */
3037   unblockThread(tso);
3038
3039   /* The stack freezing code assumes there's a closure pointer on
3040    * the top of the stack.  This isn't always the case with compiled
3041    * code, so we have to push a dummy closure on the top which just
3042    * returns to the next return address on the stack.
3043    */
3044   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3045     *(--sp) = (W_)&stg_dummy_ret_closure;
3046   }
3047
3048   while (1) {
3049     nat words = ((P_)su - (P_)sp) - 1;
3050     nat i;
3051     StgAP_UPD * ap;
3052
3053     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3054      * then build PAP(handler,exception,realworld#), and leave it on
3055      * top of the stack ready to enter.
3056      */
3057     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3058       StgCatchFrame *cf = (StgCatchFrame *)su;
3059       /* we've got an exception to raise, so let's pass it to the
3060        * handler in this frame.
3061        */
3062       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3063       TICK_ALLOC_UPD_PAP(3,0);
3064       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3065               
3066       ap->n_args = 2;
3067       ap->fun = cf->handler;    /* :: Exception -> IO a */
3068       ap->payload[0] = exception;
3069       ap->payload[1] = ARG_TAG(0); /* realworld token */
3070
3071       /* throw away the stack from Sp up to and including the
3072        * CATCH_FRAME.
3073        */
3074       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3075       tso->su = cf->link;
3076
3077       /* Restore the blocked/unblocked state for asynchronous exceptions
3078        * at the CATCH_FRAME.  
3079        *
3080        * If exceptions were unblocked at the catch, arrange that they
3081        * are unblocked again after executing the handler by pushing an
3082        * unblockAsyncExceptions_ret stack frame.
3083        */
3084       if (!cf->exceptions_blocked) {
3085         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3086       }
3087       
3088       /* Ensure that async exceptions are blocked when running the handler.
3089        */
3090       if (tso->blocked_exceptions == NULL) {
3091         tso->blocked_exceptions = END_TSO_QUEUE;
3092       }
3093       
3094       /* Put the newly-built PAP on top of the stack, ready to execute
3095        * when the thread restarts.
3096        */
3097       sp[0] = (W_)ap;
3098       tso->sp = sp;
3099       tso->what_next = ThreadEnterGHC;
3100       IF_DEBUG(sanity, checkTSO(tso));
3101       return;
3102     }
3103
3104     /* First build an AP_UPD consisting of the stack chunk above the
3105      * current update frame, with the top word on the stack as the
3106      * fun field.
3107      */
3108     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3109     
3110     ASSERT(words >= 0);
3111     
3112     ap->n_args = words;
3113     ap->fun    = (StgClosure *)sp[0];
3114     sp++;
3115     for(i=0; i < (nat)words; ++i) {
3116       ap->payload[i] = (StgClosure *)*sp++;
3117     }
3118     
3119     switch (get_itbl(su)->type) {
3120       
3121     case UPDATE_FRAME:
3122       {
3123         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3124         TICK_ALLOC_UP_THK(words+1,0);
3125         
3126         IF_DEBUG(scheduler,
3127                  fprintf(stderr,  "scheduler: Updating ");
3128                  printPtr((P_)su->updatee); 
3129                  fprintf(stderr,  " with ");
3130                  printObj((StgClosure *)ap);
3131                  );
3132         
3133         /* Replace the updatee with an indirection - happily
3134          * this will also wake up any threads currently
3135          * waiting on the result.
3136          *
3137          * Warning: if we're in a loop, more than one update frame on
3138          * the stack may point to the same object.  Be careful not to
3139          * overwrite an IND_OLDGEN in this case, because we'll screw
3140          * up the mutable lists.  To be on the safe side, don't
3141          * overwrite any kind of indirection at all.  See also
3142          * threadSqueezeStack in GC.c, where we have to make a similar
3143          * check.
3144          */
3145         if (!closure_IND(su->updatee)) {
3146             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3147         }
3148         su = su->link;
3149         sp += sizeofW(StgUpdateFrame) -1;
3150         sp[0] = (W_)ap; /* push onto stack */
3151         break;
3152       }
3153
3154     case CATCH_FRAME:
3155       {
3156         StgCatchFrame *cf = (StgCatchFrame *)su;
3157         StgClosure* o;
3158         
3159         /* We want a PAP, not an AP_UPD.  Fortunately, the
3160          * layout's the same.
3161          */
3162         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3163         TICK_ALLOC_UPD_PAP(words+1,0);
3164         
3165         /* now build o = FUN(catch,ap,handler) */
3166         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3167         TICK_ALLOC_FUN(2,0);
3168         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3169         o->payload[0] = (StgClosure *)ap;
3170         o->payload[1] = cf->handler;
3171         
3172         IF_DEBUG(scheduler,
3173                  fprintf(stderr,  "scheduler: Built ");
3174                  printObj((StgClosure *)o);
3175                  );
3176         
3177         /* pop the old handler and put o on the stack */
3178         su = cf->link;
3179         sp += sizeofW(StgCatchFrame) - 1;
3180         sp[0] = (W_)o;
3181         break;
3182       }
3183       
3184     case SEQ_FRAME:
3185       {
3186         StgSeqFrame *sf = (StgSeqFrame *)su;
3187         StgClosure* o;
3188         
3189         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3190         TICK_ALLOC_UPD_PAP(words+1,0);
3191         
3192         /* now build o = FUN(seq,ap) */
3193         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3194         TICK_ALLOC_SE_THK(1,0);
3195         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3196         o->payload[0] = (StgClosure *)ap;
3197         
3198         IF_DEBUG(scheduler,
3199                  fprintf(stderr,  "scheduler: Built ");
3200                  printObj((StgClosure *)o);
3201                  );
3202         
3203         /* pop the old handler and put o on the stack */
3204         su = sf->link;
3205         sp += sizeofW(StgSeqFrame) - 1;
3206         sp[0] = (W_)o;
3207         break;
3208       }
3209       
3210     case STOP_FRAME:
3211       /* We've stripped the entire stack, the thread is now dead. */
3212       sp += sizeofW(StgStopFrame) - 1;
3213       sp[0] = (W_)exception;    /* save the exception */
3214       tso->what_next = ThreadKilled;
3215       tso->su = (StgUpdateFrame *)(sp+1);
3216       tso->sp = sp;
3217       return;
3218
3219     default:
3220       barf("raiseAsync");
3221     }
3222   }
3223   barf("raiseAsync");
3224 }
3225
3226 /* -----------------------------------------------------------------------------
3227    resurrectThreads is called after garbage collection on the list of
3228    threads found to be garbage.  Each of these threads will be woken
3229    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3230    on an MVar, or NonTermination if the thread was blocked on a Black
3231    Hole.
3232    -------------------------------------------------------------------------- */
3233
3234 void
3235 resurrectThreads( StgTSO *threads )
3236 {
3237   StgTSO *tso, *next;
3238
3239   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3240     next = tso->global_link;
3241     tso->global_link = all_threads;
3242     all_threads = tso;
3243     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3244
3245     switch (tso->why_blocked) {
3246     case BlockedOnMVar:
3247     case BlockedOnException:
3248       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3249       break;
3250     case BlockedOnBlackHole:
3251       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3252       break;
3253     case NotBlocked:
3254       /* This might happen if the thread was blocked on a black hole
3255        * belonging to a thread that we've just woken up (raiseAsync
3256        * can wake up threads, remember...).
3257        */
3258       continue;
3259     default:
3260       barf("resurrectThreads: thread blocked in a strange way");
3261     }
3262   }
3263 }
3264
3265 /* -----------------------------------------------------------------------------
3266  * Blackhole detection: if we reach a deadlock, test whether any
3267  * threads are blocked on themselves.  Any threads which are found to
3268  * be self-blocked get sent a NonTermination exception.
3269  *
3270  * This is only done in a deadlock situation in order to avoid
3271  * performance overhead in the normal case.
3272  * -------------------------------------------------------------------------- */
3273
3274 static void
3275 detectBlackHoles( void )
3276 {
3277     StgTSO *t = all_threads;
3278     StgUpdateFrame *frame;
3279     StgClosure *blocked_on;
3280
3281     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3282
3283         while (t->what_next == ThreadRelocated) {
3284             t = t->link;
3285             ASSERT(get_itbl(t)->type == TSO);
3286         }
3287       
3288         if (t->why_blocked != BlockedOnBlackHole) {
3289             continue;
3290         }
3291
3292         blocked_on = t->block_info.closure;
3293
3294         for (frame = t->su; ; frame = frame->link) {
3295             switch (get_itbl(frame)->type) {
3296
3297             case UPDATE_FRAME:
3298                 if (frame->updatee == blocked_on) {
3299                     /* We are blocking on one of our own computations, so
3300                      * send this thread the NonTermination exception.  
3301                      */
3302                     IF_DEBUG(scheduler, 
3303                              sched_belch("thread %d is blocked on itself", t->id));
3304                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3305                     goto done;
3306                 }
3307                 else {
3308                     continue;
3309                 }
3310
3311             case CATCH_FRAME:
3312             case SEQ_FRAME:
3313                 continue;
3314                 
3315             case STOP_FRAME:
3316                 break;
3317             }
3318             break;
3319         }
3320
3321     done: ;
3322     }   
3323 }
3324
3325 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3326 //@subsection Debugging Routines
3327
3328 /* -----------------------------------------------------------------------------
3329    Debugging: why is a thread blocked
3330    -------------------------------------------------------------------------- */
3331
3332 #ifdef DEBUG
3333
3334 void
3335 printThreadBlockage(StgTSO *tso)
3336 {
3337   switch (tso->why_blocked) {
3338   case BlockedOnRead:
3339     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3340     break;
3341   case BlockedOnWrite:
3342     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3343     break;
3344   case BlockedOnDelay:
3345     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3346     break;
3347   case BlockedOnMVar:
3348     fprintf(stderr,"is blocked on an MVar");
3349     break;
3350   case BlockedOnException:
3351     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3352             tso->block_info.tso->id);
3353     break;
3354   case BlockedOnBlackHole:
3355     fprintf(stderr,"is blocked on a black hole");
3356     break;
3357   case NotBlocked:
3358     fprintf(stderr,"is not blocked");
3359     break;
3360 #if defined(PAR)
3361   case BlockedOnGA:
3362     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3363             tso->block_info.closure, info_type(tso->block_info.closure));
3364     break;
3365   case BlockedOnGA_NoSend:
3366     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3367             tso->block_info.closure, info_type(tso->block_info.closure));
3368     break;
3369 #endif
3370   default:
3371     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3372          tso->why_blocked, tso->id, tso);
3373   }
3374 }
3375
3376 void
3377 printThreadStatus(StgTSO *tso)
3378 {
3379   switch (tso->what_next) {
3380   case ThreadKilled:
3381     fprintf(stderr,"has been killed");
3382     break;
3383   case ThreadComplete:
3384     fprintf(stderr,"has completed");
3385     break;
3386   default:
3387     printThreadBlockage(tso);
3388   }
3389 }
3390
3391 void
3392 printAllThreads(void)
3393 {
3394   StgTSO *t;
3395
3396 # if defined(GRAN)
3397   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3398   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3399                        time_string, rtsFalse/*no commas!*/);
3400
3401   sched_belch("all threads at [%s]:", time_string);
3402 # elif defined(PAR)
3403   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3404   ullong_format_string(CURRENT_TIME,
3405                        time_string, rtsFalse/*no commas!*/);
3406
3407   sched_belch("all threads at [%s]:", time_string);
3408 # else
3409   sched_belch("all threads:");
3410 # endif
3411
3412   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3413     fprintf(stderr, "\tthread %d ", t->id);
3414     printThreadStatus(t);
3415     fprintf(stderr,"\n");
3416   }
3417 }
3418     
3419 /* 
3420    Print a whole blocking queue attached to node (debugging only).
3421 */
3422 //@cindex print_bq
3423 # if defined(PAR)
3424 void 
3425 print_bq (StgClosure *node)
3426 {
3427   StgBlockingQueueElement *bqe;
3428   StgTSO *tso;
3429   rtsBool end;
3430
3431   fprintf(stderr,"## BQ of closure %p (%s): ",
3432           node, info_type(node));
3433
3434   /* should cover all closures that may have a blocking queue */
3435   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3436          get_itbl(node)->type == FETCH_ME_BQ ||
3437          get_itbl(node)->type == RBH ||
3438          get_itbl(node)->type == MVAR);
3439     
3440   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3441
3442   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3443 }
3444
3445 /* 
3446    Print a whole blocking queue starting with the element bqe.
3447 */
3448 void 
3449 print_bqe (StgBlockingQueueElement *bqe)
3450 {
3451   rtsBool end;
3452
3453   /* 
3454      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3455   */
3456   for (end = (bqe==END_BQ_QUEUE);
3457        !end; // iterate until bqe points to a CONSTR
3458        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3459        bqe = end ? END_BQ_QUEUE : bqe->link) {
3460     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3461     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3462     /* types of closures that may appear in a blocking queue */
3463     ASSERT(get_itbl(bqe)->type == TSO ||           
3464            get_itbl(bqe)->type == BLOCKED_FETCH || 
3465            get_itbl(bqe)->type == CONSTR); 
3466     /* only BQs of an RBH end with an RBH_Save closure */
3467     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3468
3469     switch (get_itbl(bqe)->type) {
3470     case TSO:
3471       fprintf(stderr," TSO %u (%x),",
3472               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3473       break;
3474     case BLOCKED_FETCH:
3475       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3476               ((StgBlockedFetch *)bqe)->node, 
3477               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3478               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3479               ((StgBlockedFetch *)bqe)->ga.weight);
3480       break;
3481     case CONSTR:
3482       fprintf(stderr," %s (IP %p),",
3483               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3484                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3485                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3486                "RBH_Save_?"), get_itbl(bqe));
3487       break;
3488     default:
3489       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3490            info_type((StgClosure *)bqe)); // , node, info_type(node));
3491       break;
3492     }
3493   } /* for */
3494   fputc('\n', stderr);
3495 }
3496 # elif defined(GRAN)
3497 void 
3498 print_bq (StgClosure *node)
3499 {
3500   StgBlockingQueueElement *bqe;
3501   PEs node_loc, tso_loc;
3502   rtsBool end;
3503
3504   /* should cover all closures that may have a blocking queue */
3505   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3506          get_itbl(node)->type == FETCH_ME_BQ ||
3507          get_itbl(node)->type == RBH);
3508     
3509   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3510   node_loc = where_is(node);
3511
3512   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3513           node, info_type(node), node_loc);
3514
3515   /* 
3516      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3517   */
3518   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3519        !end; // iterate until bqe points to a CONSTR
3520        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3521     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3522     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3523     /* types of closures that may appear in a blocking queue */
3524     ASSERT(get_itbl(bqe)->type == TSO ||           
3525            get_itbl(bqe)->type == CONSTR); 
3526     /* only BQs of an RBH end with an RBH_Save closure */
3527     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3528
3529     tso_loc = where_is((StgClosure *)bqe);
3530     switch (get_itbl(bqe)->type) {
3531     case TSO:
3532       fprintf(stderr," TSO %d (%p) on [PE %d],",
3533               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3534       break;
3535     case CONSTR:
3536       fprintf(stderr," %s (IP %p),",
3537               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3538                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3539                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3540                "RBH_Save_?"), get_itbl(bqe));
3541       break;
3542     default:
3543       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3544            info_type((StgClosure *)bqe), node, info_type(node));
3545       break;
3546     }
3547   } /* for */
3548   fputc('\n', stderr);
3549 }
3550 #else
3551 /* 
3552    Nice and easy: only TSOs on the blocking queue
3553 */
3554 void 
3555 print_bq (StgClosure *node)
3556 {
3557   StgTSO *tso;
3558
3559   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3560   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3561        tso != END_TSO_QUEUE; 
3562        tso=tso->link) {
3563     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3564     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3565     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3566   }
3567   fputc('\n', stderr);
3568 }
3569 # endif
3570
3571 #if defined(PAR)
3572 static nat
3573 run_queue_len(void)
3574 {
3575   nat i;
3576   StgTSO *tso;
3577
3578   for (i=0, tso=run_queue_hd; 
3579        tso != END_TSO_QUEUE;
3580        i++, tso=tso->link)
3581     /* nothing */
3582
3583   return i;
3584 }
3585 #endif
3586
3587 static void
3588 sched_belch(char *s, ...)
3589 {
3590   va_list ap;
3591   va_start(ap,s);
3592 #ifdef SMP
3593   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3594 #elif defined(PAR)
3595   fprintf(stderr, "== ");
3596 #else
3597   fprintf(stderr, "scheduler: ");
3598 #endif
3599   vfprintf(stderr, s, ap);
3600   fprintf(stderr, "\n");
3601 }
3602
3603 #endif /* DEBUG */
3604
3605
3606 //@node Index,  , Debugging Routines, Main scheduling code
3607 //@subsection Index
3608
3609 //@index
3610 //* MainRegTable::  @cindex\s-+MainRegTable
3611 //* StgMainThread::  @cindex\s-+StgMainThread
3612 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3613 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3614 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3615 //* context_switch::  @cindex\s-+context_switch
3616 //* createThread::  @cindex\s-+createThread
3617 //* free_capabilities::  @cindex\s-+free_capabilities
3618 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3619 //* initScheduler::  @cindex\s-+initScheduler
3620 //* interrupted::  @cindex\s-+interrupted
3621 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3622 //* next_thread_id::  @cindex\s-+next_thread_id
3623 //* print_bq::  @cindex\s-+print_bq
3624 //* run_queue_hd::  @cindex\s-+run_queue_hd
3625 //* run_queue_tl::  @cindex\s-+run_queue_tl
3626 //* sched_mutex::  @cindex\s-+sched_mutex
3627 //* schedule::  @cindex\s-+schedule
3628 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3629 //* task_ids::  @cindex\s-+task_ids
3630 //* term_mutex::  @cindex\s-+term_mutex
3631 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3632 //@end index