14be29bc165d8e487eccca2df569a2ad51b4f4bf
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.93 2001/03/02 16:15:53 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Interpreter.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
148
149 #endif
150
151 /* Linked list of all threads.
152  * Used for detecting garbage collected threads.
153  */
154 StgTSO *all_threads;
155
156 /* Threads suspended in _ccall_GC.
157  */
158 static StgTSO *suspended_ccalling_threads;
159
160 static void GetRoots(void);
161 static StgTSO *threadStackOverflow(StgTSO *tso);
162
163 /* KH: The following two flags are shared memory locations.  There is no need
164        to lock them, since they are only unset at the end of a scheduler
165        operation.
166 */
167
168 /* flag set by signal handler to precipitate a context switch */
169 //@cindex context_switch
170 nat context_switch;
171
172 /* if this flag is set as well, give up execution */
173 //@cindex interrupted
174 rtsBool interrupted;
175
176 /* Next thread ID to allocate.
177  * Locks required: sched_mutex
178  */
179 //@cindex next_thread_id
180 StgThreadID next_thread_id = 1;
181
182 /*
183  * Pointers to the state of the current thread.
184  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
185  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
186  */
187  
188 /* The smallest stack size that makes any sense is:
189  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
190  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
191  *  + 1                       (the realworld token for an IO thread)
192  *  + 1                       (the closure to enter)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199
200 /* Free capability list.
201  * Locks required: sched_mutex.
202  */
203 #ifdef SMP
204 //@cindex free_capabilities
205 //@cindex n_free_capabilities
206 Capability *free_capabilities; /* Available capabilities for running threads */
207 nat n_free_capabilities;       /* total number of available capabilities */
208 #else
209 //@cindex MainRegTable
210 Capability MainRegTable;       /* for non-SMP, we have one global capability */
211 #endif
212
213 #if defined(GRAN)
214 StgTSO *CurrentTSO;
215 #endif
216
217 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
218  *  exists - earlier gccs apparently didn't.
219  *  -= chak
220  */
221 StgTSO dummy_tso;
222
223 rtsBool ready_to_gc;
224
225 /* All our current task ids, saved in case we need to kill them later.
226  */
227 #ifdef SMP
228 //@cindex task_ids
229 task_info *task_ids;
230 #endif
231
232 void            addToBlockedQueue ( StgTSO *tso );
233
234 static void     schedule          ( void );
235        void     interruptStgRts   ( void );
236 #if defined(GRAN)
237 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
238 #else
239 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
240 #endif
241
242 static void     detectBlackHoles  ( void );
243
244 #ifdef DEBUG
245 static void sched_belch(char *s, ...);
246 #endif
247
248 #ifdef SMP
249 //@cindex sched_mutex
250 //@cindex term_mutex
251 //@cindex thread_ready_cond
252 //@cindex gc_pending_cond
253 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
254 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
255 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
256 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
257
258 nat await_death;
259 #endif
260
261 #if defined(PAR)
262 StgTSO *LastTSO;
263 rtsTime TimeOfLastYield;
264 #endif
265
266 #if DEBUG
267 char *whatNext_strs[] = {
268   "ThreadEnterGHC",
269   "ThreadRunGHC",
270   "ThreadEnterInterp",
271   "ThreadKilled",
272   "ThreadComplete"
273 };
274
275 char *threadReturnCode_strs[] = {
276   "HeapOverflow",                       /* might also be StackOverflow */
277   "StackOverflow",
278   "ThreadYielding",
279   "ThreadBlocked",
280   "ThreadFinished"
281 };
282 #endif
283
284 /*
285  * The thread state for the main thread.
286 // ToDo: check whether not needed any more
287 StgTSO   *MainTSO;
288  */
289
290 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
291 //@subsection Main scheduling loop
292
293 /* ---------------------------------------------------------------------------
294    Main scheduling loop.
295
296    We use round-robin scheduling, each thread returning to the
297    scheduler loop when one of these conditions is detected:
298
299       * out of heap space
300       * timer expires (thread yields)
301       * thread blocks
302       * thread ends
303       * stack overflow
304
305    Locking notes:  we acquire the scheduler lock once at the beginning
306    of the scheduler loop, and release it when
307     
308       * running a thread, or
309       * waiting for work, or
310       * waiting for a GC to complete.
311
312    GRAN version:
313      In a GranSim setup this loop iterates over the global event queue.
314      This revolves around the global event queue, which determines what 
315      to do next. Therefore, it's more complicated than either the 
316      concurrent or the parallel (GUM) setup.
317
318    GUM version:
319      GUM iterates over incoming messages.
320      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
321      and sends out a fish whenever it has nothing to do; in-between
322      doing the actual reductions (shared code below) it processes the
323      incoming messages and deals with delayed operations 
324      (see PendingFetches).
325      This is not the ugliest code you could imagine, but it's bloody close.
326
327    ------------------------------------------------------------------------ */
328 //@cindex schedule
329 static void
330 schedule( void )
331 {
332   StgTSO *t;
333   Capability *cap;
334   StgThreadReturnCode ret;
335 #if defined(GRAN)
336   rtsEvent *event;
337 #elif defined(PAR)
338   StgSparkPool *pool;
339   rtsSpark spark;
340   StgTSO *tso;
341   GlobalTaskId pe;
342 #endif
343   rtsBool was_interrupted = rtsFalse;
344   
345   ACQUIRE_LOCK(&sched_mutex);
346
347 #if defined(GRAN)
348
349   /* set up first event to get things going */
350   /* ToDo: assign costs for system setup and init MainTSO ! */
351   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
352             ContinueThread, 
353             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
354
355   IF_DEBUG(gran,
356            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
357            G_TSO(CurrentTSO, 5));
358
359   if (RtsFlags.GranFlags.Light) {
360     /* Save current time; GranSim Light only */
361     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
362   }      
363
364   event = get_next_event();
365
366   while (event!=(rtsEvent*)NULL) {
367     /* Choose the processor with the next event */
368     CurrentProc = event->proc;
369     CurrentTSO = event->tso;
370
371 #elif defined(PAR)
372
373   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
374
375 #else
376
377   while (1) {
378
379 #endif
380
381     IF_DEBUG(scheduler, printAllThreads());
382
383     /* If we're interrupted (the user pressed ^C, or some other
384      * termination condition occurred), kill all the currently running
385      * threads.
386      */
387     if (interrupted) {
388       IF_DEBUG(scheduler, sched_belch("interrupted"));
389       deleteAllThreads();
390       interrupted = rtsFalse;
391       was_interrupted = rtsTrue;
392     }
393
394     /* Go through the list of main threads and wake up any
395      * clients whose computations have finished.  ToDo: this
396      * should be done more efficiently without a linear scan
397      * of the main threads list, somehow...
398      */
399 #ifdef SMP
400     { 
401       StgMainThread *m, **prev;
402       prev = &main_threads;
403       for (m = main_threads; m != NULL; m = m->link) {
404         switch (m->tso->what_next) {
405         case ThreadComplete:
406           if (m->ret) {
407             *(m->ret) = (StgClosure *)m->tso->sp[0];
408           }
409           *prev = m->link;
410           m->stat = Success;
411           pthread_cond_broadcast(&m->wakeup);
412           break;
413         case ThreadKilled:
414           *prev = m->link;
415           if (was_interrupted) {
416             m->stat = Interrupted;
417           } else {
418             m->stat = Killed;
419           }
420           pthread_cond_broadcast(&m->wakeup);
421           break;
422         default:
423           break;
424         }
425       }
426     }
427
428 #else
429 # if defined(PAR)
430     /* in GUM do this only on the Main PE */
431     if (IAmMainThread)
432 # endif
433     /* If our main thread has finished or been killed, return.
434      */
435     {
436       StgMainThread *m = main_threads;
437       if (m->tso->what_next == ThreadComplete
438           || m->tso->what_next == ThreadKilled) {
439         main_threads = main_threads->link;
440         if (m->tso->what_next == ThreadComplete) {
441           /* we finished successfully, fill in the return value */
442           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
443           m->stat = Success;
444           return;
445         } else {
446           if (was_interrupted) {
447             m->stat = Interrupted;
448           } else {
449             m->stat = Killed;
450           }
451           return;
452         }
453       }
454     }
455 #endif
456
457     /* Top up the run queue from our spark pool.  We try to make the
458      * number of threads in the run queue equal to the number of
459      * free capabilities.
460      */
461 #if defined(SMP)
462     {
463       nat n = n_free_capabilities;
464       StgTSO *tso = run_queue_hd;
465
466       /* Count the run queue */
467       while (n > 0 && tso != END_TSO_QUEUE) {
468         tso = tso->link;
469         n--;
470       }
471
472       for (; n > 0; n--) {
473         StgClosure *spark;
474         spark = findSpark();
475         if (spark == NULL) {
476           break; /* no more sparks in the pool */
477         } else {
478           /* I'd prefer this to be done in activateSpark -- HWL */
479           /* tricky - it needs to hold the scheduler lock and
480            * not try to re-acquire it -- SDM */
481           StgTSO *tso;
482           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
483           pushClosure(tso,spark);
484           PUSH_ON_RUN_QUEUE(tso);
485 #ifdef PAR
486           advisory_thread_count++;
487 #endif
488           
489           IF_DEBUG(scheduler,
490                    sched_belch("turning spark of closure %p into a thread",
491                                (StgClosure *)spark));
492         }
493       }
494       /* We need to wake up the other tasks if we just created some
495        * work for them.
496        */
497       if (n_free_capabilities - n > 1) {
498           pthread_cond_signal(&thread_ready_cond);
499       }
500     }
501 #endif /* SMP */
502
503     /* Check whether any waiting threads need to be woken up.  If the
504      * run queue is empty, and there are no other tasks running, we
505      * can wait indefinitely for something to happen.
506      * ToDo: what if another client comes along & requests another
507      * main thread?
508      */
509     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
510       awaitEvent(
511            (run_queue_hd == END_TSO_QUEUE)
512 #ifdef SMP
513         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
514 #endif
515         );
516     }
517     /* we can be interrupted while waiting for I/O... */
518     if (interrupted) continue;
519
520     /* check for signals each time around the scheduler */
521 #ifndef mingw32_TARGET_OS
522     if (signals_pending()) {
523       start_signal_handlers();
524     }
525 #endif
526
527     /* 
528      * Detect deadlock: when we have no threads to run, there are no
529      * threads waiting on I/O or sleeping, and all the other tasks are
530      * waiting for work, we must have a deadlock of some description.
531      *
532      * We first try to find threads blocked on themselves (ie. black
533      * holes), and generate NonTermination exceptions where necessary.
534      *
535      * If no threads are black holed, we have a deadlock situation, so
536      * inform all the main threads.
537      */
538 #ifdef SMP
539     if (blocked_queue_hd == END_TSO_QUEUE
540         && run_queue_hd == END_TSO_QUEUE
541         && sleeping_queue == END_TSO_QUEUE
542         && (n_free_capabilities == RtsFlags.ParFlags.nNodes))
543     {
544         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
545         detectBlackHoles();
546         if (run_queue_hd == END_TSO_QUEUE) {
547             StgMainThread *m;
548             for (m = main_threads; m != NULL; m = m->link) {
549                 m->ret = NULL;
550                 m->stat = Deadlock;
551                 pthread_cond_broadcast(&m->wakeup);
552             }
553             main_threads = NULL;
554         }
555     }
556 #else /* ! SMP */
557     if (blocked_queue_hd == END_TSO_QUEUE
558         && run_queue_hd == END_TSO_QUEUE
559         && sleeping_queue == END_TSO_QUEUE)
560     {
561         IF_DEBUG(scheduler, sched_belch("deadlocked, checking for black holes..."));
562         detectBlackHoles();
563         if (run_queue_hd == END_TSO_QUEUE) {
564             StgMainThread *m = main_threads;
565             m->ret = NULL;
566             m->stat = Deadlock;
567             main_threads = m->link;
568             return;
569         }
570     }
571 #endif
572
573 #ifdef SMP
574     /* If there's a GC pending, don't do anything until it has
575      * completed.
576      */
577     if (ready_to_gc) {
578       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
579       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
580     }
581     
582     /* block until we've got a thread on the run queue and a free
583      * capability.
584      */
585     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
586       IF_DEBUG(scheduler, sched_belch("waiting for work"));
587       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
588       IF_DEBUG(scheduler, sched_belch("work now available"));
589     }
590 #endif
591
592 #if defined(GRAN)
593
594     if (RtsFlags.GranFlags.Light)
595       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
596
597     /* adjust time based on time-stamp */
598     if (event->time > CurrentTime[CurrentProc] &&
599         event->evttype != ContinueThread)
600       CurrentTime[CurrentProc] = event->time;
601     
602     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
603     if (!RtsFlags.GranFlags.Light)
604       handleIdlePEs();
605
606     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
607
608     /* main event dispatcher in GranSim */
609     switch (event->evttype) {
610       /* Should just be continuing execution */
611     case ContinueThread:
612       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
613       /* ToDo: check assertion
614       ASSERT(run_queue_hd != (StgTSO*)NULL &&
615              run_queue_hd != END_TSO_QUEUE);
616       */
617       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
618       if (!RtsFlags.GranFlags.DoAsyncFetch &&
619           procStatus[CurrentProc]==Fetching) {
620         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
621               CurrentTSO->id, CurrentTSO, CurrentProc);
622         goto next_thread;
623       } 
624       /* Ignore ContinueThreads for completed threads */
625       if (CurrentTSO->what_next == ThreadComplete) {
626         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
627               CurrentTSO->id, CurrentTSO, CurrentProc);
628         goto next_thread;
629       } 
630       /* Ignore ContinueThreads for threads that are being migrated */
631       if (PROCS(CurrentTSO)==Nowhere) { 
632         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
633               CurrentTSO->id, CurrentTSO, CurrentProc);
634         goto next_thread;
635       }
636       /* The thread should be at the beginning of the run queue */
637       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
638         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
639               CurrentTSO->id, CurrentTSO, CurrentProc);
640         break; // run the thread anyway
641       }
642       /*
643       new_event(proc, proc, CurrentTime[proc],
644                 FindWork,
645                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
646       goto next_thread; 
647       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
648       break; // now actually run the thread; DaH Qu'vam yImuHbej 
649
650     case FetchNode:
651       do_the_fetchnode(event);
652       goto next_thread;             /* handle next event in event queue  */
653       
654     case GlobalBlock:
655       do_the_globalblock(event);
656       goto next_thread;             /* handle next event in event queue  */
657       
658     case FetchReply:
659       do_the_fetchreply(event);
660       goto next_thread;             /* handle next event in event queue  */
661       
662     case UnblockThread:   /* Move from the blocked queue to the tail of */
663       do_the_unblock(event);
664       goto next_thread;             /* handle next event in event queue  */
665       
666     case ResumeThread:  /* Move from the blocked queue to the tail of */
667       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
668       event->tso->gran.blocktime += 
669         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
670       do_the_startthread(event);
671       goto next_thread;             /* handle next event in event queue  */
672       
673     case StartThread:
674       do_the_startthread(event);
675       goto next_thread;             /* handle next event in event queue  */
676       
677     case MoveThread:
678       do_the_movethread(event);
679       goto next_thread;             /* handle next event in event queue  */
680       
681     case MoveSpark:
682       do_the_movespark(event);
683       goto next_thread;             /* handle next event in event queue  */
684       
685     case FindWork:
686       do_the_findwork(event);
687       goto next_thread;             /* handle next event in event queue  */
688       
689     default:
690       barf("Illegal event type %u\n", event->evttype);
691     }  /* switch */
692     
693     /* This point was scheduler_loop in the old RTS */
694
695     IF_DEBUG(gran, belch("GRAN: after main switch"));
696
697     TimeOfLastEvent = CurrentTime[CurrentProc];
698     TimeOfNextEvent = get_time_of_next_event();
699     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
700     // CurrentTSO = ThreadQueueHd;
701
702     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
703                          TimeOfNextEvent));
704
705     if (RtsFlags.GranFlags.Light) 
706       GranSimLight_leave_system(event, &ActiveTSO); 
707
708     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
709
710     IF_DEBUG(gran, 
711              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
712
713     /* in a GranSim setup the TSO stays on the run queue */
714     t = CurrentTSO;
715     /* Take a thread from the run queue. */
716     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
717
718     IF_DEBUG(gran, 
719              fprintf(stderr, "GRAN: About to run current thread, which is\n");
720              G_TSO(t,5))
721
722     context_switch = 0; // turned on via GranYield, checking events and time slice
723
724     IF_DEBUG(gran, 
725              DumpGranEvent(GR_SCHEDULE, t));
726
727     procStatus[CurrentProc] = Busy;
728
729 #elif defined(PAR)
730
731     if (PendingFetches != END_BF_QUEUE) {
732         processFetches();
733     }
734
735     /* ToDo: phps merge with spark activation above */
736     /* check whether we have local work and send requests if we have none */
737     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
738       /* :-[  no local threads => look out for local sparks */
739       /* the spark pool for the current PE */
740       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
741       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
742           pool->hd < pool->tl) {
743         /* 
744          * ToDo: add GC code check that we really have enough heap afterwards!!
745          * Old comment:
746          * If we're here (no runnable threads) and we have pending
747          * sparks, we must have a space problem.  Get enough space
748          * to turn one of those pending sparks into a
749          * thread... 
750          */
751         
752         spark = findSpark();                /* get a spark */
753         if (spark != (rtsSpark) NULL) {
754           tso = activateSpark(spark);       /* turn the spark into a thread */
755           IF_PAR_DEBUG(schedule,
756                        belch("==== schedule: Created TSO %d (%p); %d threads active",
757                              tso->id, tso, advisory_thread_count));
758
759           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
760             belch("==^^ failed to activate spark");
761             goto next_thread;
762           }               /* otherwise fall through & pick-up new tso */
763         } else {
764           IF_PAR_DEBUG(verbose,
765                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
766                              spark_queue_len(pool)));
767           goto next_thread;
768         }
769       } else  
770       /* =8-[  no local sparks => look for work on other PEs */
771       {
772         /*
773          * We really have absolutely no work.  Send out a fish
774          * (there may be some out there already), and wait for
775          * something to arrive.  We clearly can't run any threads
776          * until a SCHEDULE or RESUME arrives, and so that's what
777          * we're hoping to see.  (Of course, we still have to
778          * respond to other types of messages.)
779          */
780         if (//!fishing &&  
781             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
782           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
783           /* fishing set in sendFish, processFish;
784              avoid flooding system with fishes via delay */
785           pe = choosePE();
786           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
787                    NEW_FISH_HUNGER);
788         }
789         
790         processMessages();
791         goto next_thread;
792         // ReSchedule(0);
793       }
794     } else if (PacketsWaiting()) {  /* Look for incoming messages */
795       processMessages();
796     }
797
798     /* Now we are sure that we have some work available */
799     ASSERT(run_queue_hd != END_TSO_QUEUE);
800     /* Take a thread from the run queue, if we have work */
801     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
802
803     /* ToDo: write something to the log-file
804     if (RTSflags.ParFlags.granSimStats && !sameThread)
805         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
806
807     CurrentTSO = t;
808     */
809     /* the spark pool for the current PE */
810     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
811
812     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
813                               spark_queue_len(pool), 
814                               CURRENT_PROC,
815                               pool->hd, pool->tl, pool->base, pool->lim));
816
817     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
818                               run_queue_len(), CURRENT_PROC,
819                               run_queue_hd, run_queue_tl));
820
821 #if 0
822     if (t != LastTSO) {
823       /* 
824          we are running a different TSO, so write a schedule event to log file
825          NB: If we use fair scheduling we also have to write  a deschedule 
826              event for LastTSO; with unfair scheduling we know that the
827              previous tso has blocked whenever we switch to another tso, so
828              we don't need it in GUM for now
829       */
830       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
831                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
832       
833     }
834 #endif
835 #else /* !GRAN && !PAR */
836   
837     /* grab a thread from the run queue
838      */
839     ASSERT(run_queue_hd != END_TSO_QUEUE);
840     t = POP_RUN_QUEUE();
841     IF_DEBUG(sanity,checkTSO(t));
842
843 #endif
844     
845     /* grab a capability
846      */
847 #ifdef SMP
848     cap = free_capabilities;
849     free_capabilities = cap->link;
850     n_free_capabilities--;
851 #else
852     cap = &MainRegTable;
853 #endif
854     
855     cap->rCurrentTSO = t;
856     
857     /* context switches are now initiated by the timer signal, unless
858      * the user specified "context switch as often as possible", with
859      * +RTS -C0
860      */
861     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
862         && (run_queue_hd != END_TSO_QUEUE
863             || blocked_queue_hd != END_TSO_QUEUE
864             || sleeping_queue != END_TSO_QUEUE))
865         context_switch = 1;
866     else
867         context_switch = 0;
868
869     RELEASE_LOCK(&sched_mutex);
870
871     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
872                               t->id, t, whatNext_strs[t->what_next]));
873
874     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
875     /* Run the current thread 
876      */
877     switch (cap->rCurrentTSO->what_next) {
878     case ThreadKilled:
879     case ThreadComplete:
880         /* Thread already finished, return to scheduler. */
881         ret = ThreadFinished;
882         break;
883     case ThreadEnterGHC:
884         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
885         break;
886     case ThreadRunGHC:
887         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
888         break;
889     case ThreadEnterInterp:
890         ret = interpretBCO(cap);
891         break;
892     default:
893       barf("schedule: invalid what_next field");
894     }
895     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
896     
897     /* Costs for the scheduler are assigned to CCS_SYSTEM */
898 #ifdef PROFILING
899     CCCS = CCS_SYSTEM;
900 #endif
901     
902     ACQUIRE_LOCK(&sched_mutex);
903
904 #ifdef SMP
905     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
906 #elif !defined(GRAN) && !defined(PAR)
907     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
908 #endif
909     t = cap->rCurrentTSO;
910     
911 #if defined(PAR)
912     /* HACK 675: if the last thread didn't yield, make sure to print a 
913        SCHEDULE event to the log file when StgRunning the next thread, even
914        if it is the same one as before */
915     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
916     TimeOfLastYield = CURRENT_TIME;
917 #endif
918
919     switch (ret) {
920     case HeapOverflow:
921       /* make all the running tasks block on a condition variable,
922        * maybe set context_switch and wait till they all pile in,
923        * then have them wait on a GC condition variable.
924        */
925       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
926                                t->id, t, whatNext_strs[t->what_next]));
927       threadPaused(t);
928 #if defined(GRAN)
929       ASSERT(!is_on_queue(t,CurrentProc));
930 #endif
931       
932       ready_to_gc = rtsTrue;
933       context_switch = 1;               /* stop other threads ASAP */
934       PUSH_ON_RUN_QUEUE(t);
935       /* actual GC is done at the end of the while loop */
936       break;
937       
938     case StackOverflow:
939       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
940                                t->id, t, whatNext_strs[t->what_next]));
941       /* just adjust the stack for this thread, then pop it back
942        * on the run queue.
943        */
944       threadPaused(t);
945       { 
946         StgMainThread *m;
947         /* enlarge the stack */
948         StgTSO *new_t = threadStackOverflow(t);
949         
950         /* This TSO has moved, so update any pointers to it from the
951          * main thread stack.  It better not be on any other queues...
952          * (it shouldn't be).
953          */
954         for (m = main_threads; m != NULL; m = m->link) {
955           if (m->tso == t) {
956             m->tso = new_t;
957           }
958         }
959         threadPaused(new_t);
960         PUSH_ON_RUN_QUEUE(new_t);
961       }
962       break;
963
964     case ThreadYielding:
965 #if defined(GRAN)
966       IF_DEBUG(gran, 
967                DumpGranEvent(GR_DESCHEDULE, t));
968       globalGranStats.tot_yields++;
969 #elif defined(PAR)
970       IF_DEBUG(par, 
971                DumpGranEvent(GR_DESCHEDULE, t));
972 #endif
973       /* put the thread back on the run queue.  Then, if we're ready to
974        * GC, check whether this is the last task to stop.  If so, wake
975        * up the GC thread.  getThread will block during a GC until the
976        * GC is finished.
977        */
978       IF_DEBUG(scheduler,
979                if (t->what_next == ThreadEnterInterp) {
980                    /* ToDo: or maybe a timer expired when we were in Hugs?
981                     * or maybe someone hit ctrl-C
982                     */
983                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
984                          t->id, t, whatNext_strs[t->what_next]);
985                } else {
986                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
987                          t->id, t, whatNext_strs[t->what_next]);
988                }
989                );
990
991       threadPaused(t);
992
993       IF_DEBUG(sanity,
994                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
995                checkTSO(t));
996       ASSERT(t->link == END_TSO_QUEUE);
997 #if defined(GRAN)
998       ASSERT(!is_on_queue(t,CurrentProc));
999
1000       IF_DEBUG(sanity,
1001                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1002                checkThreadQsSanity(rtsTrue));
1003 #endif
1004       APPEND_TO_RUN_QUEUE(t);
1005 #if defined(GRAN)
1006       /* add a ContinueThread event to actually process the thread */
1007       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1008                 ContinueThread,
1009                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1010       IF_GRAN_DEBUG(bq, 
1011                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1012                G_EVENTQ(0);
1013                G_CURR_THREADQ(0))
1014 #endif /* GRAN */
1015       break;
1016       
1017     case ThreadBlocked:
1018 #if defined(GRAN)
1019       IF_DEBUG(scheduler,
1020                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1021                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1022                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1023
1024       // ??? needed; should emit block before
1025       IF_DEBUG(gran, 
1026                DumpGranEvent(GR_DESCHEDULE, t)); 
1027       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1028       /*
1029         ngoq Dogh!
1030       ASSERT(procStatus[CurrentProc]==Busy || 
1031               ((procStatus[CurrentProc]==Fetching) && 
1032               (t->block_info.closure!=(StgClosure*)NULL)));
1033       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1034           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1035             procStatus[CurrentProc]==Fetching)) 
1036         procStatus[CurrentProc] = Idle;
1037       */
1038 #elif defined(PAR)
1039       IF_DEBUG(par, 
1040                DumpGranEvent(GR_DESCHEDULE, t)); 
1041
1042       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1043       blockThread(t);
1044
1045       IF_DEBUG(scheduler,
1046                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1047                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1048                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1049
1050 #else /* !GRAN */
1051       /* don't need to do anything.  Either the thread is blocked on
1052        * I/O, in which case we'll have called addToBlockedQueue
1053        * previously, or it's blocked on an MVar or Blackhole, in which
1054        * case it'll be on the relevant queue already.
1055        */
1056       IF_DEBUG(scheduler,
1057                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1058                printThreadBlockage(t);
1059                fprintf(stderr, "\n"));
1060
1061       /* Only for dumping event to log file 
1062          ToDo: do I need this in GranSim, too?
1063       blockThread(t);
1064       */
1065 #endif
1066       threadPaused(t);
1067       break;
1068       
1069     case ThreadFinished:
1070       /* Need to check whether this was a main thread, and if so, signal
1071        * the task that started it with the return value.  If we have no
1072        * more main threads, we probably need to stop all the tasks until
1073        * we get a new one.
1074        */
1075       /* We also end up here if the thread kills itself with an
1076        * uncaught exception, see Exception.hc.
1077        */
1078       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1079 #if defined(GRAN)
1080       endThread(t, CurrentProc); // clean-up the thread
1081 #elif defined(PAR)
1082       advisory_thread_count--;
1083       if (RtsFlags.ParFlags.ParStats.Full) 
1084         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1085 #endif
1086       break;
1087       
1088     default:
1089       barf("schedule: invalid thread return code %d", (int)ret);
1090     }
1091     
1092 #ifdef SMP
1093     cap->link = free_capabilities;
1094     free_capabilities = cap;
1095     n_free_capabilities++;
1096 #endif
1097
1098 #ifdef SMP
1099     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1100 #else
1101     if (ready_to_gc) 
1102 #endif
1103       {
1104       /* everybody back, start the GC.
1105        * Could do it in this thread, or signal a condition var
1106        * to do it in another thread.  Either way, we need to
1107        * broadcast on gc_pending_cond afterward.
1108        */
1109 #ifdef SMP
1110       IF_DEBUG(scheduler,sched_belch("doing GC"));
1111 #endif
1112       GarbageCollect(GetRoots,rtsFalse);
1113       ready_to_gc = rtsFalse;
1114 #ifdef SMP
1115       pthread_cond_broadcast(&gc_pending_cond);
1116 #endif
1117 #if defined(GRAN)
1118       /* add a ContinueThread event to continue execution of current thread */
1119       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1120                 ContinueThread,
1121                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1122       IF_GRAN_DEBUG(bq, 
1123                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1124                G_EVENTQ(0);
1125                G_CURR_THREADQ(0))
1126 #endif /* GRAN */
1127     }
1128 #if defined(GRAN)
1129   next_thread:
1130     IF_GRAN_DEBUG(unused,
1131                   print_eventq(EventHd));
1132
1133     event = get_next_event();
1134
1135 #elif defined(PAR)
1136   next_thread:
1137     /* ToDo: wait for next message to arrive rather than busy wait */
1138
1139 #else /* GRAN */
1140   /* not any more
1141   next_thread:
1142     t = take_off_run_queue(END_TSO_QUEUE);
1143   */
1144 #endif /* GRAN */
1145   } /* end of while(1) */
1146 }
1147
1148 /* ---------------------------------------------------------------------------
1149  * deleteAllThreads():  kill all the live threads.
1150  *
1151  * This is used when we catch a user interrupt (^C), before performing
1152  * any necessary cleanups and running finalizers.
1153  * ------------------------------------------------------------------------- */
1154    
1155 void deleteAllThreads ( void )
1156 {
1157   StgTSO* t;
1158   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1159   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1160       deleteThread(t);
1161   }
1162   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1163       deleteThread(t);
1164   }
1165   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1166       deleteThread(t);
1167   }
1168   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1169   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1170   sleeping_queue = END_TSO_QUEUE;
1171 }
1172
1173 /* startThread and  insertThread are now in GranSim.c -- HWL */
1174
1175 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1176 //@subsection Suspend and Resume
1177
1178 /* ---------------------------------------------------------------------------
1179  * Suspending & resuming Haskell threads.
1180  * 
1181  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1182  * its capability before calling the C function.  This allows another
1183  * task to pick up the capability and carry on running Haskell
1184  * threads.  It also means that if the C call blocks, it won't lock
1185  * the whole system.
1186  *
1187  * The Haskell thread making the C call is put to sleep for the
1188  * duration of the call, on the susepended_ccalling_threads queue.  We
1189  * give out a token to the task, which it can use to resume the thread
1190  * on return from the C function.
1191  * ------------------------------------------------------------------------- */
1192    
1193 StgInt
1194 suspendThread( Capability *cap )
1195 {
1196   nat tok;
1197
1198   ACQUIRE_LOCK(&sched_mutex);
1199
1200   IF_DEBUG(scheduler,
1201            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1202
1203   threadPaused(cap->rCurrentTSO);
1204   cap->rCurrentTSO->link = suspended_ccalling_threads;
1205   suspended_ccalling_threads = cap->rCurrentTSO;
1206
1207   /* Use the thread ID as the token; it should be unique */
1208   tok = cap->rCurrentTSO->id;
1209
1210 #ifdef SMP
1211   cap->link = free_capabilities;
1212   free_capabilities = cap;
1213   n_free_capabilities++;
1214 #endif
1215
1216   RELEASE_LOCK(&sched_mutex);
1217   return tok; 
1218 }
1219
1220 Capability *
1221 resumeThread( StgInt tok )
1222 {
1223   StgTSO *tso, **prev;
1224   Capability *cap;
1225
1226   ACQUIRE_LOCK(&sched_mutex);
1227
1228   prev = &suspended_ccalling_threads;
1229   for (tso = suspended_ccalling_threads; 
1230        tso != END_TSO_QUEUE; 
1231        prev = &tso->link, tso = tso->link) {
1232     if (tso->id == (StgThreadID)tok) {
1233       *prev = tso->link;
1234       break;
1235     }
1236   }
1237   if (tso == END_TSO_QUEUE) {
1238     barf("resumeThread: thread not found");
1239   }
1240   tso->link = END_TSO_QUEUE;
1241
1242 #ifdef SMP
1243   while (free_capabilities == NULL) {
1244     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1245     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1246     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1247   }
1248   cap = free_capabilities;
1249   free_capabilities = cap->link;
1250   n_free_capabilities--;
1251 #else  
1252   cap = &MainRegTable;
1253 #endif
1254
1255   cap->rCurrentTSO = tso;
1256
1257   RELEASE_LOCK(&sched_mutex);
1258   return cap;
1259 }
1260
1261
1262 /* ---------------------------------------------------------------------------
1263  * Static functions
1264  * ------------------------------------------------------------------------ */
1265 static void unblockThread(StgTSO *tso);
1266
1267 /* ---------------------------------------------------------------------------
1268  * Comparing Thread ids.
1269  *
1270  * This is used from STG land in the implementation of the
1271  * instances of Eq/Ord for ThreadIds.
1272  * ------------------------------------------------------------------------ */
1273
1274 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1275
1276   StgThreadID id1 = tso1->id; 
1277   StgThreadID id2 = tso2->id;
1278  
1279   if (id1 < id2) return (-1);
1280   if (id1 > id2) return 1;
1281   return 0;
1282 }
1283
1284 /* ---------------------------------------------------------------------------
1285    Create a new thread.
1286
1287    The new thread starts with the given stack size.  Before the
1288    scheduler can run, however, this thread needs to have a closure
1289    (and possibly some arguments) pushed on its stack.  See
1290    pushClosure() in Schedule.h.
1291
1292    createGenThread() and createIOThread() (in SchedAPI.h) are
1293    convenient packaged versions of this function.
1294
1295    currently pri (priority) is only used in a GRAN setup -- HWL
1296    ------------------------------------------------------------------------ */
1297 //@cindex createThread
1298 #if defined(GRAN)
1299 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1300 StgTSO *
1301 createThread(nat stack_size, StgInt pri)
1302 {
1303   return createThread_(stack_size, rtsFalse, pri);
1304 }
1305
1306 static StgTSO *
1307 createThread_(nat size, rtsBool have_lock, StgInt pri)
1308 {
1309 #else
1310 StgTSO *
1311 createThread(nat stack_size)
1312 {
1313   return createThread_(stack_size, rtsFalse);
1314 }
1315
1316 static StgTSO *
1317 createThread_(nat size, rtsBool have_lock)
1318 {
1319 #endif
1320
1321     StgTSO *tso;
1322     nat stack_size;
1323
1324     /* First check whether we should create a thread at all */
1325 #if defined(PAR)
1326   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1327   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1328     threadsIgnored++;
1329     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1330           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1331     return END_TSO_QUEUE;
1332   }
1333   threadsCreated++;
1334 #endif
1335
1336 #if defined(GRAN)
1337   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1338 #endif
1339
1340   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1341
1342   /* catch ridiculously small stack sizes */
1343   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1344     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1345   }
1346
1347   stack_size = size - TSO_STRUCT_SIZEW;
1348
1349   tso = (StgTSO *)allocate(size);
1350   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1351
1352   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1353 #if defined(GRAN)
1354   SET_GRAN_HDR(tso, ThisPE);
1355 #endif
1356   tso->what_next     = ThreadEnterGHC;
1357
1358   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1359    * protect the increment operation on next_thread_id.
1360    * In future, we could use an atomic increment instead.
1361    */
1362   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1363   tso->id = next_thread_id++; 
1364   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1365
1366   tso->why_blocked  = NotBlocked;
1367   tso->blocked_exceptions = NULL;
1368
1369   tso->stack_size   = stack_size;
1370   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1371                               - TSO_STRUCT_SIZEW;
1372   tso->sp           = (P_)&(tso->stack) + stack_size;
1373
1374 #ifdef PROFILING
1375   tso->prof.CCCS = CCS_MAIN;
1376 #endif
1377
1378   /* put a stop frame on the stack */
1379   tso->sp -= sizeofW(StgStopFrame);
1380   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1381   tso->su = (StgUpdateFrame*)tso->sp;
1382
1383   // ToDo: check this
1384 #if defined(GRAN)
1385   tso->link = END_TSO_QUEUE;
1386   /* uses more flexible routine in GranSim */
1387   insertThread(tso, CurrentProc);
1388 #else
1389   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1390    * from its creation
1391    */
1392 #endif
1393
1394 #if defined(GRAN) || defined(PAR)
1395   DumpGranEvent(GR_START,tso);
1396 #endif
1397
1398   /* Link the new thread on the global thread list.
1399    */
1400   tso->global_link = all_threads;
1401   all_threads = tso;
1402
1403 #if defined(GRAN)
1404   tso->gran.pri = pri;
1405 # if defined(DEBUG)
1406   tso->gran.magic = TSO_MAGIC; // debugging only
1407 # endif
1408   tso->gran.sparkname   = 0;
1409   tso->gran.startedat   = CURRENT_TIME; 
1410   tso->gran.exported    = 0;
1411   tso->gran.basicblocks = 0;
1412   tso->gran.allocs      = 0;
1413   tso->gran.exectime    = 0;
1414   tso->gran.fetchtime   = 0;
1415   tso->gran.fetchcount  = 0;
1416   tso->gran.blocktime   = 0;
1417   tso->gran.blockcount  = 0;
1418   tso->gran.blockedat   = 0;
1419   tso->gran.globalsparks = 0;
1420   tso->gran.localsparks  = 0;
1421   if (RtsFlags.GranFlags.Light)
1422     tso->gran.clock  = Now; /* local clock */
1423   else
1424     tso->gran.clock  = 0;
1425
1426   IF_DEBUG(gran,printTSO(tso));
1427 #elif defined(PAR)
1428 # if defined(DEBUG)
1429   tso->par.magic = TSO_MAGIC; // debugging only
1430 # endif
1431   tso->par.sparkname   = 0;
1432   tso->par.startedat   = CURRENT_TIME; 
1433   tso->par.exported    = 0;
1434   tso->par.basicblocks = 0;
1435   tso->par.allocs      = 0;
1436   tso->par.exectime    = 0;
1437   tso->par.fetchtime   = 0;
1438   tso->par.fetchcount  = 0;
1439   tso->par.blocktime   = 0;
1440   tso->par.blockcount  = 0;
1441   tso->par.blockedat   = 0;
1442   tso->par.globalsparks = 0;
1443   tso->par.localsparks  = 0;
1444 #endif
1445
1446 #if defined(GRAN)
1447   globalGranStats.tot_threads_created++;
1448   globalGranStats.threads_created_on_PE[CurrentProc]++;
1449   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1450   globalGranStats.tot_sq_probes++;
1451 #endif 
1452
1453 #if defined(GRAN)
1454   IF_GRAN_DEBUG(pri,
1455                 belch("==__ schedule: Created TSO %d (%p);",
1456                       CurrentProc, tso, tso->id));
1457 #elif defined(PAR)
1458     IF_PAR_DEBUG(verbose,
1459                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1460                        tso->id, tso, advisory_thread_count));
1461 #else
1462   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1463                                  tso->id, tso->stack_size));
1464 #endif    
1465   return tso;
1466 }
1467
1468 /*
1469   Turn a spark into a thread.
1470   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1471 */
1472 #if defined(PAR)
1473 //@cindex activateSpark
1474 StgTSO *
1475 activateSpark (rtsSpark spark) 
1476 {
1477   StgTSO *tso;
1478   
1479   ASSERT(spark != (rtsSpark)NULL);
1480   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1481   if (tso!=END_TSO_QUEUE) {
1482     pushClosure(tso,spark);
1483     PUSH_ON_RUN_QUEUE(tso);
1484     advisory_thread_count++;
1485
1486     if (RtsFlags.ParFlags.ParStats.Full) {
1487       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1488       IF_PAR_DEBUG(verbose,
1489                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1490                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1491     }
1492   } else {
1493     barf("activateSpark: Cannot create TSO");
1494   }
1495   // ToDo: fwd info on local/global spark to thread -- HWL
1496   // tso->gran.exported =  spark->exported;
1497   // tso->gran.locked =   !spark->global;
1498   // tso->gran.sparkname = spark->name;
1499
1500   return tso;
1501 }
1502 #endif
1503
1504 /* ---------------------------------------------------------------------------
1505  * scheduleThread()
1506  *
1507  * scheduleThread puts a thread on the head of the runnable queue.
1508  * This will usually be done immediately after a thread is created.
1509  * The caller of scheduleThread must create the thread using e.g.
1510  * createThread and push an appropriate closure
1511  * on this thread's stack before the scheduler is invoked.
1512  * ------------------------------------------------------------------------ */
1513
1514 void
1515 scheduleThread(StgTSO *tso)
1516 {
1517   if (tso==END_TSO_QUEUE){    
1518     schedule();
1519     return;
1520   }
1521
1522   ACQUIRE_LOCK(&sched_mutex);
1523
1524   /* Put the new thread on the head of the runnable queue.  The caller
1525    * better push an appropriate closure on this thread's stack
1526    * beforehand.  In the SMP case, the thread may start running as
1527    * soon as we release the scheduler lock below.
1528    */
1529   PUSH_ON_RUN_QUEUE(tso);
1530   THREAD_RUNNABLE();
1531
1532 #if 0
1533   IF_DEBUG(scheduler,printTSO(tso));
1534 #endif
1535   RELEASE_LOCK(&sched_mutex);
1536 }
1537
1538 /* ---------------------------------------------------------------------------
1539  * startTasks()
1540  *
1541  * Start up Posix threads to run each of the scheduler tasks.
1542  * I believe the task ids are not needed in the system as defined.
1543  *  KH @ 25/10/99
1544  * ------------------------------------------------------------------------ */
1545
1546 #if defined(PAR) || defined(SMP)
1547 void *
1548 taskStart( void *arg STG_UNUSED )
1549 {
1550   rts_evalNothing(NULL);
1551 }
1552 #endif
1553
1554 /* ---------------------------------------------------------------------------
1555  * initScheduler()
1556  *
1557  * Initialise the scheduler.  This resets all the queues - if the
1558  * queues contained any threads, they'll be garbage collected at the
1559  * next pass.
1560  *
1561  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1562  * ------------------------------------------------------------------------ */
1563
1564 #ifdef SMP
1565 static void
1566 term_handler(int sig STG_UNUSED)
1567 {
1568   stat_workerStop();
1569   ACQUIRE_LOCK(&term_mutex);
1570   await_death--;
1571   RELEASE_LOCK(&term_mutex);
1572   pthread_exit(NULL);
1573 }
1574 #endif
1575
1576 //@cindex initScheduler
1577 void 
1578 initScheduler(void)
1579 {
1580 #if defined(GRAN)
1581   nat i;
1582
1583   for (i=0; i<=MAX_PROC; i++) {
1584     run_queue_hds[i]      = END_TSO_QUEUE;
1585     run_queue_tls[i]      = END_TSO_QUEUE;
1586     blocked_queue_hds[i]  = END_TSO_QUEUE;
1587     blocked_queue_tls[i]  = END_TSO_QUEUE;
1588     ccalling_threadss[i]  = END_TSO_QUEUE;
1589     sleeping_queue        = END_TSO_QUEUE;
1590   }
1591 #else
1592   run_queue_hd      = END_TSO_QUEUE;
1593   run_queue_tl      = END_TSO_QUEUE;
1594   blocked_queue_hd  = END_TSO_QUEUE;
1595   blocked_queue_tl  = END_TSO_QUEUE;
1596   sleeping_queue    = END_TSO_QUEUE;
1597 #endif 
1598
1599   suspended_ccalling_threads  = END_TSO_QUEUE;
1600
1601   main_threads = NULL;
1602   all_threads  = END_TSO_QUEUE;
1603
1604   context_switch = 0;
1605   interrupted    = 0;
1606
1607   RtsFlags.ConcFlags.ctxtSwitchTicks =
1608       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1609
1610   /* Install the SIGHUP handler */
1611 #ifdef SMP
1612   {
1613     struct sigaction action,oact;
1614
1615     action.sa_handler = term_handler;
1616     sigemptyset(&action.sa_mask);
1617     action.sa_flags = 0;
1618     if (sigaction(SIGTERM, &action, &oact) != 0) {
1619       barf("can't install TERM handler");
1620     }
1621   }
1622 #endif
1623
1624 #ifdef SMP
1625   /* Allocate N Capabilities */
1626   {
1627     nat i;
1628     Capability *cap, *prev;
1629     cap  = NULL;
1630     prev = NULL;
1631     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1632       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1633       cap->link = prev;
1634       prev = cap;
1635     }
1636     free_capabilities = cap;
1637     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1638   }
1639   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1640                              n_free_capabilities););
1641 #endif
1642
1643 #if defined(SMP) || defined(PAR)
1644   initSparkPools();
1645 #endif
1646 }
1647
1648 #ifdef SMP
1649 void
1650 startTasks( void )
1651 {
1652   nat i;
1653   int r;
1654   pthread_t tid;
1655   
1656   /* make some space for saving all the thread ids */
1657   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1658                             "initScheduler:task_ids");
1659   
1660   /* and create all the threads */
1661   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1662     r = pthread_create(&tid,NULL,taskStart,NULL);
1663     if (r != 0) {
1664       barf("startTasks: Can't create new Posix thread");
1665     }
1666     task_ids[i].id = tid;
1667     task_ids[i].mut_time = 0.0;
1668     task_ids[i].mut_etime = 0.0;
1669     task_ids[i].gc_time = 0.0;
1670     task_ids[i].gc_etime = 0.0;
1671     task_ids[i].elapsedtimestart = elapsedtime();
1672     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1673   }
1674 }
1675 #endif
1676
1677 void
1678 exitScheduler( void )
1679 {
1680 #ifdef SMP
1681   nat i;
1682
1683   /* Don't want to use pthread_cancel, since we'd have to install
1684    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1685    * all our locks.
1686    */
1687 #if 0
1688   /* Cancel all our tasks */
1689   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1690     pthread_cancel(task_ids[i].id);
1691   }
1692   
1693   /* Wait for all the tasks to terminate */
1694   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1695     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1696                                task_ids[i].id));
1697     pthread_join(task_ids[i].id, NULL);
1698   }
1699 #endif
1700
1701   /* Send 'em all a SIGHUP.  That should shut 'em up.
1702    */
1703   await_death = RtsFlags.ParFlags.nNodes;
1704   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1705     pthread_kill(task_ids[i].id,SIGTERM);
1706   }
1707   while (await_death > 0) {
1708     sched_yield();
1709   }
1710 #endif
1711 }
1712
1713 /* -----------------------------------------------------------------------------
1714    Managing the per-task allocation areas.
1715    
1716    Each capability comes with an allocation area.  These are
1717    fixed-length block lists into which allocation can be done.
1718
1719    ToDo: no support for two-space collection at the moment???
1720    -------------------------------------------------------------------------- */
1721
1722 /* -----------------------------------------------------------------------------
1723  * waitThread is the external interface for running a new computation
1724  * and waiting for the result.
1725  *
1726  * In the non-SMP case, we create a new main thread, push it on the 
1727  * main-thread stack, and invoke the scheduler to run it.  The
1728  * scheduler will return when the top main thread on the stack has
1729  * completed or died, and fill in the necessary fields of the
1730  * main_thread structure.
1731  *
1732  * In the SMP case, we create a main thread as before, but we then
1733  * create a new condition variable and sleep on it.  When our new
1734  * main thread has completed, we'll be woken up and the status/result
1735  * will be in the main_thread struct.
1736  * -------------------------------------------------------------------------- */
1737
1738 int 
1739 howManyThreadsAvail ( void )
1740 {
1741    int i = 0;
1742    StgTSO* q;
1743    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1744       i++;
1745    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1746       i++;
1747    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1748       i++;
1749    return i;
1750 }
1751
1752 void
1753 finishAllThreads ( void )
1754 {
1755    do {
1756       while (run_queue_hd != END_TSO_QUEUE) {
1757          waitThread ( run_queue_hd, NULL );
1758       }
1759       while (blocked_queue_hd != END_TSO_QUEUE) {
1760          waitThread ( blocked_queue_hd, NULL );
1761       }
1762       while (sleeping_queue != END_TSO_QUEUE) {
1763          waitThread ( blocked_queue_hd, NULL );
1764       }
1765    } while 
1766       (blocked_queue_hd != END_TSO_QUEUE || 
1767        run_queue_hd     != END_TSO_QUEUE ||
1768        sleeping_queue   != END_TSO_QUEUE);
1769 }
1770
1771 SchedulerStatus
1772 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1773 {
1774   StgMainThread *m;
1775   SchedulerStatus stat;
1776
1777   ACQUIRE_LOCK(&sched_mutex);
1778   
1779   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1780
1781   m->tso = tso;
1782   m->ret = ret;
1783   m->stat = NoStatus;
1784 #ifdef SMP
1785   pthread_cond_init(&m->wakeup, NULL);
1786 #endif
1787
1788   m->link = main_threads;
1789   main_threads = m;
1790
1791   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1792                               m->tso->id));
1793
1794 #ifdef SMP
1795   do {
1796     pthread_cond_wait(&m->wakeup, &sched_mutex);
1797   } while (m->stat == NoStatus);
1798 #elif defined(GRAN)
1799   /* GranSim specific init */
1800   CurrentTSO = m->tso;                // the TSO to run
1801   procStatus[MainProc] = Busy;        // status of main PE
1802   CurrentProc = MainProc;             // PE to run it on
1803
1804   schedule();
1805 #else
1806   schedule();
1807   ASSERT(m->stat != NoStatus);
1808 #endif
1809
1810   stat = m->stat;
1811
1812 #ifdef SMP
1813   pthread_cond_destroy(&m->wakeup);
1814 #endif
1815
1816   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1817                               m->tso->id));
1818   free(m);
1819
1820   RELEASE_LOCK(&sched_mutex);
1821
1822   return stat;
1823 }
1824
1825 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1826 //@subsection Run queue code 
1827
1828 #if 0
1829 /* 
1830    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1831        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1832        implicit global variable that has to be correct when calling these
1833        fcts -- HWL 
1834 */
1835
1836 /* Put the new thread on the head of the runnable queue.
1837  * The caller of createThread better push an appropriate closure
1838  * on this thread's stack before the scheduler is invoked.
1839  */
1840 static /* inline */ void
1841 add_to_run_queue(tso)
1842 StgTSO* tso; 
1843 {
1844   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1845   tso->link = run_queue_hd;
1846   run_queue_hd = tso;
1847   if (run_queue_tl == END_TSO_QUEUE) {
1848     run_queue_tl = tso;
1849   }
1850 }
1851
1852 /* Put the new thread at the end of the runnable queue. */
1853 static /* inline */ void
1854 push_on_run_queue(tso)
1855 StgTSO* tso; 
1856 {
1857   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1858   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1859   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1860   if (run_queue_hd == END_TSO_QUEUE) {
1861     run_queue_hd = tso;
1862   } else {
1863     run_queue_tl->link = tso;
1864   }
1865   run_queue_tl = tso;
1866 }
1867
1868 /* 
1869    Should be inlined because it's used very often in schedule.  The tso
1870    argument is actually only needed in GranSim, where we want to have the
1871    possibility to schedule *any* TSO on the run queue, irrespective of the
1872    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1873    the run queue and dequeue the tso, adjusting the links in the queue. 
1874 */
1875 //@cindex take_off_run_queue
1876 static /* inline */ StgTSO*
1877 take_off_run_queue(StgTSO *tso) {
1878   StgTSO *t, *prev;
1879
1880   /* 
1881      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1882
1883      if tso is specified, unlink that tso from the run_queue (doesn't have
1884      to be at the beginning of the queue); GranSim only 
1885   */
1886   if (tso!=END_TSO_QUEUE) {
1887     /* find tso in queue */
1888     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1889          t!=END_TSO_QUEUE && t!=tso;
1890          prev=t, t=t->link) 
1891       /* nothing */ ;
1892     ASSERT(t==tso);
1893     /* now actually dequeue the tso */
1894     if (prev!=END_TSO_QUEUE) {
1895       ASSERT(run_queue_hd!=t);
1896       prev->link = t->link;
1897     } else {
1898       /* t is at beginning of thread queue */
1899       ASSERT(run_queue_hd==t);
1900       run_queue_hd = t->link;
1901     }
1902     /* t is at end of thread queue */
1903     if (t->link==END_TSO_QUEUE) {
1904       ASSERT(t==run_queue_tl);
1905       run_queue_tl = prev;
1906     } else {
1907       ASSERT(run_queue_tl!=t);
1908     }
1909     t->link = END_TSO_QUEUE;
1910   } else {
1911     /* take tso from the beginning of the queue; std concurrent code */
1912     t = run_queue_hd;
1913     if (t != END_TSO_QUEUE) {
1914       run_queue_hd = t->link;
1915       t->link = END_TSO_QUEUE;
1916       if (run_queue_hd == END_TSO_QUEUE) {
1917         run_queue_tl = END_TSO_QUEUE;
1918       }
1919     }
1920   }
1921   return t;
1922 }
1923
1924 #endif /* 0 */
1925
1926 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1927 //@subsection Garbage Collextion Routines
1928
1929 /* ---------------------------------------------------------------------------
1930    Where are the roots that we know about?
1931
1932         - all the threads on the runnable queue
1933         - all the threads on the blocked queue
1934         - all the threads on the sleeping queue
1935         - all the thread currently executing a _ccall_GC
1936         - all the "main threads"
1937      
1938    ------------------------------------------------------------------------ */
1939
1940 /* This has to be protected either by the scheduler monitor, or by the
1941         garbage collection monitor (probably the latter).
1942         KH @ 25/10/99
1943 */
1944
1945 static void GetRoots(void)
1946 {
1947   StgMainThread *m;
1948
1949 #if defined(GRAN)
1950   {
1951     nat i;
1952     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1953       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1954         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1955       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1956         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1957       
1958       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1959         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1960       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1961         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1962       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1963         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1964     }
1965   }
1966
1967   markEventQueue();
1968
1969 #else /* !GRAN */
1970   if (run_queue_hd != END_TSO_QUEUE) {
1971     ASSERT(run_queue_tl != END_TSO_QUEUE);
1972     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1973     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1974   }
1975
1976   if (blocked_queue_hd != END_TSO_QUEUE) {
1977     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1978     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1979     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1980   }
1981
1982   if (sleeping_queue != END_TSO_QUEUE) {
1983     sleeping_queue  = (StgTSO *)MarkRoot((StgClosure *)sleeping_queue);
1984   }
1985 #endif 
1986
1987   for (m = main_threads; m != NULL; m = m->link) {
1988     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1989   }
1990   if (suspended_ccalling_threads != END_TSO_QUEUE)
1991     suspended_ccalling_threads = 
1992       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1993
1994 #if defined(SMP) || defined(PAR) || defined(GRAN)
1995   markSparkQueue();
1996 #endif
1997 }
1998
1999 /* -----------------------------------------------------------------------------
2000    performGC
2001
2002    This is the interface to the garbage collector from Haskell land.
2003    We provide this so that external C code can allocate and garbage
2004    collect when called from Haskell via _ccall_GC.
2005
2006    It might be useful to provide an interface whereby the programmer
2007    can specify more roots (ToDo).
2008    
2009    This needs to be protected by the GC condition variable above.  KH.
2010    -------------------------------------------------------------------------- */
2011
2012 void (*extra_roots)(void);
2013
2014 void
2015 performGC(void)
2016 {
2017   GarbageCollect(GetRoots,rtsFalse);
2018 }
2019
2020 void
2021 performMajorGC(void)
2022 {
2023   GarbageCollect(GetRoots,rtsTrue);
2024 }
2025
2026 static void
2027 AllRoots(void)
2028 {
2029   GetRoots();                   /* the scheduler's roots */
2030   extra_roots();                /* the user's roots */
2031 }
2032
2033 void
2034 performGCWithRoots(void (*get_roots)(void))
2035 {
2036   extra_roots = get_roots;
2037
2038   GarbageCollect(AllRoots,rtsFalse);
2039 }
2040
2041 /* -----------------------------------------------------------------------------
2042    Stack overflow
2043
2044    If the thread has reached its maximum stack size, then raise the
2045    StackOverflow exception in the offending thread.  Otherwise
2046    relocate the TSO into a larger chunk of memory and adjust its stack
2047    size appropriately.
2048    -------------------------------------------------------------------------- */
2049
2050 static StgTSO *
2051 threadStackOverflow(StgTSO *tso)
2052 {
2053   nat new_stack_size, new_tso_size, diff, stack_words;
2054   StgPtr new_sp;
2055   StgTSO *dest;
2056
2057   IF_DEBUG(sanity,checkTSO(tso));
2058   if (tso->stack_size >= tso->max_stack_size) {
2059
2060     IF_DEBUG(gc,
2061              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2062                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2063              /* If we're debugging, just print out the top of the stack */
2064              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2065                                               tso->sp+64)));
2066
2067     /* Send this thread the StackOverflow exception */
2068     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2069     return tso;
2070   }
2071
2072   /* Try to double the current stack size.  If that takes us over the
2073    * maximum stack size for this thread, then use the maximum instead.
2074    * Finally round up so the TSO ends up as a whole number of blocks.
2075    */
2076   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2077   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2078                                        TSO_STRUCT_SIZE)/sizeof(W_);
2079   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2080   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2081
2082   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2083
2084   dest = (StgTSO *)allocate(new_tso_size);
2085   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2086
2087   /* copy the TSO block and the old stack into the new area */
2088   memcpy(dest,tso,TSO_STRUCT_SIZE);
2089   stack_words = tso->stack + tso->stack_size - tso->sp;
2090   new_sp = (P_)dest + new_tso_size - stack_words;
2091   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2092
2093   /* relocate the stack pointers... */
2094   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2095   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2096   dest->sp    = new_sp;
2097   dest->stack_size = new_stack_size;
2098         
2099   /* and relocate the update frame list */
2100   relocate_TSO(tso, dest);
2101
2102   /* Mark the old TSO as relocated.  We have to check for relocated
2103    * TSOs in the garbage collector and any primops that deal with TSOs.
2104    *
2105    * It's important to set the sp and su values to just beyond the end
2106    * of the stack, so we don't attempt to scavenge any part of the
2107    * dead TSO's stack.
2108    */
2109   tso->what_next = ThreadRelocated;
2110   tso->link = dest;
2111   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2112   tso->su = (StgUpdateFrame *)tso->sp;
2113   tso->why_blocked = NotBlocked;
2114   dest->mut_link = NULL;
2115
2116   IF_PAR_DEBUG(verbose,
2117                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2118                      tso->id, tso, tso->stack_size);
2119                /* If we're debugging, just print out the top of the stack */
2120                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2121                                                 tso->sp+64)));
2122   
2123   IF_DEBUG(sanity,checkTSO(tso));
2124 #if 0
2125   IF_DEBUG(scheduler,printTSO(dest));
2126 #endif
2127
2128   return dest;
2129 }
2130
2131 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2132 //@subsection Blocking Queue Routines
2133
2134 /* ---------------------------------------------------------------------------
2135    Wake up a queue that was blocked on some resource.
2136    ------------------------------------------------------------------------ */
2137
2138 #if defined(GRAN)
2139 static inline void
2140 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2141 {
2142 }
2143 #elif defined(PAR)
2144 static inline void
2145 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2146 {
2147   /* write RESUME events to log file and
2148      update blocked and fetch time (depending on type of the orig closure) */
2149   if (RtsFlags.ParFlags.ParStats.Full) {
2150     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2151                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2152                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2153
2154     switch (get_itbl(node)->type) {
2155         case FETCH_ME_BQ:
2156           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2157           break;
2158         case RBH:
2159         case FETCH_ME:
2160         case BLACKHOLE_BQ:
2161           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2162           break;
2163         default:
2164           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2165         }
2166       }
2167 }
2168 #endif
2169
2170 #if defined(GRAN)
2171 static StgBlockingQueueElement *
2172 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2173 {
2174     StgTSO *tso;
2175     PEs node_loc, tso_loc;
2176
2177     node_loc = where_is(node); // should be lifted out of loop
2178     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2179     tso_loc = where_is((StgClosure *)tso);
2180     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2181       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2182       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2183       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2184       // insertThread(tso, node_loc);
2185       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2186                 ResumeThread,
2187                 tso, node, (rtsSpark*)NULL);
2188       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2189       // len_local++;
2190       // len++;
2191     } else { // TSO is remote (actually should be FMBQ)
2192       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2193                                   RtsFlags.GranFlags.Costs.gunblocktime +
2194                                   RtsFlags.GranFlags.Costs.latency;
2195       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2196                 UnblockThread,
2197                 tso, node, (rtsSpark*)NULL);
2198       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2199       // len++;
2200     }
2201     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2202     IF_GRAN_DEBUG(bq,
2203                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2204                           (node_loc==tso_loc ? "Local" : "Global"), 
2205                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2206     tso->block_info.closure = NULL;
2207     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2208                              tso->id, tso));
2209 }
2210 #elif defined(PAR)
2211 static StgBlockingQueueElement *
2212 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2213 {
2214     StgBlockingQueueElement *next;
2215
2216     switch (get_itbl(bqe)->type) {
2217     case TSO:
2218       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2219       /* if it's a TSO just push it onto the run_queue */
2220       next = bqe->link;
2221       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2222       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2223       THREAD_RUNNABLE();
2224       unblockCount(bqe, node);
2225       /* reset blocking status after dumping event */
2226       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2227       break;
2228
2229     case BLOCKED_FETCH:
2230       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2231       next = bqe->link;
2232       bqe->link = PendingFetches;
2233       PendingFetches = bqe;
2234       break;
2235
2236 # if defined(DEBUG)
2237       /* can ignore this case in a non-debugging setup; 
2238          see comments on RBHSave closures above */
2239     case CONSTR:
2240       /* check that the closure is an RBHSave closure */
2241       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2242              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2243              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2244       break;
2245
2246     default:
2247       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2248            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2249            (StgClosure *)bqe);
2250 # endif
2251     }
2252   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2253   return next;
2254 }
2255
2256 #else /* !GRAN && !PAR */
2257 static StgTSO *
2258 unblockOneLocked(StgTSO *tso)
2259 {
2260   StgTSO *next;
2261
2262   ASSERT(get_itbl(tso)->type == TSO);
2263   ASSERT(tso->why_blocked != NotBlocked);
2264   tso->why_blocked = NotBlocked;
2265   next = tso->link;
2266   PUSH_ON_RUN_QUEUE(tso);
2267   THREAD_RUNNABLE();
2268   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2269   return next;
2270 }
2271 #endif
2272
2273 #if defined(GRAN) || defined(PAR)
2274 inline StgBlockingQueueElement *
2275 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2276 {
2277   ACQUIRE_LOCK(&sched_mutex);
2278   bqe = unblockOneLocked(bqe, node);
2279   RELEASE_LOCK(&sched_mutex);
2280   return bqe;
2281 }
2282 #else
2283 inline StgTSO *
2284 unblockOne(StgTSO *tso)
2285 {
2286   ACQUIRE_LOCK(&sched_mutex);
2287   tso = unblockOneLocked(tso);
2288   RELEASE_LOCK(&sched_mutex);
2289   return tso;
2290 }
2291 #endif
2292
2293 #if defined(GRAN)
2294 void 
2295 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2296 {
2297   StgBlockingQueueElement *bqe;
2298   PEs node_loc;
2299   nat len = 0; 
2300
2301   IF_GRAN_DEBUG(bq, 
2302                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2303                       node, CurrentProc, CurrentTime[CurrentProc], 
2304                       CurrentTSO->id, CurrentTSO));
2305
2306   node_loc = where_is(node);
2307
2308   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2309          get_itbl(q)->type == CONSTR); // closure (type constructor)
2310   ASSERT(is_unique(node));
2311
2312   /* FAKE FETCH: magically copy the node to the tso's proc;
2313      no Fetch necessary because in reality the node should not have been 
2314      moved to the other PE in the first place
2315   */
2316   if (CurrentProc!=node_loc) {
2317     IF_GRAN_DEBUG(bq, 
2318                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2319                         node, node_loc, CurrentProc, CurrentTSO->id, 
2320                         // CurrentTSO, where_is(CurrentTSO),
2321                         node->header.gran.procs));
2322     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2323     IF_GRAN_DEBUG(bq, 
2324                   belch("## new bitmask of node %p is %#x",
2325                         node, node->header.gran.procs));
2326     if (RtsFlags.GranFlags.GranSimStats.Global) {
2327       globalGranStats.tot_fake_fetches++;
2328     }
2329   }
2330
2331   bqe = q;
2332   // ToDo: check: ASSERT(CurrentProc==node_loc);
2333   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2334     //next = bqe->link;
2335     /* 
2336        bqe points to the current element in the queue
2337        next points to the next element in the queue
2338     */
2339     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2340     //tso_loc = where_is(tso);
2341     len++;
2342     bqe = unblockOneLocked(bqe, node);
2343   }
2344
2345   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2346      the closure to make room for the anchor of the BQ */
2347   if (bqe!=END_BQ_QUEUE) {
2348     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2349     /*
2350     ASSERT((info_ptr==&RBH_Save_0_info) ||
2351            (info_ptr==&RBH_Save_1_info) ||
2352            (info_ptr==&RBH_Save_2_info));
2353     */
2354     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2355     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2356     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2357
2358     IF_GRAN_DEBUG(bq,
2359                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2360                         node, info_type(node)));
2361   }
2362
2363   /* statistics gathering */
2364   if (RtsFlags.GranFlags.GranSimStats.Global) {
2365     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2366     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2367     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2368     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2369   }
2370   IF_GRAN_DEBUG(bq,
2371                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2372                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2373 }
2374 #elif defined(PAR)
2375 void 
2376 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2377 {
2378   StgBlockingQueueElement *bqe, *next;
2379
2380   ACQUIRE_LOCK(&sched_mutex);
2381
2382   IF_PAR_DEBUG(verbose, 
2383                belch("## AwBQ for node %p on [%x]: ",
2384                      node, mytid));
2385
2386   ASSERT(get_itbl(q)->type == TSO ||           
2387          get_itbl(q)->type == BLOCKED_FETCH || 
2388          get_itbl(q)->type == CONSTR); 
2389
2390   bqe = q;
2391   while (get_itbl(bqe)->type==TSO || 
2392          get_itbl(bqe)->type==BLOCKED_FETCH) {
2393     bqe = unblockOneLocked(bqe, node);
2394   }
2395   RELEASE_LOCK(&sched_mutex);
2396 }
2397
2398 #else   /* !GRAN && !PAR */
2399 void
2400 awakenBlockedQueue(StgTSO *tso)
2401 {
2402   ACQUIRE_LOCK(&sched_mutex);
2403   while (tso != END_TSO_QUEUE) {
2404     tso = unblockOneLocked(tso);
2405   }
2406   RELEASE_LOCK(&sched_mutex);
2407 }
2408 #endif
2409
2410 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2411 //@subsection Exception Handling Routines
2412
2413 /* ---------------------------------------------------------------------------
2414    Interrupt execution
2415    - usually called inside a signal handler so it mustn't do anything fancy.   
2416    ------------------------------------------------------------------------ */
2417
2418 void
2419 interruptStgRts(void)
2420 {
2421     interrupted    = 1;
2422     context_switch = 1;
2423 }
2424
2425 /* -----------------------------------------------------------------------------
2426    Unblock a thread
2427
2428    This is for use when we raise an exception in another thread, which
2429    may be blocked.
2430    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2431    -------------------------------------------------------------------------- */
2432
2433 #if defined(GRAN) || defined(PAR)
2434 /*
2435   NB: only the type of the blocking queue is different in GranSim and GUM
2436       the operations on the queue-elements are the same
2437       long live polymorphism!
2438 */
2439 static void
2440 unblockThread(StgTSO *tso)
2441 {
2442   StgBlockingQueueElement *t, **last;
2443
2444   ACQUIRE_LOCK(&sched_mutex);
2445   switch (tso->why_blocked) {
2446
2447   case NotBlocked:
2448     return;  /* not blocked */
2449
2450   case BlockedOnMVar:
2451     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2452     {
2453       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2454       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2455
2456       last = (StgBlockingQueueElement **)&mvar->head;
2457       for (t = (StgBlockingQueueElement *)mvar->head; 
2458            t != END_BQ_QUEUE; 
2459            last = &t->link, last_tso = t, t = t->link) {
2460         if (t == (StgBlockingQueueElement *)tso) {
2461           *last = (StgBlockingQueueElement *)tso->link;
2462           if (mvar->tail == tso) {
2463             mvar->tail = (StgTSO *)last_tso;
2464           }
2465           goto done;
2466         }
2467       }
2468       barf("unblockThread (MVAR): TSO not found");
2469     }
2470
2471   case BlockedOnBlackHole:
2472     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2473     {
2474       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2475
2476       last = &bq->blocking_queue;
2477       for (t = bq->blocking_queue; 
2478            t != END_BQ_QUEUE; 
2479            last = &t->link, t = t->link) {
2480         if (t == (StgBlockingQueueElement *)tso) {
2481           *last = (StgBlockingQueueElement *)tso->link;
2482           goto done;
2483         }
2484       }
2485       barf("unblockThread (BLACKHOLE): TSO not found");
2486     }
2487
2488   case BlockedOnException:
2489     {
2490       StgTSO *target  = tso->block_info.tso;
2491
2492       ASSERT(get_itbl(target)->type == TSO);
2493
2494       if (target->what_next == ThreadRelocated) {
2495           target = target->link;
2496           ASSERT(get_itbl(target)->type == TSO);
2497       }
2498
2499       ASSERT(target->blocked_exceptions != NULL);
2500
2501       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2502       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2503            t != END_BQ_QUEUE; 
2504            last = &t->link, t = t->link) {
2505         ASSERT(get_itbl(t)->type == TSO);
2506         if (t == (StgBlockingQueueElement *)tso) {
2507           *last = (StgBlockingQueueElement *)tso->link;
2508           goto done;
2509         }
2510       }
2511       barf("unblockThread (Exception): TSO not found");
2512     }
2513
2514   case BlockedOnRead:
2515   case BlockedOnWrite:
2516     {
2517       StgBlockingQueueElement *prev = NULL;
2518       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2519            prev = t, t = t->link) {
2520         if (t == (StgBlockingQueueElement *)tso) {
2521           if (prev == NULL) {
2522             blocked_queue_hd = (StgTSO *)t->link;
2523             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2524               blocked_queue_tl = END_TSO_QUEUE;
2525             }
2526           } else {
2527             prev->link = t->link;
2528             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2529               blocked_queue_tl = (StgTSO *)prev;
2530             }
2531           }
2532           goto done;
2533         }
2534       }
2535       barf("unblockThread (I/O): TSO not found");
2536     }
2537
2538   case BlockedOnDelay:
2539     {
2540       StgBlockingQueueElement *prev = NULL;
2541       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2542            prev = t, t = t->link) {
2543         if (t == (StgBlockingQueueElement *)tso) {
2544           if (prev == NULL) {
2545             sleeping_queue = (StgTSO *)t->link;
2546           } else {
2547             prev->link = t->link;
2548           }
2549           goto done;
2550         }
2551       }
2552       barf("unblockThread (I/O): TSO not found");
2553     }
2554
2555   default:
2556     barf("unblockThread");
2557   }
2558
2559  done:
2560   tso->link = END_TSO_QUEUE;
2561   tso->why_blocked = NotBlocked;
2562   tso->block_info.closure = NULL;
2563   PUSH_ON_RUN_QUEUE(tso);
2564   RELEASE_LOCK(&sched_mutex);
2565 }
2566 #else
2567 static void
2568 unblockThread(StgTSO *tso)
2569 {
2570   StgTSO *t, **last;
2571
2572   ACQUIRE_LOCK(&sched_mutex);
2573   switch (tso->why_blocked) {
2574
2575   case NotBlocked:
2576     return;  /* not blocked */
2577
2578   case BlockedOnMVar:
2579     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2580     {
2581       StgTSO *last_tso = END_TSO_QUEUE;
2582       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2583
2584       last = &mvar->head;
2585       for (t = mvar->head; t != END_TSO_QUEUE; 
2586            last = &t->link, last_tso = t, t = t->link) {
2587         if (t == tso) {
2588           *last = tso->link;
2589           if (mvar->tail == tso) {
2590             mvar->tail = last_tso;
2591           }
2592           goto done;
2593         }
2594       }
2595       barf("unblockThread (MVAR): TSO not found");
2596     }
2597
2598   case BlockedOnBlackHole:
2599     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2600     {
2601       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2602
2603       last = &bq->blocking_queue;
2604       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2605            last = &t->link, t = t->link) {
2606         if (t == tso) {
2607           *last = tso->link;
2608           goto done;
2609         }
2610       }
2611       barf("unblockThread (BLACKHOLE): TSO not found");
2612     }
2613
2614   case BlockedOnException:
2615     {
2616       StgTSO *target  = tso->block_info.tso;
2617
2618       ASSERT(get_itbl(target)->type == TSO);
2619
2620       while (target->what_next == ThreadRelocated) {
2621           target = target->link;
2622           ASSERT(get_itbl(target)->type == TSO);
2623       }
2624       
2625       ASSERT(target->blocked_exceptions != NULL);
2626
2627       last = &target->blocked_exceptions;
2628       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2629            last = &t->link, t = t->link) {
2630         ASSERT(get_itbl(t)->type == TSO);
2631         if (t == tso) {
2632           *last = tso->link;
2633           goto done;
2634         }
2635       }
2636       barf("unblockThread (Exception): TSO not found");
2637     }
2638
2639   case BlockedOnRead:
2640   case BlockedOnWrite:
2641     {
2642       StgTSO *prev = NULL;
2643       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2644            prev = t, t = t->link) {
2645         if (t == tso) {
2646           if (prev == NULL) {
2647             blocked_queue_hd = t->link;
2648             if (blocked_queue_tl == t) {
2649               blocked_queue_tl = END_TSO_QUEUE;
2650             }
2651           } else {
2652             prev->link = t->link;
2653             if (blocked_queue_tl == t) {
2654               blocked_queue_tl = prev;
2655             }
2656           }
2657           goto done;
2658         }
2659       }
2660       barf("unblockThread (I/O): TSO not found");
2661     }
2662
2663   case BlockedOnDelay:
2664     {
2665       StgTSO *prev = NULL;
2666       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2667            prev = t, t = t->link) {
2668         if (t == tso) {
2669           if (prev == NULL) {
2670             sleeping_queue = t->link;
2671           } else {
2672             prev->link = t->link;
2673           }
2674           goto done;
2675         }
2676       }
2677       barf("unblockThread (I/O): TSO not found");
2678     }
2679
2680   default:
2681     barf("unblockThread");
2682   }
2683
2684  done:
2685   tso->link = END_TSO_QUEUE;
2686   tso->why_blocked = NotBlocked;
2687   tso->block_info.closure = NULL;
2688   PUSH_ON_RUN_QUEUE(tso);
2689   RELEASE_LOCK(&sched_mutex);
2690 }
2691 #endif
2692
2693 /* -----------------------------------------------------------------------------
2694  * raiseAsync()
2695  *
2696  * The following function implements the magic for raising an
2697  * asynchronous exception in an existing thread.
2698  *
2699  * We first remove the thread from any queue on which it might be
2700  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2701  *
2702  * We strip the stack down to the innermost CATCH_FRAME, building
2703  * thunks in the heap for all the active computations, so they can 
2704  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2705  * an application of the handler to the exception, and push it on
2706  * the top of the stack.
2707  * 
2708  * How exactly do we save all the active computations?  We create an
2709  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2710  * AP_UPDs pushes everything from the corresponding update frame
2711  * upwards onto the stack.  (Actually, it pushes everything up to the
2712  * next update frame plus a pointer to the next AP_UPD object.
2713  * Entering the next AP_UPD object pushes more onto the stack until we
2714  * reach the last AP_UPD object - at which point the stack should look
2715  * exactly as it did when we killed the TSO and we can continue
2716  * execution by entering the closure on top of the stack.
2717  *
2718  * We can also kill a thread entirely - this happens if either (a) the 
2719  * exception passed to raiseAsync is NULL, or (b) there's no
2720  * CATCH_FRAME on the stack.  In either case, we strip the entire
2721  * stack and replace the thread with a zombie.
2722  *
2723  * -------------------------------------------------------------------------- */
2724  
2725 void 
2726 deleteThread(StgTSO *tso)
2727 {
2728   raiseAsync(tso,NULL);
2729 }
2730
2731 void
2732 raiseAsync(StgTSO *tso, StgClosure *exception)
2733 {
2734   StgUpdateFrame* su = tso->su;
2735   StgPtr          sp = tso->sp;
2736   
2737   /* Thread already dead? */
2738   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2739     return;
2740   }
2741
2742   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2743
2744   /* Remove it from any blocking queues */
2745   unblockThread(tso);
2746
2747   /* The stack freezing code assumes there's a closure pointer on
2748    * the top of the stack.  This isn't always the case with compiled
2749    * code, so we have to push a dummy closure on the top which just
2750    * returns to the next return address on the stack.
2751    */
2752   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2753     *(--sp) = (W_)&stg_dummy_ret_closure;
2754   }
2755
2756   while (1) {
2757     int words = ((P_)su - (P_)sp) - 1;
2758     nat i;
2759     StgAP_UPD * ap;
2760
2761     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2762      * then build PAP(handler,exception,realworld#), and leave it on
2763      * top of the stack ready to enter.
2764      */
2765     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2766       StgCatchFrame *cf = (StgCatchFrame *)su;
2767       /* we've got an exception to raise, so let's pass it to the
2768        * handler in this frame.
2769        */
2770       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2771       TICK_ALLOC_UPD_PAP(3,0);
2772       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2773               
2774       ap->n_args = 2;
2775       ap->fun = cf->handler;    /* :: Exception -> IO a */
2776       ap->payload[0] = exception;
2777       ap->payload[1] = ARG_TAG(0); /* realworld token */
2778
2779       /* throw away the stack from Sp up to and including the
2780        * CATCH_FRAME.
2781        */
2782       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2783       tso->su = cf->link;
2784
2785       /* Restore the blocked/unblocked state for asynchronous exceptions
2786        * at the CATCH_FRAME.  
2787        *
2788        * If exceptions were unblocked at the catch, arrange that they
2789        * are unblocked again after executing the handler by pushing an
2790        * unblockAsyncExceptions_ret stack frame.
2791        */
2792       if (!cf->exceptions_blocked) {
2793         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2794       }
2795       
2796       /* Ensure that async exceptions are blocked when running the handler.
2797        */
2798       if (tso->blocked_exceptions == NULL) {
2799         tso->blocked_exceptions = END_TSO_QUEUE;
2800       }
2801       
2802       /* Put the newly-built PAP on top of the stack, ready to execute
2803        * when the thread restarts.
2804        */
2805       sp[0] = (W_)ap;
2806       tso->sp = sp;
2807       tso->what_next = ThreadEnterGHC;
2808       IF_DEBUG(sanity, checkTSO(tso));
2809       return;
2810     }
2811
2812     /* First build an AP_UPD consisting of the stack chunk above the
2813      * current update frame, with the top word on the stack as the
2814      * fun field.
2815      */
2816     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2817     
2818     ASSERT(words >= 0);
2819     
2820     ap->n_args = words;
2821     ap->fun    = (StgClosure *)sp[0];
2822     sp++;
2823     for(i=0; i < (nat)words; ++i) {
2824       ap->payload[i] = (StgClosure *)*sp++;
2825     }
2826     
2827     switch (get_itbl(su)->type) {
2828       
2829     case UPDATE_FRAME:
2830       {
2831         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2832         TICK_ALLOC_UP_THK(words+1,0);
2833         
2834         IF_DEBUG(scheduler,
2835                  fprintf(stderr,  "scheduler: Updating ");
2836                  printPtr((P_)su->updatee); 
2837                  fprintf(stderr,  " with ");
2838                  printObj((StgClosure *)ap);
2839                  );
2840         
2841         /* Replace the updatee with an indirection - happily
2842          * this will also wake up any threads currently
2843          * waiting on the result.
2844          *
2845          * Warning: if we're in a loop, more than one update frame on
2846          * the stack may point to the same object.  Be careful not to
2847          * overwrite an IND_OLDGEN in this case, because we'll screw
2848          * up the mutable lists.  To be on the safe side, don't
2849          * overwrite any kind of indirection at all.  See also
2850          * threadSqueezeStack in GC.c, where we have to make a similar
2851          * check.
2852          */
2853         if (!closure_IND(su->updatee)) {
2854             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2855         }
2856         su = su->link;
2857         sp += sizeofW(StgUpdateFrame) -1;
2858         sp[0] = (W_)ap; /* push onto stack */
2859         break;
2860       }
2861
2862     case CATCH_FRAME:
2863       {
2864         StgCatchFrame *cf = (StgCatchFrame *)su;
2865         StgClosure* o;
2866         
2867         /* We want a PAP, not an AP_UPD.  Fortunately, the
2868          * layout's the same.
2869          */
2870         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
2871         TICK_ALLOC_UPD_PAP(words+1,0);
2872         
2873         /* now build o = FUN(catch,ap,handler) */
2874         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2875         TICK_ALLOC_FUN(2,0);
2876         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
2877         o->payload[0] = (StgClosure *)ap;
2878         o->payload[1] = cf->handler;
2879         
2880         IF_DEBUG(scheduler,
2881                  fprintf(stderr,  "scheduler: Built ");
2882                  printObj((StgClosure *)o);
2883                  );
2884         
2885         /* pop the old handler and put o on the stack */
2886         su = cf->link;
2887         sp += sizeofW(StgCatchFrame) - 1;
2888         sp[0] = (W_)o;
2889         break;
2890       }
2891       
2892     case SEQ_FRAME:
2893       {
2894         StgSeqFrame *sf = (StgSeqFrame *)su;
2895         StgClosure* o;
2896         
2897         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
2898         TICK_ALLOC_UPD_PAP(words+1,0);
2899         
2900         /* now build o = FUN(seq,ap) */
2901         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2902         TICK_ALLOC_SE_THK(1,0);
2903         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
2904         o->payload[0] = (StgClosure *)ap;
2905         
2906         IF_DEBUG(scheduler,
2907                  fprintf(stderr,  "scheduler: Built ");
2908                  printObj((StgClosure *)o);
2909                  );
2910         
2911         /* pop the old handler and put o on the stack */
2912         su = sf->link;
2913         sp += sizeofW(StgSeqFrame) - 1;
2914         sp[0] = (W_)o;
2915         break;
2916       }
2917       
2918     case STOP_FRAME:
2919       /* We've stripped the entire stack, the thread is now dead. */
2920       sp += sizeofW(StgStopFrame) - 1;
2921       sp[0] = (W_)exception;    /* save the exception */
2922       tso->what_next = ThreadKilled;
2923       tso->su = (StgUpdateFrame *)(sp+1);
2924       tso->sp = sp;
2925       return;
2926
2927     default:
2928       barf("raiseAsync");
2929     }
2930   }
2931   barf("raiseAsync");
2932 }
2933
2934 /* -----------------------------------------------------------------------------
2935    resurrectThreads is called after garbage collection on the list of
2936    threads found to be garbage.  Each of these threads will be woken
2937    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2938    on an MVar, or NonTermination if the thread was blocked on a Black
2939    Hole.
2940    -------------------------------------------------------------------------- */
2941
2942 void
2943 resurrectThreads( StgTSO *threads )
2944 {
2945   StgTSO *tso, *next;
2946
2947   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2948     next = tso->global_link;
2949     tso->global_link = all_threads;
2950     all_threads = tso;
2951     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2952
2953     switch (tso->why_blocked) {
2954     case BlockedOnMVar:
2955     case BlockedOnException:
2956       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2957       break;
2958     case BlockedOnBlackHole:
2959       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2960       break;
2961     case NotBlocked:
2962       /* This might happen if the thread was blocked on a black hole
2963        * belonging to a thread that we've just woken up (raiseAsync
2964        * can wake up threads, remember...).
2965        */
2966       continue;
2967     default:
2968       barf("resurrectThreads: thread blocked in a strange way");
2969     }
2970   }
2971 }
2972
2973 /* -----------------------------------------------------------------------------
2974  * Blackhole detection: if we reach a deadlock, test whether any
2975  * threads are blocked on themselves.  Any threads which are found to
2976  * be self-blocked get sent a NonTermination exception.
2977  *
2978  * This is only done in a deadlock situation in order to avoid
2979  * performance overhead in the normal case.
2980  * -------------------------------------------------------------------------- */
2981
2982 static void
2983 detectBlackHoles( void )
2984 {
2985     StgTSO *t = all_threads;
2986     StgUpdateFrame *frame;
2987     StgClosure *blocked_on;
2988
2989     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2990
2991         while (t->what_next == ThreadRelocated) {
2992             t = t->link;
2993             ASSERT(get_itbl(t)->type == TSO);
2994         }
2995       
2996         if (t->why_blocked != BlockedOnBlackHole) {
2997             continue;
2998         }
2999
3000         blocked_on = t->block_info.closure;
3001
3002         for (frame = t->su; ; frame = frame->link) {
3003             switch (get_itbl(frame)->type) {
3004
3005             case UPDATE_FRAME:
3006                 if (frame->updatee == blocked_on) {
3007                     /* We are blocking on one of our own computations, so
3008                      * send this thread the NonTermination exception.  
3009                      */
3010                     IF_DEBUG(scheduler, 
3011                              sched_belch("thread %d is blocked on itself", t->id));
3012                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3013                     goto done;
3014                 }
3015                 else {
3016                     continue;
3017                 }
3018
3019             case CATCH_FRAME:
3020             case SEQ_FRAME:
3021                 continue;
3022                 
3023             case STOP_FRAME:
3024                 break;
3025             }
3026             break;
3027         }
3028
3029     done: ;
3030     }   
3031 }
3032
3033 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3034 //@subsection Debugging Routines
3035
3036 /* -----------------------------------------------------------------------------
3037    Debugging: why is a thread blocked
3038    -------------------------------------------------------------------------- */
3039
3040 #ifdef DEBUG
3041
3042 void
3043 printThreadBlockage(StgTSO *tso)
3044 {
3045   switch (tso->why_blocked) {
3046   case BlockedOnRead:
3047     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3048     break;
3049   case BlockedOnWrite:
3050     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3051     break;
3052   case BlockedOnDelay:
3053     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3054     break;
3055   case BlockedOnMVar:
3056     fprintf(stderr,"is blocked on an MVar");
3057     break;
3058   case BlockedOnException:
3059     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3060             tso->block_info.tso->id);
3061     break;
3062   case BlockedOnBlackHole:
3063     fprintf(stderr,"is blocked on a black hole");
3064     break;
3065   case NotBlocked:
3066     fprintf(stderr,"is not blocked");
3067     break;
3068 #if defined(PAR)
3069   case BlockedOnGA:
3070     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3071             tso->block_info.closure, info_type(tso->block_info.closure));
3072     break;
3073   case BlockedOnGA_NoSend:
3074     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3075             tso->block_info.closure, info_type(tso->block_info.closure));
3076     break;
3077 #endif
3078   default:
3079     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3080          tso->why_blocked, tso->id, tso);
3081   }
3082 }
3083
3084 void
3085 printThreadStatus(StgTSO *tso)
3086 {
3087   switch (tso->what_next) {
3088   case ThreadKilled:
3089     fprintf(stderr,"has been killed");
3090     break;
3091   case ThreadComplete:
3092     fprintf(stderr,"has completed");
3093     break;
3094   default:
3095     printThreadBlockage(tso);
3096   }
3097 }
3098
3099 void
3100 printAllThreads(void)
3101 {
3102   StgTSO *t;
3103
3104   sched_belch("all threads:");
3105   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3106     fprintf(stderr, "\tthread %d ", t->id);
3107     printThreadStatus(t);
3108     fprintf(stderr,"\n");
3109   }
3110 }
3111     
3112 /* 
3113    Print a whole blocking queue attached to node (debugging only).
3114 */
3115 //@cindex print_bq
3116 # if defined(PAR)
3117 void 
3118 print_bq (StgClosure *node)
3119 {
3120   StgBlockingQueueElement *bqe;
3121   StgTSO *tso;
3122   rtsBool end;
3123
3124   fprintf(stderr,"## BQ of closure %p (%s): ",
3125           node, info_type(node));
3126
3127   /* should cover all closures that may have a blocking queue */
3128   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3129          get_itbl(node)->type == FETCH_ME_BQ ||
3130          get_itbl(node)->type == RBH);
3131     
3132   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3133   /* 
3134      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3135   */
3136   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3137        !end; // iterate until bqe points to a CONSTR
3138        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3139     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3140     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
3141     /* types of closures that may appear in a blocking queue */
3142     ASSERT(get_itbl(bqe)->type == TSO ||           
3143            get_itbl(bqe)->type == BLOCKED_FETCH || 
3144            get_itbl(bqe)->type == CONSTR); 
3145     /* only BQs of an RBH end with an RBH_Save closure */
3146     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3147
3148     switch (get_itbl(bqe)->type) {
3149     case TSO:
3150       fprintf(stderr," TSO %d (%x),",
3151               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3152       break;
3153     case BLOCKED_FETCH:
3154       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3155               ((StgBlockedFetch *)bqe)->node, 
3156               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3157               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3158               ((StgBlockedFetch *)bqe)->ga.weight);
3159       break;
3160     case CONSTR:
3161       fprintf(stderr," %s (IP %p),",
3162               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3163                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3164                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3165                "RBH_Save_?"), get_itbl(bqe));
3166       break;
3167     default:
3168       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3169            info_type(bqe), node, info_type(node));
3170       break;
3171     }
3172   } /* for */
3173   fputc('\n', stderr);
3174 }
3175 # elif defined(GRAN)
3176 void 
3177 print_bq (StgClosure *node)
3178 {
3179   StgBlockingQueueElement *bqe;
3180   PEs node_loc, tso_loc;
3181   rtsBool end;
3182
3183   /* should cover all closures that may have a blocking queue */
3184   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3185          get_itbl(node)->type == FETCH_ME_BQ ||
3186          get_itbl(node)->type == RBH);
3187     
3188   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3189   node_loc = where_is(node);
3190
3191   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3192           node, info_type(node), node_loc);
3193
3194   /* 
3195      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3196   */
3197   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3198        !end; // iterate until bqe points to a CONSTR
3199        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3200     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3201     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3202     /* types of closures that may appear in a blocking queue */
3203     ASSERT(get_itbl(bqe)->type == TSO ||           
3204            get_itbl(bqe)->type == CONSTR); 
3205     /* only BQs of an RBH end with an RBH_Save closure */
3206     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3207
3208     tso_loc = where_is((StgClosure *)bqe);
3209     switch (get_itbl(bqe)->type) {
3210     case TSO:
3211       fprintf(stderr," TSO %d (%p) on [PE %d],",
3212               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3213       break;
3214     case CONSTR:
3215       fprintf(stderr," %s (IP %p),",
3216               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3217                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3218                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3219                "RBH_Save_?"), get_itbl(bqe));
3220       break;
3221     default:
3222       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3223            info_type((StgClosure *)bqe), node, info_type(node));
3224       break;
3225     }
3226   } /* for */
3227   fputc('\n', stderr);
3228 }
3229 #else
3230 /* 
3231    Nice and easy: only TSOs on the blocking queue
3232 */
3233 void 
3234 print_bq (StgClosure *node)
3235 {
3236   StgTSO *tso;
3237
3238   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3239   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3240        tso != END_TSO_QUEUE; 
3241        tso=tso->link) {
3242     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3243     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3244     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3245   }
3246   fputc('\n', stderr);
3247 }
3248 # endif
3249
3250 #if defined(PAR)
3251 static nat
3252 run_queue_len(void)
3253 {
3254   nat i;
3255   StgTSO *tso;
3256
3257   for (i=0, tso=run_queue_hd; 
3258        tso != END_TSO_QUEUE;
3259        i++, tso=tso->link)
3260     /* nothing */
3261
3262   return i;
3263 }
3264 #endif
3265
3266 static void
3267 sched_belch(char *s, ...)
3268 {
3269   va_list ap;
3270   va_start(ap,s);
3271 #ifdef SMP
3272   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3273 #else
3274   fprintf(stderr, "scheduler: ");
3275 #endif
3276   vfprintf(stderr, s, ap);
3277   fprintf(stderr, "\n");
3278 }
3279
3280 #endif /* DEBUG */
3281
3282
3283 //@node Index,  , Debugging Routines, Main scheduling code
3284 //@subsection Index
3285
3286 //@index
3287 //* MainRegTable::  @cindex\s-+MainRegTable
3288 //* StgMainThread::  @cindex\s-+StgMainThread
3289 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3290 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3291 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3292 //* context_switch::  @cindex\s-+context_switch
3293 //* createThread::  @cindex\s-+createThread
3294 //* free_capabilities::  @cindex\s-+free_capabilities
3295 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3296 //* initScheduler::  @cindex\s-+initScheduler
3297 //* interrupted::  @cindex\s-+interrupted
3298 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3299 //* next_thread_id::  @cindex\s-+next_thread_id
3300 //* print_bq::  @cindex\s-+print_bq
3301 //* run_queue_hd::  @cindex\s-+run_queue_hd
3302 //* run_queue_tl::  @cindex\s-+run_queue_tl
3303 //* sched_mutex::  @cindex\s-+sched_mutex
3304 //* schedule::  @cindex\s-+schedule
3305 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3306 //* task_ids::  @cindex\s-+task_ids
3307 //* term_mutex::  @cindex\s-+term_mutex
3308 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3309 //@end index