[project @ 2003-11-02 06:55:24 by dons]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.179 2003/10/05 20:18:36 panne Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 #ifdef HAVE_ERRNO_H
130 #include <errno.h>
131 #endif
132
133 #ifdef THREADED_RTS
134 #define USED_IN_THREADED_RTS
135 #else
136 #define USED_IN_THREADED_RTS STG_UNUSED
137 #endif
138
139 #ifdef RTS_SUPPORTS_THREADS
140 #define USED_WHEN_RTS_SUPPORTS_THREADS
141 #else
142 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
143 #endif
144
145 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
146 //@subsection Variables and Data structures
147
148 /* Main thread queue.
149  * Locks required: sched_mutex.
150  */
151 StgMainThread *main_threads = NULL;
152
153 /* Thread queues.
154  * Locks required: sched_mutex.
155  */
156 #if defined(GRAN)
157
158 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
160
161 /* 
162    In GranSim we have a runnable and a blocked queue for each processor.
163    In order to minimise code changes new arrays run_queue_hds/tls
164    are created. run_queue_hd is then a short cut (macro) for
165    run_queue_hds[CurrentProc] (see GranSim.h).
166    -- HWL
167 */
168 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
169 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
170 StgTSO *ccalling_threadss[MAX_PROC];
171 /* We use the same global list of threads (all_threads) in GranSim as in
172    the std RTS (i.e. we are cheating). However, we don't use this list in
173    the GranSim specific code at the moment (so we are only potentially
174    cheating).  */
175
176 #else /* !GRAN */
177
178 StgTSO *run_queue_hd = NULL;
179 StgTSO *run_queue_tl = NULL;
180 StgTSO *blocked_queue_hd = NULL;
181 StgTSO *blocked_queue_tl = NULL;
182 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
183
184 #endif
185
186 /* Linked list of all threads.
187  * Used for detecting garbage collected threads.
188  */
189 StgTSO *all_threads = NULL;
190
191 /* When a thread performs a safe C call (_ccall_GC, using old
192  * terminology), it gets put on the suspended_ccalling_threads
193  * list. Used by the garbage collector.
194  */
195 static StgTSO *suspended_ccalling_threads;
196
197 static StgTSO *threadStackOverflow(StgTSO *tso);
198
199 /* KH: The following two flags are shared memory locations.  There is no need
200        to lock them, since they are only unset at the end of a scheduler
201        operation.
202 */
203
204 /* flag set by signal handler to precipitate a context switch */
205 //@cindex context_switch
206 nat context_switch = 0;
207
208 /* if this flag is set as well, give up execution */
209 //@cindex interrupted
210 rtsBool interrupted = rtsFalse;
211
212 /* Next thread ID to allocate.
213  * Locks required: thread_id_mutex
214  */
215 //@cindex next_thread_id
216 static StgThreadID next_thread_id = 1;
217
218 /*
219  * Pointers to the state of the current thread.
220  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
221  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
222  */
223  
224 /* The smallest stack size that makes any sense is:
225  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
226  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
227  *  + 1                       (the closure to enter)
228  *  + 1                       (stg_ap_v_ret)
229  *  + 1                       (spare slot req'd by stg_ap_v_ret)
230  *
231  * A thread with this stack will bomb immediately with a stack
232  * overflow, which will increase its stack size.  
233  */
234
235 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
236
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 static rtsBool ready_to_gc;
249
250 /*
251  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
252  * in an MT setting, needed to signal that a worker thread shouldn't hang around
253  * in the scheduler when it is out of work.
254  */
255 static rtsBool shutting_down_scheduler = rtsFalse;
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
260        void     interruptStgRts   ( void );
261
262 static void     detectBlackHoles  ( void );
263
264 #ifdef DEBUG
265 static void sched_belch(char *s, ...);
266 #endif
267
268 #if defined(RTS_SUPPORTS_THREADS)
269 /* ToDo: carefully document the invariants that go together
270  *       with these synchronisation objects.
271  */
272 Mutex     sched_mutex       = INIT_MUTEX_VAR;
273 Mutex     term_mutex        = INIT_MUTEX_VAR;
274
275 /*
276  * A heavyweight solution to the problem of protecting
277  * the thread_id from concurrent update.
278  */
279 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
280
281
282 # if defined(SMP)
283 static Condition gc_pending_cond = INIT_COND_VAR;
284 nat await_death;
285 # endif
286
287 #endif /* RTS_SUPPORTS_THREADS */
288
289 #if defined(PAR)
290 StgTSO *LastTSO;
291 rtsTime TimeOfLastYield;
292 rtsBool emitSchedule = rtsTrue;
293 #endif
294
295 #if DEBUG
296 static char *whatNext_strs[] = {
297   "ThreadRunGHC",
298   "ThreadInterpret",
299   "ThreadKilled",
300   "ThreadRelocated",
301   "ThreadComplete"
302 };
303 #endif
304
305 #if defined(PAR)
306 StgTSO * createSparkThread(rtsSpark spark);
307 StgTSO * activateSpark (rtsSpark spark);  
308 #endif
309
310 /*
311  * The thread state for the main thread.
312 // ToDo: check whether not needed any more
313 StgTSO   *MainTSO;
314  */
315
316 #if defined(RTS_SUPPORTS_THREADS)
317 static rtsBool startingWorkerThread = rtsFalse;
318
319 static void taskStart(void);
320 static void
321 taskStart(void)
322 {
323   Capability *cap;
324   
325   ACQUIRE_LOCK(&sched_mutex);
326   startingWorkerThread = rtsFalse;
327   waitForWorkCapability(&sched_mutex, &cap, NULL);
328   RELEASE_LOCK(&sched_mutex);
329   
330   schedule(NULL,cap);
331 }
332
333 void
334 startSchedulerTaskIfNecessary(void)
335 {
336   if(run_queue_hd != END_TSO_QUEUE
337     || blocked_queue_hd != END_TSO_QUEUE
338     || sleeping_queue != END_TSO_QUEUE)
339   {
340     if(!startingWorkerThread)
341     { // we don't want to start another worker thread
342       // just because the last one hasn't yet reached the
343       // "waiting for capability" state
344       startingWorkerThread = rtsTrue;
345       startTask(taskStart);
346     }
347   }
348 }
349 #endif
350
351 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
352 //@subsection Main scheduling loop
353
354 /* ---------------------------------------------------------------------------
355    Main scheduling loop.
356
357    We use round-robin scheduling, each thread returning to the
358    scheduler loop when one of these conditions is detected:
359
360       * out of heap space
361       * timer expires (thread yields)
362       * thread blocks
363       * thread ends
364       * stack overflow
365
366    Locking notes:  we acquire the scheduler lock once at the beginning
367    of the scheduler loop, and release it when
368     
369       * running a thread, or
370       * waiting for work, or
371       * waiting for a GC to complete.
372
373    GRAN version:
374      In a GranSim setup this loop iterates over the global event queue.
375      This revolves around the global event queue, which determines what 
376      to do next. Therefore, it's more complicated than either the 
377      concurrent or the parallel (GUM) setup.
378
379    GUM version:
380      GUM iterates over incoming messages.
381      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
382      and sends out a fish whenever it has nothing to do; in-between
383      doing the actual reductions (shared code below) it processes the
384      incoming messages and deals with delayed operations 
385      (see PendingFetches).
386      This is not the ugliest code you could imagine, but it's bloody close.
387
388    ------------------------------------------------------------------------ */
389 //@cindex schedule
390 static void
391 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
392           Capability *initialCapability )
393 {
394   StgTSO *t;
395   Capability *cap = initialCapability;
396   StgThreadReturnCode ret;
397 #if defined(GRAN)
398   rtsEvent *event;
399 #elif defined(PAR)
400   StgSparkPool *pool;
401   rtsSpark spark;
402   StgTSO *tso;
403   GlobalTaskId pe;
404   rtsBool receivedFinish = rtsFalse;
405 # if defined(DEBUG)
406   nat tp_size, sp_size; // stats only
407 # endif
408 #endif
409   rtsBool was_interrupted = rtsFalse;
410   StgTSOWhatNext prev_what_next;
411   
412   ACQUIRE_LOCK(&sched_mutex);
413  
414 #if defined(RTS_SUPPORTS_THREADS)
415   /* in the threaded case, the capability is either passed in via the initialCapability
416      parameter, or initialized inside the scheduler loop */
417
418   IF_DEBUG(scheduler,
419     fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
420             osThreadId(), osThreadId()));
421   IF_DEBUG(scheduler,
422     fprintf(stderr,"### main thread: %p\n",mainThread));
423   IF_DEBUG(scheduler,
424     fprintf(stderr,"### initial cap: %p\n",initialCapability));
425 #else
426   /* simply initialise it in the non-threaded case */
427   grabCapability(&cap);
428 #endif
429
430 #if defined(GRAN)
431   /* set up first event to get things going */
432   /* ToDo: assign costs for system setup and init MainTSO ! */
433   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
434             ContinueThread, 
435             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
436
437   IF_DEBUG(gran,
438            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
439            G_TSO(CurrentTSO, 5));
440
441   if (RtsFlags.GranFlags.Light) {
442     /* Save current time; GranSim Light only */
443     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
444   }      
445
446   event = get_next_event();
447
448   while (event!=(rtsEvent*)NULL) {
449     /* Choose the processor with the next event */
450     CurrentProc = event->proc;
451     CurrentTSO = event->tso;
452
453 #elif defined(PAR)
454
455   while (!receivedFinish) {    /* set by processMessages */
456                                /* when receiving PP_FINISH message         */ 
457 #else
458
459   while (1) {
460
461 #endif
462
463     IF_DEBUG(scheduler, printAllThreads());
464
465 #if defined(RTS_SUPPORTS_THREADS)
466     /* Check to see whether there are any worker threads
467        waiting to deposit external call results. If so,
468        yield our capability... if we have a capability, that is. */
469     if(cap)
470       yieldToReturningWorker(&sched_mutex, &cap,
471           mainThread ? &mainThread->bound_thread_cond : NULL);
472
473     /* If we do not currently hold a capability, we wait for one */
474     if(!cap)
475     {
476       waitForWorkCapability(&sched_mutex, &cap,
477           mainThread ? &mainThread->bound_thread_cond : NULL);
478       IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
479                                       osThreadId()));
480     }
481 #endif
482
483     /* If we're interrupted (the user pressed ^C, or some other
484      * termination condition occurred), kill all the currently running
485      * threads.
486      */
487     if (interrupted) {
488       IF_DEBUG(scheduler, sched_belch("interrupted"));
489       interrupted = rtsFalse;
490       was_interrupted = rtsTrue;
491 #if defined(RTS_SUPPORTS_THREADS)
492       // In the threaded RTS, deadlock detection doesn't work,
493       // so just exit right away.
494       prog_belch("interrupted");
495       releaseCapability(cap);
496       RELEASE_LOCK(&sched_mutex);
497       shutdownHaskellAndExit(EXIT_SUCCESS);
498 #else
499       deleteAllThreads();
500 #endif
501     }
502
503     /* Go through the list of main threads and wake up any
504      * clients whose computations have finished.  ToDo: this
505      * should be done more efficiently without a linear scan
506      * of the main threads list, somehow...
507      */
508 #if defined(RTS_SUPPORTS_THREADS)
509     { 
510         StgMainThread *m, **prev;
511         prev = &main_threads;
512         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
513           if (m->tso->what_next == ThreadComplete
514               || m->tso->what_next == ThreadKilled)
515           {
516             if(m == mainThread)
517             {
518               if(m->tso->what_next == ThreadComplete)
519               {
520                 if (m->ret)
521                 {
522                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
523                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
524                 }
525                 m->stat = Success;
526               }
527               else
528               {
529                 if (m->ret)
530                 {
531                   *(m->ret) = NULL;
532                 }
533                 if (was_interrupted)
534                 {
535                   m->stat = Interrupted;
536                 }
537                 else
538                 {
539                   m->stat = Killed;
540                 }
541               }
542               *prev = m->link;
543             
544 #ifdef DEBUG
545               removeThreadLabel((StgWord)m->tso);
546 #endif
547               releaseCapability(cap);
548               RELEASE_LOCK(&sched_mutex);
549               return;
550             }
551             else
552             {
553                 // The current OS thread can not handle the fact that the Haskell
554                 // thread "m" has ended. 
555                 // "m" is bound; the scheduler loop in it's bound OS thread has
556                 // to return, so let's pass our capability directly to that thread.
557               passCapability(&sched_mutex, cap, &m->bound_thread_cond);
558               cap = NULL;
559             }
560           }
561         }
562     }
563     
564     if(!cap)    // If we gave our capability away,
565       continue; // go to the top to get it back
566       
567 #else /* not threaded */
568
569 # if defined(PAR)
570     /* in GUM do this only on the Main PE */
571     if (IAmMainThread)
572 # endif
573     /* If our main thread has finished or been killed, return.
574      */
575     {
576       StgMainThread *m = main_threads;
577       if (m->tso->what_next == ThreadComplete
578           || m->tso->what_next == ThreadKilled) {
579 #ifdef DEBUG
580         removeThreadLabel((StgWord)m->tso);
581 #endif
582         main_threads = main_threads->link;
583         if (m->tso->what_next == ThreadComplete) {
584             // We finished successfully, fill in the return value
585             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
586             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
587             m->stat = Success;
588             return;
589         } else {
590           if (m->ret) { *(m->ret) = NULL; };
591           if (was_interrupted) {
592             m->stat = Interrupted;
593           } else {
594             m->stat = Killed;
595           }
596           return;
597         }
598       }
599     }
600 #endif
601
602     /* Top up the run queue from our spark pool.  We try to make the
603      * number of threads in the run queue equal to the number of
604      * free capabilities.
605      *
606      * Disable spark support in SMP for now, non-essential & requires
607      * a little bit of work to make it compile cleanly. -- sof 1/02.
608      */
609 #if 0 /* defined(SMP) */
610     {
611       nat n = getFreeCapabilities();
612       StgTSO *tso = run_queue_hd;
613
614       /* Count the run queue */
615       while (n > 0 && tso != END_TSO_QUEUE) {
616         tso = tso->link;
617         n--;
618       }
619
620       for (; n > 0; n--) {
621         StgClosure *spark;
622         spark = findSpark(rtsFalse);
623         if (spark == NULL) {
624           break; /* no more sparks in the pool */
625         } else {
626           /* I'd prefer this to be done in activateSpark -- HWL */
627           /* tricky - it needs to hold the scheduler lock and
628            * not try to re-acquire it -- SDM */
629           createSparkThread(spark);       
630           IF_DEBUG(scheduler,
631                    sched_belch("==^^ turning spark of closure %p into a thread",
632                                (StgClosure *)spark));
633         }
634       }
635       /* We need to wake up the other tasks if we just created some
636        * work for them.
637        */
638       if (getFreeCapabilities() - n > 1) {
639           signalCondition( &thread_ready_cond );
640       }
641     }
642 #endif // SMP
643
644     /* check for signals each time around the scheduler */
645 #if defined(RTS_USER_SIGNALS)
646     if (signals_pending()) {
647       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
648       startSignalHandlers();
649       ACQUIRE_LOCK(&sched_mutex);
650     }
651 #endif
652
653     /* Check whether any waiting threads need to be woken up.  If the
654      * run queue is empty, and there are no other tasks running, we
655      * can wait indefinitely for something to happen.
656      */
657     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
658 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
659                 || EMPTY_RUN_QUEUE()
660 #endif
661         )
662     {
663       awaitEvent( EMPTY_RUN_QUEUE()
664 #if defined(SMP)
665         && allFreeCapabilities()
666 #endif
667         );
668     }
669     /* we can be interrupted while waiting for I/O... */
670     if (interrupted) continue;
671
672     /* 
673      * Detect deadlock: when we have no threads to run, there are no
674      * threads waiting on I/O or sleeping, and all the other tasks are
675      * waiting for work, we must have a deadlock of some description.
676      *
677      * We first try to find threads blocked on themselves (ie. black
678      * holes), and generate NonTermination exceptions where necessary.
679      *
680      * If no threads are black holed, we have a deadlock situation, so
681      * inform all the main threads.
682      */
683 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
684     if (   EMPTY_THREAD_QUEUES()
685 #if defined(RTS_SUPPORTS_THREADS)
686         && EMPTY_QUEUE(suspended_ccalling_threads)
687 #endif
688 #ifdef SMP
689         && allFreeCapabilities()
690 #endif
691         )
692     {
693         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
694 #if defined(THREADED_RTS)
695         /* and SMP mode ..? */
696         releaseCapability(cap);
697 #endif
698         // Garbage collection can release some new threads due to
699         // either (a) finalizers or (b) threads resurrected because
700         // they are about to be send BlockedOnDeadMVar.  Any threads
701         // thus released will be immediately runnable.
702         GarbageCollect(GetRoots,rtsTrue);
703
704         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
705
706         IF_DEBUG(scheduler, 
707                  sched_belch("still deadlocked, checking for black holes..."));
708         detectBlackHoles();
709
710         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
711
712 #if defined(RTS_USER_SIGNALS)
713         /* If we have user-installed signal handlers, then wait
714          * for signals to arrive rather then bombing out with a
715          * deadlock.
716          */
717 #if defined(RTS_SUPPORTS_THREADS)
718         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
719                       a signal with no runnable threads (or I/O
720                       suspended ones) leads nowhere quick.
721                       For now, simply shut down when we reach this
722                       condition.
723                       
724                       ToDo: define precisely under what conditions
725                       the Scheduler should shut down in an MT setting.
726                    */
727 #else
728         if ( anyUserHandlers() ) {
729 #endif
730             IF_DEBUG(scheduler, 
731                      sched_belch("still deadlocked, waiting for signals..."));
732
733             awaitUserSignals();
734
735             // we might be interrupted...
736             if (interrupted) { continue; }
737
738             if (signals_pending()) {
739                 RELEASE_LOCK(&sched_mutex);
740                 startSignalHandlers();
741                 ACQUIRE_LOCK(&sched_mutex);
742             }
743             ASSERT(!EMPTY_RUN_QUEUE());
744             goto not_deadlocked;
745         }
746 #endif
747
748         /* Probably a real deadlock.  Send the current main thread the
749          * Deadlock exception (or in the SMP build, send *all* main
750          * threads the deadlock exception, since none of them can make
751          * progress).
752          */
753         {
754             StgMainThread *m;
755 #if defined(RTS_SUPPORTS_THREADS)
756             for (m = main_threads; m != NULL; m = m->link) {
757                 switch (m->tso->why_blocked) {
758                 case BlockedOnBlackHole:
759                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
760                     break;
761                 case BlockedOnException:
762                 case BlockedOnMVar:
763                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
764                     break;
765                 default:
766                     barf("deadlock: main thread blocked in a strange way");
767                 }
768             }
769 #else
770             m = main_threads;
771             switch (m->tso->why_blocked) {
772             case BlockedOnBlackHole:
773                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
774                 break;
775             case BlockedOnException:
776             case BlockedOnMVar:
777                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
778                 break;
779             default:
780                 barf("deadlock: main thread blocked in a strange way");
781             }
782 #endif
783         }
784
785 #if defined(RTS_SUPPORTS_THREADS)
786         /* ToDo: revisit conditions (and mechanism) for shutting
787            down a multi-threaded world  */
788         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
789         RELEASE_LOCK(&sched_mutex);
790         shutdownHaskell();
791         return;
792 #endif
793     }
794   not_deadlocked:
795
796 #elif defined(RTS_SUPPORTS_THREADS)
797     /* ToDo: add deadlock detection in threaded RTS */
798 #elif defined(PAR)
799     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
800 #endif
801
802 #if defined(SMP)
803     /* If there's a GC pending, don't do anything until it has
804      * completed.
805      */
806     if (ready_to_gc) {
807       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
808       waitCondition( &gc_pending_cond, &sched_mutex );
809     }
810 #endif    
811
812 #if defined(RTS_SUPPORTS_THREADS)
813 #if defined(SMP)
814     /* block until we've got a thread on the run queue and a free
815      * capability.
816      *
817      */
818     if ( EMPTY_RUN_QUEUE() ) {
819       /* Give up our capability */
820       releaseCapability(cap);
821
822       /* If we're in the process of shutting down (& running the
823        * a batch of finalisers), don't wait around.
824        */
825       if ( shutting_down_scheduler ) {
826         RELEASE_LOCK(&sched_mutex);
827         return;
828       }
829       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
830       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
831       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
832     }
833 #else
834     if ( EMPTY_RUN_QUEUE() ) {
835       continue; // nothing to do
836     }
837 #endif
838 #endif
839
840 #if defined(GRAN)
841     if (RtsFlags.GranFlags.Light)
842       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
843
844     /* adjust time based on time-stamp */
845     if (event->time > CurrentTime[CurrentProc] &&
846         event->evttype != ContinueThread)
847       CurrentTime[CurrentProc] = event->time;
848     
849     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
850     if (!RtsFlags.GranFlags.Light)
851       handleIdlePEs();
852
853     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
854
855     /* main event dispatcher in GranSim */
856     switch (event->evttype) {
857       /* Should just be continuing execution */
858     case ContinueThread:
859       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
860       /* ToDo: check assertion
861       ASSERT(run_queue_hd != (StgTSO*)NULL &&
862              run_queue_hd != END_TSO_QUEUE);
863       */
864       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
865       if (!RtsFlags.GranFlags.DoAsyncFetch &&
866           procStatus[CurrentProc]==Fetching) {
867         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
868               CurrentTSO->id, CurrentTSO, CurrentProc);
869         goto next_thread;
870       } 
871       /* Ignore ContinueThreads for completed threads */
872       if (CurrentTSO->what_next == ThreadComplete) {
873         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
874               CurrentTSO->id, CurrentTSO, CurrentProc);
875         goto next_thread;
876       } 
877       /* Ignore ContinueThreads for threads that are being migrated */
878       if (PROCS(CurrentTSO)==Nowhere) { 
879         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
880               CurrentTSO->id, CurrentTSO, CurrentProc);
881         goto next_thread;
882       }
883       /* The thread should be at the beginning of the run queue */
884       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
885         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
886               CurrentTSO->id, CurrentTSO, CurrentProc);
887         break; // run the thread anyway
888       }
889       /*
890       new_event(proc, proc, CurrentTime[proc],
891                 FindWork,
892                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
893       goto next_thread; 
894       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
895       break; // now actually run the thread; DaH Qu'vam yImuHbej 
896
897     case FetchNode:
898       do_the_fetchnode(event);
899       goto next_thread;             /* handle next event in event queue  */
900       
901     case GlobalBlock:
902       do_the_globalblock(event);
903       goto next_thread;             /* handle next event in event queue  */
904       
905     case FetchReply:
906       do_the_fetchreply(event);
907       goto next_thread;             /* handle next event in event queue  */
908       
909     case UnblockThread:   /* Move from the blocked queue to the tail of */
910       do_the_unblock(event);
911       goto next_thread;             /* handle next event in event queue  */
912       
913     case ResumeThread:  /* Move from the blocked queue to the tail of */
914       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
915       event->tso->gran.blocktime += 
916         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
917       do_the_startthread(event);
918       goto next_thread;             /* handle next event in event queue  */
919       
920     case StartThread:
921       do_the_startthread(event);
922       goto next_thread;             /* handle next event in event queue  */
923       
924     case MoveThread:
925       do_the_movethread(event);
926       goto next_thread;             /* handle next event in event queue  */
927       
928     case MoveSpark:
929       do_the_movespark(event);
930       goto next_thread;             /* handle next event in event queue  */
931       
932     case FindWork:
933       do_the_findwork(event);
934       goto next_thread;             /* handle next event in event queue  */
935       
936     default:
937       barf("Illegal event type %u\n", event->evttype);
938     }  /* switch */
939     
940     /* This point was scheduler_loop in the old RTS */
941
942     IF_DEBUG(gran, belch("GRAN: after main switch"));
943
944     TimeOfLastEvent = CurrentTime[CurrentProc];
945     TimeOfNextEvent = get_time_of_next_event();
946     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
947     // CurrentTSO = ThreadQueueHd;
948
949     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
950                          TimeOfNextEvent));
951
952     if (RtsFlags.GranFlags.Light) 
953       GranSimLight_leave_system(event, &ActiveTSO); 
954
955     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
956
957     IF_DEBUG(gran, 
958              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
959
960     /* in a GranSim setup the TSO stays on the run queue */
961     t = CurrentTSO;
962     /* Take a thread from the run queue. */
963     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
964
965     IF_DEBUG(gran, 
966              fprintf(stderr, "GRAN: About to run current thread, which is\n");
967              G_TSO(t,5));
968
969     context_switch = 0; // turned on via GranYield, checking events and time slice
970
971     IF_DEBUG(gran, 
972              DumpGranEvent(GR_SCHEDULE, t));
973
974     procStatus[CurrentProc] = Busy;
975
976 #elif defined(PAR)
977     if (PendingFetches != END_BF_QUEUE) {
978         processFetches();
979     }
980
981     /* ToDo: phps merge with spark activation above */
982     /* check whether we have local work and send requests if we have none */
983     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
984       /* :-[  no local threads => look out for local sparks */
985       /* the spark pool for the current PE */
986       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
987       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
988           pool->hd < pool->tl) {
989         /* 
990          * ToDo: add GC code check that we really have enough heap afterwards!!
991          * Old comment:
992          * If we're here (no runnable threads) and we have pending
993          * sparks, we must have a space problem.  Get enough space
994          * to turn one of those pending sparks into a
995          * thread... 
996          */
997
998         spark = findSpark(rtsFalse);                /* get a spark */
999         if (spark != (rtsSpark) NULL) {
1000           tso = activateSpark(spark);       /* turn the spark into a thread */
1001           IF_PAR_DEBUG(schedule,
1002                        belch("==== schedule: Created TSO %d (%p); %d threads active",
1003                              tso->id, tso, advisory_thread_count));
1004
1005           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1006             belch("==^^ failed to activate spark");
1007             goto next_thread;
1008           }               /* otherwise fall through & pick-up new tso */
1009         } else {
1010           IF_PAR_DEBUG(verbose,
1011                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
1012                              spark_queue_len(pool)));
1013           goto next_thread;
1014         }
1015       }
1016
1017       /* If we still have no work we need to send a FISH to get a spark
1018          from another PE 
1019       */
1020       if (EMPTY_RUN_QUEUE()) {
1021       /* =8-[  no local sparks => look for work on other PEs */
1022         /*
1023          * We really have absolutely no work.  Send out a fish
1024          * (there may be some out there already), and wait for
1025          * something to arrive.  We clearly can't run any threads
1026          * until a SCHEDULE or RESUME arrives, and so that's what
1027          * we're hoping to see.  (Of course, we still have to
1028          * respond to other types of messages.)
1029          */
1030         TIME now = msTime() /*CURRENT_TIME*/;
1031         IF_PAR_DEBUG(verbose, 
1032                      belch("--  now=%ld", now));
1033         IF_PAR_DEBUG(verbose,
1034                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1035                          (last_fish_arrived_at!=0 &&
1036                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1037                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1038                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1039                              last_fish_arrived_at,
1040                              RtsFlags.ParFlags.fishDelay, now);
1041                      });
1042         
1043         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1044             (last_fish_arrived_at==0 ||
1045              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1046           /* outstandingFishes is set in sendFish, processFish;
1047              avoid flooding system with fishes via delay */
1048           pe = choosePE();
1049           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1050                    NEW_FISH_HUNGER);
1051
1052           // Global statistics: count no. of fishes
1053           if (RtsFlags.ParFlags.ParStats.Global &&
1054               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1055             globalParStats.tot_fish_mess++;
1056           }
1057         }
1058       
1059         receivedFinish = processMessages();
1060         goto next_thread;
1061       }
1062     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1063       receivedFinish = processMessages();
1064     }
1065
1066     /* Now we are sure that we have some work available */
1067     ASSERT(run_queue_hd != END_TSO_QUEUE);
1068
1069     /* Take a thread from the run queue, if we have work */
1070     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1071     IF_DEBUG(sanity,checkTSO(t));
1072
1073     /* ToDo: write something to the log-file
1074     if (RTSflags.ParFlags.granSimStats && !sameThread)
1075         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1076
1077     CurrentTSO = t;
1078     */
1079     /* the spark pool for the current PE */
1080     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1081
1082     IF_DEBUG(scheduler, 
1083              belch("--=^ %d threads, %d sparks on [%#x]", 
1084                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1085
1086 # if 1
1087     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1088         t && LastTSO && t->id != LastTSO->id && 
1089         LastTSO->why_blocked == NotBlocked && 
1090         LastTSO->what_next != ThreadComplete) {
1091       // if previously scheduled TSO not blocked we have to record the context switch
1092       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1093                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1094     }
1095
1096     if (RtsFlags.ParFlags.ParStats.Full && 
1097         (emitSchedule /* forced emit */ ||
1098         (t && LastTSO && t->id != LastTSO->id))) {
1099       /* 
1100          we are running a different TSO, so write a schedule event to log file
1101          NB: If we use fair scheduling we also have to write  a deschedule 
1102              event for LastTSO; with unfair scheduling we know that the
1103              previous tso has blocked whenever we switch to another tso, so
1104              we don't need it in GUM for now
1105       */
1106       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1107                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1108       emitSchedule = rtsFalse;
1109     }
1110      
1111 # endif
1112 #else /* !GRAN && !PAR */
1113   
1114     /* grab a thread from the run queue */
1115     ASSERT(run_queue_hd != END_TSO_QUEUE);
1116     t = POP_RUN_QUEUE();
1117     // Sanity check the thread we're about to run.  This can be
1118     // expensive if there is lots of thread switching going on...
1119     IF_DEBUG(sanity,checkTSO(t));
1120 #endif
1121
1122 #ifdef THREADED_RTS
1123     {
1124       StgMainThread *m;
1125       for(m = main_threads; m; m = m->link)
1126       {
1127         if(m->tso == t)
1128           break;
1129       }
1130       
1131       if(m)
1132       {
1133         if(m == mainThread)
1134         {
1135           IF_DEBUG(scheduler,
1136             fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1137                     t, osThreadId()));
1138           // yes, the Haskell thread is bound to the current native thread
1139         }
1140         else
1141         {
1142           IF_DEBUG(scheduler,
1143             fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1144                     t, osThreadId()));
1145           // no, bound to a different Haskell thread: pass to that thread
1146           PUSH_ON_RUN_QUEUE(t);
1147           passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1148           cap = NULL;
1149           continue;
1150         }
1151       }
1152       else
1153       {
1154         // The thread we want to run is not bound.
1155         if(mainThread == NULL)
1156         {
1157           IF_DEBUG(scheduler,
1158             fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1159                     t, osThreadId()));
1160           // if we are a worker thread,
1161           // we may run it here
1162         }
1163         else
1164         {
1165           IF_DEBUG(scheduler,
1166             fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1167                     t, mainThread, osThreadId()));
1168           // no, the current native thread is bound to a different
1169           // Haskell thread, so pass it to any worker thread
1170           PUSH_ON_RUN_QUEUE(t);
1171           passCapabilityToWorker(&sched_mutex, cap);
1172           cap = NULL;
1173           continue; 
1174         }
1175       }
1176     }
1177 #endif
1178
1179     cap->r.rCurrentTSO = t;
1180     
1181     /* context switches are now initiated by the timer signal, unless
1182      * the user specified "context switch as often as possible", with
1183      * +RTS -C0
1184      */
1185     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1186          && (run_queue_hd != END_TSO_QUEUE
1187              || blocked_queue_hd != END_TSO_QUEUE
1188              || sleeping_queue != END_TSO_QUEUE)))
1189         context_switch = 1;
1190     else
1191         context_switch = 0;
1192
1193 run_thread:
1194
1195     RELEASE_LOCK(&sched_mutex);
1196
1197     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1198                               t->id, whatNext_strs[t->what_next]));
1199
1200 #ifdef PROFILING
1201     startHeapProfTimer();
1202 #endif
1203
1204     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1205     /* Run the current thread 
1206      */
1207     prev_what_next = t->what_next;
1208     switch (prev_what_next) {
1209     case ThreadKilled:
1210     case ThreadComplete:
1211         /* Thread already finished, return to scheduler. */
1212         ret = ThreadFinished;
1213         break;
1214     case ThreadRunGHC:
1215         errno = t->saved_errno;
1216         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1217         t->saved_errno = errno;
1218         break;
1219     case ThreadInterpret:
1220         ret = interpretBCO(cap);
1221         break;
1222     default:
1223       barf("schedule: invalid what_next field");
1224     }
1225     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1226     
1227     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1228 #ifdef PROFILING
1229     stopHeapProfTimer();
1230     CCCS = CCS_SYSTEM;
1231 #endif
1232     
1233     ACQUIRE_LOCK(&sched_mutex);
1234     
1235 #ifdef RTS_SUPPORTS_THREADS
1236     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1237 #elif !defined(GRAN) && !defined(PAR)
1238     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1239 #endif
1240     t = cap->r.rCurrentTSO;
1241     
1242 #if defined(PAR)
1243     /* HACK 675: if the last thread didn't yield, make sure to print a 
1244        SCHEDULE event to the log file when StgRunning the next thread, even
1245        if it is the same one as before */
1246     LastTSO = t; 
1247     TimeOfLastYield = CURRENT_TIME;
1248 #endif
1249
1250     switch (ret) {
1251     case HeapOverflow:
1252 #if defined(GRAN)
1253       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1254       globalGranStats.tot_heapover++;
1255 #elif defined(PAR)
1256       globalParStats.tot_heapover++;
1257 #endif
1258
1259       // did the task ask for a large block?
1260       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1261           // if so, get one and push it on the front of the nursery.
1262           bdescr *bd;
1263           nat blocks;
1264           
1265           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1266
1267           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1268                                    t->id, whatNext_strs[t->what_next], blocks));
1269
1270           // don't do this if it would push us over the
1271           // alloc_blocks_lim limit; we'll GC first.
1272           if (alloc_blocks + blocks < alloc_blocks_lim) {
1273
1274               alloc_blocks += blocks;
1275               bd = allocGroup( blocks );
1276
1277               // link the new group into the list
1278               bd->link = cap->r.rCurrentNursery;
1279               bd->u.back = cap->r.rCurrentNursery->u.back;
1280               if (cap->r.rCurrentNursery->u.back != NULL) {
1281                   cap->r.rCurrentNursery->u.back->link = bd;
1282               } else {
1283                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1284                          g0s0->blocks == cap->r.rNursery);
1285                   cap->r.rNursery = g0s0->blocks = bd;
1286               }           
1287               cap->r.rCurrentNursery->u.back = bd;
1288
1289               // initialise it as a nursery block.  We initialise the
1290               // step, gen_no, and flags field of *every* sub-block in
1291               // this large block, because this is easier than making
1292               // sure that we always find the block head of a large
1293               // block whenever we call Bdescr() (eg. evacuate() and
1294               // isAlive() in the GC would both have to do this, at
1295               // least).
1296               { 
1297                   bdescr *x;
1298                   for (x = bd; x < bd + blocks; x++) {
1299                       x->step = g0s0;
1300                       x->gen_no = 0;
1301                       x->flags = 0;
1302                   }
1303               }
1304
1305               // don't forget to update the block count in g0s0.
1306               g0s0->n_blocks += blocks;
1307               // This assert can be a killer if the app is doing lots
1308               // of large block allocations.
1309               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1310
1311               // now update the nursery to point to the new block
1312               cap->r.rCurrentNursery = bd;
1313
1314               // we might be unlucky and have another thread get on the
1315               // run queue before us and steal the large block, but in that
1316               // case the thread will just end up requesting another large
1317               // block.
1318               PUSH_ON_RUN_QUEUE(t);
1319               break;
1320           }
1321       }
1322
1323       /* make all the running tasks block on a condition variable,
1324        * maybe set context_switch and wait till they all pile in,
1325        * then have them wait on a GC condition variable.
1326        */
1327       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1328                                t->id, whatNext_strs[t->what_next]));
1329       threadPaused(t);
1330 #if defined(GRAN)
1331       ASSERT(!is_on_queue(t,CurrentProc));
1332 #elif defined(PAR)
1333       /* Currently we emit a DESCHEDULE event before GC in GUM.
1334          ToDo: either add separate event to distinguish SYSTEM time from rest
1335                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1336       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1337         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1338                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1339         emitSchedule = rtsTrue;
1340       }
1341 #endif
1342       
1343       ready_to_gc = rtsTrue;
1344       context_switch = 1;               /* stop other threads ASAP */
1345       PUSH_ON_RUN_QUEUE(t);
1346       /* actual GC is done at the end of the while loop */
1347       break;
1348       
1349     case StackOverflow:
1350 #if defined(GRAN)
1351       IF_DEBUG(gran, 
1352                DumpGranEvent(GR_DESCHEDULE, t));
1353       globalGranStats.tot_stackover++;
1354 #elif defined(PAR)
1355       // IF_DEBUG(par, 
1356       // DumpGranEvent(GR_DESCHEDULE, t);
1357       globalParStats.tot_stackover++;
1358 #endif
1359       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1360                                t->id, whatNext_strs[t->what_next]));
1361       /* just adjust the stack for this thread, then pop it back
1362        * on the run queue.
1363        */
1364       threadPaused(t);
1365       { 
1366         StgMainThread *m;
1367         /* enlarge the stack */
1368         StgTSO *new_t = threadStackOverflow(t);
1369         
1370         /* This TSO has moved, so update any pointers to it from the
1371          * main thread stack.  It better not be on any other queues...
1372          * (it shouldn't be).
1373          */
1374         for (m = main_threads; m != NULL; m = m->link) {
1375           if (m->tso == t) {
1376             m->tso = new_t;
1377           }
1378         }
1379         threadPaused(new_t);
1380         PUSH_ON_RUN_QUEUE(new_t);
1381       }
1382       break;
1383
1384     case ThreadYielding:
1385 #if defined(GRAN)
1386       IF_DEBUG(gran, 
1387                DumpGranEvent(GR_DESCHEDULE, t));
1388       globalGranStats.tot_yields++;
1389 #elif defined(PAR)
1390       // IF_DEBUG(par, 
1391       // DumpGranEvent(GR_DESCHEDULE, t);
1392       globalParStats.tot_yields++;
1393 #endif
1394       /* put the thread back on the run queue.  Then, if we're ready to
1395        * GC, check whether this is the last task to stop.  If so, wake
1396        * up the GC thread.  getThread will block during a GC until the
1397        * GC is finished.
1398        */
1399       IF_DEBUG(scheduler,
1400                if (t->what_next != prev_what_next) {
1401                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1402                          t->id, whatNext_strs[t->what_next]);
1403                } else {
1404                    belch("--<< thread %ld (%s) stopped, yielding", 
1405                          t->id, whatNext_strs[t->what_next]);
1406                }
1407                );
1408
1409       IF_DEBUG(sanity,
1410                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1411                checkTSO(t));
1412       ASSERT(t->link == END_TSO_QUEUE);
1413
1414       // Shortcut if we're just switching evaluators: don't bother
1415       // doing stack squeezing (which can be expensive), just run the
1416       // thread.
1417       if (t->what_next != prev_what_next) {
1418           goto run_thread;
1419       }
1420
1421       threadPaused(t);
1422
1423 #if defined(GRAN)
1424       ASSERT(!is_on_queue(t,CurrentProc));
1425
1426       IF_DEBUG(sanity,
1427                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1428                checkThreadQsSanity(rtsTrue));
1429 #endif
1430
1431 #if defined(PAR)
1432       if (RtsFlags.ParFlags.doFairScheduling) { 
1433         /* this does round-robin scheduling; good for concurrency */
1434         APPEND_TO_RUN_QUEUE(t);
1435       } else {
1436         /* this does unfair scheduling; good for parallelism */
1437         PUSH_ON_RUN_QUEUE(t);
1438       }
1439 #else
1440       // this does round-robin scheduling; good for concurrency
1441       APPEND_TO_RUN_QUEUE(t);
1442 #endif
1443
1444 #if defined(GRAN)
1445       /* add a ContinueThread event to actually process the thread */
1446       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1447                 ContinueThread,
1448                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1449       IF_GRAN_DEBUG(bq, 
1450                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1451                G_EVENTQ(0);
1452                G_CURR_THREADQ(0));
1453 #endif /* GRAN */
1454       break;
1455
1456     case ThreadBlocked:
1457 #if defined(GRAN)
1458       IF_DEBUG(scheduler,
1459                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1460                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1461                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1462
1463       // ??? needed; should emit block before
1464       IF_DEBUG(gran, 
1465                DumpGranEvent(GR_DESCHEDULE, t)); 
1466       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1467       /*
1468         ngoq Dogh!
1469       ASSERT(procStatus[CurrentProc]==Busy || 
1470               ((procStatus[CurrentProc]==Fetching) && 
1471               (t->block_info.closure!=(StgClosure*)NULL)));
1472       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1473           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1474             procStatus[CurrentProc]==Fetching)) 
1475         procStatus[CurrentProc] = Idle;
1476       */
1477 #elif defined(PAR)
1478       IF_DEBUG(scheduler,
1479                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1480                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1481       IF_PAR_DEBUG(bq,
1482
1483                    if (t->block_info.closure!=(StgClosure*)NULL) 
1484                      print_bq(t->block_info.closure));
1485
1486       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1487       blockThread(t);
1488
1489       /* whatever we schedule next, we must log that schedule */
1490       emitSchedule = rtsTrue;
1491
1492 #else /* !GRAN */
1493       /* don't need to do anything.  Either the thread is blocked on
1494        * I/O, in which case we'll have called addToBlockedQueue
1495        * previously, or it's blocked on an MVar or Blackhole, in which
1496        * case it'll be on the relevant queue already.
1497        */
1498       IF_DEBUG(scheduler,
1499                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1500                        t->id, whatNext_strs[t->what_next]);
1501                printThreadBlockage(t);
1502                fprintf(stderr, "\n"));
1503
1504       /* Only for dumping event to log file 
1505          ToDo: do I need this in GranSim, too?
1506       blockThread(t);
1507       */
1508 #endif
1509       threadPaused(t);
1510       break;
1511       
1512     case ThreadFinished:
1513       /* Need to check whether this was a main thread, and if so, signal
1514        * the task that started it with the return value.  If we have no
1515        * more main threads, we probably need to stop all the tasks until
1516        * we get a new one.
1517        */
1518       /* We also end up here if the thread kills itself with an
1519        * uncaught exception, see Exception.hc.
1520        */
1521       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1522                                t->id, whatNext_strs[t->what_next]));
1523 #if defined(GRAN)
1524       endThread(t, CurrentProc); // clean-up the thread
1525 #elif defined(PAR)
1526       /* For now all are advisory -- HWL */
1527       //if(t->priority==AdvisoryPriority) ??
1528       advisory_thread_count--;
1529       
1530 # ifdef DIST
1531       if(t->dist.priority==RevalPriority)
1532         FinishReval(t);
1533 # endif
1534       
1535       if (RtsFlags.ParFlags.ParStats.Full &&
1536           !RtsFlags.ParFlags.ParStats.Suppressed) 
1537         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1538 #endif
1539       break;
1540       
1541     default:
1542       barf("schedule: invalid thread return code %d", (int)ret);
1543     }
1544
1545 #ifdef PROFILING
1546     // When we have +RTS -i0 and we're heap profiling, do a census at
1547     // every GC.  This lets us get repeatable runs for debugging.
1548     if (performHeapProfile ||
1549         (RtsFlags.ProfFlags.profileInterval==0 &&
1550          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1551         GarbageCollect(GetRoots, rtsTrue);
1552         heapCensus();
1553         performHeapProfile = rtsFalse;
1554         ready_to_gc = rtsFalse; // we already GC'd
1555     }
1556 #endif
1557
1558     if (ready_to_gc 
1559 #ifdef SMP
1560         && allFreeCapabilities() 
1561 #endif
1562         ) {
1563       /* everybody back, start the GC.
1564        * Could do it in this thread, or signal a condition var
1565        * to do it in another thread.  Either way, we need to
1566        * broadcast on gc_pending_cond afterward.
1567        */
1568 #if defined(RTS_SUPPORTS_THREADS)
1569       IF_DEBUG(scheduler,sched_belch("doing GC"));
1570 #endif
1571       GarbageCollect(GetRoots,rtsFalse);
1572       ready_to_gc = rtsFalse;
1573 #ifdef SMP
1574       broadcastCondition(&gc_pending_cond);
1575 #endif
1576 #if defined(GRAN)
1577       /* add a ContinueThread event to continue execution of current thread */
1578       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1579                 ContinueThread,
1580                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1581       IF_GRAN_DEBUG(bq, 
1582                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1583                G_EVENTQ(0);
1584                G_CURR_THREADQ(0));
1585 #endif /* GRAN */
1586     }
1587
1588 #if defined(GRAN)
1589   next_thread:
1590     IF_GRAN_DEBUG(unused,
1591                   print_eventq(EventHd));
1592
1593     event = get_next_event();
1594 #elif defined(PAR)
1595   next_thread:
1596     /* ToDo: wait for next message to arrive rather than busy wait */
1597 #endif /* GRAN */
1598
1599   } /* end of while(1) */
1600
1601   IF_PAR_DEBUG(verbose,
1602                belch("== Leaving schedule() after having received Finish"));
1603 }
1604
1605 /* ---------------------------------------------------------------------------
1606  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1607  * used by Control.Concurrent for error checking.
1608  * ------------------------------------------------------------------------- */
1609  
1610 StgBool
1611 rtsSupportsBoundThreads(void)
1612 {
1613 #ifdef THREADED_RTS
1614   return rtsTrue;
1615 #else
1616   return rtsFalse;
1617 #endif
1618 }
1619
1620 /* ---------------------------------------------------------------------------
1621  * isThreadBound(tso): check whether tso is bound to an OS thread.
1622  * ------------------------------------------------------------------------- */
1623  
1624 StgBool
1625 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1626 {
1627 #ifdef THREADED_RTS
1628   StgMainThread *m;
1629   for(m = main_threads; m; m = m->link)
1630   {
1631     if(m->tso == tso)
1632       return rtsTrue;
1633   }
1634 #endif
1635   return rtsFalse;
1636 }
1637
1638 /* ---------------------------------------------------------------------------
1639  * Singleton fork(). Do not copy any running threads.
1640  * ------------------------------------------------------------------------- */
1641
1642 static void 
1643 deleteThreadImmediately(StgTSO *tso);
1644
1645 StgInt
1646 forkProcess(HsStablePtr *entry)
1647 {
1648 #ifndef mingw32_TARGET_OS
1649   pid_t pid;
1650   StgTSO* t,*next;
1651   StgMainThread *m;
1652   SchedulerStatus rc;
1653
1654   IF_DEBUG(scheduler,sched_belch("forking!"));
1655   rts_lock(); // This not only acquires sched_mutex, it also
1656               // makes sure that no other threads are running
1657
1658   pid = fork();
1659
1660   if (pid) { /* parent */
1661
1662   /* just return the pid */
1663     rts_unlock();
1664     return pid;
1665     
1666   } else { /* child */
1667     
1668     
1669       // delete all threads
1670     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1671     
1672     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1673       next = t->link;
1674
1675         // don't allow threads to catch the ThreadKilled exception
1676       deleteThreadImmediately(t);
1677     }
1678     
1679       // wipe the main thread list
1680     while((m = main_threads) != NULL) {
1681       main_threads = m->link;
1682 #ifdef THREADED_RTS
1683       closeCondition(&m->bound_thread_cond);
1684 #endif
1685       stgFree(m);
1686     }
1687     
1688 #ifdef RTS_SUPPORTS_THREADS
1689     resetTaskManagerAfterFork();      // tell startTask() and friends that
1690     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1691     resetWorkerWakeupPipeAfterFork();
1692 #endif
1693     
1694     rc = rts_evalStableIO(entry, NULL);  // run the action
1695     rts_checkSchedStatus("forkProcess",rc);
1696     
1697     rts_unlock();
1698     
1699     hs_exit();                      // clean up and exit
1700     stg_exit(0);
1701   }
1702 #else /* mingw32 */
1703   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1704   return -1;
1705 #endif /* mingw32 */
1706 }
1707
1708 /* ---------------------------------------------------------------------------
1709  * deleteAllThreads():  kill all the live threads.
1710  *
1711  * This is used when we catch a user interrupt (^C), before performing
1712  * any necessary cleanups and running finalizers.
1713  *
1714  * Locks: sched_mutex held.
1715  * ------------------------------------------------------------------------- */
1716    
1717 void
1718 deleteAllThreads ( void )
1719 {
1720   StgTSO* t, *next;
1721   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1722   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1723       next = t->global_link;
1724       deleteThread(t);
1725   }      
1726   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1727   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1728   sleeping_queue = END_TSO_QUEUE;
1729 }
1730
1731 /* startThread and  insertThread are now in GranSim.c -- HWL */
1732
1733
1734 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1735 //@subsection Suspend and Resume
1736
1737 /* ---------------------------------------------------------------------------
1738  * Suspending & resuming Haskell threads.
1739  * 
1740  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1741  * its capability before calling the C function.  This allows another
1742  * task to pick up the capability and carry on running Haskell
1743  * threads.  It also means that if the C call blocks, it won't lock
1744  * the whole system.
1745  *
1746  * The Haskell thread making the C call is put to sleep for the
1747  * duration of the call, on the susepended_ccalling_threads queue.  We
1748  * give out a token to the task, which it can use to resume the thread
1749  * on return from the C function.
1750  * ------------------------------------------------------------------------- */
1751    
1752 StgInt
1753 suspendThread( StgRegTable *reg, 
1754                rtsBool concCall
1755 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1756                STG_UNUSED
1757 #endif
1758                )
1759 {
1760   nat tok;
1761   Capability *cap;
1762   int saved_errno = errno;
1763
1764   /* assume that *reg is a pointer to the StgRegTable part
1765    * of a Capability.
1766    */
1767   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1768
1769   ACQUIRE_LOCK(&sched_mutex);
1770
1771   IF_DEBUG(scheduler,
1772            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1773
1774   // XXX this might not be necessary --SDM
1775   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1776
1777   threadPaused(cap->r.rCurrentTSO);
1778   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1779   suspended_ccalling_threads = cap->r.rCurrentTSO;
1780
1781 #if defined(RTS_SUPPORTS_THREADS)
1782   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1783   {
1784       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1785       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1786   }
1787   else
1788   {
1789       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1790   }
1791 #endif
1792
1793   /* Use the thread ID as the token; it should be unique */
1794   tok = cap->r.rCurrentTSO->id;
1795
1796   /* Hand back capability */
1797   releaseCapability(cap);
1798   
1799 #if defined(RTS_SUPPORTS_THREADS)
1800   /* Preparing to leave the RTS, so ensure there's a native thread/task
1801      waiting to take over.
1802   */
1803   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1804 #endif
1805
1806   /* Other threads _might_ be available for execution; signal this */
1807   THREAD_RUNNABLE();
1808   RELEASE_LOCK(&sched_mutex);
1809   
1810   errno = saved_errno;
1811   return tok; 
1812 }
1813
1814 StgRegTable *
1815 resumeThread( StgInt tok,
1816               rtsBool concCall STG_UNUSED )
1817 {
1818   StgTSO *tso, **prev;
1819   Capability *cap;
1820   int saved_errno = errno;
1821
1822 #if defined(RTS_SUPPORTS_THREADS)
1823   /* Wait for permission to re-enter the RTS with the result. */
1824   ACQUIRE_LOCK(&sched_mutex);
1825   grabReturnCapability(&sched_mutex, &cap);
1826
1827   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1828 #else
1829   grabCapability(&cap);
1830 #endif
1831
1832   /* Remove the thread off of the suspended list */
1833   prev = &suspended_ccalling_threads;
1834   for (tso = suspended_ccalling_threads; 
1835        tso != END_TSO_QUEUE; 
1836        prev = &tso->link, tso = tso->link) {
1837     if (tso->id == (StgThreadID)tok) {
1838       *prev = tso->link;
1839       break;
1840     }
1841   }
1842   if (tso == END_TSO_QUEUE) {
1843     barf("resumeThread: thread not found");
1844   }
1845   tso->link = END_TSO_QUEUE;
1846   
1847 #if defined(RTS_SUPPORTS_THREADS)
1848   if(tso->why_blocked == BlockedOnCCall)
1849   {
1850       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1851       tso->blocked_exceptions = NULL;
1852   }
1853 #endif
1854   
1855   /* Reset blocking status */
1856   tso->why_blocked  = NotBlocked;
1857
1858   cap->r.rCurrentTSO = tso;
1859 #if defined(RTS_SUPPORTS_THREADS)
1860   RELEASE_LOCK(&sched_mutex);
1861 #endif
1862   errno = saved_errno;
1863   return &cap->r;
1864 }
1865
1866
1867 /* ---------------------------------------------------------------------------
1868  * Static functions
1869  * ------------------------------------------------------------------------ */
1870 static void unblockThread(StgTSO *tso);
1871
1872 /* ---------------------------------------------------------------------------
1873  * Comparing Thread ids.
1874  *
1875  * This is used from STG land in the implementation of the
1876  * instances of Eq/Ord for ThreadIds.
1877  * ------------------------------------------------------------------------ */
1878
1879 int
1880 cmp_thread(StgPtr tso1, StgPtr tso2) 
1881
1882   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1883   StgThreadID id2 = ((StgTSO *)tso2)->id;
1884  
1885   if (id1 < id2) return (-1);
1886   if (id1 > id2) return 1;
1887   return 0;
1888 }
1889
1890 /* ---------------------------------------------------------------------------
1891  * Fetching the ThreadID from an StgTSO.
1892  *
1893  * This is used in the implementation of Show for ThreadIds.
1894  * ------------------------------------------------------------------------ */
1895 int
1896 rts_getThreadId(StgPtr tso) 
1897 {
1898   return ((StgTSO *)tso)->id;
1899 }
1900
1901 #ifdef DEBUG
1902 void
1903 labelThread(StgPtr tso, char *label)
1904 {
1905   int len;
1906   void *buf;
1907
1908   /* Caveat: Once set, you can only set the thread name to "" */
1909   len = strlen(label)+1;
1910   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1911   strncpy(buf,label,len);
1912   /* Update will free the old memory for us */
1913   updateThreadLabel((StgWord)tso,buf);
1914 }
1915 #endif /* DEBUG */
1916
1917 /* ---------------------------------------------------------------------------
1918    Create a new thread.
1919
1920    The new thread starts with the given stack size.  Before the
1921    scheduler can run, however, this thread needs to have a closure
1922    (and possibly some arguments) pushed on its stack.  See
1923    pushClosure() in Schedule.h.
1924
1925    createGenThread() and createIOThread() (in SchedAPI.h) are
1926    convenient packaged versions of this function.
1927
1928    currently pri (priority) is only used in a GRAN setup -- HWL
1929    ------------------------------------------------------------------------ */
1930 //@cindex createThread
1931 #if defined(GRAN)
1932 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1933 StgTSO *
1934 createThread(nat size, StgInt pri)
1935 #else
1936 StgTSO *
1937 createThread(nat size)
1938 #endif
1939 {
1940
1941     StgTSO *tso;
1942     nat stack_size;
1943
1944     /* First check whether we should create a thread at all */
1945 #if defined(PAR)
1946   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1947   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1948     threadsIgnored++;
1949     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1950           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1951     return END_TSO_QUEUE;
1952   }
1953   threadsCreated++;
1954 #endif
1955
1956 #if defined(GRAN)
1957   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1958 #endif
1959
1960   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1961
1962   /* catch ridiculously small stack sizes */
1963   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1964     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1965   }
1966
1967   stack_size = size - TSO_STRUCT_SIZEW;
1968
1969   tso = (StgTSO *)allocate(size);
1970   TICK_ALLOC_TSO(stack_size, 0);
1971
1972   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1973 #if defined(GRAN)
1974   SET_GRAN_HDR(tso, ThisPE);
1975 #endif
1976
1977   // Always start with the compiled code evaluator
1978   tso->what_next = ThreadRunGHC;
1979
1980   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1981    * protect the increment operation on next_thread_id.
1982    * In future, we could use an atomic increment instead.
1983    */
1984   ACQUIRE_LOCK(&thread_id_mutex);
1985   tso->id = next_thread_id++; 
1986   RELEASE_LOCK(&thread_id_mutex);
1987
1988   tso->why_blocked  = NotBlocked;
1989   tso->blocked_exceptions = NULL;
1990
1991   tso->saved_errno = 0;
1992   
1993   tso->stack_size   = stack_size;
1994   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1995                               - TSO_STRUCT_SIZEW;
1996   tso->sp           = (P_)&(tso->stack) + stack_size;
1997
1998 #ifdef PROFILING
1999   tso->prof.CCCS = CCS_MAIN;
2000 #endif
2001
2002   /* put a stop frame on the stack */
2003   tso->sp -= sizeofW(StgStopFrame);
2004   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2005   // ToDo: check this
2006 #if defined(GRAN)
2007   tso->link = END_TSO_QUEUE;
2008   /* uses more flexible routine in GranSim */
2009   insertThread(tso, CurrentProc);
2010 #else
2011   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2012    * from its creation
2013    */
2014 #endif
2015
2016 #if defined(GRAN) 
2017   if (RtsFlags.GranFlags.GranSimStats.Full) 
2018     DumpGranEvent(GR_START,tso);
2019 #elif defined(PAR)
2020   if (RtsFlags.ParFlags.ParStats.Full) 
2021     DumpGranEvent(GR_STARTQ,tso);
2022   /* HACk to avoid SCHEDULE 
2023      LastTSO = tso; */
2024 #endif
2025
2026   /* Link the new thread on the global thread list.
2027    */
2028   tso->global_link = all_threads;
2029   all_threads = tso;
2030
2031 #if defined(DIST)
2032   tso->dist.priority = MandatoryPriority; //by default that is...
2033 #endif
2034
2035 #if defined(GRAN)
2036   tso->gran.pri = pri;
2037 # if defined(DEBUG)
2038   tso->gran.magic = TSO_MAGIC; // debugging only
2039 # endif
2040   tso->gran.sparkname   = 0;
2041   tso->gran.startedat   = CURRENT_TIME; 
2042   tso->gran.exported    = 0;
2043   tso->gran.basicblocks = 0;
2044   tso->gran.allocs      = 0;
2045   tso->gran.exectime    = 0;
2046   tso->gran.fetchtime   = 0;
2047   tso->gran.fetchcount  = 0;
2048   tso->gran.blocktime   = 0;
2049   tso->gran.blockcount  = 0;
2050   tso->gran.blockedat   = 0;
2051   tso->gran.globalsparks = 0;
2052   tso->gran.localsparks  = 0;
2053   if (RtsFlags.GranFlags.Light)
2054     tso->gran.clock  = Now; /* local clock */
2055   else
2056     tso->gran.clock  = 0;
2057
2058   IF_DEBUG(gran,printTSO(tso));
2059 #elif defined(PAR)
2060 # if defined(DEBUG)
2061   tso->par.magic = TSO_MAGIC; // debugging only
2062 # endif
2063   tso->par.sparkname   = 0;
2064   tso->par.startedat   = CURRENT_TIME; 
2065   tso->par.exported    = 0;
2066   tso->par.basicblocks = 0;
2067   tso->par.allocs      = 0;
2068   tso->par.exectime    = 0;
2069   tso->par.fetchtime   = 0;
2070   tso->par.fetchcount  = 0;
2071   tso->par.blocktime   = 0;
2072   tso->par.blockcount  = 0;
2073   tso->par.blockedat   = 0;
2074   tso->par.globalsparks = 0;
2075   tso->par.localsparks  = 0;
2076 #endif
2077
2078 #if defined(GRAN)
2079   globalGranStats.tot_threads_created++;
2080   globalGranStats.threads_created_on_PE[CurrentProc]++;
2081   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2082   globalGranStats.tot_sq_probes++;
2083 #elif defined(PAR)
2084   // collect parallel global statistics (currently done together with GC stats)
2085   if (RtsFlags.ParFlags.ParStats.Global &&
2086       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2087     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2088     globalParStats.tot_threads_created++;
2089   }
2090 #endif 
2091
2092 #if defined(GRAN)
2093   IF_GRAN_DEBUG(pri,
2094                 belch("==__ schedule: Created TSO %d (%p);",
2095                       CurrentProc, tso, tso->id));
2096 #elif defined(PAR)
2097     IF_PAR_DEBUG(verbose,
2098                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
2099                        tso->id, tso, advisory_thread_count));
2100 #else
2101   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2102                                  tso->id, tso->stack_size));
2103 #endif    
2104   return tso;
2105 }
2106
2107 #if defined(PAR)
2108 /* RFP:
2109    all parallel thread creation calls should fall through the following routine.
2110 */
2111 StgTSO *
2112 createSparkThread(rtsSpark spark) 
2113 { StgTSO *tso;
2114   ASSERT(spark != (rtsSpark)NULL);
2115   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2116   { threadsIgnored++;
2117     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2118           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2119     return END_TSO_QUEUE;
2120   }
2121   else
2122   { threadsCreated++;
2123     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2124     if (tso==END_TSO_QUEUE)     
2125       barf("createSparkThread: Cannot create TSO");
2126 #if defined(DIST)
2127     tso->priority = AdvisoryPriority;
2128 #endif
2129     pushClosure(tso,spark);
2130     PUSH_ON_RUN_QUEUE(tso);
2131     advisory_thread_count++;    
2132   }
2133   return tso;
2134 }
2135 #endif
2136
2137 /*
2138   Turn a spark into a thread.
2139   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2140 */
2141 #if defined(PAR)
2142 //@cindex activateSpark
2143 StgTSO *
2144 activateSpark (rtsSpark spark) 
2145 {
2146   StgTSO *tso;
2147
2148   tso = createSparkThread(spark);
2149   if (RtsFlags.ParFlags.ParStats.Full) {   
2150     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2151     IF_PAR_DEBUG(verbose,
2152                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2153                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2154   }
2155   // ToDo: fwd info on local/global spark to thread -- HWL
2156   // tso->gran.exported =  spark->exported;
2157   // tso->gran.locked =   !spark->global;
2158   // tso->gran.sparkname = spark->name;
2159
2160   return tso;
2161 }
2162 #endif
2163
2164 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2165                                    Capability *initialCapability
2166                                    );
2167
2168
2169 /* ---------------------------------------------------------------------------
2170  * scheduleThread()
2171  *
2172  * scheduleThread puts a thread on the head of the runnable queue.
2173  * This will usually be done immediately after a thread is created.
2174  * The caller of scheduleThread must create the thread using e.g.
2175  * createThread and push an appropriate closure
2176  * on this thread's stack before the scheduler is invoked.
2177  * ------------------------------------------------------------------------ */
2178
2179 static void scheduleThread_ (StgTSO* tso);
2180
2181 void
2182 scheduleThread_(StgTSO *tso)
2183 {
2184   // Precondition: sched_mutex must be held.
2185
2186   /* Put the new thread on the head of the runnable queue.  The caller
2187    * better push an appropriate closure on this thread's stack
2188    * beforehand.  In the SMP case, the thread may start running as
2189    * soon as we release the scheduler lock below.
2190    */
2191   PUSH_ON_RUN_QUEUE(tso);
2192   THREAD_RUNNABLE();
2193
2194 #if 0
2195   IF_DEBUG(scheduler,printTSO(tso));
2196 #endif
2197 }
2198
2199 void scheduleThread(StgTSO* tso)
2200 {
2201   ACQUIRE_LOCK(&sched_mutex);
2202   scheduleThread_(tso);
2203   RELEASE_LOCK(&sched_mutex);
2204 }
2205
2206 SchedulerStatus
2207 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2208 {       // Precondition: sched_mutex must be held
2209   StgMainThread *m;
2210
2211   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2212   m->tso = tso;
2213   m->ret = ret;
2214   m->stat = NoStatus;
2215 #if defined(RTS_SUPPORTS_THREADS)
2216 #if defined(THREADED_RTS)
2217   initCondition(&m->bound_thread_cond);
2218 #else
2219   initCondition(&m->wakeup);
2220 #endif
2221 #endif
2222
2223   /* Put the thread on the main-threads list prior to scheduling the TSO.
2224      Failure to do so introduces a race condition in the MT case (as
2225      identified by Wolfgang Thaller), whereby the new task/OS thread 
2226      created by scheduleThread_() would complete prior to the thread
2227      that spawned it managed to put 'itself' on the main-threads list.
2228      The upshot of it all being that the worker thread wouldn't get to
2229      signal the completion of the its work item for the main thread to
2230      see (==> it got stuck waiting.)    -- sof 6/02.
2231   */
2232   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2233   
2234   m->link = main_threads;
2235   main_threads = m;
2236
2237   scheduleThread_(tso);
2238
2239   return waitThread_(m, initialCapability);
2240 }
2241
2242 /* ---------------------------------------------------------------------------
2243  * initScheduler()
2244  *
2245  * Initialise the scheduler.  This resets all the queues - if the
2246  * queues contained any threads, they'll be garbage collected at the
2247  * next pass.
2248  *
2249  * ------------------------------------------------------------------------ */
2250
2251 #ifdef SMP
2252 static void
2253 term_handler(int sig STG_UNUSED)
2254 {
2255   stat_workerStop();
2256   ACQUIRE_LOCK(&term_mutex);
2257   await_death--;
2258   RELEASE_LOCK(&term_mutex);
2259   shutdownThread();
2260 }
2261 #endif
2262
2263 void 
2264 initScheduler(void)
2265 {
2266 #if defined(GRAN)
2267   nat i;
2268
2269   for (i=0; i<=MAX_PROC; i++) {
2270     run_queue_hds[i]      = END_TSO_QUEUE;
2271     run_queue_tls[i]      = END_TSO_QUEUE;
2272     blocked_queue_hds[i]  = END_TSO_QUEUE;
2273     blocked_queue_tls[i]  = END_TSO_QUEUE;
2274     ccalling_threadss[i]  = END_TSO_QUEUE;
2275     sleeping_queue        = END_TSO_QUEUE;
2276   }
2277 #else
2278   run_queue_hd      = END_TSO_QUEUE;
2279   run_queue_tl      = END_TSO_QUEUE;
2280   blocked_queue_hd  = END_TSO_QUEUE;
2281   blocked_queue_tl  = END_TSO_QUEUE;
2282   sleeping_queue    = END_TSO_QUEUE;
2283 #endif 
2284
2285   suspended_ccalling_threads  = END_TSO_QUEUE;
2286
2287   main_threads = NULL;
2288   all_threads  = END_TSO_QUEUE;
2289
2290   context_switch = 0;
2291   interrupted    = 0;
2292
2293   RtsFlags.ConcFlags.ctxtSwitchTicks =
2294       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2295       
2296 #if defined(RTS_SUPPORTS_THREADS)
2297   /* Initialise the mutex and condition variables used by
2298    * the scheduler. */
2299   initMutex(&sched_mutex);
2300   initMutex(&term_mutex);
2301   initMutex(&thread_id_mutex);
2302
2303   initCondition(&thread_ready_cond);
2304 #endif
2305   
2306 #if defined(SMP)
2307   initCondition(&gc_pending_cond);
2308 #endif
2309
2310 #if defined(RTS_SUPPORTS_THREADS)
2311   ACQUIRE_LOCK(&sched_mutex);
2312 #endif
2313
2314   /* Install the SIGHUP handler */
2315 #if defined(SMP)
2316   {
2317     struct sigaction action,oact;
2318
2319     action.sa_handler = term_handler;
2320     sigemptyset(&action.sa_mask);
2321     action.sa_flags = 0;
2322     if (sigaction(SIGTERM, &action, &oact) != 0) {
2323       barf("can't install TERM handler");
2324     }
2325   }
2326 #endif
2327
2328   /* A capability holds the state a native thread needs in
2329    * order to execute STG code. At least one capability is
2330    * floating around (only SMP builds have more than one).
2331    */
2332   initCapabilities();
2333   
2334 #if defined(RTS_SUPPORTS_THREADS)
2335     /* start our haskell execution tasks */
2336 # if defined(SMP)
2337     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2338 # else
2339     startTaskManager(0,taskStart);
2340 # endif
2341 #endif
2342
2343 #if /* defined(SMP) ||*/ defined(PAR)
2344   initSparkPools();
2345 #endif
2346
2347 #if defined(RTS_SUPPORTS_THREADS)
2348   RELEASE_LOCK(&sched_mutex);
2349 #endif
2350
2351 }
2352
2353 void
2354 exitScheduler( void )
2355 {
2356 #if defined(RTS_SUPPORTS_THREADS)
2357   stopTaskManager();
2358 #endif
2359   shutting_down_scheduler = rtsTrue;
2360 }
2361
2362 /* -----------------------------------------------------------------------------
2363    Managing the per-task allocation areas.
2364    
2365    Each capability comes with an allocation area.  These are
2366    fixed-length block lists into which allocation can be done.
2367
2368    ToDo: no support for two-space collection at the moment???
2369    -------------------------------------------------------------------------- */
2370
2371 /* -----------------------------------------------------------------------------
2372  * waitThread is the external interface for running a new computation
2373  * and waiting for the result.
2374  *
2375  * In the non-SMP case, we create a new main thread, push it on the 
2376  * main-thread stack, and invoke the scheduler to run it.  The
2377  * scheduler will return when the top main thread on the stack has
2378  * completed or died, and fill in the necessary fields of the
2379  * main_thread structure.
2380  *
2381  * In the SMP case, we create a main thread as before, but we then
2382  * create a new condition variable and sleep on it.  When our new
2383  * main thread has completed, we'll be woken up and the status/result
2384  * will be in the main_thread struct.
2385  * -------------------------------------------------------------------------- */
2386
2387 int 
2388 howManyThreadsAvail ( void )
2389 {
2390    int i = 0;
2391    StgTSO* q;
2392    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2393       i++;
2394    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2395       i++;
2396    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2397       i++;
2398    return i;
2399 }
2400
2401 void
2402 finishAllThreads ( void )
2403 {
2404    do {
2405       while (run_queue_hd != END_TSO_QUEUE) {
2406          waitThread ( run_queue_hd, NULL, NULL );
2407       }
2408       while (blocked_queue_hd != END_TSO_QUEUE) {
2409          waitThread ( blocked_queue_hd, NULL, NULL );
2410       }
2411       while (sleeping_queue != END_TSO_QUEUE) {
2412          waitThread ( blocked_queue_hd, NULL, NULL );
2413       }
2414    } while 
2415       (blocked_queue_hd != END_TSO_QUEUE || 
2416        run_queue_hd     != END_TSO_QUEUE ||
2417        sleeping_queue   != END_TSO_QUEUE);
2418 }
2419
2420 SchedulerStatus
2421 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2422
2423   StgMainThread *m;
2424   SchedulerStatus stat;
2425
2426   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2427   m->tso = tso;
2428   m->ret = ret;
2429   m->stat = NoStatus;
2430 #if defined(RTS_SUPPORTS_THREADS)
2431 #if defined(THREADED_RTS)
2432   initCondition(&m->bound_thread_cond);
2433 #else
2434   initCondition(&m->wakeup);
2435 #endif
2436 #endif
2437
2438   /* see scheduleWaitThread() comment */
2439   ACQUIRE_LOCK(&sched_mutex);
2440   m->link = main_threads;
2441   main_threads = m;
2442
2443   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2444
2445   stat = waitThread_(m,initialCapability);
2446   
2447   RELEASE_LOCK(&sched_mutex);
2448   return stat;
2449 }
2450
2451 static
2452 SchedulerStatus
2453 waitThread_(StgMainThread* m, Capability *initialCapability)
2454 {
2455   SchedulerStatus stat;
2456
2457   // Precondition: sched_mutex must be held.
2458   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2459
2460 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2461   {     // FIXME: does this still make sense?
2462         // It's not for the threaded rts => SMP only
2463     do {
2464       waitCondition(&m->wakeup, &sched_mutex);
2465     } while (m->stat == NoStatus);
2466   }
2467 #elif defined(GRAN)
2468   /* GranSim specific init */
2469   CurrentTSO = m->tso;                // the TSO to run
2470   procStatus[MainProc] = Busy;        // status of main PE
2471   CurrentProc = MainProc;             // PE to run it on
2472
2473   RELEASE_LOCK(&sched_mutex);
2474   schedule(m,initialCapability);
2475 #else
2476   RELEASE_LOCK(&sched_mutex);
2477   schedule(m,initialCapability);
2478   ACQUIRE_LOCK(&sched_mutex);
2479   ASSERT(m->stat != NoStatus);
2480 #endif
2481
2482   stat = m->stat;
2483
2484 #if defined(RTS_SUPPORTS_THREADS)
2485 #if defined(THREADED_RTS)
2486   closeCondition(&m->bound_thread_cond);
2487 #else
2488   closeCondition(&m->wakeup);
2489 #endif
2490 #endif
2491
2492   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2493                               m->tso->id));
2494   stgFree(m);
2495
2496   // Postcondition: sched_mutex still held
2497   return stat;
2498 }
2499
2500 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2501 //@subsection Run queue code 
2502
2503 #if 0
2504 /* 
2505    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2506        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2507        implicit global variable that has to be correct when calling these
2508        fcts -- HWL 
2509 */
2510
2511 /* Put the new thread on the head of the runnable queue.
2512  * The caller of createThread better push an appropriate closure
2513  * on this thread's stack before the scheduler is invoked.
2514  */
2515 static /* inline */ void
2516 add_to_run_queue(tso)
2517 StgTSO* tso; 
2518 {
2519   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2520   tso->link = run_queue_hd;
2521   run_queue_hd = tso;
2522   if (run_queue_tl == END_TSO_QUEUE) {
2523     run_queue_tl = tso;
2524   }
2525 }
2526
2527 /* Put the new thread at the end of the runnable queue. */
2528 static /* inline */ void
2529 push_on_run_queue(tso)
2530 StgTSO* tso; 
2531 {
2532   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2533   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2534   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2535   if (run_queue_hd == END_TSO_QUEUE) {
2536     run_queue_hd = tso;
2537   } else {
2538     run_queue_tl->link = tso;
2539   }
2540   run_queue_tl = tso;
2541 }
2542
2543 /* 
2544    Should be inlined because it's used very often in schedule.  The tso
2545    argument is actually only needed in GranSim, where we want to have the
2546    possibility to schedule *any* TSO on the run queue, irrespective of the
2547    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2548    the run queue and dequeue the tso, adjusting the links in the queue. 
2549 */
2550 //@cindex take_off_run_queue
2551 static /* inline */ StgTSO*
2552 take_off_run_queue(StgTSO *tso) {
2553   StgTSO *t, *prev;
2554
2555   /* 
2556      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2557
2558      if tso is specified, unlink that tso from the run_queue (doesn't have
2559      to be at the beginning of the queue); GranSim only 
2560   */
2561   if (tso!=END_TSO_QUEUE) {
2562     /* find tso in queue */
2563     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2564          t!=END_TSO_QUEUE && t!=tso;
2565          prev=t, t=t->link) 
2566       /* nothing */ ;
2567     ASSERT(t==tso);
2568     /* now actually dequeue the tso */
2569     if (prev!=END_TSO_QUEUE) {
2570       ASSERT(run_queue_hd!=t);
2571       prev->link = t->link;
2572     } else {
2573       /* t is at beginning of thread queue */
2574       ASSERT(run_queue_hd==t);
2575       run_queue_hd = t->link;
2576     }
2577     /* t is at end of thread queue */
2578     if (t->link==END_TSO_QUEUE) {
2579       ASSERT(t==run_queue_tl);
2580       run_queue_tl = prev;
2581     } else {
2582       ASSERT(run_queue_tl!=t);
2583     }
2584     t->link = END_TSO_QUEUE;
2585   } else {
2586     /* take tso from the beginning of the queue; std concurrent code */
2587     t = run_queue_hd;
2588     if (t != END_TSO_QUEUE) {
2589       run_queue_hd = t->link;
2590       t->link = END_TSO_QUEUE;
2591       if (run_queue_hd == END_TSO_QUEUE) {
2592         run_queue_tl = END_TSO_QUEUE;
2593       }
2594     }
2595   }
2596   return t;
2597 }
2598
2599 #endif /* 0 */
2600
2601 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2602 //@subsection Garbage Collextion Routines
2603
2604 /* ---------------------------------------------------------------------------
2605    Where are the roots that we know about?
2606
2607         - all the threads on the runnable queue
2608         - all the threads on the blocked queue
2609         - all the threads on the sleeping queue
2610         - all the thread currently executing a _ccall_GC
2611         - all the "main threads"
2612      
2613    ------------------------------------------------------------------------ */
2614
2615 /* This has to be protected either by the scheduler monitor, or by the
2616         garbage collection monitor (probably the latter).
2617         KH @ 25/10/99
2618 */
2619
2620 void
2621 GetRoots(evac_fn evac)
2622 {
2623 #if defined(GRAN)
2624   {
2625     nat i;
2626     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2627       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2628           evac((StgClosure **)&run_queue_hds[i]);
2629       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2630           evac((StgClosure **)&run_queue_tls[i]);
2631       
2632       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2633           evac((StgClosure **)&blocked_queue_hds[i]);
2634       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2635           evac((StgClosure **)&blocked_queue_tls[i]);
2636       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2637           evac((StgClosure **)&ccalling_threads[i]);
2638     }
2639   }
2640
2641   markEventQueue();
2642
2643 #else /* !GRAN */
2644   if (run_queue_hd != END_TSO_QUEUE) {
2645       ASSERT(run_queue_tl != END_TSO_QUEUE);
2646       evac((StgClosure **)&run_queue_hd);
2647       evac((StgClosure **)&run_queue_tl);
2648   }
2649   
2650   if (blocked_queue_hd != END_TSO_QUEUE) {
2651       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2652       evac((StgClosure **)&blocked_queue_hd);
2653       evac((StgClosure **)&blocked_queue_tl);
2654   }
2655   
2656   if (sleeping_queue != END_TSO_QUEUE) {
2657       evac((StgClosure **)&sleeping_queue);
2658   }
2659 #endif 
2660
2661   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2662       evac((StgClosure **)&suspended_ccalling_threads);
2663   }
2664
2665 #if defined(PAR) || defined(GRAN)
2666   markSparkQueue(evac);
2667 #endif
2668
2669 #if defined(RTS_USER_SIGNALS)
2670   // mark the signal handlers (signals should be already blocked)
2671   markSignalHandlers(evac);
2672 #endif
2673
2674   // main threads which have completed need to be retained until they
2675   // are dealt with in the main scheduler loop.  They won't be
2676   // retained any other way: the GC will drop them from the
2677   // all_threads list, so we have to be careful to treat them as roots
2678   // here.
2679   { 
2680       StgMainThread *m;
2681       for (m = main_threads; m != NULL; m = m->link) {
2682           switch (m->tso->what_next) {
2683           case ThreadComplete:
2684           case ThreadKilled:
2685               evac((StgClosure **)&m->tso);
2686               break;
2687           default:
2688               break;
2689           }
2690       }
2691   }
2692 }
2693
2694 /* -----------------------------------------------------------------------------
2695    performGC
2696
2697    This is the interface to the garbage collector from Haskell land.
2698    We provide this so that external C code can allocate and garbage
2699    collect when called from Haskell via _ccall_GC.
2700
2701    It might be useful to provide an interface whereby the programmer
2702    can specify more roots (ToDo).
2703    
2704    This needs to be protected by the GC condition variable above.  KH.
2705    -------------------------------------------------------------------------- */
2706
2707 static void (*extra_roots)(evac_fn);
2708
2709 void
2710 performGC(void)
2711 {
2712   /* Obligated to hold this lock upon entry */
2713   ACQUIRE_LOCK(&sched_mutex);
2714   GarbageCollect(GetRoots,rtsFalse);
2715   RELEASE_LOCK(&sched_mutex);
2716 }
2717
2718 void
2719 performMajorGC(void)
2720 {
2721   ACQUIRE_LOCK(&sched_mutex);
2722   GarbageCollect(GetRoots,rtsTrue);
2723   RELEASE_LOCK(&sched_mutex);
2724 }
2725
2726 static void
2727 AllRoots(evac_fn evac)
2728 {
2729     GetRoots(evac);             // the scheduler's roots
2730     extra_roots(evac);          // the user's roots
2731 }
2732
2733 void
2734 performGCWithRoots(void (*get_roots)(evac_fn))
2735 {
2736   ACQUIRE_LOCK(&sched_mutex);
2737   extra_roots = get_roots;
2738   GarbageCollect(AllRoots,rtsFalse);
2739   RELEASE_LOCK(&sched_mutex);
2740 }
2741
2742 /* -----------------------------------------------------------------------------
2743    Stack overflow
2744
2745    If the thread has reached its maximum stack size, then raise the
2746    StackOverflow exception in the offending thread.  Otherwise
2747    relocate the TSO into a larger chunk of memory and adjust its stack
2748    size appropriately.
2749    -------------------------------------------------------------------------- */
2750
2751 static StgTSO *
2752 threadStackOverflow(StgTSO *tso)
2753 {
2754   nat new_stack_size, new_tso_size, stack_words;
2755   StgPtr new_sp;
2756   StgTSO *dest;
2757
2758   IF_DEBUG(sanity,checkTSO(tso));
2759   if (tso->stack_size >= tso->max_stack_size) {
2760
2761     IF_DEBUG(gc,
2762              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2763                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2764              /* If we're debugging, just print out the top of the stack */
2765              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2766                                               tso->sp+64)));
2767
2768     /* Send this thread the StackOverflow exception */
2769     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2770     return tso;
2771   }
2772
2773   /* Try to double the current stack size.  If that takes us over the
2774    * maximum stack size for this thread, then use the maximum instead.
2775    * Finally round up so the TSO ends up as a whole number of blocks.
2776    */
2777   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2778   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2779                                        TSO_STRUCT_SIZE)/sizeof(W_);
2780   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2781   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2782
2783   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2784
2785   dest = (StgTSO *)allocate(new_tso_size);
2786   TICK_ALLOC_TSO(new_stack_size,0);
2787
2788   /* copy the TSO block and the old stack into the new area */
2789   memcpy(dest,tso,TSO_STRUCT_SIZE);
2790   stack_words = tso->stack + tso->stack_size - tso->sp;
2791   new_sp = (P_)dest + new_tso_size - stack_words;
2792   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2793
2794   /* relocate the stack pointers... */
2795   dest->sp         = new_sp;
2796   dest->stack_size = new_stack_size;
2797         
2798   /* Mark the old TSO as relocated.  We have to check for relocated
2799    * TSOs in the garbage collector and any primops that deal with TSOs.
2800    *
2801    * It's important to set the sp value to just beyond the end
2802    * of the stack, so we don't attempt to scavenge any part of the
2803    * dead TSO's stack.
2804    */
2805   tso->what_next = ThreadRelocated;
2806   tso->link = dest;
2807   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2808   tso->why_blocked = NotBlocked;
2809   dest->mut_link = NULL;
2810
2811   IF_PAR_DEBUG(verbose,
2812                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2813                      tso->id, tso, tso->stack_size);
2814                /* If we're debugging, just print out the top of the stack */
2815                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2816                                                 tso->sp+64)));
2817   
2818   IF_DEBUG(sanity,checkTSO(tso));
2819 #if 0
2820   IF_DEBUG(scheduler,printTSO(dest));
2821 #endif
2822
2823   return dest;
2824 }
2825
2826 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2827 //@subsection Blocking Queue Routines
2828
2829 /* ---------------------------------------------------------------------------
2830    Wake up a queue that was blocked on some resource.
2831    ------------------------------------------------------------------------ */
2832
2833 #if defined(GRAN)
2834 static inline void
2835 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2836 {
2837 }
2838 #elif defined(PAR)
2839 static inline void
2840 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2841 {
2842   /* write RESUME events to log file and
2843      update blocked and fetch time (depending on type of the orig closure) */
2844   if (RtsFlags.ParFlags.ParStats.Full) {
2845     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2846                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2847                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2848     if (EMPTY_RUN_QUEUE())
2849       emitSchedule = rtsTrue;
2850
2851     switch (get_itbl(node)->type) {
2852         case FETCH_ME_BQ:
2853           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2854           break;
2855         case RBH:
2856         case FETCH_ME:
2857         case BLACKHOLE_BQ:
2858           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2859           break;
2860 #ifdef DIST
2861         case MVAR:
2862           break;
2863 #endif    
2864         default:
2865           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2866         }
2867       }
2868 }
2869 #endif
2870
2871 #if defined(GRAN)
2872 static StgBlockingQueueElement *
2873 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2874 {
2875     StgTSO *tso;
2876     PEs node_loc, tso_loc;
2877
2878     node_loc = where_is(node); // should be lifted out of loop
2879     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2880     tso_loc = where_is((StgClosure *)tso);
2881     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2882       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2883       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2884       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2885       // insertThread(tso, node_loc);
2886       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2887                 ResumeThread,
2888                 tso, node, (rtsSpark*)NULL);
2889       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2890       // len_local++;
2891       // len++;
2892     } else { // TSO is remote (actually should be FMBQ)
2893       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2894                                   RtsFlags.GranFlags.Costs.gunblocktime +
2895                                   RtsFlags.GranFlags.Costs.latency;
2896       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2897                 UnblockThread,
2898                 tso, node, (rtsSpark*)NULL);
2899       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2900       // len++;
2901     }
2902     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2903     IF_GRAN_DEBUG(bq,
2904                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2905                           (node_loc==tso_loc ? "Local" : "Global"), 
2906                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2907     tso->block_info.closure = NULL;
2908     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2909                              tso->id, tso));
2910 }
2911 #elif defined(PAR)
2912 static StgBlockingQueueElement *
2913 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2914 {
2915     StgBlockingQueueElement *next;
2916
2917     switch (get_itbl(bqe)->type) {
2918     case TSO:
2919       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2920       /* if it's a TSO just push it onto the run_queue */
2921       next = bqe->link;
2922       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2923       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2924       THREAD_RUNNABLE();
2925       unblockCount(bqe, node);
2926       /* reset blocking status after dumping event */
2927       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2928       break;
2929
2930     case BLOCKED_FETCH:
2931       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2932       next = bqe->link;
2933       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2934       PendingFetches = (StgBlockedFetch *)bqe;
2935       break;
2936
2937 # if defined(DEBUG)
2938       /* can ignore this case in a non-debugging setup; 
2939          see comments on RBHSave closures above */
2940     case CONSTR:
2941       /* check that the closure is an RBHSave closure */
2942       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2943              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2944              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2945       break;
2946
2947     default:
2948       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2949            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2950            (StgClosure *)bqe);
2951 # endif
2952     }
2953   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2954   return next;
2955 }
2956
2957 #else /* !GRAN && !PAR */
2958 static StgTSO *
2959 unblockOneLocked(StgTSO *tso)
2960 {
2961   StgTSO *next;
2962
2963   ASSERT(get_itbl(tso)->type == TSO);
2964   ASSERT(tso->why_blocked != NotBlocked);
2965   tso->why_blocked = NotBlocked;
2966   next = tso->link;
2967   PUSH_ON_RUN_QUEUE(tso);
2968   THREAD_RUNNABLE();
2969   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2970   return next;
2971 }
2972 #endif
2973
2974 #if defined(GRAN) || defined(PAR)
2975 inline StgBlockingQueueElement *
2976 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2977 {
2978   ACQUIRE_LOCK(&sched_mutex);
2979   bqe = unblockOneLocked(bqe, node);
2980   RELEASE_LOCK(&sched_mutex);
2981   return bqe;
2982 }
2983 #else
2984 inline StgTSO *
2985 unblockOne(StgTSO *tso)
2986 {
2987   ACQUIRE_LOCK(&sched_mutex);
2988   tso = unblockOneLocked(tso);
2989   RELEASE_LOCK(&sched_mutex);
2990   return tso;
2991 }
2992 #endif
2993
2994 #if defined(GRAN)
2995 void 
2996 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2997 {
2998   StgBlockingQueueElement *bqe;
2999   PEs node_loc;
3000   nat len = 0; 
3001
3002   IF_GRAN_DEBUG(bq, 
3003                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3004                       node, CurrentProc, CurrentTime[CurrentProc], 
3005                       CurrentTSO->id, CurrentTSO));
3006
3007   node_loc = where_is(node);
3008
3009   ASSERT(q == END_BQ_QUEUE ||
3010          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3011          get_itbl(q)->type == CONSTR); // closure (type constructor)
3012   ASSERT(is_unique(node));
3013
3014   /* FAKE FETCH: magically copy the node to the tso's proc;
3015      no Fetch necessary because in reality the node should not have been 
3016      moved to the other PE in the first place
3017   */
3018   if (CurrentProc!=node_loc) {
3019     IF_GRAN_DEBUG(bq, 
3020                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3021                         node, node_loc, CurrentProc, CurrentTSO->id, 
3022                         // CurrentTSO, where_is(CurrentTSO),
3023                         node->header.gran.procs));
3024     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3025     IF_GRAN_DEBUG(bq, 
3026                   belch("## new bitmask of node %p is %#x",
3027                         node, node->header.gran.procs));
3028     if (RtsFlags.GranFlags.GranSimStats.Global) {
3029       globalGranStats.tot_fake_fetches++;
3030     }
3031   }
3032
3033   bqe = q;
3034   // ToDo: check: ASSERT(CurrentProc==node_loc);
3035   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3036     //next = bqe->link;
3037     /* 
3038        bqe points to the current element in the queue
3039        next points to the next element in the queue
3040     */
3041     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3042     //tso_loc = where_is(tso);
3043     len++;
3044     bqe = unblockOneLocked(bqe, node);
3045   }
3046
3047   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3048      the closure to make room for the anchor of the BQ */
3049   if (bqe!=END_BQ_QUEUE) {
3050     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3051     /*
3052     ASSERT((info_ptr==&RBH_Save_0_info) ||
3053            (info_ptr==&RBH_Save_1_info) ||
3054            (info_ptr==&RBH_Save_2_info));
3055     */
3056     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3057     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3058     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3059
3060     IF_GRAN_DEBUG(bq,
3061                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3062                         node, info_type(node)));
3063   }
3064
3065   /* statistics gathering */
3066   if (RtsFlags.GranFlags.GranSimStats.Global) {
3067     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3068     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3069     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3070     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3071   }
3072   IF_GRAN_DEBUG(bq,
3073                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3074                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3075 }
3076 #elif defined(PAR)
3077 void 
3078 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3079 {
3080   StgBlockingQueueElement *bqe;
3081
3082   ACQUIRE_LOCK(&sched_mutex);
3083
3084   IF_PAR_DEBUG(verbose, 
3085                belch("##-_ AwBQ for node %p on [%x]: ",
3086                      node, mytid));
3087 #ifdef DIST  
3088   //RFP
3089   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3090     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3091     return;
3092   }
3093 #endif
3094   
3095   ASSERT(q == END_BQ_QUEUE ||
3096          get_itbl(q)->type == TSO ||           
3097          get_itbl(q)->type == BLOCKED_FETCH || 
3098          get_itbl(q)->type == CONSTR); 
3099
3100   bqe = q;
3101   while (get_itbl(bqe)->type==TSO || 
3102          get_itbl(bqe)->type==BLOCKED_FETCH) {
3103     bqe = unblockOneLocked(bqe, node);
3104   }
3105   RELEASE_LOCK(&sched_mutex);
3106 }
3107
3108 #else   /* !GRAN && !PAR */
3109
3110 #ifdef RTS_SUPPORTS_THREADS
3111 void
3112 awakenBlockedQueueNoLock(StgTSO *tso)
3113 {
3114   while (tso != END_TSO_QUEUE) {
3115     tso = unblockOneLocked(tso);
3116   }
3117 }
3118 #endif
3119
3120 void
3121 awakenBlockedQueue(StgTSO *tso)
3122 {
3123   ACQUIRE_LOCK(&sched_mutex);
3124   while (tso != END_TSO_QUEUE) {
3125     tso = unblockOneLocked(tso);
3126   }
3127   RELEASE_LOCK(&sched_mutex);
3128 }
3129 #endif
3130
3131 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3132 //@subsection Exception Handling Routines
3133
3134 /* ---------------------------------------------------------------------------
3135    Interrupt execution
3136    - usually called inside a signal handler so it mustn't do anything fancy.   
3137    ------------------------------------------------------------------------ */
3138
3139 void
3140 interruptStgRts(void)
3141 {
3142     interrupted    = 1;
3143     context_switch = 1;
3144 #ifdef RTS_SUPPORTS_THREADS
3145     wakeBlockedWorkerThread();
3146 #endif
3147 }
3148
3149 /* -----------------------------------------------------------------------------
3150    Unblock a thread
3151
3152    This is for use when we raise an exception in another thread, which
3153    may be blocked.
3154    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3155    -------------------------------------------------------------------------- */
3156
3157 #if defined(GRAN) || defined(PAR)
3158 /*
3159   NB: only the type of the blocking queue is different in GranSim and GUM
3160       the operations on the queue-elements are the same
3161       long live polymorphism!
3162
3163   Locks: sched_mutex is held upon entry and exit.
3164
3165 */
3166 static void
3167 unblockThread(StgTSO *tso)
3168 {
3169   StgBlockingQueueElement *t, **last;
3170
3171   switch (tso->why_blocked) {
3172
3173   case NotBlocked:
3174     return;  /* not blocked */
3175
3176   case BlockedOnMVar:
3177     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3178     {
3179       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3180       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3181
3182       last = (StgBlockingQueueElement **)&mvar->head;
3183       for (t = (StgBlockingQueueElement *)mvar->head; 
3184            t != END_BQ_QUEUE; 
3185            last = &t->link, last_tso = t, t = t->link) {
3186         if (t == (StgBlockingQueueElement *)tso) {
3187           *last = (StgBlockingQueueElement *)tso->link;
3188           if (mvar->tail == tso) {
3189             mvar->tail = (StgTSO *)last_tso;
3190           }
3191           goto done;
3192         }
3193       }
3194       barf("unblockThread (MVAR): TSO not found");
3195     }
3196
3197   case BlockedOnBlackHole:
3198     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3199     {
3200       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3201
3202       last = &bq->blocking_queue;
3203       for (t = bq->blocking_queue; 
3204            t != END_BQ_QUEUE; 
3205            last = &t->link, t = t->link) {
3206         if (t == (StgBlockingQueueElement *)tso) {
3207           *last = (StgBlockingQueueElement *)tso->link;
3208           goto done;
3209         }
3210       }
3211       barf("unblockThread (BLACKHOLE): TSO not found");
3212     }
3213
3214   case BlockedOnException:
3215     {
3216       StgTSO *target  = tso->block_info.tso;
3217
3218       ASSERT(get_itbl(target)->type == TSO);
3219
3220       if (target->what_next == ThreadRelocated) {
3221           target = target->link;
3222           ASSERT(get_itbl(target)->type == TSO);
3223       }
3224
3225       ASSERT(target->blocked_exceptions != NULL);
3226
3227       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3228       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3229            t != END_BQ_QUEUE; 
3230            last = &t->link, t = t->link) {
3231         ASSERT(get_itbl(t)->type == TSO);
3232         if (t == (StgBlockingQueueElement *)tso) {
3233           *last = (StgBlockingQueueElement *)tso->link;
3234           goto done;
3235         }
3236       }
3237       barf("unblockThread (Exception): TSO not found");
3238     }
3239
3240   case BlockedOnRead:
3241   case BlockedOnWrite:
3242 #if defined(mingw32_TARGET_OS)
3243   case BlockedOnDoProc:
3244 #endif
3245     {
3246       /* take TSO off blocked_queue */
3247       StgBlockingQueueElement *prev = NULL;
3248       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3249            prev = t, t = t->link) {
3250         if (t == (StgBlockingQueueElement *)tso) {
3251           if (prev == NULL) {
3252             blocked_queue_hd = (StgTSO *)t->link;
3253             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3254               blocked_queue_tl = END_TSO_QUEUE;
3255             }
3256           } else {
3257             prev->link = t->link;
3258             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3259               blocked_queue_tl = (StgTSO *)prev;
3260             }
3261           }
3262           goto done;
3263         }
3264       }
3265       barf("unblockThread (I/O): TSO not found");
3266     }
3267
3268   case BlockedOnDelay:
3269     {
3270       /* take TSO off sleeping_queue */
3271       StgBlockingQueueElement *prev = NULL;
3272       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3273            prev = t, t = t->link) {
3274         if (t == (StgBlockingQueueElement *)tso) {
3275           if (prev == NULL) {
3276             sleeping_queue = (StgTSO *)t->link;
3277           } else {
3278             prev->link = t->link;
3279           }
3280           goto done;
3281         }
3282       }
3283       barf("unblockThread (delay): TSO not found");
3284     }
3285
3286   default:
3287     barf("unblockThread");
3288   }
3289
3290  done:
3291   tso->link = END_TSO_QUEUE;
3292   tso->why_blocked = NotBlocked;
3293   tso->block_info.closure = NULL;
3294   PUSH_ON_RUN_QUEUE(tso);
3295 }
3296 #else
3297 static void
3298 unblockThread(StgTSO *tso)
3299 {
3300   StgTSO *t, **last;
3301   
3302   /* To avoid locking unnecessarily. */
3303   if (tso->why_blocked == NotBlocked) {
3304     return;
3305   }
3306
3307   switch (tso->why_blocked) {
3308
3309   case BlockedOnMVar:
3310     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3311     {
3312       StgTSO *last_tso = END_TSO_QUEUE;
3313       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3314
3315       last = &mvar->head;
3316       for (t = mvar->head; t != END_TSO_QUEUE; 
3317            last = &t->link, last_tso = t, t = t->link) {
3318         if (t == tso) {
3319           *last = tso->link;
3320           if (mvar->tail == tso) {
3321             mvar->tail = last_tso;
3322           }
3323           goto done;
3324         }
3325       }
3326       barf("unblockThread (MVAR): TSO not found");
3327     }
3328
3329   case BlockedOnBlackHole:
3330     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3331     {
3332       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3333
3334       last = &bq->blocking_queue;
3335       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3336            last = &t->link, t = t->link) {
3337         if (t == tso) {
3338           *last = tso->link;
3339           goto done;
3340         }
3341       }
3342       barf("unblockThread (BLACKHOLE): TSO not found");
3343     }
3344
3345   case BlockedOnException:
3346     {
3347       StgTSO *target  = tso->block_info.tso;
3348
3349       ASSERT(get_itbl(target)->type == TSO);
3350
3351       while (target->what_next == ThreadRelocated) {
3352           target = target->link;
3353           ASSERT(get_itbl(target)->type == TSO);
3354       }
3355       
3356       ASSERT(target->blocked_exceptions != NULL);
3357
3358       last = &target->blocked_exceptions;
3359       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3360            last = &t->link, t = t->link) {
3361         ASSERT(get_itbl(t)->type == TSO);
3362         if (t == tso) {
3363           *last = tso->link;
3364           goto done;
3365         }
3366       }
3367       barf("unblockThread (Exception): TSO not found");
3368     }
3369
3370   case BlockedOnRead:
3371   case BlockedOnWrite:
3372 #if defined(mingw32_TARGET_OS)
3373   case BlockedOnDoProc:
3374 #endif
3375     {
3376       StgTSO *prev = NULL;
3377       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3378            prev = t, t = t->link) {
3379         if (t == tso) {
3380           if (prev == NULL) {
3381             blocked_queue_hd = t->link;
3382             if (blocked_queue_tl == t) {
3383               blocked_queue_tl = END_TSO_QUEUE;
3384             }
3385           } else {
3386             prev->link = t->link;
3387             if (blocked_queue_tl == t) {
3388               blocked_queue_tl = prev;
3389             }
3390           }
3391           goto done;
3392         }
3393       }
3394       barf("unblockThread (I/O): TSO not found");
3395     }
3396
3397   case BlockedOnDelay:
3398     {
3399       StgTSO *prev = NULL;
3400       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3401            prev = t, t = t->link) {
3402         if (t == tso) {
3403           if (prev == NULL) {
3404             sleeping_queue = t->link;
3405           } else {
3406             prev->link = t->link;
3407           }
3408           goto done;
3409         }
3410       }
3411       barf("unblockThread (delay): TSO not found");
3412     }
3413
3414   default:
3415     barf("unblockThread");
3416   }
3417
3418  done:
3419   tso->link = END_TSO_QUEUE;
3420   tso->why_blocked = NotBlocked;
3421   tso->block_info.closure = NULL;
3422   PUSH_ON_RUN_QUEUE(tso);
3423 }
3424 #endif
3425
3426 /* -----------------------------------------------------------------------------
3427  * raiseAsync()
3428  *
3429  * The following function implements the magic for raising an
3430  * asynchronous exception in an existing thread.
3431  *
3432  * We first remove the thread from any queue on which it might be
3433  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3434  *
3435  * We strip the stack down to the innermost CATCH_FRAME, building
3436  * thunks in the heap for all the active computations, so they can 
3437  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3438  * an application of the handler to the exception, and push it on
3439  * the top of the stack.
3440  * 
3441  * How exactly do we save all the active computations?  We create an
3442  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3443  * AP_STACKs pushes everything from the corresponding update frame
3444  * upwards onto the stack.  (Actually, it pushes everything up to the
3445  * next update frame plus a pointer to the next AP_STACK object.
3446  * Entering the next AP_STACK object pushes more onto the stack until we
3447  * reach the last AP_STACK object - at which point the stack should look
3448  * exactly as it did when we killed the TSO and we can continue
3449  * execution by entering the closure on top of the stack.
3450  *
3451  * We can also kill a thread entirely - this happens if either (a) the 
3452  * exception passed to raiseAsync is NULL, or (b) there's no
3453  * CATCH_FRAME on the stack.  In either case, we strip the entire
3454  * stack and replace the thread with a zombie.
3455  *
3456  * Locks: sched_mutex held upon entry nor exit.
3457  *
3458  * -------------------------------------------------------------------------- */
3459  
3460 void 
3461 deleteThread(StgTSO *tso)
3462 {
3463   raiseAsync(tso,NULL);
3464 }
3465
3466 static void 
3467 deleteThreadImmediately(StgTSO *tso)
3468 { // for forkProcess only:
3469   // delete thread without giving it a chance to catch the KillThread exception
3470
3471   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3472       return;
3473   }
3474 #if defined(RTS_SUPPORTS_THREADS)
3475   if (tso->why_blocked != BlockedOnCCall
3476       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3477 #endif
3478     unblockThread(tso);
3479   tso->what_next = ThreadKilled;
3480 }
3481
3482 void
3483 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3484 {
3485   /* When raising async exs from contexts where sched_mutex isn't held;
3486      use raiseAsyncWithLock(). */
3487   ACQUIRE_LOCK(&sched_mutex);
3488   raiseAsync(tso,exception);
3489   RELEASE_LOCK(&sched_mutex);
3490 }
3491
3492 void
3493 raiseAsync(StgTSO *tso, StgClosure *exception)
3494 {
3495     StgRetInfoTable *info;
3496     StgPtr sp;
3497   
3498     // Thread already dead?
3499     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3500         return;
3501     }
3502
3503     IF_DEBUG(scheduler, 
3504              sched_belch("raising exception in thread %ld.", tso->id));
3505     
3506     // Remove it from any blocking queues
3507     unblockThread(tso);
3508
3509     sp = tso->sp;
3510     
3511     // The stack freezing code assumes there's a closure pointer on
3512     // the top of the stack, so we have to arrange that this is the case...
3513     //
3514     if (sp[0] == (W_)&stg_enter_info) {
3515         sp++;
3516     } else {
3517         sp--;
3518         sp[0] = (W_)&stg_dummy_ret_closure;
3519     }
3520
3521     while (1) {
3522         nat i;
3523
3524         // 1. Let the top of the stack be the "current closure"
3525         //
3526         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3527         // CATCH_FRAME.
3528         //
3529         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3530         // current closure applied to the chunk of stack up to (but not
3531         // including) the update frame.  This closure becomes the "current
3532         // closure".  Go back to step 2.
3533         //
3534         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3535         // top of the stack applied to the exception.
3536         // 
3537         // 5. If it's a STOP_FRAME, then kill the thread.
3538         
3539         StgPtr frame;
3540         
3541         frame = sp + 1;
3542         info = get_ret_itbl((StgClosure *)frame);
3543         
3544         while (info->i.type != UPDATE_FRAME
3545                && (info->i.type != CATCH_FRAME || exception == NULL)
3546                && info->i.type != STOP_FRAME) {
3547             frame += stack_frame_sizeW((StgClosure *)frame);
3548             info = get_ret_itbl((StgClosure *)frame);
3549         }
3550         
3551         switch (info->i.type) {
3552             
3553         case CATCH_FRAME:
3554             // If we find a CATCH_FRAME, and we've got an exception to raise,
3555             // then build the THUNK raise(exception), and leave it on
3556             // top of the CATCH_FRAME ready to enter.
3557             //
3558         {
3559 #ifdef PROFILING
3560             StgCatchFrame *cf = (StgCatchFrame *)frame;
3561 #endif
3562             StgClosure *raise;
3563             
3564             // we've got an exception to raise, so let's pass it to the
3565             // handler in this frame.
3566             //
3567             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3568             TICK_ALLOC_SE_THK(1,0);
3569             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3570             raise->payload[0] = exception;
3571             
3572             // throw away the stack from Sp up to the CATCH_FRAME.
3573             //
3574             sp = frame - 1;
3575             
3576             /* Ensure that async excpetions are blocked now, so we don't get
3577              * a surprise exception before we get around to executing the
3578              * handler.
3579              */
3580             if (tso->blocked_exceptions == NULL) {
3581                 tso->blocked_exceptions = END_TSO_QUEUE;
3582             }
3583             
3584             /* Put the newly-built THUNK on top of the stack, ready to execute
3585              * when the thread restarts.
3586              */
3587             sp[0] = (W_)raise;
3588             sp[-1] = (W_)&stg_enter_info;
3589             tso->sp = sp-1;
3590             tso->what_next = ThreadRunGHC;
3591             IF_DEBUG(sanity, checkTSO(tso));
3592             return;
3593         }
3594         
3595         case UPDATE_FRAME:
3596         {
3597             StgAP_STACK * ap;
3598             nat words;
3599             
3600             // First build an AP_STACK consisting of the stack chunk above the
3601             // current update frame, with the top word on the stack as the
3602             // fun field.
3603             //
3604             words = frame - sp - 1;
3605             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3606             
3607             ap->size = words;
3608             ap->fun  = (StgClosure *)sp[0];
3609             sp++;
3610             for(i=0; i < (nat)words; ++i) {
3611                 ap->payload[i] = (StgClosure *)*sp++;
3612             }
3613             
3614             SET_HDR(ap,&stg_AP_STACK_info,
3615                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3616             TICK_ALLOC_UP_THK(words+1,0);
3617             
3618             IF_DEBUG(scheduler,
3619                      fprintf(stderr,  "scheduler: Updating ");
3620                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3621                      fprintf(stderr,  " with ");
3622                      printObj((StgClosure *)ap);
3623                 );
3624
3625             // Replace the updatee with an indirection - happily
3626             // this will also wake up any threads currently
3627             // waiting on the result.
3628             //
3629             // Warning: if we're in a loop, more than one update frame on
3630             // the stack may point to the same object.  Be careful not to
3631             // overwrite an IND_OLDGEN in this case, because we'll screw
3632             // up the mutable lists.  To be on the safe side, don't
3633             // overwrite any kind of indirection at all.  See also
3634             // threadSqueezeStack in GC.c, where we have to make a similar
3635             // check.
3636             //
3637             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3638                 // revert the black hole
3639                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3640             }
3641             sp += sizeofW(StgUpdateFrame) - 1;
3642             sp[0] = (W_)ap; // push onto stack
3643             break;
3644         }
3645         
3646         case STOP_FRAME:
3647             // We've stripped the entire stack, the thread is now dead.
3648             sp += sizeofW(StgStopFrame);
3649             tso->what_next = ThreadKilled;
3650             tso->sp = sp;
3651             return;
3652             
3653         default:
3654             barf("raiseAsync");
3655         }
3656     }
3657     barf("raiseAsync");
3658 }
3659
3660 /* -----------------------------------------------------------------------------
3661    resurrectThreads is called after garbage collection on the list of
3662    threads found to be garbage.  Each of these threads will be woken
3663    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3664    on an MVar, or NonTermination if the thread was blocked on a Black
3665    Hole.
3666
3667    Locks: sched_mutex isn't held upon entry nor exit.
3668    -------------------------------------------------------------------------- */
3669
3670 void
3671 resurrectThreads( StgTSO *threads )
3672 {
3673   StgTSO *tso, *next;
3674
3675   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3676     next = tso->global_link;
3677     tso->global_link = all_threads;
3678     all_threads = tso;
3679     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3680
3681     switch (tso->why_blocked) {
3682     case BlockedOnMVar:
3683     case BlockedOnException:
3684       /* Called by GC - sched_mutex lock is currently held. */
3685       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3686       break;
3687     case BlockedOnBlackHole:
3688       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3689       break;
3690     case NotBlocked:
3691       /* This might happen if the thread was blocked on a black hole
3692        * belonging to a thread that we've just woken up (raiseAsync
3693        * can wake up threads, remember...).
3694        */
3695       continue;
3696     default:
3697       barf("resurrectThreads: thread blocked in a strange way");
3698     }
3699   }
3700 }
3701
3702 /* -----------------------------------------------------------------------------
3703  * Blackhole detection: if we reach a deadlock, test whether any
3704  * threads are blocked on themselves.  Any threads which are found to
3705  * be self-blocked get sent a NonTermination exception.
3706  *
3707  * This is only done in a deadlock situation in order to avoid
3708  * performance overhead in the normal case.
3709  *
3710  * Locks: sched_mutex is held upon entry and exit.
3711  * -------------------------------------------------------------------------- */
3712
3713 static void
3714 detectBlackHoles( void )
3715 {
3716     StgTSO *tso = all_threads;
3717     StgClosure *frame;
3718     StgClosure *blocked_on;
3719     StgRetInfoTable *info;
3720
3721     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3722
3723         while (tso->what_next == ThreadRelocated) {
3724             tso = tso->link;
3725             ASSERT(get_itbl(tso)->type == TSO);
3726         }
3727       
3728         if (tso->why_blocked != BlockedOnBlackHole) {
3729             continue;
3730         }
3731         blocked_on = tso->block_info.closure;
3732
3733         frame = (StgClosure *)tso->sp;
3734
3735         while(1) {
3736             info = get_ret_itbl(frame);
3737             switch (info->i.type) {
3738             case UPDATE_FRAME:
3739                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3740                     /* We are blocking on one of our own computations, so
3741                      * send this thread the NonTermination exception.  
3742                      */
3743                     IF_DEBUG(scheduler, 
3744                              sched_belch("thread %d is blocked on itself", tso->id));
3745                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3746                     goto done;
3747                 }
3748                 
3749                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3750                 continue;
3751
3752             case STOP_FRAME:
3753                 goto done;
3754
3755                 // normal stack frames; do nothing except advance the pointer
3756             default:
3757                 (StgPtr)frame += stack_frame_sizeW(frame);
3758             }
3759         }   
3760         done: ;
3761     }
3762 }
3763
3764 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3765 //@subsection Debugging Routines
3766
3767 /* -----------------------------------------------------------------------------
3768  * Debugging: why is a thread blocked
3769  * [Also provides useful information when debugging threaded programs
3770  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3771    -------------------------------------------------------------------------- */
3772
3773 static
3774 void
3775 printThreadBlockage(StgTSO *tso)
3776 {
3777   switch (tso->why_blocked) {
3778   case BlockedOnRead:
3779     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3780     break;
3781   case BlockedOnWrite:
3782     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3783     break;
3784 #if defined(mingw32_TARGET_OS)
3785     case BlockedOnDoProc:
3786     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3787     break;
3788 #endif
3789   case BlockedOnDelay:
3790     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3791     break;
3792   case BlockedOnMVar:
3793     fprintf(stderr,"is blocked on an MVar");
3794     break;
3795   case BlockedOnException:
3796     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3797             tso->block_info.tso->id);
3798     break;
3799   case BlockedOnBlackHole:
3800     fprintf(stderr,"is blocked on a black hole");
3801     break;
3802   case NotBlocked:
3803     fprintf(stderr,"is not blocked");
3804     break;
3805 #if defined(PAR)
3806   case BlockedOnGA:
3807     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3808             tso->block_info.closure, info_type(tso->block_info.closure));
3809     break;
3810   case BlockedOnGA_NoSend:
3811     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3812             tso->block_info.closure, info_type(tso->block_info.closure));
3813     break;
3814 #endif
3815 #if defined(RTS_SUPPORTS_THREADS)
3816   case BlockedOnCCall:
3817     fprintf(stderr,"is blocked on an external call");
3818     break;
3819   case BlockedOnCCall_NoUnblockExc:
3820     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3821     break;
3822 #endif
3823   default:
3824     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3825          tso->why_blocked, tso->id, tso);
3826   }
3827 }
3828
3829 static
3830 void
3831 printThreadStatus(StgTSO *tso)
3832 {
3833   switch (tso->what_next) {
3834   case ThreadKilled:
3835     fprintf(stderr,"has been killed");
3836     break;
3837   case ThreadComplete:
3838     fprintf(stderr,"has completed");
3839     break;
3840   default:
3841     printThreadBlockage(tso);
3842   }
3843 }
3844
3845 void
3846 printAllThreads(void)
3847 {
3848   StgTSO *t;
3849   void *label;
3850
3851 # if defined(GRAN)
3852   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3853   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3854                        time_string, rtsFalse/*no commas!*/);
3855
3856   fprintf(stderr, "all threads at [%s]:\n", time_string);
3857 # elif defined(PAR)
3858   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3859   ullong_format_string(CURRENT_TIME,
3860                        time_string, rtsFalse/*no commas!*/);
3861
3862   fprintf(stderr,"all threads at [%s]:\n", time_string);
3863 # else
3864   fprintf(stderr,"all threads:\n");
3865 # endif
3866
3867   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3868     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3869     label = lookupThreadLabel((StgWord)t);
3870     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3871     printThreadStatus(t);
3872     fprintf(stderr,"\n");
3873   }
3874 }
3875     
3876 #ifdef DEBUG
3877
3878 /* 
3879    Print a whole blocking queue attached to node (debugging only).
3880 */
3881 //@cindex print_bq
3882 # if defined(PAR)
3883 void 
3884 print_bq (StgClosure *node)
3885 {
3886   StgBlockingQueueElement *bqe;
3887   StgTSO *tso;
3888   rtsBool end;
3889
3890   fprintf(stderr,"## BQ of closure %p (%s): ",
3891           node, info_type(node));
3892
3893   /* should cover all closures that may have a blocking queue */
3894   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3895          get_itbl(node)->type == FETCH_ME_BQ ||
3896          get_itbl(node)->type == RBH ||
3897          get_itbl(node)->type == MVAR);
3898     
3899   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3900
3901   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3902 }
3903
3904 /* 
3905    Print a whole blocking queue starting with the element bqe.
3906 */
3907 void 
3908 print_bqe (StgBlockingQueueElement *bqe)
3909 {
3910   rtsBool end;
3911
3912   /* 
3913      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3914   */
3915   for (end = (bqe==END_BQ_QUEUE);
3916        !end; // iterate until bqe points to a CONSTR
3917        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3918        bqe = end ? END_BQ_QUEUE : bqe->link) {
3919     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3920     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3921     /* types of closures that may appear in a blocking queue */
3922     ASSERT(get_itbl(bqe)->type == TSO ||           
3923            get_itbl(bqe)->type == BLOCKED_FETCH || 
3924            get_itbl(bqe)->type == CONSTR); 
3925     /* only BQs of an RBH end with an RBH_Save closure */
3926     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3927
3928     switch (get_itbl(bqe)->type) {
3929     case TSO:
3930       fprintf(stderr," TSO %u (%x),",
3931               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3932       break;
3933     case BLOCKED_FETCH:
3934       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3935               ((StgBlockedFetch *)bqe)->node, 
3936               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3937               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3938               ((StgBlockedFetch *)bqe)->ga.weight);
3939       break;
3940     case CONSTR:
3941       fprintf(stderr," %s (IP %p),",
3942               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3943                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3944                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3945                "RBH_Save_?"), get_itbl(bqe));
3946       break;
3947     default:
3948       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3949            info_type((StgClosure *)bqe)); // , node, info_type(node));
3950       break;
3951     }
3952   } /* for */
3953   fputc('\n', stderr);
3954 }
3955 # elif defined(GRAN)
3956 void 
3957 print_bq (StgClosure *node)
3958 {
3959   StgBlockingQueueElement *bqe;
3960   PEs node_loc, tso_loc;
3961   rtsBool end;
3962
3963   /* should cover all closures that may have a blocking queue */
3964   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3965          get_itbl(node)->type == FETCH_ME_BQ ||
3966          get_itbl(node)->type == RBH);
3967     
3968   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3969   node_loc = where_is(node);
3970
3971   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3972           node, info_type(node), node_loc);
3973
3974   /* 
3975      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3976   */
3977   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3978        !end; // iterate until bqe points to a CONSTR
3979        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3980     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3981     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3982     /* types of closures that may appear in a blocking queue */
3983     ASSERT(get_itbl(bqe)->type == TSO ||           
3984            get_itbl(bqe)->type == CONSTR); 
3985     /* only BQs of an RBH end with an RBH_Save closure */
3986     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3987
3988     tso_loc = where_is((StgClosure *)bqe);
3989     switch (get_itbl(bqe)->type) {
3990     case TSO:
3991       fprintf(stderr," TSO %d (%p) on [PE %d],",
3992               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3993       break;
3994     case CONSTR:
3995       fprintf(stderr," %s (IP %p),",
3996               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3997                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3998                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3999                "RBH_Save_?"), get_itbl(bqe));
4000       break;
4001     default:
4002       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4003            info_type((StgClosure *)bqe), node, info_type(node));
4004       break;
4005     }
4006   } /* for */
4007   fputc('\n', stderr);
4008 }
4009 #else
4010 /* 
4011    Nice and easy: only TSOs on the blocking queue
4012 */
4013 void 
4014 print_bq (StgClosure *node)
4015 {
4016   StgTSO *tso;
4017
4018   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4019   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4020        tso != END_TSO_QUEUE; 
4021        tso=tso->link) {
4022     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4023     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4024     fprintf(stderr," TSO %d (%p),", tso->id, tso);
4025   }
4026   fputc('\n', stderr);
4027 }
4028 # endif
4029
4030 #if defined(PAR)
4031 static nat
4032 run_queue_len(void)
4033 {
4034   nat i;
4035   StgTSO *tso;
4036
4037   for (i=0, tso=run_queue_hd; 
4038        tso != END_TSO_QUEUE;
4039        i++, tso=tso->link)
4040     /* nothing */
4041
4042   return i;
4043 }
4044 #endif
4045
4046 static void
4047 sched_belch(char *s, ...)
4048 {
4049   va_list ap;
4050   va_start(ap,s);
4051 #ifdef SMP
4052   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4053 #elif defined(PAR)
4054   fprintf(stderr, "== ");
4055 #else
4056   fprintf(stderr, "scheduler: ");
4057 #endif
4058   vfprintf(stderr, s, ap);
4059   fprintf(stderr, "\n");
4060   va_end(ap);
4061 }
4062
4063 #endif /* DEBUG */
4064
4065
4066 //@node Index,  , Debugging Routines, Main scheduling code
4067 //@subsection Index
4068
4069 //@index
4070 //* StgMainThread::  @cindex\s-+StgMainThread
4071 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
4072 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
4073 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
4074 //* context_switch::  @cindex\s-+context_switch
4075 //* createThread::  @cindex\s-+createThread
4076 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
4077 //* initScheduler::  @cindex\s-+initScheduler
4078 //* interrupted::  @cindex\s-+interrupted
4079 //* next_thread_id::  @cindex\s-+next_thread_id
4080 //* print_bq::  @cindex\s-+print_bq
4081 //* run_queue_hd::  @cindex\s-+run_queue_hd
4082 //* run_queue_tl::  @cindex\s-+run_queue_tl
4083 //* sched_mutex::  @cindex\s-+sched_mutex
4084 //* schedule::  @cindex\s-+schedule
4085 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
4086 //* term_mutex::  @cindex\s-+term_mutex
4087 //@end index