[project @ 2001-03-23 16:36:20 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.95 2001/03/23 16:36:21 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "Rts.h"
78 #include "SchedAPI.h"
79 #include "RtsUtils.h"
80 #include "RtsFlags.h"
81 #include "Storage.h"
82 #include "StgRun.h"
83 #include "StgStartup.h"
84 #include "GC.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(void);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           *prev = m->link;
449           if (was_interrupted) {
450             m->stat = Interrupted;
451           } else {
452             m->stat = Killed;
453           }
454           pthread_cond_broadcast(&m->wakeup);
455           break;
456         default:
457           break;
458         }
459       }
460     }
461
462 #else
463 # if defined(PAR)
464     /* in GUM do this only on the Main PE */
465     if (IAmMainThread)
466 # endif
467     /* If our main thread has finished or been killed, return.
468      */
469     {
470       StgMainThread *m = main_threads;
471       if (m->tso->what_next == ThreadComplete
472           || m->tso->what_next == ThreadKilled) {
473         main_threads = main_threads->link;
474         if (m->tso->what_next == ThreadComplete) {
475           /* we finished successfully, fill in the return value */
476           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
477           m->stat = Success;
478           return;
479         } else {
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif /* SMP */
529
530     /* Check whether any waiting threads need to be woken up.  If the
531      * run queue is empty, and there are no other tasks running, we
532      * can wait indefinitely for something to happen.
533      * ToDo: what if another client comes along & requests another
534      * main thread?
535      */
536     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
537       awaitEvent(
538            (run_queue_hd == END_TSO_QUEUE)
539 #ifdef SMP
540         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
541 #endif
542         );
543     }
544     /* we can be interrupted while waiting for I/O... */
545     if (interrupted) continue;
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       start_signal_handlers();
551     }
552 #endif
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifdef SMP
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569         && (n_free_capabilities == RtsFlags.ParFlags.nNodes))
570     {
571         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
572         detectBlackHoles();
573         if (run_queue_hd == END_TSO_QUEUE) {
574             StgMainThread *m;
575             for (m = main_threads; m != NULL; m = m->link) {
576                 m->ret = NULL;
577                 m->stat = Deadlock;
578                 pthread_cond_broadcast(&m->wakeup);
579             }
580             main_threads = NULL;
581         }
582     }
583 #elif defined(PAR)
584     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
585 #else /* ! SMP */
586     if (blocked_queue_hd == END_TSO_QUEUE
587         && run_queue_hd == END_TSO_QUEUE
588         && sleeping_queue == END_TSO_QUEUE)
589     {
590         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
591         detectBlackHoles();
592         if (run_queue_hd == END_TSO_QUEUE) {
593             StgMainThread *m = main_threads;
594             m->ret = NULL;
595             m->stat = Deadlock;
596             main_threads = m->link;
597             return;
598         }
599     }
600 #endif
601
602 #ifdef SMP
603     /* If there's a GC pending, don't do anything until it has
604      * completed.
605      */
606     if (ready_to_gc) {
607       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
608       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
609     }
610     
611     /* block until we've got a thread on the run queue and a free
612      * capability.
613      */
614     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
615       IF_DEBUG(scheduler, sched_belch("waiting for work"));
616       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
617       IF_DEBUG(scheduler, sched_belch("work now available"));
618     }
619 #endif
620
621 #if defined(GRAN)
622
623     if (RtsFlags.GranFlags.Light)
624       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
625
626     /* adjust time based on time-stamp */
627     if (event->time > CurrentTime[CurrentProc] &&
628         event->evttype != ContinueThread)
629       CurrentTime[CurrentProc] = event->time;
630     
631     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
632     if (!RtsFlags.GranFlags.Light)
633       handleIdlePEs();
634
635     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
636
637     /* main event dispatcher in GranSim */
638     switch (event->evttype) {
639       /* Should just be continuing execution */
640     case ContinueThread:
641       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
642       /* ToDo: check assertion
643       ASSERT(run_queue_hd != (StgTSO*)NULL &&
644              run_queue_hd != END_TSO_QUEUE);
645       */
646       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
647       if (!RtsFlags.GranFlags.DoAsyncFetch &&
648           procStatus[CurrentProc]==Fetching) {
649         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
650               CurrentTSO->id, CurrentTSO, CurrentProc);
651         goto next_thread;
652       } 
653       /* Ignore ContinueThreads for completed threads */
654       if (CurrentTSO->what_next == ThreadComplete) {
655         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
656               CurrentTSO->id, CurrentTSO, CurrentProc);
657         goto next_thread;
658       } 
659       /* Ignore ContinueThreads for threads that are being migrated */
660       if (PROCS(CurrentTSO)==Nowhere) { 
661         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
662               CurrentTSO->id, CurrentTSO, CurrentProc);
663         goto next_thread;
664       }
665       /* The thread should be at the beginning of the run queue */
666       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
667         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
668               CurrentTSO->id, CurrentTSO, CurrentProc);
669         break; // run the thread anyway
670       }
671       /*
672       new_event(proc, proc, CurrentTime[proc],
673                 FindWork,
674                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
675       goto next_thread; 
676       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
677       break; // now actually run the thread; DaH Qu'vam yImuHbej 
678
679     case FetchNode:
680       do_the_fetchnode(event);
681       goto next_thread;             /* handle next event in event queue  */
682       
683     case GlobalBlock:
684       do_the_globalblock(event);
685       goto next_thread;             /* handle next event in event queue  */
686       
687     case FetchReply:
688       do_the_fetchreply(event);
689       goto next_thread;             /* handle next event in event queue  */
690       
691     case UnblockThread:   /* Move from the blocked queue to the tail of */
692       do_the_unblock(event);
693       goto next_thread;             /* handle next event in event queue  */
694       
695     case ResumeThread:  /* Move from the blocked queue to the tail of */
696       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
697       event->tso->gran.blocktime += 
698         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
699       do_the_startthread(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case StartThread:
703       do_the_startthread(event);
704       goto next_thread;             /* handle next event in event queue  */
705       
706     case MoveThread:
707       do_the_movethread(event);
708       goto next_thread;             /* handle next event in event queue  */
709       
710     case MoveSpark:
711       do_the_movespark(event);
712       goto next_thread;             /* handle next event in event queue  */
713       
714     case FindWork:
715       do_the_findwork(event);
716       goto next_thread;             /* handle next event in event queue  */
717       
718     default:
719       barf("Illegal event type %u\n", event->evttype);
720     }  /* switch */
721     
722     /* This point was scheduler_loop in the old RTS */
723
724     IF_DEBUG(gran, belch("GRAN: after main switch"));
725
726     TimeOfLastEvent = CurrentTime[CurrentProc];
727     TimeOfNextEvent = get_time_of_next_event();
728     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
729     // CurrentTSO = ThreadQueueHd;
730
731     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
732                          TimeOfNextEvent));
733
734     if (RtsFlags.GranFlags.Light) 
735       GranSimLight_leave_system(event, &ActiveTSO); 
736
737     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
738
739     IF_DEBUG(gran, 
740              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
741
742     /* in a GranSim setup the TSO stays on the run queue */
743     t = CurrentTSO;
744     /* Take a thread from the run queue. */
745     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
746
747     IF_DEBUG(gran, 
748              fprintf(stderr, "GRAN: About to run current thread, which is\n");
749              G_TSO(t,5));
750
751     context_switch = 0; // turned on via GranYield, checking events and time slice
752
753     IF_DEBUG(gran, 
754              DumpGranEvent(GR_SCHEDULE, t));
755
756     procStatus[CurrentProc] = Busy;
757
758 #elif defined(PAR)
759     if (PendingFetches != END_BF_QUEUE) {
760         processFetches();
761     }
762
763     /* ToDo: phps merge with spark activation above */
764     /* check whether we have local work and send requests if we have none */
765     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
766       /* :-[  no local threads => look out for local sparks */
767       /* the spark pool for the current PE */
768       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
769       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
770           pool->hd < pool->tl) {
771         /* 
772          * ToDo: add GC code check that we really have enough heap afterwards!!
773          * Old comment:
774          * If we're here (no runnable threads) and we have pending
775          * sparks, we must have a space problem.  Get enough space
776          * to turn one of those pending sparks into a
777          * thread... 
778          */
779
780         spark = findSpark(rtsFalse);                /* get a spark */
781         if (spark != (rtsSpark) NULL) {
782           tso = activateSpark(spark);       /* turn the spark into a thread */
783           IF_PAR_DEBUG(schedule,
784                        belch("==== schedule: Created TSO %d (%p); %d threads active",
785                              tso->id, tso, advisory_thread_count));
786
787           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
788             belch("==^^ failed to activate spark");
789             goto next_thread;
790           }               /* otherwise fall through & pick-up new tso */
791         } else {
792           IF_PAR_DEBUG(verbose,
793                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
794                              spark_queue_len(pool)));
795           goto next_thread;
796         }
797       }
798
799       /* If we still have no work we need to send a FISH to get a spark
800          from another PE 
801       */
802       if (EMPTY_RUN_QUEUE()) {
803       /* =8-[  no local sparks => look for work on other PEs */
804         /*
805          * We really have absolutely no work.  Send out a fish
806          * (there may be some out there already), and wait for
807          * something to arrive.  We clearly can't run any threads
808          * until a SCHEDULE or RESUME arrives, and so that's what
809          * we're hoping to see.  (Of course, we still have to
810          * respond to other types of messages.)
811          */
812         TIME now = msTime() /*CURRENT_TIME*/;
813         IF_PAR_DEBUG(verbose, 
814                      belch("--  now=%ld", now));
815         IF_PAR_DEBUG(verbose,
816                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
817                          (last_fish_arrived_at!=0 &&
818                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
819                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
820                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
821                              last_fish_arrived_at,
822                              RtsFlags.ParFlags.fishDelay, now);
823                      });
824         
825         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
826             (last_fish_arrived_at==0 ||
827              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
828           /* outstandingFishes is set in sendFish, processFish;
829              avoid flooding system with fishes via delay */
830           pe = choosePE();
831           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
832                    NEW_FISH_HUNGER);
833
834           // Global statistics: count no. of fishes
835           if (RtsFlags.ParFlags.ParStats.Global &&
836               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
837             globalParStats.tot_fish_mess++;
838           }
839         }
840       
841         receivedFinish = processMessages();
842         goto next_thread;
843       }
844     } else if (PacketsWaiting()) {  /* Look for incoming messages */
845       receivedFinish = processMessages();
846     }
847
848     /* Now we are sure that we have some work available */
849     ASSERT(run_queue_hd != END_TSO_QUEUE);
850
851     /* Take a thread from the run queue, if we have work */
852     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
853     IF_DEBUG(sanity,checkTSO(t));
854
855     /* ToDo: write something to the log-file
856     if (RTSflags.ParFlags.granSimStats && !sameThread)
857         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
858
859     CurrentTSO = t;
860     */
861     /* the spark pool for the current PE */
862     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
863
864     IF_DEBUG(scheduler, 
865              belch("--=^ %d threads, %d sparks on [%#x]", 
866                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
867
868 #if 1
869     if (0 && RtsFlags.ParFlags.ParStats.Full && 
870         t && LastTSO && t->id != LastTSO->id && 
871         LastTSO->why_blocked == NotBlocked && 
872         LastTSO->what_next != ThreadComplete) {
873       // if previously scheduled TSO not blocked we have to record the context switch
874       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
875                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
876     }
877
878     if (RtsFlags.ParFlags.ParStats.Full && 
879         (emitSchedule /* forced emit */ ||
880         (t && LastTSO && t->id != LastTSO->id))) {
881       /* 
882          we are running a different TSO, so write a schedule event to log file
883          NB: If we use fair scheduling we also have to write  a deschedule 
884              event for LastTSO; with unfair scheduling we know that the
885              previous tso has blocked whenever we switch to another tso, so
886              we don't need it in GUM for now
887       */
888       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
889                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
890       emitSchedule = rtsFalse;
891     }
892      
893 #endif
894 #else /* !GRAN && !PAR */
895   
896     /* grab a thread from the run queue
897      */
898     ASSERT(run_queue_hd != END_TSO_QUEUE);
899     t = POP_RUN_QUEUE();
900     IF_DEBUG(sanity,checkTSO(t));
901
902 #endif
903     
904     /* grab a capability
905      */
906 #ifdef SMP
907     cap = free_capabilities;
908     free_capabilities = cap->link;
909     n_free_capabilities--;
910 #else
911     cap = &MainRegTable;
912 #endif
913     
914     cap->rCurrentTSO = t;
915     
916     /* context switches are now initiated by the timer signal, unless
917      * the user specified "context switch as often as possible", with
918      * +RTS -C0
919      */
920     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
921         && (run_queue_hd != END_TSO_QUEUE
922             || blocked_queue_hd != END_TSO_QUEUE
923             || sleeping_queue != END_TSO_QUEUE))
924         context_switch = 1;
925     else
926         context_switch = 0;
927
928     RELEASE_LOCK(&sched_mutex);
929
930     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
931                               t->id, t, whatNext_strs[t->what_next]));
932
933     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
934     /* Run the current thread 
935      */
936     switch (cap->rCurrentTSO->what_next) {
937     case ThreadKilled:
938     case ThreadComplete:
939         /* Thread already finished, return to scheduler. */
940         ret = ThreadFinished;
941         break;
942     case ThreadEnterGHC:
943         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
944         break;
945     case ThreadRunGHC:
946         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
947         break;
948     case ThreadEnterInterp:
949         ret = interpretBCO(cap);
950         break;
951     default:
952       barf("schedule: invalid what_next field");
953     }
954     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
955     
956     /* Costs for the scheduler are assigned to CCS_SYSTEM */
957 #ifdef PROFILING
958     CCCS = CCS_SYSTEM;
959 #endif
960     
961     ACQUIRE_LOCK(&sched_mutex);
962
963 #ifdef SMP
964     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
965 #elif !defined(GRAN) && !defined(PAR)
966     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
967 #endif
968     t = cap->rCurrentTSO;
969     
970 #if defined(PAR)
971     /* HACK 675: if the last thread didn't yield, make sure to print a 
972        SCHEDULE event to the log file when StgRunning the next thread, even
973        if it is the same one as before */
974     LastTSO = t; 
975     TimeOfLastYield = CURRENT_TIME;
976 #endif
977
978     switch (ret) {
979     case HeapOverflow:
980 #if defined(GRAN)
981       IF_DEBUG(gran, 
982                DumpGranEvent(GR_DESCHEDULE, t));
983       globalGranStats.tot_heapover++;
984 #elif defined(PAR)
985       // IF_DEBUG(par, 
986       //DumpGranEvent(GR_DESCHEDULE, t);
987       globalParStats.tot_heapover++;
988 #endif
989       /* make all the running tasks block on a condition variable,
990        * maybe set context_switch and wait till they all pile in,
991        * then have them wait on a GC condition variable.
992        */
993       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
994                                t->id, t, whatNext_strs[t->what_next]));
995       threadPaused(t);
996 #if defined(GRAN)
997       ASSERT(!is_on_queue(t,CurrentProc));
998 #elif defined(PAR)
999       /* Currently we emit a DESCHEDULE event before GC in GUM.
1000          ToDo: either add separate event to distinguish SYSTEM time from rest
1001                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1002       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1003         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1004                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1005         emitSchedule = rtsTrue;
1006       }
1007 #endif
1008       
1009       ready_to_gc = rtsTrue;
1010       context_switch = 1;               /* stop other threads ASAP */
1011       PUSH_ON_RUN_QUEUE(t);
1012       /* actual GC is done at the end of the while loop */
1013       break;
1014       
1015     case StackOverflow:
1016 #if defined(GRAN)
1017       IF_DEBUG(gran, 
1018                DumpGranEvent(GR_DESCHEDULE, t));
1019       globalGranStats.tot_stackover++;
1020 #elif defined(PAR)
1021       // IF_DEBUG(par, 
1022       // DumpGranEvent(GR_DESCHEDULE, t);
1023       globalParStats.tot_stackover++;
1024 #endif
1025       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1026                                t->id, t, whatNext_strs[t->what_next]));
1027       /* just adjust the stack for this thread, then pop it back
1028        * on the run queue.
1029        */
1030       threadPaused(t);
1031       { 
1032         StgMainThread *m;
1033         /* enlarge the stack */
1034         StgTSO *new_t = threadStackOverflow(t);
1035         
1036         /* This TSO has moved, so update any pointers to it from the
1037          * main thread stack.  It better not be on any other queues...
1038          * (it shouldn't be).
1039          */
1040         for (m = main_threads; m != NULL; m = m->link) {
1041           if (m->tso == t) {
1042             m->tso = new_t;
1043           }
1044         }
1045         threadPaused(new_t);
1046         PUSH_ON_RUN_QUEUE(new_t);
1047       }
1048       break;
1049
1050     case ThreadYielding:
1051 #if defined(GRAN)
1052       IF_DEBUG(gran, 
1053                DumpGranEvent(GR_DESCHEDULE, t));
1054       globalGranStats.tot_yields++;
1055 #elif defined(PAR)
1056       // IF_DEBUG(par, 
1057       // DumpGranEvent(GR_DESCHEDULE, t);
1058       globalParStats.tot_yields++;
1059 #endif
1060       /* put the thread back on the run queue.  Then, if we're ready to
1061        * GC, check whether this is the last task to stop.  If so, wake
1062        * up the GC thread.  getThread will block during a GC until the
1063        * GC is finished.
1064        */
1065       IF_DEBUG(scheduler,
1066                if (t->what_next == ThreadEnterInterp) {
1067                    /* ToDo: or maybe a timer expired when we were in Hugs?
1068                     * or maybe someone hit ctrl-C
1069                     */
1070                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1071                          t->id, t, whatNext_strs[t->what_next]);
1072                } else {
1073                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1074                          t->id, t, whatNext_strs[t->what_next]);
1075                }
1076                );
1077
1078       threadPaused(t);
1079
1080       IF_DEBUG(sanity,
1081                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1082                checkTSO(t));
1083       ASSERT(t->link == END_TSO_QUEUE);
1084 #if defined(GRAN)
1085       ASSERT(!is_on_queue(t,CurrentProc));
1086
1087       IF_DEBUG(sanity,
1088                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1089                checkThreadQsSanity(rtsTrue));
1090 #endif
1091 #if defined(PAR)
1092       if (RtsFlags.ParFlags.doFairScheduling) { 
1093         /* this does round-robin scheduling; good for concurrency */
1094         APPEND_TO_RUN_QUEUE(t);
1095       } else {
1096         /* this does unfair scheduling; good for parallelism */
1097         PUSH_ON_RUN_QUEUE(t);
1098       }
1099 #else
1100       /* this does round-robin scheduling; good for concurrency */
1101       APPEND_TO_RUN_QUEUE(t);
1102 #endif
1103 #if defined(GRAN)
1104       /* add a ContinueThread event to actually process the thread */
1105       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1106                 ContinueThread,
1107                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1108       IF_GRAN_DEBUG(bq, 
1109                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1110                G_EVENTQ(0);
1111                G_CURR_THREADQ(0));
1112 #endif /* GRAN */
1113       break;
1114       
1115     case ThreadBlocked:
1116 #if defined(GRAN)
1117       IF_DEBUG(scheduler,
1118                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1119                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1120                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1121
1122       // ??? needed; should emit block before
1123       IF_DEBUG(gran, 
1124                DumpGranEvent(GR_DESCHEDULE, t)); 
1125       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1126       /*
1127         ngoq Dogh!
1128       ASSERT(procStatus[CurrentProc]==Busy || 
1129               ((procStatus[CurrentProc]==Fetching) && 
1130               (t->block_info.closure!=(StgClosure*)NULL)));
1131       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1132           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1133             procStatus[CurrentProc]==Fetching)) 
1134         procStatus[CurrentProc] = Idle;
1135       */
1136 #elif defined(PAR)
1137       IF_DEBUG(scheduler,
1138                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1139                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1140       IF_PAR_DEBUG(bq,
1141
1142                    if (t->block_info.closure!=(StgClosure*)NULL) 
1143                      print_bq(t->block_info.closure));
1144
1145       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1146       blockThread(t);
1147
1148       /* whatever we schedule next, we must log that schedule */
1149       emitSchedule = rtsTrue;
1150
1151 #else /* !GRAN */
1152       /* don't need to do anything.  Either the thread is blocked on
1153        * I/O, in which case we'll have called addToBlockedQueue
1154        * previously, or it's blocked on an MVar or Blackhole, in which
1155        * case it'll be on the relevant queue already.
1156        */
1157       IF_DEBUG(scheduler,
1158                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1159                printThreadBlockage(t);
1160                fprintf(stderr, "\n"));
1161
1162       /* Only for dumping event to log file 
1163          ToDo: do I need this in GranSim, too?
1164       blockThread(t);
1165       */
1166 #endif
1167       threadPaused(t);
1168       break;
1169       
1170     case ThreadFinished:
1171       /* Need to check whether this was a main thread, and if so, signal
1172        * the task that started it with the return value.  If we have no
1173        * more main threads, we probably need to stop all the tasks until
1174        * we get a new one.
1175        */
1176       /* We also end up here if the thread kills itself with an
1177        * uncaught exception, see Exception.hc.
1178        */
1179       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1180 #if defined(GRAN)
1181       endThread(t, CurrentProc); // clean-up the thread
1182 #elif defined(PAR)
1183       /* For now all are advisory -- HWL */
1184       //if(t->priority==AdvisoryPriority) ??
1185       advisory_thread_count--;
1186       
1187 # ifdef DIST
1188       if(t->dist.priority==RevalPriority)
1189         FinishReval(t);
1190 # endif
1191       
1192       if (RtsFlags.ParFlags.ParStats.Full &&
1193           !RtsFlags.ParFlags.ParStats.Suppressed) 
1194         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1195 #endif
1196       break;
1197       
1198     default:
1199       barf("schedule: invalid thread return code %d", (int)ret);
1200     }
1201     
1202 #ifdef SMP
1203     cap->link = free_capabilities;
1204     free_capabilities = cap;
1205     n_free_capabilities++;
1206 #endif
1207
1208 #ifdef SMP
1209     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1210 #else
1211     if (ready_to_gc) 
1212 #endif
1213       {
1214       /* everybody back, start the GC.
1215        * Could do it in this thread, or signal a condition var
1216        * to do it in another thread.  Either way, we need to
1217        * broadcast on gc_pending_cond afterward.
1218        */
1219 #ifdef SMP
1220       IF_DEBUG(scheduler,sched_belch("doing GC"));
1221 #endif
1222       GarbageCollect(GetRoots,rtsFalse);
1223       ready_to_gc = rtsFalse;
1224 #ifdef SMP
1225       pthread_cond_broadcast(&gc_pending_cond);
1226 #endif
1227 #if defined(GRAN)
1228       /* add a ContinueThread event to continue execution of current thread */
1229       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1230                 ContinueThread,
1231                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1232       IF_GRAN_DEBUG(bq, 
1233                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1234                G_EVENTQ(0);
1235                G_CURR_THREADQ(0));
1236 #endif /* GRAN */
1237     }
1238 #if defined(GRAN)
1239   next_thread:
1240     IF_GRAN_DEBUG(unused,
1241                   print_eventq(EventHd));
1242
1243     event = get_next_event();
1244
1245 #elif defined(PAR)
1246   next_thread:
1247     /* ToDo: wait for next message to arrive rather than busy wait */
1248
1249 #else /* GRAN */
1250   /* not any more
1251   next_thread:
1252     t = take_off_run_queue(END_TSO_QUEUE);
1253   */
1254 #endif /* GRAN */
1255   } /* end of while(1) */
1256   IF_PAR_DEBUG(verbose,
1257                belch("== Leaving schedule() after having received Finish"));
1258 }
1259
1260 /* ---------------------------------------------------------------------------
1261  * deleteAllThreads():  kill all the live threads.
1262  *
1263  * This is used when we catch a user interrupt (^C), before performing
1264  * any necessary cleanups and running finalizers.
1265  * ------------------------------------------------------------------------- */
1266    
1267 void deleteAllThreads ( void )
1268 {
1269   StgTSO* t;
1270   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1271   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1272       deleteThread(t);
1273   }
1274   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1275       deleteThread(t);
1276   }
1277   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1278       deleteThread(t);
1279   }
1280   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1281   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1282   sleeping_queue = END_TSO_QUEUE;
1283 }
1284
1285 /* startThread and  insertThread are now in GranSim.c -- HWL */
1286
1287 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1288 //@subsection Suspend and Resume
1289
1290 /* ---------------------------------------------------------------------------
1291  * Suspending & resuming Haskell threads.
1292  * 
1293  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1294  * its capability before calling the C function.  This allows another
1295  * task to pick up the capability and carry on running Haskell
1296  * threads.  It also means that if the C call blocks, it won't lock
1297  * the whole system.
1298  *
1299  * The Haskell thread making the C call is put to sleep for the
1300  * duration of the call, on the susepended_ccalling_threads queue.  We
1301  * give out a token to the task, which it can use to resume the thread
1302  * on return from the C function.
1303  * ------------------------------------------------------------------------- */
1304    
1305 StgInt
1306 suspendThread( Capability *cap )
1307 {
1308   nat tok;
1309
1310   ACQUIRE_LOCK(&sched_mutex);
1311
1312   IF_DEBUG(scheduler,
1313            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1314
1315   threadPaused(cap->rCurrentTSO);
1316   cap->rCurrentTSO->link = suspended_ccalling_threads;
1317   suspended_ccalling_threads = cap->rCurrentTSO;
1318
1319   /* Use the thread ID as the token; it should be unique */
1320   tok = cap->rCurrentTSO->id;
1321
1322 #ifdef SMP
1323   cap->link = free_capabilities;
1324   free_capabilities = cap;
1325   n_free_capabilities++;
1326 #endif
1327
1328   RELEASE_LOCK(&sched_mutex);
1329   return tok; 
1330 }
1331
1332 Capability *
1333 resumeThread( StgInt tok )
1334 {
1335   StgTSO *tso, **prev;
1336   Capability *cap;
1337
1338   ACQUIRE_LOCK(&sched_mutex);
1339
1340   prev = &suspended_ccalling_threads;
1341   for (tso = suspended_ccalling_threads; 
1342        tso != END_TSO_QUEUE; 
1343        prev = &tso->link, tso = tso->link) {
1344     if (tso->id == (StgThreadID)tok) {
1345       *prev = tso->link;
1346       break;
1347     }
1348   }
1349   if (tso == END_TSO_QUEUE) {
1350     barf("resumeThread: thread not found");
1351   }
1352   tso->link = END_TSO_QUEUE;
1353
1354 #ifdef SMP
1355   while (free_capabilities == NULL) {
1356     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1357     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1358     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1359   }
1360   cap = free_capabilities;
1361   free_capabilities = cap->link;
1362   n_free_capabilities--;
1363 #else  
1364   cap = &MainRegTable;
1365 #endif
1366
1367   cap->rCurrentTSO = tso;
1368
1369   RELEASE_LOCK(&sched_mutex);
1370   return cap;
1371 }
1372
1373
1374 /* ---------------------------------------------------------------------------
1375  * Static functions
1376  * ------------------------------------------------------------------------ */
1377 static void unblockThread(StgTSO *tso);
1378
1379 /* ---------------------------------------------------------------------------
1380  * Comparing Thread ids.
1381  *
1382  * This is used from STG land in the implementation of the
1383  * instances of Eq/Ord for ThreadIds.
1384  * ------------------------------------------------------------------------ */
1385
1386 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1387
1388   StgThreadID id1 = tso1->id; 
1389   StgThreadID id2 = tso2->id;
1390  
1391   if (id1 < id2) return (-1);
1392   if (id1 > id2) return 1;
1393   return 0;
1394 }
1395
1396 /* ---------------------------------------------------------------------------
1397    Create a new thread.
1398
1399    The new thread starts with the given stack size.  Before the
1400    scheduler can run, however, this thread needs to have a closure
1401    (and possibly some arguments) pushed on its stack.  See
1402    pushClosure() in Schedule.h.
1403
1404    createGenThread() and createIOThread() (in SchedAPI.h) are
1405    convenient packaged versions of this function.
1406
1407    currently pri (priority) is only used in a GRAN setup -- HWL
1408    ------------------------------------------------------------------------ */
1409 //@cindex createThread
1410 #if defined(GRAN)
1411 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1412 StgTSO *
1413 createThread(nat stack_size, StgInt pri)
1414 {
1415   return createThread_(stack_size, rtsFalse, pri);
1416 }
1417
1418 static StgTSO *
1419 createThread_(nat size, rtsBool have_lock, StgInt pri)
1420 {
1421 #else
1422 StgTSO *
1423 createThread(nat stack_size)
1424 {
1425   return createThread_(stack_size, rtsFalse);
1426 }
1427
1428 static StgTSO *
1429 createThread_(nat size, rtsBool have_lock)
1430 {
1431 #endif
1432
1433     StgTSO *tso;
1434     nat stack_size;
1435
1436     /* First check whether we should create a thread at all */
1437 #if defined(PAR)
1438   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1439   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1440     threadsIgnored++;
1441     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1442           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1443     return END_TSO_QUEUE;
1444   }
1445   threadsCreated++;
1446 #endif
1447
1448 #if defined(GRAN)
1449   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1450 #endif
1451
1452   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1453
1454   /* catch ridiculously small stack sizes */
1455   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1456     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1457   }
1458
1459   stack_size = size - TSO_STRUCT_SIZEW;
1460
1461   tso = (StgTSO *)allocate(size);
1462   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1463
1464   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1465 #if defined(GRAN)
1466   SET_GRAN_HDR(tso, ThisPE);
1467 #endif
1468   tso->what_next     = ThreadEnterGHC;
1469
1470   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1471    * protect the increment operation on next_thread_id.
1472    * In future, we could use an atomic increment instead.
1473    */
1474   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1475   tso->id = next_thread_id++; 
1476   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1477
1478   tso->why_blocked  = NotBlocked;
1479   tso->blocked_exceptions = NULL;
1480
1481   tso->stack_size   = stack_size;
1482   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1483                               - TSO_STRUCT_SIZEW;
1484   tso->sp           = (P_)&(tso->stack) + stack_size;
1485
1486 #ifdef PROFILING
1487   tso->prof.CCCS = CCS_MAIN;
1488 #endif
1489
1490   /* put a stop frame on the stack */
1491   tso->sp -= sizeofW(StgStopFrame);
1492   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1493   tso->su = (StgUpdateFrame*)tso->sp;
1494
1495   // ToDo: check this
1496 #if defined(GRAN)
1497   tso->link = END_TSO_QUEUE;
1498   /* uses more flexible routine in GranSim */
1499   insertThread(tso, CurrentProc);
1500 #else
1501   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1502    * from its creation
1503    */
1504 #endif
1505
1506 #if defined(GRAN) 
1507   if (RtsFlags.GranFlags.GranSimStats.Full) 
1508     DumpGranEvent(GR_START,tso);
1509 #elif defined(PAR)
1510   if (RtsFlags.ParFlags.ParStats.Full) 
1511     DumpGranEvent(GR_STARTQ,tso);
1512   /* HACk to avoid SCHEDULE 
1513      LastTSO = tso; */
1514 #endif
1515
1516   /* Link the new thread on the global thread list.
1517    */
1518   tso->global_link = all_threads;
1519   all_threads = tso;
1520
1521 #if defined(DIST)
1522   tso->dist.priority = MandatoryPriority; //by default that is...
1523 #endif
1524
1525 #if defined(GRAN)
1526   tso->gran.pri = pri;
1527 # if defined(DEBUG)
1528   tso->gran.magic = TSO_MAGIC; // debugging only
1529 # endif
1530   tso->gran.sparkname   = 0;
1531   tso->gran.startedat   = CURRENT_TIME; 
1532   tso->gran.exported    = 0;
1533   tso->gran.basicblocks = 0;
1534   tso->gran.allocs      = 0;
1535   tso->gran.exectime    = 0;
1536   tso->gran.fetchtime   = 0;
1537   tso->gran.fetchcount  = 0;
1538   tso->gran.blocktime   = 0;
1539   tso->gran.blockcount  = 0;
1540   tso->gran.blockedat   = 0;
1541   tso->gran.globalsparks = 0;
1542   tso->gran.localsparks  = 0;
1543   if (RtsFlags.GranFlags.Light)
1544     tso->gran.clock  = Now; /* local clock */
1545   else
1546     tso->gran.clock  = 0;
1547
1548   IF_DEBUG(gran,printTSO(tso));
1549 #elif defined(PAR)
1550 # if defined(DEBUG)
1551   tso->par.magic = TSO_MAGIC; // debugging only
1552 # endif
1553   tso->par.sparkname   = 0;
1554   tso->par.startedat   = CURRENT_TIME; 
1555   tso->par.exported    = 0;
1556   tso->par.basicblocks = 0;
1557   tso->par.allocs      = 0;
1558   tso->par.exectime    = 0;
1559   tso->par.fetchtime   = 0;
1560   tso->par.fetchcount  = 0;
1561   tso->par.blocktime   = 0;
1562   tso->par.blockcount  = 0;
1563   tso->par.blockedat   = 0;
1564   tso->par.globalsparks = 0;
1565   tso->par.localsparks  = 0;
1566 #endif
1567
1568 #if defined(GRAN)
1569   globalGranStats.tot_threads_created++;
1570   globalGranStats.threads_created_on_PE[CurrentProc]++;
1571   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1572   globalGranStats.tot_sq_probes++;
1573 #elif defined(PAR)
1574   // collect parallel global statistics (currently done together with GC stats)
1575   if (RtsFlags.ParFlags.ParStats.Global &&
1576       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1577     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1578     globalParStats.tot_threads_created++;
1579   }
1580 #endif 
1581
1582 #if defined(GRAN)
1583   IF_GRAN_DEBUG(pri,
1584                 belch("==__ schedule: Created TSO %d (%p);",
1585                       CurrentProc, tso, tso->id));
1586 #elif defined(PAR)
1587     IF_PAR_DEBUG(verbose,
1588                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1589                        tso->id, tso, advisory_thread_count));
1590 #else
1591   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1592                                  tso->id, tso->stack_size));
1593 #endif    
1594   return tso;
1595 }
1596
1597 #if defined(PAR)
1598 /* RFP:
1599    all parallel thread creation calls should fall through the following routine.
1600 */
1601 StgTSO *
1602 createSparkThread(rtsSpark spark) 
1603 { StgTSO *tso;
1604   ASSERT(spark != (rtsSpark)NULL);
1605   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1606   { threadsIgnored++;
1607     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1608           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1609     return END_TSO_QUEUE;
1610   }
1611   else
1612   { threadsCreated++;
1613     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1614     if (tso==END_TSO_QUEUE)     
1615       barf("createSparkThread: Cannot create TSO");
1616 #if defined(DIST)
1617     tso->priority = AdvisoryPriority;
1618 #endif
1619     pushClosure(tso,spark);
1620     PUSH_ON_RUN_QUEUE(tso);
1621     advisory_thread_count++;    
1622   }
1623   return tso;
1624 }
1625 #endif
1626
1627 /*
1628   Turn a spark into a thread.
1629   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1630 */
1631 #if defined(PAR)
1632 //@cindex activateSpark
1633 StgTSO *
1634 activateSpark (rtsSpark spark) 
1635 {
1636   StgTSO *tso;
1637
1638   tso = createSparkThread(spark);
1639   if (RtsFlags.ParFlags.ParStats.Full) {   
1640     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1641     IF_PAR_DEBUG(verbose,
1642                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1643                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1644   }
1645   // ToDo: fwd info on local/global spark to thread -- HWL
1646   // tso->gran.exported =  spark->exported;
1647   // tso->gran.locked =   !spark->global;
1648   // tso->gran.sparkname = spark->name;
1649
1650   return tso;
1651 }
1652 #endif
1653
1654 /* ---------------------------------------------------------------------------
1655  * scheduleThread()
1656  *
1657  * scheduleThread puts a thread on the head of the runnable queue.
1658  * This will usually be done immediately after a thread is created.
1659  * The caller of scheduleThread must create the thread using e.g.
1660  * createThread and push an appropriate closure
1661  * on this thread's stack before the scheduler is invoked.
1662  * ------------------------------------------------------------------------ */
1663
1664 void
1665 scheduleThread(StgTSO *tso)
1666 {
1667   if (tso==END_TSO_QUEUE){    
1668     schedule();
1669     return;
1670   }
1671
1672   ACQUIRE_LOCK(&sched_mutex);
1673
1674   /* Put the new thread on the head of the runnable queue.  The caller
1675    * better push an appropriate closure on this thread's stack
1676    * beforehand.  In the SMP case, the thread may start running as
1677    * soon as we release the scheduler lock below.
1678    */
1679   PUSH_ON_RUN_QUEUE(tso);
1680   THREAD_RUNNABLE();
1681
1682 #if 0
1683   IF_DEBUG(scheduler,printTSO(tso));
1684 #endif
1685   RELEASE_LOCK(&sched_mutex);
1686 }
1687
1688 /* ---------------------------------------------------------------------------
1689  * startTasks()
1690  *
1691  * Start up Posix threads to run each of the scheduler tasks.
1692  * I believe the task ids are not needed in the system as defined.
1693  *  KH @ 25/10/99
1694  * ------------------------------------------------------------------------ */
1695
1696 #if defined(PAR) || defined(SMP)
1697 void
1698 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1699 {
1700   scheduleThread(END_TSO_QUEUE);
1701 }
1702 #endif
1703
1704 /* ---------------------------------------------------------------------------
1705  * initScheduler()
1706  *
1707  * Initialise the scheduler.  This resets all the queues - if the
1708  * queues contained any threads, they'll be garbage collected at the
1709  * next pass.
1710  *
1711  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1712  * ------------------------------------------------------------------------ */
1713
1714 #ifdef SMP
1715 static void
1716 term_handler(int sig STG_UNUSED)
1717 {
1718   stat_workerStop();
1719   ACQUIRE_LOCK(&term_mutex);
1720   await_death--;
1721   RELEASE_LOCK(&term_mutex);
1722   pthread_exit(NULL);
1723 }
1724 #endif
1725
1726 //@cindex initScheduler
1727 void 
1728 initScheduler(void)
1729 {
1730 #if defined(GRAN)
1731   nat i;
1732
1733   for (i=0; i<=MAX_PROC; i++) {
1734     run_queue_hds[i]      = END_TSO_QUEUE;
1735     run_queue_tls[i]      = END_TSO_QUEUE;
1736     blocked_queue_hds[i]  = END_TSO_QUEUE;
1737     blocked_queue_tls[i]  = END_TSO_QUEUE;
1738     ccalling_threadss[i]  = END_TSO_QUEUE;
1739     sleeping_queue        = END_TSO_QUEUE;
1740   }
1741 #else
1742   run_queue_hd      = END_TSO_QUEUE;
1743   run_queue_tl      = END_TSO_QUEUE;
1744   blocked_queue_hd  = END_TSO_QUEUE;
1745   blocked_queue_tl  = END_TSO_QUEUE;
1746   sleeping_queue    = END_TSO_QUEUE;
1747 #endif 
1748
1749   suspended_ccalling_threads  = END_TSO_QUEUE;
1750
1751   main_threads = NULL;
1752   all_threads  = END_TSO_QUEUE;
1753
1754   context_switch = 0;
1755   interrupted    = 0;
1756
1757   RtsFlags.ConcFlags.ctxtSwitchTicks =
1758       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1759
1760   /* Install the SIGHUP handler */
1761 #ifdef SMP
1762   {
1763     struct sigaction action,oact;
1764
1765     action.sa_handler = term_handler;
1766     sigemptyset(&action.sa_mask);
1767     action.sa_flags = 0;
1768     if (sigaction(SIGTERM, &action, &oact) != 0) {
1769       barf("can't install TERM handler");
1770     }
1771   }
1772 #endif
1773
1774 #ifdef SMP
1775   /* Allocate N Capabilities */
1776   {
1777     nat i;
1778     Capability *cap, *prev;
1779     cap  = NULL;
1780     prev = NULL;
1781     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1782       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1783       cap->link = prev;
1784       prev = cap;
1785     }
1786     free_capabilities = cap;
1787     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1788   }
1789   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1790                              n_free_capabilities););
1791 #endif
1792
1793 #if defined(SMP) || defined(PAR)
1794   initSparkPools();
1795 #endif
1796 }
1797
1798 #ifdef SMP
1799 void
1800 startTasks( void )
1801 {
1802   nat i;
1803   int r;
1804   pthread_t tid;
1805   
1806   /* make some space for saving all the thread ids */
1807   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1808                             "initScheduler:task_ids");
1809   
1810   /* and create all the threads */
1811   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1812     r = pthread_create(&tid,NULL,taskStart,NULL);
1813     if (r != 0) {
1814       barf("startTasks: Can't create new Posix thread");
1815     }
1816     task_ids[i].id = tid;
1817     task_ids[i].mut_time = 0.0;
1818     task_ids[i].mut_etime = 0.0;
1819     task_ids[i].gc_time = 0.0;
1820     task_ids[i].gc_etime = 0.0;
1821     task_ids[i].elapsedtimestart = elapsedtime();
1822     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1823   }
1824 }
1825 #endif
1826
1827 void
1828 exitScheduler( void )
1829 {
1830 #ifdef SMP
1831   nat i;
1832
1833   /* Don't want to use pthread_cancel, since we'd have to install
1834    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1835    * all our locks.
1836    */
1837 #if 0
1838   /* Cancel all our tasks */
1839   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1840     pthread_cancel(task_ids[i].id);
1841   }
1842   
1843   /* Wait for all the tasks to terminate */
1844   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1845     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1846                                task_ids[i].id));
1847     pthread_join(task_ids[i].id, NULL);
1848   }
1849 #endif
1850
1851   /* Send 'em all a SIGHUP.  That should shut 'em up.
1852    */
1853   await_death = RtsFlags.ParFlags.nNodes;
1854   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1855     pthread_kill(task_ids[i].id,SIGTERM);
1856   }
1857   while (await_death > 0) {
1858     sched_yield();
1859   }
1860 #endif
1861 }
1862
1863 /* -----------------------------------------------------------------------------
1864    Managing the per-task allocation areas.
1865    
1866    Each capability comes with an allocation area.  These are
1867    fixed-length block lists into which allocation can be done.
1868
1869    ToDo: no support for two-space collection at the moment???
1870    -------------------------------------------------------------------------- */
1871
1872 /* -----------------------------------------------------------------------------
1873  * waitThread is the external interface for running a new computation
1874  * and waiting for the result.
1875  *
1876  * In the non-SMP case, we create a new main thread, push it on the 
1877  * main-thread stack, and invoke the scheduler to run it.  The
1878  * scheduler will return when the top main thread on the stack has
1879  * completed or died, and fill in the necessary fields of the
1880  * main_thread structure.
1881  *
1882  * In the SMP case, we create a main thread as before, but we then
1883  * create a new condition variable and sleep on it.  When our new
1884  * main thread has completed, we'll be woken up and the status/result
1885  * will be in the main_thread struct.
1886  * -------------------------------------------------------------------------- */
1887
1888 int 
1889 howManyThreadsAvail ( void )
1890 {
1891    int i = 0;
1892    StgTSO* q;
1893    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1894       i++;
1895    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1896       i++;
1897    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1898       i++;
1899    return i;
1900 }
1901
1902 void
1903 finishAllThreads ( void )
1904 {
1905    do {
1906       while (run_queue_hd != END_TSO_QUEUE) {
1907          waitThread ( run_queue_hd, NULL );
1908       }
1909       while (blocked_queue_hd != END_TSO_QUEUE) {
1910          waitThread ( blocked_queue_hd, NULL );
1911       }
1912       while (sleeping_queue != END_TSO_QUEUE) {
1913          waitThread ( blocked_queue_hd, NULL );
1914       }
1915    } while 
1916       (blocked_queue_hd != END_TSO_QUEUE || 
1917        run_queue_hd     != END_TSO_QUEUE ||
1918        sleeping_queue   != END_TSO_QUEUE);
1919 }
1920
1921 SchedulerStatus
1922 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1923 {
1924   StgMainThread *m;
1925   SchedulerStatus stat;
1926
1927   ACQUIRE_LOCK(&sched_mutex);
1928   
1929   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1930
1931   m->tso = tso;
1932   m->ret = ret;
1933   m->stat = NoStatus;
1934 #ifdef SMP
1935   pthread_cond_init(&m->wakeup, NULL);
1936 #endif
1937
1938   m->link = main_threads;
1939   main_threads = m;
1940
1941   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1942                               m->tso->id));
1943
1944 #ifdef SMP
1945   do {
1946     pthread_cond_wait(&m->wakeup, &sched_mutex);
1947   } while (m->stat == NoStatus);
1948 #elif defined(GRAN)
1949   /* GranSim specific init */
1950   CurrentTSO = m->tso;                // the TSO to run
1951   procStatus[MainProc] = Busy;        // status of main PE
1952   CurrentProc = MainProc;             // PE to run it on
1953
1954   schedule();
1955 #else
1956   schedule();
1957   ASSERT(m->stat != NoStatus);
1958 #endif
1959
1960   stat = m->stat;
1961
1962 #ifdef SMP
1963   pthread_cond_destroy(&m->wakeup);
1964 #endif
1965
1966   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1967                               m->tso->id));
1968   free(m);
1969
1970   RELEASE_LOCK(&sched_mutex);
1971
1972   return stat;
1973 }
1974
1975 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1976 //@subsection Run queue code 
1977
1978 #if 0
1979 /* 
1980    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1981        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1982        implicit global variable that has to be correct when calling these
1983        fcts -- HWL 
1984 */
1985
1986 /* Put the new thread on the head of the runnable queue.
1987  * The caller of createThread better push an appropriate closure
1988  * on this thread's stack before the scheduler is invoked.
1989  */
1990 static /* inline */ void
1991 add_to_run_queue(tso)
1992 StgTSO* tso; 
1993 {
1994   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1995   tso->link = run_queue_hd;
1996   run_queue_hd = tso;
1997   if (run_queue_tl == END_TSO_QUEUE) {
1998     run_queue_tl = tso;
1999   }
2000 }
2001
2002 /* Put the new thread at the end of the runnable queue. */
2003 static /* inline */ void
2004 push_on_run_queue(tso)
2005 StgTSO* tso; 
2006 {
2007   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2008   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2009   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2010   if (run_queue_hd == END_TSO_QUEUE) {
2011     run_queue_hd = tso;
2012   } else {
2013     run_queue_tl->link = tso;
2014   }
2015   run_queue_tl = tso;
2016 }
2017
2018 /* 
2019    Should be inlined because it's used very often in schedule.  The tso
2020    argument is actually only needed in GranSim, where we want to have the
2021    possibility to schedule *any* TSO on the run queue, irrespective of the
2022    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2023    the run queue and dequeue the tso, adjusting the links in the queue. 
2024 */
2025 //@cindex take_off_run_queue
2026 static /* inline */ StgTSO*
2027 take_off_run_queue(StgTSO *tso) {
2028   StgTSO *t, *prev;
2029
2030   /* 
2031      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2032
2033      if tso is specified, unlink that tso from the run_queue (doesn't have
2034      to be at the beginning of the queue); GranSim only 
2035   */
2036   if (tso!=END_TSO_QUEUE) {
2037     /* find tso in queue */
2038     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2039          t!=END_TSO_QUEUE && t!=tso;
2040          prev=t, t=t->link) 
2041       /* nothing */ ;
2042     ASSERT(t==tso);
2043     /* now actually dequeue the tso */
2044     if (prev!=END_TSO_QUEUE) {
2045       ASSERT(run_queue_hd!=t);
2046       prev->link = t->link;
2047     } else {
2048       /* t is at beginning of thread queue */
2049       ASSERT(run_queue_hd==t);
2050       run_queue_hd = t->link;
2051     }
2052     /* t is at end of thread queue */
2053     if (t->link==END_TSO_QUEUE) {
2054       ASSERT(t==run_queue_tl);
2055       run_queue_tl = prev;
2056     } else {
2057       ASSERT(run_queue_tl!=t);
2058     }
2059     t->link = END_TSO_QUEUE;
2060   } else {
2061     /* take tso from the beginning of the queue; std concurrent code */
2062     t = run_queue_hd;
2063     if (t != END_TSO_QUEUE) {
2064       run_queue_hd = t->link;
2065       t->link = END_TSO_QUEUE;
2066       if (run_queue_hd == END_TSO_QUEUE) {
2067         run_queue_tl = END_TSO_QUEUE;
2068       }
2069     }
2070   }
2071   return t;
2072 }
2073
2074 #endif /* 0 */
2075
2076 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2077 //@subsection Garbage Collextion Routines
2078
2079 /* ---------------------------------------------------------------------------
2080    Where are the roots that we know about?
2081
2082         - all the threads on the runnable queue
2083         - all the threads on the blocked queue
2084         - all the threads on the sleeping queue
2085         - all the thread currently executing a _ccall_GC
2086         - all the "main threads"
2087      
2088    ------------------------------------------------------------------------ */
2089
2090 /* This has to be protected either by the scheduler monitor, or by the
2091         garbage collection monitor (probably the latter).
2092         KH @ 25/10/99
2093 */
2094
2095 static void GetRoots(void)
2096 {
2097   StgMainThread *m;
2098
2099 #if defined(GRAN)
2100   {
2101     nat i;
2102     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2103       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2104         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
2105       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2106         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
2107       
2108       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2109         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
2110       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2111         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
2112       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2113         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
2114     }
2115   }
2116
2117   markEventQueue();
2118
2119 #else /* !GRAN */
2120   if (run_queue_hd != END_TSO_QUEUE) {
2121     ASSERT(run_queue_tl != END_TSO_QUEUE);
2122     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
2123     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
2124   }
2125
2126   if (blocked_queue_hd != END_TSO_QUEUE) {
2127     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2128     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
2129     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
2130   }
2131
2132   if (sleeping_queue != END_TSO_QUEUE) {
2133     sleeping_queue  = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
2134   }
2135 #endif 
2136
2137   for (m = main_threads; m != NULL; m = m->link) {
2138     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
2139   }
2140   if (suspended_ccalling_threads != END_TSO_QUEUE)
2141     suspended_ccalling_threads = 
2142       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
2143
2144 #if defined(SMP) || defined(PAR) || defined(GRAN)
2145   markSparkQueue();
2146 #endif
2147 }
2148
2149 /* -----------------------------------------------------------------------------
2150    performGC
2151
2152    This is the interface to the garbage collector from Haskell land.
2153    We provide this so that external C code can allocate and garbage
2154    collect when called from Haskell via _ccall_GC.
2155
2156    It might be useful to provide an interface whereby the programmer
2157    can specify more roots (ToDo).
2158    
2159    This needs to be protected by the GC condition variable above.  KH.
2160    -------------------------------------------------------------------------- */
2161
2162 void (*extra_roots)(void);
2163
2164 void
2165 performGC(void)
2166 {
2167   GarbageCollect(GetRoots,rtsFalse);
2168 }
2169
2170 void
2171 performMajorGC(void)
2172 {
2173   GarbageCollect(GetRoots,rtsTrue);
2174 }
2175
2176 static void
2177 AllRoots(void)
2178 {
2179   GetRoots();                   /* the scheduler's roots */
2180   extra_roots();                /* the user's roots */
2181 }
2182
2183 void
2184 performGCWithRoots(void (*get_roots)(void))
2185 {
2186   extra_roots = get_roots;
2187
2188   GarbageCollect(AllRoots,rtsFalse);
2189 }
2190
2191 /* -----------------------------------------------------------------------------
2192    Stack overflow
2193
2194    If the thread has reached its maximum stack size, then raise the
2195    StackOverflow exception in the offending thread.  Otherwise
2196    relocate the TSO into a larger chunk of memory and adjust its stack
2197    size appropriately.
2198    -------------------------------------------------------------------------- */
2199
2200 static StgTSO *
2201 threadStackOverflow(StgTSO *tso)
2202 {
2203   nat new_stack_size, new_tso_size, diff, stack_words;
2204   StgPtr new_sp;
2205   StgTSO *dest;
2206
2207   IF_DEBUG(sanity,checkTSO(tso));
2208   if (tso->stack_size >= tso->max_stack_size) {
2209
2210     IF_DEBUG(gc,
2211              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2212                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2213              /* If we're debugging, just print out the top of the stack */
2214              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2215                                               tso->sp+64)));
2216
2217     /* Send this thread the StackOverflow exception */
2218     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2219     return tso;
2220   }
2221
2222   /* Try to double the current stack size.  If that takes us over the
2223    * maximum stack size for this thread, then use the maximum instead.
2224    * Finally round up so the TSO ends up as a whole number of blocks.
2225    */
2226   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2227   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2228                                        TSO_STRUCT_SIZE)/sizeof(W_);
2229   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2230   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2231
2232   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2233
2234   dest = (StgTSO *)allocate(new_tso_size);
2235   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2236
2237   /* copy the TSO block and the old stack into the new area */
2238   memcpy(dest,tso,TSO_STRUCT_SIZE);
2239   stack_words = tso->stack + tso->stack_size - tso->sp;
2240   new_sp = (P_)dest + new_tso_size - stack_words;
2241   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2242
2243   /* relocate the stack pointers... */
2244   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2245   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2246   dest->sp    = new_sp;
2247   dest->stack_size = new_stack_size;
2248         
2249   /* and relocate the update frame list */
2250   relocate_TSO(tso, dest);
2251
2252   /* Mark the old TSO as relocated.  We have to check for relocated
2253    * TSOs in the garbage collector and any primops that deal with TSOs.
2254    *
2255    * It's important to set the sp and su values to just beyond the end
2256    * of the stack, so we don't attempt to scavenge any part of the
2257    * dead TSO's stack.
2258    */
2259   tso->what_next = ThreadRelocated;
2260   tso->link = dest;
2261   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2262   tso->su = (StgUpdateFrame *)tso->sp;
2263   tso->why_blocked = NotBlocked;
2264   dest->mut_link = NULL;
2265
2266   IF_PAR_DEBUG(verbose,
2267                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2268                      tso->id, tso, tso->stack_size);
2269                /* If we're debugging, just print out the top of the stack */
2270                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2271                                                 tso->sp+64)));
2272   
2273   IF_DEBUG(sanity,checkTSO(tso));
2274 #if 0
2275   IF_DEBUG(scheduler,printTSO(dest));
2276 #endif
2277
2278   return dest;
2279 }
2280
2281 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2282 //@subsection Blocking Queue Routines
2283
2284 /* ---------------------------------------------------------------------------
2285    Wake up a queue that was blocked on some resource.
2286    ------------------------------------------------------------------------ */
2287
2288 #if defined(GRAN)
2289 static inline void
2290 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2291 {
2292 }
2293 #elif defined(PAR)
2294 static inline void
2295 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2296 {
2297   /* write RESUME events to log file and
2298      update blocked and fetch time (depending on type of the orig closure) */
2299   if (RtsFlags.ParFlags.ParStats.Full) {
2300     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2301                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2302                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2303     if (EMPTY_RUN_QUEUE())
2304       emitSchedule = rtsTrue;
2305
2306     switch (get_itbl(node)->type) {
2307         case FETCH_ME_BQ:
2308           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2309           break;
2310         case RBH:
2311         case FETCH_ME:
2312         case BLACKHOLE_BQ:
2313           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2314           break;
2315 #ifdef DIST
2316         case MVAR:
2317           break;
2318 #endif    
2319         default:
2320           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2321         }
2322       }
2323 }
2324 #endif
2325
2326 #if defined(GRAN)
2327 static StgBlockingQueueElement *
2328 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2329 {
2330     StgTSO *tso;
2331     PEs node_loc, tso_loc;
2332
2333     node_loc = where_is(node); // should be lifted out of loop
2334     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2335     tso_loc = where_is((StgClosure *)tso);
2336     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2337       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2338       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2339       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2340       // insertThread(tso, node_loc);
2341       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2342                 ResumeThread,
2343                 tso, node, (rtsSpark*)NULL);
2344       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2345       // len_local++;
2346       // len++;
2347     } else { // TSO is remote (actually should be FMBQ)
2348       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2349                                   RtsFlags.GranFlags.Costs.gunblocktime +
2350                                   RtsFlags.GranFlags.Costs.latency;
2351       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2352                 UnblockThread,
2353                 tso, node, (rtsSpark*)NULL);
2354       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2355       // len++;
2356     }
2357     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2358     IF_GRAN_DEBUG(bq,
2359                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2360                           (node_loc==tso_loc ? "Local" : "Global"), 
2361                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2362     tso->block_info.closure = NULL;
2363     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2364                              tso->id, tso));
2365 }
2366 #elif defined(PAR)
2367 static StgBlockingQueueElement *
2368 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2369 {
2370     StgBlockingQueueElement *next;
2371
2372     switch (get_itbl(bqe)->type) {
2373     case TSO:
2374       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2375       /* if it's a TSO just push it onto the run_queue */
2376       next = bqe->link;
2377       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2378       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2379       THREAD_RUNNABLE();
2380       unblockCount(bqe, node);
2381       /* reset blocking status after dumping event */
2382       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2383       break;
2384
2385     case BLOCKED_FETCH:
2386       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2387       next = bqe->link;
2388       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2389       PendingFetches = (StgBlockedFetch *)bqe;
2390       break;
2391
2392 # if defined(DEBUG)
2393       /* can ignore this case in a non-debugging setup; 
2394          see comments on RBHSave closures above */
2395     case CONSTR:
2396       /* check that the closure is an RBHSave closure */
2397       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2398              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2399              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2400       break;
2401
2402     default:
2403       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2404            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2405            (StgClosure *)bqe);
2406 # endif
2407     }
2408   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2409   return next;
2410 }
2411
2412 #else /* !GRAN && !PAR */
2413 static StgTSO *
2414 unblockOneLocked(StgTSO *tso)
2415 {
2416   StgTSO *next;
2417
2418   ASSERT(get_itbl(tso)->type == TSO);
2419   ASSERT(tso->why_blocked != NotBlocked);
2420   tso->why_blocked = NotBlocked;
2421   next = tso->link;
2422   PUSH_ON_RUN_QUEUE(tso);
2423   THREAD_RUNNABLE();
2424   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2425   return next;
2426 }
2427 #endif
2428
2429 #if defined(GRAN) || defined(PAR)
2430 inline StgBlockingQueueElement *
2431 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2432 {
2433   ACQUIRE_LOCK(&sched_mutex);
2434   bqe = unblockOneLocked(bqe, node);
2435   RELEASE_LOCK(&sched_mutex);
2436   return bqe;
2437 }
2438 #else
2439 inline StgTSO *
2440 unblockOne(StgTSO *tso)
2441 {
2442   ACQUIRE_LOCK(&sched_mutex);
2443   tso = unblockOneLocked(tso);
2444   RELEASE_LOCK(&sched_mutex);
2445   return tso;
2446 }
2447 #endif
2448
2449 #if defined(GRAN)
2450 void 
2451 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2452 {
2453   StgBlockingQueueElement *bqe;
2454   PEs node_loc;
2455   nat len = 0; 
2456
2457   IF_GRAN_DEBUG(bq, 
2458                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2459                       node, CurrentProc, CurrentTime[CurrentProc], 
2460                       CurrentTSO->id, CurrentTSO));
2461
2462   node_loc = where_is(node);
2463
2464   ASSERT(q == END_BQ_QUEUE ||
2465          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2466          get_itbl(q)->type == CONSTR); // closure (type constructor)
2467   ASSERT(is_unique(node));
2468
2469   /* FAKE FETCH: magically copy the node to the tso's proc;
2470      no Fetch necessary because in reality the node should not have been 
2471      moved to the other PE in the first place
2472   */
2473   if (CurrentProc!=node_loc) {
2474     IF_GRAN_DEBUG(bq, 
2475                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2476                         node, node_loc, CurrentProc, CurrentTSO->id, 
2477                         // CurrentTSO, where_is(CurrentTSO),
2478                         node->header.gran.procs));
2479     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2480     IF_GRAN_DEBUG(bq, 
2481                   belch("## new bitmask of node %p is %#x",
2482                         node, node->header.gran.procs));
2483     if (RtsFlags.GranFlags.GranSimStats.Global) {
2484       globalGranStats.tot_fake_fetches++;
2485     }
2486   }
2487
2488   bqe = q;
2489   // ToDo: check: ASSERT(CurrentProc==node_loc);
2490   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2491     //next = bqe->link;
2492     /* 
2493        bqe points to the current element in the queue
2494        next points to the next element in the queue
2495     */
2496     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2497     //tso_loc = where_is(tso);
2498     len++;
2499     bqe = unblockOneLocked(bqe, node);
2500   }
2501
2502   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2503      the closure to make room for the anchor of the BQ */
2504   if (bqe!=END_BQ_QUEUE) {
2505     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2506     /*
2507     ASSERT((info_ptr==&RBH_Save_0_info) ||
2508            (info_ptr==&RBH_Save_1_info) ||
2509            (info_ptr==&RBH_Save_2_info));
2510     */
2511     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2512     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2513     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2514
2515     IF_GRAN_DEBUG(bq,
2516                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2517                         node, info_type(node)));
2518   }
2519
2520   /* statistics gathering */
2521   if (RtsFlags.GranFlags.GranSimStats.Global) {
2522     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2523     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2524     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2525     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2526   }
2527   IF_GRAN_DEBUG(bq,
2528                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2529                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2530 }
2531 #elif defined(PAR)
2532 void 
2533 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2534 {
2535   StgBlockingQueueElement *bqe;
2536
2537   ACQUIRE_LOCK(&sched_mutex);
2538
2539   IF_PAR_DEBUG(verbose, 
2540                belch("##-_ AwBQ for node %p on [%x]: ",
2541                      node, mytid));
2542 #ifdef DIST  
2543   //RFP
2544   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2545     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2546     return;
2547   }
2548 #endif
2549   
2550   ASSERT(q == END_BQ_QUEUE ||
2551          get_itbl(q)->type == TSO ||           
2552          get_itbl(q)->type == BLOCKED_FETCH || 
2553          get_itbl(q)->type == CONSTR); 
2554
2555   bqe = q;
2556   while (get_itbl(bqe)->type==TSO || 
2557          get_itbl(bqe)->type==BLOCKED_FETCH) {
2558     bqe = unblockOneLocked(bqe, node);
2559   }
2560   RELEASE_LOCK(&sched_mutex);
2561 }
2562
2563 #else   /* !GRAN && !PAR */
2564 void
2565 awakenBlockedQueue(StgTSO *tso)
2566 {
2567   ACQUIRE_LOCK(&sched_mutex);
2568   while (tso != END_TSO_QUEUE) {
2569     tso = unblockOneLocked(tso);
2570   }
2571   RELEASE_LOCK(&sched_mutex);
2572 }
2573 #endif
2574
2575 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2576 //@subsection Exception Handling Routines
2577
2578 /* ---------------------------------------------------------------------------
2579    Interrupt execution
2580    - usually called inside a signal handler so it mustn't do anything fancy.   
2581    ------------------------------------------------------------------------ */
2582
2583 void
2584 interruptStgRts(void)
2585 {
2586     interrupted    = 1;
2587     context_switch = 1;
2588 }
2589
2590 /* -----------------------------------------------------------------------------
2591    Unblock a thread
2592
2593    This is for use when we raise an exception in another thread, which
2594    may be blocked.
2595    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2596    -------------------------------------------------------------------------- */
2597
2598 #if defined(GRAN) || defined(PAR)
2599 /*
2600   NB: only the type of the blocking queue is different in GranSim and GUM
2601       the operations on the queue-elements are the same
2602       long live polymorphism!
2603 */
2604 static void
2605 unblockThread(StgTSO *tso)
2606 {
2607   StgBlockingQueueElement *t, **last;
2608
2609   ACQUIRE_LOCK(&sched_mutex);
2610   switch (tso->why_blocked) {
2611
2612   case NotBlocked:
2613     return;  /* not blocked */
2614
2615   case BlockedOnMVar:
2616     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2617     {
2618       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2619       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2620
2621       last = (StgBlockingQueueElement **)&mvar->head;
2622       for (t = (StgBlockingQueueElement *)mvar->head; 
2623            t != END_BQ_QUEUE; 
2624            last = &t->link, last_tso = t, t = t->link) {
2625         if (t == (StgBlockingQueueElement *)tso) {
2626           *last = (StgBlockingQueueElement *)tso->link;
2627           if (mvar->tail == tso) {
2628             mvar->tail = (StgTSO *)last_tso;
2629           }
2630           goto done;
2631         }
2632       }
2633       barf("unblockThread (MVAR): TSO not found");
2634     }
2635
2636   case BlockedOnBlackHole:
2637     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2638     {
2639       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2640
2641       last = &bq->blocking_queue;
2642       for (t = bq->blocking_queue; 
2643            t != END_BQ_QUEUE; 
2644            last = &t->link, t = t->link) {
2645         if (t == (StgBlockingQueueElement *)tso) {
2646           *last = (StgBlockingQueueElement *)tso->link;
2647           goto done;
2648         }
2649       }
2650       barf("unblockThread (BLACKHOLE): TSO not found");
2651     }
2652
2653   case BlockedOnException:
2654     {
2655       StgTSO *target  = tso->block_info.tso;
2656
2657       ASSERT(get_itbl(target)->type == TSO);
2658
2659       if (target->what_next == ThreadRelocated) {
2660           target = target->link;
2661           ASSERT(get_itbl(target)->type == TSO);
2662       }
2663
2664       ASSERT(target->blocked_exceptions != NULL);
2665
2666       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2667       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2668            t != END_BQ_QUEUE; 
2669            last = &t->link, t = t->link) {
2670         ASSERT(get_itbl(t)->type == TSO);
2671         if (t == (StgBlockingQueueElement *)tso) {
2672           *last = (StgBlockingQueueElement *)tso->link;
2673           goto done;
2674         }
2675       }
2676       barf("unblockThread (Exception): TSO not found");
2677     }
2678
2679   case BlockedOnRead:
2680   case BlockedOnWrite:
2681     {
2682       /* take TSO off blocked_queue */
2683       StgBlockingQueueElement *prev = NULL;
2684       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2685            prev = t, t = t->link) {
2686         if (t == (StgBlockingQueueElement *)tso) {
2687           if (prev == NULL) {
2688             blocked_queue_hd = (StgTSO *)t->link;
2689             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2690               blocked_queue_tl = END_TSO_QUEUE;
2691             }
2692           } else {
2693             prev->link = t->link;
2694             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2695               blocked_queue_tl = (StgTSO *)prev;
2696             }
2697           }
2698           goto done;
2699         }
2700       }
2701       barf("unblockThread (I/O): TSO not found");
2702     }
2703
2704   case BlockedOnDelay:
2705     {
2706       /* take TSO off sleeping_queue */
2707       StgBlockingQueueElement *prev = NULL;
2708       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2709            prev = t, t = t->link) {
2710         if (t == (StgBlockingQueueElement *)tso) {
2711           if (prev == NULL) {
2712             sleeping_queue = (StgTSO *)t->link;
2713           } else {
2714             prev->link = t->link;
2715           }
2716           goto done;
2717         }
2718       }
2719       barf("unblockThread (I/O): TSO not found");
2720     }
2721
2722   default:
2723     barf("unblockThread");
2724   }
2725
2726  done:
2727   tso->link = END_TSO_QUEUE;
2728   tso->why_blocked = NotBlocked;
2729   tso->block_info.closure = NULL;
2730   PUSH_ON_RUN_QUEUE(tso);
2731   RELEASE_LOCK(&sched_mutex);
2732 }
2733 #else
2734 static void
2735 unblockThread(StgTSO *tso)
2736 {
2737   StgTSO *t, **last;
2738
2739   ACQUIRE_LOCK(&sched_mutex);
2740   switch (tso->why_blocked) {
2741
2742   case NotBlocked:
2743     return;  /* not blocked */
2744
2745   case BlockedOnMVar:
2746     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2747     {
2748       StgTSO *last_tso = END_TSO_QUEUE;
2749       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2750
2751       last = &mvar->head;
2752       for (t = mvar->head; t != END_TSO_QUEUE; 
2753            last = &t->link, last_tso = t, t = t->link) {
2754         if (t == tso) {
2755           *last = tso->link;
2756           if (mvar->tail == tso) {
2757             mvar->tail = last_tso;
2758           }
2759           goto done;
2760         }
2761       }
2762       barf("unblockThread (MVAR): TSO not found");
2763     }
2764
2765   case BlockedOnBlackHole:
2766     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2767     {
2768       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2769
2770       last = &bq->blocking_queue;
2771       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2772            last = &t->link, t = t->link) {
2773         if (t == tso) {
2774           *last = tso->link;
2775           goto done;
2776         }
2777       }
2778       barf("unblockThread (BLACKHOLE): TSO not found");
2779     }
2780
2781   case BlockedOnException:
2782     {
2783       StgTSO *target  = tso->block_info.tso;
2784
2785       ASSERT(get_itbl(target)->type == TSO);
2786
2787       while (target->what_next == ThreadRelocated) {
2788           target = target->link;
2789           ASSERT(get_itbl(target)->type == TSO);
2790       }
2791       
2792       ASSERT(target->blocked_exceptions != NULL);
2793
2794       last = &target->blocked_exceptions;
2795       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2796            last = &t->link, t = t->link) {
2797         ASSERT(get_itbl(t)->type == TSO);
2798         if (t == tso) {
2799           *last = tso->link;
2800           goto done;
2801         }
2802       }
2803       barf("unblockThread (Exception): TSO not found");
2804     }
2805
2806   case BlockedOnRead:
2807   case BlockedOnWrite:
2808     {
2809       StgTSO *prev = NULL;
2810       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2811            prev = t, t = t->link) {
2812         if (t == tso) {
2813           if (prev == NULL) {
2814             blocked_queue_hd = t->link;
2815             if (blocked_queue_tl == t) {
2816               blocked_queue_tl = END_TSO_QUEUE;
2817             }
2818           } else {
2819             prev->link = t->link;
2820             if (blocked_queue_tl == t) {
2821               blocked_queue_tl = prev;
2822             }
2823           }
2824           goto done;
2825         }
2826       }
2827       barf("unblockThread (I/O): TSO not found");
2828     }
2829
2830   case BlockedOnDelay:
2831     {
2832       StgTSO *prev = NULL;
2833       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2834            prev = t, t = t->link) {
2835         if (t == tso) {
2836           if (prev == NULL) {
2837             sleeping_queue = t->link;
2838           } else {
2839             prev->link = t->link;
2840           }
2841           goto done;
2842         }
2843       }
2844       barf("unblockThread (I/O): TSO not found");
2845     }
2846
2847   default:
2848     barf("unblockThread");
2849   }
2850
2851  done:
2852   tso->link = END_TSO_QUEUE;
2853   tso->why_blocked = NotBlocked;
2854   tso->block_info.closure = NULL;
2855   PUSH_ON_RUN_QUEUE(tso);
2856   RELEASE_LOCK(&sched_mutex);
2857 }
2858 #endif
2859
2860 /* -----------------------------------------------------------------------------
2861  * raiseAsync()
2862  *
2863  * The following function implements the magic for raising an
2864  * asynchronous exception in an existing thread.
2865  *
2866  * We first remove the thread from any queue on which it might be
2867  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2868  *
2869  * We strip the stack down to the innermost CATCH_FRAME, building
2870  * thunks in the heap for all the active computations, so they can 
2871  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2872  * an application of the handler to the exception, and push it on
2873  * the top of the stack.
2874  * 
2875  * How exactly do we save all the active computations?  We create an
2876  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2877  * AP_UPDs pushes everything from the corresponding update frame
2878  * upwards onto the stack.  (Actually, it pushes everything up to the
2879  * next update frame plus a pointer to the next AP_UPD object.
2880  * Entering the next AP_UPD object pushes more onto the stack until we
2881  * reach the last AP_UPD object - at which point the stack should look
2882  * exactly as it did when we killed the TSO and we can continue
2883  * execution by entering the closure on top of the stack.
2884  *
2885  * We can also kill a thread entirely - this happens if either (a) the 
2886  * exception passed to raiseAsync is NULL, or (b) there's no
2887  * CATCH_FRAME on the stack.  In either case, we strip the entire
2888  * stack and replace the thread with a zombie.
2889  *
2890  * -------------------------------------------------------------------------- */
2891  
2892 void 
2893 deleteThread(StgTSO *tso)
2894 {
2895   raiseAsync(tso,NULL);
2896 }
2897
2898 void
2899 raiseAsync(StgTSO *tso, StgClosure *exception)
2900 {
2901   StgUpdateFrame* su = tso->su;
2902   StgPtr          sp = tso->sp;
2903   
2904   /* Thread already dead? */
2905   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2906     return;
2907   }
2908
2909   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2910
2911   /* Remove it from any blocking queues */
2912   unblockThread(tso);
2913
2914   /* The stack freezing code assumes there's a closure pointer on
2915    * the top of the stack.  This isn't always the case with compiled
2916    * code, so we have to push a dummy closure on the top which just
2917    * returns to the next return address on the stack.
2918    */
2919   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2920     *(--sp) = (W_)&stg_dummy_ret_closure;
2921   }
2922
2923   while (1) {
2924     int words = ((P_)su - (P_)sp) - 1;
2925     nat i;
2926     StgAP_UPD * ap;
2927
2928     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2929      * then build PAP(handler,exception,realworld#), and leave it on
2930      * top of the stack ready to enter.
2931      */
2932     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2933       StgCatchFrame *cf = (StgCatchFrame *)su;
2934       /* we've got an exception to raise, so let's pass it to the
2935        * handler in this frame.
2936        */
2937       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2938       TICK_ALLOC_UPD_PAP(3,0);
2939       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2940               
2941       ap->n_args = 2;
2942       ap->fun = cf->handler;    /* :: Exception -> IO a */
2943       ap->payload[0] = exception;
2944       ap->payload[1] = ARG_TAG(0); /* realworld token */
2945
2946       /* throw away the stack from Sp up to and including the
2947        * CATCH_FRAME.
2948        */
2949       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2950       tso->su = cf->link;
2951
2952       /* Restore the blocked/unblocked state for asynchronous exceptions
2953        * at the CATCH_FRAME.  
2954        *
2955        * If exceptions were unblocked at the catch, arrange that they
2956        * are unblocked again after executing the handler by pushing an
2957        * unblockAsyncExceptions_ret stack frame.
2958        */
2959       if (!cf->exceptions_blocked) {
2960         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2961       }
2962       
2963       /* Ensure that async exceptions are blocked when running the handler.
2964        */
2965       if (tso->blocked_exceptions == NULL) {
2966         tso->blocked_exceptions = END_TSO_QUEUE;
2967       }
2968       
2969       /* Put the newly-built PAP on top of the stack, ready to execute
2970        * when the thread restarts.
2971        */
2972       sp[0] = (W_)ap;
2973       tso->sp = sp;
2974       tso->what_next = ThreadEnterGHC;
2975       IF_DEBUG(sanity, checkTSO(tso));
2976       return;
2977     }
2978
2979     /* First build an AP_UPD consisting of the stack chunk above the
2980      * current update frame, with the top word on the stack as the
2981      * fun field.
2982      */
2983     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2984     
2985     ASSERT(words >= 0);
2986     
2987     ap->n_args = words;
2988     ap->fun    = (StgClosure *)sp[0];
2989     sp++;
2990     for(i=0; i < (nat)words; ++i) {
2991       ap->payload[i] = (StgClosure *)*sp++;
2992     }
2993     
2994     switch (get_itbl(su)->type) {
2995       
2996     case UPDATE_FRAME:
2997       {
2998         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2999         TICK_ALLOC_UP_THK(words+1,0);
3000         
3001         IF_DEBUG(scheduler,
3002                  fprintf(stderr,  "scheduler: Updating ");
3003                  printPtr((P_)su->updatee); 
3004                  fprintf(stderr,  " with ");
3005                  printObj((StgClosure *)ap);
3006                  );
3007         
3008         /* Replace the updatee with an indirection - happily
3009          * this will also wake up any threads currently
3010          * waiting on the result.
3011          *
3012          * Warning: if we're in a loop, more than one update frame on
3013          * the stack may point to the same object.  Be careful not to
3014          * overwrite an IND_OLDGEN in this case, because we'll screw
3015          * up the mutable lists.  To be on the safe side, don't
3016          * overwrite any kind of indirection at all.  See also
3017          * threadSqueezeStack in GC.c, where we have to make a similar
3018          * check.
3019          */
3020         if (!closure_IND(su->updatee)) {
3021             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3022         }
3023         su = su->link;
3024         sp += sizeofW(StgUpdateFrame) -1;
3025         sp[0] = (W_)ap; /* push onto stack */
3026         break;
3027       }
3028
3029     case CATCH_FRAME:
3030       {
3031         StgCatchFrame *cf = (StgCatchFrame *)su;
3032         StgClosure* o;
3033         
3034         /* We want a PAP, not an AP_UPD.  Fortunately, the
3035          * layout's the same.
3036          */
3037         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3038         TICK_ALLOC_UPD_PAP(words+1,0);
3039         
3040         /* now build o = FUN(catch,ap,handler) */
3041         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3042         TICK_ALLOC_FUN(2,0);
3043         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3044         o->payload[0] = (StgClosure *)ap;
3045         o->payload[1] = cf->handler;
3046         
3047         IF_DEBUG(scheduler,
3048                  fprintf(stderr,  "scheduler: Built ");
3049                  printObj((StgClosure *)o);
3050                  );
3051         
3052         /* pop the old handler and put o on the stack */
3053         su = cf->link;
3054         sp += sizeofW(StgCatchFrame) - 1;
3055         sp[0] = (W_)o;
3056         break;
3057       }
3058       
3059     case SEQ_FRAME:
3060       {
3061         StgSeqFrame *sf = (StgSeqFrame *)su;
3062         StgClosure* o;
3063         
3064         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3065         TICK_ALLOC_UPD_PAP(words+1,0);
3066         
3067         /* now build o = FUN(seq,ap) */
3068         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3069         TICK_ALLOC_SE_THK(1,0);
3070         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3071         o->payload[0] = (StgClosure *)ap;
3072         
3073         IF_DEBUG(scheduler,
3074                  fprintf(stderr,  "scheduler: Built ");
3075                  printObj((StgClosure *)o);
3076                  );
3077         
3078         /* pop the old handler and put o on the stack */
3079         su = sf->link;
3080         sp += sizeofW(StgSeqFrame) - 1;
3081         sp[0] = (W_)o;
3082         break;
3083       }
3084       
3085     case STOP_FRAME:
3086       /* We've stripped the entire stack, the thread is now dead. */
3087       sp += sizeofW(StgStopFrame) - 1;
3088       sp[0] = (W_)exception;    /* save the exception */
3089       tso->what_next = ThreadKilled;
3090       tso->su = (StgUpdateFrame *)(sp+1);
3091       tso->sp = sp;
3092       return;
3093
3094     default:
3095       barf("raiseAsync");
3096     }
3097   }
3098   barf("raiseAsync");
3099 }
3100
3101 /* -----------------------------------------------------------------------------
3102    resurrectThreads is called after garbage collection on the list of
3103    threads found to be garbage.  Each of these threads will be woken
3104    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3105    on an MVar, or NonTermination if the thread was blocked on a Black
3106    Hole.
3107    -------------------------------------------------------------------------- */
3108
3109 void
3110 resurrectThreads( StgTSO *threads )
3111 {
3112   StgTSO *tso, *next;
3113
3114   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3115     next = tso->global_link;
3116     tso->global_link = all_threads;
3117     all_threads = tso;
3118     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3119
3120     switch (tso->why_blocked) {
3121     case BlockedOnMVar:
3122     case BlockedOnException:
3123       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3124       break;
3125     case BlockedOnBlackHole:
3126       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3127       break;
3128     case NotBlocked:
3129       /* This might happen if the thread was blocked on a black hole
3130        * belonging to a thread that we've just woken up (raiseAsync
3131        * can wake up threads, remember...).
3132        */
3133       continue;
3134     default:
3135       barf("resurrectThreads: thread blocked in a strange way");
3136     }
3137   }
3138 }
3139
3140 /* -----------------------------------------------------------------------------
3141  * Blackhole detection: if we reach a deadlock, test whether any
3142  * threads are blocked on themselves.  Any threads which are found to
3143  * be self-blocked get sent a NonTermination exception.
3144  *
3145  * This is only done in a deadlock situation in order to avoid
3146  * performance overhead in the normal case.
3147  * -------------------------------------------------------------------------- */
3148
3149 static void
3150 detectBlackHoles( void )
3151 {
3152     StgTSO *t = all_threads;
3153     StgUpdateFrame *frame;
3154     StgClosure *blocked_on;
3155
3156     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3157
3158         while (t->what_next == ThreadRelocated) {
3159             t = t->link;
3160             ASSERT(get_itbl(t)->type == TSO);
3161         }
3162       
3163         if (t->why_blocked != BlockedOnBlackHole) {
3164             continue;
3165         }
3166
3167         blocked_on = t->block_info.closure;
3168
3169         for (frame = t->su; ; frame = frame->link) {
3170             switch (get_itbl(frame)->type) {
3171
3172             case UPDATE_FRAME:
3173                 if (frame->updatee == blocked_on) {
3174                     /* We are blocking on one of our own computations, so
3175                      * send this thread the NonTermination exception.  
3176                      */
3177                     IF_DEBUG(scheduler, 
3178                              sched_belch("thread %d is blocked on itself", t->id));
3179                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3180                     goto done;
3181                 }
3182                 else {
3183                     continue;
3184                 }
3185
3186             case CATCH_FRAME:
3187             case SEQ_FRAME:
3188                 continue;
3189                 
3190             case STOP_FRAME:
3191                 break;
3192             }
3193             break;
3194         }
3195
3196     done: ;
3197     }   
3198 }
3199
3200 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3201 //@subsection Debugging Routines
3202
3203 /* -----------------------------------------------------------------------------
3204    Debugging: why is a thread blocked
3205    -------------------------------------------------------------------------- */
3206
3207 #ifdef DEBUG
3208
3209 void
3210 printThreadBlockage(StgTSO *tso)
3211 {
3212   switch (tso->why_blocked) {
3213   case BlockedOnRead:
3214     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3215     break;
3216   case BlockedOnWrite:
3217     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3218     break;
3219   case BlockedOnDelay:
3220     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3221     break;
3222   case BlockedOnMVar:
3223     fprintf(stderr,"is blocked on an MVar");
3224     break;
3225   case BlockedOnException:
3226     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3227             tso->block_info.tso->id);
3228     break;
3229   case BlockedOnBlackHole:
3230     fprintf(stderr,"is blocked on a black hole");
3231     break;
3232   case NotBlocked:
3233     fprintf(stderr,"is not blocked");
3234     break;
3235 #if defined(PAR)
3236   case BlockedOnGA:
3237     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3238             tso->block_info.closure, info_type(tso->block_info.closure));
3239     break;
3240   case BlockedOnGA_NoSend:
3241     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3242             tso->block_info.closure, info_type(tso->block_info.closure));
3243     break;
3244 #endif
3245   default:
3246     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3247          tso->why_blocked, tso->id, tso);
3248   }
3249 }
3250
3251 void
3252 printThreadStatus(StgTSO *tso)
3253 {
3254   switch (tso->what_next) {
3255   case ThreadKilled:
3256     fprintf(stderr,"has been killed");
3257     break;
3258   case ThreadComplete:
3259     fprintf(stderr,"has completed");
3260     break;
3261   default:
3262     printThreadBlockage(tso);
3263   }
3264 }
3265
3266 void
3267 printAllThreads(void)
3268 {
3269   StgTSO *t;
3270
3271 # if defined(GRAN)
3272   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3273   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3274                        time_string, rtsFalse/*no commas!*/);
3275
3276   sched_belch("all threads at [%s]:", time_string);
3277 # elif defined(PAR)
3278   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3279   ullong_format_string(CURRENT_TIME,
3280                        time_string, rtsFalse/*no commas!*/);
3281
3282   sched_belch("all threads at [%s]:", time_string);
3283 # else
3284   sched_belch("all threads:");
3285 # endif
3286
3287   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3288     fprintf(stderr, "\tthread %d ", t->id);
3289     printThreadStatus(t);
3290     fprintf(stderr,"\n");
3291   }
3292 }
3293     
3294 /* 
3295    Print a whole blocking queue attached to node (debugging only).
3296 */
3297 //@cindex print_bq
3298 # if defined(PAR)
3299 void 
3300 print_bq (StgClosure *node)
3301 {
3302   StgBlockingQueueElement *bqe;
3303   StgTSO *tso;
3304   rtsBool end;
3305
3306   fprintf(stderr,"## BQ of closure %p (%s): ",
3307           node, info_type(node));
3308
3309   /* should cover all closures that may have a blocking queue */
3310   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3311          get_itbl(node)->type == FETCH_ME_BQ ||
3312          get_itbl(node)->type == RBH ||
3313          get_itbl(node)->type == MVAR);
3314     
3315   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3316
3317   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3318 }
3319
3320 /* 
3321    Print a whole blocking queue starting with the element bqe.
3322 */
3323 void 
3324 print_bqe (StgBlockingQueueElement *bqe)
3325 {
3326   rtsBool end;
3327
3328   /* 
3329      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3330   */
3331   for (end = (bqe==END_BQ_QUEUE);
3332        !end; // iterate until bqe points to a CONSTR
3333        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3334        bqe = end ? END_BQ_QUEUE : bqe->link) {
3335     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3336     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3337     /* types of closures that may appear in a blocking queue */
3338     ASSERT(get_itbl(bqe)->type == TSO ||           
3339            get_itbl(bqe)->type == BLOCKED_FETCH || 
3340            get_itbl(bqe)->type == CONSTR); 
3341     /* only BQs of an RBH end with an RBH_Save closure */
3342     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3343
3344     switch (get_itbl(bqe)->type) {
3345     case TSO:
3346       fprintf(stderr," TSO %u (%x),",
3347               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3348       break;
3349     case BLOCKED_FETCH:
3350       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3351               ((StgBlockedFetch *)bqe)->node, 
3352               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3353               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3354               ((StgBlockedFetch *)bqe)->ga.weight);
3355       break;
3356     case CONSTR:
3357       fprintf(stderr," %s (IP %p),",
3358               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3359                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3360                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3361                "RBH_Save_?"), get_itbl(bqe));
3362       break;
3363     default:
3364       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3365            info_type((StgClosure *)bqe)); // , node, info_type(node));
3366       break;
3367     }
3368   } /* for */
3369   fputc('\n', stderr);
3370 }
3371 # elif defined(GRAN)
3372 void 
3373 print_bq (StgClosure *node)
3374 {
3375   StgBlockingQueueElement *bqe;
3376   PEs node_loc, tso_loc;
3377   rtsBool end;
3378
3379   /* should cover all closures that may have a blocking queue */
3380   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3381          get_itbl(node)->type == FETCH_ME_BQ ||
3382          get_itbl(node)->type == RBH);
3383     
3384   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3385   node_loc = where_is(node);
3386
3387   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3388           node, info_type(node), node_loc);
3389
3390   /* 
3391      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3392   */
3393   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3394        !end; // iterate until bqe points to a CONSTR
3395        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3396     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3397     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3398     /* types of closures that may appear in a blocking queue */
3399     ASSERT(get_itbl(bqe)->type == TSO ||           
3400            get_itbl(bqe)->type == CONSTR); 
3401     /* only BQs of an RBH end with an RBH_Save closure */
3402     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3403
3404     tso_loc = where_is((StgClosure *)bqe);
3405     switch (get_itbl(bqe)->type) {
3406     case TSO:
3407       fprintf(stderr," TSO %d (%p) on [PE %d],",
3408               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3409       break;
3410     case CONSTR:
3411       fprintf(stderr," %s (IP %p),",
3412               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3413                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3414                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3415                "RBH_Save_?"), get_itbl(bqe));
3416       break;
3417     default:
3418       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3419            info_type((StgClosure *)bqe), node, info_type(node));
3420       break;
3421     }
3422   } /* for */
3423   fputc('\n', stderr);
3424 }
3425 #else
3426 /* 
3427    Nice and easy: only TSOs on the blocking queue
3428 */
3429 void 
3430 print_bq (StgClosure *node)
3431 {
3432   StgTSO *tso;
3433
3434   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3435   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3436        tso != END_TSO_QUEUE; 
3437        tso=tso->link) {
3438     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3439     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3440     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3441   }
3442   fputc('\n', stderr);
3443 }
3444 # endif
3445
3446 #if defined(PAR)
3447 static nat
3448 run_queue_len(void)
3449 {
3450   nat i;
3451   StgTSO *tso;
3452
3453   for (i=0, tso=run_queue_hd; 
3454        tso != END_TSO_QUEUE;
3455        i++, tso=tso->link)
3456     /* nothing */
3457
3458   return i;
3459 }
3460 #endif
3461
3462 static void
3463 sched_belch(char *s, ...)
3464 {
3465   va_list ap;
3466   va_start(ap,s);
3467 #ifdef SMP
3468   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3469 #elif defined(PAR)
3470   fprintf(stderr, "== ");
3471 #else
3472   fprintf(stderr, "scheduler: ");
3473 #endif
3474   vfprintf(stderr, s, ap);
3475   fprintf(stderr, "\n");
3476 }
3477
3478 #endif /* DEBUG */
3479
3480
3481 //@node Index,  , Debugging Routines, Main scheduling code
3482 //@subsection Index
3483
3484 //@index
3485 //* MainRegTable::  @cindex\s-+MainRegTable
3486 //* StgMainThread::  @cindex\s-+StgMainThread
3487 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3488 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3489 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3490 //* context_switch::  @cindex\s-+context_switch
3491 //* createThread::  @cindex\s-+createThread
3492 //* free_capabilities::  @cindex\s-+free_capabilities
3493 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3494 //* initScheduler::  @cindex\s-+initScheduler
3495 //* interrupted::  @cindex\s-+interrupted
3496 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3497 //* next_thread_id::  @cindex\s-+next_thread_id
3498 //* print_bq::  @cindex\s-+print_bq
3499 //* run_queue_hd::  @cindex\s-+run_queue_hd
3500 //* run_queue_tl::  @cindex\s-+run_queue_tl
3501 //* sched_mutex::  @cindex\s-+sched_mutex
3502 //* schedule::  @cindex\s-+schedule
3503 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3504 //* task_ids::  @cindex\s-+task_ids
3505 //* term_mutex::  @cindex\s-+term_mutex
3506 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3507 //@end index