[project @ 2003-01-25 15:54:48 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.161 2003/01/25 15:54:49 wolfgang Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Main.h"
95 #include "Signals.h"
96 #include "Sanity.h"
97 #include "Stats.h"
98 #include "Itimer.h"
99 #include "Prelude.h"
100 #include "ThreadLabels.h"
101 #ifdef PROFILING
102 #include "Proftimer.h"
103 #include "ProfHeap.h"
104 #endif
105 #if defined(GRAN) || defined(PAR)
106 # include "GranSimRts.h"
107 # include "GranSim.h"
108 # include "ParallelRts.h"
109 # include "Parallel.h"
110 # include "ParallelDebug.h"
111 # include "FetchMe.h"
112 # include "HLC.h"
113 #endif
114 #include "Sparks.h"
115 #include "Capability.h"
116 #include "OSThreads.h"
117 #include  "Task.h"
118
119 #ifdef HAVE_SYS_TYPES_H
120 #include <sys/types.h>
121 #endif
122 #ifdef HAVE_UNISTD_H
123 #include <unistd.h>
124 #endif
125
126 #include <string.h>
127 #include <stdlib.h>
128 #include <stdarg.h>
129
130 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
131 //@subsection Variables and Data structures
132
133 /* Main thread queue.
134  * Locks required: sched_mutex.
135  */
136 StgMainThread *main_threads = NULL;
137
138 #ifdef THREADED_RTS
139 // Pointer to the thread that executes main
140 // When this thread is finished, the program terminates
141 // by calling shutdownHaskellAndExit.
142 // It would be better to add a call to shutdownHaskellAndExit
143 // to the Main.main wrapper and to remove this hack.
144 StgMainThread *main_main_thread = NULL;
145 #endif
146
147 /* Thread queues.
148  * Locks required: sched_mutex.
149  */
150 #if defined(GRAN)
151
152 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
153 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
154
155 /* 
156    In GranSim we have a runnable and a blocked queue for each processor.
157    In order to minimise code changes new arrays run_queue_hds/tls
158    are created. run_queue_hd is then a short cut (macro) for
159    run_queue_hds[CurrentProc] (see GranSim.h).
160    -- HWL
161 */
162 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
163 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
164 StgTSO *ccalling_threadss[MAX_PROC];
165 /* We use the same global list of threads (all_threads) in GranSim as in
166    the std RTS (i.e. we are cheating). However, we don't use this list in
167    the GranSim specific code at the moment (so we are only potentially
168    cheating).  */
169
170 #else /* !GRAN */
171
172 StgTSO *run_queue_hd = NULL;
173 StgTSO *run_queue_tl = NULL;
174 StgTSO *blocked_queue_hd = NULL;
175 StgTSO *blocked_queue_tl = NULL;
176 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
177
178 #endif
179
180 /* Linked list of all threads.
181  * Used for detecting garbage collected threads.
182  */
183 StgTSO *all_threads = NULL;
184
185 /* When a thread performs a safe C call (_ccall_GC, using old
186  * terminology), it gets put on the suspended_ccalling_threads
187  * list. Used by the garbage collector.
188  */
189 static StgTSO *suspended_ccalling_threads;
190
191 static StgTSO *threadStackOverflow(StgTSO *tso);
192
193 /* KH: The following two flags are shared memory locations.  There is no need
194        to lock them, since they are only unset at the end of a scheduler
195        operation.
196 */
197
198 /* flag set by signal handler to precipitate a context switch */
199 //@cindex context_switch
200 nat context_switch = 0;
201
202 /* if this flag is set as well, give up execution */
203 //@cindex interrupted
204 rtsBool interrupted = rtsFalse;
205
206 /* Next thread ID to allocate.
207  * Locks required: thread_id_mutex
208  */
209 //@cindex next_thread_id
210 static StgThreadID next_thread_id = 1;
211
212 /*
213  * Pointers to the state of the current thread.
214  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
215  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
216  */
217  
218 /* The smallest stack size that makes any sense is:
219  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
220  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
221  *  + 1                       (the realworld token for an IO thread)
222  *  + 1                       (the closure to enter)
223  *
224  * A thread with this stack will bomb immediately with a stack
225  * overflow, which will increase its stack size.  
226  */
227
228 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
229
230
231 #if defined(GRAN)
232 StgTSO *CurrentTSO;
233 #endif
234
235 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
236  *  exists - earlier gccs apparently didn't.
237  *  -= chak
238  */
239 StgTSO dummy_tso;
240
241 static rtsBool ready_to_gc;
242
243 /*
244  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
245  * in an MT setting, needed to signal that a worker thread shouldn't hang around
246  * in the scheduler when it is out of work.
247  */
248 static rtsBool shutting_down_scheduler = rtsFalse;
249
250 void            addToBlockedQueue ( StgTSO *tso );
251
252 static void     schedule          ( void );
253        void     interruptStgRts   ( void );
254
255 static void     detectBlackHoles  ( void );
256
257 #ifdef DEBUG
258 static void sched_belch(char *s, ...);
259 #endif
260
261 #if defined(RTS_SUPPORTS_THREADS)
262 /* ToDo: carefully document the invariants that go together
263  *       with these synchronisation objects.
264  */
265 Mutex     sched_mutex       = INIT_MUTEX_VAR;
266 Mutex     term_mutex        = INIT_MUTEX_VAR;
267
268 /*
269  * A heavyweight solution to the problem of protecting
270  * the thread_id from concurrent update.
271  */
272 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
273
274
275 # if defined(SMP)
276 static Condition gc_pending_cond = INIT_COND_VAR;
277 nat await_death;
278 # endif
279
280 #endif /* RTS_SUPPORTS_THREADS */
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 static char *whatNext_strs[] = {
290   "ThreadRunGHC",
291   "ThreadInterpret",
292   "ThreadKilled",
293   "ThreadRelocated",
294   "ThreadComplete"
295 };
296 #endif
297
298 #if defined(PAR)
299 StgTSO * createSparkThread(rtsSpark spark);
300 StgTSO * activateSpark (rtsSpark spark);  
301 #endif
302
303 /*
304  * The thread state for the main thread.
305 // ToDo: check whether not needed any more
306 StgTSO   *MainTSO;
307  */
308
309 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
310 static void taskStart(void);
311 static void
312 taskStart(void)
313 {
314   schedule();
315 }
316 #endif
317
318 #if defined(RTS_SUPPORTS_THREADS)
319 void
320 startSchedulerTask(void)
321 {
322     startTask(taskStart);
323 }
324 #endif
325
326 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
327 //@subsection Main scheduling loop
328
329 /* ---------------------------------------------------------------------------
330    Main scheduling loop.
331
332    We use round-robin scheduling, each thread returning to the
333    scheduler loop when one of these conditions is detected:
334
335       * out of heap space
336       * timer expires (thread yields)
337       * thread blocks
338       * thread ends
339       * stack overflow
340
341    Locking notes:  we acquire the scheduler lock once at the beginning
342    of the scheduler loop, and release it when
343     
344       * running a thread, or
345       * waiting for work, or
346       * waiting for a GC to complete.
347
348    GRAN version:
349      In a GranSim setup this loop iterates over the global event queue.
350      This revolves around the global event queue, which determines what 
351      to do next. Therefore, it's more complicated than either the 
352      concurrent or the parallel (GUM) setup.
353
354    GUM version:
355      GUM iterates over incoming messages.
356      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
357      and sends out a fish whenever it has nothing to do; in-between
358      doing the actual reductions (shared code below) it processes the
359      incoming messages and deals with delayed operations 
360      (see PendingFetches).
361      This is not the ugliest code you could imagine, but it's bloody close.
362
363    ------------------------------------------------------------------------ */
364 //@cindex schedule
365 static void
366 schedule( void )
367 {
368   StgTSO *t;
369   Capability *cap;
370   StgThreadReturnCode ret;
371 #if defined(GRAN)
372   rtsEvent *event;
373 #elif defined(PAR)
374   StgSparkPool *pool;
375   rtsSpark spark;
376   StgTSO *tso;
377   GlobalTaskId pe;
378   rtsBool receivedFinish = rtsFalse;
379 # if defined(DEBUG)
380   nat tp_size, sp_size; // stats only
381 # endif
382 #endif
383   rtsBool was_interrupted = rtsFalse;
384   StgTSOWhatNext prev_what_next;
385   
386   ACQUIRE_LOCK(&sched_mutex);
387  
388 #if defined(RTS_SUPPORTS_THREADS)
389   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
390   IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): entering RTS", osThreadId()));
391 #else
392   /* simply initialise it in the non-threaded case */
393   grabCapability(&cap);
394 #endif
395
396 #if defined(GRAN)
397   /* set up first event to get things going */
398   /* ToDo: assign costs for system setup and init MainTSO ! */
399   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
400             ContinueThread, 
401             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
402
403   IF_DEBUG(gran,
404            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
405            G_TSO(CurrentTSO, 5));
406
407   if (RtsFlags.GranFlags.Light) {
408     /* Save current time; GranSim Light only */
409     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
410   }      
411
412   event = get_next_event();
413
414   while (event!=(rtsEvent*)NULL) {
415     /* Choose the processor with the next event */
416     CurrentProc = event->proc;
417     CurrentTSO = event->tso;
418
419 #elif defined(PAR)
420
421   while (!receivedFinish) {    /* set by processMessages */
422                                /* when receiving PP_FINISH message         */ 
423 #else
424
425   while (1) {
426
427 #endif
428
429     IF_DEBUG(scheduler, printAllThreads());
430
431 #if defined(RTS_SUPPORTS_THREADS)
432     /* Check to see whether there are any worker threads
433        waiting to deposit external call results. If so,
434        yield our capability */
435     yieldToReturningWorker(&sched_mutex, &cap);
436 #endif
437
438     /* If we're interrupted (the user pressed ^C, or some other
439      * termination condition occurred), kill all the currently running
440      * threads.
441      */
442     if (interrupted) {
443       IF_DEBUG(scheduler, sched_belch("interrupted"));
444       interrupted = rtsFalse;
445       was_interrupted = rtsTrue;
446 #if defined(RTS_SUPPORTS_THREADS)
447       // In the threaded RTS, deadlock detection doesn't work,
448       // so just exit right away.
449       prog_belch("interrupted");
450       releaseCapability(cap);
451       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
452       RELEASE_LOCK(&sched_mutex);
453       shutdownHaskellAndExit(EXIT_SUCCESS);
454 #else
455       deleteAllThreads();
456 #endif
457     }
458
459     /* Go through the list of main threads and wake up any
460      * clients whose computations have finished.  ToDo: this
461      * should be done more efficiently without a linear scan
462      * of the main threads list, somehow...
463      */
464 #if defined(RTS_SUPPORTS_THREADS)
465     { 
466       StgMainThread *m, **prev;
467       prev = &main_threads;
468       for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
469         switch (m->tso->what_next) {
470         case ThreadComplete:
471           if (m->ret) {
472               // NOTE: return val is tso->sp[1] (see StgStartup.hc)
473               *(m->ret) = (StgClosure *)m->tso->sp[1]; 
474           }
475           *prev = m->link;
476           m->stat = Success;
477           broadcastCondition(&m->wakeup);
478 #ifdef DEBUG
479           removeThreadLabel((StgWord)m->tso);
480 #endif
481           if(m == main_main_thread)
482           {
483               releaseCapability(cap);
484               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
485               RELEASE_LOCK(&sched_mutex);
486               shutdownHaskellAndExit(EXIT_SUCCESS);
487           }
488           break;
489         case ThreadKilled:
490           if (m->ret) *(m->ret) = NULL;
491           *prev = m->link;
492           if (was_interrupted) {
493             m->stat = Interrupted;
494           } else {
495             m->stat = Killed;
496           }
497           broadcastCondition(&m->wakeup);
498 #ifdef DEBUG
499           removeThreadLabel((StgWord)m->tso);
500 #endif
501           if(m == main_main_thread)
502           {
503               releaseCapability(cap);
504               startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
505               RELEASE_LOCK(&sched_mutex);
506               shutdownHaskellAndExit(EXIT_SUCCESS);
507           }
508           break;
509         default:
510           break;
511         }
512       }
513     }
514
515 #else /* not threaded */
516
517 # if defined(PAR)
518     /* in GUM do this only on the Main PE */
519     if (IAmMainThread)
520 # endif
521     /* If our main thread has finished or been killed, return.
522      */
523     {
524       StgMainThread *m = main_threads;
525       if (m->tso->what_next == ThreadComplete
526           || m->tso->what_next == ThreadKilled) {
527 #ifdef DEBUG
528         removeThreadLabel((StgWord)m->tso);
529 #endif
530         main_threads = main_threads->link;
531         if (m->tso->what_next == ThreadComplete) {
532             // We finished successfully, fill in the return value
533             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
534             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
535             m->stat = Success;
536             return;
537         } else {
538           if (m->ret) { *(m->ret) = NULL; };
539           if (was_interrupted) {
540             m->stat = Interrupted;
541           } else {
542             m->stat = Killed;
543           }
544           return;
545         }
546       }
547     }
548 #endif
549
550     /* Top up the run queue from our spark pool.  We try to make the
551      * number of threads in the run queue equal to the number of
552      * free capabilities.
553      *
554      * Disable spark support in SMP for now, non-essential & requires
555      * a little bit of work to make it compile cleanly. -- sof 1/02.
556      */
557 #if 0 /* defined(SMP) */
558     {
559       nat n = getFreeCapabilities();
560       StgTSO *tso = run_queue_hd;
561
562       /* Count the run queue */
563       while (n > 0 && tso != END_TSO_QUEUE) {
564         tso = tso->link;
565         n--;
566       }
567
568       for (; n > 0; n--) {
569         StgClosure *spark;
570         spark = findSpark(rtsFalse);
571         if (spark == NULL) {
572           break; /* no more sparks in the pool */
573         } else {
574           /* I'd prefer this to be done in activateSpark -- HWL */
575           /* tricky - it needs to hold the scheduler lock and
576            * not try to re-acquire it -- SDM */
577           createSparkThread(spark);       
578           IF_DEBUG(scheduler,
579                    sched_belch("==^^ turning spark of closure %p into a thread",
580                                (StgClosure *)spark));
581         }
582       }
583       /* We need to wake up the other tasks if we just created some
584        * work for them.
585        */
586       if (getFreeCapabilities() - n > 1) {
587           signalCondition( &thread_ready_cond );
588       }
589     }
590 #endif // SMP
591
592     /* check for signals each time around the scheduler */
593 #ifndef mingw32_TARGET_OS
594     if (signals_pending()) {
595       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
596       startSignalHandlers();
597       ACQUIRE_LOCK(&sched_mutex);
598     }
599 #endif
600
601     /* Check whether any waiting threads need to be woken up.  If the
602      * run queue is empty, and there are no other tasks running, we
603      * can wait indefinitely for something to happen.
604      */
605     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
606 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
607                 || EMPTY_RUN_QUEUE()
608 #endif
609         )
610     {
611       awaitEvent( EMPTY_RUN_QUEUE()
612 #if defined(SMP)
613         && allFreeCapabilities()
614 #endif
615         );
616     }
617     /* we can be interrupted while waiting for I/O... */
618     if (interrupted) continue;
619
620     /* 
621      * Detect deadlock: when we have no threads to run, there are no
622      * threads waiting on I/O or sleeping, and all the other tasks are
623      * waiting for work, we must have a deadlock of some description.
624      *
625      * We first try to find threads blocked on themselves (ie. black
626      * holes), and generate NonTermination exceptions where necessary.
627      *
628      * If no threads are black holed, we have a deadlock situation, so
629      * inform all the main threads.
630      */
631 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
632     if (   EMPTY_THREAD_QUEUES()
633 #if defined(RTS_SUPPORTS_THREADS)
634         && EMPTY_QUEUE(suspended_ccalling_threads)
635 #endif
636 #ifdef SMP
637         && allFreeCapabilities()
638 #endif
639         )
640     {
641         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
642 #if defined(THREADED_RTS)
643         /* and SMP mode ..? */
644         releaseCapability(cap);
645 #endif
646         // Garbage collection can release some new threads due to
647         // either (a) finalizers or (b) threads resurrected because
648         // they are about to be send BlockedOnDeadMVar.  Any threads
649         // thus released will be immediately runnable.
650         GarbageCollect(GetRoots,rtsTrue);
651
652         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
653
654         IF_DEBUG(scheduler, 
655                  sched_belch("still deadlocked, checking for black holes..."));
656         detectBlackHoles();
657
658         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
659
660 #ifndef mingw32_TARGET_OS
661         /* If we have user-installed signal handlers, then wait
662          * for signals to arrive rather then bombing out with a
663          * deadlock.
664          */
665 #if defined(RTS_SUPPORTS_THREADS)
666         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
667                       a signal with no runnable threads (or I/O
668                       suspended ones) leads nowhere quick.
669                       For now, simply shut down when we reach this
670                       condition.
671                       
672                       ToDo: define precisely under what conditions
673                       the Scheduler should shut down in an MT setting.
674                    */
675 #else
676         if ( anyUserHandlers() ) {
677 #endif
678             IF_DEBUG(scheduler, 
679                      sched_belch("still deadlocked, waiting for signals..."));
680
681             awaitUserSignals();
682
683             // we might be interrupted...
684             if (interrupted) { continue; }
685
686             if (signals_pending()) {
687                 RELEASE_LOCK(&sched_mutex);
688                 startSignalHandlers();
689                 ACQUIRE_LOCK(&sched_mutex);
690             }
691             ASSERT(!EMPTY_RUN_QUEUE());
692             goto not_deadlocked;
693         }
694 #endif
695
696         /* Probably a real deadlock.  Send the current main thread the
697          * Deadlock exception (or in the SMP build, send *all* main
698          * threads the deadlock exception, since none of them can make
699          * progress).
700          */
701         {
702             StgMainThread *m;
703 #if defined(RTS_SUPPORTS_THREADS)
704             for (m = main_threads; m != NULL; m = m->link) {
705                 switch (m->tso->why_blocked) {
706                 case BlockedOnBlackHole:
707                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
708                     break;
709                 case BlockedOnException:
710                 case BlockedOnMVar:
711                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
712                     break;
713                 default:
714                     barf("deadlock: main thread blocked in a strange way");
715                 }
716             }
717 #else
718             m = main_threads;
719             switch (m->tso->why_blocked) {
720             case BlockedOnBlackHole:
721                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
722                 break;
723             case BlockedOnException:
724             case BlockedOnMVar:
725                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
726                 break;
727             default:
728                 barf("deadlock: main thread blocked in a strange way");
729             }
730 #endif
731         }
732
733 #if defined(RTS_SUPPORTS_THREADS)
734         /* ToDo: revisit conditions (and mechanism) for shutting
735            down a multi-threaded world  */
736         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
737         RELEASE_LOCK(&sched_mutex);
738         shutdownHaskell();
739         return;
740 #endif
741     }
742   not_deadlocked:
743
744 #elif defined(RTS_SUPPORTS_THREADS)
745     /* ToDo: add deadlock detection in threaded RTS */
746 #elif defined(PAR)
747     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
748 #endif
749
750 #if defined(SMP)
751     /* If there's a GC pending, don't do anything until it has
752      * completed.
753      */
754     if (ready_to_gc) {
755       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
756       waitCondition( &gc_pending_cond, &sched_mutex );
757     }
758 #endif    
759
760 #if defined(RTS_SUPPORTS_THREADS)
761 #if defined(SMP)
762     /* block until we've got a thread on the run queue and a free
763      * capability.
764      *
765      */
766     if ( EMPTY_RUN_QUEUE() ) {
767       /* Give up our capability */
768       releaseCapability(cap);
769
770       /* If we're in the process of shutting down (& running the
771        * a batch of finalisers), don't wait around.
772        */
773       if ( shutting_down_scheduler ) {
774         RELEASE_LOCK(&sched_mutex);
775         return;
776       }
777       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
778       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
779       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
780     }
781 #else
782     if ( EMPTY_RUN_QUEUE() ) {
783       continue; // nothing to do
784     }
785 #endif
786 #endif
787
788 #if defined(GRAN)
789     if (RtsFlags.GranFlags.Light)
790       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
791
792     /* adjust time based on time-stamp */
793     if (event->time > CurrentTime[CurrentProc] &&
794         event->evttype != ContinueThread)
795       CurrentTime[CurrentProc] = event->time;
796     
797     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
798     if (!RtsFlags.GranFlags.Light)
799       handleIdlePEs();
800
801     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
802
803     /* main event dispatcher in GranSim */
804     switch (event->evttype) {
805       /* Should just be continuing execution */
806     case ContinueThread:
807       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
808       /* ToDo: check assertion
809       ASSERT(run_queue_hd != (StgTSO*)NULL &&
810              run_queue_hd != END_TSO_QUEUE);
811       */
812       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
813       if (!RtsFlags.GranFlags.DoAsyncFetch &&
814           procStatus[CurrentProc]==Fetching) {
815         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
816               CurrentTSO->id, CurrentTSO, CurrentProc);
817         goto next_thread;
818       } 
819       /* Ignore ContinueThreads for completed threads */
820       if (CurrentTSO->what_next == ThreadComplete) {
821         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
822               CurrentTSO->id, CurrentTSO, CurrentProc);
823         goto next_thread;
824       } 
825       /* Ignore ContinueThreads for threads that are being migrated */
826       if (PROCS(CurrentTSO)==Nowhere) { 
827         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
828               CurrentTSO->id, CurrentTSO, CurrentProc);
829         goto next_thread;
830       }
831       /* The thread should be at the beginning of the run queue */
832       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
833         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
834               CurrentTSO->id, CurrentTSO, CurrentProc);
835         break; // run the thread anyway
836       }
837       /*
838       new_event(proc, proc, CurrentTime[proc],
839                 FindWork,
840                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
841       goto next_thread; 
842       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
843       break; // now actually run the thread; DaH Qu'vam yImuHbej 
844
845     case FetchNode:
846       do_the_fetchnode(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     case GlobalBlock:
850       do_the_globalblock(event);
851       goto next_thread;             /* handle next event in event queue  */
852       
853     case FetchReply:
854       do_the_fetchreply(event);
855       goto next_thread;             /* handle next event in event queue  */
856       
857     case UnblockThread:   /* Move from the blocked queue to the tail of */
858       do_the_unblock(event);
859       goto next_thread;             /* handle next event in event queue  */
860       
861     case ResumeThread:  /* Move from the blocked queue to the tail of */
862       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
863       event->tso->gran.blocktime += 
864         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
865       do_the_startthread(event);
866       goto next_thread;             /* handle next event in event queue  */
867       
868     case StartThread:
869       do_the_startthread(event);
870       goto next_thread;             /* handle next event in event queue  */
871       
872     case MoveThread:
873       do_the_movethread(event);
874       goto next_thread;             /* handle next event in event queue  */
875       
876     case MoveSpark:
877       do_the_movespark(event);
878       goto next_thread;             /* handle next event in event queue  */
879       
880     case FindWork:
881       do_the_findwork(event);
882       goto next_thread;             /* handle next event in event queue  */
883       
884     default:
885       barf("Illegal event type %u\n", event->evttype);
886     }  /* switch */
887     
888     /* This point was scheduler_loop in the old RTS */
889
890     IF_DEBUG(gran, belch("GRAN: after main switch"));
891
892     TimeOfLastEvent = CurrentTime[CurrentProc];
893     TimeOfNextEvent = get_time_of_next_event();
894     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
895     // CurrentTSO = ThreadQueueHd;
896
897     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
898                          TimeOfNextEvent));
899
900     if (RtsFlags.GranFlags.Light) 
901       GranSimLight_leave_system(event, &ActiveTSO); 
902
903     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
904
905     IF_DEBUG(gran, 
906              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
907
908     /* in a GranSim setup the TSO stays on the run queue */
909     t = CurrentTSO;
910     /* Take a thread from the run queue. */
911     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
912
913     IF_DEBUG(gran, 
914              fprintf(stderr, "GRAN: About to run current thread, which is\n");
915              G_TSO(t,5));
916
917     context_switch = 0; // turned on via GranYield, checking events and time slice
918
919     IF_DEBUG(gran, 
920              DumpGranEvent(GR_SCHEDULE, t));
921
922     procStatus[CurrentProc] = Busy;
923
924 #elif defined(PAR)
925     if (PendingFetches != END_BF_QUEUE) {
926         processFetches();
927     }
928
929     /* ToDo: phps merge with spark activation above */
930     /* check whether we have local work and send requests if we have none */
931     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
932       /* :-[  no local threads => look out for local sparks */
933       /* the spark pool for the current PE */
934       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
935       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
936           pool->hd < pool->tl) {
937         /* 
938          * ToDo: add GC code check that we really have enough heap afterwards!!
939          * Old comment:
940          * If we're here (no runnable threads) and we have pending
941          * sparks, we must have a space problem.  Get enough space
942          * to turn one of those pending sparks into a
943          * thread... 
944          */
945
946         spark = findSpark(rtsFalse);                /* get a spark */
947         if (spark != (rtsSpark) NULL) {
948           tso = activateSpark(spark);       /* turn the spark into a thread */
949           IF_PAR_DEBUG(schedule,
950                        belch("==== schedule: Created TSO %d (%p); %d threads active",
951                              tso->id, tso, advisory_thread_count));
952
953           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
954             belch("==^^ failed to activate spark");
955             goto next_thread;
956           }               /* otherwise fall through & pick-up new tso */
957         } else {
958           IF_PAR_DEBUG(verbose,
959                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
960                              spark_queue_len(pool)));
961           goto next_thread;
962         }
963       }
964
965       /* If we still have no work we need to send a FISH to get a spark
966          from another PE 
967       */
968       if (EMPTY_RUN_QUEUE()) {
969       /* =8-[  no local sparks => look for work on other PEs */
970         /*
971          * We really have absolutely no work.  Send out a fish
972          * (there may be some out there already), and wait for
973          * something to arrive.  We clearly can't run any threads
974          * until a SCHEDULE or RESUME arrives, and so that's what
975          * we're hoping to see.  (Of course, we still have to
976          * respond to other types of messages.)
977          */
978         TIME now = msTime() /*CURRENT_TIME*/;
979         IF_PAR_DEBUG(verbose, 
980                      belch("--  now=%ld", now));
981         IF_PAR_DEBUG(verbose,
982                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
983                          (last_fish_arrived_at!=0 &&
984                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
985                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
986                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
987                              last_fish_arrived_at,
988                              RtsFlags.ParFlags.fishDelay, now);
989                      });
990         
991         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
992             (last_fish_arrived_at==0 ||
993              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
994           /* outstandingFishes is set in sendFish, processFish;
995              avoid flooding system with fishes via delay */
996           pe = choosePE();
997           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
998                    NEW_FISH_HUNGER);
999
1000           // Global statistics: count no. of fishes
1001           if (RtsFlags.ParFlags.ParStats.Global &&
1002               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1003             globalParStats.tot_fish_mess++;
1004           }
1005         }
1006       
1007         receivedFinish = processMessages();
1008         goto next_thread;
1009       }
1010     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1011       receivedFinish = processMessages();
1012     }
1013
1014     /* Now we are sure that we have some work available */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016
1017     /* Take a thread from the run queue, if we have work */
1018     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1019     IF_DEBUG(sanity,checkTSO(t));
1020
1021     /* ToDo: write something to the log-file
1022     if (RTSflags.ParFlags.granSimStats && !sameThread)
1023         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1024
1025     CurrentTSO = t;
1026     */
1027     /* the spark pool for the current PE */
1028     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1029
1030     IF_DEBUG(scheduler, 
1031              belch("--=^ %d threads, %d sparks on [%#x]", 
1032                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1033
1034 # if 1
1035     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1036         t && LastTSO && t->id != LastTSO->id && 
1037         LastTSO->why_blocked == NotBlocked && 
1038         LastTSO->what_next != ThreadComplete) {
1039       // if previously scheduled TSO not blocked we have to record the context switch
1040       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1041                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1042     }
1043
1044     if (RtsFlags.ParFlags.ParStats.Full && 
1045         (emitSchedule /* forced emit */ ||
1046         (t && LastTSO && t->id != LastTSO->id))) {
1047       /* 
1048          we are running a different TSO, so write a schedule event to log file
1049          NB: If we use fair scheduling we also have to write  a deschedule 
1050              event for LastTSO; with unfair scheduling we know that the
1051              previous tso has blocked whenever we switch to another tso, so
1052              we don't need it in GUM for now
1053       */
1054       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1055                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1056       emitSchedule = rtsFalse;
1057     }
1058      
1059 # endif
1060 #else /* !GRAN && !PAR */
1061   
1062     /* grab a thread from the run queue */
1063     ASSERT(run_queue_hd != END_TSO_QUEUE);
1064     t = POP_RUN_QUEUE();
1065     // Sanity check the thread we're about to run.  This can be
1066     // expensive if there is lots of thread switching going on...
1067     IF_DEBUG(sanity,checkTSO(t));
1068 #endif
1069
1070     cap->r.rCurrentTSO = t;
1071     
1072     /* context switches are now initiated by the timer signal, unless
1073      * the user specified "context switch as often as possible", with
1074      * +RTS -C0
1075      */
1076     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1077          && (run_queue_hd != END_TSO_QUEUE
1078              || blocked_queue_hd != END_TSO_QUEUE
1079              || sleeping_queue != END_TSO_QUEUE)))
1080         context_switch = 1;
1081     else
1082         context_switch = 0;
1083
1084 run_thread:
1085
1086     RELEASE_LOCK(&sched_mutex);
1087
1088     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1089                               t->id, whatNext_strs[t->what_next]));
1090
1091 #ifdef PROFILING
1092     startHeapProfTimer();
1093 #endif
1094
1095     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1096     /* Run the current thread 
1097      */
1098     prev_what_next = t->what_next;
1099     switch (prev_what_next) {
1100     case ThreadKilled:
1101     case ThreadComplete:
1102         /* Thread already finished, return to scheduler. */
1103         ret = ThreadFinished;
1104         break;
1105     case ThreadRunGHC:
1106         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1107         break;
1108     case ThreadInterpret:
1109         ret = interpretBCO(cap);
1110         break;
1111     default:
1112       barf("schedule: invalid what_next field");
1113     }
1114     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1115     
1116     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1117 #ifdef PROFILING
1118     stopHeapProfTimer();
1119     CCCS = CCS_SYSTEM;
1120 #endif
1121     
1122     ACQUIRE_LOCK(&sched_mutex);
1123     
1124 #ifdef RTS_SUPPORTS_THREADS
1125     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1126 #elif !defined(GRAN) && !defined(PAR)
1127     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1128 #endif
1129     t = cap->r.rCurrentTSO;
1130     
1131 #if defined(PAR)
1132     /* HACK 675: if the last thread didn't yield, make sure to print a 
1133        SCHEDULE event to the log file when StgRunning the next thread, even
1134        if it is the same one as before */
1135     LastTSO = t; 
1136     TimeOfLastYield = CURRENT_TIME;
1137 #endif
1138
1139     switch (ret) {
1140     case HeapOverflow:
1141 #if defined(GRAN)
1142       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1143       globalGranStats.tot_heapover++;
1144 #elif defined(PAR)
1145       globalParStats.tot_heapover++;
1146 #endif
1147
1148       // did the task ask for a large block?
1149       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1150           // if so, get one and push it on the front of the nursery.
1151           bdescr *bd;
1152           nat blocks;
1153           
1154           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1155
1156           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1157                                    t->id, whatNext_strs[t->what_next], blocks));
1158
1159           // don't do this if it would push us over the
1160           // alloc_blocks_lim limit; we'll GC first.
1161           if (alloc_blocks + blocks < alloc_blocks_lim) {
1162
1163               alloc_blocks += blocks;
1164               bd = allocGroup( blocks );
1165
1166               // link the new group into the list
1167               bd->link = cap->r.rCurrentNursery;
1168               bd->u.back = cap->r.rCurrentNursery->u.back;
1169               if (cap->r.rCurrentNursery->u.back != NULL) {
1170                   cap->r.rCurrentNursery->u.back->link = bd;
1171               } else {
1172                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1173                          g0s0->blocks == cap->r.rNursery);
1174                   cap->r.rNursery = g0s0->blocks = bd;
1175               }           
1176               cap->r.rCurrentNursery->u.back = bd;
1177
1178               // initialise it as a nursery block.  We initialise the
1179               // step, gen_no, and flags field of *every* sub-block in
1180               // this large block, because this is easier than making
1181               // sure that we always find the block head of a large
1182               // block whenever we call Bdescr() (eg. evacuate() and
1183               // isAlive() in the GC would both have to do this, at
1184               // least).
1185               { 
1186                   bdescr *x;
1187                   for (x = bd; x < bd + blocks; x++) {
1188                       x->step = g0s0;
1189                       x->gen_no = 0;
1190                       x->flags = 0;
1191                   }
1192               }
1193
1194               // don't forget to update the block count in g0s0.
1195               g0s0->n_blocks += blocks;
1196               // This assert can be a killer if the app is doing lots
1197               // of large block allocations.
1198               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1199
1200               // now update the nursery to point to the new block
1201               cap->r.rCurrentNursery = bd;
1202
1203               // we might be unlucky and have another thread get on the
1204               // run queue before us and steal the large block, but in that
1205               // case the thread will just end up requesting another large
1206               // block.
1207               PUSH_ON_RUN_QUEUE(t);
1208               break;
1209           }
1210       }
1211
1212       /* make all the running tasks block on a condition variable,
1213        * maybe set context_switch and wait till they all pile in,
1214        * then have them wait on a GC condition variable.
1215        */
1216       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1217                                t->id, whatNext_strs[t->what_next]));
1218       threadPaused(t);
1219 #if defined(GRAN)
1220       ASSERT(!is_on_queue(t,CurrentProc));
1221 #elif defined(PAR)
1222       /* Currently we emit a DESCHEDULE event before GC in GUM.
1223          ToDo: either add separate event to distinguish SYSTEM time from rest
1224                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1225       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1226         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1227                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1228         emitSchedule = rtsTrue;
1229       }
1230 #endif
1231       
1232       ready_to_gc = rtsTrue;
1233       context_switch = 1;               /* stop other threads ASAP */
1234       PUSH_ON_RUN_QUEUE(t);
1235       /* actual GC is done at the end of the while loop */
1236       break;
1237       
1238     case StackOverflow:
1239 #if defined(GRAN)
1240       IF_DEBUG(gran, 
1241                DumpGranEvent(GR_DESCHEDULE, t));
1242       globalGranStats.tot_stackover++;
1243 #elif defined(PAR)
1244       // IF_DEBUG(par, 
1245       // DumpGranEvent(GR_DESCHEDULE, t);
1246       globalParStats.tot_stackover++;
1247 #endif
1248       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1249                                t->id, whatNext_strs[t->what_next]));
1250       /* just adjust the stack for this thread, then pop it back
1251        * on the run queue.
1252        */
1253       threadPaused(t);
1254       { 
1255         StgMainThread *m;
1256         /* enlarge the stack */
1257         StgTSO *new_t = threadStackOverflow(t);
1258         
1259         /* This TSO has moved, so update any pointers to it from the
1260          * main thread stack.  It better not be on any other queues...
1261          * (it shouldn't be).
1262          */
1263         for (m = main_threads; m != NULL; m = m->link) {
1264           if (m->tso == t) {
1265             m->tso = new_t;
1266           }
1267         }
1268         threadPaused(new_t);
1269         PUSH_ON_RUN_QUEUE(new_t);
1270       }
1271       break;
1272
1273     case ThreadYielding:
1274 #if defined(GRAN)
1275       IF_DEBUG(gran, 
1276                DumpGranEvent(GR_DESCHEDULE, t));
1277       globalGranStats.tot_yields++;
1278 #elif defined(PAR)
1279       // IF_DEBUG(par, 
1280       // DumpGranEvent(GR_DESCHEDULE, t);
1281       globalParStats.tot_yields++;
1282 #endif
1283       /* put the thread back on the run queue.  Then, if we're ready to
1284        * GC, check whether this is the last task to stop.  If so, wake
1285        * up the GC thread.  getThread will block during a GC until the
1286        * GC is finished.
1287        */
1288       IF_DEBUG(scheduler,
1289                if (t->what_next != prev_what_next) {
1290                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1291                          t->id, whatNext_strs[t->what_next]);
1292                } else {
1293                    belch("--<< thread %ld (%s) stopped, yielding", 
1294                          t->id, whatNext_strs[t->what_next]);
1295                }
1296                );
1297
1298       IF_DEBUG(sanity,
1299                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1300                checkTSO(t));
1301       ASSERT(t->link == END_TSO_QUEUE);
1302
1303       // Shortcut if we're just switching evaluators: don't bother
1304       // doing stack squeezing (which can be expensive), just run the
1305       // thread.
1306       if (t->what_next != prev_what_next) {
1307           goto run_thread;
1308       }
1309
1310       threadPaused(t);
1311
1312 #if defined(GRAN)
1313       ASSERT(!is_on_queue(t,CurrentProc));
1314
1315       IF_DEBUG(sanity,
1316                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1317                checkThreadQsSanity(rtsTrue));
1318 #endif
1319
1320 #if defined(PAR)
1321       if (RtsFlags.ParFlags.doFairScheduling) { 
1322         /* this does round-robin scheduling; good for concurrency */
1323         APPEND_TO_RUN_QUEUE(t);
1324       } else {
1325         /* this does unfair scheduling; good for parallelism */
1326         PUSH_ON_RUN_QUEUE(t);
1327       }
1328 #else
1329       // this does round-robin scheduling; good for concurrency
1330       APPEND_TO_RUN_QUEUE(t);
1331 #endif
1332
1333 #if defined(GRAN)
1334       /* add a ContinueThread event to actually process the thread */
1335       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1336                 ContinueThread,
1337                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1338       IF_GRAN_DEBUG(bq, 
1339                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1340                G_EVENTQ(0);
1341                G_CURR_THREADQ(0));
1342 #endif /* GRAN */
1343       break;
1344
1345     case ThreadBlocked:
1346 #if defined(GRAN)
1347       IF_DEBUG(scheduler,
1348                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1349                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1350                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1351
1352       // ??? needed; should emit block before
1353       IF_DEBUG(gran, 
1354                DumpGranEvent(GR_DESCHEDULE, t)); 
1355       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1356       /*
1357         ngoq Dogh!
1358       ASSERT(procStatus[CurrentProc]==Busy || 
1359               ((procStatus[CurrentProc]==Fetching) && 
1360               (t->block_info.closure!=(StgClosure*)NULL)));
1361       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1362           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1363             procStatus[CurrentProc]==Fetching)) 
1364         procStatus[CurrentProc] = Idle;
1365       */
1366 #elif defined(PAR)
1367       IF_DEBUG(scheduler,
1368                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1369                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1370       IF_PAR_DEBUG(bq,
1371
1372                    if (t->block_info.closure!=(StgClosure*)NULL) 
1373                      print_bq(t->block_info.closure));
1374
1375       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1376       blockThread(t);
1377
1378       /* whatever we schedule next, we must log that schedule */
1379       emitSchedule = rtsTrue;
1380
1381 #else /* !GRAN */
1382       /* don't need to do anything.  Either the thread is blocked on
1383        * I/O, in which case we'll have called addToBlockedQueue
1384        * previously, or it's blocked on an MVar or Blackhole, in which
1385        * case it'll be on the relevant queue already.
1386        */
1387       IF_DEBUG(scheduler,
1388                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1389                        t->id, whatNext_strs[t->what_next]);
1390                printThreadBlockage(t);
1391                fprintf(stderr, "\n"));
1392
1393       /* Only for dumping event to log file 
1394          ToDo: do I need this in GranSim, too?
1395       blockThread(t);
1396       */
1397 #endif
1398       threadPaused(t);
1399       break;
1400       
1401     case ThreadFinished:
1402       /* Need to check whether this was a main thread, and if so, signal
1403        * the task that started it with the return value.  If we have no
1404        * more main threads, we probably need to stop all the tasks until
1405        * we get a new one.
1406        */
1407       /* We also end up here if the thread kills itself with an
1408        * uncaught exception, see Exception.hc.
1409        */
1410       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1411                                t->id, whatNext_strs[t->what_next]));
1412 #if defined(GRAN)
1413       endThread(t, CurrentProc); // clean-up the thread
1414 #elif defined(PAR)
1415       /* For now all are advisory -- HWL */
1416       //if(t->priority==AdvisoryPriority) ??
1417       advisory_thread_count--;
1418       
1419 # ifdef DIST
1420       if(t->dist.priority==RevalPriority)
1421         FinishReval(t);
1422 # endif
1423       
1424       if (RtsFlags.ParFlags.ParStats.Full &&
1425           !RtsFlags.ParFlags.ParStats.Suppressed) 
1426         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1427 #endif
1428       break;
1429       
1430     default:
1431       barf("schedule: invalid thread return code %d", (int)ret);
1432     }
1433
1434 #ifdef PROFILING
1435     // When we have +RTS -i0 and we're heap profiling, do a census at
1436     // every GC.  This lets us get repeatable runs for debugging.
1437     if (performHeapProfile ||
1438         (RtsFlags.ProfFlags.profileInterval==0 &&
1439          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1440         GarbageCollect(GetRoots, rtsTrue);
1441         heapCensus();
1442         performHeapProfile = rtsFalse;
1443         ready_to_gc = rtsFalse; // we already GC'd
1444     }
1445 #endif
1446
1447     if (ready_to_gc 
1448 #ifdef SMP
1449         && allFreeCapabilities() 
1450 #endif
1451         ) {
1452       /* everybody back, start the GC.
1453        * Could do it in this thread, or signal a condition var
1454        * to do it in another thread.  Either way, we need to
1455        * broadcast on gc_pending_cond afterward.
1456        */
1457 #if defined(RTS_SUPPORTS_THREADS)
1458       IF_DEBUG(scheduler,sched_belch("doing GC"));
1459 #endif
1460       GarbageCollect(GetRoots,rtsFalse);
1461       ready_to_gc = rtsFalse;
1462 #ifdef SMP
1463       broadcastCondition(&gc_pending_cond);
1464 #endif
1465 #if defined(GRAN)
1466       /* add a ContinueThread event to continue execution of current thread */
1467       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1468                 ContinueThread,
1469                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1470       IF_GRAN_DEBUG(bq, 
1471                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1472                G_EVENTQ(0);
1473                G_CURR_THREADQ(0));
1474 #endif /* GRAN */
1475     }
1476
1477 #if defined(GRAN)
1478   next_thread:
1479     IF_GRAN_DEBUG(unused,
1480                   print_eventq(EventHd));
1481
1482     event = get_next_event();
1483 #elif defined(PAR)
1484   next_thread:
1485     /* ToDo: wait for next message to arrive rather than busy wait */
1486 #endif /* GRAN */
1487
1488   } /* end of while(1) */
1489
1490   IF_PAR_DEBUG(verbose,
1491                belch("== Leaving schedule() after having received Finish"));
1492 }
1493
1494 /* ---------------------------------------------------------------------------
1495  * Singleton fork(). Do not copy any running threads.
1496  * ------------------------------------------------------------------------- */
1497
1498 StgInt forkProcess(StgTSO* tso) {
1499
1500 #ifndef mingw32_TARGET_OS
1501   pid_t pid;
1502   StgTSO* t,*next;
1503   StgMainThread *m;
1504   rtsBool doKill;
1505
1506   IF_DEBUG(scheduler,sched_belch("forking!"));
1507
1508   pid = fork();
1509   if (pid) { /* parent */
1510
1511   /* just return the pid */
1512     
1513   } else { /* child */
1514   /* wipe all other threads */
1515   run_queue_hd = run_queue_tl = tso;
1516   tso->link = END_TSO_QUEUE;
1517
1518   /* When clearing out the threads, we need to ensure
1519      that a 'main thread' is left behind; if there isn't,
1520      the Scheduler will shutdown next time it is entered.
1521      
1522      ==> we don't kill a thread that's on the main_threads
1523          list (nor the current thread.)
1524     
1525      [ Attempts at implementing the more ambitious scheme of
1526        killing the main_threads also, and then adding the
1527        current thread onto the main_threads list if it wasn't
1528        there already, failed -- waitThread() (for one) wasn't
1529        up to it. If it proves to be desirable to also kill
1530        the main threads, then this scheme will have to be
1531        revisited (and fully debugged!)
1532        
1533        -- sof 7/2002
1534      ]
1535   */
1536   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1537      us is picky about finding the thread still in its queue when
1538      handling the deleteThread() */
1539
1540   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1541     next = t->link;
1542     
1543     /* Don't kill the current thread.. */
1544     if (t->id == tso->id) continue;
1545     doKill=rtsTrue;
1546     /* ..or a main thread */
1547     for (m = main_threads; m != NULL; m = m->link) {
1548         if (m->tso->id == t->id) {
1549           doKill=rtsFalse;
1550           break;
1551         }
1552     }
1553     if (doKill) {
1554       deleteThread(t);
1555     }
1556   }
1557   }
1558   return pid;
1559 #else /* mingw32 */
1560   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1561   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1562   return -1;
1563 #endif /* mingw32 */
1564 }
1565
1566 /* ---------------------------------------------------------------------------
1567  * deleteAllThreads():  kill all the live threads.
1568  *
1569  * This is used when we catch a user interrupt (^C), before performing
1570  * any necessary cleanups and running finalizers.
1571  *
1572  * Locks: sched_mutex held.
1573  * ------------------------------------------------------------------------- */
1574    
1575 void deleteAllThreads ( void )
1576 {
1577   StgTSO* t, *next;
1578   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1579   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1580       next = t->global_link;
1581       deleteThread(t);
1582   }      
1583   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1584   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1585   sleeping_queue = END_TSO_QUEUE;
1586 }
1587
1588 /* startThread and  insertThread are now in GranSim.c -- HWL */
1589
1590
1591 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1592 //@subsection Suspend and Resume
1593
1594 /* ---------------------------------------------------------------------------
1595  * Suspending & resuming Haskell threads.
1596  * 
1597  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1598  * its capability before calling the C function.  This allows another
1599  * task to pick up the capability and carry on running Haskell
1600  * threads.  It also means that if the C call blocks, it won't lock
1601  * the whole system.
1602  *
1603  * The Haskell thread making the C call is put to sleep for the
1604  * duration of the call, on the susepended_ccalling_threads queue.  We
1605  * give out a token to the task, which it can use to resume the thread
1606  * on return from the C function.
1607  * ------------------------------------------------------------------------- */
1608    
1609 StgInt
1610 suspendThread( StgRegTable *reg, 
1611                rtsBool concCall
1612 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1613                STG_UNUSED
1614 #endif
1615                )
1616 {
1617   nat tok;
1618   Capability *cap;
1619
1620   /* assume that *reg is a pointer to the StgRegTable part
1621    * of a Capability.
1622    */
1623   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1624
1625   ACQUIRE_LOCK(&sched_mutex);
1626
1627   IF_DEBUG(scheduler,
1628            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1629
1630   // XXX this might not be necessary --SDM
1631   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1632
1633   threadPaused(cap->r.rCurrentTSO);
1634   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1635   suspended_ccalling_threads = cap->r.rCurrentTSO;
1636
1637 #if defined(RTS_SUPPORTS_THREADS)
1638   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1639   {
1640       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1641       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1642   }
1643   else
1644   {
1645       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1646   }
1647 #endif
1648
1649   /* Use the thread ID as the token; it should be unique */
1650   tok = cap->r.rCurrentTSO->id;
1651
1652   /* Hand back capability */
1653   releaseCapability(cap);
1654   
1655 #if defined(RTS_SUPPORTS_THREADS)
1656   /* Preparing to leave the RTS, so ensure there's a native thread/task
1657      waiting to take over.
1658   */
1659   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1660   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1661       startTask(taskStart);
1662   //}
1663 #endif
1664
1665   /* Other threads _might_ be available for execution; signal this */
1666   THREAD_RUNNABLE();
1667   RELEASE_LOCK(&sched_mutex);
1668   return tok; 
1669 }
1670
1671 StgRegTable *
1672 resumeThread( StgInt tok,
1673               rtsBool concCall
1674 #if !defined(RTS_SUPPORTS_THREADS)
1675                STG_UNUSED
1676 #endif
1677               )
1678 {
1679   StgTSO *tso, **prev;
1680   Capability *cap;
1681
1682 #if defined(RTS_SUPPORTS_THREADS)
1683   /* Wait for permission to re-enter the RTS with the result. */
1684   ACQUIRE_LOCK(&sched_mutex);
1685   grabReturnCapability(&sched_mutex, &cap);
1686
1687   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1688 #else
1689   grabCapability(&cap);
1690 #endif
1691
1692   /* Remove the thread off of the suspended list */
1693   prev = &suspended_ccalling_threads;
1694   for (tso = suspended_ccalling_threads; 
1695        tso != END_TSO_QUEUE; 
1696        prev = &tso->link, tso = tso->link) {
1697     if (tso->id == (StgThreadID)tok) {
1698       *prev = tso->link;
1699       break;
1700     }
1701   }
1702   if (tso == END_TSO_QUEUE) {
1703     barf("resumeThread: thread not found");
1704   }
1705   tso->link = END_TSO_QUEUE;
1706   
1707 #if defined(RTS_SUPPORTS_THREADS)
1708   if(tso->why_blocked == BlockedOnCCall)
1709   {
1710       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1711       tso->blocked_exceptions = NULL;
1712   }
1713 #endif
1714   
1715   /* Reset blocking status */
1716   tso->why_blocked  = NotBlocked;
1717
1718   cap->r.rCurrentTSO = tso;
1719 #if defined(RTS_SUPPORTS_THREADS)
1720   RELEASE_LOCK(&sched_mutex);
1721 #endif
1722   return &cap->r;
1723 }
1724
1725
1726 /* ---------------------------------------------------------------------------
1727  * Static functions
1728  * ------------------------------------------------------------------------ */
1729 static void unblockThread(StgTSO *tso);
1730
1731 /* ---------------------------------------------------------------------------
1732  * Comparing Thread ids.
1733  *
1734  * This is used from STG land in the implementation of the
1735  * instances of Eq/Ord for ThreadIds.
1736  * ------------------------------------------------------------------------ */
1737
1738 int
1739 cmp_thread(StgPtr tso1, StgPtr tso2) 
1740
1741   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1742   StgThreadID id2 = ((StgTSO *)tso2)->id;
1743  
1744   if (id1 < id2) return (-1);
1745   if (id1 > id2) return 1;
1746   return 0;
1747 }
1748
1749 /* ---------------------------------------------------------------------------
1750  * Fetching the ThreadID from an StgTSO.
1751  *
1752  * This is used in the implementation of Show for ThreadIds.
1753  * ------------------------------------------------------------------------ */
1754 int
1755 rts_getThreadId(StgPtr tso) 
1756 {
1757   return ((StgTSO *)tso)->id;
1758 }
1759
1760 #ifdef DEBUG
1761 void
1762 labelThread(StgPtr tso, char *label)
1763 {
1764   int len;
1765   void *buf;
1766
1767   /* Caveat: Once set, you can only set the thread name to "" */
1768   len = strlen(label)+1;
1769   buf = malloc(len);
1770   if (buf == NULL) {
1771     fprintf(stderr,"insufficient memory for labelThread!\n");
1772   } else
1773     strncpy(buf,label,len);
1774   /* Update will free the old memory for us */
1775   updateThreadLabel((StgWord)tso,buf);
1776 }
1777 #endif /* DEBUG */
1778
1779 /* ---------------------------------------------------------------------------
1780    Create a new thread.
1781
1782    The new thread starts with the given stack size.  Before the
1783    scheduler can run, however, this thread needs to have a closure
1784    (and possibly some arguments) pushed on its stack.  See
1785    pushClosure() in Schedule.h.
1786
1787    createGenThread() and createIOThread() (in SchedAPI.h) are
1788    convenient packaged versions of this function.
1789
1790    currently pri (priority) is only used in a GRAN setup -- HWL
1791    ------------------------------------------------------------------------ */
1792 //@cindex createThread
1793 #if defined(GRAN)
1794 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1795 StgTSO *
1796 createThread(nat size, StgInt pri)
1797 #else
1798 StgTSO *
1799 createThread(nat size)
1800 #endif
1801 {
1802
1803     StgTSO *tso;
1804     nat stack_size;
1805
1806     /* First check whether we should create a thread at all */
1807 #if defined(PAR)
1808   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1809   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1810     threadsIgnored++;
1811     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1812           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1813     return END_TSO_QUEUE;
1814   }
1815   threadsCreated++;
1816 #endif
1817
1818 #if defined(GRAN)
1819   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1820 #endif
1821
1822   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1823
1824   /* catch ridiculously small stack sizes */
1825   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1826     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1827   }
1828
1829   stack_size = size - TSO_STRUCT_SIZEW;
1830
1831   tso = (StgTSO *)allocate(size);
1832   TICK_ALLOC_TSO(stack_size, 0);
1833
1834   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1835 #if defined(GRAN)
1836   SET_GRAN_HDR(tso, ThisPE);
1837 #endif
1838
1839   // Always start with the compiled code evaluator
1840   tso->what_next = ThreadRunGHC;
1841
1842   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1843    * protect the increment operation on next_thread_id.
1844    * In future, we could use an atomic increment instead.
1845    */
1846   ACQUIRE_LOCK(&thread_id_mutex);
1847   tso->id = next_thread_id++; 
1848   RELEASE_LOCK(&thread_id_mutex);
1849
1850   tso->why_blocked  = NotBlocked;
1851   tso->blocked_exceptions = NULL;
1852
1853   tso->stack_size   = stack_size;
1854   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1855                               - TSO_STRUCT_SIZEW;
1856   tso->sp           = (P_)&(tso->stack) + stack_size;
1857
1858 #ifdef PROFILING
1859   tso->prof.CCCS = CCS_MAIN;
1860 #endif
1861
1862   /* put a stop frame on the stack */
1863   tso->sp -= sizeofW(StgStopFrame);
1864   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1865   // ToDo: check this
1866 #if defined(GRAN)
1867   tso->link = END_TSO_QUEUE;
1868   /* uses more flexible routine in GranSim */
1869   insertThread(tso, CurrentProc);
1870 #else
1871   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1872    * from its creation
1873    */
1874 #endif
1875
1876 #if defined(GRAN) 
1877   if (RtsFlags.GranFlags.GranSimStats.Full) 
1878     DumpGranEvent(GR_START,tso);
1879 #elif defined(PAR)
1880   if (RtsFlags.ParFlags.ParStats.Full) 
1881     DumpGranEvent(GR_STARTQ,tso);
1882   /* HACk to avoid SCHEDULE 
1883      LastTSO = tso; */
1884 #endif
1885
1886   /* Link the new thread on the global thread list.
1887    */
1888   tso->global_link = all_threads;
1889   all_threads = tso;
1890
1891 #if defined(DIST)
1892   tso->dist.priority = MandatoryPriority; //by default that is...
1893 #endif
1894
1895 #if defined(GRAN)
1896   tso->gran.pri = pri;
1897 # if defined(DEBUG)
1898   tso->gran.magic = TSO_MAGIC; // debugging only
1899 # endif
1900   tso->gran.sparkname   = 0;
1901   tso->gran.startedat   = CURRENT_TIME; 
1902   tso->gran.exported    = 0;
1903   tso->gran.basicblocks = 0;
1904   tso->gran.allocs      = 0;
1905   tso->gran.exectime    = 0;
1906   tso->gran.fetchtime   = 0;
1907   tso->gran.fetchcount  = 0;
1908   tso->gran.blocktime   = 0;
1909   tso->gran.blockcount  = 0;
1910   tso->gran.blockedat   = 0;
1911   tso->gran.globalsparks = 0;
1912   tso->gran.localsparks  = 0;
1913   if (RtsFlags.GranFlags.Light)
1914     tso->gran.clock  = Now; /* local clock */
1915   else
1916     tso->gran.clock  = 0;
1917
1918   IF_DEBUG(gran,printTSO(tso));
1919 #elif defined(PAR)
1920 # if defined(DEBUG)
1921   tso->par.magic = TSO_MAGIC; // debugging only
1922 # endif
1923   tso->par.sparkname   = 0;
1924   tso->par.startedat   = CURRENT_TIME; 
1925   tso->par.exported    = 0;
1926   tso->par.basicblocks = 0;
1927   tso->par.allocs      = 0;
1928   tso->par.exectime    = 0;
1929   tso->par.fetchtime   = 0;
1930   tso->par.fetchcount  = 0;
1931   tso->par.blocktime   = 0;
1932   tso->par.blockcount  = 0;
1933   tso->par.blockedat   = 0;
1934   tso->par.globalsparks = 0;
1935   tso->par.localsparks  = 0;
1936 #endif
1937
1938 #if defined(GRAN)
1939   globalGranStats.tot_threads_created++;
1940   globalGranStats.threads_created_on_PE[CurrentProc]++;
1941   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1942   globalGranStats.tot_sq_probes++;
1943 #elif defined(PAR)
1944   // collect parallel global statistics (currently done together with GC stats)
1945   if (RtsFlags.ParFlags.ParStats.Global &&
1946       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1947     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1948     globalParStats.tot_threads_created++;
1949   }
1950 #endif 
1951
1952 #if defined(GRAN)
1953   IF_GRAN_DEBUG(pri,
1954                 belch("==__ schedule: Created TSO %d (%p);",
1955                       CurrentProc, tso, tso->id));
1956 #elif defined(PAR)
1957     IF_PAR_DEBUG(verbose,
1958                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1959                        tso->id, tso, advisory_thread_count));
1960 #else
1961   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1962                                  tso->id, tso->stack_size));
1963 #endif    
1964   return tso;
1965 }
1966
1967 #if defined(PAR)
1968 /* RFP:
1969    all parallel thread creation calls should fall through the following routine.
1970 */
1971 StgTSO *
1972 createSparkThread(rtsSpark spark) 
1973 { StgTSO *tso;
1974   ASSERT(spark != (rtsSpark)NULL);
1975   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1976   { threadsIgnored++;
1977     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1978           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1979     return END_TSO_QUEUE;
1980   }
1981   else
1982   { threadsCreated++;
1983     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1984     if (tso==END_TSO_QUEUE)     
1985       barf("createSparkThread: Cannot create TSO");
1986 #if defined(DIST)
1987     tso->priority = AdvisoryPriority;
1988 #endif
1989     pushClosure(tso,spark);
1990     PUSH_ON_RUN_QUEUE(tso);
1991     advisory_thread_count++;    
1992   }
1993   return tso;
1994 }
1995 #endif
1996
1997 /*
1998   Turn a spark into a thread.
1999   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2000 */
2001 #if defined(PAR)
2002 //@cindex activateSpark
2003 StgTSO *
2004 activateSpark (rtsSpark spark) 
2005 {
2006   StgTSO *tso;
2007
2008   tso = createSparkThread(spark);
2009   if (RtsFlags.ParFlags.ParStats.Full) {   
2010     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2011     IF_PAR_DEBUG(verbose,
2012                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2013                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2014   }
2015   // ToDo: fwd info on local/global spark to thread -- HWL
2016   // tso->gran.exported =  spark->exported;
2017   // tso->gran.locked =   !spark->global;
2018   // tso->gran.sparkname = spark->name;
2019
2020   return tso;
2021 }
2022 #endif
2023
2024 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
2025 #if defined(THREADED_RTS)
2026                                    , rtsBool blockWaiting
2027 #endif
2028                                    );
2029
2030
2031 /* ---------------------------------------------------------------------------
2032  * scheduleThread()
2033  *
2034  * scheduleThread puts a thread on the head of the runnable queue.
2035  * This will usually be done immediately after a thread is created.
2036  * The caller of scheduleThread must create the thread using e.g.
2037  * createThread and push an appropriate closure
2038  * on this thread's stack before the scheduler is invoked.
2039  * ------------------------------------------------------------------------ */
2040
2041 static void scheduleThread_ (StgTSO* tso);
2042
2043 void
2044 scheduleThread_(StgTSO *tso)
2045 {
2046   // Precondition: sched_mutex must be held.
2047
2048   /* Put the new thread on the head of the runnable queue.  The caller
2049    * better push an appropriate closure on this thread's stack
2050    * beforehand.  In the SMP case, the thread may start running as
2051    * soon as we release the scheduler lock below.
2052    */
2053   PUSH_ON_RUN_QUEUE(tso);
2054   THREAD_RUNNABLE();
2055
2056 #if 0
2057   IF_DEBUG(scheduler,printTSO(tso));
2058 #endif
2059 }
2060
2061 void scheduleThread(StgTSO* tso)
2062 {
2063   ACQUIRE_LOCK(&sched_mutex);
2064   scheduleThread_(tso);
2065   RELEASE_LOCK(&sched_mutex);
2066 }
2067
2068 SchedulerStatus
2069 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
2070 {       // Precondition: sched_mutex must be held
2071   StgMainThread *m;
2072
2073   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2074   m->tso = tso;
2075   m->ret = ret;
2076   m->stat = NoStatus;
2077 #if defined(RTS_SUPPORTS_THREADS)
2078   initCondition(&m->wakeup);
2079 #endif
2080
2081   /* Put the thread on the main-threads list prior to scheduling the TSO.
2082      Failure to do so introduces a race condition in the MT case (as
2083      identified by Wolfgang Thaller), whereby the new task/OS thread 
2084      created by scheduleThread_() would complete prior to the thread
2085      that spawned it managed to put 'itself' on the main-threads list.
2086      The upshot of it all being that the worker thread wouldn't get to
2087      signal the completion of the its work item for the main thread to
2088      see (==> it got stuck waiting.)    -- sof 6/02.
2089   */
2090   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2091   
2092   m->link = main_threads;
2093   main_threads = m;
2094
2095   scheduleThread_(tso);
2096 #if defined(THREADED_RTS)
2097   return waitThread_(m, rtsTrue);
2098 #else
2099   return waitThread_(m);
2100 #endif
2101 }
2102
2103 /* ---------------------------------------------------------------------------
2104  * initScheduler()
2105  *
2106  * Initialise the scheduler.  This resets all the queues - if the
2107  * queues contained any threads, they'll be garbage collected at the
2108  * next pass.
2109  *
2110  * ------------------------------------------------------------------------ */
2111
2112 #ifdef SMP
2113 static void
2114 term_handler(int sig STG_UNUSED)
2115 {
2116   stat_workerStop();
2117   ACQUIRE_LOCK(&term_mutex);
2118   await_death--;
2119   RELEASE_LOCK(&term_mutex);
2120   shutdownThread();
2121 }
2122 #endif
2123
2124 void 
2125 initScheduler(void)
2126 {
2127 #if defined(GRAN)
2128   nat i;
2129
2130   for (i=0; i<=MAX_PROC; i++) {
2131     run_queue_hds[i]      = END_TSO_QUEUE;
2132     run_queue_tls[i]      = END_TSO_QUEUE;
2133     blocked_queue_hds[i]  = END_TSO_QUEUE;
2134     blocked_queue_tls[i]  = END_TSO_QUEUE;
2135     ccalling_threadss[i]  = END_TSO_QUEUE;
2136     sleeping_queue        = END_TSO_QUEUE;
2137   }
2138 #else
2139   run_queue_hd      = END_TSO_QUEUE;
2140   run_queue_tl      = END_TSO_QUEUE;
2141   blocked_queue_hd  = END_TSO_QUEUE;
2142   blocked_queue_tl  = END_TSO_QUEUE;
2143   sleeping_queue    = END_TSO_QUEUE;
2144 #endif 
2145
2146   suspended_ccalling_threads  = END_TSO_QUEUE;
2147
2148   main_threads = NULL;
2149   all_threads  = END_TSO_QUEUE;
2150
2151   context_switch = 0;
2152   interrupted    = 0;
2153
2154   RtsFlags.ConcFlags.ctxtSwitchTicks =
2155       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2156       
2157 #if defined(RTS_SUPPORTS_THREADS)
2158   /* Initialise the mutex and condition variables used by
2159    * the scheduler. */
2160   initMutex(&sched_mutex);
2161   initMutex(&term_mutex);
2162   initMutex(&thread_id_mutex);
2163
2164   initCondition(&thread_ready_cond);
2165 #endif
2166   
2167 #if defined(SMP)
2168   initCondition(&gc_pending_cond);
2169 #endif
2170
2171 #if defined(RTS_SUPPORTS_THREADS)
2172   ACQUIRE_LOCK(&sched_mutex);
2173 #endif
2174
2175   /* Install the SIGHUP handler */
2176 #if defined(SMP)
2177   {
2178     struct sigaction action,oact;
2179
2180     action.sa_handler = term_handler;
2181     sigemptyset(&action.sa_mask);
2182     action.sa_flags = 0;
2183     if (sigaction(SIGTERM, &action, &oact) != 0) {
2184       barf("can't install TERM handler");
2185     }
2186   }
2187 #endif
2188
2189   /* A capability holds the state a native thread needs in
2190    * order to execute STG code. At least one capability is
2191    * floating around (only SMP builds have more than one).
2192    */
2193   initCapabilities();
2194   
2195 #if defined(RTS_SUPPORTS_THREADS)
2196     /* start our haskell execution tasks */
2197 # if defined(SMP)
2198     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2199 # else
2200     startTaskManager(0,taskStart);
2201 # endif
2202 #endif
2203
2204 #if /* defined(SMP) ||*/ defined(PAR)
2205   initSparkPools();
2206 #endif
2207
2208 #if defined(RTS_SUPPORTS_THREADS)
2209   RELEASE_LOCK(&sched_mutex);
2210 #endif
2211
2212 }
2213
2214 void
2215 exitScheduler( void )
2216 {
2217 #if defined(RTS_SUPPORTS_THREADS)
2218   stopTaskManager();
2219 #endif
2220   shutting_down_scheduler = rtsTrue;
2221 }
2222
2223 /* -----------------------------------------------------------------------------
2224    Managing the per-task allocation areas.
2225    
2226    Each capability comes with an allocation area.  These are
2227    fixed-length block lists into which allocation can be done.
2228
2229    ToDo: no support for two-space collection at the moment???
2230    -------------------------------------------------------------------------- */
2231
2232 /* -----------------------------------------------------------------------------
2233  * waitThread is the external interface for running a new computation
2234  * and waiting for the result.
2235  *
2236  * In the non-SMP case, we create a new main thread, push it on the 
2237  * main-thread stack, and invoke the scheduler to run it.  The
2238  * scheduler will return when the top main thread on the stack has
2239  * completed or died, and fill in the necessary fields of the
2240  * main_thread structure.
2241  *
2242  * In the SMP case, we create a main thread as before, but we then
2243  * create a new condition variable and sleep on it.  When our new
2244  * main thread has completed, we'll be woken up and the status/result
2245  * will be in the main_thread struct.
2246  * -------------------------------------------------------------------------- */
2247
2248 int 
2249 howManyThreadsAvail ( void )
2250 {
2251    int i = 0;
2252    StgTSO* q;
2253    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2254       i++;
2255    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2256       i++;
2257    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2258       i++;
2259    return i;
2260 }
2261
2262 void
2263 finishAllThreads ( void )
2264 {
2265    do {
2266       while (run_queue_hd != END_TSO_QUEUE) {
2267          waitThread ( run_queue_hd, NULL);
2268       }
2269       while (blocked_queue_hd != END_TSO_QUEUE) {
2270          waitThread ( blocked_queue_hd, NULL);
2271       }
2272       while (sleeping_queue != END_TSO_QUEUE) {
2273          waitThread ( blocked_queue_hd, NULL);
2274       }
2275    } while 
2276       (blocked_queue_hd != END_TSO_QUEUE || 
2277        run_queue_hd     != END_TSO_QUEUE ||
2278        sleeping_queue   != END_TSO_QUEUE);
2279 }
2280
2281 SchedulerStatus
2282 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2283
2284   StgMainThread *m;
2285   SchedulerStatus stat;
2286
2287   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2288   m->tso = tso;
2289   m->ret = ret;
2290   m->stat = NoStatus;
2291 #if defined(RTS_SUPPORTS_THREADS)
2292   initCondition(&m->wakeup);
2293 #endif
2294
2295   /* see scheduleWaitThread() comment */
2296   ACQUIRE_LOCK(&sched_mutex);
2297   m->link = main_threads;
2298   main_threads = m;
2299
2300   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2301 #if defined(THREADED_RTS)
2302   stat = waitThread_(m, rtsFalse);
2303 #else
2304   stat = waitThread_(m);
2305 #endif
2306   RELEASE_LOCK(&sched_mutex);
2307   return stat;
2308 }
2309
2310 static
2311 SchedulerStatus
2312 waitThread_(StgMainThread* m
2313 #if defined(THREADED_RTS)
2314             , rtsBool blockWaiting
2315 #endif
2316            )
2317 {
2318   SchedulerStatus stat;
2319
2320   // Precondition: sched_mutex must be held.
2321   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2322
2323 #if defined(RTS_SUPPORTS_THREADS)
2324
2325 # if defined(THREADED_RTS)
2326   if (!blockWaiting) {
2327     /* In the threaded case, the OS thread that called main()
2328      * gets to enter the RTS directly without going via another
2329      * task/thread.
2330      */
2331     main_main_thread = m;
2332     RELEASE_LOCK(&sched_mutex);
2333     schedule();
2334     ACQUIRE_LOCK(&sched_mutex);
2335     main_main_thread = NULL;
2336     ASSERT(m->stat != NoStatus);
2337   } else 
2338 # endif
2339   {
2340     do {
2341       waitCondition(&m->wakeup, &sched_mutex);
2342     } while (m->stat == NoStatus);
2343   }
2344 #elif defined(GRAN)
2345   /* GranSim specific init */
2346   CurrentTSO = m->tso;                // the TSO to run
2347   procStatus[MainProc] = Busy;        // status of main PE
2348   CurrentProc = MainProc;             // PE to run it on
2349
2350   RELEASE_LOCK(&sched_mutex);
2351   schedule();
2352 #else
2353   RELEASE_LOCK(&sched_mutex);
2354   schedule();
2355   ASSERT(m->stat != NoStatus);
2356 #endif
2357
2358   stat = m->stat;
2359
2360 #if defined(RTS_SUPPORTS_THREADS)
2361   closeCondition(&m->wakeup);
2362 #endif
2363
2364   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2365                               m->tso->id));
2366   free(m);
2367
2368   // Postcondition: sched_mutex still held
2369   return stat;
2370 }
2371
2372 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2373 //@subsection Run queue code 
2374
2375 #if 0
2376 /* 
2377    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2378        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2379        implicit global variable that has to be correct when calling these
2380        fcts -- HWL 
2381 */
2382
2383 /* Put the new thread on the head of the runnable queue.
2384  * The caller of createThread better push an appropriate closure
2385  * on this thread's stack before the scheduler is invoked.
2386  */
2387 static /* inline */ void
2388 add_to_run_queue(tso)
2389 StgTSO* tso; 
2390 {
2391   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2392   tso->link = run_queue_hd;
2393   run_queue_hd = tso;
2394   if (run_queue_tl == END_TSO_QUEUE) {
2395     run_queue_tl = tso;
2396   }
2397 }
2398
2399 /* Put the new thread at the end of the runnable queue. */
2400 static /* inline */ void
2401 push_on_run_queue(tso)
2402 StgTSO* tso; 
2403 {
2404   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2405   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2406   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2407   if (run_queue_hd == END_TSO_QUEUE) {
2408     run_queue_hd = tso;
2409   } else {
2410     run_queue_tl->link = tso;
2411   }
2412   run_queue_tl = tso;
2413 }
2414
2415 /* 
2416    Should be inlined because it's used very often in schedule.  The tso
2417    argument is actually only needed in GranSim, where we want to have the
2418    possibility to schedule *any* TSO on the run queue, irrespective of the
2419    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2420    the run queue and dequeue the tso, adjusting the links in the queue. 
2421 */
2422 //@cindex take_off_run_queue
2423 static /* inline */ StgTSO*
2424 take_off_run_queue(StgTSO *tso) {
2425   StgTSO *t, *prev;
2426
2427   /* 
2428      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2429
2430      if tso is specified, unlink that tso from the run_queue (doesn't have
2431      to be at the beginning of the queue); GranSim only 
2432   */
2433   if (tso!=END_TSO_QUEUE) {
2434     /* find tso in queue */
2435     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2436          t!=END_TSO_QUEUE && t!=tso;
2437          prev=t, t=t->link) 
2438       /* nothing */ ;
2439     ASSERT(t==tso);
2440     /* now actually dequeue the tso */
2441     if (prev!=END_TSO_QUEUE) {
2442       ASSERT(run_queue_hd!=t);
2443       prev->link = t->link;
2444     } else {
2445       /* t is at beginning of thread queue */
2446       ASSERT(run_queue_hd==t);
2447       run_queue_hd = t->link;
2448     }
2449     /* t is at end of thread queue */
2450     if (t->link==END_TSO_QUEUE) {
2451       ASSERT(t==run_queue_tl);
2452       run_queue_tl = prev;
2453     } else {
2454       ASSERT(run_queue_tl!=t);
2455     }
2456     t->link = END_TSO_QUEUE;
2457   } else {
2458     /* take tso from the beginning of the queue; std concurrent code */
2459     t = run_queue_hd;
2460     if (t != END_TSO_QUEUE) {
2461       run_queue_hd = t->link;
2462       t->link = END_TSO_QUEUE;
2463       if (run_queue_hd == END_TSO_QUEUE) {
2464         run_queue_tl = END_TSO_QUEUE;
2465       }
2466     }
2467   }
2468   return t;
2469 }
2470
2471 #endif /* 0 */
2472
2473 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2474 //@subsection Garbage Collextion Routines
2475
2476 /* ---------------------------------------------------------------------------
2477    Where are the roots that we know about?
2478
2479         - all the threads on the runnable queue
2480         - all the threads on the blocked queue
2481         - all the threads on the sleeping queue
2482         - all the thread currently executing a _ccall_GC
2483         - all the "main threads"
2484      
2485    ------------------------------------------------------------------------ */
2486
2487 /* This has to be protected either by the scheduler monitor, or by the
2488         garbage collection monitor (probably the latter).
2489         KH @ 25/10/99
2490 */
2491
2492 void
2493 GetRoots(evac_fn evac)
2494 {
2495 #if defined(GRAN)
2496   {
2497     nat i;
2498     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2499       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2500           evac((StgClosure **)&run_queue_hds[i]);
2501       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2502           evac((StgClosure **)&run_queue_tls[i]);
2503       
2504       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2505           evac((StgClosure **)&blocked_queue_hds[i]);
2506       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2507           evac((StgClosure **)&blocked_queue_tls[i]);
2508       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2509           evac((StgClosure **)&ccalling_threads[i]);
2510     }
2511   }
2512
2513   markEventQueue();
2514
2515 #else /* !GRAN */
2516   if (run_queue_hd != END_TSO_QUEUE) {
2517       ASSERT(run_queue_tl != END_TSO_QUEUE);
2518       evac((StgClosure **)&run_queue_hd);
2519       evac((StgClosure **)&run_queue_tl);
2520   }
2521   
2522   if (blocked_queue_hd != END_TSO_QUEUE) {
2523       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2524       evac((StgClosure **)&blocked_queue_hd);
2525       evac((StgClosure **)&blocked_queue_tl);
2526   }
2527   
2528   if (sleeping_queue != END_TSO_QUEUE) {
2529       evac((StgClosure **)&sleeping_queue);
2530   }
2531 #endif 
2532
2533   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2534       evac((StgClosure **)&suspended_ccalling_threads);
2535   }
2536
2537 #if defined(PAR) || defined(GRAN)
2538   markSparkQueue(evac);
2539 #endif
2540
2541 #ifndef mingw32_TARGET_OS
2542   // mark the signal handlers (signals should be already blocked)
2543   markSignalHandlers(evac);
2544 #endif
2545
2546   // main threads which have completed need to be retained until they
2547   // are dealt with in the main scheduler loop.  They won't be
2548   // retained any other way: the GC will drop them from the
2549   // all_threads list, so we have to be careful to treat them as roots
2550   // here.
2551   { 
2552       StgMainThread *m;
2553       for (m = main_threads; m != NULL; m = m->link) {
2554           switch (m->tso->what_next) {
2555           case ThreadComplete:
2556           case ThreadKilled:
2557               evac((StgClosure **)&m->tso);
2558               break;
2559           default:
2560               break;
2561           }
2562       }
2563   }
2564 }
2565
2566 /* -----------------------------------------------------------------------------
2567    performGC
2568
2569    This is the interface to the garbage collector from Haskell land.
2570    We provide this so that external C code can allocate and garbage
2571    collect when called from Haskell via _ccall_GC.
2572
2573    It might be useful to provide an interface whereby the programmer
2574    can specify more roots (ToDo).
2575    
2576    This needs to be protected by the GC condition variable above.  KH.
2577    -------------------------------------------------------------------------- */
2578
2579 static void (*extra_roots)(evac_fn);
2580
2581 void
2582 performGC(void)
2583 {
2584   /* Obligated to hold this lock upon entry */
2585   ACQUIRE_LOCK(&sched_mutex);
2586   GarbageCollect(GetRoots,rtsFalse);
2587   RELEASE_LOCK(&sched_mutex);
2588 }
2589
2590 void
2591 performMajorGC(void)
2592 {
2593   ACQUIRE_LOCK(&sched_mutex);
2594   GarbageCollect(GetRoots,rtsTrue);
2595   RELEASE_LOCK(&sched_mutex);
2596 }
2597
2598 static void
2599 AllRoots(evac_fn evac)
2600 {
2601     GetRoots(evac);             // the scheduler's roots
2602     extra_roots(evac);          // the user's roots
2603 }
2604
2605 void
2606 performGCWithRoots(void (*get_roots)(evac_fn))
2607 {
2608   ACQUIRE_LOCK(&sched_mutex);
2609   extra_roots = get_roots;
2610   GarbageCollect(AllRoots,rtsFalse);
2611   RELEASE_LOCK(&sched_mutex);
2612 }
2613
2614 /* -----------------------------------------------------------------------------
2615    Stack overflow
2616
2617    If the thread has reached its maximum stack size, then raise the
2618    StackOverflow exception in the offending thread.  Otherwise
2619    relocate the TSO into a larger chunk of memory and adjust its stack
2620    size appropriately.
2621    -------------------------------------------------------------------------- */
2622
2623 static StgTSO *
2624 threadStackOverflow(StgTSO *tso)
2625 {
2626   nat new_stack_size, new_tso_size, diff, stack_words;
2627   StgPtr new_sp;
2628   StgTSO *dest;
2629
2630   IF_DEBUG(sanity,checkTSO(tso));
2631   if (tso->stack_size >= tso->max_stack_size) {
2632
2633     IF_DEBUG(gc,
2634              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2635                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2636              /* If we're debugging, just print out the top of the stack */
2637              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2638                                               tso->sp+64)));
2639
2640     /* Send this thread the StackOverflow exception */
2641     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2642     return tso;
2643   }
2644
2645   /* Try to double the current stack size.  If that takes us over the
2646    * maximum stack size for this thread, then use the maximum instead.
2647    * Finally round up so the TSO ends up as a whole number of blocks.
2648    */
2649   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2650   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2651                                        TSO_STRUCT_SIZE)/sizeof(W_);
2652   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2653   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2654
2655   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2656
2657   dest = (StgTSO *)allocate(new_tso_size);
2658   TICK_ALLOC_TSO(new_stack_size,0);
2659
2660   /* copy the TSO block and the old stack into the new area */
2661   memcpy(dest,tso,TSO_STRUCT_SIZE);
2662   stack_words = tso->stack + tso->stack_size - tso->sp;
2663   new_sp = (P_)dest + new_tso_size - stack_words;
2664   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2665
2666   /* relocate the stack pointers... */
2667   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2668   dest->sp    = new_sp;
2669   dest->stack_size = new_stack_size;
2670         
2671   /* Mark the old TSO as relocated.  We have to check for relocated
2672    * TSOs in the garbage collector and any primops that deal with TSOs.
2673    *
2674    * It's important to set the sp value to just beyond the end
2675    * of the stack, so we don't attempt to scavenge any part of the
2676    * dead TSO's stack.
2677    */
2678   tso->what_next = ThreadRelocated;
2679   tso->link = dest;
2680   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2681   tso->why_blocked = NotBlocked;
2682   dest->mut_link = NULL;
2683
2684   IF_PAR_DEBUG(verbose,
2685                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2686                      tso->id, tso, tso->stack_size);
2687                /* If we're debugging, just print out the top of the stack */
2688                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2689                                                 tso->sp+64)));
2690   
2691   IF_DEBUG(sanity,checkTSO(tso));
2692 #if 0
2693   IF_DEBUG(scheduler,printTSO(dest));
2694 #endif
2695
2696   return dest;
2697 }
2698
2699 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2700 //@subsection Blocking Queue Routines
2701
2702 /* ---------------------------------------------------------------------------
2703    Wake up a queue that was blocked on some resource.
2704    ------------------------------------------------------------------------ */
2705
2706 #if defined(GRAN)
2707 static inline void
2708 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2709 {
2710 }
2711 #elif defined(PAR)
2712 static inline void
2713 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2714 {
2715   /* write RESUME events to log file and
2716      update blocked and fetch time (depending on type of the orig closure) */
2717   if (RtsFlags.ParFlags.ParStats.Full) {
2718     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2719                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2720                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2721     if (EMPTY_RUN_QUEUE())
2722       emitSchedule = rtsTrue;
2723
2724     switch (get_itbl(node)->type) {
2725         case FETCH_ME_BQ:
2726           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2727           break;
2728         case RBH:
2729         case FETCH_ME:
2730         case BLACKHOLE_BQ:
2731           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2732           break;
2733 #ifdef DIST
2734         case MVAR:
2735           break;
2736 #endif    
2737         default:
2738           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2739         }
2740       }
2741 }
2742 #endif
2743
2744 #if defined(GRAN)
2745 static StgBlockingQueueElement *
2746 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2747 {
2748     StgTSO *tso;
2749     PEs node_loc, tso_loc;
2750
2751     node_loc = where_is(node); // should be lifted out of loop
2752     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2753     tso_loc = where_is((StgClosure *)tso);
2754     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2755       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2756       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2757       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2758       // insertThread(tso, node_loc);
2759       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2760                 ResumeThread,
2761                 tso, node, (rtsSpark*)NULL);
2762       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2763       // len_local++;
2764       // len++;
2765     } else { // TSO is remote (actually should be FMBQ)
2766       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2767                                   RtsFlags.GranFlags.Costs.gunblocktime +
2768                                   RtsFlags.GranFlags.Costs.latency;
2769       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2770                 UnblockThread,
2771                 tso, node, (rtsSpark*)NULL);
2772       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2773       // len++;
2774     }
2775     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2776     IF_GRAN_DEBUG(bq,
2777                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2778                           (node_loc==tso_loc ? "Local" : "Global"), 
2779                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2780     tso->block_info.closure = NULL;
2781     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2782                              tso->id, tso));
2783 }
2784 #elif defined(PAR)
2785 static StgBlockingQueueElement *
2786 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2787 {
2788     StgBlockingQueueElement *next;
2789
2790     switch (get_itbl(bqe)->type) {
2791     case TSO:
2792       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2793       /* if it's a TSO just push it onto the run_queue */
2794       next = bqe->link;
2795       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2796       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2797       THREAD_RUNNABLE();
2798       unblockCount(bqe, node);
2799       /* reset blocking status after dumping event */
2800       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2801       break;
2802
2803     case BLOCKED_FETCH:
2804       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2805       next = bqe->link;
2806       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2807       PendingFetches = (StgBlockedFetch *)bqe;
2808       break;
2809
2810 # if defined(DEBUG)
2811       /* can ignore this case in a non-debugging setup; 
2812          see comments on RBHSave closures above */
2813     case CONSTR:
2814       /* check that the closure is an RBHSave closure */
2815       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2816              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2817              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2818       break;
2819
2820     default:
2821       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2822            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2823            (StgClosure *)bqe);
2824 # endif
2825     }
2826   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2827   return next;
2828 }
2829
2830 #else /* !GRAN && !PAR */
2831 static StgTSO *
2832 unblockOneLocked(StgTSO *tso)
2833 {
2834   StgTSO *next;
2835
2836   ASSERT(get_itbl(tso)->type == TSO);
2837   ASSERT(tso->why_blocked != NotBlocked);
2838   tso->why_blocked = NotBlocked;
2839   next = tso->link;
2840   PUSH_ON_RUN_QUEUE(tso);
2841   THREAD_RUNNABLE();
2842   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2843   return next;
2844 }
2845 #endif
2846
2847 #if defined(GRAN) || defined(PAR)
2848 inline StgBlockingQueueElement *
2849 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2850 {
2851   ACQUIRE_LOCK(&sched_mutex);
2852   bqe = unblockOneLocked(bqe, node);
2853   RELEASE_LOCK(&sched_mutex);
2854   return bqe;
2855 }
2856 #else
2857 inline StgTSO *
2858 unblockOne(StgTSO *tso)
2859 {
2860   ACQUIRE_LOCK(&sched_mutex);
2861   tso = unblockOneLocked(tso);
2862   RELEASE_LOCK(&sched_mutex);
2863   return tso;
2864 }
2865 #endif
2866
2867 #if defined(GRAN)
2868 void 
2869 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2870 {
2871   StgBlockingQueueElement *bqe;
2872   PEs node_loc;
2873   nat len = 0; 
2874
2875   IF_GRAN_DEBUG(bq, 
2876                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2877                       node, CurrentProc, CurrentTime[CurrentProc], 
2878                       CurrentTSO->id, CurrentTSO));
2879
2880   node_loc = where_is(node);
2881
2882   ASSERT(q == END_BQ_QUEUE ||
2883          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2884          get_itbl(q)->type == CONSTR); // closure (type constructor)
2885   ASSERT(is_unique(node));
2886
2887   /* FAKE FETCH: magically copy the node to the tso's proc;
2888      no Fetch necessary because in reality the node should not have been 
2889      moved to the other PE in the first place
2890   */
2891   if (CurrentProc!=node_loc) {
2892     IF_GRAN_DEBUG(bq, 
2893                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2894                         node, node_loc, CurrentProc, CurrentTSO->id, 
2895                         // CurrentTSO, where_is(CurrentTSO),
2896                         node->header.gran.procs));
2897     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2898     IF_GRAN_DEBUG(bq, 
2899                   belch("## new bitmask of node %p is %#x",
2900                         node, node->header.gran.procs));
2901     if (RtsFlags.GranFlags.GranSimStats.Global) {
2902       globalGranStats.tot_fake_fetches++;
2903     }
2904   }
2905
2906   bqe = q;
2907   // ToDo: check: ASSERT(CurrentProc==node_loc);
2908   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2909     //next = bqe->link;
2910     /* 
2911        bqe points to the current element in the queue
2912        next points to the next element in the queue
2913     */
2914     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2915     //tso_loc = where_is(tso);
2916     len++;
2917     bqe = unblockOneLocked(bqe, node);
2918   }
2919
2920   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2921      the closure to make room for the anchor of the BQ */
2922   if (bqe!=END_BQ_QUEUE) {
2923     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2924     /*
2925     ASSERT((info_ptr==&RBH_Save_0_info) ||
2926            (info_ptr==&RBH_Save_1_info) ||
2927            (info_ptr==&RBH_Save_2_info));
2928     */
2929     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2930     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2931     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2932
2933     IF_GRAN_DEBUG(bq,
2934                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2935                         node, info_type(node)));
2936   }
2937
2938   /* statistics gathering */
2939   if (RtsFlags.GranFlags.GranSimStats.Global) {
2940     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2941     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2942     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2943     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2944   }
2945   IF_GRAN_DEBUG(bq,
2946                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2947                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2948 }
2949 #elif defined(PAR)
2950 void 
2951 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2952 {
2953   StgBlockingQueueElement *bqe;
2954
2955   ACQUIRE_LOCK(&sched_mutex);
2956
2957   IF_PAR_DEBUG(verbose, 
2958                belch("##-_ AwBQ for node %p on [%x]: ",
2959                      node, mytid));
2960 #ifdef DIST  
2961   //RFP
2962   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2963     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2964     return;
2965   }
2966 #endif
2967   
2968   ASSERT(q == END_BQ_QUEUE ||
2969          get_itbl(q)->type == TSO ||           
2970          get_itbl(q)->type == BLOCKED_FETCH || 
2971          get_itbl(q)->type == CONSTR); 
2972
2973   bqe = q;
2974   while (get_itbl(bqe)->type==TSO || 
2975          get_itbl(bqe)->type==BLOCKED_FETCH) {
2976     bqe = unblockOneLocked(bqe, node);
2977   }
2978   RELEASE_LOCK(&sched_mutex);
2979 }
2980
2981 #else   /* !GRAN && !PAR */
2982
2983 #ifdef RTS_SUPPORTS_THREADS
2984 void
2985 awakenBlockedQueueNoLock(StgTSO *tso)
2986 {
2987   while (tso != END_TSO_QUEUE) {
2988     tso = unblockOneLocked(tso);
2989   }
2990 }
2991 #endif
2992
2993 void
2994 awakenBlockedQueue(StgTSO *tso)
2995 {
2996   ACQUIRE_LOCK(&sched_mutex);
2997   while (tso != END_TSO_QUEUE) {
2998     tso = unblockOneLocked(tso);
2999   }
3000   RELEASE_LOCK(&sched_mutex);
3001 }
3002 #endif
3003
3004 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3005 //@subsection Exception Handling Routines
3006
3007 /* ---------------------------------------------------------------------------
3008    Interrupt execution
3009    - usually called inside a signal handler so it mustn't do anything fancy.   
3010    ------------------------------------------------------------------------ */
3011
3012 void
3013 interruptStgRts(void)
3014 {
3015     interrupted    = 1;
3016     context_switch = 1;
3017 }
3018
3019 /* -----------------------------------------------------------------------------
3020    Unblock a thread
3021
3022    This is for use when we raise an exception in another thread, which
3023    may be blocked.
3024    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3025    -------------------------------------------------------------------------- */
3026
3027 #if defined(GRAN) || defined(PAR)
3028 /*
3029   NB: only the type of the blocking queue is different in GranSim and GUM
3030       the operations on the queue-elements are the same
3031       long live polymorphism!
3032
3033   Locks: sched_mutex is held upon entry and exit.
3034
3035 */
3036 static void
3037 unblockThread(StgTSO *tso)
3038 {
3039   StgBlockingQueueElement *t, **last;
3040
3041   switch (tso->why_blocked) {
3042
3043   case NotBlocked:
3044     return;  /* not blocked */
3045
3046   case BlockedOnMVar:
3047     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3048     {
3049       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3050       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3051
3052       last = (StgBlockingQueueElement **)&mvar->head;
3053       for (t = (StgBlockingQueueElement *)mvar->head; 
3054            t != END_BQ_QUEUE; 
3055            last = &t->link, last_tso = t, t = t->link) {
3056         if (t == (StgBlockingQueueElement *)tso) {
3057           *last = (StgBlockingQueueElement *)tso->link;
3058           if (mvar->tail == tso) {
3059             mvar->tail = (StgTSO *)last_tso;
3060           }
3061           goto done;
3062         }
3063       }
3064       barf("unblockThread (MVAR): TSO not found");
3065     }
3066
3067   case BlockedOnBlackHole:
3068     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3069     {
3070       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3071
3072       last = &bq->blocking_queue;
3073       for (t = bq->blocking_queue; 
3074            t != END_BQ_QUEUE; 
3075            last = &t->link, t = t->link) {
3076         if (t == (StgBlockingQueueElement *)tso) {
3077           *last = (StgBlockingQueueElement *)tso->link;
3078           goto done;
3079         }
3080       }
3081       barf("unblockThread (BLACKHOLE): TSO not found");
3082     }
3083
3084   case BlockedOnException:
3085     {
3086       StgTSO *target  = tso->block_info.tso;
3087
3088       ASSERT(get_itbl(target)->type == TSO);
3089
3090       if (target->what_next == ThreadRelocated) {
3091           target = target->link;
3092           ASSERT(get_itbl(target)->type == TSO);
3093       }
3094
3095       ASSERT(target->blocked_exceptions != NULL);
3096
3097       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3098       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3099            t != END_BQ_QUEUE; 
3100            last = &t->link, t = t->link) {
3101         ASSERT(get_itbl(t)->type == TSO);
3102         if (t == (StgBlockingQueueElement *)tso) {
3103           *last = (StgBlockingQueueElement *)tso->link;
3104           goto done;
3105         }
3106       }
3107       barf("unblockThread (Exception): TSO not found");
3108     }
3109
3110   case BlockedOnRead:
3111   case BlockedOnWrite:
3112     {
3113       /* take TSO off blocked_queue */
3114       StgBlockingQueueElement *prev = NULL;
3115       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3116            prev = t, t = t->link) {
3117         if (t == (StgBlockingQueueElement *)tso) {
3118           if (prev == NULL) {
3119             blocked_queue_hd = (StgTSO *)t->link;
3120             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3121               blocked_queue_tl = END_TSO_QUEUE;
3122             }
3123           } else {
3124             prev->link = t->link;
3125             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3126               blocked_queue_tl = (StgTSO *)prev;
3127             }
3128           }
3129           goto done;
3130         }
3131       }
3132       barf("unblockThread (I/O): TSO not found");
3133     }
3134
3135   case BlockedOnDelay:
3136     {
3137       /* take TSO off sleeping_queue */
3138       StgBlockingQueueElement *prev = NULL;
3139       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3140            prev = t, t = t->link) {
3141         if (t == (StgBlockingQueueElement *)tso) {
3142           if (prev == NULL) {
3143             sleeping_queue = (StgTSO *)t->link;
3144           } else {
3145             prev->link = t->link;
3146           }
3147           goto done;
3148         }
3149       }
3150       barf("unblockThread (I/O): TSO not found");
3151     }
3152
3153   default:
3154     barf("unblockThread");
3155   }
3156
3157  done:
3158   tso->link = END_TSO_QUEUE;
3159   tso->why_blocked = NotBlocked;
3160   tso->block_info.closure = NULL;
3161   PUSH_ON_RUN_QUEUE(tso);
3162 }
3163 #else
3164 static void
3165 unblockThread(StgTSO *tso)
3166 {
3167   StgTSO *t, **last;
3168   
3169   /* To avoid locking unnecessarily. */
3170   if (tso->why_blocked == NotBlocked) {
3171     return;
3172   }
3173
3174   switch (tso->why_blocked) {
3175
3176   case BlockedOnMVar:
3177     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3178     {
3179       StgTSO *last_tso = END_TSO_QUEUE;
3180       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3181
3182       last = &mvar->head;
3183       for (t = mvar->head; t != END_TSO_QUEUE; 
3184            last = &t->link, last_tso = t, t = t->link) {
3185         if (t == tso) {
3186           *last = tso->link;
3187           if (mvar->tail == tso) {
3188             mvar->tail = last_tso;
3189           }
3190           goto done;
3191         }
3192       }
3193       barf("unblockThread (MVAR): TSO not found");
3194     }
3195
3196   case BlockedOnBlackHole:
3197     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3198     {
3199       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3200
3201       last = &bq->blocking_queue;
3202       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3203            last = &t->link, t = t->link) {
3204         if (t == tso) {
3205           *last = tso->link;
3206           goto done;
3207         }
3208       }
3209       barf("unblockThread (BLACKHOLE): TSO not found");
3210     }
3211
3212   case BlockedOnException:
3213     {
3214       StgTSO *target  = tso->block_info.tso;
3215
3216       ASSERT(get_itbl(target)->type == TSO);
3217
3218       while (target->what_next == ThreadRelocated) {
3219           target = target->link;
3220           ASSERT(get_itbl(target)->type == TSO);
3221       }
3222       
3223       ASSERT(target->blocked_exceptions != NULL);
3224
3225       last = &target->blocked_exceptions;
3226       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3227            last = &t->link, t = t->link) {
3228         ASSERT(get_itbl(t)->type == TSO);
3229         if (t == tso) {
3230           *last = tso->link;
3231           goto done;
3232         }
3233       }
3234       barf("unblockThread (Exception): TSO not found");
3235     }
3236
3237   case BlockedOnRead:
3238   case BlockedOnWrite:
3239     {
3240       StgTSO *prev = NULL;
3241       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3242            prev = t, t = t->link) {
3243         if (t == tso) {
3244           if (prev == NULL) {
3245             blocked_queue_hd = t->link;
3246             if (blocked_queue_tl == t) {
3247               blocked_queue_tl = END_TSO_QUEUE;
3248             }
3249           } else {
3250             prev->link = t->link;
3251             if (blocked_queue_tl == t) {
3252               blocked_queue_tl = prev;
3253             }
3254           }
3255           goto done;
3256         }
3257       }
3258       barf("unblockThread (I/O): TSO not found");
3259     }
3260
3261   case BlockedOnDelay:
3262     {
3263       StgTSO *prev = NULL;
3264       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3265            prev = t, t = t->link) {
3266         if (t == tso) {
3267           if (prev == NULL) {
3268             sleeping_queue = t->link;
3269           } else {
3270             prev->link = t->link;
3271           }
3272           goto done;
3273         }
3274       }
3275       barf("unblockThread (I/O): TSO not found");
3276     }
3277
3278   default:
3279     barf("unblockThread");
3280   }
3281
3282  done:
3283   tso->link = END_TSO_QUEUE;
3284   tso->why_blocked = NotBlocked;
3285   tso->block_info.closure = NULL;
3286   PUSH_ON_RUN_QUEUE(tso);
3287 }
3288 #endif
3289
3290 /* -----------------------------------------------------------------------------
3291  * raiseAsync()
3292  *
3293  * The following function implements the magic for raising an
3294  * asynchronous exception in an existing thread.
3295  *
3296  * We first remove the thread from any queue on which it might be
3297  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3298  *
3299  * We strip the stack down to the innermost CATCH_FRAME, building
3300  * thunks in the heap for all the active computations, so they can 
3301  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3302  * an application of the handler to the exception, and push it on
3303  * the top of the stack.
3304  * 
3305  * How exactly do we save all the active computations?  We create an
3306  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3307  * AP_STACKs pushes everything from the corresponding update frame
3308  * upwards onto the stack.  (Actually, it pushes everything up to the
3309  * next update frame plus a pointer to the next AP_STACK object.
3310  * Entering the next AP_STACK object pushes more onto the stack until we
3311  * reach the last AP_STACK object - at which point the stack should look
3312  * exactly as it did when we killed the TSO and we can continue
3313  * execution by entering the closure on top of the stack.
3314  *
3315  * We can also kill a thread entirely - this happens if either (a) the 
3316  * exception passed to raiseAsync is NULL, or (b) there's no
3317  * CATCH_FRAME on the stack.  In either case, we strip the entire
3318  * stack and replace the thread with a zombie.
3319  *
3320  * Locks: sched_mutex held upon entry nor exit.
3321  *
3322  * -------------------------------------------------------------------------- */
3323  
3324 void 
3325 deleteThread(StgTSO *tso)
3326 {
3327   raiseAsync(tso,NULL);
3328 }
3329
3330 void
3331 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3332 {
3333   /* When raising async exs from contexts where sched_mutex isn't held;
3334      use raiseAsyncWithLock(). */
3335   ACQUIRE_LOCK(&sched_mutex);
3336   raiseAsync(tso,exception);
3337   RELEASE_LOCK(&sched_mutex);
3338 }
3339
3340 void
3341 raiseAsync(StgTSO *tso, StgClosure *exception)
3342 {
3343     StgRetInfoTable *info;
3344     StgPtr sp;
3345   
3346     // Thread already dead?
3347     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3348         return;
3349     }
3350
3351     IF_DEBUG(scheduler, 
3352              sched_belch("raising exception in thread %ld.", tso->id));
3353     
3354     // Remove it from any blocking queues
3355     unblockThread(tso);
3356
3357     sp = tso->sp;
3358     
3359     // The stack freezing code assumes there's a closure pointer on
3360     // the top of the stack, so we have to arrange that this is the case...
3361     //
3362     if (sp[0] == (W_)&stg_enter_info) {
3363         sp++;
3364     } else {
3365         sp--;
3366         sp[0] = (W_)&stg_dummy_ret_closure;
3367     }
3368
3369     while (1) {
3370         nat i;
3371
3372         // 1. Let the top of the stack be the "current closure"
3373         //
3374         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3375         // CATCH_FRAME.
3376         //
3377         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3378         // current closure applied to the chunk of stack up to (but not
3379         // including) the update frame.  This closure becomes the "current
3380         // closure".  Go back to step 2.
3381         //
3382         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3383         // top of the stack applied to the exception.
3384         // 
3385         // 5. If it's a STOP_FRAME, then kill the thread.
3386         
3387         StgPtr frame;
3388         
3389         frame = sp + 1;
3390         info = get_ret_itbl((StgClosure *)frame);
3391         
3392         while (info->i.type != UPDATE_FRAME
3393                && (info->i.type != CATCH_FRAME || exception == NULL)
3394                && info->i.type != STOP_FRAME) {
3395             frame += stack_frame_sizeW((StgClosure *)frame);
3396             info = get_ret_itbl((StgClosure *)frame);
3397         }
3398         
3399         switch (info->i.type) {
3400             
3401         case CATCH_FRAME:
3402             // If we find a CATCH_FRAME, and we've got an exception to raise,
3403             // then build the THUNK raise(exception), and leave it on
3404             // top of the CATCH_FRAME ready to enter.
3405             //
3406         {
3407 #ifdef PROFILING
3408             StgCatchFrame *cf = (StgCatchFrame *)frame;
3409 #endif
3410             StgClosure *raise;
3411             
3412             // we've got an exception to raise, so let's pass it to the
3413             // handler in this frame.
3414             //
3415             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3416             TICK_ALLOC_SE_THK(1,0);
3417             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3418             raise->payload[0] = exception;
3419             
3420             // throw away the stack from Sp up to the CATCH_FRAME.
3421             //
3422             sp = frame - 1;
3423             
3424             /* Ensure that async excpetions are blocked now, so we don't get
3425              * a surprise exception before we get around to executing the
3426              * handler.
3427              */
3428             if (tso->blocked_exceptions == NULL) {
3429                 tso->blocked_exceptions = END_TSO_QUEUE;
3430             }
3431             
3432             /* Put the newly-built THUNK on top of the stack, ready to execute
3433              * when the thread restarts.
3434              */
3435             sp[0] = (W_)raise;
3436             sp[-1] = (W_)&stg_enter_info;
3437             tso->sp = sp-1;
3438             tso->what_next = ThreadRunGHC;
3439             IF_DEBUG(sanity, checkTSO(tso));
3440             return;
3441         }
3442         
3443         case UPDATE_FRAME:
3444         {
3445             StgAP_STACK * ap;
3446             nat words;
3447             
3448             // First build an AP_STACK consisting of the stack chunk above the
3449             // current update frame, with the top word on the stack as the
3450             // fun field.
3451             //
3452             words = frame - sp - 1;
3453             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3454             
3455             ap->size = words;
3456             ap->fun  = (StgClosure *)sp[0];
3457             sp++;
3458             for(i=0; i < (nat)words; ++i) {
3459                 ap->payload[i] = (StgClosure *)*sp++;
3460             }
3461             
3462             SET_HDR(ap,&stg_AP_STACK_info,
3463                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3464             TICK_ALLOC_UP_THK(words+1,0);
3465             
3466             IF_DEBUG(scheduler,
3467                      fprintf(stderr,  "scheduler: Updating ");
3468                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3469                      fprintf(stderr,  " with ");
3470                      printObj((StgClosure *)ap);
3471                 );
3472
3473             // Replace the updatee with an indirection - happily
3474             // this will also wake up any threads currently
3475             // waiting on the result.
3476             //
3477             // Warning: if we're in a loop, more than one update frame on
3478             // the stack may point to the same object.  Be careful not to
3479             // overwrite an IND_OLDGEN in this case, because we'll screw
3480             // up the mutable lists.  To be on the safe side, don't
3481             // overwrite any kind of indirection at all.  See also
3482             // threadSqueezeStack in GC.c, where we have to make a similar
3483             // check.
3484             //
3485             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3486                 // revert the black hole
3487                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3488             }
3489             sp += sizeofW(StgUpdateFrame) - 1;
3490             sp[0] = (W_)ap; // push onto stack
3491             break;
3492         }
3493         
3494         case STOP_FRAME:
3495             // We've stripped the entire stack, the thread is now dead.
3496             sp += sizeofW(StgStopFrame);
3497             tso->what_next = ThreadKilled;
3498             tso->sp = sp;
3499             return;
3500             
3501         default:
3502             barf("raiseAsync");
3503         }
3504     }
3505     barf("raiseAsync");
3506 }
3507
3508 /* -----------------------------------------------------------------------------
3509    resurrectThreads is called after garbage collection on the list of
3510    threads found to be garbage.  Each of these threads will be woken
3511    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3512    on an MVar, or NonTermination if the thread was blocked on a Black
3513    Hole.
3514
3515    Locks: sched_mutex isn't held upon entry nor exit.
3516    -------------------------------------------------------------------------- */
3517
3518 void
3519 resurrectThreads( StgTSO *threads )
3520 {
3521   StgTSO *tso, *next;
3522
3523   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3524     next = tso->global_link;
3525     tso->global_link = all_threads;
3526     all_threads = tso;
3527     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3528
3529     switch (tso->why_blocked) {
3530     case BlockedOnMVar:
3531     case BlockedOnException:
3532       /* Called by GC - sched_mutex lock is currently held. */
3533       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3534       break;
3535     case BlockedOnBlackHole:
3536       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3537       break;
3538     case NotBlocked:
3539       /* This might happen if the thread was blocked on a black hole
3540        * belonging to a thread that we've just woken up (raiseAsync
3541        * can wake up threads, remember...).
3542        */
3543       continue;
3544     default:
3545       barf("resurrectThreads: thread blocked in a strange way");
3546     }
3547   }
3548 }
3549
3550 /* -----------------------------------------------------------------------------
3551  * Blackhole detection: if we reach a deadlock, test whether any
3552  * threads are blocked on themselves.  Any threads which are found to
3553  * be self-blocked get sent a NonTermination exception.
3554  *
3555  * This is only done in a deadlock situation in order to avoid
3556  * performance overhead in the normal case.
3557  *
3558  * Locks: sched_mutex is held upon entry and exit.
3559  * -------------------------------------------------------------------------- */
3560
3561 static void
3562 detectBlackHoles( void )
3563 {
3564     StgTSO *tso = all_threads;
3565     StgClosure *frame;
3566     StgClosure *blocked_on;
3567     StgRetInfoTable *info;
3568
3569     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3570
3571         while (tso->what_next == ThreadRelocated) {
3572             tso = tso->link;
3573             ASSERT(get_itbl(tso)->type == TSO);
3574         }
3575       
3576         if (tso->why_blocked != BlockedOnBlackHole) {
3577             continue;
3578         }
3579         blocked_on = tso->block_info.closure;
3580
3581         frame = (StgClosure *)tso->sp;
3582
3583         while(1) {
3584             info = get_ret_itbl(frame);
3585             switch (info->i.type) {
3586             case UPDATE_FRAME:
3587                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3588                     /* We are blocking on one of our own computations, so
3589                      * send this thread the NonTermination exception.  
3590                      */
3591                     IF_DEBUG(scheduler, 
3592                              sched_belch("thread %d is blocked on itself", tso->id));
3593                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3594                     goto done;
3595                 }
3596                 
3597                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3598                 continue;
3599
3600             case STOP_FRAME:
3601                 goto done;
3602
3603                 // normal stack frames; do nothing except advance the pointer
3604             default:
3605                 (StgPtr)frame += stack_frame_sizeW(frame);
3606             }
3607         }   
3608         done: ;
3609     }
3610 }
3611
3612 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3613 //@subsection Debugging Routines
3614
3615 /* -----------------------------------------------------------------------------
3616  * Debugging: why is a thread blocked
3617  * [Also provides useful information when debugging threaded programs
3618  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3619    -------------------------------------------------------------------------- */
3620
3621 static
3622 void
3623 printThreadBlockage(StgTSO *tso)
3624 {
3625   switch (tso->why_blocked) {
3626   case BlockedOnRead:
3627     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3628     break;
3629   case BlockedOnWrite:
3630     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3631     break;
3632   case BlockedOnDelay:
3633     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3634     break;
3635   case BlockedOnMVar:
3636     fprintf(stderr,"is blocked on an MVar");
3637     break;
3638   case BlockedOnException:
3639     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3640             tso->block_info.tso->id);
3641     break;
3642   case BlockedOnBlackHole:
3643     fprintf(stderr,"is blocked on a black hole");
3644     break;
3645   case NotBlocked:
3646     fprintf(stderr,"is not blocked");
3647     break;
3648 #if defined(PAR)
3649   case BlockedOnGA:
3650     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3651             tso->block_info.closure, info_type(tso->block_info.closure));
3652     break;
3653   case BlockedOnGA_NoSend:
3654     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3655             tso->block_info.closure, info_type(tso->block_info.closure));
3656     break;
3657 #endif
3658 #if defined(RTS_SUPPORTS_THREADS)
3659   case BlockedOnCCall:
3660     fprintf(stderr,"is blocked on an external call");
3661     break;
3662   case BlockedOnCCall_NoUnblockExc:
3663     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3664     break;
3665 #endif
3666   default:
3667     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3668          tso->why_blocked, tso->id, tso);
3669   }
3670 }
3671
3672 static
3673 void
3674 printThreadStatus(StgTSO *tso)
3675 {
3676   switch (tso->what_next) {
3677   case ThreadKilled:
3678     fprintf(stderr,"has been killed");
3679     break;
3680   case ThreadComplete:
3681     fprintf(stderr,"has completed");
3682     break;
3683   default:
3684     printThreadBlockage(tso);
3685   }
3686 }
3687
3688 void
3689 printAllThreads(void)
3690 {
3691   StgTSO *t;
3692   void *label;
3693
3694 # if defined(GRAN)
3695   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3696   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3697                        time_string, rtsFalse/*no commas!*/);
3698
3699   fprintf(stderr, "all threads at [%s]:\n", time_string);
3700 # elif defined(PAR)
3701   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3702   ullong_format_string(CURRENT_TIME,
3703                        time_string, rtsFalse/*no commas!*/);
3704
3705   fprintf(stderr,"all threads at [%s]:\n", time_string);
3706 # else
3707   fprintf(stderr,"all threads:\n");
3708 # endif
3709
3710   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3711     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3712     label = lookupThreadLabel((StgWord)t);
3713     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3714     printThreadStatus(t);
3715     fprintf(stderr,"\n");
3716   }
3717 }
3718     
3719 #ifdef DEBUG
3720
3721 /* 
3722    Print a whole blocking queue attached to node (debugging only).
3723 */
3724 //@cindex print_bq
3725 # if defined(PAR)
3726 void 
3727 print_bq (StgClosure *node)
3728 {
3729   StgBlockingQueueElement *bqe;
3730   StgTSO *tso;
3731   rtsBool end;
3732
3733   fprintf(stderr,"## BQ of closure %p (%s): ",
3734           node, info_type(node));
3735
3736   /* should cover all closures that may have a blocking queue */
3737   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3738          get_itbl(node)->type == FETCH_ME_BQ ||
3739          get_itbl(node)->type == RBH ||
3740          get_itbl(node)->type == MVAR);
3741     
3742   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3743
3744   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3745 }
3746
3747 /* 
3748    Print a whole blocking queue starting with the element bqe.
3749 */
3750 void 
3751 print_bqe (StgBlockingQueueElement *bqe)
3752 {
3753   rtsBool end;
3754
3755   /* 
3756      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3757   */
3758   for (end = (bqe==END_BQ_QUEUE);
3759        !end; // iterate until bqe points to a CONSTR
3760        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3761        bqe = end ? END_BQ_QUEUE : bqe->link) {
3762     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3763     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3764     /* types of closures that may appear in a blocking queue */
3765     ASSERT(get_itbl(bqe)->type == TSO ||           
3766            get_itbl(bqe)->type == BLOCKED_FETCH || 
3767            get_itbl(bqe)->type == CONSTR); 
3768     /* only BQs of an RBH end with an RBH_Save closure */
3769     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3770
3771     switch (get_itbl(bqe)->type) {
3772     case TSO:
3773       fprintf(stderr," TSO %u (%x),",
3774               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3775       break;
3776     case BLOCKED_FETCH:
3777       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3778               ((StgBlockedFetch *)bqe)->node, 
3779               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3780               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3781               ((StgBlockedFetch *)bqe)->ga.weight);
3782       break;
3783     case CONSTR:
3784       fprintf(stderr," %s (IP %p),",
3785               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3786                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3787                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3788                "RBH_Save_?"), get_itbl(bqe));
3789       break;
3790     default:
3791       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3792            info_type((StgClosure *)bqe)); // , node, info_type(node));
3793       break;
3794     }
3795   } /* for */
3796   fputc('\n', stderr);
3797 }
3798 # elif defined(GRAN)
3799 void 
3800 print_bq (StgClosure *node)
3801 {
3802   StgBlockingQueueElement *bqe;
3803   PEs node_loc, tso_loc;
3804   rtsBool end;
3805
3806   /* should cover all closures that may have a blocking queue */
3807   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3808          get_itbl(node)->type == FETCH_ME_BQ ||
3809          get_itbl(node)->type == RBH);
3810     
3811   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3812   node_loc = where_is(node);
3813
3814   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3815           node, info_type(node), node_loc);
3816
3817   /* 
3818      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3819   */
3820   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3821        !end; // iterate until bqe points to a CONSTR
3822        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3823     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3824     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3825     /* types of closures that may appear in a blocking queue */
3826     ASSERT(get_itbl(bqe)->type == TSO ||           
3827            get_itbl(bqe)->type == CONSTR); 
3828     /* only BQs of an RBH end with an RBH_Save closure */
3829     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3830
3831     tso_loc = where_is((StgClosure *)bqe);
3832     switch (get_itbl(bqe)->type) {
3833     case TSO:
3834       fprintf(stderr," TSO %d (%p) on [PE %d],",
3835               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3836       break;
3837     case CONSTR:
3838       fprintf(stderr," %s (IP %p),",
3839               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3840                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3841                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3842                "RBH_Save_?"), get_itbl(bqe));
3843       break;
3844     default:
3845       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3846            info_type((StgClosure *)bqe), node, info_type(node));
3847       break;
3848     }
3849   } /* for */
3850   fputc('\n', stderr);
3851 }
3852 #else
3853 /* 
3854    Nice and easy: only TSOs on the blocking queue
3855 */
3856 void 
3857 print_bq (StgClosure *node)
3858 {
3859   StgTSO *tso;
3860
3861   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3862   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3863        tso != END_TSO_QUEUE; 
3864        tso=tso->link) {
3865     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3866     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3867     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3868   }
3869   fputc('\n', stderr);
3870 }
3871 # endif
3872
3873 #if defined(PAR)
3874 static nat
3875 run_queue_len(void)
3876 {
3877   nat i;
3878   StgTSO *tso;
3879
3880   for (i=0, tso=run_queue_hd; 
3881        tso != END_TSO_QUEUE;
3882        i++, tso=tso->link)
3883     /* nothing */
3884
3885   return i;
3886 }
3887 #endif
3888
3889 static void
3890 sched_belch(char *s, ...)
3891 {
3892   va_list ap;
3893   va_start(ap,s);
3894 #ifdef SMP
3895   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3896 #elif defined(PAR)
3897   fprintf(stderr, "== ");
3898 #else
3899   fprintf(stderr, "scheduler: ");
3900 #endif
3901   vfprintf(stderr, s, ap);
3902   fprintf(stderr, "\n");
3903   va_end(ap);
3904 }
3905
3906 #endif /* DEBUG */
3907
3908
3909 //@node Index,  , Debugging Routines, Main scheduling code
3910 //@subsection Index
3911
3912 //@index
3913 //* StgMainThread::  @cindex\s-+StgMainThread
3914 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3915 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3916 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3917 //* context_switch::  @cindex\s-+context_switch
3918 //* createThread::  @cindex\s-+createThread
3919 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3920 //* initScheduler::  @cindex\s-+initScheduler
3921 //* interrupted::  @cindex\s-+interrupted
3922 //* next_thread_id::  @cindex\s-+next_thread_id
3923 //* print_bq::  @cindex\s-+print_bq
3924 //* run_queue_hd::  @cindex\s-+run_queue_hd
3925 //* run_queue_tl::  @cindex\s-+run_queue_tl
3926 //* sched_mutex::  @cindex\s-+sched_mutex
3927 //* schedule::  @cindex\s-+schedule
3928 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3929 //* term_mutex::  @cindex\s-+term_mutex
3930 //@end index