[project @ 2003-12-12 16:35:19 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.182 2003/12/12 16:35:20 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 #ifdef HAVE_ERRNO_H
130 #include <errno.h>
131 #endif
132
133 #ifdef THREADED_RTS
134 #define USED_IN_THREADED_RTS
135 #else
136 #define USED_IN_THREADED_RTS STG_UNUSED
137 #endif
138
139 #ifdef RTS_SUPPORTS_THREADS
140 #define USED_WHEN_RTS_SUPPORTS_THREADS
141 #else
142 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
143 #endif
144
145 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
146 //@subsection Variables and Data structures
147
148 /* Main thread queue.
149  * Locks required: sched_mutex.
150  */
151 StgMainThread *main_threads = NULL;
152
153 /* Thread queues.
154  * Locks required: sched_mutex.
155  */
156 #if defined(GRAN)
157
158 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
160
161 /* 
162    In GranSim we have a runnable and a blocked queue for each processor.
163    In order to minimise code changes new arrays run_queue_hds/tls
164    are created. run_queue_hd is then a short cut (macro) for
165    run_queue_hds[CurrentProc] (see GranSim.h).
166    -- HWL
167 */
168 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
169 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
170 StgTSO *ccalling_threadss[MAX_PROC];
171 /* We use the same global list of threads (all_threads) in GranSim as in
172    the std RTS (i.e. we are cheating). However, we don't use this list in
173    the GranSim specific code at the moment (so we are only potentially
174    cheating).  */
175
176 #else /* !GRAN */
177
178 StgTSO *run_queue_hd = NULL;
179 StgTSO *run_queue_tl = NULL;
180 StgTSO *blocked_queue_hd = NULL;
181 StgTSO *blocked_queue_tl = NULL;
182 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
183
184 #endif
185
186 /* Linked list of all threads.
187  * Used for detecting garbage collected threads.
188  */
189 StgTSO *all_threads = NULL;
190
191 /* When a thread performs a safe C call (_ccall_GC, using old
192  * terminology), it gets put on the suspended_ccalling_threads
193  * list. Used by the garbage collector.
194  */
195 static StgTSO *suspended_ccalling_threads;
196
197 static StgTSO *threadStackOverflow(StgTSO *tso);
198
199 /* KH: The following two flags are shared memory locations.  There is no need
200        to lock them, since they are only unset at the end of a scheduler
201        operation.
202 */
203
204 /* flag set by signal handler to precipitate a context switch */
205 //@cindex context_switch
206 nat context_switch = 0;
207
208 /* if this flag is set as well, give up execution */
209 //@cindex interrupted
210 rtsBool interrupted = rtsFalse;
211
212 /* Next thread ID to allocate.
213  * Locks required: thread_id_mutex
214  */
215 //@cindex next_thread_id
216 static StgThreadID next_thread_id = 1;
217
218 /*
219  * Pointers to the state of the current thread.
220  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
221  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
222  */
223  
224 /* The smallest stack size that makes any sense is:
225  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
226  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
227  *  + 1                       (the closure to enter)
228  *  + 1                       (stg_ap_v_ret)
229  *  + 1                       (spare slot req'd by stg_ap_v_ret)
230  *
231  * A thread with this stack will bomb immediately with a stack
232  * overflow, which will increase its stack size.  
233  */
234
235 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
236
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 static rtsBool ready_to_gc;
249
250 /*
251  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
252  * in an MT setting, needed to signal that a worker thread shouldn't hang around
253  * in the scheduler when it is out of work.
254  */
255 static rtsBool shutting_down_scheduler = rtsFalse;
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
260        void     interruptStgRts   ( void );
261
262 static void     detectBlackHoles  ( void );
263
264 #if defined(RTS_SUPPORTS_THREADS)
265 /* ToDo: carefully document the invariants that go together
266  *       with these synchronisation objects.
267  */
268 Mutex     sched_mutex       = INIT_MUTEX_VAR;
269 Mutex     term_mutex        = INIT_MUTEX_VAR;
270
271 /*
272  * A heavyweight solution to the problem of protecting
273  * the thread_id from concurrent update.
274  */
275 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
276
277
278 # if defined(SMP)
279 static Condition gc_pending_cond = INIT_COND_VAR;
280 nat await_death;
281 # endif
282
283 #endif /* RTS_SUPPORTS_THREADS */
284
285 #if defined(PAR)
286 StgTSO *LastTSO;
287 rtsTime TimeOfLastYield;
288 rtsBool emitSchedule = rtsTrue;
289 #endif
290
291 #if DEBUG
292 static char *whatNext_strs[] = {
293   "ThreadRunGHC",
294   "ThreadInterpret",
295   "ThreadKilled",
296   "ThreadRelocated",
297   "ThreadComplete"
298 };
299 #endif
300
301 #if defined(PAR)
302 StgTSO * createSparkThread(rtsSpark spark);
303 StgTSO * activateSpark (rtsSpark spark);  
304 #endif
305
306 /*
307  * The thread state for the main thread.
308 // ToDo: check whether not needed any more
309 StgTSO   *MainTSO;
310  */
311
312 #if defined(RTS_SUPPORTS_THREADS)
313 static rtsBool startingWorkerThread = rtsFalse;
314
315 static void taskStart(void);
316 static void
317 taskStart(void)
318 {
319   Capability *cap;
320   
321   ACQUIRE_LOCK(&sched_mutex);
322   startingWorkerThread = rtsFalse;
323   waitForWorkCapability(&sched_mutex, &cap, NULL);
324   RELEASE_LOCK(&sched_mutex);
325   
326   schedule(NULL,cap);
327 }
328
329 void
330 startSchedulerTaskIfNecessary(void)
331 {
332   if(run_queue_hd != END_TSO_QUEUE
333     || blocked_queue_hd != END_TSO_QUEUE
334     || sleeping_queue != END_TSO_QUEUE)
335   {
336     if(!startingWorkerThread)
337     { // we don't want to start another worker thread
338       // just because the last one hasn't yet reached the
339       // "waiting for capability" state
340       startingWorkerThread = rtsTrue;
341       startTask(taskStart);
342     }
343   }
344 }
345 #endif
346
347 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
348 //@subsection Main scheduling loop
349
350 /* ---------------------------------------------------------------------------
351    Main scheduling loop.
352
353    We use round-robin scheduling, each thread returning to the
354    scheduler loop when one of these conditions is detected:
355
356       * out of heap space
357       * timer expires (thread yields)
358       * thread blocks
359       * thread ends
360       * stack overflow
361
362    Locking notes:  we acquire the scheduler lock once at the beginning
363    of the scheduler loop, and release it when
364     
365       * running a thread, or
366       * waiting for work, or
367       * waiting for a GC to complete.
368
369    GRAN version:
370      In a GranSim setup this loop iterates over the global event queue.
371      This revolves around the global event queue, which determines what 
372      to do next. Therefore, it's more complicated than either the 
373      concurrent or the parallel (GUM) setup.
374
375    GUM version:
376      GUM iterates over incoming messages.
377      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
378      and sends out a fish whenever it has nothing to do; in-between
379      doing the actual reductions (shared code below) it processes the
380      incoming messages and deals with delayed operations 
381      (see PendingFetches).
382      This is not the ugliest code you could imagine, but it's bloody close.
383
384    ------------------------------------------------------------------------ */
385 //@cindex schedule
386 static void
387 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
388           Capability *initialCapability )
389 {
390   StgTSO *t;
391   Capability *cap = initialCapability;
392   StgThreadReturnCode ret;
393 #if defined(GRAN)
394   rtsEvent *event;
395 #elif defined(PAR)
396   StgSparkPool *pool;
397   rtsSpark spark;
398   StgTSO *tso;
399   GlobalTaskId pe;
400   rtsBool receivedFinish = rtsFalse;
401 # if defined(DEBUG)
402   nat tp_size, sp_size; // stats only
403 # endif
404 #endif
405   rtsBool was_interrupted = rtsFalse;
406   StgTSOWhatNext prev_what_next;
407   
408   ACQUIRE_LOCK(&sched_mutex);
409  
410 #if defined(RTS_SUPPORTS_THREADS)
411   //
412   // in the threaded case, the capability is either passed in via the
413   // initialCapability parameter, or initialized inside the scheduler
414   // loop 
415   //
416   IF_DEBUG(scheduler,
417            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
418                        mainThread, initialCapability);
419       );
420 #else
421   /* simply initialise it in the non-threaded case */
422   grabCapability(&cap);
423 #endif
424
425 #if defined(GRAN)
426   /* set up first event to get things going */
427   /* ToDo: assign costs for system setup and init MainTSO ! */
428   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
429             ContinueThread, 
430             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
431
432   IF_DEBUG(gran,
433            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
434            G_TSO(CurrentTSO, 5));
435
436   if (RtsFlags.GranFlags.Light) {
437     /* Save current time; GranSim Light only */
438     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
439   }      
440
441   event = get_next_event();
442
443   while (event!=(rtsEvent*)NULL) {
444     /* Choose the processor with the next event */
445     CurrentProc = event->proc;
446     CurrentTSO = event->tso;
447
448 #elif defined(PAR)
449
450   while (!receivedFinish) {    /* set by processMessages */
451                                /* when receiving PP_FINISH message         */ 
452
453 #else // everything except GRAN and PAR
454
455   while (1) {
456
457 #endif
458
459      IF_DEBUG(scheduler, printAllThreads());
460
461 #if defined(RTS_SUPPORTS_THREADS)
462     //
463     // Check to see whether there are any worker threads
464     // waiting to deposit external call results. If so,
465     // yield our capability... if we have a capability, that is.
466     //
467     if (cap != NULL) {
468         yieldToReturningWorker(&sched_mutex, &cap,
469                                mainThread ? &mainThread->bound_thread_cond
470                                           : NULL);
471     }
472
473     // If we do not currently hold a capability, we wait for one
474     if (cap == NULL) {
475         waitForWorkCapability(&sched_mutex, &cap,
476                               mainThread ? &mainThread->bound_thread_cond
477                                          : NULL);
478     }
479 #endif
480
481     //
482     // If we're interrupted (the user pressed ^C, or some other
483     // termination condition occurred), kill all the currently running
484     // threads.
485     //
486     if (interrupted) {
487         IF_DEBUG(scheduler, sched_belch("interrupted"));
488         interrupted = rtsFalse;
489         was_interrupted = rtsTrue;
490 #if defined(RTS_SUPPORTS_THREADS)
491         // In the threaded RTS, deadlock detection doesn't work,
492         // so just exit right away.
493         prog_belch("interrupted");
494         releaseCapability(cap);
495         RELEASE_LOCK(&sched_mutex);
496         shutdownHaskellAndExit(EXIT_SUCCESS);
497 #else
498         deleteAllThreads();
499 #endif
500     }
501
502     //
503     // Go through the list of main threads and wake up any
504     // clients whose computations have finished.  ToDo: this
505     // should be done more efficiently without a linear scan
506     // of the main threads list, somehow...
507     //
508 #if defined(RTS_SUPPORTS_THREADS)
509     { 
510         StgMainThread *m, **prev;
511         prev = &main_threads;
512         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
513           if (m->tso->what_next == ThreadComplete
514               || m->tso->what_next == ThreadKilled)
515           {
516             if (m == mainThread)
517             {
518               if (m->tso->what_next == ThreadComplete)
519               {
520                 if (m->ret)
521                 {
522                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
523                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
524                 }
525                 m->stat = Success;
526               }
527               else
528               {
529                 if (m->ret)
530                 {
531                   *(m->ret) = NULL;
532                 }
533                 if (was_interrupted)
534                 {
535                   m->stat = Interrupted;
536                 }
537                 else
538                 {
539                   m->stat = Killed;
540                 }
541               }
542               *prev = m->link;
543             
544 #ifdef DEBUG
545               removeThreadLabel((StgWord)m->tso->id);
546 #endif
547               releaseCapability(cap);
548               RELEASE_LOCK(&sched_mutex);
549               return;
550             }
551             else
552             {
553                 // The current OS thread can not handle the fact that
554                 // the Haskell thread "m" has ended.  "m" is bound;
555                 // the scheduler loop in it's bound OS thread has to
556                 // return, so let's pass our capability directly to
557                 // that thread.
558                 passCapability(&sched_mutex, cap, &m->bound_thread_cond);
559                 cap = NULL;
560             }
561           }
562         }
563     }
564     
565     // If we gave our capability away, go to the top to get it back
566     if (cap == NULL) {
567         continue;       
568     }
569       
570 #else /* not threaded */
571
572 # if defined(PAR)
573     /* in GUM do this only on the Main PE */
574     if (IAmMainThread)
575 # endif
576     /* If our main thread has finished or been killed, return.
577      */
578     {
579       StgMainThread *m = main_threads;
580       if (m->tso->what_next == ThreadComplete
581           || m->tso->what_next == ThreadKilled) {
582 #ifdef DEBUG
583         removeThreadLabel((StgWord)m->tso->id);
584 #endif
585         main_threads = main_threads->link;
586         if (m->tso->what_next == ThreadComplete) {
587             // We finished successfully, fill in the return value
588             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
589             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
590             m->stat = Success;
591             return;
592         } else {
593           if (m->ret) { *(m->ret) = NULL; };
594           if (was_interrupted) {
595             m->stat = Interrupted;
596           } else {
597             m->stat = Killed;
598           }
599           return;
600         }
601       }
602     }
603 #endif
604
605
606 #if 0 /* defined(SMP) */
607     /* Top up the run queue from our spark pool.  We try to make the
608      * number of threads in the run queue equal to the number of
609      * free capabilities.
610      *
611      * Disable spark support in SMP for now, non-essential & requires
612      * a little bit of work to make it compile cleanly. -- sof 1/02.
613      */
614     {
615       nat n = getFreeCapabilities();
616       StgTSO *tso = run_queue_hd;
617
618       /* Count the run queue */
619       while (n > 0 && tso != END_TSO_QUEUE) {
620         tso = tso->link;
621         n--;
622       }
623
624       for (; n > 0; n--) {
625         StgClosure *spark;
626         spark = findSpark(rtsFalse);
627         if (spark == NULL) {
628           break; /* no more sparks in the pool */
629         } else {
630           /* I'd prefer this to be done in activateSpark -- HWL */
631           /* tricky - it needs to hold the scheduler lock and
632            * not try to re-acquire it -- SDM */
633           createSparkThread(spark);       
634           IF_DEBUG(scheduler,
635                    sched_belch("==^^ turning spark of closure %p into a thread",
636                                (StgClosure *)spark));
637         }
638       }
639       /* We need to wake up the other tasks if we just created some
640        * work for them.
641        */
642       if (getFreeCapabilities() - n > 1) {
643           signalCondition( &thread_ready_cond );
644       }
645     }
646 #endif // SMP
647
648 #if defined(RTS_USER_SIGNALS)
649     // check for signals each time around the scheduler
650     if (signals_pending()) {
651       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
652       startSignalHandlers();
653       ACQUIRE_LOCK(&sched_mutex);
654     }
655 #endif
656
657     /* Check whether any waiting threads need to be woken up.  If the
658      * run queue is empty, and there are no other tasks running, we
659      * can wait indefinitely for something to happen.
660      */
661     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
662 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
663                 || EMPTY_RUN_QUEUE()
664 #endif
665         )
666     {
667       awaitEvent( EMPTY_RUN_QUEUE()
668 #if defined(SMP)
669         && allFreeCapabilities()
670 #endif
671         );
672     }
673     /* we can be interrupted while waiting for I/O... */
674     if (interrupted) continue;
675
676     /* 
677      * Detect deadlock: when we have no threads to run, there are no
678      * threads waiting on I/O or sleeping, and all the other tasks are
679      * waiting for work, we must have a deadlock of some description.
680      *
681      * We first try to find threads blocked on themselves (ie. black
682      * holes), and generate NonTermination exceptions where necessary.
683      *
684      * If no threads are black holed, we have a deadlock situation, so
685      * inform all the main threads.
686      */
687 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
688     if (   EMPTY_THREAD_QUEUES()
689 #if defined(RTS_SUPPORTS_THREADS)
690         && EMPTY_QUEUE(suspended_ccalling_threads)
691 #endif
692 #ifdef SMP
693         && allFreeCapabilities()
694 #endif
695         )
696     {
697         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
698 #if defined(THREADED_RTS)
699         /* and SMP mode ..? */
700         releaseCapability(cap);
701 #endif
702         // Garbage collection can release some new threads due to
703         // either (a) finalizers or (b) threads resurrected because
704         // they are about to be send BlockedOnDeadMVar.  Any threads
705         // thus released will be immediately runnable.
706         GarbageCollect(GetRoots,rtsTrue);
707
708         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
709
710         IF_DEBUG(scheduler, 
711                  sched_belch("still deadlocked, checking for black holes..."));
712         detectBlackHoles();
713
714         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
715
716 #if defined(RTS_USER_SIGNALS)
717         /* If we have user-installed signal handlers, then wait
718          * for signals to arrive rather then bombing out with a
719          * deadlock.
720          */
721 #if defined(RTS_SUPPORTS_THREADS)
722         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
723                       a signal with no runnable threads (or I/O
724                       suspended ones) leads nowhere quick.
725                       For now, simply shut down when we reach this
726                       condition.
727                       
728                       ToDo: define precisely under what conditions
729                       the Scheduler should shut down in an MT setting.
730                    */
731 #else
732         if ( anyUserHandlers() ) {
733 #endif
734             IF_DEBUG(scheduler, 
735                      sched_belch("still deadlocked, waiting for signals..."));
736
737             awaitUserSignals();
738
739             // we might be interrupted...
740             if (interrupted) { continue; }
741
742             if (signals_pending()) {
743                 RELEASE_LOCK(&sched_mutex);
744                 startSignalHandlers();
745                 ACQUIRE_LOCK(&sched_mutex);
746             }
747             ASSERT(!EMPTY_RUN_QUEUE());
748             goto not_deadlocked;
749         }
750 #endif
751
752         /* Probably a real deadlock.  Send the current main thread the
753          * Deadlock exception (or in the SMP build, send *all* main
754          * threads the deadlock exception, since none of them can make
755          * progress).
756          */
757         {
758             StgMainThread *m;
759 #if defined(RTS_SUPPORTS_THREADS)
760             for (m = main_threads; m != NULL; m = m->link) {
761                 switch (m->tso->why_blocked) {
762                 case BlockedOnBlackHole:
763                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
764                     break;
765                 case BlockedOnException:
766                 case BlockedOnMVar:
767                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
768                     break;
769                 default:
770                     barf("deadlock: main thread blocked in a strange way");
771                 }
772             }
773 #else
774             m = main_threads;
775             switch (m->tso->why_blocked) {
776             case BlockedOnBlackHole:
777                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
778                 break;
779             case BlockedOnException:
780             case BlockedOnMVar:
781                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
782                 break;
783             default:
784                 barf("deadlock: main thread blocked in a strange way");
785             }
786 #endif
787         }
788
789 #if defined(RTS_SUPPORTS_THREADS)
790         /* ToDo: revisit conditions (and mechanism) for shutting
791            down a multi-threaded world  */
792         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
793         RELEASE_LOCK(&sched_mutex);
794         shutdownHaskell();
795         return;
796 #endif
797     }
798   not_deadlocked:
799
800 #elif defined(RTS_SUPPORTS_THREADS)
801     /* ToDo: add deadlock detection in threaded RTS */
802 #elif defined(PAR)
803     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
804 #endif
805
806 #if defined(SMP)
807     /* If there's a GC pending, don't do anything until it has
808      * completed.
809      */
810     if (ready_to_gc) {
811       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
812       waitCondition( &gc_pending_cond, &sched_mutex );
813     }
814 #endif    
815
816 #if defined(RTS_SUPPORTS_THREADS)
817 #if defined(SMP)
818     /* block until we've got a thread on the run queue and a free
819      * capability.
820      *
821      */
822     if ( EMPTY_RUN_QUEUE() ) {
823       /* Give up our capability */
824       releaseCapability(cap);
825
826       /* If we're in the process of shutting down (& running the
827        * a batch of finalisers), don't wait around.
828        */
829       if ( shutting_down_scheduler ) {
830         RELEASE_LOCK(&sched_mutex);
831         return;
832       }
833       IF_DEBUG(scheduler, sched_belch("waiting for work"));
834       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
835       IF_DEBUG(scheduler, sched_belch("work now available"));
836     }
837 #else
838     if ( EMPTY_RUN_QUEUE() ) {
839       continue; // nothing to do
840     }
841 #endif
842 #endif
843
844 #if defined(GRAN)
845     if (RtsFlags.GranFlags.Light)
846       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
847
848     /* adjust time based on time-stamp */
849     if (event->time > CurrentTime[CurrentProc] &&
850         event->evttype != ContinueThread)
851       CurrentTime[CurrentProc] = event->time;
852     
853     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
854     if (!RtsFlags.GranFlags.Light)
855       handleIdlePEs();
856
857     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
858
859     /* main event dispatcher in GranSim */
860     switch (event->evttype) {
861       /* Should just be continuing execution */
862     case ContinueThread:
863       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
864       /* ToDo: check assertion
865       ASSERT(run_queue_hd != (StgTSO*)NULL &&
866              run_queue_hd != END_TSO_QUEUE);
867       */
868       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
869       if (!RtsFlags.GranFlags.DoAsyncFetch &&
870           procStatus[CurrentProc]==Fetching) {
871         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
872               CurrentTSO->id, CurrentTSO, CurrentProc);
873         goto next_thread;
874       } 
875       /* Ignore ContinueThreads for completed threads */
876       if (CurrentTSO->what_next == ThreadComplete) {
877         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
878               CurrentTSO->id, CurrentTSO, CurrentProc);
879         goto next_thread;
880       } 
881       /* Ignore ContinueThreads for threads that are being migrated */
882       if (PROCS(CurrentTSO)==Nowhere) { 
883         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
884               CurrentTSO->id, CurrentTSO, CurrentProc);
885         goto next_thread;
886       }
887       /* The thread should be at the beginning of the run queue */
888       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
889         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
890               CurrentTSO->id, CurrentTSO, CurrentProc);
891         break; // run the thread anyway
892       }
893       /*
894       new_event(proc, proc, CurrentTime[proc],
895                 FindWork,
896                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
897       goto next_thread; 
898       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
899       break; // now actually run the thread; DaH Qu'vam yImuHbej 
900
901     case FetchNode:
902       do_the_fetchnode(event);
903       goto next_thread;             /* handle next event in event queue  */
904       
905     case GlobalBlock:
906       do_the_globalblock(event);
907       goto next_thread;             /* handle next event in event queue  */
908       
909     case FetchReply:
910       do_the_fetchreply(event);
911       goto next_thread;             /* handle next event in event queue  */
912       
913     case UnblockThread:   /* Move from the blocked queue to the tail of */
914       do_the_unblock(event);
915       goto next_thread;             /* handle next event in event queue  */
916       
917     case ResumeThread:  /* Move from the blocked queue to the tail of */
918       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
919       event->tso->gran.blocktime += 
920         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
921       do_the_startthread(event);
922       goto next_thread;             /* handle next event in event queue  */
923       
924     case StartThread:
925       do_the_startthread(event);
926       goto next_thread;             /* handle next event in event queue  */
927       
928     case MoveThread:
929       do_the_movethread(event);
930       goto next_thread;             /* handle next event in event queue  */
931       
932     case MoveSpark:
933       do_the_movespark(event);
934       goto next_thread;             /* handle next event in event queue  */
935       
936     case FindWork:
937       do_the_findwork(event);
938       goto next_thread;             /* handle next event in event queue  */
939       
940     default:
941       barf("Illegal event type %u\n", event->evttype);
942     }  /* switch */
943     
944     /* This point was scheduler_loop in the old RTS */
945
946     IF_DEBUG(gran, belch("GRAN: after main switch"));
947
948     TimeOfLastEvent = CurrentTime[CurrentProc];
949     TimeOfNextEvent = get_time_of_next_event();
950     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
951     // CurrentTSO = ThreadQueueHd;
952
953     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
954                          TimeOfNextEvent));
955
956     if (RtsFlags.GranFlags.Light) 
957       GranSimLight_leave_system(event, &ActiveTSO); 
958
959     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
960
961     IF_DEBUG(gran, 
962              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
963
964     /* in a GranSim setup the TSO stays on the run queue */
965     t = CurrentTSO;
966     /* Take a thread from the run queue. */
967     POP_RUN_QUEUE(t); // take_off_run_queue(t);
968
969     IF_DEBUG(gran, 
970              fprintf(stderr, "GRAN: About to run current thread, which is\n");
971              G_TSO(t,5));
972
973     context_switch = 0; // turned on via GranYield, checking events and time slice
974
975     IF_DEBUG(gran, 
976              DumpGranEvent(GR_SCHEDULE, t));
977
978     procStatus[CurrentProc] = Busy;
979
980 #elif defined(PAR)
981     if (PendingFetches != END_BF_QUEUE) {
982         processFetches();
983     }
984
985     /* ToDo: phps merge with spark activation above */
986     /* check whether we have local work and send requests if we have none */
987     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
988       /* :-[  no local threads => look out for local sparks */
989       /* the spark pool for the current PE */
990       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
991       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
992           pool->hd < pool->tl) {
993         /* 
994          * ToDo: add GC code check that we really have enough heap afterwards!!
995          * Old comment:
996          * If we're here (no runnable threads) and we have pending
997          * sparks, we must have a space problem.  Get enough space
998          * to turn one of those pending sparks into a
999          * thread... 
1000          */
1001
1002         spark = findSpark(rtsFalse);                /* get a spark */
1003         if (spark != (rtsSpark) NULL) {
1004           tso = activateSpark(spark);       /* turn the spark into a thread */
1005           IF_PAR_DEBUG(schedule,
1006                        belch("==== schedule: Created TSO %d (%p); %d threads active",
1007                              tso->id, tso, advisory_thread_count));
1008
1009           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1010             belch("==^^ failed to activate spark");
1011             goto next_thread;
1012           }               /* otherwise fall through & pick-up new tso */
1013         } else {
1014           IF_PAR_DEBUG(verbose,
1015                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
1016                              spark_queue_len(pool)));
1017           goto next_thread;
1018         }
1019       }
1020
1021       /* If we still have no work we need to send a FISH to get a spark
1022          from another PE 
1023       */
1024       if (EMPTY_RUN_QUEUE()) {
1025       /* =8-[  no local sparks => look for work on other PEs */
1026         /*
1027          * We really have absolutely no work.  Send out a fish
1028          * (there may be some out there already), and wait for
1029          * something to arrive.  We clearly can't run any threads
1030          * until a SCHEDULE or RESUME arrives, and so that's what
1031          * we're hoping to see.  (Of course, we still have to
1032          * respond to other types of messages.)
1033          */
1034         TIME now = msTime() /*CURRENT_TIME*/;
1035         IF_PAR_DEBUG(verbose, 
1036                      belch("--  now=%ld", now));
1037         IF_PAR_DEBUG(verbose,
1038                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1039                          (last_fish_arrived_at!=0 &&
1040                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1041                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1042                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1043                              last_fish_arrived_at,
1044                              RtsFlags.ParFlags.fishDelay, now);
1045                      });
1046         
1047         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1048             (last_fish_arrived_at==0 ||
1049              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1050           /* outstandingFishes is set in sendFish, processFish;
1051              avoid flooding system with fishes via delay */
1052           pe = choosePE();
1053           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1054                    NEW_FISH_HUNGER);
1055
1056           // Global statistics: count no. of fishes
1057           if (RtsFlags.ParFlags.ParStats.Global &&
1058               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1059             globalParStats.tot_fish_mess++;
1060           }
1061         }
1062       
1063         receivedFinish = processMessages();
1064         goto next_thread;
1065       }
1066     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1067       receivedFinish = processMessages();
1068     }
1069
1070     /* Now we are sure that we have some work available */
1071     ASSERT(run_queue_hd != END_TSO_QUEUE);
1072
1073     /* Take a thread from the run queue, if we have work */
1074     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1075     IF_DEBUG(sanity,checkTSO(t));
1076
1077     /* ToDo: write something to the log-file
1078     if (RTSflags.ParFlags.granSimStats && !sameThread)
1079         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1080
1081     CurrentTSO = t;
1082     */
1083     /* the spark pool for the current PE */
1084     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1085
1086     IF_DEBUG(scheduler, 
1087              belch("--=^ %d threads, %d sparks on [%#x]", 
1088                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1089
1090 # if 1
1091     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1092         t && LastTSO && t->id != LastTSO->id && 
1093         LastTSO->why_blocked == NotBlocked && 
1094         LastTSO->what_next != ThreadComplete) {
1095       // if previously scheduled TSO not blocked we have to record the context switch
1096       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1097                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1098     }
1099
1100     if (RtsFlags.ParFlags.ParStats.Full && 
1101         (emitSchedule /* forced emit */ ||
1102         (t && LastTSO && t->id != LastTSO->id))) {
1103       /* 
1104          we are running a different TSO, so write a schedule event to log file
1105          NB: If we use fair scheduling we also have to write  a deschedule 
1106              event for LastTSO; with unfair scheduling we know that the
1107              previous tso has blocked whenever we switch to another tso, so
1108              we don't need it in GUM for now
1109       */
1110       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1111                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1112       emitSchedule = rtsFalse;
1113     }
1114      
1115 # endif
1116 #else /* !GRAN && !PAR */
1117   
1118     // grab a thread from the run queue
1119     ASSERT(run_queue_hd != END_TSO_QUEUE);
1120     POP_RUN_QUEUE(t);
1121
1122     // Sanity check the thread we're about to run.  This can be
1123     // expensive if there is lots of thread switching going on...
1124     IF_DEBUG(sanity,checkTSO(t));
1125 #endif
1126
1127 #ifdef THREADED_RTS
1128     {
1129       StgMainThread *m;
1130       for(m = main_threads; m; m = m->link)
1131       {
1132         if(m->tso == t)
1133           break;
1134       }
1135       
1136       if(m)
1137       {
1138         if(m == mainThread)
1139         {
1140           IF_DEBUG(scheduler,
1141             sched_belch("### Running thread %d in bound thread", t->id));
1142           // yes, the Haskell thread is bound to the current native thread
1143         }
1144         else
1145         {
1146           IF_DEBUG(scheduler,
1147             sched_belch("### thread %d bound to another OS thread", t->id));
1148           // no, bound to a different Haskell thread: pass to that thread
1149           PUSH_ON_RUN_QUEUE(t);
1150           passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1151           cap = NULL;
1152           continue;
1153         }
1154       }
1155       else
1156       {
1157         if(mainThread != NULL)
1158         // The thread we want to run is bound.
1159         {
1160           IF_DEBUG(scheduler,
1161             sched_belch("### this OS thread cannot run thread %d", t->id));
1162           // no, the current native thread is bound to a different
1163           // Haskell thread, so pass it to any worker thread
1164           PUSH_ON_RUN_QUEUE(t);
1165           passCapabilityToWorker(&sched_mutex, cap);
1166           cap = NULL;
1167           continue; 
1168         }
1169       }
1170     }
1171 #endif
1172
1173     cap->r.rCurrentTSO = t;
1174     
1175     /* context switches are now initiated by the timer signal, unless
1176      * the user specified "context switch as often as possible", with
1177      * +RTS -C0
1178      */
1179     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1180          && (run_queue_hd != END_TSO_QUEUE
1181              || blocked_queue_hd != END_TSO_QUEUE
1182              || sleeping_queue != END_TSO_QUEUE)))
1183         context_switch = 1;
1184     else
1185         context_switch = 0;
1186
1187 run_thread:
1188
1189     RELEASE_LOCK(&sched_mutex);
1190
1191     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1192                               t->id, whatNext_strs[t->what_next]));
1193
1194 #ifdef PROFILING
1195     startHeapProfTimer();
1196 #endif
1197
1198     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1199     /* Run the current thread 
1200      */
1201     prev_what_next = t->what_next;
1202     switch (prev_what_next) {
1203     case ThreadKilled:
1204     case ThreadComplete:
1205         /* Thread already finished, return to scheduler. */
1206         ret = ThreadFinished;
1207         break;
1208     case ThreadRunGHC:
1209         errno = t->saved_errno;
1210         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1211         t->saved_errno = errno;
1212         break;
1213     case ThreadInterpret:
1214         ret = interpretBCO(cap);
1215         break;
1216     default:
1217       barf("schedule: invalid what_next field");
1218     }
1219     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1220     
1221     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1222 #ifdef PROFILING
1223     stopHeapProfTimer();
1224     CCCS = CCS_SYSTEM;
1225 #endif
1226     
1227     ACQUIRE_LOCK(&sched_mutex);
1228     
1229 #ifdef RTS_SUPPORTS_THREADS
1230     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1231 #elif !defined(GRAN) && !defined(PAR)
1232     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1233 #endif
1234     t = cap->r.rCurrentTSO;
1235     
1236 #if defined(PAR)
1237     /* HACK 675: if the last thread didn't yield, make sure to print a 
1238        SCHEDULE event to the log file when StgRunning the next thread, even
1239        if it is the same one as before */
1240     LastTSO = t; 
1241     TimeOfLastYield = CURRENT_TIME;
1242 #endif
1243
1244     switch (ret) {
1245     case HeapOverflow:
1246 #if defined(GRAN)
1247       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1248       globalGranStats.tot_heapover++;
1249 #elif defined(PAR)
1250       globalParStats.tot_heapover++;
1251 #endif
1252
1253       // did the task ask for a large block?
1254       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1255           // if so, get one and push it on the front of the nursery.
1256           bdescr *bd;
1257           nat blocks;
1258           
1259           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1260
1261           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1262                                    t->id, whatNext_strs[t->what_next], blocks));
1263
1264           // don't do this if it would push us over the
1265           // alloc_blocks_lim limit; we'll GC first.
1266           if (alloc_blocks + blocks < alloc_blocks_lim) {
1267
1268               alloc_blocks += blocks;
1269               bd = allocGroup( blocks );
1270
1271               // link the new group into the list
1272               bd->link = cap->r.rCurrentNursery;
1273               bd->u.back = cap->r.rCurrentNursery->u.back;
1274               if (cap->r.rCurrentNursery->u.back != NULL) {
1275                   cap->r.rCurrentNursery->u.back->link = bd;
1276               } else {
1277                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1278                          g0s0->blocks == cap->r.rNursery);
1279                   cap->r.rNursery = g0s0->blocks = bd;
1280               }           
1281               cap->r.rCurrentNursery->u.back = bd;
1282
1283               // initialise it as a nursery block.  We initialise the
1284               // step, gen_no, and flags field of *every* sub-block in
1285               // this large block, because this is easier than making
1286               // sure that we always find the block head of a large
1287               // block whenever we call Bdescr() (eg. evacuate() and
1288               // isAlive() in the GC would both have to do this, at
1289               // least).
1290               { 
1291                   bdescr *x;
1292                   for (x = bd; x < bd + blocks; x++) {
1293                       x->step = g0s0;
1294                       x->gen_no = 0;
1295                       x->flags = 0;
1296                   }
1297               }
1298
1299               // don't forget to update the block count in g0s0.
1300               g0s0->n_blocks += blocks;
1301               // This assert can be a killer if the app is doing lots
1302               // of large block allocations.
1303               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1304
1305               // now update the nursery to point to the new block
1306               cap->r.rCurrentNursery = bd;
1307
1308               // we might be unlucky and have another thread get on the
1309               // run queue before us and steal the large block, but in that
1310               // case the thread will just end up requesting another large
1311               // block.
1312               PUSH_ON_RUN_QUEUE(t);
1313               break;
1314           }
1315       }
1316
1317       /* make all the running tasks block on a condition variable,
1318        * maybe set context_switch and wait till they all pile in,
1319        * then have them wait on a GC condition variable.
1320        */
1321       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1322                                t->id, whatNext_strs[t->what_next]));
1323       threadPaused(t);
1324 #if defined(GRAN)
1325       ASSERT(!is_on_queue(t,CurrentProc));
1326 #elif defined(PAR)
1327       /* Currently we emit a DESCHEDULE event before GC in GUM.
1328          ToDo: either add separate event to distinguish SYSTEM time from rest
1329                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1330       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1331         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1332                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1333         emitSchedule = rtsTrue;
1334       }
1335 #endif
1336       
1337       ready_to_gc = rtsTrue;
1338       context_switch = 1;               /* stop other threads ASAP */
1339       PUSH_ON_RUN_QUEUE(t);
1340       /* actual GC is done at the end of the while loop */
1341       break;
1342       
1343     case StackOverflow:
1344 #if defined(GRAN)
1345       IF_DEBUG(gran, 
1346                DumpGranEvent(GR_DESCHEDULE, t));
1347       globalGranStats.tot_stackover++;
1348 #elif defined(PAR)
1349       // IF_DEBUG(par, 
1350       // DumpGranEvent(GR_DESCHEDULE, t);
1351       globalParStats.tot_stackover++;
1352 #endif
1353       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1354                                t->id, whatNext_strs[t->what_next]));
1355       /* just adjust the stack for this thread, then pop it back
1356        * on the run queue.
1357        */
1358       threadPaused(t);
1359       { 
1360         StgMainThread *m;
1361         /* enlarge the stack */
1362         StgTSO *new_t = threadStackOverflow(t);
1363         
1364         /* This TSO has moved, so update any pointers to it from the
1365          * main thread stack.  It better not be on any other queues...
1366          * (it shouldn't be).
1367          */
1368         for (m = main_threads; m != NULL; m = m->link) {
1369           if (m->tso == t) {
1370             m->tso = new_t;
1371           }
1372         }
1373         threadPaused(new_t);
1374         PUSH_ON_RUN_QUEUE(new_t);
1375       }
1376       break;
1377
1378     case ThreadYielding:
1379 #if defined(GRAN)
1380       IF_DEBUG(gran, 
1381                DumpGranEvent(GR_DESCHEDULE, t));
1382       globalGranStats.tot_yields++;
1383 #elif defined(PAR)
1384       // IF_DEBUG(par, 
1385       // DumpGranEvent(GR_DESCHEDULE, t);
1386       globalParStats.tot_yields++;
1387 #endif
1388       /* put the thread back on the run queue.  Then, if we're ready to
1389        * GC, check whether this is the last task to stop.  If so, wake
1390        * up the GC thread.  getThread will block during a GC until the
1391        * GC is finished.
1392        */
1393       IF_DEBUG(scheduler,
1394                if (t->what_next != prev_what_next) {
1395                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1396                          t->id, whatNext_strs[t->what_next]);
1397                } else {
1398                    belch("--<< thread %ld (%s) stopped, yielding", 
1399                          t->id, whatNext_strs[t->what_next]);
1400                }
1401                );
1402
1403       IF_DEBUG(sanity,
1404                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1405                checkTSO(t));
1406       ASSERT(t->link == END_TSO_QUEUE);
1407
1408       // Shortcut if we're just switching evaluators: don't bother
1409       // doing stack squeezing (which can be expensive), just run the
1410       // thread.
1411       if (t->what_next != prev_what_next) {
1412           goto run_thread;
1413       }
1414
1415       threadPaused(t);
1416
1417 #if defined(GRAN)
1418       ASSERT(!is_on_queue(t,CurrentProc));
1419
1420       IF_DEBUG(sanity,
1421                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1422                checkThreadQsSanity(rtsTrue));
1423 #endif
1424
1425 #if defined(PAR)
1426       if (RtsFlags.ParFlags.doFairScheduling) { 
1427         /* this does round-robin scheduling; good for concurrency */
1428         APPEND_TO_RUN_QUEUE(t);
1429       } else {
1430         /* this does unfair scheduling; good for parallelism */
1431         PUSH_ON_RUN_QUEUE(t);
1432       }
1433 #else
1434       // this does round-robin scheduling; good for concurrency
1435       APPEND_TO_RUN_QUEUE(t);
1436 #endif
1437
1438 #if defined(GRAN)
1439       /* add a ContinueThread event to actually process the thread */
1440       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1441                 ContinueThread,
1442                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1443       IF_GRAN_DEBUG(bq, 
1444                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1445                G_EVENTQ(0);
1446                G_CURR_THREADQ(0));
1447 #endif /* GRAN */
1448       break;
1449
1450     case ThreadBlocked:
1451 #if defined(GRAN)
1452       IF_DEBUG(scheduler,
1453                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1454                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1455                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1456
1457       // ??? needed; should emit block before
1458       IF_DEBUG(gran, 
1459                DumpGranEvent(GR_DESCHEDULE, t)); 
1460       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1461       /*
1462         ngoq Dogh!
1463       ASSERT(procStatus[CurrentProc]==Busy || 
1464               ((procStatus[CurrentProc]==Fetching) && 
1465               (t->block_info.closure!=(StgClosure*)NULL)));
1466       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1467           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1468             procStatus[CurrentProc]==Fetching)) 
1469         procStatus[CurrentProc] = Idle;
1470       */
1471 #elif defined(PAR)
1472       IF_DEBUG(scheduler,
1473                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1474                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1475       IF_PAR_DEBUG(bq,
1476
1477                    if (t->block_info.closure!=(StgClosure*)NULL) 
1478                      print_bq(t->block_info.closure));
1479
1480       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1481       blockThread(t);
1482
1483       /* whatever we schedule next, we must log that schedule */
1484       emitSchedule = rtsTrue;
1485
1486 #else /* !GRAN */
1487       /* don't need to do anything.  Either the thread is blocked on
1488        * I/O, in which case we'll have called addToBlockedQueue
1489        * previously, or it's blocked on an MVar or Blackhole, in which
1490        * case it'll be on the relevant queue already.
1491        */
1492       IF_DEBUG(scheduler,
1493                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1494                        t->id, whatNext_strs[t->what_next]);
1495                printThreadBlockage(t);
1496                fprintf(stderr, "\n"));
1497       fflush(stderr);
1498
1499       /* Only for dumping event to log file 
1500          ToDo: do I need this in GranSim, too?
1501       blockThread(t);
1502       */
1503 #endif
1504       threadPaused(t);
1505       break;
1506
1507     case ThreadFinished:
1508       /* Need to check whether this was a main thread, and if so, signal
1509        * the task that started it with the return value.  If we have no
1510        * more main threads, we probably need to stop all the tasks until
1511        * we get a new one.
1512        */
1513       /* We also end up here if the thread kills itself with an
1514        * uncaught exception, see Exception.hc.
1515        */
1516       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1517                                t->id, whatNext_strs[t->what_next]));
1518 #if defined(GRAN)
1519       endThread(t, CurrentProc); // clean-up the thread
1520 #elif defined(PAR)
1521       /* For now all are advisory -- HWL */
1522       //if(t->priority==AdvisoryPriority) ??
1523       advisory_thread_count--;
1524       
1525 # ifdef DIST
1526       if(t->dist.priority==RevalPriority)
1527         FinishReval(t);
1528 # endif
1529       
1530       if (RtsFlags.ParFlags.ParStats.Full &&
1531           !RtsFlags.ParFlags.ParStats.Suppressed) 
1532         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1533 #endif
1534       break;
1535       
1536     default:
1537       barf("schedule: invalid thread return code %d", (int)ret);
1538     }
1539
1540 #ifdef PROFILING
1541     // When we have +RTS -i0 and we're heap profiling, do a census at
1542     // every GC.  This lets us get repeatable runs for debugging.
1543     if (performHeapProfile ||
1544         (RtsFlags.ProfFlags.profileInterval==0 &&
1545          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1546         GarbageCollect(GetRoots, rtsTrue);
1547         heapCensus();
1548         performHeapProfile = rtsFalse;
1549         ready_to_gc = rtsFalse; // we already GC'd
1550     }
1551 #endif
1552
1553     if (ready_to_gc 
1554 #ifdef SMP
1555         && allFreeCapabilities() 
1556 #endif
1557         ) {
1558       /* everybody back, start the GC.
1559        * Could do it in this thread, or signal a condition var
1560        * to do it in another thread.  Either way, we need to
1561        * broadcast on gc_pending_cond afterward.
1562        */
1563 #if defined(RTS_SUPPORTS_THREADS)
1564       IF_DEBUG(scheduler,sched_belch("doing GC"));
1565 #endif
1566       GarbageCollect(GetRoots,rtsFalse);
1567       ready_to_gc = rtsFalse;
1568 #ifdef SMP
1569       broadcastCondition(&gc_pending_cond);
1570 #endif
1571 #if defined(GRAN)
1572       /* add a ContinueThread event to continue execution of current thread */
1573       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1574                 ContinueThread,
1575                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1576       IF_GRAN_DEBUG(bq, 
1577                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1578                G_EVENTQ(0);
1579                G_CURR_THREADQ(0));
1580 #endif /* GRAN */
1581     }
1582
1583 #if defined(GRAN)
1584   next_thread:
1585     IF_GRAN_DEBUG(unused,
1586                   print_eventq(EventHd));
1587
1588     event = get_next_event();
1589 #elif defined(PAR)
1590   next_thread:
1591     /* ToDo: wait for next message to arrive rather than busy wait */
1592 #endif /* GRAN */
1593
1594   } /* end of while(1) */
1595
1596   IF_PAR_DEBUG(verbose,
1597                belch("== Leaving schedule() after having received Finish"));
1598 }
1599
1600 /* ---------------------------------------------------------------------------
1601  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1602  * used by Control.Concurrent for error checking.
1603  * ------------------------------------------------------------------------- */
1604  
1605 StgBool
1606 rtsSupportsBoundThreads(void)
1607 {
1608 #ifdef THREADED_RTS
1609   return rtsTrue;
1610 #else
1611   return rtsFalse;
1612 #endif
1613 }
1614
1615 /* ---------------------------------------------------------------------------
1616  * isThreadBound(tso): check whether tso is bound to an OS thread.
1617  * ------------------------------------------------------------------------- */
1618  
1619 StgBool
1620 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1621 {
1622 #ifdef THREADED_RTS
1623   StgMainThread *m;
1624   for(m = main_threads; m; m = m->link)
1625   {
1626     if(m->tso == tso)
1627       return rtsTrue;
1628   }
1629 #endif
1630   return rtsFalse;
1631 }
1632
1633 /* ---------------------------------------------------------------------------
1634  * Singleton fork(). Do not copy any running threads.
1635  * ------------------------------------------------------------------------- */
1636
1637 static void 
1638 deleteThreadImmediately(StgTSO *tso);
1639
1640 StgInt
1641 forkProcess(HsStablePtr *entry)
1642 {
1643 #ifndef mingw32_TARGET_OS
1644   pid_t pid;
1645   StgTSO* t,*next;
1646   StgMainThread *m;
1647   SchedulerStatus rc;
1648
1649   IF_DEBUG(scheduler,sched_belch("forking!"));
1650   rts_lock(); // This not only acquires sched_mutex, it also
1651               // makes sure that no other threads are running
1652
1653   pid = fork();
1654
1655   if (pid) { /* parent */
1656
1657   /* just return the pid */
1658     rts_unlock();
1659     return pid;
1660     
1661   } else { /* child */
1662     
1663     
1664       // delete all threads
1665     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1666     
1667     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1668       next = t->link;
1669
1670         // don't allow threads to catch the ThreadKilled exception
1671       deleteThreadImmediately(t);
1672     }
1673     
1674       // wipe the main thread list
1675     while((m = main_threads) != NULL) {
1676       main_threads = m->link;
1677 #ifdef THREADED_RTS
1678       closeCondition(&m->bound_thread_cond);
1679 #endif
1680       stgFree(m);
1681     }
1682     
1683 #ifdef RTS_SUPPORTS_THREADS
1684     resetTaskManagerAfterFork();      // tell startTask() and friends that
1685     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1686     resetWorkerWakeupPipeAfterFork();
1687 #endif
1688     
1689     rc = rts_evalStableIO(entry, NULL);  // run the action
1690     rts_checkSchedStatus("forkProcess",rc);
1691     
1692     rts_unlock();
1693     
1694     hs_exit();                      // clean up and exit
1695     stg_exit(0);
1696   }
1697 #else /* mingw32 */
1698   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1699   return -1;
1700 #endif /* mingw32 */
1701 }
1702
1703 /* ---------------------------------------------------------------------------
1704  * deleteAllThreads():  kill all the live threads.
1705  *
1706  * This is used when we catch a user interrupt (^C), before performing
1707  * any necessary cleanups and running finalizers.
1708  *
1709  * Locks: sched_mutex held.
1710  * ------------------------------------------------------------------------- */
1711    
1712 void
1713 deleteAllThreads ( void )
1714 {
1715   StgTSO* t, *next;
1716   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1717   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1718       next = t->global_link;
1719       deleteThread(t);
1720   }      
1721   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1722   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1723   sleeping_queue = END_TSO_QUEUE;
1724 }
1725
1726 /* startThread and  insertThread are now in GranSim.c -- HWL */
1727
1728
1729 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1730 //@subsection Suspend and Resume
1731
1732 /* ---------------------------------------------------------------------------
1733  * Suspending & resuming Haskell threads.
1734  * 
1735  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1736  * its capability before calling the C function.  This allows another
1737  * task to pick up the capability and carry on running Haskell
1738  * threads.  It also means that if the C call blocks, it won't lock
1739  * the whole system.
1740  *
1741  * The Haskell thread making the C call is put to sleep for the
1742  * duration of the call, on the susepended_ccalling_threads queue.  We
1743  * give out a token to the task, which it can use to resume the thread
1744  * on return from the C function.
1745  * ------------------------------------------------------------------------- */
1746    
1747 StgInt
1748 suspendThread( StgRegTable *reg, 
1749                rtsBool concCall
1750 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1751                STG_UNUSED
1752 #endif
1753                )
1754 {
1755   nat tok;
1756   Capability *cap;
1757   int saved_errno = errno;
1758
1759   /* assume that *reg is a pointer to the StgRegTable part
1760    * of a Capability.
1761    */
1762   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1763
1764   ACQUIRE_LOCK(&sched_mutex);
1765
1766   IF_DEBUG(scheduler,
1767            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1768
1769   // XXX this might not be necessary --SDM
1770   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1771
1772   threadPaused(cap->r.rCurrentTSO);
1773   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1774   suspended_ccalling_threads = cap->r.rCurrentTSO;
1775
1776 #if defined(RTS_SUPPORTS_THREADS)
1777   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1778   {
1779       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1780       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1781   }
1782   else
1783   {
1784       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1785   }
1786 #endif
1787
1788   /* Use the thread ID as the token; it should be unique */
1789   tok = cap->r.rCurrentTSO->id;
1790
1791   /* Hand back capability */
1792   releaseCapability(cap);
1793   
1794 #if defined(RTS_SUPPORTS_THREADS)
1795   /* Preparing to leave the RTS, so ensure there's a native thread/task
1796      waiting to take over.
1797   */
1798   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1799 #endif
1800
1801   /* Other threads _might_ be available for execution; signal this */
1802   THREAD_RUNNABLE();
1803   RELEASE_LOCK(&sched_mutex);
1804   
1805   errno = saved_errno;
1806   return tok; 
1807 }
1808
1809 StgRegTable *
1810 resumeThread( StgInt tok,
1811               rtsBool concCall STG_UNUSED )
1812 {
1813   StgTSO *tso, **prev;
1814   Capability *cap;
1815   int saved_errno = errno;
1816
1817 #if defined(RTS_SUPPORTS_THREADS)
1818   /* Wait for permission to re-enter the RTS with the result. */
1819   ACQUIRE_LOCK(&sched_mutex);
1820   grabReturnCapability(&sched_mutex, &cap);
1821
1822   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1823 #else
1824   grabCapability(&cap);
1825 #endif
1826
1827   /* Remove the thread off of the suspended list */
1828   prev = &suspended_ccalling_threads;
1829   for (tso = suspended_ccalling_threads; 
1830        tso != END_TSO_QUEUE; 
1831        prev = &tso->link, tso = tso->link) {
1832     if (tso->id == (StgThreadID)tok) {
1833       *prev = tso->link;
1834       break;
1835     }
1836   }
1837   if (tso == END_TSO_QUEUE) {
1838     barf("resumeThread: thread not found");
1839   }
1840   tso->link = END_TSO_QUEUE;
1841   
1842 #if defined(RTS_SUPPORTS_THREADS)
1843   if(tso->why_blocked == BlockedOnCCall)
1844   {
1845       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1846       tso->blocked_exceptions = NULL;
1847   }
1848 #endif
1849   
1850   /* Reset blocking status */
1851   tso->why_blocked  = NotBlocked;
1852
1853   cap->r.rCurrentTSO = tso;
1854 #if defined(RTS_SUPPORTS_THREADS)
1855   RELEASE_LOCK(&sched_mutex);
1856 #endif
1857   errno = saved_errno;
1858   return &cap->r;
1859 }
1860
1861
1862 /* ---------------------------------------------------------------------------
1863  * Static functions
1864  * ------------------------------------------------------------------------ */
1865 static void unblockThread(StgTSO *tso);
1866
1867 /* ---------------------------------------------------------------------------
1868  * Comparing Thread ids.
1869  *
1870  * This is used from STG land in the implementation of the
1871  * instances of Eq/Ord for ThreadIds.
1872  * ------------------------------------------------------------------------ */
1873
1874 int
1875 cmp_thread(StgPtr tso1, StgPtr tso2) 
1876
1877   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1878   StgThreadID id2 = ((StgTSO *)tso2)->id;
1879  
1880   if (id1 < id2) return (-1);
1881   if (id1 > id2) return 1;
1882   return 0;
1883 }
1884
1885 /* ---------------------------------------------------------------------------
1886  * Fetching the ThreadID from an StgTSO.
1887  *
1888  * This is used in the implementation of Show for ThreadIds.
1889  * ------------------------------------------------------------------------ */
1890 int
1891 rts_getThreadId(StgPtr tso) 
1892 {
1893   return ((StgTSO *)tso)->id;
1894 }
1895
1896 #ifdef DEBUG
1897 void
1898 labelThread(StgPtr tso, char *label)
1899 {
1900   int len;
1901   void *buf;
1902
1903   /* Caveat: Once set, you can only set the thread name to "" */
1904   len = strlen(label)+1;
1905   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1906   strncpy(buf,label,len);
1907   /* Update will free the old memory for us */
1908   updateThreadLabel(((StgTSO *)tso)->id,buf);
1909 }
1910 #endif /* DEBUG */
1911
1912 /* ---------------------------------------------------------------------------
1913    Create a new thread.
1914
1915    The new thread starts with the given stack size.  Before the
1916    scheduler can run, however, this thread needs to have a closure
1917    (and possibly some arguments) pushed on its stack.  See
1918    pushClosure() in Schedule.h.
1919
1920    createGenThread() and createIOThread() (in SchedAPI.h) are
1921    convenient packaged versions of this function.
1922
1923    currently pri (priority) is only used in a GRAN setup -- HWL
1924    ------------------------------------------------------------------------ */
1925 //@cindex createThread
1926 #if defined(GRAN)
1927 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1928 StgTSO *
1929 createThread(nat size, StgInt pri)
1930 #else
1931 StgTSO *
1932 createThread(nat size)
1933 #endif
1934 {
1935
1936     StgTSO *tso;
1937     nat stack_size;
1938
1939     /* First check whether we should create a thread at all */
1940 #if defined(PAR)
1941   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1942   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1943     threadsIgnored++;
1944     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1945           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1946     return END_TSO_QUEUE;
1947   }
1948   threadsCreated++;
1949 #endif
1950
1951 #if defined(GRAN)
1952   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1953 #endif
1954
1955   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1956
1957   /* catch ridiculously small stack sizes */
1958   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1959     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1960   }
1961
1962   stack_size = size - TSO_STRUCT_SIZEW;
1963
1964   tso = (StgTSO *)allocate(size);
1965   TICK_ALLOC_TSO(stack_size, 0);
1966
1967   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1968 #if defined(GRAN)
1969   SET_GRAN_HDR(tso, ThisPE);
1970 #endif
1971
1972   // Always start with the compiled code evaluator
1973   tso->what_next = ThreadRunGHC;
1974
1975   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1976    * protect the increment operation on next_thread_id.
1977    * In future, we could use an atomic increment instead.
1978    */
1979   ACQUIRE_LOCK(&thread_id_mutex);
1980   tso->id = next_thread_id++; 
1981   RELEASE_LOCK(&thread_id_mutex);
1982
1983   tso->why_blocked  = NotBlocked;
1984   tso->blocked_exceptions = NULL;
1985
1986   tso->saved_errno = 0;
1987   
1988   tso->stack_size   = stack_size;
1989   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1990                               - TSO_STRUCT_SIZEW;
1991   tso->sp           = (P_)&(tso->stack) + stack_size;
1992
1993 #ifdef PROFILING
1994   tso->prof.CCCS = CCS_MAIN;
1995 #endif
1996
1997   /* put a stop frame on the stack */
1998   tso->sp -= sizeofW(StgStopFrame);
1999   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2000   // ToDo: check this
2001 #if defined(GRAN)
2002   tso->link = END_TSO_QUEUE;
2003   /* uses more flexible routine in GranSim */
2004   insertThread(tso, CurrentProc);
2005 #else
2006   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2007    * from its creation
2008    */
2009 #endif
2010
2011 #if defined(GRAN) 
2012   if (RtsFlags.GranFlags.GranSimStats.Full) 
2013     DumpGranEvent(GR_START,tso);
2014 #elif defined(PAR)
2015   if (RtsFlags.ParFlags.ParStats.Full) 
2016     DumpGranEvent(GR_STARTQ,tso);
2017   /* HACk to avoid SCHEDULE 
2018      LastTSO = tso; */
2019 #endif
2020
2021   /* Link the new thread on the global thread list.
2022    */
2023   tso->global_link = all_threads;
2024   all_threads = tso;
2025
2026 #if defined(DIST)
2027   tso->dist.priority = MandatoryPriority; //by default that is...
2028 #endif
2029
2030 #if defined(GRAN)
2031   tso->gran.pri = pri;
2032 # if defined(DEBUG)
2033   tso->gran.magic = TSO_MAGIC; // debugging only
2034 # endif
2035   tso->gran.sparkname   = 0;
2036   tso->gran.startedat   = CURRENT_TIME; 
2037   tso->gran.exported    = 0;
2038   tso->gran.basicblocks = 0;
2039   tso->gran.allocs      = 0;
2040   tso->gran.exectime    = 0;
2041   tso->gran.fetchtime   = 0;
2042   tso->gran.fetchcount  = 0;
2043   tso->gran.blocktime   = 0;
2044   tso->gran.blockcount  = 0;
2045   tso->gran.blockedat   = 0;
2046   tso->gran.globalsparks = 0;
2047   tso->gran.localsparks  = 0;
2048   if (RtsFlags.GranFlags.Light)
2049     tso->gran.clock  = Now; /* local clock */
2050   else
2051     tso->gran.clock  = 0;
2052
2053   IF_DEBUG(gran,printTSO(tso));
2054 #elif defined(PAR)
2055 # if defined(DEBUG)
2056   tso->par.magic = TSO_MAGIC; // debugging only
2057 # endif
2058   tso->par.sparkname   = 0;
2059   tso->par.startedat   = CURRENT_TIME; 
2060   tso->par.exported    = 0;
2061   tso->par.basicblocks = 0;
2062   tso->par.allocs      = 0;
2063   tso->par.exectime    = 0;
2064   tso->par.fetchtime   = 0;
2065   tso->par.fetchcount  = 0;
2066   tso->par.blocktime   = 0;
2067   tso->par.blockcount  = 0;
2068   tso->par.blockedat   = 0;
2069   tso->par.globalsparks = 0;
2070   tso->par.localsparks  = 0;
2071 #endif
2072
2073 #if defined(GRAN)
2074   globalGranStats.tot_threads_created++;
2075   globalGranStats.threads_created_on_PE[CurrentProc]++;
2076   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2077   globalGranStats.tot_sq_probes++;
2078 #elif defined(PAR)
2079   // collect parallel global statistics (currently done together with GC stats)
2080   if (RtsFlags.ParFlags.ParStats.Global &&
2081       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2082     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2083     globalParStats.tot_threads_created++;
2084   }
2085 #endif 
2086
2087 #if defined(GRAN)
2088   IF_GRAN_DEBUG(pri,
2089                 belch("==__ schedule: Created TSO %d (%p);",
2090                       CurrentProc, tso, tso->id));
2091 #elif defined(PAR)
2092     IF_PAR_DEBUG(verbose,
2093                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
2094                        tso->id, tso, advisory_thread_count));
2095 #else
2096   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2097                                  tso->id, tso->stack_size));
2098 #endif    
2099   return tso;
2100 }
2101
2102 #if defined(PAR)
2103 /* RFP:
2104    all parallel thread creation calls should fall through the following routine.
2105 */
2106 StgTSO *
2107 createSparkThread(rtsSpark spark) 
2108 { StgTSO *tso;
2109   ASSERT(spark != (rtsSpark)NULL);
2110   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2111   { threadsIgnored++;
2112     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2113           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2114     return END_TSO_QUEUE;
2115   }
2116   else
2117   { threadsCreated++;
2118     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2119     if (tso==END_TSO_QUEUE)     
2120       barf("createSparkThread: Cannot create TSO");
2121 #if defined(DIST)
2122     tso->priority = AdvisoryPriority;
2123 #endif
2124     pushClosure(tso,spark);
2125     PUSH_ON_RUN_QUEUE(tso);
2126     advisory_thread_count++;    
2127   }
2128   return tso;
2129 }
2130 #endif
2131
2132 /*
2133   Turn a spark into a thread.
2134   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2135 */
2136 #if defined(PAR)
2137 //@cindex activateSpark
2138 StgTSO *
2139 activateSpark (rtsSpark spark) 
2140 {
2141   StgTSO *tso;
2142
2143   tso = createSparkThread(spark);
2144   if (RtsFlags.ParFlags.ParStats.Full) {   
2145     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2146     IF_PAR_DEBUG(verbose,
2147                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2148                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2149   }
2150   // ToDo: fwd info on local/global spark to thread -- HWL
2151   // tso->gran.exported =  spark->exported;
2152   // tso->gran.locked =   !spark->global;
2153   // tso->gran.sparkname = spark->name;
2154
2155   return tso;
2156 }
2157 #endif
2158
2159 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2160                                    Capability *initialCapability
2161                                    );
2162
2163
2164 /* ---------------------------------------------------------------------------
2165  * scheduleThread()
2166  *
2167  * scheduleThread puts a thread on the head of the runnable queue.
2168  * This will usually be done immediately after a thread is created.
2169  * The caller of scheduleThread must create the thread using e.g.
2170  * createThread and push an appropriate closure
2171  * on this thread's stack before the scheduler is invoked.
2172  * ------------------------------------------------------------------------ */
2173
2174 static void scheduleThread_ (StgTSO* tso);
2175
2176 void
2177 scheduleThread_(StgTSO *tso)
2178 {
2179   // Precondition: sched_mutex must be held.
2180
2181   /* Put the new thread on the head of the runnable queue.  The caller
2182    * better push an appropriate closure on this thread's stack
2183    * beforehand.  In the SMP case, the thread may start running as
2184    * soon as we release the scheduler lock below.
2185    */
2186   PUSH_ON_RUN_QUEUE(tso);
2187   THREAD_RUNNABLE();
2188
2189 #if 0
2190   IF_DEBUG(scheduler,printTSO(tso));
2191 #endif
2192 }
2193
2194 void scheduleThread(StgTSO* tso)
2195 {
2196   ACQUIRE_LOCK(&sched_mutex);
2197   scheduleThread_(tso);
2198   RELEASE_LOCK(&sched_mutex);
2199 }
2200
2201 SchedulerStatus
2202 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2203 {       // Precondition: sched_mutex must be held
2204   StgMainThread *m;
2205
2206   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2207   m->tso = tso;
2208   m->ret = ret;
2209   m->stat = NoStatus;
2210 #if defined(RTS_SUPPORTS_THREADS)
2211 #if defined(THREADED_RTS)
2212   initCondition(&m->bound_thread_cond);
2213 #else
2214   initCondition(&m->wakeup);
2215 #endif
2216 #endif
2217
2218   /* Put the thread on the main-threads list prior to scheduling the TSO.
2219      Failure to do so introduces a race condition in the MT case (as
2220      identified by Wolfgang Thaller), whereby the new task/OS thread 
2221      created by scheduleThread_() would complete prior to the thread
2222      that spawned it managed to put 'itself' on the main-threads list.
2223      The upshot of it all being that the worker thread wouldn't get to
2224      signal the completion of the its work item for the main thread to
2225      see (==> it got stuck waiting.)    -- sof 6/02.
2226   */
2227   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2228   
2229   m->link = main_threads;
2230   main_threads = m;
2231
2232   scheduleThread_(tso);
2233
2234   return waitThread_(m, initialCapability);
2235 }
2236
2237 /* ---------------------------------------------------------------------------
2238  * initScheduler()
2239  *
2240  * Initialise the scheduler.  This resets all the queues - if the
2241  * queues contained any threads, they'll be garbage collected at the
2242  * next pass.
2243  *
2244  * ------------------------------------------------------------------------ */
2245
2246 #ifdef SMP
2247 static void
2248 term_handler(int sig STG_UNUSED)
2249 {
2250   stat_workerStop();
2251   ACQUIRE_LOCK(&term_mutex);
2252   await_death--;
2253   RELEASE_LOCK(&term_mutex);
2254   shutdownThread();
2255 }
2256 #endif
2257
2258 void 
2259 initScheduler(void)
2260 {
2261 #if defined(GRAN)
2262   nat i;
2263
2264   for (i=0; i<=MAX_PROC; i++) {
2265     run_queue_hds[i]      = END_TSO_QUEUE;
2266     run_queue_tls[i]      = END_TSO_QUEUE;
2267     blocked_queue_hds[i]  = END_TSO_QUEUE;
2268     blocked_queue_tls[i]  = END_TSO_QUEUE;
2269     ccalling_threadss[i]  = END_TSO_QUEUE;
2270     sleeping_queue        = END_TSO_QUEUE;
2271   }
2272 #else
2273   run_queue_hd      = END_TSO_QUEUE;
2274   run_queue_tl      = END_TSO_QUEUE;
2275   blocked_queue_hd  = END_TSO_QUEUE;
2276   blocked_queue_tl  = END_TSO_QUEUE;
2277   sleeping_queue    = END_TSO_QUEUE;
2278 #endif 
2279
2280   suspended_ccalling_threads  = END_TSO_QUEUE;
2281
2282   main_threads = NULL;
2283   all_threads  = END_TSO_QUEUE;
2284
2285   context_switch = 0;
2286   interrupted    = 0;
2287
2288   RtsFlags.ConcFlags.ctxtSwitchTicks =
2289       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2290       
2291 #if defined(RTS_SUPPORTS_THREADS)
2292   /* Initialise the mutex and condition variables used by
2293    * the scheduler. */
2294   initMutex(&sched_mutex);
2295   initMutex(&term_mutex);
2296   initMutex(&thread_id_mutex);
2297
2298   initCondition(&thread_ready_cond);
2299 #endif
2300   
2301 #if defined(SMP)
2302   initCondition(&gc_pending_cond);
2303 #endif
2304
2305 #if defined(RTS_SUPPORTS_THREADS)
2306   ACQUIRE_LOCK(&sched_mutex);
2307 #endif
2308
2309   /* Install the SIGHUP handler */
2310 #if defined(SMP)
2311   {
2312     struct sigaction action,oact;
2313
2314     action.sa_handler = term_handler;
2315     sigemptyset(&action.sa_mask);
2316     action.sa_flags = 0;
2317     if (sigaction(SIGTERM, &action, &oact) != 0) {
2318       barf("can't install TERM handler");
2319     }
2320   }
2321 #endif
2322
2323   /* A capability holds the state a native thread needs in
2324    * order to execute STG code. At least one capability is
2325    * floating around (only SMP builds have more than one).
2326    */
2327   initCapabilities();
2328   
2329 #if defined(RTS_SUPPORTS_THREADS)
2330     /* start our haskell execution tasks */
2331 # if defined(SMP)
2332     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2333 # else
2334     startTaskManager(0,taskStart);
2335 # endif
2336 #endif
2337
2338 #if /* defined(SMP) ||*/ defined(PAR)
2339   initSparkPools();
2340 #endif
2341
2342 #if defined(RTS_SUPPORTS_THREADS)
2343   RELEASE_LOCK(&sched_mutex);
2344 #endif
2345
2346 }
2347
2348 void
2349 exitScheduler( void )
2350 {
2351 #if defined(RTS_SUPPORTS_THREADS)
2352   stopTaskManager();
2353 #endif
2354   shutting_down_scheduler = rtsTrue;
2355 }
2356
2357 /* -----------------------------------------------------------------------------
2358    Managing the per-task allocation areas.
2359    
2360    Each capability comes with an allocation area.  These are
2361    fixed-length block lists into which allocation can be done.
2362
2363    ToDo: no support for two-space collection at the moment???
2364    -------------------------------------------------------------------------- */
2365
2366 /* -----------------------------------------------------------------------------
2367  * waitThread is the external interface for running a new computation
2368  * and waiting for the result.
2369  *
2370  * In the non-SMP case, we create a new main thread, push it on the 
2371  * main-thread stack, and invoke the scheduler to run it.  The
2372  * scheduler will return when the top main thread on the stack has
2373  * completed or died, and fill in the necessary fields of the
2374  * main_thread structure.
2375  *
2376  * In the SMP case, we create a main thread as before, but we then
2377  * create a new condition variable and sleep on it.  When our new
2378  * main thread has completed, we'll be woken up and the status/result
2379  * will be in the main_thread struct.
2380  * -------------------------------------------------------------------------- */
2381
2382 int 
2383 howManyThreadsAvail ( void )
2384 {
2385    int i = 0;
2386    StgTSO* q;
2387    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2388       i++;
2389    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2390       i++;
2391    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2392       i++;
2393    return i;
2394 }
2395
2396 void
2397 finishAllThreads ( void )
2398 {
2399    do {
2400       while (run_queue_hd != END_TSO_QUEUE) {
2401          waitThread ( run_queue_hd, NULL, NULL );
2402       }
2403       while (blocked_queue_hd != END_TSO_QUEUE) {
2404          waitThread ( blocked_queue_hd, NULL, NULL );
2405       }
2406       while (sleeping_queue != END_TSO_QUEUE) {
2407          waitThread ( blocked_queue_hd, NULL, NULL );
2408       }
2409    } while 
2410       (blocked_queue_hd != END_TSO_QUEUE || 
2411        run_queue_hd     != END_TSO_QUEUE ||
2412        sleeping_queue   != END_TSO_QUEUE);
2413 }
2414
2415 SchedulerStatus
2416 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2417
2418   StgMainThread *m;
2419   SchedulerStatus stat;
2420
2421   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2422   m->tso = tso;
2423   m->ret = ret;
2424   m->stat = NoStatus;
2425 #if defined(RTS_SUPPORTS_THREADS)
2426 #if defined(THREADED_RTS)
2427   initCondition(&m->bound_thread_cond);
2428 #else
2429   initCondition(&m->wakeup);
2430 #endif
2431 #endif
2432
2433   /* see scheduleWaitThread() comment */
2434   ACQUIRE_LOCK(&sched_mutex);
2435   m->link = main_threads;
2436   main_threads = m;
2437
2438   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2439
2440   stat = waitThread_(m,initialCapability);
2441   
2442   RELEASE_LOCK(&sched_mutex);
2443   return stat;
2444 }
2445
2446 static
2447 SchedulerStatus
2448 waitThread_(StgMainThread* m, Capability *initialCapability)
2449 {
2450   SchedulerStatus stat;
2451
2452   // Precondition: sched_mutex must be held.
2453   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2454
2455 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2456   {     // FIXME: does this still make sense?
2457         // It's not for the threaded rts => SMP only
2458     do {
2459       waitCondition(&m->wakeup, &sched_mutex);
2460     } while (m->stat == NoStatus);
2461   }
2462 #elif defined(GRAN)
2463   /* GranSim specific init */
2464   CurrentTSO = m->tso;                // the TSO to run
2465   procStatus[MainProc] = Busy;        // status of main PE
2466   CurrentProc = MainProc;             // PE to run it on
2467
2468   RELEASE_LOCK(&sched_mutex);
2469   schedule(m,initialCapability);
2470 #else
2471   RELEASE_LOCK(&sched_mutex);
2472   schedule(m,initialCapability);
2473   ACQUIRE_LOCK(&sched_mutex);
2474   ASSERT(m->stat != NoStatus);
2475 #endif
2476
2477   stat = m->stat;
2478
2479 #if defined(RTS_SUPPORTS_THREADS)
2480 #if defined(THREADED_RTS)
2481   closeCondition(&m->bound_thread_cond);
2482 #else
2483   closeCondition(&m->wakeup);
2484 #endif
2485 #endif
2486
2487   IF_DEBUG(scheduler, fprintf(stderr, "== sched: main thread (%d) finished\n", 
2488                               m->tso->id));
2489   stgFree(m);
2490
2491   // Postcondition: sched_mutex still held
2492   return stat;
2493 }
2494
2495 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2496 //@subsection Run queue code 
2497
2498 #if 0
2499 /* 
2500    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2501        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2502        implicit global variable that has to be correct when calling these
2503        fcts -- HWL 
2504 */
2505
2506 /* Put the new thread on the head of the runnable queue.
2507  * The caller of createThread better push an appropriate closure
2508  * on this thread's stack before the scheduler is invoked.
2509  */
2510 static /* inline */ void
2511 add_to_run_queue(tso)
2512 StgTSO* tso; 
2513 {
2514   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2515   tso->link = run_queue_hd;
2516   run_queue_hd = tso;
2517   if (run_queue_tl == END_TSO_QUEUE) {
2518     run_queue_tl = tso;
2519   }
2520 }
2521
2522 /* Put the new thread at the end of the runnable queue. */
2523 static /* inline */ void
2524 push_on_run_queue(tso)
2525 StgTSO* tso; 
2526 {
2527   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2528   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2529   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2530   if (run_queue_hd == END_TSO_QUEUE) {
2531     run_queue_hd = tso;
2532   } else {
2533     run_queue_tl->link = tso;
2534   }
2535   run_queue_tl = tso;
2536 }
2537
2538 /* 
2539    Should be inlined because it's used very often in schedule.  The tso
2540    argument is actually only needed in GranSim, where we want to have the
2541    possibility to schedule *any* TSO on the run queue, irrespective of the
2542    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2543    the run queue and dequeue the tso, adjusting the links in the queue. 
2544 */
2545 //@cindex take_off_run_queue
2546 static /* inline */ StgTSO*
2547 take_off_run_queue(StgTSO *tso) {
2548   StgTSO *t, *prev;
2549
2550   /* 
2551      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2552
2553      if tso is specified, unlink that tso from the run_queue (doesn't have
2554      to be at the beginning of the queue); GranSim only 
2555   */
2556   if (tso!=END_TSO_QUEUE) {
2557     /* find tso in queue */
2558     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2559          t!=END_TSO_QUEUE && t!=tso;
2560          prev=t, t=t->link) 
2561       /* nothing */ ;
2562     ASSERT(t==tso);
2563     /* now actually dequeue the tso */
2564     if (prev!=END_TSO_QUEUE) {
2565       ASSERT(run_queue_hd!=t);
2566       prev->link = t->link;
2567     } else {
2568       /* t is at beginning of thread queue */
2569       ASSERT(run_queue_hd==t);
2570       run_queue_hd = t->link;
2571     }
2572     /* t is at end of thread queue */
2573     if (t->link==END_TSO_QUEUE) {
2574       ASSERT(t==run_queue_tl);
2575       run_queue_tl = prev;
2576     } else {
2577       ASSERT(run_queue_tl!=t);
2578     }
2579     t->link = END_TSO_QUEUE;
2580   } else {
2581     /* take tso from the beginning of the queue; std concurrent code */
2582     t = run_queue_hd;
2583     if (t != END_TSO_QUEUE) {
2584       run_queue_hd = t->link;
2585       t->link = END_TSO_QUEUE;
2586       if (run_queue_hd == END_TSO_QUEUE) {
2587         run_queue_tl = END_TSO_QUEUE;
2588       }
2589     }
2590   }
2591   return t;
2592 }
2593
2594 #endif /* 0 */
2595
2596 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2597 //@subsection Garbage Collextion Routines
2598
2599 /* ---------------------------------------------------------------------------
2600    Where are the roots that we know about?
2601
2602         - all the threads on the runnable queue
2603         - all the threads on the blocked queue
2604         - all the threads on the sleeping queue
2605         - all the thread currently executing a _ccall_GC
2606         - all the "main threads"
2607      
2608    ------------------------------------------------------------------------ */
2609
2610 /* This has to be protected either by the scheduler monitor, or by the
2611         garbage collection monitor (probably the latter).
2612         KH @ 25/10/99
2613 */
2614
2615 void
2616 GetRoots(evac_fn evac)
2617 {
2618 #if defined(GRAN)
2619   {
2620     nat i;
2621     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2622       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2623           evac((StgClosure **)&run_queue_hds[i]);
2624       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2625           evac((StgClosure **)&run_queue_tls[i]);
2626       
2627       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2628           evac((StgClosure **)&blocked_queue_hds[i]);
2629       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2630           evac((StgClosure **)&blocked_queue_tls[i]);
2631       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2632           evac((StgClosure **)&ccalling_threads[i]);
2633     }
2634   }
2635
2636   markEventQueue();
2637
2638 #else /* !GRAN */
2639   if (run_queue_hd != END_TSO_QUEUE) {
2640       ASSERT(run_queue_tl != END_TSO_QUEUE);
2641       evac((StgClosure **)&run_queue_hd);
2642       evac((StgClosure **)&run_queue_tl);
2643   }
2644   
2645   if (blocked_queue_hd != END_TSO_QUEUE) {
2646       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2647       evac((StgClosure **)&blocked_queue_hd);
2648       evac((StgClosure **)&blocked_queue_tl);
2649   }
2650   
2651   if (sleeping_queue != END_TSO_QUEUE) {
2652       evac((StgClosure **)&sleeping_queue);
2653   }
2654 #endif 
2655
2656   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2657       evac((StgClosure **)&suspended_ccalling_threads);
2658   }
2659
2660 #if defined(PAR) || defined(GRAN)
2661   markSparkQueue(evac);
2662 #endif
2663
2664 #if defined(RTS_USER_SIGNALS)
2665   // mark the signal handlers (signals should be already blocked)
2666   markSignalHandlers(evac);
2667 #endif
2668
2669   // main threads which have completed need to be retained until they
2670   // are dealt with in the main scheduler loop.  They won't be
2671   // retained any other way: the GC will drop them from the
2672   // all_threads list, so we have to be careful to treat them as roots
2673   // here.
2674   { 
2675       StgMainThread *m;
2676       for (m = main_threads; m != NULL; m = m->link) {
2677           switch (m->tso->what_next) {
2678           case ThreadComplete:
2679           case ThreadKilled:
2680               evac((StgClosure **)&m->tso);
2681               break;
2682           default:
2683               break;
2684           }
2685       }
2686   }
2687 }
2688
2689 /* -----------------------------------------------------------------------------
2690    performGC
2691
2692    This is the interface to the garbage collector from Haskell land.
2693    We provide this so that external C code can allocate and garbage
2694    collect when called from Haskell via _ccall_GC.
2695
2696    It might be useful to provide an interface whereby the programmer
2697    can specify more roots (ToDo).
2698    
2699    This needs to be protected by the GC condition variable above.  KH.
2700    -------------------------------------------------------------------------- */
2701
2702 static void (*extra_roots)(evac_fn);
2703
2704 void
2705 performGC(void)
2706 {
2707   /* Obligated to hold this lock upon entry */
2708   ACQUIRE_LOCK(&sched_mutex);
2709   GarbageCollect(GetRoots,rtsFalse);
2710   RELEASE_LOCK(&sched_mutex);
2711 }
2712
2713 void
2714 performMajorGC(void)
2715 {
2716   ACQUIRE_LOCK(&sched_mutex);
2717   GarbageCollect(GetRoots,rtsTrue);
2718   RELEASE_LOCK(&sched_mutex);
2719 }
2720
2721 static void
2722 AllRoots(evac_fn evac)
2723 {
2724     GetRoots(evac);             // the scheduler's roots
2725     extra_roots(evac);          // the user's roots
2726 }
2727
2728 void
2729 performGCWithRoots(void (*get_roots)(evac_fn))
2730 {
2731   ACQUIRE_LOCK(&sched_mutex);
2732   extra_roots = get_roots;
2733   GarbageCollect(AllRoots,rtsFalse);
2734   RELEASE_LOCK(&sched_mutex);
2735 }
2736
2737 /* -----------------------------------------------------------------------------
2738    Stack overflow
2739
2740    If the thread has reached its maximum stack size, then raise the
2741    StackOverflow exception in the offending thread.  Otherwise
2742    relocate the TSO into a larger chunk of memory and adjust its stack
2743    size appropriately.
2744    -------------------------------------------------------------------------- */
2745
2746 static StgTSO *
2747 threadStackOverflow(StgTSO *tso)
2748 {
2749   nat new_stack_size, new_tso_size, stack_words;
2750   StgPtr new_sp;
2751   StgTSO *dest;
2752
2753   IF_DEBUG(sanity,checkTSO(tso));
2754   if (tso->stack_size >= tso->max_stack_size) {
2755
2756     IF_DEBUG(gc,
2757              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2758                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2759              /* If we're debugging, just print out the top of the stack */
2760              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2761                                               tso->sp+64)));
2762
2763     /* Send this thread the StackOverflow exception */
2764     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2765     return tso;
2766   }
2767
2768   /* Try to double the current stack size.  If that takes us over the
2769    * maximum stack size for this thread, then use the maximum instead.
2770    * Finally round up so the TSO ends up as a whole number of blocks.
2771    */
2772   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2773   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2774                                        TSO_STRUCT_SIZE)/sizeof(W_);
2775   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2776   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2777
2778   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2779
2780   dest = (StgTSO *)allocate(new_tso_size);
2781   TICK_ALLOC_TSO(new_stack_size,0);
2782
2783   /* copy the TSO block and the old stack into the new area */
2784   memcpy(dest,tso,TSO_STRUCT_SIZE);
2785   stack_words = tso->stack + tso->stack_size - tso->sp;
2786   new_sp = (P_)dest + new_tso_size - stack_words;
2787   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2788
2789   /* relocate the stack pointers... */
2790   dest->sp         = new_sp;
2791   dest->stack_size = new_stack_size;
2792         
2793   /* Mark the old TSO as relocated.  We have to check for relocated
2794    * TSOs in the garbage collector and any primops that deal with TSOs.
2795    *
2796    * It's important to set the sp value to just beyond the end
2797    * of the stack, so we don't attempt to scavenge any part of the
2798    * dead TSO's stack.
2799    */
2800   tso->what_next = ThreadRelocated;
2801   tso->link = dest;
2802   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2803   tso->why_blocked = NotBlocked;
2804   dest->mut_link = NULL;
2805
2806   IF_PAR_DEBUG(verbose,
2807                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2808                      tso->id, tso, tso->stack_size);
2809                /* If we're debugging, just print out the top of the stack */
2810                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2811                                                 tso->sp+64)));
2812   
2813   IF_DEBUG(sanity,checkTSO(tso));
2814 #if 0
2815   IF_DEBUG(scheduler,printTSO(dest));
2816 #endif
2817
2818   return dest;
2819 }
2820
2821 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2822 //@subsection Blocking Queue Routines
2823
2824 /* ---------------------------------------------------------------------------
2825    Wake up a queue that was blocked on some resource.
2826    ------------------------------------------------------------------------ */
2827
2828 #if defined(GRAN)
2829 STATIC_INLINE void
2830 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2831 {
2832 }
2833 #elif defined(PAR)
2834 STATIC_INLINE void
2835 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2836 {
2837   /* write RESUME events to log file and
2838      update blocked and fetch time (depending on type of the orig closure) */
2839   if (RtsFlags.ParFlags.ParStats.Full) {
2840     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2841                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2842                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2843     if (EMPTY_RUN_QUEUE())
2844       emitSchedule = rtsTrue;
2845
2846     switch (get_itbl(node)->type) {
2847         case FETCH_ME_BQ:
2848           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2849           break;
2850         case RBH:
2851         case FETCH_ME:
2852         case BLACKHOLE_BQ:
2853           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2854           break;
2855 #ifdef DIST
2856         case MVAR:
2857           break;
2858 #endif    
2859         default:
2860           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2861         }
2862       }
2863 }
2864 #endif
2865
2866 #if defined(GRAN)
2867 static StgBlockingQueueElement *
2868 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2869 {
2870     StgTSO *tso;
2871     PEs node_loc, tso_loc;
2872
2873     node_loc = where_is(node); // should be lifted out of loop
2874     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2875     tso_loc = where_is((StgClosure *)tso);
2876     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2877       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2878       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2879       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2880       // insertThread(tso, node_loc);
2881       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2882                 ResumeThread,
2883                 tso, node, (rtsSpark*)NULL);
2884       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2885       // len_local++;
2886       // len++;
2887     } else { // TSO is remote (actually should be FMBQ)
2888       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2889                                   RtsFlags.GranFlags.Costs.gunblocktime +
2890                                   RtsFlags.GranFlags.Costs.latency;
2891       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2892                 UnblockThread,
2893                 tso, node, (rtsSpark*)NULL);
2894       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2895       // len++;
2896     }
2897     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2898     IF_GRAN_DEBUG(bq,
2899                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2900                           (node_loc==tso_loc ? "Local" : "Global"), 
2901                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2902     tso->block_info.closure = NULL;
2903     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2904                              tso->id, tso));
2905 }
2906 #elif defined(PAR)
2907 static StgBlockingQueueElement *
2908 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2909 {
2910     StgBlockingQueueElement *next;
2911
2912     switch (get_itbl(bqe)->type) {
2913     case TSO:
2914       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2915       /* if it's a TSO just push it onto the run_queue */
2916       next = bqe->link;
2917       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2918       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2919       THREAD_RUNNABLE();
2920       unblockCount(bqe, node);
2921       /* reset blocking status after dumping event */
2922       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2923       break;
2924
2925     case BLOCKED_FETCH:
2926       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2927       next = bqe->link;
2928       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2929       PendingFetches = (StgBlockedFetch *)bqe;
2930       break;
2931
2932 # if defined(DEBUG)
2933       /* can ignore this case in a non-debugging setup; 
2934          see comments on RBHSave closures above */
2935     case CONSTR:
2936       /* check that the closure is an RBHSave closure */
2937       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2938              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2939              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2940       break;
2941
2942     default:
2943       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2944            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2945            (StgClosure *)bqe);
2946 # endif
2947     }
2948   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2949   return next;
2950 }
2951
2952 #else /* !GRAN && !PAR */
2953 static StgTSO *
2954 unblockOneLocked(StgTSO *tso)
2955 {
2956   StgTSO *next;
2957
2958   ASSERT(get_itbl(tso)->type == TSO);
2959   ASSERT(tso->why_blocked != NotBlocked);
2960   tso->why_blocked = NotBlocked;
2961   next = tso->link;
2962   PUSH_ON_RUN_QUEUE(tso);
2963   THREAD_RUNNABLE();
2964   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2965   return next;
2966 }
2967 #endif
2968
2969 #if defined(GRAN) || defined(PAR)
2970 INLINE_ME StgBlockingQueueElement *
2971 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2972 {
2973   ACQUIRE_LOCK(&sched_mutex);
2974   bqe = unblockOneLocked(bqe, node);
2975   RELEASE_LOCK(&sched_mutex);
2976   return bqe;
2977 }
2978 #else
2979 INLINE_ME StgTSO *
2980 unblockOne(StgTSO *tso)
2981 {
2982   ACQUIRE_LOCK(&sched_mutex);
2983   tso = unblockOneLocked(tso);
2984   RELEASE_LOCK(&sched_mutex);
2985   return tso;
2986 }
2987 #endif
2988
2989 #if defined(GRAN)
2990 void 
2991 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2992 {
2993   StgBlockingQueueElement *bqe;
2994   PEs node_loc;
2995   nat len = 0; 
2996
2997   IF_GRAN_DEBUG(bq, 
2998                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2999                       node, CurrentProc, CurrentTime[CurrentProc], 
3000                       CurrentTSO->id, CurrentTSO));
3001
3002   node_loc = where_is(node);
3003
3004   ASSERT(q == END_BQ_QUEUE ||
3005          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3006          get_itbl(q)->type == CONSTR); // closure (type constructor)
3007   ASSERT(is_unique(node));
3008
3009   /* FAKE FETCH: magically copy the node to the tso's proc;
3010      no Fetch necessary because in reality the node should not have been 
3011      moved to the other PE in the first place
3012   */
3013   if (CurrentProc!=node_loc) {
3014     IF_GRAN_DEBUG(bq, 
3015                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3016                         node, node_loc, CurrentProc, CurrentTSO->id, 
3017                         // CurrentTSO, where_is(CurrentTSO),
3018                         node->header.gran.procs));
3019     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3020     IF_GRAN_DEBUG(bq, 
3021                   belch("## new bitmask of node %p is %#x",
3022                         node, node->header.gran.procs));
3023     if (RtsFlags.GranFlags.GranSimStats.Global) {
3024       globalGranStats.tot_fake_fetches++;
3025     }
3026   }
3027
3028   bqe = q;
3029   // ToDo: check: ASSERT(CurrentProc==node_loc);
3030   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3031     //next = bqe->link;
3032     /* 
3033        bqe points to the current element in the queue
3034        next points to the next element in the queue
3035     */
3036     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3037     //tso_loc = where_is(tso);
3038     len++;
3039     bqe = unblockOneLocked(bqe, node);
3040   }
3041
3042   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3043      the closure to make room for the anchor of the BQ */
3044   if (bqe!=END_BQ_QUEUE) {
3045     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3046     /*
3047     ASSERT((info_ptr==&RBH_Save_0_info) ||
3048            (info_ptr==&RBH_Save_1_info) ||
3049            (info_ptr==&RBH_Save_2_info));
3050     */
3051     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3052     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3053     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3054
3055     IF_GRAN_DEBUG(bq,
3056                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3057                         node, info_type(node)));
3058   }
3059
3060   /* statistics gathering */
3061   if (RtsFlags.GranFlags.GranSimStats.Global) {
3062     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3063     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3064     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3065     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3066   }
3067   IF_GRAN_DEBUG(bq,
3068                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3069                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3070 }
3071 #elif defined(PAR)
3072 void 
3073 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3074 {
3075   StgBlockingQueueElement *bqe;
3076
3077   ACQUIRE_LOCK(&sched_mutex);
3078
3079   IF_PAR_DEBUG(verbose, 
3080                belch("##-_ AwBQ for node %p on [%x]: ",
3081                      node, mytid));
3082 #ifdef DIST  
3083   //RFP
3084   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3085     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3086     return;
3087   }
3088 #endif
3089   
3090   ASSERT(q == END_BQ_QUEUE ||
3091          get_itbl(q)->type == TSO ||           
3092          get_itbl(q)->type == BLOCKED_FETCH || 
3093          get_itbl(q)->type == CONSTR); 
3094
3095   bqe = q;
3096   while (get_itbl(bqe)->type==TSO || 
3097          get_itbl(bqe)->type==BLOCKED_FETCH) {
3098     bqe = unblockOneLocked(bqe, node);
3099   }
3100   RELEASE_LOCK(&sched_mutex);
3101 }
3102
3103 #else   /* !GRAN && !PAR */
3104
3105 #ifdef RTS_SUPPORTS_THREADS
3106 void
3107 awakenBlockedQueueNoLock(StgTSO *tso)
3108 {
3109   while (tso != END_TSO_QUEUE) {
3110     tso = unblockOneLocked(tso);
3111   }
3112 }
3113 #endif
3114
3115 void
3116 awakenBlockedQueue(StgTSO *tso)
3117 {
3118   ACQUIRE_LOCK(&sched_mutex);
3119   while (tso != END_TSO_QUEUE) {
3120     tso = unblockOneLocked(tso);
3121   }
3122   RELEASE_LOCK(&sched_mutex);
3123 }
3124 #endif
3125
3126 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3127 //@subsection Exception Handling Routines
3128
3129 /* ---------------------------------------------------------------------------
3130    Interrupt execution
3131    - usually called inside a signal handler so it mustn't do anything fancy.   
3132    ------------------------------------------------------------------------ */
3133
3134 void
3135 interruptStgRts(void)
3136 {
3137     interrupted    = 1;
3138     context_switch = 1;
3139 #ifdef RTS_SUPPORTS_THREADS
3140     wakeBlockedWorkerThread();
3141 #endif
3142 }
3143
3144 /* -----------------------------------------------------------------------------
3145    Unblock a thread
3146
3147    This is for use when we raise an exception in another thread, which
3148    may be blocked.
3149    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3150    -------------------------------------------------------------------------- */
3151
3152 #if defined(GRAN) || defined(PAR)
3153 /*
3154   NB: only the type of the blocking queue is different in GranSim and GUM
3155       the operations on the queue-elements are the same
3156       long live polymorphism!
3157
3158   Locks: sched_mutex is held upon entry and exit.
3159
3160 */
3161 static void
3162 unblockThread(StgTSO *tso)
3163 {
3164   StgBlockingQueueElement *t, **last;
3165
3166   switch (tso->why_blocked) {
3167
3168   case NotBlocked:
3169     return;  /* not blocked */
3170
3171   case BlockedOnMVar:
3172     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3173     {
3174       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3175       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3176
3177       last = (StgBlockingQueueElement **)&mvar->head;
3178       for (t = (StgBlockingQueueElement *)mvar->head; 
3179            t != END_BQ_QUEUE; 
3180            last = &t->link, last_tso = t, t = t->link) {
3181         if (t == (StgBlockingQueueElement *)tso) {
3182           *last = (StgBlockingQueueElement *)tso->link;
3183           if (mvar->tail == tso) {
3184             mvar->tail = (StgTSO *)last_tso;
3185           }
3186           goto done;
3187         }
3188       }
3189       barf("unblockThread (MVAR): TSO not found");
3190     }
3191
3192   case BlockedOnBlackHole:
3193     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3194     {
3195       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3196
3197       last = &bq->blocking_queue;
3198       for (t = bq->blocking_queue; 
3199            t != END_BQ_QUEUE; 
3200            last = &t->link, t = t->link) {
3201         if (t == (StgBlockingQueueElement *)tso) {
3202           *last = (StgBlockingQueueElement *)tso->link;
3203           goto done;
3204         }
3205       }
3206       barf("unblockThread (BLACKHOLE): TSO not found");
3207     }
3208
3209   case BlockedOnException:
3210     {
3211       StgTSO *target  = tso->block_info.tso;
3212
3213       ASSERT(get_itbl(target)->type == TSO);
3214
3215       if (target->what_next == ThreadRelocated) {
3216           target = target->link;
3217           ASSERT(get_itbl(target)->type == TSO);
3218       }
3219
3220       ASSERT(target->blocked_exceptions != NULL);
3221
3222       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3223       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3224            t != END_BQ_QUEUE; 
3225            last = &t->link, t = t->link) {
3226         ASSERT(get_itbl(t)->type == TSO);
3227         if (t == (StgBlockingQueueElement *)tso) {
3228           *last = (StgBlockingQueueElement *)tso->link;
3229           goto done;
3230         }
3231       }
3232       barf("unblockThread (Exception): TSO not found");
3233     }
3234
3235   case BlockedOnRead:
3236   case BlockedOnWrite:
3237 #if defined(mingw32_TARGET_OS)
3238   case BlockedOnDoProc:
3239 #endif
3240     {
3241       /* take TSO off blocked_queue */
3242       StgBlockingQueueElement *prev = NULL;
3243       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3244            prev = t, t = t->link) {
3245         if (t == (StgBlockingQueueElement *)tso) {
3246           if (prev == NULL) {
3247             blocked_queue_hd = (StgTSO *)t->link;
3248             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3249               blocked_queue_tl = END_TSO_QUEUE;
3250             }
3251           } else {
3252             prev->link = t->link;
3253             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3254               blocked_queue_tl = (StgTSO *)prev;
3255             }
3256           }
3257           goto done;
3258         }
3259       }
3260       barf("unblockThread (I/O): TSO not found");
3261     }
3262
3263   case BlockedOnDelay:
3264     {
3265       /* take TSO off sleeping_queue */
3266       StgBlockingQueueElement *prev = NULL;
3267       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3268            prev = t, t = t->link) {
3269         if (t == (StgBlockingQueueElement *)tso) {
3270           if (prev == NULL) {
3271             sleeping_queue = (StgTSO *)t->link;
3272           } else {
3273             prev->link = t->link;
3274           }
3275           goto done;
3276         }
3277       }
3278       barf("unblockThread (delay): TSO not found");
3279     }
3280
3281   default:
3282     barf("unblockThread");
3283   }
3284
3285  done:
3286   tso->link = END_TSO_QUEUE;
3287   tso->why_blocked = NotBlocked;
3288   tso->block_info.closure = NULL;
3289   PUSH_ON_RUN_QUEUE(tso);
3290 }
3291 #else
3292 static void
3293 unblockThread(StgTSO *tso)
3294 {
3295   StgTSO *t, **last;
3296   
3297   /* To avoid locking unnecessarily. */
3298   if (tso->why_blocked == NotBlocked) {
3299     return;
3300   }
3301
3302   switch (tso->why_blocked) {
3303
3304   case BlockedOnMVar:
3305     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3306     {
3307       StgTSO *last_tso = END_TSO_QUEUE;
3308       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3309
3310       last = &mvar->head;
3311       for (t = mvar->head; t != END_TSO_QUEUE; 
3312            last = &t->link, last_tso = t, t = t->link) {
3313         if (t == tso) {
3314           *last = tso->link;
3315           if (mvar->tail == tso) {
3316             mvar->tail = last_tso;
3317           }
3318           goto done;
3319         }
3320       }
3321       barf("unblockThread (MVAR): TSO not found");
3322     }
3323
3324   case BlockedOnBlackHole:
3325     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3326     {
3327       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3328
3329       last = &bq->blocking_queue;
3330       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3331            last = &t->link, t = t->link) {
3332         if (t == tso) {
3333           *last = tso->link;
3334           goto done;
3335         }
3336       }
3337       barf("unblockThread (BLACKHOLE): TSO not found");
3338     }
3339
3340   case BlockedOnException:
3341     {
3342       StgTSO *target  = tso->block_info.tso;
3343
3344       ASSERT(get_itbl(target)->type == TSO);
3345
3346       while (target->what_next == ThreadRelocated) {
3347           target = target->link;
3348           ASSERT(get_itbl(target)->type == TSO);
3349       }
3350       
3351       ASSERT(target->blocked_exceptions != NULL);
3352
3353       last = &target->blocked_exceptions;
3354       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3355            last = &t->link, t = t->link) {
3356         ASSERT(get_itbl(t)->type == TSO);
3357         if (t == tso) {
3358           *last = tso->link;
3359           goto done;
3360         }
3361       }
3362       barf("unblockThread (Exception): TSO not found");
3363     }
3364
3365   case BlockedOnRead:
3366   case BlockedOnWrite:
3367 #if defined(mingw32_TARGET_OS)
3368   case BlockedOnDoProc:
3369 #endif
3370     {
3371       StgTSO *prev = NULL;
3372       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3373            prev = t, t = t->link) {
3374         if (t == tso) {
3375           if (prev == NULL) {
3376             blocked_queue_hd = t->link;
3377             if (blocked_queue_tl == t) {
3378               blocked_queue_tl = END_TSO_QUEUE;
3379             }
3380           } else {
3381             prev->link = t->link;
3382             if (blocked_queue_tl == t) {
3383               blocked_queue_tl = prev;
3384             }
3385           }
3386           goto done;
3387         }
3388       }
3389       barf("unblockThread (I/O): TSO not found");
3390     }
3391
3392   case BlockedOnDelay:
3393     {
3394       StgTSO *prev = NULL;
3395       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3396            prev = t, t = t->link) {
3397         if (t == tso) {
3398           if (prev == NULL) {
3399             sleeping_queue = t->link;
3400           } else {
3401             prev->link = t->link;
3402           }
3403           goto done;
3404         }
3405       }
3406       barf("unblockThread (delay): TSO not found");
3407     }
3408
3409   default:
3410     barf("unblockThread");
3411   }
3412
3413  done:
3414   tso->link = END_TSO_QUEUE;
3415   tso->why_blocked = NotBlocked;
3416   tso->block_info.closure = NULL;
3417   PUSH_ON_RUN_QUEUE(tso);
3418 }
3419 #endif
3420
3421 /* -----------------------------------------------------------------------------
3422  * raiseAsync()
3423  *
3424  * The following function implements the magic for raising an
3425  * asynchronous exception in an existing thread.
3426  *
3427  * We first remove the thread from any queue on which it might be
3428  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3429  *
3430  * We strip the stack down to the innermost CATCH_FRAME, building
3431  * thunks in the heap for all the active computations, so they can 
3432  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3433  * an application of the handler to the exception, and push it on
3434  * the top of the stack.
3435  * 
3436  * How exactly do we save all the active computations?  We create an
3437  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3438  * AP_STACKs pushes everything from the corresponding update frame
3439  * upwards onto the stack.  (Actually, it pushes everything up to the
3440  * next update frame plus a pointer to the next AP_STACK object.
3441  * Entering the next AP_STACK object pushes more onto the stack until we
3442  * reach the last AP_STACK object - at which point the stack should look
3443  * exactly as it did when we killed the TSO and we can continue
3444  * execution by entering the closure on top of the stack.
3445  *
3446  * We can also kill a thread entirely - this happens if either (a) the 
3447  * exception passed to raiseAsync is NULL, or (b) there's no
3448  * CATCH_FRAME on the stack.  In either case, we strip the entire
3449  * stack and replace the thread with a zombie.
3450  *
3451  * Locks: sched_mutex held upon entry nor exit.
3452  *
3453  * -------------------------------------------------------------------------- */
3454  
3455 void 
3456 deleteThread(StgTSO *tso)
3457 {
3458   raiseAsync(tso,NULL);
3459 }
3460
3461 static void 
3462 deleteThreadImmediately(StgTSO *tso)
3463 { // for forkProcess only:
3464   // delete thread without giving it a chance to catch the KillThread exception
3465
3466   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3467       return;
3468   }
3469 #if defined(RTS_SUPPORTS_THREADS)
3470   if (tso->why_blocked != BlockedOnCCall
3471       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3472 #endif
3473     unblockThread(tso);
3474   tso->what_next = ThreadKilled;
3475 }
3476
3477 void
3478 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3479 {
3480   /* When raising async exs from contexts where sched_mutex isn't held;
3481      use raiseAsyncWithLock(). */
3482   ACQUIRE_LOCK(&sched_mutex);
3483   raiseAsync(tso,exception);
3484   RELEASE_LOCK(&sched_mutex);
3485 }
3486
3487 void
3488 raiseAsync(StgTSO *tso, StgClosure *exception)
3489 {
3490     StgRetInfoTable *info;
3491     StgPtr sp;
3492   
3493     // Thread already dead?
3494     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3495         return;
3496     }
3497
3498     IF_DEBUG(scheduler, 
3499              sched_belch("raising exception in thread %ld.", tso->id));
3500     
3501     // Remove it from any blocking queues
3502     unblockThread(tso);
3503
3504     sp = tso->sp;
3505     
3506     // The stack freezing code assumes there's a closure pointer on
3507     // the top of the stack, so we have to arrange that this is the case...
3508     //
3509     if (sp[0] == (W_)&stg_enter_info) {
3510         sp++;
3511     } else {
3512         sp--;
3513         sp[0] = (W_)&stg_dummy_ret_closure;
3514     }
3515
3516     while (1) {
3517         nat i;
3518
3519         // 1. Let the top of the stack be the "current closure"
3520         //
3521         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3522         // CATCH_FRAME.
3523         //
3524         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3525         // current closure applied to the chunk of stack up to (but not
3526         // including) the update frame.  This closure becomes the "current
3527         // closure".  Go back to step 2.
3528         //
3529         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3530         // top of the stack applied to the exception.
3531         // 
3532         // 5. If it's a STOP_FRAME, then kill the thread.
3533         
3534         StgPtr frame;
3535         
3536         frame = sp + 1;
3537         info = get_ret_itbl((StgClosure *)frame);
3538         
3539         while (info->i.type != UPDATE_FRAME
3540                && (info->i.type != CATCH_FRAME || exception == NULL)
3541                && info->i.type != STOP_FRAME) {
3542             frame += stack_frame_sizeW((StgClosure *)frame);
3543             info = get_ret_itbl((StgClosure *)frame);
3544         }
3545         
3546         switch (info->i.type) {
3547             
3548         case CATCH_FRAME:
3549             // If we find a CATCH_FRAME, and we've got an exception to raise,
3550             // then build the THUNK raise(exception), and leave it on
3551             // top of the CATCH_FRAME ready to enter.
3552             //
3553         {
3554 #ifdef PROFILING
3555             StgCatchFrame *cf = (StgCatchFrame *)frame;
3556 #endif
3557             StgClosure *raise;
3558             
3559             // we've got an exception to raise, so let's pass it to the
3560             // handler in this frame.
3561             //
3562             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3563             TICK_ALLOC_SE_THK(1,0);
3564             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3565             raise->payload[0] = exception;
3566             
3567             // throw away the stack from Sp up to the CATCH_FRAME.
3568             //
3569             sp = frame - 1;
3570             
3571             /* Ensure that async excpetions are blocked now, so we don't get
3572              * a surprise exception before we get around to executing the
3573              * handler.
3574              */
3575             if (tso->blocked_exceptions == NULL) {
3576                 tso->blocked_exceptions = END_TSO_QUEUE;
3577             }
3578             
3579             /* Put the newly-built THUNK on top of the stack, ready to execute
3580              * when the thread restarts.
3581              */
3582             sp[0] = (W_)raise;
3583             sp[-1] = (W_)&stg_enter_info;
3584             tso->sp = sp-1;
3585             tso->what_next = ThreadRunGHC;
3586             IF_DEBUG(sanity, checkTSO(tso));
3587             return;
3588         }
3589         
3590         case UPDATE_FRAME:
3591         {
3592             StgAP_STACK * ap;
3593             nat words;
3594             
3595             // First build an AP_STACK consisting of the stack chunk above the
3596             // current update frame, with the top word on the stack as the
3597             // fun field.
3598             //
3599             words = frame - sp - 1;
3600             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3601             
3602             ap->size = words;
3603             ap->fun  = (StgClosure *)sp[0];
3604             sp++;
3605             for(i=0; i < (nat)words; ++i) {
3606                 ap->payload[i] = (StgClosure *)*sp++;
3607             }
3608             
3609             SET_HDR(ap,&stg_AP_STACK_info,
3610                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3611             TICK_ALLOC_UP_THK(words+1,0);
3612             
3613             IF_DEBUG(scheduler,
3614                      fprintf(stderr,  "sched: Updating ");
3615                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3616                      fprintf(stderr,  " with ");
3617                      printObj((StgClosure *)ap);
3618                 );
3619
3620             // Replace the updatee with an indirection - happily
3621             // this will also wake up any threads currently
3622             // waiting on the result.
3623             //
3624             // Warning: if we're in a loop, more than one update frame on
3625             // the stack may point to the same object.  Be careful not to
3626             // overwrite an IND_OLDGEN in this case, because we'll screw
3627             // up the mutable lists.  To be on the safe side, don't
3628             // overwrite any kind of indirection at all.  See also
3629             // threadSqueezeStack in GC.c, where we have to make a similar
3630             // check.
3631             //
3632             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3633                 // revert the black hole
3634                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3635             }
3636             sp += sizeofW(StgUpdateFrame) - 1;
3637             sp[0] = (W_)ap; // push onto stack
3638             break;
3639         }
3640         
3641         case STOP_FRAME:
3642             // We've stripped the entire stack, the thread is now dead.
3643             sp += sizeofW(StgStopFrame);
3644             tso->what_next = ThreadKilled;
3645             tso->sp = sp;
3646             return;
3647             
3648         default:
3649             barf("raiseAsync");
3650         }
3651     }
3652     barf("raiseAsync");
3653 }
3654
3655 /* -----------------------------------------------------------------------------
3656    resurrectThreads is called after garbage collection on the list of
3657    threads found to be garbage.  Each of these threads will be woken
3658    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3659    on an MVar, or NonTermination if the thread was blocked on a Black
3660    Hole.
3661
3662    Locks: sched_mutex isn't held upon entry nor exit.
3663    -------------------------------------------------------------------------- */
3664
3665 void
3666 resurrectThreads( StgTSO *threads )
3667 {
3668   StgTSO *tso, *next;
3669
3670   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3671     next = tso->global_link;
3672     tso->global_link = all_threads;
3673     all_threads = tso;
3674     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3675
3676     switch (tso->why_blocked) {
3677     case BlockedOnMVar:
3678     case BlockedOnException:
3679       /* Called by GC - sched_mutex lock is currently held. */
3680       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3681       break;
3682     case BlockedOnBlackHole:
3683       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3684       break;
3685     case NotBlocked:
3686       /* This might happen if the thread was blocked on a black hole
3687        * belonging to a thread that we've just woken up (raiseAsync
3688        * can wake up threads, remember...).
3689        */
3690       continue;
3691     default:
3692       barf("resurrectThreads: thread blocked in a strange way");
3693     }
3694   }
3695 }
3696
3697 /* -----------------------------------------------------------------------------
3698  * Blackhole detection: if we reach a deadlock, test whether any
3699  * threads are blocked on themselves.  Any threads which are found to
3700  * be self-blocked get sent a NonTermination exception.
3701  *
3702  * This is only done in a deadlock situation in order to avoid
3703  * performance overhead in the normal case.
3704  *
3705  * Locks: sched_mutex is held upon entry and exit.
3706  * -------------------------------------------------------------------------- */
3707
3708 static void
3709 detectBlackHoles( void )
3710 {
3711     StgTSO *tso = all_threads;
3712     StgClosure *frame;
3713     StgClosure *blocked_on;
3714     StgRetInfoTable *info;
3715
3716     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3717
3718         while (tso->what_next == ThreadRelocated) {
3719             tso = tso->link;
3720             ASSERT(get_itbl(tso)->type == TSO);
3721         }
3722       
3723         if (tso->why_blocked != BlockedOnBlackHole) {
3724             continue;
3725         }
3726         blocked_on = tso->block_info.closure;
3727
3728         frame = (StgClosure *)tso->sp;
3729
3730         while(1) {
3731             info = get_ret_itbl(frame);
3732             switch (info->i.type) {
3733             case UPDATE_FRAME:
3734                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3735                     /* We are blocking on one of our own computations, so
3736                      * send this thread the NonTermination exception.  
3737                      */
3738                     IF_DEBUG(scheduler, 
3739                              sched_belch("thread %d is blocked on itself", tso->id));
3740                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3741                     goto done;
3742                 }
3743                 
3744                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3745                 continue;
3746
3747             case STOP_FRAME:
3748                 goto done;
3749
3750                 // normal stack frames; do nothing except advance the pointer
3751             default:
3752                 (StgPtr)frame += stack_frame_sizeW(frame);
3753             }
3754         }   
3755         done: ;
3756     }
3757 }
3758
3759 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3760 //@subsection Debugging Routines
3761
3762 /* -----------------------------------------------------------------------------
3763  * Debugging: why is a thread blocked
3764  * [Also provides useful information when debugging threaded programs
3765  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3766    -------------------------------------------------------------------------- */
3767
3768 static
3769 void
3770 printThreadBlockage(StgTSO *tso)
3771 {
3772   switch (tso->why_blocked) {
3773   case BlockedOnRead:
3774     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3775     break;
3776   case BlockedOnWrite:
3777     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3778     break;
3779 #if defined(mingw32_TARGET_OS)
3780     case BlockedOnDoProc:
3781     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3782     break;
3783 #endif
3784   case BlockedOnDelay:
3785     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3786     break;
3787   case BlockedOnMVar:
3788     fprintf(stderr,"is blocked on an MVar");
3789     break;
3790   case BlockedOnException:
3791     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3792             tso->block_info.tso->id);
3793     break;
3794   case BlockedOnBlackHole:
3795     fprintf(stderr,"is blocked on a black hole");
3796     break;
3797   case NotBlocked:
3798     fprintf(stderr,"is not blocked");
3799     break;
3800 #if defined(PAR)
3801   case BlockedOnGA:
3802     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3803             tso->block_info.closure, info_type(tso->block_info.closure));
3804     break;
3805   case BlockedOnGA_NoSend:
3806     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3807             tso->block_info.closure, info_type(tso->block_info.closure));
3808     break;
3809 #endif
3810 #if defined(RTS_SUPPORTS_THREADS)
3811   case BlockedOnCCall:
3812     fprintf(stderr,"is blocked on an external call");
3813     break;
3814   case BlockedOnCCall_NoUnblockExc:
3815     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3816     break;
3817 #endif
3818   default:
3819     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3820          tso->why_blocked, tso->id, tso);
3821   }
3822 }
3823
3824 static
3825 void
3826 printThreadStatus(StgTSO *tso)
3827 {
3828   switch (tso->what_next) {
3829   case ThreadKilled:
3830     fprintf(stderr,"has been killed");
3831     break;
3832   case ThreadComplete:
3833     fprintf(stderr,"has completed");
3834     break;
3835   default:
3836     printThreadBlockage(tso);
3837   }
3838 }
3839
3840 void
3841 printAllThreads(void)
3842 {
3843   StgTSO *t;
3844   void *label;
3845
3846 # if defined(GRAN)
3847   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3848   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3849                        time_string, rtsFalse/*no commas!*/);
3850
3851   fprintf(stderr, "all threads at [%s]:\n", time_string);
3852 # elif defined(PAR)
3853   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3854   ullong_format_string(CURRENT_TIME,
3855                        time_string, rtsFalse/*no commas!*/);
3856
3857   fprintf(stderr,"all threads at [%s]:\n", time_string);
3858 # else
3859   fprintf(stderr,"all threads:\n");
3860 # endif
3861
3862   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3863     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3864     label = lookupThreadLabel(t->id);
3865     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3866     printThreadStatus(t);
3867     fprintf(stderr,"\n");
3868   }
3869 }
3870     
3871 #ifdef DEBUG
3872
3873 /* 
3874    Print a whole blocking queue attached to node (debugging only).
3875 */
3876 //@cindex print_bq
3877 # if defined(PAR)
3878 void 
3879 print_bq (StgClosure *node)
3880 {
3881   StgBlockingQueueElement *bqe;
3882   StgTSO *tso;
3883   rtsBool end;
3884
3885   fprintf(stderr,"## BQ of closure %p (%s): ",
3886           node, info_type(node));
3887
3888   /* should cover all closures that may have a blocking queue */
3889   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3890          get_itbl(node)->type == FETCH_ME_BQ ||
3891          get_itbl(node)->type == RBH ||
3892          get_itbl(node)->type == MVAR);
3893     
3894   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3895
3896   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3897 }
3898
3899 /* 
3900    Print a whole blocking queue starting with the element bqe.
3901 */
3902 void 
3903 print_bqe (StgBlockingQueueElement *bqe)
3904 {
3905   rtsBool end;
3906
3907   /* 
3908      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3909   */
3910   for (end = (bqe==END_BQ_QUEUE);
3911        !end; // iterate until bqe points to a CONSTR
3912        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3913        bqe = end ? END_BQ_QUEUE : bqe->link) {
3914     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3915     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3916     /* types of closures that may appear in a blocking queue */
3917     ASSERT(get_itbl(bqe)->type == TSO ||           
3918            get_itbl(bqe)->type == BLOCKED_FETCH || 
3919            get_itbl(bqe)->type == CONSTR); 
3920     /* only BQs of an RBH end with an RBH_Save closure */
3921     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3922
3923     switch (get_itbl(bqe)->type) {
3924     case TSO:
3925       fprintf(stderr," TSO %u (%x),",
3926               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3927       break;
3928     case BLOCKED_FETCH:
3929       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3930               ((StgBlockedFetch *)bqe)->node, 
3931               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3932               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3933               ((StgBlockedFetch *)bqe)->ga.weight);
3934       break;
3935     case CONSTR:
3936       fprintf(stderr," %s (IP %p),",
3937               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3938                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3939                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3940                "RBH_Save_?"), get_itbl(bqe));
3941       break;
3942     default:
3943       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3944            info_type((StgClosure *)bqe)); // , node, info_type(node));
3945       break;
3946     }
3947   } /* for */
3948   fputc('\n', stderr);
3949 }
3950 # elif defined(GRAN)
3951 void 
3952 print_bq (StgClosure *node)
3953 {
3954   StgBlockingQueueElement *bqe;
3955   PEs node_loc, tso_loc;
3956   rtsBool end;
3957
3958   /* should cover all closures that may have a blocking queue */
3959   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3960          get_itbl(node)->type == FETCH_ME_BQ ||
3961          get_itbl(node)->type == RBH);
3962     
3963   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3964   node_loc = where_is(node);
3965
3966   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3967           node, info_type(node), node_loc);
3968
3969   /* 
3970      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3971   */
3972   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3973        !end; // iterate until bqe points to a CONSTR
3974        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3975     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3976     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3977     /* types of closures that may appear in a blocking queue */
3978     ASSERT(get_itbl(bqe)->type == TSO ||           
3979            get_itbl(bqe)->type == CONSTR); 
3980     /* only BQs of an RBH end with an RBH_Save closure */
3981     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3982
3983     tso_loc = where_is((StgClosure *)bqe);
3984     switch (get_itbl(bqe)->type) {
3985     case TSO:
3986       fprintf(stderr," TSO %d (%p) on [PE %d],",
3987               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3988       break;
3989     case CONSTR:
3990       fprintf(stderr," %s (IP %p),",
3991               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3992                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3993                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3994                "RBH_Save_?"), get_itbl(bqe));
3995       break;
3996     default:
3997       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3998            info_type((StgClosure *)bqe), node, info_type(node));
3999       break;
4000     }
4001   } /* for */
4002   fputc('\n', stderr);
4003 }
4004 #else
4005 /* 
4006    Nice and easy: only TSOs on the blocking queue
4007 */
4008 void 
4009 print_bq (StgClosure *node)
4010 {
4011   StgTSO *tso;
4012
4013   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4014   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4015        tso != END_TSO_QUEUE; 
4016        tso=tso->link) {
4017     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4018     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4019     fprintf(stderr," TSO %d (%p),", tso->id, tso);
4020   }
4021   fputc('\n', stderr);
4022 }
4023 # endif
4024
4025 #if defined(PAR)
4026 static nat
4027 run_queue_len(void)
4028 {
4029   nat i;
4030   StgTSO *tso;
4031
4032   for (i=0, tso=run_queue_hd; 
4033        tso != END_TSO_QUEUE;
4034        i++, tso=tso->link)
4035     /* nothing */
4036
4037   return i;
4038 }
4039 #endif
4040
4041 void
4042 sched_belch(char *s, ...)
4043 {
4044   va_list ap;
4045   va_start(ap,s);
4046 #ifdef RTS_SUPPORTS_THREADS
4047   fprintf(stderr, "sched (task %p): ", osThreadId());
4048 #elif defined(PAR)
4049   fprintf(stderr, "== ");
4050 #else
4051   fprintf(stderr, "sched: ");
4052 #endif
4053   vfprintf(stderr, s, ap);
4054   fprintf(stderr, "\n");
4055   fflush(stderr);
4056   va_end(ap);
4057 }
4058
4059 #endif /* DEBUG */
4060
4061
4062 //@node Index,  , Debugging Routines, Main scheduling code
4063 //@subsection Index
4064
4065 //@index
4066 //* StgMainThread::  @cindex\s-+StgMainThread
4067 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
4068 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
4069 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
4070 //* context_switch::  @cindex\s-+context_switch
4071 //* createThread::  @cindex\s-+createThread
4072 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
4073 //* initScheduler::  @cindex\s-+initScheduler
4074 //* interrupted::  @cindex\s-+interrupted
4075 //* next_thread_id::  @cindex\s-+next_thread_id
4076 //* print_bq::  @cindex\s-+print_bq
4077 //* run_queue_hd::  @cindex\s-+run_queue_hd
4078 //* run_queue_tl::  @cindex\s-+run_queue_tl
4079 //* sched_mutex::  @cindex\s-+sched_mutex
4080 //* schedule::  @cindex\s-+schedule
4081 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
4082 //* term_mutex::  @cindex\s-+term_mutex
4083 //@end index