944b22349a79da9a6df24c151ed1fe7fdedc8d8d
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.82 2000/11/13 14:42:16 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
148
149 #endif
150
151 /* Linked list of all threads.
152  * Used for detecting garbage collected threads.
153  */
154 StgTSO *all_threads;
155
156 /* Threads suspended in _ccall_GC.
157  */
158 static StgTSO *suspended_ccalling_threads;
159
160 static void GetRoots(void);
161 static StgTSO *threadStackOverflow(StgTSO *tso);
162
163 /* KH: The following two flags are shared memory locations.  There is no need
164        to lock them, since they are only unset at the end of a scheduler
165        operation.
166 */
167
168 /* flag set by signal handler to precipitate a context switch */
169 //@cindex context_switch
170 nat context_switch;
171
172 /* if this flag is set as well, give up execution */
173 //@cindex interrupted
174 rtsBool interrupted;
175
176 /* Next thread ID to allocate.
177  * Locks required: sched_mutex
178  */
179 //@cindex next_thread_id
180 StgThreadID next_thread_id = 1;
181
182 /*
183  * Pointers to the state of the current thread.
184  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
185  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
186  */
187  
188 /* The smallest stack size that makes any sense is:
189  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
190  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
191  *  + 1                       (the realworld token for an IO thread)
192  *  + 1                       (the closure to enter)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199
200 /* Free capability list.
201  * Locks required: sched_mutex.
202  */
203 #ifdef SMP
204 //@cindex free_capabilities
205 //@cindex n_free_capabilities
206 Capability *free_capabilities; /* Available capabilities for running threads */
207 nat n_free_capabilities;       /* total number of available capabilities */
208 #else
209 //@cindex MainRegTable
210 Capability MainRegTable;       /* for non-SMP, we have one global capability */
211 #endif
212
213 #if defined(GRAN)
214 StgTSO *CurrentTSO;
215 #endif
216
217 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
218  *  exists - earlier gccs apparently didn't.
219  *  -= chak
220  */
221 StgTSO dummy_tso;
222
223 rtsBool ready_to_gc;
224
225 /* All our current task ids, saved in case we need to kill them later.
226  */
227 #ifdef SMP
228 //@cindex task_ids
229 task_info *task_ids;
230 #endif
231
232 void            addToBlockedQueue ( StgTSO *tso );
233
234 static void     schedule          ( void );
235        void     interruptStgRts   ( void );
236 #if defined(GRAN)
237 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
238 #else
239 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
240 #endif
241
242 static void     detectBlackHoles  ( void );
243
244 #ifdef DEBUG
245 static void sched_belch(char *s, ...);
246 #endif
247
248 #ifdef SMP
249 //@cindex sched_mutex
250 //@cindex term_mutex
251 //@cindex thread_ready_cond
252 //@cindex gc_pending_cond
253 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
254 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
255 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
256 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
257
258 nat await_death;
259 #endif
260
261 #if defined(PAR)
262 StgTSO *LastTSO;
263 rtsTime TimeOfLastYield;
264 #endif
265
266 #if DEBUG
267 char *whatNext_strs[] = {
268   "ThreadEnterGHC",
269   "ThreadRunGHC",
270   "ThreadEnterHugs",
271   "ThreadKilled",
272   "ThreadComplete"
273 };
274
275 char *threadReturnCode_strs[] = {
276   "HeapOverflow",                       /* might also be StackOverflow */
277   "StackOverflow",
278   "ThreadYielding",
279   "ThreadBlocked",
280   "ThreadFinished"
281 };
282 #endif
283
284 /*
285  * The thread state for the main thread.
286 // ToDo: check whether not needed any more
287 StgTSO   *MainTSO;
288  */
289
290 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
291 //@subsection Main scheduling loop
292
293 /* ---------------------------------------------------------------------------
294    Main scheduling loop.
295
296    We use round-robin scheduling, each thread returning to the
297    scheduler loop when one of these conditions is detected:
298
299       * out of heap space
300       * timer expires (thread yields)
301       * thread blocks
302       * thread ends
303       * stack overflow
304
305    Locking notes:  we acquire the scheduler lock once at the beginning
306    of the scheduler loop, and release it when
307     
308       * running a thread, or
309       * waiting for work, or
310       * waiting for a GC to complete.
311
312    GRAN version:
313      In a GranSim setup this loop iterates over the global event queue.
314      This revolves around the global event queue, which determines what 
315      to do next. Therefore, it's more complicated than either the 
316      concurrent or the parallel (GUM) setup.
317
318    GUM version:
319      GUM iterates over incoming messages.
320      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
321      and sends out a fish whenever it has nothing to do; in-between
322      doing the actual reductions (shared code below) it processes the
323      incoming messages and deals with delayed operations 
324      (see PendingFetches).
325      This is not the ugliest code you could imagine, but it's bloody close.
326
327    ------------------------------------------------------------------------ */
328 //@cindex schedule
329 static void
330 schedule( void )
331 {
332   StgTSO *t;
333   Capability *cap;
334   StgThreadReturnCode ret;
335 #if defined(GRAN)
336   rtsEvent *event;
337 #elif defined(PAR)
338   StgSparkPool *pool;
339   rtsSpark spark;
340   StgTSO *tso;
341   GlobalTaskId pe;
342 #endif
343   rtsBool was_interrupted = rtsFalse;
344   
345   ACQUIRE_LOCK(&sched_mutex);
346
347 #if defined(GRAN)
348
349   /* set up first event to get things going */
350   /* ToDo: assign costs for system setup and init MainTSO ! */
351   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
352             ContinueThread, 
353             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
354
355   IF_DEBUG(gran,
356            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
357            G_TSO(CurrentTSO, 5));
358
359   if (RtsFlags.GranFlags.Light) {
360     /* Save current time; GranSim Light only */
361     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
362   }      
363
364   event = get_next_event();
365
366   while (event!=(rtsEvent*)NULL) {
367     /* Choose the processor with the next event */
368     CurrentProc = event->proc;
369     CurrentTSO = event->tso;
370
371 #elif defined(PAR)
372
373   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
374
375 #else
376
377   while (1) {
378
379 #endif
380
381     IF_DEBUG(scheduler, printAllThreads());
382
383     /* If we're interrupted (the user pressed ^C, or some other
384      * termination condition occurred), kill all the currently running
385      * threads.
386      */
387     if (interrupted) {
388       IF_DEBUG(scheduler, sched_belch("interrupted"));
389       deleteAllThreads();
390       interrupted = rtsFalse;
391       was_interrupted = rtsTrue;
392     }
393
394     /* Go through the list of main threads and wake up any
395      * clients whose computations have finished.  ToDo: this
396      * should be done more efficiently without a linear scan
397      * of the main threads list, somehow...
398      */
399 #ifdef SMP
400     { 
401       StgMainThread *m, **prev;
402       prev = &main_threads;
403       for (m = main_threads; m != NULL; m = m->link) {
404         switch (m->tso->what_next) {
405         case ThreadComplete:
406           if (m->ret) {
407             *(m->ret) = (StgClosure *)m->tso->sp[0];
408           }
409           *prev = m->link;
410           m->stat = Success;
411           pthread_cond_broadcast(&m->wakeup);
412           break;
413         case ThreadKilled:
414           *prev = m->link;
415           if (was_interrupted) {
416             m->stat = Interrupted;
417           } else {
418             m->stat = Killed;
419           }
420           pthread_cond_broadcast(&m->wakeup);
421           break;
422         default:
423           break;
424         }
425       }
426     }
427
428 #else
429 # if defined(PAR)
430     /* in GUM do this only on the Main PE */
431     if (IAmMainThread)
432 # endif
433     /* If our main thread has finished or been killed, return.
434      */
435     {
436       StgMainThread *m = main_threads;
437       if (m->tso->what_next == ThreadComplete
438           || m->tso->what_next == ThreadKilled) {
439         main_threads = main_threads->link;
440         if (m->tso->what_next == ThreadComplete) {
441           /* we finished successfully, fill in the return value */
442           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
443           m->stat = Success;
444           return;
445         } else {
446           if (was_interrupted) {
447             m->stat = Interrupted;
448           } else {
449             m->stat = Killed;
450           }
451           return;
452         }
453       }
454     }
455 #endif
456
457     /* Top up the run queue from our spark pool.  We try to make the
458      * number of threads in the run queue equal to the number of
459      * free capabilities.
460      */
461 #if defined(SMP)
462     {
463       nat n = n_free_capabilities;
464       StgTSO *tso = run_queue_hd;
465
466       /* Count the run queue */
467       while (n > 0 && tso != END_TSO_QUEUE) {
468         tso = tso->link;
469         n--;
470       }
471
472       for (; n > 0; n--) {
473         StgClosure *spark;
474         spark = findSpark();
475         if (spark == NULL) {
476           break; /* no more sparks in the pool */
477         } else {
478           /* I'd prefer this to be done in activateSpark -- HWL */
479           /* tricky - it needs to hold the scheduler lock and
480            * not try to re-acquire it -- SDM */
481           StgTSO *tso;
482           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
483           pushClosure(tso,spark);
484           PUSH_ON_RUN_QUEUE(tso);
485 #ifdef PAR
486           advisory_thread_count++;
487 #endif
488           
489           IF_DEBUG(scheduler,
490                    sched_belch("turning spark of closure %p into a thread",
491                                (StgClosure *)spark));
492         }
493       }
494       /* We need to wake up the other tasks if we just created some
495        * work for them.
496        */
497       if (n_free_capabilities - n > 1) {
498           pthread_cond_signal(&thread_ready_cond);
499       }
500     }
501 #endif /* SMP */
502
503     /* Check whether any waiting threads need to be woken up.  If the
504      * run queue is empty, and there are no other tasks running, we
505      * can wait indefinitely for something to happen.
506      * ToDo: what if another client comes along & requests another
507      * main thread?
508      */
509     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
510       awaitEvent(
511            (run_queue_hd == END_TSO_QUEUE)
512 #ifdef SMP
513         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
514 #endif
515         );
516     }
517     /* we can be interrupted while waiting for I/O... */
518     if (interrupted) continue;
519
520     /* check for signals each time around the scheduler */
521 #ifndef mingw32_TARGET_OS
522     if (signals_pending()) {
523       start_signal_handlers();
524     }
525 #endif
526
527     /* 
528      * Detect deadlock: when we have no threads to run, there are no
529      * threads waiting on I/O or sleeping, and all the other tasks are
530      * waiting for work, we must have a deadlock of some description.
531      *
532      * We first try to find threads blocked on themselves (ie. black
533      * holes), and generate NonTermination exceptions where necessary.
534      *
535      * If no threads are black holed, we have a deadlock situation, so
536      * inform all the main threads.
537      */
538 #ifdef SMP
539     if (blocked_queue_hd == END_TSO_QUEUE
540         && run_queue_hd == END_TSO_QUEUE
541         && sleeping_queue == END_TSO_QUEUE
542         && (n_free_capabilities == RtsFlags.ParFlags.nNodes))
543     {
544         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
545         detectBlackHoles();
546         if (run_queue_hd == END_TSO_QUEUE) {
547             StgMainThread *m;
548             for (m = main_threads; m != NULL; m = m->link) {
549                 m->ret = NULL;
550                 m->stat = Deadlock;
551                 pthread_cond_broadcast(&m->wakeup);
552             }
553             main_threads = NULL;
554         }
555     }
556 #else /* ! SMP */
557     if (blocked_queue_hd == END_TSO_QUEUE
558         && run_queue_hd == END_TSO_QUEUE
559         && sleeping_queue == END_TSO_QUEUE)
560     {
561         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
562         detectBlackHoles();
563         if (run_queue_hd == END_TSO_QUEUE) {
564             StgMainThread *m = main_threads;
565             m->ret = NULL;
566             m->stat = Deadlock;
567             main_threads = m->link;
568             return;
569         }
570     }
571 #endif
572
573 #ifdef SMP
574     /* If there's a GC pending, don't do anything until it has
575      * completed.
576      */
577     if (ready_to_gc) {
578       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
579       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
580     }
581     
582     /* block until we've got a thread on the run queue and a free
583      * capability.
584      */
585     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
586       IF_DEBUG(scheduler, sched_belch("waiting for work"));
587       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
588       IF_DEBUG(scheduler, sched_belch("work now available"));
589     }
590 #endif
591
592 #if defined(GRAN)
593
594     if (RtsFlags.GranFlags.Light)
595       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
596
597     /* adjust time based on time-stamp */
598     if (event->time > CurrentTime[CurrentProc] &&
599         event->evttype != ContinueThread)
600       CurrentTime[CurrentProc] = event->time;
601     
602     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
603     if (!RtsFlags.GranFlags.Light)
604       handleIdlePEs();
605
606     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
607
608     /* main event dispatcher in GranSim */
609     switch (event->evttype) {
610       /* Should just be continuing execution */
611     case ContinueThread:
612       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
613       /* ToDo: check assertion
614       ASSERT(run_queue_hd != (StgTSO*)NULL &&
615              run_queue_hd != END_TSO_QUEUE);
616       */
617       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
618       if (!RtsFlags.GranFlags.DoAsyncFetch &&
619           procStatus[CurrentProc]==Fetching) {
620         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
621               CurrentTSO->id, CurrentTSO, CurrentProc);
622         goto next_thread;
623       } 
624       /* Ignore ContinueThreads for completed threads */
625       if (CurrentTSO->what_next == ThreadComplete) {
626         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
627               CurrentTSO->id, CurrentTSO, CurrentProc);
628         goto next_thread;
629       } 
630       /* Ignore ContinueThreads for threads that are being migrated */
631       if (PROCS(CurrentTSO)==Nowhere) { 
632         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
633               CurrentTSO->id, CurrentTSO, CurrentProc);
634         goto next_thread;
635       }
636       /* The thread should be at the beginning of the run queue */
637       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
638         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
639               CurrentTSO->id, CurrentTSO, CurrentProc);
640         break; // run the thread anyway
641       }
642       /*
643       new_event(proc, proc, CurrentTime[proc],
644                 FindWork,
645                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
646       goto next_thread; 
647       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
648       break; // now actually run the thread; DaH Qu'vam yImuHbej 
649
650     case FetchNode:
651       do_the_fetchnode(event);
652       goto next_thread;             /* handle next event in event queue  */
653       
654     case GlobalBlock:
655       do_the_globalblock(event);
656       goto next_thread;             /* handle next event in event queue  */
657       
658     case FetchReply:
659       do_the_fetchreply(event);
660       goto next_thread;             /* handle next event in event queue  */
661       
662     case UnblockThread:   /* Move from the blocked queue to the tail of */
663       do_the_unblock(event);
664       goto next_thread;             /* handle next event in event queue  */
665       
666     case ResumeThread:  /* Move from the blocked queue to the tail of */
667       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
668       event->tso->gran.blocktime += 
669         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
670       do_the_startthread(event);
671       goto next_thread;             /* handle next event in event queue  */
672       
673     case StartThread:
674       do_the_startthread(event);
675       goto next_thread;             /* handle next event in event queue  */
676       
677     case MoveThread:
678       do_the_movethread(event);
679       goto next_thread;             /* handle next event in event queue  */
680       
681     case MoveSpark:
682       do_the_movespark(event);
683       goto next_thread;             /* handle next event in event queue  */
684       
685     case FindWork:
686       do_the_findwork(event);
687       goto next_thread;             /* handle next event in event queue  */
688       
689     default:
690       barf("Illegal event type %u\n", event->evttype);
691     }  /* switch */
692     
693     /* This point was scheduler_loop in the old RTS */
694
695     IF_DEBUG(gran, belch("GRAN: after main switch"));
696
697     TimeOfLastEvent = CurrentTime[CurrentProc];
698     TimeOfNextEvent = get_time_of_next_event();
699     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
700     // CurrentTSO = ThreadQueueHd;
701
702     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
703                          TimeOfNextEvent));
704
705     if (RtsFlags.GranFlags.Light) 
706       GranSimLight_leave_system(event, &ActiveTSO); 
707
708     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
709
710     IF_DEBUG(gran, 
711              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
712
713     /* in a GranSim setup the TSO stays on the run queue */
714     t = CurrentTSO;
715     /* Take a thread from the run queue. */
716     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
717
718     IF_DEBUG(gran, 
719              fprintf(stderr, "GRAN: About to run current thread, which is\n");
720              G_TSO(t,5))
721
722     context_switch = 0; // turned on via GranYield, checking events and time slice
723
724     IF_DEBUG(gran, 
725              DumpGranEvent(GR_SCHEDULE, t));
726
727     procStatus[CurrentProc] = Busy;
728
729 #elif defined(PAR)
730
731     if (PendingFetches != END_BF_QUEUE) {
732         processFetches();
733     }
734
735     /* ToDo: phps merge with spark activation above */
736     /* check whether we have local work and send requests if we have none */
737     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
738       /* :-[  no local threads => look out for local sparks */
739       /* the spark pool for the current PE */
740       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
741       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
742           pool->hd < pool->tl) {
743         /* 
744          * ToDo: add GC code check that we really have enough heap afterwards!!
745          * Old comment:
746          * If we're here (no runnable threads) and we have pending
747          * sparks, we must have a space problem.  Get enough space
748          * to turn one of those pending sparks into a
749          * thread... 
750          */
751         
752         spark = findSpark();                /* get a spark */
753         if (spark != (rtsSpark) NULL) {
754           tso = activateSpark(spark);       /* turn the spark into a thread */
755           IF_PAR_DEBUG(schedule,
756                        belch("==== schedule: Created TSO %d (%p); %d threads active",
757                              tso->id, tso, advisory_thread_count));
758
759           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
760             belch("==^^ failed to activate spark");
761             goto next_thread;
762           }               /* otherwise fall through & pick-up new tso */
763         } else {
764           IF_PAR_DEBUG(verbose,
765                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
766                              spark_queue_len(pool)));
767           goto next_thread;
768         }
769       } else  
770       /* =8-[  no local sparks => look for work on other PEs */
771       {
772         /*
773          * We really have absolutely no work.  Send out a fish
774          * (there may be some out there already), and wait for
775          * something to arrive.  We clearly can't run any threads
776          * until a SCHEDULE or RESUME arrives, and so that's what
777          * we're hoping to see.  (Of course, we still have to
778          * respond to other types of messages.)
779          */
780         if (//!fishing &&  
781             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
782           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
783           /* fishing set in sendFish, processFish;
784              avoid flooding system with fishes via delay */
785           pe = choosePE();
786           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
787                    NEW_FISH_HUNGER);
788         }
789         
790         processMessages();
791         goto next_thread;
792         // ReSchedule(0);
793       }
794     } else if (PacketsWaiting()) {  /* Look for incoming messages */
795       processMessages();
796     }
797
798     /* Now we are sure that we have some work available */
799     ASSERT(run_queue_hd != END_TSO_QUEUE);
800     /* Take a thread from the run queue, if we have work */
801     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
802
803     /* ToDo: write something to the log-file
804     if (RTSflags.ParFlags.granSimStats && !sameThread)
805         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
806
807     CurrentTSO = t;
808     */
809     /* the spark pool for the current PE */
810     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
811
812     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
813                               spark_queue_len(pool), 
814                               CURRENT_PROC,
815                               pool->hd, pool->tl, pool->base, pool->lim));
816
817     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
818                               run_queue_len(), CURRENT_PROC,
819                               run_queue_hd, run_queue_tl));
820
821 #if 0
822     if (t != LastTSO) {
823       /* 
824          we are running a different TSO, so write a schedule event to log file
825          NB: If we use fair scheduling we also have to write  a deschedule 
826              event for LastTSO; with unfair scheduling we know that the
827              previous tso has blocked whenever we switch to another tso, so
828              we don't need it in GUM for now
829       */
830       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
831                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
832       
833     }
834 #endif
835 #else /* !GRAN && !PAR */
836   
837     /* grab a thread from the run queue
838      */
839     ASSERT(run_queue_hd != END_TSO_QUEUE);
840     t = POP_RUN_QUEUE();
841     IF_DEBUG(sanity,checkTSO(t));
842
843 #endif
844     
845     /* grab a capability
846      */
847 #ifdef SMP
848     cap = free_capabilities;
849     free_capabilities = cap->link;
850     n_free_capabilities--;
851 #else
852     cap = &MainRegTable;
853 #endif
854     
855     cap->rCurrentTSO = t;
856     
857     /* context switches are now initiated by the timer signal, unless
858      * the user specified "context switch as often as possible", with
859      * +RTS -C0
860      */
861     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
862         && (run_queue_hd != END_TSO_QUEUE
863             || blocked_queue_hd != END_TSO_QUEUE
864             || sleeping_queue != END_TSO_QUEUE))
865         context_switch = 1;
866     else
867         context_switch = 0;
868
869     RELEASE_LOCK(&sched_mutex);
870
871     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
872                               t->id, t, whatNext_strs[t->what_next]));
873
874     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
875     /* Run the current thread 
876      */
877     switch (cap->rCurrentTSO->what_next) {
878     case ThreadKilled:
879     case ThreadComplete:
880       /* Thread already finished, return to scheduler. */
881       ret = ThreadFinished;
882       break;
883     case ThreadEnterGHC:
884       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
885       break;
886     case ThreadRunGHC:
887       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
888       break;
889     case ThreadEnterHugs:
890 #ifdef INTERPRETER
891       {
892          StgClosure* c;
893          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
894          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
895          cap->rCurrentTSO->sp += 1;
896          ret = enter(cap,c);
897          break;
898       }
899 #else
900       barf("Panic: entered a BCO but no bytecode interpreter in this build");
901 #endif
902     default:
903       barf("schedule: invalid what_next field");
904     }
905     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
906     
907     /* Costs for the scheduler are assigned to CCS_SYSTEM */
908 #ifdef PROFILING
909     CCCS = CCS_SYSTEM;
910 #endif
911     
912     ACQUIRE_LOCK(&sched_mutex);
913
914 #ifdef SMP
915     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
916 #elif !defined(GRAN) && !defined(PAR)
917     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
918 #endif
919     t = cap->rCurrentTSO;
920     
921 #if defined(PAR)
922     /* HACK 675: if the last thread didn't yield, make sure to print a 
923        SCHEDULE event to the log file when StgRunning the next thread, even
924        if it is the same one as before */
925     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
926     TimeOfLastYield = CURRENT_TIME;
927 #endif
928
929     switch (ret) {
930     case HeapOverflow:
931       /* make all the running tasks block on a condition variable,
932        * maybe set context_switch and wait till they all pile in,
933        * then have them wait on a GC condition variable.
934        */
935       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
936                                t->id, t, whatNext_strs[t->what_next]));
937       threadPaused(t);
938 #if defined(GRAN)
939       ASSERT(!is_on_queue(t,CurrentProc));
940 #endif
941       
942       ready_to_gc = rtsTrue;
943       context_switch = 1;               /* stop other threads ASAP */
944       PUSH_ON_RUN_QUEUE(t);
945       /* actual GC is done at the end of the while loop */
946       break;
947       
948     case StackOverflow:
949       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
950                                t->id, t, whatNext_strs[t->what_next]));
951       /* just adjust the stack for this thread, then pop it back
952        * on the run queue.
953        */
954       threadPaused(t);
955       { 
956         StgMainThread *m;
957         /* enlarge the stack */
958         StgTSO *new_t = threadStackOverflow(t);
959         
960         /* This TSO has moved, so update any pointers to it from the
961          * main thread stack.  It better not be on any other queues...
962          * (it shouldn't be).
963          */
964         for (m = main_threads; m != NULL; m = m->link) {
965           if (m->tso == t) {
966             m->tso = new_t;
967           }
968         }
969         threadPaused(new_t);
970         PUSH_ON_RUN_QUEUE(new_t);
971       }
972       break;
973
974     case ThreadYielding:
975 #if defined(GRAN)
976       IF_DEBUG(gran, 
977                DumpGranEvent(GR_DESCHEDULE, t));
978       globalGranStats.tot_yields++;
979 #elif defined(PAR)
980       IF_DEBUG(par, 
981                DumpGranEvent(GR_DESCHEDULE, t));
982 #endif
983       /* put the thread back on the run queue.  Then, if we're ready to
984        * GC, check whether this is the last task to stop.  If so, wake
985        * up the GC thread.  getThread will block during a GC until the
986        * GC is finished.
987        */
988       IF_DEBUG(scheduler,
989                if (t->what_next == ThreadEnterHugs) {
990                    /* ToDo: or maybe a timer expired when we were in Hugs?
991                     * or maybe someone hit ctrl-C
992                     */
993                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
994                          t->id, t, whatNext_strs[t->what_next]);
995                } else {
996                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
997                          t->id, t, whatNext_strs[t->what_next]);
998                }
999                );
1000
1001       threadPaused(t);
1002
1003       IF_DEBUG(sanity,
1004                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1005                checkTSO(t));
1006       ASSERT(t->link == END_TSO_QUEUE);
1007 #if defined(GRAN)
1008       ASSERT(!is_on_queue(t,CurrentProc));
1009
1010       IF_DEBUG(sanity,
1011                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1012                checkThreadQsSanity(rtsTrue));
1013 #endif
1014       APPEND_TO_RUN_QUEUE(t);
1015 #if defined(GRAN)
1016       /* add a ContinueThread event to actually process the thread */
1017       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1018                 ContinueThread,
1019                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1020       IF_GRAN_DEBUG(bq, 
1021                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1022                G_EVENTQ(0);
1023                G_CURR_THREADQ(0))
1024 #endif /* GRAN */
1025       break;
1026       
1027     case ThreadBlocked:
1028 #if defined(GRAN)
1029       IF_DEBUG(scheduler,
1030                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1031                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1032                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1033
1034       // ??? needed; should emit block before
1035       IF_DEBUG(gran, 
1036                DumpGranEvent(GR_DESCHEDULE, t)); 
1037       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1038       /*
1039         ngoq Dogh!
1040       ASSERT(procStatus[CurrentProc]==Busy || 
1041               ((procStatus[CurrentProc]==Fetching) && 
1042               (t->block_info.closure!=(StgClosure*)NULL)));
1043       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1044           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1045             procStatus[CurrentProc]==Fetching)) 
1046         procStatus[CurrentProc] = Idle;
1047       */
1048 #elif defined(PAR)
1049       IF_DEBUG(par, 
1050                DumpGranEvent(GR_DESCHEDULE, t)); 
1051
1052       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1053       blockThread(t);
1054
1055       IF_DEBUG(scheduler,
1056                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1057                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1058                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1059
1060 #else /* !GRAN */
1061       /* don't need to do anything.  Either the thread is blocked on
1062        * I/O, in which case we'll have called addToBlockedQueue
1063        * previously, or it's blocked on an MVar or Blackhole, in which
1064        * case it'll be on the relevant queue already.
1065        */
1066       IF_DEBUG(scheduler,
1067                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1068                printThreadBlockage(t);
1069                fprintf(stderr, "\n"));
1070
1071       /* Only for dumping event to log file 
1072          ToDo: do I need this in GranSim, too?
1073       blockThread(t);
1074       */
1075 #endif
1076       threadPaused(t);
1077       break;
1078       
1079     case ThreadFinished:
1080       /* Need to check whether this was a main thread, and if so, signal
1081        * the task that started it with the return value.  If we have no
1082        * more main threads, we probably need to stop all the tasks until
1083        * we get a new one.
1084        */
1085       /* We also end up here if the thread kills itself with an
1086        * uncaught exception, see Exception.hc.
1087        */
1088       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1089 #if defined(GRAN)
1090       endThread(t, CurrentProc); // clean-up the thread
1091 #elif defined(PAR)
1092       advisory_thread_count--;
1093       if (RtsFlags.ParFlags.ParStats.Full) 
1094         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1095 #endif
1096       break;
1097       
1098     default:
1099       barf("schedule: invalid thread return code %d", (int)ret);
1100     }
1101     
1102 #ifdef SMP
1103     cap->link = free_capabilities;
1104     free_capabilities = cap;
1105     n_free_capabilities++;
1106 #endif
1107
1108 #ifdef SMP
1109     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1110 #else
1111     if (ready_to_gc) 
1112 #endif
1113       {
1114       /* everybody back, start the GC.
1115        * Could do it in this thread, or signal a condition var
1116        * to do it in another thread.  Either way, we need to
1117        * broadcast on gc_pending_cond afterward.
1118        */
1119 #ifdef SMP
1120       IF_DEBUG(scheduler,sched_belch("doing GC"));
1121 #endif
1122       GarbageCollect(GetRoots,rtsFalse);
1123       ready_to_gc = rtsFalse;
1124 #ifdef SMP
1125       pthread_cond_broadcast(&gc_pending_cond);
1126 #endif
1127 #if defined(GRAN)
1128       /* add a ContinueThread event to continue execution of current thread */
1129       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1130                 ContinueThread,
1131                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1132       IF_GRAN_DEBUG(bq, 
1133                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1134                G_EVENTQ(0);
1135                G_CURR_THREADQ(0))
1136 #endif /* GRAN */
1137     }
1138 #if defined(GRAN)
1139   next_thread:
1140     IF_GRAN_DEBUG(unused,
1141                   print_eventq(EventHd));
1142
1143     event = get_next_event();
1144
1145 #elif defined(PAR)
1146   next_thread:
1147     /* ToDo: wait for next message to arrive rather than busy wait */
1148
1149 #else /* GRAN */
1150   /* not any more
1151   next_thread:
1152     t = take_off_run_queue(END_TSO_QUEUE);
1153   */
1154 #endif /* GRAN */
1155   } /* end of while(1) */
1156 }
1157
1158 /* ---------------------------------------------------------------------------
1159  * deleteAllThreads():  kill all the live threads.
1160  *
1161  * This is used when we catch a user interrupt (^C), before performing
1162  * any necessary cleanups and running finalizers.
1163  * ------------------------------------------------------------------------- */
1164    
1165 void deleteAllThreads ( void )
1166 {
1167   StgTSO* t;
1168   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1169   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1170       deleteThread(t);
1171   }
1172   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1173       deleteThread(t);
1174   }
1175   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1176       deleteThread(t);
1177   }
1178   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1179   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1180   sleeping_queue = END_TSO_QUEUE;
1181 }
1182
1183 /* startThread and  insertThread are now in GranSim.c -- HWL */
1184
1185 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1186 //@subsection Suspend and Resume
1187
1188 /* ---------------------------------------------------------------------------
1189  * Suspending & resuming Haskell threads.
1190  * 
1191  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1192  * its capability before calling the C function.  This allows another
1193  * task to pick up the capability and carry on running Haskell
1194  * threads.  It also means that if the C call blocks, it won't lock
1195  * the whole system.
1196  *
1197  * The Haskell thread making the C call is put to sleep for the
1198  * duration of the call, on the susepended_ccalling_threads queue.  We
1199  * give out a token to the task, which it can use to resume the thread
1200  * on return from the C function.
1201  * ------------------------------------------------------------------------- */
1202    
1203 StgInt
1204 suspendThread( Capability *cap )
1205 {
1206   nat tok;
1207
1208   ACQUIRE_LOCK(&sched_mutex);
1209
1210   IF_DEBUG(scheduler,
1211            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1212
1213   threadPaused(cap->rCurrentTSO);
1214   cap->rCurrentTSO->link = suspended_ccalling_threads;
1215   suspended_ccalling_threads = cap->rCurrentTSO;
1216
1217   /* Use the thread ID as the token; it should be unique */
1218   tok = cap->rCurrentTSO->id;
1219
1220 #ifdef SMP
1221   cap->link = free_capabilities;
1222   free_capabilities = cap;
1223   n_free_capabilities++;
1224 #endif
1225
1226   RELEASE_LOCK(&sched_mutex);
1227   return tok; 
1228 }
1229
1230 Capability *
1231 resumeThread( StgInt tok )
1232 {
1233   StgTSO *tso, **prev;
1234   Capability *cap;
1235
1236   ACQUIRE_LOCK(&sched_mutex);
1237
1238   prev = &suspended_ccalling_threads;
1239   for (tso = suspended_ccalling_threads; 
1240        tso != END_TSO_QUEUE; 
1241        prev = &tso->link, tso = tso->link) {
1242     if (tso->id == (StgThreadID)tok) {
1243       *prev = tso->link;
1244       break;
1245     }
1246   }
1247   if (tso == END_TSO_QUEUE) {
1248     barf("resumeThread: thread not found");
1249   }
1250   tso->link = END_TSO_QUEUE;
1251
1252 #ifdef SMP
1253   while (free_capabilities == NULL) {
1254     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1255     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1256     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1257   }
1258   cap = free_capabilities;
1259   free_capabilities = cap->link;
1260   n_free_capabilities--;
1261 #else  
1262   cap = &MainRegTable;
1263 #endif
1264
1265   cap->rCurrentTSO = tso;
1266
1267   RELEASE_LOCK(&sched_mutex);
1268   return cap;
1269 }
1270
1271
1272 /* ---------------------------------------------------------------------------
1273  * Static functions
1274  * ------------------------------------------------------------------------ */
1275 static void unblockThread(StgTSO *tso);
1276
1277 /* ---------------------------------------------------------------------------
1278  * Comparing Thread ids.
1279  *
1280  * This is used from STG land in the implementation of the
1281  * instances of Eq/Ord for ThreadIds.
1282  * ------------------------------------------------------------------------ */
1283
1284 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1285
1286   StgThreadID id1 = tso1->id; 
1287   StgThreadID id2 = tso2->id;
1288  
1289   if (id1 < id2) return (-1);
1290   if (id1 > id2) return 1;
1291   return 0;
1292 }
1293
1294 /* ---------------------------------------------------------------------------
1295    Create a new thread.
1296
1297    The new thread starts with the given stack size.  Before the
1298    scheduler can run, however, this thread needs to have a closure
1299    (and possibly some arguments) pushed on its stack.  See
1300    pushClosure() in Schedule.h.
1301
1302    createGenThread() and createIOThread() (in SchedAPI.h) are
1303    convenient packaged versions of this function.
1304
1305    currently pri (priority) is only used in a GRAN setup -- HWL
1306    ------------------------------------------------------------------------ */
1307 //@cindex createThread
1308 #if defined(GRAN)
1309 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1310 StgTSO *
1311 createThread(nat stack_size, StgInt pri)
1312 {
1313   return createThread_(stack_size, rtsFalse, pri);
1314 }
1315
1316 static StgTSO *
1317 createThread_(nat size, rtsBool have_lock, StgInt pri)
1318 {
1319 #else
1320 StgTSO *
1321 createThread(nat stack_size)
1322 {
1323   return createThread_(stack_size, rtsFalse);
1324 }
1325
1326 static StgTSO *
1327 createThread_(nat size, rtsBool have_lock)
1328 {
1329 #endif
1330
1331     StgTSO *tso;
1332     nat stack_size;
1333
1334     /* First check whether we should create a thread at all */
1335 #if defined(PAR)
1336   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1337   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1338     threadsIgnored++;
1339     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1340           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1341     return END_TSO_QUEUE;
1342   }
1343   threadsCreated++;
1344 #endif
1345
1346 #if defined(GRAN)
1347   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1348 #endif
1349
1350   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1351
1352   /* catch ridiculously small stack sizes */
1353   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1354     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1355   }
1356
1357   stack_size = size - TSO_STRUCT_SIZEW;
1358
1359   tso = (StgTSO *)allocate(size);
1360   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1361
1362   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1363 #if defined(GRAN)
1364   SET_GRAN_HDR(tso, ThisPE);
1365 #endif
1366   tso->what_next     = ThreadEnterGHC;
1367
1368   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1369    * protect the increment operation on next_thread_id.
1370    * In future, we could use an atomic increment instead.
1371    */
1372   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1373   tso->id = next_thread_id++; 
1374   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1375
1376   tso->why_blocked  = NotBlocked;
1377   tso->blocked_exceptions = NULL;
1378
1379   tso->stack_size   = stack_size;
1380   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1381                               - TSO_STRUCT_SIZEW;
1382   tso->sp           = (P_)&(tso->stack) + stack_size;
1383
1384 #ifdef PROFILING
1385   tso->prof.CCCS = CCS_MAIN;
1386 #endif
1387
1388   /* put a stop frame on the stack */
1389   tso->sp -= sizeofW(StgStopFrame);
1390   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1391   tso->su = (StgUpdateFrame*)tso->sp;
1392
1393   // ToDo: check this
1394 #if defined(GRAN)
1395   tso->link = END_TSO_QUEUE;
1396   /* uses more flexible routine in GranSim */
1397   insertThread(tso, CurrentProc);
1398 #else
1399   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1400    * from its creation
1401    */
1402 #endif
1403
1404 #if defined(GRAN) || defined(PAR)
1405   DumpGranEvent(GR_START,tso);
1406 #endif
1407
1408   /* Link the new thread on the global thread list.
1409    */
1410   tso->global_link = all_threads;
1411   all_threads = tso;
1412
1413 #if defined(GRAN)
1414   tso->gran.pri = pri;
1415 # if defined(DEBUG)
1416   tso->gran.magic = TSO_MAGIC; // debugging only
1417 # endif
1418   tso->gran.sparkname   = 0;
1419   tso->gran.startedat   = CURRENT_TIME; 
1420   tso->gran.exported    = 0;
1421   tso->gran.basicblocks = 0;
1422   tso->gran.allocs      = 0;
1423   tso->gran.exectime    = 0;
1424   tso->gran.fetchtime   = 0;
1425   tso->gran.fetchcount  = 0;
1426   tso->gran.blocktime   = 0;
1427   tso->gran.blockcount  = 0;
1428   tso->gran.blockedat   = 0;
1429   tso->gran.globalsparks = 0;
1430   tso->gran.localsparks  = 0;
1431   if (RtsFlags.GranFlags.Light)
1432     tso->gran.clock  = Now; /* local clock */
1433   else
1434     tso->gran.clock  = 0;
1435
1436   IF_DEBUG(gran,printTSO(tso));
1437 #elif defined(PAR)
1438 # if defined(DEBUG)
1439   tso->par.magic = TSO_MAGIC; // debugging only
1440 # endif
1441   tso->par.sparkname   = 0;
1442   tso->par.startedat   = CURRENT_TIME; 
1443   tso->par.exported    = 0;
1444   tso->par.basicblocks = 0;
1445   tso->par.allocs      = 0;
1446   tso->par.exectime    = 0;
1447   tso->par.fetchtime   = 0;
1448   tso->par.fetchcount  = 0;
1449   tso->par.blocktime   = 0;
1450   tso->par.blockcount  = 0;
1451   tso->par.blockedat   = 0;
1452   tso->par.globalsparks = 0;
1453   tso->par.localsparks  = 0;
1454 #endif
1455
1456 #if defined(GRAN)
1457   globalGranStats.tot_threads_created++;
1458   globalGranStats.threads_created_on_PE[CurrentProc]++;
1459   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1460   globalGranStats.tot_sq_probes++;
1461 #endif 
1462
1463 #if defined(GRAN)
1464   IF_GRAN_DEBUG(pri,
1465                 belch("==__ schedule: Created TSO %d (%p);",
1466                       CurrentProc, tso, tso->id));
1467 #elif defined(PAR)
1468     IF_PAR_DEBUG(verbose,
1469                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1470                        tso->id, tso, advisory_thread_count));
1471 #else
1472   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1473                                  tso->id, tso->stack_size));
1474 #endif    
1475   return tso;
1476 }
1477
1478 /*
1479   Turn a spark into a thread.
1480   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1481 */
1482 #if defined(PAR)
1483 //@cindex activateSpark
1484 StgTSO *
1485 activateSpark (rtsSpark spark) 
1486 {
1487   StgTSO *tso;
1488   
1489   ASSERT(spark != (rtsSpark)NULL);
1490   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1491   if (tso!=END_TSO_QUEUE) {
1492     pushClosure(tso,spark);
1493     PUSH_ON_RUN_QUEUE(tso);
1494     advisory_thread_count++;
1495
1496     if (RtsFlags.ParFlags.ParStats.Full) {
1497       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1498       IF_PAR_DEBUG(verbose,
1499                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1500                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1501     }
1502   } else {
1503     barf("activateSpark: Cannot create TSO");
1504   }
1505   // ToDo: fwd info on local/global spark to thread -- HWL
1506   // tso->gran.exported =  spark->exported;
1507   // tso->gran.locked =   !spark->global;
1508   // tso->gran.sparkname = spark->name;
1509
1510   return tso;
1511 }
1512 #endif
1513
1514 /* ---------------------------------------------------------------------------
1515  * scheduleThread()
1516  *
1517  * scheduleThread puts a thread on the head of the runnable queue.
1518  * This will usually be done immediately after a thread is created.
1519  * The caller of scheduleThread must create the thread using e.g.
1520  * createThread and push an appropriate closure
1521  * on this thread's stack before the scheduler is invoked.
1522  * ------------------------------------------------------------------------ */
1523
1524 void
1525 scheduleThread(StgTSO *tso)
1526 {
1527   if (tso==END_TSO_QUEUE){    
1528     schedule();
1529     return;
1530   }
1531
1532   ACQUIRE_LOCK(&sched_mutex);
1533
1534   /* Put the new thread on the head of the runnable queue.  The caller
1535    * better push an appropriate closure on this thread's stack
1536    * beforehand.  In the SMP case, the thread may start running as
1537    * soon as we release the scheduler lock below.
1538    */
1539   PUSH_ON_RUN_QUEUE(tso);
1540   THREAD_RUNNABLE();
1541
1542 #if 0
1543   IF_DEBUG(scheduler,printTSO(tso));
1544 #endif
1545   RELEASE_LOCK(&sched_mutex);
1546 }
1547
1548 /* ---------------------------------------------------------------------------
1549  * startTasks()
1550  *
1551  * Start up Posix threads to run each of the scheduler tasks.
1552  * I believe the task ids are not needed in the system as defined.
1553  *  KH @ 25/10/99
1554  * ------------------------------------------------------------------------ */
1555
1556 #if defined(PAR) || defined(SMP)
1557 void *
1558 taskStart( void *arg STG_UNUSED )
1559 {
1560   rts_evalNothing(NULL);
1561 }
1562 #endif
1563
1564 /* ---------------------------------------------------------------------------
1565  * initScheduler()
1566  *
1567  * Initialise the scheduler.  This resets all the queues - if the
1568  * queues contained any threads, they'll be garbage collected at the
1569  * next pass.
1570  *
1571  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1572  * ------------------------------------------------------------------------ */
1573
1574 #ifdef SMP
1575 static void
1576 term_handler(int sig STG_UNUSED)
1577 {
1578   stat_workerStop();
1579   ACQUIRE_LOCK(&term_mutex);
1580   await_death--;
1581   RELEASE_LOCK(&term_mutex);
1582   pthread_exit(NULL);
1583 }
1584 #endif
1585
1586 //@cindex initScheduler
1587 void 
1588 initScheduler(void)
1589 {
1590 #if defined(GRAN)
1591   nat i;
1592
1593   for (i=0; i<=MAX_PROC; i++) {
1594     run_queue_hds[i]      = END_TSO_QUEUE;
1595     run_queue_tls[i]      = END_TSO_QUEUE;
1596     blocked_queue_hds[i]  = END_TSO_QUEUE;
1597     blocked_queue_tls[i]  = END_TSO_QUEUE;
1598     ccalling_threadss[i]  = END_TSO_QUEUE;
1599     sleeping_queue        = END_TSO_QUEUE;
1600   }
1601 #else
1602   run_queue_hd      = END_TSO_QUEUE;
1603   run_queue_tl      = END_TSO_QUEUE;
1604   blocked_queue_hd  = END_TSO_QUEUE;
1605   blocked_queue_tl  = END_TSO_QUEUE;
1606   sleeping_queue    = END_TSO_QUEUE;
1607 #endif 
1608
1609   suspended_ccalling_threads  = END_TSO_QUEUE;
1610
1611   main_threads = NULL;
1612   all_threads  = END_TSO_QUEUE;
1613
1614   context_switch = 0;
1615   interrupted    = 0;
1616
1617   RtsFlags.ConcFlags.ctxtSwitchTicks =
1618       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1619
1620 #ifdef INTERPRETER
1621   ecafList = END_ECAF_LIST;
1622   clearECafTable();
1623 #endif
1624
1625   /* Install the SIGHUP handler */
1626 #ifdef SMP
1627   {
1628     struct sigaction action,oact;
1629
1630     action.sa_handler = term_handler;
1631     sigemptyset(&action.sa_mask);
1632     action.sa_flags = 0;
1633     if (sigaction(SIGTERM, &action, &oact) != 0) {
1634       barf("can't install TERM handler");
1635     }
1636   }
1637 #endif
1638
1639 #ifdef SMP
1640   /* Allocate N Capabilities */
1641   {
1642     nat i;
1643     Capability *cap, *prev;
1644     cap  = NULL;
1645     prev = NULL;
1646     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1647       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1648       cap->link = prev;
1649       prev = cap;
1650     }
1651     free_capabilities = cap;
1652     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1653   }
1654   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1655                              n_free_capabilities););
1656 #endif
1657
1658 #if defined(SMP) || defined(PAR)
1659   initSparkPools();
1660 #endif
1661 }
1662
1663 #ifdef SMP
1664 void
1665 startTasks( void )
1666 {
1667   nat i;
1668   int r;
1669   pthread_t tid;
1670   
1671   /* make some space for saving all the thread ids */
1672   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1673                             "initScheduler:task_ids");
1674   
1675   /* and create all the threads */
1676   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1677     r = pthread_create(&tid,NULL,taskStart,NULL);
1678     if (r != 0) {
1679       barf("startTasks: Can't create new Posix thread");
1680     }
1681     task_ids[i].id = tid;
1682     task_ids[i].mut_time = 0.0;
1683     task_ids[i].mut_etime = 0.0;
1684     task_ids[i].gc_time = 0.0;
1685     task_ids[i].gc_etime = 0.0;
1686     task_ids[i].elapsedtimestart = elapsedtime();
1687     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1688   }
1689 }
1690 #endif
1691
1692 void
1693 exitScheduler( void )
1694 {
1695 #ifdef SMP
1696   nat i;
1697
1698   /* Don't want to use pthread_cancel, since we'd have to install
1699    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1700    * all our locks.
1701    */
1702 #if 0
1703   /* Cancel all our tasks */
1704   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1705     pthread_cancel(task_ids[i].id);
1706   }
1707   
1708   /* Wait for all the tasks to terminate */
1709   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1710     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1711                                task_ids[i].id));
1712     pthread_join(task_ids[i].id, NULL);
1713   }
1714 #endif
1715
1716   /* Send 'em all a SIGHUP.  That should shut 'em up.
1717    */
1718   await_death = RtsFlags.ParFlags.nNodes;
1719   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1720     pthread_kill(task_ids[i].id,SIGTERM);
1721   }
1722   while (await_death > 0) {
1723     sched_yield();
1724   }
1725 #endif
1726 }
1727
1728 /* -----------------------------------------------------------------------------
1729    Managing the per-task allocation areas.
1730    
1731    Each capability comes with an allocation area.  These are
1732    fixed-length block lists into which allocation can be done.
1733
1734    ToDo: no support for two-space collection at the moment???
1735    -------------------------------------------------------------------------- */
1736
1737 /* -----------------------------------------------------------------------------
1738  * waitThread is the external interface for running a new computation
1739  * and waiting for the result.
1740  *
1741  * In the non-SMP case, we create a new main thread, push it on the 
1742  * main-thread stack, and invoke the scheduler to run it.  The
1743  * scheduler will return when the top main thread on the stack has
1744  * completed or died, and fill in the necessary fields of the
1745  * main_thread structure.
1746  *
1747  * In the SMP case, we create a main thread as before, but we then
1748  * create a new condition variable and sleep on it.  When our new
1749  * main thread has completed, we'll be woken up and the status/result
1750  * will be in the main_thread struct.
1751  * -------------------------------------------------------------------------- */
1752
1753 int 
1754 howManyThreadsAvail ( void )
1755 {
1756    int i = 0;
1757    StgTSO* q;
1758    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1759       i++;
1760    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1761       i++;
1762    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1763       i++;
1764    return i;
1765 }
1766
1767 void
1768 finishAllThreads ( void )
1769 {
1770    do {
1771       while (run_queue_hd != END_TSO_QUEUE) {
1772          waitThread ( run_queue_hd, NULL );
1773       }
1774       while (blocked_queue_hd != END_TSO_QUEUE) {
1775          waitThread ( blocked_queue_hd, NULL );
1776       }
1777       while (sleeping_queue != END_TSO_QUEUE) {
1778          waitThread ( blocked_queue_hd, NULL );
1779       }
1780    } while 
1781       (blocked_queue_hd != END_TSO_QUEUE || 
1782        run_queue_hd     != END_TSO_QUEUE ||
1783        sleeping_queue   != END_TSO_QUEUE);
1784 }
1785
1786 SchedulerStatus
1787 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1788 {
1789   StgMainThread *m;
1790   SchedulerStatus stat;
1791
1792   ACQUIRE_LOCK(&sched_mutex);
1793   
1794   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1795
1796   m->tso = tso;
1797   m->ret = ret;
1798   m->stat = NoStatus;
1799 #ifdef SMP
1800   pthread_cond_init(&m->wakeup, NULL);
1801 #endif
1802
1803   m->link = main_threads;
1804   main_threads = m;
1805
1806   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1807                               m->tso->id));
1808
1809 #ifdef SMP
1810   do {
1811     pthread_cond_wait(&m->wakeup, &sched_mutex);
1812   } while (m->stat == NoStatus);
1813 #elif defined(GRAN)
1814   /* GranSim specific init */
1815   CurrentTSO = m->tso;                // the TSO to run
1816   procStatus[MainProc] = Busy;        // status of main PE
1817   CurrentProc = MainProc;             // PE to run it on
1818
1819   schedule();
1820 #else
1821   schedule();
1822   ASSERT(m->stat != NoStatus);
1823 #endif
1824
1825   stat = m->stat;
1826
1827 #ifdef SMP
1828   pthread_cond_destroy(&m->wakeup);
1829 #endif
1830
1831   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1832                               m->tso->id));
1833   free(m);
1834
1835   RELEASE_LOCK(&sched_mutex);
1836
1837   return stat;
1838 }
1839
1840 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1841 //@subsection Run queue code 
1842
1843 #if 0
1844 /* 
1845    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1846        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1847        implicit global variable that has to be correct when calling these
1848        fcts -- HWL 
1849 */
1850
1851 /* Put the new thread on the head of the runnable queue.
1852  * The caller of createThread better push an appropriate closure
1853  * on this thread's stack before the scheduler is invoked.
1854  */
1855 static /* inline */ void
1856 add_to_run_queue(tso)
1857 StgTSO* tso; 
1858 {
1859   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1860   tso->link = run_queue_hd;
1861   run_queue_hd = tso;
1862   if (run_queue_tl == END_TSO_QUEUE) {
1863     run_queue_tl = tso;
1864   }
1865 }
1866
1867 /* Put the new thread at the end of the runnable queue. */
1868 static /* inline */ void
1869 push_on_run_queue(tso)
1870 StgTSO* tso; 
1871 {
1872   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1873   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1874   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1875   if (run_queue_hd == END_TSO_QUEUE) {
1876     run_queue_hd = tso;
1877   } else {
1878     run_queue_tl->link = tso;
1879   }
1880   run_queue_tl = tso;
1881 }
1882
1883 /* 
1884    Should be inlined because it's used very often in schedule.  The tso
1885    argument is actually only needed in GranSim, where we want to have the
1886    possibility to schedule *any* TSO on the run queue, irrespective of the
1887    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1888    the run queue and dequeue the tso, adjusting the links in the queue. 
1889 */
1890 //@cindex take_off_run_queue
1891 static /* inline */ StgTSO*
1892 take_off_run_queue(StgTSO *tso) {
1893   StgTSO *t, *prev;
1894
1895   /* 
1896      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1897
1898      if tso is specified, unlink that tso from the run_queue (doesn't have
1899      to be at the beginning of the queue); GranSim only 
1900   */
1901   if (tso!=END_TSO_QUEUE) {
1902     /* find tso in queue */
1903     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1904          t!=END_TSO_QUEUE && t!=tso;
1905          prev=t, t=t->link) 
1906       /* nothing */ ;
1907     ASSERT(t==tso);
1908     /* now actually dequeue the tso */
1909     if (prev!=END_TSO_QUEUE) {
1910       ASSERT(run_queue_hd!=t);
1911       prev->link = t->link;
1912     } else {
1913       /* t is at beginning of thread queue */
1914       ASSERT(run_queue_hd==t);
1915       run_queue_hd = t->link;
1916     }
1917     /* t is at end of thread queue */
1918     if (t->link==END_TSO_QUEUE) {
1919       ASSERT(t==run_queue_tl);
1920       run_queue_tl = prev;
1921     } else {
1922       ASSERT(run_queue_tl!=t);
1923     }
1924     t->link = END_TSO_QUEUE;
1925   } else {
1926     /* take tso from the beginning of the queue; std concurrent code */
1927     t = run_queue_hd;
1928     if (t != END_TSO_QUEUE) {
1929       run_queue_hd = t->link;
1930       t->link = END_TSO_QUEUE;
1931       if (run_queue_hd == END_TSO_QUEUE) {
1932         run_queue_tl = END_TSO_QUEUE;
1933       }
1934     }
1935   }
1936   return t;
1937 }
1938
1939 #endif /* 0 */
1940
1941 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1942 //@subsection Garbage Collextion Routines
1943
1944 /* ---------------------------------------------------------------------------
1945    Where are the roots that we know about?
1946
1947         - all the threads on the runnable queue
1948         - all the threads on the blocked queue
1949         - all the threads on the sleeping queue
1950         - all the thread currently executing a _ccall_GC
1951         - all the "main threads"
1952      
1953    ------------------------------------------------------------------------ */
1954
1955 /* This has to be protected either by the scheduler monitor, or by the
1956         garbage collection monitor (probably the latter).
1957         KH @ 25/10/99
1958 */
1959
1960 static void GetRoots(void)
1961 {
1962   StgMainThread *m;
1963
1964 #if defined(GRAN)
1965   {
1966     nat i;
1967     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1968       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1969         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1970       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1971         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1972       
1973       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1974         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1975       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1976         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1977       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1978         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1979     }
1980   }
1981
1982   markEventQueue();
1983
1984 #else /* !GRAN */
1985   if (run_queue_hd != END_TSO_QUEUE) {
1986     ASSERT(run_queue_tl != END_TSO_QUEUE);
1987     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1988     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1989   }
1990
1991   if (blocked_queue_hd != END_TSO_QUEUE) {
1992     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1993     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1994     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1995   }
1996
1997   if (sleeping_queue != END_TSO_QUEUE) {
1998     sleeping_queue  = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
1999   }
2000 #endif 
2001
2002   for (m = main_threads; m != NULL; m = m->link) {
2003     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
2004   }
2005   if (suspended_ccalling_threads != END_TSO_QUEUE)
2006     suspended_ccalling_threads = 
2007       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
2008
2009 #if defined(SMP) || defined(PAR) || defined(GRAN)
2010   markSparkQueue();
2011 #endif
2012 }
2013
2014 /* -----------------------------------------------------------------------------
2015    performGC
2016
2017    This is the interface to the garbage collector from Haskell land.
2018    We provide this so that external C code can allocate and garbage
2019    collect when called from Haskell via _ccall_GC.
2020
2021    It might be useful to provide an interface whereby the programmer
2022    can specify more roots (ToDo).
2023    
2024    This needs to be protected by the GC condition variable above.  KH.
2025    -------------------------------------------------------------------------- */
2026
2027 void (*extra_roots)(void);
2028
2029 void
2030 performGC(void)
2031 {
2032   GarbageCollect(GetRoots,rtsFalse);
2033 }
2034
2035 void
2036 performMajorGC(void)
2037 {
2038   GarbageCollect(GetRoots,rtsTrue);
2039 }
2040
2041 static void
2042 AllRoots(void)
2043 {
2044   GetRoots();                   /* the scheduler's roots */
2045   extra_roots();                /* the user's roots */
2046 }
2047
2048 void
2049 performGCWithRoots(void (*get_roots)(void))
2050 {
2051   extra_roots = get_roots;
2052
2053   GarbageCollect(AllRoots,rtsFalse);
2054 }
2055
2056 /* -----------------------------------------------------------------------------
2057    Stack overflow
2058
2059    If the thread has reached its maximum stack size, then raise the
2060    StackOverflow exception in the offending thread.  Otherwise
2061    relocate the TSO into a larger chunk of memory and adjust its stack
2062    size appropriately.
2063    -------------------------------------------------------------------------- */
2064
2065 static StgTSO *
2066 threadStackOverflow(StgTSO *tso)
2067 {
2068   nat new_stack_size, new_tso_size, diff, stack_words;
2069   StgPtr new_sp;
2070   StgTSO *dest;
2071
2072   IF_DEBUG(sanity,checkTSO(tso));
2073   if (tso->stack_size >= tso->max_stack_size) {
2074
2075     IF_DEBUG(gc,
2076              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2077                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2078              /* If we're debugging, just print out the top of the stack */
2079              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2080                                               tso->sp+64)));
2081
2082 #ifdef INTERPRETER
2083     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2084     exit(1);
2085 #else
2086     /* Send this thread the StackOverflow exception */
2087     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2088 #endif
2089     return tso;
2090   }
2091
2092   /* Try to double the current stack size.  If that takes us over the
2093    * maximum stack size for this thread, then use the maximum instead.
2094    * Finally round up so the TSO ends up as a whole number of blocks.
2095    */
2096   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2097   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2098                                        TSO_STRUCT_SIZE)/sizeof(W_);
2099   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2100   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2101
2102   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2103
2104   dest = (StgTSO *)allocate(new_tso_size);
2105   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2106
2107   /* copy the TSO block and the old stack into the new area */
2108   memcpy(dest,tso,TSO_STRUCT_SIZE);
2109   stack_words = tso->stack + tso->stack_size - tso->sp;
2110   new_sp = (P_)dest + new_tso_size - stack_words;
2111   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2112
2113   /* relocate the stack pointers... */
2114   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2115   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2116   dest->sp    = new_sp;
2117   dest->stack_size = new_stack_size;
2118         
2119   /* and relocate the update frame list */
2120   relocate_TSO(tso, dest);
2121
2122   /* Mark the old TSO as relocated.  We have to check for relocated
2123    * TSOs in the garbage collector and any primops that deal with TSOs.
2124    *
2125    * It's important to set the sp and su values to just beyond the end
2126    * of the stack, so we don't attempt to scavenge any part of the
2127    * dead TSO's stack.
2128    */
2129   tso->what_next = ThreadRelocated;
2130   tso->link = dest;
2131   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2132   tso->su = (StgUpdateFrame *)tso->sp;
2133   tso->why_blocked = NotBlocked;
2134   dest->mut_link = NULL;
2135
2136   IF_PAR_DEBUG(verbose,
2137                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2138                      tso->id, tso, tso->stack_size);
2139                /* If we're debugging, just print out the top of the stack */
2140                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2141                                                 tso->sp+64)));
2142   
2143   IF_DEBUG(sanity,checkTSO(tso));
2144 #if 0
2145   IF_DEBUG(scheduler,printTSO(dest));
2146 #endif
2147
2148   return dest;
2149 }
2150
2151 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2152 //@subsection Blocking Queue Routines
2153
2154 /* ---------------------------------------------------------------------------
2155    Wake up a queue that was blocked on some resource.
2156    ------------------------------------------------------------------------ */
2157
2158 #if defined(GRAN)
2159 static inline void
2160 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2161 {
2162 }
2163 #elif defined(PAR)
2164 static inline void
2165 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2166 {
2167   /* write RESUME events to log file and
2168      update blocked and fetch time (depending on type of the orig closure) */
2169   if (RtsFlags.ParFlags.ParStats.Full) {
2170     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2171                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2172                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2173
2174     switch (get_itbl(node)->type) {
2175         case FETCH_ME_BQ:
2176           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2177           break;
2178         case RBH:
2179         case FETCH_ME:
2180         case BLACKHOLE_BQ:
2181           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2182           break;
2183         default:
2184           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2185         }
2186       }
2187 }
2188 #endif
2189
2190 #if defined(GRAN)
2191 static StgBlockingQueueElement *
2192 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2193 {
2194     StgTSO *tso;
2195     PEs node_loc, tso_loc;
2196
2197     node_loc = where_is(node); // should be lifted out of loop
2198     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2199     tso_loc = where_is((StgClosure *)tso);
2200     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2201       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2202       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2203       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2204       // insertThread(tso, node_loc);
2205       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2206                 ResumeThread,
2207                 tso, node, (rtsSpark*)NULL);
2208       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2209       // len_local++;
2210       // len++;
2211     } else { // TSO is remote (actually should be FMBQ)
2212       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2213                                   RtsFlags.GranFlags.Costs.gunblocktime +
2214                                   RtsFlags.GranFlags.Costs.latency;
2215       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2216                 UnblockThread,
2217                 tso, node, (rtsSpark*)NULL);
2218       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2219       // len++;
2220     }
2221     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2222     IF_GRAN_DEBUG(bq,
2223                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2224                           (node_loc==tso_loc ? "Local" : "Global"), 
2225                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2226     tso->block_info.closure = NULL;
2227     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2228                              tso->id, tso));
2229 }
2230 #elif defined(PAR)
2231 static StgBlockingQueueElement *
2232 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2233 {
2234     StgBlockingQueueElement *next;
2235
2236     switch (get_itbl(bqe)->type) {
2237     case TSO:
2238       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2239       /* if it's a TSO just push it onto the run_queue */
2240       next = bqe->link;
2241       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2242       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2243       THREAD_RUNNABLE();
2244       unblockCount(bqe, node);
2245       /* reset blocking status after dumping event */
2246       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2247       break;
2248
2249     case BLOCKED_FETCH:
2250       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2251       next = bqe->link;
2252       bqe->link = PendingFetches;
2253       PendingFetches = bqe;
2254       break;
2255
2256 # if defined(DEBUG)
2257       /* can ignore this case in a non-debugging setup; 
2258          see comments on RBHSave closures above */
2259     case CONSTR:
2260       /* check that the closure is an RBHSave closure */
2261       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2262              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2263              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2264       break;
2265
2266     default:
2267       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2268            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2269            (StgClosure *)bqe);
2270 # endif
2271     }
2272   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2273   return next;
2274 }
2275
2276 #else /* !GRAN && !PAR */
2277 static StgTSO *
2278 unblockOneLocked(StgTSO *tso)
2279 {
2280   StgTSO *next;
2281
2282   ASSERT(get_itbl(tso)->type == TSO);
2283   ASSERT(tso->why_blocked != NotBlocked);
2284   tso->why_blocked = NotBlocked;
2285   next = tso->link;
2286   PUSH_ON_RUN_QUEUE(tso);
2287   THREAD_RUNNABLE();
2288   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2289   return next;
2290 }
2291 #endif
2292
2293 #if defined(GRAN) || defined(PAR)
2294 inline StgBlockingQueueElement *
2295 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2296 {
2297   ACQUIRE_LOCK(&sched_mutex);
2298   bqe = unblockOneLocked(bqe, node);
2299   RELEASE_LOCK(&sched_mutex);
2300   return bqe;
2301 }
2302 #else
2303 inline StgTSO *
2304 unblockOne(StgTSO *tso)
2305 {
2306   ACQUIRE_LOCK(&sched_mutex);
2307   tso = unblockOneLocked(tso);
2308   RELEASE_LOCK(&sched_mutex);
2309   return tso;
2310 }
2311 #endif
2312
2313 #if defined(GRAN)
2314 void 
2315 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2316 {
2317   StgBlockingQueueElement *bqe;
2318   PEs node_loc;
2319   nat len = 0; 
2320
2321   IF_GRAN_DEBUG(bq, 
2322                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2323                       node, CurrentProc, CurrentTime[CurrentProc], 
2324                       CurrentTSO->id, CurrentTSO));
2325
2326   node_loc = where_is(node);
2327
2328   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2329          get_itbl(q)->type == CONSTR); // closure (type constructor)
2330   ASSERT(is_unique(node));
2331
2332   /* FAKE FETCH: magically copy the node to the tso's proc;
2333      no Fetch necessary because in reality the node should not have been 
2334      moved to the other PE in the first place
2335   */
2336   if (CurrentProc!=node_loc) {
2337     IF_GRAN_DEBUG(bq, 
2338                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2339                         node, node_loc, CurrentProc, CurrentTSO->id, 
2340                         // CurrentTSO, where_is(CurrentTSO),
2341                         node->header.gran.procs));
2342     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2343     IF_GRAN_DEBUG(bq, 
2344                   belch("## new bitmask of node %p is %#x",
2345                         node, node->header.gran.procs));
2346     if (RtsFlags.GranFlags.GranSimStats.Global) {
2347       globalGranStats.tot_fake_fetches++;
2348     }
2349   }
2350
2351   bqe = q;
2352   // ToDo: check: ASSERT(CurrentProc==node_loc);
2353   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2354     //next = bqe->link;
2355     /* 
2356        bqe points to the current element in the queue
2357        next points to the next element in the queue
2358     */
2359     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2360     //tso_loc = where_is(tso);
2361     len++;
2362     bqe = unblockOneLocked(bqe, node);
2363   }
2364
2365   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2366      the closure to make room for the anchor of the BQ */
2367   if (bqe!=END_BQ_QUEUE) {
2368     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2369     /*
2370     ASSERT((info_ptr==&RBH_Save_0_info) ||
2371            (info_ptr==&RBH_Save_1_info) ||
2372            (info_ptr==&RBH_Save_2_info));
2373     */
2374     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2375     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2376     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2377
2378     IF_GRAN_DEBUG(bq,
2379                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2380                         node, info_type(node)));
2381   }
2382
2383   /* statistics gathering */
2384   if (RtsFlags.GranFlags.GranSimStats.Global) {
2385     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2386     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2387     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2388     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2389   }
2390   IF_GRAN_DEBUG(bq,
2391                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2392                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2393 }
2394 #elif defined(PAR)
2395 void 
2396 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2397 {
2398   StgBlockingQueueElement *bqe, *next;
2399
2400   ACQUIRE_LOCK(&sched_mutex);
2401
2402   IF_PAR_DEBUG(verbose, 
2403                belch("## AwBQ for node %p on [%x]: ",
2404                      node, mytid));
2405
2406   ASSERT(get_itbl(q)->type == TSO ||           
2407          get_itbl(q)->type == BLOCKED_FETCH || 
2408          get_itbl(q)->type == CONSTR); 
2409
2410   bqe = q;
2411   while (get_itbl(bqe)->type==TSO || 
2412          get_itbl(bqe)->type==BLOCKED_FETCH) {
2413     bqe = unblockOneLocked(bqe, node);
2414   }
2415   RELEASE_LOCK(&sched_mutex);
2416 }
2417
2418 #else   /* !GRAN && !PAR */
2419 void
2420 awakenBlockedQueue(StgTSO *tso)
2421 {
2422   ACQUIRE_LOCK(&sched_mutex);
2423   while (tso != END_TSO_QUEUE) {
2424     tso = unblockOneLocked(tso);
2425   }
2426   RELEASE_LOCK(&sched_mutex);
2427 }
2428 #endif
2429
2430 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2431 //@subsection Exception Handling Routines
2432
2433 /* ---------------------------------------------------------------------------
2434    Interrupt execution
2435    - usually called inside a signal handler so it mustn't do anything fancy.   
2436    ------------------------------------------------------------------------ */
2437
2438 void
2439 interruptStgRts(void)
2440 {
2441     interrupted    = 1;
2442     context_switch = 1;
2443 }
2444
2445 /* -----------------------------------------------------------------------------
2446    Unblock a thread
2447
2448    This is for use when we raise an exception in another thread, which
2449    may be blocked.
2450    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2451    -------------------------------------------------------------------------- */
2452
2453 #if defined(GRAN) || defined(PAR)
2454 /*
2455   NB: only the type of the blocking queue is different in GranSim and GUM
2456       the operations on the queue-elements are the same
2457       long live polymorphism!
2458 */
2459 static void
2460 unblockThread(StgTSO *tso)
2461 {
2462   StgBlockingQueueElement *t, **last;
2463
2464   ACQUIRE_LOCK(&sched_mutex);
2465   switch (tso->why_blocked) {
2466
2467   case NotBlocked:
2468     return;  /* not blocked */
2469
2470   case BlockedOnMVar:
2471     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2472     {
2473       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2474       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2475
2476       last = (StgBlockingQueueElement **)&mvar->head;
2477       for (t = (StgBlockingQueueElement *)mvar->head; 
2478            t != END_BQ_QUEUE; 
2479            last = &t->link, last_tso = t, t = t->link) {
2480         if (t == (StgBlockingQueueElement *)tso) {
2481           *last = (StgBlockingQueueElement *)tso->link;
2482           if (mvar->tail == tso) {
2483             mvar->tail = (StgTSO *)last_tso;
2484           }
2485           goto done;
2486         }
2487       }
2488       barf("unblockThread (MVAR): TSO not found");
2489     }
2490
2491   case BlockedOnBlackHole:
2492     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2493     {
2494       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2495
2496       last = &bq->blocking_queue;
2497       for (t = bq->blocking_queue; 
2498            t != END_BQ_QUEUE; 
2499            last = &t->link, t = t->link) {
2500         if (t == (StgBlockingQueueElement *)tso) {
2501           *last = (StgBlockingQueueElement *)tso->link;
2502           goto done;
2503         }
2504       }
2505       barf("unblockThread (BLACKHOLE): TSO not found");
2506     }
2507
2508   case BlockedOnException:
2509     {
2510       StgTSO *target  = tso->block_info.tso;
2511
2512       ASSERT(get_itbl(target)->type == TSO);
2513       ASSERT(target->blocked_exceptions != NULL);
2514
2515       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2516       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2517            t != END_BQ_QUEUE; 
2518            last = &t->link, t = t->link) {
2519         ASSERT(get_itbl(t)->type == TSO);
2520         if (t == (StgBlockingQueueElement *)tso) {
2521           *last = (StgBlockingQueueElement *)tso->link;
2522           goto done;
2523         }
2524       }
2525       barf("unblockThread (Exception): TSO not found");
2526     }
2527
2528   case BlockedOnRead:
2529   case BlockedOnWrite:
2530     {
2531       StgBlockingQueueElement *prev = NULL;
2532       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2533            prev = t, t = t->link) {
2534         if (t == (StgBlockingQueueElement *)tso) {
2535           if (prev == NULL) {
2536             blocked_queue_hd = (StgTSO *)t->link;
2537             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2538               blocked_queue_tl = END_TSO_QUEUE;
2539             }
2540           } else {
2541             prev->link = t->link;
2542             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2543               blocked_queue_tl = (StgTSO *)prev;
2544             }
2545           }
2546           goto done;
2547         }
2548       }
2549       barf("unblockThread (I/O): TSO not found");
2550     }
2551
2552   case BlockedOnDelay:
2553     {
2554       StgBlockingQueueElement *prev = NULL;
2555       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2556            prev = t, t = t->link) {
2557         if (t == (StgBlockingQueueElement *)tso) {
2558           if (prev == NULL) {
2559             sleeping_queue = (StgTSO *)t->link;
2560           } else {
2561             prev->link = t->link;
2562           }
2563           goto done;
2564         }
2565       }
2566       barf("unblockThread (I/O): TSO not found");
2567     }
2568
2569   default:
2570     barf("unblockThread");
2571   }
2572
2573  done:
2574   tso->link = END_TSO_QUEUE;
2575   tso->why_blocked = NotBlocked;
2576   tso->block_info.closure = NULL;
2577   PUSH_ON_RUN_QUEUE(tso);
2578   RELEASE_LOCK(&sched_mutex);
2579 }
2580 #else
2581 static void
2582 unblockThread(StgTSO *tso)
2583 {
2584   StgTSO *t, **last;
2585
2586   ACQUIRE_LOCK(&sched_mutex);
2587   switch (tso->why_blocked) {
2588
2589   case NotBlocked:
2590     return;  /* not blocked */
2591
2592   case BlockedOnMVar:
2593     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2594     {
2595       StgTSO *last_tso = END_TSO_QUEUE;
2596       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2597
2598       last = &mvar->head;
2599       for (t = mvar->head; t != END_TSO_QUEUE; 
2600            last = &t->link, last_tso = t, t = t->link) {
2601         if (t == tso) {
2602           *last = tso->link;
2603           if (mvar->tail == tso) {
2604             mvar->tail = last_tso;
2605           }
2606           goto done;
2607         }
2608       }
2609       barf("unblockThread (MVAR): TSO not found");
2610     }
2611
2612   case BlockedOnBlackHole:
2613     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2614     {
2615       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2616
2617       last = &bq->blocking_queue;
2618       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2619            last = &t->link, t = t->link) {
2620         if (t == tso) {
2621           *last = tso->link;
2622           goto done;
2623         }
2624       }
2625       barf("unblockThread (BLACKHOLE): TSO not found");
2626     }
2627
2628   case BlockedOnException:
2629     {
2630       StgTSO *target  = tso->block_info.tso;
2631
2632       ASSERT(get_itbl(target)->type == TSO);
2633       ASSERT(target->blocked_exceptions != NULL);
2634
2635       last = &target->blocked_exceptions;
2636       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2637            last = &t->link, t = t->link) {
2638         ASSERT(get_itbl(t)->type == TSO);
2639         if (t == tso) {
2640           *last = tso->link;
2641           goto done;
2642         }
2643       }
2644       barf("unblockThread (Exception): TSO not found");
2645     }
2646
2647   case BlockedOnRead:
2648   case BlockedOnWrite:
2649     {
2650       StgTSO *prev = NULL;
2651       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2652            prev = t, t = t->link) {
2653         if (t == tso) {
2654           if (prev == NULL) {
2655             blocked_queue_hd = t->link;
2656             if (blocked_queue_tl == t) {
2657               blocked_queue_tl = END_TSO_QUEUE;
2658             }
2659           } else {
2660             prev->link = t->link;
2661             if (blocked_queue_tl == t) {
2662               blocked_queue_tl = prev;
2663             }
2664           }
2665           goto done;
2666         }
2667       }
2668       barf("unblockThread (I/O): TSO not found");
2669     }
2670
2671   case BlockedOnDelay:
2672     {
2673       StgTSO *prev = NULL;
2674       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2675            prev = t, t = t->link) {
2676         if (t == tso) {
2677           if (prev == NULL) {
2678             sleeping_queue = t->link;
2679           } else {
2680             prev->link = t->link;
2681           }
2682           goto done;
2683         }
2684       }
2685       barf("unblockThread (I/O): TSO not found");
2686     }
2687
2688   default:
2689     barf("unblockThread");
2690   }
2691
2692  done:
2693   tso->link = END_TSO_QUEUE;
2694   tso->why_blocked = NotBlocked;
2695   tso->block_info.closure = NULL;
2696   PUSH_ON_RUN_QUEUE(tso);
2697   RELEASE_LOCK(&sched_mutex);
2698 }
2699 #endif
2700
2701 /* -----------------------------------------------------------------------------
2702  * raiseAsync()
2703  *
2704  * The following function implements the magic for raising an
2705  * asynchronous exception in an existing thread.
2706  *
2707  * We first remove the thread from any queue on which it might be
2708  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2709  *
2710  * We strip the stack down to the innermost CATCH_FRAME, building
2711  * thunks in the heap for all the active computations, so they can 
2712  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2713  * an application of the handler to the exception, and push it on
2714  * the top of the stack.
2715  * 
2716  * How exactly do we save all the active computations?  We create an
2717  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2718  * AP_UPDs pushes everything from the corresponding update frame
2719  * upwards onto the stack.  (Actually, it pushes everything up to the
2720  * next update frame plus a pointer to the next AP_UPD object.
2721  * Entering the next AP_UPD object pushes more onto the stack until we
2722  * reach the last AP_UPD object - at which point the stack should look
2723  * exactly as it did when we killed the TSO and we can continue
2724  * execution by entering the closure on top of the stack.
2725  *
2726  * We can also kill a thread entirely - this happens if either (a) the 
2727  * exception passed to raiseAsync is NULL, or (b) there's no
2728  * CATCH_FRAME on the stack.  In either case, we strip the entire
2729  * stack and replace the thread with a zombie.
2730  *
2731  * -------------------------------------------------------------------------- */
2732  
2733 void 
2734 deleteThread(StgTSO *tso)
2735 {
2736   raiseAsync(tso,NULL);
2737 }
2738
2739 void
2740 raiseAsync(StgTSO *tso, StgClosure *exception)
2741 {
2742   StgUpdateFrame* su = tso->su;
2743   StgPtr          sp = tso->sp;
2744   
2745   /* Thread already dead? */
2746   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2747     return;
2748   }
2749
2750   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2751
2752   /* Remove it from any blocking queues */
2753   unblockThread(tso);
2754
2755   /* The stack freezing code assumes there's a closure pointer on
2756    * the top of the stack.  This isn't always the case with compiled
2757    * code, so we have to push a dummy closure on the top which just
2758    * returns to the next return address on the stack.
2759    */
2760   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2761     *(--sp) = (W_)&stg_dummy_ret_closure;
2762   }
2763
2764   while (1) {
2765     int words = ((P_)su - (P_)sp) - 1;
2766     nat i;
2767     StgAP_UPD * ap;
2768
2769     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2770      * then build PAP(handler,exception,realworld#), and leave it on
2771      * top of the stack ready to enter.
2772      */
2773     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2774       StgCatchFrame *cf = (StgCatchFrame *)su;
2775       /* we've got an exception to raise, so let's pass it to the
2776        * handler in this frame.
2777        */
2778       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2779       TICK_ALLOC_UPD_PAP(3,0);
2780       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2781               
2782       ap->n_args = 2;
2783       ap->fun = cf->handler;    /* :: Exception -> IO a */
2784       ap->payload[0] = exception;
2785       ap->payload[1] = ARG_TAG(0); /* realworld token */
2786
2787       /* throw away the stack from Sp up to and including the
2788        * CATCH_FRAME.
2789        */
2790       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2791       tso->su = cf->link;
2792
2793       /* Restore the blocked/unblocked state for asynchronous exceptions
2794        * at the CATCH_FRAME.  
2795        *
2796        * If exceptions were unblocked at the catch, arrange that they
2797        * are unblocked again after executing the handler by pushing an
2798        * unblockAsyncExceptions_ret stack frame.
2799        */
2800       if (!cf->exceptions_blocked) {
2801         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2802       }
2803       
2804       /* Ensure that async exceptions are blocked when running the handler.
2805        */
2806       if (tso->blocked_exceptions == NULL) {
2807         tso->blocked_exceptions = END_TSO_QUEUE;
2808       }
2809       
2810       /* Put the newly-built PAP on top of the stack, ready to execute
2811        * when the thread restarts.
2812        */
2813       sp[0] = (W_)ap;
2814       tso->sp = sp;
2815       tso->what_next = ThreadEnterGHC;
2816       IF_DEBUG(sanity, checkTSO(tso));
2817       return;
2818     }
2819
2820     /* First build an AP_UPD consisting of the stack chunk above the
2821      * current update frame, with the top word on the stack as the
2822      * fun field.
2823      */
2824     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2825     
2826     ASSERT(words >= 0);
2827     
2828     ap->n_args = words;
2829     ap->fun    = (StgClosure *)sp[0];
2830     sp++;
2831     for(i=0; i < (nat)words; ++i) {
2832       ap->payload[i] = (StgClosure *)*sp++;
2833     }
2834     
2835     switch (get_itbl(su)->type) {
2836       
2837     case UPDATE_FRAME:
2838       {
2839         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2840         TICK_ALLOC_UP_THK(words+1,0);
2841         
2842         IF_DEBUG(scheduler,
2843                  fprintf(stderr,  "scheduler: Updating ");
2844                  printPtr((P_)su->updatee); 
2845                  fprintf(stderr,  " with ");
2846                  printObj((StgClosure *)ap);
2847                  );
2848         
2849         /* Replace the updatee with an indirection - happily
2850          * this will also wake up any threads currently
2851          * waiting on the result.
2852          */
2853         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2854         su = su->link;
2855         sp += sizeofW(StgUpdateFrame) -1;
2856         sp[0] = (W_)ap; /* push onto stack */
2857         break;
2858       }
2859       
2860     case CATCH_FRAME:
2861       {
2862         StgCatchFrame *cf = (StgCatchFrame *)su;
2863         StgClosure* o;
2864         
2865         /* We want a PAP, not an AP_UPD.  Fortunately, the
2866          * layout's the same.
2867          */
2868         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
2869         TICK_ALLOC_UPD_PAP(words+1,0);
2870         
2871         /* now build o = FUN(catch,ap,handler) */
2872         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2873         TICK_ALLOC_FUN(2,0);
2874         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
2875         o->payload[0] = (StgClosure *)ap;
2876         o->payload[1] = cf->handler;
2877         
2878         IF_DEBUG(scheduler,
2879                  fprintf(stderr,  "scheduler: Built ");
2880                  printObj((StgClosure *)o);
2881                  );
2882         
2883         /* pop the old handler and put o on the stack */
2884         su = cf->link;
2885         sp += sizeofW(StgCatchFrame) - 1;
2886         sp[0] = (W_)o;
2887         break;
2888       }
2889       
2890     case SEQ_FRAME:
2891       {
2892         StgSeqFrame *sf = (StgSeqFrame *)su;
2893         StgClosure* o;
2894         
2895         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
2896         TICK_ALLOC_UPD_PAP(words+1,0);
2897         
2898         /* now build o = FUN(seq,ap) */
2899         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2900         TICK_ALLOC_SE_THK(1,0);
2901         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
2902         o->payload[0] = (StgClosure *)ap;
2903         
2904         IF_DEBUG(scheduler,
2905                  fprintf(stderr,  "scheduler: Built ");
2906                  printObj((StgClosure *)o);
2907                  );
2908         
2909         /* pop the old handler and put o on the stack */
2910         su = sf->link;
2911         sp += sizeofW(StgSeqFrame) - 1;
2912         sp[0] = (W_)o;
2913         break;
2914       }
2915       
2916     case STOP_FRAME:
2917       /* We've stripped the entire stack, the thread is now dead. */
2918       sp += sizeofW(StgStopFrame) - 1;
2919       sp[0] = (W_)exception;    /* save the exception */
2920       tso->what_next = ThreadKilled;
2921       tso->su = (StgUpdateFrame *)(sp+1);
2922       tso->sp = sp;
2923       return;
2924
2925     default:
2926       barf("raiseAsync");
2927     }
2928   }
2929   barf("raiseAsync");
2930 }
2931
2932 /* -----------------------------------------------------------------------------
2933    resurrectThreads is called after garbage collection on the list of
2934    threads found to be garbage.  Each of these threads will be woken
2935    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2936    on an MVar, or NonTermination if the thread was blocked on a Black
2937    Hole.
2938    -------------------------------------------------------------------------- */
2939
2940 void
2941 resurrectThreads( StgTSO *threads )
2942 {
2943   StgTSO *tso, *next;
2944
2945   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2946     next = tso->global_link;
2947     tso->global_link = all_threads;
2948     all_threads = tso;
2949     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2950
2951     switch (tso->why_blocked) {
2952     case BlockedOnMVar:
2953     case BlockedOnException:
2954       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2955       break;
2956     case BlockedOnBlackHole:
2957       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2958       break;
2959     case NotBlocked:
2960       /* This might happen if the thread was blocked on a black hole
2961        * belonging to a thread that we've just woken up (raiseAsync
2962        * can wake up threads, remember...).
2963        */
2964       continue;
2965     default:
2966       barf("resurrectThreads: thread blocked in a strange way");
2967     }
2968   }
2969 }
2970
2971 /* -----------------------------------------------------------------------------
2972  * Blackhole detection: if we reach a deadlock, test whether any
2973  * threads are blocked on themselves.  Any threads which are found to
2974  * be self-blocked get sent a NonTermination exception.
2975  *
2976  * This is only done in a deadlock situation in order to avoid
2977  * performance overhead in the normal case.
2978  * -------------------------------------------------------------------------- */
2979
2980 static void
2981 detectBlackHoles( void )
2982 {
2983     StgTSO *t = all_threads;
2984     StgUpdateFrame *frame;
2985     StgClosure *blocked_on;
2986
2987     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2988
2989         if (t->why_blocked != BlockedOnBlackHole) {
2990             continue;
2991         }
2992
2993         blocked_on = t->block_info.closure;
2994
2995         for (frame = t->su; ; frame = frame->link) {
2996             switch (get_itbl(frame)->type) {
2997
2998             case UPDATE_FRAME:
2999                 if (frame->updatee == blocked_on) {
3000                     /* We are blocking on one of our own computations, so
3001                      * send this thread the NonTermination exception.  
3002                      */
3003                     IF_DEBUG(scheduler, 
3004                              sched_belch("thread %d is blocked on itself", t->id));
3005                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3006                     goto done;
3007                 }
3008                 else {
3009                     continue;
3010                 }
3011
3012             case CATCH_FRAME:
3013             case SEQ_FRAME:
3014                 continue;
3015                 
3016             case STOP_FRAME:
3017                 break;
3018             }
3019             break;
3020         }
3021
3022     done:
3023     }   
3024 }
3025
3026 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3027 //@subsection Debugging Routines
3028
3029 /* -----------------------------------------------------------------------------
3030    Debugging: why is a thread blocked
3031    -------------------------------------------------------------------------- */
3032
3033 #ifdef DEBUG
3034
3035 void
3036 printThreadBlockage(StgTSO *tso)
3037 {
3038   switch (tso->why_blocked) {
3039   case BlockedOnRead:
3040     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3041     break;
3042   case BlockedOnWrite:
3043     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3044     break;
3045   case BlockedOnDelay:
3046     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3047     break;
3048   case BlockedOnMVar:
3049     fprintf(stderr,"is blocked on an MVar");
3050     break;
3051   case BlockedOnException:
3052     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3053             tso->block_info.tso->id);
3054     break;
3055   case BlockedOnBlackHole:
3056     fprintf(stderr,"is blocked on a black hole");
3057     break;
3058   case NotBlocked:
3059     fprintf(stderr,"is not blocked");
3060     break;
3061 #if defined(PAR)
3062   case BlockedOnGA:
3063     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3064             tso->block_info.closure, info_type(tso->block_info.closure));
3065     break;
3066   case BlockedOnGA_NoSend:
3067     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3068             tso->block_info.closure, info_type(tso->block_info.closure));
3069     break;
3070 #endif
3071   default:
3072     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3073          tso->why_blocked, tso->id, tso);
3074   }
3075 }
3076
3077 void
3078 printThreadStatus(StgTSO *tso)
3079 {
3080   switch (tso->what_next) {
3081   case ThreadKilled:
3082     fprintf(stderr,"has been killed");
3083     break;
3084   case ThreadComplete:
3085     fprintf(stderr,"has completed");
3086     break;
3087   default:
3088     printThreadBlockage(tso);
3089   }
3090 }
3091
3092 void
3093 printAllThreads(void)
3094 {
3095   StgTSO *t;
3096
3097   sched_belch("all threads:");
3098   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3099     fprintf(stderr, "\tthread %d ", t->id);
3100     printThreadStatus(t);
3101     fprintf(stderr,"\n");
3102   }
3103 }
3104     
3105 /* 
3106    Print a whole blocking queue attached to node (debugging only).
3107 */
3108 //@cindex print_bq
3109 # if defined(PAR)
3110 void 
3111 print_bq (StgClosure *node)
3112 {
3113   StgBlockingQueueElement *bqe;
3114   StgTSO *tso;
3115   rtsBool end;
3116
3117   fprintf(stderr,"## BQ of closure %p (%s): ",
3118           node, info_type(node));
3119
3120   /* should cover all closures that may have a blocking queue */
3121   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3122          get_itbl(node)->type == FETCH_ME_BQ ||
3123          get_itbl(node)->type == RBH);
3124     
3125   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3126   /* 
3127      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3128   */
3129   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3130        !end; // iterate until bqe points to a CONSTR
3131        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3132     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3133     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
3134     /* types of closures that may appear in a blocking queue */
3135     ASSERT(get_itbl(bqe)->type == TSO ||           
3136            get_itbl(bqe)->type == BLOCKED_FETCH || 
3137            get_itbl(bqe)->type == CONSTR); 
3138     /* only BQs of an RBH end with an RBH_Save closure */
3139     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3140
3141     switch (get_itbl(bqe)->type) {
3142     case TSO:
3143       fprintf(stderr," TSO %d (%x),",
3144               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3145       break;
3146     case BLOCKED_FETCH:
3147       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3148               ((StgBlockedFetch *)bqe)->node, 
3149               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3150               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3151               ((StgBlockedFetch *)bqe)->ga.weight);
3152       break;
3153     case CONSTR:
3154       fprintf(stderr," %s (IP %p),",
3155               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3156                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3157                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3158                "RBH_Save_?"), get_itbl(bqe));
3159       break;
3160     default:
3161       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3162            info_type(bqe), node, info_type(node));
3163       break;
3164     }
3165   } /* for */
3166   fputc('\n', stderr);
3167 }
3168 # elif defined(GRAN)
3169 void 
3170 print_bq (StgClosure *node)
3171 {
3172   StgBlockingQueueElement *bqe;
3173   PEs node_loc, tso_loc;
3174   rtsBool end;
3175
3176   /* should cover all closures that may have a blocking queue */
3177   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3178          get_itbl(node)->type == FETCH_ME_BQ ||
3179          get_itbl(node)->type == RBH);
3180     
3181   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3182   node_loc = where_is(node);
3183
3184   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3185           node, info_type(node), node_loc);
3186
3187   /* 
3188      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3189   */
3190   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3191        !end; // iterate until bqe points to a CONSTR
3192        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3193     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3194     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3195     /* types of closures that may appear in a blocking queue */
3196     ASSERT(get_itbl(bqe)->type == TSO ||           
3197            get_itbl(bqe)->type == CONSTR); 
3198     /* only BQs of an RBH end with an RBH_Save closure */
3199     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3200
3201     tso_loc = where_is((StgClosure *)bqe);
3202     switch (get_itbl(bqe)->type) {
3203     case TSO:
3204       fprintf(stderr," TSO %d (%p) on [PE %d],",
3205               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3206       break;
3207     case CONSTR:
3208       fprintf(stderr," %s (IP %p),",
3209               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3210                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3211                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3212                "RBH_Save_?"), get_itbl(bqe));
3213       break;
3214     default:
3215       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3216            info_type((StgClosure *)bqe), node, info_type(node));
3217       break;
3218     }
3219   } /* for */
3220   fputc('\n', stderr);
3221 }
3222 #else
3223 /* 
3224    Nice and easy: only TSOs on the blocking queue
3225 */
3226 void 
3227 print_bq (StgClosure *node)
3228 {
3229   StgTSO *tso;
3230
3231   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3232   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3233        tso != END_TSO_QUEUE; 
3234        tso=tso->link) {
3235     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3236     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3237     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3238   }
3239   fputc('\n', stderr);
3240 }
3241 # endif
3242
3243 #if defined(PAR)
3244 static nat
3245 run_queue_len(void)
3246 {
3247   nat i;
3248   StgTSO *tso;
3249
3250   for (i=0, tso=run_queue_hd; 
3251        tso != END_TSO_QUEUE;
3252        i++, tso=tso->link)
3253     /* nothing */
3254
3255   return i;
3256 }
3257 #endif
3258
3259 static void
3260 sched_belch(char *s, ...)
3261 {
3262   va_list ap;
3263   va_start(ap,s);
3264 #ifdef SMP
3265   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3266 #else
3267   fprintf(stderr, "scheduler: ");
3268 #endif
3269   vfprintf(stderr, s, ap);
3270   fprintf(stderr, "\n");
3271 }
3272
3273 #endif /* DEBUG */
3274
3275
3276 //@node Index,  , Debugging Routines, Main scheduling code
3277 //@subsection Index
3278
3279 //@index
3280 //* MainRegTable::  @cindex\s-+MainRegTable
3281 //* StgMainThread::  @cindex\s-+StgMainThread
3282 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3283 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3284 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3285 //* context_switch::  @cindex\s-+context_switch
3286 //* createThread::  @cindex\s-+createThread
3287 //* free_capabilities::  @cindex\s-+free_capabilities
3288 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3289 //* initScheduler::  @cindex\s-+initScheduler
3290 //* interrupted::  @cindex\s-+interrupted
3291 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3292 //* next_thread_id::  @cindex\s-+next_thread_id
3293 //* print_bq::  @cindex\s-+print_bq
3294 //* run_queue_hd::  @cindex\s-+run_queue_hd
3295 //* run_queue_tl::  @cindex\s-+run_queue_tl
3296 //* sched_mutex::  @cindex\s-+sched_mutex
3297 //* schedule::  @cindex\s-+schedule
3298 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3299 //* task_ids::  @cindex\s-+task_ids
3300 //* term_mutex::  @cindex\s-+term_mutex
3301 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3302 //@end index