d87f7ab99a5901334d66ee97a4a44ce207db54d6
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.41 2000/01/13 14:34:05 hwloidl Exp $
3  *
4  * (c) The GHC Team, 1998-1999
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Prototypes::                
41 //* Main scheduling loop::      
42 //* Suspend and Resume::        
43 //* Run queue code::            
44 //* Garbage Collextion Routines::  
45 //* Blocking Queue Routines::   
46 //* Exception Handling Routines::  
47 //* Debugging Routines::        
48 //* Index::                     
49 //@end menu
50
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
53
54 #include "Rts.h"
55 #include "SchedAPI.h"
56 #include "RtsUtils.h"
57 #include "RtsFlags.h"
58 #include "Storage.h"
59 #include "StgRun.h"
60 #include "StgStartup.h"
61 #include "GC.h"
62 #include "Hooks.h"
63 #include "Schedule.h"
64 #include "StgMiscClosures.h"
65 #include "Storage.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
68 #include "Printer.h"
69 #include "Main.h"
70 #include "Signals.h"
71 #include "Profiling.h"
72 #include "Sanity.h"
73 #include "Stats.h"
74 #include "Sparks.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
77 # include "GranSim.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
81 # include "FetchMe.h"
82 # include "HLC.h"
83 #endif
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123
124 #if DEBUG
125 char *whatNext_strs[] = {
126   "ThreadEnterGHC",
127   "ThreadRunGHC",
128   "ThreadEnterHugs",
129   "ThreadKilled",
130   "ThreadComplete"
131 };
132
133 char *threadReturnCode_strs[] = {
134   "HeapOverflow",                       /* might also be StackOverflow */
135   "StackOverflow",
136   "ThreadYielding",
137   "ThreadBlocked",
138   "ThreadFinished"
139 };
140 #endif
141
142 #if defined(GRAN)
143
144 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
145 // rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c
146
147 /* 
148    In GranSim we have a runable and a blocked queue for each processor.
149    In order to minimise code changes new arrays run_queue_hds/tls
150    are created. run_queue_hd is then a short cut (macro) for
151    run_queue_hds[CurrentProc] (see GranSim.h).
152    -- HWL
153 */
154 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
155 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
156 StgTSO *ccalling_threadss[MAX_PROC];
157
158 #else /* !GRAN */
159
160 //@cindex run_queue_hd
161 //@cindex run_queue_tl
162 //@cindex blocked_queue_hd
163 //@cindex blocked_queue_tl
164 StgTSO *run_queue_hd, *run_queue_tl;
165 StgTSO *blocked_queue_hd, *blocked_queue_tl;
166
167 /* Threads suspended in _ccall_GC.
168  * Locks required: sched_mutex.
169  */
170 static StgTSO *suspended_ccalling_threads;
171
172 static void GetRoots(void);
173 static StgTSO *threadStackOverflow(StgTSO *tso);
174 #endif
175
176 /* KH: The following two flags are shared memory locations.  There is no need
177        to lock them, since they are only unset at the end of a scheduler
178        operation.
179 */
180
181 /* flag set by signal handler to precipitate a context switch */
182 //@cindex context_switch
183 nat context_switch;
184
185 /* if this flag is set as well, give up execution */
186 //@cindex interrupted
187 rtsBool interrupted;
188
189 /* Next thread ID to allocate.
190  * Locks required: sched_mutex
191  */
192 //@cindex next_thread_id
193 StgThreadID next_thread_id = 1;
194
195 /*
196  * Pointers to the state of the current thread.
197  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
198  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
199  */
200  
201 /* The smallest stack size that makes any sense is:
202  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
203  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
204  *  + 1                       (the realworld token for an IO thread)
205  *  + 1                       (the closure to enter)
206  *
207  * A thread with this stack will bomb immediately with a stack
208  * overflow, which will increase its stack size.  
209  */
210
211 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
212
213 /* Free capability list.
214  * Locks required: sched_mutex.
215  */
216 #ifdef SMP
217 //@cindex free_capabilities
218 //@cindex n_free_capabilities
219 Capability *free_capabilities; /* Available capabilities for running threads */
220 nat n_free_capabilities;       /* total number of available capabilities */
221 #else
222 //@cindex MainRegTable
223 Capability MainRegTable;       /* for non-SMP, we have one global capability */
224 #endif
225
226 #if defined(GRAN)
227 StgTSO      *CurrentTSOs[MAX_PROC];
228 #else
229 StgTSO      *CurrentTSO;
230 #endif
231
232 rtsBool ready_to_gc;
233
234 /* All our current task ids, saved in case we need to kill them later.
235  */
236 #ifdef SMP
237 //@cindex task_ids
238 task_info *task_ids;
239 #endif
240
241 void            addToBlockedQueue ( StgTSO *tso );
242
243 static void     schedule          ( void );
244        void     interruptStgRts   ( void );
245 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
246
247 #ifdef DEBUG
248 static void sched_belch(char *s, ...);
249 #endif
250
251 #ifdef SMP
252 //@cindex sched_mutex
253 //@cindex term_mutex
254 //@cindex thread_ready_cond
255 //@cindex gc_pending_cond
256 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
257 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
258 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
259 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
260
261 nat await_death;
262 #endif
263
264 #if defined(PAR)
265 StgTSO *LastTSO;
266 rtsTime TimeOfLastYield;
267 #endif
268
269 /*
270  * The thread state for the main thread.
271 // ToDo: check whether not needed any more
272 StgTSO   *MainTSO;
273  */
274
275
276 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
277 //@subsection Prototypes
278
279 #if 0 && defined(GRAN)
280 // ToDo: replace these with macros
281 static /* inline */ void    add_to_run_queue(StgTSO* tso); 
282 static /* inline */ void    push_on_run_queue(StgTSO* tso); 
283 static /* inline */ StgTSO *take_off_run_queue(StgTSO *tso);
284
285 /* Thread management */
286 void initScheduler(void);
287 #endif
288
289 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
290 //@subsection Main scheduling loop
291
292 /* ---------------------------------------------------------------------------
293    Main scheduling loop.
294
295    We use round-robin scheduling, each thread returning to the
296    scheduler loop when one of these conditions is detected:
297
298       * out of heap space
299       * timer expires (thread yields)
300       * thread blocks
301       * thread ends
302       * stack overflow
303
304    Locking notes:  we acquire the scheduler lock once at the beginning
305    of the scheduler loop, and release it when
306     
307       * running a thread, or
308       * waiting for work, or
309       * waiting for a GC to complete.
310
311    ------------------------------------------------------------------------ */
312 //@cindex schedule
313 static void
314 schedule( void )
315 {
316   StgTSO *t;
317   Capability *cap;
318   StgThreadReturnCode ret;
319 #if defined(GRAN)
320   rtsEvent *event;
321 #elif defined(PAR)
322   rtsSpark spark;
323   StgTSO *tso;
324   GlobalTaskId pe;
325 #endif
326   
327   ACQUIRE_LOCK(&sched_mutex);
328
329 #if defined(GRAN)
330 # error ToDo: implement GranSim scheduler
331 #elif defined(PAR)
332   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
333
334     if (PendingFetches != END_BF_QUEUE) {
335         processFetches();
336     }
337 #else
338   while (1) {
339 #endif
340
341     /* If we're interrupted (the user pressed ^C, or some other
342      * termination condition occurred), kill all the currently running
343      * threads.
344      */
345     if (interrupted) {
346       IF_DEBUG(scheduler, sched_belch("interrupted"));
347       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
348         deleteThread(t);
349       }
350       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
351         deleteThread(t);
352       }
353       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
354       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
355     }
356
357     /* Go through the list of main threads and wake up any
358      * clients whose computations have finished.  ToDo: this
359      * should be done more efficiently without a linear scan
360      * of the main threads list, somehow...
361      */
362 #ifdef SMP
363     { 
364       StgMainThread *m, **prev;
365       prev = &main_threads;
366       for (m = main_threads; m != NULL; m = m->link) {
367         switch (m->tso->whatNext) {
368         case ThreadComplete:
369           if (m->ret) {
370             *(m->ret) = (StgClosure *)m->tso->sp[0];
371           }
372           *prev = m->link;
373           m->stat = Success;
374           pthread_cond_broadcast(&m->wakeup);
375           break;
376         case ThreadKilled:
377           *prev = m->link;
378           m->stat = Killed;
379           pthread_cond_broadcast(&m->wakeup);
380           break;
381         default:
382           break;
383         }
384       }
385     }
386 #else
387     /* If our main thread has finished or been killed, return.
388      */
389     {
390       StgMainThread *m = main_threads;
391       if (m->tso->whatNext == ThreadComplete
392           || m->tso->whatNext == ThreadKilled) {
393         main_threads = main_threads->link;
394         if (m->tso->whatNext == ThreadComplete) {
395           /* we finished successfully, fill in the return value */
396           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
397           m->stat = Success;
398           return;
399         } else {
400           m->stat = Killed;
401           return;
402         }
403       }
404     }
405 #endif
406
407     /* Top up the run queue from our spark pool.  We try to make the
408      * number of threads in the run queue equal to the number of
409      * free capabilities.
410      */
411 #if defined(SMP)
412     {
413       nat n = n_free_capabilities;
414       StgTSO *tso = run_queue_hd;
415
416       /* Count the run queue */
417       while (n > 0 && tso != END_TSO_QUEUE) {
418         tso = tso->link;
419         n--;
420       }
421
422       for (; n > 0; n--) {
423         StgClosure *spark;
424         spark = findSpark();
425         if (spark == NULL) {
426           break; /* no more sparks in the pool */
427         } else {
428           // I'd prefer this to be done in activateSpark -- HWL
429           StgTSO *tso;
430           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
431           pushClosure(tso,spark);
432           PUSH_ON_RUN_QUEUE(tso);
433 #ifdef PAR
434           advisory_thread_count++;
435 #endif
436           
437           IF_DEBUG(scheduler,
438                    sched_belch("turning spark of closure %p into a thread",
439                                (StgClosure *)spark));
440         }
441       }
442       /* We need to wake up the other tasks if we just created some
443        * work for them.
444        */
445       if (n_free_capabilities - n > 1) {
446           pthread_cond_signal(&thread_ready_cond);
447       }
448     }
449 #endif /* SMP */
450
451     /* Check whether any waiting threads need to be woken up.  If the
452      * run queue is empty, and there are no other tasks running, we
453      * can wait indefinitely for something to happen.
454      * ToDo: what if another client comes along & requests another
455      * main thread?
456      */
457     if (blocked_queue_hd != END_TSO_QUEUE) {
458       awaitEvent(
459            (run_queue_hd == END_TSO_QUEUE)
460 #ifdef SMP
461         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
462 #endif
463         );
464     }
465     
466     /* check for signals each time around the scheduler */
467 #ifndef __MINGW32__
468     if (signals_pending()) {
469       start_signal_handlers();
470     }
471 #endif
472
473     /* Detect deadlock: when we have no threads to run, there are
474      * no threads waiting on I/O or sleeping, and all the other
475      * tasks are waiting for work, we must have a deadlock.  Inform
476      * all the main threads.
477      */
478 #ifdef SMP
479     if (blocked_queue_hd == END_TSO_QUEUE
480         && run_queue_hd == END_TSO_QUEUE
481         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
482         ) {
483       StgMainThread *m;
484       for (m = main_threads; m != NULL; m = m->link) {
485           m->ret = NULL;
486           m->stat = Deadlock;
487           pthread_cond_broadcast(&m->wakeup);
488       }
489       main_threads = NULL;
490     }
491 #else /* ! SMP */
492     if (blocked_queue_hd == END_TSO_QUEUE
493         && run_queue_hd == END_TSO_QUEUE) {
494       StgMainThread *m = main_threads;
495       m->ret = NULL;
496       m->stat = Deadlock;
497       main_threads = m->link;
498       return;
499     }
500 #endif
501
502 #ifdef SMP
503     /* If there's a GC pending, don't do anything until it has
504      * completed.
505      */
506     if (ready_to_gc) {
507       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
508       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
509     }
510     
511     /* block until we've got a thread on the run queue and a free
512      * capability.
513      */
514     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
515       IF_DEBUG(scheduler, sched_belch("waiting for work"));
516       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
517       IF_DEBUG(scheduler, sched_belch("work now available"));
518     }
519 #endif
520
521 #if defined(GRAN)
522 # error ToDo: implement GranSim scheduler
523 #elif defined(PAR)
524     // ToDo: phps merge with spark activation above
525     /* check whether we have local work and send requests if we have none */
526     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
527       /* :-[  no local threads => look out for local sparks */
528       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
529           (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
530            pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
531         /* 
532          * ToDo: add GC code check that we really have enough heap afterwards!!
533          * Old comment:
534          * If we're here (no runnable threads) and we have pending
535          * sparks, we must have a space problem.  Get enough space
536          * to turn one of those pending sparks into a
537          * thread... 
538          */
539         
540         spark = findSpark();                /* get a spark */
541         if (spark != (rtsSpark) NULL) {
542           tso = activateSpark(spark);       /* turn the spark into a thread */
543           IF_PAR_DEBUG(verbose,
544                        belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
545                              mytid, tso, tso->id, advisory_thread_count));
546
547           if (tso==END_TSO_QUEUE) { // failed to activate spark -> back to loop
548             belch("^^ failed to activate spark");
549             goto next_thread;
550           }                         // otherwise fall through & pick-up new tso
551         } else {
552           IF_PAR_DEBUG(verbose,
553                        belch("^^ no local sparks (spark pool contains only NFs: %d)", 
554                              spark_queue_len(ADVISORY_POOL)));
555           goto next_thread;
556         }
557       } else  
558       /* =8-[  no local sparks => look for work on other PEs */
559       {
560         /*
561          * We really have absolutely no work.  Send out a fish
562          * (there may be some out there already), and wait for
563          * something to arrive.  We clearly can't run any threads
564          * until a SCHEDULE or RESUME arrives, and so that's what
565          * we're hoping to see.  (Of course, we still have to
566          * respond to other types of messages.)
567          */
568         if (//!fishing &&  
569             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
570           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
571           /* fishing set in sendFish, processFish;
572              avoid flooding system with fishes via delay */
573           pe = choosePE();
574           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
575                    NEW_FISH_HUNGER);
576         }
577         
578         processMessages();
579         goto next_thread;
580         // ReSchedule(0);
581       }
582     } else if (PacketsWaiting()) {  /* Look for incoming messages */
583       processMessages();
584     }
585
586     /* Now we are sure that we have some work available */
587     ASSERT(run_queue_hd != END_TSO_QUEUE);
588     /* Take a thread from the run queue, if we have work */
589     t = take_off_run_queue(END_TSO_QUEUE);
590
591     /* ToDo: write something to the log-file
592     if (RTSflags.ParFlags.granSimStats && !sameThread)
593         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
594     */
595
596     CurrentTSO = t;
597
598     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
599                               spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
600                               pending_sparks_hd[ADVISORY_POOL], 
601                               pending_sparks_tl[ADVISORY_POOL], 
602                               pending_sparks_lim[ADVISORY_POOL]));
603
604     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
605                               run_queue_len(), CURRENT_PROC,
606                               run_queue_hd, run_queue_tl));
607
608     if (t != LastTSO) {
609       /* 
610          we are running a different TSO, so write a schedule event to log file
611          NB: If we use fair scheduling we also have to write  a deschedule 
612              event for LastTSO; with unfair scheduling we know that the
613              previous tso has blocked whenever we switch to another tso, so
614              we don't need it in GUM for now
615       */
616       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
617                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
618       
619     }
620
621 #else /* !GRAN && !PAR */
622   
623     /* grab a thread from the run queue
624      */
625     t = POP_RUN_QUEUE();
626
627 #endif
628     
629     /* grab a capability
630      */
631 #ifdef SMP
632     cap = free_capabilities;
633     free_capabilities = cap->link;
634     n_free_capabilities--;
635 #else
636     cap = &MainRegTable;
637 #endif
638     
639     cap->rCurrentTSO = t;
640     
641     /* set the context_switch flag
642      */
643     if (run_queue_hd == END_TSO_QUEUE)
644       context_switch = 0;
645     else
646       context_switch = 1;
647
648     RELEASE_LOCK(&sched_mutex);
649     
650     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
651
652     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
653     /* Run the current thread 
654      */
655     switch (cap->rCurrentTSO->whatNext) {
656     case ThreadKilled:
657     case ThreadComplete:
658       /* Thread already finished, return to scheduler. */
659       ret = ThreadFinished;
660       break;
661     case ThreadEnterGHC:
662       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
663       break;
664     case ThreadRunGHC:
665       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
666       break;
667     case ThreadEnterHugs:
668 #ifdef INTERPRETER
669       {
670          StgClosure* c;
671          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
672          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
673          cap->rCurrentTSO->sp += 1;
674          ret = enter(cap,c);
675          break;
676       }
677 #else
678       barf("Panic: entered a BCO but no bytecode interpreter in this build");
679 #endif
680     default:
681       barf("schedule: invalid whatNext field");
682     }
683     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
684     
685     /* Costs for the scheduler are assigned to CCS_SYSTEM */
686 #ifdef PROFILING
687     CCCS = CCS_SYSTEM;
688 #endif
689     
690     ACQUIRE_LOCK(&sched_mutex);
691
692 #ifdef SMP
693     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
694 #else
695     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
696 #endif
697     t = cap->rCurrentTSO;
698     
699     switch (ret) {
700     case HeapOverflow:
701       /* make all the running tasks block on a condition variable,
702        * maybe set context_switch and wait till they all pile in,
703        * then have them wait on a GC condition variable.
704        */
705       IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
706       threadPaused(t);
707       
708       ready_to_gc = rtsTrue;
709       context_switch = 1;               /* stop other threads ASAP */
710       PUSH_ON_RUN_QUEUE(t);
711       break;
712       
713     case StackOverflow:
714       /* just adjust the stack for this thread, then pop it back
715        * on the run queue.
716        */
717       IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
718       threadPaused(t);
719       { 
720         StgMainThread *m;
721         /* enlarge the stack */
722         StgTSO *new_t = threadStackOverflow(t);
723         
724         /* This TSO has moved, so update any pointers to it from the
725          * main thread stack.  It better not be on any other queues...
726          * (it shouldn't be)
727          */
728         for (m = main_threads; m != NULL; m = m->link) {
729           if (m->tso == t) {
730             m->tso = new_t;
731           }
732         }
733         PUSH_ON_RUN_QUEUE(new_t);
734       }
735       break;
736
737     case ThreadYielding:
738 #if defined(GRAN)
739       IF_DEBUG(gran, 
740                DumpGranEvent(GR_DESCHEDULE, t));
741       globalGranStats.tot_yields++;
742 #elif defined(PAR)
743       IF_DEBUG(par, 
744                DumpGranEvent(GR_DESCHEDULE, t));
745 #endif
746       /* put the thread back on the run queue.  Then, if we're ready to
747        * GC, check whether this is the last task to stop.  If so, wake
748        * up the GC thread.  getThread will block during a GC until the
749        * GC is finished.
750        */
751       IF_DEBUG(scheduler,
752                if (t->whatNext == ThreadEnterHugs) {
753                  /* ToDo: or maybe a timer expired when we were in Hugs?
754                   * or maybe someone hit ctrl-C
755                   */
756                  belch("thread %ld stopped to switch to Hugs", t->id);
757                } else {
758                  belch("thread %ld stopped, yielding", t->id);
759                }
760                );
761       threadPaused(t);
762       APPEND_TO_RUN_QUEUE(t);
763       break;
764       
765     case ThreadBlocked:
766 #if defined(GRAN)
767 # error ToDo: implement GranSim scheduler
768 #elif defined(PAR)
769       IF_DEBUG(par, 
770                DumpGranEvent(GR_DESCHEDULE, t)); 
771 #else
772 #endif
773       /* don't need to do anything.  Either the thread is blocked on
774        * I/O, in which case we'll have called addToBlockedQueue
775        * previously, or it's blocked on an MVar or Blackhole, in which
776        * case it'll be on the relevant queue already.
777        */
778       IF_DEBUG(scheduler,
779                fprintf(stderr, "thread %d stopped, ", t->id);
780                printThreadBlockage(t);
781                fprintf(stderr, "\n"));
782       threadPaused(t);
783       break;
784       
785     case ThreadFinished:
786       /* Need to check whether this was a main thread, and if so, signal
787        * the task that started it with the return value.  If we have no
788        * more main threads, we probably need to stop all the tasks until
789        * we get a new one.
790        */
791       IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
792       t->whatNext = ThreadComplete;
793 #if defined(GRAN)
794       // ToDo: endThread(t, CurrentProc); // clean-up the thread
795 #elif defined(PAR)
796       advisory_thread_count--;
797       if (RtsFlags.ParFlags.ParStats.Full) 
798         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
799 #endif
800       break;
801       
802     default:
803       barf("doneThread: invalid thread return code");
804     }
805     
806 #ifdef SMP
807     cap->link = free_capabilities;
808     free_capabilities = cap;
809     n_free_capabilities++;
810 #endif
811
812 #ifdef SMP
813     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
814 #else
815     if (ready_to_gc) 
816 #endif
817       {
818       /* everybody back, start the GC.
819        * Could do it in this thread, or signal a condition var
820        * to do it in another thread.  Either way, we need to
821        * broadcast on gc_pending_cond afterward.
822        */
823 #ifdef SMP
824       IF_DEBUG(scheduler,sched_belch("doing GC"));
825 #endif
826       GarbageCollect(GetRoots);
827       ready_to_gc = rtsFalse;
828 #ifdef SMP
829       pthread_cond_broadcast(&gc_pending_cond);
830 #endif
831     }
832 #if defined(GRAN)
833   next_thread:
834     IF_GRAN_DEBUG(unused,
835                   print_eventq(EventHd));
836
837     event = get_next_event();
838
839 #elif defined(PAR)
840   next_thread:
841     /* ToDo: wait for next message to arrive rather than busy wait */
842
843 #else /* GRAN */
844   /* not any more
845   next_thread:
846     t = take_off_run_queue(END_TSO_QUEUE);
847   */
848 #endif /* GRAN */
849   } /* end of while(1) */
850 }
851
852 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
853 void deleteAllThreads ( void )
854 {
855   StgTSO* t;
856   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
857   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
858     deleteThread(t);
859   }
860   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
861     deleteThread(t);
862   }
863   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
864   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
865 }
866
867 /* startThread and  insertThread are now in GranSim.c -- HWL */
868
869 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
870 //@subsection Suspend and Resume
871
872 /* ---------------------------------------------------------------------------
873  * Suspending & resuming Haskell threads.
874  * 
875  * When making a "safe" call to C (aka _ccall_GC), the task gives back
876  * its capability before calling the C function.  This allows another
877  * task to pick up the capability and carry on running Haskell
878  * threads.  It also means that if the C call blocks, it won't lock
879  * the whole system.
880  *
881  * The Haskell thread making the C call is put to sleep for the
882  * duration of the call, on the susepended_ccalling_threads queue.  We
883  * give out a token to the task, which it can use to resume the thread
884  * on return from the C function.
885  * ------------------------------------------------------------------------- */
886    
887 StgInt
888 suspendThread( Capability *cap )
889 {
890   nat tok;
891
892   ACQUIRE_LOCK(&sched_mutex);
893
894   IF_DEBUG(scheduler,
895            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
896
897   threadPaused(cap->rCurrentTSO);
898   cap->rCurrentTSO->link = suspended_ccalling_threads;
899   suspended_ccalling_threads = cap->rCurrentTSO;
900
901   /* Use the thread ID as the token; it should be unique */
902   tok = cap->rCurrentTSO->id;
903
904 #ifdef SMP
905   cap->link = free_capabilities;
906   free_capabilities = cap;
907   n_free_capabilities++;
908 #endif
909
910   RELEASE_LOCK(&sched_mutex);
911   return tok; 
912 }
913
914 Capability *
915 resumeThread( StgInt tok )
916 {
917   StgTSO *tso, **prev;
918   Capability *cap;
919
920   ACQUIRE_LOCK(&sched_mutex);
921
922   prev = &suspended_ccalling_threads;
923   for (tso = suspended_ccalling_threads; 
924        tso != END_TSO_QUEUE; 
925        prev = &tso->link, tso = tso->link) {
926     if (tso->id == (StgThreadID)tok) {
927       *prev = tso->link;
928       break;
929     }
930   }
931   if (tso == END_TSO_QUEUE) {
932     barf("resumeThread: thread not found");
933   }
934
935 #ifdef SMP
936   while (free_capabilities == NULL) {
937     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
938     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
939     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
940   }
941   cap = free_capabilities;
942   free_capabilities = cap->link;
943   n_free_capabilities--;
944 #else  
945   cap = &MainRegTable;
946 #endif
947
948   cap->rCurrentTSO = tso;
949
950   RELEASE_LOCK(&sched_mutex);
951   return cap;
952 }
953
954
955 /* ---------------------------------------------------------------------------
956  * Static functions
957  * ------------------------------------------------------------------------ */
958 static void unblockThread(StgTSO *tso);
959
960 /* ---------------------------------------------------------------------------
961  * Comparing Thread ids.
962  *
963  * This is used from STG land in the implementation of the
964  * instances of Eq/Ord for ThreadIds.
965  * ------------------------------------------------------------------------ */
966
967 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
968
969   StgThreadID id1 = tso1->id; 
970   StgThreadID id2 = tso2->id;
971  
972   if (id1 < id2) return (-1);
973   if (id1 > id2) return 1;
974   return 0;
975 }
976
977 /* ---------------------------------------------------------------------------
978    Create a new thread.
979
980    The new thread starts with the given stack size.  Before the
981    scheduler can run, however, this thread needs to have a closure
982    (and possibly some arguments) pushed on its stack.  See
983    pushClosure() in Schedule.h.
984
985    createGenThread() and createIOThread() (in SchedAPI.h) are
986    convenient packaged versions of this function.
987    ------------------------------------------------------------------------ */
988 //@cindex createThread
989 #if defined(GRAN)
990 /* currently pri (priority) is only used in a GRAN setup -- HWL */
991 StgTSO *
992 createThread(nat stack_size, StgInt pri)
993 {
994   return createThread_(stack_size, rtsFalse, pri);
995 }
996
997 static StgTSO *
998 createThread_(nat size, rtsBool have_lock, StgInt pri)
999 {
1000 #else
1001 StgTSO *
1002 createThread(nat stack_size)
1003 {
1004   return createThread_(stack_size, rtsFalse);
1005 }
1006
1007 static StgTSO *
1008 createThread_(nat size, rtsBool have_lock)
1009 {
1010 #endif
1011     StgTSO *tso;
1012     nat stack_size;
1013
1014     /* First check whether we should create a thread at all */
1015 #if defined(PAR)
1016   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1017   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1018     threadsIgnored++;
1019     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1020           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1021     return END_TSO_QUEUE;
1022   }
1023   threadsCreated++;
1024 #endif
1025
1026 #if defined(GRAN)
1027   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1028 #endif
1029
1030   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1031
1032   /* catch ridiculously small stack sizes */
1033   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1034     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1035   }
1036
1037   tso = (StgTSO *)allocate(size);
1038   TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
1039   
1040   stack_size = size - TSO_STRUCT_SIZEW;
1041
1042   // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
1043   SET_HDR(tso, &TSO_info, CCS_MAIN);
1044 #if defined(GRAN)
1045   SET_GRAN_HDR(tso, ThisPE);
1046 #endif
1047   tso->whatNext     = ThreadEnterGHC;
1048
1049   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1050          protect the increment operation on next_thread_id.
1051          In future, we could use an atomic increment instead.
1052   */
1053   
1054   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1055   tso->id = next_thread_id++; 
1056   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1057
1058   tso->why_blocked  = NotBlocked;
1059   tso->blocked_exceptions = NULL;
1060
1061   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1062   tso->stack_size   = stack_size;
1063   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1064                               - TSO_STRUCT_SIZEW;
1065   tso->sp           = (P_)&(tso->stack) + stack_size;
1066
1067 #ifdef PROFILING
1068   tso->prof.CCCS = CCS_MAIN;
1069 #endif
1070
1071   /* put a stop frame on the stack */
1072   tso->sp -= sizeofW(StgStopFrame);
1073   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1074   tso->su = (StgUpdateFrame*)tso->sp;
1075
1076   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1077                            tso->id, tso, tso->stack_size));
1078
1079   // ToDo: check this
1080 #if defined(GRAN)
1081   tso->link = END_TSO_QUEUE;
1082   /* uses more flexible routine in GranSim */
1083   insertThread(tso, CurrentProc);
1084 #else
1085   add_to_run_queue(tso);
1086 #endif
1087
1088 #if defined(GRAN)
1089   tso->gran.pri = pri;
1090   tso->gran.magic = TSO_MAGIC; // debugging only
1091   tso->gran.sparkname   = 0;
1092   tso->gran.startedat   = CURRENT_TIME; 
1093   tso->gran.exported    = 0;
1094   tso->gran.basicblocks = 0;
1095   tso->gran.allocs      = 0;
1096   tso->gran.exectime    = 0;
1097   tso->gran.fetchtime   = 0;
1098   tso->gran.fetchcount  = 0;
1099   tso->gran.blocktime   = 0;
1100   tso->gran.blockcount  = 0;
1101   tso->gran.blockedat   = 0;
1102   tso->gran.globalsparks = 0;
1103   tso->gran.localsparks  = 0;
1104   if (RtsFlags.GranFlags.Light)
1105     tso->gran.clock  = Now; /* local clock */
1106   else
1107     tso->gran.clock  = 0;
1108
1109   IF_DEBUG(gran,printTSO(tso));
1110 #elif defined(PAR)
1111   tso->par.sparkname   = 0;
1112   tso->par.startedat   = CURRENT_TIME; 
1113   tso->par.exported    = 0;
1114   tso->par.basicblocks = 0;
1115   tso->par.allocs      = 0;
1116   tso->par.exectime    = 0;
1117   tso->par.fetchtime   = 0;
1118   tso->par.fetchcount  = 0;
1119   tso->par.blocktime   = 0;
1120   tso->par.blockcount  = 0;
1121   tso->par.blockedat   = 0;
1122   tso->par.globalsparks = 0;
1123   tso->par.localsparks  = 0;
1124 #endif
1125
1126 #if defined(GRAN)
1127   globalGranStats.tot_threads_created++;
1128   globalGranStats.threads_created_on_PE[CurrentProc]++;
1129   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1130   globalGranStats.tot_sq_probes++;
1131 #endif 
1132
1133   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1134                                  tso->id, tso->stack_size));
1135   return tso;
1136 }
1137
1138 /* ---------------------------------------------------------------------------
1139  * scheduleThread()
1140  *
1141  * scheduleThread puts a thread on the head of the runnable queue.
1142  * This will usually be done immediately after a thread is created.
1143  * The caller of scheduleThread must create the thread using e.g.
1144  * createThread and push an appropriate closure
1145  * on this thread's stack before the scheduler is invoked.
1146  * ------------------------------------------------------------------------ */
1147
1148 void
1149 scheduleThread(StgTSO *tso)
1150 {
1151   ACQUIRE_LOCK(&sched_mutex);
1152
1153   /* Put the new thread on the head of the runnable queue.  The caller
1154    * better push an appropriate closure on this thread's stack
1155    * beforehand.  In the SMP case, the thread may start running as
1156    * soon as we release the scheduler lock below.
1157    */
1158   PUSH_ON_RUN_QUEUE(tso);
1159   THREAD_RUNNABLE();
1160
1161   IF_DEBUG(scheduler,printTSO(tso));
1162   RELEASE_LOCK(&sched_mutex);
1163 }
1164
1165 /* ---------------------------------------------------------------------------
1166  * startTasks()
1167  *
1168  * Start up Posix threads to run each of the scheduler tasks.
1169  * I believe the task ids are not needed in the system as defined.
1170  *  KH @ 25/10/99
1171  * ------------------------------------------------------------------------ */
1172
1173 #ifdef SMP
1174 static void *
1175 taskStart( void *arg STG_UNUSED )
1176 {
1177   schedule();
1178   return NULL;
1179 }
1180 #endif
1181
1182 /* ---------------------------------------------------------------------------
1183  * initScheduler()
1184  *
1185  * Initialise the scheduler.  This resets all the queues - if the
1186  * queues contained any threads, they'll be garbage collected at the
1187  * next pass.
1188  *
1189  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1190  * ------------------------------------------------------------------------ */
1191
1192 #ifdef SMP
1193 static void
1194 term_handler(int sig STG_UNUSED)
1195 {
1196   stat_workerStop();
1197   ACQUIRE_LOCK(&term_mutex);
1198   await_death--;
1199   RELEASE_LOCK(&term_mutex);
1200   pthread_exit(NULL);
1201 }
1202 #endif
1203
1204 //@cindex initScheduler
1205 void 
1206 initScheduler(void)
1207 {
1208 #if defined(GRAN)
1209   nat i;
1210
1211   for (i=0; i<=MAX_PROC; i++) {
1212     run_queue_hds[i]      = END_TSO_QUEUE;
1213     run_queue_tls[i]      = END_TSO_QUEUE;
1214     blocked_queue_hds[i]  = END_TSO_QUEUE;
1215     blocked_queue_tls[i]  = END_TSO_QUEUE;
1216     ccalling_threadss[i]  = END_TSO_QUEUE;
1217   }
1218 #else
1219   run_queue_hd      = END_TSO_QUEUE;
1220   run_queue_tl      = END_TSO_QUEUE;
1221   blocked_queue_hd  = END_TSO_QUEUE;
1222   blocked_queue_tl  = END_TSO_QUEUE;
1223 #endif 
1224
1225   suspended_ccalling_threads  = END_TSO_QUEUE;
1226
1227   main_threads = NULL;
1228
1229   context_switch = 0;
1230   interrupted    = 0;
1231
1232   enteredCAFs = END_CAF_LIST;
1233
1234   /* Install the SIGHUP handler */
1235 #ifdef SMP
1236   {
1237     struct sigaction action,oact;
1238
1239     action.sa_handler = term_handler;
1240     sigemptyset(&action.sa_mask);
1241     action.sa_flags = 0;
1242     if (sigaction(SIGTERM, &action, &oact) != 0) {
1243       barf("can't install TERM handler");
1244     }
1245   }
1246 #endif
1247
1248 #ifdef SMP
1249   /* Allocate N Capabilities */
1250   {
1251     nat i;
1252     Capability *cap, *prev;
1253     cap  = NULL;
1254     prev = NULL;
1255     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1256       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1257       cap->link = prev;
1258       prev = cap;
1259     }
1260     free_capabilities = cap;
1261     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1262   }
1263   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1264                              n_free_capabilities););
1265 #endif
1266
1267 #if defined(SMP) || defined(PAR)
1268   initSparkPools();
1269 #endif
1270 }
1271
1272 #ifdef SMP
1273 void
1274 startTasks( void )
1275 {
1276   nat i;
1277   int r;
1278   pthread_t tid;
1279   
1280   /* make some space for saving all the thread ids */
1281   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1282                             "initScheduler:task_ids");
1283   
1284   /* and create all the threads */
1285   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1286     r = pthread_create(&tid,NULL,taskStart,NULL);
1287     if (r != 0) {
1288       barf("startTasks: Can't create new Posix thread");
1289     }
1290     task_ids[i].id = tid;
1291     task_ids[i].mut_time = 0.0;
1292     task_ids[i].mut_etime = 0.0;
1293     task_ids[i].gc_time = 0.0;
1294     task_ids[i].gc_etime = 0.0;
1295     task_ids[i].elapsedtimestart = elapsedtime();
1296     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1297   }
1298 }
1299 #endif
1300
1301 void
1302 exitScheduler( void )
1303 {
1304 #ifdef SMP
1305   nat i;
1306
1307   /* Don't want to use pthread_cancel, since we'd have to install
1308    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1309    * all our locks.
1310    */
1311 #if 0
1312   /* Cancel all our tasks */
1313   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1314     pthread_cancel(task_ids[i].id);
1315   }
1316   
1317   /* Wait for all the tasks to terminate */
1318   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1319     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1320                                task_ids[i].id));
1321     pthread_join(task_ids[i].id, NULL);
1322   }
1323 #endif
1324
1325   /* Send 'em all a SIGHUP.  That should shut 'em up.
1326    */
1327   await_death = RtsFlags.ParFlags.nNodes;
1328   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1329     pthread_kill(task_ids[i].id,SIGTERM);
1330   }
1331   while (await_death > 0) {
1332     sched_yield();
1333   }
1334 #endif
1335 }
1336
1337 /* -----------------------------------------------------------------------------
1338    Managing the per-task allocation areas.
1339    
1340    Each capability comes with an allocation area.  These are
1341    fixed-length block lists into which allocation can be done.
1342
1343    ToDo: no support for two-space collection at the moment???
1344    -------------------------------------------------------------------------- */
1345
1346 /* -----------------------------------------------------------------------------
1347  * waitThread is the external interface for running a new computataion
1348  * and waiting for the result.
1349  *
1350  * In the non-SMP case, we create a new main thread, push it on the 
1351  * main-thread stack, and invoke the scheduler to run it.  The
1352  * scheduler will return when the top main thread on the stack has
1353  * completed or died, and fill in the necessary fields of the
1354  * main_thread structure.
1355  *
1356  * In the SMP case, we create a main thread as before, but we then
1357  * create a new condition variable and sleep on it.  When our new
1358  * main thread has completed, we'll be woken up and the status/result
1359  * will be in the main_thread struct.
1360  * -------------------------------------------------------------------------- */
1361
1362 SchedulerStatus
1363 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1364 {
1365   StgMainThread *m;
1366   SchedulerStatus stat;
1367
1368   ACQUIRE_LOCK(&sched_mutex);
1369   
1370   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1371
1372   m->tso = tso;
1373   m->ret = ret;
1374   m->stat = NoStatus;
1375 #ifdef SMP
1376   pthread_cond_init(&m->wakeup, NULL);
1377 #endif
1378
1379   m->link = main_threads;
1380   main_threads = m;
1381
1382   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1383                               m->tso->id));
1384
1385 #ifdef SMP
1386   do {
1387     pthread_cond_wait(&m->wakeup, &sched_mutex);
1388   } while (m->stat == NoStatus);
1389 #else
1390   schedule();
1391   ASSERT(m->stat != NoStatus);
1392 #endif
1393
1394   stat = m->stat;
1395
1396 #ifdef SMP
1397   pthread_cond_destroy(&m->wakeup);
1398 #endif
1399
1400   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1401                               m->tso->id));
1402   free(m);
1403
1404   RELEASE_LOCK(&sched_mutex);
1405
1406   return stat;
1407 }
1408
1409 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1410 //@subsection Run queue code 
1411
1412 #if 0
1413 /* 
1414    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1415        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1416        implicit global variable that has to be correct when calling these
1417        fcts -- HWL 
1418 */
1419
1420 /* Put the new thread on the head of the runnable queue.
1421  * The caller of createThread better push an appropriate closure
1422  * on this thread's stack before the scheduler is invoked.
1423  */
1424 static /* inline */ void
1425 add_to_run_queue(tso)
1426 StgTSO* tso; 
1427 {
1428   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1429   tso->link = run_queue_hd;
1430   run_queue_hd = tso;
1431   if (run_queue_tl == END_TSO_QUEUE) {
1432     run_queue_tl = tso;
1433   }
1434 }
1435
1436 /* Put the new thread at the end of the runnable queue. */
1437 static /* inline */ void
1438 push_on_run_queue(tso)
1439 StgTSO* tso; 
1440 {
1441   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1442   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1443   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1444   if (run_queue_hd == END_TSO_QUEUE) {
1445     run_queue_hd = tso;
1446   } else {
1447     run_queue_tl->link = tso;
1448   }
1449   run_queue_tl = tso;
1450 }
1451
1452 /* 
1453    Should be inlined because it's used very often in schedule.  The tso
1454    argument is actually only needed in GranSim, where we want to have the
1455    possibility to schedule *any* TSO on the run queue, irrespective of the
1456    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1457    the run queue and dequeue the tso, adjusting the links in the queue. 
1458 */
1459 //@cindex take_off_run_queue
1460 static /* inline */ StgTSO*
1461 take_off_run_queue(StgTSO *tso) {
1462   StgTSO *t, *prev;
1463
1464   /* 
1465      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1466
1467      if tso is specified, unlink that tso from the run_queue (doesn't have
1468      to be at the beginning of the queue); GranSim only 
1469   */
1470   if (tso!=END_TSO_QUEUE) {
1471     /* find tso in queue */
1472     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1473          t!=END_TSO_QUEUE && t!=tso;
1474          prev=t, t=t->link) 
1475       /* nothing */ ;
1476     ASSERT(t==tso);
1477     /* now actually dequeue the tso */
1478     if (prev!=END_TSO_QUEUE) {
1479       ASSERT(run_queue_hd!=t);
1480       prev->link = t->link;
1481     } else {
1482       /* t is at beginning of thread queue */
1483       ASSERT(run_queue_hd==t);
1484       run_queue_hd = t->link;
1485     }
1486     /* t is at end of thread queue */
1487     if (t->link==END_TSO_QUEUE) {
1488       ASSERT(t==run_queue_tl);
1489       run_queue_tl = prev;
1490     } else {
1491       ASSERT(run_queue_tl!=t);
1492     }
1493     t->link = END_TSO_QUEUE;
1494   } else {
1495     /* take tso from the beginning of the queue; std concurrent code */
1496     t = run_queue_hd;
1497     if (t != END_TSO_QUEUE) {
1498       run_queue_hd = t->link;
1499       t->link = END_TSO_QUEUE;
1500       if (run_queue_hd == END_TSO_QUEUE) {
1501         run_queue_tl = END_TSO_QUEUE;
1502       }
1503     }
1504   }
1505   return t;
1506 }
1507
1508 #endif /* 0 */
1509
1510 nat
1511 run_queue_len(void)
1512 {
1513   nat i;
1514   StgTSO *tso;
1515
1516   for (i=0, tso=run_queue_hd; 
1517        tso != END_TSO_QUEUE;
1518        i++, tso=tso->link)
1519     /* nothing */
1520
1521   return i;
1522 }
1523
1524
1525 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1526 //@subsection Garbage Collextion Routines
1527
1528 /* ---------------------------------------------------------------------------
1529    Where are the roots that we know about?
1530
1531         - all the threads on the runnable queue
1532         - all the threads on the blocked queue
1533         - all the thread currently executing a _ccall_GC
1534         - all the "main threads"
1535      
1536    ------------------------------------------------------------------------ */
1537
1538 /* This has to be protected either by the scheduler monitor, or by the
1539         garbage collection monitor (probably the latter).
1540         KH @ 25/10/99
1541 */
1542
1543 static void GetRoots(void)
1544 {
1545   StgMainThread *m;
1546   nat i;
1547
1548 #if defined(GRAN)
1549   for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1550     if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1551       run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1552     if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1553       run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1554     
1555     if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1556       blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1557     if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1558       blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1559     if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1560       ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1561   }
1562
1563   markEventQueue();
1564 #elif defined(PAR)
1565   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1566   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1567   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1568   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1569 #else
1570   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1571   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1572
1573   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1574   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1575 #endif 
1576
1577   for (m = main_threads; m != NULL; m = m->link) {
1578     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1579   }
1580   suspended_ccalling_threads = 
1581     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1582
1583 #if defined(SMP) || defined(PAR) || defined(GRAN)
1584   markSparkQueue();
1585 #endif
1586 }
1587
1588 /* -----------------------------------------------------------------------------
1589    performGC
1590
1591    This is the interface to the garbage collector from Haskell land.
1592    We provide this so that external C code can allocate and garbage
1593    collect when called from Haskell via _ccall_GC.
1594
1595    It might be useful to provide an interface whereby the programmer
1596    can specify more roots (ToDo).
1597    
1598    This needs to be protected by the GC condition variable above.  KH.
1599    -------------------------------------------------------------------------- */
1600
1601 void (*extra_roots)(void);
1602
1603 void
1604 performGC(void)
1605 {
1606   GarbageCollect(GetRoots);
1607 }
1608
1609 static void
1610 AllRoots(void)
1611 {
1612   GetRoots();                   /* the scheduler's roots */
1613   extra_roots();                /* the user's roots */
1614 }
1615
1616 void
1617 performGCWithRoots(void (*get_roots)(void))
1618 {
1619   extra_roots = get_roots;
1620
1621   GarbageCollect(AllRoots);
1622 }
1623
1624 /* -----------------------------------------------------------------------------
1625    Stack overflow
1626
1627    If the thread has reached its maximum stack size,
1628    then bomb out.  Otherwise relocate the TSO into a larger chunk of
1629    memory and adjust its stack size appropriately.
1630    -------------------------------------------------------------------------- */
1631
1632 static StgTSO *
1633 threadStackOverflow(StgTSO *tso)
1634 {
1635   nat new_stack_size, new_tso_size, diff, stack_words;
1636   StgPtr new_sp;
1637   StgTSO *dest;
1638
1639   if (tso->stack_size >= tso->max_stack_size) {
1640 #if 0
1641     /* If we're debugging, just print out the top of the stack */
1642     printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1643                                      tso->sp+64));
1644 #endif
1645 #ifdef INTERPRETER
1646     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1647     exit(1);
1648 #else
1649     /* Send this thread the StackOverflow exception */
1650     raiseAsync(tso, (StgClosure *)&stackOverflow_closure);
1651 #endif
1652     return tso;
1653   }
1654
1655   /* Try to double the current stack size.  If that takes us over the
1656    * maximum stack size for this thread, then use the maximum instead.
1657    * Finally round up so the TSO ends up as a whole number of blocks.
1658    */
1659   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1660   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
1661                                        TSO_STRUCT_SIZE)/sizeof(W_);
1662   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
1663   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1664
1665   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1666
1667   dest = (StgTSO *)allocate(new_tso_size);
1668   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1669
1670   /* copy the TSO block and the old stack into the new area */
1671   memcpy(dest,tso,TSO_STRUCT_SIZE);
1672   stack_words = tso->stack + tso->stack_size - tso->sp;
1673   new_sp = (P_)dest + new_tso_size - stack_words;
1674   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1675
1676   /* relocate the stack pointers... */
1677   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1678   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
1679   dest->sp    = new_sp;
1680   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1681   dest->stack_size = new_stack_size;
1682         
1683   /* and relocate the update frame list */
1684   relocate_TSO(tso, dest);
1685
1686   /* Mark the old one as dead so we don't try to scavenge it during
1687    * garbage collection (the TSO will likely be on a mutables list in
1688    * some generation, but it'll get collected soon enough).  It's
1689    * important to set the sp and su values to just beyond the end of
1690    * the stack, so we don't attempt to scavenge any part of the dead
1691    * TSO's stack.
1692    */
1693   tso->whatNext = ThreadKilled;
1694   tso->sp = (P_)&(tso->stack[tso->stack_size]);
1695   tso->su = (StgUpdateFrame *)tso->sp;
1696   tso->why_blocked = NotBlocked;
1697   dest->mut_link = NULL;
1698
1699   IF_DEBUG(sanity,checkTSO(tso));
1700 #if 0
1701   IF_DEBUG(scheduler,printTSO(dest));
1702 #endif
1703
1704 #if 0
1705   /* This will no longer work: KH */
1706   if (tso == MainTSO) { /* hack */
1707       MainTSO = dest;
1708   }
1709 #endif
1710   return dest;
1711 }
1712
1713 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1714 //@subsection Blocking Queue Routines
1715
1716 /* ---------------------------------------------------------------------------
1717    Wake up a queue that was blocked on some resource.
1718    ------------------------------------------------------------------------ */
1719
1720 // ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE
1721
1722 #if defined(GRAN)
1723 # error FixME
1724 #elif defined(PAR)
1725 static inline void
1726 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1727 {
1728   /* write RESUME events to log file and
1729      update blocked and fetch time (depending on type of the orig closure) */
1730   if (RtsFlags.ParFlags.ParStats.Full) {
1731     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1732                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1733                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1734
1735     switch (get_itbl(node)->type) {
1736         case FETCH_ME_BQ:
1737           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1738           break;
1739         case RBH:
1740         case FETCH_ME:
1741         case BLACKHOLE_BQ:
1742           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1743           break;
1744         default:
1745           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
1746         }
1747       }
1748 }
1749 #endif
1750
1751 #if defined(GRAN)
1752 # error FixME
1753 #elif defined(PAR)
1754 static StgBlockingQueueElement *
1755 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1756 {
1757     StgBlockingQueueElement *next;
1758
1759     switch (get_itbl(bqe)->type) {
1760     case TSO:
1761       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1762       /* if it's a TSO just push it onto the run_queue */
1763       next = bqe->link;
1764       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1765       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
1766       THREAD_RUNNABLE();
1767       unblockCount(bqe, node);
1768       /* reset blocking status after dumping event */
1769       ((StgTSO *)bqe)->why_blocked = NotBlocked;
1770       break;
1771
1772     case BLOCKED_FETCH:
1773       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1774       next = bqe->link;
1775       bqe->link = PendingFetches;
1776       PendingFetches = bqe;
1777       break;
1778
1779 # if defined(DEBUG)
1780       /* can ignore this case in a non-debugging setup; 
1781          see comments on RBHSave closures above */
1782     case CONSTR:
1783       /* check that the closure is an RBHSave closure */
1784       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1785              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1786              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1787       break;
1788
1789     default:
1790       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1791            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
1792            (StgClosure *)bqe);
1793 # endif
1794     }
1795   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1796   return next;
1797 }
1798
1799 #else /* !GRAN && !PAR */
1800 static StgTSO *
1801 unblockOneLocked(StgTSO *tso)
1802 {
1803   StgTSO *next;
1804
1805   ASSERT(get_itbl(tso)->type == TSO);
1806   ASSERT(tso->why_blocked != NotBlocked);
1807   tso->why_blocked = NotBlocked;
1808   next = tso->link;
1809   PUSH_ON_RUN_QUEUE(tso);
1810   THREAD_RUNNABLE();
1811   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1812   return next;
1813 }
1814 #endif
1815
1816 #if defined(GRAN)
1817 # error FixME
1818 #elif defined(PAR)
1819 inline StgTSO *
1820 unblockOne(StgTSO *tso, StgClosure *node)
1821 {
1822   ACQUIRE_LOCK(&sched_mutex);
1823   tso = unblockOneLocked(tso, node);
1824   RELEASE_LOCK(&sched_mutex);
1825   return tso;
1826 }
1827 #else
1828 inline StgTSO *
1829 unblockOne(StgTSO *tso)
1830 {
1831   ACQUIRE_LOCK(&sched_mutex);
1832   tso = unblockOneLocked(tso);
1833   RELEASE_LOCK(&sched_mutex);
1834   return tso;
1835 }
1836 #endif
1837
1838 #if defined(GRAN)
1839 # error FixME
1840 #elif defined(PAR)
1841 void 
1842 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1843 {
1844   StgBlockingQueueElement *bqe, *next;
1845
1846   ACQUIRE_LOCK(&sched_mutex);
1847
1848   IF_PAR_DEBUG(verbose, 
1849                belch("## AwBQ for node %p on [%x]: ",
1850                      node, mytid));
1851
1852   ASSERT(get_itbl(q)->type == TSO ||           
1853          get_itbl(q)->type == BLOCKED_FETCH || 
1854          get_itbl(q)->type == CONSTR); 
1855
1856   bqe = q;
1857   while (get_itbl(bqe)->type==TSO || 
1858          get_itbl(bqe)->type==BLOCKED_FETCH) {
1859     bqe = unblockOneLocked(bqe, node);
1860   }
1861   RELEASE_LOCK(&sched_mutex);
1862 }
1863
1864 #else   /* !GRAN && !PAR */
1865 void
1866 awakenBlockedQueue(StgTSO *tso)
1867 {
1868   ACQUIRE_LOCK(&sched_mutex);
1869   while (tso != END_TSO_QUEUE) {
1870     tso = unblockOneLocked(tso);
1871   }
1872   RELEASE_LOCK(&sched_mutex);
1873 }
1874 #endif
1875
1876 #if 0
1877 // ngoq ngo'
1878
1879 #if defined(GRAN)
1880 /* 
1881    Awakening a blocking queue in GranSim means checking for each of the
1882    TSOs in the queue whether they are local or not, issuing a ResumeThread
1883    or an UnblockThread event, respectively. The basic iteration over the
1884    blocking queue is the same as in the standard setup.  
1885 */
1886 void
1887 awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node)
1888 {
1889   StgBlockingQueueElement *bqe, *next;
1890   StgTSO *tso;
1891   PEs node_loc, tso_loc;
1892   rtsTime bq_processing_time = 0;
1893   nat len = 0, len_local = 0;
1894
1895   IF_GRAN_DEBUG(bq, 
1896                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1897                       node, CurrentProc, CurrentTime[CurrentProc], 
1898                       CurrentTSO->id, CurrentTSO));
1899
1900   node_loc = where_is(node);
1901
1902   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
1903          get_itbl(q)->type == CONSTR); // closure (type constructor)
1904   ASSERT(is_unique(node));
1905
1906   /* FAKE FETCH: magically copy the node to the tso's proc;
1907      no Fetch necessary because in reality the node should not have been 
1908      moved to the other PE in the first place
1909   */
1910   if (CurrentProc!=node_loc) {
1911     IF_GRAN_DEBUG(bq, 
1912                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1913                         node, node_loc, CurrentProc, CurrentTSO->id, 
1914                         // CurrentTSO, where_is(CurrentTSO),
1915                         node->header.gran.procs));
1916     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1917     IF_GRAN_DEBUG(bq, 
1918                   belch("## new bitmask of node %p is %#x",
1919                         node, node->header.gran.procs));
1920     if (RtsFlags.GranFlags.GranSimStats.Global) {
1921       globalGranStats.tot_fake_fetches++;
1922     }
1923   }
1924
1925   next = q;
1926   // ToDo: check: ASSERT(CurrentProc==node_loc);
1927   while (get_itbl(next)->type==TSO) { // q != END_TSO_QUEUE) {
1928     bqe = next;
1929     next = bqe->link;
1930     /* 
1931        bqe points to the current element in the queue
1932        next points to the next element in the queue
1933     */
1934     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1935     tso_loc = where_is(tso);
1936     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1937       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1938       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
1939       bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1940       // insertThread(tso, node_loc);
1941       new_event(tso_loc, tso_loc,
1942                 CurrentTime[CurrentProc]+bq_processing_time,
1943                 ResumeThread,
1944                 tso, node, (rtsSpark*)NULL);
1945       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1946       len_local++;
1947       len++;
1948     } else { // TSO is remote (actually should be FMBQ)
1949       bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1950       bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1951       new_event(tso_loc, CurrentProc, 
1952                 CurrentTime[CurrentProc]+bq_processing_time+
1953                 RtsFlags.GranFlags.Costs.latency,
1954                 UnblockThread,
1955                 tso, node, (rtsSpark*)NULL);
1956       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1957       bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1958       len++;
1959     }      
1960     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1961     IF_GRAN_DEBUG(bq,
1962                   fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1963                           (node_loc==tso_loc ? "Local" : "Global"), 
1964                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link))
1965     tso->block_info.closure = NULL;
1966     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
1967                              tso->id, tso));
1968   }
1969
1970   /* if this is the BQ of an RBH, we have to put back the info ripped out of
1971      the closure to make room for the anchor of the BQ */
1972   if (next!=END_BQ_QUEUE) {
1973     ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1974     /*
1975     ASSERT((info_ptr==&RBH_Save_0_info) ||
1976            (info_ptr==&RBH_Save_1_info) ||
1977            (info_ptr==&RBH_Save_2_info));
1978     */
1979     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1980     ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1981     ((StgRBH *)node)->mut_link       = ((StgRBHSave *)next)->payload[1];
1982
1983     IF_GRAN_DEBUG(bq,
1984                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1985                         node, info_type(node)));
1986   }
1987
1988   /* statistics gathering */
1989   if (RtsFlags.GranFlags.GranSimStats.Global) {
1990     globalGranStats.tot_bq_processing_time += bq_processing_time;
1991     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
1992     globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
1993     globalGranStats.tot_awbq++;             // total no. of bqs awakened
1994   }
1995   IF_GRAN_DEBUG(bq,
1996                 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1997                         node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
1998 }
1999
2000 #elif defined(PAR)
2001
2002 /*
2003   Awakening a blocking queue in GUM has to check whether an entry in the
2004   queue is a normal TSO or a BLOCKED_FETCH. The later indicates that a TSO is
2005   waiting for the result of this computation on another PE. Thus, when
2006   finding a BLOCKED_FETCH we have to send off a message to that PE. 
2007   Actually, we defer sending off a message, by just putting the BLOCKED_FETCH
2008   onto the PendingFetches queue, which will be later traversed by
2009   processFetches, sending off a RESUME message for each BLOCKED_FETCH.
2010
2011   NB: There is no check for an RBHSave closure (type CONSTR) in the code 
2012       below. The reason is, if we awaken the BQ of an RBH closure (RBHSaves 
2013       only exist at the end of such BQs) we know that the closure has been
2014       unpacked successfully on the other PE, and we can discard the info
2015       contained in the RBHSave closure. The current closure will be turned 
2016       into a FetchMe closure anyway.
2017 */
2018 void 
2019 awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node)
2020 {
2021   StgBlockingQueueElement *bqe, *next;
2022
2023   IF_PAR_DEBUG(verbose, 
2024                belch("## AwBQ for node %p on [%x]: ",
2025                      node, mytid));
2026
2027   ASSERT(get_itbl(q)->type == TSO ||           
2028          get_itbl(q)->type == BLOCKED_FETCH || 
2029          get_itbl(q)->type == CONSTR); 
2030
2031   next = q;
2032   while (get_itbl(next)->type==TSO || 
2033          get_itbl(next)->type==BLOCKED_FETCH) {
2034     bqe = next;
2035     switch (get_itbl(bqe)->type) {
2036     case TSO:
2037       /* if it's a TSO just push it onto the run_queue */
2038       next = bqe->link;
2039 #if defined(DEBUG)
2040       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging only
2041 #endif
2042       push_on_run_queue((StgTSO *)bqe); // HWL: was: PUSH_ON_RUN_QUEUE(tso);
2043
2044       /* write RESUME events to log file and
2045          update blocked and fetch time (depending on type of the orig closure) */
2046       if (RtsFlags.ParFlags.ParStats.Full) {
2047         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2048                          GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2049                          0, spark_queue_len(ADVISORY_POOL));
2050
2051         switch (get_itbl(node)->type) {
2052         case FETCH_ME_BQ:
2053           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2054           break;
2055         case RBH:
2056         case FETCH_ME:
2057         case BLACKHOLE_BQ:
2058           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2059           break;
2060         default:
2061           barf("{awaken_blocked_queue}Daq Qagh: unexpected closure %p (%s) with blocking queue",
2062                node, info_type(node));
2063         }
2064       }
2065       /* reset block_info.closure field after dumping event */
2066       ((StgTSO *)bqe)->block_info.closure = NULL;
2067
2068       /* rest of this branch is debugging only */
2069       IF_PAR_DEBUG(verbose,
2070                    fprintf(stderr," TSO %d (%p) [PE %lx] (block_info.closure=%p) (next=%p) ,",
2071                            ((StgTSO *)bqe)->id, (StgTSO *)bqe,
2072                            mytid, ((StgTSO *)bqe)->block_info.closure, ((StgTSO *)bqe)->link));
2073
2074       IF_DEBUG(scheduler,
2075                if (!RtsFlags.ParFlags.Debug.verbose)
2076                  belch("-- Waking up thread %ld (%p)", 
2077                        ((StgTSO *)bqe)->id, (StgTSO *)bqe));
2078       break;
2079
2080     case BLOCKED_FETCH:
2081       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2082       next = bqe->link;
2083       bqe->link = PendingFetches;
2084       PendingFetches = bqe;
2085       // bqe.tso->block_info.closure = NULL;
2086
2087       /* rest of this branch is debugging only */
2088       IF_PAR_DEBUG(verbose,
2089                    fprintf(stderr," BLOCKED_FETCH (%p) on node %p [PE %lx] (next=%p) ,",
2090                            ((StgBlockedFetch *)bqe), 
2091                            ((StgBlockedFetch *)bqe)->node, 
2092                            mytid, ((StgBlockedFetch *)bqe)->link));
2093       break;
2094
2095 # if defined(DEBUG)
2096       /* can ignore this case in a non-debugging setup; 
2097          see comments on RBHSave closures above */
2098     case CONSTR:
2099       /* check that the closure is an RBHSave closure */
2100       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2101              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2102              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2103       break;
2104
2105     default:
2106       barf("{awaken_blocked_queue}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2107            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2108            (StgClosure *)bqe);
2109 # endif
2110     }
2111   }
2112 }
2113
2114 #else /* !GRAN && !PAR */
2115
2116 void 
2117 awaken_blocked_queue(StgTSO *q) { awakenBlockedQueue(q); }
2118
2119 /*
2120 {
2121   StgTSO *tso;
2122
2123   while (q != END_TSO_QUEUE) {
2124     ASSERT(get_itbl(q)->type == TSO);
2125     tso = q;
2126     q = tso->link;
2127     push_on_run_queue(tso); // HWL: was: PUSH_ON_RUN_QUEUE(tso);
2128     //tso->block_info.closure = NULL;
2129     IF_DEBUG(scheduler, belch("-- Waking up thread %ld (%p)", tso->id, tso));
2130   }
2131 }
2132 */
2133 #endif /* GRAN */
2134 #endif /* 0 */
2135
2136 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2137 //@subsection Exception Handling Routines
2138
2139 /* ---------------------------------------------------------------------------
2140    Interrupt execution
2141    - usually called inside a signal handler so it mustn't do anything fancy.   
2142    ------------------------------------------------------------------------ */
2143
2144 void
2145 interruptStgRts(void)
2146 {
2147     interrupted    = 1;
2148     context_switch = 1;
2149 }
2150
2151 /* -----------------------------------------------------------------------------
2152    Unblock a thread
2153
2154    This is for use when we raise an exception in another thread, which
2155    may be blocked.
2156    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2157    -------------------------------------------------------------------------- */
2158
2159 static void
2160 unblockThread(StgTSO *tso)
2161 {
2162   StgTSO *t, **last;
2163
2164   ACQUIRE_LOCK(&sched_mutex);
2165   switch (tso->why_blocked) {
2166
2167   case NotBlocked:
2168     return;  /* not blocked */
2169
2170   case BlockedOnMVar:
2171     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2172     {
2173       StgTSO *last_tso = END_TSO_QUEUE;
2174       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2175
2176       last = &mvar->head;
2177       for (t = mvar->head; t != END_TSO_QUEUE; 
2178            last = &t->link, last_tso = t, t = t->link) {
2179         if (t == tso) {
2180           *last = tso->link;
2181           if (mvar->tail == tso) {
2182             mvar->tail = last_tso;
2183           }
2184           goto done;
2185         }
2186       }
2187       barf("unblockThread (MVAR): TSO not found");
2188     }
2189
2190   case BlockedOnBlackHole:
2191     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2192     {
2193       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2194
2195       last = &bq->blocking_queue;
2196       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2197            last = &t->link, t = t->link) {
2198         if (t == tso) {
2199           *last = tso->link;
2200           goto done;
2201         }
2202       }
2203       barf("unblockThread (BLACKHOLE): TSO not found");
2204     }
2205
2206   case BlockedOnException:
2207     {
2208       StgTSO *target  = tso->block_info.tso;
2209
2210       ASSERT(get_itbl(target)->type == TSO);
2211       ASSERT(target->blocked_exceptions != NULL);
2212
2213       last = &target->blocked_exceptions;
2214       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2215            last = &t->link, t = t->link) {
2216         ASSERT(get_itbl(t)->type == TSO);
2217         if (t == tso) {
2218           *last = tso->link;
2219           goto done;
2220         }
2221       }
2222       barf("unblockThread (Exception): TSO not found");
2223     }
2224
2225   case BlockedOnDelay:
2226   case BlockedOnRead:
2227   case BlockedOnWrite:
2228     {
2229       StgTSO *prev = NULL;
2230       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2231            prev = t, t = t->link) {
2232         if (t == tso) {
2233           if (prev == NULL) {
2234             blocked_queue_hd = t->link;
2235             if (blocked_queue_tl == t) {
2236               blocked_queue_tl = END_TSO_QUEUE;
2237             }
2238           } else {
2239             prev->link = t->link;
2240             if (blocked_queue_tl == t) {
2241               blocked_queue_tl = prev;
2242             }
2243           }
2244           goto done;
2245         }
2246       }
2247       barf("unblockThread (I/O): TSO not found");
2248     }
2249
2250   default:
2251     barf("unblockThread");
2252   }
2253
2254  done:
2255   tso->link = END_TSO_QUEUE;
2256   tso->why_blocked = NotBlocked;
2257   tso->block_info.closure = NULL;
2258   PUSH_ON_RUN_QUEUE(tso);
2259   RELEASE_LOCK(&sched_mutex);
2260 }
2261
2262 /* -----------------------------------------------------------------------------
2263  * raiseAsync()
2264  *
2265  * The following function implements the magic for raising an
2266  * asynchronous exception in an existing thread.
2267  *
2268  * We first remove the thread from any queue on which it might be
2269  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2270  *
2271  * We strip the stack down to the innermost CATCH_FRAME, building
2272  * thunks in the heap for all the active computations, so they can 
2273  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2274  * an application of the handler to the exception, and push it on
2275  * the top of the stack.
2276  * 
2277  * How exactly do we save all the active computations?  We create an
2278  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2279  * AP_UPDs pushes everything from the corresponding update frame
2280  * upwards onto the stack.  (Actually, it pushes everything up to the
2281  * next update frame plus a pointer to the next AP_UPD object.
2282  * Entering the next AP_UPD object pushes more onto the stack until we
2283  * reach the last AP_UPD object - at which point the stack should look
2284  * exactly as it did when we killed the TSO and we can continue
2285  * execution by entering the closure on top of the stack.
2286  *
2287  * We can also kill a thread entirely - this happens if either (a) the 
2288  * exception passed to raiseAsync is NULL, or (b) there's no
2289  * CATCH_FRAME on the stack.  In either case, we strip the entire
2290  * stack and replace the thread with a zombie.
2291  *
2292  * -------------------------------------------------------------------------- */
2293  
2294 void 
2295 deleteThread(StgTSO *tso)
2296 {
2297   raiseAsync(tso,NULL);
2298 }
2299
2300 void
2301 raiseAsync(StgTSO *tso, StgClosure *exception)
2302 {
2303   StgUpdateFrame* su = tso->su;
2304   StgPtr          sp = tso->sp;
2305   
2306   /* Thread already dead? */
2307   if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
2308     return;
2309   }
2310
2311   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2312
2313   /* Remove it from any blocking queues */
2314   unblockThread(tso);
2315
2316   /* The stack freezing code assumes there's a closure pointer on
2317    * the top of the stack.  This isn't always the case with compiled
2318    * code, so we have to push a dummy closure on the top which just
2319    * returns to the next return address on the stack.
2320    */
2321   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2322     *(--sp) = (W_)&dummy_ret_closure;
2323   }
2324
2325   while (1) {
2326     int words = ((P_)su - (P_)sp) - 1;
2327     nat i;
2328     StgAP_UPD * ap;
2329
2330     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2331      * then build PAP(handler,exception), and leave it on top of
2332      * the stack ready to enter.
2333      */
2334     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2335       StgCatchFrame *cf = (StgCatchFrame *)su;
2336       /* we've got an exception to raise, so let's pass it to the
2337        * handler in this frame.
2338        */
2339       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 1);
2340       TICK_ALLOC_UPD_PAP(2,0);
2341       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2342               
2343       ap->n_args = 1;
2344       ap->fun = cf->handler;
2345       ap->payload[0] = (P_)exception;
2346
2347       /* sp currently points to the word above the CATCH_FRAME on the stack.
2348        */
2349       sp += sizeofW(StgCatchFrame);
2350       tso->su = cf->link;
2351
2352       /* Restore the blocked/unblocked state for asynchronous exceptions
2353        * at the CATCH_FRAME.  
2354        *
2355        * If exceptions were unblocked at the catch, arrange that they
2356        * are unblocked again after executing the handler by pushing an
2357        * unblockAsyncExceptions_ret stack frame.
2358        */
2359       if (!cf->exceptions_blocked) {
2360         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2361       }
2362       
2363       /* Ensure that async exceptions are blocked when running the handler.
2364        */
2365       if (tso->blocked_exceptions == NULL) {
2366         tso->blocked_exceptions = END_TSO_QUEUE;
2367       }
2368       
2369       /* Put the newly-built PAP on top of the stack, ready to execute
2370        * when the thread restarts.
2371        */
2372       sp[0] = (W_)ap;
2373       tso->sp = sp;
2374       tso->whatNext = ThreadEnterGHC;
2375       return;
2376     }
2377
2378     /* First build an AP_UPD consisting of the stack chunk above the
2379      * current update frame, with the top word on the stack as the
2380      * fun field.
2381      */
2382     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2383     
2384     ASSERT(words >= 0);
2385     
2386     ap->n_args = words;
2387     ap->fun    = (StgClosure *)sp[0];
2388     sp++;
2389     for(i=0; i < (nat)words; ++i) {
2390       ap->payload[i] = (P_)*sp++;
2391     }
2392     
2393     switch (get_itbl(su)->type) {
2394       
2395     case UPDATE_FRAME:
2396       {
2397         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2398         TICK_ALLOC_UP_THK(words+1,0);
2399         
2400         IF_DEBUG(scheduler,
2401                  fprintf(stderr,  "scheduler: Updating ");
2402                  printPtr((P_)su->updatee); 
2403                  fprintf(stderr,  " with ");
2404                  printObj((StgClosure *)ap);
2405                  );
2406         
2407         /* Replace the updatee with an indirection - happily
2408          * this will also wake up any threads currently
2409          * waiting on the result.
2410          */
2411         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2412         su = su->link;
2413         sp += sizeofW(StgUpdateFrame) -1;
2414         sp[0] = (W_)ap; /* push onto stack */
2415         break;
2416       }
2417       
2418     case CATCH_FRAME:
2419       {
2420         StgCatchFrame *cf = (StgCatchFrame *)su;
2421         StgClosure* o;
2422         
2423         /* We want a PAP, not an AP_UPD.  Fortunately, the
2424          * layout's the same.
2425          */
2426         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2427         TICK_ALLOC_UPD_PAP(words+1,0);
2428         
2429         /* now build o = FUN(catch,ap,handler) */
2430         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2431         TICK_ALLOC_FUN(2,0);
2432         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2433         o->payload[0] = (StgClosure *)ap;
2434         o->payload[1] = cf->handler;
2435         
2436         IF_DEBUG(scheduler,
2437                  fprintf(stderr,  "scheduler: Built ");
2438                  printObj((StgClosure *)o);
2439                  );
2440         
2441         /* pop the old handler and put o on the stack */
2442         su = cf->link;
2443         sp += sizeofW(StgCatchFrame) - 1;
2444         sp[0] = (W_)o;
2445         break;
2446       }
2447       
2448     case SEQ_FRAME:
2449       {
2450         StgSeqFrame *sf = (StgSeqFrame *)su;
2451         StgClosure* o;
2452         
2453         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2454         TICK_ALLOC_UPD_PAP(words+1,0);
2455         
2456         /* now build o = FUN(seq,ap) */
2457         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2458         TICK_ALLOC_SE_THK(1,0);
2459         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2460         payloadCPtr(o,0) = (StgClosure *)ap;
2461         
2462         IF_DEBUG(scheduler,
2463                  fprintf(stderr,  "scheduler: Built ");
2464                  printObj((StgClosure *)o);
2465                  );
2466         
2467         /* pop the old handler and put o on the stack */
2468         su = sf->link;
2469         sp += sizeofW(StgSeqFrame) - 1;
2470         sp[0] = (W_)o;
2471         break;
2472       }
2473       
2474     case STOP_FRAME:
2475       /* We've stripped the entire stack, the thread is now dead. */
2476       sp += sizeofW(StgStopFrame) - 1;
2477       sp[0] = (W_)exception;    /* save the exception */
2478       tso->whatNext = ThreadKilled;
2479       tso->su = (StgUpdateFrame *)(sp+1);
2480       tso->sp = sp;
2481       return;
2482       
2483     default:
2484       barf("raiseAsync");
2485     }
2486   }
2487   barf("raiseAsync");
2488 }
2489
2490 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2491 //@subsection Debugging Routines
2492
2493 /* -----------------------------------------------------------------------------
2494    Debugging: why is a thread blocked
2495    -------------------------------------------------------------------------- */
2496
2497 #ifdef DEBUG
2498
2499 void printThreadBlockage(StgTSO *tso)
2500 {
2501   switch (tso->why_blocked) {
2502   case BlockedOnRead:
2503     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2504     break;
2505   case BlockedOnWrite:
2506     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2507     break;
2508   case BlockedOnDelay:
2509     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2510     break;
2511   case BlockedOnMVar:
2512     fprintf(stderr,"blocked on an MVar");
2513     break;
2514   case BlockedOnException:
2515     fprintf(stderr,"blocked on delivering an exception to thread %d",
2516             tso->block_info.tso->id);
2517     break;
2518   case BlockedOnBlackHole:
2519     fprintf(stderr,"blocked on a black hole");
2520     break;
2521   case NotBlocked:
2522     fprintf(stderr,"not blocked");
2523     break;
2524 #if defined(PAR)
2525   case BlockedOnGA:
2526     fprintf(stderr,"blocked on global address");
2527     break;
2528 #endif
2529   }
2530 }
2531
2532 /* 
2533    Print a whole blocking queue attached to node (debugging only).
2534 */
2535 //@cindex print_bq
2536 # if defined(PAR)
2537 void 
2538 print_bq (StgClosure *node)
2539 {
2540   StgBlockingQueueElement *bqe;
2541   StgTSO *tso;
2542   rtsBool end;
2543
2544   fprintf(stderr,"## BQ of closure %p (%s): ",
2545           node, info_type(node));
2546
2547   /* should cover all closures that may have a blocking queue */
2548   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2549          get_itbl(node)->type == FETCH_ME_BQ ||
2550          get_itbl(node)->type == RBH);
2551     
2552   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2553   /* 
2554      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2555   */
2556   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2557        !end; // iterate until bqe points to a CONSTR
2558        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2559     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2560     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2561     /* types of closures that may appear in a blocking queue */
2562     ASSERT(get_itbl(bqe)->type == TSO ||           
2563            get_itbl(bqe)->type == BLOCKED_FETCH || 
2564            get_itbl(bqe)->type == CONSTR); 
2565     /* only BQs of an RBH end with an RBH_Save closure */
2566     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2567
2568     switch (get_itbl(bqe)->type) {
2569     case TSO:
2570       fprintf(stderr," TSO %d (%x),",
2571               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2572       break;
2573     case BLOCKED_FETCH:
2574       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2575               ((StgBlockedFetch *)bqe)->node, 
2576               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2577               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2578               ((StgBlockedFetch *)bqe)->ga.weight);
2579       break;
2580     case CONSTR:
2581       fprintf(stderr," %s (IP %p),",
2582               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2583                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2584                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2585                "RBH_Save_?"), get_itbl(bqe));
2586       break;
2587     default:
2588       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2589            info_type(bqe), node, info_type(node));
2590       break;
2591     }
2592   } /* for */
2593   fputc('\n', stderr);
2594 }
2595 # elif defined(GRAN)
2596 void 
2597 print_bq (StgClosure *node)
2598 {
2599   StgBlockingQueueElement *bqe;
2600   StgTSO *tso;
2601   PEs node_loc, tso_loc;
2602   rtsBool end;
2603
2604   /* should cover all closures that may have a blocking queue */
2605   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2606          get_itbl(node)->type == FETCH_ME_BQ ||
2607          get_itbl(node)->type == RBH);
2608     
2609   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2610   node_loc = where_is(node);
2611
2612   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2613           node, info_type(node), node_loc);
2614
2615   /* 
2616      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2617   */
2618   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2619        !end; // iterate until bqe points to a CONSTR
2620        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2621     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2622     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2623     /* types of closures that may appear in a blocking queue */
2624     ASSERT(get_itbl(bqe)->type == TSO ||           
2625            get_itbl(bqe)->type == CONSTR); 
2626     /* only BQs of an RBH end with an RBH_Save closure */
2627     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2628
2629     tso_loc = where_is((StgClosure *)bqe);
2630     switch (get_itbl(bqe)->type) {
2631     case TSO:
2632       fprintf(stderr," TSO %d (%x) on [PE %d],",
2633               ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2634       break;
2635     case CONSTR:
2636       fprintf(stderr," %s (IP %p),",
2637               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2638                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2639                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2640                "RBH_Save_?"), get_itbl(bqe));
2641       break;
2642     default:
2643       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2644            info_type(bqe), node, info_type(node));
2645       break;
2646     }
2647   } /* for */
2648   fputc('\n', stderr);
2649 }
2650 #else
2651 /* 
2652    Nice and easy: only TSOs on the blocking queue
2653 */
2654 void 
2655 print_bq (StgClosure *node)
2656 {
2657   StgTSO *tso;
2658
2659   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2660   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
2661        tso != END_TSO_QUEUE; 
2662        tso=tso->link) {
2663     ASSERT(tso!=(StgTSO*)NULL && tso!=END_TSO_QUEUE);   // sanity check
2664     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
2665     fprintf(stderr," TSO %d (%x),", tso->id, tso);
2666   }
2667   fputc('\n', stderr);
2668 }
2669 # endif
2670
2671 /* A debugging function used all over the place in GranSim and GUM.
2672    Dummy function in other setups.
2673 */
2674 # if !defined(GRAN) && !defined(PAR)
2675 char *
2676 info_type(StgClosure *closure){ 
2677   return "petaQ";
2678 }
2679
2680 char *
2681 info_type_by_ip(StgInfoTable *ip){ 
2682   return "petaQ";
2683 }
2684 #endif
2685
2686 static void
2687 sched_belch(char *s, ...)
2688 {
2689   va_list ap;
2690   va_start(ap,s);
2691 #ifdef SMP
2692   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
2693 #else
2694   fprintf(stderr, "scheduler: ");
2695 #endif
2696   vfprintf(stderr, s, ap);
2697   fprintf(stderr, "\n");
2698 }
2699
2700 #endif /* DEBUG */
2701
2702 //@node Index,  , Debugging Routines, Main scheduling code
2703 //@subsection Index
2704
2705 //@index
2706 //* MainRegTable::  @cindex\s-+MainRegTable
2707 //* StgMainThread::  @cindex\s-+StgMainThread
2708 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
2709 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
2710 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
2711 //* context_switch::  @cindex\s-+context_switch
2712 //* createThread::  @cindex\s-+createThread
2713 //* free_capabilities::  @cindex\s-+free_capabilities
2714 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
2715 //* initScheduler::  @cindex\s-+initScheduler
2716 //* interrupted::  @cindex\s-+interrupted
2717 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
2718 //* next_thread_id::  @cindex\s-+next_thread_id
2719 //* print_bq::  @cindex\s-+print_bq
2720 //* run_queue_hd::  @cindex\s-+run_queue_hd
2721 //* run_queue_tl::  @cindex\s-+run_queue_tl
2722 //* sched_mutex::  @cindex\s-+sched_mutex
2723 //* schedule::  @cindex\s-+schedule
2724 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
2725 //* task_ids::  @cindex\s-+task_ids
2726 //* term_mutex::  @cindex\s-+term_mutex
2727 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
2728 //@end index