2d99ad18bec40e644957dcfd40766b8b1bd7a0d0
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.169 2003/05/14 09:11:49 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
130 //@subsection Variables and Data structures
131
132 /* Main thread queue.
133  * Locks required: sched_mutex.
134  */
135 StgMainThread *main_threads = NULL;
136
137 #ifdef THREADED_RTS
138 // Pointer to the thread that executes main
139 // When this thread is finished, the program terminates
140 // by calling shutdownHaskellAndExit.
141 // It would be better to add a call to shutdownHaskellAndExit
142 // to the Main.main wrapper and to remove this hack.
143 StgMainThread *main_main_thread = NULL;
144 #endif
145
146 /* Thread queues.
147  * Locks required: sched_mutex.
148  */
149 #if defined(GRAN)
150
151 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
152 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
153
154 /* 
155    In GranSim we have a runnable and a blocked queue for each processor.
156    In order to minimise code changes new arrays run_queue_hds/tls
157    are created. run_queue_hd is then a short cut (macro) for
158    run_queue_hds[CurrentProc] (see GranSim.h).
159    -- HWL
160 */
161 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
162 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
163 StgTSO *ccalling_threadss[MAX_PROC];
164 /* We use the same global list of threads (all_threads) in GranSim as in
165    the std RTS (i.e. we are cheating). However, we don't use this list in
166    the GranSim specific code at the moment (so we are only potentially
167    cheating).  */
168
169 #else /* !GRAN */
170
171 StgTSO *run_queue_hd = NULL;
172 StgTSO *run_queue_tl = NULL;
173 StgTSO *blocked_queue_hd = NULL;
174 StgTSO *blocked_queue_tl = NULL;
175 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads = NULL;
183
184 /* When a thread performs a safe C call (_ccall_GC, using old
185  * terminology), it gets put on the suspended_ccalling_threads
186  * list. Used by the garbage collector.
187  */
188 static StgTSO *suspended_ccalling_threads;
189
190 static StgTSO *threadStackOverflow(StgTSO *tso);
191
192 /* KH: The following two flags are shared memory locations.  There is no need
193        to lock them, since they are only unset at the end of a scheduler
194        operation.
195 */
196
197 /* flag set by signal handler to precipitate a context switch */
198 //@cindex context_switch
199 nat context_switch = 0;
200
201 /* if this flag is set as well, give up execution */
202 //@cindex interrupted
203 rtsBool interrupted = rtsFalse;
204
205 /* Next thread ID to allocate.
206  * Locks required: thread_id_mutex
207  */
208 //@cindex next_thread_id
209 static StgThreadID next_thread_id = 1;
210
211 /*
212  * Pointers to the state of the current thread.
213  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
214  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
215  */
216  
217 /* The smallest stack size that makes any sense is:
218  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
219  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
220  *  + 1                       (the closure to enter)
221  *  + 1                       (stg_ap_v_ret)
222  *  + 1                       (spare slot req'd by stg_ap_v_ret)
223  *
224  * A thread with this stack will bomb immediately with a stack
225  * overflow, which will increase its stack size.  
226  */
227
228 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
229
230
231 #if defined(GRAN)
232 StgTSO *CurrentTSO;
233 #endif
234
235 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
236  *  exists - earlier gccs apparently didn't.
237  *  -= chak
238  */
239 StgTSO dummy_tso;
240
241 static rtsBool ready_to_gc;
242
243 /*
244  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
245  * in an MT setting, needed to signal that a worker thread shouldn't hang around
246  * in the scheduler when it is out of work.
247  */
248 static rtsBool shutting_down_scheduler = rtsFalse;
249
250 void            addToBlockedQueue ( StgTSO *tso );
251
252 static void     schedule          ( void );
253        void     interruptStgRts   ( void );
254
255 static void     detectBlackHoles  ( void );
256
257 #ifdef DEBUG
258 static void sched_belch(char *s, ...);
259 #endif
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 /* ToDo: carefully document the invariants that go together
263  *       with these synchronisation objects.
264  */
265 Mutex     sched_mutex       = INIT_MUTEX_VAR;
266 Mutex     term_mutex        = INIT_MUTEX_VAR;
267
268 /*
269  * A heavyweight solution to the problem of protecting
270  * the thread_id from concurrent update.
271  */
272 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
273
274
275 # if defined(SMP)
276 static Condition gc_pending_cond = INIT_COND_VAR;
277 nat await_death;
278 # endif
279
280 #endif /* RTS_SUPPORTS_THREADS */
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 static char *whatNext_strs[] = {
290   "ThreadRunGHC",
291   "ThreadInterpret",
292   "ThreadKilled",
293   "ThreadRelocated",
294   "ThreadComplete"
295 };
296 #endif
297
298 #if defined(PAR)
299 StgTSO * createSparkThread(rtsSpark spark);
300 StgTSO * activateSpark (rtsSpark spark);  
301 #endif
302
303 /*
304  * The thread state for the main thread.
305 // ToDo: check whether not needed any more
306 StgTSO   *MainTSO;
307  */
308
309 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
310 static void taskStart(void);
311 static void
312 taskStart(void)
313 {
314   schedule();
315 }
316 #endif
317
318 #if defined(RTS_SUPPORTS_THREADS)
319 void
320 startSchedulerTask(void)
321 {
322     startTask(taskStart);
323 }
324 #endif
325
326 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
327 //@subsection Main scheduling loop
328
329 /* ---------------------------------------------------------------------------
330    Main scheduling loop.
331
332    We use round-robin scheduling, each thread returning to the
333    scheduler loop when one of these conditions is detected:
334
335       * out of heap space
336       * timer expires (thread yields)
337       * thread blocks
338       * thread ends
339       * stack overflow
340
341    Locking notes:  we acquire the scheduler lock once at the beginning
342    of the scheduler loop, and release it when
343     
344       * running a thread, or
345       * waiting for work, or
346       * waiting for a GC to complete.
347
348    GRAN version:
349      In a GranSim setup this loop iterates over the global event queue.
350      This revolves around the global event queue, which determines what 
351      to do next. Therefore, it's more complicated than either the 
352      concurrent or the parallel (GUM) setup.
353
354    GUM version:
355      GUM iterates over incoming messages.
356      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
357      and sends out a fish whenever it has nothing to do; in-between
358      doing the actual reductions (shared code below) it processes the
359      incoming messages and deals with delayed operations 
360      (see PendingFetches).
361      This is not the ugliest code you could imagine, but it's bloody close.
362
363    ------------------------------------------------------------------------ */
364 //@cindex schedule
365 static void
366 schedule( void )
367 {
368   StgTSO *t;
369   Capability *cap;
370   StgThreadReturnCode ret;
371 #if defined(GRAN)
372   rtsEvent *event;
373 #elif defined(PAR)
374   StgSparkPool *pool;
375   rtsSpark spark;
376   StgTSO *tso;
377   GlobalTaskId pe;
378   rtsBool receivedFinish = rtsFalse;
379 # if defined(DEBUG)
380   nat tp_size, sp_size; // stats only
381 # endif
382 #endif
383   rtsBool was_interrupted = rtsFalse;
384   StgTSOWhatNext prev_what_next;
385   
386   ACQUIRE_LOCK(&sched_mutex);
387  
388 #if defined(RTS_SUPPORTS_THREADS)
389   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
390   IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
391 #else
392   /* simply initialise it in the non-threaded case */
393   grabCapability(&cap);
394 #endif
395
396 #if defined(GRAN)
397   /* set up first event to get things going */
398   /* ToDo: assign costs for system setup and init MainTSO ! */
399   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
400             ContinueThread, 
401             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
402
403   IF_DEBUG(gran,
404            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
405            G_TSO(CurrentTSO, 5));
406
407   if (RtsFlags.GranFlags.Light) {
408     /* Save current time; GranSim Light only */
409     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
410   }      
411
412   event = get_next_event();
413
414   while (event!=(rtsEvent*)NULL) {
415     /* Choose the processor with the next event */
416     CurrentProc = event->proc;
417     CurrentTSO = event->tso;
418
419 #elif defined(PAR)
420
421   while (!receivedFinish) {    /* set by processMessages */
422                                /* when receiving PP_FINISH message         */ 
423 #else
424
425   while (1) {
426
427 #endif
428
429     IF_DEBUG(scheduler, printAllThreads());
430
431 #if defined(RTS_SUPPORTS_THREADS)
432     /* Check to see whether there are any worker threads
433        waiting to deposit external call results. If so,
434        yield our capability */
435     yieldToReturningWorker(&sched_mutex, &cap);
436 #endif
437
438     /* If we're interrupted (the user pressed ^C, or some other
439      * termination condition occurred), kill all the currently running
440      * threads.
441      */
442     if (interrupted) {
443       IF_DEBUG(scheduler, sched_belch("interrupted"));
444       interrupted = rtsFalse;
445       was_interrupted = rtsTrue;
446 #if defined(RTS_SUPPORTS_THREADS)
447       // In the threaded RTS, deadlock detection doesn't work,
448       // so just exit right away.
449       prog_belch("interrupted");
450       releaseCapability(cap);
451       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
452       RELEASE_LOCK(&sched_mutex);
453       shutdownHaskellAndExit(EXIT_SUCCESS);
454 #else
455       deleteAllThreads();
456 #endif
457     }
458
459     /* Go through the list of main threads and wake up any
460      * clients whose computations have finished.  ToDo: this
461      * should be done more efficiently without a linear scan
462      * of the main threads list, somehow...
463      */
464 #if defined(RTS_SUPPORTS_THREADS)
465     { 
466       StgMainThread *m, **prev;
467       prev = &main_threads;
468       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
469         switch (m->tso->what_next) {
470         case ThreadComplete:
471           if (m->ret) {
472               // NOTE: return val is tso->sp[1] (see StgStartup.hc)
473               *(m->ret) = (StgClosure *)m->tso->sp[1]; 
474           }
475           *prev = m->link;
476           m->stat = Success;
477           broadcastCondition(&m->wakeup);
478 #ifdef DEBUG
479           removeThreadLabel((StgWord)m->tso);
480 #endif
481           if(m == main_main_thread)
482           {
483               releaseCapability(cap);
484               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
485               RELEASE_LOCK(&sched_mutex);
486               shutdownHaskellAndExit(EXIT_SUCCESS);
487           }
488           break;
489         case ThreadKilled:
490           if (m->ret) *(m->ret) = NULL;
491           *prev = m->link;
492           if (was_interrupted) {
493             m->stat = Interrupted;
494           } else {
495             m->stat = Killed;
496           }
497           broadcastCondition(&m->wakeup);
498 #ifdef DEBUG
499           removeThreadLabel((StgWord)m->tso);
500 #endif
501           if(m == main_main_thread)
502           {
503               releaseCapability(cap);
504               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
505               RELEASE_LOCK(&sched_mutex);
506               shutdownHaskellAndExit(EXIT_SUCCESS);
507           }
508           break;
509         default:
510           break;
511         }
512       }
513     }
514
515 #else /* not threaded */
516
517 # if defined(PAR)
518     /* in GUM do this only on the Main PE */
519     if (IAmMainThread)
520 # endif
521     /* If our main thread has finished or been killed, return.
522      */
523     {
524       StgMainThread *m = main_threads;
525       if (m->tso->what_next == ThreadComplete
526           || m->tso->what_next == ThreadKilled) {
527 #ifdef DEBUG
528         removeThreadLabel((StgWord)m->tso);
529 #endif
530         main_threads = main_threads->link;
531         if (m->tso->what_next == ThreadComplete) {
532             // We finished successfully, fill in the return value
533             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
534             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
535             m->stat = Success;
536             return;
537         } else {
538           if (m->ret) { *(m->ret) = NULL; };
539           if (was_interrupted) {
540             m->stat = Interrupted;
541           } else {
542             m->stat = Killed;
543           }
544           return;
545         }
546       }
547     }
548 #endif
549
550     /* Top up the run queue from our spark pool.  We try to make the
551      * number of threads in the run queue equal to the number of
552      * free capabilities.
553      *
554      * Disable spark support in SMP for now, non-essential & requires
555      * a little bit of work to make it compile cleanly. -- sof 1/02.
556      */
557 #if 0 /* defined(SMP) */
558     {
559       nat n = getFreeCapabilities();
560       StgTSO *tso = run_queue_hd;
561
562       /* Count the run queue */
563       while (n > 0 && tso != END_TSO_QUEUE) {
564         tso = tso->link;
565         n--;
566       }
567
568       for (; n > 0; n--) {
569         StgClosure *spark;
570         spark = findSpark(rtsFalse);
571         if (spark == NULL) {
572           break; /* no more sparks in the pool */
573         } else {
574           /* I'd prefer this to be done in activateSpark -- HWL */
575           /* tricky - it needs to hold the scheduler lock and
576            * not try to re-acquire it -- SDM */
577           createSparkThread(spark);       
578           IF_DEBUG(scheduler,
579                    sched_belch("==^^ turning spark of closure %p into a thread",
580                                (StgClosure *)spark));
581         }
582       }
583       /* We need to wake up the other tasks if we just created some
584        * work for them.
585        */
586       if (getFreeCapabilities() - n > 1) {
587           signalCondition( &thread_ready_cond );
588       }
589     }
590 #endif // SMP
591
592     /* check for signals each time around the scheduler */
593 #if defined(RTS_USER_SIGNALS)
594     if (signals_pending()) {
595       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
596       startSignalHandlers();
597       ACQUIRE_LOCK(&sched_mutex);
598     }
599 #endif
600
601     /* Check whether any waiting threads need to be woken up.  If the
602      * run queue is empty, and there are no other tasks running, we
603      * can wait indefinitely for something to happen.
604      */
605     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
606 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
607                 || EMPTY_RUN_QUEUE()
608 #endif
609         )
610     {
611       awaitEvent( EMPTY_RUN_QUEUE()
612 #if defined(SMP)
613         && allFreeCapabilities()
614 #endif
615         );
616     }
617     /* we can be interrupted while waiting for I/O... */
618     if (interrupted) continue;
619
620     /* 
621      * Detect deadlock: when we have no threads to run, there are no
622      * threads waiting on I/O or sleeping, and all the other tasks are
623      * waiting for work, we must have a deadlock of some description.
624      *
625      * We first try to find threads blocked on themselves (ie. black
626      * holes), and generate NonTermination exceptions where necessary.
627      *
628      * If no threads are black holed, we have a deadlock situation, so
629      * inform all the main threads.
630      */
631 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
632     if (   EMPTY_THREAD_QUEUES()
633 #if defined(RTS_SUPPORTS_THREADS)
634         && EMPTY_QUEUE(suspended_ccalling_threads)
635 #endif
636 #ifdef SMP
637         && allFreeCapabilities()
638 #endif
639         )
640     {
641         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
642 #if defined(THREADED_RTS)
643         /* and SMP mode ..? */
644         releaseCapability(cap);
645 #endif
646         // Garbage collection can release some new threads due to
647         // either (a) finalizers or (b) threads resurrected because
648         // they are about to be send BlockedOnDeadMVar.  Any threads
649         // thus released will be immediately runnable.
650         GarbageCollect(GetRoots,rtsTrue);
651
652         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
653
654         IF_DEBUG(scheduler, 
655                  sched_belch("still deadlocked, checking for black holes..."));
656         detectBlackHoles();
657
658         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
659
660 #if defined(RTS_USER_SIGNALS)
661         /* If we have user-installed signal handlers, then wait
662          * for signals to arrive rather then bombing out with a
663          * deadlock.
664          */
665 #if defined(RTS_SUPPORTS_THREADS)
666         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
667                       a signal with no runnable threads (or I/O
668                       suspended ones) leads nowhere quick.
669                       For now, simply shut down when we reach this
670                       condition.
671                       
672                       ToDo: define precisely under what conditions
673                       the Scheduler should shut down in an MT setting.
674                    */
675 #else
676         if ( anyUserHandlers() ) {
677 #endif
678             IF_DEBUG(scheduler, 
679                      sched_belch("still deadlocked, waiting for signals..."));
680
681             awaitUserSignals();
682
683             // we might be interrupted...
684             if (interrupted) { continue; }
685
686             if (signals_pending()) {
687                 RELEASE_LOCK(&sched_mutex);
688                 startSignalHandlers();
689                 ACQUIRE_LOCK(&sched_mutex);
690             }
691             ASSERT(!EMPTY_RUN_QUEUE());
692             goto not_deadlocked;
693         }
694 #endif
695
696         /* Probably a real deadlock.  Send the current main thread the
697          * Deadlock exception (or in the SMP build, send *all* main
698          * threads the deadlock exception, since none of them can make
699          * progress).
700          */
701         {
702             StgMainThread *m;
703 #if defined(RTS_SUPPORTS_THREADS)
704             for (m = main_threads; m != NULL; m = m->link) {
705                 switch (m->tso->why_blocked) {
706                 case BlockedOnBlackHole:
707                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
708                     break;
709                 case BlockedOnException:
710                 case BlockedOnMVar:
711                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
712                     break;
713                 default:
714                     barf("deadlock: main thread blocked in a strange way");
715                 }
716             }
717 #else
718             m = main_threads;
719             switch (m->tso->why_blocked) {
720             case BlockedOnBlackHole:
721                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
722                 break;
723             case BlockedOnException:
724             case BlockedOnMVar:
725                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
726                 break;
727             default:
728                 barf("deadlock: main thread blocked in a strange way");
729             }
730 #endif
731         }
732
733 #if defined(RTS_SUPPORTS_THREADS)
734         /* ToDo: revisit conditions (and mechanism) for shutting
735            down a multi-threaded world  */
736         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
737         RELEASE_LOCK(&sched_mutex);
738         shutdownHaskell();
739         return;
740 #endif
741     }
742   not_deadlocked:
743
744 #elif defined(RTS_SUPPORTS_THREADS)
745     /* ToDo: add deadlock detection in threaded RTS */
746 #elif defined(PAR)
747     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
748 #endif
749
750 #if defined(SMP)
751     /* If there's a GC pending, don't do anything until it has
752      * completed.
753      */
754     if (ready_to_gc) {
755       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
756       waitCondition( &gc_pending_cond, &sched_mutex );
757     }
758 #endif    
759
760 #if defined(RTS_SUPPORTS_THREADS)
761 #if defined(SMP)
762     /* block until we've got a thread on the run queue and a free
763      * capability.
764      *
765      */
766     if ( EMPTY_RUN_QUEUE() ) {
767       /* Give up our capability */
768       releaseCapability(cap);
769
770       /* If we're in the process of shutting down (& running the
771        * a batch of finalisers), don't wait around.
772        */
773       if ( shutting_down_scheduler ) {
774         RELEASE_LOCK(&sched_mutex);
775         return;
776       }
777       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
778       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
779       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
780     }
781 #else
782     if ( EMPTY_RUN_QUEUE() ) {
783       continue; // nothing to do
784     }
785 #endif
786 #endif
787
788 #if defined(GRAN)
789     if (RtsFlags.GranFlags.Light)
790       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
791
792     /* adjust time based on time-stamp */
793     if (event->time > CurrentTime[CurrentProc] &&
794         event->evttype != ContinueThread)
795       CurrentTime[CurrentProc] = event->time;
796     
797     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
798     if (!RtsFlags.GranFlags.Light)
799       handleIdlePEs();
800
801     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
802
803     /* main event dispatcher in GranSim */
804     switch (event->evttype) {
805       /* Should just be continuing execution */
806     case ContinueThread:
807       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
808       /* ToDo: check assertion
809       ASSERT(run_queue_hd != (StgTSO*)NULL &&
810              run_queue_hd != END_TSO_QUEUE);
811       */
812       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
813       if (!RtsFlags.GranFlags.DoAsyncFetch &&
814           procStatus[CurrentProc]==Fetching) {
815         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
816               CurrentTSO->id, CurrentTSO, CurrentProc);
817         goto next_thread;
818       } 
819       /* Ignore ContinueThreads for completed threads */
820       if (CurrentTSO->what_next == ThreadComplete) {
821         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
822               CurrentTSO->id, CurrentTSO, CurrentProc);
823         goto next_thread;
824       } 
825       /* Ignore ContinueThreads for threads that are being migrated */
826       if (PROCS(CurrentTSO)==Nowhere) { 
827         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
828               CurrentTSO->id, CurrentTSO, CurrentProc);
829         goto next_thread;
830       }
831       /* The thread should be at the beginning of the run queue */
832       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
833         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
834               CurrentTSO->id, CurrentTSO, CurrentProc);
835         break; // run the thread anyway
836       }
837       /*
838       new_event(proc, proc, CurrentTime[proc],
839                 FindWork,
840                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
841       goto next_thread; 
842       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
843       break; // now actually run the thread; DaH Qu'vam yImuHbej 
844
845     case FetchNode:
846       do_the_fetchnode(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     case GlobalBlock:
850       do_the_globalblock(event);
851       goto next_thread;             /* handle next event in event queue  */
852       
853     case FetchReply:
854       do_the_fetchreply(event);
855       goto next_thread;             /* handle next event in event queue  */
856       
857     case UnblockThread:   /* Move from the blocked queue to the tail of */
858       do_the_unblock(event);
859       goto next_thread;             /* handle next event in event queue  */
860       
861     case ResumeThread:  /* Move from the blocked queue to the tail of */
862       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
863       event->tso->gran.blocktime += 
864         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
865       do_the_startthread(event);
866       goto next_thread;             /* handle next event in event queue  */
867       
868     case StartThread:
869       do_the_startthread(event);
870       goto next_thread;             /* handle next event in event queue  */
871       
872     case MoveThread:
873       do_the_movethread(event);
874       goto next_thread;             /* handle next event in event queue  */
875       
876     case MoveSpark:
877       do_the_movespark(event);
878       goto next_thread;             /* handle next event in event queue  */
879       
880     case FindWork:
881       do_the_findwork(event);
882       goto next_thread;             /* handle next event in event queue  */
883       
884     default:
885       barf("Illegal event type %u\n", event->evttype);
886     }  /* switch */
887     
888     /* This point was scheduler_loop in the old RTS */
889
890     IF_DEBUG(gran, belch("GRAN: after main switch"));
891
892     TimeOfLastEvent = CurrentTime[CurrentProc];
893     TimeOfNextEvent = get_time_of_next_event();
894     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
895     // CurrentTSO = ThreadQueueHd;
896
897     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
898                          TimeOfNextEvent));
899
900     if (RtsFlags.GranFlags.Light) 
901       GranSimLight_leave_system(event, &ActiveTSO); 
902
903     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
904
905     IF_DEBUG(gran, 
906              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
907
908     /* in a GranSim setup the TSO stays on the run queue */
909     t = CurrentTSO;
910     /* Take a thread from the run queue. */
911     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
912
913     IF_DEBUG(gran, 
914              fprintf(stderr, "GRAN: About to run current thread, which is\n");
915              G_TSO(t,5));
916
917     context_switch = 0; // turned on via GranYield, checking events and time slice
918
919     IF_DEBUG(gran, 
920              DumpGranEvent(GR_SCHEDULE, t));
921
922     procStatus[CurrentProc] = Busy;
923
924 #elif defined(PAR)
925     if (PendingFetches != END_BF_QUEUE) {
926         processFetches();
927     }
928
929     /* ToDo: phps merge with spark activation above */
930     /* check whether we have local work and send requests if we have none */
931     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
932       /* :-[  no local threads => look out for local sparks */
933       /* the spark pool for the current PE */
934       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
935       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
936           pool->hd < pool->tl) {
937         /* 
938          * ToDo: add GC code check that we really have enough heap afterwards!!
939          * Old comment:
940          * If we're here (no runnable threads) and we have pending
941          * sparks, we must have a space problem.  Get enough space
942          * to turn one of those pending sparks into a
943          * thread... 
944          */
945
946         spark = findSpark(rtsFalse);                /* get a spark */
947         if (spark != (rtsSpark) NULL) {
948           tso = activateSpark(spark);       /* turn the spark into a thread */
949           IF_PAR_DEBUG(schedule,
950                        belch("==== schedule: Created TSO %d (%p); %d threads active",
951                              tso->id, tso, advisory_thread_count));
952
953           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
954             belch("==^^ failed to activate spark");
955             goto next_thread;
956           }               /* otherwise fall through & pick-up new tso */
957         } else {
958           IF_PAR_DEBUG(verbose,
959                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
960                              spark_queue_len(pool)));
961           goto next_thread;
962         }
963       }
964
965       /* If we still have no work we need to send a FISH to get a spark
966          from another PE 
967       */
968       if (EMPTY_RUN_QUEUE()) {
969       /* =8-[  no local sparks => look for work on other PEs */
970         /*
971          * We really have absolutely no work.  Send out a fish
972          * (there may be some out there already), and wait for
973          * something to arrive.  We clearly can't run any threads
974          * until a SCHEDULE or RESUME arrives, and so that's what
975          * we're hoping to see.  (Of course, we still have to
976          * respond to other types of messages.)
977          */
978         TIME now = msTime() /*CURRENT_TIME*/;
979         IF_PAR_DEBUG(verbose, 
980                      belch("--  now=%ld", now));
981         IF_PAR_DEBUG(verbose,
982                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
983                          (last_fish_arrived_at!=0 &&
984                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
985                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
986                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
987                              last_fish_arrived_at,
988                              RtsFlags.ParFlags.fishDelay, now);
989                      });
990         
991         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
992             (last_fish_arrived_at==0 ||
993              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
994           /* outstandingFishes is set in sendFish, processFish;
995              avoid flooding system with fishes via delay */
996           pe = choosePE();
997           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
998                    NEW_FISH_HUNGER);
999
1000           // Global statistics: count no. of fishes
1001           if (RtsFlags.ParFlags.ParStats.Global &&
1002               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1003             globalParStats.tot_fish_mess++;
1004           }
1005         }
1006       
1007         receivedFinish = processMessages();
1008         goto next_thread;
1009       }
1010     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1011       receivedFinish = processMessages();
1012     }
1013
1014     /* Now we are sure that we have some work available */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016
1017     /* Take a thread from the run queue, if we have work */
1018     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1019     IF_DEBUG(sanity,checkTSO(t));
1020
1021     /* ToDo: write something to the log-file
1022     if (RTSflags.ParFlags.granSimStats && !sameThread)
1023         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1024
1025     CurrentTSO = t;
1026     */
1027     /* the spark pool for the current PE */
1028     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1029
1030     IF_DEBUG(scheduler, 
1031              belch("--=^ %d threads, %d sparks on [%#x]", 
1032                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1033
1034 # if 1
1035     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1036         t && LastTSO && t->id != LastTSO->id && 
1037         LastTSO->why_blocked == NotBlocked && 
1038         LastTSO->what_next != ThreadComplete) {
1039       // if previously scheduled TSO not blocked we have to record the context switch
1040       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1041                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1042     }
1043
1044     if (RtsFlags.ParFlags.ParStats.Full && 
1045         (emitSchedule /* forced emit */ ||
1046         (t && LastTSO && t->id != LastTSO->id))) {
1047       /* 
1048          we are running a different TSO, so write a schedule event to log file
1049          NB: If we use fair scheduling we also have to write  a deschedule 
1050              event for LastTSO; with unfair scheduling we know that the
1051              previous tso has blocked whenever we switch to another tso, so
1052              we don't need it in GUM for now
1053       */
1054       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1055                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1056       emitSchedule = rtsFalse;
1057     }
1058      
1059 # endif
1060 #else /* !GRAN && !PAR */
1061   
1062     /* grab a thread from the run queue */
1063     ASSERT(run_queue_hd != END_TSO_QUEUE);
1064     t = POP_RUN_QUEUE();
1065     // Sanity check the thread we're about to run.  This can be
1066     // expensive if there is lots of thread switching going on...
1067     IF_DEBUG(sanity,checkTSO(t));
1068 #endif
1069
1070     cap->r.rCurrentTSO = t;
1071     
1072     /* context switches are now initiated by the timer signal, unless
1073      * the user specified "context switch as often as possible", with
1074      * +RTS -C0
1075      */
1076     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1077          && (run_queue_hd != END_TSO_QUEUE
1078              || blocked_queue_hd != END_TSO_QUEUE
1079              || sleeping_queue != END_TSO_QUEUE)))
1080         context_switch = 1;
1081     else
1082         context_switch = 0;
1083
1084 run_thread:
1085
1086     RELEASE_LOCK(&sched_mutex);
1087
1088     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1089                               t->id, whatNext_strs[t->what_next]));
1090
1091 #ifdef PROFILING
1092     startHeapProfTimer();
1093 #endif
1094
1095     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1096     /* Run the current thread 
1097      */
1098     prev_what_next = t->what_next;
1099     switch (prev_what_next) {
1100     case ThreadKilled:
1101     case ThreadComplete:
1102         /* Thread already finished, return to scheduler. */
1103         ret = ThreadFinished;
1104         break;
1105     case ThreadRunGHC:
1106         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1107         break;
1108     case ThreadInterpret:
1109         ret = interpretBCO(cap);
1110         break;
1111     default:
1112       barf("schedule: invalid what_next field");
1113     }
1114     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1115     
1116     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1117 #ifdef PROFILING
1118     stopHeapProfTimer();
1119     CCCS = CCS_SYSTEM;
1120 #endif
1121     
1122     ACQUIRE_LOCK(&sched_mutex);
1123     
1124 #ifdef RTS_SUPPORTS_THREADS
1125     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1126 #elif !defined(GRAN) && !defined(PAR)
1127     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1128 #endif
1129     t = cap->r.rCurrentTSO;
1130     
1131 #if defined(PAR)
1132     /* HACK 675: if the last thread didn't yield, make sure to print a 
1133        SCHEDULE event to the log file when StgRunning the next thread, even
1134        if it is the same one as before */
1135     LastTSO = t; 
1136     TimeOfLastYield = CURRENT_TIME;
1137 #endif
1138
1139     switch (ret) {
1140     case HeapOverflow:
1141 #if defined(GRAN)
1142       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1143       globalGranStats.tot_heapover++;
1144 #elif defined(PAR)
1145       globalParStats.tot_heapover++;
1146 #endif
1147
1148       // did the task ask for a large block?
1149       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1150           // if so, get one and push it on the front of the nursery.
1151           bdescr *bd;
1152           nat blocks;
1153           
1154           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1155
1156           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1157                                    t->id, whatNext_strs[t->what_next], blocks));
1158
1159           // don't do this if it would push us over the
1160           // alloc_blocks_lim limit; we'll GC first.
1161           if (alloc_blocks + blocks < alloc_blocks_lim) {
1162
1163               alloc_blocks += blocks;
1164               bd = allocGroup( blocks );
1165
1166               // link the new group into the list
1167               bd->link = cap->r.rCurrentNursery;
1168               bd->u.back = cap->r.rCurrentNursery->u.back;
1169               if (cap->r.rCurrentNursery->u.back != NULL) {
1170                   cap->r.rCurrentNursery->u.back->link = bd;
1171               } else {
1172                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1173                          g0s0->blocks == cap->r.rNursery);
1174                   cap->r.rNursery = g0s0->blocks = bd;
1175               }           
1176               cap->r.rCurrentNursery->u.back = bd;
1177
1178               // initialise it as a nursery block.  We initialise the
1179               // step, gen_no, and flags field of *every* sub-block in
1180               // this large block, because this is easier than making
1181               // sure that we always find the block head of a large
1182               // block whenever we call Bdescr() (eg. evacuate() and
1183               // isAlive() in the GC would both have to do this, at
1184               // least).
1185               { 
1186                   bdescr *x;
1187                   for (x = bd; x < bd + blocks; x++) {
1188                       x->step = g0s0;
1189                       x->gen_no = 0;
1190                       x->flags = 0;
1191                   }
1192               }
1193
1194               // don't forget to update the block count in g0s0.
1195               g0s0->n_blocks += blocks;
1196               // This assert can be a killer if the app is doing lots
1197               // of large block allocations.
1198               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1199
1200               // now update the nursery to point to the new block
1201               cap->r.rCurrentNursery = bd;
1202
1203               // we might be unlucky and have another thread get on the
1204               // run queue before us and steal the large block, but in that
1205               // case the thread will just end up requesting another large
1206               // block.
1207               PUSH_ON_RUN_QUEUE(t);
1208               break;
1209           }
1210       }
1211
1212       /* make all the running tasks block on a condition variable,
1213        * maybe set context_switch and wait till they all pile in,
1214        * then have them wait on a GC condition variable.
1215        */
1216       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1217                                t->id, whatNext_strs[t->what_next]));
1218       threadPaused(t);
1219 #if defined(GRAN)
1220       ASSERT(!is_on_queue(t,CurrentProc));
1221 #elif defined(PAR)
1222       /* Currently we emit a DESCHEDULE event before GC in GUM.
1223          ToDo: either add separate event to distinguish SYSTEM time from rest
1224                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1225       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1226         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1227                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1228         emitSchedule = rtsTrue;
1229       }
1230 #endif
1231       
1232       ready_to_gc = rtsTrue;
1233       context_switch = 1;               /* stop other threads ASAP */
1234       PUSH_ON_RUN_QUEUE(t);
1235       /* actual GC is done at the end of the while loop */
1236       break;
1237       
1238     case StackOverflow:
1239 #if defined(GRAN)
1240       IF_DEBUG(gran, 
1241                DumpGranEvent(GR_DESCHEDULE, t));
1242       globalGranStats.tot_stackover++;
1243 #elif defined(PAR)
1244       // IF_DEBUG(par, 
1245       // DumpGranEvent(GR_DESCHEDULE, t);
1246       globalParStats.tot_stackover++;
1247 #endif
1248       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1249                                t->id, whatNext_strs[t->what_next]));
1250       /* just adjust the stack for this thread, then pop it back
1251        * on the run queue.
1252        */
1253       threadPaused(t);
1254       { 
1255         StgMainThread *m;
1256         /* enlarge the stack */
1257         StgTSO *new_t = threadStackOverflow(t);
1258         
1259         /* This TSO has moved, so update any pointers to it from the
1260          * main thread stack.  It better not be on any other queues...
1261          * (it shouldn't be).
1262          */
1263         for (m = main_threads; m != NULL; m = m->link) {
1264           if (m->tso == t) {
1265             m->tso = new_t;
1266           }
1267         }
1268         threadPaused(new_t);
1269         PUSH_ON_RUN_QUEUE(new_t);
1270       }
1271       break;
1272
1273     case ThreadYielding:
1274 #if defined(GRAN)
1275       IF_DEBUG(gran, 
1276                DumpGranEvent(GR_DESCHEDULE, t));
1277       globalGranStats.tot_yields++;
1278 #elif defined(PAR)
1279       // IF_DEBUG(par, 
1280       // DumpGranEvent(GR_DESCHEDULE, t);
1281       globalParStats.tot_yields++;
1282 #endif
1283       /* put the thread back on the run queue.  Then, if we're ready to
1284        * GC, check whether this is the last task to stop.  If so, wake
1285        * up the GC thread.  getThread will block during a GC until the
1286        * GC is finished.
1287        */
1288       IF_DEBUG(scheduler,
1289                if (t->what_next != prev_what_next) {
1290                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1291                          t->id, whatNext_strs[t->what_next]);
1292                } else {
1293                    belch("--<< thread %ld (%s) stopped, yielding", 
1294                          t->id, whatNext_strs[t->what_next]);
1295                }
1296                );
1297
1298       IF_DEBUG(sanity,
1299                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1300                checkTSO(t));
1301       ASSERT(t->link == END_TSO_QUEUE);
1302
1303       // Shortcut if we're just switching evaluators: don't bother
1304       // doing stack squeezing (which can be expensive), just run the
1305       // thread.
1306       if (t->what_next != prev_what_next) {
1307           goto run_thread;
1308       }
1309
1310       threadPaused(t);
1311
1312 #if defined(GRAN)
1313       ASSERT(!is_on_queue(t,CurrentProc));
1314
1315       IF_DEBUG(sanity,
1316                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1317                checkThreadQsSanity(rtsTrue));
1318 #endif
1319
1320 #if defined(PAR)
1321       if (RtsFlags.ParFlags.doFairScheduling) { 
1322         /* this does round-robin scheduling; good for concurrency */
1323         APPEND_TO_RUN_QUEUE(t);
1324       } else {
1325         /* this does unfair scheduling; good for parallelism */
1326         PUSH_ON_RUN_QUEUE(t);
1327       }
1328 #else
1329       // this does round-robin scheduling; good for concurrency
1330       APPEND_TO_RUN_QUEUE(t);
1331 #endif
1332
1333 #if defined(GRAN)
1334       /* add a ContinueThread event to actually process the thread */
1335       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1336                 ContinueThread,
1337                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1338       IF_GRAN_DEBUG(bq, 
1339                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1340                G_EVENTQ(0);
1341                G_CURR_THREADQ(0));
1342 #endif /* GRAN */
1343       break;
1344
1345     case ThreadBlocked:
1346 #if defined(GRAN)
1347       IF_DEBUG(scheduler,
1348                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1349                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1350                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1351
1352       // ??? needed; should emit block before
1353       IF_DEBUG(gran, 
1354                DumpGranEvent(GR_DESCHEDULE, t)); 
1355       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1356       /*
1357         ngoq Dogh!
1358       ASSERT(procStatus[CurrentProc]==Busy || 
1359               ((procStatus[CurrentProc]==Fetching) && 
1360               (t->block_info.closure!=(StgClosure*)NULL)));
1361       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1362           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1363             procStatus[CurrentProc]==Fetching)) 
1364         procStatus[CurrentProc] = Idle;
1365       */
1366 #elif defined(PAR)
1367       IF_DEBUG(scheduler,
1368                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1369                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1370       IF_PAR_DEBUG(bq,
1371
1372                    if (t->block_info.closure!=(StgClosure*)NULL) 
1373                      print_bq(t->block_info.closure));
1374
1375       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1376       blockThread(t);
1377
1378       /* whatever we schedule next, we must log that schedule */
1379       emitSchedule = rtsTrue;
1380
1381 #else /* !GRAN */
1382       /* don't need to do anything.  Either the thread is blocked on
1383        * I/O, in which case we'll have called addToBlockedQueue
1384        * previously, or it's blocked on an MVar or Blackhole, in which
1385        * case it'll be on the relevant queue already.
1386        */
1387       IF_DEBUG(scheduler,
1388                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1389                        t->id, whatNext_strs[t->what_next]);
1390                printThreadBlockage(t);
1391                fprintf(stderr, "\n"));
1392
1393       /* Only for dumping event to log file 
1394          ToDo: do I need this in GranSim, too?
1395       blockThread(t);
1396       */
1397 #endif
1398       threadPaused(t);
1399       break;
1400       
1401     case ThreadFinished:
1402       /* Need to check whether this was a main thread, and if so, signal
1403        * the task that started it with the return value.  If we have no
1404        * more main threads, we probably need to stop all the tasks until
1405        * we get a new one.
1406        */
1407       /* We also end up here if the thread kills itself with an
1408        * uncaught exception, see Exception.hc.
1409        */
1410       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1411                                t->id, whatNext_strs[t->what_next]));
1412 #if defined(GRAN)
1413       endThread(t, CurrentProc); // clean-up the thread
1414 #elif defined(PAR)
1415       /* For now all are advisory -- HWL */
1416       //if(t->priority==AdvisoryPriority) ??
1417       advisory_thread_count--;
1418       
1419 # ifdef DIST
1420       if(t->dist.priority==RevalPriority)
1421         FinishReval(t);
1422 # endif
1423       
1424       if (RtsFlags.ParFlags.ParStats.Full &&
1425           !RtsFlags.ParFlags.ParStats.Suppressed) 
1426         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1427 #endif
1428       break;
1429       
1430     default:
1431       barf("schedule: invalid thread return code %d", (int)ret);
1432     }
1433
1434 #ifdef PROFILING
1435     // When we have +RTS -i0 and we're heap profiling, do a census at
1436     // every GC.  This lets us get repeatable runs for debugging.
1437     if (performHeapProfile ||
1438         (RtsFlags.ProfFlags.profileInterval==0 &&
1439          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1440         GarbageCollect(GetRoots, rtsTrue);
1441         heapCensus();
1442         performHeapProfile = rtsFalse;
1443         ready_to_gc = rtsFalse; // we already GC'd
1444     }
1445 #endif
1446
1447     if (ready_to_gc 
1448 #ifdef SMP
1449         && allFreeCapabilities() 
1450 #endif
1451         ) {
1452       /* everybody back, start the GC.
1453        * Could do it in this thread, or signal a condition var
1454        * to do it in another thread.  Either way, we need to
1455        * broadcast on gc_pending_cond afterward.
1456        */
1457 #if defined(RTS_SUPPORTS_THREADS)
1458       IF_DEBUG(scheduler,sched_belch("doing GC"));
1459 #endif
1460       GarbageCollect(GetRoots,rtsFalse);
1461       ready_to_gc = rtsFalse;
1462 #ifdef SMP
1463       broadcastCondition(&gc_pending_cond);
1464 #endif
1465 #if defined(GRAN)
1466       /* add a ContinueThread event to continue execution of current thread */
1467       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1468                 ContinueThread,
1469                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1470       IF_GRAN_DEBUG(bq, 
1471                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1472                G_EVENTQ(0);
1473                G_CURR_THREADQ(0));
1474 #endif /* GRAN */
1475     }
1476
1477 #if defined(GRAN)
1478   next_thread:
1479     IF_GRAN_DEBUG(unused,
1480                   print_eventq(EventHd));
1481
1482     event = get_next_event();
1483 #elif defined(PAR)
1484   next_thread:
1485     /* ToDo: wait for next message to arrive rather than busy wait */
1486 #endif /* GRAN */
1487
1488   } /* end of while(1) */
1489
1490   IF_PAR_DEBUG(verbose,
1491                belch("== Leaving schedule() after having received Finish"));
1492 }
1493
1494 /* ---------------------------------------------------------------------------
1495  * Singleton fork(). Do not copy any running threads.
1496  * ------------------------------------------------------------------------- */
1497
1498 StgInt forkProcess(StgTSO* tso) {
1499
1500 #ifndef mingw32_TARGET_OS
1501   pid_t pid;
1502   StgTSO* t,*next;
1503   StgMainThread *m;
1504   rtsBool doKill;
1505
1506   IF_DEBUG(scheduler,sched_belch("forking!"));
1507
1508   pid = fork();
1509   if (pid) { /* parent */
1510
1511   /* just return the pid */
1512     
1513   } else { /* child */
1514   /* wipe all other threads */
1515   run_queue_hd = run_queue_tl = tso;
1516   tso->link = END_TSO_QUEUE;
1517
1518   /* When clearing out the threads, we need to ensure
1519      that a 'main thread' is left behind; if there isn't,
1520      the Scheduler will shutdown next time it is entered.
1521      
1522      ==> we don't kill a thread that's on the main_threads
1523          list (nor the current thread.)
1524     
1525      [ Attempts at implementing the more ambitious scheme of
1526        killing the main_threads also, and then adding the
1527        current thread onto the main_threads list if it wasn't
1528        there already, failed -- waitThread() (for one) wasn't
1529        up to it. If it proves to be desirable to also kill
1530        the main threads, then this scheme will have to be
1531        revisited (and fully debugged!)
1532        
1533        -- sof 7/2002
1534      ]
1535   */
1536   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1537      us is picky about finding the thread still in its queue when
1538      handling the deleteThread() */
1539
1540   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1541     next = t->link;
1542     
1543     /* Don't kill the current thread.. */
1544     if (t->id == tso->id) continue;
1545     doKill=rtsTrue;
1546     /* ..or a main thread */
1547     for (m = main_threads; m != NULL; m = m->link) {
1548         if (m->tso->id == t->id) {
1549           doKill=rtsFalse;
1550           break;
1551         }
1552     }
1553     if (doKill) {
1554       deleteThread(t);
1555     }
1556   }
1557   }
1558   return pid;
1559 #else /* mingw32 */
1560   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1561   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1562   return -1;
1563 #endif /* mingw32 */
1564 }
1565
1566 /* ---------------------------------------------------------------------------
1567  * deleteAllThreads():  kill all the live threads.
1568  *
1569  * This is used when we catch a user interrupt (^C), before performing
1570  * any necessary cleanups and running finalizers.
1571  *
1572  * Locks: sched_mutex held.
1573  * ------------------------------------------------------------------------- */
1574    
1575 void deleteAllThreads ( void )
1576 {
1577   StgTSO* t, *next;
1578   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1579   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1580       next = t->global_link;
1581       deleteThread(t);
1582   }      
1583   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1584   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1585   sleeping_queue = END_TSO_QUEUE;
1586 }
1587
1588 /* startThread and  insertThread are now in GranSim.c -- HWL */
1589
1590
1591 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1592 //@subsection Suspend and Resume
1593
1594 /* ---------------------------------------------------------------------------
1595  * Suspending & resuming Haskell threads.
1596  * 
1597  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1598  * its capability before calling the C function.  This allows another
1599  * task to pick up the capability and carry on running Haskell
1600  * threads.  It also means that if the C call blocks, it won't lock
1601  * the whole system.
1602  *
1603  * The Haskell thread making the C call is put to sleep for the
1604  * duration of the call, on the susepended_ccalling_threads queue.  We
1605  * give out a token to the task, which it can use to resume the thread
1606  * on return from the C function.
1607  * ------------------------------------------------------------------------- */
1608    
1609 StgInt
1610 suspendThread( StgRegTable *reg, 
1611                rtsBool concCall
1612 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1613                STG_UNUSED
1614 #endif
1615                )
1616 {
1617   nat tok;
1618   Capability *cap;
1619
1620   /* assume that *reg is a pointer to the StgRegTable part
1621    * of a Capability.
1622    */
1623   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1624
1625   ACQUIRE_LOCK(&sched_mutex);
1626
1627   IF_DEBUG(scheduler,
1628            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1629
1630   // XXX this might not be necessary --SDM
1631   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1632
1633   threadPaused(cap->r.rCurrentTSO);
1634   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1635   suspended_ccalling_threads = cap->r.rCurrentTSO;
1636
1637 #if defined(RTS_SUPPORTS_THREADS)
1638   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1639   {
1640       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1641       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1642   }
1643   else
1644   {
1645       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1646   }
1647 #endif
1648
1649   /* Use the thread ID as the token; it should be unique */
1650   tok = cap->r.rCurrentTSO->id;
1651
1652   /* Hand back capability */
1653   releaseCapability(cap);
1654   
1655 #if defined(RTS_SUPPORTS_THREADS)
1656   /* Preparing to leave the RTS, so ensure there's a native thread/task
1657      waiting to take over.
1658   */
1659   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1660   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1661       startTask(taskStart);
1662   //}
1663 #endif
1664
1665   /* Other threads _might_ be available for execution; signal this */
1666   THREAD_RUNNABLE();
1667   RELEASE_LOCK(&sched_mutex);
1668   return tok; 
1669 }
1670
1671 StgRegTable *
1672 resumeThread( StgInt tok,
1673               rtsBool concCall STG_UNUSED )
1674 {
1675   StgTSO *tso, **prev;
1676   Capability *cap;
1677
1678 #if defined(RTS_SUPPORTS_THREADS)
1679   /* Wait for permission to re-enter the RTS with the result. */
1680   ACQUIRE_LOCK(&sched_mutex);
1681   grabReturnCapability(&sched_mutex, &cap);
1682
1683   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1684 #else
1685   grabCapability(&cap);
1686 #endif
1687
1688   /* Remove the thread off of the suspended list */
1689   prev = &suspended_ccalling_threads;
1690   for (tso = suspended_ccalling_threads; 
1691        tso != END_TSO_QUEUE; 
1692        prev = &tso->link, tso = tso->link) {
1693     if (tso->id == (StgThreadID)tok) {
1694       *prev = tso->link;
1695       break;
1696     }
1697   }
1698   if (tso == END_TSO_QUEUE) {
1699     barf("resumeThread: thread not found");
1700   }
1701   tso->link = END_TSO_QUEUE;
1702   
1703 #if defined(RTS_SUPPORTS_THREADS)
1704   if(tso->why_blocked == BlockedOnCCall)
1705   {
1706       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1707       tso->blocked_exceptions = NULL;
1708   }
1709 #endif
1710   
1711   /* Reset blocking status */
1712   tso->why_blocked  = NotBlocked;
1713
1714   cap->r.rCurrentTSO = tso;
1715 #if defined(RTS_SUPPORTS_THREADS)
1716   RELEASE_LOCK(&sched_mutex);
1717 #endif
1718   return &cap->r;
1719 }
1720
1721
1722 /* ---------------------------------------------------------------------------
1723  * Static functions
1724  * ------------------------------------------------------------------------ */
1725 static void unblockThread(StgTSO *tso);
1726
1727 /* ---------------------------------------------------------------------------
1728  * Comparing Thread ids.
1729  *
1730  * This is used from STG land in the implementation of the
1731  * instances of Eq/Ord for ThreadIds.
1732  * ------------------------------------------------------------------------ */
1733
1734 int
1735 cmp_thread(StgPtr tso1, StgPtr tso2) 
1736
1737   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1738   StgThreadID id2 = ((StgTSO *)tso2)->id;
1739  
1740   if (id1 < id2) return (-1);
1741   if (id1 > id2) return 1;
1742   return 0;
1743 }
1744
1745 /* ---------------------------------------------------------------------------
1746  * Fetching the ThreadID from an StgTSO.
1747  *
1748  * This is used in the implementation of Show for ThreadIds.
1749  * ------------------------------------------------------------------------ */
1750 int
1751 rts_getThreadId(StgPtr tso) 
1752 {
1753   return ((StgTSO *)tso)->id;
1754 }
1755
1756 #ifdef DEBUG
1757 void
1758 labelThread(StgPtr tso, char *label)
1759 {
1760   int len;
1761   void *buf;
1762
1763   /* Caveat: Once set, you can only set the thread name to "" */
1764   len = strlen(label)+1;
1765   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1766   strncpy(buf,label,len);
1767   /* Update will free the old memory for us */
1768   updateThreadLabel((StgWord)tso,buf);
1769 }
1770 #endif /* DEBUG */
1771
1772 /* ---------------------------------------------------------------------------
1773    Create a new thread.
1774
1775    The new thread starts with the given stack size.  Before the
1776    scheduler can run, however, this thread needs to have a closure
1777    (and possibly some arguments) pushed on its stack.  See
1778    pushClosure() in Schedule.h.
1779
1780    createGenThread() and createIOThread() (in SchedAPI.h) are
1781    convenient packaged versions of this function.
1782
1783    currently pri (priority) is only used in a GRAN setup -- HWL
1784    ------------------------------------------------------------------------ */
1785 //@cindex createThread
1786 #if defined(GRAN)
1787 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1788 StgTSO *
1789 createThread(nat size, StgInt pri)
1790 #else
1791 StgTSO *
1792 createThread(nat size)
1793 #endif
1794 {
1795
1796     StgTSO *tso;
1797     nat stack_size;
1798
1799     /* First check whether we should create a thread at all */
1800 #if defined(PAR)
1801   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1802   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1803     threadsIgnored++;
1804     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1805           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1806     return END_TSO_QUEUE;
1807   }
1808   threadsCreated++;
1809 #endif
1810
1811 #if defined(GRAN)
1812   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1813 #endif
1814
1815   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1816
1817   /* catch ridiculously small stack sizes */
1818   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1819     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1820   }
1821
1822   stack_size = size - TSO_STRUCT_SIZEW;
1823
1824   tso = (StgTSO *)allocate(size);
1825   TICK_ALLOC_TSO(stack_size, 0);
1826
1827   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1828 #if defined(GRAN)
1829   SET_GRAN_HDR(tso, ThisPE);
1830 #endif
1831
1832   // Always start with the compiled code evaluator
1833   tso->what_next = ThreadRunGHC;
1834
1835   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1836    * protect the increment operation on next_thread_id.
1837    * In future, we could use an atomic increment instead.
1838    */
1839   ACQUIRE_LOCK(&thread_id_mutex);
1840   tso->id = next_thread_id++; 
1841   RELEASE_LOCK(&thread_id_mutex);
1842
1843   tso->why_blocked  = NotBlocked;
1844   tso->blocked_exceptions = NULL;
1845
1846   tso->stack_size   = stack_size;
1847   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1848                               - TSO_STRUCT_SIZEW;
1849   tso->sp           = (P_)&(tso->stack) + stack_size;
1850
1851 #ifdef PROFILING
1852   tso->prof.CCCS = CCS_MAIN;
1853 #endif
1854
1855   /* put a stop frame on the stack */
1856   tso->sp -= sizeofW(StgStopFrame);
1857   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1858   // ToDo: check this
1859 #if defined(GRAN)
1860   tso->link = END_TSO_QUEUE;
1861   /* uses more flexible routine in GranSim */
1862   insertThread(tso, CurrentProc);
1863 #else
1864   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1865    * from its creation
1866    */
1867 #endif
1868
1869 #if defined(GRAN) 
1870   if (RtsFlags.GranFlags.GranSimStats.Full) 
1871     DumpGranEvent(GR_START,tso);
1872 #elif defined(PAR)
1873   if (RtsFlags.ParFlags.ParStats.Full) 
1874     DumpGranEvent(GR_STARTQ,tso);
1875   /* HACk to avoid SCHEDULE 
1876      LastTSO = tso; */
1877 #endif
1878
1879   /* Link the new thread on the global thread list.
1880    */
1881   tso->global_link = all_threads;
1882   all_threads = tso;
1883
1884 #if defined(DIST)
1885   tso->dist.priority = MandatoryPriority; //by default that is...
1886 #endif
1887
1888 #if defined(GRAN)
1889   tso->gran.pri = pri;
1890 # if defined(DEBUG)
1891   tso->gran.magic = TSO_MAGIC; // debugging only
1892 # endif
1893   tso->gran.sparkname   = 0;
1894   tso->gran.startedat   = CURRENT_TIME; 
1895   tso->gran.exported    = 0;
1896   tso->gran.basicblocks = 0;
1897   tso->gran.allocs      = 0;
1898   tso->gran.exectime    = 0;
1899   tso->gran.fetchtime   = 0;
1900   tso->gran.fetchcount  = 0;
1901   tso->gran.blocktime   = 0;
1902   tso->gran.blockcount  = 0;
1903   tso->gran.blockedat   = 0;
1904   tso->gran.globalsparks = 0;
1905   tso->gran.localsparks  = 0;
1906   if (RtsFlags.GranFlags.Light)
1907     tso->gran.clock  = Now; /* local clock */
1908   else
1909     tso->gran.clock  = 0;
1910
1911   IF_DEBUG(gran,printTSO(tso));
1912 #elif defined(PAR)
1913 # if defined(DEBUG)
1914   tso->par.magic = TSO_MAGIC; // debugging only
1915 # endif
1916   tso->par.sparkname   = 0;
1917   tso->par.startedat   = CURRENT_TIME; 
1918   tso->par.exported    = 0;
1919   tso->par.basicblocks = 0;
1920   tso->par.allocs      = 0;
1921   tso->par.exectime    = 0;
1922   tso->par.fetchtime   = 0;
1923   tso->par.fetchcount  = 0;
1924   tso->par.blocktime   = 0;
1925   tso->par.blockcount  = 0;
1926   tso->par.blockedat   = 0;
1927   tso->par.globalsparks = 0;
1928   tso->par.localsparks  = 0;
1929 #endif
1930
1931 #if defined(GRAN)
1932   globalGranStats.tot_threads_created++;
1933   globalGranStats.threads_created_on_PE[CurrentProc]++;
1934   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1935   globalGranStats.tot_sq_probes++;
1936 #elif defined(PAR)
1937   // collect parallel global statistics (currently done together with GC stats)
1938   if (RtsFlags.ParFlags.ParStats.Global &&
1939       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1940     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1941     globalParStats.tot_threads_created++;
1942   }
1943 #endif 
1944
1945 #if defined(GRAN)
1946   IF_GRAN_DEBUG(pri,
1947                 belch("==__ schedule: Created TSO %d (%p);",
1948                       CurrentProc, tso, tso->id));
1949 #elif defined(PAR)
1950     IF_PAR_DEBUG(verbose,
1951                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1952                        tso->id, tso, advisory_thread_count));
1953 #else
1954   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1955                                  tso->id, tso->stack_size));
1956 #endif    
1957   return tso;
1958 }
1959
1960 #if defined(PAR)
1961 /* RFP:
1962    all parallel thread creation calls should fall through the following routine.
1963 */
1964 StgTSO *
1965 createSparkThread(rtsSpark spark) 
1966 { StgTSO *tso;
1967   ASSERT(spark != (rtsSpark)NULL);
1968   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1969   { threadsIgnored++;
1970     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1971           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1972     return END_TSO_QUEUE;
1973   }
1974   else
1975   { threadsCreated++;
1976     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1977     if (tso==END_TSO_QUEUE)     
1978       barf("createSparkThread: Cannot create TSO");
1979 #if defined(DIST)
1980     tso->priority = AdvisoryPriority;
1981 #endif
1982     pushClosure(tso,spark);
1983     PUSH_ON_RUN_QUEUE(tso);
1984     advisory_thread_count++;    
1985   }
1986   return tso;
1987 }
1988 #endif
1989
1990 /*
1991   Turn a spark into a thread.
1992   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1993 */
1994 #if defined(PAR)
1995 //@cindex activateSpark
1996 StgTSO *
1997 activateSpark (rtsSpark spark) 
1998 {
1999   StgTSO *tso;
2000
2001   tso = createSparkThread(spark);
2002   if (RtsFlags.ParFlags.ParStats.Full) {   
2003     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2004     IF_PAR_DEBUG(verbose,
2005                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2006                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2007   }
2008   // ToDo: fwd info on local/global spark to thread -- HWL
2009   // tso->gran.exported =  spark->exported;
2010   // tso->gran.locked =   !spark->global;
2011   // tso->gran.sparkname = spark->name;
2012
2013   return tso;
2014 }
2015 #endif
2016
2017 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
2018 #if defined(THREADED_RTS)
2019                                    , rtsBool blockWaiting
2020 #endif
2021                                    );
2022
2023
2024 /* ---------------------------------------------------------------------------
2025  * scheduleThread()
2026  *
2027  * scheduleThread puts a thread on the head of the runnable queue.
2028  * This will usually be done immediately after a thread is created.
2029  * The caller of scheduleThread must create the thread using e.g.
2030  * createThread and push an appropriate closure
2031  * on this thread's stack before the scheduler is invoked.
2032  * ------------------------------------------------------------------------ */
2033
2034 static void scheduleThread_ (StgTSO* tso);
2035
2036 void
2037 scheduleThread_(StgTSO *tso)
2038 {
2039   // Precondition: sched_mutex must be held.
2040
2041   /* Put the new thread on the head of the runnable queue.  The caller
2042    * better push an appropriate closure on this thread's stack
2043    * beforehand.  In the SMP case, the thread may start running as
2044    * soon as we release the scheduler lock below.
2045    */
2046   PUSH_ON_RUN_QUEUE(tso);
2047   THREAD_RUNNABLE();
2048
2049 #if 0
2050   IF_DEBUG(scheduler,printTSO(tso));
2051 #endif
2052 }
2053
2054 void scheduleThread(StgTSO* tso)
2055 {
2056   ACQUIRE_LOCK(&sched_mutex);
2057   scheduleThread_(tso);
2058   RELEASE_LOCK(&sched_mutex);
2059 }
2060
2061 SchedulerStatus
2062 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
2063 {       // Precondition: sched_mutex must be held
2064   StgMainThread *m;
2065
2066   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2067   m->tso = tso;
2068   m->ret = ret;
2069   m->stat = NoStatus;
2070 #if defined(RTS_SUPPORTS_THREADS)
2071   initCondition(&m->wakeup);
2072 #endif
2073
2074   /* Put the thread on the main-threads list prior to scheduling the TSO.
2075      Failure to do so introduces a race condition in the MT case (as
2076      identified by Wolfgang Thaller), whereby the new task/OS thread 
2077      created by scheduleThread_() would complete prior to the thread
2078      that spawned it managed to put 'itself' on the main-threads list.
2079      The upshot of it all being that the worker thread wouldn't get to
2080      signal the completion of the its work item for the main thread to
2081      see (==> it got stuck waiting.)    -- sof 6/02.
2082   */
2083   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2084   
2085   m->link = main_threads;
2086   main_threads = m;
2087
2088   scheduleThread_(tso);
2089 #if defined(THREADED_RTS)
2090   return waitThread_(m, rtsTrue);
2091 #else
2092   return waitThread_(m);
2093 #endif
2094 }
2095
2096 /* ---------------------------------------------------------------------------
2097  * initScheduler()
2098  *
2099  * Initialise the scheduler.  This resets all the queues - if the
2100  * queues contained any threads, they'll be garbage collected at the
2101  * next pass.
2102  *
2103  * ------------------------------------------------------------------------ */
2104
2105 #ifdef SMP
2106 static void
2107 term_handler(int sig STG_UNUSED)
2108 {
2109   stat_workerStop();
2110   ACQUIRE_LOCK(&term_mutex);
2111   await_death--;
2112   RELEASE_LOCK(&term_mutex);
2113   shutdownThread();
2114 }
2115 #endif
2116
2117 void 
2118 initScheduler(void)
2119 {
2120 #if defined(GRAN)
2121   nat i;
2122
2123   for (i=0; i<=MAX_PROC; i++) {
2124     run_queue_hds[i]      = END_TSO_QUEUE;
2125     run_queue_tls[i]      = END_TSO_QUEUE;
2126     blocked_queue_hds[i]  = END_TSO_QUEUE;
2127     blocked_queue_tls[i]  = END_TSO_QUEUE;
2128     ccalling_threadss[i]  = END_TSO_QUEUE;
2129     sleeping_queue        = END_TSO_QUEUE;
2130   }
2131 #else
2132   run_queue_hd      = END_TSO_QUEUE;
2133   run_queue_tl      = END_TSO_QUEUE;
2134   blocked_queue_hd  = END_TSO_QUEUE;
2135   blocked_queue_tl  = END_TSO_QUEUE;
2136   sleeping_queue    = END_TSO_QUEUE;
2137 #endif 
2138
2139   suspended_ccalling_threads  = END_TSO_QUEUE;
2140
2141   main_threads = NULL;
2142   all_threads  = END_TSO_QUEUE;
2143
2144   context_switch = 0;
2145   interrupted    = 0;
2146
2147   RtsFlags.ConcFlags.ctxtSwitchTicks =
2148       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2149       
2150 #if defined(RTS_SUPPORTS_THREADS)
2151   /* Initialise the mutex and condition variables used by
2152    * the scheduler. */
2153   initMutex(&sched_mutex);
2154   initMutex(&term_mutex);
2155   initMutex(&thread_id_mutex);
2156
2157   initCondition(&thread_ready_cond);
2158 #endif
2159   
2160 #if defined(SMP)
2161   initCondition(&gc_pending_cond);
2162 #endif
2163
2164 #if defined(RTS_SUPPORTS_THREADS)
2165   ACQUIRE_LOCK(&sched_mutex);
2166 #endif
2167
2168   /* Install the SIGHUP handler */
2169 #if defined(SMP)
2170   {
2171     struct sigaction action,oact;
2172
2173     action.sa_handler = term_handler;
2174     sigemptyset(&action.sa_mask);
2175     action.sa_flags = 0;
2176     if (sigaction(SIGTERM, &action, &oact) != 0) {
2177       barf("can't install TERM handler");
2178     }
2179   }
2180 #endif
2181
2182   /* A capability holds the state a native thread needs in
2183    * order to execute STG code. At least one capability is
2184    * floating around (only SMP builds have more than one).
2185    */
2186   initCapabilities();
2187   
2188 #if defined(RTS_SUPPORTS_THREADS)
2189     /* start our haskell execution tasks */
2190 # if defined(SMP)
2191     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2192 # else
2193     startTaskManager(0,taskStart);
2194 # endif
2195 #endif
2196
2197 #if /* defined(SMP) ||*/ defined(PAR)
2198   initSparkPools();
2199 #endif
2200
2201 #if defined(RTS_SUPPORTS_THREADS)
2202   RELEASE_LOCK(&sched_mutex);
2203 #endif
2204
2205 }
2206
2207 void
2208 exitScheduler( void )
2209 {
2210 #if defined(RTS_SUPPORTS_THREADS)
2211   stopTaskManager();
2212 #endif
2213   shutting_down_scheduler = rtsTrue;
2214 }
2215
2216 /* -----------------------------------------------------------------------------
2217    Managing the per-task allocation areas.
2218    
2219    Each capability comes with an allocation area.  These are
2220    fixed-length block lists into which allocation can be done.
2221
2222    ToDo: no support for two-space collection at the moment???
2223    -------------------------------------------------------------------------- */
2224
2225 /* -----------------------------------------------------------------------------
2226  * waitThread is the external interface for running a new computation
2227  * and waiting for the result.
2228  *
2229  * In the non-SMP case, we create a new main thread, push it on the 
2230  * main-thread stack, and invoke the scheduler to run it.  The
2231  * scheduler will return when the top main thread on the stack has
2232  * completed or died, and fill in the necessary fields of the
2233  * main_thread structure.
2234  *
2235  * In the SMP case, we create a main thread as before, but we then
2236  * create a new condition variable and sleep on it.  When our new
2237  * main thread has completed, we'll be woken up and the status/result
2238  * will be in the main_thread struct.
2239  * -------------------------------------------------------------------------- */
2240
2241 int 
2242 howManyThreadsAvail ( void )
2243 {
2244    int i = 0;
2245    StgTSO* q;
2246    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2247       i++;
2248    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2249       i++;
2250    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2251       i++;
2252    return i;
2253 }
2254
2255 void
2256 finishAllThreads ( void )
2257 {
2258    do {
2259       while (run_queue_hd != END_TSO_QUEUE) {
2260          waitThread ( run_queue_hd, NULL);
2261       }
2262       while (blocked_queue_hd != END_TSO_QUEUE) {
2263          waitThread ( blocked_queue_hd, NULL);
2264       }
2265       while (sleeping_queue != END_TSO_QUEUE) {
2266          waitThread ( blocked_queue_hd, NULL);
2267       }
2268    } while 
2269       (blocked_queue_hd != END_TSO_QUEUE || 
2270        run_queue_hd     != END_TSO_QUEUE ||
2271        sleeping_queue   != END_TSO_QUEUE);
2272 }
2273
2274 SchedulerStatus
2275 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2276
2277   StgMainThread *m;
2278   SchedulerStatus stat;
2279
2280   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2281   m->tso = tso;
2282   m->ret = ret;
2283   m->stat = NoStatus;
2284 #if defined(RTS_SUPPORTS_THREADS)
2285   initCondition(&m->wakeup);
2286 #endif
2287
2288   /* see scheduleWaitThread() comment */
2289   ACQUIRE_LOCK(&sched_mutex);
2290   m->link = main_threads;
2291   main_threads = m;
2292
2293   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2294 #if defined(THREADED_RTS)
2295   stat = waitThread_(m, rtsFalse);
2296 #else
2297   stat = waitThread_(m);
2298 #endif
2299   RELEASE_LOCK(&sched_mutex);
2300   return stat;
2301 }
2302
2303 static
2304 SchedulerStatus
2305 waitThread_(StgMainThread* m
2306 #if defined(THREADED_RTS)
2307             , rtsBool blockWaiting
2308 #endif
2309            )
2310 {
2311   SchedulerStatus stat;
2312
2313   // Precondition: sched_mutex must be held.
2314   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2315
2316 #if defined(RTS_SUPPORTS_THREADS)
2317
2318 # if defined(THREADED_RTS)
2319   if (!blockWaiting) {
2320     /* In the threaded case, the OS thread that called main()
2321      * gets to enter the RTS directly without going via another
2322      * task/thread.
2323      */
2324     main_main_thread = m;
2325     RELEASE_LOCK(&sched_mutex);
2326     schedule();
2327     ACQUIRE_LOCK(&sched_mutex);
2328     main_main_thread = NULL;
2329     ASSERT(m->stat != NoStatus);
2330   } else 
2331 # endif
2332   {
2333     do {
2334       waitCondition(&m->wakeup, &sched_mutex);
2335     } while (m->stat == NoStatus);
2336   }
2337 #elif defined(GRAN)
2338   /* GranSim specific init */
2339   CurrentTSO = m->tso;                // the TSO to run
2340   procStatus[MainProc] = Busy;        // status of main PE
2341   CurrentProc = MainProc;             // PE to run it on
2342
2343   RELEASE_LOCK(&sched_mutex);
2344   schedule();
2345 #else
2346   RELEASE_LOCK(&sched_mutex);
2347   schedule();
2348   ASSERT(m->stat != NoStatus);
2349 #endif
2350
2351   stat = m->stat;
2352
2353 #if defined(RTS_SUPPORTS_THREADS)
2354   closeCondition(&m->wakeup);
2355 #endif
2356
2357   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2358                               m->tso->id));
2359   stgFree(m);
2360
2361   // Postcondition: sched_mutex still held
2362   return stat;
2363 }
2364
2365 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2366 //@subsection Run queue code 
2367
2368 #if 0
2369 /* 
2370    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2371        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2372        implicit global variable that has to be correct when calling these
2373        fcts -- HWL 
2374 */
2375
2376 /* Put the new thread on the head of the runnable queue.
2377  * The caller of createThread better push an appropriate closure
2378  * on this thread's stack before the scheduler is invoked.
2379  */
2380 static /* inline */ void
2381 add_to_run_queue(tso)
2382 StgTSO* tso; 
2383 {
2384   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2385   tso->link = run_queue_hd;
2386   run_queue_hd = tso;
2387   if (run_queue_tl == END_TSO_QUEUE) {
2388     run_queue_tl = tso;
2389   }
2390 }
2391
2392 /* Put the new thread at the end of the runnable queue. */
2393 static /* inline */ void
2394 push_on_run_queue(tso)
2395 StgTSO* tso; 
2396 {
2397   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2398   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2399   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2400   if (run_queue_hd == END_TSO_QUEUE) {
2401     run_queue_hd = tso;
2402   } else {
2403     run_queue_tl->link = tso;
2404   }
2405   run_queue_tl = tso;
2406 }
2407
2408 /* 
2409    Should be inlined because it's used very often in schedule.  The tso
2410    argument is actually only needed in GranSim, where we want to have the
2411    possibility to schedule *any* TSO on the run queue, irrespective of the
2412    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2413    the run queue and dequeue the tso, adjusting the links in the queue. 
2414 */
2415 //@cindex take_off_run_queue
2416 static /* inline */ StgTSO*
2417 take_off_run_queue(StgTSO *tso) {
2418   StgTSO *t, *prev;
2419
2420   /* 
2421      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2422
2423      if tso is specified, unlink that tso from the run_queue (doesn't have
2424      to be at the beginning of the queue); GranSim only 
2425   */
2426   if (tso!=END_TSO_QUEUE) {
2427     /* find tso in queue */
2428     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2429          t!=END_TSO_QUEUE && t!=tso;
2430          prev=t, t=t->link) 
2431       /* nothing */ ;
2432     ASSERT(t==tso);
2433     /* now actually dequeue the tso */
2434     if (prev!=END_TSO_QUEUE) {
2435       ASSERT(run_queue_hd!=t);
2436       prev->link = t->link;
2437     } else {
2438       /* t is at beginning of thread queue */
2439       ASSERT(run_queue_hd==t);
2440       run_queue_hd = t->link;
2441     }
2442     /* t is at end of thread queue */
2443     if (t->link==END_TSO_QUEUE) {
2444       ASSERT(t==run_queue_tl);
2445       run_queue_tl = prev;
2446     } else {
2447       ASSERT(run_queue_tl!=t);
2448     }
2449     t->link = END_TSO_QUEUE;
2450   } else {
2451     /* take tso from the beginning of the queue; std concurrent code */
2452     t = run_queue_hd;
2453     if (t != END_TSO_QUEUE) {
2454       run_queue_hd = t->link;
2455       t->link = END_TSO_QUEUE;
2456       if (run_queue_hd == END_TSO_QUEUE) {
2457         run_queue_tl = END_TSO_QUEUE;
2458       }
2459     }
2460   }
2461   return t;
2462 }
2463
2464 #endif /* 0 */
2465
2466 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2467 //@subsection Garbage Collextion Routines
2468
2469 /* ---------------------------------------------------------------------------
2470    Where are the roots that we know about?
2471
2472         - all the threads on the runnable queue
2473         - all the threads on the blocked queue
2474         - all the threads on the sleeping queue
2475         - all the thread currently executing a _ccall_GC
2476         - all the "main threads"
2477      
2478    ------------------------------------------------------------------------ */
2479
2480 /* This has to be protected either by the scheduler monitor, or by the
2481         garbage collection monitor (probably the latter).
2482         KH @ 25/10/99
2483 */
2484
2485 void
2486 GetRoots(evac_fn evac)
2487 {
2488 #if defined(GRAN)
2489   {
2490     nat i;
2491     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2492       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2493           evac((StgClosure **)&run_queue_hds[i]);
2494       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2495           evac((StgClosure **)&run_queue_tls[i]);
2496       
2497       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2498           evac((StgClosure **)&blocked_queue_hds[i]);
2499       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2500           evac((StgClosure **)&blocked_queue_tls[i]);
2501       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2502           evac((StgClosure **)&ccalling_threads[i]);
2503     }
2504   }
2505
2506   markEventQueue();
2507
2508 #else /* !GRAN */
2509   if (run_queue_hd != END_TSO_QUEUE) {
2510       ASSERT(run_queue_tl != END_TSO_QUEUE);
2511       evac((StgClosure **)&run_queue_hd);
2512       evac((StgClosure **)&run_queue_tl);
2513   }
2514   
2515   if (blocked_queue_hd != END_TSO_QUEUE) {
2516       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2517       evac((StgClosure **)&blocked_queue_hd);
2518       evac((StgClosure **)&blocked_queue_tl);
2519   }
2520   
2521   if (sleeping_queue != END_TSO_QUEUE) {
2522       evac((StgClosure **)&sleeping_queue);
2523   }
2524 #endif 
2525
2526   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2527       evac((StgClosure **)&suspended_ccalling_threads);
2528   }
2529
2530 #if defined(PAR) || defined(GRAN)
2531   markSparkQueue(evac);
2532 #endif
2533
2534 #if defined(RTS_USER_SIGNALS)
2535   // mark the signal handlers (signals should be already blocked)
2536   markSignalHandlers(evac);
2537 #endif
2538
2539   // main threads which have completed need to be retained until they
2540   // are dealt with in the main scheduler loop.  They won't be
2541   // retained any other way: the GC will drop them from the
2542   // all_threads list, so we have to be careful to treat them as roots
2543   // here.
2544   { 
2545       StgMainThread *m;
2546       for (m = main_threads; m != NULL; m = m->link) {
2547           switch (m->tso->what_next) {
2548           case ThreadComplete:
2549           case ThreadKilled:
2550               evac((StgClosure **)&m->tso);
2551               break;
2552           default:
2553               break;
2554           }
2555       }
2556   }
2557 }
2558
2559 /* -----------------------------------------------------------------------------
2560    performGC
2561
2562    This is the interface to the garbage collector from Haskell land.
2563    We provide this so that external C code can allocate and garbage
2564    collect when called from Haskell via _ccall_GC.
2565
2566    It might be useful to provide an interface whereby the programmer
2567    can specify more roots (ToDo).
2568    
2569    This needs to be protected by the GC condition variable above.  KH.
2570    -------------------------------------------------------------------------- */
2571
2572 static void (*extra_roots)(evac_fn);
2573
2574 void
2575 performGC(void)
2576 {
2577   /* Obligated to hold this lock upon entry */
2578   ACQUIRE_LOCK(&sched_mutex);
2579   GarbageCollect(GetRoots,rtsFalse);
2580   RELEASE_LOCK(&sched_mutex);
2581 }
2582
2583 void
2584 performMajorGC(void)
2585 {
2586   ACQUIRE_LOCK(&sched_mutex);
2587   GarbageCollect(GetRoots,rtsTrue);
2588   RELEASE_LOCK(&sched_mutex);
2589 }
2590
2591 static void
2592 AllRoots(evac_fn evac)
2593 {
2594     GetRoots(evac);             // the scheduler's roots
2595     extra_roots(evac);          // the user's roots
2596 }
2597
2598 void
2599 performGCWithRoots(void (*get_roots)(evac_fn))
2600 {
2601   ACQUIRE_LOCK(&sched_mutex);
2602   extra_roots = get_roots;
2603   GarbageCollect(AllRoots,rtsFalse);
2604   RELEASE_LOCK(&sched_mutex);
2605 }
2606
2607 /* -----------------------------------------------------------------------------
2608    Stack overflow
2609
2610    If the thread has reached its maximum stack size, then raise the
2611    StackOverflow exception in the offending thread.  Otherwise
2612    relocate the TSO into a larger chunk of memory and adjust its stack
2613    size appropriately.
2614    -------------------------------------------------------------------------- */
2615
2616 static StgTSO *
2617 threadStackOverflow(StgTSO *tso)
2618 {
2619   nat new_stack_size, new_tso_size, stack_words;
2620   StgPtr new_sp;
2621   StgTSO *dest;
2622
2623   IF_DEBUG(sanity,checkTSO(tso));
2624   if (tso->stack_size >= tso->max_stack_size) {
2625
2626     IF_DEBUG(gc,
2627              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2628                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2629              /* If we're debugging, just print out the top of the stack */
2630              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2631                                               tso->sp+64)));
2632
2633     /* Send this thread the StackOverflow exception */
2634     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2635     return tso;
2636   }
2637
2638   /* Try to double the current stack size.  If that takes us over the
2639    * maximum stack size for this thread, then use the maximum instead.
2640    * Finally round up so the TSO ends up as a whole number of blocks.
2641    */
2642   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2643   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2644                                        TSO_STRUCT_SIZE)/sizeof(W_);
2645   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2646   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2647
2648   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2649
2650   dest = (StgTSO *)allocate(new_tso_size);
2651   TICK_ALLOC_TSO(new_stack_size,0);
2652
2653   /* copy the TSO block and the old stack into the new area */
2654   memcpy(dest,tso,TSO_STRUCT_SIZE);
2655   stack_words = tso->stack + tso->stack_size - tso->sp;
2656   new_sp = (P_)dest + new_tso_size - stack_words;
2657   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2658
2659   /* relocate the stack pointers... */
2660   dest->sp         = new_sp;
2661   dest->stack_size = new_stack_size;
2662         
2663   /* Mark the old TSO as relocated.  We have to check for relocated
2664    * TSOs in the garbage collector and any primops that deal with TSOs.
2665    *
2666    * It's important to set the sp value to just beyond the end
2667    * of the stack, so we don't attempt to scavenge any part of the
2668    * dead TSO's stack.
2669    */
2670   tso->what_next = ThreadRelocated;
2671   tso->link = dest;
2672   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2673   tso->why_blocked = NotBlocked;
2674   dest->mut_link = NULL;
2675
2676   IF_PAR_DEBUG(verbose,
2677                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2678                      tso->id, tso, tso->stack_size);
2679                /* If we're debugging, just print out the top of the stack */
2680                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2681                                                 tso->sp+64)));
2682   
2683   IF_DEBUG(sanity,checkTSO(tso));
2684 #if 0
2685   IF_DEBUG(scheduler,printTSO(dest));
2686 #endif
2687
2688   return dest;
2689 }
2690
2691 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2692 //@subsection Blocking Queue Routines
2693
2694 /* ---------------------------------------------------------------------------
2695    Wake up a queue that was blocked on some resource.
2696    ------------------------------------------------------------------------ */
2697
2698 #if defined(GRAN)
2699 static inline void
2700 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2701 {
2702 }
2703 #elif defined(PAR)
2704 static inline void
2705 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2706 {
2707   /* write RESUME events to log file and
2708      update blocked and fetch time (depending on type of the orig closure) */
2709   if (RtsFlags.ParFlags.ParStats.Full) {
2710     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2711                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2712                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2713     if (EMPTY_RUN_QUEUE())
2714       emitSchedule = rtsTrue;
2715
2716     switch (get_itbl(node)->type) {
2717         case FETCH_ME_BQ:
2718           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2719           break;
2720         case RBH:
2721         case FETCH_ME:
2722         case BLACKHOLE_BQ:
2723           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2724           break;
2725 #ifdef DIST
2726         case MVAR:
2727           break;
2728 #endif    
2729         default:
2730           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2731         }
2732       }
2733 }
2734 #endif
2735
2736 #if defined(GRAN)
2737 static StgBlockingQueueElement *
2738 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2739 {
2740     StgTSO *tso;
2741     PEs node_loc, tso_loc;
2742
2743     node_loc = where_is(node); // should be lifted out of loop
2744     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2745     tso_loc = where_is((StgClosure *)tso);
2746     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2747       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2748       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2749       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2750       // insertThread(tso, node_loc);
2751       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2752                 ResumeThread,
2753                 tso, node, (rtsSpark*)NULL);
2754       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2755       // len_local++;
2756       // len++;
2757     } else { // TSO is remote (actually should be FMBQ)
2758       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2759                                   RtsFlags.GranFlags.Costs.gunblocktime +
2760                                   RtsFlags.GranFlags.Costs.latency;
2761       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2762                 UnblockThread,
2763                 tso, node, (rtsSpark*)NULL);
2764       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2765       // len++;
2766     }
2767     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2768     IF_GRAN_DEBUG(bq,
2769                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2770                           (node_loc==tso_loc ? "Local" : "Global"), 
2771                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2772     tso->block_info.closure = NULL;
2773     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2774                              tso->id, tso));
2775 }
2776 #elif defined(PAR)
2777 static StgBlockingQueueElement *
2778 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2779 {
2780     StgBlockingQueueElement *next;
2781
2782     switch (get_itbl(bqe)->type) {
2783     case TSO:
2784       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2785       /* if it's a TSO just push it onto the run_queue */
2786       next = bqe->link;
2787       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2788       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2789       THREAD_RUNNABLE();
2790       unblockCount(bqe, node);
2791       /* reset blocking status after dumping event */
2792       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2793       break;
2794
2795     case BLOCKED_FETCH:
2796       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2797       next = bqe->link;
2798       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2799       PendingFetches = (StgBlockedFetch *)bqe;
2800       break;
2801
2802 # if defined(DEBUG)
2803       /* can ignore this case in a non-debugging setup; 
2804          see comments on RBHSave closures above */
2805     case CONSTR:
2806       /* check that the closure is an RBHSave closure */
2807       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2808              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2809              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2810       break;
2811
2812     default:
2813       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2814            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2815            (StgClosure *)bqe);
2816 # endif
2817     }
2818   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2819   return next;
2820 }
2821
2822 #else /* !GRAN && !PAR */
2823 static StgTSO *
2824 unblockOneLocked(StgTSO *tso)
2825 {
2826   StgTSO *next;
2827
2828   ASSERT(get_itbl(tso)->type == TSO);
2829   ASSERT(tso->why_blocked != NotBlocked);
2830   tso->why_blocked = NotBlocked;
2831   next = tso->link;
2832   PUSH_ON_RUN_QUEUE(tso);
2833   THREAD_RUNNABLE();
2834   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2835   return next;
2836 }
2837 #endif
2838
2839 #if defined(GRAN) || defined(PAR)
2840 inline StgBlockingQueueElement *
2841 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2842 {
2843   ACQUIRE_LOCK(&sched_mutex);
2844   bqe = unblockOneLocked(bqe, node);
2845   RELEASE_LOCK(&sched_mutex);
2846   return bqe;
2847 }
2848 #else
2849 inline StgTSO *
2850 unblockOne(StgTSO *tso)
2851 {
2852   ACQUIRE_LOCK(&sched_mutex);
2853   tso = unblockOneLocked(tso);
2854   RELEASE_LOCK(&sched_mutex);
2855   return tso;
2856 }
2857 #endif
2858
2859 #if defined(GRAN)
2860 void 
2861 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2862 {
2863   StgBlockingQueueElement *bqe;
2864   PEs node_loc;
2865   nat len = 0; 
2866
2867   IF_GRAN_DEBUG(bq, 
2868                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2869                       node, CurrentProc, CurrentTime[CurrentProc], 
2870                       CurrentTSO->id, CurrentTSO));
2871
2872   node_loc = where_is(node);
2873
2874   ASSERT(q == END_BQ_QUEUE ||
2875          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2876          get_itbl(q)->type == CONSTR); // closure (type constructor)
2877   ASSERT(is_unique(node));
2878
2879   /* FAKE FETCH: magically copy the node to the tso's proc;
2880      no Fetch necessary because in reality the node should not have been 
2881      moved to the other PE in the first place
2882   */
2883   if (CurrentProc!=node_loc) {
2884     IF_GRAN_DEBUG(bq, 
2885                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2886                         node, node_loc, CurrentProc, CurrentTSO->id, 
2887                         // CurrentTSO, where_is(CurrentTSO),
2888                         node->header.gran.procs));
2889     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2890     IF_GRAN_DEBUG(bq, 
2891                   belch("## new bitmask of node %p is %#x",
2892                         node, node->header.gran.procs));
2893     if (RtsFlags.GranFlags.GranSimStats.Global) {
2894       globalGranStats.tot_fake_fetches++;
2895     }
2896   }
2897
2898   bqe = q;
2899   // ToDo: check: ASSERT(CurrentProc==node_loc);
2900   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2901     //next = bqe->link;
2902     /* 
2903        bqe points to the current element in the queue
2904        next points to the next element in the queue
2905     */
2906     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2907     //tso_loc = where_is(tso);
2908     len++;
2909     bqe = unblockOneLocked(bqe, node);
2910   }
2911
2912   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2913      the closure to make room for the anchor of the BQ */
2914   if (bqe!=END_BQ_QUEUE) {
2915     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2916     /*
2917     ASSERT((info_ptr==&RBH_Save_0_info) ||
2918            (info_ptr==&RBH_Save_1_info) ||
2919            (info_ptr==&RBH_Save_2_info));
2920     */
2921     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2922     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2923     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2924
2925     IF_GRAN_DEBUG(bq,
2926                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2927                         node, info_type(node)));
2928   }
2929
2930   /* statistics gathering */
2931   if (RtsFlags.GranFlags.GranSimStats.Global) {
2932     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2933     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2934     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2935     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2936   }
2937   IF_GRAN_DEBUG(bq,
2938                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2939                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2940 }
2941 #elif defined(PAR)
2942 void 
2943 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2944 {
2945   StgBlockingQueueElement *bqe;
2946
2947   ACQUIRE_LOCK(&sched_mutex);
2948
2949   IF_PAR_DEBUG(verbose, 
2950                belch("##-_ AwBQ for node %p on [%x]: ",
2951                      node, mytid));
2952 #ifdef DIST  
2953   //RFP
2954   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2955     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2956     return;
2957   }
2958 #endif
2959   
2960   ASSERT(q == END_BQ_QUEUE ||
2961          get_itbl(q)->type == TSO ||           
2962          get_itbl(q)->type == BLOCKED_FETCH || 
2963          get_itbl(q)->type == CONSTR); 
2964
2965   bqe = q;
2966   while (get_itbl(bqe)->type==TSO || 
2967          get_itbl(bqe)->type==BLOCKED_FETCH) {
2968     bqe = unblockOneLocked(bqe, node);
2969   }
2970   RELEASE_LOCK(&sched_mutex);
2971 }
2972
2973 #else   /* !GRAN && !PAR */
2974
2975 #ifdef RTS_SUPPORTS_THREADS
2976 void
2977 awakenBlockedQueueNoLock(StgTSO *tso)
2978 {
2979   while (tso != END_TSO_QUEUE) {
2980     tso = unblockOneLocked(tso);
2981   }
2982 }
2983 #endif
2984
2985 void
2986 awakenBlockedQueue(StgTSO *tso)
2987 {
2988   ACQUIRE_LOCK(&sched_mutex);
2989   while (tso != END_TSO_QUEUE) {
2990     tso = unblockOneLocked(tso);
2991   }
2992   RELEASE_LOCK(&sched_mutex);
2993 }
2994 #endif
2995
2996 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2997 //@subsection Exception Handling Routines
2998
2999 /* ---------------------------------------------------------------------------
3000    Interrupt execution
3001    - usually called inside a signal handler so it mustn't do anything fancy.   
3002    ------------------------------------------------------------------------ */
3003
3004 void
3005 interruptStgRts(void)
3006 {
3007     interrupted    = 1;
3008     context_switch = 1;
3009 }
3010
3011 /* -----------------------------------------------------------------------------
3012    Unblock a thread
3013
3014    This is for use when we raise an exception in another thread, which
3015    may be blocked.
3016    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3017    -------------------------------------------------------------------------- */
3018
3019 #if defined(GRAN) || defined(PAR)
3020 /*
3021   NB: only the type of the blocking queue is different in GranSim and GUM
3022       the operations on the queue-elements are the same
3023       long live polymorphism!
3024
3025   Locks: sched_mutex is held upon entry and exit.
3026
3027 */
3028 static void
3029 unblockThread(StgTSO *tso)
3030 {
3031   StgBlockingQueueElement *t, **last;
3032
3033   switch (tso->why_blocked) {
3034
3035   case NotBlocked:
3036     return;  /* not blocked */
3037
3038   case BlockedOnMVar:
3039     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3040     {
3041       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3042       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3043
3044       last = (StgBlockingQueueElement **)&mvar->head;
3045       for (t = (StgBlockingQueueElement *)mvar->head; 
3046            t != END_BQ_QUEUE; 
3047            last = &t->link, last_tso = t, t = t->link) {
3048         if (t == (StgBlockingQueueElement *)tso) {
3049           *last = (StgBlockingQueueElement *)tso->link;
3050           if (mvar->tail == tso) {
3051             mvar->tail = (StgTSO *)last_tso;
3052           }
3053           goto done;
3054         }
3055       }
3056       barf("unblockThread (MVAR): TSO not found");
3057     }
3058
3059   case BlockedOnBlackHole:
3060     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3061     {
3062       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3063
3064       last = &bq->blocking_queue;
3065       for (t = bq->blocking_queue; 
3066            t != END_BQ_QUEUE; 
3067            last = &t->link, t = t->link) {
3068         if (t == (StgBlockingQueueElement *)tso) {
3069           *last = (StgBlockingQueueElement *)tso->link;
3070           goto done;
3071         }
3072       }
3073       barf("unblockThread (BLACKHOLE): TSO not found");
3074     }
3075
3076   case BlockedOnException:
3077     {
3078       StgTSO *target  = tso->block_info.tso;
3079
3080       ASSERT(get_itbl(target)->type == TSO);
3081
3082       if (target->what_next == ThreadRelocated) {
3083           target = target->link;
3084           ASSERT(get_itbl(target)->type == TSO);
3085       }
3086
3087       ASSERT(target->blocked_exceptions != NULL);
3088
3089       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3090       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3091            t != END_BQ_QUEUE; 
3092            last = &t->link, t = t->link) {
3093         ASSERT(get_itbl(t)->type == TSO);
3094         if (t == (StgBlockingQueueElement *)tso) {
3095           *last = (StgBlockingQueueElement *)tso->link;
3096           goto done;
3097         }
3098       }
3099       barf("unblockThread (Exception): TSO not found");
3100     }
3101
3102   case BlockedOnRead:
3103   case BlockedOnWrite:
3104     {
3105       /* take TSO off blocked_queue */
3106       StgBlockingQueueElement *prev = NULL;
3107       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3108            prev = t, t = t->link) {
3109         if (t == (StgBlockingQueueElement *)tso) {
3110           if (prev == NULL) {
3111             blocked_queue_hd = (StgTSO *)t->link;
3112             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3113               blocked_queue_tl = END_TSO_QUEUE;
3114             }
3115           } else {
3116             prev->link = t->link;
3117             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3118               blocked_queue_tl = (StgTSO *)prev;
3119             }
3120           }
3121           goto done;
3122         }
3123       }
3124       barf("unblockThread (I/O): TSO not found");
3125     }
3126
3127   case BlockedOnDelay:
3128     {
3129       /* take TSO off sleeping_queue */
3130       StgBlockingQueueElement *prev = NULL;
3131       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3132            prev = t, t = t->link) {
3133         if (t == (StgBlockingQueueElement *)tso) {
3134           if (prev == NULL) {
3135             sleeping_queue = (StgTSO *)t->link;
3136           } else {
3137             prev->link = t->link;
3138           }
3139           goto done;
3140         }
3141       }
3142       barf("unblockThread (I/O): TSO not found");
3143     }
3144
3145   default:
3146     barf("unblockThread");
3147   }
3148
3149  done:
3150   tso->link = END_TSO_QUEUE;
3151   tso->why_blocked = NotBlocked;
3152   tso->block_info.closure = NULL;
3153   PUSH_ON_RUN_QUEUE(tso);
3154 }
3155 #else
3156 static void
3157 unblockThread(StgTSO *tso)
3158 {
3159   StgTSO *t, **last;
3160   
3161   /* To avoid locking unnecessarily. */
3162   if (tso->why_blocked == NotBlocked) {
3163     return;
3164   }
3165
3166   switch (tso->why_blocked) {
3167
3168   case BlockedOnMVar:
3169     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3170     {
3171       StgTSO *last_tso = END_TSO_QUEUE;
3172       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3173
3174       last = &mvar->head;
3175       for (t = mvar->head; t != END_TSO_QUEUE; 
3176            last = &t->link, last_tso = t, t = t->link) {
3177         if (t == tso) {
3178           *last = tso->link;
3179           if (mvar->tail == tso) {
3180             mvar->tail = last_tso;
3181           }
3182           goto done;
3183         }
3184       }
3185       barf("unblockThread (MVAR): TSO not found");
3186     }
3187
3188   case BlockedOnBlackHole:
3189     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3190     {
3191       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3192
3193       last = &bq->blocking_queue;
3194       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3195            last = &t->link, t = t->link) {
3196         if (t == tso) {
3197           *last = tso->link;
3198           goto done;
3199         }
3200       }
3201       barf("unblockThread (BLACKHOLE): TSO not found");
3202     }
3203
3204   case BlockedOnException:
3205     {
3206       StgTSO *target  = tso->block_info.tso;
3207
3208       ASSERT(get_itbl(target)->type == TSO);
3209
3210       while (target->what_next == ThreadRelocated) {
3211           target = target->link;
3212           ASSERT(get_itbl(target)->type == TSO);
3213       }
3214       
3215       ASSERT(target->blocked_exceptions != NULL);
3216
3217       last = &target->blocked_exceptions;
3218       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3219            last = &t->link, t = t->link) {
3220         ASSERT(get_itbl(t)->type == TSO);
3221         if (t == tso) {
3222           *last = tso->link;
3223           goto done;
3224         }
3225       }
3226       barf("unblockThread (Exception): TSO not found");
3227     }
3228
3229   case BlockedOnRead:
3230   case BlockedOnWrite:
3231     {
3232       StgTSO *prev = NULL;
3233       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3234            prev = t, t = t->link) {
3235         if (t == tso) {
3236           if (prev == NULL) {
3237             blocked_queue_hd = t->link;
3238             if (blocked_queue_tl == t) {
3239               blocked_queue_tl = END_TSO_QUEUE;
3240             }
3241           } else {
3242             prev->link = t->link;
3243             if (blocked_queue_tl == t) {
3244               blocked_queue_tl = prev;
3245             }
3246           }
3247           goto done;
3248         }
3249       }
3250       barf("unblockThread (I/O): TSO not found");
3251     }
3252
3253   case BlockedOnDelay:
3254     {
3255       StgTSO *prev = NULL;
3256       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3257            prev = t, t = t->link) {
3258         if (t == tso) {
3259           if (prev == NULL) {
3260             sleeping_queue = t->link;
3261           } else {
3262             prev->link = t->link;
3263           }
3264           goto done;
3265         }
3266       }
3267       barf("unblockThread (I/O): TSO not found");
3268     }
3269
3270   default:
3271     barf("unblockThread");
3272   }
3273
3274  done:
3275   tso->link = END_TSO_QUEUE;
3276   tso->why_blocked = NotBlocked;
3277   tso->block_info.closure = NULL;
3278   PUSH_ON_RUN_QUEUE(tso);
3279 }
3280 #endif
3281
3282 /* -----------------------------------------------------------------------------
3283  * raiseAsync()
3284  *
3285  * The following function implements the magic for raising an
3286  * asynchronous exception in an existing thread.
3287  *
3288  * We first remove the thread from any queue on which it might be
3289  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3290  *
3291  * We strip the stack down to the innermost CATCH_FRAME, building
3292  * thunks in the heap for all the active computations, so they can 
3293  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3294  * an application of the handler to the exception, and push it on
3295  * the top of the stack.
3296  * 
3297  * How exactly do we save all the active computations?  We create an
3298  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3299  * AP_STACKs pushes everything from the corresponding update frame
3300  * upwards onto the stack.  (Actually, it pushes everything up to the
3301  * next update frame plus a pointer to the next AP_STACK object.
3302  * Entering the next AP_STACK object pushes more onto the stack until we
3303  * reach the last AP_STACK object - at which point the stack should look
3304  * exactly as it did when we killed the TSO and we can continue
3305  * execution by entering the closure on top of the stack.
3306  *
3307  * We can also kill a thread entirely - this happens if either (a) the 
3308  * exception passed to raiseAsync is NULL, or (b) there's no
3309  * CATCH_FRAME on the stack.  In either case, we strip the entire
3310  * stack and replace the thread with a zombie.
3311  *
3312  * Locks: sched_mutex held upon entry nor exit.
3313  *
3314  * -------------------------------------------------------------------------- */
3315  
3316 void 
3317 deleteThread(StgTSO *tso)
3318 {
3319   raiseAsync(tso,NULL);
3320 }
3321
3322 void
3323 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3324 {
3325   /* When raising async exs from contexts where sched_mutex isn't held;
3326      use raiseAsyncWithLock(). */
3327   ACQUIRE_LOCK(&sched_mutex);
3328   raiseAsync(tso,exception);
3329   RELEASE_LOCK(&sched_mutex);
3330 }
3331
3332 void
3333 raiseAsync(StgTSO *tso, StgClosure *exception)
3334 {
3335     StgRetInfoTable *info;
3336     StgPtr sp;
3337   
3338     // Thread already dead?
3339     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3340         return;
3341     }
3342
3343     IF_DEBUG(scheduler, 
3344              sched_belch("raising exception in thread %ld.", tso->id));
3345     
3346     // Remove it from any blocking queues
3347     unblockThread(tso);
3348
3349     sp = tso->sp;
3350     
3351     // The stack freezing code assumes there's a closure pointer on
3352     // the top of the stack, so we have to arrange that this is the case...
3353     //
3354     if (sp[0] == (W_)&stg_enter_info) {
3355         sp++;
3356     } else {
3357         sp--;
3358         sp[0] = (W_)&stg_dummy_ret_closure;
3359     }
3360
3361     while (1) {
3362         nat i;
3363
3364         // 1. Let the top of the stack be the "current closure"
3365         //
3366         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3367         // CATCH_FRAME.
3368         //
3369         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3370         // current closure applied to the chunk of stack up to (but not
3371         // including) the update frame.  This closure becomes the "current
3372         // closure".  Go back to step 2.
3373         //
3374         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3375         // top of the stack applied to the exception.
3376         // 
3377         // 5. If it's a STOP_FRAME, then kill the thread.
3378         
3379         StgPtr frame;
3380         
3381         frame = sp + 1;
3382         info = get_ret_itbl((StgClosure *)frame);
3383         
3384         while (info->i.type != UPDATE_FRAME
3385                && (info->i.type != CATCH_FRAME || exception == NULL)
3386                && info->i.type != STOP_FRAME) {
3387             frame += stack_frame_sizeW((StgClosure *)frame);
3388             info = get_ret_itbl((StgClosure *)frame);
3389         }
3390         
3391         switch (info->i.type) {
3392             
3393         case CATCH_FRAME:
3394             // If we find a CATCH_FRAME, and we've got an exception to raise,
3395             // then build the THUNK raise(exception), and leave it on
3396             // top of the CATCH_FRAME ready to enter.
3397             //
3398         {
3399 #ifdef PROFILING
3400             StgCatchFrame *cf = (StgCatchFrame *)frame;
3401 #endif
3402             StgClosure *raise;
3403             
3404             // we've got an exception to raise, so let's pass it to the
3405             // handler in this frame.
3406             //
3407             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3408             TICK_ALLOC_SE_THK(1,0);
3409             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3410             raise->payload[0] = exception;
3411             
3412             // throw away the stack from Sp up to the CATCH_FRAME.
3413             //
3414             sp = frame - 1;
3415             
3416             /* Ensure that async excpetions are blocked now, so we don't get
3417              * a surprise exception before we get around to executing the
3418              * handler.
3419              */
3420             if (tso->blocked_exceptions == NULL) {
3421                 tso->blocked_exceptions = END_TSO_QUEUE;
3422             }
3423             
3424             /* Put the newly-built THUNK on top of the stack, ready to execute
3425              * when the thread restarts.
3426              */
3427             sp[0] = (W_)raise;
3428             sp[-1] = (W_)&stg_enter_info;
3429             tso->sp = sp-1;
3430             tso->what_next = ThreadRunGHC;
3431             IF_DEBUG(sanity, checkTSO(tso));
3432             return;
3433         }
3434         
3435         case UPDATE_FRAME:
3436         {
3437             StgAP_STACK * ap;
3438             nat words;
3439             
3440             // First build an AP_STACK consisting of the stack chunk above the
3441             // current update frame, with the top word on the stack as the
3442             // fun field.
3443             //
3444             words = frame - sp - 1;
3445             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3446             
3447             ap->size = words;
3448             ap->fun  = (StgClosure *)sp[0];
3449             sp++;
3450             for(i=0; i < (nat)words; ++i) {
3451                 ap->payload[i] = (StgClosure *)*sp++;
3452             }
3453             
3454             SET_HDR(ap,&stg_AP_STACK_info,
3455                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3456             TICK_ALLOC_UP_THK(words+1,0);
3457             
3458             IF_DEBUG(scheduler,
3459                      fprintf(stderr,  "scheduler: Updating ");
3460                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3461                      fprintf(stderr,  " with ");
3462                      printObj((StgClosure *)ap);
3463                 );
3464
3465             // Replace the updatee with an indirection - happily
3466             // this will also wake up any threads currently
3467             // waiting on the result.
3468             //
3469             // Warning: if we're in a loop, more than one update frame on
3470             // the stack may point to the same object.  Be careful not to
3471             // overwrite an IND_OLDGEN in this case, because we'll screw
3472             // up the mutable lists.  To be on the safe side, don't
3473             // overwrite any kind of indirection at all.  See also
3474             // threadSqueezeStack in GC.c, where we have to make a similar
3475             // check.
3476             //
3477             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3478                 // revert the black hole
3479                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3480             }
3481             sp += sizeofW(StgUpdateFrame) - 1;
3482             sp[0] = (W_)ap; // push onto stack
3483             break;
3484         }
3485         
3486         case STOP_FRAME:
3487             // We've stripped the entire stack, the thread is now dead.
3488             sp += sizeofW(StgStopFrame);
3489             tso->what_next = ThreadKilled;
3490             tso->sp = sp;
3491             return;
3492             
3493         default:
3494             barf("raiseAsync");
3495         }
3496     }
3497     barf("raiseAsync");
3498 }
3499
3500 /* -----------------------------------------------------------------------------
3501    resurrectThreads is called after garbage collection on the list of
3502    threads found to be garbage.  Each of these threads will be woken
3503    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3504    on an MVar, or NonTermination if the thread was blocked on a Black
3505    Hole.
3506
3507    Locks: sched_mutex isn't held upon entry nor exit.
3508    -------------------------------------------------------------------------- */
3509
3510 void
3511 resurrectThreads( StgTSO *threads )
3512 {
3513   StgTSO *tso, *next;
3514
3515   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3516     next = tso->global_link;
3517     tso->global_link = all_threads;
3518     all_threads = tso;
3519     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3520
3521     switch (tso->why_blocked) {
3522     case BlockedOnMVar:
3523     case BlockedOnException:
3524       /* Called by GC - sched_mutex lock is currently held. */
3525       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3526       break;
3527     case BlockedOnBlackHole:
3528       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3529       break;
3530     case NotBlocked:
3531       /* This might happen if the thread was blocked on a black hole
3532        * belonging to a thread that we've just woken up (raiseAsync
3533        * can wake up threads, remember...).
3534        */
3535       continue;
3536     default:
3537       barf("resurrectThreads: thread blocked in a strange way");
3538     }
3539   }
3540 }
3541
3542 /* -----------------------------------------------------------------------------
3543  * Blackhole detection: if we reach a deadlock, test whether any
3544  * threads are blocked on themselves.  Any threads which are found to
3545  * be self-blocked get sent a NonTermination exception.
3546  *
3547  * This is only done in a deadlock situation in order to avoid
3548  * performance overhead in the normal case.
3549  *
3550  * Locks: sched_mutex is held upon entry and exit.
3551  * -------------------------------------------------------------------------- */
3552
3553 static void
3554 detectBlackHoles( void )
3555 {
3556     StgTSO *tso = all_threads;
3557     StgClosure *frame;
3558     StgClosure *blocked_on;
3559     StgRetInfoTable *info;
3560
3561     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3562
3563         while (tso->what_next == ThreadRelocated) {
3564             tso = tso->link;
3565             ASSERT(get_itbl(tso)->type == TSO);
3566         }
3567       
3568         if (tso->why_blocked != BlockedOnBlackHole) {
3569             continue;
3570         }
3571         blocked_on = tso->block_info.closure;
3572
3573         frame = (StgClosure *)tso->sp;
3574
3575         while(1) {
3576             info = get_ret_itbl(frame);
3577             switch (info->i.type) {
3578             case UPDATE_FRAME:
3579                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3580                     /* We are blocking on one of our own computations, so
3581                      * send this thread the NonTermination exception.  
3582                      */
3583                     IF_DEBUG(scheduler, 
3584                              sched_belch("thread %d is blocked on itself", tso->id));
3585                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3586                     goto done;
3587                 }
3588                 
3589                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3590                 continue;
3591
3592             case STOP_FRAME:
3593                 goto done;
3594
3595                 // normal stack frames; do nothing except advance the pointer
3596             default:
3597                 (StgPtr)frame += stack_frame_sizeW(frame);
3598             }
3599         }   
3600         done: ;
3601     }
3602 }
3603
3604 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3605 //@subsection Debugging Routines
3606
3607 /* -----------------------------------------------------------------------------
3608  * Debugging: why is a thread blocked
3609  * [Also provides useful information when debugging threaded programs
3610  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3611    -------------------------------------------------------------------------- */
3612
3613 static
3614 void
3615 printThreadBlockage(StgTSO *tso)
3616 {
3617   switch (tso->why_blocked) {
3618   case BlockedOnRead:
3619     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3620     break;
3621   case BlockedOnWrite:
3622     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3623     break;
3624   case BlockedOnDelay:
3625     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3626     break;
3627   case BlockedOnMVar:
3628     fprintf(stderr,"is blocked on an MVar");
3629     break;
3630   case BlockedOnException:
3631     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3632             tso->block_info.tso->id);
3633     break;
3634   case BlockedOnBlackHole:
3635     fprintf(stderr,"is blocked on a black hole");
3636     break;
3637   case NotBlocked:
3638     fprintf(stderr,"is not blocked");
3639     break;
3640 #if defined(PAR)
3641   case BlockedOnGA:
3642     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3643             tso->block_info.closure, info_type(tso->block_info.closure));
3644     break;
3645   case BlockedOnGA_NoSend:
3646     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3647             tso->block_info.closure, info_type(tso->block_info.closure));
3648     break;
3649 #endif
3650 #if defined(RTS_SUPPORTS_THREADS)
3651   case BlockedOnCCall:
3652     fprintf(stderr,"is blocked on an external call");
3653     break;
3654   case BlockedOnCCall_NoUnblockExc:
3655     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3656     break;
3657 #endif
3658   default:
3659     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3660          tso->why_blocked, tso->id, tso);
3661   }
3662 }
3663
3664 static
3665 void
3666 printThreadStatus(StgTSO *tso)
3667 {
3668   switch (tso->what_next) {
3669   case ThreadKilled:
3670     fprintf(stderr,"has been killed");
3671     break;
3672   case ThreadComplete:
3673     fprintf(stderr,"has completed");
3674     break;
3675   default:
3676     printThreadBlockage(tso);
3677   }
3678 }
3679
3680 void
3681 printAllThreads(void)
3682 {
3683   StgTSO *t;
3684   void *label;
3685
3686 # if defined(GRAN)
3687   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3688   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3689                        time_string, rtsFalse/*no commas!*/);
3690
3691   fprintf(stderr, "all threads at [%s]:\n", time_string);
3692 # elif defined(PAR)
3693   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3694   ullong_format_string(CURRENT_TIME,
3695                        time_string, rtsFalse/*no commas!*/);
3696
3697   fprintf(stderr,"all threads at [%s]:\n", time_string);
3698 # else
3699   fprintf(stderr,"all threads:\n");
3700 # endif
3701
3702   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3703     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3704     label = lookupThreadLabel((StgWord)t);
3705     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3706     printThreadStatus(t);
3707     fprintf(stderr,"\n");
3708   }
3709 }
3710     
3711 #ifdef DEBUG
3712
3713 /* 
3714    Print a whole blocking queue attached to node (debugging only).
3715 */
3716 //@cindex print_bq
3717 # if defined(PAR)
3718 void 
3719 print_bq (StgClosure *node)
3720 {
3721   StgBlockingQueueElement *bqe;
3722   StgTSO *tso;
3723   rtsBool end;
3724
3725   fprintf(stderr,"## BQ of closure %p (%s): ",
3726           node, info_type(node));
3727
3728   /* should cover all closures that may have a blocking queue */
3729   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3730          get_itbl(node)->type == FETCH_ME_BQ ||
3731          get_itbl(node)->type == RBH ||
3732          get_itbl(node)->type == MVAR);
3733     
3734   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3735
3736   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3737 }
3738
3739 /* 
3740    Print a whole blocking queue starting with the element bqe.
3741 */
3742 void 
3743 print_bqe (StgBlockingQueueElement *bqe)
3744 {
3745   rtsBool end;
3746
3747   /* 
3748      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3749   */
3750   for (end = (bqe==END_BQ_QUEUE);
3751        !end; // iterate until bqe points to a CONSTR
3752        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3753        bqe = end ? END_BQ_QUEUE : bqe->link) {
3754     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3755     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3756     /* types of closures that may appear in a blocking queue */
3757     ASSERT(get_itbl(bqe)->type == TSO ||           
3758            get_itbl(bqe)->type == BLOCKED_FETCH || 
3759            get_itbl(bqe)->type == CONSTR); 
3760     /* only BQs of an RBH end with an RBH_Save closure */
3761     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3762
3763     switch (get_itbl(bqe)->type) {
3764     case TSO:
3765       fprintf(stderr," TSO %u (%x),",
3766               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3767       break;
3768     case BLOCKED_FETCH:
3769       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3770               ((StgBlockedFetch *)bqe)->node, 
3771               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3772               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3773               ((StgBlockedFetch *)bqe)->ga.weight);
3774       break;
3775     case CONSTR:
3776       fprintf(stderr," %s (IP %p),",
3777               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3778                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3779                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3780                "RBH_Save_?"), get_itbl(bqe));
3781       break;
3782     default:
3783       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3784            info_type((StgClosure *)bqe)); // , node, info_type(node));
3785       break;
3786     }
3787   } /* for */
3788   fputc('\n', stderr);
3789 }
3790 # elif defined(GRAN)
3791 void 
3792 print_bq (StgClosure *node)
3793 {
3794   StgBlockingQueueElement *bqe;
3795   PEs node_loc, tso_loc;
3796   rtsBool end;
3797
3798   /* should cover all closures that may have a blocking queue */
3799   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3800          get_itbl(node)->type == FETCH_ME_BQ ||
3801          get_itbl(node)->type == RBH);
3802     
3803   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3804   node_loc = where_is(node);
3805
3806   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3807           node, info_type(node), node_loc);
3808
3809   /* 
3810      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3811   */
3812   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3813        !end; // iterate until bqe points to a CONSTR
3814        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3815     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3816     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3817     /* types of closures that may appear in a blocking queue */
3818     ASSERT(get_itbl(bqe)->type == TSO ||           
3819            get_itbl(bqe)->type == CONSTR); 
3820     /* only BQs of an RBH end with an RBH_Save closure */
3821     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3822
3823     tso_loc = where_is((StgClosure *)bqe);
3824     switch (get_itbl(bqe)->type) {
3825     case TSO:
3826       fprintf(stderr," TSO %d (%p) on [PE %d],",
3827               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3828       break;
3829     case CONSTR:
3830       fprintf(stderr," %s (IP %p),",
3831               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3832                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3833                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3834                "RBH_Save_?"), get_itbl(bqe));
3835       break;
3836     default:
3837       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3838            info_type((StgClosure *)bqe), node, info_type(node));
3839       break;
3840     }
3841   } /* for */
3842   fputc('\n', stderr);
3843 }
3844 #else
3845 /* 
3846    Nice and easy: only TSOs on the blocking queue
3847 */
3848 void 
3849 print_bq (StgClosure *node)
3850 {
3851   StgTSO *tso;
3852
3853   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3854   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3855        tso != END_TSO_QUEUE; 
3856        tso=tso->link) {
3857     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3858     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3859     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3860   }
3861   fputc('\n', stderr);
3862 }
3863 # endif
3864
3865 #if defined(PAR)
3866 static nat
3867 run_queue_len(void)
3868 {
3869   nat i;
3870   StgTSO *tso;
3871
3872   for (i=0, tso=run_queue_hd; 
3873        tso != END_TSO_QUEUE;
3874        i++, tso=tso->link)
3875     /* nothing */
3876
3877   return i;
3878 }
3879 #endif
3880
3881 static void
3882 sched_belch(char *s, ...)
3883 {
3884   va_list ap;
3885   va_start(ap,s);
3886 #ifdef SMP
3887   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3888 #elif defined(PAR)
3889   fprintf(stderr, "== ");
3890 #else
3891   fprintf(stderr, "scheduler: ");
3892 #endif
3893   vfprintf(stderr, s, ap);
3894   fprintf(stderr, "\n");
3895   va_end(ap);
3896 }
3897
3898 #endif /* DEBUG */
3899
3900
3901 //@node Index,  , Debugging Routines, Main scheduling code
3902 //@subsection Index
3903
3904 //@index
3905 //* StgMainThread::  @cindex\s-+StgMainThread
3906 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3907 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3908 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3909 //* context_switch::  @cindex\s-+context_switch
3910 //* createThread::  @cindex\s-+createThread
3911 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3912 //* initScheduler::  @cindex\s-+initScheduler
3913 //* interrupted::  @cindex\s-+interrupted
3914 //* next_thread_id::  @cindex\s-+next_thread_id
3915 //* print_bq::  @cindex\s-+print_bq
3916 //* run_queue_hd::  @cindex\s-+run_queue_hd
3917 //* run_queue_tl::  @cindex\s-+run_queue_tl
3918 //* sched_mutex::  @cindex\s-+sched_mutex
3919 //* schedule::  @cindex\s-+schedule
3920 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3921 //* term_mutex::  @cindex\s-+term_mutex
3922 //@end index