[project @ 2003-06-19 10:35:37 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.170 2003/06/19 10:35:37 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
130 //@subsection Variables and Data structures
131
132 /* Main thread queue.
133  * Locks required: sched_mutex.
134  */
135 StgMainThread *main_threads = NULL;
136
137 #ifdef THREADED_RTS
138 // Pointer to the thread that executes main
139 // When this thread is finished, the program terminates
140 // by calling shutdownHaskellAndExit.
141 // It would be better to add a call to shutdownHaskellAndExit
142 // to the Main.main wrapper and to remove this hack.
143 StgMainThread *main_main_thread = NULL;
144 #endif
145
146 /* Thread queues.
147  * Locks required: sched_mutex.
148  */
149 #if defined(GRAN)
150
151 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
152 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
153
154 /* 
155    In GranSim we have a runnable and a blocked queue for each processor.
156    In order to minimise code changes new arrays run_queue_hds/tls
157    are created. run_queue_hd is then a short cut (macro) for
158    run_queue_hds[CurrentProc] (see GranSim.h).
159    -- HWL
160 */
161 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
162 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
163 StgTSO *ccalling_threadss[MAX_PROC];
164 /* We use the same global list of threads (all_threads) in GranSim as in
165    the std RTS (i.e. we are cheating). However, we don't use this list in
166    the GranSim specific code at the moment (so we are only potentially
167    cheating).  */
168
169 #else /* !GRAN */
170
171 StgTSO *run_queue_hd = NULL;
172 StgTSO *run_queue_tl = NULL;
173 StgTSO *blocked_queue_hd = NULL;
174 StgTSO *blocked_queue_tl = NULL;
175 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads = NULL;
183
184 /* When a thread performs a safe C call (_ccall_GC, using old
185  * terminology), it gets put on the suspended_ccalling_threads
186  * list. Used by the garbage collector.
187  */
188 static StgTSO *suspended_ccalling_threads;
189
190 static StgTSO *threadStackOverflow(StgTSO *tso);
191
192 /* KH: The following two flags are shared memory locations.  There is no need
193        to lock them, since they are only unset at the end of a scheduler
194        operation.
195 */
196
197 /* flag set by signal handler to precipitate a context switch */
198 //@cindex context_switch
199 nat context_switch = 0;
200
201 /* if this flag is set as well, give up execution */
202 //@cindex interrupted
203 rtsBool interrupted = rtsFalse;
204
205 /* Next thread ID to allocate.
206  * Locks required: thread_id_mutex
207  */
208 //@cindex next_thread_id
209 static StgThreadID next_thread_id = 1;
210
211 /*
212  * Pointers to the state of the current thread.
213  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
214  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
215  */
216  
217 /* The smallest stack size that makes any sense is:
218  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
219  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
220  *  + 1                       (the closure to enter)
221  *  + 1                       (stg_ap_v_ret)
222  *  + 1                       (spare slot req'd by stg_ap_v_ret)
223  *
224  * A thread with this stack will bomb immediately with a stack
225  * overflow, which will increase its stack size.  
226  */
227
228 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
229
230
231 #if defined(GRAN)
232 StgTSO *CurrentTSO;
233 #endif
234
235 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
236  *  exists - earlier gccs apparently didn't.
237  *  -= chak
238  */
239 StgTSO dummy_tso;
240
241 static rtsBool ready_to_gc;
242
243 /*
244  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
245  * in an MT setting, needed to signal that a worker thread shouldn't hang around
246  * in the scheduler when it is out of work.
247  */
248 static rtsBool shutting_down_scheduler = rtsFalse;
249
250 void            addToBlockedQueue ( StgTSO *tso );
251
252 static void     schedule          ( void );
253        void     interruptStgRts   ( void );
254
255 static void     detectBlackHoles  ( void );
256
257 #ifdef DEBUG
258 static void sched_belch(char *s, ...);
259 #endif
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 /* ToDo: carefully document the invariants that go together
263  *       with these synchronisation objects.
264  */
265 Mutex     sched_mutex       = INIT_MUTEX_VAR;
266 Mutex     term_mutex        = INIT_MUTEX_VAR;
267
268 /*
269  * A heavyweight solution to the problem of protecting
270  * the thread_id from concurrent update.
271  */
272 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
273
274
275 # if defined(SMP)
276 static Condition gc_pending_cond = INIT_COND_VAR;
277 nat await_death;
278 # endif
279
280 #endif /* RTS_SUPPORTS_THREADS */
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 static char *whatNext_strs[] = {
290   "ThreadRunGHC",
291   "ThreadInterpret",
292   "ThreadKilled",
293   "ThreadRelocated",
294   "ThreadComplete"
295 };
296 #endif
297
298 #if defined(PAR)
299 StgTSO * createSparkThread(rtsSpark spark);
300 StgTSO * activateSpark (rtsSpark spark);  
301 #endif
302
303 /*
304  * The thread state for the main thread.
305 // ToDo: check whether not needed any more
306 StgTSO   *MainTSO;
307  */
308
309 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
310 static void taskStart(void);
311 static void
312 taskStart(void)
313 {
314   schedule();
315 }
316 #endif
317
318 #if defined(RTS_SUPPORTS_THREADS)
319 void
320 startSchedulerTask(void)
321 {
322     startTask(taskStart);
323 }
324 #endif
325
326 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
327 //@subsection Main scheduling loop
328
329 /* ---------------------------------------------------------------------------
330    Main scheduling loop.
331
332    We use round-robin scheduling, each thread returning to the
333    scheduler loop when one of these conditions is detected:
334
335       * out of heap space
336       * timer expires (thread yields)
337       * thread blocks
338       * thread ends
339       * stack overflow
340
341    Locking notes:  we acquire the scheduler lock once at the beginning
342    of the scheduler loop, and release it when
343     
344       * running a thread, or
345       * waiting for work, or
346       * waiting for a GC to complete.
347
348    GRAN version:
349      In a GranSim setup this loop iterates over the global event queue.
350      This revolves around the global event queue, which determines what 
351      to do next. Therefore, it's more complicated than either the 
352      concurrent or the parallel (GUM) setup.
353
354    GUM version:
355      GUM iterates over incoming messages.
356      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
357      and sends out a fish whenever it has nothing to do; in-between
358      doing the actual reductions (shared code below) it processes the
359      incoming messages and deals with delayed operations 
360      (see PendingFetches).
361      This is not the ugliest code you could imagine, but it's bloody close.
362
363    ------------------------------------------------------------------------ */
364 //@cindex schedule
365 static void
366 schedule( void )
367 {
368   StgTSO *t;
369   Capability *cap;
370   StgThreadReturnCode ret;
371 #if defined(GRAN)
372   rtsEvent *event;
373 #elif defined(PAR)
374   StgSparkPool *pool;
375   rtsSpark spark;
376   StgTSO *tso;
377   GlobalTaskId pe;
378   rtsBool receivedFinish = rtsFalse;
379 # if defined(DEBUG)
380   nat tp_size, sp_size; // stats only
381 # endif
382 #endif
383   rtsBool was_interrupted = rtsFalse;
384   StgTSOWhatNext prev_what_next;
385   
386   ACQUIRE_LOCK(&sched_mutex);
387  
388 #if defined(RTS_SUPPORTS_THREADS)
389   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
390   IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
391 #else
392   /* simply initialise it in the non-threaded case */
393   grabCapability(&cap);
394 #endif
395
396 #if defined(GRAN)
397   /* set up first event to get things going */
398   /* ToDo: assign costs for system setup and init MainTSO ! */
399   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
400             ContinueThread, 
401             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
402
403   IF_DEBUG(gran,
404            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
405            G_TSO(CurrentTSO, 5));
406
407   if (RtsFlags.GranFlags.Light) {
408     /* Save current time; GranSim Light only */
409     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
410   }      
411
412   event = get_next_event();
413
414   while (event!=(rtsEvent*)NULL) {
415     /* Choose the processor with the next event */
416     CurrentProc = event->proc;
417     CurrentTSO = event->tso;
418
419 #elif defined(PAR)
420
421   while (!receivedFinish) {    /* set by processMessages */
422                                /* when receiving PP_FINISH message         */ 
423 #else
424
425   while (1) {
426
427 #endif
428
429     IF_DEBUG(scheduler, printAllThreads());
430
431 #if defined(RTS_SUPPORTS_THREADS)
432     /* Check to see whether there are any worker threads
433        waiting to deposit external call results. If so,
434        yield our capability */
435     yieldToReturningWorker(&sched_mutex, &cap);
436 #endif
437
438     /* If we're interrupted (the user pressed ^C, or some other
439      * termination condition occurred), kill all the currently running
440      * threads.
441      */
442     if (interrupted) {
443       IF_DEBUG(scheduler, sched_belch("interrupted"));
444       interrupted = rtsFalse;
445       was_interrupted = rtsTrue;
446 #if defined(RTS_SUPPORTS_THREADS)
447       // In the threaded RTS, deadlock detection doesn't work,
448       // so just exit right away.
449       prog_belch("interrupted");
450       releaseCapability(cap);
451       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
452       RELEASE_LOCK(&sched_mutex);
453       shutdownHaskellAndExit(EXIT_SUCCESS);
454 #else
455       deleteAllThreads();
456 #endif
457     }
458
459     /* Go through the list of main threads and wake up any
460      * clients whose computations have finished.  ToDo: this
461      * should be done more efficiently without a linear scan
462      * of the main threads list, somehow...
463      */
464 #if defined(RTS_SUPPORTS_THREADS)
465     { 
466       StgMainThread *m, **prev;
467       prev = &main_threads;
468       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
469         switch (m->tso->what_next) {
470         case ThreadComplete:
471           if (m->ret) {
472               // NOTE: return val is tso->sp[1] (see StgStartup.hc)
473               *(m->ret) = (StgClosure *)m->tso->sp[1]; 
474           }
475           *prev = m->link;
476           m->stat = Success;
477           broadcastCondition(&m->wakeup);
478 #ifdef DEBUG
479           removeThreadLabel((StgWord)m->tso);
480 #endif
481           if(m == main_main_thread)
482           {
483               releaseCapability(cap);
484               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
485               RELEASE_LOCK(&sched_mutex);
486               shutdownHaskellAndExit(EXIT_SUCCESS);
487           }
488           break;
489         case ThreadKilled:
490           if (m->ret) *(m->ret) = NULL;
491           *prev = m->link;
492           if (was_interrupted) {
493             m->stat = Interrupted;
494           } else {
495             m->stat = Killed;
496           }
497           broadcastCondition(&m->wakeup);
498 #ifdef DEBUG
499           removeThreadLabel((StgWord)m->tso);
500 #endif
501           if(m == main_main_thread)
502           {
503               releaseCapability(cap);
504               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
505               RELEASE_LOCK(&sched_mutex);
506               shutdownHaskellAndExit(EXIT_SUCCESS);
507           }
508           break;
509         default:
510           break;
511         }
512       }
513     }
514
515 #else /* not threaded */
516
517 # if defined(PAR)
518     /* in GUM do this only on the Main PE */
519     if (IAmMainThread)
520 # endif
521     /* If our main thread has finished or been killed, return.
522      */
523     {
524       StgMainThread *m = main_threads;
525       if (m->tso->what_next == ThreadComplete
526           || m->tso->what_next == ThreadKilled) {
527 #ifdef DEBUG
528         removeThreadLabel((StgWord)m->tso);
529 #endif
530         main_threads = main_threads->link;
531         if (m->tso->what_next == ThreadComplete) {
532             // We finished successfully, fill in the return value
533             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
534             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
535             m->stat = Success;
536             return;
537         } else {
538           if (m->ret) { *(m->ret) = NULL; };
539           if (was_interrupted) {
540             m->stat = Interrupted;
541           } else {
542             m->stat = Killed;
543           }
544           return;
545         }
546       }
547     }
548 #endif
549
550     /* Top up the run queue from our spark pool.  We try to make the
551      * number of threads in the run queue equal to the number of
552      * free capabilities.
553      *
554      * Disable spark support in SMP for now, non-essential & requires
555      * a little bit of work to make it compile cleanly. -- sof 1/02.
556      */
557 #if 0 /* defined(SMP) */
558     {
559       nat n = getFreeCapabilities();
560       StgTSO *tso = run_queue_hd;
561
562       /* Count the run queue */
563       while (n > 0 && tso != END_TSO_QUEUE) {
564         tso = tso->link;
565         n--;
566       }
567
568       for (; n > 0; n--) {
569         StgClosure *spark;
570         spark = findSpark(rtsFalse);
571         if (spark == NULL) {
572           break; /* no more sparks in the pool */
573         } else {
574           /* I'd prefer this to be done in activateSpark -- HWL */
575           /* tricky - it needs to hold the scheduler lock and
576            * not try to re-acquire it -- SDM */
577           createSparkThread(spark);       
578           IF_DEBUG(scheduler,
579                    sched_belch("==^^ turning spark of closure %p into a thread",
580                                (StgClosure *)spark));
581         }
582       }
583       /* We need to wake up the other tasks if we just created some
584        * work for them.
585        */
586       if (getFreeCapabilities() - n > 1) {
587           signalCondition( &thread_ready_cond );
588       }
589     }
590 #endif // SMP
591
592     /* check for signals each time around the scheduler */
593 #if defined(RTS_USER_SIGNALS)
594     if (signals_pending()) {
595       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
596       startSignalHandlers();
597       ACQUIRE_LOCK(&sched_mutex);
598     }
599 #endif
600
601     /* Check whether any waiting threads need to be woken up.  If the
602      * run queue is empty, and there are no other tasks running, we
603      * can wait indefinitely for something to happen.
604      */
605     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
606 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
607                 || EMPTY_RUN_QUEUE()
608 #endif
609         )
610     {
611       awaitEvent( EMPTY_RUN_QUEUE()
612 #if defined(SMP)
613         && allFreeCapabilities()
614 #endif
615         );
616     }
617     /* we can be interrupted while waiting for I/O... */
618     if (interrupted) continue;
619
620     /* 
621      * Detect deadlock: when we have no threads to run, there are no
622      * threads waiting on I/O or sleeping, and all the other tasks are
623      * waiting for work, we must have a deadlock of some description.
624      *
625      * We first try to find threads blocked on themselves (ie. black
626      * holes), and generate NonTermination exceptions where necessary.
627      *
628      * If no threads are black holed, we have a deadlock situation, so
629      * inform all the main threads.
630      */
631 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
632     if (   EMPTY_THREAD_QUEUES()
633 #if defined(RTS_SUPPORTS_THREADS)
634         && EMPTY_QUEUE(suspended_ccalling_threads)
635 #endif
636 #ifdef SMP
637         && allFreeCapabilities()
638 #endif
639         )
640     {
641         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
642 #if defined(THREADED_RTS)
643         /* and SMP mode ..? */
644         releaseCapability(cap);
645 #endif
646         // Garbage collection can release some new threads due to
647         // either (a) finalizers or (b) threads resurrected because
648         // they are about to be send BlockedOnDeadMVar.  Any threads
649         // thus released will be immediately runnable.
650         GarbageCollect(GetRoots,rtsTrue);
651
652         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
653
654         IF_DEBUG(scheduler, 
655                  sched_belch("still deadlocked, checking for black holes..."));
656         detectBlackHoles();
657
658         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
659
660 #if defined(RTS_USER_SIGNALS)
661         /* If we have user-installed signal handlers, then wait
662          * for signals to arrive rather then bombing out with a
663          * deadlock.
664          */
665 #if defined(RTS_SUPPORTS_THREADS)
666         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
667                       a signal with no runnable threads (or I/O
668                       suspended ones) leads nowhere quick.
669                       For now, simply shut down when we reach this
670                       condition.
671                       
672                       ToDo: define precisely under what conditions
673                       the Scheduler should shut down in an MT setting.
674                    */
675 #else
676         if ( anyUserHandlers() ) {
677 #endif
678             IF_DEBUG(scheduler, 
679                      sched_belch("still deadlocked, waiting for signals..."));
680
681             awaitUserSignals();
682
683             // we might be interrupted...
684             if (interrupted) { continue; }
685
686             if (signals_pending()) {
687                 RELEASE_LOCK(&sched_mutex);
688                 startSignalHandlers();
689                 ACQUIRE_LOCK(&sched_mutex);
690             }
691             ASSERT(!EMPTY_RUN_QUEUE());
692             goto not_deadlocked;
693         }
694 #endif
695
696         /* Probably a real deadlock.  Send the current main thread the
697          * Deadlock exception (or in the SMP build, send *all* main
698          * threads the deadlock exception, since none of them can make
699          * progress).
700          */
701         {
702             StgMainThread *m;
703 #if defined(RTS_SUPPORTS_THREADS)
704             for (m = main_threads; m != NULL; m = m->link) {
705                 switch (m->tso->why_blocked) {
706                 case BlockedOnBlackHole:
707                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
708                     break;
709                 case BlockedOnException:
710                 case BlockedOnMVar:
711                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
712                     break;
713                 default:
714                     barf("deadlock: main thread blocked in a strange way");
715                 }
716             }
717 #else
718             m = main_threads;
719             switch (m->tso->why_blocked) {
720             case BlockedOnBlackHole:
721                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
722                 break;
723             case BlockedOnException:
724             case BlockedOnMVar:
725                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
726                 break;
727             default:
728                 barf("deadlock: main thread blocked in a strange way");
729             }
730 #endif
731         }
732
733 #if defined(RTS_SUPPORTS_THREADS)
734         /* ToDo: revisit conditions (and mechanism) for shutting
735            down a multi-threaded world  */
736         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
737         RELEASE_LOCK(&sched_mutex);
738         shutdownHaskell();
739         return;
740 #endif
741     }
742   not_deadlocked:
743
744 #elif defined(RTS_SUPPORTS_THREADS)
745     /* ToDo: add deadlock detection in threaded RTS */
746 #elif defined(PAR)
747     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
748 #endif
749
750 #if defined(SMP)
751     /* If there's a GC pending, don't do anything until it has
752      * completed.
753      */
754     if (ready_to_gc) {
755       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
756       waitCondition( &gc_pending_cond, &sched_mutex );
757     }
758 #endif    
759
760 #if defined(RTS_SUPPORTS_THREADS)
761 #if defined(SMP)
762     /* block until we've got a thread on the run queue and a free
763      * capability.
764      *
765      */
766     if ( EMPTY_RUN_QUEUE() ) {
767       /* Give up our capability */
768       releaseCapability(cap);
769
770       /* If we're in the process of shutting down (& running the
771        * a batch of finalisers), don't wait around.
772        */
773       if ( shutting_down_scheduler ) {
774         RELEASE_LOCK(&sched_mutex);
775         return;
776       }
777       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
778       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
779       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
780     }
781 #else
782     if ( EMPTY_RUN_QUEUE() ) {
783       continue; // nothing to do
784     }
785 #endif
786 #endif
787
788 #if defined(GRAN)
789     if (RtsFlags.GranFlags.Light)
790       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
791
792     /* adjust time based on time-stamp */
793     if (event->time > CurrentTime[CurrentProc] &&
794         event->evttype != ContinueThread)
795       CurrentTime[CurrentProc] = event->time;
796     
797     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
798     if (!RtsFlags.GranFlags.Light)
799       handleIdlePEs();
800
801     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
802
803     /* main event dispatcher in GranSim */
804     switch (event->evttype) {
805       /* Should just be continuing execution */
806     case ContinueThread:
807       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
808       /* ToDo: check assertion
809       ASSERT(run_queue_hd != (StgTSO*)NULL &&
810              run_queue_hd != END_TSO_QUEUE);
811       */
812       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
813       if (!RtsFlags.GranFlags.DoAsyncFetch &&
814           procStatus[CurrentProc]==Fetching) {
815         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
816               CurrentTSO->id, CurrentTSO, CurrentProc);
817         goto next_thread;
818       } 
819       /* Ignore ContinueThreads for completed threads */
820       if (CurrentTSO->what_next == ThreadComplete) {
821         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
822               CurrentTSO->id, CurrentTSO, CurrentProc);
823         goto next_thread;
824       } 
825       /* Ignore ContinueThreads for threads that are being migrated */
826       if (PROCS(CurrentTSO)==Nowhere) { 
827         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
828               CurrentTSO->id, CurrentTSO, CurrentProc);
829         goto next_thread;
830       }
831       /* The thread should be at the beginning of the run queue */
832       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
833         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
834               CurrentTSO->id, CurrentTSO, CurrentProc);
835         break; // run the thread anyway
836       }
837       /*
838       new_event(proc, proc, CurrentTime[proc],
839                 FindWork,
840                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
841       goto next_thread; 
842       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
843       break; // now actually run the thread; DaH Qu'vam yImuHbej 
844
845     case FetchNode:
846       do_the_fetchnode(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     case GlobalBlock:
850       do_the_globalblock(event);
851       goto next_thread;             /* handle next event in event queue  */
852       
853     case FetchReply:
854       do_the_fetchreply(event);
855       goto next_thread;             /* handle next event in event queue  */
856       
857     case UnblockThread:   /* Move from the blocked queue to the tail of */
858       do_the_unblock(event);
859       goto next_thread;             /* handle next event in event queue  */
860       
861     case ResumeThread:  /* Move from the blocked queue to the tail of */
862       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
863       event->tso->gran.blocktime += 
864         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
865       do_the_startthread(event);
866       goto next_thread;             /* handle next event in event queue  */
867       
868     case StartThread:
869       do_the_startthread(event);
870       goto next_thread;             /* handle next event in event queue  */
871       
872     case MoveThread:
873       do_the_movethread(event);
874       goto next_thread;             /* handle next event in event queue  */
875       
876     case MoveSpark:
877       do_the_movespark(event);
878       goto next_thread;             /* handle next event in event queue  */
879       
880     case FindWork:
881       do_the_findwork(event);
882       goto next_thread;             /* handle next event in event queue  */
883       
884     default:
885       barf("Illegal event type %u\n", event->evttype);
886     }  /* switch */
887     
888     /* This point was scheduler_loop in the old RTS */
889
890     IF_DEBUG(gran, belch("GRAN: after main switch"));
891
892     TimeOfLastEvent = CurrentTime[CurrentProc];
893     TimeOfNextEvent = get_time_of_next_event();
894     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
895     // CurrentTSO = ThreadQueueHd;
896
897     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
898                          TimeOfNextEvent));
899
900     if (RtsFlags.GranFlags.Light) 
901       GranSimLight_leave_system(event, &ActiveTSO); 
902
903     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
904
905     IF_DEBUG(gran, 
906              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
907
908     /* in a GranSim setup the TSO stays on the run queue */
909     t = CurrentTSO;
910     /* Take a thread from the run queue. */
911     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
912
913     IF_DEBUG(gran, 
914              fprintf(stderr, "GRAN: About to run current thread, which is\n");
915              G_TSO(t,5));
916
917     context_switch = 0; // turned on via GranYield, checking events and time slice
918
919     IF_DEBUG(gran, 
920              DumpGranEvent(GR_SCHEDULE, t));
921
922     procStatus[CurrentProc] = Busy;
923
924 #elif defined(PAR)
925     if (PendingFetches != END_BF_QUEUE) {
926         processFetches();
927     }
928
929     /* ToDo: phps merge with spark activation above */
930     /* check whether we have local work and send requests if we have none */
931     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
932       /* :-[  no local threads => look out for local sparks */
933       /* the spark pool for the current PE */
934       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
935       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
936           pool->hd < pool->tl) {
937         /* 
938          * ToDo: add GC code check that we really have enough heap afterwards!!
939          * Old comment:
940          * If we're here (no runnable threads) and we have pending
941          * sparks, we must have a space problem.  Get enough space
942          * to turn one of those pending sparks into a
943          * thread... 
944          */
945
946         spark = findSpark(rtsFalse);                /* get a spark */
947         if (spark != (rtsSpark) NULL) {
948           tso = activateSpark(spark);       /* turn the spark into a thread */
949           IF_PAR_DEBUG(schedule,
950                        belch("==== schedule: Created TSO %d (%p); %d threads active",
951                              tso->id, tso, advisory_thread_count));
952
953           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
954             belch("==^^ failed to activate spark");
955             goto next_thread;
956           }               /* otherwise fall through & pick-up new tso */
957         } else {
958           IF_PAR_DEBUG(verbose,
959                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
960                              spark_queue_len(pool)));
961           goto next_thread;
962         }
963       }
964
965       /* If we still have no work we need to send a FISH to get a spark
966          from another PE 
967       */
968       if (EMPTY_RUN_QUEUE()) {
969       /* =8-[  no local sparks => look for work on other PEs */
970         /*
971          * We really have absolutely no work.  Send out a fish
972          * (there may be some out there already), and wait for
973          * something to arrive.  We clearly can't run any threads
974          * until a SCHEDULE or RESUME arrives, and so that's what
975          * we're hoping to see.  (Of course, we still have to
976          * respond to other types of messages.)
977          */
978         TIME now = msTime() /*CURRENT_TIME*/;
979         IF_PAR_DEBUG(verbose, 
980                      belch("--  now=%ld", now));
981         IF_PAR_DEBUG(verbose,
982                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
983                          (last_fish_arrived_at!=0 &&
984                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
985                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
986                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
987                              last_fish_arrived_at,
988                              RtsFlags.ParFlags.fishDelay, now);
989                      });
990         
991         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
992             (last_fish_arrived_at==0 ||
993              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
994           /* outstandingFishes is set in sendFish, processFish;
995              avoid flooding system with fishes via delay */
996           pe = choosePE();
997           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
998                    NEW_FISH_HUNGER);
999
1000           // Global statistics: count no. of fishes
1001           if (RtsFlags.ParFlags.ParStats.Global &&
1002               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1003             globalParStats.tot_fish_mess++;
1004           }
1005         }
1006       
1007         receivedFinish = processMessages();
1008         goto next_thread;
1009       }
1010     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1011       receivedFinish = processMessages();
1012     }
1013
1014     /* Now we are sure that we have some work available */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016
1017     /* Take a thread from the run queue, if we have work */
1018     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1019     IF_DEBUG(sanity,checkTSO(t));
1020
1021     /* ToDo: write something to the log-file
1022     if (RTSflags.ParFlags.granSimStats && !sameThread)
1023         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1024
1025     CurrentTSO = t;
1026     */
1027     /* the spark pool for the current PE */
1028     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1029
1030     IF_DEBUG(scheduler, 
1031              belch("--=^ %d threads, %d sparks on [%#x]", 
1032                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1033
1034 # if 1
1035     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1036         t && LastTSO && t->id != LastTSO->id && 
1037         LastTSO->why_blocked == NotBlocked && 
1038         LastTSO->what_next != ThreadComplete) {
1039       // if previously scheduled TSO not blocked we have to record the context switch
1040       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1041                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1042     }
1043
1044     if (RtsFlags.ParFlags.ParStats.Full && 
1045         (emitSchedule /* forced emit */ ||
1046         (t && LastTSO && t->id != LastTSO->id))) {
1047       /* 
1048          we are running a different TSO, so write a schedule event to log file
1049          NB: If we use fair scheduling we also have to write  a deschedule 
1050              event for LastTSO; with unfair scheduling we know that the
1051              previous tso has blocked whenever we switch to another tso, so
1052              we don't need it in GUM for now
1053       */
1054       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1055                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1056       emitSchedule = rtsFalse;
1057     }
1058      
1059 # endif
1060 #else /* !GRAN && !PAR */
1061   
1062     /* grab a thread from the run queue */
1063     ASSERT(run_queue_hd != END_TSO_QUEUE);
1064     t = POP_RUN_QUEUE();
1065     // Sanity check the thread we're about to run.  This can be
1066     // expensive if there is lots of thread switching going on...
1067     IF_DEBUG(sanity,checkTSO(t));
1068 #endif
1069
1070     cap->r.rCurrentTSO = t;
1071     
1072     /* context switches are now initiated by the timer signal, unless
1073      * the user specified "context switch as often as possible", with
1074      * +RTS -C0
1075      */
1076     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1077          && (run_queue_hd != END_TSO_QUEUE
1078              || blocked_queue_hd != END_TSO_QUEUE
1079              || sleeping_queue != END_TSO_QUEUE)))
1080         context_switch = 1;
1081     else
1082         context_switch = 0;
1083
1084 run_thread:
1085
1086     RELEASE_LOCK(&sched_mutex);
1087
1088     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1089                               t->id, whatNext_strs[t->what_next]));
1090
1091 #ifdef PROFILING
1092     startHeapProfTimer();
1093 #endif
1094
1095     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1096     /* Run the current thread 
1097      */
1098     prev_what_next = t->what_next;
1099     switch (prev_what_next) {
1100     case ThreadKilled:
1101     case ThreadComplete:
1102         /* Thread already finished, return to scheduler. */
1103         ret = ThreadFinished;
1104         break;
1105     case ThreadRunGHC:
1106         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1107         break;
1108     case ThreadInterpret:
1109         ret = interpretBCO(cap);
1110         break;
1111     default:
1112       barf("schedule: invalid what_next field");
1113     }
1114     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1115     
1116     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1117 #ifdef PROFILING
1118     stopHeapProfTimer();
1119     CCCS = CCS_SYSTEM;
1120 #endif
1121     
1122     ACQUIRE_LOCK(&sched_mutex);
1123     
1124 #ifdef RTS_SUPPORTS_THREADS
1125     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1126 #elif !defined(GRAN) && !defined(PAR)
1127     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1128 #endif
1129     t = cap->r.rCurrentTSO;
1130     
1131 #if defined(PAR)
1132     /* HACK 675: if the last thread didn't yield, make sure to print a 
1133        SCHEDULE event to the log file when StgRunning the next thread, even
1134        if it is the same one as before */
1135     LastTSO = t; 
1136     TimeOfLastYield = CURRENT_TIME;
1137 #endif
1138
1139     switch (ret) {
1140     case HeapOverflow:
1141 #if defined(GRAN)
1142       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1143       globalGranStats.tot_heapover++;
1144 #elif defined(PAR)
1145       globalParStats.tot_heapover++;
1146 #endif
1147
1148       // did the task ask for a large block?
1149       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1150           // if so, get one and push it on the front of the nursery.
1151           bdescr *bd;
1152           nat blocks;
1153           
1154           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1155
1156           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1157                                    t->id, whatNext_strs[t->what_next], blocks));
1158
1159           // don't do this if it would push us over the
1160           // alloc_blocks_lim limit; we'll GC first.
1161           if (alloc_blocks + blocks < alloc_blocks_lim) {
1162
1163               alloc_blocks += blocks;
1164               bd = allocGroup( blocks );
1165
1166               // link the new group into the list
1167               bd->link = cap->r.rCurrentNursery;
1168               bd->u.back = cap->r.rCurrentNursery->u.back;
1169               if (cap->r.rCurrentNursery->u.back != NULL) {
1170                   cap->r.rCurrentNursery->u.back->link = bd;
1171               } else {
1172                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1173                          g0s0->blocks == cap->r.rNursery);
1174                   cap->r.rNursery = g0s0->blocks = bd;
1175               }           
1176               cap->r.rCurrentNursery->u.back = bd;
1177
1178               // initialise it as a nursery block.  We initialise the
1179               // step, gen_no, and flags field of *every* sub-block in
1180               // this large block, because this is easier than making
1181               // sure that we always find the block head of a large
1182               // block whenever we call Bdescr() (eg. evacuate() and
1183               // isAlive() in the GC would both have to do this, at
1184               // least).
1185               { 
1186                   bdescr *x;
1187                   for (x = bd; x < bd + blocks; x++) {
1188                       x->step = g0s0;
1189                       x->gen_no = 0;
1190                       x->flags = 0;
1191                   }
1192               }
1193
1194               // don't forget to update the block count in g0s0.
1195               g0s0->n_blocks += blocks;
1196               // This assert can be a killer if the app is doing lots
1197               // of large block allocations.
1198               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1199
1200               // now update the nursery to point to the new block
1201               cap->r.rCurrentNursery = bd;
1202
1203               // we might be unlucky and have another thread get on the
1204               // run queue before us and steal the large block, but in that
1205               // case the thread will just end up requesting another large
1206               // block.
1207               PUSH_ON_RUN_QUEUE(t);
1208               break;
1209           }
1210       }
1211
1212       /* make all the running tasks block on a condition variable,
1213        * maybe set context_switch and wait till they all pile in,
1214        * then have them wait on a GC condition variable.
1215        */
1216       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1217                                t->id, whatNext_strs[t->what_next]));
1218       threadPaused(t);
1219 #if defined(GRAN)
1220       ASSERT(!is_on_queue(t,CurrentProc));
1221 #elif defined(PAR)
1222       /* Currently we emit a DESCHEDULE event before GC in GUM.
1223          ToDo: either add separate event to distinguish SYSTEM time from rest
1224                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1225       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1226         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1227                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1228         emitSchedule = rtsTrue;
1229       }
1230 #endif
1231       
1232       ready_to_gc = rtsTrue;
1233       context_switch = 1;               /* stop other threads ASAP */
1234       PUSH_ON_RUN_QUEUE(t);
1235       /* actual GC is done at the end of the while loop */
1236       break;
1237       
1238     case StackOverflow:
1239 #if defined(GRAN)
1240       IF_DEBUG(gran, 
1241                DumpGranEvent(GR_DESCHEDULE, t));
1242       globalGranStats.tot_stackover++;
1243 #elif defined(PAR)
1244       // IF_DEBUG(par, 
1245       // DumpGranEvent(GR_DESCHEDULE, t);
1246       globalParStats.tot_stackover++;
1247 #endif
1248       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1249                                t->id, whatNext_strs[t->what_next]));
1250       /* just adjust the stack for this thread, then pop it back
1251        * on the run queue.
1252        */
1253       threadPaused(t);
1254       { 
1255         StgMainThread *m;
1256         /* enlarge the stack */
1257         StgTSO *new_t = threadStackOverflow(t);
1258         
1259         /* This TSO has moved, so update any pointers to it from the
1260          * main thread stack.  It better not be on any other queues...
1261          * (it shouldn't be).
1262          */
1263         for (m = main_threads; m != NULL; m = m->link) {
1264           if (m->tso == t) {
1265             m->tso = new_t;
1266           }
1267         }
1268         threadPaused(new_t);
1269         PUSH_ON_RUN_QUEUE(new_t);
1270       }
1271       break;
1272
1273     case ThreadYielding:
1274 #if defined(GRAN)
1275       IF_DEBUG(gran, 
1276                DumpGranEvent(GR_DESCHEDULE, t));
1277       globalGranStats.tot_yields++;
1278 #elif defined(PAR)
1279       // IF_DEBUG(par, 
1280       // DumpGranEvent(GR_DESCHEDULE, t);
1281       globalParStats.tot_yields++;
1282 #endif
1283       /* put the thread back on the run queue.  Then, if we're ready to
1284        * GC, check whether this is the last task to stop.  If so, wake
1285        * up the GC thread.  getThread will block during a GC until the
1286        * GC is finished.
1287        */
1288       IF_DEBUG(scheduler,
1289                if (t->what_next != prev_what_next) {
1290                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1291                          t->id, whatNext_strs[t->what_next]);
1292                } else {
1293                    belch("--<< thread %ld (%s) stopped, yielding", 
1294                          t->id, whatNext_strs[t->what_next]);
1295                }
1296                );
1297
1298       IF_DEBUG(sanity,
1299                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1300                checkTSO(t));
1301       ASSERT(t->link == END_TSO_QUEUE);
1302
1303       // Shortcut if we're just switching evaluators: don't bother
1304       // doing stack squeezing (which can be expensive), just run the
1305       // thread.
1306       if (t->what_next != prev_what_next) {
1307           goto run_thread;
1308       }
1309
1310       threadPaused(t);
1311
1312 #if defined(GRAN)
1313       ASSERT(!is_on_queue(t,CurrentProc));
1314
1315       IF_DEBUG(sanity,
1316                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1317                checkThreadQsSanity(rtsTrue));
1318 #endif
1319
1320 #if defined(PAR)
1321       if (RtsFlags.ParFlags.doFairScheduling) { 
1322         /* this does round-robin scheduling; good for concurrency */
1323         APPEND_TO_RUN_QUEUE(t);
1324       } else {
1325         /* this does unfair scheduling; good for parallelism */
1326         PUSH_ON_RUN_QUEUE(t);
1327       }
1328 #else
1329       // this does round-robin scheduling; good for concurrency
1330       APPEND_TO_RUN_QUEUE(t);
1331 #endif
1332
1333 #if defined(GRAN)
1334       /* add a ContinueThread event to actually process the thread */
1335       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1336                 ContinueThread,
1337                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1338       IF_GRAN_DEBUG(bq, 
1339                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1340                G_EVENTQ(0);
1341                G_CURR_THREADQ(0));
1342 #endif /* GRAN */
1343       break;
1344
1345     case ThreadBlocked:
1346 #if defined(GRAN)
1347       IF_DEBUG(scheduler,
1348                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1349                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1350                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1351
1352       // ??? needed; should emit block before
1353       IF_DEBUG(gran, 
1354                DumpGranEvent(GR_DESCHEDULE, t)); 
1355       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1356       /*
1357         ngoq Dogh!
1358       ASSERT(procStatus[CurrentProc]==Busy || 
1359               ((procStatus[CurrentProc]==Fetching) && 
1360               (t->block_info.closure!=(StgClosure*)NULL)));
1361       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1362           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1363             procStatus[CurrentProc]==Fetching)) 
1364         procStatus[CurrentProc] = Idle;
1365       */
1366 #elif defined(PAR)
1367       IF_DEBUG(scheduler,
1368                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1369                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1370       IF_PAR_DEBUG(bq,
1371
1372                    if (t->block_info.closure!=(StgClosure*)NULL) 
1373                      print_bq(t->block_info.closure));
1374
1375       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1376       blockThread(t);
1377
1378       /* whatever we schedule next, we must log that schedule */
1379       emitSchedule = rtsTrue;
1380
1381 #else /* !GRAN */
1382       /* don't need to do anything.  Either the thread is blocked on
1383        * I/O, in which case we'll have called addToBlockedQueue
1384        * previously, or it's blocked on an MVar or Blackhole, in which
1385        * case it'll be on the relevant queue already.
1386        */
1387       IF_DEBUG(scheduler,
1388                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1389                        t->id, whatNext_strs[t->what_next]);
1390                printThreadBlockage(t);
1391                fprintf(stderr, "\n"));
1392
1393       /* Only for dumping event to log file 
1394          ToDo: do I need this in GranSim, too?
1395       blockThread(t);
1396       */
1397 #endif
1398       threadPaused(t);
1399       break;
1400       
1401     case ThreadFinished:
1402       /* Need to check whether this was a main thread, and if so, signal
1403        * the task that started it with the return value.  If we have no
1404        * more main threads, we probably need to stop all the tasks until
1405        * we get a new one.
1406        */
1407       /* We also end up here if the thread kills itself with an
1408        * uncaught exception, see Exception.hc.
1409        */
1410       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1411                                t->id, whatNext_strs[t->what_next]));
1412 #if defined(GRAN)
1413       endThread(t, CurrentProc); // clean-up the thread
1414 #elif defined(PAR)
1415       /* For now all are advisory -- HWL */
1416       //if(t->priority==AdvisoryPriority) ??
1417       advisory_thread_count--;
1418       
1419 # ifdef DIST
1420       if(t->dist.priority==RevalPriority)
1421         FinishReval(t);
1422 # endif
1423       
1424       if (RtsFlags.ParFlags.ParStats.Full &&
1425           !RtsFlags.ParFlags.ParStats.Suppressed) 
1426         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1427 #endif
1428       break;
1429       
1430     default:
1431       barf("schedule: invalid thread return code %d", (int)ret);
1432     }
1433
1434 #ifdef PROFILING
1435     // When we have +RTS -i0 and we're heap profiling, do a census at
1436     // every GC.  This lets us get repeatable runs for debugging.
1437     if (performHeapProfile ||
1438         (RtsFlags.ProfFlags.profileInterval==0 &&
1439          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1440         GarbageCollect(GetRoots, rtsTrue);
1441         heapCensus();
1442         performHeapProfile = rtsFalse;
1443         ready_to_gc = rtsFalse; // we already GC'd
1444     }
1445 #endif
1446
1447     if (ready_to_gc 
1448 #ifdef SMP
1449         && allFreeCapabilities() 
1450 #endif
1451         ) {
1452       /* everybody back, start the GC.
1453        * Could do it in this thread, or signal a condition var
1454        * to do it in another thread.  Either way, we need to
1455        * broadcast on gc_pending_cond afterward.
1456        */
1457 #if defined(RTS_SUPPORTS_THREADS)
1458       IF_DEBUG(scheduler,sched_belch("doing GC"));
1459 #endif
1460       GarbageCollect(GetRoots,rtsFalse);
1461       ready_to_gc = rtsFalse;
1462 #ifdef SMP
1463       broadcastCondition(&gc_pending_cond);
1464 #endif
1465 #if defined(GRAN)
1466       /* add a ContinueThread event to continue execution of current thread */
1467       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1468                 ContinueThread,
1469                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1470       IF_GRAN_DEBUG(bq, 
1471                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1472                G_EVENTQ(0);
1473                G_CURR_THREADQ(0));
1474 #endif /* GRAN */
1475     }
1476
1477 #if defined(GRAN)
1478   next_thread:
1479     IF_GRAN_DEBUG(unused,
1480                   print_eventq(EventHd));
1481
1482     event = get_next_event();
1483 #elif defined(PAR)
1484   next_thread:
1485     /* ToDo: wait for next message to arrive rather than busy wait */
1486 #endif /* GRAN */
1487
1488   } /* end of while(1) */
1489
1490   IF_PAR_DEBUG(verbose,
1491                belch("== Leaving schedule() after having received Finish"));
1492 }
1493
1494 /* ---------------------------------------------------------------------------
1495  * Singleton fork(). Do not copy any running threads.
1496  * ------------------------------------------------------------------------- */
1497
1498 StgInt
1499 forkProcess(StgTSO* tso)
1500 {
1501 #ifndef mingw32_TARGET_OS
1502   pid_t pid;
1503   StgTSO* t,*next;
1504   StgMainThread *m;
1505   rtsBool doKill;
1506
1507   IF_DEBUG(scheduler,sched_belch("forking!"));
1508
1509   pid = fork();
1510   if (pid) { /* parent */
1511
1512   /* just return the pid */
1513     
1514   } else { /* child */
1515   /* wipe all other threads */
1516   run_queue_hd = run_queue_tl = tso;
1517   tso->link = END_TSO_QUEUE;
1518
1519   /* When clearing out the threads, we need to ensure
1520      that a 'main thread' is left behind; if there isn't,
1521      the Scheduler will shutdown next time it is entered.
1522      
1523      ==> we don't kill a thread that's on the main_threads
1524          list (nor the current thread.)
1525     
1526      [ Attempts at implementing the more ambitious scheme of
1527        killing the main_threads also, and then adding the
1528        current thread onto the main_threads list if it wasn't
1529        there already, failed -- waitThread() (for one) wasn't
1530        up to it. If it proves to be desirable to also kill
1531        the main threads, then this scheme will have to be
1532        revisited (and fully debugged!)
1533        
1534        -- sof 7/2002
1535      ]
1536   */
1537   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1538      us is picky about finding the thread still in its queue when
1539      handling the deleteThread() */
1540
1541   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1542     next = t->link;
1543     
1544     /* Don't kill the current thread.. */
1545     if (t->id == tso->id) continue;
1546     doKill=rtsTrue;
1547     /* ..or a main thread */
1548     for (m = main_threads; m != NULL; m = m->link) {
1549         if (m->tso->id == t->id) {
1550           doKill=rtsFalse;
1551           break;
1552         }
1553     }
1554     if (doKill) {
1555       deleteThread(t);
1556     }
1557   }
1558   }
1559   return pid;
1560 #else /* mingw32 */
1561   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1562   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1563   return -1;
1564 #endif /* mingw32 */
1565 }
1566
1567 /* ---------------------------------------------------------------------------
1568  * deleteAllThreads():  kill all the live threads.
1569  *
1570  * This is used when we catch a user interrupt (^C), before performing
1571  * any necessary cleanups and running finalizers.
1572  *
1573  * Locks: sched_mutex held.
1574  * ------------------------------------------------------------------------- */
1575    
1576 void
1577 deleteAllThreads ( void )
1578 {
1579   StgTSO* t, *next;
1580   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1581   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1582       next = t->global_link;
1583       deleteThread(t);
1584   }      
1585   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1586   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1587   sleeping_queue = END_TSO_QUEUE;
1588 }
1589
1590 /* startThread and  insertThread are now in GranSim.c -- HWL */
1591
1592
1593 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1594 //@subsection Suspend and Resume
1595
1596 /* ---------------------------------------------------------------------------
1597  * Suspending & resuming Haskell threads.
1598  * 
1599  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1600  * its capability before calling the C function.  This allows another
1601  * task to pick up the capability and carry on running Haskell
1602  * threads.  It also means that if the C call blocks, it won't lock
1603  * the whole system.
1604  *
1605  * The Haskell thread making the C call is put to sleep for the
1606  * duration of the call, on the susepended_ccalling_threads queue.  We
1607  * give out a token to the task, which it can use to resume the thread
1608  * on return from the C function.
1609  * ------------------------------------------------------------------------- */
1610    
1611 StgInt
1612 suspendThread( StgRegTable *reg, 
1613                rtsBool concCall
1614 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1615                STG_UNUSED
1616 #endif
1617                )
1618 {
1619   nat tok;
1620   Capability *cap;
1621
1622   /* assume that *reg is a pointer to the StgRegTable part
1623    * of a Capability.
1624    */
1625   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1626
1627   ACQUIRE_LOCK(&sched_mutex);
1628
1629   IF_DEBUG(scheduler,
1630            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1631
1632   // XXX this might not be necessary --SDM
1633   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1634
1635   threadPaused(cap->r.rCurrentTSO);
1636   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1637   suspended_ccalling_threads = cap->r.rCurrentTSO;
1638
1639 #if defined(RTS_SUPPORTS_THREADS)
1640   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1641   {
1642       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1643       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1644   }
1645   else
1646   {
1647       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1648   }
1649 #endif
1650
1651   /* Use the thread ID as the token; it should be unique */
1652   tok = cap->r.rCurrentTSO->id;
1653
1654   /* Hand back capability */
1655   releaseCapability(cap);
1656   
1657 #if defined(RTS_SUPPORTS_THREADS)
1658   /* Preparing to leave the RTS, so ensure there's a native thread/task
1659      waiting to take over.
1660   */
1661   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1662   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1663       startTask(taskStart);
1664   //}
1665 #endif
1666
1667   /* Other threads _might_ be available for execution; signal this */
1668   THREAD_RUNNABLE();
1669   RELEASE_LOCK(&sched_mutex);
1670   return tok; 
1671 }
1672
1673 StgRegTable *
1674 resumeThread( StgInt tok,
1675               rtsBool concCall STG_UNUSED )
1676 {
1677   StgTSO *tso, **prev;
1678   Capability *cap;
1679
1680 #if defined(RTS_SUPPORTS_THREADS)
1681   /* Wait for permission to re-enter the RTS with the result. */
1682   ACQUIRE_LOCK(&sched_mutex);
1683   grabReturnCapability(&sched_mutex, &cap);
1684
1685   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1686 #else
1687   grabCapability(&cap);
1688 #endif
1689
1690   /* Remove the thread off of the suspended list */
1691   prev = &suspended_ccalling_threads;
1692   for (tso = suspended_ccalling_threads; 
1693        tso != END_TSO_QUEUE; 
1694        prev = &tso->link, tso = tso->link) {
1695     if (tso->id == (StgThreadID)tok) {
1696       *prev = tso->link;
1697       break;
1698     }
1699   }
1700   if (tso == END_TSO_QUEUE) {
1701     barf("resumeThread: thread not found");
1702   }
1703   tso->link = END_TSO_QUEUE;
1704   
1705 #if defined(RTS_SUPPORTS_THREADS)
1706   if(tso->why_blocked == BlockedOnCCall)
1707   {
1708       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1709       tso->blocked_exceptions = NULL;
1710   }
1711 #endif
1712   
1713   /* Reset blocking status */
1714   tso->why_blocked  = NotBlocked;
1715
1716   cap->r.rCurrentTSO = tso;
1717 #if defined(RTS_SUPPORTS_THREADS)
1718   RELEASE_LOCK(&sched_mutex);
1719 #endif
1720   return &cap->r;
1721 }
1722
1723
1724 /* ---------------------------------------------------------------------------
1725  * Static functions
1726  * ------------------------------------------------------------------------ */
1727 static void unblockThread(StgTSO *tso);
1728
1729 /* ---------------------------------------------------------------------------
1730  * Comparing Thread ids.
1731  *
1732  * This is used from STG land in the implementation of the
1733  * instances of Eq/Ord for ThreadIds.
1734  * ------------------------------------------------------------------------ */
1735
1736 int
1737 cmp_thread(StgPtr tso1, StgPtr tso2) 
1738
1739   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1740   StgThreadID id2 = ((StgTSO *)tso2)->id;
1741  
1742   if (id1 < id2) return (-1);
1743   if (id1 > id2) return 1;
1744   return 0;
1745 }
1746
1747 /* ---------------------------------------------------------------------------
1748  * Fetching the ThreadID from an StgTSO.
1749  *
1750  * This is used in the implementation of Show for ThreadIds.
1751  * ------------------------------------------------------------------------ */
1752 int
1753 rts_getThreadId(StgPtr tso) 
1754 {
1755   return ((StgTSO *)tso)->id;
1756 }
1757
1758 #ifdef DEBUG
1759 void
1760 labelThread(StgPtr tso, char *label)
1761 {
1762   int len;
1763   void *buf;
1764
1765   /* Caveat: Once set, you can only set the thread name to "" */
1766   len = strlen(label)+1;
1767   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1768   strncpy(buf,label,len);
1769   /* Update will free the old memory for us */
1770   updateThreadLabel((StgWord)tso,buf);
1771 }
1772 #endif /* DEBUG */
1773
1774 /* ---------------------------------------------------------------------------
1775    Create a new thread.
1776
1777    The new thread starts with the given stack size.  Before the
1778    scheduler can run, however, this thread needs to have a closure
1779    (and possibly some arguments) pushed on its stack.  See
1780    pushClosure() in Schedule.h.
1781
1782    createGenThread() and createIOThread() (in SchedAPI.h) are
1783    convenient packaged versions of this function.
1784
1785    currently pri (priority) is only used in a GRAN setup -- HWL
1786    ------------------------------------------------------------------------ */
1787 //@cindex createThread
1788 #if defined(GRAN)
1789 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1790 StgTSO *
1791 createThread(nat size, StgInt pri)
1792 #else
1793 StgTSO *
1794 createThread(nat size)
1795 #endif
1796 {
1797
1798     StgTSO *tso;
1799     nat stack_size;
1800
1801     /* First check whether we should create a thread at all */
1802 #if defined(PAR)
1803   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1804   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1805     threadsIgnored++;
1806     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1807           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1808     return END_TSO_QUEUE;
1809   }
1810   threadsCreated++;
1811 #endif
1812
1813 #if defined(GRAN)
1814   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1815 #endif
1816
1817   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1818
1819   /* catch ridiculously small stack sizes */
1820   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1821     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1822   }
1823
1824   stack_size = size - TSO_STRUCT_SIZEW;
1825
1826   tso = (StgTSO *)allocate(size);
1827   TICK_ALLOC_TSO(stack_size, 0);
1828
1829   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1830 #if defined(GRAN)
1831   SET_GRAN_HDR(tso, ThisPE);
1832 #endif
1833
1834   // Always start with the compiled code evaluator
1835   tso->what_next = ThreadRunGHC;
1836
1837   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1838    * protect the increment operation on next_thread_id.
1839    * In future, we could use an atomic increment instead.
1840    */
1841   ACQUIRE_LOCK(&thread_id_mutex);
1842   tso->id = next_thread_id++; 
1843   RELEASE_LOCK(&thread_id_mutex);
1844
1845   tso->why_blocked  = NotBlocked;
1846   tso->blocked_exceptions = NULL;
1847
1848   tso->stack_size   = stack_size;
1849   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1850                               - TSO_STRUCT_SIZEW;
1851   tso->sp           = (P_)&(tso->stack) + stack_size;
1852
1853 #ifdef PROFILING
1854   tso->prof.CCCS = CCS_MAIN;
1855 #endif
1856
1857   /* put a stop frame on the stack */
1858   tso->sp -= sizeofW(StgStopFrame);
1859   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1860   // ToDo: check this
1861 #if defined(GRAN)
1862   tso->link = END_TSO_QUEUE;
1863   /* uses more flexible routine in GranSim */
1864   insertThread(tso, CurrentProc);
1865 #else
1866   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1867    * from its creation
1868    */
1869 #endif
1870
1871 #if defined(GRAN) 
1872   if (RtsFlags.GranFlags.GranSimStats.Full) 
1873     DumpGranEvent(GR_START,tso);
1874 #elif defined(PAR)
1875   if (RtsFlags.ParFlags.ParStats.Full) 
1876     DumpGranEvent(GR_STARTQ,tso);
1877   /* HACk to avoid SCHEDULE 
1878      LastTSO = tso; */
1879 #endif
1880
1881   /* Link the new thread on the global thread list.
1882    */
1883   tso->global_link = all_threads;
1884   all_threads = tso;
1885
1886 #if defined(DIST)
1887   tso->dist.priority = MandatoryPriority; //by default that is...
1888 #endif
1889
1890 #if defined(GRAN)
1891   tso->gran.pri = pri;
1892 # if defined(DEBUG)
1893   tso->gran.magic = TSO_MAGIC; // debugging only
1894 # endif
1895   tso->gran.sparkname   = 0;
1896   tso->gran.startedat   = CURRENT_TIME; 
1897   tso->gran.exported    = 0;
1898   tso->gran.basicblocks = 0;
1899   tso->gran.allocs      = 0;
1900   tso->gran.exectime    = 0;
1901   tso->gran.fetchtime   = 0;
1902   tso->gran.fetchcount  = 0;
1903   tso->gran.blocktime   = 0;
1904   tso->gran.blockcount  = 0;
1905   tso->gran.blockedat   = 0;
1906   tso->gran.globalsparks = 0;
1907   tso->gran.localsparks  = 0;
1908   if (RtsFlags.GranFlags.Light)
1909     tso->gran.clock  = Now; /* local clock */
1910   else
1911     tso->gran.clock  = 0;
1912
1913   IF_DEBUG(gran,printTSO(tso));
1914 #elif defined(PAR)
1915 # if defined(DEBUG)
1916   tso->par.magic = TSO_MAGIC; // debugging only
1917 # endif
1918   tso->par.sparkname   = 0;
1919   tso->par.startedat   = CURRENT_TIME; 
1920   tso->par.exported    = 0;
1921   tso->par.basicblocks = 0;
1922   tso->par.allocs      = 0;
1923   tso->par.exectime    = 0;
1924   tso->par.fetchtime   = 0;
1925   tso->par.fetchcount  = 0;
1926   tso->par.blocktime   = 0;
1927   tso->par.blockcount  = 0;
1928   tso->par.blockedat   = 0;
1929   tso->par.globalsparks = 0;
1930   tso->par.localsparks  = 0;
1931 #endif
1932
1933 #if defined(GRAN)
1934   globalGranStats.tot_threads_created++;
1935   globalGranStats.threads_created_on_PE[CurrentProc]++;
1936   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1937   globalGranStats.tot_sq_probes++;
1938 #elif defined(PAR)
1939   // collect parallel global statistics (currently done together with GC stats)
1940   if (RtsFlags.ParFlags.ParStats.Global &&
1941       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1942     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1943     globalParStats.tot_threads_created++;
1944   }
1945 #endif 
1946
1947 #if defined(GRAN)
1948   IF_GRAN_DEBUG(pri,
1949                 belch("==__ schedule: Created TSO %d (%p);",
1950                       CurrentProc, tso, tso->id));
1951 #elif defined(PAR)
1952     IF_PAR_DEBUG(verbose,
1953                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1954                        tso->id, tso, advisory_thread_count));
1955 #else
1956   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1957                                  tso->id, tso->stack_size));
1958 #endif    
1959   return tso;
1960 }
1961
1962 #if defined(PAR)
1963 /* RFP:
1964    all parallel thread creation calls should fall through the following routine.
1965 */
1966 StgTSO *
1967 createSparkThread(rtsSpark spark) 
1968 { StgTSO *tso;
1969   ASSERT(spark != (rtsSpark)NULL);
1970   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1971   { threadsIgnored++;
1972     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1973           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1974     return END_TSO_QUEUE;
1975   }
1976   else
1977   { threadsCreated++;
1978     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1979     if (tso==END_TSO_QUEUE)     
1980       barf("createSparkThread: Cannot create TSO");
1981 #if defined(DIST)
1982     tso->priority = AdvisoryPriority;
1983 #endif
1984     pushClosure(tso,spark);
1985     PUSH_ON_RUN_QUEUE(tso);
1986     advisory_thread_count++;    
1987   }
1988   return tso;
1989 }
1990 #endif
1991
1992 /*
1993   Turn a spark into a thread.
1994   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1995 */
1996 #if defined(PAR)
1997 //@cindex activateSpark
1998 StgTSO *
1999 activateSpark (rtsSpark spark) 
2000 {
2001   StgTSO *tso;
2002
2003   tso = createSparkThread(spark);
2004   if (RtsFlags.ParFlags.ParStats.Full) {   
2005     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2006     IF_PAR_DEBUG(verbose,
2007                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2008                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2009   }
2010   // ToDo: fwd info on local/global spark to thread -- HWL
2011   // tso->gran.exported =  spark->exported;
2012   // tso->gran.locked =   !spark->global;
2013   // tso->gran.sparkname = spark->name;
2014
2015   return tso;
2016 }
2017 #endif
2018
2019 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
2020 #if defined(THREADED_RTS)
2021                                    , rtsBool blockWaiting
2022 #endif
2023                                    );
2024
2025
2026 /* ---------------------------------------------------------------------------
2027  * scheduleThread()
2028  *
2029  * scheduleThread puts a thread on the head of the runnable queue.
2030  * This will usually be done immediately after a thread is created.
2031  * The caller of scheduleThread must create the thread using e.g.
2032  * createThread and push an appropriate closure
2033  * on this thread's stack before the scheduler is invoked.
2034  * ------------------------------------------------------------------------ */
2035
2036 static void scheduleThread_ (StgTSO* tso);
2037
2038 void
2039 scheduleThread_(StgTSO *tso)
2040 {
2041   // Precondition: sched_mutex must be held.
2042
2043   /* Put the new thread on the head of the runnable queue.  The caller
2044    * better push an appropriate closure on this thread's stack
2045    * beforehand.  In the SMP case, the thread may start running as
2046    * soon as we release the scheduler lock below.
2047    */
2048   PUSH_ON_RUN_QUEUE(tso);
2049   THREAD_RUNNABLE();
2050
2051 #if 0
2052   IF_DEBUG(scheduler,printTSO(tso));
2053 #endif
2054 }
2055
2056 void scheduleThread(StgTSO* tso)
2057 {
2058   ACQUIRE_LOCK(&sched_mutex);
2059   scheduleThread_(tso);
2060   RELEASE_LOCK(&sched_mutex);
2061 }
2062
2063 SchedulerStatus
2064 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
2065 {       // Precondition: sched_mutex must be held
2066   StgMainThread *m;
2067
2068   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2069   m->tso = tso;
2070   m->ret = ret;
2071   m->stat = NoStatus;
2072 #if defined(RTS_SUPPORTS_THREADS)
2073   initCondition(&m->wakeup);
2074 #endif
2075
2076   /* Put the thread on the main-threads list prior to scheduling the TSO.
2077      Failure to do so introduces a race condition in the MT case (as
2078      identified by Wolfgang Thaller), whereby the new task/OS thread 
2079      created by scheduleThread_() would complete prior to the thread
2080      that spawned it managed to put 'itself' on the main-threads list.
2081      The upshot of it all being that the worker thread wouldn't get to
2082      signal the completion of the its work item for the main thread to
2083      see (==> it got stuck waiting.)    -- sof 6/02.
2084   */
2085   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2086   
2087   m->link = main_threads;
2088   main_threads = m;
2089
2090   scheduleThread_(tso);
2091 #if defined(THREADED_RTS)
2092   return waitThread_(m, rtsTrue);
2093 #else
2094   return waitThread_(m);
2095 #endif
2096 }
2097
2098 /* ---------------------------------------------------------------------------
2099  * initScheduler()
2100  *
2101  * Initialise the scheduler.  This resets all the queues - if the
2102  * queues contained any threads, they'll be garbage collected at the
2103  * next pass.
2104  *
2105  * ------------------------------------------------------------------------ */
2106
2107 #ifdef SMP
2108 static void
2109 term_handler(int sig STG_UNUSED)
2110 {
2111   stat_workerStop();
2112   ACQUIRE_LOCK(&term_mutex);
2113   await_death--;
2114   RELEASE_LOCK(&term_mutex);
2115   shutdownThread();
2116 }
2117 #endif
2118
2119 void 
2120 initScheduler(void)
2121 {
2122 #if defined(GRAN)
2123   nat i;
2124
2125   for (i=0; i<=MAX_PROC; i++) {
2126     run_queue_hds[i]      = END_TSO_QUEUE;
2127     run_queue_tls[i]      = END_TSO_QUEUE;
2128     blocked_queue_hds[i]  = END_TSO_QUEUE;
2129     blocked_queue_tls[i]  = END_TSO_QUEUE;
2130     ccalling_threadss[i]  = END_TSO_QUEUE;
2131     sleeping_queue        = END_TSO_QUEUE;
2132   }
2133 #else
2134   run_queue_hd      = END_TSO_QUEUE;
2135   run_queue_tl      = END_TSO_QUEUE;
2136   blocked_queue_hd  = END_TSO_QUEUE;
2137   blocked_queue_tl  = END_TSO_QUEUE;
2138   sleeping_queue    = END_TSO_QUEUE;
2139 #endif 
2140
2141   suspended_ccalling_threads  = END_TSO_QUEUE;
2142
2143   main_threads = NULL;
2144   all_threads  = END_TSO_QUEUE;
2145
2146   context_switch = 0;
2147   interrupted    = 0;
2148
2149   RtsFlags.ConcFlags.ctxtSwitchTicks =
2150       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2151       
2152 #if defined(RTS_SUPPORTS_THREADS)
2153   /* Initialise the mutex and condition variables used by
2154    * the scheduler. */
2155   initMutex(&sched_mutex);
2156   initMutex(&term_mutex);
2157   initMutex(&thread_id_mutex);
2158
2159   initCondition(&thread_ready_cond);
2160 #endif
2161   
2162 #if defined(SMP)
2163   initCondition(&gc_pending_cond);
2164 #endif
2165
2166 #if defined(RTS_SUPPORTS_THREADS)
2167   ACQUIRE_LOCK(&sched_mutex);
2168 #endif
2169
2170   /* Install the SIGHUP handler */
2171 #if defined(SMP)
2172   {
2173     struct sigaction action,oact;
2174
2175     action.sa_handler = term_handler;
2176     sigemptyset(&action.sa_mask);
2177     action.sa_flags = 0;
2178     if (sigaction(SIGTERM, &action, &oact) != 0) {
2179       barf("can't install TERM handler");
2180     }
2181   }
2182 #endif
2183
2184   /* A capability holds the state a native thread needs in
2185    * order to execute STG code. At least one capability is
2186    * floating around (only SMP builds have more than one).
2187    */
2188   initCapabilities();
2189   
2190 #if defined(RTS_SUPPORTS_THREADS)
2191     /* start our haskell execution tasks */
2192 # if defined(SMP)
2193     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2194 # else
2195     startTaskManager(0,taskStart);
2196 # endif
2197 #endif
2198
2199 #if /* defined(SMP) ||*/ defined(PAR)
2200   initSparkPools();
2201 #endif
2202
2203 #if defined(RTS_SUPPORTS_THREADS)
2204   RELEASE_LOCK(&sched_mutex);
2205 #endif
2206
2207 }
2208
2209 void
2210 exitScheduler( void )
2211 {
2212 #if defined(RTS_SUPPORTS_THREADS)
2213   stopTaskManager();
2214 #endif
2215   shutting_down_scheduler = rtsTrue;
2216 }
2217
2218 /* -----------------------------------------------------------------------------
2219    Managing the per-task allocation areas.
2220    
2221    Each capability comes with an allocation area.  These are
2222    fixed-length block lists into which allocation can be done.
2223
2224    ToDo: no support for two-space collection at the moment???
2225    -------------------------------------------------------------------------- */
2226
2227 /* -----------------------------------------------------------------------------
2228  * waitThread is the external interface for running a new computation
2229  * and waiting for the result.
2230  *
2231  * In the non-SMP case, we create a new main thread, push it on the 
2232  * main-thread stack, and invoke the scheduler to run it.  The
2233  * scheduler will return when the top main thread on the stack has
2234  * completed or died, and fill in the necessary fields of the
2235  * main_thread structure.
2236  *
2237  * In the SMP case, we create a main thread as before, but we then
2238  * create a new condition variable and sleep on it.  When our new
2239  * main thread has completed, we'll be woken up and the status/result
2240  * will be in the main_thread struct.
2241  * -------------------------------------------------------------------------- */
2242
2243 int 
2244 howManyThreadsAvail ( void )
2245 {
2246    int i = 0;
2247    StgTSO* q;
2248    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2249       i++;
2250    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2251       i++;
2252    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2253       i++;
2254    return i;
2255 }
2256
2257 void
2258 finishAllThreads ( void )
2259 {
2260    do {
2261       while (run_queue_hd != END_TSO_QUEUE) {
2262          waitThread ( run_queue_hd, NULL);
2263       }
2264       while (blocked_queue_hd != END_TSO_QUEUE) {
2265          waitThread ( blocked_queue_hd, NULL);
2266       }
2267       while (sleeping_queue != END_TSO_QUEUE) {
2268          waitThread ( blocked_queue_hd, NULL);
2269       }
2270    } while 
2271       (blocked_queue_hd != END_TSO_QUEUE || 
2272        run_queue_hd     != END_TSO_QUEUE ||
2273        sleeping_queue   != END_TSO_QUEUE);
2274 }
2275
2276 SchedulerStatus
2277 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2278
2279   StgMainThread *m;
2280   SchedulerStatus stat;
2281
2282   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2283   m->tso = tso;
2284   m->ret = ret;
2285   m->stat = NoStatus;
2286 #if defined(RTS_SUPPORTS_THREADS)
2287   initCondition(&m->wakeup);
2288 #endif
2289
2290   /* see scheduleWaitThread() comment */
2291   ACQUIRE_LOCK(&sched_mutex);
2292   m->link = main_threads;
2293   main_threads = m;
2294
2295   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2296 #if defined(THREADED_RTS)
2297   stat = waitThread_(m, rtsFalse);
2298 #else
2299   stat = waitThread_(m);
2300 #endif
2301   RELEASE_LOCK(&sched_mutex);
2302   return stat;
2303 }
2304
2305 static
2306 SchedulerStatus
2307 waitThread_(StgMainThread* m
2308 #if defined(THREADED_RTS)
2309             , rtsBool blockWaiting
2310 #endif
2311            )
2312 {
2313   SchedulerStatus stat;
2314
2315   // Precondition: sched_mutex must be held.
2316   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2317
2318 #if defined(RTS_SUPPORTS_THREADS)
2319
2320 # if defined(THREADED_RTS)
2321   if (!blockWaiting) {
2322     /* In the threaded case, the OS thread that called main()
2323      * gets to enter the RTS directly without going via another
2324      * task/thread.
2325      */
2326     main_main_thread = m;
2327     RELEASE_LOCK(&sched_mutex);
2328     schedule();
2329     ACQUIRE_LOCK(&sched_mutex);
2330     main_main_thread = NULL;
2331     ASSERT(m->stat != NoStatus);
2332   } else 
2333 # endif
2334   {
2335     do {
2336       waitCondition(&m->wakeup, &sched_mutex);
2337     } while (m->stat == NoStatus);
2338   }
2339 #elif defined(GRAN)
2340   /* GranSim specific init */
2341   CurrentTSO = m->tso;                // the TSO to run
2342   procStatus[MainProc] = Busy;        // status of main PE
2343   CurrentProc = MainProc;             // PE to run it on
2344
2345   RELEASE_LOCK(&sched_mutex);
2346   schedule();
2347 #else
2348   RELEASE_LOCK(&sched_mutex);
2349   schedule();
2350   ASSERT(m->stat != NoStatus);
2351 #endif
2352
2353   stat = m->stat;
2354
2355 #if defined(RTS_SUPPORTS_THREADS)
2356   closeCondition(&m->wakeup);
2357 #endif
2358
2359   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2360                               m->tso->id));
2361   stgFree(m);
2362
2363   // Postcondition: sched_mutex still held
2364   return stat;
2365 }
2366
2367 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2368 //@subsection Run queue code 
2369
2370 #if 0
2371 /* 
2372    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2373        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2374        implicit global variable that has to be correct when calling these
2375        fcts -- HWL 
2376 */
2377
2378 /* Put the new thread on the head of the runnable queue.
2379  * The caller of createThread better push an appropriate closure
2380  * on this thread's stack before the scheduler is invoked.
2381  */
2382 static /* inline */ void
2383 add_to_run_queue(tso)
2384 StgTSO* tso; 
2385 {
2386   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2387   tso->link = run_queue_hd;
2388   run_queue_hd = tso;
2389   if (run_queue_tl == END_TSO_QUEUE) {
2390     run_queue_tl = tso;
2391   }
2392 }
2393
2394 /* Put the new thread at the end of the runnable queue. */
2395 static /* inline */ void
2396 push_on_run_queue(tso)
2397 StgTSO* tso; 
2398 {
2399   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2400   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2401   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2402   if (run_queue_hd == END_TSO_QUEUE) {
2403     run_queue_hd = tso;
2404   } else {
2405     run_queue_tl->link = tso;
2406   }
2407   run_queue_tl = tso;
2408 }
2409
2410 /* 
2411    Should be inlined because it's used very often in schedule.  The tso
2412    argument is actually only needed in GranSim, where we want to have the
2413    possibility to schedule *any* TSO on the run queue, irrespective of the
2414    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2415    the run queue and dequeue the tso, adjusting the links in the queue. 
2416 */
2417 //@cindex take_off_run_queue
2418 static /* inline */ StgTSO*
2419 take_off_run_queue(StgTSO *tso) {
2420   StgTSO *t, *prev;
2421
2422   /* 
2423      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2424
2425      if tso is specified, unlink that tso from the run_queue (doesn't have
2426      to be at the beginning of the queue); GranSim only 
2427   */
2428   if (tso!=END_TSO_QUEUE) {
2429     /* find tso in queue */
2430     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2431          t!=END_TSO_QUEUE && t!=tso;
2432          prev=t, t=t->link) 
2433       /* nothing */ ;
2434     ASSERT(t==tso);
2435     /* now actually dequeue the tso */
2436     if (prev!=END_TSO_QUEUE) {
2437       ASSERT(run_queue_hd!=t);
2438       prev->link = t->link;
2439     } else {
2440       /* t is at beginning of thread queue */
2441       ASSERT(run_queue_hd==t);
2442       run_queue_hd = t->link;
2443     }
2444     /* t is at end of thread queue */
2445     if (t->link==END_TSO_QUEUE) {
2446       ASSERT(t==run_queue_tl);
2447       run_queue_tl = prev;
2448     } else {
2449       ASSERT(run_queue_tl!=t);
2450     }
2451     t->link = END_TSO_QUEUE;
2452   } else {
2453     /* take tso from the beginning of the queue; std concurrent code */
2454     t = run_queue_hd;
2455     if (t != END_TSO_QUEUE) {
2456       run_queue_hd = t->link;
2457       t->link = END_TSO_QUEUE;
2458       if (run_queue_hd == END_TSO_QUEUE) {
2459         run_queue_tl = END_TSO_QUEUE;
2460       }
2461     }
2462   }
2463   return t;
2464 }
2465
2466 #endif /* 0 */
2467
2468 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2469 //@subsection Garbage Collextion Routines
2470
2471 /* ---------------------------------------------------------------------------
2472    Where are the roots that we know about?
2473
2474         - all the threads on the runnable queue
2475         - all the threads on the blocked queue
2476         - all the threads on the sleeping queue
2477         - all the thread currently executing a _ccall_GC
2478         - all the "main threads"
2479      
2480    ------------------------------------------------------------------------ */
2481
2482 /* This has to be protected either by the scheduler monitor, or by the
2483         garbage collection monitor (probably the latter).
2484         KH @ 25/10/99
2485 */
2486
2487 void
2488 GetRoots(evac_fn evac)
2489 {
2490 #if defined(GRAN)
2491   {
2492     nat i;
2493     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2494       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2495           evac((StgClosure **)&run_queue_hds[i]);
2496       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2497           evac((StgClosure **)&run_queue_tls[i]);
2498       
2499       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2500           evac((StgClosure **)&blocked_queue_hds[i]);
2501       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2502           evac((StgClosure **)&blocked_queue_tls[i]);
2503       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2504           evac((StgClosure **)&ccalling_threads[i]);
2505     }
2506   }
2507
2508   markEventQueue();
2509
2510 #else /* !GRAN */
2511   if (run_queue_hd != END_TSO_QUEUE) {
2512       ASSERT(run_queue_tl != END_TSO_QUEUE);
2513       evac((StgClosure **)&run_queue_hd);
2514       evac((StgClosure **)&run_queue_tl);
2515   }
2516   
2517   if (blocked_queue_hd != END_TSO_QUEUE) {
2518       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2519       evac((StgClosure **)&blocked_queue_hd);
2520       evac((StgClosure **)&blocked_queue_tl);
2521   }
2522   
2523   if (sleeping_queue != END_TSO_QUEUE) {
2524       evac((StgClosure **)&sleeping_queue);
2525   }
2526 #endif 
2527
2528   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2529       evac((StgClosure **)&suspended_ccalling_threads);
2530   }
2531
2532 #if defined(PAR) || defined(GRAN)
2533   markSparkQueue(evac);
2534 #endif
2535
2536 #if defined(RTS_USER_SIGNALS)
2537   // mark the signal handlers (signals should be already blocked)
2538   markSignalHandlers(evac);
2539 #endif
2540
2541   // main threads which have completed need to be retained until they
2542   // are dealt with in the main scheduler loop.  They won't be
2543   // retained any other way: the GC will drop them from the
2544   // all_threads list, so we have to be careful to treat them as roots
2545   // here.
2546   { 
2547       StgMainThread *m;
2548       for (m = main_threads; m != NULL; m = m->link) {
2549           switch (m->tso->what_next) {
2550           case ThreadComplete:
2551           case ThreadKilled:
2552               evac((StgClosure **)&m->tso);
2553               break;
2554           default:
2555               break;
2556           }
2557       }
2558   }
2559 }
2560
2561 /* -----------------------------------------------------------------------------
2562    performGC
2563
2564    This is the interface to the garbage collector from Haskell land.
2565    We provide this so that external C code can allocate and garbage
2566    collect when called from Haskell via _ccall_GC.
2567
2568    It might be useful to provide an interface whereby the programmer
2569    can specify more roots (ToDo).
2570    
2571    This needs to be protected by the GC condition variable above.  KH.
2572    -------------------------------------------------------------------------- */
2573
2574 static void (*extra_roots)(evac_fn);
2575
2576 void
2577 performGC(void)
2578 {
2579   /* Obligated to hold this lock upon entry */
2580   ACQUIRE_LOCK(&sched_mutex);
2581   GarbageCollect(GetRoots,rtsFalse);
2582   RELEASE_LOCK(&sched_mutex);
2583 }
2584
2585 void
2586 performMajorGC(void)
2587 {
2588   ACQUIRE_LOCK(&sched_mutex);
2589   GarbageCollect(GetRoots,rtsTrue);
2590   RELEASE_LOCK(&sched_mutex);
2591 }
2592
2593 static void
2594 AllRoots(evac_fn evac)
2595 {
2596     GetRoots(evac);             // the scheduler's roots
2597     extra_roots(evac);          // the user's roots
2598 }
2599
2600 void
2601 performGCWithRoots(void (*get_roots)(evac_fn))
2602 {
2603   ACQUIRE_LOCK(&sched_mutex);
2604   extra_roots = get_roots;
2605   GarbageCollect(AllRoots,rtsFalse);
2606   RELEASE_LOCK(&sched_mutex);
2607 }
2608
2609 /* -----------------------------------------------------------------------------
2610    Stack overflow
2611
2612    If the thread has reached its maximum stack size, then raise the
2613    StackOverflow exception in the offending thread.  Otherwise
2614    relocate the TSO into a larger chunk of memory and adjust its stack
2615    size appropriately.
2616    -------------------------------------------------------------------------- */
2617
2618 static StgTSO *
2619 threadStackOverflow(StgTSO *tso)
2620 {
2621   nat new_stack_size, new_tso_size, stack_words;
2622   StgPtr new_sp;
2623   StgTSO *dest;
2624
2625   IF_DEBUG(sanity,checkTSO(tso));
2626   if (tso->stack_size >= tso->max_stack_size) {
2627
2628     IF_DEBUG(gc,
2629              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2630                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2631              /* If we're debugging, just print out the top of the stack */
2632              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2633                                               tso->sp+64)));
2634
2635     /* Send this thread the StackOverflow exception */
2636     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2637     return tso;
2638   }
2639
2640   /* Try to double the current stack size.  If that takes us over the
2641    * maximum stack size for this thread, then use the maximum instead.
2642    * Finally round up so the TSO ends up as a whole number of blocks.
2643    */
2644   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2645   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2646                                        TSO_STRUCT_SIZE)/sizeof(W_);
2647   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2648   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2649
2650   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2651
2652   dest = (StgTSO *)allocate(new_tso_size);
2653   TICK_ALLOC_TSO(new_stack_size,0);
2654
2655   /* copy the TSO block and the old stack into the new area */
2656   memcpy(dest,tso,TSO_STRUCT_SIZE);
2657   stack_words = tso->stack + tso->stack_size - tso->sp;
2658   new_sp = (P_)dest + new_tso_size - stack_words;
2659   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2660
2661   /* relocate the stack pointers... */
2662   dest->sp         = new_sp;
2663   dest->stack_size = new_stack_size;
2664         
2665   /* Mark the old TSO as relocated.  We have to check for relocated
2666    * TSOs in the garbage collector and any primops that deal with TSOs.
2667    *
2668    * It's important to set the sp value to just beyond the end
2669    * of the stack, so we don't attempt to scavenge any part of the
2670    * dead TSO's stack.
2671    */
2672   tso->what_next = ThreadRelocated;
2673   tso->link = dest;
2674   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2675   tso->why_blocked = NotBlocked;
2676   dest->mut_link = NULL;
2677
2678   IF_PAR_DEBUG(verbose,
2679                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2680                      tso->id, tso, tso->stack_size);
2681                /* If we're debugging, just print out the top of the stack */
2682                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2683                                                 tso->sp+64)));
2684   
2685   IF_DEBUG(sanity,checkTSO(tso));
2686 #if 0
2687   IF_DEBUG(scheduler,printTSO(dest));
2688 #endif
2689
2690   return dest;
2691 }
2692
2693 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2694 //@subsection Blocking Queue Routines
2695
2696 /* ---------------------------------------------------------------------------
2697    Wake up a queue that was blocked on some resource.
2698    ------------------------------------------------------------------------ */
2699
2700 #if defined(GRAN)
2701 static inline void
2702 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2703 {
2704 }
2705 #elif defined(PAR)
2706 static inline void
2707 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2708 {
2709   /* write RESUME events to log file and
2710      update blocked and fetch time (depending on type of the orig closure) */
2711   if (RtsFlags.ParFlags.ParStats.Full) {
2712     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2713                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2714                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2715     if (EMPTY_RUN_QUEUE())
2716       emitSchedule = rtsTrue;
2717
2718     switch (get_itbl(node)->type) {
2719         case FETCH_ME_BQ:
2720           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2721           break;
2722         case RBH:
2723         case FETCH_ME:
2724         case BLACKHOLE_BQ:
2725           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2726           break;
2727 #ifdef DIST
2728         case MVAR:
2729           break;
2730 #endif    
2731         default:
2732           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2733         }
2734       }
2735 }
2736 #endif
2737
2738 #if defined(GRAN)
2739 static StgBlockingQueueElement *
2740 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2741 {
2742     StgTSO *tso;
2743     PEs node_loc, tso_loc;
2744
2745     node_loc = where_is(node); // should be lifted out of loop
2746     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2747     tso_loc = where_is((StgClosure *)tso);
2748     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2749       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2750       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2751       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2752       // insertThread(tso, node_loc);
2753       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2754                 ResumeThread,
2755                 tso, node, (rtsSpark*)NULL);
2756       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2757       // len_local++;
2758       // len++;
2759     } else { // TSO is remote (actually should be FMBQ)
2760       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2761                                   RtsFlags.GranFlags.Costs.gunblocktime +
2762                                   RtsFlags.GranFlags.Costs.latency;
2763       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2764                 UnblockThread,
2765                 tso, node, (rtsSpark*)NULL);
2766       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2767       // len++;
2768     }
2769     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2770     IF_GRAN_DEBUG(bq,
2771                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2772                           (node_loc==tso_loc ? "Local" : "Global"), 
2773                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2774     tso->block_info.closure = NULL;
2775     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2776                              tso->id, tso));
2777 }
2778 #elif defined(PAR)
2779 static StgBlockingQueueElement *
2780 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2781 {
2782     StgBlockingQueueElement *next;
2783
2784     switch (get_itbl(bqe)->type) {
2785     case TSO:
2786       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2787       /* if it's a TSO just push it onto the run_queue */
2788       next = bqe->link;
2789       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2790       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2791       THREAD_RUNNABLE();
2792       unblockCount(bqe, node);
2793       /* reset blocking status after dumping event */
2794       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2795       break;
2796
2797     case BLOCKED_FETCH:
2798       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2799       next = bqe->link;
2800       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2801       PendingFetches = (StgBlockedFetch *)bqe;
2802       break;
2803
2804 # if defined(DEBUG)
2805       /* can ignore this case in a non-debugging setup; 
2806          see comments on RBHSave closures above */
2807     case CONSTR:
2808       /* check that the closure is an RBHSave closure */
2809       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2810              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2811              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2812       break;
2813
2814     default:
2815       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2816            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2817            (StgClosure *)bqe);
2818 # endif
2819     }
2820   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2821   return next;
2822 }
2823
2824 #else /* !GRAN && !PAR */
2825 static StgTSO *
2826 unblockOneLocked(StgTSO *tso)
2827 {
2828   StgTSO *next;
2829
2830   ASSERT(get_itbl(tso)->type == TSO);
2831   ASSERT(tso->why_blocked != NotBlocked);
2832   tso->why_blocked = NotBlocked;
2833   next = tso->link;
2834   PUSH_ON_RUN_QUEUE(tso);
2835   THREAD_RUNNABLE();
2836   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2837   return next;
2838 }
2839 #endif
2840
2841 #if defined(GRAN) || defined(PAR)
2842 inline StgBlockingQueueElement *
2843 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2844 {
2845   ACQUIRE_LOCK(&sched_mutex);
2846   bqe = unblockOneLocked(bqe, node);
2847   RELEASE_LOCK(&sched_mutex);
2848   return bqe;
2849 }
2850 #else
2851 inline StgTSO *
2852 unblockOne(StgTSO *tso)
2853 {
2854   ACQUIRE_LOCK(&sched_mutex);
2855   tso = unblockOneLocked(tso);
2856   RELEASE_LOCK(&sched_mutex);
2857   return tso;
2858 }
2859 #endif
2860
2861 #if defined(GRAN)
2862 void 
2863 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2864 {
2865   StgBlockingQueueElement *bqe;
2866   PEs node_loc;
2867   nat len = 0; 
2868
2869   IF_GRAN_DEBUG(bq, 
2870                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2871                       node, CurrentProc, CurrentTime[CurrentProc], 
2872                       CurrentTSO->id, CurrentTSO));
2873
2874   node_loc = where_is(node);
2875
2876   ASSERT(q == END_BQ_QUEUE ||
2877          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2878          get_itbl(q)->type == CONSTR); // closure (type constructor)
2879   ASSERT(is_unique(node));
2880
2881   /* FAKE FETCH: magically copy the node to the tso's proc;
2882      no Fetch necessary because in reality the node should not have been 
2883      moved to the other PE in the first place
2884   */
2885   if (CurrentProc!=node_loc) {
2886     IF_GRAN_DEBUG(bq, 
2887                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2888                         node, node_loc, CurrentProc, CurrentTSO->id, 
2889                         // CurrentTSO, where_is(CurrentTSO),
2890                         node->header.gran.procs));
2891     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2892     IF_GRAN_DEBUG(bq, 
2893                   belch("## new bitmask of node %p is %#x",
2894                         node, node->header.gran.procs));
2895     if (RtsFlags.GranFlags.GranSimStats.Global) {
2896       globalGranStats.tot_fake_fetches++;
2897     }
2898   }
2899
2900   bqe = q;
2901   // ToDo: check: ASSERT(CurrentProc==node_loc);
2902   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2903     //next = bqe->link;
2904     /* 
2905        bqe points to the current element in the queue
2906        next points to the next element in the queue
2907     */
2908     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2909     //tso_loc = where_is(tso);
2910     len++;
2911     bqe = unblockOneLocked(bqe, node);
2912   }
2913
2914   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2915      the closure to make room for the anchor of the BQ */
2916   if (bqe!=END_BQ_QUEUE) {
2917     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2918     /*
2919     ASSERT((info_ptr==&RBH_Save_0_info) ||
2920            (info_ptr==&RBH_Save_1_info) ||
2921            (info_ptr==&RBH_Save_2_info));
2922     */
2923     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2924     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2925     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2926
2927     IF_GRAN_DEBUG(bq,
2928                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2929                         node, info_type(node)));
2930   }
2931
2932   /* statistics gathering */
2933   if (RtsFlags.GranFlags.GranSimStats.Global) {
2934     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2935     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2936     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2937     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2938   }
2939   IF_GRAN_DEBUG(bq,
2940                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2941                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2942 }
2943 #elif defined(PAR)
2944 void 
2945 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2946 {
2947   StgBlockingQueueElement *bqe;
2948
2949   ACQUIRE_LOCK(&sched_mutex);
2950
2951   IF_PAR_DEBUG(verbose, 
2952                belch("##-_ AwBQ for node %p on [%x]: ",
2953                      node, mytid));
2954 #ifdef DIST  
2955   //RFP
2956   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2957     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2958     return;
2959   }
2960 #endif
2961   
2962   ASSERT(q == END_BQ_QUEUE ||
2963          get_itbl(q)->type == TSO ||           
2964          get_itbl(q)->type == BLOCKED_FETCH || 
2965          get_itbl(q)->type == CONSTR); 
2966
2967   bqe = q;
2968   while (get_itbl(bqe)->type==TSO || 
2969          get_itbl(bqe)->type==BLOCKED_FETCH) {
2970     bqe = unblockOneLocked(bqe, node);
2971   }
2972   RELEASE_LOCK(&sched_mutex);
2973 }
2974
2975 #else   /* !GRAN && !PAR */
2976
2977 #ifdef RTS_SUPPORTS_THREADS
2978 void
2979 awakenBlockedQueueNoLock(StgTSO *tso)
2980 {
2981   while (tso != END_TSO_QUEUE) {
2982     tso = unblockOneLocked(tso);
2983   }
2984 }
2985 #endif
2986
2987 void
2988 awakenBlockedQueue(StgTSO *tso)
2989 {
2990   ACQUIRE_LOCK(&sched_mutex);
2991   while (tso != END_TSO_QUEUE) {
2992     tso = unblockOneLocked(tso);
2993   }
2994   RELEASE_LOCK(&sched_mutex);
2995 }
2996 #endif
2997
2998 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2999 //@subsection Exception Handling Routines
3000
3001 /* ---------------------------------------------------------------------------
3002    Interrupt execution
3003    - usually called inside a signal handler so it mustn't do anything fancy.   
3004    ------------------------------------------------------------------------ */
3005
3006 void
3007 interruptStgRts(void)
3008 {
3009     interrupted    = 1;
3010     context_switch = 1;
3011 }
3012
3013 /* -----------------------------------------------------------------------------
3014    Unblock a thread
3015
3016    This is for use when we raise an exception in another thread, which
3017    may be blocked.
3018    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3019    -------------------------------------------------------------------------- */
3020
3021 #if defined(GRAN) || defined(PAR)
3022 /*
3023   NB: only the type of the blocking queue is different in GranSim and GUM
3024       the operations on the queue-elements are the same
3025       long live polymorphism!
3026
3027   Locks: sched_mutex is held upon entry and exit.
3028
3029 */
3030 static void
3031 unblockThread(StgTSO *tso)
3032 {
3033   StgBlockingQueueElement *t, **last;
3034
3035   switch (tso->why_blocked) {
3036
3037   case NotBlocked:
3038     return;  /* not blocked */
3039
3040   case BlockedOnMVar:
3041     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3042     {
3043       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3044       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3045
3046       last = (StgBlockingQueueElement **)&mvar->head;
3047       for (t = (StgBlockingQueueElement *)mvar->head; 
3048            t != END_BQ_QUEUE; 
3049            last = &t->link, last_tso = t, t = t->link) {
3050         if (t == (StgBlockingQueueElement *)tso) {
3051           *last = (StgBlockingQueueElement *)tso->link;
3052           if (mvar->tail == tso) {
3053             mvar->tail = (StgTSO *)last_tso;
3054           }
3055           goto done;
3056         }
3057       }
3058       barf("unblockThread (MVAR): TSO not found");
3059     }
3060
3061   case BlockedOnBlackHole:
3062     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3063     {
3064       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3065
3066       last = &bq->blocking_queue;
3067       for (t = bq->blocking_queue; 
3068            t != END_BQ_QUEUE; 
3069            last = &t->link, t = t->link) {
3070         if (t == (StgBlockingQueueElement *)tso) {
3071           *last = (StgBlockingQueueElement *)tso->link;
3072           goto done;
3073         }
3074       }
3075       barf("unblockThread (BLACKHOLE): TSO not found");
3076     }
3077
3078   case BlockedOnException:
3079     {
3080       StgTSO *target  = tso->block_info.tso;
3081
3082       ASSERT(get_itbl(target)->type == TSO);
3083
3084       if (target->what_next == ThreadRelocated) {
3085           target = target->link;
3086           ASSERT(get_itbl(target)->type == TSO);
3087       }
3088
3089       ASSERT(target->blocked_exceptions != NULL);
3090
3091       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3092       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3093            t != END_BQ_QUEUE; 
3094            last = &t->link, t = t->link) {
3095         ASSERT(get_itbl(t)->type == TSO);
3096         if (t == (StgBlockingQueueElement *)tso) {
3097           *last = (StgBlockingQueueElement *)tso->link;
3098           goto done;
3099         }
3100       }
3101       barf("unblockThread (Exception): TSO not found");
3102     }
3103
3104   case BlockedOnRead:
3105   case BlockedOnWrite:
3106     {
3107       /* take TSO off blocked_queue */
3108       StgBlockingQueueElement *prev = NULL;
3109       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3110            prev = t, t = t->link) {
3111         if (t == (StgBlockingQueueElement *)tso) {
3112           if (prev == NULL) {
3113             blocked_queue_hd = (StgTSO *)t->link;
3114             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3115               blocked_queue_tl = END_TSO_QUEUE;
3116             }
3117           } else {
3118             prev->link = t->link;
3119             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3120               blocked_queue_tl = (StgTSO *)prev;
3121             }
3122           }
3123           goto done;
3124         }
3125       }
3126       barf("unblockThread (I/O): TSO not found");
3127     }
3128
3129   case BlockedOnDelay:
3130     {
3131       /* take TSO off sleeping_queue */
3132       StgBlockingQueueElement *prev = NULL;
3133       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3134            prev = t, t = t->link) {
3135         if (t == (StgBlockingQueueElement *)tso) {
3136           if (prev == NULL) {
3137             sleeping_queue = (StgTSO *)t->link;
3138           } else {
3139             prev->link = t->link;
3140           }
3141           goto done;
3142         }
3143       }
3144       barf("unblockThread (I/O): TSO not found");
3145     }
3146
3147   default:
3148     barf("unblockThread");
3149   }
3150
3151  done:
3152   tso->link = END_TSO_QUEUE;
3153   tso->why_blocked = NotBlocked;
3154   tso->block_info.closure = NULL;
3155   PUSH_ON_RUN_QUEUE(tso);
3156 }
3157 #else
3158 static void
3159 unblockThread(StgTSO *tso)
3160 {
3161   StgTSO *t, **last;
3162   
3163   /* To avoid locking unnecessarily. */
3164   if (tso->why_blocked == NotBlocked) {
3165     return;
3166   }
3167
3168   switch (tso->why_blocked) {
3169
3170   case BlockedOnMVar:
3171     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3172     {
3173       StgTSO *last_tso = END_TSO_QUEUE;
3174       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3175
3176       last = &mvar->head;
3177       for (t = mvar->head; t != END_TSO_QUEUE; 
3178            last = &t->link, last_tso = t, t = t->link) {
3179         if (t == tso) {
3180           *last = tso->link;
3181           if (mvar->tail == tso) {
3182             mvar->tail = last_tso;
3183           }
3184           goto done;
3185         }
3186       }
3187       barf("unblockThread (MVAR): TSO not found");
3188     }
3189
3190   case BlockedOnBlackHole:
3191     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3192     {
3193       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3194
3195       last = &bq->blocking_queue;
3196       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3197            last = &t->link, t = t->link) {
3198         if (t == tso) {
3199           *last = tso->link;
3200           goto done;
3201         }
3202       }
3203       barf("unblockThread (BLACKHOLE): TSO not found");
3204     }
3205
3206   case BlockedOnException:
3207     {
3208       StgTSO *target  = tso->block_info.tso;
3209
3210       ASSERT(get_itbl(target)->type == TSO);
3211
3212       while (target->what_next == ThreadRelocated) {
3213           target = target->link;
3214           ASSERT(get_itbl(target)->type == TSO);
3215       }
3216       
3217       ASSERT(target->blocked_exceptions != NULL);
3218
3219       last = &target->blocked_exceptions;
3220       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3221            last = &t->link, t = t->link) {
3222         ASSERT(get_itbl(t)->type == TSO);
3223         if (t == tso) {
3224           *last = tso->link;
3225           goto done;
3226         }
3227       }
3228       barf("unblockThread (Exception): TSO not found");
3229     }
3230
3231   case BlockedOnRead:
3232   case BlockedOnWrite:
3233     {
3234       StgTSO *prev = NULL;
3235       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3236            prev = t, t = t->link) {
3237         if (t == tso) {
3238           if (prev == NULL) {
3239             blocked_queue_hd = t->link;
3240             if (blocked_queue_tl == t) {
3241               blocked_queue_tl = END_TSO_QUEUE;
3242             }
3243           } else {
3244             prev->link = t->link;
3245             if (blocked_queue_tl == t) {
3246               blocked_queue_tl = prev;
3247             }
3248           }
3249           goto done;
3250         }
3251       }
3252       barf("unblockThread (I/O): TSO not found");
3253     }
3254
3255   case BlockedOnDelay:
3256     {
3257       StgTSO *prev = NULL;
3258       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3259            prev = t, t = t->link) {
3260         if (t == tso) {
3261           if (prev == NULL) {
3262             sleeping_queue = t->link;
3263           } else {
3264             prev->link = t->link;
3265           }
3266           goto done;
3267         }
3268       }
3269       barf("unblockThread (I/O): TSO not found");
3270     }
3271
3272   default:
3273     barf("unblockThread");
3274   }
3275
3276  done:
3277   tso->link = END_TSO_QUEUE;
3278   tso->why_blocked = NotBlocked;
3279   tso->block_info.closure = NULL;
3280   PUSH_ON_RUN_QUEUE(tso);
3281 }
3282 #endif
3283
3284 /* -----------------------------------------------------------------------------
3285  * raiseAsync()
3286  *
3287  * The following function implements the magic for raising an
3288  * asynchronous exception in an existing thread.
3289  *
3290  * We first remove the thread from any queue on which it might be
3291  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3292  *
3293  * We strip the stack down to the innermost CATCH_FRAME, building
3294  * thunks in the heap for all the active computations, so they can 
3295  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3296  * an application of the handler to the exception, and push it on
3297  * the top of the stack.
3298  * 
3299  * How exactly do we save all the active computations?  We create an
3300  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3301  * AP_STACKs pushes everything from the corresponding update frame
3302  * upwards onto the stack.  (Actually, it pushes everything up to the
3303  * next update frame plus a pointer to the next AP_STACK object.
3304  * Entering the next AP_STACK object pushes more onto the stack until we
3305  * reach the last AP_STACK object - at which point the stack should look
3306  * exactly as it did when we killed the TSO and we can continue
3307  * execution by entering the closure on top of the stack.
3308  *
3309  * We can also kill a thread entirely - this happens if either (a) the 
3310  * exception passed to raiseAsync is NULL, or (b) there's no
3311  * CATCH_FRAME on the stack.  In either case, we strip the entire
3312  * stack and replace the thread with a zombie.
3313  *
3314  * Locks: sched_mutex held upon entry nor exit.
3315  *
3316  * -------------------------------------------------------------------------- */
3317  
3318 void 
3319 deleteThread(StgTSO *tso)
3320 {
3321   raiseAsync(tso,NULL);
3322 }
3323
3324 void
3325 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3326 {
3327   /* When raising async exs from contexts where sched_mutex isn't held;
3328      use raiseAsyncWithLock(). */
3329   ACQUIRE_LOCK(&sched_mutex);
3330   raiseAsync(tso,exception);
3331   RELEASE_LOCK(&sched_mutex);
3332 }
3333
3334 void
3335 raiseAsync(StgTSO *tso, StgClosure *exception)
3336 {
3337     StgRetInfoTable *info;
3338     StgPtr sp;
3339   
3340     // Thread already dead?
3341     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3342         return;
3343     }
3344
3345     IF_DEBUG(scheduler, 
3346              sched_belch("raising exception in thread %ld.", tso->id));
3347     
3348     // Remove it from any blocking queues
3349     unblockThread(tso);
3350
3351     sp = tso->sp;
3352     
3353     // The stack freezing code assumes there's a closure pointer on
3354     // the top of the stack, so we have to arrange that this is the case...
3355     //
3356     if (sp[0] == (W_)&stg_enter_info) {
3357         sp++;
3358     } else {
3359         sp--;
3360         sp[0] = (W_)&stg_dummy_ret_closure;
3361     }
3362
3363     while (1) {
3364         nat i;
3365
3366         // 1. Let the top of the stack be the "current closure"
3367         //
3368         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3369         // CATCH_FRAME.
3370         //
3371         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3372         // current closure applied to the chunk of stack up to (but not
3373         // including) the update frame.  This closure becomes the "current
3374         // closure".  Go back to step 2.
3375         //
3376         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3377         // top of the stack applied to the exception.
3378         // 
3379         // 5. If it's a STOP_FRAME, then kill the thread.
3380         
3381         StgPtr frame;
3382         
3383         frame = sp + 1;
3384         info = get_ret_itbl((StgClosure *)frame);
3385         
3386         while (info->i.type != UPDATE_FRAME
3387                && (info->i.type != CATCH_FRAME || exception == NULL)
3388                && info->i.type != STOP_FRAME) {
3389             frame += stack_frame_sizeW((StgClosure *)frame);
3390             info = get_ret_itbl((StgClosure *)frame);
3391         }
3392         
3393         switch (info->i.type) {
3394             
3395         case CATCH_FRAME:
3396             // If we find a CATCH_FRAME, and we've got an exception to raise,
3397             // then build the THUNK raise(exception), and leave it on
3398             // top of the CATCH_FRAME ready to enter.
3399             //
3400         {
3401 #ifdef PROFILING
3402             StgCatchFrame *cf = (StgCatchFrame *)frame;
3403 #endif
3404             StgClosure *raise;
3405             
3406             // we've got an exception to raise, so let's pass it to the
3407             // handler in this frame.
3408             //
3409             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3410             TICK_ALLOC_SE_THK(1,0);
3411             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3412             raise->payload[0] = exception;
3413             
3414             // throw away the stack from Sp up to the CATCH_FRAME.
3415             //
3416             sp = frame - 1;
3417             
3418             /* Ensure that async excpetions are blocked now, so we don't get
3419              * a surprise exception before we get around to executing the
3420              * handler.
3421              */
3422             if (tso->blocked_exceptions == NULL) {
3423                 tso->blocked_exceptions = END_TSO_QUEUE;
3424             }
3425             
3426             /* Put the newly-built THUNK on top of the stack, ready to execute
3427              * when the thread restarts.
3428              */
3429             sp[0] = (W_)raise;
3430             sp[-1] = (W_)&stg_enter_info;
3431             tso->sp = sp-1;
3432             tso->what_next = ThreadRunGHC;
3433             IF_DEBUG(sanity, checkTSO(tso));
3434             return;
3435         }
3436         
3437         case UPDATE_FRAME:
3438         {
3439             StgAP_STACK * ap;
3440             nat words;
3441             
3442             // First build an AP_STACK consisting of the stack chunk above the
3443             // current update frame, with the top word on the stack as the
3444             // fun field.
3445             //
3446             words = frame - sp - 1;
3447             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3448             
3449             ap->size = words;
3450             ap->fun  = (StgClosure *)sp[0];
3451             sp++;
3452             for(i=0; i < (nat)words; ++i) {
3453                 ap->payload[i] = (StgClosure *)*sp++;
3454             }
3455             
3456             SET_HDR(ap,&stg_AP_STACK_info,
3457                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3458             TICK_ALLOC_UP_THK(words+1,0);
3459             
3460             IF_DEBUG(scheduler,
3461                      fprintf(stderr,  "scheduler: Updating ");
3462                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3463                      fprintf(stderr,  " with ");
3464                      printObj((StgClosure *)ap);
3465                 );
3466
3467             // Replace the updatee with an indirection - happily
3468             // this will also wake up any threads currently
3469             // waiting on the result.
3470             //
3471             // Warning: if we're in a loop, more than one update frame on
3472             // the stack may point to the same object.  Be careful not to
3473             // overwrite an IND_OLDGEN in this case, because we'll screw
3474             // up the mutable lists.  To be on the safe side, don't
3475             // overwrite any kind of indirection at all.  See also
3476             // threadSqueezeStack in GC.c, where we have to make a similar
3477             // check.
3478             //
3479             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3480                 // revert the black hole
3481                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3482             }
3483             sp += sizeofW(StgUpdateFrame) - 1;
3484             sp[0] = (W_)ap; // push onto stack
3485             break;
3486         }
3487         
3488         case STOP_FRAME:
3489             // We've stripped the entire stack, the thread is now dead.
3490             sp += sizeofW(StgStopFrame);
3491             tso->what_next = ThreadKilled;
3492             tso->sp = sp;
3493             return;
3494             
3495         default:
3496             barf("raiseAsync");
3497         }
3498     }
3499     barf("raiseAsync");
3500 }
3501
3502 /* -----------------------------------------------------------------------------
3503    resurrectThreads is called after garbage collection on the list of
3504    threads found to be garbage.  Each of these threads will be woken
3505    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3506    on an MVar, or NonTermination if the thread was blocked on a Black
3507    Hole.
3508
3509    Locks: sched_mutex isn't held upon entry nor exit.
3510    -------------------------------------------------------------------------- */
3511
3512 void
3513 resurrectThreads( StgTSO *threads )
3514 {
3515   StgTSO *tso, *next;
3516
3517   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3518     next = tso->global_link;
3519     tso->global_link = all_threads;
3520     all_threads = tso;
3521     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3522
3523     switch (tso->why_blocked) {
3524     case BlockedOnMVar:
3525     case BlockedOnException:
3526       /* Called by GC - sched_mutex lock is currently held. */
3527       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3528       break;
3529     case BlockedOnBlackHole:
3530       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3531       break;
3532     case NotBlocked:
3533       /* This might happen if the thread was blocked on a black hole
3534        * belonging to a thread that we've just woken up (raiseAsync
3535        * can wake up threads, remember...).
3536        */
3537       continue;
3538     default:
3539       barf("resurrectThreads: thread blocked in a strange way");
3540     }
3541   }
3542 }
3543
3544 /* -----------------------------------------------------------------------------
3545  * Blackhole detection: if we reach a deadlock, test whether any
3546  * threads are blocked on themselves.  Any threads which are found to
3547  * be self-blocked get sent a NonTermination exception.
3548  *
3549  * This is only done in a deadlock situation in order to avoid
3550  * performance overhead in the normal case.
3551  *
3552  * Locks: sched_mutex is held upon entry and exit.
3553  * -------------------------------------------------------------------------- */
3554
3555 static void
3556 detectBlackHoles( void )
3557 {
3558     StgTSO *tso = all_threads;
3559     StgClosure *frame;
3560     StgClosure *blocked_on;
3561     StgRetInfoTable *info;
3562
3563     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3564
3565         while (tso->what_next == ThreadRelocated) {
3566             tso = tso->link;
3567             ASSERT(get_itbl(tso)->type == TSO);
3568         }
3569       
3570         if (tso->why_blocked != BlockedOnBlackHole) {
3571             continue;
3572         }
3573         blocked_on = tso->block_info.closure;
3574
3575         frame = (StgClosure *)tso->sp;
3576
3577         while(1) {
3578             info = get_ret_itbl(frame);
3579             switch (info->i.type) {
3580             case UPDATE_FRAME:
3581                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3582                     /* We are blocking on one of our own computations, so
3583                      * send this thread the NonTermination exception.  
3584                      */
3585                     IF_DEBUG(scheduler, 
3586                              sched_belch("thread %d is blocked on itself", tso->id));
3587                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3588                     goto done;
3589                 }
3590                 
3591                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3592                 continue;
3593
3594             case STOP_FRAME:
3595                 goto done;
3596
3597                 // normal stack frames; do nothing except advance the pointer
3598             default:
3599                 (StgPtr)frame += stack_frame_sizeW(frame);
3600             }
3601         }   
3602         done: ;
3603     }
3604 }
3605
3606 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3607 //@subsection Debugging Routines
3608
3609 /* -----------------------------------------------------------------------------
3610  * Debugging: why is a thread blocked
3611  * [Also provides useful information when debugging threaded programs
3612  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3613    -------------------------------------------------------------------------- */
3614
3615 static
3616 void
3617 printThreadBlockage(StgTSO *tso)
3618 {
3619   switch (tso->why_blocked) {
3620   case BlockedOnRead:
3621     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3622     break;
3623   case BlockedOnWrite:
3624     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3625     break;
3626   case BlockedOnDelay:
3627     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3628     break;
3629   case BlockedOnMVar:
3630     fprintf(stderr,"is blocked on an MVar");
3631     break;
3632   case BlockedOnException:
3633     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3634             tso->block_info.tso->id);
3635     break;
3636   case BlockedOnBlackHole:
3637     fprintf(stderr,"is blocked on a black hole");
3638     break;
3639   case NotBlocked:
3640     fprintf(stderr,"is not blocked");
3641     break;
3642 #if defined(PAR)
3643   case BlockedOnGA:
3644     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3645             tso->block_info.closure, info_type(tso->block_info.closure));
3646     break;
3647   case BlockedOnGA_NoSend:
3648     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3649             tso->block_info.closure, info_type(tso->block_info.closure));
3650     break;
3651 #endif
3652 #if defined(RTS_SUPPORTS_THREADS)
3653   case BlockedOnCCall:
3654     fprintf(stderr,"is blocked on an external call");
3655     break;
3656   case BlockedOnCCall_NoUnblockExc:
3657     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3658     break;
3659 #endif
3660   default:
3661     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3662          tso->why_blocked, tso->id, tso);
3663   }
3664 }
3665
3666 static
3667 void
3668 printThreadStatus(StgTSO *tso)
3669 {
3670   switch (tso->what_next) {
3671   case ThreadKilled:
3672     fprintf(stderr,"has been killed");
3673     break;
3674   case ThreadComplete:
3675     fprintf(stderr,"has completed");
3676     break;
3677   default:
3678     printThreadBlockage(tso);
3679   }
3680 }
3681
3682 void
3683 printAllThreads(void)
3684 {
3685   StgTSO *t;
3686   void *label;
3687
3688 # if defined(GRAN)
3689   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3690   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3691                        time_string, rtsFalse/*no commas!*/);
3692
3693   fprintf(stderr, "all threads at [%s]:\n", time_string);
3694 # elif defined(PAR)
3695   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3696   ullong_format_string(CURRENT_TIME,
3697                        time_string, rtsFalse/*no commas!*/);
3698
3699   fprintf(stderr,"all threads at [%s]:\n", time_string);
3700 # else
3701   fprintf(stderr,"all threads:\n");
3702 # endif
3703
3704   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3705     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3706     label = lookupThreadLabel((StgWord)t);
3707     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3708     printThreadStatus(t);
3709     fprintf(stderr,"\n");
3710   }
3711 }
3712     
3713 #ifdef DEBUG
3714
3715 /* 
3716    Print a whole blocking queue attached to node (debugging only).
3717 */
3718 //@cindex print_bq
3719 # if defined(PAR)
3720 void 
3721 print_bq (StgClosure *node)
3722 {
3723   StgBlockingQueueElement *bqe;
3724   StgTSO *tso;
3725   rtsBool end;
3726
3727   fprintf(stderr,"## BQ of closure %p (%s): ",
3728           node, info_type(node));
3729
3730   /* should cover all closures that may have a blocking queue */
3731   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3732          get_itbl(node)->type == FETCH_ME_BQ ||
3733          get_itbl(node)->type == RBH ||
3734          get_itbl(node)->type == MVAR);
3735     
3736   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3737
3738   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3739 }
3740
3741 /* 
3742    Print a whole blocking queue starting with the element bqe.
3743 */
3744 void 
3745 print_bqe (StgBlockingQueueElement *bqe)
3746 {
3747   rtsBool end;
3748
3749   /* 
3750      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3751   */
3752   for (end = (bqe==END_BQ_QUEUE);
3753        !end; // iterate until bqe points to a CONSTR
3754        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3755        bqe = end ? END_BQ_QUEUE : bqe->link) {
3756     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3757     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3758     /* types of closures that may appear in a blocking queue */
3759     ASSERT(get_itbl(bqe)->type == TSO ||           
3760            get_itbl(bqe)->type == BLOCKED_FETCH || 
3761            get_itbl(bqe)->type == CONSTR); 
3762     /* only BQs of an RBH end with an RBH_Save closure */
3763     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3764
3765     switch (get_itbl(bqe)->type) {
3766     case TSO:
3767       fprintf(stderr," TSO %u (%x),",
3768               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3769       break;
3770     case BLOCKED_FETCH:
3771       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3772               ((StgBlockedFetch *)bqe)->node, 
3773               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3774               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3775               ((StgBlockedFetch *)bqe)->ga.weight);
3776       break;
3777     case CONSTR:
3778       fprintf(stderr," %s (IP %p),",
3779               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3780                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3781                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3782                "RBH_Save_?"), get_itbl(bqe));
3783       break;
3784     default:
3785       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3786            info_type((StgClosure *)bqe)); // , node, info_type(node));
3787       break;
3788     }
3789   } /* for */
3790   fputc('\n', stderr);
3791 }
3792 # elif defined(GRAN)
3793 void 
3794 print_bq (StgClosure *node)
3795 {
3796   StgBlockingQueueElement *bqe;
3797   PEs node_loc, tso_loc;
3798   rtsBool end;
3799
3800   /* should cover all closures that may have a blocking queue */
3801   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3802          get_itbl(node)->type == FETCH_ME_BQ ||
3803          get_itbl(node)->type == RBH);
3804     
3805   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3806   node_loc = where_is(node);
3807
3808   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3809           node, info_type(node), node_loc);
3810
3811   /* 
3812      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3813   */
3814   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3815        !end; // iterate until bqe points to a CONSTR
3816        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3817     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3818     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3819     /* types of closures that may appear in a blocking queue */
3820     ASSERT(get_itbl(bqe)->type == TSO ||           
3821            get_itbl(bqe)->type == CONSTR); 
3822     /* only BQs of an RBH end with an RBH_Save closure */
3823     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3824
3825     tso_loc = where_is((StgClosure *)bqe);
3826     switch (get_itbl(bqe)->type) {
3827     case TSO:
3828       fprintf(stderr," TSO %d (%p) on [PE %d],",
3829               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3830       break;
3831     case CONSTR:
3832       fprintf(stderr," %s (IP %p),",
3833               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3834                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3835                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3836                "RBH_Save_?"), get_itbl(bqe));
3837       break;
3838     default:
3839       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3840            info_type((StgClosure *)bqe), node, info_type(node));
3841       break;
3842     }
3843   } /* for */
3844   fputc('\n', stderr);
3845 }
3846 #else
3847 /* 
3848    Nice and easy: only TSOs on the blocking queue
3849 */
3850 void 
3851 print_bq (StgClosure *node)
3852 {
3853   StgTSO *tso;
3854
3855   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3856   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3857        tso != END_TSO_QUEUE; 
3858        tso=tso->link) {
3859     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3860     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3861     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3862   }
3863   fputc('\n', stderr);
3864 }
3865 # endif
3866
3867 #if defined(PAR)
3868 static nat
3869 run_queue_len(void)
3870 {
3871   nat i;
3872   StgTSO *tso;
3873
3874   for (i=0, tso=run_queue_hd; 
3875        tso != END_TSO_QUEUE;
3876        i++, tso=tso->link)
3877     /* nothing */
3878
3879   return i;
3880 }
3881 #endif
3882
3883 static void
3884 sched_belch(char *s, ...)
3885 {
3886   va_list ap;
3887   va_start(ap,s);
3888 #ifdef SMP
3889   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3890 #elif defined(PAR)
3891   fprintf(stderr, "== ");
3892 #else
3893   fprintf(stderr, "scheduler: ");
3894 #endif
3895   vfprintf(stderr, s, ap);
3896   fprintf(stderr, "\n");
3897   va_end(ap);
3898 }
3899
3900 #endif /* DEBUG */
3901
3902
3903 //@node Index,  , Debugging Routines, Main scheduling code
3904 //@subsection Index
3905
3906 //@index
3907 //* StgMainThread::  @cindex\s-+StgMainThread
3908 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3909 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3910 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3911 //* context_switch::  @cindex\s-+context_switch
3912 //* createThread::  @cindex\s-+createThread
3913 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3914 //* initScheduler::  @cindex\s-+initScheduler
3915 //* interrupted::  @cindex\s-+interrupted
3916 //* next_thread_id::  @cindex\s-+next_thread_id
3917 //* print_bq::  @cindex\s-+print_bq
3918 //* run_queue_hd::  @cindex\s-+run_queue_hd
3919 //* run_queue_tl::  @cindex\s-+run_queue_tl
3920 //* sched_mutex::  @cindex\s-+sched_mutex
3921 //* schedule::  @cindex\s-+schedule
3922 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3923 //* term_mutex::  @cindex\s-+term_mutex
3924 //@end index