[project @ 2003-10-01 10:49:07 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.176 2003/10/01 10:49:08 wolfgang Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 #ifdef HAVE_ERRNO_H
130 #include <errno.h>
131 #endif
132
133 #ifdef THREADED_RTS
134 #define USED_IN_THREADED_RTS
135 #else
136 #define USED_IN_THREADED_RTS STG_UNUSED
137 #endif
138
139 #ifdef RTS_SUPPORTS_THREADS
140 #define USED_WHEN_RTS_SUPPORTS_THREADS
141 #else
142 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
143 #endif
144
145 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
146 //@subsection Variables and Data structures
147
148 /* Main thread queue.
149  * Locks required: sched_mutex.
150  */
151 StgMainThread *main_threads = NULL;
152
153 /* Thread queues.
154  * Locks required: sched_mutex.
155  */
156 #if defined(GRAN)
157
158 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
160
161 /* 
162    In GranSim we have a runnable and a blocked queue for each processor.
163    In order to minimise code changes new arrays run_queue_hds/tls
164    are created. run_queue_hd is then a short cut (macro) for
165    run_queue_hds[CurrentProc] (see GranSim.h).
166    -- HWL
167 */
168 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
169 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
170 StgTSO *ccalling_threadss[MAX_PROC];
171 /* We use the same global list of threads (all_threads) in GranSim as in
172    the std RTS (i.e. we are cheating). However, we don't use this list in
173    the GranSim specific code at the moment (so we are only potentially
174    cheating).  */
175
176 #else /* !GRAN */
177
178 StgTSO *run_queue_hd = NULL;
179 StgTSO *run_queue_tl = NULL;
180 StgTSO *blocked_queue_hd = NULL;
181 StgTSO *blocked_queue_tl = NULL;
182 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
183
184 #endif
185
186 /* Linked list of all threads.
187  * Used for detecting garbage collected threads.
188  */
189 StgTSO *all_threads = NULL;
190
191 /* When a thread performs a safe C call (_ccall_GC, using old
192  * terminology), it gets put on the suspended_ccalling_threads
193  * list. Used by the garbage collector.
194  */
195 static StgTSO *suspended_ccalling_threads;
196
197 static StgTSO *threadStackOverflow(StgTSO *tso);
198
199 /* KH: The following two flags are shared memory locations.  There is no need
200        to lock them, since they are only unset at the end of a scheduler
201        operation.
202 */
203
204 /* flag set by signal handler to precipitate a context switch */
205 //@cindex context_switch
206 nat context_switch = 0;
207
208 /* if this flag is set as well, give up execution */
209 //@cindex interrupted
210 rtsBool interrupted = rtsFalse;
211
212 /* Next thread ID to allocate.
213  * Locks required: thread_id_mutex
214  */
215 //@cindex next_thread_id
216 static StgThreadID next_thread_id = 1;
217
218 /*
219  * Pointers to the state of the current thread.
220  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
221  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
222  */
223  
224 /* The smallest stack size that makes any sense is:
225  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
226  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
227  *  + 1                       (the closure to enter)
228  *  + 1                       (stg_ap_v_ret)
229  *  + 1                       (spare slot req'd by stg_ap_v_ret)
230  *
231  * A thread with this stack will bomb immediately with a stack
232  * overflow, which will increase its stack size.  
233  */
234
235 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
236
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 static rtsBool ready_to_gc;
249
250 /*
251  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
252  * in an MT setting, needed to signal that a worker thread shouldn't hang around
253  * in the scheduler when it is out of work.
254  */
255 static rtsBool shutting_down_scheduler = rtsFalse;
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
260        void     interruptStgRts   ( void );
261
262 static void     detectBlackHoles  ( void );
263
264 #ifdef DEBUG
265 static void sched_belch(char *s, ...);
266 #endif
267
268 #if defined(RTS_SUPPORTS_THREADS)
269 /* ToDo: carefully document the invariants that go together
270  *       with these synchronisation objects.
271  */
272 Mutex     sched_mutex       = INIT_MUTEX_VAR;
273 Mutex     term_mutex        = INIT_MUTEX_VAR;
274
275 /*
276  * A heavyweight solution to the problem of protecting
277  * the thread_id from concurrent update.
278  */
279 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
280
281
282 # if defined(SMP)
283 static Condition gc_pending_cond = INIT_COND_VAR;
284 nat await_death;
285 # endif
286
287 #endif /* RTS_SUPPORTS_THREADS */
288
289 #if defined(PAR)
290 StgTSO *LastTSO;
291 rtsTime TimeOfLastYield;
292 rtsBool emitSchedule = rtsTrue;
293 #endif
294
295 #if DEBUG
296 static char *whatNext_strs[] = {
297   "ThreadRunGHC",
298   "ThreadInterpret",
299   "ThreadKilled",
300   "ThreadRelocated",
301   "ThreadComplete"
302 };
303 #endif
304
305 #if defined(PAR)
306 StgTSO * createSparkThread(rtsSpark spark);
307 StgTSO * activateSpark (rtsSpark spark);  
308 #endif
309
310 /*
311  * The thread state for the main thread.
312 // ToDo: check whether not needed any more
313 StgTSO   *MainTSO;
314  */
315
316 #if defined(RTS_SUPPORTS_THREADS)
317 static rtsBool startingWorkerThread = rtsFalse;
318
319 static void taskStart(void);
320 static void
321 taskStart(void)
322 {
323   Capability *cap;
324   
325   ACQUIRE_LOCK(&sched_mutex);
326   startingWorkerThread = rtsFalse;
327   waitForWorkCapability(&sched_mutex, &cap, NULL);
328   RELEASE_LOCK(&sched_mutex);
329   
330   schedule(NULL,cap);
331 }
332
333 void
334 startSchedulerTaskIfNecessary(void)
335 {
336   if(run_queue_hd != END_TSO_QUEUE
337     || blocked_queue_hd != END_TSO_QUEUE
338     || sleeping_queue != END_TSO_QUEUE)
339   {
340     if(!startingWorkerThread)
341     { // we don't want to start another worker thread
342       // just because the last one hasn't yet reached the
343       // "waiting for capability" state
344       startingWorkerThread = rtsTrue;
345       startTask(taskStart);
346     }
347   }
348 }
349 #endif
350
351 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
352 //@subsection Main scheduling loop
353
354 /* ---------------------------------------------------------------------------
355    Main scheduling loop.
356
357    We use round-robin scheduling, each thread returning to the
358    scheduler loop when one of these conditions is detected:
359
360       * out of heap space
361       * timer expires (thread yields)
362       * thread blocks
363       * thread ends
364       * stack overflow
365
366    Locking notes:  we acquire the scheduler lock once at the beginning
367    of the scheduler loop, and release it when
368     
369       * running a thread, or
370       * waiting for work, or
371       * waiting for a GC to complete.
372
373    GRAN version:
374      In a GranSim setup this loop iterates over the global event queue.
375      This revolves around the global event queue, which determines what 
376      to do next. Therefore, it's more complicated than either the 
377      concurrent or the parallel (GUM) setup.
378
379    GUM version:
380      GUM iterates over incoming messages.
381      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
382      and sends out a fish whenever it has nothing to do; in-between
383      doing the actual reductions (shared code below) it processes the
384      incoming messages and deals with delayed operations 
385      (see PendingFetches).
386      This is not the ugliest code you could imagine, but it's bloody close.
387
388    ------------------------------------------------------------------------ */
389 //@cindex schedule
390 static void
391 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
392           Capability *initialCapability )
393 {
394   StgTSO *t;
395   Capability *cap = initialCapability;
396   StgThreadReturnCode ret;
397 #if defined(GRAN)
398   rtsEvent *event;
399 #elif defined(PAR)
400   StgSparkPool *pool;
401   rtsSpark spark;
402   StgTSO *tso;
403   GlobalTaskId pe;
404   rtsBool receivedFinish = rtsFalse;
405 # if defined(DEBUG)
406   nat tp_size, sp_size; // stats only
407 # endif
408 #endif
409   rtsBool was_interrupted = rtsFalse;
410   StgTSOWhatNext prev_what_next;
411   
412   ACQUIRE_LOCK(&sched_mutex);
413  
414 #if defined(RTS_SUPPORTS_THREADS)
415   /* in the threaded case, the capability is either passed in via the initialCapability
416      parameter, or initialized inside the scheduler loop */
417
418   IF_DEBUG(scheduler,
419     fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
420             osThreadId(), osThreadId()));
421   IF_DEBUG(scheduler,
422     fprintf(stderr,"### main thread: %p\n",mainThread));
423   IF_DEBUG(scheduler,
424     fprintf(stderr,"### initial cap: %p\n",initialCapability));
425 #else
426   /* simply initialise it in the non-threaded case */
427   grabCapability(&cap);
428 #endif
429
430 #if defined(GRAN)
431   /* set up first event to get things going */
432   /* ToDo: assign costs for system setup and init MainTSO ! */
433   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
434             ContinueThread, 
435             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
436
437   IF_DEBUG(gran,
438            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
439            G_TSO(CurrentTSO, 5));
440
441   if (RtsFlags.GranFlags.Light) {
442     /* Save current time; GranSim Light only */
443     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
444   }      
445
446   event = get_next_event();
447
448   while (event!=(rtsEvent*)NULL) {
449     /* Choose the processor with the next event */
450     CurrentProc = event->proc;
451     CurrentTSO = event->tso;
452
453 #elif defined(PAR)
454
455   while (!receivedFinish) {    /* set by processMessages */
456                                /* when receiving PP_FINISH message         */ 
457 #else
458
459   while (1) {
460
461 #endif
462
463     IF_DEBUG(scheduler, printAllThreads());
464
465 #if defined(RTS_SUPPORTS_THREADS)
466     /* Check to see whether there are any worker threads
467        waiting to deposit external call results. If so,
468        yield our capability... if we have a capability, that is. */
469     if(cap)
470       yieldToReturningWorker(&sched_mutex, &cap,
471           mainThread ? &mainThread->bound_thread_cond : NULL);
472
473     /* If we do not currently hold a capability, we wait for one */
474     if(!cap)
475     {
476       waitForWorkCapability(&sched_mutex, &cap,
477           mainThread ? &mainThread->bound_thread_cond : NULL);
478       IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
479                                       osThreadId()));
480     }
481 #endif
482
483     /* If we're interrupted (the user pressed ^C, or some other
484      * termination condition occurred), kill all the currently running
485      * threads.
486      */
487     if (interrupted) {
488       IF_DEBUG(scheduler, sched_belch("interrupted"));
489       interrupted = rtsFalse;
490       was_interrupted = rtsTrue;
491 #if defined(RTS_SUPPORTS_THREADS)
492       // In the threaded RTS, deadlock detection doesn't work,
493       // so just exit right away.
494       prog_belch("interrupted");
495       releaseCapability(cap);
496       RELEASE_LOCK(&sched_mutex);
497       shutdownHaskellAndExit(EXIT_SUCCESS);
498 #else
499       deleteAllThreads();
500 #endif
501     }
502
503     /* Go through the list of main threads and wake up any
504      * clients whose computations have finished.  ToDo: this
505      * should be done more efficiently without a linear scan
506      * of the main threads list, somehow...
507      */
508 #if defined(RTS_SUPPORTS_THREADS)
509     { 
510         StgMainThread *m, **prev;
511         prev = &main_threads;
512         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
513           if (m->tso->what_next == ThreadComplete
514               || m->tso->what_next == ThreadKilled)
515           {
516             if(m == mainThread)
517             {
518               if(m->tso->what_next == ThreadComplete)
519               {
520                 if (m->ret)
521                 {
522                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
523                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
524                 }
525                 m->stat = Success;
526               }
527               else
528               {
529                 if (m->ret)
530                 {
531                   *(m->ret) = NULL;
532                 }
533                 if (was_interrupted)
534                 {
535                   m->stat = Interrupted;
536                 }
537                 else
538                 {
539                   m->stat = Killed;
540                 }
541               }
542               *prev = m->link;
543             
544 #ifdef DEBUG
545               removeThreadLabel((StgWord)m->tso);
546 #endif
547               releaseCapability(cap);
548               RELEASE_LOCK(&sched_mutex);
549               return;
550             }
551             else
552             {
553                 // The current OS thread can not handle the fact that the Haskell
554                 // thread "m" has ended. 
555                 // "m" is bound; the scheduler loop in it's bound OS thread has
556                 // to return, so let's pass our capability directly to that thread.
557               passCapability(&sched_mutex, cap, &m->bound_thread_cond);
558               cap = NULL;
559             }
560           }
561         }
562     }
563     
564     if(!cap)    // If we gave our capability away,
565       continue; // go to the top to get it back
566       
567 #else /* not threaded */
568
569 # if defined(PAR)
570     /* in GUM do this only on the Main PE */
571     if (IAmMainThread)
572 # endif
573     /* If our main thread has finished or been killed, return.
574      */
575     {
576       StgMainThread *m = main_threads;
577       if (m->tso->what_next == ThreadComplete
578           || m->tso->what_next == ThreadKilled) {
579 #ifdef DEBUG
580         removeThreadLabel((StgWord)m->tso);
581 #endif
582         main_threads = main_threads->link;
583         if (m->tso->what_next == ThreadComplete) {
584             // We finished successfully, fill in the return value
585             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
586             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
587             m->stat = Success;
588             return;
589         } else {
590           if (m->ret) { *(m->ret) = NULL; };
591           if (was_interrupted) {
592             m->stat = Interrupted;
593           } else {
594             m->stat = Killed;
595           }
596           return;
597         }
598       }
599     }
600 #endif
601
602     /* Top up the run queue from our spark pool.  We try to make the
603      * number of threads in the run queue equal to the number of
604      * free capabilities.
605      *
606      * Disable spark support in SMP for now, non-essential & requires
607      * a little bit of work to make it compile cleanly. -- sof 1/02.
608      */
609 #if 0 /* defined(SMP) */
610     {
611       nat n = getFreeCapabilities();
612       StgTSO *tso = run_queue_hd;
613
614       /* Count the run queue */
615       while (n > 0 && tso != END_TSO_QUEUE) {
616         tso = tso->link;
617         n--;
618       }
619
620       for (; n > 0; n--) {
621         StgClosure *spark;
622         spark = findSpark(rtsFalse);
623         if (spark == NULL) {
624           break; /* no more sparks in the pool */
625         } else {
626           /* I'd prefer this to be done in activateSpark -- HWL */
627           /* tricky - it needs to hold the scheduler lock and
628            * not try to re-acquire it -- SDM */
629           createSparkThread(spark);       
630           IF_DEBUG(scheduler,
631                    sched_belch("==^^ turning spark of closure %p into a thread",
632                                (StgClosure *)spark));
633         }
634       }
635       /* We need to wake up the other tasks if we just created some
636        * work for them.
637        */
638       if (getFreeCapabilities() - n > 1) {
639           signalCondition( &thread_ready_cond );
640       }
641     }
642 #endif // SMP
643
644     /* check for signals each time around the scheduler */
645 #if defined(RTS_USER_SIGNALS)
646     if (signals_pending()) {
647       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
648       startSignalHandlers();
649       ACQUIRE_LOCK(&sched_mutex);
650     }
651 #endif
652
653     /* Check whether any waiting threads need to be woken up.  If the
654      * run queue is empty, and there are no other tasks running, we
655      * can wait indefinitely for something to happen.
656      */
657     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
658 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
659                 || EMPTY_RUN_QUEUE()
660 #endif
661         )
662     {
663       awaitEvent( EMPTY_RUN_QUEUE()
664 #if defined(SMP)
665         && allFreeCapabilities()
666 #endif
667         );
668     }
669     /* we can be interrupted while waiting for I/O... */
670     if (interrupted) continue;
671
672     /* 
673      * Detect deadlock: when we have no threads to run, there are no
674      * threads waiting on I/O or sleeping, and all the other tasks are
675      * waiting for work, we must have a deadlock of some description.
676      *
677      * We first try to find threads blocked on themselves (ie. black
678      * holes), and generate NonTermination exceptions where necessary.
679      *
680      * If no threads are black holed, we have a deadlock situation, so
681      * inform all the main threads.
682      */
683 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
684     if (   EMPTY_THREAD_QUEUES()
685 #if defined(RTS_SUPPORTS_THREADS)
686         && EMPTY_QUEUE(suspended_ccalling_threads)
687 #endif
688 #ifdef SMP
689         && allFreeCapabilities()
690 #endif
691         )
692     {
693         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
694 #if defined(THREADED_RTS)
695         /* and SMP mode ..? */
696         releaseCapability(cap);
697 #endif
698         // Garbage collection can release some new threads due to
699         // either (a) finalizers or (b) threads resurrected because
700         // they are about to be send BlockedOnDeadMVar.  Any threads
701         // thus released will be immediately runnable.
702         GarbageCollect(GetRoots,rtsTrue);
703
704         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
705
706         IF_DEBUG(scheduler, 
707                  sched_belch("still deadlocked, checking for black holes..."));
708         detectBlackHoles();
709
710         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
711
712 #if defined(RTS_USER_SIGNALS)
713         /* If we have user-installed signal handlers, then wait
714          * for signals to arrive rather then bombing out with a
715          * deadlock.
716          */
717 #if defined(RTS_SUPPORTS_THREADS)
718         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
719                       a signal with no runnable threads (or I/O
720                       suspended ones) leads nowhere quick.
721                       For now, simply shut down when we reach this
722                       condition.
723                       
724                       ToDo: define precisely under what conditions
725                       the Scheduler should shut down in an MT setting.
726                    */
727 #else
728         if ( anyUserHandlers() ) {
729 #endif
730             IF_DEBUG(scheduler, 
731                      sched_belch("still deadlocked, waiting for signals..."));
732
733             awaitUserSignals();
734
735             // we might be interrupted...
736             if (interrupted) { continue; }
737
738             if (signals_pending()) {
739                 RELEASE_LOCK(&sched_mutex);
740                 startSignalHandlers();
741                 ACQUIRE_LOCK(&sched_mutex);
742             }
743             ASSERT(!EMPTY_RUN_QUEUE());
744             goto not_deadlocked;
745         }
746 #endif
747
748         /* Probably a real deadlock.  Send the current main thread the
749          * Deadlock exception (or in the SMP build, send *all* main
750          * threads the deadlock exception, since none of them can make
751          * progress).
752          */
753         {
754             StgMainThread *m;
755 #if defined(RTS_SUPPORTS_THREADS)
756             for (m = main_threads; m != NULL; m = m->link) {
757                 switch (m->tso->why_blocked) {
758                 case BlockedOnBlackHole:
759                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
760                     break;
761                 case BlockedOnException:
762                 case BlockedOnMVar:
763                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
764                     break;
765                 default:
766                     barf("deadlock: main thread blocked in a strange way");
767                 }
768             }
769 #else
770             m = main_threads;
771             switch (m->tso->why_blocked) {
772             case BlockedOnBlackHole:
773                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
774                 break;
775             case BlockedOnException:
776             case BlockedOnMVar:
777                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
778                 break;
779             default:
780                 barf("deadlock: main thread blocked in a strange way");
781             }
782 #endif
783         }
784
785 #if defined(RTS_SUPPORTS_THREADS)
786         /* ToDo: revisit conditions (and mechanism) for shutting
787            down a multi-threaded world  */
788         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
789         RELEASE_LOCK(&sched_mutex);
790         shutdownHaskell();
791         return;
792 #endif
793     }
794   not_deadlocked:
795
796 #elif defined(RTS_SUPPORTS_THREADS)
797     /* ToDo: add deadlock detection in threaded RTS */
798 #elif defined(PAR)
799     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
800 #endif
801
802 #if defined(SMP)
803     /* If there's a GC pending, don't do anything until it has
804      * completed.
805      */
806     if (ready_to_gc) {
807       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
808       waitCondition( &gc_pending_cond, &sched_mutex );
809     }
810 #endif    
811
812 #if defined(RTS_SUPPORTS_THREADS)
813 #if defined(SMP)
814     /* block until we've got a thread on the run queue and a free
815      * capability.
816      *
817      */
818     if ( EMPTY_RUN_QUEUE() ) {
819       /* Give up our capability */
820       releaseCapability(cap);
821
822       /* If we're in the process of shutting down (& running the
823        * a batch of finalisers), don't wait around.
824        */
825       if ( shutting_down_scheduler ) {
826         RELEASE_LOCK(&sched_mutex);
827         return;
828       }
829       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
830       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
831       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
832     }
833 #else
834     if ( EMPTY_RUN_QUEUE() ) {
835       continue; // nothing to do
836     }
837 #endif
838 #endif
839
840 #if defined(GRAN)
841     if (RtsFlags.GranFlags.Light)
842       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
843
844     /* adjust time based on time-stamp */
845     if (event->time > CurrentTime[CurrentProc] &&
846         event->evttype != ContinueThread)
847       CurrentTime[CurrentProc] = event->time;
848     
849     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
850     if (!RtsFlags.GranFlags.Light)
851       handleIdlePEs();
852
853     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
854
855     /* main event dispatcher in GranSim */
856     switch (event->evttype) {
857       /* Should just be continuing execution */
858     case ContinueThread:
859       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
860       /* ToDo: check assertion
861       ASSERT(run_queue_hd != (StgTSO*)NULL &&
862              run_queue_hd != END_TSO_QUEUE);
863       */
864       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
865       if (!RtsFlags.GranFlags.DoAsyncFetch &&
866           procStatus[CurrentProc]==Fetching) {
867         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
868               CurrentTSO->id, CurrentTSO, CurrentProc);
869         goto next_thread;
870       } 
871       /* Ignore ContinueThreads for completed threads */
872       if (CurrentTSO->what_next == ThreadComplete) {
873         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
874               CurrentTSO->id, CurrentTSO, CurrentProc);
875         goto next_thread;
876       } 
877       /* Ignore ContinueThreads for threads that are being migrated */
878       if (PROCS(CurrentTSO)==Nowhere) { 
879         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
880               CurrentTSO->id, CurrentTSO, CurrentProc);
881         goto next_thread;
882       }
883       /* The thread should be at the beginning of the run queue */
884       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
885         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
886               CurrentTSO->id, CurrentTSO, CurrentProc);
887         break; // run the thread anyway
888       }
889       /*
890       new_event(proc, proc, CurrentTime[proc],
891                 FindWork,
892                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
893       goto next_thread; 
894       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
895       break; // now actually run the thread; DaH Qu'vam yImuHbej 
896
897     case FetchNode:
898       do_the_fetchnode(event);
899       goto next_thread;             /* handle next event in event queue  */
900       
901     case GlobalBlock:
902       do_the_globalblock(event);
903       goto next_thread;             /* handle next event in event queue  */
904       
905     case FetchReply:
906       do_the_fetchreply(event);
907       goto next_thread;             /* handle next event in event queue  */
908       
909     case UnblockThread:   /* Move from the blocked queue to the tail of */
910       do_the_unblock(event);
911       goto next_thread;             /* handle next event in event queue  */
912       
913     case ResumeThread:  /* Move from the blocked queue to the tail of */
914       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
915       event->tso->gran.blocktime += 
916         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
917       do_the_startthread(event);
918       goto next_thread;             /* handle next event in event queue  */
919       
920     case StartThread:
921       do_the_startthread(event);
922       goto next_thread;             /* handle next event in event queue  */
923       
924     case MoveThread:
925       do_the_movethread(event);
926       goto next_thread;             /* handle next event in event queue  */
927       
928     case MoveSpark:
929       do_the_movespark(event);
930       goto next_thread;             /* handle next event in event queue  */
931       
932     case FindWork:
933       do_the_findwork(event);
934       goto next_thread;             /* handle next event in event queue  */
935       
936     default:
937       barf("Illegal event type %u\n", event->evttype);
938     }  /* switch */
939     
940     /* This point was scheduler_loop in the old RTS */
941
942     IF_DEBUG(gran, belch("GRAN: after main switch"));
943
944     TimeOfLastEvent = CurrentTime[CurrentProc];
945     TimeOfNextEvent = get_time_of_next_event();
946     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
947     // CurrentTSO = ThreadQueueHd;
948
949     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
950                          TimeOfNextEvent));
951
952     if (RtsFlags.GranFlags.Light) 
953       GranSimLight_leave_system(event, &ActiveTSO); 
954
955     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
956
957     IF_DEBUG(gran, 
958              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
959
960     /* in a GranSim setup the TSO stays on the run queue */
961     t = CurrentTSO;
962     /* Take a thread from the run queue. */
963     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
964
965     IF_DEBUG(gran, 
966              fprintf(stderr, "GRAN: About to run current thread, which is\n");
967              G_TSO(t,5));
968
969     context_switch = 0; // turned on via GranYield, checking events and time slice
970
971     IF_DEBUG(gran, 
972              DumpGranEvent(GR_SCHEDULE, t));
973
974     procStatus[CurrentProc] = Busy;
975
976 #elif defined(PAR)
977     if (PendingFetches != END_BF_QUEUE) {
978         processFetches();
979     }
980
981     /* ToDo: phps merge with spark activation above */
982     /* check whether we have local work and send requests if we have none */
983     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
984       /* :-[  no local threads => look out for local sparks */
985       /* the spark pool for the current PE */
986       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
987       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
988           pool->hd < pool->tl) {
989         /* 
990          * ToDo: add GC code check that we really have enough heap afterwards!!
991          * Old comment:
992          * If we're here (no runnable threads) and we have pending
993          * sparks, we must have a space problem.  Get enough space
994          * to turn one of those pending sparks into a
995          * thread... 
996          */
997
998         spark = findSpark(rtsFalse);                /* get a spark */
999         if (spark != (rtsSpark) NULL) {
1000           tso = activateSpark(spark);       /* turn the spark into a thread */
1001           IF_PAR_DEBUG(schedule,
1002                        belch("==== schedule: Created TSO %d (%p); %d threads active",
1003                              tso->id, tso, advisory_thread_count));
1004
1005           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1006             belch("==^^ failed to activate spark");
1007             goto next_thread;
1008           }               /* otherwise fall through & pick-up new tso */
1009         } else {
1010           IF_PAR_DEBUG(verbose,
1011                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
1012                              spark_queue_len(pool)));
1013           goto next_thread;
1014         }
1015       }
1016
1017       /* If we still have no work we need to send a FISH to get a spark
1018          from another PE 
1019       */
1020       if (EMPTY_RUN_QUEUE()) {
1021       /* =8-[  no local sparks => look for work on other PEs */
1022         /*
1023          * We really have absolutely no work.  Send out a fish
1024          * (there may be some out there already), and wait for
1025          * something to arrive.  We clearly can't run any threads
1026          * until a SCHEDULE or RESUME arrives, and so that's what
1027          * we're hoping to see.  (Of course, we still have to
1028          * respond to other types of messages.)
1029          */
1030         TIME now = msTime() /*CURRENT_TIME*/;
1031         IF_PAR_DEBUG(verbose, 
1032                      belch("--  now=%ld", now));
1033         IF_PAR_DEBUG(verbose,
1034                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1035                          (last_fish_arrived_at!=0 &&
1036                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1037                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1038                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1039                              last_fish_arrived_at,
1040                              RtsFlags.ParFlags.fishDelay, now);
1041                      });
1042         
1043         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1044             (last_fish_arrived_at==0 ||
1045              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1046           /* outstandingFishes is set in sendFish, processFish;
1047              avoid flooding system with fishes via delay */
1048           pe = choosePE();
1049           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1050                    NEW_FISH_HUNGER);
1051
1052           // Global statistics: count no. of fishes
1053           if (RtsFlags.ParFlags.ParStats.Global &&
1054               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1055             globalParStats.tot_fish_mess++;
1056           }
1057         }
1058       
1059         receivedFinish = processMessages();
1060         goto next_thread;
1061       }
1062     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1063       receivedFinish = processMessages();
1064     }
1065
1066     /* Now we are sure that we have some work available */
1067     ASSERT(run_queue_hd != END_TSO_QUEUE);
1068
1069     /* Take a thread from the run queue, if we have work */
1070     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1071     IF_DEBUG(sanity,checkTSO(t));
1072
1073     /* ToDo: write something to the log-file
1074     if (RTSflags.ParFlags.granSimStats && !sameThread)
1075         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1076
1077     CurrentTSO = t;
1078     */
1079     /* the spark pool for the current PE */
1080     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1081
1082     IF_DEBUG(scheduler, 
1083              belch("--=^ %d threads, %d sparks on [%#x]", 
1084                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1085
1086 # if 1
1087     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1088         t && LastTSO && t->id != LastTSO->id && 
1089         LastTSO->why_blocked == NotBlocked && 
1090         LastTSO->what_next != ThreadComplete) {
1091       // if previously scheduled TSO not blocked we have to record the context switch
1092       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1093                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1094     }
1095
1096     if (RtsFlags.ParFlags.ParStats.Full && 
1097         (emitSchedule /* forced emit */ ||
1098         (t && LastTSO && t->id != LastTSO->id))) {
1099       /* 
1100          we are running a different TSO, so write a schedule event to log file
1101          NB: If we use fair scheduling we also have to write  a deschedule 
1102              event for LastTSO; with unfair scheduling we know that the
1103              previous tso has blocked whenever we switch to another tso, so
1104              we don't need it in GUM for now
1105       */
1106       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1107                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1108       emitSchedule = rtsFalse;
1109     }
1110      
1111 # endif
1112 #else /* !GRAN && !PAR */
1113   
1114     /* grab a thread from the run queue */
1115     ASSERT(run_queue_hd != END_TSO_QUEUE);
1116     t = POP_RUN_QUEUE();
1117     // Sanity check the thread we're about to run.  This can be
1118     // expensive if there is lots of thread switching going on...
1119     IF_DEBUG(sanity,checkTSO(t));
1120 #endif
1121
1122 #ifdef THREADED_RTS
1123     {
1124       StgMainThread *m;
1125       for(m = main_threads; m; m = m->link)
1126       {
1127         if(m->tso == t)
1128           break;
1129       }
1130       
1131       if(m)
1132       {
1133         if(m == mainThread)
1134         {
1135           IF_DEBUG(scheduler,
1136             fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1137                     t, osThreadId()));
1138           // yes, the Haskell thread is bound to the current native thread
1139         }
1140         else
1141         {
1142           IF_DEBUG(scheduler,
1143             fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1144                     t, osThreadId()));
1145           // no, bound to a different Haskell thread: pass to that thread
1146           PUSH_ON_RUN_QUEUE(t);
1147           passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1148           cap = NULL;
1149           continue;
1150         }
1151       }
1152       else
1153       {
1154         // The thread we want to run is not bound.
1155         if(mainThread == NULL)
1156         {
1157           IF_DEBUG(scheduler,
1158             fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1159                     t, osThreadId()));
1160           // if we are a worker thread,
1161           // we may run it here
1162         }
1163         else
1164         {
1165           IF_DEBUG(scheduler,
1166             fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1167                     t, mainThread, osThreadId()));
1168           // no, the current native thread is bound to a different
1169           // Haskell thread, so pass it to any worker thread
1170           PUSH_ON_RUN_QUEUE(t);
1171           passCapabilityToWorker(&sched_mutex, cap);
1172           cap = NULL;
1173           continue; 
1174         }
1175       }
1176     }
1177 #endif
1178
1179     cap->r.rCurrentTSO = t;
1180     
1181     /* context switches are now initiated by the timer signal, unless
1182      * the user specified "context switch as often as possible", with
1183      * +RTS -C0
1184      */
1185     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1186          && (run_queue_hd != END_TSO_QUEUE
1187              || blocked_queue_hd != END_TSO_QUEUE
1188              || sleeping_queue != END_TSO_QUEUE)))
1189         context_switch = 1;
1190     else
1191         context_switch = 0;
1192
1193 run_thread:
1194
1195     RELEASE_LOCK(&sched_mutex);
1196
1197     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1198                               t->id, whatNext_strs[t->what_next]));
1199
1200 #ifdef PROFILING
1201     startHeapProfTimer();
1202 #endif
1203
1204     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1205     /* Run the current thread 
1206      */
1207     prev_what_next = t->what_next;
1208     switch (prev_what_next) {
1209     case ThreadKilled:
1210     case ThreadComplete:
1211         /* Thread already finished, return to scheduler. */
1212         ret = ThreadFinished;
1213         break;
1214     case ThreadRunGHC:
1215         errno = t->saved_errno;
1216         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1217         t->saved_errno = errno;
1218         break;
1219     case ThreadInterpret:
1220         ret = interpretBCO(cap);
1221         break;
1222     default:
1223       barf("schedule: invalid what_next field");
1224     }
1225     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1226     
1227     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1228 #ifdef PROFILING
1229     stopHeapProfTimer();
1230     CCCS = CCS_SYSTEM;
1231 #endif
1232     
1233     ACQUIRE_LOCK(&sched_mutex);
1234     
1235 #ifdef RTS_SUPPORTS_THREADS
1236     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1237 #elif !defined(GRAN) && !defined(PAR)
1238     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1239 #endif
1240     t = cap->r.rCurrentTSO;
1241     
1242 #if defined(PAR)
1243     /* HACK 675: if the last thread didn't yield, make sure to print a 
1244        SCHEDULE event to the log file when StgRunning the next thread, even
1245        if it is the same one as before */
1246     LastTSO = t; 
1247     TimeOfLastYield = CURRENT_TIME;
1248 #endif
1249
1250     switch (ret) {
1251     case HeapOverflow:
1252 #if defined(GRAN)
1253       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1254       globalGranStats.tot_heapover++;
1255 #elif defined(PAR)
1256       globalParStats.tot_heapover++;
1257 #endif
1258
1259       // did the task ask for a large block?
1260       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1261           // if so, get one and push it on the front of the nursery.
1262           bdescr *bd;
1263           nat blocks;
1264           
1265           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1266
1267           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1268                                    t->id, whatNext_strs[t->what_next], blocks));
1269
1270           // don't do this if it would push us over the
1271           // alloc_blocks_lim limit; we'll GC first.
1272           if (alloc_blocks + blocks < alloc_blocks_lim) {
1273
1274               alloc_blocks += blocks;
1275               bd = allocGroup( blocks );
1276
1277               // link the new group into the list
1278               bd->link = cap->r.rCurrentNursery;
1279               bd->u.back = cap->r.rCurrentNursery->u.back;
1280               if (cap->r.rCurrentNursery->u.back != NULL) {
1281                   cap->r.rCurrentNursery->u.back->link = bd;
1282               } else {
1283                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1284                          g0s0->blocks == cap->r.rNursery);
1285                   cap->r.rNursery = g0s0->blocks = bd;
1286               }           
1287               cap->r.rCurrentNursery->u.back = bd;
1288
1289               // initialise it as a nursery block.  We initialise the
1290               // step, gen_no, and flags field of *every* sub-block in
1291               // this large block, because this is easier than making
1292               // sure that we always find the block head of a large
1293               // block whenever we call Bdescr() (eg. evacuate() and
1294               // isAlive() in the GC would both have to do this, at
1295               // least).
1296               { 
1297                   bdescr *x;
1298                   for (x = bd; x < bd + blocks; x++) {
1299                       x->step = g0s0;
1300                       x->gen_no = 0;
1301                       x->flags = 0;
1302                   }
1303               }
1304
1305               // don't forget to update the block count in g0s0.
1306               g0s0->n_blocks += blocks;
1307               // This assert can be a killer if the app is doing lots
1308               // of large block allocations.
1309               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1310
1311               // now update the nursery to point to the new block
1312               cap->r.rCurrentNursery = bd;
1313
1314               // we might be unlucky and have another thread get on the
1315               // run queue before us and steal the large block, but in that
1316               // case the thread will just end up requesting another large
1317               // block.
1318               PUSH_ON_RUN_QUEUE(t);
1319               break;
1320           }
1321       }
1322
1323       /* make all the running tasks block on a condition variable,
1324        * maybe set context_switch and wait till they all pile in,
1325        * then have them wait on a GC condition variable.
1326        */
1327       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1328                                t->id, whatNext_strs[t->what_next]));
1329       threadPaused(t);
1330 #if defined(GRAN)
1331       ASSERT(!is_on_queue(t,CurrentProc));
1332 #elif defined(PAR)
1333       /* Currently we emit a DESCHEDULE event before GC in GUM.
1334          ToDo: either add separate event to distinguish SYSTEM time from rest
1335                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1336       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1337         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1338                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1339         emitSchedule = rtsTrue;
1340       }
1341 #endif
1342       
1343       ready_to_gc = rtsTrue;
1344       context_switch = 1;               /* stop other threads ASAP */
1345       PUSH_ON_RUN_QUEUE(t);
1346       /* actual GC is done at the end of the while loop */
1347       break;
1348       
1349     case StackOverflow:
1350 #if defined(GRAN)
1351       IF_DEBUG(gran, 
1352                DumpGranEvent(GR_DESCHEDULE, t));
1353       globalGranStats.tot_stackover++;
1354 #elif defined(PAR)
1355       // IF_DEBUG(par, 
1356       // DumpGranEvent(GR_DESCHEDULE, t);
1357       globalParStats.tot_stackover++;
1358 #endif
1359       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1360                                t->id, whatNext_strs[t->what_next]));
1361       /* just adjust the stack for this thread, then pop it back
1362        * on the run queue.
1363        */
1364       threadPaused(t);
1365       { 
1366         StgMainThread *m;
1367         /* enlarge the stack */
1368         StgTSO *new_t = threadStackOverflow(t);
1369         
1370         /* This TSO has moved, so update any pointers to it from the
1371          * main thread stack.  It better not be on any other queues...
1372          * (it shouldn't be).
1373          */
1374         for (m = main_threads; m != NULL; m = m->link) {
1375           if (m->tso == t) {
1376             m->tso = new_t;
1377           }
1378         }
1379         threadPaused(new_t);
1380         PUSH_ON_RUN_QUEUE(new_t);
1381       }
1382       break;
1383
1384     case ThreadYielding:
1385 #if defined(GRAN)
1386       IF_DEBUG(gran, 
1387                DumpGranEvent(GR_DESCHEDULE, t));
1388       globalGranStats.tot_yields++;
1389 #elif defined(PAR)
1390       // IF_DEBUG(par, 
1391       // DumpGranEvent(GR_DESCHEDULE, t);
1392       globalParStats.tot_yields++;
1393 #endif
1394       /* put the thread back on the run queue.  Then, if we're ready to
1395        * GC, check whether this is the last task to stop.  If so, wake
1396        * up the GC thread.  getThread will block during a GC until the
1397        * GC is finished.
1398        */
1399       IF_DEBUG(scheduler,
1400                if (t->what_next != prev_what_next) {
1401                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1402                          t->id, whatNext_strs[t->what_next]);
1403                } else {
1404                    belch("--<< thread %ld (%s) stopped, yielding", 
1405                          t->id, whatNext_strs[t->what_next]);
1406                }
1407                );
1408
1409       IF_DEBUG(sanity,
1410                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1411                checkTSO(t));
1412       ASSERT(t->link == END_TSO_QUEUE);
1413
1414       // Shortcut if we're just switching evaluators: don't bother
1415       // doing stack squeezing (which can be expensive), just run the
1416       // thread.
1417       if (t->what_next != prev_what_next) {
1418           goto run_thread;
1419       }
1420
1421       threadPaused(t);
1422
1423 #if defined(GRAN)
1424       ASSERT(!is_on_queue(t,CurrentProc));
1425
1426       IF_DEBUG(sanity,
1427                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1428                checkThreadQsSanity(rtsTrue));
1429 #endif
1430
1431 #if defined(PAR)
1432       if (RtsFlags.ParFlags.doFairScheduling) { 
1433         /* this does round-robin scheduling; good for concurrency */
1434         APPEND_TO_RUN_QUEUE(t);
1435       } else {
1436         /* this does unfair scheduling; good for parallelism */
1437         PUSH_ON_RUN_QUEUE(t);
1438       }
1439 #else
1440       // this does round-robin scheduling; good for concurrency
1441       APPEND_TO_RUN_QUEUE(t);
1442 #endif
1443
1444 #if defined(GRAN)
1445       /* add a ContinueThread event to actually process the thread */
1446       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1447                 ContinueThread,
1448                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1449       IF_GRAN_DEBUG(bq, 
1450                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1451                G_EVENTQ(0);
1452                G_CURR_THREADQ(0));
1453 #endif /* GRAN */
1454       break;
1455
1456     case ThreadBlocked:
1457 #if defined(GRAN)
1458       IF_DEBUG(scheduler,
1459                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1460                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1461                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1462
1463       // ??? needed; should emit block before
1464       IF_DEBUG(gran, 
1465                DumpGranEvent(GR_DESCHEDULE, t)); 
1466       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1467       /*
1468         ngoq Dogh!
1469       ASSERT(procStatus[CurrentProc]==Busy || 
1470               ((procStatus[CurrentProc]==Fetching) && 
1471               (t->block_info.closure!=(StgClosure*)NULL)));
1472       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1473           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1474             procStatus[CurrentProc]==Fetching)) 
1475         procStatus[CurrentProc] = Idle;
1476       */
1477 #elif defined(PAR)
1478       IF_DEBUG(scheduler,
1479                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1480                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1481       IF_PAR_DEBUG(bq,
1482
1483                    if (t->block_info.closure!=(StgClosure*)NULL) 
1484                      print_bq(t->block_info.closure));
1485
1486       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1487       blockThread(t);
1488
1489       /* whatever we schedule next, we must log that schedule */
1490       emitSchedule = rtsTrue;
1491
1492 #else /* !GRAN */
1493       /* don't need to do anything.  Either the thread is blocked on
1494        * I/O, in which case we'll have called addToBlockedQueue
1495        * previously, or it's blocked on an MVar or Blackhole, in which
1496        * case it'll be on the relevant queue already.
1497        */
1498       IF_DEBUG(scheduler,
1499                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1500                        t->id, whatNext_strs[t->what_next]);
1501                printThreadBlockage(t);
1502                fprintf(stderr, "\n"));
1503
1504       /* Only for dumping event to log file 
1505          ToDo: do I need this in GranSim, too?
1506       blockThread(t);
1507       */
1508 #endif
1509       threadPaused(t);
1510       break;
1511       
1512     case ThreadFinished:
1513       /* Need to check whether this was a main thread, and if so, signal
1514        * the task that started it with the return value.  If we have no
1515        * more main threads, we probably need to stop all the tasks until
1516        * we get a new one.
1517        */
1518       /* We also end up here if the thread kills itself with an
1519        * uncaught exception, see Exception.hc.
1520        */
1521       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1522                                t->id, whatNext_strs[t->what_next]));
1523 #if defined(GRAN)
1524       endThread(t, CurrentProc); // clean-up the thread
1525 #elif defined(PAR)
1526       /* For now all are advisory -- HWL */
1527       //if(t->priority==AdvisoryPriority) ??
1528       advisory_thread_count--;
1529       
1530 # ifdef DIST
1531       if(t->dist.priority==RevalPriority)
1532         FinishReval(t);
1533 # endif
1534       
1535       if (RtsFlags.ParFlags.ParStats.Full &&
1536           !RtsFlags.ParFlags.ParStats.Suppressed) 
1537         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1538 #endif
1539       break;
1540       
1541     default:
1542       barf("schedule: invalid thread return code %d", (int)ret);
1543     }
1544
1545 #ifdef PROFILING
1546     // When we have +RTS -i0 and we're heap profiling, do a census at
1547     // every GC.  This lets us get repeatable runs for debugging.
1548     if (performHeapProfile ||
1549         (RtsFlags.ProfFlags.profileInterval==0 &&
1550          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1551         GarbageCollect(GetRoots, rtsTrue);
1552         heapCensus();
1553         performHeapProfile = rtsFalse;
1554         ready_to_gc = rtsFalse; // we already GC'd
1555     }
1556 #endif
1557
1558     if (ready_to_gc 
1559 #ifdef SMP
1560         && allFreeCapabilities() 
1561 #endif
1562         ) {
1563       /* everybody back, start the GC.
1564        * Could do it in this thread, or signal a condition var
1565        * to do it in another thread.  Either way, we need to
1566        * broadcast on gc_pending_cond afterward.
1567        */
1568 #if defined(RTS_SUPPORTS_THREADS)
1569       IF_DEBUG(scheduler,sched_belch("doing GC"));
1570 #endif
1571       GarbageCollect(GetRoots,rtsFalse);
1572       ready_to_gc = rtsFalse;
1573 #ifdef SMP
1574       broadcastCondition(&gc_pending_cond);
1575 #endif
1576 #if defined(GRAN)
1577       /* add a ContinueThread event to continue execution of current thread */
1578       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1579                 ContinueThread,
1580                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1581       IF_GRAN_DEBUG(bq, 
1582                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1583                G_EVENTQ(0);
1584                G_CURR_THREADQ(0));
1585 #endif /* GRAN */
1586     }
1587
1588 #if defined(GRAN)
1589   next_thread:
1590     IF_GRAN_DEBUG(unused,
1591                   print_eventq(EventHd));
1592
1593     event = get_next_event();
1594 #elif defined(PAR)
1595   next_thread:
1596     /* ToDo: wait for next message to arrive rather than busy wait */
1597 #endif /* GRAN */
1598
1599   } /* end of while(1) */
1600
1601   IF_PAR_DEBUG(verbose,
1602                belch("== Leaving schedule() after having received Finish"));
1603 }
1604
1605 /* ---------------------------------------------------------------------------
1606  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1607  * used by Control.Concurrent for error checking.
1608  * ------------------------------------------------------------------------- */
1609  
1610 StgBool
1611 rtsSupportsBoundThreads(void)
1612 {
1613 #ifdef THREADED_RTS
1614   return rtsTrue;
1615 #else
1616   return rtsFalse;
1617 #endif
1618 }
1619
1620 /* ---------------------------------------------------------------------------
1621  * isThreadBound(tso): check whether tso is bound to an OS thread.
1622  * ------------------------------------------------------------------------- */
1623  
1624 StgBool
1625 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1626 {
1627 #ifdef THREADED_RTS
1628   StgMainThread *m;
1629   for(m = main_threads; m; m = m->link)
1630   {
1631     if(m->tso == tso)
1632       return rtsTrue;
1633   }
1634 #endif
1635   return rtsFalse;
1636 }
1637
1638 /* ---------------------------------------------------------------------------
1639  * Singleton fork(). Do not copy any running threads.
1640  * ------------------------------------------------------------------------- */
1641
1642 #ifdef THREADED_RTS
1643 static void 
1644 deleteThreadImmediately(StgTSO *tso);
1645 #endif
1646
1647 StgInt
1648 forkProcess(StgTSO* tso)
1649 {
1650 #ifndef mingw32_TARGET_OS
1651   pid_t pid;
1652   StgTSO* t,*next;
1653
1654   IF_DEBUG(scheduler,sched_belch("forking!"));
1655   ACQUIRE_LOCK(&sched_mutex);
1656
1657   pid = fork();
1658   if (pid) { /* parent */
1659
1660   /* just return the pid */
1661     
1662   } else { /* child */
1663 #ifdef THREADED_RTS
1664     /* wipe all other threads */
1665     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1666     tso->link = END_TSO_QUEUE;
1667     
1668     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1669       next = t->link;
1670       
1671       /* Don't kill the current thread.. */
1672       if (t->id == tso->id) {
1673         continue;
1674       }
1675       
1676       if (isThreadBound(t)) {
1677         // If the thread is bound, the OS thread that the thread is bound to
1678         // no longer exists after the fork() system call.
1679         // The bound Haskell thread is therefore unable to run at all;
1680         // we must not give it a chance to survive by catching the
1681         // ThreadKilled exception. So we kill it "brutally" rather than
1682         // using deleteThread.
1683         deleteThreadImmediately(t);
1684       } else {
1685         deleteThread(t);
1686       }
1687     }
1688     
1689     if (isThreadBound(tso)) {
1690     } else {
1691       // If the current is not bound, then we should make it so.
1692       // The OS thread left over by fork() is special in that the process
1693       // will terminate as soon as the thread terminates; 
1694       // we'd expect forkProcess to behave similarily.
1695       // FIXME - we don't do this.
1696     }
1697 #else
1698   StgMainThread *m;
1699   rtsBool doKill;
1700   /* wipe all other threads */
1701   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1702   tso->link = END_TSO_QUEUE;
1703
1704   /* When clearing out the threads, we need to ensure
1705      that a 'main thread' is left behind; if there isn't,
1706      the Scheduler will shutdown next time it is entered.
1707      
1708      ==> we don't kill a thread that's on the main_threads
1709          list (nor the current thread.)
1710     
1711      [ Attempts at implementing the more ambitious scheme of
1712        killing the main_threads also, and then adding the
1713        current thread onto the main_threads list if it wasn't
1714        there already, failed -- waitThread() (for one) wasn't
1715        up to it. If it proves to be desirable to also kill
1716        the main threads, then this scheme will have to be
1717        revisited (and fully debugged!)
1718        
1719        -- sof 7/2002
1720      ]
1721   */
1722   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1723      us is picky about finding the thread still in its queue when
1724      handling the deleteThread() */
1725
1726   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1727     next = t->link;
1728     
1729     /* Don't kill the current thread.. */
1730     if (t->id == tso->id) continue;
1731     doKill=rtsTrue;
1732     /* ..or a main thread */
1733     for (m = main_threads; m != NULL; m = m->link) {
1734         if (m->tso->id == t->id) {
1735           doKill=rtsFalse;
1736           break;
1737         }
1738     }
1739     if (doKill) {
1740       deleteThread(t);
1741     }
1742   }
1743 #endif
1744   }
1745   RELEASE_LOCK(&sched_mutex);
1746   return pid;
1747 #else /* mingw32 */
1748   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1749   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1750   return -1;
1751 #endif /* mingw32 */
1752 }
1753
1754 /* ---------------------------------------------------------------------------
1755  * deleteAllThreads():  kill all the live threads.
1756  *
1757  * This is used when we catch a user interrupt (^C), before performing
1758  * any necessary cleanups and running finalizers.
1759  *
1760  * Locks: sched_mutex held.
1761  * ------------------------------------------------------------------------- */
1762    
1763 void
1764 deleteAllThreads ( void )
1765 {
1766   StgTSO* t, *next;
1767   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1768   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1769       next = t->global_link;
1770       deleteThread(t);
1771   }      
1772   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1773   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1774   sleeping_queue = END_TSO_QUEUE;
1775 }
1776
1777 /* startThread and  insertThread are now in GranSim.c -- HWL */
1778
1779
1780 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1781 //@subsection Suspend and Resume
1782
1783 /* ---------------------------------------------------------------------------
1784  * Suspending & resuming Haskell threads.
1785  * 
1786  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1787  * its capability before calling the C function.  This allows another
1788  * task to pick up the capability and carry on running Haskell
1789  * threads.  It also means that if the C call blocks, it won't lock
1790  * the whole system.
1791  *
1792  * The Haskell thread making the C call is put to sleep for the
1793  * duration of the call, on the susepended_ccalling_threads queue.  We
1794  * give out a token to the task, which it can use to resume the thread
1795  * on return from the C function.
1796  * ------------------------------------------------------------------------- */
1797    
1798 StgInt
1799 suspendThread( StgRegTable *reg, 
1800                rtsBool concCall
1801 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1802                STG_UNUSED
1803 #endif
1804                )
1805 {
1806   nat tok;
1807   Capability *cap;
1808   int saved_errno = errno;
1809
1810   /* assume that *reg is a pointer to the StgRegTable part
1811    * of a Capability.
1812    */
1813   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1814
1815   ACQUIRE_LOCK(&sched_mutex);
1816
1817   IF_DEBUG(scheduler,
1818            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1819
1820   // XXX this might not be necessary --SDM
1821   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1822
1823   threadPaused(cap->r.rCurrentTSO);
1824   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1825   suspended_ccalling_threads = cap->r.rCurrentTSO;
1826
1827 #if defined(RTS_SUPPORTS_THREADS)
1828   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1829   {
1830       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1831       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1832   }
1833   else
1834   {
1835       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1836   }
1837 #endif
1838
1839   /* Use the thread ID as the token; it should be unique */
1840   tok = cap->r.rCurrentTSO->id;
1841
1842   /* Hand back capability */
1843   releaseCapability(cap);
1844   
1845 #if defined(RTS_SUPPORTS_THREADS)
1846   /* Preparing to leave the RTS, so ensure there's a native thread/task
1847      waiting to take over.
1848   */
1849   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1850 #endif
1851
1852   /* Other threads _might_ be available for execution; signal this */
1853   THREAD_RUNNABLE();
1854   RELEASE_LOCK(&sched_mutex);
1855   
1856   errno = saved_errno;
1857   return tok; 
1858 }
1859
1860 StgRegTable *
1861 resumeThread( StgInt tok,
1862               rtsBool concCall STG_UNUSED )
1863 {
1864   StgTSO *tso, **prev;
1865   Capability *cap;
1866   int saved_errno = errno;
1867
1868 #if defined(RTS_SUPPORTS_THREADS)
1869   /* Wait for permission to re-enter the RTS with the result. */
1870   ACQUIRE_LOCK(&sched_mutex);
1871   grabReturnCapability(&sched_mutex, &cap);
1872
1873   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1874 #else
1875   grabCapability(&cap);
1876 #endif
1877
1878   /* Remove the thread off of the suspended list */
1879   prev = &suspended_ccalling_threads;
1880   for (tso = suspended_ccalling_threads; 
1881        tso != END_TSO_QUEUE; 
1882        prev = &tso->link, tso = tso->link) {
1883     if (tso->id == (StgThreadID)tok) {
1884       *prev = tso->link;
1885       break;
1886     }
1887   }
1888   if (tso == END_TSO_QUEUE) {
1889     barf("resumeThread: thread not found");
1890   }
1891   tso->link = END_TSO_QUEUE;
1892   
1893 #if defined(RTS_SUPPORTS_THREADS)
1894   if(tso->why_blocked == BlockedOnCCall)
1895   {
1896       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1897       tso->blocked_exceptions = NULL;
1898   }
1899 #endif
1900   
1901   /* Reset blocking status */
1902   tso->why_blocked  = NotBlocked;
1903
1904   cap->r.rCurrentTSO = tso;
1905 #if defined(RTS_SUPPORTS_THREADS)
1906   RELEASE_LOCK(&sched_mutex);
1907 #endif
1908   errno = saved_errno;
1909   return &cap->r;
1910 }
1911
1912
1913 /* ---------------------------------------------------------------------------
1914  * Static functions
1915  * ------------------------------------------------------------------------ */
1916 static void unblockThread(StgTSO *tso);
1917
1918 /* ---------------------------------------------------------------------------
1919  * Comparing Thread ids.
1920  *
1921  * This is used from STG land in the implementation of the
1922  * instances of Eq/Ord for ThreadIds.
1923  * ------------------------------------------------------------------------ */
1924
1925 int
1926 cmp_thread(StgPtr tso1, StgPtr tso2) 
1927
1928   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1929   StgThreadID id2 = ((StgTSO *)tso2)->id;
1930  
1931   if (id1 < id2) return (-1);
1932   if (id1 > id2) return 1;
1933   return 0;
1934 }
1935
1936 /* ---------------------------------------------------------------------------
1937  * Fetching the ThreadID from an StgTSO.
1938  *
1939  * This is used in the implementation of Show for ThreadIds.
1940  * ------------------------------------------------------------------------ */
1941 int
1942 rts_getThreadId(StgPtr tso) 
1943 {
1944   return ((StgTSO *)tso)->id;
1945 }
1946
1947 #ifdef DEBUG
1948 void
1949 labelThread(StgPtr tso, char *label)
1950 {
1951   int len;
1952   void *buf;
1953
1954   /* Caveat: Once set, you can only set the thread name to "" */
1955   len = strlen(label)+1;
1956   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1957   strncpy(buf,label,len);
1958   /* Update will free the old memory for us */
1959   updateThreadLabel((StgWord)tso,buf);
1960 }
1961 #endif /* DEBUG */
1962
1963 /* ---------------------------------------------------------------------------
1964    Create a new thread.
1965
1966    The new thread starts with the given stack size.  Before the
1967    scheduler can run, however, this thread needs to have a closure
1968    (and possibly some arguments) pushed on its stack.  See
1969    pushClosure() in Schedule.h.
1970
1971    createGenThread() and createIOThread() (in SchedAPI.h) are
1972    convenient packaged versions of this function.
1973
1974    currently pri (priority) is only used in a GRAN setup -- HWL
1975    ------------------------------------------------------------------------ */
1976 //@cindex createThread
1977 #if defined(GRAN)
1978 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1979 StgTSO *
1980 createThread(nat size, StgInt pri)
1981 #else
1982 StgTSO *
1983 createThread(nat size)
1984 #endif
1985 {
1986
1987     StgTSO *tso;
1988     nat stack_size;
1989
1990     /* First check whether we should create a thread at all */
1991 #if defined(PAR)
1992   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1993   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1994     threadsIgnored++;
1995     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1996           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1997     return END_TSO_QUEUE;
1998   }
1999   threadsCreated++;
2000 #endif
2001
2002 #if defined(GRAN)
2003   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2004 #endif
2005
2006   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2007
2008   /* catch ridiculously small stack sizes */
2009   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2010     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2011   }
2012
2013   stack_size = size - TSO_STRUCT_SIZEW;
2014
2015   tso = (StgTSO *)allocate(size);
2016   TICK_ALLOC_TSO(stack_size, 0);
2017
2018   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2019 #if defined(GRAN)
2020   SET_GRAN_HDR(tso, ThisPE);
2021 #endif
2022
2023   // Always start with the compiled code evaluator
2024   tso->what_next = ThreadRunGHC;
2025
2026   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
2027    * protect the increment operation on next_thread_id.
2028    * In future, we could use an atomic increment instead.
2029    */
2030   ACQUIRE_LOCK(&thread_id_mutex);
2031   tso->id = next_thread_id++; 
2032   RELEASE_LOCK(&thread_id_mutex);
2033
2034   tso->why_blocked  = NotBlocked;
2035   tso->blocked_exceptions = NULL;
2036
2037   tso->saved_errno = 0;
2038   
2039   tso->stack_size   = stack_size;
2040   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2041                               - TSO_STRUCT_SIZEW;
2042   tso->sp           = (P_)&(tso->stack) + stack_size;
2043
2044 #ifdef PROFILING
2045   tso->prof.CCCS = CCS_MAIN;
2046 #endif
2047
2048   /* put a stop frame on the stack */
2049   tso->sp -= sizeofW(StgStopFrame);
2050   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2051   // ToDo: check this
2052 #if defined(GRAN)
2053   tso->link = END_TSO_QUEUE;
2054   /* uses more flexible routine in GranSim */
2055   insertThread(tso, CurrentProc);
2056 #else
2057   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2058    * from its creation
2059    */
2060 #endif
2061
2062 #if defined(GRAN) 
2063   if (RtsFlags.GranFlags.GranSimStats.Full) 
2064     DumpGranEvent(GR_START,tso);
2065 #elif defined(PAR)
2066   if (RtsFlags.ParFlags.ParStats.Full) 
2067     DumpGranEvent(GR_STARTQ,tso);
2068   /* HACk to avoid SCHEDULE 
2069      LastTSO = tso; */
2070 #endif
2071
2072   /* Link the new thread on the global thread list.
2073    */
2074   tso->global_link = all_threads;
2075   all_threads = tso;
2076
2077 #if defined(DIST)
2078   tso->dist.priority = MandatoryPriority; //by default that is...
2079 #endif
2080
2081 #if defined(GRAN)
2082   tso->gran.pri = pri;
2083 # if defined(DEBUG)
2084   tso->gran.magic = TSO_MAGIC; // debugging only
2085 # endif
2086   tso->gran.sparkname   = 0;
2087   tso->gran.startedat   = CURRENT_TIME; 
2088   tso->gran.exported    = 0;
2089   tso->gran.basicblocks = 0;
2090   tso->gran.allocs      = 0;
2091   tso->gran.exectime    = 0;
2092   tso->gran.fetchtime   = 0;
2093   tso->gran.fetchcount  = 0;
2094   tso->gran.blocktime   = 0;
2095   tso->gran.blockcount  = 0;
2096   tso->gran.blockedat   = 0;
2097   tso->gran.globalsparks = 0;
2098   tso->gran.localsparks  = 0;
2099   if (RtsFlags.GranFlags.Light)
2100     tso->gran.clock  = Now; /* local clock */
2101   else
2102     tso->gran.clock  = 0;
2103
2104   IF_DEBUG(gran,printTSO(tso));
2105 #elif defined(PAR)
2106 # if defined(DEBUG)
2107   tso->par.magic = TSO_MAGIC; // debugging only
2108 # endif
2109   tso->par.sparkname   = 0;
2110   tso->par.startedat   = CURRENT_TIME; 
2111   tso->par.exported    = 0;
2112   tso->par.basicblocks = 0;
2113   tso->par.allocs      = 0;
2114   tso->par.exectime    = 0;
2115   tso->par.fetchtime   = 0;
2116   tso->par.fetchcount  = 0;
2117   tso->par.blocktime   = 0;
2118   tso->par.blockcount  = 0;
2119   tso->par.blockedat   = 0;
2120   tso->par.globalsparks = 0;
2121   tso->par.localsparks  = 0;
2122 #endif
2123
2124 #if defined(GRAN)
2125   globalGranStats.tot_threads_created++;
2126   globalGranStats.threads_created_on_PE[CurrentProc]++;
2127   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2128   globalGranStats.tot_sq_probes++;
2129 #elif defined(PAR)
2130   // collect parallel global statistics (currently done together with GC stats)
2131   if (RtsFlags.ParFlags.ParStats.Global &&
2132       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2133     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2134     globalParStats.tot_threads_created++;
2135   }
2136 #endif 
2137
2138 #if defined(GRAN)
2139   IF_GRAN_DEBUG(pri,
2140                 belch("==__ schedule: Created TSO %d (%p);",
2141                       CurrentProc, tso, tso->id));
2142 #elif defined(PAR)
2143     IF_PAR_DEBUG(verbose,
2144                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
2145                        tso->id, tso, advisory_thread_count));
2146 #else
2147   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2148                                  tso->id, tso->stack_size));
2149 #endif    
2150   return tso;
2151 }
2152
2153 #if defined(PAR)
2154 /* RFP:
2155    all parallel thread creation calls should fall through the following routine.
2156 */
2157 StgTSO *
2158 createSparkThread(rtsSpark spark) 
2159 { StgTSO *tso;
2160   ASSERT(spark != (rtsSpark)NULL);
2161   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2162   { threadsIgnored++;
2163     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2164           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2165     return END_TSO_QUEUE;
2166   }
2167   else
2168   { threadsCreated++;
2169     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2170     if (tso==END_TSO_QUEUE)     
2171       barf("createSparkThread: Cannot create TSO");
2172 #if defined(DIST)
2173     tso->priority = AdvisoryPriority;
2174 #endif
2175     pushClosure(tso,spark);
2176     PUSH_ON_RUN_QUEUE(tso);
2177     advisory_thread_count++;    
2178   }
2179   return tso;
2180 }
2181 #endif
2182
2183 /*
2184   Turn a spark into a thread.
2185   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2186 */
2187 #if defined(PAR)
2188 //@cindex activateSpark
2189 StgTSO *
2190 activateSpark (rtsSpark spark) 
2191 {
2192   StgTSO *tso;
2193
2194   tso = createSparkThread(spark);
2195   if (RtsFlags.ParFlags.ParStats.Full) {   
2196     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2197     IF_PAR_DEBUG(verbose,
2198                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2199                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2200   }
2201   // ToDo: fwd info on local/global spark to thread -- HWL
2202   // tso->gran.exported =  spark->exported;
2203   // tso->gran.locked =   !spark->global;
2204   // tso->gran.sparkname = spark->name;
2205
2206   return tso;
2207 }
2208 #endif
2209
2210 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2211                                    Capability *initialCapability
2212                                    );
2213
2214
2215 /* ---------------------------------------------------------------------------
2216  * scheduleThread()
2217  *
2218  * scheduleThread puts a thread on the head of the runnable queue.
2219  * This will usually be done immediately after a thread is created.
2220  * The caller of scheduleThread must create the thread using e.g.
2221  * createThread and push an appropriate closure
2222  * on this thread's stack before the scheduler is invoked.
2223  * ------------------------------------------------------------------------ */
2224
2225 static void scheduleThread_ (StgTSO* tso);
2226
2227 void
2228 scheduleThread_(StgTSO *tso)
2229 {
2230   // Precondition: sched_mutex must be held.
2231
2232   /* Put the new thread on the head of the runnable queue.  The caller
2233    * better push an appropriate closure on this thread's stack
2234    * beforehand.  In the SMP case, the thread may start running as
2235    * soon as we release the scheduler lock below.
2236    */
2237   PUSH_ON_RUN_QUEUE(tso);
2238   THREAD_RUNNABLE();
2239
2240 #if 0
2241   IF_DEBUG(scheduler,printTSO(tso));
2242 #endif
2243 }
2244
2245 void scheduleThread(StgTSO* tso)
2246 {
2247   ACQUIRE_LOCK(&sched_mutex);
2248   scheduleThread_(tso);
2249   RELEASE_LOCK(&sched_mutex);
2250 }
2251
2252 SchedulerStatus
2253 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2254 {       // Precondition: sched_mutex must be held
2255   StgMainThread *m;
2256
2257   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2258   m->tso = tso;
2259   m->ret = ret;
2260   m->stat = NoStatus;
2261 #if defined(RTS_SUPPORTS_THREADS)
2262 #if defined(THREADED_RTS)
2263   initCondition(&m->bound_thread_cond);
2264 #else
2265   initCondition(&m->wakeup);
2266 #endif
2267 #endif
2268
2269   /* Put the thread on the main-threads list prior to scheduling the TSO.
2270      Failure to do so introduces a race condition in the MT case (as
2271      identified by Wolfgang Thaller), whereby the new task/OS thread 
2272      created by scheduleThread_() would complete prior to the thread
2273      that spawned it managed to put 'itself' on the main-threads list.
2274      The upshot of it all being that the worker thread wouldn't get to
2275      signal the completion of the its work item for the main thread to
2276      see (==> it got stuck waiting.)    -- sof 6/02.
2277   */
2278   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2279   
2280   m->link = main_threads;
2281   main_threads = m;
2282
2283   scheduleThread_(tso);
2284
2285   return waitThread_(m, initialCapability);
2286 }
2287
2288 /* ---------------------------------------------------------------------------
2289  * initScheduler()
2290  *
2291  * Initialise the scheduler.  This resets all the queues - if the
2292  * queues contained any threads, they'll be garbage collected at the
2293  * next pass.
2294  *
2295  * ------------------------------------------------------------------------ */
2296
2297 #ifdef SMP
2298 static void
2299 term_handler(int sig STG_UNUSED)
2300 {
2301   stat_workerStop();
2302   ACQUIRE_LOCK(&term_mutex);
2303   await_death--;
2304   RELEASE_LOCK(&term_mutex);
2305   shutdownThread();
2306 }
2307 #endif
2308
2309 void 
2310 initScheduler(void)
2311 {
2312 #if defined(GRAN)
2313   nat i;
2314
2315   for (i=0; i<=MAX_PROC; i++) {
2316     run_queue_hds[i]      = END_TSO_QUEUE;
2317     run_queue_tls[i]      = END_TSO_QUEUE;
2318     blocked_queue_hds[i]  = END_TSO_QUEUE;
2319     blocked_queue_tls[i]  = END_TSO_QUEUE;
2320     ccalling_threadss[i]  = END_TSO_QUEUE;
2321     sleeping_queue        = END_TSO_QUEUE;
2322   }
2323 #else
2324   run_queue_hd      = END_TSO_QUEUE;
2325   run_queue_tl      = END_TSO_QUEUE;
2326   blocked_queue_hd  = END_TSO_QUEUE;
2327   blocked_queue_tl  = END_TSO_QUEUE;
2328   sleeping_queue    = END_TSO_QUEUE;
2329 #endif 
2330
2331   suspended_ccalling_threads  = END_TSO_QUEUE;
2332
2333   main_threads = NULL;
2334   all_threads  = END_TSO_QUEUE;
2335
2336   context_switch = 0;
2337   interrupted    = 0;
2338
2339   RtsFlags.ConcFlags.ctxtSwitchTicks =
2340       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2341       
2342 #if defined(RTS_SUPPORTS_THREADS)
2343   /* Initialise the mutex and condition variables used by
2344    * the scheduler. */
2345   initMutex(&sched_mutex);
2346   initMutex(&term_mutex);
2347   initMutex(&thread_id_mutex);
2348
2349   initCondition(&thread_ready_cond);
2350 #endif
2351   
2352 #if defined(SMP)
2353   initCondition(&gc_pending_cond);
2354 #endif
2355
2356 #if defined(RTS_SUPPORTS_THREADS)
2357   ACQUIRE_LOCK(&sched_mutex);
2358 #endif
2359
2360   /* Install the SIGHUP handler */
2361 #if defined(SMP)
2362   {
2363     struct sigaction action,oact;
2364
2365     action.sa_handler = term_handler;
2366     sigemptyset(&action.sa_mask);
2367     action.sa_flags = 0;
2368     if (sigaction(SIGTERM, &action, &oact) != 0) {
2369       barf("can't install TERM handler");
2370     }
2371   }
2372 #endif
2373
2374   /* A capability holds the state a native thread needs in
2375    * order to execute STG code. At least one capability is
2376    * floating around (only SMP builds have more than one).
2377    */
2378   initCapabilities();
2379   
2380 #if defined(RTS_SUPPORTS_THREADS)
2381     /* start our haskell execution tasks */
2382 # if defined(SMP)
2383     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2384 # else
2385     startTaskManager(0,taskStart);
2386 # endif
2387 #endif
2388
2389 #if /* defined(SMP) ||*/ defined(PAR)
2390   initSparkPools();
2391 #endif
2392
2393 #if defined(RTS_SUPPORTS_THREADS)
2394   RELEASE_LOCK(&sched_mutex);
2395 #endif
2396
2397 }
2398
2399 void
2400 exitScheduler( void )
2401 {
2402 #if defined(RTS_SUPPORTS_THREADS)
2403   stopTaskManager();
2404 #endif
2405   shutting_down_scheduler = rtsTrue;
2406 }
2407
2408 /* -----------------------------------------------------------------------------
2409    Managing the per-task allocation areas.
2410    
2411    Each capability comes with an allocation area.  These are
2412    fixed-length block lists into which allocation can be done.
2413
2414    ToDo: no support for two-space collection at the moment???
2415    -------------------------------------------------------------------------- */
2416
2417 /* -----------------------------------------------------------------------------
2418  * waitThread is the external interface for running a new computation
2419  * and waiting for the result.
2420  *
2421  * In the non-SMP case, we create a new main thread, push it on the 
2422  * main-thread stack, and invoke the scheduler to run it.  The
2423  * scheduler will return when the top main thread on the stack has
2424  * completed or died, and fill in the necessary fields of the
2425  * main_thread structure.
2426  *
2427  * In the SMP case, we create a main thread as before, but we then
2428  * create a new condition variable and sleep on it.  When our new
2429  * main thread has completed, we'll be woken up and the status/result
2430  * will be in the main_thread struct.
2431  * -------------------------------------------------------------------------- */
2432
2433 int 
2434 howManyThreadsAvail ( void )
2435 {
2436    int i = 0;
2437    StgTSO* q;
2438    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2439       i++;
2440    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2441       i++;
2442    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2443       i++;
2444    return i;
2445 }
2446
2447 void
2448 finishAllThreads ( void )
2449 {
2450    do {
2451       while (run_queue_hd != END_TSO_QUEUE) {
2452          waitThread ( run_queue_hd, NULL, NULL );
2453       }
2454       while (blocked_queue_hd != END_TSO_QUEUE) {
2455          waitThread ( blocked_queue_hd, NULL, NULL );
2456       }
2457       while (sleeping_queue != END_TSO_QUEUE) {
2458          waitThread ( blocked_queue_hd, NULL, NULL );
2459       }
2460    } while 
2461       (blocked_queue_hd != END_TSO_QUEUE || 
2462        run_queue_hd     != END_TSO_QUEUE ||
2463        sleeping_queue   != END_TSO_QUEUE);
2464 }
2465
2466 SchedulerStatus
2467 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2468
2469   StgMainThread *m;
2470   SchedulerStatus stat;
2471
2472   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2473   m->tso = tso;
2474   m->ret = ret;
2475   m->stat = NoStatus;
2476 #if defined(RTS_SUPPORTS_THREADS)
2477 #if defined(THREADED_RTS)
2478   initCondition(&m->bound_thread_cond);
2479 #else
2480   initCondition(&m->wakeup);
2481 #endif
2482 #endif
2483
2484   /* see scheduleWaitThread() comment */
2485   ACQUIRE_LOCK(&sched_mutex);
2486   m->link = main_threads;
2487   main_threads = m;
2488
2489   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2490
2491   stat = waitThread_(m,initialCapability);
2492   
2493   RELEASE_LOCK(&sched_mutex);
2494   return stat;
2495 }
2496
2497 static
2498 SchedulerStatus
2499 waitThread_(StgMainThread* m, Capability *initialCapability)
2500 {
2501   SchedulerStatus stat;
2502
2503   // Precondition: sched_mutex must be held.
2504   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2505
2506 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2507   {     // FIXME: does this still make sense?
2508         // It's not for the threaded rts => SMP only
2509     do {
2510       waitCondition(&m->wakeup, &sched_mutex);
2511     } while (m->stat == NoStatus);
2512   }
2513 #elif defined(GRAN)
2514   /* GranSim specific init */
2515   CurrentTSO = m->tso;                // the TSO to run
2516   procStatus[MainProc] = Busy;        // status of main PE
2517   CurrentProc = MainProc;             // PE to run it on
2518
2519   RELEASE_LOCK(&sched_mutex);
2520   schedule(m,initialCapability);
2521 #else
2522   RELEASE_LOCK(&sched_mutex);
2523   schedule(m,initialCapability);
2524   ACQUIRE_LOCK(&sched_mutex);
2525   ASSERT(m->stat != NoStatus);
2526 #endif
2527
2528   stat = m->stat;
2529
2530 #if defined(RTS_SUPPORTS_THREADS)
2531 #if defined(THREADED_RTS)
2532   closeCondition(&m->bound_thread_cond);
2533 #else
2534   closeCondition(&m->wakeup);
2535 #endif
2536 #endif
2537
2538   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2539                               m->tso->id));
2540   stgFree(m);
2541
2542   // Postcondition: sched_mutex still held
2543   return stat;
2544 }
2545
2546 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2547 //@subsection Run queue code 
2548
2549 #if 0
2550 /* 
2551    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2552        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2553        implicit global variable that has to be correct when calling these
2554        fcts -- HWL 
2555 */
2556
2557 /* Put the new thread on the head of the runnable queue.
2558  * The caller of createThread better push an appropriate closure
2559  * on this thread's stack before the scheduler is invoked.
2560  */
2561 static /* inline */ void
2562 add_to_run_queue(tso)
2563 StgTSO* tso; 
2564 {
2565   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2566   tso->link = run_queue_hd;
2567   run_queue_hd = tso;
2568   if (run_queue_tl == END_TSO_QUEUE) {
2569     run_queue_tl = tso;
2570   }
2571 }
2572
2573 /* Put the new thread at the end of the runnable queue. */
2574 static /* inline */ void
2575 push_on_run_queue(tso)
2576 StgTSO* tso; 
2577 {
2578   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2579   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2580   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2581   if (run_queue_hd == END_TSO_QUEUE) {
2582     run_queue_hd = tso;
2583   } else {
2584     run_queue_tl->link = tso;
2585   }
2586   run_queue_tl = tso;
2587 }
2588
2589 /* 
2590    Should be inlined because it's used very often in schedule.  The tso
2591    argument is actually only needed in GranSim, where we want to have the
2592    possibility to schedule *any* TSO on the run queue, irrespective of the
2593    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2594    the run queue and dequeue the tso, adjusting the links in the queue. 
2595 */
2596 //@cindex take_off_run_queue
2597 static /* inline */ StgTSO*
2598 take_off_run_queue(StgTSO *tso) {
2599   StgTSO *t, *prev;
2600
2601   /* 
2602      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2603
2604      if tso is specified, unlink that tso from the run_queue (doesn't have
2605      to be at the beginning of the queue); GranSim only 
2606   */
2607   if (tso!=END_TSO_QUEUE) {
2608     /* find tso in queue */
2609     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2610          t!=END_TSO_QUEUE && t!=tso;
2611          prev=t, t=t->link) 
2612       /* nothing */ ;
2613     ASSERT(t==tso);
2614     /* now actually dequeue the tso */
2615     if (prev!=END_TSO_QUEUE) {
2616       ASSERT(run_queue_hd!=t);
2617       prev->link = t->link;
2618     } else {
2619       /* t is at beginning of thread queue */
2620       ASSERT(run_queue_hd==t);
2621       run_queue_hd = t->link;
2622     }
2623     /* t is at end of thread queue */
2624     if (t->link==END_TSO_QUEUE) {
2625       ASSERT(t==run_queue_tl);
2626       run_queue_tl = prev;
2627     } else {
2628       ASSERT(run_queue_tl!=t);
2629     }
2630     t->link = END_TSO_QUEUE;
2631   } else {
2632     /* take tso from the beginning of the queue; std concurrent code */
2633     t = run_queue_hd;
2634     if (t != END_TSO_QUEUE) {
2635       run_queue_hd = t->link;
2636       t->link = END_TSO_QUEUE;
2637       if (run_queue_hd == END_TSO_QUEUE) {
2638         run_queue_tl = END_TSO_QUEUE;
2639       }
2640     }
2641   }
2642   return t;
2643 }
2644
2645 #endif /* 0 */
2646
2647 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2648 //@subsection Garbage Collextion Routines
2649
2650 /* ---------------------------------------------------------------------------
2651    Where are the roots that we know about?
2652
2653         - all the threads on the runnable queue
2654         - all the threads on the blocked queue
2655         - all the threads on the sleeping queue
2656         - all the thread currently executing a _ccall_GC
2657         - all the "main threads"
2658      
2659    ------------------------------------------------------------------------ */
2660
2661 /* This has to be protected either by the scheduler monitor, or by the
2662         garbage collection monitor (probably the latter).
2663         KH @ 25/10/99
2664 */
2665
2666 void
2667 GetRoots(evac_fn evac)
2668 {
2669 #if defined(GRAN)
2670   {
2671     nat i;
2672     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2673       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2674           evac((StgClosure **)&run_queue_hds[i]);
2675       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2676           evac((StgClosure **)&run_queue_tls[i]);
2677       
2678       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2679           evac((StgClosure **)&blocked_queue_hds[i]);
2680       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2681           evac((StgClosure **)&blocked_queue_tls[i]);
2682       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2683           evac((StgClosure **)&ccalling_threads[i]);
2684     }
2685   }
2686
2687   markEventQueue();
2688
2689 #else /* !GRAN */
2690   if (run_queue_hd != END_TSO_QUEUE) {
2691       ASSERT(run_queue_tl != END_TSO_QUEUE);
2692       evac((StgClosure **)&run_queue_hd);
2693       evac((StgClosure **)&run_queue_tl);
2694   }
2695   
2696   if (blocked_queue_hd != END_TSO_QUEUE) {
2697       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2698       evac((StgClosure **)&blocked_queue_hd);
2699       evac((StgClosure **)&blocked_queue_tl);
2700   }
2701   
2702   if (sleeping_queue != END_TSO_QUEUE) {
2703       evac((StgClosure **)&sleeping_queue);
2704   }
2705 #endif 
2706
2707   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2708       evac((StgClosure **)&suspended_ccalling_threads);
2709   }
2710
2711 #if defined(PAR) || defined(GRAN)
2712   markSparkQueue(evac);
2713 #endif
2714
2715 #if defined(RTS_USER_SIGNALS)
2716   // mark the signal handlers (signals should be already blocked)
2717   markSignalHandlers(evac);
2718 #endif
2719
2720   // main threads which have completed need to be retained until they
2721   // are dealt with in the main scheduler loop.  They won't be
2722   // retained any other way: the GC will drop them from the
2723   // all_threads list, so we have to be careful to treat them as roots
2724   // here.
2725   { 
2726       StgMainThread *m;
2727       for (m = main_threads; m != NULL; m = m->link) {
2728           switch (m->tso->what_next) {
2729           case ThreadComplete:
2730           case ThreadKilled:
2731               evac((StgClosure **)&m->tso);
2732               break;
2733           default:
2734               break;
2735           }
2736       }
2737   }
2738 }
2739
2740 /* -----------------------------------------------------------------------------
2741    performGC
2742
2743    This is the interface to the garbage collector from Haskell land.
2744    We provide this so that external C code can allocate and garbage
2745    collect when called from Haskell via _ccall_GC.
2746
2747    It might be useful to provide an interface whereby the programmer
2748    can specify more roots (ToDo).
2749    
2750    This needs to be protected by the GC condition variable above.  KH.
2751    -------------------------------------------------------------------------- */
2752
2753 static void (*extra_roots)(evac_fn);
2754
2755 void
2756 performGC(void)
2757 {
2758   /* Obligated to hold this lock upon entry */
2759   ACQUIRE_LOCK(&sched_mutex);
2760   GarbageCollect(GetRoots,rtsFalse);
2761   RELEASE_LOCK(&sched_mutex);
2762 }
2763
2764 void
2765 performMajorGC(void)
2766 {
2767   ACQUIRE_LOCK(&sched_mutex);
2768   GarbageCollect(GetRoots,rtsTrue);
2769   RELEASE_LOCK(&sched_mutex);
2770 }
2771
2772 static void
2773 AllRoots(evac_fn evac)
2774 {
2775     GetRoots(evac);             // the scheduler's roots
2776     extra_roots(evac);          // the user's roots
2777 }
2778
2779 void
2780 performGCWithRoots(void (*get_roots)(evac_fn))
2781 {
2782   ACQUIRE_LOCK(&sched_mutex);
2783   extra_roots = get_roots;
2784   GarbageCollect(AllRoots,rtsFalse);
2785   RELEASE_LOCK(&sched_mutex);
2786 }
2787
2788 /* -----------------------------------------------------------------------------
2789    Stack overflow
2790
2791    If the thread has reached its maximum stack size, then raise the
2792    StackOverflow exception in the offending thread.  Otherwise
2793    relocate the TSO into a larger chunk of memory and adjust its stack
2794    size appropriately.
2795    -------------------------------------------------------------------------- */
2796
2797 static StgTSO *
2798 threadStackOverflow(StgTSO *tso)
2799 {
2800   nat new_stack_size, new_tso_size, stack_words;
2801   StgPtr new_sp;
2802   StgTSO *dest;
2803
2804   IF_DEBUG(sanity,checkTSO(tso));
2805   if (tso->stack_size >= tso->max_stack_size) {
2806
2807     IF_DEBUG(gc,
2808              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2809                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2810              /* If we're debugging, just print out the top of the stack */
2811              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2812                                               tso->sp+64)));
2813
2814     /* Send this thread the StackOverflow exception */
2815     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2816     return tso;
2817   }
2818
2819   /* Try to double the current stack size.  If that takes us over the
2820    * maximum stack size for this thread, then use the maximum instead.
2821    * Finally round up so the TSO ends up as a whole number of blocks.
2822    */
2823   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2824   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2825                                        TSO_STRUCT_SIZE)/sizeof(W_);
2826   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2827   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2828
2829   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2830
2831   dest = (StgTSO *)allocate(new_tso_size);
2832   TICK_ALLOC_TSO(new_stack_size,0);
2833
2834   /* copy the TSO block and the old stack into the new area */
2835   memcpy(dest,tso,TSO_STRUCT_SIZE);
2836   stack_words = tso->stack + tso->stack_size - tso->sp;
2837   new_sp = (P_)dest + new_tso_size - stack_words;
2838   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2839
2840   /* relocate the stack pointers... */
2841   dest->sp         = new_sp;
2842   dest->stack_size = new_stack_size;
2843         
2844   /* Mark the old TSO as relocated.  We have to check for relocated
2845    * TSOs in the garbage collector and any primops that deal with TSOs.
2846    *
2847    * It's important to set the sp value to just beyond the end
2848    * of the stack, so we don't attempt to scavenge any part of the
2849    * dead TSO's stack.
2850    */
2851   tso->what_next = ThreadRelocated;
2852   tso->link = dest;
2853   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2854   tso->why_blocked = NotBlocked;
2855   dest->mut_link = NULL;
2856
2857   IF_PAR_DEBUG(verbose,
2858                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2859                      tso->id, tso, tso->stack_size);
2860                /* If we're debugging, just print out the top of the stack */
2861                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2862                                                 tso->sp+64)));
2863   
2864   IF_DEBUG(sanity,checkTSO(tso));
2865 #if 0
2866   IF_DEBUG(scheduler,printTSO(dest));
2867 #endif
2868
2869   return dest;
2870 }
2871
2872 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2873 //@subsection Blocking Queue Routines
2874
2875 /* ---------------------------------------------------------------------------
2876    Wake up a queue that was blocked on some resource.
2877    ------------------------------------------------------------------------ */
2878
2879 #if defined(GRAN)
2880 static inline void
2881 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2882 {
2883 }
2884 #elif defined(PAR)
2885 static inline void
2886 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2887 {
2888   /* write RESUME events to log file and
2889      update blocked and fetch time (depending on type of the orig closure) */
2890   if (RtsFlags.ParFlags.ParStats.Full) {
2891     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2892                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2893                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2894     if (EMPTY_RUN_QUEUE())
2895       emitSchedule = rtsTrue;
2896
2897     switch (get_itbl(node)->type) {
2898         case FETCH_ME_BQ:
2899           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2900           break;
2901         case RBH:
2902         case FETCH_ME:
2903         case BLACKHOLE_BQ:
2904           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2905           break;
2906 #ifdef DIST
2907         case MVAR:
2908           break;
2909 #endif    
2910         default:
2911           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2912         }
2913       }
2914 }
2915 #endif
2916
2917 #if defined(GRAN)
2918 static StgBlockingQueueElement *
2919 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2920 {
2921     StgTSO *tso;
2922     PEs node_loc, tso_loc;
2923
2924     node_loc = where_is(node); // should be lifted out of loop
2925     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2926     tso_loc = where_is((StgClosure *)tso);
2927     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2928       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2929       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2930       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2931       // insertThread(tso, node_loc);
2932       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2933                 ResumeThread,
2934                 tso, node, (rtsSpark*)NULL);
2935       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2936       // len_local++;
2937       // len++;
2938     } else { // TSO is remote (actually should be FMBQ)
2939       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2940                                   RtsFlags.GranFlags.Costs.gunblocktime +
2941                                   RtsFlags.GranFlags.Costs.latency;
2942       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2943                 UnblockThread,
2944                 tso, node, (rtsSpark*)NULL);
2945       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2946       // len++;
2947     }
2948     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2949     IF_GRAN_DEBUG(bq,
2950                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2951                           (node_loc==tso_loc ? "Local" : "Global"), 
2952                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2953     tso->block_info.closure = NULL;
2954     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2955                              tso->id, tso));
2956 }
2957 #elif defined(PAR)
2958 static StgBlockingQueueElement *
2959 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2960 {
2961     StgBlockingQueueElement *next;
2962
2963     switch (get_itbl(bqe)->type) {
2964     case TSO:
2965       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2966       /* if it's a TSO just push it onto the run_queue */
2967       next = bqe->link;
2968       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2969       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2970       THREAD_RUNNABLE();
2971       unblockCount(bqe, node);
2972       /* reset blocking status after dumping event */
2973       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2974       break;
2975
2976     case BLOCKED_FETCH:
2977       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2978       next = bqe->link;
2979       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2980       PendingFetches = (StgBlockedFetch *)bqe;
2981       break;
2982
2983 # if defined(DEBUG)
2984       /* can ignore this case in a non-debugging setup; 
2985          see comments on RBHSave closures above */
2986     case CONSTR:
2987       /* check that the closure is an RBHSave closure */
2988       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2989              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2990              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2991       break;
2992
2993     default:
2994       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2995            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2996            (StgClosure *)bqe);
2997 # endif
2998     }
2999   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
3000   return next;
3001 }
3002
3003 #else /* !GRAN && !PAR */
3004 static StgTSO *
3005 unblockOneLocked(StgTSO *tso)
3006 {
3007   StgTSO *next;
3008
3009   ASSERT(get_itbl(tso)->type == TSO);
3010   ASSERT(tso->why_blocked != NotBlocked);
3011   tso->why_blocked = NotBlocked;
3012   next = tso->link;
3013   PUSH_ON_RUN_QUEUE(tso);
3014   THREAD_RUNNABLE();
3015   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
3016   return next;
3017 }
3018 #endif
3019
3020 #if defined(GRAN) || defined(PAR)
3021 inline StgBlockingQueueElement *
3022 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3023 {
3024   ACQUIRE_LOCK(&sched_mutex);
3025   bqe = unblockOneLocked(bqe, node);
3026   RELEASE_LOCK(&sched_mutex);
3027   return bqe;
3028 }
3029 #else
3030 inline StgTSO *
3031 unblockOne(StgTSO *tso)
3032 {
3033   ACQUIRE_LOCK(&sched_mutex);
3034   tso = unblockOneLocked(tso);
3035   RELEASE_LOCK(&sched_mutex);
3036   return tso;
3037 }
3038 #endif
3039
3040 #if defined(GRAN)
3041 void 
3042 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3043 {
3044   StgBlockingQueueElement *bqe;
3045   PEs node_loc;
3046   nat len = 0; 
3047
3048   IF_GRAN_DEBUG(bq, 
3049                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3050                       node, CurrentProc, CurrentTime[CurrentProc], 
3051                       CurrentTSO->id, CurrentTSO));
3052
3053   node_loc = where_is(node);
3054
3055   ASSERT(q == END_BQ_QUEUE ||
3056          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3057          get_itbl(q)->type == CONSTR); // closure (type constructor)
3058   ASSERT(is_unique(node));
3059
3060   /* FAKE FETCH: magically copy the node to the tso's proc;
3061      no Fetch necessary because in reality the node should not have been 
3062      moved to the other PE in the first place
3063   */
3064   if (CurrentProc!=node_loc) {
3065     IF_GRAN_DEBUG(bq, 
3066                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3067                         node, node_loc, CurrentProc, CurrentTSO->id, 
3068                         // CurrentTSO, where_is(CurrentTSO),
3069                         node->header.gran.procs));
3070     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3071     IF_GRAN_DEBUG(bq, 
3072                   belch("## new bitmask of node %p is %#x",
3073                         node, node->header.gran.procs));
3074     if (RtsFlags.GranFlags.GranSimStats.Global) {
3075       globalGranStats.tot_fake_fetches++;
3076     }
3077   }
3078
3079   bqe = q;
3080   // ToDo: check: ASSERT(CurrentProc==node_loc);
3081   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3082     //next = bqe->link;
3083     /* 
3084        bqe points to the current element in the queue
3085        next points to the next element in the queue
3086     */
3087     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3088     //tso_loc = where_is(tso);
3089     len++;
3090     bqe = unblockOneLocked(bqe, node);
3091   }
3092
3093   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3094      the closure to make room for the anchor of the BQ */
3095   if (bqe!=END_BQ_QUEUE) {
3096     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3097     /*
3098     ASSERT((info_ptr==&RBH_Save_0_info) ||
3099            (info_ptr==&RBH_Save_1_info) ||
3100            (info_ptr==&RBH_Save_2_info));
3101     */
3102     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3103     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3104     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3105
3106     IF_GRAN_DEBUG(bq,
3107                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3108                         node, info_type(node)));
3109   }
3110
3111   /* statistics gathering */
3112   if (RtsFlags.GranFlags.GranSimStats.Global) {
3113     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3114     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3115     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3116     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3117   }
3118   IF_GRAN_DEBUG(bq,
3119                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3120                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3121 }
3122 #elif defined(PAR)
3123 void 
3124 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3125 {
3126   StgBlockingQueueElement *bqe;
3127
3128   ACQUIRE_LOCK(&sched_mutex);
3129
3130   IF_PAR_DEBUG(verbose, 
3131                belch("##-_ AwBQ for node %p on [%x]: ",
3132                      node, mytid));
3133 #ifdef DIST  
3134   //RFP
3135   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3136     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3137     return;
3138   }
3139 #endif
3140   
3141   ASSERT(q == END_BQ_QUEUE ||
3142          get_itbl(q)->type == TSO ||           
3143          get_itbl(q)->type == BLOCKED_FETCH || 
3144          get_itbl(q)->type == CONSTR); 
3145
3146   bqe = q;
3147   while (get_itbl(bqe)->type==TSO || 
3148          get_itbl(bqe)->type==BLOCKED_FETCH) {
3149     bqe = unblockOneLocked(bqe, node);
3150   }
3151   RELEASE_LOCK(&sched_mutex);
3152 }
3153
3154 #else   /* !GRAN && !PAR */
3155
3156 #ifdef RTS_SUPPORTS_THREADS
3157 void
3158 awakenBlockedQueueNoLock(StgTSO *tso)
3159 {
3160   while (tso != END_TSO_QUEUE) {
3161     tso = unblockOneLocked(tso);
3162   }
3163 }
3164 #endif
3165
3166 void
3167 awakenBlockedQueue(StgTSO *tso)
3168 {
3169   ACQUIRE_LOCK(&sched_mutex);
3170   while (tso != END_TSO_QUEUE) {
3171     tso = unblockOneLocked(tso);
3172   }
3173   RELEASE_LOCK(&sched_mutex);
3174 }
3175 #endif
3176
3177 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3178 //@subsection Exception Handling Routines
3179
3180 /* ---------------------------------------------------------------------------
3181    Interrupt execution
3182    - usually called inside a signal handler so it mustn't do anything fancy.   
3183    ------------------------------------------------------------------------ */
3184
3185 void
3186 interruptStgRts(void)
3187 {
3188     interrupted    = 1;
3189     context_switch = 1;
3190 }
3191
3192 /* -----------------------------------------------------------------------------
3193    Unblock a thread
3194
3195    This is for use when we raise an exception in another thread, which
3196    may be blocked.
3197    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3198    -------------------------------------------------------------------------- */
3199
3200 #if defined(GRAN) || defined(PAR)
3201 /*
3202   NB: only the type of the blocking queue is different in GranSim and GUM
3203       the operations on the queue-elements are the same
3204       long live polymorphism!
3205
3206   Locks: sched_mutex is held upon entry and exit.
3207
3208 */
3209 static void
3210 unblockThread(StgTSO *tso)
3211 {
3212   StgBlockingQueueElement *t, **last;
3213
3214   switch (tso->why_blocked) {
3215
3216   case NotBlocked:
3217     return;  /* not blocked */
3218
3219   case BlockedOnMVar:
3220     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3221     {
3222       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3223       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3224
3225       last = (StgBlockingQueueElement **)&mvar->head;
3226       for (t = (StgBlockingQueueElement *)mvar->head; 
3227            t != END_BQ_QUEUE; 
3228            last = &t->link, last_tso = t, t = t->link) {
3229         if (t == (StgBlockingQueueElement *)tso) {
3230           *last = (StgBlockingQueueElement *)tso->link;
3231           if (mvar->tail == tso) {
3232             mvar->tail = (StgTSO *)last_tso;
3233           }
3234           goto done;
3235         }
3236       }
3237       barf("unblockThread (MVAR): TSO not found");
3238     }
3239
3240   case BlockedOnBlackHole:
3241     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3242     {
3243       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3244
3245       last = &bq->blocking_queue;
3246       for (t = bq->blocking_queue; 
3247            t != END_BQ_QUEUE; 
3248            last = &t->link, t = t->link) {
3249         if (t == (StgBlockingQueueElement *)tso) {
3250           *last = (StgBlockingQueueElement *)tso->link;
3251           goto done;
3252         }
3253       }
3254       barf("unblockThread (BLACKHOLE): TSO not found");
3255     }
3256
3257   case BlockedOnException:
3258     {
3259       StgTSO *target  = tso->block_info.tso;
3260
3261       ASSERT(get_itbl(target)->type == TSO);
3262
3263       if (target->what_next == ThreadRelocated) {
3264           target = target->link;
3265           ASSERT(get_itbl(target)->type == TSO);
3266       }
3267
3268       ASSERT(target->blocked_exceptions != NULL);
3269
3270       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3271       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3272            t != END_BQ_QUEUE; 
3273            last = &t->link, t = t->link) {
3274         ASSERT(get_itbl(t)->type == TSO);
3275         if (t == (StgBlockingQueueElement *)tso) {
3276           *last = (StgBlockingQueueElement *)tso->link;
3277           goto done;
3278         }
3279       }
3280       barf("unblockThread (Exception): TSO not found");
3281     }
3282
3283   case BlockedOnRead:
3284   case BlockedOnWrite:
3285 #if defined(mingw32_TARGET_OS)
3286   case BlockedOnDoProc:
3287 #endif
3288     {
3289       /* take TSO off blocked_queue */
3290       StgBlockingQueueElement *prev = NULL;
3291       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3292            prev = t, t = t->link) {
3293         if (t == (StgBlockingQueueElement *)tso) {
3294           if (prev == NULL) {
3295             blocked_queue_hd = (StgTSO *)t->link;
3296             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3297               blocked_queue_tl = END_TSO_QUEUE;
3298             }
3299           } else {
3300             prev->link = t->link;
3301             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3302               blocked_queue_tl = (StgTSO *)prev;
3303             }
3304           }
3305           goto done;
3306         }
3307       }
3308       barf("unblockThread (I/O): TSO not found");
3309     }
3310
3311   case BlockedOnDelay:
3312     {
3313       /* take TSO off sleeping_queue */
3314       StgBlockingQueueElement *prev = NULL;
3315       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3316            prev = t, t = t->link) {
3317         if (t == (StgBlockingQueueElement *)tso) {
3318           if (prev == NULL) {
3319             sleeping_queue = (StgTSO *)t->link;
3320           } else {
3321             prev->link = t->link;
3322           }
3323           goto done;
3324         }
3325       }
3326       barf("unblockThread (delay): TSO not found");
3327     }
3328
3329   default:
3330     barf("unblockThread");
3331   }
3332
3333  done:
3334   tso->link = END_TSO_QUEUE;
3335   tso->why_blocked = NotBlocked;
3336   tso->block_info.closure = NULL;
3337   PUSH_ON_RUN_QUEUE(tso);
3338 }
3339 #else
3340 static void
3341 unblockThread(StgTSO *tso)
3342 {
3343   StgTSO *t, **last;
3344   
3345   /* To avoid locking unnecessarily. */
3346   if (tso->why_blocked == NotBlocked) {
3347     return;
3348   }
3349
3350   switch (tso->why_blocked) {
3351
3352   case BlockedOnMVar:
3353     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3354     {
3355       StgTSO *last_tso = END_TSO_QUEUE;
3356       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3357
3358       last = &mvar->head;
3359       for (t = mvar->head; t != END_TSO_QUEUE; 
3360            last = &t->link, last_tso = t, t = t->link) {
3361         if (t == tso) {
3362           *last = tso->link;
3363           if (mvar->tail == tso) {
3364             mvar->tail = last_tso;
3365           }
3366           goto done;
3367         }
3368       }
3369       barf("unblockThread (MVAR): TSO not found");
3370     }
3371
3372   case BlockedOnBlackHole:
3373     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3374     {
3375       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3376
3377       last = &bq->blocking_queue;
3378       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3379            last = &t->link, t = t->link) {
3380         if (t == tso) {
3381           *last = tso->link;
3382           goto done;
3383         }
3384       }
3385       barf("unblockThread (BLACKHOLE): TSO not found");
3386     }
3387
3388   case BlockedOnException:
3389     {
3390       StgTSO *target  = tso->block_info.tso;
3391
3392       ASSERT(get_itbl(target)->type == TSO);
3393
3394       while (target->what_next == ThreadRelocated) {
3395           target = target->link;
3396           ASSERT(get_itbl(target)->type == TSO);
3397       }
3398       
3399       ASSERT(target->blocked_exceptions != NULL);
3400
3401       last = &target->blocked_exceptions;
3402       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3403            last = &t->link, t = t->link) {
3404         ASSERT(get_itbl(t)->type == TSO);
3405         if (t == tso) {
3406           *last = tso->link;
3407           goto done;
3408         }
3409       }
3410       barf("unblockThread (Exception): TSO not found");
3411     }
3412
3413   case BlockedOnRead:
3414   case BlockedOnWrite:
3415 #if defined(mingw32_TARGET_OS)
3416   case BlockedOnDoProc:
3417 #endif
3418     {
3419       StgTSO *prev = NULL;
3420       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3421            prev = t, t = t->link) {
3422         if (t == tso) {
3423           if (prev == NULL) {
3424             blocked_queue_hd = t->link;
3425             if (blocked_queue_tl == t) {
3426               blocked_queue_tl = END_TSO_QUEUE;
3427             }
3428           } else {
3429             prev->link = t->link;
3430             if (blocked_queue_tl == t) {
3431               blocked_queue_tl = prev;
3432             }
3433           }
3434           goto done;
3435         }
3436       }
3437       barf("unblockThread (I/O): TSO not found");
3438     }
3439
3440   case BlockedOnDelay:
3441     {
3442       StgTSO *prev = NULL;
3443       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3444            prev = t, t = t->link) {
3445         if (t == tso) {
3446           if (prev == NULL) {
3447             sleeping_queue = t->link;
3448           } else {
3449             prev->link = t->link;
3450           }
3451           goto done;
3452         }
3453       }
3454       barf("unblockThread (delay): TSO not found");
3455     }
3456
3457   default:
3458     barf("unblockThread");
3459   }
3460
3461  done:
3462   tso->link = END_TSO_QUEUE;
3463   tso->why_blocked = NotBlocked;
3464   tso->block_info.closure = NULL;
3465   PUSH_ON_RUN_QUEUE(tso);
3466 }
3467 #endif
3468
3469 /* -----------------------------------------------------------------------------
3470  * raiseAsync()
3471  *
3472  * The following function implements the magic for raising an
3473  * asynchronous exception in an existing thread.
3474  *
3475  * We first remove the thread from any queue on which it might be
3476  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3477  *
3478  * We strip the stack down to the innermost CATCH_FRAME, building
3479  * thunks in the heap for all the active computations, so they can 
3480  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3481  * an application of the handler to the exception, and push it on
3482  * the top of the stack.
3483  * 
3484  * How exactly do we save all the active computations?  We create an
3485  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3486  * AP_STACKs pushes everything from the corresponding update frame
3487  * upwards onto the stack.  (Actually, it pushes everything up to the
3488  * next update frame plus a pointer to the next AP_STACK object.
3489  * Entering the next AP_STACK object pushes more onto the stack until we
3490  * reach the last AP_STACK object - at which point the stack should look
3491  * exactly as it did when we killed the TSO and we can continue
3492  * execution by entering the closure on top of the stack.
3493  *
3494  * We can also kill a thread entirely - this happens if either (a) the 
3495  * exception passed to raiseAsync is NULL, or (b) there's no
3496  * CATCH_FRAME on the stack.  In either case, we strip the entire
3497  * stack and replace the thread with a zombie.
3498  *
3499  * Locks: sched_mutex held upon entry nor exit.
3500  *
3501  * -------------------------------------------------------------------------- */
3502  
3503 void 
3504 deleteThread(StgTSO *tso)
3505 {
3506   raiseAsync(tso,NULL);
3507 }
3508
3509 #ifdef THREADED_RTS
3510 static void 
3511 deleteThreadImmediately(StgTSO *tso)
3512 { // for forkProcess only:
3513   // delete thread without giving it a chance to catch the KillThread exception
3514
3515   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3516       return;
3517   }
3518 #if defined(RTS_SUPPORTS_THREADS)
3519   if (tso->why_blocked != BlockedOnCCall
3520       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3521 #endif
3522     unblockThread(tso);
3523   tso->what_next = ThreadKilled;
3524 }
3525 #endif
3526
3527 void
3528 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3529 {
3530   /* When raising async exs from contexts where sched_mutex isn't held;
3531      use raiseAsyncWithLock(). */
3532   ACQUIRE_LOCK(&sched_mutex);
3533   raiseAsync(tso,exception);
3534   RELEASE_LOCK(&sched_mutex);
3535 }
3536
3537 void
3538 raiseAsync(StgTSO *tso, StgClosure *exception)
3539 {
3540     StgRetInfoTable *info;
3541     StgPtr sp;
3542   
3543     // Thread already dead?
3544     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3545         return;
3546     }
3547
3548     IF_DEBUG(scheduler, 
3549              sched_belch("raising exception in thread %ld.", tso->id));
3550     
3551     // Remove it from any blocking queues
3552     unblockThread(tso);
3553
3554     sp = tso->sp;
3555     
3556     // The stack freezing code assumes there's a closure pointer on
3557     // the top of the stack, so we have to arrange that this is the case...
3558     //
3559     if (sp[0] == (W_)&stg_enter_info) {
3560         sp++;
3561     } else {
3562         sp--;
3563         sp[0] = (W_)&stg_dummy_ret_closure;
3564     }
3565
3566     while (1) {
3567         nat i;
3568
3569         // 1. Let the top of the stack be the "current closure"
3570         //
3571         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3572         // CATCH_FRAME.
3573         //
3574         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3575         // current closure applied to the chunk of stack up to (but not
3576         // including) the update frame.  This closure becomes the "current
3577         // closure".  Go back to step 2.
3578         //
3579         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3580         // top of the stack applied to the exception.
3581         // 
3582         // 5. If it's a STOP_FRAME, then kill the thread.
3583         
3584         StgPtr frame;
3585         
3586         frame = sp + 1;
3587         info = get_ret_itbl((StgClosure *)frame);
3588         
3589         while (info->i.type != UPDATE_FRAME
3590                && (info->i.type != CATCH_FRAME || exception == NULL)
3591                && info->i.type != STOP_FRAME) {
3592             frame += stack_frame_sizeW((StgClosure *)frame);
3593             info = get_ret_itbl((StgClosure *)frame);
3594         }
3595         
3596         switch (info->i.type) {
3597             
3598         case CATCH_FRAME:
3599             // If we find a CATCH_FRAME, and we've got an exception to raise,
3600             // then build the THUNK raise(exception), and leave it on
3601             // top of the CATCH_FRAME ready to enter.
3602             //
3603         {
3604 #ifdef PROFILING
3605             StgCatchFrame *cf = (StgCatchFrame *)frame;
3606 #endif
3607             StgClosure *raise;
3608             
3609             // we've got an exception to raise, so let's pass it to the
3610             // handler in this frame.
3611             //
3612             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3613             TICK_ALLOC_SE_THK(1,0);
3614             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3615             raise->payload[0] = exception;
3616             
3617             // throw away the stack from Sp up to the CATCH_FRAME.
3618             //
3619             sp = frame - 1;
3620             
3621             /* Ensure that async excpetions are blocked now, so we don't get
3622              * a surprise exception before we get around to executing the
3623              * handler.
3624              */
3625             if (tso->blocked_exceptions == NULL) {
3626                 tso->blocked_exceptions = END_TSO_QUEUE;
3627             }
3628             
3629             /* Put the newly-built THUNK on top of the stack, ready to execute
3630              * when the thread restarts.
3631              */
3632             sp[0] = (W_)raise;
3633             sp[-1] = (W_)&stg_enter_info;
3634             tso->sp = sp-1;
3635             tso->what_next = ThreadRunGHC;
3636             IF_DEBUG(sanity, checkTSO(tso));
3637             return;
3638         }
3639         
3640         case UPDATE_FRAME:
3641         {
3642             StgAP_STACK * ap;
3643             nat words;
3644             
3645             // First build an AP_STACK consisting of the stack chunk above the
3646             // current update frame, with the top word on the stack as the
3647             // fun field.
3648             //
3649             words = frame - sp - 1;
3650             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3651             
3652             ap->size = words;
3653             ap->fun  = (StgClosure *)sp[0];
3654             sp++;
3655             for(i=0; i < (nat)words; ++i) {
3656                 ap->payload[i] = (StgClosure *)*sp++;
3657             }
3658             
3659             SET_HDR(ap,&stg_AP_STACK_info,
3660                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3661             TICK_ALLOC_UP_THK(words+1,0);
3662             
3663             IF_DEBUG(scheduler,
3664                      fprintf(stderr,  "scheduler: Updating ");
3665                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3666                      fprintf(stderr,  " with ");
3667                      printObj((StgClosure *)ap);
3668                 );
3669
3670             // Replace the updatee with an indirection - happily
3671             // this will also wake up any threads currently
3672             // waiting on the result.
3673             //
3674             // Warning: if we're in a loop, more than one update frame on
3675             // the stack may point to the same object.  Be careful not to
3676             // overwrite an IND_OLDGEN in this case, because we'll screw
3677             // up the mutable lists.  To be on the safe side, don't
3678             // overwrite any kind of indirection at all.  See also
3679             // threadSqueezeStack in GC.c, where we have to make a similar
3680             // check.
3681             //
3682             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3683                 // revert the black hole
3684                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3685             }
3686             sp += sizeofW(StgUpdateFrame) - 1;
3687             sp[0] = (W_)ap; // push onto stack
3688             break;
3689         }
3690         
3691         case STOP_FRAME:
3692             // We've stripped the entire stack, the thread is now dead.
3693             sp += sizeofW(StgStopFrame);
3694             tso->what_next = ThreadKilled;
3695             tso->sp = sp;
3696             return;
3697             
3698         default:
3699             barf("raiseAsync");
3700         }
3701     }
3702     barf("raiseAsync");
3703 }
3704
3705 /* -----------------------------------------------------------------------------
3706    resurrectThreads is called after garbage collection on the list of
3707    threads found to be garbage.  Each of these threads will be woken
3708    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3709    on an MVar, or NonTermination if the thread was blocked on a Black
3710    Hole.
3711
3712    Locks: sched_mutex isn't held upon entry nor exit.
3713    -------------------------------------------------------------------------- */
3714
3715 void
3716 resurrectThreads( StgTSO *threads )
3717 {
3718   StgTSO *tso, *next;
3719
3720   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3721     next = tso->global_link;
3722     tso->global_link = all_threads;
3723     all_threads = tso;
3724     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3725
3726     switch (tso->why_blocked) {
3727     case BlockedOnMVar:
3728     case BlockedOnException:
3729       /* Called by GC - sched_mutex lock is currently held. */
3730       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3731       break;
3732     case BlockedOnBlackHole:
3733       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3734       break;
3735     case NotBlocked:
3736       /* This might happen if the thread was blocked on a black hole
3737        * belonging to a thread that we've just woken up (raiseAsync
3738        * can wake up threads, remember...).
3739        */
3740       continue;
3741     default:
3742       barf("resurrectThreads: thread blocked in a strange way");
3743     }
3744   }
3745 }
3746
3747 /* -----------------------------------------------------------------------------
3748  * Blackhole detection: if we reach a deadlock, test whether any
3749  * threads are blocked on themselves.  Any threads which are found to
3750  * be self-blocked get sent a NonTermination exception.
3751  *
3752  * This is only done in a deadlock situation in order to avoid
3753  * performance overhead in the normal case.
3754  *
3755  * Locks: sched_mutex is held upon entry and exit.
3756  * -------------------------------------------------------------------------- */
3757
3758 static void
3759 detectBlackHoles( void )
3760 {
3761     StgTSO *tso = all_threads;
3762     StgClosure *frame;
3763     StgClosure *blocked_on;
3764     StgRetInfoTable *info;
3765
3766     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3767
3768         while (tso->what_next == ThreadRelocated) {
3769             tso = tso->link;
3770             ASSERT(get_itbl(tso)->type == TSO);
3771         }
3772       
3773         if (tso->why_blocked != BlockedOnBlackHole) {
3774             continue;
3775         }
3776         blocked_on = tso->block_info.closure;
3777
3778         frame = (StgClosure *)tso->sp;
3779
3780         while(1) {
3781             info = get_ret_itbl(frame);
3782             switch (info->i.type) {
3783             case UPDATE_FRAME:
3784                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3785                     /* We are blocking on one of our own computations, so
3786                      * send this thread the NonTermination exception.  
3787                      */
3788                     IF_DEBUG(scheduler, 
3789                              sched_belch("thread %d is blocked on itself", tso->id));
3790                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3791                     goto done;
3792                 }
3793                 
3794                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3795                 continue;
3796
3797             case STOP_FRAME:
3798                 goto done;
3799
3800                 // normal stack frames; do nothing except advance the pointer
3801             default:
3802                 (StgPtr)frame += stack_frame_sizeW(frame);
3803             }
3804         }   
3805         done: ;
3806     }
3807 }
3808
3809 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3810 //@subsection Debugging Routines
3811
3812 /* -----------------------------------------------------------------------------
3813  * Debugging: why is a thread blocked
3814  * [Also provides useful information when debugging threaded programs
3815  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3816    -------------------------------------------------------------------------- */
3817
3818 static
3819 void
3820 printThreadBlockage(StgTSO *tso)
3821 {
3822   switch (tso->why_blocked) {
3823   case BlockedOnRead:
3824     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3825     break;
3826   case BlockedOnWrite:
3827     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3828     break;
3829 #if defined(mingw32_TARGET_OS)
3830     case BlockedOnDoProc:
3831     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3832     break;
3833 #endif
3834   case BlockedOnDelay:
3835     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3836     break;
3837   case BlockedOnMVar:
3838     fprintf(stderr,"is blocked on an MVar");
3839     break;
3840   case BlockedOnException:
3841     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3842             tso->block_info.tso->id);
3843     break;
3844   case BlockedOnBlackHole:
3845     fprintf(stderr,"is blocked on a black hole");
3846     break;
3847   case NotBlocked:
3848     fprintf(stderr,"is not blocked");
3849     break;
3850 #if defined(PAR)
3851   case BlockedOnGA:
3852     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3853             tso->block_info.closure, info_type(tso->block_info.closure));
3854     break;
3855   case BlockedOnGA_NoSend:
3856     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3857             tso->block_info.closure, info_type(tso->block_info.closure));
3858     break;
3859 #endif
3860 #if defined(RTS_SUPPORTS_THREADS)
3861   case BlockedOnCCall:
3862     fprintf(stderr,"is blocked on an external call");
3863     break;
3864   case BlockedOnCCall_NoUnblockExc:
3865     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3866     break;
3867 #endif
3868   default:
3869     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3870          tso->why_blocked, tso->id, tso);
3871   }
3872 }
3873
3874 static
3875 void
3876 printThreadStatus(StgTSO *tso)
3877 {
3878   switch (tso->what_next) {
3879   case ThreadKilled:
3880     fprintf(stderr,"has been killed");
3881     break;
3882   case ThreadComplete:
3883     fprintf(stderr,"has completed");
3884     break;
3885   default:
3886     printThreadBlockage(tso);
3887   }
3888 }
3889
3890 void
3891 printAllThreads(void)
3892 {
3893   StgTSO *t;
3894   void *label;
3895
3896 # if defined(GRAN)
3897   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3898   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3899                        time_string, rtsFalse/*no commas!*/);
3900
3901   fprintf(stderr, "all threads at [%s]:\n", time_string);
3902 # elif defined(PAR)
3903   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3904   ullong_format_string(CURRENT_TIME,
3905                        time_string, rtsFalse/*no commas!*/);
3906
3907   fprintf(stderr,"all threads at [%s]:\n", time_string);
3908 # else
3909   fprintf(stderr,"all threads:\n");
3910 # endif
3911
3912   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3913     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3914     label = lookupThreadLabel((StgWord)t);
3915     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3916     printThreadStatus(t);
3917     fprintf(stderr,"\n");
3918   }
3919 }
3920     
3921 #ifdef DEBUG
3922
3923 /* 
3924    Print a whole blocking queue attached to node (debugging only).
3925 */
3926 //@cindex print_bq
3927 # if defined(PAR)
3928 void 
3929 print_bq (StgClosure *node)
3930 {
3931   StgBlockingQueueElement *bqe;
3932   StgTSO *tso;
3933   rtsBool end;
3934
3935   fprintf(stderr,"## BQ of closure %p (%s): ",
3936           node, info_type(node));
3937
3938   /* should cover all closures that may have a blocking queue */
3939   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3940          get_itbl(node)->type == FETCH_ME_BQ ||
3941          get_itbl(node)->type == RBH ||
3942          get_itbl(node)->type == MVAR);
3943     
3944   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3945
3946   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3947 }
3948
3949 /* 
3950    Print a whole blocking queue starting with the element bqe.
3951 */
3952 void 
3953 print_bqe (StgBlockingQueueElement *bqe)
3954 {
3955   rtsBool end;
3956
3957   /* 
3958      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3959   */
3960   for (end = (bqe==END_BQ_QUEUE);
3961        !end; // iterate until bqe points to a CONSTR
3962        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3963        bqe = end ? END_BQ_QUEUE : bqe->link) {
3964     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3965     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3966     /* types of closures that may appear in a blocking queue */
3967     ASSERT(get_itbl(bqe)->type == TSO ||           
3968            get_itbl(bqe)->type == BLOCKED_FETCH || 
3969            get_itbl(bqe)->type == CONSTR); 
3970     /* only BQs of an RBH end with an RBH_Save closure */
3971     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3972
3973     switch (get_itbl(bqe)->type) {
3974     case TSO:
3975       fprintf(stderr," TSO %u (%x),",
3976               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3977       break;
3978     case BLOCKED_FETCH:
3979       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3980               ((StgBlockedFetch *)bqe)->node, 
3981               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3982               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3983               ((StgBlockedFetch *)bqe)->ga.weight);
3984       break;
3985     case CONSTR:
3986       fprintf(stderr," %s (IP %p),",
3987               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3988                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3989                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3990                "RBH_Save_?"), get_itbl(bqe));
3991       break;
3992     default:
3993       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3994            info_type((StgClosure *)bqe)); // , node, info_type(node));
3995       break;
3996     }
3997   } /* for */
3998   fputc('\n', stderr);
3999 }
4000 # elif defined(GRAN)
4001 void 
4002 print_bq (StgClosure *node)
4003 {
4004   StgBlockingQueueElement *bqe;
4005   PEs node_loc, tso_loc;
4006   rtsBool end;
4007
4008   /* should cover all closures that may have a blocking queue */
4009   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4010          get_itbl(node)->type == FETCH_ME_BQ ||
4011          get_itbl(node)->type == RBH);
4012     
4013   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4014   node_loc = where_is(node);
4015
4016   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
4017           node, info_type(node), node_loc);
4018
4019   /* 
4020      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4021   */
4022   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4023        !end; // iterate until bqe points to a CONSTR
4024        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4025     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4026     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4027     /* types of closures that may appear in a blocking queue */
4028     ASSERT(get_itbl(bqe)->type == TSO ||           
4029            get_itbl(bqe)->type == CONSTR); 
4030     /* only BQs of an RBH end with an RBH_Save closure */
4031     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4032
4033     tso_loc = where_is((StgClosure *)bqe);
4034     switch (get_itbl(bqe)->type) {
4035     case TSO:
4036       fprintf(stderr," TSO %d (%p) on [PE %d],",
4037               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4038       break;
4039     case CONSTR:
4040       fprintf(stderr," %s (IP %p),",
4041               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4042                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4043                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4044                "RBH_Save_?"), get_itbl(bqe));
4045       break;
4046     default:
4047       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4048            info_type((StgClosure *)bqe), node, info_type(node));
4049       break;
4050     }
4051   } /* for */
4052   fputc('\n', stderr);
4053 }
4054 #else
4055 /* 
4056    Nice and easy: only TSOs on the blocking queue
4057 */
4058 void 
4059 print_bq (StgClosure *node)
4060 {
4061   StgTSO *tso;
4062
4063   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4064   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4065        tso != END_TSO_QUEUE; 
4066        tso=tso->link) {
4067     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4068     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4069     fprintf(stderr," TSO %d (%p),", tso->id, tso);
4070   }
4071   fputc('\n', stderr);
4072 }
4073 # endif
4074
4075 #if defined(PAR)
4076 static nat
4077 run_queue_len(void)
4078 {
4079   nat i;
4080   StgTSO *tso;
4081
4082   for (i=0, tso=run_queue_hd; 
4083        tso != END_TSO_QUEUE;
4084        i++, tso=tso->link)
4085     /* nothing */
4086
4087   return i;
4088 }
4089 #endif
4090
4091 static void
4092 sched_belch(char *s, ...)
4093 {
4094   va_list ap;
4095   va_start(ap,s);
4096 #ifdef SMP
4097   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4098 #elif defined(PAR)
4099   fprintf(stderr, "== ");
4100 #else
4101   fprintf(stderr, "scheduler: ");
4102 #endif
4103   vfprintf(stderr, s, ap);
4104   fprintf(stderr, "\n");
4105   va_end(ap);
4106 }
4107
4108 #endif /* DEBUG */
4109
4110
4111 //@node Index,  , Debugging Routines, Main scheduling code
4112 //@subsection Index
4113
4114 //@index
4115 //* StgMainThread::  @cindex\s-+StgMainThread
4116 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
4117 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
4118 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
4119 //* context_switch::  @cindex\s-+context_switch
4120 //* createThread::  @cindex\s-+createThread
4121 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
4122 //* initScheduler::  @cindex\s-+initScheduler
4123 //* interrupted::  @cindex\s-+interrupted
4124 //* next_thread_id::  @cindex\s-+next_thread_id
4125 //* print_bq::  @cindex\s-+print_bq
4126 //* run_queue_hd::  @cindex\s-+run_queue_hd
4127 //* run_queue_tl::  @cindex\s-+run_queue_tl
4128 //* sched_mutex::  @cindex\s-+sched_mutex
4129 //* schedule::  @cindex\s-+schedule
4130 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
4131 //* term_mutex::  @cindex\s-+term_mutex
4132 //@end index