[project @ 2001-10-23 10:54:14 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.101 2001/10/23 10:54:14 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 //@cindex free_capabilities
229 //@cindex n_free_capabilities
230 Capability *free_capabilities; /* Available capabilities for running threads */
231 nat n_free_capabilities;       /* total number of available capabilities */
232 #else
233 //@cindex MainRegTable
234 Capability MainRegTable;       /* for non-SMP, we have one global capability */
235 #endif
236
237 #if defined(GRAN)
238 StgTSO *CurrentTSO;
239 #endif
240
241 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
242  *  exists - earlier gccs apparently didn't.
243  *  -= chak
244  */
245 StgTSO dummy_tso;
246
247 rtsBool ready_to_gc;
248
249 /* All our current task ids, saved in case we need to kill them later.
250  */
251 #ifdef SMP
252 //@cindex task_ids
253 task_info *task_ids;
254 #endif
255
256 void            addToBlockedQueue ( StgTSO *tso );
257
258 static void     schedule          ( void );
259        void     interruptStgRts   ( void );
260 #if defined(GRAN)
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
262 #else
263 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
264 #endif
265
266 static void     detectBlackHoles  ( void );
267
268 #ifdef DEBUG
269 static void sched_belch(char *s, ...);
270 #endif
271
272 #ifdef SMP
273 //@cindex sched_mutex
274 //@cindex term_mutex
275 //@cindex thread_ready_cond
276 //@cindex gc_pending_cond
277 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
278 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
279 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
280 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
281
282 nat await_death;
283 #endif
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 char *whatNext_strs[] = {
293   "ThreadEnterGHC",
294   "ThreadRunGHC",
295   "ThreadEnterInterp",
296   "ThreadKilled",
297   "ThreadComplete"
298 };
299
300 char *threadReturnCode_strs[] = {
301   "HeapOverflow",                       /* might also be StackOverflow */
302   "StackOverflow",
303   "ThreadYielding",
304   "ThreadBlocked",
305   "ThreadFinished"
306 };
307 #endif
308
309 #ifdef PAR
310 StgTSO * createSparkThread(rtsSpark spark);
311 StgTSO * activateSpark (rtsSpark spark);  
312 #endif
313
314 /*
315  * The thread state for the main thread.
316 // ToDo: check whether not needed any more
317 StgTSO   *MainTSO;
318  */
319
320 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
321 //@subsection Main scheduling loop
322
323 /* ---------------------------------------------------------------------------
324    Main scheduling loop.
325
326    We use round-robin scheduling, each thread returning to the
327    scheduler loop when one of these conditions is detected:
328
329       * out of heap space
330       * timer expires (thread yields)
331       * thread blocks
332       * thread ends
333       * stack overflow
334
335    Locking notes:  we acquire the scheduler lock once at the beginning
336    of the scheduler loop, and release it when
337     
338       * running a thread, or
339       * waiting for work, or
340       * waiting for a GC to complete.
341
342    GRAN version:
343      In a GranSim setup this loop iterates over the global event queue.
344      This revolves around the global event queue, which determines what 
345      to do next. Therefore, it's more complicated than either the 
346      concurrent or the parallel (GUM) setup.
347
348    GUM version:
349      GUM iterates over incoming messages.
350      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
351      and sends out a fish whenever it has nothing to do; in-between
352      doing the actual reductions (shared code below) it processes the
353      incoming messages and deals with delayed operations 
354      (see PendingFetches).
355      This is not the ugliest code you could imagine, but it's bloody close.
356
357    ------------------------------------------------------------------------ */
358 //@cindex schedule
359 static void
360 schedule( void )
361 {
362   StgTSO *t;
363   Capability *cap;
364   StgThreadReturnCode ret;
365 #if defined(GRAN)
366   rtsEvent *event;
367 #elif defined(PAR)
368   StgSparkPool *pool;
369   rtsSpark spark;
370   StgTSO *tso;
371   GlobalTaskId pe;
372   rtsBool receivedFinish = rtsFalse;
373 # if defined(DEBUG)
374   nat tp_size, sp_size; // stats only
375 # endif
376 #endif
377   rtsBool was_interrupted = rtsFalse;
378   
379   ACQUIRE_LOCK(&sched_mutex);
380
381 #if defined(GRAN)
382
383   /* set up first event to get things going */
384   /* ToDo: assign costs for system setup and init MainTSO ! */
385   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
386             ContinueThread, 
387             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
388
389   IF_DEBUG(gran,
390            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
391            G_TSO(CurrentTSO, 5));
392
393   if (RtsFlags.GranFlags.Light) {
394     /* Save current time; GranSim Light only */
395     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
396   }      
397
398   event = get_next_event();
399
400   while (event!=(rtsEvent*)NULL) {
401     /* Choose the processor with the next event */
402     CurrentProc = event->proc;
403     CurrentTSO = event->tso;
404
405 #elif defined(PAR)
406
407   while (!receivedFinish) {    /* set by processMessages */
408                                /* when receiving PP_FINISH message         */ 
409 #else
410
411   while (1) {
412
413 #endif
414
415     IF_DEBUG(scheduler, printAllThreads());
416
417     /* If we're interrupted (the user pressed ^C, or some other
418      * termination condition occurred), kill all the currently running
419      * threads.
420      */
421     if (interrupted) {
422       IF_DEBUG(scheduler, sched_belch("interrupted"));
423       deleteAllThreads();
424       interrupted = rtsFalse;
425       was_interrupted = rtsTrue;
426     }
427
428     /* Go through the list of main threads and wake up any
429      * clients whose computations have finished.  ToDo: this
430      * should be done more efficiently without a linear scan
431      * of the main threads list, somehow...
432      */
433 #ifdef SMP
434     { 
435       StgMainThread *m, **prev;
436       prev = &main_threads;
437       for (m = main_threads; m != NULL; m = m->link) {
438         switch (m->tso->what_next) {
439         case ThreadComplete:
440           if (m->ret) {
441             *(m->ret) = (StgClosure *)m->tso->sp[0];
442           }
443           *prev = m->link;
444           m->stat = Success;
445           pthread_cond_broadcast(&m->wakeup);
446           break;
447         case ThreadKilled:
448           *prev = m->link;
449           if (was_interrupted) {
450             m->stat = Interrupted;
451           } else {
452             m->stat = Killed;
453           }
454           pthread_cond_broadcast(&m->wakeup);
455           break;
456         default:
457           break;
458         }
459       }
460     }
461
462 #else
463 # if defined(PAR)
464     /* in GUM do this only on the Main PE */
465     if (IAmMainThread)
466 # endif
467     /* If our main thread has finished or been killed, return.
468      */
469     {
470       StgMainThread *m = main_threads;
471       if (m->tso->what_next == ThreadComplete
472           || m->tso->what_next == ThreadKilled) {
473         main_threads = main_threads->link;
474         if (m->tso->what_next == ThreadComplete) {
475           /* we finished successfully, fill in the return value */
476           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
477           m->stat = Success;
478           return;
479         } else {
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif /* SMP */
529
530     /* Check whether any waiting threads need to be woken up.  If the
531      * run queue is empty, and there are no other tasks running, we
532      * can wait indefinitely for something to happen.
533      * ToDo: what if another client comes along & requests another
534      * main thread?
535      */
536     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
537       awaitEvent(
538            (run_queue_hd == END_TSO_QUEUE)
539 #ifdef SMP
540         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
541 #endif
542         );
543     }
544     /* we can be interrupted while waiting for I/O... */
545     if (interrupted) continue;
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       start_signal_handlers();
551     }
552 #endif
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifndef PAR
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569 #ifdef SMP
570         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
571 #endif
572         )
573     {
574         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575         GarbageCollect(GetRoots,rtsTrue);
576         if (blocked_queue_hd == END_TSO_QUEUE
577             && run_queue_hd == END_TSO_QUEUE
578             && sleeping_queue == END_TSO_QUEUE) {
579             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
580             detectBlackHoles();
581             if (run_queue_hd == END_TSO_QUEUE) {
582                 StgMainThread *m = main_threads;
583 #ifdef SMP
584                 for (; m != NULL; m = m->link) {
585                     deleteThread(m->tso);
586                     m->ret = NULL;
587                     m->stat = Deadlock;
588                     pthread_cond_broadcast(&m->wakeup);
589                 }
590                 main_threads = NULL;
591 #else
592                 deleteThread(m->tso);
593                 m->ret = NULL;
594                 m->stat = Deadlock;
595                 main_threads = m->link;
596                 return;
597 #endif
598             }
599         }
600     }
601 #elif defined(PAR)
602     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
603 #endif
604
605 #ifdef SMP
606     /* If there's a GC pending, don't do anything until it has
607      * completed.
608      */
609     if (ready_to_gc) {
610       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
611       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
612     }
613     
614     /* block until we've got a thread on the run queue and a free
615      * capability.
616      */
617     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
618       IF_DEBUG(scheduler, sched_belch("waiting for work"));
619       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
620       IF_DEBUG(scheduler, sched_belch("work now available"));
621     }
622 #endif
623
624 #if defined(GRAN)
625
626     if (RtsFlags.GranFlags.Light)
627       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
628
629     /* adjust time based on time-stamp */
630     if (event->time > CurrentTime[CurrentProc] &&
631         event->evttype != ContinueThread)
632       CurrentTime[CurrentProc] = event->time;
633     
634     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
635     if (!RtsFlags.GranFlags.Light)
636       handleIdlePEs();
637
638     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
639
640     /* main event dispatcher in GranSim */
641     switch (event->evttype) {
642       /* Should just be continuing execution */
643     case ContinueThread:
644       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
645       /* ToDo: check assertion
646       ASSERT(run_queue_hd != (StgTSO*)NULL &&
647              run_queue_hd != END_TSO_QUEUE);
648       */
649       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
650       if (!RtsFlags.GranFlags.DoAsyncFetch &&
651           procStatus[CurrentProc]==Fetching) {
652         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
653               CurrentTSO->id, CurrentTSO, CurrentProc);
654         goto next_thread;
655       } 
656       /* Ignore ContinueThreads for completed threads */
657       if (CurrentTSO->what_next == ThreadComplete) {
658         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
659               CurrentTSO->id, CurrentTSO, CurrentProc);
660         goto next_thread;
661       } 
662       /* Ignore ContinueThreads for threads that are being migrated */
663       if (PROCS(CurrentTSO)==Nowhere) { 
664         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
665               CurrentTSO->id, CurrentTSO, CurrentProc);
666         goto next_thread;
667       }
668       /* The thread should be at the beginning of the run queue */
669       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
670         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
671               CurrentTSO->id, CurrentTSO, CurrentProc);
672         break; // run the thread anyway
673       }
674       /*
675       new_event(proc, proc, CurrentTime[proc],
676                 FindWork,
677                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
678       goto next_thread; 
679       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
680       break; // now actually run the thread; DaH Qu'vam yImuHbej 
681
682     case FetchNode:
683       do_the_fetchnode(event);
684       goto next_thread;             /* handle next event in event queue  */
685       
686     case GlobalBlock:
687       do_the_globalblock(event);
688       goto next_thread;             /* handle next event in event queue  */
689       
690     case FetchReply:
691       do_the_fetchreply(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case UnblockThread:   /* Move from the blocked queue to the tail of */
695       do_the_unblock(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case ResumeThread:  /* Move from the blocked queue to the tail of */
699       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
700       event->tso->gran.blocktime += 
701         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
702       do_the_startthread(event);
703       goto next_thread;             /* handle next event in event queue  */
704       
705     case StartThread:
706       do_the_startthread(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case MoveThread:
710       do_the_movethread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case MoveSpark:
714       do_the_movespark(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case FindWork:
718       do_the_findwork(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     default:
722       barf("Illegal event type %u\n", event->evttype);
723     }  /* switch */
724     
725     /* This point was scheduler_loop in the old RTS */
726
727     IF_DEBUG(gran, belch("GRAN: after main switch"));
728
729     TimeOfLastEvent = CurrentTime[CurrentProc];
730     TimeOfNextEvent = get_time_of_next_event();
731     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
732     // CurrentTSO = ThreadQueueHd;
733
734     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
735                          TimeOfNextEvent));
736
737     if (RtsFlags.GranFlags.Light) 
738       GranSimLight_leave_system(event, &ActiveTSO); 
739
740     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
741
742     IF_DEBUG(gran, 
743              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
744
745     /* in a GranSim setup the TSO stays on the run queue */
746     t = CurrentTSO;
747     /* Take a thread from the run queue. */
748     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
749
750     IF_DEBUG(gran, 
751              fprintf(stderr, "GRAN: About to run current thread, which is\n");
752              G_TSO(t,5));
753
754     context_switch = 0; // turned on via GranYield, checking events and time slice
755
756     IF_DEBUG(gran, 
757              DumpGranEvent(GR_SCHEDULE, t));
758
759     procStatus[CurrentProc] = Busy;
760
761 #elif defined(PAR)
762     if (PendingFetches != END_BF_QUEUE) {
763         processFetches();
764     }
765
766     /* ToDo: phps merge with spark activation above */
767     /* check whether we have local work and send requests if we have none */
768     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
769       /* :-[  no local threads => look out for local sparks */
770       /* the spark pool for the current PE */
771       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
772       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
773           pool->hd < pool->tl) {
774         /* 
775          * ToDo: add GC code check that we really have enough heap afterwards!!
776          * Old comment:
777          * If we're here (no runnable threads) and we have pending
778          * sparks, we must have a space problem.  Get enough space
779          * to turn one of those pending sparks into a
780          * thread... 
781          */
782
783         spark = findSpark(rtsFalse);                /* get a spark */
784         if (spark != (rtsSpark) NULL) {
785           tso = activateSpark(spark);       /* turn the spark into a thread */
786           IF_PAR_DEBUG(schedule,
787                        belch("==== schedule: Created TSO %d (%p); %d threads active",
788                              tso->id, tso, advisory_thread_count));
789
790           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
791             belch("==^^ failed to activate spark");
792             goto next_thread;
793           }               /* otherwise fall through & pick-up new tso */
794         } else {
795           IF_PAR_DEBUG(verbose,
796                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
797                              spark_queue_len(pool)));
798           goto next_thread;
799         }
800       }
801
802       /* If we still have no work we need to send a FISH to get a spark
803          from another PE 
804       */
805       if (EMPTY_RUN_QUEUE()) {
806       /* =8-[  no local sparks => look for work on other PEs */
807         /*
808          * We really have absolutely no work.  Send out a fish
809          * (there may be some out there already), and wait for
810          * something to arrive.  We clearly can't run any threads
811          * until a SCHEDULE or RESUME arrives, and so that's what
812          * we're hoping to see.  (Of course, we still have to
813          * respond to other types of messages.)
814          */
815         TIME now = msTime() /*CURRENT_TIME*/;
816         IF_PAR_DEBUG(verbose, 
817                      belch("--  now=%ld", now));
818         IF_PAR_DEBUG(verbose,
819                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
820                          (last_fish_arrived_at!=0 &&
821                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
822                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
823                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
824                              last_fish_arrived_at,
825                              RtsFlags.ParFlags.fishDelay, now);
826                      });
827         
828         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
829             (last_fish_arrived_at==0 ||
830              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
831           /* outstandingFishes is set in sendFish, processFish;
832              avoid flooding system with fishes via delay */
833           pe = choosePE();
834           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
835                    NEW_FISH_HUNGER);
836
837           // Global statistics: count no. of fishes
838           if (RtsFlags.ParFlags.ParStats.Global &&
839               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
840             globalParStats.tot_fish_mess++;
841           }
842         }
843       
844         receivedFinish = processMessages();
845         goto next_thread;
846       }
847     } else if (PacketsWaiting()) {  /* Look for incoming messages */
848       receivedFinish = processMessages();
849     }
850
851     /* Now we are sure that we have some work available */
852     ASSERT(run_queue_hd != END_TSO_QUEUE);
853
854     /* Take a thread from the run queue, if we have work */
855     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
856     IF_DEBUG(sanity,checkTSO(t));
857
858     /* ToDo: write something to the log-file
859     if (RTSflags.ParFlags.granSimStats && !sameThread)
860         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
861
862     CurrentTSO = t;
863     */
864     /* the spark pool for the current PE */
865     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
866
867     IF_DEBUG(scheduler, 
868              belch("--=^ %d threads, %d sparks on [%#x]", 
869                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
870
871 #if 1
872     if (0 && RtsFlags.ParFlags.ParStats.Full && 
873         t && LastTSO && t->id != LastTSO->id && 
874         LastTSO->why_blocked == NotBlocked && 
875         LastTSO->what_next != ThreadComplete) {
876       // if previously scheduled TSO not blocked we have to record the context switch
877       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
878                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
879     }
880
881     if (RtsFlags.ParFlags.ParStats.Full && 
882         (emitSchedule /* forced emit */ ||
883         (t && LastTSO && t->id != LastTSO->id))) {
884       /* 
885          we are running a different TSO, so write a schedule event to log file
886          NB: If we use fair scheduling we also have to write  a deschedule 
887              event for LastTSO; with unfair scheduling we know that the
888              previous tso has blocked whenever we switch to another tso, so
889              we don't need it in GUM for now
890       */
891       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
892                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
893       emitSchedule = rtsFalse;
894     }
895      
896 #endif
897 #else /* !GRAN && !PAR */
898   
899     /* grab a thread from the run queue
900      */
901     ASSERT(run_queue_hd != END_TSO_QUEUE);
902     t = POP_RUN_QUEUE();
903     IF_DEBUG(sanity,checkTSO(t));
904
905 #endif
906     
907     /* grab a capability
908      */
909 #ifdef SMP
910     cap = free_capabilities;
911     free_capabilities = cap->link;
912     n_free_capabilities--;
913 #else
914     cap = &MainRegTable;
915 #endif
916
917     cap->rCurrentTSO = t;
918     
919     /* context switches are now initiated by the timer signal, unless
920      * the user specified "context switch as often as possible", with
921      * +RTS -C0
922      */
923     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
924         && (run_queue_hd != END_TSO_QUEUE
925             || blocked_queue_hd != END_TSO_QUEUE
926             || sleeping_queue != END_TSO_QUEUE))
927         context_switch = 1;
928     else
929         context_switch = 0;
930
931     RELEASE_LOCK(&sched_mutex);
932
933     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
934                               t->id, t, whatNext_strs[t->what_next]));
935
936     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
937     /* Run the current thread 
938      */
939     switch (cap->rCurrentTSO->what_next) {
940     case ThreadKilled:
941     case ThreadComplete:
942         /* Thread already finished, return to scheduler. */
943         ret = ThreadFinished;
944         break;
945     case ThreadEnterGHC:
946         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
947         break;
948     case ThreadRunGHC:
949         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
950         break;
951     case ThreadEnterInterp:
952         ret = interpretBCO(cap);
953         break;
954     default:
955       barf("schedule: invalid what_next field");
956     }
957     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
958     
959     /* Costs for the scheduler are assigned to CCS_SYSTEM */
960 #ifdef PROFILING
961     CCCS = CCS_SYSTEM;
962 #endif
963     
964     ACQUIRE_LOCK(&sched_mutex);
965
966 #ifdef SMP
967     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
968 #elif !defined(GRAN) && !defined(PAR)
969     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
970 #endif
971     t = cap->rCurrentTSO;
972     
973 #if defined(PAR)
974     /* HACK 675: if the last thread didn't yield, make sure to print a 
975        SCHEDULE event to the log file when StgRunning the next thread, even
976        if it is the same one as before */
977     LastTSO = t; 
978     TimeOfLastYield = CURRENT_TIME;
979 #endif
980
981     switch (ret) {
982     case HeapOverflow:
983 #if defined(GRAN)
984       IF_DEBUG(gran, 
985                DumpGranEvent(GR_DESCHEDULE, t));
986       globalGranStats.tot_heapover++;
987 #elif defined(PAR)
988       // IF_DEBUG(par, 
989       //DumpGranEvent(GR_DESCHEDULE, t);
990       globalParStats.tot_heapover++;
991 #endif
992       /* make all the running tasks block on a condition variable,
993        * maybe set context_switch and wait till they all pile in,
994        * then have them wait on a GC condition variable.
995        */
996       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
997                                t->id, t, whatNext_strs[t->what_next]));
998       threadPaused(t);
999 #if defined(GRAN)
1000       ASSERT(!is_on_queue(t,CurrentProc));
1001 #elif defined(PAR)
1002       /* Currently we emit a DESCHEDULE event before GC in GUM.
1003          ToDo: either add separate event to distinguish SYSTEM time from rest
1004                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1005       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1006         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1007                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1008         emitSchedule = rtsTrue;
1009       }
1010 #endif
1011       
1012       ready_to_gc = rtsTrue;
1013       context_switch = 1;               /* stop other threads ASAP */
1014       PUSH_ON_RUN_QUEUE(t);
1015       /* actual GC is done at the end of the while loop */
1016       break;
1017       
1018     case StackOverflow:
1019 #if defined(GRAN)
1020       IF_DEBUG(gran, 
1021                DumpGranEvent(GR_DESCHEDULE, t));
1022       globalGranStats.tot_stackover++;
1023 #elif defined(PAR)
1024       // IF_DEBUG(par, 
1025       // DumpGranEvent(GR_DESCHEDULE, t);
1026       globalParStats.tot_stackover++;
1027 #endif
1028       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1029                                t->id, t, whatNext_strs[t->what_next]));
1030       /* just adjust the stack for this thread, then pop it back
1031        * on the run queue.
1032        */
1033       threadPaused(t);
1034       { 
1035         StgMainThread *m;
1036         /* enlarge the stack */
1037         StgTSO *new_t = threadStackOverflow(t);
1038         
1039         /* This TSO has moved, so update any pointers to it from the
1040          * main thread stack.  It better not be on any other queues...
1041          * (it shouldn't be).
1042          */
1043         for (m = main_threads; m != NULL; m = m->link) {
1044           if (m->tso == t) {
1045             m->tso = new_t;
1046           }
1047         }
1048         threadPaused(new_t);
1049         PUSH_ON_RUN_QUEUE(new_t);
1050       }
1051       break;
1052
1053     case ThreadYielding:
1054 #if defined(GRAN)
1055       IF_DEBUG(gran, 
1056                DumpGranEvent(GR_DESCHEDULE, t));
1057       globalGranStats.tot_yields++;
1058 #elif defined(PAR)
1059       // IF_DEBUG(par, 
1060       // DumpGranEvent(GR_DESCHEDULE, t);
1061       globalParStats.tot_yields++;
1062 #endif
1063       /* put the thread back on the run queue.  Then, if we're ready to
1064        * GC, check whether this is the last task to stop.  If so, wake
1065        * up the GC thread.  getThread will block during a GC until the
1066        * GC is finished.
1067        */
1068       IF_DEBUG(scheduler,
1069                if (t->what_next == ThreadEnterInterp) {
1070                    /* ToDo: or maybe a timer expired when we were in Hugs?
1071                     * or maybe someone hit ctrl-C
1072                     */
1073                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1074                          t->id, t, whatNext_strs[t->what_next]);
1075                } else {
1076                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1077                          t->id, t, whatNext_strs[t->what_next]);
1078                }
1079                );
1080
1081       threadPaused(t);
1082
1083       IF_DEBUG(sanity,
1084                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1085                checkTSO(t));
1086       ASSERT(t->link == END_TSO_QUEUE);
1087 #if defined(GRAN)
1088       ASSERT(!is_on_queue(t,CurrentProc));
1089
1090       IF_DEBUG(sanity,
1091                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1092                checkThreadQsSanity(rtsTrue));
1093 #endif
1094 #if defined(PAR)
1095       if (RtsFlags.ParFlags.doFairScheduling) { 
1096         /* this does round-robin scheduling; good for concurrency */
1097         APPEND_TO_RUN_QUEUE(t);
1098       } else {
1099         /* this does unfair scheduling; good for parallelism */
1100         PUSH_ON_RUN_QUEUE(t);
1101       }
1102 #else
1103       /* this does round-robin scheduling; good for concurrency */
1104       APPEND_TO_RUN_QUEUE(t);
1105 #endif
1106 #if defined(GRAN)
1107       /* add a ContinueThread event to actually process the thread */
1108       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1109                 ContinueThread,
1110                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1111       IF_GRAN_DEBUG(bq, 
1112                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1113                G_EVENTQ(0);
1114                G_CURR_THREADQ(0));
1115 #endif /* GRAN */
1116       break;
1117       
1118     case ThreadBlocked:
1119 #if defined(GRAN)
1120       IF_DEBUG(scheduler,
1121                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1122                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1123                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1124
1125       // ??? needed; should emit block before
1126       IF_DEBUG(gran, 
1127                DumpGranEvent(GR_DESCHEDULE, t)); 
1128       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1129       /*
1130         ngoq Dogh!
1131       ASSERT(procStatus[CurrentProc]==Busy || 
1132               ((procStatus[CurrentProc]==Fetching) && 
1133               (t->block_info.closure!=(StgClosure*)NULL)));
1134       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1135           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1136             procStatus[CurrentProc]==Fetching)) 
1137         procStatus[CurrentProc] = Idle;
1138       */
1139 #elif defined(PAR)
1140       IF_DEBUG(scheduler,
1141                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1142                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1143       IF_PAR_DEBUG(bq,
1144
1145                    if (t->block_info.closure!=(StgClosure*)NULL) 
1146                      print_bq(t->block_info.closure));
1147
1148       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1149       blockThread(t);
1150
1151       /* whatever we schedule next, we must log that schedule */
1152       emitSchedule = rtsTrue;
1153
1154 #else /* !GRAN */
1155       /* don't need to do anything.  Either the thread is blocked on
1156        * I/O, in which case we'll have called addToBlockedQueue
1157        * previously, or it's blocked on an MVar or Blackhole, in which
1158        * case it'll be on the relevant queue already.
1159        */
1160       IF_DEBUG(scheduler,
1161                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1162                printThreadBlockage(t);
1163                fprintf(stderr, "\n"));
1164
1165       /* Only for dumping event to log file 
1166          ToDo: do I need this in GranSim, too?
1167       blockThread(t);
1168       */
1169 #endif
1170       threadPaused(t);
1171       break;
1172       
1173     case ThreadFinished:
1174       /* Need to check whether this was a main thread, and if so, signal
1175        * the task that started it with the return value.  If we have no
1176        * more main threads, we probably need to stop all the tasks until
1177        * we get a new one.
1178        */
1179       /* We also end up here if the thread kills itself with an
1180        * uncaught exception, see Exception.hc.
1181        */
1182       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1183 #if defined(GRAN)
1184       endThread(t, CurrentProc); // clean-up the thread
1185 #elif defined(PAR)
1186       /* For now all are advisory -- HWL */
1187       //if(t->priority==AdvisoryPriority) ??
1188       advisory_thread_count--;
1189       
1190 # ifdef DIST
1191       if(t->dist.priority==RevalPriority)
1192         FinishReval(t);
1193 # endif
1194       
1195       if (RtsFlags.ParFlags.ParStats.Full &&
1196           !RtsFlags.ParFlags.ParStats.Suppressed) 
1197         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1198 #endif
1199       break;
1200       
1201     default:
1202       barf("schedule: invalid thread return code %d", (int)ret);
1203     }
1204     
1205 #ifdef SMP
1206     cap->link = free_capabilities;
1207     free_capabilities = cap;
1208     n_free_capabilities++;
1209 #endif
1210
1211 #ifdef SMP
1212     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1213 #else
1214     if (ready_to_gc) 
1215 #endif
1216       {
1217       /* everybody back, start the GC.
1218        * Could do it in this thread, or signal a condition var
1219        * to do it in another thread.  Either way, we need to
1220        * broadcast on gc_pending_cond afterward.
1221        */
1222 #ifdef SMP
1223       IF_DEBUG(scheduler,sched_belch("doing GC"));
1224 #endif
1225       GarbageCollect(GetRoots,rtsFalse);
1226       ready_to_gc = rtsFalse;
1227 #ifdef SMP
1228       pthread_cond_broadcast(&gc_pending_cond);
1229 #endif
1230 #if defined(GRAN)
1231       /* add a ContinueThread event to continue execution of current thread */
1232       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1233                 ContinueThread,
1234                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1235       IF_GRAN_DEBUG(bq, 
1236                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1237                G_EVENTQ(0);
1238                G_CURR_THREADQ(0));
1239 #endif /* GRAN */
1240     }
1241 #if defined(GRAN)
1242   next_thread:
1243     IF_GRAN_DEBUG(unused,
1244                   print_eventq(EventHd));
1245
1246     event = get_next_event();
1247
1248 #elif defined(PAR)
1249   next_thread:
1250     /* ToDo: wait for next message to arrive rather than busy wait */
1251
1252 #else /* GRAN */
1253   /* not any more
1254   next_thread:
1255     t = take_off_run_queue(END_TSO_QUEUE);
1256   */
1257 #endif /* GRAN */
1258   } /* end of while(1) */
1259   IF_PAR_DEBUG(verbose,
1260                belch("== Leaving schedule() after having received Finish"));
1261 }
1262
1263 /* ---------------------------------------------------------------------------
1264  * deleteAllThreads():  kill all the live threads.
1265  *
1266  * This is used when we catch a user interrupt (^C), before performing
1267  * any necessary cleanups and running finalizers.
1268  * ------------------------------------------------------------------------- */
1269    
1270 void deleteAllThreads ( void )
1271 {
1272   StgTSO* t;
1273   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1274   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1275       deleteThread(t);
1276   }
1277   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1278       deleteThread(t);
1279   }
1280   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1281       deleteThread(t);
1282   }
1283   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1284   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1285   sleeping_queue = END_TSO_QUEUE;
1286 }
1287
1288 /* startThread and  insertThread are now in GranSim.c -- HWL */
1289
1290 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1291 //@subsection Suspend and Resume
1292
1293 /* ---------------------------------------------------------------------------
1294  * Suspending & resuming Haskell threads.
1295  * 
1296  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1297  * its capability before calling the C function.  This allows another
1298  * task to pick up the capability and carry on running Haskell
1299  * threads.  It also means that if the C call blocks, it won't lock
1300  * the whole system.
1301  *
1302  * The Haskell thread making the C call is put to sleep for the
1303  * duration of the call, on the susepended_ccalling_threads queue.  We
1304  * give out a token to the task, which it can use to resume the thread
1305  * on return from the C function.
1306  * ------------------------------------------------------------------------- */
1307    
1308 StgInt
1309 suspendThread( Capability *cap )
1310 {
1311   nat tok;
1312
1313   ACQUIRE_LOCK(&sched_mutex);
1314
1315   IF_DEBUG(scheduler,
1316            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1317
1318   threadPaused(cap->rCurrentTSO);
1319   cap->rCurrentTSO->link = suspended_ccalling_threads;
1320   suspended_ccalling_threads = cap->rCurrentTSO;
1321
1322   /* Use the thread ID as the token; it should be unique */
1323   tok = cap->rCurrentTSO->id;
1324
1325 #ifdef SMP
1326   cap->link = free_capabilities;
1327   free_capabilities = cap;
1328   n_free_capabilities++;
1329 #endif
1330
1331   RELEASE_LOCK(&sched_mutex);
1332   return tok; 
1333 }
1334
1335 Capability *
1336 resumeThread( StgInt tok )
1337 {
1338   StgTSO *tso, **prev;
1339   Capability *cap;
1340
1341   ACQUIRE_LOCK(&sched_mutex);
1342
1343   prev = &suspended_ccalling_threads;
1344   for (tso = suspended_ccalling_threads; 
1345        tso != END_TSO_QUEUE; 
1346        prev = &tso->link, tso = tso->link) {
1347     if (tso->id == (StgThreadID)tok) {
1348       *prev = tso->link;
1349       break;
1350     }
1351   }
1352   if (tso == END_TSO_QUEUE) {
1353     barf("resumeThread: thread not found");
1354   }
1355   tso->link = END_TSO_QUEUE;
1356
1357 #ifdef SMP
1358   while (free_capabilities == NULL) {
1359     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1360     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1361     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1362   }
1363   cap = free_capabilities;
1364   free_capabilities = cap->link;
1365   n_free_capabilities--;
1366 #else  
1367   cap = &MainRegTable;
1368 #endif
1369
1370   cap->rCurrentTSO = tso;
1371
1372   RELEASE_LOCK(&sched_mutex);
1373   return cap;
1374 }
1375
1376
1377 /* ---------------------------------------------------------------------------
1378  * Static functions
1379  * ------------------------------------------------------------------------ */
1380 static void unblockThread(StgTSO *tso);
1381
1382 /* ---------------------------------------------------------------------------
1383  * Comparing Thread ids.
1384  *
1385  * This is used from STG land in the implementation of the
1386  * instances of Eq/Ord for ThreadIds.
1387  * ------------------------------------------------------------------------ */
1388
1389 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1390
1391   StgThreadID id1 = tso1->id; 
1392   StgThreadID id2 = tso2->id;
1393  
1394   if (id1 < id2) return (-1);
1395   if (id1 > id2) return 1;
1396   return 0;
1397 }
1398
1399 /* ---------------------------------------------------------------------------
1400    Create a new thread.
1401
1402    The new thread starts with the given stack size.  Before the
1403    scheduler can run, however, this thread needs to have a closure
1404    (and possibly some arguments) pushed on its stack.  See
1405    pushClosure() in Schedule.h.
1406
1407    createGenThread() and createIOThread() (in SchedAPI.h) are
1408    convenient packaged versions of this function.
1409
1410    currently pri (priority) is only used in a GRAN setup -- HWL
1411    ------------------------------------------------------------------------ */
1412 //@cindex createThread
1413 #if defined(GRAN)
1414 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1415 StgTSO *
1416 createThread(nat stack_size, StgInt pri)
1417 {
1418   return createThread_(stack_size, rtsFalse, pri);
1419 }
1420
1421 static StgTSO *
1422 createThread_(nat size, rtsBool have_lock, StgInt pri)
1423 {
1424 #else
1425 StgTSO *
1426 createThread(nat stack_size)
1427 {
1428   return createThread_(stack_size, rtsFalse);
1429 }
1430
1431 static StgTSO *
1432 createThread_(nat size, rtsBool have_lock)
1433 {
1434 #endif
1435
1436     StgTSO *tso;
1437     nat stack_size;
1438
1439     /* First check whether we should create a thread at all */
1440 #if defined(PAR)
1441   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1442   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1443     threadsIgnored++;
1444     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1445           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1446     return END_TSO_QUEUE;
1447   }
1448   threadsCreated++;
1449 #endif
1450
1451 #if defined(GRAN)
1452   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1453 #endif
1454
1455   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1456
1457   /* catch ridiculously small stack sizes */
1458   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1459     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1460   }
1461
1462   stack_size = size - TSO_STRUCT_SIZEW;
1463
1464   tso = (StgTSO *)allocate(size);
1465   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1466
1467   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1468 #if defined(GRAN)
1469   SET_GRAN_HDR(tso, ThisPE);
1470 #endif
1471   tso->what_next     = ThreadEnterGHC;
1472
1473   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1474    * protect the increment operation on next_thread_id.
1475    * In future, we could use an atomic increment instead.
1476    */
1477   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1478   tso->id = next_thread_id++; 
1479   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1480
1481   tso->why_blocked  = NotBlocked;
1482   tso->blocked_exceptions = NULL;
1483
1484   tso->stack_size   = stack_size;
1485   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1486                               - TSO_STRUCT_SIZEW;
1487   tso->sp           = (P_)&(tso->stack) + stack_size;
1488
1489 #ifdef PROFILING
1490   tso->prof.CCCS = CCS_MAIN;
1491 #endif
1492
1493   /* put a stop frame on the stack */
1494   tso->sp -= sizeofW(StgStopFrame);
1495   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1496   tso->su = (StgUpdateFrame*)tso->sp;
1497
1498   // ToDo: check this
1499 #if defined(GRAN)
1500   tso->link = END_TSO_QUEUE;
1501   /* uses more flexible routine in GranSim */
1502   insertThread(tso, CurrentProc);
1503 #else
1504   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1505    * from its creation
1506    */
1507 #endif
1508
1509 #if defined(GRAN) 
1510   if (RtsFlags.GranFlags.GranSimStats.Full) 
1511     DumpGranEvent(GR_START,tso);
1512 #elif defined(PAR)
1513   if (RtsFlags.ParFlags.ParStats.Full) 
1514     DumpGranEvent(GR_STARTQ,tso);
1515   /* HACk to avoid SCHEDULE 
1516      LastTSO = tso; */
1517 #endif
1518
1519   /* Link the new thread on the global thread list.
1520    */
1521   tso->global_link = all_threads;
1522   all_threads = tso;
1523
1524 #if defined(DIST)
1525   tso->dist.priority = MandatoryPriority; //by default that is...
1526 #endif
1527
1528 #if defined(GRAN)
1529   tso->gran.pri = pri;
1530 # if defined(DEBUG)
1531   tso->gran.magic = TSO_MAGIC; // debugging only
1532 # endif
1533   tso->gran.sparkname   = 0;
1534   tso->gran.startedat   = CURRENT_TIME; 
1535   tso->gran.exported    = 0;
1536   tso->gran.basicblocks = 0;
1537   tso->gran.allocs      = 0;
1538   tso->gran.exectime    = 0;
1539   tso->gran.fetchtime   = 0;
1540   tso->gran.fetchcount  = 0;
1541   tso->gran.blocktime   = 0;
1542   tso->gran.blockcount  = 0;
1543   tso->gran.blockedat   = 0;
1544   tso->gran.globalsparks = 0;
1545   tso->gran.localsparks  = 0;
1546   if (RtsFlags.GranFlags.Light)
1547     tso->gran.clock  = Now; /* local clock */
1548   else
1549     tso->gran.clock  = 0;
1550
1551   IF_DEBUG(gran,printTSO(tso));
1552 #elif defined(PAR)
1553 # if defined(DEBUG)
1554   tso->par.magic = TSO_MAGIC; // debugging only
1555 # endif
1556   tso->par.sparkname   = 0;
1557   tso->par.startedat   = CURRENT_TIME; 
1558   tso->par.exported    = 0;
1559   tso->par.basicblocks = 0;
1560   tso->par.allocs      = 0;
1561   tso->par.exectime    = 0;
1562   tso->par.fetchtime   = 0;
1563   tso->par.fetchcount  = 0;
1564   tso->par.blocktime   = 0;
1565   tso->par.blockcount  = 0;
1566   tso->par.blockedat   = 0;
1567   tso->par.globalsparks = 0;
1568   tso->par.localsparks  = 0;
1569 #endif
1570
1571 #if defined(GRAN)
1572   globalGranStats.tot_threads_created++;
1573   globalGranStats.threads_created_on_PE[CurrentProc]++;
1574   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1575   globalGranStats.tot_sq_probes++;
1576 #elif defined(PAR)
1577   // collect parallel global statistics (currently done together with GC stats)
1578   if (RtsFlags.ParFlags.ParStats.Global &&
1579       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1580     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1581     globalParStats.tot_threads_created++;
1582   }
1583 #endif 
1584
1585 #if defined(GRAN)
1586   IF_GRAN_DEBUG(pri,
1587                 belch("==__ schedule: Created TSO %d (%p);",
1588                       CurrentProc, tso, tso->id));
1589 #elif defined(PAR)
1590     IF_PAR_DEBUG(verbose,
1591                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1592                        tso->id, tso, advisory_thread_count));
1593 #else
1594   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1595                                  tso->id, tso->stack_size));
1596 #endif    
1597   return tso;
1598 }
1599
1600 #if defined(PAR)
1601 /* RFP:
1602    all parallel thread creation calls should fall through the following routine.
1603 */
1604 StgTSO *
1605 createSparkThread(rtsSpark spark) 
1606 { StgTSO *tso;
1607   ASSERT(spark != (rtsSpark)NULL);
1608   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1609   { threadsIgnored++;
1610     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1611           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1612     return END_TSO_QUEUE;
1613   }
1614   else
1615   { threadsCreated++;
1616     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1617     if (tso==END_TSO_QUEUE)     
1618       barf("createSparkThread: Cannot create TSO");
1619 #if defined(DIST)
1620     tso->priority = AdvisoryPriority;
1621 #endif
1622     pushClosure(tso,spark);
1623     PUSH_ON_RUN_QUEUE(tso);
1624     advisory_thread_count++;    
1625   }
1626   return tso;
1627 }
1628 #endif
1629
1630 /*
1631   Turn a spark into a thread.
1632   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1633 */
1634 #if defined(PAR)
1635 //@cindex activateSpark
1636 StgTSO *
1637 activateSpark (rtsSpark spark) 
1638 {
1639   StgTSO *tso;
1640
1641   tso = createSparkThread(spark);
1642   if (RtsFlags.ParFlags.ParStats.Full) {   
1643     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1644     IF_PAR_DEBUG(verbose,
1645                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1646                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1647   }
1648   // ToDo: fwd info on local/global spark to thread -- HWL
1649   // tso->gran.exported =  spark->exported;
1650   // tso->gran.locked =   !spark->global;
1651   // tso->gran.sparkname = spark->name;
1652
1653   return tso;
1654 }
1655 #endif
1656
1657 /* ---------------------------------------------------------------------------
1658  * scheduleThread()
1659  *
1660  * scheduleThread puts a thread on the head of the runnable queue.
1661  * This will usually be done immediately after a thread is created.
1662  * The caller of scheduleThread must create the thread using e.g.
1663  * createThread and push an appropriate closure
1664  * on this thread's stack before the scheduler is invoked.
1665  * ------------------------------------------------------------------------ */
1666
1667 void
1668 scheduleThread(StgTSO *tso)
1669 {
1670   if (tso==END_TSO_QUEUE){    
1671     schedule();
1672     return;
1673   }
1674
1675   ACQUIRE_LOCK(&sched_mutex);
1676
1677   /* Put the new thread on the head of the runnable queue.  The caller
1678    * better push an appropriate closure on this thread's stack
1679    * beforehand.  In the SMP case, the thread may start running as
1680    * soon as we release the scheduler lock below.
1681    */
1682   PUSH_ON_RUN_QUEUE(tso);
1683   THREAD_RUNNABLE();
1684
1685 #if 0
1686   IF_DEBUG(scheduler,printTSO(tso));
1687 #endif
1688   RELEASE_LOCK(&sched_mutex);
1689 }
1690
1691 /* ---------------------------------------------------------------------------
1692  * startTasks()
1693  *
1694  * Start up Posix threads to run each of the scheduler tasks.
1695  * I believe the task ids are not needed in the system as defined.
1696  *  KH @ 25/10/99
1697  * ------------------------------------------------------------------------ */
1698
1699 #if defined(PAR) || defined(SMP)
1700 void
1701 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1702 {
1703   scheduleThread(END_TSO_QUEUE);
1704 }
1705 #endif
1706
1707 /* ---------------------------------------------------------------------------
1708  * initScheduler()
1709  *
1710  * Initialise the scheduler.  This resets all the queues - if the
1711  * queues contained any threads, they'll be garbage collected at the
1712  * next pass.
1713  *
1714  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1715  * ------------------------------------------------------------------------ */
1716
1717 #ifdef SMP
1718 static void
1719 term_handler(int sig STG_UNUSED)
1720 {
1721   stat_workerStop();
1722   ACQUIRE_LOCK(&term_mutex);
1723   await_death--;
1724   RELEASE_LOCK(&term_mutex);
1725   pthread_exit(NULL);
1726 }
1727 #endif
1728
1729 //@cindex initScheduler
1730 void 
1731 initScheduler(void)
1732 {
1733 #if defined(GRAN)
1734   nat i;
1735
1736   for (i=0; i<=MAX_PROC; i++) {
1737     run_queue_hds[i]      = END_TSO_QUEUE;
1738     run_queue_tls[i]      = END_TSO_QUEUE;
1739     blocked_queue_hds[i]  = END_TSO_QUEUE;
1740     blocked_queue_tls[i]  = END_TSO_QUEUE;
1741     ccalling_threadss[i]  = END_TSO_QUEUE;
1742     sleeping_queue        = END_TSO_QUEUE;
1743   }
1744 #else
1745   run_queue_hd      = END_TSO_QUEUE;
1746   run_queue_tl      = END_TSO_QUEUE;
1747   blocked_queue_hd  = END_TSO_QUEUE;
1748   blocked_queue_tl  = END_TSO_QUEUE;
1749   sleeping_queue    = END_TSO_QUEUE;
1750 #endif 
1751
1752   suspended_ccalling_threads  = END_TSO_QUEUE;
1753
1754   main_threads = NULL;
1755   all_threads  = END_TSO_QUEUE;
1756
1757   context_switch = 0;
1758   interrupted    = 0;
1759
1760   RtsFlags.ConcFlags.ctxtSwitchTicks =
1761       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1762
1763   /* Install the SIGHUP handler */
1764 #ifdef SMP
1765   {
1766     struct sigaction action,oact;
1767
1768     action.sa_handler = term_handler;
1769     sigemptyset(&action.sa_mask);
1770     action.sa_flags = 0;
1771     if (sigaction(SIGTERM, &action, &oact) != 0) {
1772       barf("can't install TERM handler");
1773     }
1774   }
1775 #endif
1776
1777 #ifdef SMP
1778   /* Allocate N Capabilities */
1779   {
1780     nat i;
1781     Capability *cap, *prev;
1782     cap  = NULL;
1783     prev = NULL;
1784     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1785       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1786       cap->link = prev;
1787       prev = cap;
1788     }
1789     free_capabilities = cap;
1790     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1791   }
1792   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1793                              n_free_capabilities););
1794 #endif
1795
1796 #if defined(SMP) || defined(PAR)
1797   initSparkPools();
1798 #endif
1799 }
1800
1801 #ifdef SMP
1802 void
1803 startTasks( void )
1804 {
1805   nat i;
1806   int r;
1807   pthread_t tid;
1808   
1809   /* make some space for saving all the thread ids */
1810   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1811                             "initScheduler:task_ids");
1812   
1813   /* and create all the threads */
1814   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1815     r = pthread_create(&tid,NULL,taskStart,NULL);
1816     if (r != 0) {
1817       barf("startTasks: Can't create new Posix thread");
1818     }
1819     task_ids[i].id = tid;
1820     task_ids[i].mut_time = 0.0;
1821     task_ids[i].mut_etime = 0.0;
1822     task_ids[i].gc_time = 0.0;
1823     task_ids[i].gc_etime = 0.0;
1824     task_ids[i].elapsedtimestart = elapsedtime();
1825     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1826   }
1827 }
1828 #endif
1829
1830 void
1831 exitScheduler( void )
1832 {
1833 #ifdef SMP
1834   nat i;
1835
1836   /* Don't want to use pthread_cancel, since we'd have to install
1837    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1838    * all our locks.
1839    */
1840 #if 0
1841   /* Cancel all our tasks */
1842   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1843     pthread_cancel(task_ids[i].id);
1844   }
1845   
1846   /* Wait for all the tasks to terminate */
1847   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1848     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1849                                task_ids[i].id));
1850     pthread_join(task_ids[i].id, NULL);
1851   }
1852 #endif
1853
1854   /* Send 'em all a SIGHUP.  That should shut 'em up.
1855    */
1856   await_death = RtsFlags.ParFlags.nNodes;
1857   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1858     pthread_kill(task_ids[i].id,SIGTERM);
1859   }
1860   while (await_death > 0) {
1861     sched_yield();
1862   }
1863 #endif
1864 }
1865
1866 /* -----------------------------------------------------------------------------
1867    Managing the per-task allocation areas.
1868    
1869    Each capability comes with an allocation area.  These are
1870    fixed-length block lists into which allocation can be done.
1871
1872    ToDo: no support for two-space collection at the moment???
1873    -------------------------------------------------------------------------- */
1874
1875 /* -----------------------------------------------------------------------------
1876  * waitThread is the external interface for running a new computation
1877  * and waiting for the result.
1878  *
1879  * In the non-SMP case, we create a new main thread, push it on the 
1880  * main-thread stack, and invoke the scheduler to run it.  The
1881  * scheduler will return when the top main thread on the stack has
1882  * completed or died, and fill in the necessary fields of the
1883  * main_thread structure.
1884  *
1885  * In the SMP case, we create a main thread as before, but we then
1886  * create a new condition variable and sleep on it.  When our new
1887  * main thread has completed, we'll be woken up and the status/result
1888  * will be in the main_thread struct.
1889  * -------------------------------------------------------------------------- */
1890
1891 int 
1892 howManyThreadsAvail ( void )
1893 {
1894    int i = 0;
1895    StgTSO* q;
1896    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1897       i++;
1898    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1899       i++;
1900    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1901       i++;
1902    return i;
1903 }
1904
1905 void
1906 finishAllThreads ( void )
1907 {
1908    do {
1909       while (run_queue_hd != END_TSO_QUEUE) {
1910          waitThread ( run_queue_hd, NULL );
1911       }
1912       while (blocked_queue_hd != END_TSO_QUEUE) {
1913          waitThread ( blocked_queue_hd, NULL );
1914       }
1915       while (sleeping_queue != END_TSO_QUEUE) {
1916          waitThread ( blocked_queue_hd, NULL );
1917       }
1918    } while 
1919       (blocked_queue_hd != END_TSO_QUEUE || 
1920        run_queue_hd     != END_TSO_QUEUE ||
1921        sleeping_queue   != END_TSO_QUEUE);
1922 }
1923
1924 SchedulerStatus
1925 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1926 {
1927   StgMainThread *m;
1928   SchedulerStatus stat;
1929
1930   ACQUIRE_LOCK(&sched_mutex);
1931   
1932   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1933
1934   m->tso = tso;
1935   m->ret = ret;
1936   m->stat = NoStatus;
1937 #ifdef SMP
1938   pthread_cond_init(&m->wakeup, NULL);
1939 #endif
1940
1941   m->link = main_threads;
1942   main_threads = m;
1943
1944   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1945                               m->tso->id));
1946
1947 #ifdef SMP
1948   do {
1949     pthread_cond_wait(&m->wakeup, &sched_mutex);
1950   } while (m->stat == NoStatus);
1951 #elif defined(GRAN)
1952   /* GranSim specific init */
1953   CurrentTSO = m->tso;                // the TSO to run
1954   procStatus[MainProc] = Busy;        // status of main PE
1955   CurrentProc = MainProc;             // PE to run it on
1956
1957   schedule();
1958 #else
1959   schedule();
1960   ASSERT(m->stat != NoStatus);
1961 #endif
1962
1963   stat = m->stat;
1964
1965 #ifdef SMP
1966   pthread_cond_destroy(&m->wakeup);
1967 #endif
1968
1969   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1970                               m->tso->id));
1971   free(m);
1972
1973   RELEASE_LOCK(&sched_mutex);
1974
1975   return stat;
1976 }
1977
1978 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1979 //@subsection Run queue code 
1980
1981 #if 0
1982 /* 
1983    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1984        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1985        implicit global variable that has to be correct when calling these
1986        fcts -- HWL 
1987 */
1988
1989 /* Put the new thread on the head of the runnable queue.
1990  * The caller of createThread better push an appropriate closure
1991  * on this thread's stack before the scheduler is invoked.
1992  */
1993 static /* inline */ void
1994 add_to_run_queue(tso)
1995 StgTSO* tso; 
1996 {
1997   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1998   tso->link = run_queue_hd;
1999   run_queue_hd = tso;
2000   if (run_queue_tl == END_TSO_QUEUE) {
2001     run_queue_tl = tso;
2002   }
2003 }
2004
2005 /* Put the new thread at the end of the runnable queue. */
2006 static /* inline */ void
2007 push_on_run_queue(tso)
2008 StgTSO* tso; 
2009 {
2010   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2011   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2012   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2013   if (run_queue_hd == END_TSO_QUEUE) {
2014     run_queue_hd = tso;
2015   } else {
2016     run_queue_tl->link = tso;
2017   }
2018   run_queue_tl = tso;
2019 }
2020
2021 /* 
2022    Should be inlined because it's used very often in schedule.  The tso
2023    argument is actually only needed in GranSim, where we want to have the
2024    possibility to schedule *any* TSO on the run queue, irrespective of the
2025    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2026    the run queue and dequeue the tso, adjusting the links in the queue. 
2027 */
2028 //@cindex take_off_run_queue
2029 static /* inline */ StgTSO*
2030 take_off_run_queue(StgTSO *tso) {
2031   StgTSO *t, *prev;
2032
2033   /* 
2034      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2035
2036      if tso is specified, unlink that tso from the run_queue (doesn't have
2037      to be at the beginning of the queue); GranSim only 
2038   */
2039   if (tso!=END_TSO_QUEUE) {
2040     /* find tso in queue */
2041     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2042          t!=END_TSO_QUEUE && t!=tso;
2043          prev=t, t=t->link) 
2044       /* nothing */ ;
2045     ASSERT(t==tso);
2046     /* now actually dequeue the tso */
2047     if (prev!=END_TSO_QUEUE) {
2048       ASSERT(run_queue_hd!=t);
2049       prev->link = t->link;
2050     } else {
2051       /* t is at beginning of thread queue */
2052       ASSERT(run_queue_hd==t);
2053       run_queue_hd = t->link;
2054     }
2055     /* t is at end of thread queue */
2056     if (t->link==END_TSO_QUEUE) {
2057       ASSERT(t==run_queue_tl);
2058       run_queue_tl = prev;
2059     } else {
2060       ASSERT(run_queue_tl!=t);
2061     }
2062     t->link = END_TSO_QUEUE;
2063   } else {
2064     /* take tso from the beginning of the queue; std concurrent code */
2065     t = run_queue_hd;
2066     if (t != END_TSO_QUEUE) {
2067       run_queue_hd = t->link;
2068       t->link = END_TSO_QUEUE;
2069       if (run_queue_hd == END_TSO_QUEUE) {
2070         run_queue_tl = END_TSO_QUEUE;
2071       }
2072     }
2073   }
2074   return t;
2075 }
2076
2077 #endif /* 0 */
2078
2079 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2080 //@subsection Garbage Collextion Routines
2081
2082 /* ---------------------------------------------------------------------------
2083    Where are the roots that we know about?
2084
2085         - all the threads on the runnable queue
2086         - all the threads on the blocked queue
2087         - all the threads on the sleeping queue
2088         - all the thread currently executing a _ccall_GC
2089         - all the "main threads"
2090      
2091    ------------------------------------------------------------------------ */
2092
2093 /* This has to be protected either by the scheduler monitor, or by the
2094         garbage collection monitor (probably the latter).
2095         KH @ 25/10/99
2096 */
2097
2098 static void
2099 GetRoots(evac_fn evac)
2100 {
2101   StgMainThread *m;
2102
2103 #if defined(GRAN)
2104   {
2105     nat i;
2106     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2107       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2108           evac((StgClosure **)&run_queue_hds[i]);
2109       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2110           evac((StgClosure **)&run_queue_tls[i]);
2111       
2112       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2113           evac((StgClosure **)&blocked_queue_hds[i]);
2114       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2115           evac((StgClosure **)&blocked_queue_tls[i]);
2116       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2117           evac((StgClosure **)&ccalling_threads[i]);
2118     }
2119   }
2120
2121   markEventQueue();
2122
2123 #else /* !GRAN */
2124   if (run_queue_hd != END_TSO_QUEUE) {
2125       ASSERT(run_queue_tl != END_TSO_QUEUE);
2126       evac((StgClosure **)&run_queue_hd);
2127       evac((StgClosure **)&run_queue_tl);
2128   }
2129   
2130   if (blocked_queue_hd != END_TSO_QUEUE) {
2131       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2132       evac((StgClosure **)&blocked_queue_hd);
2133       evac((StgClosure **)&blocked_queue_tl);
2134   }
2135   
2136   if (sleeping_queue != END_TSO_QUEUE) {
2137       evac((StgClosure **)&sleeping_queue);
2138   }
2139 #endif 
2140
2141   for (m = main_threads; m != NULL; m = m->link) {
2142       evac((StgClosure **)&m->tso);
2143   }
2144   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2145       evac((StgClosure **)&suspended_ccalling_threads);
2146   }
2147
2148 #if defined(SMP) || defined(PAR) || defined(GRAN)
2149   markSparkQueue(evac);
2150 #endif
2151 }
2152
2153 /* -----------------------------------------------------------------------------
2154    performGC
2155
2156    This is the interface to the garbage collector from Haskell land.
2157    We provide this so that external C code can allocate and garbage
2158    collect when called from Haskell via _ccall_GC.
2159
2160    It might be useful to provide an interface whereby the programmer
2161    can specify more roots (ToDo).
2162    
2163    This needs to be protected by the GC condition variable above.  KH.
2164    -------------------------------------------------------------------------- */
2165
2166 void (*extra_roots)(evac_fn);
2167
2168 void
2169 performGC(void)
2170 {
2171   GarbageCollect(GetRoots,rtsFalse);
2172 }
2173
2174 void
2175 performMajorGC(void)
2176 {
2177   GarbageCollect(GetRoots,rtsTrue);
2178 }
2179
2180 static void
2181 AllRoots(evac_fn evac)
2182 {
2183     GetRoots(evac);             // the scheduler's roots
2184     extra_roots(evac);          // the user's roots
2185 }
2186
2187 void
2188 performGCWithRoots(void (*get_roots)(evac_fn))
2189 {
2190   extra_roots = get_roots;
2191   GarbageCollect(AllRoots,rtsFalse);
2192 }
2193
2194 /* -----------------------------------------------------------------------------
2195    Stack overflow
2196
2197    If the thread has reached its maximum stack size, then raise the
2198    StackOverflow exception in the offending thread.  Otherwise
2199    relocate the TSO into a larger chunk of memory and adjust its stack
2200    size appropriately.
2201    -------------------------------------------------------------------------- */
2202
2203 static StgTSO *
2204 threadStackOverflow(StgTSO *tso)
2205 {
2206   nat new_stack_size, new_tso_size, diff, stack_words;
2207   StgPtr new_sp;
2208   StgTSO *dest;
2209
2210   IF_DEBUG(sanity,checkTSO(tso));
2211   if (tso->stack_size >= tso->max_stack_size) {
2212
2213     IF_DEBUG(gc,
2214              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2215                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2216              /* If we're debugging, just print out the top of the stack */
2217              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2218                                               tso->sp+64)));
2219
2220     /* Send this thread the StackOverflow exception */
2221     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2222     return tso;
2223   }
2224
2225   /* Try to double the current stack size.  If that takes us over the
2226    * maximum stack size for this thread, then use the maximum instead.
2227    * Finally round up so the TSO ends up as a whole number of blocks.
2228    */
2229   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2230   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2231                                        TSO_STRUCT_SIZE)/sizeof(W_);
2232   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2233   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2234
2235   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2236
2237   dest = (StgTSO *)allocate(new_tso_size);
2238   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2239
2240   /* copy the TSO block and the old stack into the new area */
2241   memcpy(dest,tso,TSO_STRUCT_SIZE);
2242   stack_words = tso->stack + tso->stack_size - tso->sp;
2243   new_sp = (P_)dest + new_tso_size - stack_words;
2244   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2245
2246   /* relocate the stack pointers... */
2247   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2248   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2249   dest->sp    = new_sp;
2250   dest->stack_size = new_stack_size;
2251         
2252   /* and relocate the update frame list */
2253   relocate_stack(dest, diff);
2254
2255   /* Mark the old TSO as relocated.  We have to check for relocated
2256    * TSOs in the garbage collector and any primops that deal with TSOs.
2257    *
2258    * It's important to set the sp and su values to just beyond the end
2259    * of the stack, so we don't attempt to scavenge any part of the
2260    * dead TSO's stack.
2261    */
2262   tso->what_next = ThreadRelocated;
2263   tso->link = dest;
2264   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2265   tso->su = (StgUpdateFrame *)tso->sp;
2266   tso->why_blocked = NotBlocked;
2267   dest->mut_link = NULL;
2268
2269   IF_PAR_DEBUG(verbose,
2270                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2271                      tso->id, tso, tso->stack_size);
2272                /* If we're debugging, just print out the top of the stack */
2273                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2274                                                 tso->sp+64)));
2275   
2276   IF_DEBUG(sanity,checkTSO(tso));
2277 #if 0
2278   IF_DEBUG(scheduler,printTSO(dest));
2279 #endif
2280
2281   return dest;
2282 }
2283
2284 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2285 //@subsection Blocking Queue Routines
2286
2287 /* ---------------------------------------------------------------------------
2288    Wake up a queue that was blocked on some resource.
2289    ------------------------------------------------------------------------ */
2290
2291 #if defined(GRAN)
2292 static inline void
2293 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2294 {
2295 }
2296 #elif defined(PAR)
2297 static inline void
2298 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2299 {
2300   /* write RESUME events to log file and
2301      update blocked and fetch time (depending on type of the orig closure) */
2302   if (RtsFlags.ParFlags.ParStats.Full) {
2303     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2304                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2305                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2306     if (EMPTY_RUN_QUEUE())
2307       emitSchedule = rtsTrue;
2308
2309     switch (get_itbl(node)->type) {
2310         case FETCH_ME_BQ:
2311           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2312           break;
2313         case RBH:
2314         case FETCH_ME:
2315         case BLACKHOLE_BQ:
2316           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2317           break;
2318 #ifdef DIST
2319         case MVAR:
2320           break;
2321 #endif    
2322         default:
2323           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2324         }
2325       }
2326 }
2327 #endif
2328
2329 #if defined(GRAN)
2330 static StgBlockingQueueElement *
2331 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2332 {
2333     StgTSO *tso;
2334     PEs node_loc, tso_loc;
2335
2336     node_loc = where_is(node); // should be lifted out of loop
2337     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2338     tso_loc = where_is((StgClosure *)tso);
2339     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2340       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2341       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2342       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2343       // insertThread(tso, node_loc);
2344       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2345                 ResumeThread,
2346                 tso, node, (rtsSpark*)NULL);
2347       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2348       // len_local++;
2349       // len++;
2350     } else { // TSO is remote (actually should be FMBQ)
2351       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2352                                   RtsFlags.GranFlags.Costs.gunblocktime +
2353                                   RtsFlags.GranFlags.Costs.latency;
2354       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2355                 UnblockThread,
2356                 tso, node, (rtsSpark*)NULL);
2357       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2358       // len++;
2359     }
2360     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2361     IF_GRAN_DEBUG(bq,
2362                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2363                           (node_loc==tso_loc ? "Local" : "Global"), 
2364                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2365     tso->block_info.closure = NULL;
2366     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2367                              tso->id, tso));
2368 }
2369 #elif defined(PAR)
2370 static StgBlockingQueueElement *
2371 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2372 {
2373     StgBlockingQueueElement *next;
2374
2375     switch (get_itbl(bqe)->type) {
2376     case TSO:
2377       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2378       /* if it's a TSO just push it onto the run_queue */
2379       next = bqe->link;
2380       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2381       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2382       THREAD_RUNNABLE();
2383       unblockCount(bqe, node);
2384       /* reset blocking status after dumping event */
2385       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2386       break;
2387
2388     case BLOCKED_FETCH:
2389       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2390       next = bqe->link;
2391       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2392       PendingFetches = (StgBlockedFetch *)bqe;
2393       break;
2394
2395 # if defined(DEBUG)
2396       /* can ignore this case in a non-debugging setup; 
2397          see comments on RBHSave closures above */
2398     case CONSTR:
2399       /* check that the closure is an RBHSave closure */
2400       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2401              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2402              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2403       break;
2404
2405     default:
2406       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2407            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2408            (StgClosure *)bqe);
2409 # endif
2410     }
2411   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2412   return next;
2413 }
2414
2415 #else /* !GRAN && !PAR */
2416 static StgTSO *
2417 unblockOneLocked(StgTSO *tso)
2418 {
2419   StgTSO *next;
2420
2421   ASSERT(get_itbl(tso)->type == TSO);
2422   ASSERT(tso->why_blocked != NotBlocked);
2423   tso->why_blocked = NotBlocked;
2424   next = tso->link;
2425   PUSH_ON_RUN_QUEUE(tso);
2426   THREAD_RUNNABLE();
2427   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2428   return next;
2429 }
2430 #endif
2431
2432 #if defined(GRAN) || defined(PAR)
2433 inline StgBlockingQueueElement *
2434 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2435 {
2436   ACQUIRE_LOCK(&sched_mutex);
2437   bqe = unblockOneLocked(bqe, node);
2438   RELEASE_LOCK(&sched_mutex);
2439   return bqe;
2440 }
2441 #else
2442 inline StgTSO *
2443 unblockOne(StgTSO *tso)
2444 {
2445   ACQUIRE_LOCK(&sched_mutex);
2446   tso = unblockOneLocked(tso);
2447   RELEASE_LOCK(&sched_mutex);
2448   return tso;
2449 }
2450 #endif
2451
2452 #if defined(GRAN)
2453 void 
2454 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2455 {
2456   StgBlockingQueueElement *bqe;
2457   PEs node_loc;
2458   nat len = 0; 
2459
2460   IF_GRAN_DEBUG(bq, 
2461                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2462                       node, CurrentProc, CurrentTime[CurrentProc], 
2463                       CurrentTSO->id, CurrentTSO));
2464
2465   node_loc = where_is(node);
2466
2467   ASSERT(q == END_BQ_QUEUE ||
2468          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2469          get_itbl(q)->type == CONSTR); // closure (type constructor)
2470   ASSERT(is_unique(node));
2471
2472   /* FAKE FETCH: magically copy the node to the tso's proc;
2473      no Fetch necessary because in reality the node should not have been 
2474      moved to the other PE in the first place
2475   */
2476   if (CurrentProc!=node_loc) {
2477     IF_GRAN_DEBUG(bq, 
2478                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2479                         node, node_loc, CurrentProc, CurrentTSO->id, 
2480                         // CurrentTSO, where_is(CurrentTSO),
2481                         node->header.gran.procs));
2482     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2483     IF_GRAN_DEBUG(bq, 
2484                   belch("## new bitmask of node %p is %#x",
2485                         node, node->header.gran.procs));
2486     if (RtsFlags.GranFlags.GranSimStats.Global) {
2487       globalGranStats.tot_fake_fetches++;
2488     }
2489   }
2490
2491   bqe = q;
2492   // ToDo: check: ASSERT(CurrentProc==node_loc);
2493   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2494     //next = bqe->link;
2495     /* 
2496        bqe points to the current element in the queue
2497        next points to the next element in the queue
2498     */
2499     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2500     //tso_loc = where_is(tso);
2501     len++;
2502     bqe = unblockOneLocked(bqe, node);
2503   }
2504
2505   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2506      the closure to make room for the anchor of the BQ */
2507   if (bqe!=END_BQ_QUEUE) {
2508     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2509     /*
2510     ASSERT((info_ptr==&RBH_Save_0_info) ||
2511            (info_ptr==&RBH_Save_1_info) ||
2512            (info_ptr==&RBH_Save_2_info));
2513     */
2514     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2515     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2516     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2517
2518     IF_GRAN_DEBUG(bq,
2519                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2520                         node, info_type(node)));
2521   }
2522
2523   /* statistics gathering */
2524   if (RtsFlags.GranFlags.GranSimStats.Global) {
2525     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2526     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2527     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2528     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2529   }
2530   IF_GRAN_DEBUG(bq,
2531                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2532                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2533 }
2534 #elif defined(PAR)
2535 void 
2536 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2537 {
2538   StgBlockingQueueElement *bqe;
2539
2540   ACQUIRE_LOCK(&sched_mutex);
2541
2542   IF_PAR_DEBUG(verbose, 
2543                belch("##-_ AwBQ for node %p on [%x]: ",
2544                      node, mytid));
2545 #ifdef DIST  
2546   //RFP
2547   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2548     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2549     return;
2550   }
2551 #endif
2552   
2553   ASSERT(q == END_BQ_QUEUE ||
2554          get_itbl(q)->type == TSO ||           
2555          get_itbl(q)->type == BLOCKED_FETCH || 
2556          get_itbl(q)->type == CONSTR); 
2557
2558   bqe = q;
2559   while (get_itbl(bqe)->type==TSO || 
2560          get_itbl(bqe)->type==BLOCKED_FETCH) {
2561     bqe = unblockOneLocked(bqe, node);
2562   }
2563   RELEASE_LOCK(&sched_mutex);
2564 }
2565
2566 #else   /* !GRAN && !PAR */
2567 void
2568 awakenBlockedQueue(StgTSO *tso)
2569 {
2570   ACQUIRE_LOCK(&sched_mutex);
2571   while (tso != END_TSO_QUEUE) {
2572     tso = unblockOneLocked(tso);
2573   }
2574   RELEASE_LOCK(&sched_mutex);
2575 }
2576 #endif
2577
2578 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2579 //@subsection Exception Handling Routines
2580
2581 /* ---------------------------------------------------------------------------
2582    Interrupt execution
2583    - usually called inside a signal handler so it mustn't do anything fancy.   
2584    ------------------------------------------------------------------------ */
2585
2586 void
2587 interruptStgRts(void)
2588 {
2589     interrupted    = 1;
2590     context_switch = 1;
2591 }
2592
2593 /* -----------------------------------------------------------------------------
2594    Unblock a thread
2595
2596    This is for use when we raise an exception in another thread, which
2597    may be blocked.
2598    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2599    -------------------------------------------------------------------------- */
2600
2601 #if defined(GRAN) || defined(PAR)
2602 /*
2603   NB: only the type of the blocking queue is different in GranSim and GUM
2604       the operations on the queue-elements are the same
2605       long live polymorphism!
2606 */
2607 static void
2608 unblockThread(StgTSO *tso)
2609 {
2610   StgBlockingQueueElement *t, **last;
2611
2612   ACQUIRE_LOCK(&sched_mutex);
2613   switch (tso->why_blocked) {
2614
2615   case NotBlocked:
2616     return;  /* not blocked */
2617
2618   case BlockedOnMVar:
2619     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2620     {
2621       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2622       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2623
2624       last = (StgBlockingQueueElement **)&mvar->head;
2625       for (t = (StgBlockingQueueElement *)mvar->head; 
2626            t != END_BQ_QUEUE; 
2627            last = &t->link, last_tso = t, t = t->link) {
2628         if (t == (StgBlockingQueueElement *)tso) {
2629           *last = (StgBlockingQueueElement *)tso->link;
2630           if (mvar->tail == tso) {
2631             mvar->tail = (StgTSO *)last_tso;
2632           }
2633           goto done;
2634         }
2635       }
2636       barf("unblockThread (MVAR): TSO not found");
2637     }
2638
2639   case BlockedOnBlackHole:
2640     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2641     {
2642       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2643
2644       last = &bq->blocking_queue;
2645       for (t = bq->blocking_queue; 
2646            t != END_BQ_QUEUE; 
2647            last = &t->link, t = t->link) {
2648         if (t == (StgBlockingQueueElement *)tso) {
2649           *last = (StgBlockingQueueElement *)tso->link;
2650           goto done;
2651         }
2652       }
2653       barf("unblockThread (BLACKHOLE): TSO not found");
2654     }
2655
2656   case BlockedOnException:
2657     {
2658       StgTSO *target  = tso->block_info.tso;
2659
2660       ASSERT(get_itbl(target)->type == TSO);
2661
2662       if (target->what_next == ThreadRelocated) {
2663           target = target->link;
2664           ASSERT(get_itbl(target)->type == TSO);
2665       }
2666
2667       ASSERT(target->blocked_exceptions != NULL);
2668
2669       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2670       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2671            t != END_BQ_QUEUE; 
2672            last = &t->link, t = t->link) {
2673         ASSERT(get_itbl(t)->type == TSO);
2674         if (t == (StgBlockingQueueElement *)tso) {
2675           *last = (StgBlockingQueueElement *)tso->link;
2676           goto done;
2677         }
2678       }
2679       barf("unblockThread (Exception): TSO not found");
2680     }
2681
2682   case BlockedOnRead:
2683   case BlockedOnWrite:
2684     {
2685       /* take TSO off blocked_queue */
2686       StgBlockingQueueElement *prev = NULL;
2687       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2688            prev = t, t = t->link) {
2689         if (t == (StgBlockingQueueElement *)tso) {
2690           if (prev == NULL) {
2691             blocked_queue_hd = (StgTSO *)t->link;
2692             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2693               blocked_queue_tl = END_TSO_QUEUE;
2694             }
2695           } else {
2696             prev->link = t->link;
2697             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2698               blocked_queue_tl = (StgTSO *)prev;
2699             }
2700           }
2701           goto done;
2702         }
2703       }
2704       barf("unblockThread (I/O): TSO not found");
2705     }
2706
2707   case BlockedOnDelay:
2708     {
2709       /* take TSO off sleeping_queue */
2710       StgBlockingQueueElement *prev = NULL;
2711       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2712            prev = t, t = t->link) {
2713         if (t == (StgBlockingQueueElement *)tso) {
2714           if (prev == NULL) {
2715             sleeping_queue = (StgTSO *)t->link;
2716           } else {
2717             prev->link = t->link;
2718           }
2719           goto done;
2720         }
2721       }
2722       barf("unblockThread (I/O): TSO not found");
2723     }
2724
2725   default:
2726     barf("unblockThread");
2727   }
2728
2729  done:
2730   tso->link = END_TSO_QUEUE;
2731   tso->why_blocked = NotBlocked;
2732   tso->block_info.closure = NULL;
2733   PUSH_ON_RUN_QUEUE(tso);
2734   RELEASE_LOCK(&sched_mutex);
2735 }
2736 #else
2737 static void
2738 unblockThread(StgTSO *tso)
2739 {
2740   StgTSO *t, **last;
2741
2742   ACQUIRE_LOCK(&sched_mutex);
2743   switch (tso->why_blocked) {
2744
2745   case NotBlocked:
2746     return;  /* not blocked */
2747
2748   case BlockedOnMVar:
2749     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2750     {
2751       StgTSO *last_tso = END_TSO_QUEUE;
2752       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2753
2754       last = &mvar->head;
2755       for (t = mvar->head; t != END_TSO_QUEUE; 
2756            last = &t->link, last_tso = t, t = t->link) {
2757         if (t == tso) {
2758           *last = tso->link;
2759           if (mvar->tail == tso) {
2760             mvar->tail = last_tso;
2761           }
2762           goto done;
2763         }
2764       }
2765       barf("unblockThread (MVAR): TSO not found");
2766     }
2767
2768   case BlockedOnBlackHole:
2769     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2770     {
2771       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2772
2773       last = &bq->blocking_queue;
2774       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2775            last = &t->link, t = t->link) {
2776         if (t == tso) {
2777           *last = tso->link;
2778           goto done;
2779         }
2780       }
2781       barf("unblockThread (BLACKHOLE): TSO not found");
2782     }
2783
2784   case BlockedOnException:
2785     {
2786       StgTSO *target  = tso->block_info.tso;
2787
2788       ASSERT(get_itbl(target)->type == TSO);
2789
2790       while (target->what_next == ThreadRelocated) {
2791           target = target->link;
2792           ASSERT(get_itbl(target)->type == TSO);
2793       }
2794       
2795       ASSERT(target->blocked_exceptions != NULL);
2796
2797       last = &target->blocked_exceptions;
2798       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2799            last = &t->link, t = t->link) {
2800         ASSERT(get_itbl(t)->type == TSO);
2801         if (t == tso) {
2802           *last = tso->link;
2803           goto done;
2804         }
2805       }
2806       barf("unblockThread (Exception): TSO not found");
2807     }
2808
2809   case BlockedOnRead:
2810   case BlockedOnWrite:
2811     {
2812       StgTSO *prev = NULL;
2813       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2814            prev = t, t = t->link) {
2815         if (t == tso) {
2816           if (prev == NULL) {
2817             blocked_queue_hd = t->link;
2818             if (blocked_queue_tl == t) {
2819               blocked_queue_tl = END_TSO_QUEUE;
2820             }
2821           } else {
2822             prev->link = t->link;
2823             if (blocked_queue_tl == t) {
2824               blocked_queue_tl = prev;
2825             }
2826           }
2827           goto done;
2828         }
2829       }
2830       barf("unblockThread (I/O): TSO not found");
2831     }
2832
2833   case BlockedOnDelay:
2834     {
2835       StgTSO *prev = NULL;
2836       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2837            prev = t, t = t->link) {
2838         if (t == tso) {
2839           if (prev == NULL) {
2840             sleeping_queue = t->link;
2841           } else {
2842             prev->link = t->link;
2843           }
2844           goto done;
2845         }
2846       }
2847       barf("unblockThread (I/O): TSO not found");
2848     }
2849
2850   default:
2851     barf("unblockThread");
2852   }
2853
2854  done:
2855   tso->link = END_TSO_QUEUE;
2856   tso->why_blocked = NotBlocked;
2857   tso->block_info.closure = NULL;
2858   PUSH_ON_RUN_QUEUE(tso);
2859   RELEASE_LOCK(&sched_mutex);
2860 }
2861 #endif
2862
2863 /* -----------------------------------------------------------------------------
2864  * raiseAsync()
2865  *
2866  * The following function implements the magic for raising an
2867  * asynchronous exception in an existing thread.
2868  *
2869  * We first remove the thread from any queue on which it might be
2870  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2871  *
2872  * We strip the stack down to the innermost CATCH_FRAME, building
2873  * thunks in the heap for all the active computations, so they can 
2874  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2875  * an application of the handler to the exception, and push it on
2876  * the top of the stack.
2877  * 
2878  * How exactly do we save all the active computations?  We create an
2879  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2880  * AP_UPDs pushes everything from the corresponding update frame
2881  * upwards onto the stack.  (Actually, it pushes everything up to the
2882  * next update frame plus a pointer to the next AP_UPD object.
2883  * Entering the next AP_UPD object pushes more onto the stack until we
2884  * reach the last AP_UPD object - at which point the stack should look
2885  * exactly as it did when we killed the TSO and we can continue
2886  * execution by entering the closure on top of the stack.
2887  *
2888  * We can also kill a thread entirely - this happens if either (a) the 
2889  * exception passed to raiseAsync is NULL, or (b) there's no
2890  * CATCH_FRAME on the stack.  In either case, we strip the entire
2891  * stack and replace the thread with a zombie.
2892  *
2893  * -------------------------------------------------------------------------- */
2894  
2895 void 
2896 deleteThread(StgTSO *tso)
2897 {
2898   raiseAsync(tso,NULL);
2899 }
2900
2901 void
2902 raiseAsync(StgTSO *tso, StgClosure *exception)
2903 {
2904   StgUpdateFrame* su = tso->su;
2905   StgPtr          sp = tso->sp;
2906   
2907   /* Thread already dead? */
2908   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2909     return;
2910   }
2911
2912   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2913
2914   /* Remove it from any blocking queues */
2915   unblockThread(tso);
2916
2917   /* The stack freezing code assumes there's a closure pointer on
2918    * the top of the stack.  This isn't always the case with compiled
2919    * code, so we have to push a dummy closure on the top which just
2920    * returns to the next return address on the stack.
2921    */
2922   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2923     *(--sp) = (W_)&stg_dummy_ret_closure;
2924   }
2925
2926   while (1) {
2927     nat words = ((P_)su - (P_)sp) - 1;
2928     nat i;
2929     StgAP_UPD * ap;
2930
2931     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2932      * then build PAP(handler,exception,realworld#), and leave it on
2933      * top of the stack ready to enter.
2934      */
2935     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2936       StgCatchFrame *cf = (StgCatchFrame *)su;
2937       /* we've got an exception to raise, so let's pass it to the
2938        * handler in this frame.
2939        */
2940       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2941       TICK_ALLOC_UPD_PAP(3,0);
2942       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2943               
2944       ap->n_args = 2;
2945       ap->fun = cf->handler;    /* :: Exception -> IO a */
2946       ap->payload[0] = exception;
2947       ap->payload[1] = ARG_TAG(0); /* realworld token */
2948
2949       /* throw away the stack from Sp up to and including the
2950        * CATCH_FRAME.
2951        */
2952       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2953       tso->su = cf->link;
2954
2955       /* Restore the blocked/unblocked state for asynchronous exceptions
2956        * at the CATCH_FRAME.  
2957        *
2958        * If exceptions were unblocked at the catch, arrange that they
2959        * are unblocked again after executing the handler by pushing an
2960        * unblockAsyncExceptions_ret stack frame.
2961        */
2962       if (!cf->exceptions_blocked) {
2963         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2964       }
2965       
2966       /* Ensure that async exceptions are blocked when running the handler.
2967        */
2968       if (tso->blocked_exceptions == NULL) {
2969         tso->blocked_exceptions = END_TSO_QUEUE;
2970       }
2971       
2972       /* Put the newly-built PAP on top of the stack, ready to execute
2973        * when the thread restarts.
2974        */
2975       sp[0] = (W_)ap;
2976       tso->sp = sp;
2977       tso->what_next = ThreadEnterGHC;
2978       IF_DEBUG(sanity, checkTSO(tso));
2979       return;
2980     }
2981
2982     /* First build an AP_UPD consisting of the stack chunk above the
2983      * current update frame, with the top word on the stack as the
2984      * fun field.
2985      */
2986     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2987     
2988     ASSERT(words >= 0);
2989     
2990     ap->n_args = words;
2991     ap->fun    = (StgClosure *)sp[0];
2992     sp++;
2993     for(i=0; i < (nat)words; ++i) {
2994       ap->payload[i] = (StgClosure *)*sp++;
2995     }
2996     
2997     switch (get_itbl(su)->type) {
2998       
2999     case UPDATE_FRAME:
3000       {
3001         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3002         TICK_ALLOC_UP_THK(words+1,0);
3003         
3004         IF_DEBUG(scheduler,
3005                  fprintf(stderr,  "scheduler: Updating ");
3006                  printPtr((P_)su->updatee); 
3007                  fprintf(stderr,  " with ");
3008                  printObj((StgClosure *)ap);
3009                  );
3010         
3011         /* Replace the updatee with an indirection - happily
3012          * this will also wake up any threads currently
3013          * waiting on the result.
3014          *
3015          * Warning: if we're in a loop, more than one update frame on
3016          * the stack may point to the same object.  Be careful not to
3017          * overwrite an IND_OLDGEN in this case, because we'll screw
3018          * up the mutable lists.  To be on the safe side, don't
3019          * overwrite any kind of indirection at all.  See also
3020          * threadSqueezeStack in GC.c, where we have to make a similar
3021          * check.
3022          */
3023         if (!closure_IND(su->updatee)) {
3024             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3025         }
3026         su = su->link;
3027         sp += sizeofW(StgUpdateFrame) -1;
3028         sp[0] = (W_)ap; /* push onto stack */
3029         break;
3030       }
3031
3032     case CATCH_FRAME:
3033       {
3034         StgCatchFrame *cf = (StgCatchFrame *)su;
3035         StgClosure* o;
3036         
3037         /* We want a PAP, not an AP_UPD.  Fortunately, the
3038          * layout's the same.
3039          */
3040         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3041         TICK_ALLOC_UPD_PAP(words+1,0);
3042         
3043         /* now build o = FUN(catch,ap,handler) */
3044         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3045         TICK_ALLOC_FUN(2,0);
3046         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3047         o->payload[0] = (StgClosure *)ap;
3048         o->payload[1] = cf->handler;
3049         
3050         IF_DEBUG(scheduler,
3051                  fprintf(stderr,  "scheduler: Built ");
3052                  printObj((StgClosure *)o);
3053                  );
3054         
3055         /* pop the old handler and put o on the stack */
3056         su = cf->link;
3057         sp += sizeofW(StgCatchFrame) - 1;
3058         sp[0] = (W_)o;
3059         break;
3060       }
3061       
3062     case SEQ_FRAME:
3063       {
3064         StgSeqFrame *sf = (StgSeqFrame *)su;
3065         StgClosure* o;
3066         
3067         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3068         TICK_ALLOC_UPD_PAP(words+1,0);
3069         
3070         /* now build o = FUN(seq,ap) */
3071         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3072         TICK_ALLOC_SE_THK(1,0);
3073         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3074         o->payload[0] = (StgClosure *)ap;
3075         
3076         IF_DEBUG(scheduler,
3077                  fprintf(stderr,  "scheduler: Built ");
3078                  printObj((StgClosure *)o);
3079                  );
3080         
3081         /* pop the old handler and put o on the stack */
3082         su = sf->link;
3083         sp += sizeofW(StgSeqFrame) - 1;
3084         sp[0] = (W_)o;
3085         break;
3086       }
3087       
3088     case STOP_FRAME:
3089       /* We've stripped the entire stack, the thread is now dead. */
3090       sp += sizeofW(StgStopFrame) - 1;
3091       sp[0] = (W_)exception;    /* save the exception */
3092       tso->what_next = ThreadKilled;
3093       tso->su = (StgUpdateFrame *)(sp+1);
3094       tso->sp = sp;
3095       return;
3096
3097     default:
3098       barf("raiseAsync");
3099     }
3100   }
3101   barf("raiseAsync");
3102 }
3103
3104 /* -----------------------------------------------------------------------------
3105    resurrectThreads is called after garbage collection on the list of
3106    threads found to be garbage.  Each of these threads will be woken
3107    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3108    on an MVar, or NonTermination if the thread was blocked on a Black
3109    Hole.
3110    -------------------------------------------------------------------------- */
3111
3112 void
3113 resurrectThreads( StgTSO *threads )
3114 {
3115   StgTSO *tso, *next;
3116
3117   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3118     next = tso->global_link;
3119     tso->global_link = all_threads;
3120     all_threads = tso;
3121     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3122
3123     switch (tso->why_blocked) {
3124     case BlockedOnMVar:
3125     case BlockedOnException:
3126       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3127       break;
3128     case BlockedOnBlackHole:
3129       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3130       break;
3131     case NotBlocked:
3132       /* This might happen if the thread was blocked on a black hole
3133        * belonging to a thread that we've just woken up (raiseAsync
3134        * can wake up threads, remember...).
3135        */
3136       continue;
3137     default:
3138       barf("resurrectThreads: thread blocked in a strange way");
3139     }
3140   }
3141 }
3142
3143 /* -----------------------------------------------------------------------------
3144  * Blackhole detection: if we reach a deadlock, test whether any
3145  * threads are blocked on themselves.  Any threads which are found to
3146  * be self-blocked get sent a NonTermination exception.
3147  *
3148  * This is only done in a deadlock situation in order to avoid
3149  * performance overhead in the normal case.
3150  * -------------------------------------------------------------------------- */
3151
3152 static void
3153 detectBlackHoles( void )
3154 {
3155     StgTSO *t = all_threads;
3156     StgUpdateFrame *frame;
3157     StgClosure *blocked_on;
3158
3159     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3160
3161         while (t->what_next == ThreadRelocated) {
3162             t = t->link;
3163             ASSERT(get_itbl(t)->type == TSO);
3164         }
3165       
3166         if (t->why_blocked != BlockedOnBlackHole) {
3167             continue;
3168         }
3169
3170         blocked_on = t->block_info.closure;
3171
3172         for (frame = t->su; ; frame = frame->link) {
3173             switch (get_itbl(frame)->type) {
3174
3175             case UPDATE_FRAME:
3176                 if (frame->updatee == blocked_on) {
3177                     /* We are blocking on one of our own computations, so
3178                      * send this thread the NonTermination exception.  
3179                      */
3180                     IF_DEBUG(scheduler, 
3181                              sched_belch("thread %d is blocked on itself", t->id));
3182                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3183                     goto done;
3184                 }
3185                 else {
3186                     continue;
3187                 }
3188
3189             case CATCH_FRAME:
3190             case SEQ_FRAME:
3191                 continue;
3192                 
3193             case STOP_FRAME:
3194                 break;
3195             }
3196             break;
3197         }
3198
3199     done: ;
3200     }   
3201 }
3202
3203 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3204 //@subsection Debugging Routines
3205
3206 /* -----------------------------------------------------------------------------
3207    Debugging: why is a thread blocked
3208    -------------------------------------------------------------------------- */
3209
3210 #ifdef DEBUG
3211
3212 void
3213 printThreadBlockage(StgTSO *tso)
3214 {
3215   switch (tso->why_blocked) {
3216   case BlockedOnRead:
3217     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3218     break;
3219   case BlockedOnWrite:
3220     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3221     break;
3222   case BlockedOnDelay:
3223     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3224     break;
3225   case BlockedOnMVar:
3226     fprintf(stderr,"is blocked on an MVar");
3227     break;
3228   case BlockedOnException:
3229     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3230             tso->block_info.tso->id);
3231     break;
3232   case BlockedOnBlackHole:
3233     fprintf(stderr,"is blocked on a black hole");
3234     break;
3235   case NotBlocked:
3236     fprintf(stderr,"is not blocked");
3237     break;
3238 #if defined(PAR)
3239   case BlockedOnGA:
3240     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3241             tso->block_info.closure, info_type(tso->block_info.closure));
3242     break;
3243   case BlockedOnGA_NoSend:
3244     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3245             tso->block_info.closure, info_type(tso->block_info.closure));
3246     break;
3247 #endif
3248   default:
3249     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3250          tso->why_blocked, tso->id, tso);
3251   }
3252 }
3253
3254 void
3255 printThreadStatus(StgTSO *tso)
3256 {
3257   switch (tso->what_next) {
3258   case ThreadKilled:
3259     fprintf(stderr,"has been killed");
3260     break;
3261   case ThreadComplete:
3262     fprintf(stderr,"has completed");
3263     break;
3264   default:
3265     printThreadBlockage(tso);
3266   }
3267 }
3268
3269 void
3270 printAllThreads(void)
3271 {
3272   StgTSO *t;
3273
3274 # if defined(GRAN)
3275   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3276   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3277                        time_string, rtsFalse/*no commas!*/);
3278
3279   sched_belch("all threads at [%s]:", time_string);
3280 # elif defined(PAR)
3281   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3282   ullong_format_string(CURRENT_TIME,
3283                        time_string, rtsFalse/*no commas!*/);
3284
3285   sched_belch("all threads at [%s]:", time_string);
3286 # else
3287   sched_belch("all threads:");
3288 # endif
3289
3290   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3291     fprintf(stderr, "\tthread %d ", t->id);
3292     printThreadStatus(t);
3293     fprintf(stderr,"\n");
3294   }
3295 }
3296     
3297 /* 
3298    Print a whole blocking queue attached to node (debugging only).
3299 */
3300 //@cindex print_bq
3301 # if defined(PAR)
3302 void 
3303 print_bq (StgClosure *node)
3304 {
3305   StgBlockingQueueElement *bqe;
3306   StgTSO *tso;
3307   rtsBool end;
3308
3309   fprintf(stderr,"## BQ of closure %p (%s): ",
3310           node, info_type(node));
3311
3312   /* should cover all closures that may have a blocking queue */
3313   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3314          get_itbl(node)->type == FETCH_ME_BQ ||
3315          get_itbl(node)->type == RBH ||
3316          get_itbl(node)->type == MVAR);
3317     
3318   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3319
3320   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3321 }
3322
3323 /* 
3324    Print a whole blocking queue starting with the element bqe.
3325 */
3326 void 
3327 print_bqe (StgBlockingQueueElement *bqe)
3328 {
3329   rtsBool end;
3330
3331   /* 
3332      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3333   */
3334   for (end = (bqe==END_BQ_QUEUE);
3335        !end; // iterate until bqe points to a CONSTR
3336        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3337        bqe = end ? END_BQ_QUEUE : bqe->link) {
3338     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3339     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3340     /* types of closures that may appear in a blocking queue */
3341     ASSERT(get_itbl(bqe)->type == TSO ||           
3342            get_itbl(bqe)->type == BLOCKED_FETCH || 
3343            get_itbl(bqe)->type == CONSTR); 
3344     /* only BQs of an RBH end with an RBH_Save closure */
3345     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3346
3347     switch (get_itbl(bqe)->type) {
3348     case TSO:
3349       fprintf(stderr," TSO %u (%x),",
3350               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3351       break;
3352     case BLOCKED_FETCH:
3353       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3354               ((StgBlockedFetch *)bqe)->node, 
3355               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3356               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3357               ((StgBlockedFetch *)bqe)->ga.weight);
3358       break;
3359     case CONSTR:
3360       fprintf(stderr," %s (IP %p),",
3361               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3362                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3363                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3364                "RBH_Save_?"), get_itbl(bqe));
3365       break;
3366     default:
3367       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3368            info_type((StgClosure *)bqe)); // , node, info_type(node));
3369       break;
3370     }
3371   } /* for */
3372   fputc('\n', stderr);
3373 }
3374 # elif defined(GRAN)
3375 void 
3376 print_bq (StgClosure *node)
3377 {
3378   StgBlockingQueueElement *bqe;
3379   PEs node_loc, tso_loc;
3380   rtsBool end;
3381
3382   /* should cover all closures that may have a blocking queue */
3383   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3384          get_itbl(node)->type == FETCH_ME_BQ ||
3385          get_itbl(node)->type == RBH);
3386     
3387   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3388   node_loc = where_is(node);
3389
3390   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3391           node, info_type(node), node_loc);
3392
3393   /* 
3394      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3395   */
3396   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3397        !end; // iterate until bqe points to a CONSTR
3398        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3399     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3400     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3401     /* types of closures that may appear in a blocking queue */
3402     ASSERT(get_itbl(bqe)->type == TSO ||           
3403            get_itbl(bqe)->type == CONSTR); 
3404     /* only BQs of an RBH end with an RBH_Save closure */
3405     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3406
3407     tso_loc = where_is((StgClosure *)bqe);
3408     switch (get_itbl(bqe)->type) {
3409     case TSO:
3410       fprintf(stderr," TSO %d (%p) on [PE %d],",
3411               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3412       break;
3413     case CONSTR:
3414       fprintf(stderr," %s (IP %p),",
3415               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3416                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3417                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3418                "RBH_Save_?"), get_itbl(bqe));
3419       break;
3420     default:
3421       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3422            info_type((StgClosure *)bqe), node, info_type(node));
3423       break;
3424     }
3425   } /* for */
3426   fputc('\n', stderr);
3427 }
3428 #else
3429 /* 
3430    Nice and easy: only TSOs on the blocking queue
3431 */
3432 void 
3433 print_bq (StgClosure *node)
3434 {
3435   StgTSO *tso;
3436
3437   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3438   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3439        tso != END_TSO_QUEUE; 
3440        tso=tso->link) {
3441     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3442     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3443     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3444   }
3445   fputc('\n', stderr);
3446 }
3447 # endif
3448
3449 #if defined(PAR)
3450 static nat
3451 run_queue_len(void)
3452 {
3453   nat i;
3454   StgTSO *tso;
3455
3456   for (i=0, tso=run_queue_hd; 
3457        tso != END_TSO_QUEUE;
3458        i++, tso=tso->link)
3459     /* nothing */
3460
3461   return i;
3462 }
3463 #endif
3464
3465 static void
3466 sched_belch(char *s, ...)
3467 {
3468   va_list ap;
3469   va_start(ap,s);
3470 #ifdef SMP
3471   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3472 #elif defined(PAR)
3473   fprintf(stderr, "== ");
3474 #else
3475   fprintf(stderr, "scheduler: ");
3476 #endif
3477   vfprintf(stderr, s, ap);
3478   fprintf(stderr, "\n");
3479 }
3480
3481 #endif /* DEBUG */
3482
3483
3484 //@node Index,  , Debugging Routines, Main scheduling code
3485 //@subsection Index
3486
3487 //@index
3488 //* MainRegTable::  @cindex\s-+MainRegTable
3489 //* StgMainThread::  @cindex\s-+StgMainThread
3490 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3491 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3492 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3493 //* context_switch::  @cindex\s-+context_switch
3494 //* createThread::  @cindex\s-+createThread
3495 //* free_capabilities::  @cindex\s-+free_capabilities
3496 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3497 //* initScheduler::  @cindex\s-+initScheduler
3498 //* interrupted::  @cindex\s-+interrupted
3499 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3500 //* next_thread_id::  @cindex\s-+next_thread_id
3501 //* print_bq::  @cindex\s-+print_bq
3502 //* run_queue_hd::  @cindex\s-+run_queue_hd
3503 //* run_queue_tl::  @cindex\s-+run_queue_tl
3504 //* sched_mutex::  @cindex\s-+sched_mutex
3505 //* schedule::  @cindex\s-+schedule
3506 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3507 //* task_ids::  @cindex\s-+task_ids
3508 //* term_mutex::  @cindex\s-+term_mutex
3509 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3510 //@end index