2754c4f96e42e3caaf7bc523d1d4887c5353b2a3
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.175 2003/09/26 13:32:14 panne Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 #ifdef HAVE_ERRNO_H
130 #include <errno.h>
131 #endif
132
133 #ifdef THREADED_RTS
134 #define USED_IN_THREADED_RTS
135 #else
136 #define USED_IN_THREADED_RTS STG_UNUSED
137 #endif
138
139 #ifdef RTS_SUPPORTS_THREADS
140 #define USED_WHEN_RTS_SUPPORTS_THREADS
141 #else
142 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
143 #endif
144
145 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
146 //@subsection Variables and Data structures
147
148 /* Main thread queue.
149  * Locks required: sched_mutex.
150  */
151 StgMainThread *main_threads = NULL;
152
153 /* Thread queues.
154  * Locks required: sched_mutex.
155  */
156 #if defined(GRAN)
157
158 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
159 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
160
161 /* 
162    In GranSim we have a runnable and a blocked queue for each processor.
163    In order to minimise code changes new arrays run_queue_hds/tls
164    are created. run_queue_hd is then a short cut (macro) for
165    run_queue_hds[CurrentProc] (see GranSim.h).
166    -- HWL
167 */
168 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
169 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
170 StgTSO *ccalling_threadss[MAX_PROC];
171 /* We use the same global list of threads (all_threads) in GranSim as in
172    the std RTS (i.e. we are cheating). However, we don't use this list in
173    the GranSim specific code at the moment (so we are only potentially
174    cheating).  */
175
176 #else /* !GRAN */
177
178 StgTSO *run_queue_hd = NULL;
179 StgTSO *run_queue_tl = NULL;
180 StgTSO *blocked_queue_hd = NULL;
181 StgTSO *blocked_queue_tl = NULL;
182 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
183
184 #endif
185
186 /* Linked list of all threads.
187  * Used for detecting garbage collected threads.
188  */
189 StgTSO *all_threads = NULL;
190
191 /* When a thread performs a safe C call (_ccall_GC, using old
192  * terminology), it gets put on the suspended_ccalling_threads
193  * list. Used by the garbage collector.
194  */
195 static StgTSO *suspended_ccalling_threads;
196
197 static StgTSO *threadStackOverflow(StgTSO *tso);
198
199 /* KH: The following two flags are shared memory locations.  There is no need
200        to lock them, since they are only unset at the end of a scheduler
201        operation.
202 */
203
204 /* flag set by signal handler to precipitate a context switch */
205 //@cindex context_switch
206 nat context_switch = 0;
207
208 /* if this flag is set as well, give up execution */
209 //@cindex interrupted
210 rtsBool interrupted = rtsFalse;
211
212 /* Next thread ID to allocate.
213  * Locks required: thread_id_mutex
214  */
215 //@cindex next_thread_id
216 static StgThreadID next_thread_id = 1;
217
218 /*
219  * Pointers to the state of the current thread.
220  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
221  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
222  */
223  
224 /* The smallest stack size that makes any sense is:
225  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
226  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
227  *  + 1                       (the closure to enter)
228  *  + 1                       (stg_ap_v_ret)
229  *  + 1                       (spare slot req'd by stg_ap_v_ret)
230  *
231  * A thread with this stack will bomb immediately with a stack
232  * overflow, which will increase its stack size.  
233  */
234
235 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
236
237
238 #if defined(GRAN)
239 StgTSO *CurrentTSO;
240 #endif
241
242 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
243  *  exists - earlier gccs apparently didn't.
244  *  -= chak
245  */
246 StgTSO dummy_tso;
247
248 static rtsBool ready_to_gc;
249
250 /*
251  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
252  * in an MT setting, needed to signal that a worker thread shouldn't hang around
253  * in the scheduler when it is out of work.
254  */
255 static rtsBool shutting_down_scheduler = rtsFalse;
256
257 void            addToBlockedQueue ( StgTSO *tso );
258
259 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
260        void     interruptStgRts   ( void );
261
262 static void     detectBlackHoles  ( void );
263
264 #ifdef DEBUG
265 static void sched_belch(char *s, ...);
266 #endif
267
268 #if defined(RTS_SUPPORTS_THREADS)
269 /* ToDo: carefully document the invariants that go together
270  *       with these synchronisation objects.
271  */
272 Mutex     sched_mutex       = INIT_MUTEX_VAR;
273 Mutex     term_mutex        = INIT_MUTEX_VAR;
274
275 /*
276  * A heavyweight solution to the problem of protecting
277  * the thread_id from concurrent update.
278  */
279 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
280
281
282 # if defined(SMP)
283 static Condition gc_pending_cond = INIT_COND_VAR;
284 nat await_death;
285 # endif
286
287 #endif /* RTS_SUPPORTS_THREADS */
288
289 #if defined(PAR)
290 StgTSO *LastTSO;
291 rtsTime TimeOfLastYield;
292 rtsBool emitSchedule = rtsTrue;
293 #endif
294
295 #if DEBUG
296 static char *whatNext_strs[] = {
297   "ThreadRunGHC",
298   "ThreadInterpret",
299   "ThreadKilled",
300   "ThreadRelocated",
301   "ThreadComplete"
302 };
303 #endif
304
305 #if defined(PAR)
306 StgTSO * createSparkThread(rtsSpark spark);
307 StgTSO * activateSpark (rtsSpark spark);  
308 #endif
309
310 /*
311  * The thread state for the main thread.
312 // ToDo: check whether not needed any more
313 StgTSO   *MainTSO;
314  */
315
316 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
317 static void taskStart(void);
318 static void
319 taskStart(void)
320 {
321   schedule(NULL,NULL);
322 }
323 #endif
324
325 #if defined(RTS_SUPPORTS_THREADS)
326 void
327 startSchedulerTask(void)
328 {
329     startTask(taskStart);
330 }
331 #endif
332
333 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
334 //@subsection Main scheduling loop
335
336 /* ---------------------------------------------------------------------------
337    Main scheduling loop.
338
339    We use round-robin scheduling, each thread returning to the
340    scheduler loop when one of these conditions is detected:
341
342       * out of heap space
343       * timer expires (thread yields)
344       * thread blocks
345       * thread ends
346       * stack overflow
347
348    Locking notes:  we acquire the scheduler lock once at the beginning
349    of the scheduler loop, and release it when
350     
351       * running a thread, or
352       * waiting for work, or
353       * waiting for a GC to complete.
354
355    GRAN version:
356      In a GranSim setup this loop iterates over the global event queue.
357      This revolves around the global event queue, which determines what 
358      to do next. Therefore, it's more complicated than either the 
359      concurrent or the parallel (GUM) setup.
360
361    GUM version:
362      GUM iterates over incoming messages.
363      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
364      and sends out a fish whenever it has nothing to do; in-between
365      doing the actual reductions (shared code below) it processes the
366      incoming messages and deals with delayed operations 
367      (see PendingFetches).
368      This is not the ugliest code you could imagine, but it's bloody close.
369
370    ------------------------------------------------------------------------ */
371 //@cindex schedule
372 static void
373 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
374           Capability *initialCapability )
375 {
376   StgTSO *t;
377   Capability *cap = initialCapability;
378   StgThreadReturnCode ret;
379 #if defined(GRAN)
380   rtsEvent *event;
381 #elif defined(PAR)
382   StgSparkPool *pool;
383   rtsSpark spark;
384   StgTSO *tso;
385   GlobalTaskId pe;
386   rtsBool receivedFinish = rtsFalse;
387 # if defined(DEBUG)
388   nat tp_size, sp_size; // stats only
389 # endif
390 #endif
391   rtsBool was_interrupted = rtsFalse;
392   StgTSOWhatNext prev_what_next;
393   
394   ACQUIRE_LOCK(&sched_mutex);
395  
396 #if defined(RTS_SUPPORTS_THREADS)
397   /* in the threaded case, the capability is either passed in via the initialCapability
398      parameter, or initialized inside the scheduler loop */
399
400   IF_DEBUG(scheduler,
401     fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
402             osThreadId(), osThreadId()));
403   IF_DEBUG(scheduler,
404     fprintf(stderr,"### main thread: %p\n",mainThread));
405   IF_DEBUG(scheduler,
406     fprintf(stderr,"### initial cap: %p\n",initialCapability));
407 #else
408   /* simply initialise it in the non-threaded case */
409   grabCapability(&cap);
410 #endif
411
412 #if defined(GRAN)
413   /* set up first event to get things going */
414   /* ToDo: assign costs for system setup and init MainTSO ! */
415   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
416             ContinueThread, 
417             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
418
419   IF_DEBUG(gran,
420            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
421            G_TSO(CurrentTSO, 5));
422
423   if (RtsFlags.GranFlags.Light) {
424     /* Save current time; GranSim Light only */
425     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
426   }      
427
428   event = get_next_event();
429
430   while (event!=(rtsEvent*)NULL) {
431     /* Choose the processor with the next event */
432     CurrentProc = event->proc;
433     CurrentTSO = event->tso;
434
435 #elif defined(PAR)
436
437   while (!receivedFinish) {    /* set by processMessages */
438                                /* when receiving PP_FINISH message         */ 
439 #else
440
441   while (1) {
442
443 #endif
444
445     IF_DEBUG(scheduler, printAllThreads());
446
447 #if defined(RTS_SUPPORTS_THREADS)
448     /* Check to see whether there are any worker threads
449        waiting to deposit external call results. If so,
450        yield our capability... if we have a capability, that is. */
451     if(cap)
452       yieldToReturningWorker(&sched_mutex, &cap,
453           mainThread ? &mainThread->bound_thread_cond : NULL);
454
455     /* If we do not currently hold a capability, we wait for one */
456     if(!cap)
457     {
458       waitForWorkCapability(&sched_mutex, &cap,
459           mainThread ? &mainThread->bound_thread_cond : NULL);
460       IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
461                                       osThreadId()));
462     }
463 #endif
464
465     /* If we're interrupted (the user pressed ^C, or some other
466      * termination condition occurred), kill all the currently running
467      * threads.
468      */
469     if (interrupted) {
470       IF_DEBUG(scheduler, sched_belch("interrupted"));
471       interrupted = rtsFalse;
472       was_interrupted = rtsTrue;
473 #if defined(RTS_SUPPORTS_THREADS)
474       // In the threaded RTS, deadlock detection doesn't work,
475       // so just exit right away.
476       prog_belch("interrupted");
477       releaseCapability(cap);
478       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
479       RELEASE_LOCK(&sched_mutex);
480       shutdownHaskellAndExit(EXIT_SUCCESS);
481 #else
482       deleteAllThreads();
483 #endif
484     }
485
486     /* Go through the list of main threads and wake up any
487      * clients whose computations have finished.  ToDo: this
488      * should be done more efficiently without a linear scan
489      * of the main threads list, somehow...
490      */
491 #if defined(RTS_SUPPORTS_THREADS)
492     { 
493         StgMainThread *m, **prev;
494         prev = &main_threads;
495         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
496           if (m->tso->what_next == ThreadComplete
497               || m->tso->what_next == ThreadKilled)
498           {
499             if(m == mainThread)
500             {
501               if(m->tso->what_next == ThreadComplete)
502               {
503                 if (m->ret)
504                 {
505                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
506                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
507                 }
508                 m->stat = Success;
509               }
510               else
511               {
512                 if (m->ret)
513                 {
514                   *(m->ret) = NULL;
515                 }
516                 if (was_interrupted)
517                 {
518                   m->stat = Interrupted;
519                 }
520                 else
521                 {
522                   m->stat = Killed;
523                 }
524               }
525               *prev = m->link;
526             
527 #ifdef DEBUG
528               removeThreadLabel((StgWord)m->tso);
529 #endif
530               releaseCapability(cap);
531               RELEASE_LOCK(&sched_mutex);
532               return;
533             }
534             else
535             {
536                 // The current OS thread can not handle the fact that the Haskell
537                 // thread "m" has ended. 
538                 // "m" is bound; the scheduler loop in it's bound OS thread has
539                 // to return, so let's pass our capability directly to that thread.
540               passCapability(&sched_mutex, cap, &m->bound_thread_cond);
541               cap = NULL;
542             }
543           }
544         }
545     }
546     
547     if(!cap)    // If we gave our capability away,
548       continue; // go to the top to get it back
549       
550 #else /* not threaded */
551
552 # if defined(PAR)
553     /* in GUM do this only on the Main PE */
554     if (IAmMainThread)
555 # endif
556     /* If our main thread has finished or been killed, return.
557      */
558     {
559       StgMainThread *m = main_threads;
560       if (m->tso->what_next == ThreadComplete
561           || m->tso->what_next == ThreadKilled) {
562 #ifdef DEBUG
563         removeThreadLabel((StgWord)m->tso);
564 #endif
565         main_threads = main_threads->link;
566         if (m->tso->what_next == ThreadComplete) {
567             // We finished successfully, fill in the return value
568             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
569             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
570             m->stat = Success;
571             return;
572         } else {
573           if (m->ret) { *(m->ret) = NULL; };
574           if (was_interrupted) {
575             m->stat = Interrupted;
576           } else {
577             m->stat = Killed;
578           }
579           return;
580         }
581       }
582     }
583 #endif
584
585     /* Top up the run queue from our spark pool.  We try to make the
586      * number of threads in the run queue equal to the number of
587      * free capabilities.
588      *
589      * Disable spark support in SMP for now, non-essential & requires
590      * a little bit of work to make it compile cleanly. -- sof 1/02.
591      */
592 #if 0 /* defined(SMP) */
593     {
594       nat n = getFreeCapabilities();
595       StgTSO *tso = run_queue_hd;
596
597       /* Count the run queue */
598       while (n > 0 && tso != END_TSO_QUEUE) {
599         tso = tso->link;
600         n--;
601       }
602
603       for (; n > 0; n--) {
604         StgClosure *spark;
605         spark = findSpark(rtsFalse);
606         if (spark == NULL) {
607           break; /* no more sparks in the pool */
608         } else {
609           /* I'd prefer this to be done in activateSpark -- HWL */
610           /* tricky - it needs to hold the scheduler lock and
611            * not try to re-acquire it -- SDM */
612           createSparkThread(spark);       
613           IF_DEBUG(scheduler,
614                    sched_belch("==^^ turning spark of closure %p into a thread",
615                                (StgClosure *)spark));
616         }
617       }
618       /* We need to wake up the other tasks if we just created some
619        * work for them.
620        */
621       if (getFreeCapabilities() - n > 1) {
622           signalCondition( &thread_ready_cond );
623       }
624     }
625 #endif // SMP
626
627     /* check for signals each time around the scheduler */
628 #if defined(RTS_USER_SIGNALS)
629     if (signals_pending()) {
630       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
631       startSignalHandlers();
632       ACQUIRE_LOCK(&sched_mutex);
633     }
634 #endif
635
636     /* Check whether any waiting threads need to be woken up.  If the
637      * run queue is empty, and there are no other tasks running, we
638      * can wait indefinitely for something to happen.
639      */
640     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
641 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
642                 || EMPTY_RUN_QUEUE()
643 #endif
644         )
645     {
646       awaitEvent( EMPTY_RUN_QUEUE()
647 #if defined(SMP)
648         && allFreeCapabilities()
649 #endif
650         );
651     }
652     /* we can be interrupted while waiting for I/O... */
653     if (interrupted) continue;
654
655     /* 
656      * Detect deadlock: when we have no threads to run, there are no
657      * threads waiting on I/O or sleeping, and all the other tasks are
658      * waiting for work, we must have a deadlock of some description.
659      *
660      * We first try to find threads blocked on themselves (ie. black
661      * holes), and generate NonTermination exceptions where necessary.
662      *
663      * If no threads are black holed, we have a deadlock situation, so
664      * inform all the main threads.
665      */
666 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
667     if (   EMPTY_THREAD_QUEUES()
668 #if defined(RTS_SUPPORTS_THREADS)
669         && EMPTY_QUEUE(suspended_ccalling_threads)
670 #endif
671 #ifdef SMP
672         && allFreeCapabilities()
673 #endif
674         )
675     {
676         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
677 #if defined(THREADED_RTS)
678         /* and SMP mode ..? */
679         releaseCapability(cap);
680 #endif
681         // Garbage collection can release some new threads due to
682         // either (a) finalizers or (b) threads resurrected because
683         // they are about to be send BlockedOnDeadMVar.  Any threads
684         // thus released will be immediately runnable.
685         GarbageCollect(GetRoots,rtsTrue);
686
687         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
688
689         IF_DEBUG(scheduler, 
690                  sched_belch("still deadlocked, checking for black holes..."));
691         detectBlackHoles();
692
693         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
694
695 #if defined(RTS_USER_SIGNALS)
696         /* If we have user-installed signal handlers, then wait
697          * for signals to arrive rather then bombing out with a
698          * deadlock.
699          */
700 #if defined(RTS_SUPPORTS_THREADS)
701         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
702                       a signal with no runnable threads (or I/O
703                       suspended ones) leads nowhere quick.
704                       For now, simply shut down when we reach this
705                       condition.
706                       
707                       ToDo: define precisely under what conditions
708                       the Scheduler should shut down in an MT setting.
709                    */
710 #else
711         if ( anyUserHandlers() ) {
712 #endif
713             IF_DEBUG(scheduler, 
714                      sched_belch("still deadlocked, waiting for signals..."));
715
716             awaitUserSignals();
717
718             // we might be interrupted...
719             if (interrupted) { continue; }
720
721             if (signals_pending()) {
722                 RELEASE_LOCK(&sched_mutex);
723                 startSignalHandlers();
724                 ACQUIRE_LOCK(&sched_mutex);
725             }
726             ASSERT(!EMPTY_RUN_QUEUE());
727             goto not_deadlocked;
728         }
729 #endif
730
731         /* Probably a real deadlock.  Send the current main thread the
732          * Deadlock exception (or in the SMP build, send *all* main
733          * threads the deadlock exception, since none of them can make
734          * progress).
735          */
736         {
737             StgMainThread *m;
738 #if defined(RTS_SUPPORTS_THREADS)
739             for (m = main_threads; m != NULL; m = m->link) {
740                 switch (m->tso->why_blocked) {
741                 case BlockedOnBlackHole:
742                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
743                     break;
744                 case BlockedOnException:
745                 case BlockedOnMVar:
746                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
747                     break;
748                 default:
749                     barf("deadlock: main thread blocked in a strange way");
750                 }
751             }
752 #else
753             m = main_threads;
754             switch (m->tso->why_blocked) {
755             case BlockedOnBlackHole:
756                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
757                 break;
758             case BlockedOnException:
759             case BlockedOnMVar:
760                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
761                 break;
762             default:
763                 barf("deadlock: main thread blocked in a strange way");
764             }
765 #endif
766         }
767
768 #if defined(RTS_SUPPORTS_THREADS)
769         /* ToDo: revisit conditions (and mechanism) for shutting
770            down a multi-threaded world  */
771         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
772         RELEASE_LOCK(&sched_mutex);
773         shutdownHaskell();
774         return;
775 #endif
776     }
777   not_deadlocked:
778
779 #elif defined(RTS_SUPPORTS_THREADS)
780     /* ToDo: add deadlock detection in threaded RTS */
781 #elif defined(PAR)
782     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
783 #endif
784
785 #if defined(SMP)
786     /* If there's a GC pending, don't do anything until it has
787      * completed.
788      */
789     if (ready_to_gc) {
790       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
791       waitCondition( &gc_pending_cond, &sched_mutex );
792     }
793 #endif    
794
795 #if defined(RTS_SUPPORTS_THREADS)
796 #if defined(SMP)
797     /* block until we've got a thread on the run queue and a free
798      * capability.
799      *
800      */
801     if ( EMPTY_RUN_QUEUE() ) {
802       /* Give up our capability */
803       releaseCapability(cap);
804
805       /* If we're in the process of shutting down (& running the
806        * a batch of finalisers), don't wait around.
807        */
808       if ( shutting_down_scheduler ) {
809         RELEASE_LOCK(&sched_mutex);
810         return;
811       }
812       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
813       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
814       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
815     }
816 #else
817     if ( EMPTY_RUN_QUEUE() ) {
818       continue; // nothing to do
819     }
820 #endif
821 #endif
822
823 #if defined(GRAN)
824     if (RtsFlags.GranFlags.Light)
825       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
826
827     /* adjust time based on time-stamp */
828     if (event->time > CurrentTime[CurrentProc] &&
829         event->evttype != ContinueThread)
830       CurrentTime[CurrentProc] = event->time;
831     
832     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
833     if (!RtsFlags.GranFlags.Light)
834       handleIdlePEs();
835
836     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
837
838     /* main event dispatcher in GranSim */
839     switch (event->evttype) {
840       /* Should just be continuing execution */
841     case ContinueThread:
842       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
843       /* ToDo: check assertion
844       ASSERT(run_queue_hd != (StgTSO*)NULL &&
845              run_queue_hd != END_TSO_QUEUE);
846       */
847       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
848       if (!RtsFlags.GranFlags.DoAsyncFetch &&
849           procStatus[CurrentProc]==Fetching) {
850         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
851               CurrentTSO->id, CurrentTSO, CurrentProc);
852         goto next_thread;
853       } 
854       /* Ignore ContinueThreads for completed threads */
855       if (CurrentTSO->what_next == ThreadComplete) {
856         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
857               CurrentTSO->id, CurrentTSO, CurrentProc);
858         goto next_thread;
859       } 
860       /* Ignore ContinueThreads for threads that are being migrated */
861       if (PROCS(CurrentTSO)==Nowhere) { 
862         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
863               CurrentTSO->id, CurrentTSO, CurrentProc);
864         goto next_thread;
865       }
866       /* The thread should be at the beginning of the run queue */
867       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
868         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
869               CurrentTSO->id, CurrentTSO, CurrentProc);
870         break; // run the thread anyway
871       }
872       /*
873       new_event(proc, proc, CurrentTime[proc],
874                 FindWork,
875                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
876       goto next_thread; 
877       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
878       break; // now actually run the thread; DaH Qu'vam yImuHbej 
879
880     case FetchNode:
881       do_the_fetchnode(event);
882       goto next_thread;             /* handle next event in event queue  */
883       
884     case GlobalBlock:
885       do_the_globalblock(event);
886       goto next_thread;             /* handle next event in event queue  */
887       
888     case FetchReply:
889       do_the_fetchreply(event);
890       goto next_thread;             /* handle next event in event queue  */
891       
892     case UnblockThread:   /* Move from the blocked queue to the tail of */
893       do_the_unblock(event);
894       goto next_thread;             /* handle next event in event queue  */
895       
896     case ResumeThread:  /* Move from the blocked queue to the tail of */
897       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
898       event->tso->gran.blocktime += 
899         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
900       do_the_startthread(event);
901       goto next_thread;             /* handle next event in event queue  */
902       
903     case StartThread:
904       do_the_startthread(event);
905       goto next_thread;             /* handle next event in event queue  */
906       
907     case MoveThread:
908       do_the_movethread(event);
909       goto next_thread;             /* handle next event in event queue  */
910       
911     case MoveSpark:
912       do_the_movespark(event);
913       goto next_thread;             /* handle next event in event queue  */
914       
915     case FindWork:
916       do_the_findwork(event);
917       goto next_thread;             /* handle next event in event queue  */
918       
919     default:
920       barf("Illegal event type %u\n", event->evttype);
921     }  /* switch */
922     
923     /* This point was scheduler_loop in the old RTS */
924
925     IF_DEBUG(gran, belch("GRAN: after main switch"));
926
927     TimeOfLastEvent = CurrentTime[CurrentProc];
928     TimeOfNextEvent = get_time_of_next_event();
929     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
930     // CurrentTSO = ThreadQueueHd;
931
932     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
933                          TimeOfNextEvent));
934
935     if (RtsFlags.GranFlags.Light) 
936       GranSimLight_leave_system(event, &ActiveTSO); 
937
938     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
939
940     IF_DEBUG(gran, 
941              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
942
943     /* in a GranSim setup the TSO stays on the run queue */
944     t = CurrentTSO;
945     /* Take a thread from the run queue. */
946     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
947
948     IF_DEBUG(gran, 
949              fprintf(stderr, "GRAN: About to run current thread, which is\n");
950              G_TSO(t,5));
951
952     context_switch = 0; // turned on via GranYield, checking events and time slice
953
954     IF_DEBUG(gran, 
955              DumpGranEvent(GR_SCHEDULE, t));
956
957     procStatus[CurrentProc] = Busy;
958
959 #elif defined(PAR)
960     if (PendingFetches != END_BF_QUEUE) {
961         processFetches();
962     }
963
964     /* ToDo: phps merge with spark activation above */
965     /* check whether we have local work and send requests if we have none */
966     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
967       /* :-[  no local threads => look out for local sparks */
968       /* the spark pool for the current PE */
969       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
970       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
971           pool->hd < pool->tl) {
972         /* 
973          * ToDo: add GC code check that we really have enough heap afterwards!!
974          * Old comment:
975          * If we're here (no runnable threads) and we have pending
976          * sparks, we must have a space problem.  Get enough space
977          * to turn one of those pending sparks into a
978          * thread... 
979          */
980
981         spark = findSpark(rtsFalse);                /* get a spark */
982         if (spark != (rtsSpark) NULL) {
983           tso = activateSpark(spark);       /* turn the spark into a thread */
984           IF_PAR_DEBUG(schedule,
985                        belch("==== schedule: Created TSO %d (%p); %d threads active",
986                              tso->id, tso, advisory_thread_count));
987
988           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
989             belch("==^^ failed to activate spark");
990             goto next_thread;
991           }               /* otherwise fall through & pick-up new tso */
992         } else {
993           IF_PAR_DEBUG(verbose,
994                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
995                              spark_queue_len(pool)));
996           goto next_thread;
997         }
998       }
999
1000       /* If we still have no work we need to send a FISH to get a spark
1001          from another PE 
1002       */
1003       if (EMPTY_RUN_QUEUE()) {
1004       /* =8-[  no local sparks => look for work on other PEs */
1005         /*
1006          * We really have absolutely no work.  Send out a fish
1007          * (there may be some out there already), and wait for
1008          * something to arrive.  We clearly can't run any threads
1009          * until a SCHEDULE or RESUME arrives, and so that's what
1010          * we're hoping to see.  (Of course, we still have to
1011          * respond to other types of messages.)
1012          */
1013         TIME now = msTime() /*CURRENT_TIME*/;
1014         IF_PAR_DEBUG(verbose, 
1015                      belch("--  now=%ld", now));
1016         IF_PAR_DEBUG(verbose,
1017                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1018                          (last_fish_arrived_at!=0 &&
1019                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1020                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1021                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1022                              last_fish_arrived_at,
1023                              RtsFlags.ParFlags.fishDelay, now);
1024                      });
1025         
1026         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1027             (last_fish_arrived_at==0 ||
1028              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1029           /* outstandingFishes is set in sendFish, processFish;
1030              avoid flooding system with fishes via delay */
1031           pe = choosePE();
1032           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1033                    NEW_FISH_HUNGER);
1034
1035           // Global statistics: count no. of fishes
1036           if (RtsFlags.ParFlags.ParStats.Global &&
1037               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1038             globalParStats.tot_fish_mess++;
1039           }
1040         }
1041       
1042         receivedFinish = processMessages();
1043         goto next_thread;
1044       }
1045     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1046       receivedFinish = processMessages();
1047     }
1048
1049     /* Now we are sure that we have some work available */
1050     ASSERT(run_queue_hd != END_TSO_QUEUE);
1051
1052     /* Take a thread from the run queue, if we have work */
1053     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1054     IF_DEBUG(sanity,checkTSO(t));
1055
1056     /* ToDo: write something to the log-file
1057     if (RTSflags.ParFlags.granSimStats && !sameThread)
1058         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1059
1060     CurrentTSO = t;
1061     */
1062     /* the spark pool for the current PE */
1063     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1064
1065     IF_DEBUG(scheduler, 
1066              belch("--=^ %d threads, %d sparks on [%#x]", 
1067                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1068
1069 # if 1
1070     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1071         t && LastTSO && t->id != LastTSO->id && 
1072         LastTSO->why_blocked == NotBlocked && 
1073         LastTSO->what_next != ThreadComplete) {
1074       // if previously scheduled TSO not blocked we have to record the context switch
1075       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1076                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1077     }
1078
1079     if (RtsFlags.ParFlags.ParStats.Full && 
1080         (emitSchedule /* forced emit */ ||
1081         (t && LastTSO && t->id != LastTSO->id))) {
1082       /* 
1083          we are running a different TSO, so write a schedule event to log file
1084          NB: If we use fair scheduling we also have to write  a deschedule 
1085              event for LastTSO; with unfair scheduling we know that the
1086              previous tso has blocked whenever we switch to another tso, so
1087              we don't need it in GUM for now
1088       */
1089       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1090                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1091       emitSchedule = rtsFalse;
1092     }
1093      
1094 # endif
1095 #else /* !GRAN && !PAR */
1096   
1097     /* grab a thread from the run queue */
1098     ASSERT(run_queue_hd != END_TSO_QUEUE);
1099     t = POP_RUN_QUEUE();
1100     // Sanity check the thread we're about to run.  This can be
1101     // expensive if there is lots of thread switching going on...
1102     IF_DEBUG(sanity,checkTSO(t));
1103 #endif
1104
1105 #ifdef THREADED_RTS
1106     {
1107       StgMainThread *m;
1108       for(m = main_threads; m; m = m->link)
1109       {
1110         if(m->tso == t)
1111           break;
1112       }
1113       
1114       if(m)
1115       {
1116         if(m == mainThread)
1117         {
1118           IF_DEBUG(scheduler,
1119             fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1120                     t, osThreadId()));
1121           // yes, the Haskell thread is bound to the current native thread
1122         }
1123         else
1124         {
1125           IF_DEBUG(scheduler,
1126             fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1127                     t, osThreadId()));
1128           // no, bound to a different Haskell thread: pass to that thread
1129           PUSH_ON_RUN_QUEUE(t);
1130           passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1131           cap = NULL;
1132           continue;
1133         }
1134       }
1135       else
1136       {
1137         // The thread we want to run is not bound.
1138         if(mainThread == NULL)
1139         {
1140           IF_DEBUG(scheduler,
1141             fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1142                     t, osThreadId()));
1143           // if we are a worker thread,
1144           // we may run it here
1145         }
1146         else
1147         {
1148           IF_DEBUG(scheduler,
1149             fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1150                     t, mainThread, osThreadId()));
1151           // no, the current native thread is bound to a different
1152           // Haskell thread, so pass it to any worker thread
1153           PUSH_ON_RUN_QUEUE(t);
1154           releaseCapability(cap);
1155           cap = NULL;
1156           continue; 
1157         }
1158       }
1159     }
1160 #endif
1161
1162     cap->r.rCurrentTSO = t;
1163     
1164     /* context switches are now initiated by the timer signal, unless
1165      * the user specified "context switch as often as possible", with
1166      * +RTS -C0
1167      */
1168     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1169          && (run_queue_hd != END_TSO_QUEUE
1170              || blocked_queue_hd != END_TSO_QUEUE
1171              || sleeping_queue != END_TSO_QUEUE)))
1172         context_switch = 1;
1173     else
1174         context_switch = 0;
1175
1176 run_thread:
1177
1178     RELEASE_LOCK(&sched_mutex);
1179
1180     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1181                               t->id, whatNext_strs[t->what_next]));
1182
1183 #ifdef PROFILING
1184     startHeapProfTimer();
1185 #endif
1186
1187     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1188     /* Run the current thread 
1189      */
1190     prev_what_next = t->what_next;
1191     switch (prev_what_next) {
1192     case ThreadKilled:
1193     case ThreadComplete:
1194         /* Thread already finished, return to scheduler. */
1195         ret = ThreadFinished;
1196         break;
1197     case ThreadRunGHC:
1198         errno = t->saved_errno;
1199         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1200         t->saved_errno = errno;
1201         break;
1202     case ThreadInterpret:
1203         ret = interpretBCO(cap);
1204         break;
1205     default:
1206       barf("schedule: invalid what_next field");
1207     }
1208     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1209     
1210     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1211 #ifdef PROFILING
1212     stopHeapProfTimer();
1213     CCCS = CCS_SYSTEM;
1214 #endif
1215     
1216     ACQUIRE_LOCK(&sched_mutex);
1217     
1218 #ifdef RTS_SUPPORTS_THREADS
1219     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1220 #elif !defined(GRAN) && !defined(PAR)
1221     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1222 #endif
1223     t = cap->r.rCurrentTSO;
1224     
1225 #if defined(PAR)
1226     /* HACK 675: if the last thread didn't yield, make sure to print a 
1227        SCHEDULE event to the log file when StgRunning the next thread, even
1228        if it is the same one as before */
1229     LastTSO = t; 
1230     TimeOfLastYield = CURRENT_TIME;
1231 #endif
1232
1233     switch (ret) {
1234     case HeapOverflow:
1235 #if defined(GRAN)
1236       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1237       globalGranStats.tot_heapover++;
1238 #elif defined(PAR)
1239       globalParStats.tot_heapover++;
1240 #endif
1241
1242       // did the task ask for a large block?
1243       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1244           // if so, get one and push it on the front of the nursery.
1245           bdescr *bd;
1246           nat blocks;
1247           
1248           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1249
1250           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1251                                    t->id, whatNext_strs[t->what_next], blocks));
1252
1253           // don't do this if it would push us over the
1254           // alloc_blocks_lim limit; we'll GC first.
1255           if (alloc_blocks + blocks < alloc_blocks_lim) {
1256
1257               alloc_blocks += blocks;
1258               bd = allocGroup( blocks );
1259
1260               // link the new group into the list
1261               bd->link = cap->r.rCurrentNursery;
1262               bd->u.back = cap->r.rCurrentNursery->u.back;
1263               if (cap->r.rCurrentNursery->u.back != NULL) {
1264                   cap->r.rCurrentNursery->u.back->link = bd;
1265               } else {
1266                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1267                          g0s0->blocks == cap->r.rNursery);
1268                   cap->r.rNursery = g0s0->blocks = bd;
1269               }           
1270               cap->r.rCurrentNursery->u.back = bd;
1271
1272               // initialise it as a nursery block.  We initialise the
1273               // step, gen_no, and flags field of *every* sub-block in
1274               // this large block, because this is easier than making
1275               // sure that we always find the block head of a large
1276               // block whenever we call Bdescr() (eg. evacuate() and
1277               // isAlive() in the GC would both have to do this, at
1278               // least).
1279               { 
1280                   bdescr *x;
1281                   for (x = bd; x < bd + blocks; x++) {
1282                       x->step = g0s0;
1283                       x->gen_no = 0;
1284                       x->flags = 0;
1285                   }
1286               }
1287
1288               // don't forget to update the block count in g0s0.
1289               g0s0->n_blocks += blocks;
1290               // This assert can be a killer if the app is doing lots
1291               // of large block allocations.
1292               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1293
1294               // now update the nursery to point to the new block
1295               cap->r.rCurrentNursery = bd;
1296
1297               // we might be unlucky and have another thread get on the
1298               // run queue before us and steal the large block, but in that
1299               // case the thread will just end up requesting another large
1300               // block.
1301               PUSH_ON_RUN_QUEUE(t);
1302               break;
1303           }
1304       }
1305
1306       /* make all the running tasks block on a condition variable,
1307        * maybe set context_switch and wait till they all pile in,
1308        * then have them wait on a GC condition variable.
1309        */
1310       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1311                                t->id, whatNext_strs[t->what_next]));
1312       threadPaused(t);
1313 #if defined(GRAN)
1314       ASSERT(!is_on_queue(t,CurrentProc));
1315 #elif defined(PAR)
1316       /* Currently we emit a DESCHEDULE event before GC in GUM.
1317          ToDo: either add separate event to distinguish SYSTEM time from rest
1318                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1319       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1320         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1321                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1322         emitSchedule = rtsTrue;
1323       }
1324 #endif
1325       
1326       ready_to_gc = rtsTrue;
1327       context_switch = 1;               /* stop other threads ASAP */
1328       PUSH_ON_RUN_QUEUE(t);
1329       /* actual GC is done at the end of the while loop */
1330       break;
1331       
1332     case StackOverflow:
1333 #if defined(GRAN)
1334       IF_DEBUG(gran, 
1335                DumpGranEvent(GR_DESCHEDULE, t));
1336       globalGranStats.tot_stackover++;
1337 #elif defined(PAR)
1338       // IF_DEBUG(par, 
1339       // DumpGranEvent(GR_DESCHEDULE, t);
1340       globalParStats.tot_stackover++;
1341 #endif
1342       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1343                                t->id, whatNext_strs[t->what_next]));
1344       /* just adjust the stack for this thread, then pop it back
1345        * on the run queue.
1346        */
1347       threadPaused(t);
1348       { 
1349         StgMainThread *m;
1350         /* enlarge the stack */
1351         StgTSO *new_t = threadStackOverflow(t);
1352         
1353         /* This TSO has moved, so update any pointers to it from the
1354          * main thread stack.  It better not be on any other queues...
1355          * (it shouldn't be).
1356          */
1357         for (m = main_threads; m != NULL; m = m->link) {
1358           if (m->tso == t) {
1359             m->tso = new_t;
1360           }
1361         }
1362         threadPaused(new_t);
1363         PUSH_ON_RUN_QUEUE(new_t);
1364       }
1365       break;
1366
1367     case ThreadYielding:
1368 #if defined(GRAN)
1369       IF_DEBUG(gran, 
1370                DumpGranEvent(GR_DESCHEDULE, t));
1371       globalGranStats.tot_yields++;
1372 #elif defined(PAR)
1373       // IF_DEBUG(par, 
1374       // DumpGranEvent(GR_DESCHEDULE, t);
1375       globalParStats.tot_yields++;
1376 #endif
1377       /* put the thread back on the run queue.  Then, if we're ready to
1378        * GC, check whether this is the last task to stop.  If so, wake
1379        * up the GC thread.  getThread will block during a GC until the
1380        * GC is finished.
1381        */
1382       IF_DEBUG(scheduler,
1383                if (t->what_next != prev_what_next) {
1384                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1385                          t->id, whatNext_strs[t->what_next]);
1386                } else {
1387                    belch("--<< thread %ld (%s) stopped, yielding", 
1388                          t->id, whatNext_strs[t->what_next]);
1389                }
1390                );
1391
1392       IF_DEBUG(sanity,
1393                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1394                checkTSO(t));
1395       ASSERT(t->link == END_TSO_QUEUE);
1396
1397       // Shortcut if we're just switching evaluators: don't bother
1398       // doing stack squeezing (which can be expensive), just run the
1399       // thread.
1400       if (t->what_next != prev_what_next) {
1401           goto run_thread;
1402       }
1403
1404       threadPaused(t);
1405
1406 #if defined(GRAN)
1407       ASSERT(!is_on_queue(t,CurrentProc));
1408
1409       IF_DEBUG(sanity,
1410                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1411                checkThreadQsSanity(rtsTrue));
1412 #endif
1413
1414 #if defined(PAR)
1415       if (RtsFlags.ParFlags.doFairScheduling) { 
1416         /* this does round-robin scheduling; good for concurrency */
1417         APPEND_TO_RUN_QUEUE(t);
1418       } else {
1419         /* this does unfair scheduling; good for parallelism */
1420         PUSH_ON_RUN_QUEUE(t);
1421       }
1422 #else
1423       // this does round-robin scheduling; good for concurrency
1424       APPEND_TO_RUN_QUEUE(t);
1425 #endif
1426
1427 #if defined(GRAN)
1428       /* add a ContinueThread event to actually process the thread */
1429       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1430                 ContinueThread,
1431                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1432       IF_GRAN_DEBUG(bq, 
1433                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1434                G_EVENTQ(0);
1435                G_CURR_THREADQ(0));
1436 #endif /* GRAN */
1437       break;
1438
1439     case ThreadBlocked:
1440 #if defined(GRAN)
1441       IF_DEBUG(scheduler,
1442                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1443                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1444                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1445
1446       // ??? needed; should emit block before
1447       IF_DEBUG(gran, 
1448                DumpGranEvent(GR_DESCHEDULE, t)); 
1449       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1450       /*
1451         ngoq Dogh!
1452       ASSERT(procStatus[CurrentProc]==Busy || 
1453               ((procStatus[CurrentProc]==Fetching) && 
1454               (t->block_info.closure!=(StgClosure*)NULL)));
1455       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1456           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1457             procStatus[CurrentProc]==Fetching)) 
1458         procStatus[CurrentProc] = Idle;
1459       */
1460 #elif defined(PAR)
1461       IF_DEBUG(scheduler,
1462                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1463                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1464       IF_PAR_DEBUG(bq,
1465
1466                    if (t->block_info.closure!=(StgClosure*)NULL) 
1467                      print_bq(t->block_info.closure));
1468
1469       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1470       blockThread(t);
1471
1472       /* whatever we schedule next, we must log that schedule */
1473       emitSchedule = rtsTrue;
1474
1475 #else /* !GRAN */
1476       /* don't need to do anything.  Either the thread is blocked on
1477        * I/O, in which case we'll have called addToBlockedQueue
1478        * previously, or it's blocked on an MVar or Blackhole, in which
1479        * case it'll be on the relevant queue already.
1480        */
1481       IF_DEBUG(scheduler,
1482                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1483                        t->id, whatNext_strs[t->what_next]);
1484                printThreadBlockage(t);
1485                fprintf(stderr, "\n"));
1486
1487       /* Only for dumping event to log file 
1488          ToDo: do I need this in GranSim, too?
1489       blockThread(t);
1490       */
1491 #endif
1492       threadPaused(t);
1493       break;
1494       
1495     case ThreadFinished:
1496       /* Need to check whether this was a main thread, and if so, signal
1497        * the task that started it with the return value.  If we have no
1498        * more main threads, we probably need to stop all the tasks until
1499        * we get a new one.
1500        */
1501       /* We also end up here if the thread kills itself with an
1502        * uncaught exception, see Exception.hc.
1503        */
1504       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1505                                t->id, whatNext_strs[t->what_next]));
1506 #if defined(GRAN)
1507       endThread(t, CurrentProc); // clean-up the thread
1508 #elif defined(PAR)
1509       /* For now all are advisory -- HWL */
1510       //if(t->priority==AdvisoryPriority) ??
1511       advisory_thread_count--;
1512       
1513 # ifdef DIST
1514       if(t->dist.priority==RevalPriority)
1515         FinishReval(t);
1516 # endif
1517       
1518       if (RtsFlags.ParFlags.ParStats.Full &&
1519           !RtsFlags.ParFlags.ParStats.Suppressed) 
1520         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1521 #endif
1522       break;
1523       
1524     default:
1525       barf("schedule: invalid thread return code %d", (int)ret);
1526     }
1527
1528 #ifdef PROFILING
1529     // When we have +RTS -i0 and we're heap profiling, do a census at
1530     // every GC.  This lets us get repeatable runs for debugging.
1531     if (performHeapProfile ||
1532         (RtsFlags.ProfFlags.profileInterval==0 &&
1533          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1534         GarbageCollect(GetRoots, rtsTrue);
1535         heapCensus();
1536         performHeapProfile = rtsFalse;
1537         ready_to_gc = rtsFalse; // we already GC'd
1538     }
1539 #endif
1540
1541     if (ready_to_gc 
1542 #ifdef SMP
1543         && allFreeCapabilities() 
1544 #endif
1545         ) {
1546       /* everybody back, start the GC.
1547        * Could do it in this thread, or signal a condition var
1548        * to do it in another thread.  Either way, we need to
1549        * broadcast on gc_pending_cond afterward.
1550        */
1551 #if defined(RTS_SUPPORTS_THREADS)
1552       IF_DEBUG(scheduler,sched_belch("doing GC"));
1553 #endif
1554       GarbageCollect(GetRoots,rtsFalse);
1555       ready_to_gc = rtsFalse;
1556 #ifdef SMP
1557       broadcastCondition(&gc_pending_cond);
1558 #endif
1559 #if defined(GRAN)
1560       /* add a ContinueThread event to continue execution of current thread */
1561       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1562                 ContinueThread,
1563                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1564       IF_GRAN_DEBUG(bq, 
1565                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1566                G_EVENTQ(0);
1567                G_CURR_THREADQ(0));
1568 #endif /* GRAN */
1569     }
1570
1571 #if defined(GRAN)
1572   next_thread:
1573     IF_GRAN_DEBUG(unused,
1574                   print_eventq(EventHd));
1575
1576     event = get_next_event();
1577 #elif defined(PAR)
1578   next_thread:
1579     /* ToDo: wait for next message to arrive rather than busy wait */
1580 #endif /* GRAN */
1581
1582   } /* end of while(1) */
1583
1584   IF_PAR_DEBUG(verbose,
1585                belch("== Leaving schedule() after having received Finish"));
1586 }
1587
1588 /* ---------------------------------------------------------------------------
1589  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1590  * used by Control.Concurrent for error checking.
1591  * ------------------------------------------------------------------------- */
1592  
1593 StgBool
1594 rtsSupportsBoundThreads(void)
1595 {
1596 #ifdef THREADED_RTS
1597   return rtsTrue;
1598 #else
1599   return rtsFalse;
1600 #endif
1601 }
1602
1603 /* ---------------------------------------------------------------------------
1604  * isThreadBound(tso): check whether tso is bound to an OS thread.
1605  * ------------------------------------------------------------------------- */
1606  
1607 StgBool
1608 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1609 {
1610 #ifdef THREADED_RTS
1611   StgMainThread *m;
1612   for(m = main_threads; m; m = m->link)
1613   {
1614     if(m->tso == tso)
1615       return rtsTrue;
1616   }
1617 #endif
1618   return rtsFalse;
1619 }
1620
1621 /* ---------------------------------------------------------------------------
1622  * Singleton fork(). Do not copy any running threads.
1623  * ------------------------------------------------------------------------- */
1624
1625 #ifdef THREADED_RTS
1626 static void 
1627 deleteThreadImmediately(StgTSO *tso);
1628 #endif
1629
1630 StgInt
1631 forkProcess(StgTSO* tso)
1632 {
1633 #ifndef mingw32_TARGET_OS
1634   pid_t pid;
1635   StgTSO* t,*next;
1636
1637   IF_DEBUG(scheduler,sched_belch("forking!"));
1638   ACQUIRE_LOCK(&sched_mutex);
1639
1640   pid = fork();
1641   if (pid) { /* parent */
1642
1643   /* just return the pid */
1644     
1645   } else { /* child */
1646 #ifdef THREADED_RTS
1647     /* wipe all other threads */
1648     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1649     tso->link = END_TSO_QUEUE;
1650     
1651     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1652       next = t->link;
1653       
1654       /* Don't kill the current thread.. */
1655       if (t->id == tso->id) {
1656         continue;
1657       }
1658       
1659       if (isThreadBound(t)) {
1660         // If the thread is bound, the OS thread that the thread is bound to
1661         // no longer exists after the fork() system call.
1662         // The bound Haskell thread is therefore unable to run at all;
1663         // we must not give it a chance to survive by catching the
1664         // ThreadKilled exception. So we kill it "brutally" rather than
1665         // using deleteThread.
1666         deleteThreadImmediately(t);
1667       } else {
1668         deleteThread(t);
1669       }
1670     }
1671     
1672     if (isThreadBound(tso)) {
1673     } else {
1674       // If the current is not bound, then we should make it so.
1675       // The OS thread left over by fork() is special in that the process
1676       // will terminate as soon as the thread terminates; 
1677       // we'd expect forkProcess to behave similarily.
1678       // FIXME - we don't do this.
1679     }
1680 #else
1681   StgMainThread *m;
1682   rtsBool doKill;
1683   /* wipe all other threads */
1684   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1685   tso->link = END_TSO_QUEUE;
1686
1687   /* When clearing out the threads, we need to ensure
1688      that a 'main thread' is left behind; if there isn't,
1689      the Scheduler will shutdown next time it is entered.
1690      
1691      ==> we don't kill a thread that's on the main_threads
1692          list (nor the current thread.)
1693     
1694      [ Attempts at implementing the more ambitious scheme of
1695        killing the main_threads also, and then adding the
1696        current thread onto the main_threads list if it wasn't
1697        there already, failed -- waitThread() (for one) wasn't
1698        up to it. If it proves to be desirable to also kill
1699        the main threads, then this scheme will have to be
1700        revisited (and fully debugged!)
1701        
1702        -- sof 7/2002
1703      ]
1704   */
1705   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1706      us is picky about finding the thread still in its queue when
1707      handling the deleteThread() */
1708
1709   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1710     next = t->link;
1711     
1712     /* Don't kill the current thread.. */
1713     if (t->id == tso->id) continue;
1714     doKill=rtsTrue;
1715     /* ..or a main thread */
1716     for (m = main_threads; m != NULL; m = m->link) {
1717         if (m->tso->id == t->id) {
1718           doKill=rtsFalse;
1719           break;
1720         }
1721     }
1722     if (doKill) {
1723       deleteThread(t);
1724     }
1725   }
1726 #endif
1727   }
1728   RELEASE_LOCK(&sched_mutex);
1729   return pid;
1730 #else /* mingw32 */
1731   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1732   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1733   return -1;
1734 #endif /* mingw32 */
1735 }
1736
1737 /* ---------------------------------------------------------------------------
1738  * deleteAllThreads():  kill all the live threads.
1739  *
1740  * This is used when we catch a user interrupt (^C), before performing
1741  * any necessary cleanups and running finalizers.
1742  *
1743  * Locks: sched_mutex held.
1744  * ------------------------------------------------------------------------- */
1745    
1746 void
1747 deleteAllThreads ( void )
1748 {
1749   StgTSO* t, *next;
1750   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1751   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1752       next = t->global_link;
1753       deleteThread(t);
1754   }      
1755   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1756   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1757   sleeping_queue = END_TSO_QUEUE;
1758 }
1759
1760 /* startThread and  insertThread are now in GranSim.c -- HWL */
1761
1762
1763 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1764 //@subsection Suspend and Resume
1765
1766 /* ---------------------------------------------------------------------------
1767  * Suspending & resuming Haskell threads.
1768  * 
1769  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1770  * its capability before calling the C function.  This allows another
1771  * task to pick up the capability and carry on running Haskell
1772  * threads.  It also means that if the C call blocks, it won't lock
1773  * the whole system.
1774  *
1775  * The Haskell thread making the C call is put to sleep for the
1776  * duration of the call, on the susepended_ccalling_threads queue.  We
1777  * give out a token to the task, which it can use to resume the thread
1778  * on return from the C function.
1779  * ------------------------------------------------------------------------- */
1780    
1781 StgInt
1782 suspendThread( StgRegTable *reg, 
1783                rtsBool concCall
1784 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1785                STG_UNUSED
1786 #endif
1787                )
1788 {
1789   nat tok;
1790   Capability *cap;
1791   int saved_errno = errno;
1792
1793   /* assume that *reg is a pointer to the StgRegTable part
1794    * of a Capability.
1795    */
1796   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1797
1798   ACQUIRE_LOCK(&sched_mutex);
1799
1800   IF_DEBUG(scheduler,
1801            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1802
1803   // XXX this might not be necessary --SDM
1804   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1805
1806   threadPaused(cap->r.rCurrentTSO);
1807   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1808   suspended_ccalling_threads = cap->r.rCurrentTSO;
1809
1810 #if defined(RTS_SUPPORTS_THREADS)
1811   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1812   {
1813       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1814       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1815   }
1816   else
1817   {
1818       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1819   }
1820 #endif
1821
1822   /* Use the thread ID as the token; it should be unique */
1823   tok = cap->r.rCurrentTSO->id;
1824
1825   /* Hand back capability */
1826   releaseCapability(cap);
1827   
1828 #if defined(RTS_SUPPORTS_THREADS)
1829   /* Preparing to leave the RTS, so ensure there's a native thread/task
1830      waiting to take over.
1831   */
1832   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1833   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1834       startTask(taskStart);
1835   //}
1836 #endif
1837
1838   /* Other threads _might_ be available for execution; signal this */
1839   THREAD_RUNNABLE();
1840   RELEASE_LOCK(&sched_mutex);
1841   
1842   errno = saved_errno;
1843   return tok; 
1844 }
1845
1846 StgRegTable *
1847 resumeThread( StgInt tok,
1848               rtsBool concCall STG_UNUSED )
1849 {
1850   StgTSO *tso, **prev;
1851   Capability *cap;
1852   int saved_errno = errno;
1853
1854 #if defined(RTS_SUPPORTS_THREADS)
1855   /* Wait for permission to re-enter the RTS with the result. */
1856   ACQUIRE_LOCK(&sched_mutex);
1857   grabReturnCapability(&sched_mutex, &cap);
1858
1859   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1860 #else
1861   grabCapability(&cap);
1862 #endif
1863
1864   /* Remove the thread off of the suspended list */
1865   prev = &suspended_ccalling_threads;
1866   for (tso = suspended_ccalling_threads; 
1867        tso != END_TSO_QUEUE; 
1868        prev = &tso->link, tso = tso->link) {
1869     if (tso->id == (StgThreadID)tok) {
1870       *prev = tso->link;
1871       break;
1872     }
1873   }
1874   if (tso == END_TSO_QUEUE) {
1875     barf("resumeThread: thread not found");
1876   }
1877   tso->link = END_TSO_QUEUE;
1878   
1879 #if defined(RTS_SUPPORTS_THREADS)
1880   if(tso->why_blocked == BlockedOnCCall)
1881   {
1882       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1883       tso->blocked_exceptions = NULL;
1884   }
1885 #endif
1886   
1887   /* Reset blocking status */
1888   tso->why_blocked  = NotBlocked;
1889
1890   cap->r.rCurrentTSO = tso;
1891 #if defined(RTS_SUPPORTS_THREADS)
1892   RELEASE_LOCK(&sched_mutex);
1893 #endif
1894   errno = saved_errno;
1895   return &cap->r;
1896 }
1897
1898
1899 /* ---------------------------------------------------------------------------
1900  * Static functions
1901  * ------------------------------------------------------------------------ */
1902 static void unblockThread(StgTSO *tso);
1903
1904 /* ---------------------------------------------------------------------------
1905  * Comparing Thread ids.
1906  *
1907  * This is used from STG land in the implementation of the
1908  * instances of Eq/Ord for ThreadIds.
1909  * ------------------------------------------------------------------------ */
1910
1911 int
1912 cmp_thread(StgPtr tso1, StgPtr tso2) 
1913
1914   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1915   StgThreadID id2 = ((StgTSO *)tso2)->id;
1916  
1917   if (id1 < id2) return (-1);
1918   if (id1 > id2) return 1;
1919   return 0;
1920 }
1921
1922 /* ---------------------------------------------------------------------------
1923  * Fetching the ThreadID from an StgTSO.
1924  *
1925  * This is used in the implementation of Show for ThreadIds.
1926  * ------------------------------------------------------------------------ */
1927 int
1928 rts_getThreadId(StgPtr tso) 
1929 {
1930   return ((StgTSO *)tso)->id;
1931 }
1932
1933 #ifdef DEBUG
1934 void
1935 labelThread(StgPtr tso, char *label)
1936 {
1937   int len;
1938   void *buf;
1939
1940   /* Caveat: Once set, you can only set the thread name to "" */
1941   len = strlen(label)+1;
1942   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1943   strncpy(buf,label,len);
1944   /* Update will free the old memory for us */
1945   updateThreadLabel((StgWord)tso,buf);
1946 }
1947 #endif /* DEBUG */
1948
1949 /* ---------------------------------------------------------------------------
1950    Create a new thread.
1951
1952    The new thread starts with the given stack size.  Before the
1953    scheduler can run, however, this thread needs to have a closure
1954    (and possibly some arguments) pushed on its stack.  See
1955    pushClosure() in Schedule.h.
1956
1957    createGenThread() and createIOThread() (in SchedAPI.h) are
1958    convenient packaged versions of this function.
1959
1960    currently pri (priority) is only used in a GRAN setup -- HWL
1961    ------------------------------------------------------------------------ */
1962 //@cindex createThread
1963 #if defined(GRAN)
1964 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1965 StgTSO *
1966 createThread(nat size, StgInt pri)
1967 #else
1968 StgTSO *
1969 createThread(nat size)
1970 #endif
1971 {
1972
1973     StgTSO *tso;
1974     nat stack_size;
1975
1976     /* First check whether we should create a thread at all */
1977 #if defined(PAR)
1978   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1979   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1980     threadsIgnored++;
1981     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1982           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1983     return END_TSO_QUEUE;
1984   }
1985   threadsCreated++;
1986 #endif
1987
1988 #if defined(GRAN)
1989   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1990 #endif
1991
1992   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1993
1994   /* catch ridiculously small stack sizes */
1995   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1996     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1997   }
1998
1999   stack_size = size - TSO_STRUCT_SIZEW;
2000
2001   tso = (StgTSO *)allocate(size);
2002   TICK_ALLOC_TSO(stack_size, 0);
2003
2004   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2005 #if defined(GRAN)
2006   SET_GRAN_HDR(tso, ThisPE);
2007 #endif
2008
2009   // Always start with the compiled code evaluator
2010   tso->what_next = ThreadRunGHC;
2011
2012   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
2013    * protect the increment operation on next_thread_id.
2014    * In future, we could use an atomic increment instead.
2015    */
2016   ACQUIRE_LOCK(&thread_id_mutex);
2017   tso->id = next_thread_id++; 
2018   RELEASE_LOCK(&thread_id_mutex);
2019
2020   tso->why_blocked  = NotBlocked;
2021   tso->blocked_exceptions = NULL;
2022
2023   tso->saved_errno = 0;
2024   
2025   tso->stack_size   = stack_size;
2026   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2027                               - TSO_STRUCT_SIZEW;
2028   tso->sp           = (P_)&(tso->stack) + stack_size;
2029
2030 #ifdef PROFILING
2031   tso->prof.CCCS = CCS_MAIN;
2032 #endif
2033
2034   /* put a stop frame on the stack */
2035   tso->sp -= sizeofW(StgStopFrame);
2036   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2037   // ToDo: check this
2038 #if defined(GRAN)
2039   tso->link = END_TSO_QUEUE;
2040   /* uses more flexible routine in GranSim */
2041   insertThread(tso, CurrentProc);
2042 #else
2043   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2044    * from its creation
2045    */
2046 #endif
2047
2048 #if defined(GRAN) 
2049   if (RtsFlags.GranFlags.GranSimStats.Full) 
2050     DumpGranEvent(GR_START,tso);
2051 #elif defined(PAR)
2052   if (RtsFlags.ParFlags.ParStats.Full) 
2053     DumpGranEvent(GR_STARTQ,tso);
2054   /* HACk to avoid SCHEDULE 
2055      LastTSO = tso; */
2056 #endif
2057
2058   /* Link the new thread on the global thread list.
2059    */
2060   tso->global_link = all_threads;
2061   all_threads = tso;
2062
2063 #if defined(DIST)
2064   tso->dist.priority = MandatoryPriority; //by default that is...
2065 #endif
2066
2067 #if defined(GRAN)
2068   tso->gran.pri = pri;
2069 # if defined(DEBUG)
2070   tso->gran.magic = TSO_MAGIC; // debugging only
2071 # endif
2072   tso->gran.sparkname   = 0;
2073   tso->gran.startedat   = CURRENT_TIME; 
2074   tso->gran.exported    = 0;
2075   tso->gran.basicblocks = 0;
2076   tso->gran.allocs      = 0;
2077   tso->gran.exectime    = 0;
2078   tso->gran.fetchtime   = 0;
2079   tso->gran.fetchcount  = 0;
2080   tso->gran.blocktime   = 0;
2081   tso->gran.blockcount  = 0;
2082   tso->gran.blockedat   = 0;
2083   tso->gran.globalsparks = 0;
2084   tso->gran.localsparks  = 0;
2085   if (RtsFlags.GranFlags.Light)
2086     tso->gran.clock  = Now; /* local clock */
2087   else
2088     tso->gran.clock  = 0;
2089
2090   IF_DEBUG(gran,printTSO(tso));
2091 #elif defined(PAR)
2092 # if defined(DEBUG)
2093   tso->par.magic = TSO_MAGIC; // debugging only
2094 # endif
2095   tso->par.sparkname   = 0;
2096   tso->par.startedat   = CURRENT_TIME; 
2097   tso->par.exported    = 0;
2098   tso->par.basicblocks = 0;
2099   tso->par.allocs      = 0;
2100   tso->par.exectime    = 0;
2101   tso->par.fetchtime   = 0;
2102   tso->par.fetchcount  = 0;
2103   tso->par.blocktime   = 0;
2104   tso->par.blockcount  = 0;
2105   tso->par.blockedat   = 0;
2106   tso->par.globalsparks = 0;
2107   tso->par.localsparks  = 0;
2108 #endif
2109
2110 #if defined(GRAN)
2111   globalGranStats.tot_threads_created++;
2112   globalGranStats.threads_created_on_PE[CurrentProc]++;
2113   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2114   globalGranStats.tot_sq_probes++;
2115 #elif defined(PAR)
2116   // collect parallel global statistics (currently done together with GC stats)
2117   if (RtsFlags.ParFlags.ParStats.Global &&
2118       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2119     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2120     globalParStats.tot_threads_created++;
2121   }
2122 #endif 
2123
2124 #if defined(GRAN)
2125   IF_GRAN_DEBUG(pri,
2126                 belch("==__ schedule: Created TSO %d (%p);",
2127                       CurrentProc, tso, tso->id));
2128 #elif defined(PAR)
2129     IF_PAR_DEBUG(verbose,
2130                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
2131                        tso->id, tso, advisory_thread_count));
2132 #else
2133   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2134                                  tso->id, tso->stack_size));
2135 #endif    
2136   return tso;
2137 }
2138
2139 #if defined(PAR)
2140 /* RFP:
2141    all parallel thread creation calls should fall through the following routine.
2142 */
2143 StgTSO *
2144 createSparkThread(rtsSpark spark) 
2145 { StgTSO *tso;
2146   ASSERT(spark != (rtsSpark)NULL);
2147   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2148   { threadsIgnored++;
2149     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2150           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2151     return END_TSO_QUEUE;
2152   }
2153   else
2154   { threadsCreated++;
2155     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2156     if (tso==END_TSO_QUEUE)     
2157       barf("createSparkThread: Cannot create TSO");
2158 #if defined(DIST)
2159     tso->priority = AdvisoryPriority;
2160 #endif
2161     pushClosure(tso,spark);
2162     PUSH_ON_RUN_QUEUE(tso);
2163     advisory_thread_count++;    
2164   }
2165   return tso;
2166 }
2167 #endif
2168
2169 /*
2170   Turn a spark into a thread.
2171   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2172 */
2173 #if defined(PAR)
2174 //@cindex activateSpark
2175 StgTSO *
2176 activateSpark (rtsSpark spark) 
2177 {
2178   StgTSO *tso;
2179
2180   tso = createSparkThread(spark);
2181   if (RtsFlags.ParFlags.ParStats.Full) {   
2182     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2183     IF_PAR_DEBUG(verbose,
2184                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2185                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2186   }
2187   // ToDo: fwd info on local/global spark to thread -- HWL
2188   // tso->gran.exported =  spark->exported;
2189   // tso->gran.locked =   !spark->global;
2190   // tso->gran.sparkname = spark->name;
2191
2192   return tso;
2193 }
2194 #endif
2195
2196 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2197                                    Capability *initialCapability
2198                                    );
2199
2200
2201 /* ---------------------------------------------------------------------------
2202  * scheduleThread()
2203  *
2204  * scheduleThread puts a thread on the head of the runnable queue.
2205  * This will usually be done immediately after a thread is created.
2206  * The caller of scheduleThread must create the thread using e.g.
2207  * createThread and push an appropriate closure
2208  * on this thread's stack before the scheduler is invoked.
2209  * ------------------------------------------------------------------------ */
2210
2211 static void scheduleThread_ (StgTSO* tso);
2212
2213 void
2214 scheduleThread_(StgTSO *tso)
2215 {
2216   // Precondition: sched_mutex must be held.
2217
2218   /* Put the new thread on the head of the runnable queue.  The caller
2219    * better push an appropriate closure on this thread's stack
2220    * beforehand.  In the SMP case, the thread may start running as
2221    * soon as we release the scheduler lock below.
2222    */
2223   PUSH_ON_RUN_QUEUE(tso);
2224   THREAD_RUNNABLE();
2225
2226 #if 0
2227   IF_DEBUG(scheduler,printTSO(tso));
2228 #endif
2229 }
2230
2231 void scheduleThread(StgTSO* tso)
2232 {
2233   ACQUIRE_LOCK(&sched_mutex);
2234   scheduleThread_(tso);
2235   RELEASE_LOCK(&sched_mutex);
2236 }
2237
2238 SchedulerStatus
2239 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2240 {       // Precondition: sched_mutex must be held
2241   StgMainThread *m;
2242
2243   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2244   m->tso = tso;
2245   m->ret = ret;
2246   m->stat = NoStatus;
2247 #if defined(RTS_SUPPORTS_THREADS)
2248   initCondition(&m->wakeup);
2249 #if defined(THREADED_RTS)
2250   initCondition(&m->bound_thread_cond);
2251 #endif
2252 #endif
2253
2254   /* Put the thread on the main-threads list prior to scheduling the TSO.
2255      Failure to do so introduces a race condition in the MT case (as
2256      identified by Wolfgang Thaller), whereby the new task/OS thread 
2257      created by scheduleThread_() would complete prior to the thread
2258      that spawned it managed to put 'itself' on the main-threads list.
2259      The upshot of it all being that the worker thread wouldn't get to
2260      signal the completion of the its work item for the main thread to
2261      see (==> it got stuck waiting.)    -- sof 6/02.
2262   */
2263   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2264   
2265   m->link = main_threads;
2266   main_threads = m;
2267
2268   scheduleThread_(tso);
2269
2270   return waitThread_(m, initialCapability);
2271 }
2272
2273 /* ---------------------------------------------------------------------------
2274  * initScheduler()
2275  *
2276  * Initialise the scheduler.  This resets all the queues - if the
2277  * queues contained any threads, they'll be garbage collected at the
2278  * next pass.
2279  *
2280  * ------------------------------------------------------------------------ */
2281
2282 #ifdef SMP
2283 static void
2284 term_handler(int sig STG_UNUSED)
2285 {
2286   stat_workerStop();
2287   ACQUIRE_LOCK(&term_mutex);
2288   await_death--;
2289   RELEASE_LOCK(&term_mutex);
2290   shutdownThread();
2291 }
2292 #endif
2293
2294 void 
2295 initScheduler(void)
2296 {
2297 #if defined(GRAN)
2298   nat i;
2299
2300   for (i=0; i<=MAX_PROC; i++) {
2301     run_queue_hds[i]      = END_TSO_QUEUE;
2302     run_queue_tls[i]      = END_TSO_QUEUE;
2303     blocked_queue_hds[i]  = END_TSO_QUEUE;
2304     blocked_queue_tls[i]  = END_TSO_QUEUE;
2305     ccalling_threadss[i]  = END_TSO_QUEUE;
2306     sleeping_queue        = END_TSO_QUEUE;
2307   }
2308 #else
2309   run_queue_hd      = END_TSO_QUEUE;
2310   run_queue_tl      = END_TSO_QUEUE;
2311   blocked_queue_hd  = END_TSO_QUEUE;
2312   blocked_queue_tl  = END_TSO_QUEUE;
2313   sleeping_queue    = END_TSO_QUEUE;
2314 #endif 
2315
2316   suspended_ccalling_threads  = END_TSO_QUEUE;
2317
2318   main_threads = NULL;
2319   all_threads  = END_TSO_QUEUE;
2320
2321   context_switch = 0;
2322   interrupted    = 0;
2323
2324   RtsFlags.ConcFlags.ctxtSwitchTicks =
2325       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2326       
2327 #if defined(RTS_SUPPORTS_THREADS)
2328   /* Initialise the mutex and condition variables used by
2329    * the scheduler. */
2330   initMutex(&sched_mutex);
2331   initMutex(&term_mutex);
2332   initMutex(&thread_id_mutex);
2333
2334   initCondition(&thread_ready_cond);
2335 #endif
2336   
2337 #if defined(SMP)
2338   initCondition(&gc_pending_cond);
2339 #endif
2340
2341 #if defined(RTS_SUPPORTS_THREADS)
2342   ACQUIRE_LOCK(&sched_mutex);
2343 #endif
2344
2345   /* Install the SIGHUP handler */
2346 #if defined(SMP)
2347   {
2348     struct sigaction action,oact;
2349
2350     action.sa_handler = term_handler;
2351     sigemptyset(&action.sa_mask);
2352     action.sa_flags = 0;
2353     if (sigaction(SIGTERM, &action, &oact) != 0) {
2354       barf("can't install TERM handler");
2355     }
2356   }
2357 #endif
2358
2359   /* A capability holds the state a native thread needs in
2360    * order to execute STG code. At least one capability is
2361    * floating around (only SMP builds have more than one).
2362    */
2363   initCapabilities();
2364   
2365 #if defined(RTS_SUPPORTS_THREADS)
2366     /* start our haskell execution tasks */
2367 # if defined(SMP)
2368     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2369 # else
2370     startTaskManager(0,taskStart);
2371 # endif
2372 #endif
2373
2374 #if /* defined(SMP) ||*/ defined(PAR)
2375   initSparkPools();
2376 #endif
2377
2378 #if defined(RTS_SUPPORTS_THREADS)
2379   RELEASE_LOCK(&sched_mutex);
2380 #endif
2381
2382 }
2383
2384 void
2385 exitScheduler( void )
2386 {
2387 #if defined(RTS_SUPPORTS_THREADS)
2388   stopTaskManager();
2389 #endif
2390   shutting_down_scheduler = rtsTrue;
2391 }
2392
2393 /* -----------------------------------------------------------------------------
2394    Managing the per-task allocation areas.
2395    
2396    Each capability comes with an allocation area.  These are
2397    fixed-length block lists into which allocation can be done.
2398
2399    ToDo: no support for two-space collection at the moment???
2400    -------------------------------------------------------------------------- */
2401
2402 /* -----------------------------------------------------------------------------
2403  * waitThread is the external interface for running a new computation
2404  * and waiting for the result.
2405  *
2406  * In the non-SMP case, we create a new main thread, push it on the 
2407  * main-thread stack, and invoke the scheduler to run it.  The
2408  * scheduler will return when the top main thread on the stack has
2409  * completed or died, and fill in the necessary fields of the
2410  * main_thread structure.
2411  *
2412  * In the SMP case, we create a main thread as before, but we then
2413  * create a new condition variable and sleep on it.  When our new
2414  * main thread has completed, we'll be woken up and the status/result
2415  * will be in the main_thread struct.
2416  * -------------------------------------------------------------------------- */
2417
2418 int 
2419 howManyThreadsAvail ( void )
2420 {
2421    int i = 0;
2422    StgTSO* q;
2423    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2424       i++;
2425    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2426       i++;
2427    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2428       i++;
2429    return i;
2430 }
2431
2432 void
2433 finishAllThreads ( void )
2434 {
2435    do {
2436       while (run_queue_hd != END_TSO_QUEUE) {
2437          waitThread ( run_queue_hd, NULL, NULL );
2438       }
2439       while (blocked_queue_hd != END_TSO_QUEUE) {
2440          waitThread ( blocked_queue_hd, NULL, NULL );
2441       }
2442       while (sleeping_queue != END_TSO_QUEUE) {
2443          waitThread ( blocked_queue_hd, NULL, NULL );
2444       }
2445    } while 
2446       (blocked_queue_hd != END_TSO_QUEUE || 
2447        run_queue_hd     != END_TSO_QUEUE ||
2448        sleeping_queue   != END_TSO_QUEUE);
2449 }
2450
2451 SchedulerStatus
2452 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2453
2454   StgMainThread *m;
2455   SchedulerStatus stat;
2456
2457   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2458   m->tso = tso;
2459   m->ret = ret;
2460   m->stat = NoStatus;
2461 #if defined(RTS_SUPPORTS_THREADS)
2462   initCondition(&m->wakeup);
2463 #if defined(THREADED_RTS)
2464   initCondition(&m->bound_thread_cond);
2465 #endif
2466 #endif
2467
2468   /* see scheduleWaitThread() comment */
2469   ACQUIRE_LOCK(&sched_mutex);
2470   m->link = main_threads;
2471   main_threads = m;
2472
2473   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2474
2475   stat = waitThread_(m,initialCapability);
2476   
2477   RELEASE_LOCK(&sched_mutex);
2478   return stat;
2479 }
2480
2481 static
2482 SchedulerStatus
2483 waitThread_(StgMainThread* m, Capability *initialCapability)
2484 {
2485   SchedulerStatus stat;
2486
2487   // Precondition: sched_mutex must be held.
2488   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2489
2490 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2491   {     // FIXME: does this still make sense?
2492         // It's not for the threaded rts => SMP only
2493     do {
2494       waitCondition(&m->wakeup, &sched_mutex);
2495     } while (m->stat == NoStatus);
2496   }
2497 #elif defined(GRAN)
2498   /* GranSim specific init */
2499   CurrentTSO = m->tso;                // the TSO to run
2500   procStatus[MainProc] = Busy;        // status of main PE
2501   CurrentProc = MainProc;             // PE to run it on
2502
2503   RELEASE_LOCK(&sched_mutex);
2504   schedule(m,initialCapability);
2505 #else
2506   RELEASE_LOCK(&sched_mutex);
2507   schedule(m,initialCapability);
2508   ACQUIRE_LOCK(&sched_mutex);
2509   ASSERT(m->stat != NoStatus);
2510 #endif
2511
2512   stat = m->stat;
2513
2514 #if defined(RTS_SUPPORTS_THREADS)
2515   closeCondition(&m->wakeup);
2516 #if defined(THREADED_RTS)
2517   closeCondition(&m->bound_thread_cond);
2518 #endif
2519 #endif
2520
2521   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2522                               m->tso->id));
2523   stgFree(m);
2524
2525   // Postcondition: sched_mutex still held
2526   return stat;
2527 }
2528
2529 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2530 //@subsection Run queue code 
2531
2532 #if 0
2533 /* 
2534    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2535        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2536        implicit global variable that has to be correct when calling these
2537        fcts -- HWL 
2538 */
2539
2540 /* Put the new thread on the head of the runnable queue.
2541  * The caller of createThread better push an appropriate closure
2542  * on this thread's stack before the scheduler is invoked.
2543  */
2544 static /* inline */ void
2545 add_to_run_queue(tso)
2546 StgTSO* tso; 
2547 {
2548   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2549   tso->link = run_queue_hd;
2550   run_queue_hd = tso;
2551   if (run_queue_tl == END_TSO_QUEUE) {
2552     run_queue_tl = tso;
2553   }
2554 }
2555
2556 /* Put the new thread at the end of the runnable queue. */
2557 static /* inline */ void
2558 push_on_run_queue(tso)
2559 StgTSO* tso; 
2560 {
2561   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2562   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2563   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2564   if (run_queue_hd == END_TSO_QUEUE) {
2565     run_queue_hd = tso;
2566   } else {
2567     run_queue_tl->link = tso;
2568   }
2569   run_queue_tl = tso;
2570 }
2571
2572 /* 
2573    Should be inlined because it's used very often in schedule.  The tso
2574    argument is actually only needed in GranSim, where we want to have the
2575    possibility to schedule *any* TSO on the run queue, irrespective of the
2576    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2577    the run queue and dequeue the tso, adjusting the links in the queue. 
2578 */
2579 //@cindex take_off_run_queue
2580 static /* inline */ StgTSO*
2581 take_off_run_queue(StgTSO *tso) {
2582   StgTSO *t, *prev;
2583
2584   /* 
2585      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2586
2587      if tso is specified, unlink that tso from the run_queue (doesn't have
2588      to be at the beginning of the queue); GranSim only 
2589   */
2590   if (tso!=END_TSO_QUEUE) {
2591     /* find tso in queue */
2592     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2593          t!=END_TSO_QUEUE && t!=tso;
2594          prev=t, t=t->link) 
2595       /* nothing */ ;
2596     ASSERT(t==tso);
2597     /* now actually dequeue the tso */
2598     if (prev!=END_TSO_QUEUE) {
2599       ASSERT(run_queue_hd!=t);
2600       prev->link = t->link;
2601     } else {
2602       /* t is at beginning of thread queue */
2603       ASSERT(run_queue_hd==t);
2604       run_queue_hd = t->link;
2605     }
2606     /* t is at end of thread queue */
2607     if (t->link==END_TSO_QUEUE) {
2608       ASSERT(t==run_queue_tl);
2609       run_queue_tl = prev;
2610     } else {
2611       ASSERT(run_queue_tl!=t);
2612     }
2613     t->link = END_TSO_QUEUE;
2614   } else {
2615     /* take tso from the beginning of the queue; std concurrent code */
2616     t = run_queue_hd;
2617     if (t != END_TSO_QUEUE) {
2618       run_queue_hd = t->link;
2619       t->link = END_TSO_QUEUE;
2620       if (run_queue_hd == END_TSO_QUEUE) {
2621         run_queue_tl = END_TSO_QUEUE;
2622       }
2623     }
2624   }
2625   return t;
2626 }
2627
2628 #endif /* 0 */
2629
2630 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2631 //@subsection Garbage Collextion Routines
2632
2633 /* ---------------------------------------------------------------------------
2634    Where are the roots that we know about?
2635
2636         - all the threads on the runnable queue
2637         - all the threads on the blocked queue
2638         - all the threads on the sleeping queue
2639         - all the thread currently executing a _ccall_GC
2640         - all the "main threads"
2641      
2642    ------------------------------------------------------------------------ */
2643
2644 /* This has to be protected either by the scheduler monitor, or by the
2645         garbage collection monitor (probably the latter).
2646         KH @ 25/10/99
2647 */
2648
2649 void
2650 GetRoots(evac_fn evac)
2651 {
2652 #if defined(GRAN)
2653   {
2654     nat i;
2655     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2656       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2657           evac((StgClosure **)&run_queue_hds[i]);
2658       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2659           evac((StgClosure **)&run_queue_tls[i]);
2660       
2661       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2662           evac((StgClosure **)&blocked_queue_hds[i]);
2663       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2664           evac((StgClosure **)&blocked_queue_tls[i]);
2665       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2666           evac((StgClosure **)&ccalling_threads[i]);
2667     }
2668   }
2669
2670   markEventQueue();
2671
2672 #else /* !GRAN */
2673   if (run_queue_hd != END_TSO_QUEUE) {
2674       ASSERT(run_queue_tl != END_TSO_QUEUE);
2675       evac((StgClosure **)&run_queue_hd);
2676       evac((StgClosure **)&run_queue_tl);
2677   }
2678   
2679   if (blocked_queue_hd != END_TSO_QUEUE) {
2680       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2681       evac((StgClosure **)&blocked_queue_hd);
2682       evac((StgClosure **)&blocked_queue_tl);
2683   }
2684   
2685   if (sleeping_queue != END_TSO_QUEUE) {
2686       evac((StgClosure **)&sleeping_queue);
2687   }
2688 #endif 
2689
2690   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2691       evac((StgClosure **)&suspended_ccalling_threads);
2692   }
2693
2694 #if defined(PAR) || defined(GRAN)
2695   markSparkQueue(evac);
2696 #endif
2697
2698 #if defined(RTS_USER_SIGNALS)
2699   // mark the signal handlers (signals should be already blocked)
2700   markSignalHandlers(evac);
2701 #endif
2702
2703   // main threads which have completed need to be retained until they
2704   // are dealt with in the main scheduler loop.  They won't be
2705   // retained any other way: the GC will drop them from the
2706   // all_threads list, so we have to be careful to treat them as roots
2707   // here.
2708   { 
2709       StgMainThread *m;
2710       for (m = main_threads; m != NULL; m = m->link) {
2711           switch (m->tso->what_next) {
2712           case ThreadComplete:
2713           case ThreadKilled:
2714               evac((StgClosure **)&m->tso);
2715               break;
2716           default:
2717               break;
2718           }
2719       }
2720   }
2721 }
2722
2723 /* -----------------------------------------------------------------------------
2724    performGC
2725
2726    This is the interface to the garbage collector from Haskell land.
2727    We provide this so that external C code can allocate and garbage
2728    collect when called from Haskell via _ccall_GC.
2729
2730    It might be useful to provide an interface whereby the programmer
2731    can specify more roots (ToDo).
2732    
2733    This needs to be protected by the GC condition variable above.  KH.
2734    -------------------------------------------------------------------------- */
2735
2736 static void (*extra_roots)(evac_fn);
2737
2738 void
2739 performGC(void)
2740 {
2741   /* Obligated to hold this lock upon entry */
2742   ACQUIRE_LOCK(&sched_mutex);
2743   GarbageCollect(GetRoots,rtsFalse);
2744   RELEASE_LOCK(&sched_mutex);
2745 }
2746
2747 void
2748 performMajorGC(void)
2749 {
2750   ACQUIRE_LOCK(&sched_mutex);
2751   GarbageCollect(GetRoots,rtsTrue);
2752   RELEASE_LOCK(&sched_mutex);
2753 }
2754
2755 static void
2756 AllRoots(evac_fn evac)
2757 {
2758     GetRoots(evac);             // the scheduler's roots
2759     extra_roots(evac);          // the user's roots
2760 }
2761
2762 void
2763 performGCWithRoots(void (*get_roots)(evac_fn))
2764 {
2765   ACQUIRE_LOCK(&sched_mutex);
2766   extra_roots = get_roots;
2767   GarbageCollect(AllRoots,rtsFalse);
2768   RELEASE_LOCK(&sched_mutex);
2769 }
2770
2771 /* -----------------------------------------------------------------------------
2772    Stack overflow
2773
2774    If the thread has reached its maximum stack size, then raise the
2775    StackOverflow exception in the offending thread.  Otherwise
2776    relocate the TSO into a larger chunk of memory and adjust its stack
2777    size appropriately.
2778    -------------------------------------------------------------------------- */
2779
2780 static StgTSO *
2781 threadStackOverflow(StgTSO *tso)
2782 {
2783   nat new_stack_size, new_tso_size, stack_words;
2784   StgPtr new_sp;
2785   StgTSO *dest;
2786
2787   IF_DEBUG(sanity,checkTSO(tso));
2788   if (tso->stack_size >= tso->max_stack_size) {
2789
2790     IF_DEBUG(gc,
2791              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2792                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2793              /* If we're debugging, just print out the top of the stack */
2794              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2795                                               tso->sp+64)));
2796
2797     /* Send this thread the StackOverflow exception */
2798     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2799     return tso;
2800   }
2801
2802   /* Try to double the current stack size.  If that takes us over the
2803    * maximum stack size for this thread, then use the maximum instead.
2804    * Finally round up so the TSO ends up as a whole number of blocks.
2805    */
2806   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2807   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2808                                        TSO_STRUCT_SIZE)/sizeof(W_);
2809   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2810   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2811
2812   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2813
2814   dest = (StgTSO *)allocate(new_tso_size);
2815   TICK_ALLOC_TSO(new_stack_size,0);
2816
2817   /* copy the TSO block and the old stack into the new area */
2818   memcpy(dest,tso,TSO_STRUCT_SIZE);
2819   stack_words = tso->stack + tso->stack_size - tso->sp;
2820   new_sp = (P_)dest + new_tso_size - stack_words;
2821   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2822
2823   /* relocate the stack pointers... */
2824   dest->sp         = new_sp;
2825   dest->stack_size = new_stack_size;
2826         
2827   /* Mark the old TSO as relocated.  We have to check for relocated
2828    * TSOs in the garbage collector and any primops that deal with TSOs.
2829    *
2830    * It's important to set the sp value to just beyond the end
2831    * of the stack, so we don't attempt to scavenge any part of the
2832    * dead TSO's stack.
2833    */
2834   tso->what_next = ThreadRelocated;
2835   tso->link = dest;
2836   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2837   tso->why_blocked = NotBlocked;
2838   dest->mut_link = NULL;
2839
2840   IF_PAR_DEBUG(verbose,
2841                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2842                      tso->id, tso, tso->stack_size);
2843                /* If we're debugging, just print out the top of the stack */
2844                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2845                                                 tso->sp+64)));
2846   
2847   IF_DEBUG(sanity,checkTSO(tso));
2848 #if 0
2849   IF_DEBUG(scheduler,printTSO(dest));
2850 #endif
2851
2852   return dest;
2853 }
2854
2855 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2856 //@subsection Blocking Queue Routines
2857
2858 /* ---------------------------------------------------------------------------
2859    Wake up a queue that was blocked on some resource.
2860    ------------------------------------------------------------------------ */
2861
2862 #if defined(GRAN)
2863 static inline void
2864 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2865 {
2866 }
2867 #elif defined(PAR)
2868 static inline void
2869 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2870 {
2871   /* write RESUME events to log file and
2872      update blocked and fetch time (depending on type of the orig closure) */
2873   if (RtsFlags.ParFlags.ParStats.Full) {
2874     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2875                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2876                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2877     if (EMPTY_RUN_QUEUE())
2878       emitSchedule = rtsTrue;
2879
2880     switch (get_itbl(node)->type) {
2881         case FETCH_ME_BQ:
2882           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2883           break;
2884         case RBH:
2885         case FETCH_ME:
2886         case BLACKHOLE_BQ:
2887           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2888           break;
2889 #ifdef DIST
2890         case MVAR:
2891           break;
2892 #endif    
2893         default:
2894           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2895         }
2896       }
2897 }
2898 #endif
2899
2900 #if defined(GRAN)
2901 static StgBlockingQueueElement *
2902 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2903 {
2904     StgTSO *tso;
2905     PEs node_loc, tso_loc;
2906
2907     node_loc = where_is(node); // should be lifted out of loop
2908     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2909     tso_loc = where_is((StgClosure *)tso);
2910     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2911       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2912       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2913       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2914       // insertThread(tso, node_loc);
2915       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2916                 ResumeThread,
2917                 tso, node, (rtsSpark*)NULL);
2918       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2919       // len_local++;
2920       // len++;
2921     } else { // TSO is remote (actually should be FMBQ)
2922       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2923                                   RtsFlags.GranFlags.Costs.gunblocktime +
2924                                   RtsFlags.GranFlags.Costs.latency;
2925       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2926                 UnblockThread,
2927                 tso, node, (rtsSpark*)NULL);
2928       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2929       // len++;
2930     }
2931     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2932     IF_GRAN_DEBUG(bq,
2933                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2934                           (node_loc==tso_loc ? "Local" : "Global"), 
2935                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2936     tso->block_info.closure = NULL;
2937     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2938                              tso->id, tso));
2939 }
2940 #elif defined(PAR)
2941 static StgBlockingQueueElement *
2942 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2943 {
2944     StgBlockingQueueElement *next;
2945
2946     switch (get_itbl(bqe)->type) {
2947     case TSO:
2948       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2949       /* if it's a TSO just push it onto the run_queue */
2950       next = bqe->link;
2951       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2952       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2953       THREAD_RUNNABLE();
2954       unblockCount(bqe, node);
2955       /* reset blocking status after dumping event */
2956       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2957       break;
2958
2959     case BLOCKED_FETCH:
2960       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2961       next = bqe->link;
2962       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2963       PendingFetches = (StgBlockedFetch *)bqe;
2964       break;
2965
2966 # if defined(DEBUG)
2967       /* can ignore this case in a non-debugging setup; 
2968          see comments on RBHSave closures above */
2969     case CONSTR:
2970       /* check that the closure is an RBHSave closure */
2971       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2972              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2973              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2974       break;
2975
2976     default:
2977       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2978            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2979            (StgClosure *)bqe);
2980 # endif
2981     }
2982   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2983   return next;
2984 }
2985
2986 #else /* !GRAN && !PAR */
2987 static StgTSO *
2988 unblockOneLocked(StgTSO *tso)
2989 {
2990   StgTSO *next;
2991
2992   ASSERT(get_itbl(tso)->type == TSO);
2993   ASSERT(tso->why_blocked != NotBlocked);
2994   tso->why_blocked = NotBlocked;
2995   next = tso->link;
2996   PUSH_ON_RUN_QUEUE(tso);
2997   THREAD_RUNNABLE();
2998   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2999   return next;
3000 }
3001 #endif
3002
3003 #if defined(GRAN) || defined(PAR)
3004 inline StgBlockingQueueElement *
3005 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3006 {
3007   ACQUIRE_LOCK(&sched_mutex);
3008   bqe = unblockOneLocked(bqe, node);
3009   RELEASE_LOCK(&sched_mutex);
3010   return bqe;
3011 }
3012 #else
3013 inline StgTSO *
3014 unblockOne(StgTSO *tso)
3015 {
3016   ACQUIRE_LOCK(&sched_mutex);
3017   tso = unblockOneLocked(tso);
3018   RELEASE_LOCK(&sched_mutex);
3019   return tso;
3020 }
3021 #endif
3022
3023 #if defined(GRAN)
3024 void 
3025 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3026 {
3027   StgBlockingQueueElement *bqe;
3028   PEs node_loc;
3029   nat len = 0; 
3030
3031   IF_GRAN_DEBUG(bq, 
3032                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3033                       node, CurrentProc, CurrentTime[CurrentProc], 
3034                       CurrentTSO->id, CurrentTSO));
3035
3036   node_loc = where_is(node);
3037
3038   ASSERT(q == END_BQ_QUEUE ||
3039          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3040          get_itbl(q)->type == CONSTR); // closure (type constructor)
3041   ASSERT(is_unique(node));
3042
3043   /* FAKE FETCH: magically copy the node to the tso's proc;
3044      no Fetch necessary because in reality the node should not have been 
3045      moved to the other PE in the first place
3046   */
3047   if (CurrentProc!=node_loc) {
3048     IF_GRAN_DEBUG(bq, 
3049                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3050                         node, node_loc, CurrentProc, CurrentTSO->id, 
3051                         // CurrentTSO, where_is(CurrentTSO),
3052                         node->header.gran.procs));
3053     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3054     IF_GRAN_DEBUG(bq, 
3055                   belch("## new bitmask of node %p is %#x",
3056                         node, node->header.gran.procs));
3057     if (RtsFlags.GranFlags.GranSimStats.Global) {
3058       globalGranStats.tot_fake_fetches++;
3059     }
3060   }
3061
3062   bqe = q;
3063   // ToDo: check: ASSERT(CurrentProc==node_loc);
3064   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3065     //next = bqe->link;
3066     /* 
3067        bqe points to the current element in the queue
3068        next points to the next element in the queue
3069     */
3070     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3071     //tso_loc = where_is(tso);
3072     len++;
3073     bqe = unblockOneLocked(bqe, node);
3074   }
3075
3076   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3077      the closure to make room for the anchor of the BQ */
3078   if (bqe!=END_BQ_QUEUE) {
3079     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3080     /*
3081     ASSERT((info_ptr==&RBH_Save_0_info) ||
3082            (info_ptr==&RBH_Save_1_info) ||
3083            (info_ptr==&RBH_Save_2_info));
3084     */
3085     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3086     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3087     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3088
3089     IF_GRAN_DEBUG(bq,
3090                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3091                         node, info_type(node)));
3092   }
3093
3094   /* statistics gathering */
3095   if (RtsFlags.GranFlags.GranSimStats.Global) {
3096     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3097     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3098     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3099     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3100   }
3101   IF_GRAN_DEBUG(bq,
3102                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3103                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3104 }
3105 #elif defined(PAR)
3106 void 
3107 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3108 {
3109   StgBlockingQueueElement *bqe;
3110
3111   ACQUIRE_LOCK(&sched_mutex);
3112
3113   IF_PAR_DEBUG(verbose, 
3114                belch("##-_ AwBQ for node %p on [%x]: ",
3115                      node, mytid));
3116 #ifdef DIST  
3117   //RFP
3118   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3119     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3120     return;
3121   }
3122 #endif
3123   
3124   ASSERT(q == END_BQ_QUEUE ||
3125          get_itbl(q)->type == TSO ||           
3126          get_itbl(q)->type == BLOCKED_FETCH || 
3127          get_itbl(q)->type == CONSTR); 
3128
3129   bqe = q;
3130   while (get_itbl(bqe)->type==TSO || 
3131          get_itbl(bqe)->type==BLOCKED_FETCH) {
3132     bqe = unblockOneLocked(bqe, node);
3133   }
3134   RELEASE_LOCK(&sched_mutex);
3135 }
3136
3137 #else   /* !GRAN && !PAR */
3138
3139 #ifdef RTS_SUPPORTS_THREADS
3140 void
3141 awakenBlockedQueueNoLock(StgTSO *tso)
3142 {
3143   while (tso != END_TSO_QUEUE) {
3144     tso = unblockOneLocked(tso);
3145   }
3146 }
3147 #endif
3148
3149 void
3150 awakenBlockedQueue(StgTSO *tso)
3151 {
3152   ACQUIRE_LOCK(&sched_mutex);
3153   while (tso != END_TSO_QUEUE) {
3154     tso = unblockOneLocked(tso);
3155   }
3156   RELEASE_LOCK(&sched_mutex);
3157 }
3158 #endif
3159
3160 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3161 //@subsection Exception Handling Routines
3162
3163 /* ---------------------------------------------------------------------------
3164    Interrupt execution
3165    - usually called inside a signal handler so it mustn't do anything fancy.   
3166    ------------------------------------------------------------------------ */
3167
3168 void
3169 interruptStgRts(void)
3170 {
3171     interrupted    = 1;
3172     context_switch = 1;
3173 }
3174
3175 /* -----------------------------------------------------------------------------
3176    Unblock a thread
3177
3178    This is for use when we raise an exception in another thread, which
3179    may be blocked.
3180    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3181    -------------------------------------------------------------------------- */
3182
3183 #if defined(GRAN) || defined(PAR)
3184 /*
3185   NB: only the type of the blocking queue is different in GranSim and GUM
3186       the operations on the queue-elements are the same
3187       long live polymorphism!
3188
3189   Locks: sched_mutex is held upon entry and exit.
3190
3191 */
3192 static void
3193 unblockThread(StgTSO *tso)
3194 {
3195   StgBlockingQueueElement *t, **last;
3196
3197   switch (tso->why_blocked) {
3198
3199   case NotBlocked:
3200     return;  /* not blocked */
3201
3202   case BlockedOnMVar:
3203     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3204     {
3205       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3206       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3207
3208       last = (StgBlockingQueueElement **)&mvar->head;
3209       for (t = (StgBlockingQueueElement *)mvar->head; 
3210            t != END_BQ_QUEUE; 
3211            last = &t->link, last_tso = t, t = t->link) {
3212         if (t == (StgBlockingQueueElement *)tso) {
3213           *last = (StgBlockingQueueElement *)tso->link;
3214           if (mvar->tail == tso) {
3215             mvar->tail = (StgTSO *)last_tso;
3216           }
3217           goto done;
3218         }
3219       }
3220       barf("unblockThread (MVAR): TSO not found");
3221     }
3222
3223   case BlockedOnBlackHole:
3224     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3225     {
3226       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3227
3228       last = &bq->blocking_queue;
3229       for (t = bq->blocking_queue; 
3230            t != END_BQ_QUEUE; 
3231            last = &t->link, t = t->link) {
3232         if (t == (StgBlockingQueueElement *)tso) {
3233           *last = (StgBlockingQueueElement *)tso->link;
3234           goto done;
3235         }
3236       }
3237       barf("unblockThread (BLACKHOLE): TSO not found");
3238     }
3239
3240   case BlockedOnException:
3241     {
3242       StgTSO *target  = tso->block_info.tso;
3243
3244       ASSERT(get_itbl(target)->type == TSO);
3245
3246       if (target->what_next == ThreadRelocated) {
3247           target = target->link;
3248           ASSERT(get_itbl(target)->type == TSO);
3249       }
3250
3251       ASSERT(target->blocked_exceptions != NULL);
3252
3253       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3254       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3255            t != END_BQ_QUEUE; 
3256            last = &t->link, t = t->link) {
3257         ASSERT(get_itbl(t)->type == TSO);
3258         if (t == (StgBlockingQueueElement *)tso) {
3259           *last = (StgBlockingQueueElement *)tso->link;
3260           goto done;
3261         }
3262       }
3263       barf("unblockThread (Exception): TSO not found");
3264     }
3265
3266   case BlockedOnRead:
3267   case BlockedOnWrite:
3268 #if defined(mingw32_TARGET_OS)
3269   case BlockedOnDoProc:
3270 #endif
3271     {
3272       /* take TSO off blocked_queue */
3273       StgBlockingQueueElement *prev = NULL;
3274       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3275            prev = t, t = t->link) {
3276         if (t == (StgBlockingQueueElement *)tso) {
3277           if (prev == NULL) {
3278             blocked_queue_hd = (StgTSO *)t->link;
3279             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3280               blocked_queue_tl = END_TSO_QUEUE;
3281             }
3282           } else {
3283             prev->link = t->link;
3284             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3285               blocked_queue_tl = (StgTSO *)prev;
3286             }
3287           }
3288           goto done;
3289         }
3290       }
3291       barf("unblockThread (I/O): TSO not found");
3292     }
3293
3294   case BlockedOnDelay:
3295     {
3296       /* take TSO off sleeping_queue */
3297       StgBlockingQueueElement *prev = NULL;
3298       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3299            prev = t, t = t->link) {
3300         if (t == (StgBlockingQueueElement *)tso) {
3301           if (prev == NULL) {
3302             sleeping_queue = (StgTSO *)t->link;
3303           } else {
3304             prev->link = t->link;
3305           }
3306           goto done;
3307         }
3308       }
3309       barf("unblockThread (delay): TSO not found");
3310     }
3311
3312   default:
3313     barf("unblockThread");
3314   }
3315
3316  done:
3317   tso->link = END_TSO_QUEUE;
3318   tso->why_blocked = NotBlocked;
3319   tso->block_info.closure = NULL;
3320   PUSH_ON_RUN_QUEUE(tso);
3321 }
3322 #else
3323 static void
3324 unblockThread(StgTSO *tso)
3325 {
3326   StgTSO *t, **last;
3327   
3328   /* To avoid locking unnecessarily. */
3329   if (tso->why_blocked == NotBlocked) {
3330     return;
3331   }
3332
3333   switch (tso->why_blocked) {
3334
3335   case BlockedOnMVar:
3336     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3337     {
3338       StgTSO *last_tso = END_TSO_QUEUE;
3339       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3340
3341       last = &mvar->head;
3342       for (t = mvar->head; t != END_TSO_QUEUE; 
3343            last = &t->link, last_tso = t, t = t->link) {
3344         if (t == tso) {
3345           *last = tso->link;
3346           if (mvar->tail == tso) {
3347             mvar->tail = last_tso;
3348           }
3349           goto done;
3350         }
3351       }
3352       barf("unblockThread (MVAR): TSO not found");
3353     }
3354
3355   case BlockedOnBlackHole:
3356     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3357     {
3358       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3359
3360       last = &bq->blocking_queue;
3361       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3362            last = &t->link, t = t->link) {
3363         if (t == tso) {
3364           *last = tso->link;
3365           goto done;
3366         }
3367       }
3368       barf("unblockThread (BLACKHOLE): TSO not found");
3369     }
3370
3371   case BlockedOnException:
3372     {
3373       StgTSO *target  = tso->block_info.tso;
3374
3375       ASSERT(get_itbl(target)->type == TSO);
3376
3377       while (target->what_next == ThreadRelocated) {
3378           target = target->link;
3379           ASSERT(get_itbl(target)->type == TSO);
3380       }
3381       
3382       ASSERT(target->blocked_exceptions != NULL);
3383
3384       last = &target->blocked_exceptions;
3385       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3386            last = &t->link, t = t->link) {
3387         ASSERT(get_itbl(t)->type == TSO);
3388         if (t == tso) {
3389           *last = tso->link;
3390           goto done;
3391         }
3392       }
3393       barf("unblockThread (Exception): TSO not found");
3394     }
3395
3396   case BlockedOnRead:
3397   case BlockedOnWrite:
3398 #if defined(mingw32_TARGET_OS)
3399   case BlockedOnDoProc:
3400 #endif
3401     {
3402       StgTSO *prev = NULL;
3403       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3404            prev = t, t = t->link) {
3405         if (t == tso) {
3406           if (prev == NULL) {
3407             blocked_queue_hd = t->link;
3408             if (blocked_queue_tl == t) {
3409               blocked_queue_tl = END_TSO_QUEUE;
3410             }
3411           } else {
3412             prev->link = t->link;
3413             if (blocked_queue_tl == t) {
3414               blocked_queue_tl = prev;
3415             }
3416           }
3417           goto done;
3418         }
3419       }
3420       barf("unblockThread (I/O): TSO not found");
3421     }
3422
3423   case BlockedOnDelay:
3424     {
3425       StgTSO *prev = NULL;
3426       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3427            prev = t, t = t->link) {
3428         if (t == tso) {
3429           if (prev == NULL) {
3430             sleeping_queue = t->link;
3431           } else {
3432             prev->link = t->link;
3433           }
3434           goto done;
3435         }
3436       }
3437       barf("unblockThread (delay): TSO not found");
3438     }
3439
3440   default:
3441     barf("unblockThread");
3442   }
3443
3444  done:
3445   tso->link = END_TSO_QUEUE;
3446   tso->why_blocked = NotBlocked;
3447   tso->block_info.closure = NULL;
3448   PUSH_ON_RUN_QUEUE(tso);
3449 }
3450 #endif
3451
3452 /* -----------------------------------------------------------------------------
3453  * raiseAsync()
3454  *
3455  * The following function implements the magic for raising an
3456  * asynchronous exception in an existing thread.
3457  *
3458  * We first remove the thread from any queue on which it might be
3459  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3460  *
3461  * We strip the stack down to the innermost CATCH_FRAME, building
3462  * thunks in the heap for all the active computations, so they can 
3463  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3464  * an application of the handler to the exception, and push it on
3465  * the top of the stack.
3466  * 
3467  * How exactly do we save all the active computations?  We create an
3468  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3469  * AP_STACKs pushes everything from the corresponding update frame
3470  * upwards onto the stack.  (Actually, it pushes everything up to the
3471  * next update frame plus a pointer to the next AP_STACK object.
3472  * Entering the next AP_STACK object pushes more onto the stack until we
3473  * reach the last AP_STACK object - at which point the stack should look
3474  * exactly as it did when we killed the TSO and we can continue
3475  * execution by entering the closure on top of the stack.
3476  *
3477  * We can also kill a thread entirely - this happens if either (a) the 
3478  * exception passed to raiseAsync is NULL, or (b) there's no
3479  * CATCH_FRAME on the stack.  In either case, we strip the entire
3480  * stack and replace the thread with a zombie.
3481  *
3482  * Locks: sched_mutex held upon entry nor exit.
3483  *
3484  * -------------------------------------------------------------------------- */
3485  
3486 void 
3487 deleteThread(StgTSO *tso)
3488 {
3489   raiseAsync(tso,NULL);
3490 }
3491
3492 #ifdef THREADED_RTS
3493 static void 
3494 deleteThreadImmediately(StgTSO *tso)
3495 { // for forkProcess only:
3496   // delete thread without giving it a chance to catch the KillThread exception
3497
3498   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3499       return;
3500   }
3501   unblockThread(tso);
3502   tso->what_next = ThreadKilled;
3503 }
3504 #endif
3505
3506 void
3507 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3508 {
3509   /* When raising async exs from contexts where sched_mutex isn't held;
3510      use raiseAsyncWithLock(). */
3511   ACQUIRE_LOCK(&sched_mutex);
3512   raiseAsync(tso,exception);
3513   RELEASE_LOCK(&sched_mutex);
3514 }
3515
3516 void
3517 raiseAsync(StgTSO *tso, StgClosure *exception)
3518 {
3519     StgRetInfoTable *info;
3520     StgPtr sp;
3521   
3522     // Thread already dead?
3523     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3524         return;
3525     }
3526
3527     IF_DEBUG(scheduler, 
3528              sched_belch("raising exception in thread %ld.", tso->id));
3529     
3530     // Remove it from any blocking queues
3531     unblockThread(tso);
3532
3533     sp = tso->sp;
3534     
3535     // The stack freezing code assumes there's a closure pointer on
3536     // the top of the stack, so we have to arrange that this is the case...
3537     //
3538     if (sp[0] == (W_)&stg_enter_info) {
3539         sp++;
3540     } else {
3541         sp--;
3542         sp[0] = (W_)&stg_dummy_ret_closure;
3543     }
3544
3545     while (1) {
3546         nat i;
3547
3548         // 1. Let the top of the stack be the "current closure"
3549         //
3550         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3551         // CATCH_FRAME.
3552         //
3553         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3554         // current closure applied to the chunk of stack up to (but not
3555         // including) the update frame.  This closure becomes the "current
3556         // closure".  Go back to step 2.
3557         //
3558         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3559         // top of the stack applied to the exception.
3560         // 
3561         // 5. If it's a STOP_FRAME, then kill the thread.
3562         
3563         StgPtr frame;
3564         
3565         frame = sp + 1;
3566         info = get_ret_itbl((StgClosure *)frame);
3567         
3568         while (info->i.type != UPDATE_FRAME
3569                && (info->i.type != CATCH_FRAME || exception == NULL)
3570                && info->i.type != STOP_FRAME) {
3571             frame += stack_frame_sizeW((StgClosure *)frame);
3572             info = get_ret_itbl((StgClosure *)frame);
3573         }
3574         
3575         switch (info->i.type) {
3576             
3577         case CATCH_FRAME:
3578             // If we find a CATCH_FRAME, and we've got an exception to raise,
3579             // then build the THUNK raise(exception), and leave it on
3580             // top of the CATCH_FRAME ready to enter.
3581             //
3582         {
3583 #ifdef PROFILING
3584             StgCatchFrame *cf = (StgCatchFrame *)frame;
3585 #endif
3586             StgClosure *raise;
3587             
3588             // we've got an exception to raise, so let's pass it to the
3589             // handler in this frame.
3590             //
3591             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3592             TICK_ALLOC_SE_THK(1,0);
3593             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3594             raise->payload[0] = exception;
3595             
3596             // throw away the stack from Sp up to the CATCH_FRAME.
3597             //
3598             sp = frame - 1;
3599             
3600             /* Ensure that async excpetions are blocked now, so we don't get
3601              * a surprise exception before we get around to executing the
3602              * handler.
3603              */
3604             if (tso->blocked_exceptions == NULL) {
3605                 tso->blocked_exceptions = END_TSO_QUEUE;
3606             }
3607             
3608             /* Put the newly-built THUNK on top of the stack, ready to execute
3609              * when the thread restarts.
3610              */
3611             sp[0] = (W_)raise;
3612             sp[-1] = (W_)&stg_enter_info;
3613             tso->sp = sp-1;
3614             tso->what_next = ThreadRunGHC;
3615             IF_DEBUG(sanity, checkTSO(tso));
3616             return;
3617         }
3618         
3619         case UPDATE_FRAME:
3620         {
3621             StgAP_STACK * ap;
3622             nat words;
3623             
3624             // First build an AP_STACK consisting of the stack chunk above the
3625             // current update frame, with the top word on the stack as the
3626             // fun field.
3627             //
3628             words = frame - sp - 1;
3629             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3630             
3631             ap->size = words;
3632             ap->fun  = (StgClosure *)sp[0];
3633             sp++;
3634             for(i=0; i < (nat)words; ++i) {
3635                 ap->payload[i] = (StgClosure *)*sp++;
3636             }
3637             
3638             SET_HDR(ap,&stg_AP_STACK_info,
3639                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3640             TICK_ALLOC_UP_THK(words+1,0);
3641             
3642             IF_DEBUG(scheduler,
3643                      fprintf(stderr,  "scheduler: Updating ");
3644                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3645                      fprintf(stderr,  " with ");
3646                      printObj((StgClosure *)ap);
3647                 );
3648
3649             // Replace the updatee with an indirection - happily
3650             // this will also wake up any threads currently
3651             // waiting on the result.
3652             //
3653             // Warning: if we're in a loop, more than one update frame on
3654             // the stack may point to the same object.  Be careful not to
3655             // overwrite an IND_OLDGEN in this case, because we'll screw
3656             // up the mutable lists.  To be on the safe side, don't
3657             // overwrite any kind of indirection at all.  See also
3658             // threadSqueezeStack in GC.c, where we have to make a similar
3659             // check.
3660             //
3661             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3662                 // revert the black hole
3663                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3664             }
3665             sp += sizeofW(StgUpdateFrame) - 1;
3666             sp[0] = (W_)ap; // push onto stack
3667             break;
3668         }
3669         
3670         case STOP_FRAME:
3671             // We've stripped the entire stack, the thread is now dead.
3672             sp += sizeofW(StgStopFrame);
3673             tso->what_next = ThreadKilled;
3674             tso->sp = sp;
3675             return;
3676             
3677         default:
3678             barf("raiseAsync");
3679         }
3680     }
3681     barf("raiseAsync");
3682 }
3683
3684 /* -----------------------------------------------------------------------------
3685    resurrectThreads is called after garbage collection on the list of
3686    threads found to be garbage.  Each of these threads will be woken
3687    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3688    on an MVar, or NonTermination if the thread was blocked on a Black
3689    Hole.
3690
3691    Locks: sched_mutex isn't held upon entry nor exit.
3692    -------------------------------------------------------------------------- */
3693
3694 void
3695 resurrectThreads( StgTSO *threads )
3696 {
3697   StgTSO *tso, *next;
3698
3699   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3700     next = tso->global_link;
3701     tso->global_link = all_threads;
3702     all_threads = tso;
3703     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3704
3705     switch (tso->why_blocked) {
3706     case BlockedOnMVar:
3707     case BlockedOnException:
3708       /* Called by GC - sched_mutex lock is currently held. */
3709       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3710       break;
3711     case BlockedOnBlackHole:
3712       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3713       break;
3714     case NotBlocked:
3715       /* This might happen if the thread was blocked on a black hole
3716        * belonging to a thread that we've just woken up (raiseAsync
3717        * can wake up threads, remember...).
3718        */
3719       continue;
3720     default:
3721       barf("resurrectThreads: thread blocked in a strange way");
3722     }
3723   }
3724 }
3725
3726 /* -----------------------------------------------------------------------------
3727  * Blackhole detection: if we reach a deadlock, test whether any
3728  * threads are blocked on themselves.  Any threads which are found to
3729  * be self-blocked get sent a NonTermination exception.
3730  *
3731  * This is only done in a deadlock situation in order to avoid
3732  * performance overhead in the normal case.
3733  *
3734  * Locks: sched_mutex is held upon entry and exit.
3735  * -------------------------------------------------------------------------- */
3736
3737 static void
3738 detectBlackHoles( void )
3739 {
3740     StgTSO *tso = all_threads;
3741     StgClosure *frame;
3742     StgClosure *blocked_on;
3743     StgRetInfoTable *info;
3744
3745     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3746
3747         while (tso->what_next == ThreadRelocated) {
3748             tso = tso->link;
3749             ASSERT(get_itbl(tso)->type == TSO);
3750         }
3751       
3752         if (tso->why_blocked != BlockedOnBlackHole) {
3753             continue;
3754         }
3755         blocked_on = tso->block_info.closure;
3756
3757         frame = (StgClosure *)tso->sp;
3758
3759         while(1) {
3760             info = get_ret_itbl(frame);
3761             switch (info->i.type) {
3762             case UPDATE_FRAME:
3763                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3764                     /* We are blocking on one of our own computations, so
3765                      * send this thread the NonTermination exception.  
3766                      */
3767                     IF_DEBUG(scheduler, 
3768                              sched_belch("thread %d is blocked on itself", tso->id));
3769                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3770                     goto done;
3771                 }
3772                 
3773                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3774                 continue;
3775
3776             case STOP_FRAME:
3777                 goto done;
3778
3779                 // normal stack frames; do nothing except advance the pointer
3780             default:
3781                 (StgPtr)frame += stack_frame_sizeW(frame);
3782             }
3783         }   
3784         done: ;
3785     }
3786 }
3787
3788 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3789 //@subsection Debugging Routines
3790
3791 /* -----------------------------------------------------------------------------
3792  * Debugging: why is a thread blocked
3793  * [Also provides useful information when debugging threaded programs
3794  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3795    -------------------------------------------------------------------------- */
3796
3797 static
3798 void
3799 printThreadBlockage(StgTSO *tso)
3800 {
3801   switch (tso->why_blocked) {
3802   case BlockedOnRead:
3803     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3804     break;
3805   case BlockedOnWrite:
3806     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3807     break;
3808 #if defined(mingw32_TARGET_OS)
3809     case BlockedOnDoProc:
3810     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3811     break;
3812 #endif
3813   case BlockedOnDelay:
3814     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3815     break;
3816   case BlockedOnMVar:
3817     fprintf(stderr,"is blocked on an MVar");
3818     break;
3819   case BlockedOnException:
3820     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3821             tso->block_info.tso->id);
3822     break;
3823   case BlockedOnBlackHole:
3824     fprintf(stderr,"is blocked on a black hole");
3825     break;
3826   case NotBlocked:
3827     fprintf(stderr,"is not blocked");
3828     break;
3829 #if defined(PAR)
3830   case BlockedOnGA:
3831     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3832             tso->block_info.closure, info_type(tso->block_info.closure));
3833     break;
3834   case BlockedOnGA_NoSend:
3835     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3836             tso->block_info.closure, info_type(tso->block_info.closure));
3837     break;
3838 #endif
3839 #if defined(RTS_SUPPORTS_THREADS)
3840   case BlockedOnCCall:
3841     fprintf(stderr,"is blocked on an external call");
3842     break;
3843   case BlockedOnCCall_NoUnblockExc:
3844     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3845     break;
3846 #endif
3847   default:
3848     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3849          tso->why_blocked, tso->id, tso);
3850   }
3851 }
3852
3853 static
3854 void
3855 printThreadStatus(StgTSO *tso)
3856 {
3857   switch (tso->what_next) {
3858   case ThreadKilled:
3859     fprintf(stderr,"has been killed");
3860     break;
3861   case ThreadComplete:
3862     fprintf(stderr,"has completed");
3863     break;
3864   default:
3865     printThreadBlockage(tso);
3866   }
3867 }
3868
3869 void
3870 printAllThreads(void)
3871 {
3872   StgTSO *t;
3873   void *label;
3874
3875 # if defined(GRAN)
3876   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3877   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3878                        time_string, rtsFalse/*no commas!*/);
3879
3880   fprintf(stderr, "all threads at [%s]:\n", time_string);
3881 # elif defined(PAR)
3882   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3883   ullong_format_string(CURRENT_TIME,
3884                        time_string, rtsFalse/*no commas!*/);
3885
3886   fprintf(stderr,"all threads at [%s]:\n", time_string);
3887 # else
3888   fprintf(stderr,"all threads:\n");
3889 # endif
3890
3891   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3892     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3893     label = lookupThreadLabel((StgWord)t);
3894     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3895     printThreadStatus(t);
3896     fprintf(stderr,"\n");
3897   }
3898 }
3899     
3900 #ifdef DEBUG
3901
3902 /* 
3903    Print a whole blocking queue attached to node (debugging only).
3904 */
3905 //@cindex print_bq
3906 # if defined(PAR)
3907 void 
3908 print_bq (StgClosure *node)
3909 {
3910   StgBlockingQueueElement *bqe;
3911   StgTSO *tso;
3912   rtsBool end;
3913
3914   fprintf(stderr,"## BQ of closure %p (%s): ",
3915           node, info_type(node));
3916
3917   /* should cover all closures that may have a blocking queue */
3918   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3919          get_itbl(node)->type == FETCH_ME_BQ ||
3920          get_itbl(node)->type == RBH ||
3921          get_itbl(node)->type == MVAR);
3922     
3923   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3924
3925   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3926 }
3927
3928 /* 
3929    Print a whole blocking queue starting with the element bqe.
3930 */
3931 void 
3932 print_bqe (StgBlockingQueueElement *bqe)
3933 {
3934   rtsBool end;
3935
3936   /* 
3937      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3938   */
3939   for (end = (bqe==END_BQ_QUEUE);
3940        !end; // iterate until bqe points to a CONSTR
3941        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3942        bqe = end ? END_BQ_QUEUE : bqe->link) {
3943     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3944     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3945     /* types of closures that may appear in a blocking queue */
3946     ASSERT(get_itbl(bqe)->type == TSO ||           
3947            get_itbl(bqe)->type == BLOCKED_FETCH || 
3948            get_itbl(bqe)->type == CONSTR); 
3949     /* only BQs of an RBH end with an RBH_Save closure */
3950     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3951
3952     switch (get_itbl(bqe)->type) {
3953     case TSO:
3954       fprintf(stderr," TSO %u (%x),",
3955               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3956       break;
3957     case BLOCKED_FETCH:
3958       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3959               ((StgBlockedFetch *)bqe)->node, 
3960               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3961               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3962               ((StgBlockedFetch *)bqe)->ga.weight);
3963       break;
3964     case CONSTR:
3965       fprintf(stderr," %s (IP %p),",
3966               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3967                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3968                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3969                "RBH_Save_?"), get_itbl(bqe));
3970       break;
3971     default:
3972       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3973            info_type((StgClosure *)bqe)); // , node, info_type(node));
3974       break;
3975     }
3976   } /* for */
3977   fputc('\n', stderr);
3978 }
3979 # elif defined(GRAN)
3980 void 
3981 print_bq (StgClosure *node)
3982 {
3983   StgBlockingQueueElement *bqe;
3984   PEs node_loc, tso_loc;
3985   rtsBool end;
3986
3987   /* should cover all closures that may have a blocking queue */
3988   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3989          get_itbl(node)->type == FETCH_ME_BQ ||
3990          get_itbl(node)->type == RBH);
3991     
3992   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3993   node_loc = where_is(node);
3994
3995   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3996           node, info_type(node), node_loc);
3997
3998   /* 
3999      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4000   */
4001   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4002        !end; // iterate until bqe points to a CONSTR
4003        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4004     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4005     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4006     /* types of closures that may appear in a blocking queue */
4007     ASSERT(get_itbl(bqe)->type == TSO ||           
4008            get_itbl(bqe)->type == CONSTR); 
4009     /* only BQs of an RBH end with an RBH_Save closure */
4010     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4011
4012     tso_loc = where_is((StgClosure *)bqe);
4013     switch (get_itbl(bqe)->type) {
4014     case TSO:
4015       fprintf(stderr," TSO %d (%p) on [PE %d],",
4016               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4017       break;
4018     case CONSTR:
4019       fprintf(stderr," %s (IP %p),",
4020               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4021                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4022                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4023                "RBH_Save_?"), get_itbl(bqe));
4024       break;
4025     default:
4026       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4027            info_type((StgClosure *)bqe), node, info_type(node));
4028       break;
4029     }
4030   } /* for */
4031   fputc('\n', stderr);
4032 }
4033 #else
4034 /* 
4035    Nice and easy: only TSOs on the blocking queue
4036 */
4037 void 
4038 print_bq (StgClosure *node)
4039 {
4040   StgTSO *tso;
4041
4042   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4043   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4044        tso != END_TSO_QUEUE; 
4045        tso=tso->link) {
4046     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4047     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4048     fprintf(stderr," TSO %d (%p),", tso->id, tso);
4049   }
4050   fputc('\n', stderr);
4051 }
4052 # endif
4053
4054 #if defined(PAR)
4055 static nat
4056 run_queue_len(void)
4057 {
4058   nat i;
4059   StgTSO *tso;
4060
4061   for (i=0, tso=run_queue_hd; 
4062        tso != END_TSO_QUEUE;
4063        i++, tso=tso->link)
4064     /* nothing */
4065
4066   return i;
4067 }
4068 #endif
4069
4070 static void
4071 sched_belch(char *s, ...)
4072 {
4073   va_list ap;
4074   va_start(ap,s);
4075 #ifdef SMP
4076   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4077 #elif defined(PAR)
4078   fprintf(stderr, "== ");
4079 #else
4080   fprintf(stderr, "scheduler: ");
4081 #endif
4082   vfprintf(stderr, s, ap);
4083   fprintf(stderr, "\n");
4084   va_end(ap);
4085 }
4086
4087 #endif /* DEBUG */
4088
4089
4090 //@node Index,  , Debugging Routines, Main scheduling code
4091 //@subsection Index
4092
4093 //@index
4094 //* StgMainThread::  @cindex\s-+StgMainThread
4095 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
4096 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
4097 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
4098 //* context_switch::  @cindex\s-+context_switch
4099 //* createThread::  @cindex\s-+createThread
4100 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
4101 //* initScheduler::  @cindex\s-+initScheduler
4102 //* interrupted::  @cindex\s-+interrupted
4103 //* next_thread_id::  @cindex\s-+next_thread_id
4104 //* print_bq::  @cindex\s-+print_bq
4105 //* run_queue_hd::  @cindex\s-+run_queue_hd
4106 //* run_queue_tl::  @cindex\s-+run_queue_tl
4107 //* sched_mutex::  @cindex\s-+sched_mutex
4108 //* schedule::  @cindex\s-+schedule
4109 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
4110 //* term_mutex::  @cindex\s-+term_mutex
4111 //@end index