[project @ 2003-03-19 18:41:18 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.163 2003/03/19 18:41:19 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
130 //@subsection Variables and Data structures
131
132 /* Main thread queue.
133  * Locks required: sched_mutex.
134  */
135 StgMainThread *main_threads = NULL;
136
137 #ifdef THREADED_RTS
138 // Pointer to the thread that executes main
139 // When this thread is finished, the program terminates
140 // by calling shutdownHaskellAndExit.
141 // It would be better to add a call to shutdownHaskellAndExit
142 // to the Main.main wrapper and to remove this hack.
143 StgMainThread *main_main_thread = NULL;
144 #endif
145
146 /* Thread queues.
147  * Locks required: sched_mutex.
148  */
149 #if defined(GRAN)
150
151 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
152 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
153
154 /* 
155    In GranSim we have a runnable and a blocked queue for each processor.
156    In order to minimise code changes new arrays run_queue_hds/tls
157    are created. run_queue_hd is then a short cut (macro) for
158    run_queue_hds[CurrentProc] (see GranSim.h).
159    -- HWL
160 */
161 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
162 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
163 StgTSO *ccalling_threadss[MAX_PROC];
164 /* We use the same global list of threads (all_threads) in GranSim as in
165    the std RTS (i.e. we are cheating). However, we don't use this list in
166    the GranSim specific code at the moment (so we are only potentially
167    cheating).  */
168
169 #else /* !GRAN */
170
171 StgTSO *run_queue_hd = NULL;
172 StgTSO *run_queue_tl = NULL;
173 StgTSO *blocked_queue_hd = NULL;
174 StgTSO *blocked_queue_tl = NULL;
175 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
176
177 #endif
178
179 /* Linked list of all threads.
180  * Used for detecting garbage collected threads.
181  */
182 StgTSO *all_threads = NULL;
183
184 /* When a thread performs a safe C call (_ccall_GC, using old
185  * terminology), it gets put on the suspended_ccalling_threads
186  * list. Used by the garbage collector.
187  */
188 static StgTSO *suspended_ccalling_threads;
189
190 static StgTSO *threadStackOverflow(StgTSO *tso);
191
192 /* KH: The following two flags are shared memory locations.  There is no need
193        to lock them, since they are only unset at the end of a scheduler
194        operation.
195 */
196
197 /* flag set by signal handler to precipitate a context switch */
198 //@cindex context_switch
199 nat context_switch = 0;
200
201 /* if this flag is set as well, give up execution */
202 //@cindex interrupted
203 rtsBool interrupted = rtsFalse;
204
205 /* Next thread ID to allocate.
206  * Locks required: thread_id_mutex
207  */
208 //@cindex next_thread_id
209 static StgThreadID next_thread_id = 1;
210
211 /*
212  * Pointers to the state of the current thread.
213  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
214  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
215  */
216  
217 /* The smallest stack size that makes any sense is:
218  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
219  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
220  *  + 1                       (the realworld token for an IO thread)
221  *  + 1                       (the closure to enter)
222  *
223  * A thread with this stack will bomb immediately with a stack
224  * overflow, which will increase its stack size.  
225  */
226
227 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
228
229
230 #if defined(GRAN)
231 StgTSO *CurrentTSO;
232 #endif
233
234 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
235  *  exists - earlier gccs apparently didn't.
236  *  -= chak
237  */
238 StgTSO dummy_tso;
239
240 static rtsBool ready_to_gc;
241
242 /*
243  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
244  * in an MT setting, needed to signal that a worker thread shouldn't hang around
245  * in the scheduler when it is out of work.
246  */
247 static rtsBool shutting_down_scheduler = rtsFalse;
248
249 void            addToBlockedQueue ( StgTSO *tso );
250
251 static void     schedule          ( void );
252        void     interruptStgRts   ( void );
253
254 static void     detectBlackHoles  ( void );
255
256 #ifdef DEBUG
257 static void sched_belch(char *s, ...);
258 #endif
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262  *       with these synchronisation objects.
263  */
264 Mutex     sched_mutex       = INIT_MUTEX_VAR;
265 Mutex     term_mutex        = INIT_MUTEX_VAR;
266
267 /*
268  * A heavyweight solution to the problem of protecting
269  * the thread_id from concurrent update.
270  */
271 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
272
273
274 # if defined(SMP)
275 static Condition gc_pending_cond = INIT_COND_VAR;
276 nat await_death;
277 # endif
278
279 #endif /* RTS_SUPPORTS_THREADS */
280
281 #if defined(PAR)
282 StgTSO *LastTSO;
283 rtsTime TimeOfLastYield;
284 rtsBool emitSchedule = rtsTrue;
285 #endif
286
287 #if DEBUG
288 static char *whatNext_strs[] = {
289   "ThreadRunGHC",
290   "ThreadInterpret",
291   "ThreadKilled",
292   "ThreadRelocated",
293   "ThreadComplete"
294 };
295 #endif
296
297 #if defined(PAR)
298 StgTSO * createSparkThread(rtsSpark spark);
299 StgTSO * activateSpark (rtsSpark spark);  
300 #endif
301
302 /*
303  * The thread state for the main thread.
304 // ToDo: check whether not needed any more
305 StgTSO   *MainTSO;
306  */
307
308 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
309 static void taskStart(void);
310 static void
311 taskStart(void)
312 {
313   schedule();
314 }
315 #endif
316
317 #if defined(RTS_SUPPORTS_THREADS)
318 void
319 startSchedulerTask(void)
320 {
321     startTask(taskStart);
322 }
323 #endif
324
325 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
326 //@subsection Main scheduling loop
327
328 /* ---------------------------------------------------------------------------
329    Main scheduling loop.
330
331    We use round-robin scheduling, each thread returning to the
332    scheduler loop when one of these conditions is detected:
333
334       * out of heap space
335       * timer expires (thread yields)
336       * thread blocks
337       * thread ends
338       * stack overflow
339
340    Locking notes:  we acquire the scheduler lock once at the beginning
341    of the scheduler loop, and release it when
342     
343       * running a thread, or
344       * waiting for work, or
345       * waiting for a GC to complete.
346
347    GRAN version:
348      In a GranSim setup this loop iterates over the global event queue.
349      This revolves around the global event queue, which determines what 
350      to do next. Therefore, it's more complicated than either the 
351      concurrent or the parallel (GUM) setup.
352
353    GUM version:
354      GUM iterates over incoming messages.
355      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
356      and sends out a fish whenever it has nothing to do; in-between
357      doing the actual reductions (shared code below) it processes the
358      incoming messages and deals with delayed operations 
359      (see PendingFetches).
360      This is not the ugliest code you could imagine, but it's bloody close.
361
362    ------------------------------------------------------------------------ */
363 //@cindex schedule
364 static void
365 schedule( void )
366 {
367   StgTSO *t;
368   Capability *cap;
369   StgThreadReturnCode ret;
370 #if defined(GRAN)
371   rtsEvent *event;
372 #elif defined(PAR)
373   StgSparkPool *pool;
374   rtsSpark spark;
375   StgTSO *tso;
376   GlobalTaskId pe;
377   rtsBool receivedFinish = rtsFalse;
378 # if defined(DEBUG)
379   nat tp_size, sp_size; // stats only
380 # endif
381 #endif
382   rtsBool was_interrupted = rtsFalse;
383   StgTSOWhatNext prev_what_next;
384   
385   ACQUIRE_LOCK(&sched_mutex);
386  
387 #if defined(RTS_SUPPORTS_THREADS)
388   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
389   IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
390 #else
391   /* simply initialise it in the non-threaded case */
392   grabCapability(&cap);
393 #endif
394
395 #if defined(GRAN)
396   /* set up first event to get things going */
397   /* ToDo: assign costs for system setup and init MainTSO ! */
398   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
399             ContinueThread, 
400             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
401
402   IF_DEBUG(gran,
403            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
404            G_TSO(CurrentTSO, 5));
405
406   if (RtsFlags.GranFlags.Light) {
407     /* Save current time; GranSim Light only */
408     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
409   }      
410
411   event = get_next_event();
412
413   while (event!=(rtsEvent*)NULL) {
414     /* Choose the processor with the next event */
415     CurrentProc = event->proc;
416     CurrentTSO = event->tso;
417
418 #elif defined(PAR)
419
420   while (!receivedFinish) {    /* set by processMessages */
421                                /* when receiving PP_FINISH message         */ 
422 #else
423
424   while (1) {
425
426 #endif
427
428     IF_DEBUG(scheduler, printAllThreads());
429
430 #if defined(RTS_SUPPORTS_THREADS)
431     /* Check to see whether there are any worker threads
432        waiting to deposit external call results. If so,
433        yield our capability */
434     yieldToReturningWorker(&sched_mutex, &cap);
435 #endif
436
437     /* If we're interrupted (the user pressed ^C, or some other
438      * termination condition occurred), kill all the currently running
439      * threads.
440      */
441     if (interrupted) {
442       IF_DEBUG(scheduler, sched_belch("interrupted"));
443       interrupted = rtsFalse;
444       was_interrupted = rtsTrue;
445 #if defined(RTS_SUPPORTS_THREADS)
446       // In the threaded RTS, deadlock detection doesn't work,
447       // so just exit right away.
448       prog_belch("interrupted");
449       releaseCapability(cap);
450       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
451       RELEASE_LOCK(&sched_mutex);
452       shutdownHaskellAndExit(EXIT_SUCCESS);
453 #else
454       deleteAllThreads();
455 #endif
456     }
457
458     /* Go through the list of main threads and wake up any
459      * clients whose computations have finished.  ToDo: this
460      * should be done more efficiently without a linear scan
461      * of the main threads list, somehow...
462      */
463 #if defined(RTS_SUPPORTS_THREADS)
464     { 
465       StgMainThread *m, **prev;
466       prev = &main_threads;
467       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
468         switch (m->tso->what_next) {
469         case ThreadComplete:
470           if (m->ret) {
471               // NOTE: return val is tso->sp[1] (see StgStartup.hc)
472               *(m->ret) = (StgClosure *)m->tso->sp[1]; 
473           }
474           *prev = m->link;
475           m->stat = Success;
476           broadcastCondition(&m->wakeup);
477 #ifdef DEBUG
478           removeThreadLabel((StgWord)m->tso);
479 #endif
480           if(m == main_main_thread)
481           {
482               releaseCapability(cap);
483               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
484               RELEASE_LOCK(&sched_mutex);
485               shutdownHaskellAndExit(EXIT_SUCCESS);
486           }
487           break;
488         case ThreadKilled:
489           if (m->ret) *(m->ret) = NULL;
490           *prev = m->link;
491           if (was_interrupted) {
492             m->stat = Interrupted;
493           } else {
494             m->stat = Killed;
495           }
496           broadcastCondition(&m->wakeup);
497 #ifdef DEBUG
498           removeThreadLabel((StgWord)m->tso);
499 #endif
500           if(m == main_main_thread)
501           {
502               releaseCapability(cap);
503               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
504               RELEASE_LOCK(&sched_mutex);
505               shutdownHaskellAndExit(EXIT_SUCCESS);
506           }
507           break;
508         default:
509           break;
510         }
511       }
512     }
513
514 #else /* not threaded */
515
516 # if defined(PAR)
517     /* in GUM do this only on the Main PE */
518     if (IAmMainThread)
519 # endif
520     /* If our main thread has finished or been killed, return.
521      */
522     {
523       StgMainThread *m = main_threads;
524       if (m->tso->what_next == ThreadComplete
525           || m->tso->what_next == ThreadKilled) {
526 #ifdef DEBUG
527         removeThreadLabel((StgWord)m->tso);
528 #endif
529         main_threads = main_threads->link;
530         if (m->tso->what_next == ThreadComplete) {
531             // We finished successfully, fill in the return value
532             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
533             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
534             m->stat = Success;
535             return;
536         } else {
537           if (m->ret) { *(m->ret) = NULL; };
538           if (was_interrupted) {
539             m->stat = Interrupted;
540           } else {
541             m->stat = Killed;
542           }
543           return;
544         }
545       }
546     }
547 #endif
548
549     /* Top up the run queue from our spark pool.  We try to make the
550      * number of threads in the run queue equal to the number of
551      * free capabilities.
552      *
553      * Disable spark support in SMP for now, non-essential & requires
554      * a little bit of work to make it compile cleanly. -- sof 1/02.
555      */
556 #if 0 /* defined(SMP) */
557     {
558       nat n = getFreeCapabilities();
559       StgTSO *tso = run_queue_hd;
560
561       /* Count the run queue */
562       while (n > 0 && tso != END_TSO_QUEUE) {
563         tso = tso->link;
564         n--;
565       }
566
567       for (; n > 0; n--) {
568         StgClosure *spark;
569         spark = findSpark(rtsFalse);
570         if (spark == NULL) {
571           break; /* no more sparks in the pool */
572         } else {
573           /* I'd prefer this to be done in activateSpark -- HWL */
574           /* tricky - it needs to hold the scheduler lock and
575            * not try to re-acquire it -- SDM */
576           createSparkThread(spark);       
577           IF_DEBUG(scheduler,
578                    sched_belch("==^^ turning spark of closure %p into a thread",
579                                (StgClosure *)spark));
580         }
581       }
582       /* We need to wake up the other tasks if we just created some
583        * work for them.
584        */
585       if (getFreeCapabilities() - n > 1) {
586           signalCondition( &thread_ready_cond );
587       }
588     }
589 #endif // SMP
590
591     /* check for signals each time around the scheduler */
592 #ifndef mingw32_TARGET_OS
593     if (signals_pending()) {
594       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
595       startSignalHandlers();
596       ACQUIRE_LOCK(&sched_mutex);
597     }
598 #endif
599
600     /* Check whether any waiting threads need to be woken up.  If the
601      * run queue is empty, and there are no other tasks running, we
602      * can wait indefinitely for something to happen.
603      */
604     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
605 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
606                 || EMPTY_RUN_QUEUE()
607 #endif
608         )
609     {
610       awaitEvent( EMPTY_RUN_QUEUE()
611 #if defined(SMP)
612         && allFreeCapabilities()
613 #endif
614         );
615     }
616     /* we can be interrupted while waiting for I/O... */
617     if (interrupted) continue;
618
619     /* 
620      * Detect deadlock: when we have no threads to run, there are no
621      * threads waiting on I/O or sleeping, and all the other tasks are
622      * waiting for work, we must have a deadlock of some description.
623      *
624      * We first try to find threads blocked on themselves (ie. black
625      * holes), and generate NonTermination exceptions where necessary.
626      *
627      * If no threads are black holed, we have a deadlock situation, so
628      * inform all the main threads.
629      */
630 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
631     if (   EMPTY_THREAD_QUEUES()
632 #if defined(RTS_SUPPORTS_THREADS)
633         && EMPTY_QUEUE(suspended_ccalling_threads)
634 #endif
635 #ifdef SMP
636         && allFreeCapabilities()
637 #endif
638         )
639     {
640         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
641 #if defined(THREADED_RTS)
642         /* and SMP mode ..? */
643         releaseCapability(cap);
644 #endif
645         // Garbage collection can release some new threads due to
646         // either (a) finalizers or (b) threads resurrected because
647         // they are about to be send BlockedOnDeadMVar.  Any threads
648         // thus released will be immediately runnable.
649         GarbageCollect(GetRoots,rtsTrue);
650
651         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
652
653         IF_DEBUG(scheduler, 
654                  sched_belch("still deadlocked, checking for black holes..."));
655         detectBlackHoles();
656
657         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
658
659 #ifndef mingw32_TARGET_OS
660         /* If we have user-installed signal handlers, then wait
661          * for signals to arrive rather then bombing out with a
662          * deadlock.
663          */
664 #if defined(RTS_SUPPORTS_THREADS)
665         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
666                       a signal with no runnable threads (or I/O
667                       suspended ones) leads nowhere quick.
668                       For now, simply shut down when we reach this
669                       condition.
670                       
671                       ToDo: define precisely under what conditions
672                       the Scheduler should shut down in an MT setting.
673                    */
674 #else
675         if ( anyUserHandlers() ) {
676 #endif
677             IF_DEBUG(scheduler, 
678                      sched_belch("still deadlocked, waiting for signals..."));
679
680             awaitUserSignals();
681
682             // we might be interrupted...
683             if (interrupted) { continue; }
684
685             if (signals_pending()) {
686                 RELEASE_LOCK(&sched_mutex);
687                 startSignalHandlers();
688                 ACQUIRE_LOCK(&sched_mutex);
689             }
690             ASSERT(!EMPTY_RUN_QUEUE());
691             goto not_deadlocked;
692         }
693 #endif
694
695         /* Probably a real deadlock.  Send the current main thread the
696          * Deadlock exception (or in the SMP build, send *all* main
697          * threads the deadlock exception, since none of them can make
698          * progress).
699          */
700         {
701             StgMainThread *m;
702 #if defined(RTS_SUPPORTS_THREADS)
703             for (m = main_threads; m != NULL; m = m->link) {
704                 switch (m->tso->why_blocked) {
705                 case BlockedOnBlackHole:
706                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
707                     break;
708                 case BlockedOnException:
709                 case BlockedOnMVar:
710                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
711                     break;
712                 default:
713                     barf("deadlock: main thread blocked in a strange way");
714                 }
715             }
716 #else
717             m = main_threads;
718             switch (m->tso->why_blocked) {
719             case BlockedOnBlackHole:
720                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
721                 break;
722             case BlockedOnException:
723             case BlockedOnMVar:
724                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
725                 break;
726             default:
727                 barf("deadlock: main thread blocked in a strange way");
728             }
729 #endif
730         }
731
732 #if defined(RTS_SUPPORTS_THREADS)
733         /* ToDo: revisit conditions (and mechanism) for shutting
734            down a multi-threaded world  */
735         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
736         RELEASE_LOCK(&sched_mutex);
737         shutdownHaskell();
738         return;
739 #endif
740     }
741   not_deadlocked:
742
743 #elif defined(RTS_SUPPORTS_THREADS)
744     /* ToDo: add deadlock detection in threaded RTS */
745 #elif defined(PAR)
746     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
747 #endif
748
749 #if defined(SMP)
750     /* If there's a GC pending, don't do anything until it has
751      * completed.
752      */
753     if (ready_to_gc) {
754       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
755       waitCondition( &gc_pending_cond, &sched_mutex );
756     }
757 #endif    
758
759 #if defined(RTS_SUPPORTS_THREADS)
760 #if defined(SMP)
761     /* block until we've got a thread on the run queue and a free
762      * capability.
763      *
764      */
765     if ( EMPTY_RUN_QUEUE() ) {
766       /* Give up our capability */
767       releaseCapability(cap);
768
769       /* If we're in the process of shutting down (& running the
770        * a batch of finalisers), don't wait around.
771        */
772       if ( shutting_down_scheduler ) {
773         RELEASE_LOCK(&sched_mutex);
774         return;
775       }
776       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
777       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
778       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
779     }
780 #else
781     if ( EMPTY_RUN_QUEUE() ) {
782       continue; // nothing to do
783     }
784 #endif
785 #endif
786
787 #if defined(GRAN)
788     if (RtsFlags.GranFlags.Light)
789       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
790
791     /* adjust time based on time-stamp */
792     if (event->time > CurrentTime[CurrentProc] &&
793         event->evttype != ContinueThread)
794       CurrentTime[CurrentProc] = event->time;
795     
796     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
797     if (!RtsFlags.GranFlags.Light)
798       handleIdlePEs();
799
800     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
801
802     /* main event dispatcher in GranSim */
803     switch (event->evttype) {
804       /* Should just be continuing execution */
805     case ContinueThread:
806       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
807       /* ToDo: check assertion
808       ASSERT(run_queue_hd != (StgTSO*)NULL &&
809              run_queue_hd != END_TSO_QUEUE);
810       */
811       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
812       if (!RtsFlags.GranFlags.DoAsyncFetch &&
813           procStatus[CurrentProc]==Fetching) {
814         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
815               CurrentTSO->id, CurrentTSO, CurrentProc);
816         goto next_thread;
817       } 
818       /* Ignore ContinueThreads for completed threads */
819       if (CurrentTSO->what_next == ThreadComplete) {
820         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
821               CurrentTSO->id, CurrentTSO, CurrentProc);
822         goto next_thread;
823       } 
824       /* Ignore ContinueThreads for threads that are being migrated */
825       if (PROCS(CurrentTSO)==Nowhere) { 
826         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
827               CurrentTSO->id, CurrentTSO, CurrentProc);
828         goto next_thread;
829       }
830       /* The thread should be at the beginning of the run queue */
831       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
832         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
833               CurrentTSO->id, CurrentTSO, CurrentProc);
834         break; // run the thread anyway
835       }
836       /*
837       new_event(proc, proc, CurrentTime[proc],
838                 FindWork,
839                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
840       goto next_thread; 
841       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
842       break; // now actually run the thread; DaH Qu'vam yImuHbej 
843
844     case FetchNode:
845       do_the_fetchnode(event);
846       goto next_thread;             /* handle next event in event queue  */
847       
848     case GlobalBlock:
849       do_the_globalblock(event);
850       goto next_thread;             /* handle next event in event queue  */
851       
852     case FetchReply:
853       do_the_fetchreply(event);
854       goto next_thread;             /* handle next event in event queue  */
855       
856     case UnblockThread:   /* Move from the blocked queue to the tail of */
857       do_the_unblock(event);
858       goto next_thread;             /* handle next event in event queue  */
859       
860     case ResumeThread:  /* Move from the blocked queue to the tail of */
861       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
862       event->tso->gran.blocktime += 
863         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
864       do_the_startthread(event);
865       goto next_thread;             /* handle next event in event queue  */
866       
867     case StartThread:
868       do_the_startthread(event);
869       goto next_thread;             /* handle next event in event queue  */
870       
871     case MoveThread:
872       do_the_movethread(event);
873       goto next_thread;             /* handle next event in event queue  */
874       
875     case MoveSpark:
876       do_the_movespark(event);
877       goto next_thread;             /* handle next event in event queue  */
878       
879     case FindWork:
880       do_the_findwork(event);
881       goto next_thread;             /* handle next event in event queue  */
882       
883     default:
884       barf("Illegal event type %u\n", event->evttype);
885     }  /* switch */
886     
887     /* This point was scheduler_loop in the old RTS */
888
889     IF_DEBUG(gran, belch("GRAN: after main switch"));
890
891     TimeOfLastEvent = CurrentTime[CurrentProc];
892     TimeOfNextEvent = get_time_of_next_event();
893     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
894     // CurrentTSO = ThreadQueueHd;
895
896     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
897                          TimeOfNextEvent));
898
899     if (RtsFlags.GranFlags.Light) 
900       GranSimLight_leave_system(event, &ActiveTSO); 
901
902     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
903
904     IF_DEBUG(gran, 
905              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
906
907     /* in a GranSim setup the TSO stays on the run queue */
908     t = CurrentTSO;
909     /* Take a thread from the run queue. */
910     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
911
912     IF_DEBUG(gran, 
913              fprintf(stderr, "GRAN: About to run current thread, which is\n");
914              G_TSO(t,5));
915
916     context_switch = 0; // turned on via GranYield, checking events and time slice
917
918     IF_DEBUG(gran, 
919              DumpGranEvent(GR_SCHEDULE, t));
920
921     procStatus[CurrentProc] = Busy;
922
923 #elif defined(PAR)
924     if (PendingFetches != END_BF_QUEUE) {
925         processFetches();
926     }
927
928     /* ToDo: phps merge with spark activation above */
929     /* check whether we have local work and send requests if we have none */
930     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
931       /* :-[  no local threads => look out for local sparks */
932       /* the spark pool for the current PE */
933       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
934       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
935           pool->hd < pool->tl) {
936         /* 
937          * ToDo: add GC code check that we really have enough heap afterwards!!
938          * Old comment:
939          * If we're here (no runnable threads) and we have pending
940          * sparks, we must have a space problem.  Get enough space
941          * to turn one of those pending sparks into a
942          * thread... 
943          */
944
945         spark = findSpark(rtsFalse);                /* get a spark */
946         if (spark != (rtsSpark) NULL) {
947           tso = activateSpark(spark);       /* turn the spark into a thread */
948           IF_PAR_DEBUG(schedule,
949                        belch("==== schedule: Created TSO %d (%p); %d threads active",
950                              tso->id, tso, advisory_thread_count));
951
952           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
953             belch("==^^ failed to activate spark");
954             goto next_thread;
955           }               /* otherwise fall through & pick-up new tso */
956         } else {
957           IF_PAR_DEBUG(verbose,
958                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
959                              spark_queue_len(pool)));
960           goto next_thread;
961         }
962       }
963
964       /* If we still have no work we need to send a FISH to get a spark
965          from another PE 
966       */
967       if (EMPTY_RUN_QUEUE()) {
968       /* =8-[  no local sparks => look for work on other PEs */
969         /*
970          * We really have absolutely no work.  Send out a fish
971          * (there may be some out there already), and wait for
972          * something to arrive.  We clearly can't run any threads
973          * until a SCHEDULE or RESUME arrives, and so that's what
974          * we're hoping to see.  (Of course, we still have to
975          * respond to other types of messages.)
976          */
977         TIME now = msTime() /*CURRENT_TIME*/;
978         IF_PAR_DEBUG(verbose, 
979                      belch("--  now=%ld", now));
980         IF_PAR_DEBUG(verbose,
981                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
982                          (last_fish_arrived_at!=0 &&
983                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
984                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
985                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
986                              last_fish_arrived_at,
987                              RtsFlags.ParFlags.fishDelay, now);
988                      });
989         
990         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
991             (last_fish_arrived_at==0 ||
992              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
993           /* outstandingFishes is set in sendFish, processFish;
994              avoid flooding system with fishes via delay */
995           pe = choosePE();
996           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
997                    NEW_FISH_HUNGER);
998
999           // Global statistics: count no. of fishes
1000           if (RtsFlags.ParFlags.ParStats.Global &&
1001               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1002             globalParStats.tot_fish_mess++;
1003           }
1004         }
1005       
1006         receivedFinish = processMessages();
1007         goto next_thread;
1008       }
1009     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1010       receivedFinish = processMessages();
1011     }
1012
1013     /* Now we are sure that we have some work available */
1014     ASSERT(run_queue_hd != END_TSO_QUEUE);
1015
1016     /* Take a thread from the run queue, if we have work */
1017     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1018     IF_DEBUG(sanity,checkTSO(t));
1019
1020     /* ToDo: write something to the log-file
1021     if (RTSflags.ParFlags.granSimStats && !sameThread)
1022         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1023
1024     CurrentTSO = t;
1025     */
1026     /* the spark pool for the current PE */
1027     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1028
1029     IF_DEBUG(scheduler, 
1030              belch("--=^ %d threads, %d sparks on [%#x]", 
1031                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1032
1033 # if 1
1034     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1035         t && LastTSO && t->id != LastTSO->id && 
1036         LastTSO->why_blocked == NotBlocked && 
1037         LastTSO->what_next != ThreadComplete) {
1038       // if previously scheduled TSO not blocked we have to record the context switch
1039       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1040                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1041     }
1042
1043     if (RtsFlags.ParFlags.ParStats.Full && 
1044         (emitSchedule /* forced emit */ ||
1045         (t && LastTSO && t->id != LastTSO->id))) {
1046       /* 
1047          we are running a different TSO, so write a schedule event to log file
1048          NB: If we use fair scheduling we also have to write  a deschedule 
1049              event for LastTSO; with unfair scheduling we know that the
1050              previous tso has blocked whenever we switch to another tso, so
1051              we don't need it in GUM for now
1052       */
1053       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1054                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1055       emitSchedule = rtsFalse;
1056     }
1057      
1058 # endif
1059 #else /* !GRAN && !PAR */
1060   
1061     /* grab a thread from the run queue */
1062     ASSERT(run_queue_hd != END_TSO_QUEUE);
1063     t = POP_RUN_QUEUE();
1064     // Sanity check the thread we're about to run.  This can be
1065     // expensive if there is lots of thread switching going on...
1066     IF_DEBUG(sanity,checkTSO(t));
1067 #endif
1068
1069     cap->r.rCurrentTSO = t;
1070     
1071     /* context switches are now initiated by the timer signal, unless
1072      * the user specified "context switch as often as possible", with
1073      * +RTS -C0
1074      */
1075     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1076          && (run_queue_hd != END_TSO_QUEUE
1077              || blocked_queue_hd != END_TSO_QUEUE
1078              || sleeping_queue != END_TSO_QUEUE)))
1079         context_switch = 1;
1080     else
1081         context_switch = 0;
1082
1083 run_thread:
1084
1085     RELEASE_LOCK(&sched_mutex);
1086
1087     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1088                               t->id, whatNext_strs[t->what_next]));
1089
1090 #ifdef PROFILING
1091     startHeapProfTimer();
1092 #endif
1093
1094     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1095     /* Run the current thread 
1096      */
1097     prev_what_next = t->what_next;
1098     switch (prev_what_next) {
1099     case ThreadKilled:
1100     case ThreadComplete:
1101         /* Thread already finished, return to scheduler. */
1102         ret = ThreadFinished;
1103         break;
1104     case ThreadRunGHC:
1105         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1106         break;
1107     case ThreadInterpret:
1108         ret = interpretBCO(cap);
1109         break;
1110     default:
1111       barf("schedule: invalid what_next field");
1112     }
1113     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1114     
1115     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1116 #ifdef PROFILING
1117     stopHeapProfTimer();
1118     CCCS = CCS_SYSTEM;
1119 #endif
1120     
1121     ACQUIRE_LOCK(&sched_mutex);
1122     
1123 #ifdef RTS_SUPPORTS_THREADS
1124     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1125 #elif !defined(GRAN) && !defined(PAR)
1126     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1127 #endif
1128     t = cap->r.rCurrentTSO;
1129     
1130 #if defined(PAR)
1131     /* HACK 675: if the last thread didn't yield, make sure to print a 
1132        SCHEDULE event to the log file when StgRunning the next thread, even
1133        if it is the same one as before */
1134     LastTSO = t; 
1135     TimeOfLastYield = CURRENT_TIME;
1136 #endif
1137
1138     switch (ret) {
1139     case HeapOverflow:
1140 #if defined(GRAN)
1141       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1142       globalGranStats.tot_heapover++;
1143 #elif defined(PAR)
1144       globalParStats.tot_heapover++;
1145 #endif
1146
1147       // did the task ask for a large block?
1148       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1149           // if so, get one and push it on the front of the nursery.
1150           bdescr *bd;
1151           nat blocks;
1152           
1153           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1154
1155           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1156                                    t->id, whatNext_strs[t->what_next], blocks));
1157
1158           // don't do this if it would push us over the
1159           // alloc_blocks_lim limit; we'll GC first.
1160           if (alloc_blocks + blocks < alloc_blocks_lim) {
1161
1162               alloc_blocks += blocks;
1163               bd = allocGroup( blocks );
1164
1165               // link the new group into the list
1166               bd->link = cap->r.rCurrentNursery;
1167               bd->u.back = cap->r.rCurrentNursery->u.back;
1168               if (cap->r.rCurrentNursery->u.back != NULL) {
1169                   cap->r.rCurrentNursery->u.back->link = bd;
1170               } else {
1171                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1172                          g0s0->blocks == cap->r.rNursery);
1173                   cap->r.rNursery = g0s0->blocks = bd;
1174               }           
1175               cap->r.rCurrentNursery->u.back = bd;
1176
1177               // initialise it as a nursery block.  We initialise the
1178               // step, gen_no, and flags field of *every* sub-block in
1179               // this large block, because this is easier than making
1180               // sure that we always find the block head of a large
1181               // block whenever we call Bdescr() (eg. evacuate() and
1182               // isAlive() in the GC would both have to do this, at
1183               // least).
1184               { 
1185                   bdescr *x;
1186                   for (x = bd; x < bd + blocks; x++) {
1187                       x->step = g0s0;
1188                       x->gen_no = 0;
1189                       x->flags = 0;
1190                   }
1191               }
1192
1193               // don't forget to update the block count in g0s0.
1194               g0s0->n_blocks += blocks;
1195               // This assert can be a killer if the app is doing lots
1196               // of large block allocations.
1197               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1198
1199               // now update the nursery to point to the new block
1200               cap->r.rCurrentNursery = bd;
1201
1202               // we might be unlucky and have another thread get on the
1203               // run queue before us and steal the large block, but in that
1204               // case the thread will just end up requesting another large
1205               // block.
1206               PUSH_ON_RUN_QUEUE(t);
1207               break;
1208           }
1209       }
1210
1211       /* make all the running tasks block on a condition variable,
1212        * maybe set context_switch and wait till they all pile in,
1213        * then have them wait on a GC condition variable.
1214        */
1215       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1216                                t->id, whatNext_strs[t->what_next]));
1217       threadPaused(t);
1218 #if defined(GRAN)
1219       ASSERT(!is_on_queue(t,CurrentProc));
1220 #elif defined(PAR)
1221       /* Currently we emit a DESCHEDULE event before GC in GUM.
1222          ToDo: either add separate event to distinguish SYSTEM time from rest
1223                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1224       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1225         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1226                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1227         emitSchedule = rtsTrue;
1228       }
1229 #endif
1230       
1231       ready_to_gc = rtsTrue;
1232       context_switch = 1;               /* stop other threads ASAP */
1233       PUSH_ON_RUN_QUEUE(t);
1234       /* actual GC is done at the end of the while loop */
1235       break;
1236       
1237     case StackOverflow:
1238 #if defined(GRAN)
1239       IF_DEBUG(gran, 
1240                DumpGranEvent(GR_DESCHEDULE, t));
1241       globalGranStats.tot_stackover++;
1242 #elif defined(PAR)
1243       // IF_DEBUG(par, 
1244       // DumpGranEvent(GR_DESCHEDULE, t);
1245       globalParStats.tot_stackover++;
1246 #endif
1247       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1248                                t->id, whatNext_strs[t->what_next]));
1249       /* just adjust the stack for this thread, then pop it back
1250        * on the run queue.
1251        */
1252       threadPaused(t);
1253       { 
1254         StgMainThread *m;
1255         /* enlarge the stack */
1256         StgTSO *new_t = threadStackOverflow(t);
1257         
1258         /* This TSO has moved, so update any pointers to it from the
1259          * main thread stack.  It better not be on any other queues...
1260          * (it shouldn't be).
1261          */
1262         for (m = main_threads; m != NULL; m = m->link) {
1263           if (m->tso == t) {
1264             m->tso = new_t;
1265           }
1266         }
1267         threadPaused(new_t);
1268         PUSH_ON_RUN_QUEUE(new_t);
1269       }
1270       break;
1271
1272     case ThreadYielding:
1273 #if defined(GRAN)
1274       IF_DEBUG(gran, 
1275                DumpGranEvent(GR_DESCHEDULE, t));
1276       globalGranStats.tot_yields++;
1277 #elif defined(PAR)
1278       // IF_DEBUG(par, 
1279       // DumpGranEvent(GR_DESCHEDULE, t);
1280       globalParStats.tot_yields++;
1281 #endif
1282       /* put the thread back on the run queue.  Then, if we're ready to
1283        * GC, check whether this is the last task to stop.  If so, wake
1284        * up the GC thread.  getThread will block during a GC until the
1285        * GC is finished.
1286        */
1287       IF_DEBUG(scheduler,
1288                if (t->what_next != prev_what_next) {
1289                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1290                          t->id, whatNext_strs[t->what_next]);
1291                } else {
1292                    belch("--<< thread %ld (%s) stopped, yielding", 
1293                          t->id, whatNext_strs[t->what_next]);
1294                }
1295                );
1296
1297       IF_DEBUG(sanity,
1298                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1299                checkTSO(t));
1300       ASSERT(t->link == END_TSO_QUEUE);
1301
1302       // Shortcut if we're just switching evaluators: don't bother
1303       // doing stack squeezing (which can be expensive), just run the
1304       // thread.
1305       if (t->what_next != prev_what_next) {
1306           goto run_thread;
1307       }
1308
1309       threadPaused(t);
1310
1311 #if defined(GRAN)
1312       ASSERT(!is_on_queue(t,CurrentProc));
1313
1314       IF_DEBUG(sanity,
1315                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1316                checkThreadQsSanity(rtsTrue));
1317 #endif
1318
1319 #if defined(PAR)
1320       if (RtsFlags.ParFlags.doFairScheduling) { 
1321         /* this does round-robin scheduling; good for concurrency */
1322         APPEND_TO_RUN_QUEUE(t);
1323       } else {
1324         /* this does unfair scheduling; good for parallelism */
1325         PUSH_ON_RUN_QUEUE(t);
1326       }
1327 #else
1328       // this does round-robin scheduling; good for concurrency
1329       APPEND_TO_RUN_QUEUE(t);
1330 #endif
1331
1332 #if defined(GRAN)
1333       /* add a ContinueThread event to actually process the thread */
1334       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1335                 ContinueThread,
1336                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1337       IF_GRAN_DEBUG(bq, 
1338                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1339                G_EVENTQ(0);
1340                G_CURR_THREADQ(0));
1341 #endif /* GRAN */
1342       break;
1343
1344     case ThreadBlocked:
1345 #if defined(GRAN)
1346       IF_DEBUG(scheduler,
1347                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1348                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1349                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1350
1351       // ??? needed; should emit block before
1352       IF_DEBUG(gran, 
1353                DumpGranEvent(GR_DESCHEDULE, t)); 
1354       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1355       /*
1356         ngoq Dogh!
1357       ASSERT(procStatus[CurrentProc]==Busy || 
1358               ((procStatus[CurrentProc]==Fetching) && 
1359               (t->block_info.closure!=(StgClosure*)NULL)));
1360       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1361           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1362             procStatus[CurrentProc]==Fetching)) 
1363         procStatus[CurrentProc] = Idle;
1364       */
1365 #elif defined(PAR)
1366       IF_DEBUG(scheduler,
1367                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1368                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1369       IF_PAR_DEBUG(bq,
1370
1371                    if (t->block_info.closure!=(StgClosure*)NULL) 
1372                      print_bq(t->block_info.closure));
1373
1374       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1375       blockThread(t);
1376
1377       /* whatever we schedule next, we must log that schedule */
1378       emitSchedule = rtsTrue;
1379
1380 #else /* !GRAN */
1381       /* don't need to do anything.  Either the thread is blocked on
1382        * I/O, in which case we'll have called addToBlockedQueue
1383        * previously, or it's blocked on an MVar or Blackhole, in which
1384        * case it'll be on the relevant queue already.
1385        */
1386       IF_DEBUG(scheduler,
1387                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1388                        t->id, whatNext_strs[t->what_next]);
1389                printThreadBlockage(t);
1390                fprintf(stderr, "\n"));
1391
1392       /* Only for dumping event to log file 
1393          ToDo: do I need this in GranSim, too?
1394       blockThread(t);
1395       */
1396 #endif
1397       threadPaused(t);
1398       break;
1399       
1400     case ThreadFinished:
1401       /* Need to check whether this was a main thread, and if so, signal
1402        * the task that started it with the return value.  If we have no
1403        * more main threads, we probably need to stop all the tasks until
1404        * we get a new one.
1405        */
1406       /* We also end up here if the thread kills itself with an
1407        * uncaught exception, see Exception.hc.
1408        */
1409       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1410                                t->id, whatNext_strs[t->what_next]));
1411 #if defined(GRAN)
1412       endThread(t, CurrentProc); // clean-up the thread
1413 #elif defined(PAR)
1414       /* For now all are advisory -- HWL */
1415       //if(t->priority==AdvisoryPriority) ??
1416       advisory_thread_count--;
1417       
1418 # ifdef DIST
1419       if(t->dist.priority==RevalPriority)
1420         FinishReval(t);
1421 # endif
1422       
1423       if (RtsFlags.ParFlags.ParStats.Full &&
1424           !RtsFlags.ParFlags.ParStats.Suppressed) 
1425         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1426 #endif
1427       break;
1428       
1429     default:
1430       barf("schedule: invalid thread return code %d", (int)ret);
1431     }
1432
1433 #ifdef PROFILING
1434     // When we have +RTS -i0 and we're heap profiling, do a census at
1435     // every GC.  This lets us get repeatable runs for debugging.
1436     if (performHeapProfile ||
1437         (RtsFlags.ProfFlags.profileInterval==0 &&
1438          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1439         GarbageCollect(GetRoots, rtsTrue);
1440         heapCensus();
1441         performHeapProfile = rtsFalse;
1442         ready_to_gc = rtsFalse; // we already GC'd
1443     }
1444 #endif
1445
1446     if (ready_to_gc 
1447 #ifdef SMP
1448         && allFreeCapabilities() 
1449 #endif
1450         ) {
1451       /* everybody back, start the GC.
1452        * Could do it in this thread, or signal a condition var
1453        * to do it in another thread.  Either way, we need to
1454        * broadcast on gc_pending_cond afterward.
1455        */
1456 #if defined(RTS_SUPPORTS_THREADS)
1457       IF_DEBUG(scheduler,sched_belch("doing GC"));
1458 #endif
1459       GarbageCollect(GetRoots,rtsFalse);
1460       ready_to_gc = rtsFalse;
1461 #ifdef SMP
1462       broadcastCondition(&gc_pending_cond);
1463 #endif
1464 #if defined(GRAN)
1465       /* add a ContinueThread event to continue execution of current thread */
1466       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1467                 ContinueThread,
1468                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1469       IF_GRAN_DEBUG(bq, 
1470                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1471                G_EVENTQ(0);
1472                G_CURR_THREADQ(0));
1473 #endif /* GRAN */
1474     }
1475
1476 #if defined(GRAN)
1477   next_thread:
1478     IF_GRAN_DEBUG(unused,
1479                   print_eventq(EventHd));
1480
1481     event = get_next_event();
1482 #elif defined(PAR)
1483   next_thread:
1484     /* ToDo: wait for next message to arrive rather than busy wait */
1485 #endif /* GRAN */
1486
1487   } /* end of while(1) */
1488
1489   IF_PAR_DEBUG(verbose,
1490                belch("== Leaving schedule() after having received Finish"));
1491 }
1492
1493 /* ---------------------------------------------------------------------------
1494  * Singleton fork(). Do not copy any running threads.
1495  * ------------------------------------------------------------------------- */
1496
1497 StgInt forkProcess(StgTSO* tso) {
1498
1499 #ifndef mingw32_TARGET_OS
1500   pid_t pid;
1501   StgTSO* t,*next;
1502   StgMainThread *m;
1503   rtsBool doKill;
1504
1505   IF_DEBUG(scheduler,sched_belch("forking!"));
1506
1507   pid = fork();
1508   if (pid) { /* parent */
1509
1510   /* just return the pid */
1511     
1512   } else { /* child */
1513   /* wipe all other threads */
1514   run_queue_hd = run_queue_tl = tso;
1515   tso->link = END_TSO_QUEUE;
1516
1517   /* When clearing out the threads, we need to ensure
1518      that a 'main thread' is left behind; if there isn't,
1519      the Scheduler will shutdown next time it is entered.
1520      
1521      ==> we don't kill a thread that's on the main_threads
1522          list (nor the current thread.)
1523     
1524      [ Attempts at implementing the more ambitious scheme of
1525        killing the main_threads also, and then adding the
1526        current thread onto the main_threads list if it wasn't
1527        there already, failed -- waitThread() (for one) wasn't
1528        up to it. If it proves to be desirable to also kill
1529        the main threads, then this scheme will have to be
1530        revisited (and fully debugged!)
1531        
1532        -- sof 7/2002
1533      ]
1534   */
1535   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1536      us is picky about finding the thread still in its queue when
1537      handling the deleteThread() */
1538
1539   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1540     next = t->link;
1541     
1542     /* Don't kill the current thread.. */
1543     if (t->id == tso->id) continue;
1544     doKill=rtsTrue;
1545     /* ..or a main thread */
1546     for (m = main_threads; m != NULL; m = m->link) {
1547         if (m->tso->id == t->id) {
1548           doKill=rtsFalse;
1549           break;
1550         }
1551     }
1552     if (doKill) {
1553       deleteThread(t);
1554     }
1555   }
1556   }
1557   return pid;
1558 #else /* mingw32 */
1559   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1560   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1561   return -1;
1562 #endif /* mingw32 */
1563 }
1564
1565 /* ---------------------------------------------------------------------------
1566  * deleteAllThreads():  kill all the live threads.
1567  *
1568  * This is used when we catch a user interrupt (^C), before performing
1569  * any necessary cleanups and running finalizers.
1570  *
1571  * Locks: sched_mutex held.
1572  * ------------------------------------------------------------------------- */
1573    
1574 void deleteAllThreads ( void )
1575 {
1576   StgTSO* t, *next;
1577   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1578   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1579       next = t->global_link;
1580       deleteThread(t);
1581   }      
1582   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1583   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1584   sleeping_queue = END_TSO_QUEUE;
1585 }
1586
1587 /* startThread and  insertThread are now in GranSim.c -- HWL */
1588
1589
1590 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1591 //@subsection Suspend and Resume
1592
1593 /* ---------------------------------------------------------------------------
1594  * Suspending & resuming Haskell threads.
1595  * 
1596  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1597  * its capability before calling the C function.  This allows another
1598  * task to pick up the capability and carry on running Haskell
1599  * threads.  It also means that if the C call blocks, it won't lock
1600  * the whole system.
1601  *
1602  * The Haskell thread making the C call is put to sleep for the
1603  * duration of the call, on the susepended_ccalling_threads queue.  We
1604  * give out a token to the task, which it can use to resume the thread
1605  * on return from the C function.
1606  * ------------------------------------------------------------------------- */
1607    
1608 StgInt
1609 suspendThread( StgRegTable *reg, 
1610                rtsBool concCall
1611 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1612                STG_UNUSED
1613 #endif
1614                )
1615 {
1616   nat tok;
1617   Capability *cap;
1618
1619   /* assume that *reg is a pointer to the StgRegTable part
1620    * of a Capability.
1621    */
1622   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1623
1624   ACQUIRE_LOCK(&sched_mutex);
1625
1626   IF_DEBUG(scheduler,
1627            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1628
1629   // XXX this might not be necessary --SDM
1630   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1631
1632   threadPaused(cap->r.rCurrentTSO);
1633   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1634   suspended_ccalling_threads = cap->r.rCurrentTSO;
1635
1636 #if defined(RTS_SUPPORTS_THREADS)
1637   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1638   {
1639       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1640       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1641   }
1642   else
1643   {
1644       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1645   }
1646 #endif
1647
1648   /* Use the thread ID as the token; it should be unique */
1649   tok = cap->r.rCurrentTSO->id;
1650
1651   /* Hand back capability */
1652   releaseCapability(cap);
1653   
1654 #if defined(RTS_SUPPORTS_THREADS)
1655   /* Preparing to leave the RTS, so ensure there's a native thread/task
1656      waiting to take over.
1657   */
1658   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1659   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1660       startTask(taskStart);
1661   //}
1662 #endif
1663
1664   /* Other threads _might_ be available for execution; signal this */
1665   THREAD_RUNNABLE();
1666   RELEASE_LOCK(&sched_mutex);
1667   return tok; 
1668 }
1669
1670 StgRegTable *
1671 resumeThread( StgInt tok,
1672               rtsBool concCall
1673 #if !defined(RTS_SUPPORTS_THREADS)
1674                STG_UNUSED
1675 #endif
1676               )
1677 {
1678   StgTSO *tso, **prev;
1679   Capability *cap;
1680
1681 #if defined(RTS_SUPPORTS_THREADS)
1682   /* Wait for permission to re-enter the RTS with the result. */
1683   ACQUIRE_LOCK(&sched_mutex);
1684   grabReturnCapability(&sched_mutex, &cap);
1685
1686   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1687 #else
1688   grabCapability(&cap);
1689 #endif
1690
1691   /* Remove the thread off of the suspended list */
1692   prev = &suspended_ccalling_threads;
1693   for (tso = suspended_ccalling_threads; 
1694        tso != END_TSO_QUEUE; 
1695        prev = &tso->link, tso = tso->link) {
1696     if (tso->id == (StgThreadID)tok) {
1697       *prev = tso->link;
1698       break;
1699     }
1700   }
1701   if (tso == END_TSO_QUEUE) {
1702     barf("resumeThread: thread not found");
1703   }
1704   tso->link = END_TSO_QUEUE;
1705   
1706 #if defined(RTS_SUPPORTS_THREADS)
1707   if(tso->why_blocked == BlockedOnCCall)
1708   {
1709       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1710       tso->blocked_exceptions = NULL;
1711   }
1712 #endif
1713   
1714   /* Reset blocking status */
1715   tso->why_blocked  = NotBlocked;
1716
1717   cap->r.rCurrentTSO = tso;
1718 #if defined(RTS_SUPPORTS_THREADS)
1719   RELEASE_LOCK(&sched_mutex);
1720 #endif
1721   return &cap->r;
1722 }
1723
1724
1725 /* ---------------------------------------------------------------------------
1726  * Static functions
1727  * ------------------------------------------------------------------------ */
1728 static void unblockThread(StgTSO *tso);
1729
1730 /* ---------------------------------------------------------------------------
1731  * Comparing Thread ids.
1732  *
1733  * This is used from STG land in the implementation of the
1734  * instances of Eq/Ord for ThreadIds.
1735  * ------------------------------------------------------------------------ */
1736
1737 int
1738 cmp_thread(StgPtr tso1, StgPtr tso2) 
1739
1740   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1741   StgThreadID id2 = ((StgTSO *)tso2)->id;
1742  
1743   if (id1 < id2) return (-1);
1744   if (id1 > id2) return 1;
1745   return 0;
1746 }
1747
1748 /* ---------------------------------------------------------------------------
1749  * Fetching the ThreadID from an StgTSO.
1750  *
1751  * This is used in the implementation of Show for ThreadIds.
1752  * ------------------------------------------------------------------------ */
1753 int
1754 rts_getThreadId(StgPtr tso) 
1755 {
1756   return ((StgTSO *)tso)->id;
1757 }
1758
1759 #ifdef DEBUG
1760 void
1761 labelThread(StgPtr tso, char *label)
1762 {
1763   int len;
1764   void *buf;
1765
1766   /* Caveat: Once set, you can only set the thread name to "" */
1767   len = strlen(label)+1;
1768   buf = malloc(len);
1769   if (buf == NULL) {
1770     fprintf(stderr,"insufficient memory for labelThread!\n");
1771   } else
1772     strncpy(buf,label,len);
1773   /* Update will free the old memory for us */
1774   updateThreadLabel((StgWord)tso,buf);
1775 }
1776 #endif /* DEBUG */
1777
1778 /* ---------------------------------------------------------------------------
1779    Create a new thread.
1780
1781    The new thread starts with the given stack size.  Before the
1782    scheduler can run, however, this thread needs to have a closure
1783    (and possibly some arguments) pushed on its stack.  See
1784    pushClosure() in Schedule.h.
1785
1786    createGenThread() and createIOThread() (in SchedAPI.h) are
1787    convenient packaged versions of this function.
1788
1789    currently pri (priority) is only used in a GRAN setup -- HWL
1790    ------------------------------------------------------------------------ */
1791 //@cindex createThread
1792 #if defined(GRAN)
1793 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1794 StgTSO *
1795 createThread(nat size, StgInt pri)
1796 #else
1797 StgTSO *
1798 createThread(nat size)
1799 #endif
1800 {
1801
1802     StgTSO *tso;
1803     nat stack_size;
1804
1805     /* First check whether we should create a thread at all */
1806 #if defined(PAR)
1807   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1808   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1809     threadsIgnored++;
1810     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1811           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1812     return END_TSO_QUEUE;
1813   }
1814   threadsCreated++;
1815 #endif
1816
1817 #if defined(GRAN)
1818   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1819 #endif
1820
1821   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1822
1823   /* catch ridiculously small stack sizes */
1824   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1825     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1826   }
1827
1828   stack_size = size - TSO_STRUCT_SIZEW;
1829
1830   tso = (StgTSO *)allocate(size);
1831   TICK_ALLOC_TSO(stack_size, 0);
1832
1833   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1834 #if defined(GRAN)
1835   SET_GRAN_HDR(tso, ThisPE);
1836 #endif
1837
1838   // Always start with the compiled code evaluator
1839   tso->what_next = ThreadRunGHC;
1840
1841   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1842    * protect the increment operation on next_thread_id.
1843    * In future, we could use an atomic increment instead.
1844    */
1845   ACQUIRE_LOCK(&thread_id_mutex);
1846   tso->id = next_thread_id++; 
1847   RELEASE_LOCK(&thread_id_mutex);
1848
1849   tso->why_blocked  = NotBlocked;
1850   tso->blocked_exceptions = NULL;
1851
1852   tso->stack_size   = stack_size;
1853   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1854                               - TSO_STRUCT_SIZEW;
1855   tso->sp           = (P_)&(tso->stack) + stack_size;
1856
1857 #ifdef PROFILING
1858   tso->prof.CCCS = CCS_MAIN;
1859 #endif
1860
1861   /* put a stop frame on the stack */
1862   tso->sp -= sizeofW(StgStopFrame);
1863   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1864   // ToDo: check this
1865 #if defined(GRAN)
1866   tso->link = END_TSO_QUEUE;
1867   /* uses more flexible routine in GranSim */
1868   insertThread(tso, CurrentProc);
1869 #else
1870   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1871    * from its creation
1872    */
1873 #endif
1874
1875 #if defined(GRAN) 
1876   if (RtsFlags.GranFlags.GranSimStats.Full) 
1877     DumpGranEvent(GR_START,tso);
1878 #elif defined(PAR)
1879   if (RtsFlags.ParFlags.ParStats.Full) 
1880     DumpGranEvent(GR_STARTQ,tso);
1881   /* HACk to avoid SCHEDULE 
1882      LastTSO = tso; */
1883 #endif
1884
1885   /* Link the new thread on the global thread list.
1886    */
1887   tso->global_link = all_threads;
1888   all_threads = tso;
1889
1890 #if defined(DIST)
1891   tso->dist.priority = MandatoryPriority; //by default that is...
1892 #endif
1893
1894 #if defined(GRAN)
1895   tso->gran.pri = pri;
1896 # if defined(DEBUG)
1897   tso->gran.magic = TSO_MAGIC; // debugging only
1898 # endif
1899   tso->gran.sparkname   = 0;
1900   tso->gran.startedat   = CURRENT_TIME; 
1901   tso->gran.exported    = 0;
1902   tso->gran.basicblocks = 0;
1903   tso->gran.allocs      = 0;
1904   tso->gran.exectime    = 0;
1905   tso->gran.fetchtime   = 0;
1906   tso->gran.fetchcount  = 0;
1907   tso->gran.blocktime   = 0;
1908   tso->gran.blockcount  = 0;
1909   tso->gran.blockedat   = 0;
1910   tso->gran.globalsparks = 0;
1911   tso->gran.localsparks  = 0;
1912   if (RtsFlags.GranFlags.Light)
1913     tso->gran.clock  = Now; /* local clock */
1914   else
1915     tso->gran.clock  = 0;
1916
1917   IF_DEBUG(gran,printTSO(tso));
1918 #elif defined(PAR)
1919 # if defined(DEBUG)
1920   tso->par.magic = TSO_MAGIC; // debugging only
1921 # endif
1922   tso->par.sparkname   = 0;
1923   tso->par.startedat   = CURRENT_TIME; 
1924   tso->par.exported    = 0;
1925   tso->par.basicblocks = 0;
1926   tso->par.allocs      = 0;
1927   tso->par.exectime    = 0;
1928   tso->par.fetchtime   = 0;
1929   tso->par.fetchcount  = 0;
1930   tso->par.blocktime   = 0;
1931   tso->par.blockcount  = 0;
1932   tso->par.blockedat   = 0;
1933   tso->par.globalsparks = 0;
1934   tso->par.localsparks  = 0;
1935 #endif
1936
1937 #if defined(GRAN)
1938   globalGranStats.tot_threads_created++;
1939   globalGranStats.threads_created_on_PE[CurrentProc]++;
1940   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1941   globalGranStats.tot_sq_probes++;
1942 #elif defined(PAR)
1943   // collect parallel global statistics (currently done together with GC stats)
1944   if (RtsFlags.ParFlags.ParStats.Global &&
1945       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1946     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1947     globalParStats.tot_threads_created++;
1948   }
1949 #endif 
1950
1951 #if defined(GRAN)
1952   IF_GRAN_DEBUG(pri,
1953                 belch("==__ schedule: Created TSO %d (%p);",
1954                       CurrentProc, tso, tso->id));
1955 #elif defined(PAR)
1956     IF_PAR_DEBUG(verbose,
1957                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1958                        tso->id, tso, advisory_thread_count));
1959 #else
1960   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1961                                  tso->id, tso->stack_size));
1962 #endif    
1963   return tso;
1964 }
1965
1966 #if defined(PAR)
1967 /* RFP:
1968    all parallel thread creation calls should fall through the following routine.
1969 */
1970 StgTSO *
1971 createSparkThread(rtsSpark spark) 
1972 { StgTSO *tso;
1973   ASSERT(spark != (rtsSpark)NULL);
1974   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1975   { threadsIgnored++;
1976     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1977           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1978     return END_TSO_QUEUE;
1979   }
1980   else
1981   { threadsCreated++;
1982     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1983     if (tso==END_TSO_QUEUE)     
1984       barf("createSparkThread: Cannot create TSO");
1985 #if defined(DIST)
1986     tso->priority = AdvisoryPriority;
1987 #endif
1988     pushClosure(tso,spark);
1989     PUSH_ON_RUN_QUEUE(tso);
1990     advisory_thread_count++;    
1991   }
1992   return tso;
1993 }
1994 #endif
1995
1996 /*
1997   Turn a spark into a thread.
1998   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1999 */
2000 #if defined(PAR)
2001 //@cindex activateSpark
2002 StgTSO *
2003 activateSpark (rtsSpark spark) 
2004 {
2005   StgTSO *tso;
2006
2007   tso = createSparkThread(spark);
2008   if (RtsFlags.ParFlags.ParStats.Full) {   
2009     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2010     IF_PAR_DEBUG(verbose,
2011                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2012                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2013   }
2014   // ToDo: fwd info on local/global spark to thread -- HWL
2015   // tso->gran.exported =  spark->exported;
2016   // tso->gran.locked =   !spark->global;
2017   // tso->gran.sparkname = spark->name;
2018
2019   return tso;
2020 }
2021 #endif
2022
2023 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
2024 #if defined(THREADED_RTS)
2025                                    , rtsBool blockWaiting
2026 #endif
2027                                    );
2028
2029
2030 /* ---------------------------------------------------------------------------
2031  * scheduleThread()
2032  *
2033  * scheduleThread puts a thread on the head of the runnable queue.
2034  * This will usually be done immediately after a thread is created.
2035  * The caller of scheduleThread must create the thread using e.g.
2036  * createThread and push an appropriate closure
2037  * on this thread's stack before the scheduler is invoked.
2038  * ------------------------------------------------------------------------ */
2039
2040 static void scheduleThread_ (StgTSO* tso);
2041
2042 void
2043 scheduleThread_(StgTSO *tso)
2044 {
2045   // Precondition: sched_mutex must be held.
2046
2047   /* Put the new thread on the head of the runnable queue.  The caller
2048    * better push an appropriate closure on this thread's stack
2049    * beforehand.  In the SMP case, the thread may start running as
2050    * soon as we release the scheduler lock below.
2051    */
2052   PUSH_ON_RUN_QUEUE(tso);
2053   THREAD_RUNNABLE();
2054
2055 #if 0
2056   IF_DEBUG(scheduler,printTSO(tso));
2057 #endif
2058 }
2059
2060 void scheduleThread(StgTSO* tso)
2061 {
2062   ACQUIRE_LOCK(&sched_mutex);
2063   scheduleThread_(tso);
2064   RELEASE_LOCK(&sched_mutex);
2065 }
2066
2067 SchedulerStatus
2068 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
2069 {       // Precondition: sched_mutex must be held
2070   StgMainThread *m;
2071
2072   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2073   m->tso = tso;
2074   m->ret = ret;
2075   m->stat = NoStatus;
2076 #if defined(RTS_SUPPORTS_THREADS)
2077   initCondition(&m->wakeup);
2078 #endif
2079
2080   /* Put the thread on the main-threads list prior to scheduling the TSO.
2081      Failure to do so introduces a race condition in the MT case (as
2082      identified by Wolfgang Thaller), whereby the new task/OS thread 
2083      created by scheduleThread_() would complete prior to the thread
2084      that spawned it managed to put 'itself' on the main-threads list.
2085      The upshot of it all being that the worker thread wouldn't get to
2086      signal the completion of the its work item for the main thread to
2087      see (==> it got stuck waiting.)    -- sof 6/02.
2088   */
2089   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2090   
2091   m->link = main_threads;
2092   main_threads = m;
2093
2094   scheduleThread_(tso);
2095 #if defined(THREADED_RTS)
2096   return waitThread_(m, rtsTrue);
2097 #else
2098   return waitThread_(m);
2099 #endif
2100 }
2101
2102 /* ---------------------------------------------------------------------------
2103  * initScheduler()
2104  *
2105  * Initialise the scheduler.  This resets all the queues - if the
2106  * queues contained any threads, they'll be garbage collected at the
2107  * next pass.
2108  *
2109  * ------------------------------------------------------------------------ */
2110
2111 #ifdef SMP
2112 static void
2113 term_handler(int sig STG_UNUSED)
2114 {
2115   stat_workerStop();
2116   ACQUIRE_LOCK(&term_mutex);
2117   await_death--;
2118   RELEASE_LOCK(&term_mutex);
2119   shutdownThread();
2120 }
2121 #endif
2122
2123 void 
2124 initScheduler(void)
2125 {
2126 #if defined(GRAN)
2127   nat i;
2128
2129   for (i=0; i<=MAX_PROC; i++) {
2130     run_queue_hds[i]      = END_TSO_QUEUE;
2131     run_queue_tls[i]      = END_TSO_QUEUE;
2132     blocked_queue_hds[i]  = END_TSO_QUEUE;
2133     blocked_queue_tls[i]  = END_TSO_QUEUE;
2134     ccalling_threadss[i]  = END_TSO_QUEUE;
2135     sleeping_queue        = END_TSO_QUEUE;
2136   }
2137 #else
2138   run_queue_hd      = END_TSO_QUEUE;
2139   run_queue_tl      = END_TSO_QUEUE;
2140   blocked_queue_hd  = END_TSO_QUEUE;
2141   blocked_queue_tl  = END_TSO_QUEUE;
2142   sleeping_queue    = END_TSO_QUEUE;
2143 #endif 
2144
2145   suspended_ccalling_threads  = END_TSO_QUEUE;
2146
2147   main_threads = NULL;
2148   all_threads  = END_TSO_QUEUE;
2149
2150   context_switch = 0;
2151   interrupted    = 0;
2152
2153   RtsFlags.ConcFlags.ctxtSwitchTicks =
2154       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2155       
2156 #if defined(RTS_SUPPORTS_THREADS)
2157   /* Initialise the mutex and condition variables used by
2158    * the scheduler. */
2159   initMutex(&sched_mutex);
2160   initMutex(&term_mutex);
2161   initMutex(&thread_id_mutex);
2162
2163   initCondition(&thread_ready_cond);
2164 #endif
2165   
2166 #if defined(SMP)
2167   initCondition(&gc_pending_cond);
2168 #endif
2169
2170 #if defined(RTS_SUPPORTS_THREADS)
2171   ACQUIRE_LOCK(&sched_mutex);
2172 #endif
2173
2174   /* Install the SIGHUP handler */
2175 #if defined(SMP)
2176   {
2177     struct sigaction action,oact;
2178
2179     action.sa_handler = term_handler;
2180     sigemptyset(&action.sa_mask);
2181     action.sa_flags = 0;
2182     if (sigaction(SIGTERM, &action, &oact) != 0) {
2183       barf("can't install TERM handler");
2184     }
2185   }
2186 #endif
2187
2188   /* A capability holds the state a native thread needs in
2189    * order to execute STG code. At least one capability is
2190    * floating around (only SMP builds have more than one).
2191    */
2192   initCapabilities();
2193   
2194 #if defined(RTS_SUPPORTS_THREADS)
2195     /* start our haskell execution tasks */
2196 # if defined(SMP)
2197     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2198 # else
2199     startTaskManager(0,taskStart);
2200 # endif
2201 #endif
2202
2203 #if /* defined(SMP) ||*/ defined(PAR)
2204   initSparkPools();
2205 #endif
2206
2207 #if defined(RTS_SUPPORTS_THREADS)
2208   RELEASE_LOCK(&sched_mutex);
2209 #endif
2210
2211 }
2212
2213 void
2214 exitScheduler( void )
2215 {
2216 #if defined(RTS_SUPPORTS_THREADS)
2217   stopTaskManager();
2218 #endif
2219   shutting_down_scheduler = rtsTrue;
2220 }
2221
2222 /* -----------------------------------------------------------------------------
2223    Managing the per-task allocation areas.
2224    
2225    Each capability comes with an allocation area.  These are
2226    fixed-length block lists into which allocation can be done.
2227
2228    ToDo: no support for two-space collection at the moment???
2229    -------------------------------------------------------------------------- */
2230
2231 /* -----------------------------------------------------------------------------
2232  * waitThread is the external interface for running a new computation
2233  * and waiting for the result.
2234  *
2235  * In the non-SMP case, we create a new main thread, push it on the 
2236  * main-thread stack, and invoke the scheduler to run it.  The
2237  * scheduler will return when the top main thread on the stack has
2238  * completed or died, and fill in the necessary fields of the
2239  * main_thread structure.
2240  *
2241  * In the SMP case, we create a main thread as before, but we then
2242  * create a new condition variable and sleep on it.  When our new
2243  * main thread has completed, we'll be woken up and the status/result
2244  * will be in the main_thread struct.
2245  * -------------------------------------------------------------------------- */
2246
2247 int 
2248 howManyThreadsAvail ( void )
2249 {
2250    int i = 0;
2251    StgTSO* q;
2252    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2253       i++;
2254    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2255       i++;
2256    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2257       i++;
2258    return i;
2259 }
2260
2261 void
2262 finishAllThreads ( void )
2263 {
2264    do {
2265       while (run_queue_hd != END_TSO_QUEUE) {
2266          waitThread ( run_queue_hd, NULL);
2267       }
2268       while (blocked_queue_hd != END_TSO_QUEUE) {
2269          waitThread ( blocked_queue_hd, NULL);
2270       }
2271       while (sleeping_queue != END_TSO_QUEUE) {
2272          waitThread ( blocked_queue_hd, NULL);
2273       }
2274    } while 
2275       (blocked_queue_hd != END_TSO_QUEUE || 
2276        run_queue_hd     != END_TSO_QUEUE ||
2277        sleeping_queue   != END_TSO_QUEUE);
2278 }
2279
2280 SchedulerStatus
2281 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2282
2283   StgMainThread *m;
2284   SchedulerStatus stat;
2285
2286   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2287   m->tso = tso;
2288   m->ret = ret;
2289   m->stat = NoStatus;
2290 #if defined(RTS_SUPPORTS_THREADS)
2291   initCondition(&m->wakeup);
2292 #endif
2293
2294   /* see scheduleWaitThread() comment */
2295   ACQUIRE_LOCK(&sched_mutex);
2296   m->link = main_threads;
2297   main_threads = m;
2298
2299   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2300 #if defined(THREADED_RTS)
2301   stat = waitThread_(m, rtsFalse);
2302 #else
2303   stat = waitThread_(m);
2304 #endif
2305   RELEASE_LOCK(&sched_mutex);
2306   return stat;
2307 }
2308
2309 static
2310 SchedulerStatus
2311 waitThread_(StgMainThread* m
2312 #if defined(THREADED_RTS)
2313             , rtsBool blockWaiting
2314 #endif
2315            )
2316 {
2317   SchedulerStatus stat;
2318
2319   // Precondition: sched_mutex must be held.
2320   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2321
2322 #if defined(RTS_SUPPORTS_THREADS)
2323
2324 # if defined(THREADED_RTS)
2325   if (!blockWaiting) {
2326     /* In the threaded case, the OS thread that called main()
2327      * gets to enter the RTS directly without going via another
2328      * task/thread.
2329      */
2330     main_main_thread = m;
2331     RELEASE_LOCK(&sched_mutex);
2332     schedule();
2333     ACQUIRE_LOCK(&sched_mutex);
2334     main_main_thread = NULL;
2335     ASSERT(m->stat != NoStatus);
2336   } else 
2337 # endif
2338   {
2339     do {
2340       waitCondition(&m->wakeup, &sched_mutex);
2341     } while (m->stat == NoStatus);
2342   }
2343 #elif defined(GRAN)
2344   /* GranSim specific init */
2345   CurrentTSO = m->tso;                // the TSO to run
2346   procStatus[MainProc] = Busy;        // status of main PE
2347   CurrentProc = MainProc;             // PE to run it on
2348
2349   RELEASE_LOCK(&sched_mutex);
2350   schedule();
2351 #else
2352   RELEASE_LOCK(&sched_mutex);
2353   schedule();
2354   ASSERT(m->stat != NoStatus);
2355 #endif
2356
2357   stat = m->stat;
2358
2359 #if defined(RTS_SUPPORTS_THREADS)
2360   closeCondition(&m->wakeup);
2361 #endif
2362
2363   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2364                               m->tso->id));
2365   free(m);
2366
2367   // Postcondition: sched_mutex still held
2368   return stat;
2369 }
2370
2371 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2372 //@subsection Run queue code 
2373
2374 #if 0
2375 /* 
2376    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2377        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2378        implicit global variable that has to be correct when calling these
2379        fcts -- HWL 
2380 */
2381
2382 /* Put the new thread on the head of the runnable queue.
2383  * The caller of createThread better push an appropriate closure
2384  * on this thread's stack before the scheduler is invoked.
2385  */
2386 static /* inline */ void
2387 add_to_run_queue(tso)
2388 StgTSO* tso; 
2389 {
2390   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2391   tso->link = run_queue_hd;
2392   run_queue_hd = tso;
2393   if (run_queue_tl == END_TSO_QUEUE) {
2394     run_queue_tl = tso;
2395   }
2396 }
2397
2398 /* Put the new thread at the end of the runnable queue. */
2399 static /* inline */ void
2400 push_on_run_queue(tso)
2401 StgTSO* tso; 
2402 {
2403   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2404   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2405   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2406   if (run_queue_hd == END_TSO_QUEUE) {
2407     run_queue_hd = tso;
2408   } else {
2409     run_queue_tl->link = tso;
2410   }
2411   run_queue_tl = tso;
2412 }
2413
2414 /* 
2415    Should be inlined because it's used very often in schedule.  The tso
2416    argument is actually only needed in GranSim, where we want to have the
2417    possibility to schedule *any* TSO on the run queue, irrespective of the
2418    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2419    the run queue and dequeue the tso, adjusting the links in the queue. 
2420 */
2421 //@cindex take_off_run_queue
2422 static /* inline */ StgTSO*
2423 take_off_run_queue(StgTSO *tso) {
2424   StgTSO *t, *prev;
2425
2426   /* 
2427      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2428
2429      if tso is specified, unlink that tso from the run_queue (doesn't have
2430      to be at the beginning of the queue); GranSim only 
2431   */
2432   if (tso!=END_TSO_QUEUE) {
2433     /* find tso in queue */
2434     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2435          t!=END_TSO_QUEUE && t!=tso;
2436          prev=t, t=t->link) 
2437       /* nothing */ ;
2438     ASSERT(t==tso);
2439     /* now actually dequeue the tso */
2440     if (prev!=END_TSO_QUEUE) {
2441       ASSERT(run_queue_hd!=t);
2442       prev->link = t->link;
2443     } else {
2444       /* t is at beginning of thread queue */
2445       ASSERT(run_queue_hd==t);
2446       run_queue_hd = t->link;
2447     }
2448     /* t is at end of thread queue */
2449     if (t->link==END_TSO_QUEUE) {
2450       ASSERT(t==run_queue_tl);
2451       run_queue_tl = prev;
2452     } else {
2453       ASSERT(run_queue_tl!=t);
2454     }
2455     t->link = END_TSO_QUEUE;
2456   } else {
2457     /* take tso from the beginning of the queue; std concurrent code */
2458     t = run_queue_hd;
2459     if (t != END_TSO_QUEUE) {
2460       run_queue_hd = t->link;
2461       t->link = END_TSO_QUEUE;
2462       if (run_queue_hd == END_TSO_QUEUE) {
2463         run_queue_tl = END_TSO_QUEUE;
2464       }
2465     }
2466   }
2467   return t;
2468 }
2469
2470 #endif /* 0 */
2471
2472 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2473 //@subsection Garbage Collextion Routines
2474
2475 /* ---------------------------------------------------------------------------
2476    Where are the roots that we know about?
2477
2478         - all the threads on the runnable queue
2479         - all the threads on the blocked queue
2480         - all the threads on the sleeping queue
2481         - all the thread currently executing a _ccall_GC
2482         - all the "main threads"
2483      
2484    ------------------------------------------------------------------------ */
2485
2486 /* This has to be protected either by the scheduler monitor, or by the
2487         garbage collection monitor (probably the latter).
2488         KH @ 25/10/99
2489 */
2490
2491 void
2492 GetRoots(evac_fn evac)
2493 {
2494 #if defined(GRAN)
2495   {
2496     nat i;
2497     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2498       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2499           evac((StgClosure **)&run_queue_hds[i]);
2500       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2501           evac((StgClosure **)&run_queue_tls[i]);
2502       
2503       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2504           evac((StgClosure **)&blocked_queue_hds[i]);
2505       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2506           evac((StgClosure **)&blocked_queue_tls[i]);
2507       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2508           evac((StgClosure **)&ccalling_threads[i]);
2509     }
2510   }
2511
2512   markEventQueue();
2513
2514 #else /* !GRAN */
2515   if (run_queue_hd != END_TSO_QUEUE) {
2516       ASSERT(run_queue_tl != END_TSO_QUEUE);
2517       evac((StgClosure **)&run_queue_hd);
2518       evac((StgClosure **)&run_queue_tl);
2519   }
2520   
2521   if (blocked_queue_hd != END_TSO_QUEUE) {
2522       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2523       evac((StgClosure **)&blocked_queue_hd);
2524       evac((StgClosure **)&blocked_queue_tl);
2525   }
2526   
2527   if (sleeping_queue != END_TSO_QUEUE) {
2528       evac((StgClosure **)&sleeping_queue);
2529   }
2530 #endif 
2531
2532   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2533       evac((StgClosure **)&suspended_ccalling_threads);
2534   }
2535
2536 #if defined(PAR) || defined(GRAN)
2537   markSparkQueue(evac);
2538 #endif
2539
2540 #ifndef mingw32_TARGET_OS
2541   // mark the signal handlers (signals should be already blocked)
2542   markSignalHandlers(evac);
2543 #endif
2544
2545   // main threads which have completed need to be retained until they
2546   // are dealt with in the main scheduler loop.  They won't be
2547   // retained any other way: the GC will drop them from the
2548   // all_threads list, so we have to be careful to treat them as roots
2549   // here.
2550   { 
2551       StgMainThread *m;
2552       for (m = main_threads; m != NULL; m = m->link) {
2553           switch (m->tso->what_next) {
2554           case ThreadComplete:
2555           case ThreadKilled:
2556               evac((StgClosure **)&m->tso);
2557               break;
2558           default:
2559               break;
2560           }
2561       }
2562   }
2563 }
2564
2565 /* -----------------------------------------------------------------------------
2566    performGC
2567
2568    This is the interface to the garbage collector from Haskell land.
2569    We provide this so that external C code can allocate and garbage
2570    collect when called from Haskell via _ccall_GC.
2571
2572    It might be useful to provide an interface whereby the programmer
2573    can specify more roots (ToDo).
2574    
2575    This needs to be protected by the GC condition variable above.  KH.
2576    -------------------------------------------------------------------------- */
2577
2578 static void (*extra_roots)(evac_fn);
2579
2580 void
2581 performGC(void)
2582 {
2583   /* Obligated to hold this lock upon entry */
2584   ACQUIRE_LOCK(&sched_mutex);
2585   GarbageCollect(GetRoots,rtsFalse);
2586   RELEASE_LOCK(&sched_mutex);
2587 }
2588
2589 void
2590 performMajorGC(void)
2591 {
2592   ACQUIRE_LOCK(&sched_mutex);
2593   GarbageCollect(GetRoots,rtsTrue);
2594   RELEASE_LOCK(&sched_mutex);
2595 }
2596
2597 static void
2598 AllRoots(evac_fn evac)
2599 {
2600     GetRoots(evac);             // the scheduler's roots
2601     extra_roots(evac);          // the user's roots
2602 }
2603
2604 void
2605 performGCWithRoots(void (*get_roots)(evac_fn))
2606 {
2607   ACQUIRE_LOCK(&sched_mutex);
2608   extra_roots = get_roots;
2609   GarbageCollect(AllRoots,rtsFalse);
2610   RELEASE_LOCK(&sched_mutex);
2611 }
2612
2613 /* -----------------------------------------------------------------------------
2614    Stack overflow
2615
2616    If the thread has reached its maximum stack size, then raise the
2617    StackOverflow exception in the offending thread.  Otherwise
2618    relocate the TSO into a larger chunk of memory and adjust its stack
2619    size appropriately.
2620    -------------------------------------------------------------------------- */
2621
2622 static StgTSO *
2623 threadStackOverflow(StgTSO *tso)
2624 {
2625   nat new_stack_size, new_tso_size, diff, stack_words;
2626   StgPtr new_sp;
2627   StgTSO *dest;
2628
2629   IF_DEBUG(sanity,checkTSO(tso));
2630   if (tso->stack_size >= tso->max_stack_size) {
2631
2632     IF_DEBUG(gc,
2633              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2634                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2635              /* If we're debugging, just print out the top of the stack */
2636              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2637                                               tso->sp+64)));
2638
2639     /* Send this thread the StackOverflow exception */
2640     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2641     return tso;
2642   }
2643
2644   /* Try to double the current stack size.  If that takes us over the
2645    * maximum stack size for this thread, then use the maximum instead.
2646    * Finally round up so the TSO ends up as a whole number of blocks.
2647    */
2648   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2649   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2650                                        TSO_STRUCT_SIZE)/sizeof(W_);
2651   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2652   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2653
2654   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2655
2656   dest = (StgTSO *)allocate(new_tso_size);
2657   TICK_ALLOC_TSO(new_stack_size,0);
2658
2659   /* copy the TSO block and the old stack into the new area */
2660   memcpy(dest,tso,TSO_STRUCT_SIZE);
2661   stack_words = tso->stack + tso->stack_size - tso->sp;
2662   new_sp = (P_)dest + new_tso_size - stack_words;
2663   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2664
2665   /* relocate the stack pointers... */
2666   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2667   dest->sp    = new_sp;
2668   dest->stack_size = new_stack_size;
2669         
2670   /* Mark the old TSO as relocated.  We have to check for relocated
2671    * TSOs in the garbage collector and any primops that deal with TSOs.
2672    *
2673    * It's important to set the sp value to just beyond the end
2674    * of the stack, so we don't attempt to scavenge any part of the
2675    * dead TSO's stack.
2676    */
2677   tso->what_next = ThreadRelocated;
2678   tso->link = dest;
2679   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2680   tso->why_blocked = NotBlocked;
2681   dest->mut_link = NULL;
2682
2683   IF_PAR_DEBUG(verbose,
2684                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2685                      tso->id, tso, tso->stack_size);
2686                /* If we're debugging, just print out the top of the stack */
2687                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2688                                                 tso->sp+64)));
2689   
2690   IF_DEBUG(sanity,checkTSO(tso));
2691 #if 0
2692   IF_DEBUG(scheduler,printTSO(dest));
2693 #endif
2694
2695   return dest;
2696 }
2697
2698 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2699 //@subsection Blocking Queue Routines
2700
2701 /* ---------------------------------------------------------------------------
2702    Wake up a queue that was blocked on some resource.
2703    ------------------------------------------------------------------------ */
2704
2705 #if defined(GRAN)
2706 static inline void
2707 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2708 {
2709 }
2710 #elif defined(PAR)
2711 static inline void
2712 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2713 {
2714   /* write RESUME events to log file and
2715      update blocked and fetch time (depending on type of the orig closure) */
2716   if (RtsFlags.ParFlags.ParStats.Full) {
2717     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2718                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2719                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2720     if (EMPTY_RUN_QUEUE())
2721       emitSchedule = rtsTrue;
2722
2723     switch (get_itbl(node)->type) {
2724         case FETCH_ME_BQ:
2725           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2726           break;
2727         case RBH:
2728         case FETCH_ME:
2729         case BLACKHOLE_BQ:
2730           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2731           break;
2732 #ifdef DIST
2733         case MVAR:
2734           break;
2735 #endif    
2736         default:
2737           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2738         }
2739       }
2740 }
2741 #endif
2742
2743 #if defined(GRAN)
2744 static StgBlockingQueueElement *
2745 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2746 {
2747     StgTSO *tso;
2748     PEs node_loc, tso_loc;
2749
2750     node_loc = where_is(node); // should be lifted out of loop
2751     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2752     tso_loc = where_is((StgClosure *)tso);
2753     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2754       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2755       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2756       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2757       // insertThread(tso, node_loc);
2758       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2759                 ResumeThread,
2760                 tso, node, (rtsSpark*)NULL);
2761       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2762       // len_local++;
2763       // len++;
2764     } else { // TSO is remote (actually should be FMBQ)
2765       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2766                                   RtsFlags.GranFlags.Costs.gunblocktime +
2767                                   RtsFlags.GranFlags.Costs.latency;
2768       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2769                 UnblockThread,
2770                 tso, node, (rtsSpark*)NULL);
2771       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2772       // len++;
2773     }
2774     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2775     IF_GRAN_DEBUG(bq,
2776                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2777                           (node_loc==tso_loc ? "Local" : "Global"), 
2778                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2779     tso->block_info.closure = NULL;
2780     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2781                              tso->id, tso));
2782 }
2783 #elif defined(PAR)
2784 static StgBlockingQueueElement *
2785 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2786 {
2787     StgBlockingQueueElement *next;
2788
2789     switch (get_itbl(bqe)->type) {
2790     case TSO:
2791       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2792       /* if it's a TSO just push it onto the run_queue */
2793       next = bqe->link;
2794       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2795       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2796       THREAD_RUNNABLE();
2797       unblockCount(bqe, node);
2798       /* reset blocking status after dumping event */
2799       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2800       break;
2801
2802     case BLOCKED_FETCH:
2803       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2804       next = bqe->link;
2805       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2806       PendingFetches = (StgBlockedFetch *)bqe;
2807       break;
2808
2809 # if defined(DEBUG)
2810       /* can ignore this case in a non-debugging setup; 
2811          see comments on RBHSave closures above */
2812     case CONSTR:
2813       /* check that the closure is an RBHSave closure */
2814       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2815              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2816              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2817       break;
2818
2819     default:
2820       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2821            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2822            (StgClosure *)bqe);
2823 # endif
2824     }
2825   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2826   return next;
2827 }
2828
2829 #else /* !GRAN && !PAR */
2830 static StgTSO *
2831 unblockOneLocked(StgTSO *tso)
2832 {
2833   StgTSO *next;
2834
2835   ASSERT(get_itbl(tso)->type == TSO);
2836   ASSERT(tso->why_blocked != NotBlocked);
2837   tso->why_blocked = NotBlocked;
2838   next = tso->link;
2839   PUSH_ON_RUN_QUEUE(tso);
2840   THREAD_RUNNABLE();
2841   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2842   return next;
2843 }
2844 #endif
2845
2846 #if defined(GRAN) || defined(PAR)
2847 inline StgBlockingQueueElement *
2848 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2849 {
2850   ACQUIRE_LOCK(&sched_mutex);
2851   bqe = unblockOneLocked(bqe, node);
2852   RELEASE_LOCK(&sched_mutex);
2853   return bqe;
2854 }
2855 #else
2856 inline StgTSO *
2857 unblockOne(StgTSO *tso)
2858 {
2859   ACQUIRE_LOCK(&sched_mutex);
2860   tso = unblockOneLocked(tso);
2861   RELEASE_LOCK(&sched_mutex);
2862   return tso;
2863 }
2864 #endif
2865
2866 #if defined(GRAN)
2867 void 
2868 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2869 {
2870   StgBlockingQueueElement *bqe;
2871   PEs node_loc;
2872   nat len = 0; 
2873
2874   IF_GRAN_DEBUG(bq, 
2875                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2876                       node, CurrentProc, CurrentTime[CurrentProc], 
2877                       CurrentTSO->id, CurrentTSO));
2878
2879   node_loc = where_is(node);
2880
2881   ASSERT(q == END_BQ_QUEUE ||
2882          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2883          get_itbl(q)->type == CONSTR); // closure (type constructor)
2884   ASSERT(is_unique(node));
2885
2886   /* FAKE FETCH: magically copy the node to the tso's proc;
2887      no Fetch necessary because in reality the node should not have been 
2888      moved to the other PE in the first place
2889   */
2890   if (CurrentProc!=node_loc) {
2891     IF_GRAN_DEBUG(bq, 
2892                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2893                         node, node_loc, CurrentProc, CurrentTSO->id, 
2894                         // CurrentTSO, where_is(CurrentTSO),
2895                         node->header.gran.procs));
2896     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2897     IF_GRAN_DEBUG(bq, 
2898                   belch("## new bitmask of node %p is %#x",
2899                         node, node->header.gran.procs));
2900     if (RtsFlags.GranFlags.GranSimStats.Global) {
2901       globalGranStats.tot_fake_fetches++;
2902     }
2903   }
2904
2905   bqe = q;
2906   // ToDo: check: ASSERT(CurrentProc==node_loc);
2907   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2908     //next = bqe->link;
2909     /* 
2910        bqe points to the current element in the queue
2911        next points to the next element in the queue
2912     */
2913     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2914     //tso_loc = where_is(tso);
2915     len++;
2916     bqe = unblockOneLocked(bqe, node);
2917   }
2918
2919   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2920      the closure to make room for the anchor of the BQ */
2921   if (bqe!=END_BQ_QUEUE) {
2922     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2923     /*
2924     ASSERT((info_ptr==&RBH_Save_0_info) ||
2925            (info_ptr==&RBH_Save_1_info) ||
2926            (info_ptr==&RBH_Save_2_info));
2927     */
2928     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2929     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2930     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2931
2932     IF_GRAN_DEBUG(bq,
2933                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2934                         node, info_type(node)));
2935   }
2936
2937   /* statistics gathering */
2938   if (RtsFlags.GranFlags.GranSimStats.Global) {
2939     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2940     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2941     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2942     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2943   }
2944   IF_GRAN_DEBUG(bq,
2945                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2946                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2947 }
2948 #elif defined(PAR)
2949 void 
2950 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2951 {
2952   StgBlockingQueueElement *bqe;
2953
2954   ACQUIRE_LOCK(&sched_mutex);
2955
2956   IF_PAR_DEBUG(verbose, 
2957                belch("##-_ AwBQ for node %p on [%x]: ",
2958                      node, mytid));
2959 #ifdef DIST  
2960   //RFP
2961   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2962     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2963     return;
2964   }
2965 #endif
2966   
2967   ASSERT(q == END_BQ_QUEUE ||
2968          get_itbl(q)->type == TSO ||           
2969          get_itbl(q)->type == BLOCKED_FETCH || 
2970          get_itbl(q)->type == CONSTR); 
2971
2972   bqe = q;
2973   while (get_itbl(bqe)->type==TSO || 
2974          get_itbl(bqe)->type==BLOCKED_FETCH) {
2975     bqe = unblockOneLocked(bqe, node);
2976   }
2977   RELEASE_LOCK(&sched_mutex);
2978 }
2979
2980 #else   /* !GRAN && !PAR */
2981
2982 #ifdef RTS_SUPPORTS_THREADS
2983 void
2984 awakenBlockedQueueNoLock(StgTSO *tso)
2985 {
2986   while (tso != END_TSO_QUEUE) {
2987     tso = unblockOneLocked(tso);
2988   }
2989 }
2990 #endif
2991
2992 void
2993 awakenBlockedQueue(StgTSO *tso)
2994 {
2995   ACQUIRE_LOCK(&sched_mutex);
2996   while (tso != END_TSO_QUEUE) {
2997     tso = unblockOneLocked(tso);
2998   }
2999   RELEASE_LOCK(&sched_mutex);
3000 }
3001 #endif
3002
3003 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3004 //@subsection Exception Handling Routines
3005
3006 /* ---------------------------------------------------------------------------
3007    Interrupt execution
3008    - usually called inside a signal handler so it mustn't do anything fancy.   
3009    ------------------------------------------------------------------------ */
3010
3011 void
3012 interruptStgRts(void)
3013 {
3014     interrupted    = 1;
3015     context_switch = 1;
3016 }
3017
3018 /* -----------------------------------------------------------------------------
3019    Unblock a thread
3020
3021    This is for use when we raise an exception in another thread, which
3022    may be blocked.
3023    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3024    -------------------------------------------------------------------------- */
3025
3026 #if defined(GRAN) || defined(PAR)
3027 /*
3028   NB: only the type of the blocking queue is different in GranSim and GUM
3029       the operations on the queue-elements are the same
3030       long live polymorphism!
3031
3032   Locks: sched_mutex is held upon entry and exit.
3033
3034 */
3035 static void
3036 unblockThread(StgTSO *tso)
3037 {
3038   StgBlockingQueueElement *t, **last;
3039
3040   switch (tso->why_blocked) {
3041
3042   case NotBlocked:
3043     return;  /* not blocked */
3044
3045   case BlockedOnMVar:
3046     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3047     {
3048       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3049       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3050
3051       last = (StgBlockingQueueElement **)&mvar->head;
3052       for (t = (StgBlockingQueueElement *)mvar->head; 
3053            t != END_BQ_QUEUE; 
3054            last = &t->link, last_tso = t, t = t->link) {
3055         if (t == (StgBlockingQueueElement *)tso) {
3056           *last = (StgBlockingQueueElement *)tso->link;
3057           if (mvar->tail == tso) {
3058             mvar->tail = (StgTSO *)last_tso;
3059           }
3060           goto done;
3061         }
3062       }
3063       barf("unblockThread (MVAR): TSO not found");
3064     }
3065
3066   case BlockedOnBlackHole:
3067     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3068     {
3069       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3070
3071       last = &bq->blocking_queue;
3072       for (t = bq->blocking_queue; 
3073            t != END_BQ_QUEUE; 
3074            last = &t->link, t = t->link) {
3075         if (t == (StgBlockingQueueElement *)tso) {
3076           *last = (StgBlockingQueueElement *)tso->link;
3077           goto done;
3078         }
3079       }
3080       barf("unblockThread (BLACKHOLE): TSO not found");
3081     }
3082
3083   case BlockedOnException:
3084     {
3085       StgTSO *target  = tso->block_info.tso;
3086
3087       ASSERT(get_itbl(target)->type == TSO);
3088
3089       if (target->what_next == ThreadRelocated) {
3090           target = target->link;
3091           ASSERT(get_itbl(target)->type == TSO);
3092       }
3093
3094       ASSERT(target->blocked_exceptions != NULL);
3095
3096       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3097       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3098            t != END_BQ_QUEUE; 
3099            last = &t->link, t = t->link) {
3100         ASSERT(get_itbl(t)->type == TSO);
3101         if (t == (StgBlockingQueueElement *)tso) {
3102           *last = (StgBlockingQueueElement *)tso->link;
3103           goto done;
3104         }
3105       }
3106       barf("unblockThread (Exception): TSO not found");
3107     }
3108
3109   case BlockedOnRead:
3110   case BlockedOnWrite:
3111     {
3112       /* take TSO off blocked_queue */
3113       StgBlockingQueueElement *prev = NULL;
3114       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3115            prev = t, t = t->link) {
3116         if (t == (StgBlockingQueueElement *)tso) {
3117           if (prev == NULL) {
3118             blocked_queue_hd = (StgTSO *)t->link;
3119             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3120               blocked_queue_tl = END_TSO_QUEUE;
3121             }
3122           } else {
3123             prev->link = t->link;
3124             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3125               blocked_queue_tl = (StgTSO *)prev;
3126             }
3127           }
3128           goto done;
3129         }
3130       }
3131       barf("unblockThread (I/O): TSO not found");
3132     }
3133
3134   case BlockedOnDelay:
3135     {
3136       /* take TSO off sleeping_queue */
3137       StgBlockingQueueElement *prev = NULL;
3138       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3139            prev = t, t = t->link) {
3140         if (t == (StgBlockingQueueElement *)tso) {
3141           if (prev == NULL) {
3142             sleeping_queue = (StgTSO *)t->link;
3143           } else {
3144             prev->link = t->link;
3145           }
3146           goto done;
3147         }
3148       }
3149       barf("unblockThread (I/O): TSO not found");
3150     }
3151
3152   default:
3153     barf("unblockThread");
3154   }
3155
3156  done:
3157   tso->link = END_TSO_QUEUE;
3158   tso->why_blocked = NotBlocked;
3159   tso->block_info.closure = NULL;
3160   PUSH_ON_RUN_QUEUE(tso);
3161 }
3162 #else
3163 static void
3164 unblockThread(StgTSO *tso)
3165 {
3166   StgTSO *t, **last;
3167   
3168   /* To avoid locking unnecessarily. */
3169   if (tso->why_blocked == NotBlocked) {
3170     return;
3171   }
3172
3173   switch (tso->why_blocked) {
3174
3175   case BlockedOnMVar:
3176     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3177     {
3178       StgTSO *last_tso = END_TSO_QUEUE;
3179       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3180
3181       last = &mvar->head;
3182       for (t = mvar->head; t != END_TSO_QUEUE; 
3183            last = &t->link, last_tso = t, t = t->link) {
3184         if (t == tso) {
3185           *last = tso->link;
3186           if (mvar->tail == tso) {
3187             mvar->tail = last_tso;
3188           }
3189           goto done;
3190         }
3191       }
3192       barf("unblockThread (MVAR): TSO not found");
3193     }
3194
3195   case BlockedOnBlackHole:
3196     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3197     {
3198       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3199
3200       last = &bq->blocking_queue;
3201       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3202            last = &t->link, t = t->link) {
3203         if (t == tso) {
3204           *last = tso->link;
3205           goto done;
3206         }
3207       }
3208       barf("unblockThread (BLACKHOLE): TSO not found");
3209     }
3210
3211   case BlockedOnException:
3212     {
3213       StgTSO *target  = tso->block_info.tso;
3214
3215       ASSERT(get_itbl(target)->type == TSO);
3216
3217       while (target->what_next == ThreadRelocated) {
3218           target = target->link;
3219           ASSERT(get_itbl(target)->type == TSO);
3220       }
3221       
3222       ASSERT(target->blocked_exceptions != NULL);
3223
3224       last = &target->blocked_exceptions;
3225       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3226            last = &t->link, t = t->link) {
3227         ASSERT(get_itbl(t)->type == TSO);
3228         if (t == tso) {
3229           *last = tso->link;
3230           goto done;
3231         }
3232       }
3233       barf("unblockThread (Exception): TSO not found");
3234     }
3235
3236   case BlockedOnRead:
3237   case BlockedOnWrite:
3238     {
3239       StgTSO *prev = NULL;
3240       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3241            prev = t, t = t->link) {
3242         if (t == tso) {
3243           if (prev == NULL) {
3244             blocked_queue_hd = t->link;
3245             if (blocked_queue_tl == t) {
3246               blocked_queue_tl = END_TSO_QUEUE;
3247             }
3248           } else {
3249             prev->link = t->link;
3250             if (blocked_queue_tl == t) {
3251               blocked_queue_tl = prev;
3252             }
3253           }
3254           goto done;
3255         }
3256       }
3257       barf("unblockThread (I/O): TSO not found");
3258     }
3259
3260   case BlockedOnDelay:
3261     {
3262       StgTSO *prev = NULL;
3263       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3264            prev = t, t = t->link) {
3265         if (t == tso) {
3266           if (prev == NULL) {
3267             sleeping_queue = t->link;
3268           } else {
3269             prev->link = t->link;
3270           }
3271           goto done;
3272         }
3273       }
3274       barf("unblockThread (I/O): TSO not found");
3275     }
3276
3277   default:
3278     barf("unblockThread");
3279   }
3280
3281  done:
3282   tso->link = END_TSO_QUEUE;
3283   tso->why_blocked = NotBlocked;
3284   tso->block_info.closure = NULL;
3285   PUSH_ON_RUN_QUEUE(tso);
3286 }
3287 #endif
3288
3289 /* -----------------------------------------------------------------------------
3290  * raiseAsync()
3291  *
3292  * The following function implements the magic for raising an
3293  * asynchronous exception in an existing thread.
3294  *
3295  * We first remove the thread from any queue on which it might be
3296  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3297  *
3298  * We strip the stack down to the innermost CATCH_FRAME, building
3299  * thunks in the heap for all the active computations, so they can 
3300  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3301  * an application of the handler to the exception, and push it on
3302  * the top of the stack.
3303  * 
3304  * How exactly do we save all the active computations?  We create an
3305  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3306  * AP_STACKs pushes everything from the corresponding update frame
3307  * upwards onto the stack.  (Actually, it pushes everything up to the
3308  * next update frame plus a pointer to the next AP_STACK object.
3309  * Entering the next AP_STACK object pushes more onto the stack until we
3310  * reach the last AP_STACK object - at which point the stack should look
3311  * exactly as it did when we killed the TSO and we can continue
3312  * execution by entering the closure on top of the stack.
3313  *
3314  * We can also kill a thread entirely - this happens if either (a) the 
3315  * exception passed to raiseAsync is NULL, or (b) there's no
3316  * CATCH_FRAME on the stack.  In either case, we strip the entire
3317  * stack and replace the thread with a zombie.
3318  *
3319  * Locks: sched_mutex held upon entry nor exit.
3320  *
3321  * -------------------------------------------------------------------------- */
3322  
3323 void 
3324 deleteThread(StgTSO *tso)
3325 {
3326   raiseAsync(tso,NULL);
3327 }
3328
3329 void
3330 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3331 {
3332   /* When raising async exs from contexts where sched_mutex isn't held;
3333      use raiseAsyncWithLock(). */
3334   ACQUIRE_LOCK(&sched_mutex);
3335   raiseAsync(tso,exception);
3336   RELEASE_LOCK(&sched_mutex);
3337 }
3338
3339 void
3340 raiseAsync(StgTSO *tso, StgClosure *exception)
3341 {
3342     StgRetInfoTable *info;
3343     StgPtr sp;
3344   
3345     // Thread already dead?
3346     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3347         return;
3348     }
3349
3350     IF_DEBUG(scheduler, 
3351              sched_belch("raising exception in thread %ld.", tso->id));
3352     
3353     // Remove it from any blocking queues
3354     unblockThread(tso);
3355
3356     sp = tso->sp;
3357     
3358     // The stack freezing code assumes there's a closure pointer on
3359     // the top of the stack, so we have to arrange that this is the case...
3360     //
3361     if (sp[0] == (W_)&stg_enter_info) {
3362         sp++;
3363     } else {
3364         sp--;
3365         sp[0] = (W_)&stg_dummy_ret_closure;
3366     }
3367
3368     while (1) {
3369         nat i;
3370
3371         // 1. Let the top of the stack be the "current closure"
3372         //
3373         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3374         // CATCH_FRAME.
3375         //
3376         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3377         // current closure applied to the chunk of stack up to (but not
3378         // including) the update frame.  This closure becomes the "current
3379         // closure".  Go back to step 2.
3380         //
3381         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3382         // top of the stack applied to the exception.
3383         // 
3384         // 5. If it's a STOP_FRAME, then kill the thread.
3385         
3386         StgPtr frame;
3387         
3388         frame = sp + 1;
3389         info = get_ret_itbl((StgClosure *)frame);
3390         
3391         while (info->i.type != UPDATE_FRAME
3392                && (info->i.type != CATCH_FRAME || exception == NULL)
3393                && info->i.type != STOP_FRAME) {
3394             frame += stack_frame_sizeW((StgClosure *)frame);
3395             info = get_ret_itbl((StgClosure *)frame);
3396         }
3397         
3398         switch (info->i.type) {
3399             
3400         case CATCH_FRAME:
3401             // If we find a CATCH_FRAME, and we've got an exception to raise,
3402             // then build the THUNK raise(exception), and leave it on
3403             // top of the CATCH_FRAME ready to enter.
3404             //
3405         {
3406 #ifdef PROFILING
3407             StgCatchFrame *cf = (StgCatchFrame *)frame;
3408 #endif
3409             StgClosure *raise;
3410             
3411             // we've got an exception to raise, so let's pass it to the
3412             // handler in this frame.
3413             //
3414             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3415             TICK_ALLOC_SE_THK(1,0);
3416             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3417             raise->payload[0] = exception;
3418             
3419             // throw away the stack from Sp up to the CATCH_FRAME.
3420             //
3421             sp = frame - 1;
3422             
3423             /* Ensure that async excpetions are blocked now, so we don't get
3424              * a surprise exception before we get around to executing the
3425              * handler.
3426              */
3427             if (tso->blocked_exceptions == NULL) {
3428                 tso->blocked_exceptions = END_TSO_QUEUE;
3429             }
3430             
3431             /* Put the newly-built THUNK on top of the stack, ready to execute
3432              * when the thread restarts.
3433              */
3434             sp[0] = (W_)raise;
3435             sp[-1] = (W_)&stg_enter_info;
3436             tso->sp = sp-1;
3437             tso->what_next = ThreadRunGHC;
3438             IF_DEBUG(sanity, checkTSO(tso));
3439             return;
3440         }
3441         
3442         case UPDATE_FRAME:
3443         {
3444             StgAP_STACK * ap;
3445             nat words;
3446             
3447             // First build an AP_STACK consisting of the stack chunk above the
3448             // current update frame, with the top word on the stack as the
3449             // fun field.
3450             //
3451             words = frame - sp - 1;
3452             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3453             
3454             ap->size = words;
3455             ap->fun  = (StgClosure *)sp[0];
3456             sp++;
3457             for(i=0; i < (nat)words; ++i) {
3458                 ap->payload[i] = (StgClosure *)*sp++;
3459             }
3460             
3461             SET_HDR(ap,&stg_AP_STACK_info,
3462                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3463             TICK_ALLOC_UP_THK(words+1,0);
3464             
3465             IF_DEBUG(scheduler,
3466                      fprintf(stderr,  "scheduler: Updating ");
3467                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3468                      fprintf(stderr,  " with ");
3469                      printObj((StgClosure *)ap);
3470                 );
3471
3472             // Replace the updatee with an indirection - happily
3473             // this will also wake up any threads currently
3474             // waiting on the result.
3475             //
3476             // Warning: if we're in a loop, more than one update frame on
3477             // the stack may point to the same object.  Be careful not to
3478             // overwrite an IND_OLDGEN in this case, because we'll screw
3479             // up the mutable lists.  To be on the safe side, don't
3480             // overwrite any kind of indirection at all.  See also
3481             // threadSqueezeStack in GC.c, where we have to make a similar
3482             // check.
3483             //
3484             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3485                 // revert the black hole
3486                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3487             }
3488             sp += sizeofW(StgUpdateFrame) - 1;
3489             sp[0] = (W_)ap; // push onto stack
3490             break;
3491         }
3492         
3493         case STOP_FRAME:
3494             // We've stripped the entire stack, the thread is now dead.
3495             sp += sizeofW(StgStopFrame);
3496             tso->what_next = ThreadKilled;
3497             tso->sp = sp;
3498             return;
3499             
3500         default:
3501             barf("raiseAsync");
3502         }
3503     }
3504     barf("raiseAsync");
3505 }
3506
3507 /* -----------------------------------------------------------------------------
3508    resurrectThreads is called after garbage collection on the list of
3509    threads found to be garbage.  Each of these threads will be woken
3510    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3511    on an MVar, or NonTermination if the thread was blocked on a Black
3512    Hole.
3513
3514    Locks: sched_mutex isn't held upon entry nor exit.
3515    -------------------------------------------------------------------------- */
3516
3517 void
3518 resurrectThreads( StgTSO *threads )
3519 {
3520   StgTSO *tso, *next;
3521
3522   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3523     next = tso->global_link;
3524     tso->global_link = all_threads;
3525     all_threads = tso;
3526     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3527
3528     switch (tso->why_blocked) {
3529     case BlockedOnMVar:
3530     case BlockedOnException:
3531       /* Called by GC - sched_mutex lock is currently held. */
3532       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3533       break;
3534     case BlockedOnBlackHole:
3535       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3536       break;
3537     case NotBlocked:
3538       /* This might happen if the thread was blocked on a black hole
3539        * belonging to a thread that we've just woken up (raiseAsync
3540        * can wake up threads, remember...).
3541        */
3542       continue;
3543     default:
3544       barf("resurrectThreads: thread blocked in a strange way");
3545     }
3546   }
3547 }
3548
3549 /* -----------------------------------------------------------------------------
3550  * Blackhole detection: if we reach a deadlock, test whether any
3551  * threads are blocked on themselves.  Any threads which are found to
3552  * be self-blocked get sent a NonTermination exception.
3553  *
3554  * This is only done in a deadlock situation in order to avoid
3555  * performance overhead in the normal case.
3556  *
3557  * Locks: sched_mutex is held upon entry and exit.
3558  * -------------------------------------------------------------------------- */
3559
3560 static void
3561 detectBlackHoles( void )
3562 {
3563     StgTSO *tso = all_threads;
3564     StgClosure *frame;
3565     StgClosure *blocked_on;
3566     StgRetInfoTable *info;
3567
3568     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3569
3570         while (tso->what_next == ThreadRelocated) {
3571             tso = tso->link;
3572             ASSERT(get_itbl(tso)->type == TSO);
3573         }
3574       
3575         if (tso->why_blocked != BlockedOnBlackHole) {
3576             continue;
3577         }
3578         blocked_on = tso->block_info.closure;
3579
3580         frame = (StgClosure *)tso->sp;
3581
3582         while(1) {
3583             info = get_ret_itbl(frame);
3584             switch (info->i.type) {
3585             case UPDATE_FRAME:
3586                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3587                     /* We are blocking on one of our own computations, so
3588                      * send this thread the NonTermination exception.  
3589                      */
3590                     IF_DEBUG(scheduler, 
3591                              sched_belch("thread %d is blocked on itself", tso->id));
3592                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3593                     goto done;
3594                 }
3595                 
3596                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3597                 continue;
3598
3599             case STOP_FRAME:
3600                 goto done;
3601
3602                 // normal stack frames; do nothing except advance the pointer
3603             default:
3604                 (StgPtr)frame += stack_frame_sizeW(frame);
3605             }
3606         }   
3607         done: ;
3608     }
3609 }
3610
3611 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3612 //@subsection Debugging Routines
3613
3614 /* -----------------------------------------------------------------------------
3615  * Debugging: why is a thread blocked
3616  * [Also provides useful information when debugging threaded programs
3617  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3618    -------------------------------------------------------------------------- */
3619
3620 static
3621 void
3622 printThreadBlockage(StgTSO *tso)
3623 {
3624   switch (tso->why_blocked) {
3625   case BlockedOnRead:
3626     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3627     break;
3628   case BlockedOnWrite:
3629     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3630     break;
3631   case BlockedOnDelay:
3632     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3633     break;
3634   case BlockedOnMVar:
3635     fprintf(stderr,"is blocked on an MVar");
3636     break;
3637   case BlockedOnException:
3638     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3639             tso->block_info.tso->id);
3640     break;
3641   case BlockedOnBlackHole:
3642     fprintf(stderr,"is blocked on a black hole");
3643     break;
3644   case NotBlocked:
3645     fprintf(stderr,"is not blocked");
3646     break;
3647 #if defined(PAR)
3648   case BlockedOnGA:
3649     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3650             tso->block_info.closure, info_type(tso->block_info.closure));
3651     break;
3652   case BlockedOnGA_NoSend:
3653     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3654             tso->block_info.closure, info_type(tso->block_info.closure));
3655     break;
3656 #endif
3657 #if defined(RTS_SUPPORTS_THREADS)
3658   case BlockedOnCCall:
3659     fprintf(stderr,"is blocked on an external call");
3660     break;
3661   case BlockedOnCCall_NoUnblockExc:
3662     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3663     break;
3664 #endif
3665   default:
3666     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3667          tso->why_blocked, tso->id, tso);
3668   }
3669 }
3670
3671 static
3672 void
3673 printThreadStatus(StgTSO *tso)
3674 {
3675   switch (tso->what_next) {
3676   case ThreadKilled:
3677     fprintf(stderr,"has been killed");
3678     break;
3679   case ThreadComplete:
3680     fprintf(stderr,"has completed");
3681     break;
3682   default:
3683     printThreadBlockage(tso);
3684   }
3685 }
3686
3687 void
3688 printAllThreads(void)
3689 {
3690   StgTSO *t;
3691   void *label;
3692
3693 # if defined(GRAN)
3694   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3695   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3696                        time_string, rtsFalse/*no commas!*/);
3697
3698   fprintf(stderr, "all threads at [%s]:\n", time_string);
3699 # elif defined(PAR)
3700   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3701   ullong_format_string(CURRENT_TIME,
3702                        time_string, rtsFalse/*no commas!*/);
3703
3704   fprintf(stderr,"all threads at [%s]:\n", time_string);
3705 # else
3706   fprintf(stderr,"all threads:\n");
3707 # endif
3708
3709   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3710     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3711     label = lookupThreadLabel((StgWord)t);
3712     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3713     printThreadStatus(t);
3714     fprintf(stderr,"\n");
3715   }
3716 }
3717     
3718 #ifdef DEBUG
3719
3720 /* 
3721    Print a whole blocking queue attached to node (debugging only).
3722 */
3723 //@cindex print_bq
3724 # if defined(PAR)
3725 void 
3726 print_bq (StgClosure *node)
3727 {
3728   StgBlockingQueueElement *bqe;
3729   StgTSO *tso;
3730   rtsBool end;
3731
3732   fprintf(stderr,"## BQ of closure %p (%s): ",
3733           node, info_type(node));
3734
3735   /* should cover all closures that may have a blocking queue */
3736   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3737          get_itbl(node)->type == FETCH_ME_BQ ||
3738          get_itbl(node)->type == RBH ||
3739          get_itbl(node)->type == MVAR);
3740     
3741   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3742
3743   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3744 }
3745
3746 /* 
3747    Print a whole blocking queue starting with the element bqe.
3748 */
3749 void 
3750 print_bqe (StgBlockingQueueElement *bqe)
3751 {
3752   rtsBool end;
3753
3754   /* 
3755      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3756   */
3757   for (end = (bqe==END_BQ_QUEUE);
3758        !end; // iterate until bqe points to a CONSTR
3759        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3760        bqe = end ? END_BQ_QUEUE : bqe->link) {
3761     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3762     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3763     /* types of closures that may appear in a blocking queue */
3764     ASSERT(get_itbl(bqe)->type == TSO ||           
3765            get_itbl(bqe)->type == BLOCKED_FETCH || 
3766            get_itbl(bqe)->type == CONSTR); 
3767     /* only BQs of an RBH end with an RBH_Save closure */
3768     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3769
3770     switch (get_itbl(bqe)->type) {
3771     case TSO:
3772       fprintf(stderr," TSO %u (%x),",
3773               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3774       break;
3775     case BLOCKED_FETCH:
3776       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3777               ((StgBlockedFetch *)bqe)->node, 
3778               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3779               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3780               ((StgBlockedFetch *)bqe)->ga.weight);
3781       break;
3782     case CONSTR:
3783       fprintf(stderr," %s (IP %p),",
3784               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3785                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3786                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3787                "RBH_Save_?"), get_itbl(bqe));
3788       break;
3789     default:
3790       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3791            info_type((StgClosure *)bqe)); // , node, info_type(node));
3792       break;
3793     }
3794   } /* for */
3795   fputc('\n', stderr);
3796 }
3797 # elif defined(GRAN)
3798 void 
3799 print_bq (StgClosure *node)
3800 {
3801   StgBlockingQueueElement *bqe;
3802   PEs node_loc, tso_loc;
3803   rtsBool end;
3804
3805   /* should cover all closures that may have a blocking queue */
3806   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3807          get_itbl(node)->type == FETCH_ME_BQ ||
3808          get_itbl(node)->type == RBH);
3809     
3810   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3811   node_loc = where_is(node);
3812
3813   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3814           node, info_type(node), node_loc);
3815
3816   /* 
3817      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3818   */
3819   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3820        !end; // iterate until bqe points to a CONSTR
3821        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3822     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3823     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3824     /* types of closures that may appear in a blocking queue */
3825     ASSERT(get_itbl(bqe)->type == TSO ||           
3826            get_itbl(bqe)->type == CONSTR); 
3827     /* only BQs of an RBH end with an RBH_Save closure */
3828     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3829
3830     tso_loc = where_is((StgClosure *)bqe);
3831     switch (get_itbl(bqe)->type) {
3832     case TSO:
3833       fprintf(stderr," TSO %d (%p) on [PE %d],",
3834               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3835       break;
3836     case CONSTR:
3837       fprintf(stderr," %s (IP %p),",
3838               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3839                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3840                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3841                "RBH_Save_?"), get_itbl(bqe));
3842       break;
3843     default:
3844       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3845            info_type((StgClosure *)bqe), node, info_type(node));
3846       break;
3847     }
3848   } /* for */
3849   fputc('\n', stderr);
3850 }
3851 #else
3852 /* 
3853    Nice and easy: only TSOs on the blocking queue
3854 */
3855 void 
3856 print_bq (StgClosure *node)
3857 {
3858   StgTSO *tso;
3859
3860   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3861   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3862        tso != END_TSO_QUEUE; 
3863        tso=tso->link) {
3864     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3865     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3866     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3867   }
3868   fputc('\n', stderr);
3869 }
3870 # endif
3871
3872 #if defined(PAR)
3873 static nat
3874 run_queue_len(void)
3875 {
3876   nat i;
3877   StgTSO *tso;
3878
3879   for (i=0, tso=run_queue_hd; 
3880        tso != END_TSO_QUEUE;
3881        i++, tso=tso->link)
3882     /* nothing */
3883
3884   return i;
3885 }
3886 #endif
3887
3888 static void
3889 sched_belch(char *s, ...)
3890 {
3891   va_list ap;
3892   va_start(ap,s);
3893 #ifdef SMP
3894   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3895 #elif defined(PAR)
3896   fprintf(stderr, "== ");
3897 #else
3898   fprintf(stderr, "scheduler: ");
3899 #endif
3900   vfprintf(stderr, s, ap);
3901   fprintf(stderr, "\n");
3902   va_end(ap);
3903 }
3904
3905 #endif /* DEBUG */
3906
3907
3908 //@node Index,  , Debugging Routines, Main scheduling code
3909 //@subsection Index
3910
3911 //@index
3912 //* StgMainThread::  @cindex\s-+StgMainThread
3913 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3914 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3915 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3916 //* context_switch::  @cindex\s-+context_switch
3917 //* createThread::  @cindex\s-+createThread
3918 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3919 //* initScheduler::  @cindex\s-+initScheduler
3920 //* interrupted::  @cindex\s-+interrupted
3921 //* next_thread_id::  @cindex\s-+next_thread_id
3922 //* print_bq::  @cindex\s-+print_bq
3923 //* run_queue_hd::  @cindex\s-+run_queue_hd
3924 //* run_queue_tl::  @cindex\s-+run_queue_tl
3925 //* sched_mutex::  @cindex\s-+sched_mutex
3926 //* schedule::  @cindex\s-+schedule
3927 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3928 //* term_mutex::  @cindex\s-+term_mutex
3929 //@end index