c8a6a94d338a59d74624f7b4a12c69670688849a
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.100 2001/08/14 13:40:09 sewardj Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           *prev = m->link;
449           if (was_interrupted) {
450             m->stat = Interrupted;
451           } else {
452             m->stat = Killed;
453           }
454           pthread_cond_broadcast(&m->wakeup);
455           break;
456         default:
457           break;
458         }
459       }
460     }
461
462 #else
463 # if defined(PAR)
464     /* in GUM do this only on the Main PE */
465     if (IAmMainThread)
466 # endif
467     /* If our main thread has finished or been killed, return.
468      */
469     {
470       StgMainThread *m = main_threads;
471       if (m->tso->what_next == ThreadComplete
472           || m->tso->what_next == ThreadKilled) {
473         main_threads = main_threads->link;
474         if (m->tso->what_next == ThreadComplete) {
475           /* we finished successfully, fill in the return value */
476           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
477           m->stat = Success;
478           return;
479         } else {
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif /* SMP */
529
530     /* Check whether any waiting threads need to be woken up.  If the
531      * run queue is empty, and there are no other tasks running, we
532      * can wait indefinitely for something to happen.
533      * ToDo: what if another client comes along & requests another
534      * main thread?
535      */
536     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
537       awaitEvent(
538            (run_queue_hd == END_TSO_QUEUE)
539 #ifdef SMP
540         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
541 #endif
542         );
543     }
544     /* we can be interrupted while waiting for I/O... */
545     if (interrupted) continue;
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       start_signal_handlers();
551     }
552 #endif
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifndef PAR
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569 #ifdef SMP
570         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
571 #endif
572         )
573     {
574         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575         GarbageCollect(GetRoots,rtsTrue);
576         if (blocked_queue_hd == END_TSO_QUEUE
577             && run_queue_hd == END_TSO_QUEUE
578             && sleeping_queue == END_TSO_QUEUE) {
579             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
580             detectBlackHoles();
581             if (run_queue_hd == END_TSO_QUEUE) {
582                 StgMainThread *m = main_threads;
583 #ifdef SMP
584                 for (; m != NULL; m = m->link) {
585                     m->ret = NULL;
586                     m->stat = Deadlock;
587                     pthread_cond_broadcast(&m->wakeup);
588                 }
589                 main_threads = NULL;
590 #else
591                 m->ret = NULL;
592                 m->stat = Deadlock;
593                 main_threads = m->link;
594                 return;
595 #endif
596             }
597         }
598     }
599 #elif defined(PAR)
600     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
601 #endif
602
603 #ifdef SMP
604     /* If there's a GC pending, don't do anything until it has
605      * completed.
606      */
607     if (ready_to_gc) {
608       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
609       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
610     }
611     
612     /* block until we've got a thread on the run queue and a free
613      * capability.
614      */
615     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
616       IF_DEBUG(scheduler, sched_belch("waiting for work"));
617       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
618       IF_DEBUG(scheduler, sched_belch("work now available"));
619     }
620 #endif
621
622 #if defined(GRAN)
623
624     if (RtsFlags.GranFlags.Light)
625       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
626
627     /* adjust time based on time-stamp */
628     if (event->time > CurrentTime[CurrentProc] &&
629         event->evttype != ContinueThread)
630       CurrentTime[CurrentProc] = event->time;
631     
632     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
633     if (!RtsFlags.GranFlags.Light)
634       handleIdlePEs();
635
636     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
637
638     /* main event dispatcher in GranSim */
639     switch (event->evttype) {
640       /* Should just be continuing execution */
641     case ContinueThread:
642       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
643       /* ToDo: check assertion
644       ASSERT(run_queue_hd != (StgTSO*)NULL &&
645              run_queue_hd != END_TSO_QUEUE);
646       */
647       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
648       if (!RtsFlags.GranFlags.DoAsyncFetch &&
649           procStatus[CurrentProc]==Fetching) {
650         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
651               CurrentTSO->id, CurrentTSO, CurrentProc);
652         goto next_thread;
653       } 
654       /* Ignore ContinueThreads for completed threads */
655       if (CurrentTSO->what_next == ThreadComplete) {
656         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
657               CurrentTSO->id, CurrentTSO, CurrentProc);
658         goto next_thread;
659       } 
660       /* Ignore ContinueThreads for threads that are being migrated */
661       if (PROCS(CurrentTSO)==Nowhere) { 
662         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
663               CurrentTSO->id, CurrentTSO, CurrentProc);
664         goto next_thread;
665       }
666       /* The thread should be at the beginning of the run queue */
667       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
668         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
669               CurrentTSO->id, CurrentTSO, CurrentProc);
670         break; // run the thread anyway
671       }
672       /*
673       new_event(proc, proc, CurrentTime[proc],
674                 FindWork,
675                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
676       goto next_thread; 
677       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
678       break; // now actually run the thread; DaH Qu'vam yImuHbej 
679
680     case FetchNode:
681       do_the_fetchnode(event);
682       goto next_thread;             /* handle next event in event queue  */
683       
684     case GlobalBlock:
685       do_the_globalblock(event);
686       goto next_thread;             /* handle next event in event queue  */
687       
688     case FetchReply:
689       do_the_fetchreply(event);
690       goto next_thread;             /* handle next event in event queue  */
691       
692     case UnblockThread:   /* Move from the blocked queue to the tail of */
693       do_the_unblock(event);
694       goto next_thread;             /* handle next event in event queue  */
695       
696     case ResumeThread:  /* Move from the blocked queue to the tail of */
697       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
698       event->tso->gran.blocktime += 
699         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
700       do_the_startthread(event);
701       goto next_thread;             /* handle next event in event queue  */
702       
703     case StartThread:
704       do_the_startthread(event);
705       goto next_thread;             /* handle next event in event queue  */
706       
707     case MoveThread:
708       do_the_movethread(event);
709       goto next_thread;             /* handle next event in event queue  */
710       
711     case MoveSpark:
712       do_the_movespark(event);
713       goto next_thread;             /* handle next event in event queue  */
714       
715     case FindWork:
716       do_the_findwork(event);
717       goto next_thread;             /* handle next event in event queue  */
718       
719     default:
720       barf("Illegal event type %u\n", event->evttype);
721     }  /* switch */
722     
723     /* This point was scheduler_loop in the old RTS */
724
725     IF_DEBUG(gran, belch("GRAN: after main switch"));
726
727     TimeOfLastEvent = CurrentTime[CurrentProc];
728     TimeOfNextEvent = get_time_of_next_event();
729     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
730     // CurrentTSO = ThreadQueueHd;
731
732     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
733                          TimeOfNextEvent));
734
735     if (RtsFlags.GranFlags.Light) 
736       GranSimLight_leave_system(event, &ActiveTSO); 
737
738     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
739
740     IF_DEBUG(gran, 
741              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
742
743     /* in a GranSim setup the TSO stays on the run queue */
744     t = CurrentTSO;
745     /* Take a thread from the run queue. */
746     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
747
748     IF_DEBUG(gran, 
749              fprintf(stderr, "GRAN: About to run current thread, which is\n");
750              G_TSO(t,5));
751
752     context_switch = 0; // turned on via GranYield, checking events and time slice
753
754     IF_DEBUG(gran, 
755              DumpGranEvent(GR_SCHEDULE, t));
756
757     procStatus[CurrentProc] = Busy;
758
759 #elif defined(PAR)
760     if (PendingFetches != END_BF_QUEUE) {
761         processFetches();
762     }
763
764     /* ToDo: phps merge with spark activation above */
765     /* check whether we have local work and send requests if we have none */
766     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
767       /* :-[  no local threads => look out for local sparks */
768       /* the spark pool for the current PE */
769       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
770       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
771           pool->hd < pool->tl) {
772         /* 
773          * ToDo: add GC code check that we really have enough heap afterwards!!
774          * Old comment:
775          * If we're here (no runnable threads) and we have pending
776          * sparks, we must have a space problem.  Get enough space
777          * to turn one of those pending sparks into a
778          * thread... 
779          */
780
781         spark = findSpark(rtsFalse);                /* get a spark */
782         if (spark != (rtsSpark) NULL) {
783           tso = activateSpark(spark);       /* turn the spark into a thread */
784           IF_PAR_DEBUG(schedule,
785                        belch("==== schedule: Created TSO %d (%p); %d threads active",
786                              tso->id, tso, advisory_thread_count));
787
788           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
789             belch("==^^ failed to activate spark");
790             goto next_thread;
791           }               /* otherwise fall through & pick-up new tso */
792         } else {
793           IF_PAR_DEBUG(verbose,
794                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
795                              spark_queue_len(pool)));
796           goto next_thread;
797         }
798       }
799
800       /* If we still have no work we need to send a FISH to get a spark
801          from another PE 
802       */
803       if (EMPTY_RUN_QUEUE()) {
804       /* =8-[  no local sparks => look for work on other PEs */
805         /*
806          * We really have absolutely no work.  Send out a fish
807          * (there may be some out there already), and wait for
808          * something to arrive.  We clearly can't run any threads
809          * until a SCHEDULE or RESUME arrives, and so that's what
810          * we're hoping to see.  (Of course, we still have to
811          * respond to other types of messages.)
812          */
813         TIME now = msTime() /*CURRENT_TIME*/;
814         IF_PAR_DEBUG(verbose, 
815                      belch("--  now=%ld", now));
816         IF_PAR_DEBUG(verbose,
817                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
818                          (last_fish_arrived_at!=0 &&
819                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
820                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
821                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
822                              last_fish_arrived_at,
823                              RtsFlags.ParFlags.fishDelay, now);
824                      });
825         
826         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
827             (last_fish_arrived_at==0 ||
828              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
829           /* outstandingFishes is set in sendFish, processFish;
830              avoid flooding system with fishes via delay */
831           pe = choosePE();
832           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
833                    NEW_FISH_HUNGER);
834
835           // Global statistics: count no. of fishes
836           if (RtsFlags.ParFlags.ParStats.Global &&
837               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
838             globalParStats.tot_fish_mess++;
839           }
840         }
841       
842         receivedFinish = processMessages();
843         goto next_thread;
844       }
845     } else if (PacketsWaiting()) {  /* Look for incoming messages */
846       receivedFinish = processMessages();
847     }
848
849     /* Now we are sure that we have some work available */
850     ASSERT(run_queue_hd != END_TSO_QUEUE);
851
852     /* Take a thread from the run queue, if we have work */
853     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
854     IF_DEBUG(sanity,checkTSO(t));
855
856     /* ToDo: write something to the log-file
857     if (RTSflags.ParFlags.granSimStats && !sameThread)
858         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
859
860     CurrentTSO = t;
861     */
862     /* the spark pool for the current PE */
863     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
864
865     IF_DEBUG(scheduler, 
866              belch("--=^ %d threads, %d sparks on [%#x]", 
867                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
868
869 #if 1
870     if (0 && RtsFlags.ParFlags.ParStats.Full && 
871         t && LastTSO && t->id != LastTSO->id && 
872         LastTSO->why_blocked == NotBlocked && 
873         LastTSO->what_next != ThreadComplete) {
874       // if previously scheduled TSO not blocked we have to record the context switch
875       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
876                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
877     }
878
879     if (RtsFlags.ParFlags.ParStats.Full && 
880         (emitSchedule /* forced emit */ ||
881         (t && LastTSO && t->id != LastTSO->id))) {
882       /* 
883          we are running a different TSO, so write a schedule event to log file
884          NB: If we use fair scheduling we also have to write  a deschedule 
885              event for LastTSO; with unfair scheduling we know that the
886              previous tso has blocked whenever we switch to another tso, so
887              we don't need it in GUM for now
888       */
889       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
890                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
891       emitSchedule = rtsFalse;
892     }
893      
894 #endif
895 #else /* !GRAN && !PAR */
896   
897     /* grab a thread from the run queue
898      */
899     ASSERT(run_queue_hd != END_TSO_QUEUE);
900     t = POP_RUN_QUEUE();
901     IF_DEBUG(sanity,checkTSO(t));
902
903 #endif
904     
905     /* grab a capability
906      */
907 #ifdef SMP
908     cap = free_capabilities;
909     free_capabilities = cap->link;
910     n_free_capabilities--;
911 #else
912     cap = &MainRegTable;
913 #endif
914
915     cap->rCurrentTSO = t;
916     
917     /* context switches are now initiated by the timer signal, unless
918      * the user specified "context switch as often as possible", with
919      * +RTS -C0
920      */
921     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
922         && (run_queue_hd != END_TSO_QUEUE
923             || blocked_queue_hd != END_TSO_QUEUE
924             || sleeping_queue != END_TSO_QUEUE))
925         context_switch = 1;
926     else
927         context_switch = 0;
928
929     RELEASE_LOCK(&sched_mutex);
930
931     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
932                               t->id, t, whatNext_strs[t->what_next]));
933
934     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
935     /* Run the current thread 
936      */
937     switch (cap->rCurrentTSO->what_next) {
938     case ThreadKilled:
939     case ThreadComplete:
940         /* Thread already finished, return to scheduler. */
941         ret = ThreadFinished;
942         break;
943     case ThreadEnterGHC:
944         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
945         break;
946     case ThreadRunGHC:
947         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
948         break;
949     case ThreadEnterInterp:
950         ret = interpretBCO(cap);
951         break;
952     default:
953       barf("schedule: invalid what_next field");
954     }
955     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
956     
957     /* Costs for the scheduler are assigned to CCS_SYSTEM */
958 #ifdef PROFILING
959     CCCS = CCS_SYSTEM;
960 #endif
961     
962     ACQUIRE_LOCK(&sched_mutex);
963
964 #ifdef SMP
965     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
966 #elif !defined(GRAN) && !defined(PAR)
967     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
968 #endif
969     t = cap->rCurrentTSO;
970     
971 #if defined(PAR)
972     /* HACK 675: if the last thread didn't yield, make sure to print a 
973        SCHEDULE event to the log file when StgRunning the next thread, even
974        if it is the same one as before */
975     LastTSO = t; 
976     TimeOfLastYield = CURRENT_TIME;
977 #endif
978
979     switch (ret) {
980     case HeapOverflow:
981 #if defined(GRAN)
982       IF_DEBUG(gran, 
983                DumpGranEvent(GR_DESCHEDULE, t));
984       globalGranStats.tot_heapover++;
985 #elif defined(PAR)
986       // IF_DEBUG(par, 
987       //DumpGranEvent(GR_DESCHEDULE, t);
988       globalParStats.tot_heapover++;
989 #endif
990       /* make all the running tasks block on a condition variable,
991        * maybe set context_switch and wait till they all pile in,
992        * then have them wait on a GC condition variable.
993        */
994       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
995                                t->id, t, whatNext_strs[t->what_next]));
996       threadPaused(t);
997 #if defined(GRAN)
998       ASSERT(!is_on_queue(t,CurrentProc));
999 #elif defined(PAR)
1000       /* Currently we emit a DESCHEDULE event before GC in GUM.
1001          ToDo: either add separate event to distinguish SYSTEM time from rest
1002                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1003       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1004         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1005                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1006         emitSchedule = rtsTrue;
1007       }
1008 #endif
1009       
1010       ready_to_gc = rtsTrue;
1011       context_switch = 1;               /* stop other threads ASAP */
1012       PUSH_ON_RUN_QUEUE(t);
1013       /* actual GC is done at the end of the while loop */
1014       break;
1015       
1016     case StackOverflow:
1017 #if defined(GRAN)
1018       IF_DEBUG(gran, 
1019                DumpGranEvent(GR_DESCHEDULE, t));
1020       globalGranStats.tot_stackover++;
1021 #elif defined(PAR)
1022       // IF_DEBUG(par, 
1023       // DumpGranEvent(GR_DESCHEDULE, t);
1024       globalParStats.tot_stackover++;
1025 #endif
1026       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1027                                t->id, t, whatNext_strs[t->what_next]));
1028       /* just adjust the stack for this thread, then pop it back
1029        * on the run queue.
1030        */
1031       threadPaused(t);
1032       { 
1033         StgMainThread *m;
1034         /* enlarge the stack */
1035         StgTSO *new_t = threadStackOverflow(t);
1036         
1037         /* This TSO has moved, so update any pointers to it from the
1038          * main thread stack.  It better not be on any other queues...
1039          * (it shouldn't be).
1040          */
1041         for (m = main_threads; m != NULL; m = m->link) {
1042           if (m->tso == t) {
1043             m->tso = new_t;
1044           }
1045         }
1046         threadPaused(new_t);
1047         PUSH_ON_RUN_QUEUE(new_t);
1048       }
1049       break;
1050
1051     case ThreadYielding:
1052 #if defined(GRAN)
1053       IF_DEBUG(gran, 
1054                DumpGranEvent(GR_DESCHEDULE, t));
1055       globalGranStats.tot_yields++;
1056 #elif defined(PAR)
1057       // IF_DEBUG(par, 
1058       // DumpGranEvent(GR_DESCHEDULE, t);
1059       globalParStats.tot_yields++;
1060 #endif
1061       /* put the thread back on the run queue.  Then, if we're ready to
1062        * GC, check whether this is the last task to stop.  If so, wake
1063        * up the GC thread.  getThread will block during a GC until the
1064        * GC is finished.
1065        */
1066       IF_DEBUG(scheduler,
1067                if (t->what_next == ThreadEnterInterp) {
1068                    /* ToDo: or maybe a timer expired when we were in Hugs?
1069                     * or maybe someone hit ctrl-C
1070                     */
1071                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1072                          t->id, t, whatNext_strs[t->what_next]);
1073                } else {
1074                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1075                          t->id, t, whatNext_strs[t->what_next]);
1076                }
1077                );
1078
1079       threadPaused(t);
1080
1081       IF_DEBUG(sanity,
1082                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1083                checkTSO(t));
1084       ASSERT(t->link == END_TSO_QUEUE);
1085 #if defined(GRAN)
1086       ASSERT(!is_on_queue(t,CurrentProc));
1087
1088       IF_DEBUG(sanity,
1089                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1090                checkThreadQsSanity(rtsTrue));
1091 #endif
1092 #if defined(PAR)
1093       if (RtsFlags.ParFlags.doFairScheduling) { 
1094         /* this does round-robin scheduling; good for concurrency */
1095         APPEND_TO_RUN_QUEUE(t);
1096       } else {
1097         /* this does unfair scheduling; good for parallelism */
1098         PUSH_ON_RUN_QUEUE(t);
1099       }
1100 #else
1101       /* this does round-robin scheduling; good for concurrency */
1102       APPEND_TO_RUN_QUEUE(t);
1103 #endif
1104 #if defined(GRAN)
1105       /* add a ContinueThread event to actually process the thread */
1106       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1107                 ContinueThread,
1108                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1109       IF_GRAN_DEBUG(bq, 
1110                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1111                G_EVENTQ(0);
1112                G_CURR_THREADQ(0));
1113 #endif /* GRAN */
1114       break;
1115       
1116     case ThreadBlocked:
1117 #if defined(GRAN)
1118       IF_DEBUG(scheduler,
1119                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1120                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1121                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1122
1123       // ??? needed; should emit block before
1124       IF_DEBUG(gran, 
1125                DumpGranEvent(GR_DESCHEDULE, t)); 
1126       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1127       /*
1128         ngoq Dogh!
1129       ASSERT(procStatus[CurrentProc]==Busy || 
1130               ((procStatus[CurrentProc]==Fetching) && 
1131               (t->block_info.closure!=(StgClosure*)NULL)));
1132       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1133           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1134             procStatus[CurrentProc]==Fetching)) 
1135         procStatus[CurrentProc] = Idle;
1136       */
1137 #elif defined(PAR)
1138       IF_DEBUG(scheduler,
1139                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1140                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1141       IF_PAR_DEBUG(bq,
1142
1143                    if (t->block_info.closure!=(StgClosure*)NULL) 
1144                      print_bq(t->block_info.closure));
1145
1146       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1147       blockThread(t);
1148
1149       /* whatever we schedule next, we must log that schedule */
1150       emitSchedule = rtsTrue;
1151
1152 #else /* !GRAN */
1153       /* don't need to do anything.  Either the thread is blocked on
1154        * I/O, in which case we'll have called addToBlockedQueue
1155        * previously, or it's blocked on an MVar or Blackhole, in which
1156        * case it'll be on the relevant queue already.
1157        */
1158       IF_DEBUG(scheduler,
1159                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1160                printThreadBlockage(t);
1161                fprintf(stderr, "\n"));
1162
1163       /* Only for dumping event to log file 
1164          ToDo: do I need this in GranSim, too?
1165       blockThread(t);
1166       */
1167 #endif
1168       threadPaused(t);
1169       break;
1170       
1171     case ThreadFinished:
1172       /* Need to check whether this was a main thread, and if so, signal
1173        * the task that started it with the return value.  If we have no
1174        * more main threads, we probably need to stop all the tasks until
1175        * we get a new one.
1176        */
1177       /* We also end up here if the thread kills itself with an
1178        * uncaught exception, see Exception.hc.
1179        */
1180       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1181 #if defined(GRAN)
1182       endThread(t, CurrentProc); // clean-up the thread
1183 #elif defined(PAR)
1184       /* For now all are advisory -- HWL */
1185       //if(t->priority==AdvisoryPriority) ??
1186       advisory_thread_count--;
1187       
1188 # ifdef DIST
1189       if(t->dist.priority==RevalPriority)
1190         FinishReval(t);
1191 # endif
1192       
1193       if (RtsFlags.ParFlags.ParStats.Full &&
1194           !RtsFlags.ParFlags.ParStats.Suppressed) 
1195         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1196 #endif
1197       break;
1198       
1199     default:
1200       barf("schedule: invalid thread return code %d", (int)ret);
1201     }
1202     
1203 #ifdef SMP
1204     cap->link = free_capabilities;
1205     free_capabilities = cap;
1206     n_free_capabilities++;
1207 #endif
1208
1209 #ifdef SMP
1210     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1211 #else
1212     if (ready_to_gc) 
1213 #endif
1214       {
1215       /* everybody back, start the GC.
1216        * Could do it in this thread, or signal a condition var
1217        * to do it in another thread.  Either way, we need to
1218        * broadcast on gc_pending_cond afterward.
1219        */
1220 #ifdef SMP
1221       IF_DEBUG(scheduler,sched_belch("doing GC"));
1222 #endif
1223       GarbageCollect(GetRoots,rtsFalse);
1224       ready_to_gc = rtsFalse;
1225 #ifdef SMP
1226       pthread_cond_broadcast(&gc_pending_cond);
1227 #endif
1228 #if defined(GRAN)
1229       /* add a ContinueThread event to continue execution of current thread */
1230       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1231                 ContinueThread,
1232                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1233       IF_GRAN_DEBUG(bq, 
1234                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1235                G_EVENTQ(0);
1236                G_CURR_THREADQ(0));
1237 #endif /* GRAN */
1238     }
1239 #if defined(GRAN)
1240   next_thread:
1241     IF_GRAN_DEBUG(unused,
1242                   print_eventq(EventHd));
1243
1244     event = get_next_event();
1245
1246 #elif defined(PAR)
1247   next_thread:
1248     /* ToDo: wait for next message to arrive rather than busy wait */
1249
1250 #else /* GRAN */
1251   /* not any more
1252   next_thread:
1253     t = take_off_run_queue(END_TSO_QUEUE);
1254   */
1255 #endif /* GRAN */
1256   } /* end of while(1) */
1257   IF_PAR_DEBUG(verbose,
1258                belch("== Leaving schedule() after having received Finish"));
1259 }
1260
1261 /* ---------------------------------------------------------------------------
1262  * deleteAllThreads():  kill all the live threads.
1263  *
1264  * This is used when we catch a user interrupt (^C), before performing
1265  * any necessary cleanups and running finalizers.
1266  * ------------------------------------------------------------------------- */
1267    
1268 void deleteAllThreads ( void )
1269 {
1270   StgTSO* t;
1271   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1272   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1273       deleteThread(t);
1274   }
1275   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1276       deleteThread(t);
1277   }
1278   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1279       deleteThread(t);
1280   }
1281   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1282   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1283   sleeping_queue = END_TSO_QUEUE;
1284 }
1285
1286 /* startThread and  insertThread are now in GranSim.c -- HWL */
1287
1288 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1289 //@subsection Suspend and Resume
1290
1291 /* ---------------------------------------------------------------------------
1292  * Suspending & resuming Haskell threads.
1293  * 
1294  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1295  * its capability before calling the C function.  This allows another
1296  * task to pick up the capability and carry on running Haskell
1297  * threads.  It also means that if the C call blocks, it won't lock
1298  * the whole system.
1299  *
1300  * The Haskell thread making the C call is put to sleep for the
1301  * duration of the call, on the susepended_ccalling_threads queue.  We
1302  * give out a token to the task, which it can use to resume the thread
1303  * on return from the C function.
1304  * ------------------------------------------------------------------------- */
1305    
1306 StgInt
1307 suspendThread( Capability *cap )
1308 {
1309   nat tok;
1310
1311   ACQUIRE_LOCK(&sched_mutex);
1312
1313   IF_DEBUG(scheduler,
1314            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1315
1316   threadPaused(cap->rCurrentTSO);
1317   cap->rCurrentTSO->link = suspended_ccalling_threads;
1318   suspended_ccalling_threads = cap->rCurrentTSO;
1319
1320   /* Use the thread ID as the token; it should be unique */
1321   tok = cap->rCurrentTSO->id;
1322
1323 #ifdef SMP
1324   cap->link = free_capabilities;
1325   free_capabilities = cap;
1326   n_free_capabilities++;
1327 #endif
1328
1329   RELEASE_LOCK(&sched_mutex);
1330   return tok; 
1331 }
1332
1333 Capability *
1334 resumeThread( StgInt tok )
1335 {
1336   StgTSO *tso, **prev;
1337   Capability *cap;
1338
1339   ACQUIRE_LOCK(&sched_mutex);
1340
1341   prev = &suspended_ccalling_threads;
1342   for (tso = suspended_ccalling_threads; 
1343        tso != END_TSO_QUEUE; 
1344        prev = &tso->link, tso = tso->link) {
1345     if (tso->id == (StgThreadID)tok) {
1346       *prev = tso->link;
1347       break;
1348     }
1349   }
1350   if (tso == END_TSO_QUEUE) {
1351     barf("resumeThread: thread not found");
1352   }
1353   tso->link = END_TSO_QUEUE;
1354
1355 #ifdef SMP
1356   while (free_capabilities == NULL) {
1357     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1358     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1359     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1360   }
1361   cap = free_capabilities;
1362   free_capabilities = cap->link;
1363   n_free_capabilities--;
1364 #else  
1365   cap = &MainRegTable;
1366 #endif
1367
1368   cap->rCurrentTSO = tso;
1369
1370   RELEASE_LOCK(&sched_mutex);
1371   return cap;
1372 }
1373
1374
1375 /* ---------------------------------------------------------------------------
1376  * Static functions
1377  * ------------------------------------------------------------------------ */
1378 static void unblockThread(StgTSO *tso);
1379
1380 /* ---------------------------------------------------------------------------
1381  * Comparing Thread ids.
1382  *
1383  * This is used from STG land in the implementation of the
1384  * instances of Eq/Ord for ThreadIds.
1385  * ------------------------------------------------------------------------ */
1386
1387 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1388
1389   StgThreadID id1 = tso1->id; 
1390   StgThreadID id2 = tso2->id;
1391  
1392   if (id1 < id2) return (-1);
1393   if (id1 > id2) return 1;
1394   return 0;
1395 }
1396
1397 /* ---------------------------------------------------------------------------
1398    Create a new thread.
1399
1400    The new thread starts with the given stack size.  Before the
1401    scheduler can run, however, this thread needs to have a closure
1402    (and possibly some arguments) pushed on its stack.  See
1403    pushClosure() in Schedule.h.
1404
1405    createGenThread() and createIOThread() (in SchedAPI.h) are
1406    convenient packaged versions of this function.
1407
1408    currently pri (priority) is only used in a GRAN setup -- HWL
1409    ------------------------------------------------------------------------ */
1410 //@cindex createThread
1411 #if defined(GRAN)
1412 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1413 StgTSO *
1414 createThread(nat stack_size, StgInt pri)
1415 {
1416   return createThread_(stack_size, rtsFalse, pri);
1417 }
1418
1419 static StgTSO *
1420 createThread_(nat size, rtsBool have_lock, StgInt pri)
1421 {
1422 #else
1423 StgTSO *
1424 createThread(nat stack_size)
1425 {
1426   return createThread_(stack_size, rtsFalse);
1427 }
1428
1429 static StgTSO *
1430 createThread_(nat size, rtsBool have_lock)
1431 {
1432 #endif
1433
1434     StgTSO *tso;
1435     nat stack_size;
1436
1437     /* First check whether we should create a thread at all */
1438 #if defined(PAR)
1439   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1440   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1441     threadsIgnored++;
1442     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1443           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1444     return END_TSO_QUEUE;
1445   }
1446   threadsCreated++;
1447 #endif
1448
1449 #if defined(GRAN)
1450   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1451 #endif
1452
1453   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1454
1455   /* catch ridiculously small stack sizes */
1456   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1457     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1458   }
1459
1460   stack_size = size - TSO_STRUCT_SIZEW;
1461
1462   tso = (StgTSO *)allocate(size);
1463   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1464
1465   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1466 #if defined(GRAN)
1467   SET_GRAN_HDR(tso, ThisPE);
1468 #endif
1469   tso->what_next     = ThreadEnterGHC;
1470
1471   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1472    * protect the increment operation on next_thread_id.
1473    * In future, we could use an atomic increment instead.
1474    */
1475   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1476   tso->id = next_thread_id++; 
1477   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1478
1479   tso->why_blocked  = NotBlocked;
1480   tso->blocked_exceptions = NULL;
1481
1482   tso->stack_size   = stack_size;
1483   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1484                               - TSO_STRUCT_SIZEW;
1485   tso->sp           = (P_)&(tso->stack) + stack_size;
1486
1487 #ifdef PROFILING
1488   tso->prof.CCCS = CCS_MAIN;
1489 #endif
1490
1491   /* put a stop frame on the stack */
1492   tso->sp -= sizeofW(StgStopFrame);
1493   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1494   tso->su = (StgUpdateFrame*)tso->sp;
1495
1496   // ToDo: check this
1497 #if defined(GRAN)
1498   tso->link = END_TSO_QUEUE;
1499   /* uses more flexible routine in GranSim */
1500   insertThread(tso, CurrentProc);
1501 #else
1502   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1503    * from its creation
1504    */
1505 #endif
1506
1507 #if defined(GRAN) 
1508   if (RtsFlags.GranFlags.GranSimStats.Full) 
1509     DumpGranEvent(GR_START,tso);
1510 #elif defined(PAR)
1511   if (RtsFlags.ParFlags.ParStats.Full) 
1512     DumpGranEvent(GR_STARTQ,tso);
1513   /* HACk to avoid SCHEDULE 
1514      LastTSO = tso; */
1515 #endif
1516
1517   /* Link the new thread on the global thread list.
1518    */
1519   tso->global_link = all_threads;
1520   all_threads = tso;
1521
1522 #if defined(DIST)
1523   tso->dist.priority = MandatoryPriority; //by default that is...
1524 #endif
1525
1526 #if defined(GRAN)
1527   tso->gran.pri = pri;
1528 # if defined(DEBUG)
1529   tso->gran.magic = TSO_MAGIC; // debugging only
1530 # endif
1531   tso->gran.sparkname   = 0;
1532   tso->gran.startedat   = CURRENT_TIME; 
1533   tso->gran.exported    = 0;
1534   tso->gran.basicblocks = 0;
1535   tso->gran.allocs      = 0;
1536   tso->gran.exectime    = 0;
1537   tso->gran.fetchtime   = 0;
1538   tso->gran.fetchcount  = 0;
1539   tso->gran.blocktime   = 0;
1540   tso->gran.blockcount  = 0;
1541   tso->gran.blockedat   = 0;
1542   tso->gran.globalsparks = 0;
1543   tso->gran.localsparks  = 0;
1544   if (RtsFlags.GranFlags.Light)
1545     tso->gran.clock  = Now; /* local clock */
1546   else
1547     tso->gran.clock  = 0;
1548
1549   IF_DEBUG(gran,printTSO(tso));
1550 #elif defined(PAR)
1551 # if defined(DEBUG)
1552   tso->par.magic = TSO_MAGIC; // debugging only
1553 # endif
1554   tso->par.sparkname   = 0;
1555   tso->par.startedat   = CURRENT_TIME; 
1556   tso->par.exported    = 0;
1557   tso->par.basicblocks = 0;
1558   tso->par.allocs      = 0;
1559   tso->par.exectime    = 0;
1560   tso->par.fetchtime   = 0;
1561   tso->par.fetchcount  = 0;
1562   tso->par.blocktime   = 0;
1563   tso->par.blockcount  = 0;
1564   tso->par.blockedat   = 0;
1565   tso->par.globalsparks = 0;
1566   tso->par.localsparks  = 0;
1567 #endif
1568
1569 #if defined(GRAN)
1570   globalGranStats.tot_threads_created++;
1571   globalGranStats.threads_created_on_PE[CurrentProc]++;
1572   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1573   globalGranStats.tot_sq_probes++;
1574 #elif defined(PAR)
1575   // collect parallel global statistics (currently done together with GC stats)
1576   if (RtsFlags.ParFlags.ParStats.Global &&
1577       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1578     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1579     globalParStats.tot_threads_created++;
1580   }
1581 #endif 
1582
1583 #if defined(GRAN)
1584   IF_GRAN_DEBUG(pri,
1585                 belch("==__ schedule: Created TSO %d (%p);",
1586                       CurrentProc, tso, tso->id));
1587 #elif defined(PAR)
1588     IF_PAR_DEBUG(verbose,
1589                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1590                        tso->id, tso, advisory_thread_count));
1591 #else
1592   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1593                                  tso->id, tso->stack_size));
1594 #endif    
1595   return tso;
1596 }
1597
1598 #if defined(PAR)
1599 /* RFP:
1600    all parallel thread creation calls should fall through the following routine.
1601 */
1602 StgTSO *
1603 createSparkThread(rtsSpark spark) 
1604 { StgTSO *tso;
1605   ASSERT(spark != (rtsSpark)NULL);
1606   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1607   { threadsIgnored++;
1608     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1609           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1610     return END_TSO_QUEUE;
1611   }
1612   else
1613   { threadsCreated++;
1614     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1615     if (tso==END_TSO_QUEUE)     
1616       barf("createSparkThread: Cannot create TSO");
1617 #if defined(DIST)
1618     tso->priority = AdvisoryPriority;
1619 #endif
1620     pushClosure(tso,spark);
1621     PUSH_ON_RUN_QUEUE(tso);
1622     advisory_thread_count++;    
1623   }
1624   return tso;
1625 }
1626 #endif
1627
1628 /*
1629   Turn a spark into a thread.
1630   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1631 */
1632 #if defined(PAR)
1633 //@cindex activateSpark
1634 StgTSO *
1635 activateSpark (rtsSpark spark) 
1636 {
1637   StgTSO *tso;
1638
1639   tso = createSparkThread(spark);
1640   if (RtsFlags.ParFlags.ParStats.Full) {   
1641     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1642     IF_PAR_DEBUG(verbose,
1643                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1644                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1645   }
1646   // ToDo: fwd info on local/global spark to thread -- HWL
1647   // tso->gran.exported =  spark->exported;
1648   // tso->gran.locked =   !spark->global;
1649   // tso->gran.sparkname = spark->name;
1650
1651   return tso;
1652 }
1653 #endif
1654
1655 /* ---------------------------------------------------------------------------
1656  * scheduleThread()
1657  *
1658  * scheduleThread puts a thread on the head of the runnable queue.
1659  * This will usually be done immediately after a thread is created.
1660  * The caller of scheduleThread must create the thread using e.g.
1661  * createThread and push an appropriate closure
1662  * on this thread's stack before the scheduler is invoked.
1663  * ------------------------------------------------------------------------ */
1664
1665 void
1666 scheduleThread(StgTSO *tso)
1667 {
1668   if (tso==END_TSO_QUEUE){    
1669     schedule();
1670     return;
1671   }
1672
1673   ACQUIRE_LOCK(&sched_mutex);
1674
1675   /* Put the new thread on the head of the runnable queue.  The caller
1676    * better push an appropriate closure on this thread's stack
1677    * beforehand.  In the SMP case, the thread may start running as
1678    * soon as we release the scheduler lock below.
1679    */
1680   PUSH_ON_RUN_QUEUE(tso);
1681   THREAD_RUNNABLE();
1682
1683 #if 0
1684   IF_DEBUG(scheduler,printTSO(tso));
1685 #endif
1686   RELEASE_LOCK(&sched_mutex);
1687 }
1688
1689 /* ---------------------------------------------------------------------------
1690  * startTasks()
1691  *
1692  * Start up Posix threads to run each of the scheduler tasks.
1693  * I believe the task ids are not needed in the system as defined.
1694  *  KH @ 25/10/99
1695  * ------------------------------------------------------------------------ */
1696
1697 #if defined(PAR) || defined(SMP)
1698 void
1699 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1700 {
1701   scheduleThread(END_TSO_QUEUE);
1702 }
1703 #endif
1704
1705 /* ---------------------------------------------------------------------------
1706  * initScheduler()
1707  *
1708  * Initialise the scheduler.  This resets all the queues - if the
1709  * queues contained any threads, they'll be garbage collected at the
1710  * next pass.
1711  *
1712  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1713  * ------------------------------------------------------------------------ */
1714
1715 #ifdef SMP
1716 static void
1717 term_handler(int sig STG_UNUSED)
1718 {
1719   stat_workerStop();
1720   ACQUIRE_LOCK(&term_mutex);
1721   await_death--;
1722   RELEASE_LOCK(&term_mutex);
1723   pthread_exit(NULL);
1724 }
1725 #endif
1726
1727 //@cindex initScheduler
1728 void 
1729 initScheduler(void)
1730 {
1731 #if defined(GRAN)
1732   nat i;
1733
1734   for (i=0; i<=MAX_PROC; i++) {
1735     run_queue_hds[i]      = END_TSO_QUEUE;
1736     run_queue_tls[i]      = END_TSO_QUEUE;
1737     blocked_queue_hds[i]  = END_TSO_QUEUE;
1738     blocked_queue_tls[i]  = END_TSO_QUEUE;
1739     ccalling_threadss[i]  = END_TSO_QUEUE;
1740     sleeping_queue        = END_TSO_QUEUE;
1741   }
1742 #else
1743   run_queue_hd      = END_TSO_QUEUE;
1744   run_queue_tl      = END_TSO_QUEUE;
1745   blocked_queue_hd  = END_TSO_QUEUE;
1746   blocked_queue_tl  = END_TSO_QUEUE;
1747   sleeping_queue    = END_TSO_QUEUE;
1748 #endif 
1749
1750   suspended_ccalling_threads  = END_TSO_QUEUE;
1751
1752   main_threads = NULL;
1753   all_threads  = END_TSO_QUEUE;
1754
1755   context_switch = 0;
1756   interrupted    = 0;
1757
1758   RtsFlags.ConcFlags.ctxtSwitchTicks =
1759       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1760
1761   /* Install the SIGHUP handler */
1762 #ifdef SMP
1763   {
1764     struct sigaction action,oact;
1765
1766     action.sa_handler = term_handler;
1767     sigemptyset(&action.sa_mask);
1768     action.sa_flags = 0;
1769     if (sigaction(SIGTERM, &action, &oact) != 0) {
1770       barf("can't install TERM handler");
1771     }
1772   }
1773 #endif
1774
1775 #ifdef SMP
1776   /* Allocate N Capabilities */
1777   {
1778     nat i;
1779     Capability *cap, *prev;
1780     cap  = NULL;
1781     prev = NULL;
1782     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1783       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1784       cap->link = prev;
1785       prev = cap;
1786     }
1787     free_capabilities = cap;
1788     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1789   }
1790   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1791                              n_free_capabilities););
1792 #endif
1793
1794 #if defined(SMP) || defined(PAR)
1795   initSparkPools();
1796 #endif
1797 }
1798
1799 #ifdef SMP
1800 void
1801 startTasks( void )
1802 {
1803   nat i;
1804   int r;
1805   pthread_t tid;
1806   
1807   /* make some space for saving all the thread ids */
1808   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1809                             "initScheduler:task_ids");
1810   
1811   /* and create all the threads */
1812   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1813     r = pthread_create(&tid,NULL,taskStart,NULL);
1814     if (r != 0) {
1815       barf("startTasks: Can't create new Posix thread");
1816     }
1817     task_ids[i].id = tid;
1818     task_ids[i].mut_time = 0.0;
1819     task_ids[i].mut_etime = 0.0;
1820     task_ids[i].gc_time = 0.0;
1821     task_ids[i].gc_etime = 0.0;
1822     task_ids[i].elapsedtimestart = elapsedtime();
1823     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1824   }
1825 }
1826 #endif
1827
1828 void
1829 exitScheduler( void )
1830 {
1831 #ifdef SMP
1832   nat i;
1833
1834   /* Don't want to use pthread_cancel, since we'd have to install
1835    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1836    * all our locks.
1837    */
1838 #if 0
1839   /* Cancel all our tasks */
1840   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1841     pthread_cancel(task_ids[i].id);
1842   }
1843   
1844   /* Wait for all the tasks to terminate */
1845   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1846     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1847                                task_ids[i].id));
1848     pthread_join(task_ids[i].id, NULL);
1849   }
1850 #endif
1851
1852   /* Send 'em all a SIGHUP.  That should shut 'em up.
1853    */
1854   await_death = RtsFlags.ParFlags.nNodes;
1855   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1856     pthread_kill(task_ids[i].id,SIGTERM);
1857   }
1858   while (await_death > 0) {
1859     sched_yield();
1860   }
1861 #endif
1862 }
1863
1864 /* -----------------------------------------------------------------------------
1865    Managing the per-task allocation areas.
1866    
1867    Each capability comes with an allocation area.  These are
1868    fixed-length block lists into which allocation can be done.
1869
1870    ToDo: no support for two-space collection at the moment???
1871    -------------------------------------------------------------------------- */
1872
1873 /* -----------------------------------------------------------------------------
1874  * waitThread is the external interface for running a new computation
1875  * and waiting for the result.
1876  *
1877  * In the non-SMP case, we create a new main thread, push it on the 
1878  * main-thread stack, and invoke the scheduler to run it.  The
1879  * scheduler will return when the top main thread on the stack has
1880  * completed or died, and fill in the necessary fields of the
1881  * main_thread structure.
1882  *
1883  * In the SMP case, we create a main thread as before, but we then
1884  * create a new condition variable and sleep on it.  When our new
1885  * main thread has completed, we'll be woken up and the status/result
1886  * will be in the main_thread struct.
1887  * -------------------------------------------------------------------------- */
1888
1889 int 
1890 howManyThreadsAvail ( void )
1891 {
1892    int i = 0;
1893    StgTSO* q;
1894    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1895       i++;
1896    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1897       i++;
1898    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1899       i++;
1900    return i;
1901 }
1902
1903 void
1904 finishAllThreads ( void )
1905 {
1906    do {
1907       while (run_queue_hd != END_TSO_QUEUE) {
1908          waitThread ( run_queue_hd, NULL );
1909       }
1910       while (blocked_queue_hd != END_TSO_QUEUE) {
1911          waitThread ( blocked_queue_hd, NULL );
1912       }
1913       while (sleeping_queue != END_TSO_QUEUE) {
1914          waitThread ( blocked_queue_hd, NULL );
1915       }
1916    } while 
1917       (blocked_queue_hd != END_TSO_QUEUE || 
1918        run_queue_hd     != END_TSO_QUEUE ||
1919        sleeping_queue   != END_TSO_QUEUE);
1920 }
1921
1922 SchedulerStatus
1923 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1924 {
1925   StgMainThread *m;
1926   SchedulerStatus stat;
1927
1928   ACQUIRE_LOCK(&sched_mutex);
1929   
1930   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1931
1932   m->tso = tso;
1933   m->ret = ret;
1934   m->stat = NoStatus;
1935 #ifdef SMP
1936   pthread_cond_init(&m->wakeup, NULL);
1937 #endif
1938
1939   m->link = main_threads;
1940   main_threads = m;
1941
1942   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1943                               m->tso->id));
1944
1945 #ifdef SMP
1946   do {
1947     pthread_cond_wait(&m->wakeup, &sched_mutex);
1948   } while (m->stat == NoStatus);
1949 #elif defined(GRAN)
1950   /* GranSim specific init */
1951   CurrentTSO = m->tso;                // the TSO to run
1952   procStatus[MainProc] = Busy;        // status of main PE
1953   CurrentProc = MainProc;             // PE to run it on
1954
1955   schedule();
1956 #else
1957   schedule();
1958   ASSERT(m->stat != NoStatus);
1959 #endif
1960
1961   stat = m->stat;
1962
1963 #ifdef SMP
1964   pthread_cond_destroy(&m->wakeup);
1965 #endif
1966
1967   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1968                               m->tso->id));
1969   free(m);
1970
1971   RELEASE_LOCK(&sched_mutex);
1972
1973   return stat;
1974 }
1975
1976 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1977 //@subsection Run queue code 
1978
1979 #if 0
1980 /* 
1981    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1982        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1983        implicit global variable that has to be correct when calling these
1984        fcts -- HWL 
1985 */
1986
1987 /* Put the new thread on the head of the runnable queue.
1988  * The caller of createThread better push an appropriate closure
1989  * on this thread's stack before the scheduler is invoked.
1990  */
1991 static /* inline */ void
1992 add_to_run_queue(tso)
1993 StgTSO* tso; 
1994 {
1995   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1996   tso->link = run_queue_hd;
1997   run_queue_hd = tso;
1998   if (run_queue_tl == END_TSO_QUEUE) {
1999     run_queue_tl = tso;
2000   }
2001 }
2002
2003 /* Put the new thread at the end of the runnable queue. */
2004 static /* inline */ void
2005 push_on_run_queue(tso)
2006 StgTSO* tso; 
2007 {
2008   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2009   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2010   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2011   if (run_queue_hd == END_TSO_QUEUE) {
2012     run_queue_hd = tso;
2013   } else {
2014     run_queue_tl->link = tso;
2015   }
2016   run_queue_tl = tso;
2017 }
2018
2019 /* 
2020    Should be inlined because it's used very often in schedule.  The tso
2021    argument is actually only needed in GranSim, where we want to have the
2022    possibility to schedule *any* TSO on the run queue, irrespective of the
2023    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2024    the run queue and dequeue the tso, adjusting the links in the queue. 
2025 */
2026 //@cindex take_off_run_queue
2027 static /* inline */ StgTSO*
2028 take_off_run_queue(StgTSO *tso) {
2029   StgTSO *t, *prev;
2030
2031   /* 
2032      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2033
2034      if tso is specified, unlink that tso from the run_queue (doesn't have
2035      to be at the beginning of the queue); GranSim only 
2036   */
2037   if (tso!=END_TSO_QUEUE) {
2038     /* find tso in queue */
2039     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2040          t!=END_TSO_QUEUE && t!=tso;
2041          prev=t, t=t->link) 
2042       /* nothing */ ;
2043     ASSERT(t==tso);
2044     /* now actually dequeue the tso */
2045     if (prev!=END_TSO_QUEUE) {
2046       ASSERT(run_queue_hd!=t);
2047       prev->link = t->link;
2048     } else {
2049       /* t is at beginning of thread queue */
2050       ASSERT(run_queue_hd==t);
2051       run_queue_hd = t->link;
2052     }
2053     /* t is at end of thread queue */
2054     if (t->link==END_TSO_QUEUE) {
2055       ASSERT(t==run_queue_tl);
2056       run_queue_tl = prev;
2057     } else {
2058       ASSERT(run_queue_tl!=t);
2059     }
2060     t->link = END_TSO_QUEUE;
2061   } else {
2062     /* take tso from the beginning of the queue; std concurrent code */
2063     t = run_queue_hd;
2064     if (t != END_TSO_QUEUE) {
2065       run_queue_hd = t->link;
2066       t->link = END_TSO_QUEUE;
2067       if (run_queue_hd == END_TSO_QUEUE) {
2068         run_queue_tl = END_TSO_QUEUE;
2069       }
2070     }
2071   }
2072   return t;
2073 }
2074
2075 #endif /* 0 */
2076
2077 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2078 //@subsection Garbage Collextion Routines
2079
2080 /* ---------------------------------------------------------------------------
2081    Where are the roots that we know about?
2082
2083         - all the threads on the runnable queue
2084         - all the threads on the blocked queue
2085         - all the threads on the sleeping queue
2086         - all the thread currently executing a _ccall_GC
2087         - all the "main threads"
2088      
2089    ------------------------------------------------------------------------ */
2090
2091 /* This has to be protected either by the scheduler monitor, or by the
2092         garbage collection monitor (probably the latter).
2093         KH @ 25/10/99
2094 */
2095
2096 static void
2097 GetRoots(evac_fn evac)
2098 {
2099   StgMainThread *m;
2100
2101 #if defined(GRAN)
2102   {
2103     nat i;
2104     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2105       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2106           evac((StgClosure **)&run_queue_hds[i]);
2107       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2108           evac((StgClosure **)&run_queue_tls[i]);
2109       
2110       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2111           evac((StgClosure **)&blocked_queue_hds[i]);
2112       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2113           evac((StgClosure **)&blocked_queue_tls[i]);
2114       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2115           evac((StgClosure **)&ccalling_threads[i]);
2116     }
2117   }
2118
2119   markEventQueue();
2120
2121 #else /* !GRAN */
2122   if (run_queue_hd != END_TSO_QUEUE) {
2123       ASSERT(run_queue_tl != END_TSO_QUEUE);
2124       evac((StgClosure **)&run_queue_hd);
2125       evac((StgClosure **)&run_queue_tl);
2126   }
2127   
2128   if (blocked_queue_hd != END_TSO_QUEUE) {
2129       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2130       evac((StgClosure **)&blocked_queue_hd);
2131       evac((StgClosure **)&blocked_queue_tl);
2132   }
2133   
2134   if (sleeping_queue != END_TSO_QUEUE) {
2135       evac((StgClosure **)&sleeping_queue);
2136   }
2137 #endif 
2138
2139   for (m = main_threads; m != NULL; m = m->link) {
2140       evac((StgClosure **)&m->tso);
2141   }
2142   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2143       evac((StgClosure **)&suspended_ccalling_threads);
2144   }
2145
2146 #if defined(SMP) || defined(PAR) || defined(GRAN)
2147   markSparkQueue(evac);
2148 #endif
2149 }
2150
2151 /* -----------------------------------------------------------------------------
2152    performGC
2153
2154    This is the interface to the garbage collector from Haskell land.
2155    We provide this so that external C code can allocate and garbage
2156    collect when called from Haskell via _ccall_GC.
2157
2158    It might be useful to provide an interface whereby the programmer
2159    can specify more roots (ToDo).
2160    
2161    This needs to be protected by the GC condition variable above.  KH.
2162    -------------------------------------------------------------------------- */
2163
2164 void (*extra_roots)(evac_fn);
2165
2166 void
2167 performGC(void)
2168 {
2169   GarbageCollect(GetRoots,rtsFalse);
2170 }
2171
2172 void
2173 performMajorGC(void)
2174 {
2175   GarbageCollect(GetRoots,rtsTrue);
2176 }
2177
2178 static void
2179 AllRoots(evac_fn evac)
2180 {
2181     GetRoots(evac);             // the scheduler's roots
2182     extra_roots(evac);          // the user's roots
2183 }
2184
2185 void
2186 performGCWithRoots(void (*get_roots)(evac_fn))
2187 {
2188   extra_roots = get_roots;
2189   GarbageCollect(AllRoots,rtsFalse);
2190 }
2191
2192 /* -----------------------------------------------------------------------------
2193    Stack overflow
2194
2195    If the thread has reached its maximum stack size, then raise the
2196    StackOverflow exception in the offending thread.  Otherwise
2197    relocate the TSO into a larger chunk of memory and adjust its stack
2198    size appropriately.
2199    -------------------------------------------------------------------------- */
2200
2201 static StgTSO *
2202 threadStackOverflow(StgTSO *tso)
2203 {
2204   nat new_stack_size, new_tso_size, diff, stack_words;
2205   StgPtr new_sp;
2206   StgTSO *dest;
2207
2208   IF_DEBUG(sanity,checkTSO(tso));
2209   if (tso->stack_size >= tso->max_stack_size) {
2210
2211     IF_DEBUG(gc,
2212              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2213                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2214              /* If we're debugging, just print out the top of the stack */
2215              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2216                                               tso->sp+64)));
2217
2218     /* Send this thread the StackOverflow exception */
2219     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2220     return tso;
2221   }
2222
2223   /* Try to double the current stack size.  If that takes us over the
2224    * maximum stack size for this thread, then use the maximum instead.
2225    * Finally round up so the TSO ends up as a whole number of blocks.
2226    */
2227   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2228   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2229                                        TSO_STRUCT_SIZE)/sizeof(W_);
2230   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2231   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2232
2233   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2234
2235   dest = (StgTSO *)allocate(new_tso_size);
2236   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2237
2238   /* copy the TSO block and the old stack into the new area */
2239   memcpy(dest,tso,TSO_STRUCT_SIZE);
2240   stack_words = tso->stack + tso->stack_size - tso->sp;
2241   new_sp = (P_)dest + new_tso_size - stack_words;
2242   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2243
2244   /* relocate the stack pointers... */
2245   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2246   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2247   dest->sp    = new_sp;
2248   dest->stack_size = new_stack_size;
2249         
2250   /* and relocate the update frame list */
2251   relocate_stack(dest, diff);
2252
2253   /* Mark the old TSO as relocated.  We have to check for relocated
2254    * TSOs in the garbage collector and any primops that deal with TSOs.
2255    *
2256    * It's important to set the sp and su values to just beyond the end
2257    * of the stack, so we don't attempt to scavenge any part of the
2258    * dead TSO's stack.
2259    */
2260   tso->what_next = ThreadRelocated;
2261   tso->link = dest;
2262   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2263   tso->su = (StgUpdateFrame *)tso->sp;
2264   tso->why_blocked = NotBlocked;
2265   dest->mut_link = NULL;
2266
2267   IF_PAR_DEBUG(verbose,
2268                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2269                      tso->id, tso, tso->stack_size);
2270                /* If we're debugging, just print out the top of the stack */
2271                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2272                                                 tso->sp+64)));
2273   
2274   IF_DEBUG(sanity,checkTSO(tso));
2275 #if 0
2276   IF_DEBUG(scheduler,printTSO(dest));
2277 #endif
2278
2279   return dest;
2280 }
2281
2282 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2283 //@subsection Blocking Queue Routines
2284
2285 /* ---------------------------------------------------------------------------
2286    Wake up a queue that was blocked on some resource.
2287    ------------------------------------------------------------------------ */
2288
2289 #if defined(GRAN)
2290 static inline void
2291 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2292 {
2293 }
2294 #elif defined(PAR)
2295 static inline void
2296 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2297 {
2298   /* write RESUME events to log file and
2299      update blocked and fetch time (depending on type of the orig closure) */
2300   if (RtsFlags.ParFlags.ParStats.Full) {
2301     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2302                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2303                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2304     if (EMPTY_RUN_QUEUE())
2305       emitSchedule = rtsTrue;
2306
2307     switch (get_itbl(node)->type) {
2308         case FETCH_ME_BQ:
2309           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2310           break;
2311         case RBH:
2312         case FETCH_ME:
2313         case BLACKHOLE_BQ:
2314           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2315           break;
2316 #ifdef DIST
2317         case MVAR:
2318           break;
2319 #endif    
2320         default:
2321           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2322         }
2323       }
2324 }
2325 #endif
2326
2327 #if defined(GRAN)
2328 static StgBlockingQueueElement *
2329 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2330 {
2331     StgTSO *tso;
2332     PEs node_loc, tso_loc;
2333
2334     node_loc = where_is(node); // should be lifted out of loop
2335     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2336     tso_loc = where_is((StgClosure *)tso);
2337     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2338       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2339       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2340       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2341       // insertThread(tso, node_loc);
2342       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2343                 ResumeThread,
2344                 tso, node, (rtsSpark*)NULL);
2345       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2346       // len_local++;
2347       // len++;
2348     } else { // TSO is remote (actually should be FMBQ)
2349       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2350                                   RtsFlags.GranFlags.Costs.gunblocktime +
2351                                   RtsFlags.GranFlags.Costs.latency;
2352       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2353                 UnblockThread,
2354                 tso, node, (rtsSpark*)NULL);
2355       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2356       // len++;
2357     }
2358     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2359     IF_GRAN_DEBUG(bq,
2360                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2361                           (node_loc==tso_loc ? "Local" : "Global"), 
2362                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2363     tso->block_info.closure = NULL;
2364     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2365                              tso->id, tso));
2366 }
2367 #elif defined(PAR)
2368 static StgBlockingQueueElement *
2369 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2370 {
2371     StgBlockingQueueElement *next;
2372
2373     switch (get_itbl(bqe)->type) {
2374     case TSO:
2375       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2376       /* if it's a TSO just push it onto the run_queue */
2377       next = bqe->link;
2378       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2379       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2380       THREAD_RUNNABLE();
2381       unblockCount(bqe, node);
2382       /* reset blocking status after dumping event */
2383       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2384       break;
2385
2386     case BLOCKED_FETCH:
2387       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2388       next = bqe->link;
2389       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2390       PendingFetches = (StgBlockedFetch *)bqe;
2391       break;
2392
2393 # if defined(DEBUG)
2394       /* can ignore this case in a non-debugging setup; 
2395          see comments on RBHSave closures above */
2396     case CONSTR:
2397       /* check that the closure is an RBHSave closure */
2398       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2399              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2400              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2401       break;
2402
2403     default:
2404       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2405            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2406            (StgClosure *)bqe);
2407 # endif
2408     }
2409   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2410   return next;
2411 }
2412
2413 #else /* !GRAN && !PAR */
2414 static StgTSO *
2415 unblockOneLocked(StgTSO *tso)
2416 {
2417   StgTSO *next;
2418
2419   ASSERT(get_itbl(tso)->type == TSO);
2420   ASSERT(tso->why_blocked != NotBlocked);
2421   tso->why_blocked = NotBlocked;
2422   next = tso->link;
2423   PUSH_ON_RUN_QUEUE(tso);
2424   THREAD_RUNNABLE();
2425   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2426   return next;
2427 }
2428 #endif
2429
2430 #if defined(GRAN) || defined(PAR)
2431 inline StgBlockingQueueElement *
2432 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2433 {
2434   ACQUIRE_LOCK(&sched_mutex);
2435   bqe = unblockOneLocked(bqe, node);
2436   RELEASE_LOCK(&sched_mutex);
2437   return bqe;
2438 }
2439 #else
2440 inline StgTSO *
2441 unblockOne(StgTSO *tso)
2442 {
2443   ACQUIRE_LOCK(&sched_mutex);
2444   tso = unblockOneLocked(tso);
2445   RELEASE_LOCK(&sched_mutex);
2446   return tso;
2447 }
2448 #endif
2449
2450 #if defined(GRAN)
2451 void 
2452 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2453 {
2454   StgBlockingQueueElement *bqe;
2455   PEs node_loc;
2456   nat len = 0; 
2457
2458   IF_GRAN_DEBUG(bq, 
2459                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2460                       node, CurrentProc, CurrentTime[CurrentProc], 
2461                       CurrentTSO->id, CurrentTSO));
2462
2463   node_loc = where_is(node);
2464
2465   ASSERT(q == END_BQ_QUEUE ||
2466          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2467          get_itbl(q)->type == CONSTR); // closure (type constructor)
2468   ASSERT(is_unique(node));
2469
2470   /* FAKE FETCH: magically copy the node to the tso's proc;
2471      no Fetch necessary because in reality the node should not have been 
2472      moved to the other PE in the first place
2473   */
2474   if (CurrentProc!=node_loc) {
2475     IF_GRAN_DEBUG(bq, 
2476                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2477                         node, node_loc, CurrentProc, CurrentTSO->id, 
2478                         // CurrentTSO, where_is(CurrentTSO),
2479                         node->header.gran.procs));
2480     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2481     IF_GRAN_DEBUG(bq, 
2482                   belch("## new bitmask of node %p is %#x",
2483                         node, node->header.gran.procs));
2484     if (RtsFlags.GranFlags.GranSimStats.Global) {
2485       globalGranStats.tot_fake_fetches++;
2486     }
2487   }
2488
2489   bqe = q;
2490   // ToDo: check: ASSERT(CurrentProc==node_loc);
2491   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2492     //next = bqe->link;
2493     /* 
2494        bqe points to the current element in the queue
2495        next points to the next element in the queue
2496     */
2497     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2498     //tso_loc = where_is(tso);
2499     len++;
2500     bqe = unblockOneLocked(bqe, node);
2501   }
2502
2503   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2504      the closure to make room for the anchor of the BQ */
2505   if (bqe!=END_BQ_QUEUE) {
2506     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2507     /*
2508     ASSERT((info_ptr==&RBH_Save_0_info) ||
2509            (info_ptr==&RBH_Save_1_info) ||
2510            (info_ptr==&RBH_Save_2_info));
2511     */
2512     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2513     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2514     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2515
2516     IF_GRAN_DEBUG(bq,
2517                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2518                         node, info_type(node)));
2519   }
2520
2521   /* statistics gathering */
2522   if (RtsFlags.GranFlags.GranSimStats.Global) {
2523     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2524     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2525     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2526     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2527   }
2528   IF_GRAN_DEBUG(bq,
2529                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2530                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2531 }
2532 #elif defined(PAR)
2533 void 
2534 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2535 {
2536   StgBlockingQueueElement *bqe;
2537
2538   ACQUIRE_LOCK(&sched_mutex);
2539
2540   IF_PAR_DEBUG(verbose, 
2541                belch("##-_ AwBQ for node %p on [%x]: ",
2542                      node, mytid));
2543 #ifdef DIST  
2544   //RFP
2545   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2546     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2547     return;
2548   }
2549 #endif
2550   
2551   ASSERT(q == END_BQ_QUEUE ||
2552          get_itbl(q)->type == TSO ||           
2553          get_itbl(q)->type == BLOCKED_FETCH || 
2554          get_itbl(q)->type == CONSTR); 
2555
2556   bqe = q;
2557   while (get_itbl(bqe)->type==TSO || 
2558          get_itbl(bqe)->type==BLOCKED_FETCH) {
2559     bqe = unblockOneLocked(bqe, node);
2560   }
2561   RELEASE_LOCK(&sched_mutex);
2562 }
2563
2564 #else   /* !GRAN && !PAR */
2565 void
2566 awakenBlockedQueue(StgTSO *tso)
2567 {
2568   ACQUIRE_LOCK(&sched_mutex);
2569   while (tso != END_TSO_QUEUE) {
2570     tso = unblockOneLocked(tso);
2571   }
2572   RELEASE_LOCK(&sched_mutex);
2573 }
2574 #endif
2575
2576 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2577 //@subsection Exception Handling Routines
2578
2579 /* ---------------------------------------------------------------------------
2580    Interrupt execution
2581    - usually called inside a signal handler so it mustn't do anything fancy.   
2582    ------------------------------------------------------------------------ */
2583
2584 void
2585 interruptStgRts(void)
2586 {
2587     interrupted    = 1;
2588     context_switch = 1;
2589 }
2590
2591 /* -----------------------------------------------------------------------------
2592    Unblock a thread
2593
2594    This is for use when we raise an exception in another thread, which
2595    may be blocked.
2596    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2597    -------------------------------------------------------------------------- */
2598
2599 #if defined(GRAN) || defined(PAR)
2600 /*
2601   NB: only the type of the blocking queue is different in GranSim and GUM
2602       the operations on the queue-elements are the same
2603       long live polymorphism!
2604 */
2605 static void
2606 unblockThread(StgTSO *tso)
2607 {
2608   StgBlockingQueueElement *t, **last;
2609
2610   ACQUIRE_LOCK(&sched_mutex);
2611   switch (tso->why_blocked) {
2612
2613   case NotBlocked:
2614     return;  /* not blocked */
2615
2616   case BlockedOnMVar:
2617     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2618     {
2619       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2620       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2621
2622       last = (StgBlockingQueueElement **)&mvar->head;
2623       for (t = (StgBlockingQueueElement *)mvar->head; 
2624            t != END_BQ_QUEUE; 
2625            last = &t->link, last_tso = t, t = t->link) {
2626         if (t == (StgBlockingQueueElement *)tso) {
2627           *last = (StgBlockingQueueElement *)tso->link;
2628           if (mvar->tail == tso) {
2629             mvar->tail = (StgTSO *)last_tso;
2630           }
2631           goto done;
2632         }
2633       }
2634       barf("unblockThread (MVAR): TSO not found");
2635     }
2636
2637   case BlockedOnBlackHole:
2638     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2639     {
2640       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2641
2642       last = &bq->blocking_queue;
2643       for (t = bq->blocking_queue; 
2644            t != END_BQ_QUEUE; 
2645            last = &t->link, t = t->link) {
2646         if (t == (StgBlockingQueueElement *)tso) {
2647           *last = (StgBlockingQueueElement *)tso->link;
2648           goto done;
2649         }
2650       }
2651       barf("unblockThread (BLACKHOLE): TSO not found");
2652     }
2653
2654   case BlockedOnException:
2655     {
2656       StgTSO *target  = tso->block_info.tso;
2657
2658       ASSERT(get_itbl(target)->type == TSO);
2659
2660       if (target->what_next == ThreadRelocated) {
2661           target = target->link;
2662           ASSERT(get_itbl(target)->type == TSO);
2663       }
2664
2665       ASSERT(target->blocked_exceptions != NULL);
2666
2667       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2668       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2669            t != END_BQ_QUEUE; 
2670            last = &t->link, t = t->link) {
2671         ASSERT(get_itbl(t)->type == TSO);
2672         if (t == (StgBlockingQueueElement *)tso) {
2673           *last = (StgBlockingQueueElement *)tso->link;
2674           goto done;
2675         }
2676       }
2677       barf("unblockThread (Exception): TSO not found");
2678     }
2679
2680   case BlockedOnRead:
2681   case BlockedOnWrite:
2682     {
2683       /* take TSO off blocked_queue */
2684       StgBlockingQueueElement *prev = NULL;
2685       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2686            prev = t, t = t->link) {
2687         if (t == (StgBlockingQueueElement *)tso) {
2688           if (prev == NULL) {
2689             blocked_queue_hd = (StgTSO *)t->link;
2690             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2691               blocked_queue_tl = END_TSO_QUEUE;
2692             }
2693           } else {
2694             prev->link = t->link;
2695             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2696               blocked_queue_tl = (StgTSO *)prev;
2697             }
2698           }
2699           goto done;
2700         }
2701       }
2702       barf("unblockThread (I/O): TSO not found");
2703     }
2704
2705   case BlockedOnDelay:
2706     {
2707       /* take TSO off sleeping_queue */
2708       StgBlockingQueueElement *prev = NULL;
2709       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2710            prev = t, t = t->link) {
2711         if (t == (StgBlockingQueueElement *)tso) {
2712           if (prev == NULL) {
2713             sleeping_queue = (StgTSO *)t->link;
2714           } else {
2715             prev->link = t->link;
2716           }
2717           goto done;
2718         }
2719       }
2720       barf("unblockThread (I/O): TSO not found");
2721     }
2722
2723   default:
2724     barf("unblockThread");
2725   }
2726
2727  done:
2728   tso->link = END_TSO_QUEUE;
2729   tso->why_blocked = NotBlocked;
2730   tso->block_info.closure = NULL;
2731   PUSH_ON_RUN_QUEUE(tso);
2732   RELEASE_LOCK(&sched_mutex);
2733 }
2734 #else
2735 static void
2736 unblockThread(StgTSO *tso)
2737 {
2738   StgTSO *t, **last;
2739
2740   ACQUIRE_LOCK(&sched_mutex);
2741   switch (tso->why_blocked) {
2742
2743   case NotBlocked:
2744     return;  /* not blocked */
2745
2746   case BlockedOnMVar:
2747     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2748     {
2749       StgTSO *last_tso = END_TSO_QUEUE;
2750       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2751
2752       last = &mvar->head;
2753       for (t = mvar->head; t != END_TSO_QUEUE; 
2754            last = &t->link, last_tso = t, t = t->link) {
2755         if (t == tso) {
2756           *last = tso->link;
2757           if (mvar->tail == tso) {
2758             mvar->tail = last_tso;
2759           }
2760           goto done;
2761         }
2762       }
2763       barf("unblockThread (MVAR): TSO not found");
2764     }
2765
2766   case BlockedOnBlackHole:
2767     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2768     {
2769       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2770
2771       last = &bq->blocking_queue;
2772       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2773            last = &t->link, t = t->link) {
2774         if (t == tso) {
2775           *last = tso->link;
2776           goto done;
2777         }
2778       }
2779       barf("unblockThread (BLACKHOLE): TSO not found");
2780     }
2781
2782   case BlockedOnException:
2783     {
2784       StgTSO *target  = tso->block_info.tso;
2785
2786       ASSERT(get_itbl(target)->type == TSO);
2787
2788       while (target->what_next == ThreadRelocated) {
2789           target = target->link;
2790           ASSERT(get_itbl(target)->type == TSO);
2791       }
2792       
2793       ASSERT(target->blocked_exceptions != NULL);
2794
2795       last = &target->blocked_exceptions;
2796       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2797            last = &t->link, t = t->link) {
2798         ASSERT(get_itbl(t)->type == TSO);
2799         if (t == tso) {
2800           *last = tso->link;
2801           goto done;
2802         }
2803       }
2804       barf("unblockThread (Exception): TSO not found");
2805     }
2806
2807   case BlockedOnRead:
2808   case BlockedOnWrite:
2809     {
2810       StgTSO *prev = NULL;
2811       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2812            prev = t, t = t->link) {
2813         if (t == tso) {
2814           if (prev == NULL) {
2815             blocked_queue_hd = t->link;
2816             if (blocked_queue_tl == t) {
2817               blocked_queue_tl = END_TSO_QUEUE;
2818             }
2819           } else {
2820             prev->link = t->link;
2821             if (blocked_queue_tl == t) {
2822               blocked_queue_tl = prev;
2823             }
2824           }
2825           goto done;
2826         }
2827       }
2828       barf("unblockThread (I/O): TSO not found");
2829     }
2830
2831   case BlockedOnDelay:
2832     {
2833       StgTSO *prev = NULL;
2834       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2835            prev = t, t = t->link) {
2836         if (t == tso) {
2837           if (prev == NULL) {
2838             sleeping_queue = t->link;
2839           } else {
2840             prev->link = t->link;
2841           }
2842           goto done;
2843         }
2844       }
2845       barf("unblockThread (I/O): TSO not found");
2846     }
2847
2848   default:
2849     barf("unblockThread");
2850   }
2851
2852  done:
2853   tso->link = END_TSO_QUEUE;
2854   tso->why_blocked = NotBlocked;
2855   tso->block_info.closure = NULL;
2856   PUSH_ON_RUN_QUEUE(tso);
2857   RELEASE_LOCK(&sched_mutex);
2858 }
2859 #endif
2860
2861 /* -----------------------------------------------------------------------------
2862  * raiseAsync()
2863  *
2864  * The following function implements the magic for raising an
2865  * asynchronous exception in an existing thread.
2866  *
2867  * We first remove the thread from any queue on which it might be
2868  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2869  *
2870  * We strip the stack down to the innermost CATCH_FRAME, building
2871  * thunks in the heap for all the active computations, so they can 
2872  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2873  * an application of the handler to the exception, and push it on
2874  * the top of the stack.
2875  * 
2876  * How exactly do we save all the active computations?  We create an
2877  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2878  * AP_UPDs pushes everything from the corresponding update frame
2879  * upwards onto the stack.  (Actually, it pushes everything up to the
2880  * next update frame plus a pointer to the next AP_UPD object.
2881  * Entering the next AP_UPD object pushes more onto the stack until we
2882  * reach the last AP_UPD object - at which point the stack should look
2883  * exactly as it did when we killed the TSO and we can continue
2884  * execution by entering the closure on top of the stack.
2885  *
2886  * We can also kill a thread entirely - this happens if either (a) the 
2887  * exception passed to raiseAsync is NULL, or (b) there's no
2888  * CATCH_FRAME on the stack.  In either case, we strip the entire
2889  * stack and replace the thread with a zombie.
2890  *
2891  * -------------------------------------------------------------------------- */
2892  
2893 void 
2894 deleteThread(StgTSO *tso)
2895 {
2896   raiseAsync(tso,NULL);
2897 }
2898
2899 void
2900 raiseAsync(StgTSO *tso, StgClosure *exception)
2901 {
2902   StgUpdateFrame* su = tso->su;
2903   StgPtr          sp = tso->sp;
2904   
2905   /* Thread already dead? */
2906   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2907     return;
2908   }
2909
2910   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2911
2912   /* Remove it from any blocking queues */
2913   unblockThread(tso);
2914
2915   /* The stack freezing code assumes there's a closure pointer on
2916    * the top of the stack.  This isn't always the case with compiled
2917    * code, so we have to push a dummy closure on the top which just
2918    * returns to the next return address on the stack.
2919    */
2920   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2921     *(--sp) = (W_)&stg_dummy_ret_closure;
2922   }
2923
2924   while (1) {
2925     nat words = ((P_)su - (P_)sp) - 1;
2926     nat i;
2927     StgAP_UPD * ap;
2928
2929     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2930      * then build PAP(handler,exception,realworld#), and leave it on
2931      * top of the stack ready to enter.
2932      */
2933     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2934       StgCatchFrame *cf = (StgCatchFrame *)su;
2935       /* we've got an exception to raise, so let's pass it to the
2936        * handler in this frame.
2937        */
2938       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2939       TICK_ALLOC_UPD_PAP(3,0);
2940       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2941               
2942       ap->n_args = 2;
2943       ap->fun = cf->handler;    /* :: Exception -> IO a */
2944       ap->payload[0] = exception;
2945       ap->payload[1] = ARG_TAG(0); /* realworld token */
2946
2947       /* throw away the stack from Sp up to and including the
2948        * CATCH_FRAME.
2949        */
2950       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2951       tso->su = cf->link;
2952
2953       /* Restore the blocked/unblocked state for asynchronous exceptions
2954        * at the CATCH_FRAME.  
2955        *
2956        * If exceptions were unblocked at the catch, arrange that they
2957        * are unblocked again after executing the handler by pushing an
2958        * unblockAsyncExceptions_ret stack frame.
2959        */
2960       if (!cf->exceptions_blocked) {
2961         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2962       }
2963       
2964       /* Ensure that async exceptions are blocked when running the handler.
2965        */
2966       if (tso->blocked_exceptions == NULL) {
2967         tso->blocked_exceptions = END_TSO_QUEUE;
2968       }
2969       
2970       /* Put the newly-built PAP on top of the stack, ready to execute
2971        * when the thread restarts.
2972        */
2973       sp[0] = (W_)ap;
2974       tso->sp = sp;
2975       tso->what_next = ThreadEnterGHC;
2976       IF_DEBUG(sanity, checkTSO(tso));
2977       return;
2978     }
2979
2980     /* First build an AP_UPD consisting of the stack chunk above the
2981      * current update frame, with the top word on the stack as the
2982      * fun field.
2983      */
2984     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2985     
2986     ASSERT(words >= 0);
2987     
2988     ap->n_args = words;
2989     ap->fun    = (StgClosure *)sp[0];
2990     sp++;
2991     for(i=0; i < (nat)words; ++i) {
2992       ap->payload[i] = (StgClosure *)*sp++;
2993     }
2994     
2995     switch (get_itbl(su)->type) {
2996       
2997     case UPDATE_FRAME:
2998       {
2999         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3000         TICK_ALLOC_UP_THK(words+1,0);
3001         
3002         IF_DEBUG(scheduler,
3003                  fprintf(stderr,  "scheduler: Updating ");
3004                  printPtr((P_)su->updatee); 
3005                  fprintf(stderr,  " with ");
3006                  printObj((StgClosure *)ap);
3007                  );
3008         
3009         /* Replace the updatee with an indirection - happily
3010          * this will also wake up any threads currently
3011          * waiting on the result.
3012          *
3013          * Warning: if we're in a loop, more than one update frame on
3014          * the stack may point to the same object.  Be careful not to
3015          * overwrite an IND_OLDGEN in this case, because we'll screw
3016          * up the mutable lists.  To be on the safe side, don't
3017          * overwrite any kind of indirection at all.  See also
3018          * threadSqueezeStack in GC.c, where we have to make a similar
3019          * check.
3020          */
3021         if (!closure_IND(su->updatee)) {
3022             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3023         }
3024         su = su->link;
3025         sp += sizeofW(StgUpdateFrame) -1;
3026         sp[0] = (W_)ap; /* push onto stack */
3027         break;
3028       }
3029
3030     case CATCH_FRAME:
3031       {
3032         StgCatchFrame *cf = (StgCatchFrame *)su;
3033         StgClosure* o;
3034         
3035         /* We want a PAP, not an AP_UPD.  Fortunately, the
3036          * layout's the same.
3037          */
3038         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3039         TICK_ALLOC_UPD_PAP(words+1,0);
3040         
3041         /* now build o = FUN(catch,ap,handler) */
3042         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3043         TICK_ALLOC_FUN(2,0);
3044         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3045         o->payload[0] = (StgClosure *)ap;
3046         o->payload[1] = cf->handler;
3047         
3048         IF_DEBUG(scheduler,
3049                  fprintf(stderr,  "scheduler: Built ");
3050                  printObj((StgClosure *)o);
3051                  );
3052         
3053         /* pop the old handler and put o on the stack */
3054         su = cf->link;
3055         sp += sizeofW(StgCatchFrame) - 1;
3056         sp[0] = (W_)o;
3057         break;
3058       }
3059       
3060     case SEQ_FRAME:
3061       {
3062         StgSeqFrame *sf = (StgSeqFrame *)su;
3063         StgClosure* o;
3064         
3065         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3066         TICK_ALLOC_UPD_PAP(words+1,0);
3067         
3068         /* now build o = FUN(seq,ap) */
3069         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3070         TICK_ALLOC_SE_THK(1,0);
3071         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3072         o->payload[0] = (StgClosure *)ap;
3073         
3074         IF_DEBUG(scheduler,
3075                  fprintf(stderr,  "scheduler: Built ");
3076                  printObj((StgClosure *)o);
3077                  );
3078         
3079         /* pop the old handler and put o on the stack */
3080         su = sf->link;
3081         sp += sizeofW(StgSeqFrame) - 1;
3082         sp[0] = (W_)o;
3083         break;
3084       }
3085       
3086     case STOP_FRAME:
3087       /* We've stripped the entire stack, the thread is now dead. */
3088       sp += sizeofW(StgStopFrame) - 1;
3089       sp[0] = (W_)exception;    /* save the exception */
3090       tso->what_next = ThreadKilled;
3091       tso->su = (StgUpdateFrame *)(sp+1);
3092       tso->sp = sp;
3093       return;
3094
3095     default:
3096       barf("raiseAsync");
3097     }
3098   }
3099   barf("raiseAsync");
3100 }
3101
3102 /* -----------------------------------------------------------------------------
3103    resurrectThreads is called after garbage collection on the list of
3104    threads found to be garbage.  Each of these threads will be woken
3105    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3106    on an MVar, or NonTermination if the thread was blocked on a Black
3107    Hole.
3108    -------------------------------------------------------------------------- */
3109
3110 void
3111 resurrectThreads( StgTSO *threads )
3112 {
3113   StgTSO *tso, *next;
3114
3115   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3116     next = tso->global_link;
3117     tso->global_link = all_threads;
3118     all_threads = tso;
3119     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3120
3121     switch (tso->why_blocked) {
3122     case BlockedOnMVar:
3123     case BlockedOnException:
3124       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3125       break;
3126     case BlockedOnBlackHole:
3127       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3128       break;
3129     case NotBlocked:
3130       /* This might happen if the thread was blocked on a black hole
3131        * belonging to a thread that we've just woken up (raiseAsync
3132        * can wake up threads, remember...).
3133        */
3134       continue;
3135     default:
3136       barf("resurrectThreads: thread blocked in a strange way");
3137     }
3138   }
3139 }
3140
3141 /* -----------------------------------------------------------------------------
3142  * Blackhole detection: if we reach a deadlock, test whether any
3143  * threads are blocked on themselves.  Any threads which are found to
3144  * be self-blocked get sent a NonTermination exception.
3145  *
3146  * This is only done in a deadlock situation in order to avoid
3147  * performance overhead in the normal case.
3148  * -------------------------------------------------------------------------- */
3149
3150 static void
3151 detectBlackHoles( void )
3152 {
3153     StgTSO *t = all_threads;
3154     StgUpdateFrame *frame;
3155     StgClosure *blocked_on;
3156
3157     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3158
3159         while (t->what_next == ThreadRelocated) {
3160             t = t->link;
3161             ASSERT(get_itbl(t)->type == TSO);
3162         }
3163       
3164         if (t->why_blocked != BlockedOnBlackHole) {
3165             continue;
3166         }
3167
3168         blocked_on = t->block_info.closure;
3169
3170         for (frame = t->su; ; frame = frame->link) {
3171             switch (get_itbl(frame)->type) {
3172
3173             case UPDATE_FRAME:
3174                 if (frame->updatee == blocked_on) {
3175                     /* We are blocking on one of our own computations, so
3176                      * send this thread the NonTermination exception.  
3177                      */
3178                     IF_DEBUG(scheduler, 
3179                              sched_belch("thread %d is blocked on itself", t->id));
3180                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3181                     goto done;
3182                 }
3183                 else {
3184                     continue;
3185                 }
3186
3187             case CATCH_FRAME:
3188             case SEQ_FRAME:
3189                 continue;
3190                 
3191             case STOP_FRAME:
3192                 break;
3193             }
3194             break;
3195         }
3196
3197     done: ;
3198     }   
3199 }
3200
3201 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3202 //@subsection Debugging Routines
3203
3204 /* -----------------------------------------------------------------------------
3205    Debugging: why is a thread blocked
3206    -------------------------------------------------------------------------- */
3207
3208 #ifdef DEBUG
3209
3210 void
3211 printThreadBlockage(StgTSO *tso)
3212 {
3213   switch (tso->why_blocked) {
3214   case BlockedOnRead:
3215     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3216     break;
3217   case BlockedOnWrite:
3218     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3219     break;
3220   case BlockedOnDelay:
3221     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3222     break;
3223   case BlockedOnMVar:
3224     fprintf(stderr,"is blocked on an MVar");
3225     break;
3226   case BlockedOnException:
3227     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3228             tso->block_info.tso->id);
3229     break;
3230   case BlockedOnBlackHole:
3231     fprintf(stderr,"is blocked on a black hole");
3232     break;
3233   case NotBlocked:
3234     fprintf(stderr,"is not blocked");
3235     break;
3236 #if defined(PAR)
3237   case BlockedOnGA:
3238     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3239             tso->block_info.closure, info_type(tso->block_info.closure));
3240     break;
3241   case BlockedOnGA_NoSend:
3242     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3243             tso->block_info.closure, info_type(tso->block_info.closure));
3244     break;
3245 #endif
3246   default:
3247     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3248          tso->why_blocked, tso->id, tso);
3249   }
3250 }
3251
3252 void
3253 printThreadStatus(StgTSO *tso)
3254 {
3255   switch (tso->what_next) {
3256   case ThreadKilled:
3257     fprintf(stderr,"has been killed");
3258     break;
3259   case ThreadComplete:
3260     fprintf(stderr,"has completed");
3261     break;
3262   default:
3263     printThreadBlockage(tso);
3264   }
3265 }
3266
3267 void
3268 printAllThreads(void)
3269 {
3270   StgTSO *t;
3271
3272 # if defined(GRAN)
3273   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3274   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3275                        time_string, rtsFalse/*no commas!*/);
3276
3277   sched_belch("all threads at [%s]:", time_string);
3278 # elif defined(PAR)
3279   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3280   ullong_format_string(CURRENT_TIME,
3281                        time_string, rtsFalse/*no commas!*/);
3282
3283   sched_belch("all threads at [%s]:", time_string);
3284 # else
3285   sched_belch("all threads:");
3286 # endif
3287
3288   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3289     fprintf(stderr, "\tthread %d ", t->id);
3290     printThreadStatus(t);
3291     fprintf(stderr,"\n");
3292   }
3293 }
3294     
3295 /* 
3296    Print a whole blocking queue attached to node (debugging only).
3297 */
3298 //@cindex print_bq
3299 # if defined(PAR)
3300 void 
3301 print_bq (StgClosure *node)
3302 {
3303   StgBlockingQueueElement *bqe;
3304   StgTSO *tso;
3305   rtsBool end;
3306
3307   fprintf(stderr,"## BQ of closure %p (%s): ",
3308           node, info_type(node));
3309
3310   /* should cover all closures that may have a blocking queue */
3311   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3312          get_itbl(node)->type == FETCH_ME_BQ ||
3313          get_itbl(node)->type == RBH ||
3314          get_itbl(node)->type == MVAR);
3315     
3316   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3317
3318   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3319 }
3320
3321 /* 
3322    Print a whole blocking queue starting with the element bqe.
3323 */
3324 void 
3325 print_bqe (StgBlockingQueueElement *bqe)
3326 {
3327   rtsBool end;
3328
3329   /* 
3330      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3331   */
3332   for (end = (bqe==END_BQ_QUEUE);
3333        !end; // iterate until bqe points to a CONSTR
3334        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3335        bqe = end ? END_BQ_QUEUE : bqe->link) {
3336     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3337     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3338     /* types of closures that may appear in a blocking queue */
3339     ASSERT(get_itbl(bqe)->type == TSO ||           
3340            get_itbl(bqe)->type == BLOCKED_FETCH || 
3341            get_itbl(bqe)->type == CONSTR); 
3342     /* only BQs of an RBH end with an RBH_Save closure */
3343     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3344
3345     switch (get_itbl(bqe)->type) {
3346     case TSO:
3347       fprintf(stderr," TSO %u (%x),",
3348               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3349       break;
3350     case BLOCKED_FETCH:
3351       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3352               ((StgBlockedFetch *)bqe)->node, 
3353               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3354               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3355               ((StgBlockedFetch *)bqe)->ga.weight);
3356       break;
3357     case CONSTR:
3358       fprintf(stderr," %s (IP %p),",
3359               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3360                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3361                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3362                "RBH_Save_?"), get_itbl(bqe));
3363       break;
3364     default:
3365       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3366            info_type((StgClosure *)bqe)); // , node, info_type(node));
3367       break;
3368     }
3369   } /* for */
3370   fputc('\n', stderr);
3371 }
3372 # elif defined(GRAN)
3373 void 
3374 print_bq (StgClosure *node)
3375 {
3376   StgBlockingQueueElement *bqe;
3377   PEs node_loc, tso_loc;
3378   rtsBool end;
3379
3380   /* should cover all closures that may have a blocking queue */
3381   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3382          get_itbl(node)->type == FETCH_ME_BQ ||
3383          get_itbl(node)->type == RBH);
3384     
3385   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3386   node_loc = where_is(node);
3387
3388   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3389           node, info_type(node), node_loc);
3390
3391   /* 
3392      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3393   */
3394   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3395        !end; // iterate until bqe points to a CONSTR
3396        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3397     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3398     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3399     /* types of closures that may appear in a blocking queue */
3400     ASSERT(get_itbl(bqe)->type == TSO ||           
3401            get_itbl(bqe)->type == CONSTR); 
3402     /* only BQs of an RBH end with an RBH_Save closure */
3403     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3404
3405     tso_loc = where_is((StgClosure *)bqe);
3406     switch (get_itbl(bqe)->type) {
3407     case TSO:
3408       fprintf(stderr," TSO %d (%p) on [PE %d],",
3409               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3410       break;
3411     case CONSTR:
3412       fprintf(stderr," %s (IP %p),",
3413               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3414                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3415                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3416                "RBH_Save_?"), get_itbl(bqe));
3417       break;
3418     default:
3419       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3420            info_type((StgClosure *)bqe), node, info_type(node));
3421       break;
3422     }
3423   } /* for */
3424   fputc('\n', stderr);
3425 }
3426 #else
3427 /* 
3428    Nice and easy: only TSOs on the blocking queue
3429 */
3430 void 
3431 print_bq (StgClosure *node)
3432 {
3433   StgTSO *tso;
3434
3435   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3436   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3437        tso != END_TSO_QUEUE; 
3438        tso=tso->link) {
3439     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3440     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3441     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3442   }
3443   fputc('\n', stderr);
3444 }
3445 # endif
3446
3447 #if defined(PAR)
3448 static nat
3449 run_queue_len(void)
3450 {
3451   nat i;
3452   StgTSO *tso;
3453
3454   for (i=0, tso=run_queue_hd; 
3455        tso != END_TSO_QUEUE;
3456        i++, tso=tso->link)
3457     /* nothing */
3458
3459   return i;
3460 }
3461 #endif
3462
3463 static void
3464 sched_belch(char *s, ...)
3465 {
3466   va_list ap;
3467   va_start(ap,s);
3468 #ifdef SMP
3469   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3470 #elif defined(PAR)
3471   fprintf(stderr, "== ");
3472 #else
3473   fprintf(stderr, "scheduler: ");
3474 #endif
3475   vfprintf(stderr, s, ap);
3476   fprintf(stderr, "\n");
3477 }
3478
3479 #endif /* DEBUG */
3480
3481
3482 //@node Index,  , Debugging Routines, Main scheduling code
3483 //@subsection Index
3484
3485 //@index
3486 //* MainRegTable::  @cindex\s-+MainRegTable
3487 //* StgMainThread::  @cindex\s-+StgMainThread
3488 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3489 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3490 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3491 //* context_switch::  @cindex\s-+context_switch
3492 //* createThread::  @cindex\s-+createThread
3493 //* free_capabilities::  @cindex\s-+free_capabilities
3494 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3495 //* initScheduler::  @cindex\s-+initScheduler
3496 //* interrupted::  @cindex\s-+interrupted
3497 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3498 //* next_thread_id::  @cindex\s-+next_thread_id
3499 //* print_bq::  @cindex\s-+print_bq
3500 //* run_queue_hd::  @cindex\s-+run_queue_hd
3501 //* run_queue_tl::  @cindex\s-+run_queue_tl
3502 //* sched_mutex::  @cindex\s-+sched_mutex
3503 //* schedule::  @cindex\s-+schedule
3504 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3505 //* task_ids::  @cindex\s-+task_ids
3506 //* term_mutex::  @cindex\s-+term_mutex
3507 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3508 //@end index