[project @ 2003-09-21 22:20:51 by wolfgang]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.174 2003/09/21 22:20:56 wolfgang Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #define COMPILING_SCHEDULER
88 #include "Schedule.h"
89 #include "StgMiscClosures.h"
90 #include "Storage.h"
91 #include "Interpreter.h"
92 #include "Exception.h"
93 #include "Printer.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Timer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 #ifdef HAVE_ERRNO_H
130 #include <errno.h>
131 #endif
132
133 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
134 //@subsection Variables and Data structures
135
136 /* Main thread queue.
137  * Locks required: sched_mutex.
138  */
139 StgMainThread *main_threads = NULL;
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 #if defined(GRAN)
145
146 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
147 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
148
149 /* 
150    In GranSim we have a runnable and a blocked queue for each processor.
151    In order to minimise code changes new arrays run_queue_hds/tls
152    are created. run_queue_hd is then a short cut (macro) for
153    run_queue_hds[CurrentProc] (see GranSim.h).
154    -- HWL
155 */
156 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
157 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
158 StgTSO *ccalling_threadss[MAX_PROC];
159 /* We use the same global list of threads (all_threads) in GranSim as in
160    the std RTS (i.e. we are cheating). However, we don't use this list in
161    the GranSim specific code at the moment (so we are only potentially
162    cheating).  */
163
164 #else /* !GRAN */
165
166 StgTSO *run_queue_hd = NULL;
167 StgTSO *run_queue_tl = NULL;
168 StgTSO *blocked_queue_hd = NULL;
169 StgTSO *blocked_queue_tl = NULL;
170 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
171
172 #endif
173
174 /* Linked list of all threads.
175  * Used for detecting garbage collected threads.
176  */
177 StgTSO *all_threads = NULL;
178
179 /* When a thread performs a safe C call (_ccall_GC, using old
180  * terminology), it gets put on the suspended_ccalling_threads
181  * list. Used by the garbage collector.
182  */
183 static StgTSO *suspended_ccalling_threads;
184
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch = 0;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted = rtsFalse;
199
200 /* Next thread ID to allocate.
201  * Locks required: thread_id_mutex
202  */
203 //@cindex next_thread_id
204 static StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the closure to enter)
216  *  + 1                       (stg_ap_v_ret)
217  *  + 1                       (spare slot req'd by stg_ap_v_ret)
218  *
219  * A thread with this stack will bomb immediately with a stack
220  * overflow, which will increase its stack size.  
221  */
222
223 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
224
225
226 #if defined(GRAN)
227 StgTSO *CurrentTSO;
228 #endif
229
230 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
231  *  exists - earlier gccs apparently didn't.
232  *  -= chak
233  */
234 StgTSO dummy_tso;
235
236 static rtsBool ready_to_gc;
237
238 /*
239  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
240  * in an MT setting, needed to signal that a worker thread shouldn't hang around
241  * in the scheduler when it is out of work.
242  */
243 static rtsBool shutting_down_scheduler = rtsFalse;
244
245 void            addToBlockedQueue ( StgTSO *tso );
246
247 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
248        void     interruptStgRts   ( void );
249
250 static void     detectBlackHoles  ( void );
251
252 #ifdef DEBUG
253 static void sched_belch(char *s, ...);
254 #endif
255
256 #if defined(RTS_SUPPORTS_THREADS)
257 /* ToDo: carefully document the invariants that go together
258  *       with these synchronisation objects.
259  */
260 Mutex     sched_mutex       = INIT_MUTEX_VAR;
261 Mutex     term_mutex        = INIT_MUTEX_VAR;
262
263 /*
264  * A heavyweight solution to the problem of protecting
265  * the thread_id from concurrent update.
266  */
267 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
268
269
270 # if defined(SMP)
271 static Condition gc_pending_cond = INIT_COND_VAR;
272 nat await_death;
273 # endif
274
275 #endif /* RTS_SUPPORTS_THREADS */
276
277 #if defined(PAR)
278 StgTSO *LastTSO;
279 rtsTime TimeOfLastYield;
280 rtsBool emitSchedule = rtsTrue;
281 #endif
282
283 #if DEBUG
284 static char *whatNext_strs[] = {
285   "ThreadRunGHC",
286   "ThreadInterpret",
287   "ThreadKilled",
288   "ThreadRelocated",
289   "ThreadComplete"
290 };
291 #endif
292
293 #if defined(PAR)
294 StgTSO * createSparkThread(rtsSpark spark);
295 StgTSO * activateSpark (rtsSpark spark);  
296 #endif
297
298 /*
299  * The thread state for the main thread.
300 // ToDo: check whether not needed any more
301 StgTSO   *MainTSO;
302  */
303
304 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
305 static void taskStart(void);
306 static void
307 taskStart(void)
308 {
309   schedule(NULL,NULL);
310 }
311 #endif
312
313 #if defined(RTS_SUPPORTS_THREADS)
314 void
315 startSchedulerTask(void)
316 {
317     startTask(taskStart);
318 }
319 #endif
320
321 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
322 //@subsection Main scheduling loop
323
324 /* ---------------------------------------------------------------------------
325    Main scheduling loop.
326
327    We use round-robin scheduling, each thread returning to the
328    scheduler loop when one of these conditions is detected:
329
330       * out of heap space
331       * timer expires (thread yields)
332       * thread blocks
333       * thread ends
334       * stack overflow
335
336    Locking notes:  we acquire the scheduler lock once at the beginning
337    of the scheduler loop, and release it when
338     
339       * running a thread, or
340       * waiting for work, or
341       * waiting for a GC to complete.
342
343    GRAN version:
344      In a GranSim setup this loop iterates over the global event queue.
345      This revolves around the global event queue, which determines what 
346      to do next. Therefore, it's more complicated than either the 
347      concurrent or the parallel (GUM) setup.
348
349    GUM version:
350      GUM iterates over incoming messages.
351      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
352      and sends out a fish whenever it has nothing to do; in-between
353      doing the actual reductions (shared code below) it processes the
354      incoming messages and deals with delayed operations 
355      (see PendingFetches).
356      This is not the ugliest code you could imagine, but it's bloody close.
357
358    ------------------------------------------------------------------------ */
359 //@cindex schedule
360 static void
361 schedule( StgMainThread *mainThread, Capability *initialCapability )
362 {
363   StgTSO *t;
364   Capability *cap = initialCapability;
365   StgThreadReturnCode ret;
366 #if defined(GRAN)
367   rtsEvent *event;
368 #elif defined(PAR)
369   StgSparkPool *pool;
370   rtsSpark spark;
371   StgTSO *tso;
372   GlobalTaskId pe;
373   rtsBool receivedFinish = rtsFalse;
374 # if defined(DEBUG)
375   nat tp_size, sp_size; // stats only
376 # endif
377 #endif
378   rtsBool was_interrupted = rtsFalse;
379   StgTSOWhatNext prev_what_next;
380   
381   ACQUIRE_LOCK(&sched_mutex);
382  
383 #if defined(RTS_SUPPORTS_THREADS)
384   /* in the threaded case, the capability is either passed in via the initialCapability
385      parameter, or initialized inside the scheduler loop */
386
387   IF_DEBUG(scheduler,
388     fprintf(stderr,"### NEW SCHEDULER LOOP in os thread %u(%p)\n",
389             osThreadId(), osThreadId()));
390   IF_DEBUG(scheduler,
391     fprintf(stderr,"### main thread: %p\n",mainThread));
392   IF_DEBUG(scheduler,
393     fprintf(stderr,"### initial cap: %p\n",initialCapability));
394 #else
395   /* simply initialise it in the non-threaded case */
396   grabCapability(&cap);
397 #endif
398
399 #if defined(GRAN)
400   /* set up first event to get things going */
401   /* ToDo: assign costs for system setup and init MainTSO ! */
402   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
403             ContinueThread, 
404             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
405
406   IF_DEBUG(gran,
407            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
408            G_TSO(CurrentTSO, 5));
409
410   if (RtsFlags.GranFlags.Light) {
411     /* Save current time; GranSim Light only */
412     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
413   }      
414
415   event = get_next_event();
416
417   while (event!=(rtsEvent*)NULL) {
418     /* Choose the processor with the next event */
419     CurrentProc = event->proc;
420     CurrentTSO = event->tso;
421
422 #elif defined(PAR)
423
424   while (!receivedFinish) {    /* set by processMessages */
425                                /* when receiving PP_FINISH message         */ 
426 #else
427
428   while (1) {
429
430 #endif
431
432     IF_DEBUG(scheduler, printAllThreads());
433
434 #if defined(RTS_SUPPORTS_THREADS)
435     /* Check to see whether there are any worker threads
436        waiting to deposit external call results. If so,
437        yield our capability... if we have a capability, that is. */
438     if(cap)
439       yieldToReturningWorker(&sched_mutex, &cap,
440           mainThread ? &mainThread->bound_thread_cond : NULL);
441
442     /* If we do not currently hold a capability, we wait for one */
443     if(!cap)
444     {
445       waitForWorkCapability(&sched_mutex, &cap,
446           mainThread ? &mainThread->bound_thread_cond : NULL);
447       IF_DEBUG(scheduler, sched_belch("worker thread (osthread %p): got cap",
448                                       osThreadId()));
449     }
450 #endif
451
452     /* If we're interrupted (the user pressed ^C, or some other
453      * termination condition occurred), kill all the currently running
454      * threads.
455      */
456     if (interrupted) {
457       IF_DEBUG(scheduler, sched_belch("interrupted"));
458       interrupted = rtsFalse;
459       was_interrupted = rtsTrue;
460 #if defined(RTS_SUPPORTS_THREADS)
461       // In the threaded RTS, deadlock detection doesn't work,
462       // so just exit right away.
463       prog_belch("interrupted");
464       releaseCapability(cap);
465       startTask(taskStart);     // thread-safe-call to shutdownHaskellAndExit
466       RELEASE_LOCK(&sched_mutex);
467       shutdownHaskellAndExit(EXIT_SUCCESS);
468 #else
469       deleteAllThreads();
470 #endif
471     }
472
473     /* Go through the list of main threads and wake up any
474      * clients whose computations have finished.  ToDo: this
475      * should be done more efficiently without a linear scan
476      * of the main threads list, somehow...
477      */
478 #if defined(RTS_SUPPORTS_THREADS)
479     { 
480         StgMainThread *m, **prev;
481         prev = &main_threads;
482         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
483           if (m->tso->what_next == ThreadComplete
484               || m->tso->what_next == ThreadKilled)
485           {
486             if(m == mainThread)
487             {
488               if(m->tso->what_next == ThreadComplete)
489               {
490                 if (m->ret)
491                 {
492                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
493                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
494                 }
495                 m->stat = Success;
496               }
497               else
498               {
499                 if (m->ret)
500                 {
501                   *(m->ret) = NULL;
502                 }
503                 if (was_interrupted)
504                 {
505                   m->stat = Interrupted;
506                 }
507                 else
508                 {
509                   m->stat = Killed;
510                 }
511               }
512               *prev = m->link;
513             
514 #ifdef DEBUG
515               removeThreadLabel((StgWord)m->tso);
516 #endif
517               releaseCapability(cap);
518               RELEASE_LOCK(&sched_mutex);
519               return;
520             }
521             else
522             {
523                 // The current OS thread can not handle the fact that the Haskell
524                 // thread "m" has ended. 
525                 // "m" is bound; the scheduler loop in it's bound OS thread has
526                 // to return, so let's pass our capability directly to that thread.
527               passCapability(&sched_mutex, cap, &m->bound_thread_cond);
528               cap = NULL;
529             }
530           }
531         }
532     }
533     
534     if(!cap)    // If we gave our capability away,
535       continue; // go to the top to get it back
536       
537 #else /* not threaded */
538
539 # if defined(PAR)
540     /* in GUM do this only on the Main PE */
541     if (IAmMainThread)
542 # endif
543     /* If our main thread has finished or been killed, return.
544      */
545     {
546       StgMainThread *m = main_threads;
547       if (m->tso->what_next == ThreadComplete
548           || m->tso->what_next == ThreadKilled) {
549 #ifdef DEBUG
550         removeThreadLabel((StgWord)m->tso);
551 #endif
552         main_threads = main_threads->link;
553         if (m->tso->what_next == ThreadComplete) {
554             // We finished successfully, fill in the return value
555             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
556             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
557             m->stat = Success;
558             return;
559         } else {
560           if (m->ret) { *(m->ret) = NULL; };
561           if (was_interrupted) {
562             m->stat = Interrupted;
563           } else {
564             m->stat = Killed;
565           }
566           return;
567         }
568       }
569     }
570 #endif
571
572     /* Top up the run queue from our spark pool.  We try to make the
573      * number of threads in the run queue equal to the number of
574      * free capabilities.
575      *
576      * Disable spark support in SMP for now, non-essential & requires
577      * a little bit of work to make it compile cleanly. -- sof 1/02.
578      */
579 #if 0 /* defined(SMP) */
580     {
581       nat n = getFreeCapabilities();
582       StgTSO *tso = run_queue_hd;
583
584       /* Count the run queue */
585       while (n > 0 && tso != END_TSO_QUEUE) {
586         tso = tso->link;
587         n--;
588       }
589
590       for (; n > 0; n--) {
591         StgClosure *spark;
592         spark = findSpark(rtsFalse);
593         if (spark == NULL) {
594           break; /* no more sparks in the pool */
595         } else {
596           /* I'd prefer this to be done in activateSpark -- HWL */
597           /* tricky - it needs to hold the scheduler lock and
598            * not try to re-acquire it -- SDM */
599           createSparkThread(spark);       
600           IF_DEBUG(scheduler,
601                    sched_belch("==^^ turning spark of closure %p into a thread",
602                                (StgClosure *)spark));
603         }
604       }
605       /* We need to wake up the other tasks if we just created some
606        * work for them.
607        */
608       if (getFreeCapabilities() - n > 1) {
609           signalCondition( &thread_ready_cond );
610       }
611     }
612 #endif // SMP
613
614     /* check for signals each time around the scheduler */
615 #if defined(RTS_USER_SIGNALS)
616     if (signals_pending()) {
617       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
618       startSignalHandlers();
619       ACQUIRE_LOCK(&sched_mutex);
620     }
621 #endif
622
623     /* Check whether any waiting threads need to be woken up.  If the
624      * run queue is empty, and there are no other tasks running, we
625      * can wait indefinitely for something to happen.
626      */
627     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
628 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
629                 || EMPTY_RUN_QUEUE()
630 #endif
631         )
632     {
633       awaitEvent( EMPTY_RUN_QUEUE()
634 #if defined(SMP)
635         && allFreeCapabilities()
636 #endif
637         );
638     }
639     /* we can be interrupted while waiting for I/O... */
640     if (interrupted) continue;
641
642     /* 
643      * Detect deadlock: when we have no threads to run, there are no
644      * threads waiting on I/O or sleeping, and all the other tasks are
645      * waiting for work, we must have a deadlock of some description.
646      *
647      * We first try to find threads blocked on themselves (ie. black
648      * holes), and generate NonTermination exceptions where necessary.
649      *
650      * If no threads are black holed, we have a deadlock situation, so
651      * inform all the main threads.
652      */
653 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
654     if (   EMPTY_THREAD_QUEUES()
655 #if defined(RTS_SUPPORTS_THREADS)
656         && EMPTY_QUEUE(suspended_ccalling_threads)
657 #endif
658 #ifdef SMP
659         && allFreeCapabilities()
660 #endif
661         )
662     {
663         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
664 #if defined(THREADED_RTS)
665         /* and SMP mode ..? */
666         releaseCapability(cap);
667 #endif
668         // Garbage collection can release some new threads due to
669         // either (a) finalizers or (b) threads resurrected because
670         // they are about to be send BlockedOnDeadMVar.  Any threads
671         // thus released will be immediately runnable.
672         GarbageCollect(GetRoots,rtsTrue);
673
674         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
675
676         IF_DEBUG(scheduler, 
677                  sched_belch("still deadlocked, checking for black holes..."));
678         detectBlackHoles();
679
680         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
681
682 #if defined(RTS_USER_SIGNALS)
683         /* If we have user-installed signal handlers, then wait
684          * for signals to arrive rather then bombing out with a
685          * deadlock.
686          */
687 #if defined(RTS_SUPPORTS_THREADS)
688         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
689                       a signal with no runnable threads (or I/O
690                       suspended ones) leads nowhere quick.
691                       For now, simply shut down when we reach this
692                       condition.
693                       
694                       ToDo: define precisely under what conditions
695                       the Scheduler should shut down in an MT setting.
696                    */
697 #else
698         if ( anyUserHandlers() ) {
699 #endif
700             IF_DEBUG(scheduler, 
701                      sched_belch("still deadlocked, waiting for signals..."));
702
703             awaitUserSignals();
704
705             // we might be interrupted...
706             if (interrupted) { continue; }
707
708             if (signals_pending()) {
709                 RELEASE_LOCK(&sched_mutex);
710                 startSignalHandlers();
711                 ACQUIRE_LOCK(&sched_mutex);
712             }
713             ASSERT(!EMPTY_RUN_QUEUE());
714             goto not_deadlocked;
715         }
716 #endif
717
718         /* Probably a real deadlock.  Send the current main thread the
719          * Deadlock exception (or in the SMP build, send *all* main
720          * threads the deadlock exception, since none of them can make
721          * progress).
722          */
723         {
724             StgMainThread *m;
725 #if defined(RTS_SUPPORTS_THREADS)
726             for (m = main_threads; m != NULL; m = m->link) {
727                 switch (m->tso->why_blocked) {
728                 case BlockedOnBlackHole:
729                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
730                     break;
731                 case BlockedOnException:
732                 case BlockedOnMVar:
733                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
734                     break;
735                 default:
736                     barf("deadlock: main thread blocked in a strange way");
737                 }
738             }
739 #else
740             m = main_threads;
741             switch (m->tso->why_blocked) {
742             case BlockedOnBlackHole:
743                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
744                 break;
745             case BlockedOnException:
746             case BlockedOnMVar:
747                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
748                 break;
749             default:
750                 barf("deadlock: main thread blocked in a strange way");
751             }
752 #endif
753         }
754
755 #if defined(RTS_SUPPORTS_THREADS)
756         /* ToDo: revisit conditions (and mechanism) for shutting
757            down a multi-threaded world  */
758         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
759         RELEASE_LOCK(&sched_mutex);
760         shutdownHaskell();
761         return;
762 #endif
763     }
764   not_deadlocked:
765
766 #elif defined(RTS_SUPPORTS_THREADS)
767     /* ToDo: add deadlock detection in threaded RTS */
768 #elif defined(PAR)
769     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
770 #endif
771
772 #if defined(SMP)
773     /* If there's a GC pending, don't do anything until it has
774      * completed.
775      */
776     if (ready_to_gc) {
777       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
778       waitCondition( &gc_pending_cond, &sched_mutex );
779     }
780 #endif    
781
782 #if defined(RTS_SUPPORTS_THREADS)
783 #if defined(SMP)
784     /* block until we've got a thread on the run queue and a free
785      * capability.
786      *
787      */
788     if ( EMPTY_RUN_QUEUE() ) {
789       /* Give up our capability */
790       releaseCapability(cap);
791
792       /* If we're in the process of shutting down (& running the
793        * a batch of finalisers), don't wait around.
794        */
795       if ( shutting_down_scheduler ) {
796         RELEASE_LOCK(&sched_mutex);
797         return;
798       }
799       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
800       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
801       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
802     }
803 #else
804     if ( EMPTY_RUN_QUEUE() ) {
805       continue; // nothing to do
806     }
807 #endif
808 #endif
809
810 #if defined(GRAN)
811     if (RtsFlags.GranFlags.Light)
812       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
813
814     /* adjust time based on time-stamp */
815     if (event->time > CurrentTime[CurrentProc] &&
816         event->evttype != ContinueThread)
817       CurrentTime[CurrentProc] = event->time;
818     
819     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
820     if (!RtsFlags.GranFlags.Light)
821       handleIdlePEs();
822
823     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
824
825     /* main event dispatcher in GranSim */
826     switch (event->evttype) {
827       /* Should just be continuing execution */
828     case ContinueThread:
829       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
830       /* ToDo: check assertion
831       ASSERT(run_queue_hd != (StgTSO*)NULL &&
832              run_queue_hd != END_TSO_QUEUE);
833       */
834       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
835       if (!RtsFlags.GranFlags.DoAsyncFetch &&
836           procStatus[CurrentProc]==Fetching) {
837         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
838               CurrentTSO->id, CurrentTSO, CurrentProc);
839         goto next_thread;
840       } 
841       /* Ignore ContinueThreads for completed threads */
842       if (CurrentTSO->what_next == ThreadComplete) {
843         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
844               CurrentTSO->id, CurrentTSO, CurrentProc);
845         goto next_thread;
846       } 
847       /* Ignore ContinueThreads for threads that are being migrated */
848       if (PROCS(CurrentTSO)==Nowhere) { 
849         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
850               CurrentTSO->id, CurrentTSO, CurrentProc);
851         goto next_thread;
852       }
853       /* The thread should be at the beginning of the run queue */
854       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
855         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
856               CurrentTSO->id, CurrentTSO, CurrentProc);
857         break; // run the thread anyway
858       }
859       /*
860       new_event(proc, proc, CurrentTime[proc],
861                 FindWork,
862                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
863       goto next_thread; 
864       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
865       break; // now actually run the thread; DaH Qu'vam yImuHbej 
866
867     case FetchNode:
868       do_the_fetchnode(event);
869       goto next_thread;             /* handle next event in event queue  */
870       
871     case GlobalBlock:
872       do_the_globalblock(event);
873       goto next_thread;             /* handle next event in event queue  */
874       
875     case FetchReply:
876       do_the_fetchreply(event);
877       goto next_thread;             /* handle next event in event queue  */
878       
879     case UnblockThread:   /* Move from the blocked queue to the tail of */
880       do_the_unblock(event);
881       goto next_thread;             /* handle next event in event queue  */
882       
883     case ResumeThread:  /* Move from the blocked queue to the tail of */
884       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
885       event->tso->gran.blocktime += 
886         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
887       do_the_startthread(event);
888       goto next_thread;             /* handle next event in event queue  */
889       
890     case StartThread:
891       do_the_startthread(event);
892       goto next_thread;             /* handle next event in event queue  */
893       
894     case MoveThread:
895       do_the_movethread(event);
896       goto next_thread;             /* handle next event in event queue  */
897       
898     case MoveSpark:
899       do_the_movespark(event);
900       goto next_thread;             /* handle next event in event queue  */
901       
902     case FindWork:
903       do_the_findwork(event);
904       goto next_thread;             /* handle next event in event queue  */
905       
906     default:
907       barf("Illegal event type %u\n", event->evttype);
908     }  /* switch */
909     
910     /* This point was scheduler_loop in the old RTS */
911
912     IF_DEBUG(gran, belch("GRAN: after main switch"));
913
914     TimeOfLastEvent = CurrentTime[CurrentProc];
915     TimeOfNextEvent = get_time_of_next_event();
916     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
917     // CurrentTSO = ThreadQueueHd;
918
919     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
920                          TimeOfNextEvent));
921
922     if (RtsFlags.GranFlags.Light) 
923       GranSimLight_leave_system(event, &ActiveTSO); 
924
925     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
926
927     IF_DEBUG(gran, 
928              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
929
930     /* in a GranSim setup the TSO stays on the run queue */
931     t = CurrentTSO;
932     /* Take a thread from the run queue. */
933     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
934
935     IF_DEBUG(gran, 
936              fprintf(stderr, "GRAN: About to run current thread, which is\n");
937              G_TSO(t,5));
938
939     context_switch = 0; // turned on via GranYield, checking events and time slice
940
941     IF_DEBUG(gran, 
942              DumpGranEvent(GR_SCHEDULE, t));
943
944     procStatus[CurrentProc] = Busy;
945
946 #elif defined(PAR)
947     if (PendingFetches != END_BF_QUEUE) {
948         processFetches();
949     }
950
951     /* ToDo: phps merge with spark activation above */
952     /* check whether we have local work and send requests if we have none */
953     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
954       /* :-[  no local threads => look out for local sparks */
955       /* the spark pool for the current PE */
956       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
957       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
958           pool->hd < pool->tl) {
959         /* 
960          * ToDo: add GC code check that we really have enough heap afterwards!!
961          * Old comment:
962          * If we're here (no runnable threads) and we have pending
963          * sparks, we must have a space problem.  Get enough space
964          * to turn one of those pending sparks into a
965          * thread... 
966          */
967
968         spark = findSpark(rtsFalse);                /* get a spark */
969         if (spark != (rtsSpark) NULL) {
970           tso = activateSpark(spark);       /* turn the spark into a thread */
971           IF_PAR_DEBUG(schedule,
972                        belch("==== schedule: Created TSO %d (%p); %d threads active",
973                              tso->id, tso, advisory_thread_count));
974
975           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
976             belch("==^^ failed to activate spark");
977             goto next_thread;
978           }               /* otherwise fall through & pick-up new tso */
979         } else {
980           IF_PAR_DEBUG(verbose,
981                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
982                              spark_queue_len(pool)));
983           goto next_thread;
984         }
985       }
986
987       /* If we still have no work we need to send a FISH to get a spark
988          from another PE 
989       */
990       if (EMPTY_RUN_QUEUE()) {
991       /* =8-[  no local sparks => look for work on other PEs */
992         /*
993          * We really have absolutely no work.  Send out a fish
994          * (there may be some out there already), and wait for
995          * something to arrive.  We clearly can't run any threads
996          * until a SCHEDULE or RESUME arrives, and so that's what
997          * we're hoping to see.  (Of course, we still have to
998          * respond to other types of messages.)
999          */
1000         TIME now = msTime() /*CURRENT_TIME*/;
1001         IF_PAR_DEBUG(verbose, 
1002                      belch("--  now=%ld", now));
1003         IF_PAR_DEBUG(verbose,
1004                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1005                          (last_fish_arrived_at!=0 &&
1006                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
1007                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
1008                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
1009                              last_fish_arrived_at,
1010                              RtsFlags.ParFlags.fishDelay, now);
1011                      });
1012         
1013         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1014             (last_fish_arrived_at==0 ||
1015              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
1016           /* outstandingFishes is set in sendFish, processFish;
1017              avoid flooding system with fishes via delay */
1018           pe = choosePE();
1019           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1020                    NEW_FISH_HUNGER);
1021
1022           // Global statistics: count no. of fishes
1023           if (RtsFlags.ParFlags.ParStats.Global &&
1024               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1025             globalParStats.tot_fish_mess++;
1026           }
1027         }
1028       
1029         receivedFinish = processMessages();
1030         goto next_thread;
1031       }
1032     } else if (PacketsWaiting()) {  /* Look for incoming messages */
1033       receivedFinish = processMessages();
1034     }
1035
1036     /* Now we are sure that we have some work available */
1037     ASSERT(run_queue_hd != END_TSO_QUEUE);
1038
1039     /* Take a thread from the run queue, if we have work */
1040     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1041     IF_DEBUG(sanity,checkTSO(t));
1042
1043     /* ToDo: write something to the log-file
1044     if (RTSflags.ParFlags.granSimStats && !sameThread)
1045         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1046
1047     CurrentTSO = t;
1048     */
1049     /* the spark pool for the current PE */
1050     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1051
1052     IF_DEBUG(scheduler, 
1053              belch("--=^ %d threads, %d sparks on [%#x]", 
1054                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1055
1056 # if 1
1057     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1058         t && LastTSO && t->id != LastTSO->id && 
1059         LastTSO->why_blocked == NotBlocked && 
1060         LastTSO->what_next != ThreadComplete) {
1061       // if previously scheduled TSO not blocked we have to record the context switch
1062       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1063                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1064     }
1065
1066     if (RtsFlags.ParFlags.ParStats.Full && 
1067         (emitSchedule /* forced emit */ ||
1068         (t && LastTSO && t->id != LastTSO->id))) {
1069       /* 
1070          we are running a different TSO, so write a schedule event to log file
1071          NB: If we use fair scheduling we also have to write  a deschedule 
1072              event for LastTSO; with unfair scheduling we know that the
1073              previous tso has blocked whenever we switch to another tso, so
1074              we don't need it in GUM for now
1075       */
1076       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1077                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1078       emitSchedule = rtsFalse;
1079     }
1080      
1081 # endif
1082 #else /* !GRAN && !PAR */
1083   
1084     /* grab a thread from the run queue */
1085     ASSERT(run_queue_hd != END_TSO_QUEUE);
1086     t = POP_RUN_QUEUE();
1087     // Sanity check the thread we're about to run.  This can be
1088     // expensive if there is lots of thread switching going on...
1089     IF_DEBUG(sanity,checkTSO(t));
1090 #endif
1091
1092 #ifdef THREADED_RTS
1093     {
1094       StgMainThread *m;
1095       for(m = main_threads; m; m = m->link)
1096       {
1097         if(m->tso == t)
1098           break;
1099       }
1100       
1101       if(m)
1102       {
1103         if(m == mainThread)
1104         {
1105           IF_DEBUG(scheduler,
1106             fprintf(stderr,"### Running TSO %p in bound OS thread %u\n",
1107                     t, osThreadId()));
1108           // yes, the Haskell thread is bound to the current native thread
1109         }
1110         else
1111         {
1112           IF_DEBUG(scheduler,
1113             fprintf(stderr,"### TSO %p bound to other OS thread than %u\n",
1114                     t, osThreadId()));
1115           // no, bound to a different Haskell thread: pass to that thread
1116           PUSH_ON_RUN_QUEUE(t);
1117           passCapability(&sched_mutex,cap,&m->bound_thread_cond);
1118           cap = NULL;
1119           continue;
1120         }
1121       }
1122       else
1123       {
1124         // The thread we want to run is not bound.
1125         if(mainThread == NULL)
1126         {
1127           IF_DEBUG(scheduler,
1128             fprintf(stderr,"### Running TSO %p in worker OS thread %u\n",
1129                     t, osThreadId()));
1130           // if we are a worker thread,
1131           // we may run it here
1132         }
1133         else
1134         {
1135           IF_DEBUG(scheduler,
1136             fprintf(stderr,"### TSO %p is not appropriate for main thread %p in OS thread %u\n",
1137                     t, mainThread, osThreadId()));
1138           // no, the current native thread is bound to a different
1139           // Haskell thread, so pass it to any worker thread
1140           PUSH_ON_RUN_QUEUE(t);
1141           releaseCapability(cap);
1142           cap = NULL;
1143           continue; 
1144         }
1145       }
1146     }
1147 #endif
1148
1149     cap->r.rCurrentTSO = t;
1150     
1151     /* context switches are now initiated by the timer signal, unless
1152      * the user specified "context switch as often as possible", with
1153      * +RTS -C0
1154      */
1155     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1156          && (run_queue_hd != END_TSO_QUEUE
1157              || blocked_queue_hd != END_TSO_QUEUE
1158              || sleeping_queue != END_TSO_QUEUE)))
1159         context_switch = 1;
1160     else
1161         context_switch = 0;
1162
1163 run_thread:
1164
1165     RELEASE_LOCK(&sched_mutex);
1166
1167     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
1168                               t->id, whatNext_strs[t->what_next]));
1169
1170 #ifdef PROFILING
1171     startHeapProfTimer();
1172 #endif
1173
1174     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1175     /* Run the current thread 
1176      */
1177     prev_what_next = t->what_next;
1178     switch (prev_what_next) {
1179     case ThreadKilled:
1180     case ThreadComplete:
1181         /* Thread already finished, return to scheduler. */
1182         ret = ThreadFinished;
1183         break;
1184     case ThreadRunGHC:
1185         errno = t->saved_errno;
1186         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1187         t->saved_errno = errno;
1188         break;
1189     case ThreadInterpret:
1190         ret = interpretBCO(cap);
1191         break;
1192     default:
1193       barf("schedule: invalid what_next field");
1194     }
1195     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1196     
1197     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1198 #ifdef PROFILING
1199     stopHeapProfTimer();
1200     CCCS = CCS_SYSTEM;
1201 #endif
1202     
1203     ACQUIRE_LOCK(&sched_mutex);
1204     
1205 #ifdef RTS_SUPPORTS_THREADS
1206     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %p): ", osThreadId()););
1207 #elif !defined(GRAN) && !defined(PAR)
1208     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1209 #endif
1210     t = cap->r.rCurrentTSO;
1211     
1212 #if defined(PAR)
1213     /* HACK 675: if the last thread didn't yield, make sure to print a 
1214        SCHEDULE event to the log file when StgRunning the next thread, even
1215        if it is the same one as before */
1216     LastTSO = t; 
1217     TimeOfLastYield = CURRENT_TIME;
1218 #endif
1219
1220     switch (ret) {
1221     case HeapOverflow:
1222 #if defined(GRAN)
1223       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1224       globalGranStats.tot_heapover++;
1225 #elif defined(PAR)
1226       globalParStats.tot_heapover++;
1227 #endif
1228
1229       // did the task ask for a large block?
1230       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1231           // if so, get one and push it on the front of the nursery.
1232           bdescr *bd;
1233           nat blocks;
1234           
1235           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1236
1237           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1238                                    t->id, whatNext_strs[t->what_next], blocks));
1239
1240           // don't do this if it would push us over the
1241           // alloc_blocks_lim limit; we'll GC first.
1242           if (alloc_blocks + blocks < alloc_blocks_lim) {
1243
1244               alloc_blocks += blocks;
1245               bd = allocGroup( blocks );
1246
1247               // link the new group into the list
1248               bd->link = cap->r.rCurrentNursery;
1249               bd->u.back = cap->r.rCurrentNursery->u.back;
1250               if (cap->r.rCurrentNursery->u.back != NULL) {
1251                   cap->r.rCurrentNursery->u.back->link = bd;
1252               } else {
1253                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1254                          g0s0->blocks == cap->r.rNursery);
1255                   cap->r.rNursery = g0s0->blocks = bd;
1256               }           
1257               cap->r.rCurrentNursery->u.back = bd;
1258
1259               // initialise it as a nursery block.  We initialise the
1260               // step, gen_no, and flags field of *every* sub-block in
1261               // this large block, because this is easier than making
1262               // sure that we always find the block head of a large
1263               // block whenever we call Bdescr() (eg. evacuate() and
1264               // isAlive() in the GC would both have to do this, at
1265               // least).
1266               { 
1267                   bdescr *x;
1268                   for (x = bd; x < bd + blocks; x++) {
1269                       x->step = g0s0;
1270                       x->gen_no = 0;
1271                       x->flags = 0;
1272                   }
1273               }
1274
1275               // don't forget to update the block count in g0s0.
1276               g0s0->n_blocks += blocks;
1277               // This assert can be a killer if the app is doing lots
1278               // of large block allocations.
1279               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1280
1281               // now update the nursery to point to the new block
1282               cap->r.rCurrentNursery = bd;
1283
1284               // we might be unlucky and have another thread get on the
1285               // run queue before us and steal the large block, but in that
1286               // case the thread will just end up requesting another large
1287               // block.
1288               PUSH_ON_RUN_QUEUE(t);
1289               break;
1290           }
1291       }
1292
1293       /* make all the running tasks block on a condition variable,
1294        * maybe set context_switch and wait till they all pile in,
1295        * then have them wait on a GC condition variable.
1296        */
1297       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1298                                t->id, whatNext_strs[t->what_next]));
1299       threadPaused(t);
1300 #if defined(GRAN)
1301       ASSERT(!is_on_queue(t,CurrentProc));
1302 #elif defined(PAR)
1303       /* Currently we emit a DESCHEDULE event before GC in GUM.
1304          ToDo: either add separate event to distinguish SYSTEM time from rest
1305                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1306       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1307         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1308                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1309         emitSchedule = rtsTrue;
1310       }
1311 #endif
1312       
1313       ready_to_gc = rtsTrue;
1314       context_switch = 1;               /* stop other threads ASAP */
1315       PUSH_ON_RUN_QUEUE(t);
1316       /* actual GC is done at the end of the while loop */
1317       break;
1318       
1319     case StackOverflow:
1320 #if defined(GRAN)
1321       IF_DEBUG(gran, 
1322                DumpGranEvent(GR_DESCHEDULE, t));
1323       globalGranStats.tot_stackover++;
1324 #elif defined(PAR)
1325       // IF_DEBUG(par, 
1326       // DumpGranEvent(GR_DESCHEDULE, t);
1327       globalParStats.tot_stackover++;
1328 #endif
1329       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1330                                t->id, whatNext_strs[t->what_next]));
1331       /* just adjust the stack for this thread, then pop it back
1332        * on the run queue.
1333        */
1334       threadPaused(t);
1335       { 
1336         StgMainThread *m;
1337         /* enlarge the stack */
1338         StgTSO *new_t = threadStackOverflow(t);
1339         
1340         /* This TSO has moved, so update any pointers to it from the
1341          * main thread stack.  It better not be on any other queues...
1342          * (it shouldn't be).
1343          */
1344         for (m = main_threads; m != NULL; m = m->link) {
1345           if (m->tso == t) {
1346             m->tso = new_t;
1347           }
1348         }
1349         threadPaused(new_t);
1350         PUSH_ON_RUN_QUEUE(new_t);
1351       }
1352       break;
1353
1354     case ThreadYielding:
1355 #if defined(GRAN)
1356       IF_DEBUG(gran, 
1357                DumpGranEvent(GR_DESCHEDULE, t));
1358       globalGranStats.tot_yields++;
1359 #elif defined(PAR)
1360       // IF_DEBUG(par, 
1361       // DumpGranEvent(GR_DESCHEDULE, t);
1362       globalParStats.tot_yields++;
1363 #endif
1364       /* put the thread back on the run queue.  Then, if we're ready to
1365        * GC, check whether this is the last task to stop.  If so, wake
1366        * up the GC thread.  getThread will block during a GC until the
1367        * GC is finished.
1368        */
1369       IF_DEBUG(scheduler,
1370                if (t->what_next != prev_what_next) {
1371                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1372                          t->id, whatNext_strs[t->what_next]);
1373                } else {
1374                    belch("--<< thread %ld (%s) stopped, yielding", 
1375                          t->id, whatNext_strs[t->what_next]);
1376                }
1377                );
1378
1379       IF_DEBUG(sanity,
1380                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1381                checkTSO(t));
1382       ASSERT(t->link == END_TSO_QUEUE);
1383
1384       // Shortcut if we're just switching evaluators: don't bother
1385       // doing stack squeezing (which can be expensive), just run the
1386       // thread.
1387       if (t->what_next != prev_what_next) {
1388           goto run_thread;
1389       }
1390
1391       threadPaused(t);
1392
1393 #if defined(GRAN)
1394       ASSERT(!is_on_queue(t,CurrentProc));
1395
1396       IF_DEBUG(sanity,
1397                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1398                checkThreadQsSanity(rtsTrue));
1399 #endif
1400
1401 #if defined(PAR)
1402       if (RtsFlags.ParFlags.doFairScheduling) { 
1403         /* this does round-robin scheduling; good for concurrency */
1404         APPEND_TO_RUN_QUEUE(t);
1405       } else {
1406         /* this does unfair scheduling; good for parallelism */
1407         PUSH_ON_RUN_QUEUE(t);
1408       }
1409 #else
1410       // this does round-robin scheduling; good for concurrency
1411       APPEND_TO_RUN_QUEUE(t);
1412 #endif
1413
1414 #if defined(GRAN)
1415       /* add a ContinueThread event to actually process the thread */
1416       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1417                 ContinueThread,
1418                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1419       IF_GRAN_DEBUG(bq, 
1420                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1421                G_EVENTQ(0);
1422                G_CURR_THREADQ(0));
1423 #endif /* GRAN */
1424       break;
1425
1426     case ThreadBlocked:
1427 #if defined(GRAN)
1428       IF_DEBUG(scheduler,
1429                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1430                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1431                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1432
1433       // ??? needed; should emit block before
1434       IF_DEBUG(gran, 
1435                DumpGranEvent(GR_DESCHEDULE, t)); 
1436       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1437       /*
1438         ngoq Dogh!
1439       ASSERT(procStatus[CurrentProc]==Busy || 
1440               ((procStatus[CurrentProc]==Fetching) && 
1441               (t->block_info.closure!=(StgClosure*)NULL)));
1442       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1443           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1444             procStatus[CurrentProc]==Fetching)) 
1445         procStatus[CurrentProc] = Idle;
1446       */
1447 #elif defined(PAR)
1448       IF_DEBUG(scheduler,
1449                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1450                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1451       IF_PAR_DEBUG(bq,
1452
1453                    if (t->block_info.closure!=(StgClosure*)NULL) 
1454                      print_bq(t->block_info.closure));
1455
1456       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1457       blockThread(t);
1458
1459       /* whatever we schedule next, we must log that schedule */
1460       emitSchedule = rtsTrue;
1461
1462 #else /* !GRAN */
1463       /* don't need to do anything.  Either the thread is blocked on
1464        * I/O, in which case we'll have called addToBlockedQueue
1465        * previously, or it's blocked on an MVar or Blackhole, in which
1466        * case it'll be on the relevant queue already.
1467        */
1468       IF_DEBUG(scheduler,
1469                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1470                        t->id, whatNext_strs[t->what_next]);
1471                printThreadBlockage(t);
1472                fprintf(stderr, "\n"));
1473
1474       /* Only for dumping event to log file 
1475          ToDo: do I need this in GranSim, too?
1476       blockThread(t);
1477       */
1478 #endif
1479       threadPaused(t);
1480       break;
1481       
1482     case ThreadFinished:
1483       /* Need to check whether this was a main thread, and if so, signal
1484        * the task that started it with the return value.  If we have no
1485        * more main threads, we probably need to stop all the tasks until
1486        * we get a new one.
1487        */
1488       /* We also end up here if the thread kills itself with an
1489        * uncaught exception, see Exception.hc.
1490        */
1491       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1492                                t->id, whatNext_strs[t->what_next]));
1493 #if defined(GRAN)
1494       endThread(t, CurrentProc); // clean-up the thread
1495 #elif defined(PAR)
1496       /* For now all are advisory -- HWL */
1497       //if(t->priority==AdvisoryPriority) ??
1498       advisory_thread_count--;
1499       
1500 # ifdef DIST
1501       if(t->dist.priority==RevalPriority)
1502         FinishReval(t);
1503 # endif
1504       
1505       if (RtsFlags.ParFlags.ParStats.Full &&
1506           !RtsFlags.ParFlags.ParStats.Suppressed) 
1507         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1508 #endif
1509       break;
1510       
1511     default:
1512       barf("schedule: invalid thread return code %d", (int)ret);
1513     }
1514
1515 #ifdef PROFILING
1516     // When we have +RTS -i0 and we're heap profiling, do a census at
1517     // every GC.  This lets us get repeatable runs for debugging.
1518     if (performHeapProfile ||
1519         (RtsFlags.ProfFlags.profileInterval==0 &&
1520          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1521         GarbageCollect(GetRoots, rtsTrue);
1522         heapCensus();
1523         performHeapProfile = rtsFalse;
1524         ready_to_gc = rtsFalse; // we already GC'd
1525     }
1526 #endif
1527
1528     if (ready_to_gc 
1529 #ifdef SMP
1530         && allFreeCapabilities() 
1531 #endif
1532         ) {
1533       /* everybody back, start the GC.
1534        * Could do it in this thread, or signal a condition var
1535        * to do it in another thread.  Either way, we need to
1536        * broadcast on gc_pending_cond afterward.
1537        */
1538 #if defined(RTS_SUPPORTS_THREADS)
1539       IF_DEBUG(scheduler,sched_belch("doing GC"));
1540 #endif
1541       GarbageCollect(GetRoots,rtsFalse);
1542       ready_to_gc = rtsFalse;
1543 #ifdef SMP
1544       broadcastCondition(&gc_pending_cond);
1545 #endif
1546 #if defined(GRAN)
1547       /* add a ContinueThread event to continue execution of current thread */
1548       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1549                 ContinueThread,
1550                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1551       IF_GRAN_DEBUG(bq, 
1552                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1553                G_EVENTQ(0);
1554                G_CURR_THREADQ(0));
1555 #endif /* GRAN */
1556     }
1557
1558 #if defined(GRAN)
1559   next_thread:
1560     IF_GRAN_DEBUG(unused,
1561                   print_eventq(EventHd));
1562
1563     event = get_next_event();
1564 #elif defined(PAR)
1565   next_thread:
1566     /* ToDo: wait for next message to arrive rather than busy wait */
1567 #endif /* GRAN */
1568
1569   } /* end of while(1) */
1570
1571   IF_PAR_DEBUG(verbose,
1572                belch("== Leaving schedule() after having received Finish"));
1573 }
1574
1575 /* ---------------------------------------------------------------------------
1576  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1577  * used by Control.Concurrent for error checking.
1578  * ------------------------------------------------------------------------- */
1579  
1580 StgBool
1581 rtsSupportsBoundThreads(void)
1582 {
1583 #ifdef THREADED_RTS
1584   return rtsTrue;
1585 #else
1586   return rtsFalse;
1587 #endif
1588 }
1589
1590 /* ---------------------------------------------------------------------------
1591  * isThreadBound(tso): check whether tso is bound to an OS thread.
1592  * ------------------------------------------------------------------------- */
1593  
1594 StgBool
1595 isThreadBound(StgTSO* tso)
1596 {
1597 #ifdef THREADED_RTS
1598   StgMainThread *m;
1599   for(m = main_threads; m; m = m->link)
1600   {
1601     if(m->tso == tso)
1602       return rtsTrue;
1603   }
1604 #endif
1605   return rtsFalse;
1606 }
1607
1608 /* ---------------------------------------------------------------------------
1609  * Singleton fork(). Do not copy any running threads.
1610  * ------------------------------------------------------------------------- */
1611
1612 static void 
1613 deleteThreadImmediately(StgTSO *tso);
1614
1615 StgInt
1616 forkProcess(StgTSO* tso)
1617 {
1618 #ifndef mingw32_TARGET_OS
1619   pid_t pid;
1620   StgTSO* t,*next;
1621
1622   IF_DEBUG(scheduler,sched_belch("forking!"));
1623   ACQUIRE_LOCK(&sched_mutex);
1624
1625   pid = fork();
1626   if (pid) { /* parent */
1627
1628   /* just return the pid */
1629     
1630   } else { /* child */
1631 #ifdef THREADED_RTS
1632     /* wipe all other threads */
1633     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1634     tso->link = END_TSO_QUEUE;
1635     
1636     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1637       next = t->link;
1638       
1639       /* Don't kill the current thread.. */
1640       if (t->id == tso->id) {
1641         continue;
1642       }
1643       
1644       if (isThreadBound(t)) {
1645         // If the thread is bound, the OS thread that the thread is bound to
1646         // no longer exists after the fork() system call.
1647         // The bound Haskell thread is therefore unable to run at all;
1648         // we must not give it a chance to survive by catching the
1649         // ThreadKilled exception. So we kill it "brutally" rather than
1650         // using deleteThread.
1651         deleteThreadImmediately(t);
1652       } else {
1653         deleteThread(t);
1654       }
1655     }
1656     
1657     if (isThreadBound(tso)) {
1658     } else {
1659       // If the current is not bound, then we should make it so.
1660       // The OS thread left over by fork() is special in that the process
1661       // will terminate as soon as the thread terminates; 
1662       // we'd expect forkProcess to behave similarily.
1663       // FIXME - we don't do this.
1664     }
1665 #else
1666   StgMainThread *m;
1667   rtsBool doKill;
1668   /* wipe all other threads */
1669   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1670   tso->link = END_TSO_QUEUE;
1671
1672   /* When clearing out the threads, we need to ensure
1673      that a 'main thread' is left behind; if there isn't,
1674      the Scheduler will shutdown next time it is entered.
1675      
1676      ==> we don't kill a thread that's on the main_threads
1677          list (nor the current thread.)
1678     
1679      [ Attempts at implementing the more ambitious scheme of
1680        killing the main_threads also, and then adding the
1681        current thread onto the main_threads list if it wasn't
1682        there already, failed -- waitThread() (for one) wasn't
1683        up to it. If it proves to be desirable to also kill
1684        the main threads, then this scheme will have to be
1685        revisited (and fully debugged!)
1686        
1687        -- sof 7/2002
1688      ]
1689   */
1690   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1691      us is picky about finding the thread still in its queue when
1692      handling the deleteThread() */
1693
1694   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1695     next = t->link;
1696     
1697     /* Don't kill the current thread.. */
1698     if (t->id == tso->id) continue;
1699     doKill=rtsTrue;
1700     /* ..or a main thread */
1701     for (m = main_threads; m != NULL; m = m->link) {
1702         if (m->tso->id == t->id) {
1703           doKill=rtsFalse;
1704           break;
1705         }
1706     }
1707     if (doKill) {
1708       deleteThread(t);
1709     }
1710   }
1711 #endif
1712   }
1713   RELEASE_LOCK(&sched_mutex);
1714   return pid;
1715 #else /* mingw32 */
1716   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1717   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1718   return -1;
1719 #endif /* mingw32 */
1720 }
1721
1722 /* ---------------------------------------------------------------------------
1723  * deleteAllThreads():  kill all the live threads.
1724  *
1725  * This is used when we catch a user interrupt (^C), before performing
1726  * any necessary cleanups and running finalizers.
1727  *
1728  * Locks: sched_mutex held.
1729  * ------------------------------------------------------------------------- */
1730    
1731 void
1732 deleteAllThreads ( void )
1733 {
1734   StgTSO* t, *next;
1735   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1736   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1737       next = t->global_link;
1738       deleteThread(t);
1739   }      
1740   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1741   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1742   sleeping_queue = END_TSO_QUEUE;
1743 }
1744
1745 /* startThread and  insertThread are now in GranSim.c -- HWL */
1746
1747
1748 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1749 //@subsection Suspend and Resume
1750
1751 /* ---------------------------------------------------------------------------
1752  * Suspending & resuming Haskell threads.
1753  * 
1754  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1755  * its capability before calling the C function.  This allows another
1756  * task to pick up the capability and carry on running Haskell
1757  * threads.  It also means that if the C call blocks, it won't lock
1758  * the whole system.
1759  *
1760  * The Haskell thread making the C call is put to sleep for the
1761  * duration of the call, on the susepended_ccalling_threads queue.  We
1762  * give out a token to the task, which it can use to resume the thread
1763  * on return from the C function.
1764  * ------------------------------------------------------------------------- */
1765    
1766 StgInt
1767 suspendThread( StgRegTable *reg, 
1768                rtsBool concCall
1769 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1770                STG_UNUSED
1771 #endif
1772                )
1773 {
1774   nat tok;
1775   Capability *cap;
1776   int saved_errno = errno;
1777
1778   /* assume that *reg is a pointer to the StgRegTable part
1779    * of a Capability.
1780    */
1781   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1782
1783   ACQUIRE_LOCK(&sched_mutex);
1784
1785   IF_DEBUG(scheduler,
1786            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1787
1788   // XXX this might not be necessary --SDM
1789   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1790
1791   threadPaused(cap->r.rCurrentTSO);
1792   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1793   suspended_ccalling_threads = cap->r.rCurrentTSO;
1794
1795 #if defined(RTS_SUPPORTS_THREADS)
1796   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1797   {
1798       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1799       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1800   }
1801   else
1802   {
1803       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1804   }
1805 #endif
1806
1807   /* Use the thread ID as the token; it should be unique */
1808   tok = cap->r.rCurrentTSO->id;
1809
1810   /* Hand back capability */
1811   releaseCapability(cap);
1812   
1813 #if defined(RTS_SUPPORTS_THREADS)
1814   /* Preparing to leave the RTS, so ensure there's a native thread/task
1815      waiting to take over.
1816   */
1817   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): leaving RTS", tok, osThreadId()));
1818   //if (concCall) { // implementing "safe" as opposed to "threadsafe" is more difficult
1819       startTask(taskStart);
1820   //}
1821 #endif
1822
1823   /* Other threads _might_ be available for execution; signal this */
1824   THREAD_RUNNABLE();
1825   RELEASE_LOCK(&sched_mutex);
1826   
1827   errno = saved_errno;
1828   return tok; 
1829 }
1830
1831 StgRegTable *
1832 resumeThread( StgInt tok,
1833               rtsBool concCall STG_UNUSED )
1834 {
1835   StgTSO *tso, **prev;
1836   Capability *cap;
1837   int saved_errno = errno;
1838
1839 #if defined(RTS_SUPPORTS_THREADS)
1840   /* Wait for permission to re-enter the RTS with the result. */
1841   ACQUIRE_LOCK(&sched_mutex);
1842   grabReturnCapability(&sched_mutex, &cap);
1843
1844   IF_DEBUG(scheduler, sched_belch("worker thread (%d, osthread %p): re-entering RTS", tok, osThreadId()));
1845 #else
1846   grabCapability(&cap);
1847 #endif
1848
1849   /* Remove the thread off of the suspended list */
1850   prev = &suspended_ccalling_threads;
1851   for (tso = suspended_ccalling_threads; 
1852        tso != END_TSO_QUEUE; 
1853        prev = &tso->link, tso = tso->link) {
1854     if (tso->id == (StgThreadID)tok) {
1855       *prev = tso->link;
1856       break;
1857     }
1858   }
1859   if (tso == END_TSO_QUEUE) {
1860     barf("resumeThread: thread not found");
1861   }
1862   tso->link = END_TSO_QUEUE;
1863   
1864 #if defined(RTS_SUPPORTS_THREADS)
1865   if(tso->why_blocked == BlockedOnCCall)
1866   {
1867       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1868       tso->blocked_exceptions = NULL;
1869   }
1870 #endif
1871   
1872   /* Reset blocking status */
1873   tso->why_blocked  = NotBlocked;
1874
1875   cap->r.rCurrentTSO = tso;
1876 #if defined(RTS_SUPPORTS_THREADS)
1877   RELEASE_LOCK(&sched_mutex);
1878 #endif
1879   errno = saved_errno;
1880   return &cap->r;
1881 }
1882
1883
1884 /* ---------------------------------------------------------------------------
1885  * Static functions
1886  * ------------------------------------------------------------------------ */
1887 static void unblockThread(StgTSO *tso);
1888
1889 /* ---------------------------------------------------------------------------
1890  * Comparing Thread ids.
1891  *
1892  * This is used from STG land in the implementation of the
1893  * instances of Eq/Ord for ThreadIds.
1894  * ------------------------------------------------------------------------ */
1895
1896 int
1897 cmp_thread(StgPtr tso1, StgPtr tso2) 
1898
1899   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1900   StgThreadID id2 = ((StgTSO *)tso2)->id;
1901  
1902   if (id1 < id2) return (-1);
1903   if (id1 > id2) return 1;
1904   return 0;
1905 }
1906
1907 /* ---------------------------------------------------------------------------
1908  * Fetching the ThreadID from an StgTSO.
1909  *
1910  * This is used in the implementation of Show for ThreadIds.
1911  * ------------------------------------------------------------------------ */
1912 int
1913 rts_getThreadId(StgPtr tso) 
1914 {
1915   return ((StgTSO *)tso)->id;
1916 }
1917
1918 #ifdef DEBUG
1919 void
1920 labelThread(StgPtr tso, char *label)
1921 {
1922   int len;
1923   void *buf;
1924
1925   /* Caveat: Once set, you can only set the thread name to "" */
1926   len = strlen(label)+1;
1927   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1928   strncpy(buf,label,len);
1929   /* Update will free the old memory for us */
1930   updateThreadLabel((StgWord)tso,buf);
1931 }
1932 #endif /* DEBUG */
1933
1934 /* ---------------------------------------------------------------------------
1935    Create a new thread.
1936
1937    The new thread starts with the given stack size.  Before the
1938    scheduler can run, however, this thread needs to have a closure
1939    (and possibly some arguments) pushed on its stack.  See
1940    pushClosure() in Schedule.h.
1941
1942    createGenThread() and createIOThread() (in SchedAPI.h) are
1943    convenient packaged versions of this function.
1944
1945    currently pri (priority) is only used in a GRAN setup -- HWL
1946    ------------------------------------------------------------------------ */
1947 //@cindex createThread
1948 #if defined(GRAN)
1949 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1950 StgTSO *
1951 createThread(nat size, StgInt pri)
1952 #else
1953 StgTSO *
1954 createThread(nat size)
1955 #endif
1956 {
1957
1958     StgTSO *tso;
1959     nat stack_size;
1960
1961     /* First check whether we should create a thread at all */
1962 #if defined(PAR)
1963   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1964   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1965     threadsIgnored++;
1966     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1967           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1968     return END_TSO_QUEUE;
1969   }
1970   threadsCreated++;
1971 #endif
1972
1973 #if defined(GRAN)
1974   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1975 #endif
1976
1977   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1978
1979   /* catch ridiculously small stack sizes */
1980   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1981     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1982   }
1983
1984   stack_size = size - TSO_STRUCT_SIZEW;
1985
1986   tso = (StgTSO *)allocate(size);
1987   TICK_ALLOC_TSO(stack_size, 0);
1988
1989   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1990 #if defined(GRAN)
1991   SET_GRAN_HDR(tso, ThisPE);
1992 #endif
1993
1994   // Always start with the compiled code evaluator
1995   tso->what_next = ThreadRunGHC;
1996
1997   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1998    * protect the increment operation on next_thread_id.
1999    * In future, we could use an atomic increment instead.
2000    */
2001   ACQUIRE_LOCK(&thread_id_mutex);
2002   tso->id = next_thread_id++; 
2003   RELEASE_LOCK(&thread_id_mutex);
2004
2005   tso->why_blocked  = NotBlocked;
2006   tso->blocked_exceptions = NULL;
2007
2008   tso->saved_errno = 0;
2009   
2010   tso->stack_size   = stack_size;
2011   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2012                               - TSO_STRUCT_SIZEW;
2013   tso->sp           = (P_)&(tso->stack) + stack_size;
2014
2015 #ifdef PROFILING
2016   tso->prof.CCCS = CCS_MAIN;
2017 #endif
2018
2019   /* put a stop frame on the stack */
2020   tso->sp -= sizeofW(StgStopFrame);
2021   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2022   // ToDo: check this
2023 #if defined(GRAN)
2024   tso->link = END_TSO_QUEUE;
2025   /* uses more flexible routine in GranSim */
2026   insertThread(tso, CurrentProc);
2027 #else
2028   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2029    * from its creation
2030    */
2031 #endif
2032
2033 #if defined(GRAN) 
2034   if (RtsFlags.GranFlags.GranSimStats.Full) 
2035     DumpGranEvent(GR_START,tso);
2036 #elif defined(PAR)
2037   if (RtsFlags.ParFlags.ParStats.Full) 
2038     DumpGranEvent(GR_STARTQ,tso);
2039   /* HACk to avoid SCHEDULE 
2040      LastTSO = tso; */
2041 #endif
2042
2043   /* Link the new thread on the global thread list.
2044    */
2045   tso->global_link = all_threads;
2046   all_threads = tso;
2047
2048 #if defined(DIST)
2049   tso->dist.priority = MandatoryPriority; //by default that is...
2050 #endif
2051
2052 #if defined(GRAN)
2053   tso->gran.pri = pri;
2054 # if defined(DEBUG)
2055   tso->gran.magic = TSO_MAGIC; // debugging only
2056 # endif
2057   tso->gran.sparkname   = 0;
2058   tso->gran.startedat   = CURRENT_TIME; 
2059   tso->gran.exported    = 0;
2060   tso->gran.basicblocks = 0;
2061   tso->gran.allocs      = 0;
2062   tso->gran.exectime    = 0;
2063   tso->gran.fetchtime   = 0;
2064   tso->gran.fetchcount  = 0;
2065   tso->gran.blocktime   = 0;
2066   tso->gran.blockcount  = 0;
2067   tso->gran.blockedat   = 0;
2068   tso->gran.globalsparks = 0;
2069   tso->gran.localsparks  = 0;
2070   if (RtsFlags.GranFlags.Light)
2071     tso->gran.clock  = Now; /* local clock */
2072   else
2073     tso->gran.clock  = 0;
2074
2075   IF_DEBUG(gran,printTSO(tso));
2076 #elif defined(PAR)
2077 # if defined(DEBUG)
2078   tso->par.magic = TSO_MAGIC; // debugging only
2079 # endif
2080   tso->par.sparkname   = 0;
2081   tso->par.startedat   = CURRENT_TIME; 
2082   tso->par.exported    = 0;
2083   tso->par.basicblocks = 0;
2084   tso->par.allocs      = 0;
2085   tso->par.exectime    = 0;
2086   tso->par.fetchtime   = 0;
2087   tso->par.fetchcount  = 0;
2088   tso->par.blocktime   = 0;
2089   tso->par.blockcount  = 0;
2090   tso->par.blockedat   = 0;
2091   tso->par.globalsparks = 0;
2092   tso->par.localsparks  = 0;
2093 #endif
2094
2095 #if defined(GRAN)
2096   globalGranStats.tot_threads_created++;
2097   globalGranStats.threads_created_on_PE[CurrentProc]++;
2098   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2099   globalGranStats.tot_sq_probes++;
2100 #elif defined(PAR)
2101   // collect parallel global statistics (currently done together with GC stats)
2102   if (RtsFlags.ParFlags.ParStats.Global &&
2103       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2104     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2105     globalParStats.tot_threads_created++;
2106   }
2107 #endif 
2108
2109 #if defined(GRAN)
2110   IF_GRAN_DEBUG(pri,
2111                 belch("==__ schedule: Created TSO %d (%p);",
2112                       CurrentProc, tso, tso->id));
2113 #elif defined(PAR)
2114     IF_PAR_DEBUG(verbose,
2115                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
2116                        tso->id, tso, advisory_thread_count));
2117 #else
2118   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2119                                  tso->id, tso->stack_size));
2120 #endif    
2121   return tso;
2122 }
2123
2124 #if defined(PAR)
2125 /* RFP:
2126    all parallel thread creation calls should fall through the following routine.
2127 */
2128 StgTSO *
2129 createSparkThread(rtsSpark spark) 
2130 { StgTSO *tso;
2131   ASSERT(spark != (rtsSpark)NULL);
2132   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2133   { threadsIgnored++;
2134     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2135           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2136     return END_TSO_QUEUE;
2137   }
2138   else
2139   { threadsCreated++;
2140     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2141     if (tso==END_TSO_QUEUE)     
2142       barf("createSparkThread: Cannot create TSO");
2143 #if defined(DIST)
2144     tso->priority = AdvisoryPriority;
2145 #endif
2146     pushClosure(tso,spark);
2147     PUSH_ON_RUN_QUEUE(tso);
2148     advisory_thread_count++;    
2149   }
2150   return tso;
2151 }
2152 #endif
2153
2154 /*
2155   Turn a spark into a thread.
2156   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2157 */
2158 #if defined(PAR)
2159 //@cindex activateSpark
2160 StgTSO *
2161 activateSpark (rtsSpark spark) 
2162 {
2163   StgTSO *tso;
2164
2165   tso = createSparkThread(spark);
2166   if (RtsFlags.ParFlags.ParStats.Full) {   
2167     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2168     IF_PAR_DEBUG(verbose,
2169                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
2170                        (StgClosure *)spark, info_type((StgClosure *)spark)));
2171   }
2172   // ToDo: fwd info on local/global spark to thread -- HWL
2173   // tso->gran.exported =  spark->exported;
2174   // tso->gran.locked =   !spark->global;
2175   // tso->gran.sparkname = spark->name;
2176
2177   return tso;
2178 }
2179 #endif
2180
2181 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
2182                                    Capability *initialCapability
2183                                    );
2184
2185
2186 /* ---------------------------------------------------------------------------
2187  * scheduleThread()
2188  *
2189  * scheduleThread puts a thread on the head of the runnable queue.
2190  * This will usually be done immediately after a thread is created.
2191  * The caller of scheduleThread must create the thread using e.g.
2192  * createThread and push an appropriate closure
2193  * on this thread's stack before the scheduler is invoked.
2194  * ------------------------------------------------------------------------ */
2195
2196 static void scheduleThread_ (StgTSO* tso);
2197
2198 void
2199 scheduleThread_(StgTSO *tso)
2200 {
2201   // Precondition: sched_mutex must be held.
2202
2203   /* Put the new thread on the head of the runnable queue.  The caller
2204    * better push an appropriate closure on this thread's stack
2205    * beforehand.  In the SMP case, the thread may start running as
2206    * soon as we release the scheduler lock below.
2207    */
2208   PUSH_ON_RUN_QUEUE(tso);
2209   THREAD_RUNNABLE();
2210
2211 #if 0
2212   IF_DEBUG(scheduler,printTSO(tso));
2213 #endif
2214 }
2215
2216 void scheduleThread(StgTSO* tso)
2217 {
2218   ACQUIRE_LOCK(&sched_mutex);
2219   scheduleThread_(tso);
2220   RELEASE_LOCK(&sched_mutex);
2221 }
2222
2223 SchedulerStatus
2224 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *initialCapability)
2225 {       // Precondition: sched_mutex must be held
2226   StgMainThread *m;
2227
2228   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2229   m->tso = tso;
2230   m->ret = ret;
2231   m->stat = NoStatus;
2232 #if defined(RTS_SUPPORTS_THREADS)
2233   initCondition(&m->wakeup);
2234 #if defined(THREADED_RTS)
2235   initCondition(&m->bound_thread_cond);
2236 #endif
2237 #endif
2238
2239   /* Put the thread on the main-threads list prior to scheduling the TSO.
2240      Failure to do so introduces a race condition in the MT case (as
2241      identified by Wolfgang Thaller), whereby the new task/OS thread 
2242      created by scheduleThread_() would complete prior to the thread
2243      that spawned it managed to put 'itself' on the main-threads list.
2244      The upshot of it all being that the worker thread wouldn't get to
2245      signal the completion of the its work item for the main thread to
2246      see (==> it got stuck waiting.)    -- sof 6/02.
2247   */
2248   IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)\n", tso->id));
2249   
2250   m->link = main_threads;
2251   main_threads = m;
2252
2253   scheduleThread_(tso);
2254
2255   return waitThread_(m, initialCapability);
2256 }
2257
2258 /* ---------------------------------------------------------------------------
2259  * initScheduler()
2260  *
2261  * Initialise the scheduler.  This resets all the queues - if the
2262  * queues contained any threads, they'll be garbage collected at the
2263  * next pass.
2264  *
2265  * ------------------------------------------------------------------------ */
2266
2267 #ifdef SMP
2268 static void
2269 term_handler(int sig STG_UNUSED)
2270 {
2271   stat_workerStop();
2272   ACQUIRE_LOCK(&term_mutex);
2273   await_death--;
2274   RELEASE_LOCK(&term_mutex);
2275   shutdownThread();
2276 }
2277 #endif
2278
2279 void 
2280 initScheduler(void)
2281 {
2282 #if defined(GRAN)
2283   nat i;
2284
2285   for (i=0; i<=MAX_PROC; i++) {
2286     run_queue_hds[i]      = END_TSO_QUEUE;
2287     run_queue_tls[i]      = END_TSO_QUEUE;
2288     blocked_queue_hds[i]  = END_TSO_QUEUE;
2289     blocked_queue_tls[i]  = END_TSO_QUEUE;
2290     ccalling_threadss[i]  = END_TSO_QUEUE;
2291     sleeping_queue        = END_TSO_QUEUE;
2292   }
2293 #else
2294   run_queue_hd      = END_TSO_QUEUE;
2295   run_queue_tl      = END_TSO_QUEUE;
2296   blocked_queue_hd  = END_TSO_QUEUE;
2297   blocked_queue_tl  = END_TSO_QUEUE;
2298   sleeping_queue    = END_TSO_QUEUE;
2299 #endif 
2300
2301   suspended_ccalling_threads  = END_TSO_QUEUE;
2302
2303   main_threads = NULL;
2304   all_threads  = END_TSO_QUEUE;
2305
2306   context_switch = 0;
2307   interrupted    = 0;
2308
2309   RtsFlags.ConcFlags.ctxtSwitchTicks =
2310       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2311       
2312 #if defined(RTS_SUPPORTS_THREADS)
2313   /* Initialise the mutex and condition variables used by
2314    * the scheduler. */
2315   initMutex(&sched_mutex);
2316   initMutex(&term_mutex);
2317   initMutex(&thread_id_mutex);
2318
2319   initCondition(&thread_ready_cond);
2320 #endif
2321   
2322 #if defined(SMP)
2323   initCondition(&gc_pending_cond);
2324 #endif
2325
2326 #if defined(RTS_SUPPORTS_THREADS)
2327   ACQUIRE_LOCK(&sched_mutex);
2328 #endif
2329
2330   /* Install the SIGHUP handler */
2331 #if defined(SMP)
2332   {
2333     struct sigaction action,oact;
2334
2335     action.sa_handler = term_handler;
2336     sigemptyset(&action.sa_mask);
2337     action.sa_flags = 0;
2338     if (sigaction(SIGTERM, &action, &oact) != 0) {
2339       barf("can't install TERM handler");
2340     }
2341   }
2342 #endif
2343
2344   /* A capability holds the state a native thread needs in
2345    * order to execute STG code. At least one capability is
2346    * floating around (only SMP builds have more than one).
2347    */
2348   initCapabilities();
2349   
2350 #if defined(RTS_SUPPORTS_THREADS)
2351     /* start our haskell execution tasks */
2352 # if defined(SMP)
2353     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2354 # else
2355     startTaskManager(0,taskStart);
2356 # endif
2357 #endif
2358
2359 #if /* defined(SMP) ||*/ defined(PAR)
2360   initSparkPools();
2361 #endif
2362
2363 #if defined(RTS_SUPPORTS_THREADS)
2364   RELEASE_LOCK(&sched_mutex);
2365 #endif
2366
2367 }
2368
2369 void
2370 exitScheduler( void )
2371 {
2372 #if defined(RTS_SUPPORTS_THREADS)
2373   stopTaskManager();
2374 #endif
2375   shutting_down_scheduler = rtsTrue;
2376 }
2377
2378 /* -----------------------------------------------------------------------------
2379    Managing the per-task allocation areas.
2380    
2381    Each capability comes with an allocation area.  These are
2382    fixed-length block lists into which allocation can be done.
2383
2384    ToDo: no support for two-space collection at the moment???
2385    -------------------------------------------------------------------------- */
2386
2387 /* -----------------------------------------------------------------------------
2388  * waitThread is the external interface for running a new computation
2389  * and waiting for the result.
2390  *
2391  * In the non-SMP case, we create a new main thread, push it on the 
2392  * main-thread stack, and invoke the scheduler to run it.  The
2393  * scheduler will return when the top main thread on the stack has
2394  * completed or died, and fill in the necessary fields of the
2395  * main_thread structure.
2396  *
2397  * In the SMP case, we create a main thread as before, but we then
2398  * create a new condition variable and sleep on it.  When our new
2399  * main thread has completed, we'll be woken up and the status/result
2400  * will be in the main_thread struct.
2401  * -------------------------------------------------------------------------- */
2402
2403 int 
2404 howManyThreadsAvail ( void )
2405 {
2406    int i = 0;
2407    StgTSO* q;
2408    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2409       i++;
2410    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2411       i++;
2412    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2413       i++;
2414    return i;
2415 }
2416
2417 void
2418 finishAllThreads ( void )
2419 {
2420    do {
2421       while (run_queue_hd != END_TSO_QUEUE) {
2422          waitThread ( run_queue_hd, NULL, NULL );
2423       }
2424       while (blocked_queue_hd != END_TSO_QUEUE) {
2425          waitThread ( blocked_queue_hd, NULL, NULL );
2426       }
2427       while (sleeping_queue != END_TSO_QUEUE) {
2428          waitThread ( blocked_queue_hd, NULL, NULL );
2429       }
2430    } while 
2431       (blocked_queue_hd != END_TSO_QUEUE || 
2432        run_queue_hd     != END_TSO_QUEUE ||
2433        sleeping_queue   != END_TSO_QUEUE);
2434 }
2435
2436 SchedulerStatus
2437 waitThread(StgTSO *tso, /*out*/StgClosure **ret, Capability *initialCapability)
2438
2439   StgMainThread *m;
2440   SchedulerStatus stat;
2441
2442   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2443   m->tso = tso;
2444   m->ret = ret;
2445   m->stat = NoStatus;
2446 #if defined(RTS_SUPPORTS_THREADS)
2447   initCondition(&m->wakeup);
2448 #if defined(THREADED_RTS)
2449   initCondition(&m->bound_thread_cond);
2450 #endif
2451 #endif
2452
2453   /* see scheduleWaitThread() comment */
2454   ACQUIRE_LOCK(&sched_mutex);
2455   m->link = main_threads;
2456   main_threads = m;
2457
2458   IF_DEBUG(scheduler, sched_belch("waiting for thread %d", tso->id));
2459
2460   stat = waitThread_(m,initialCapability);
2461   
2462   RELEASE_LOCK(&sched_mutex);
2463   return stat;
2464 }
2465
2466 static
2467 SchedulerStatus
2468 waitThread_(StgMainThread* m, Capability *initialCapability)
2469 {
2470   SchedulerStatus stat;
2471
2472   // Precondition: sched_mutex must be held.
2473   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2474
2475 #if defined(RTS_SUPPORTS_THREADS) && !defined(THREADED_RTS)
2476   {     // FIXME: does this still make sense?
2477         // It's not for the threaded rts => SMP only
2478     do {
2479       waitCondition(&m->wakeup, &sched_mutex);
2480     } while (m->stat == NoStatus);
2481   }
2482 #elif defined(GRAN)
2483   /* GranSim specific init */
2484   CurrentTSO = m->tso;                // the TSO to run
2485   procStatus[MainProc] = Busy;        // status of main PE
2486   CurrentProc = MainProc;             // PE to run it on
2487
2488   RELEASE_LOCK(&sched_mutex);
2489   schedule(m,initialCapability);
2490 #else
2491   RELEASE_LOCK(&sched_mutex);
2492   schedule(m,initialCapability);
2493   ACQUIRE_LOCK(&sched_mutex);
2494   ASSERT(m->stat != NoStatus);
2495 #endif
2496
2497   stat = m->stat;
2498
2499 #if defined(RTS_SUPPORTS_THREADS)
2500   closeCondition(&m->wakeup);
2501 #if defined(THREADED_RTS)
2502   closeCondition(&m->bound_thread_cond);
2503 #endif
2504 #endif
2505
2506   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2507                               m->tso->id));
2508   stgFree(m);
2509
2510   // Postcondition: sched_mutex still held
2511   return stat;
2512 }
2513
2514 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2515 //@subsection Run queue code 
2516
2517 #if 0
2518 /* 
2519    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2520        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2521        implicit global variable that has to be correct when calling these
2522        fcts -- HWL 
2523 */
2524
2525 /* Put the new thread on the head of the runnable queue.
2526  * The caller of createThread better push an appropriate closure
2527  * on this thread's stack before the scheduler is invoked.
2528  */
2529 static /* inline */ void
2530 add_to_run_queue(tso)
2531 StgTSO* tso; 
2532 {
2533   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2534   tso->link = run_queue_hd;
2535   run_queue_hd = tso;
2536   if (run_queue_tl == END_TSO_QUEUE) {
2537     run_queue_tl = tso;
2538   }
2539 }
2540
2541 /* Put the new thread at the end of the runnable queue. */
2542 static /* inline */ void
2543 push_on_run_queue(tso)
2544 StgTSO* tso; 
2545 {
2546   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2547   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2548   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2549   if (run_queue_hd == END_TSO_QUEUE) {
2550     run_queue_hd = tso;
2551   } else {
2552     run_queue_tl->link = tso;
2553   }
2554   run_queue_tl = tso;
2555 }
2556
2557 /* 
2558    Should be inlined because it's used very often in schedule.  The tso
2559    argument is actually only needed in GranSim, where we want to have the
2560    possibility to schedule *any* TSO on the run queue, irrespective of the
2561    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2562    the run queue and dequeue the tso, adjusting the links in the queue. 
2563 */
2564 //@cindex take_off_run_queue
2565 static /* inline */ StgTSO*
2566 take_off_run_queue(StgTSO *tso) {
2567   StgTSO *t, *prev;
2568
2569   /* 
2570      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2571
2572      if tso is specified, unlink that tso from the run_queue (doesn't have
2573      to be at the beginning of the queue); GranSim only 
2574   */
2575   if (tso!=END_TSO_QUEUE) {
2576     /* find tso in queue */
2577     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2578          t!=END_TSO_QUEUE && t!=tso;
2579          prev=t, t=t->link) 
2580       /* nothing */ ;
2581     ASSERT(t==tso);
2582     /* now actually dequeue the tso */
2583     if (prev!=END_TSO_QUEUE) {
2584       ASSERT(run_queue_hd!=t);
2585       prev->link = t->link;
2586     } else {
2587       /* t is at beginning of thread queue */
2588       ASSERT(run_queue_hd==t);
2589       run_queue_hd = t->link;
2590     }
2591     /* t is at end of thread queue */
2592     if (t->link==END_TSO_QUEUE) {
2593       ASSERT(t==run_queue_tl);
2594       run_queue_tl = prev;
2595     } else {
2596       ASSERT(run_queue_tl!=t);
2597     }
2598     t->link = END_TSO_QUEUE;
2599   } else {
2600     /* take tso from the beginning of the queue; std concurrent code */
2601     t = run_queue_hd;
2602     if (t != END_TSO_QUEUE) {
2603       run_queue_hd = t->link;
2604       t->link = END_TSO_QUEUE;
2605       if (run_queue_hd == END_TSO_QUEUE) {
2606         run_queue_tl = END_TSO_QUEUE;
2607       }
2608     }
2609   }
2610   return t;
2611 }
2612
2613 #endif /* 0 */
2614
2615 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2616 //@subsection Garbage Collextion Routines
2617
2618 /* ---------------------------------------------------------------------------
2619    Where are the roots that we know about?
2620
2621         - all the threads on the runnable queue
2622         - all the threads on the blocked queue
2623         - all the threads on the sleeping queue
2624         - all the thread currently executing a _ccall_GC
2625         - all the "main threads"
2626      
2627    ------------------------------------------------------------------------ */
2628
2629 /* This has to be protected either by the scheduler monitor, or by the
2630         garbage collection monitor (probably the latter).
2631         KH @ 25/10/99
2632 */
2633
2634 void
2635 GetRoots(evac_fn evac)
2636 {
2637 #if defined(GRAN)
2638   {
2639     nat i;
2640     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2641       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2642           evac((StgClosure **)&run_queue_hds[i]);
2643       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2644           evac((StgClosure **)&run_queue_tls[i]);
2645       
2646       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2647           evac((StgClosure **)&blocked_queue_hds[i]);
2648       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2649           evac((StgClosure **)&blocked_queue_tls[i]);
2650       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2651           evac((StgClosure **)&ccalling_threads[i]);
2652     }
2653   }
2654
2655   markEventQueue();
2656
2657 #else /* !GRAN */
2658   if (run_queue_hd != END_TSO_QUEUE) {
2659       ASSERT(run_queue_tl != END_TSO_QUEUE);
2660       evac((StgClosure **)&run_queue_hd);
2661       evac((StgClosure **)&run_queue_tl);
2662   }
2663   
2664   if (blocked_queue_hd != END_TSO_QUEUE) {
2665       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2666       evac((StgClosure **)&blocked_queue_hd);
2667       evac((StgClosure **)&blocked_queue_tl);
2668   }
2669   
2670   if (sleeping_queue != END_TSO_QUEUE) {
2671       evac((StgClosure **)&sleeping_queue);
2672   }
2673 #endif 
2674
2675   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2676       evac((StgClosure **)&suspended_ccalling_threads);
2677   }
2678
2679 #if defined(PAR) || defined(GRAN)
2680   markSparkQueue(evac);
2681 #endif
2682
2683 #if defined(RTS_USER_SIGNALS)
2684   // mark the signal handlers (signals should be already blocked)
2685   markSignalHandlers(evac);
2686 #endif
2687
2688   // main threads which have completed need to be retained until they
2689   // are dealt with in the main scheduler loop.  They won't be
2690   // retained any other way: the GC will drop them from the
2691   // all_threads list, so we have to be careful to treat them as roots
2692   // here.
2693   { 
2694       StgMainThread *m;
2695       for (m = main_threads; m != NULL; m = m->link) {
2696           switch (m->tso->what_next) {
2697           case ThreadComplete:
2698           case ThreadKilled:
2699               evac((StgClosure **)&m->tso);
2700               break;
2701           default:
2702               break;
2703           }
2704       }
2705   }
2706 }
2707
2708 /* -----------------------------------------------------------------------------
2709    performGC
2710
2711    This is the interface to the garbage collector from Haskell land.
2712    We provide this so that external C code can allocate and garbage
2713    collect when called from Haskell via _ccall_GC.
2714
2715    It might be useful to provide an interface whereby the programmer
2716    can specify more roots (ToDo).
2717    
2718    This needs to be protected by the GC condition variable above.  KH.
2719    -------------------------------------------------------------------------- */
2720
2721 static void (*extra_roots)(evac_fn);
2722
2723 void
2724 performGC(void)
2725 {
2726   /* Obligated to hold this lock upon entry */
2727   ACQUIRE_LOCK(&sched_mutex);
2728   GarbageCollect(GetRoots,rtsFalse);
2729   RELEASE_LOCK(&sched_mutex);
2730 }
2731
2732 void
2733 performMajorGC(void)
2734 {
2735   ACQUIRE_LOCK(&sched_mutex);
2736   GarbageCollect(GetRoots,rtsTrue);
2737   RELEASE_LOCK(&sched_mutex);
2738 }
2739
2740 static void
2741 AllRoots(evac_fn evac)
2742 {
2743     GetRoots(evac);             // the scheduler's roots
2744     extra_roots(evac);          // the user's roots
2745 }
2746
2747 void
2748 performGCWithRoots(void (*get_roots)(evac_fn))
2749 {
2750   ACQUIRE_LOCK(&sched_mutex);
2751   extra_roots = get_roots;
2752   GarbageCollect(AllRoots,rtsFalse);
2753   RELEASE_LOCK(&sched_mutex);
2754 }
2755
2756 /* -----------------------------------------------------------------------------
2757    Stack overflow
2758
2759    If the thread has reached its maximum stack size, then raise the
2760    StackOverflow exception in the offending thread.  Otherwise
2761    relocate the TSO into a larger chunk of memory and adjust its stack
2762    size appropriately.
2763    -------------------------------------------------------------------------- */
2764
2765 static StgTSO *
2766 threadStackOverflow(StgTSO *tso)
2767 {
2768   nat new_stack_size, new_tso_size, stack_words;
2769   StgPtr new_sp;
2770   StgTSO *dest;
2771
2772   IF_DEBUG(sanity,checkTSO(tso));
2773   if (tso->stack_size >= tso->max_stack_size) {
2774
2775     IF_DEBUG(gc,
2776              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2777                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2778              /* If we're debugging, just print out the top of the stack */
2779              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2780                                               tso->sp+64)));
2781
2782     /* Send this thread the StackOverflow exception */
2783     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2784     return tso;
2785   }
2786
2787   /* Try to double the current stack size.  If that takes us over the
2788    * maximum stack size for this thread, then use the maximum instead.
2789    * Finally round up so the TSO ends up as a whole number of blocks.
2790    */
2791   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2792   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2793                                        TSO_STRUCT_SIZE)/sizeof(W_);
2794   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2795   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2796
2797   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2798
2799   dest = (StgTSO *)allocate(new_tso_size);
2800   TICK_ALLOC_TSO(new_stack_size,0);
2801
2802   /* copy the TSO block and the old stack into the new area */
2803   memcpy(dest,tso,TSO_STRUCT_SIZE);
2804   stack_words = tso->stack + tso->stack_size - tso->sp;
2805   new_sp = (P_)dest + new_tso_size - stack_words;
2806   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2807
2808   /* relocate the stack pointers... */
2809   dest->sp         = new_sp;
2810   dest->stack_size = new_stack_size;
2811         
2812   /* Mark the old TSO as relocated.  We have to check for relocated
2813    * TSOs in the garbage collector and any primops that deal with TSOs.
2814    *
2815    * It's important to set the sp value to just beyond the end
2816    * of the stack, so we don't attempt to scavenge any part of the
2817    * dead TSO's stack.
2818    */
2819   tso->what_next = ThreadRelocated;
2820   tso->link = dest;
2821   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2822   tso->why_blocked = NotBlocked;
2823   dest->mut_link = NULL;
2824
2825   IF_PAR_DEBUG(verbose,
2826                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2827                      tso->id, tso, tso->stack_size);
2828                /* If we're debugging, just print out the top of the stack */
2829                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2830                                                 tso->sp+64)));
2831   
2832   IF_DEBUG(sanity,checkTSO(tso));
2833 #if 0
2834   IF_DEBUG(scheduler,printTSO(dest));
2835 #endif
2836
2837   return dest;
2838 }
2839
2840 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2841 //@subsection Blocking Queue Routines
2842
2843 /* ---------------------------------------------------------------------------
2844    Wake up a queue that was blocked on some resource.
2845    ------------------------------------------------------------------------ */
2846
2847 #if defined(GRAN)
2848 static inline void
2849 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2850 {
2851 }
2852 #elif defined(PAR)
2853 static inline void
2854 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2855 {
2856   /* write RESUME events to log file and
2857      update blocked and fetch time (depending on type of the orig closure) */
2858   if (RtsFlags.ParFlags.ParStats.Full) {
2859     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2860                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2861                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2862     if (EMPTY_RUN_QUEUE())
2863       emitSchedule = rtsTrue;
2864
2865     switch (get_itbl(node)->type) {
2866         case FETCH_ME_BQ:
2867           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2868           break;
2869         case RBH:
2870         case FETCH_ME:
2871         case BLACKHOLE_BQ:
2872           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2873           break;
2874 #ifdef DIST
2875         case MVAR:
2876           break;
2877 #endif    
2878         default:
2879           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2880         }
2881       }
2882 }
2883 #endif
2884
2885 #if defined(GRAN)
2886 static StgBlockingQueueElement *
2887 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2888 {
2889     StgTSO *tso;
2890     PEs node_loc, tso_loc;
2891
2892     node_loc = where_is(node); // should be lifted out of loop
2893     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2894     tso_loc = where_is((StgClosure *)tso);
2895     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2896       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2897       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2898       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2899       // insertThread(tso, node_loc);
2900       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2901                 ResumeThread,
2902                 tso, node, (rtsSpark*)NULL);
2903       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2904       // len_local++;
2905       // len++;
2906     } else { // TSO is remote (actually should be FMBQ)
2907       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2908                                   RtsFlags.GranFlags.Costs.gunblocktime +
2909                                   RtsFlags.GranFlags.Costs.latency;
2910       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2911                 UnblockThread,
2912                 tso, node, (rtsSpark*)NULL);
2913       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2914       // len++;
2915     }
2916     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2917     IF_GRAN_DEBUG(bq,
2918                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2919                           (node_loc==tso_loc ? "Local" : "Global"), 
2920                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2921     tso->block_info.closure = NULL;
2922     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2923                              tso->id, tso));
2924 }
2925 #elif defined(PAR)
2926 static StgBlockingQueueElement *
2927 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2928 {
2929     StgBlockingQueueElement *next;
2930
2931     switch (get_itbl(bqe)->type) {
2932     case TSO:
2933       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2934       /* if it's a TSO just push it onto the run_queue */
2935       next = bqe->link;
2936       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2937       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2938       THREAD_RUNNABLE();
2939       unblockCount(bqe, node);
2940       /* reset blocking status after dumping event */
2941       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2942       break;
2943
2944     case BLOCKED_FETCH:
2945       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2946       next = bqe->link;
2947       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2948       PendingFetches = (StgBlockedFetch *)bqe;
2949       break;
2950
2951 # if defined(DEBUG)
2952       /* can ignore this case in a non-debugging setup; 
2953          see comments on RBHSave closures above */
2954     case CONSTR:
2955       /* check that the closure is an RBHSave closure */
2956       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2957              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2958              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2959       break;
2960
2961     default:
2962       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2963            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2964            (StgClosure *)bqe);
2965 # endif
2966     }
2967   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2968   return next;
2969 }
2970
2971 #else /* !GRAN && !PAR */
2972 static StgTSO *
2973 unblockOneLocked(StgTSO *tso)
2974 {
2975   StgTSO *next;
2976
2977   ASSERT(get_itbl(tso)->type == TSO);
2978   ASSERT(tso->why_blocked != NotBlocked);
2979   tso->why_blocked = NotBlocked;
2980   next = tso->link;
2981   PUSH_ON_RUN_QUEUE(tso);
2982   THREAD_RUNNABLE();
2983   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2984   return next;
2985 }
2986 #endif
2987
2988 #if defined(GRAN) || defined(PAR)
2989 inline StgBlockingQueueElement *
2990 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2991 {
2992   ACQUIRE_LOCK(&sched_mutex);
2993   bqe = unblockOneLocked(bqe, node);
2994   RELEASE_LOCK(&sched_mutex);
2995   return bqe;
2996 }
2997 #else
2998 inline StgTSO *
2999 unblockOne(StgTSO *tso)
3000 {
3001   ACQUIRE_LOCK(&sched_mutex);
3002   tso = unblockOneLocked(tso);
3003   RELEASE_LOCK(&sched_mutex);
3004   return tso;
3005 }
3006 #endif
3007
3008 #if defined(GRAN)
3009 void 
3010 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3011 {
3012   StgBlockingQueueElement *bqe;
3013   PEs node_loc;
3014   nat len = 0; 
3015
3016   IF_GRAN_DEBUG(bq, 
3017                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
3018                       node, CurrentProc, CurrentTime[CurrentProc], 
3019                       CurrentTSO->id, CurrentTSO));
3020
3021   node_loc = where_is(node);
3022
3023   ASSERT(q == END_BQ_QUEUE ||
3024          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3025          get_itbl(q)->type == CONSTR); // closure (type constructor)
3026   ASSERT(is_unique(node));
3027
3028   /* FAKE FETCH: magically copy the node to the tso's proc;
3029      no Fetch necessary because in reality the node should not have been 
3030      moved to the other PE in the first place
3031   */
3032   if (CurrentProc!=node_loc) {
3033     IF_GRAN_DEBUG(bq, 
3034                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
3035                         node, node_loc, CurrentProc, CurrentTSO->id, 
3036                         // CurrentTSO, where_is(CurrentTSO),
3037                         node->header.gran.procs));
3038     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3039     IF_GRAN_DEBUG(bq, 
3040                   belch("## new bitmask of node %p is %#x",
3041                         node, node->header.gran.procs));
3042     if (RtsFlags.GranFlags.GranSimStats.Global) {
3043       globalGranStats.tot_fake_fetches++;
3044     }
3045   }
3046
3047   bqe = q;
3048   // ToDo: check: ASSERT(CurrentProc==node_loc);
3049   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3050     //next = bqe->link;
3051     /* 
3052        bqe points to the current element in the queue
3053        next points to the next element in the queue
3054     */
3055     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3056     //tso_loc = where_is(tso);
3057     len++;
3058     bqe = unblockOneLocked(bqe, node);
3059   }
3060
3061   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3062      the closure to make room for the anchor of the BQ */
3063   if (bqe!=END_BQ_QUEUE) {
3064     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3065     /*
3066     ASSERT((info_ptr==&RBH_Save_0_info) ||
3067            (info_ptr==&RBH_Save_1_info) ||
3068            (info_ptr==&RBH_Save_2_info));
3069     */
3070     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3071     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3072     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3073
3074     IF_GRAN_DEBUG(bq,
3075                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
3076                         node, info_type(node)));
3077   }
3078
3079   /* statistics gathering */
3080   if (RtsFlags.GranFlags.GranSimStats.Global) {
3081     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3082     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3083     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3084     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3085   }
3086   IF_GRAN_DEBUG(bq,
3087                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
3088                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3089 }
3090 #elif defined(PAR)
3091 void 
3092 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3093 {
3094   StgBlockingQueueElement *bqe;
3095
3096   ACQUIRE_LOCK(&sched_mutex);
3097
3098   IF_PAR_DEBUG(verbose, 
3099                belch("##-_ AwBQ for node %p on [%x]: ",
3100                      node, mytid));
3101 #ifdef DIST  
3102   //RFP
3103   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3104     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
3105     return;
3106   }
3107 #endif
3108   
3109   ASSERT(q == END_BQ_QUEUE ||
3110          get_itbl(q)->type == TSO ||           
3111          get_itbl(q)->type == BLOCKED_FETCH || 
3112          get_itbl(q)->type == CONSTR); 
3113
3114   bqe = q;
3115   while (get_itbl(bqe)->type==TSO || 
3116          get_itbl(bqe)->type==BLOCKED_FETCH) {
3117     bqe = unblockOneLocked(bqe, node);
3118   }
3119   RELEASE_LOCK(&sched_mutex);
3120 }
3121
3122 #else   /* !GRAN && !PAR */
3123
3124 #ifdef RTS_SUPPORTS_THREADS
3125 void
3126 awakenBlockedQueueNoLock(StgTSO *tso)
3127 {
3128   while (tso != END_TSO_QUEUE) {
3129     tso = unblockOneLocked(tso);
3130   }
3131 }
3132 #endif
3133
3134 void
3135 awakenBlockedQueue(StgTSO *tso)
3136 {
3137   ACQUIRE_LOCK(&sched_mutex);
3138   while (tso != END_TSO_QUEUE) {
3139     tso = unblockOneLocked(tso);
3140   }
3141   RELEASE_LOCK(&sched_mutex);
3142 }
3143 #endif
3144
3145 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
3146 //@subsection Exception Handling Routines
3147
3148 /* ---------------------------------------------------------------------------
3149    Interrupt execution
3150    - usually called inside a signal handler so it mustn't do anything fancy.   
3151    ------------------------------------------------------------------------ */
3152
3153 void
3154 interruptStgRts(void)
3155 {
3156     interrupted    = 1;
3157     context_switch = 1;
3158 }
3159
3160 /* -----------------------------------------------------------------------------
3161    Unblock a thread
3162
3163    This is for use when we raise an exception in another thread, which
3164    may be blocked.
3165    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3166    -------------------------------------------------------------------------- */
3167
3168 #if defined(GRAN) || defined(PAR)
3169 /*
3170   NB: only the type of the blocking queue is different in GranSim and GUM
3171       the operations on the queue-elements are the same
3172       long live polymorphism!
3173
3174   Locks: sched_mutex is held upon entry and exit.
3175
3176 */
3177 static void
3178 unblockThread(StgTSO *tso)
3179 {
3180   StgBlockingQueueElement *t, **last;
3181
3182   switch (tso->why_blocked) {
3183
3184   case NotBlocked:
3185     return;  /* not blocked */
3186
3187   case BlockedOnMVar:
3188     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3189     {
3190       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3191       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3192
3193       last = (StgBlockingQueueElement **)&mvar->head;
3194       for (t = (StgBlockingQueueElement *)mvar->head; 
3195            t != END_BQ_QUEUE; 
3196            last = &t->link, last_tso = t, t = t->link) {
3197         if (t == (StgBlockingQueueElement *)tso) {
3198           *last = (StgBlockingQueueElement *)tso->link;
3199           if (mvar->tail == tso) {
3200             mvar->tail = (StgTSO *)last_tso;
3201           }
3202           goto done;
3203         }
3204       }
3205       barf("unblockThread (MVAR): TSO not found");
3206     }
3207
3208   case BlockedOnBlackHole:
3209     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3210     {
3211       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3212
3213       last = &bq->blocking_queue;
3214       for (t = bq->blocking_queue; 
3215            t != END_BQ_QUEUE; 
3216            last = &t->link, t = t->link) {
3217         if (t == (StgBlockingQueueElement *)tso) {
3218           *last = (StgBlockingQueueElement *)tso->link;
3219           goto done;
3220         }
3221       }
3222       barf("unblockThread (BLACKHOLE): TSO not found");
3223     }
3224
3225   case BlockedOnException:
3226     {
3227       StgTSO *target  = tso->block_info.tso;
3228
3229       ASSERT(get_itbl(target)->type == TSO);
3230
3231       if (target->what_next == ThreadRelocated) {
3232           target = target->link;
3233           ASSERT(get_itbl(target)->type == TSO);
3234       }
3235
3236       ASSERT(target->blocked_exceptions != NULL);
3237
3238       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3239       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3240            t != END_BQ_QUEUE; 
3241            last = &t->link, t = t->link) {
3242         ASSERT(get_itbl(t)->type == TSO);
3243         if (t == (StgBlockingQueueElement *)tso) {
3244           *last = (StgBlockingQueueElement *)tso->link;
3245           goto done;
3246         }
3247       }
3248       barf("unblockThread (Exception): TSO not found");
3249     }
3250
3251   case BlockedOnRead:
3252   case BlockedOnWrite:
3253 #if defined(mingw32_TARGET_OS)
3254   case BlockedOnDoProc:
3255 #endif
3256     {
3257       /* take TSO off blocked_queue */
3258       StgBlockingQueueElement *prev = NULL;
3259       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3260            prev = t, t = t->link) {
3261         if (t == (StgBlockingQueueElement *)tso) {
3262           if (prev == NULL) {
3263             blocked_queue_hd = (StgTSO *)t->link;
3264             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3265               blocked_queue_tl = END_TSO_QUEUE;
3266             }
3267           } else {
3268             prev->link = t->link;
3269             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3270               blocked_queue_tl = (StgTSO *)prev;
3271             }
3272           }
3273           goto done;
3274         }
3275       }
3276       barf("unblockThread (I/O): TSO not found");
3277     }
3278
3279   case BlockedOnDelay:
3280     {
3281       /* take TSO off sleeping_queue */
3282       StgBlockingQueueElement *prev = NULL;
3283       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3284            prev = t, t = t->link) {
3285         if (t == (StgBlockingQueueElement *)tso) {
3286           if (prev == NULL) {
3287             sleeping_queue = (StgTSO *)t->link;
3288           } else {
3289             prev->link = t->link;
3290           }
3291           goto done;
3292         }
3293       }
3294       barf("unblockThread (delay): TSO not found");
3295     }
3296
3297   default:
3298     barf("unblockThread");
3299   }
3300
3301  done:
3302   tso->link = END_TSO_QUEUE;
3303   tso->why_blocked = NotBlocked;
3304   tso->block_info.closure = NULL;
3305   PUSH_ON_RUN_QUEUE(tso);
3306 }
3307 #else
3308 static void
3309 unblockThread(StgTSO *tso)
3310 {
3311   StgTSO *t, **last;
3312   
3313   /* To avoid locking unnecessarily. */
3314   if (tso->why_blocked == NotBlocked) {
3315     return;
3316   }
3317
3318   switch (tso->why_blocked) {
3319
3320   case BlockedOnMVar:
3321     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3322     {
3323       StgTSO *last_tso = END_TSO_QUEUE;
3324       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3325
3326       last = &mvar->head;
3327       for (t = mvar->head; t != END_TSO_QUEUE; 
3328            last = &t->link, last_tso = t, t = t->link) {
3329         if (t == tso) {
3330           *last = tso->link;
3331           if (mvar->tail == tso) {
3332             mvar->tail = last_tso;
3333           }
3334           goto done;
3335         }
3336       }
3337       barf("unblockThread (MVAR): TSO not found");
3338     }
3339
3340   case BlockedOnBlackHole:
3341     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3342     {
3343       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3344
3345       last = &bq->blocking_queue;
3346       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3347            last = &t->link, t = t->link) {
3348         if (t == tso) {
3349           *last = tso->link;
3350           goto done;
3351         }
3352       }
3353       barf("unblockThread (BLACKHOLE): TSO not found");
3354     }
3355
3356   case BlockedOnException:
3357     {
3358       StgTSO *target  = tso->block_info.tso;
3359
3360       ASSERT(get_itbl(target)->type == TSO);
3361
3362       while (target->what_next == ThreadRelocated) {
3363           target = target->link;
3364           ASSERT(get_itbl(target)->type == TSO);
3365       }
3366       
3367       ASSERT(target->blocked_exceptions != NULL);
3368
3369       last = &target->blocked_exceptions;
3370       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3371            last = &t->link, t = t->link) {
3372         ASSERT(get_itbl(t)->type == TSO);
3373         if (t == tso) {
3374           *last = tso->link;
3375           goto done;
3376         }
3377       }
3378       barf("unblockThread (Exception): TSO not found");
3379     }
3380
3381   case BlockedOnRead:
3382   case BlockedOnWrite:
3383 #if defined(mingw32_TARGET_OS)
3384   case BlockedOnDoProc:
3385 #endif
3386     {
3387       StgTSO *prev = NULL;
3388       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3389            prev = t, t = t->link) {
3390         if (t == tso) {
3391           if (prev == NULL) {
3392             blocked_queue_hd = t->link;
3393             if (blocked_queue_tl == t) {
3394               blocked_queue_tl = END_TSO_QUEUE;
3395             }
3396           } else {
3397             prev->link = t->link;
3398             if (blocked_queue_tl == t) {
3399               blocked_queue_tl = prev;
3400             }
3401           }
3402           goto done;
3403         }
3404       }
3405       barf("unblockThread (I/O): TSO not found");
3406     }
3407
3408   case BlockedOnDelay:
3409     {
3410       StgTSO *prev = NULL;
3411       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3412            prev = t, t = t->link) {
3413         if (t == tso) {
3414           if (prev == NULL) {
3415             sleeping_queue = t->link;
3416           } else {
3417             prev->link = t->link;
3418           }
3419           goto done;
3420         }
3421       }
3422       barf("unblockThread (delay): TSO not found");
3423     }
3424
3425   default:
3426     barf("unblockThread");
3427   }
3428
3429  done:
3430   tso->link = END_TSO_QUEUE;
3431   tso->why_blocked = NotBlocked;
3432   tso->block_info.closure = NULL;
3433   PUSH_ON_RUN_QUEUE(tso);
3434 }
3435 #endif
3436
3437 /* -----------------------------------------------------------------------------
3438  * raiseAsync()
3439  *
3440  * The following function implements the magic for raising an
3441  * asynchronous exception in an existing thread.
3442  *
3443  * We first remove the thread from any queue on which it might be
3444  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3445  *
3446  * We strip the stack down to the innermost CATCH_FRAME, building
3447  * thunks in the heap for all the active computations, so they can 
3448  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3449  * an application of the handler to the exception, and push it on
3450  * the top of the stack.
3451  * 
3452  * How exactly do we save all the active computations?  We create an
3453  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3454  * AP_STACKs pushes everything from the corresponding update frame
3455  * upwards onto the stack.  (Actually, it pushes everything up to the
3456  * next update frame plus a pointer to the next AP_STACK object.
3457  * Entering the next AP_STACK object pushes more onto the stack until we
3458  * reach the last AP_STACK object - at which point the stack should look
3459  * exactly as it did when we killed the TSO and we can continue
3460  * execution by entering the closure on top of the stack.
3461  *
3462  * We can also kill a thread entirely - this happens if either (a) the 
3463  * exception passed to raiseAsync is NULL, or (b) there's no
3464  * CATCH_FRAME on the stack.  In either case, we strip the entire
3465  * stack and replace the thread with a zombie.
3466  *
3467  * Locks: sched_mutex held upon entry nor exit.
3468  *
3469  * -------------------------------------------------------------------------- */
3470  
3471 void 
3472 deleteThread(StgTSO *tso)
3473 {
3474   raiseAsync(tso,NULL);
3475 }
3476
3477 static void 
3478 deleteThreadImmediately(StgTSO *tso)
3479 { // for forkProcess only:
3480   // delete thread without giving it a chance to catch the KillThread exception
3481
3482   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3483       return;
3484   }
3485   unblockThread(tso);
3486   tso->what_next = ThreadKilled;
3487 }
3488
3489 void
3490 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3491 {
3492   /* When raising async exs from contexts where sched_mutex isn't held;
3493      use raiseAsyncWithLock(). */
3494   ACQUIRE_LOCK(&sched_mutex);
3495   raiseAsync(tso,exception);
3496   RELEASE_LOCK(&sched_mutex);
3497 }
3498
3499 void
3500 raiseAsync(StgTSO *tso, StgClosure *exception)
3501 {
3502     StgRetInfoTable *info;
3503     StgPtr sp;
3504   
3505     // Thread already dead?
3506     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3507         return;
3508     }
3509
3510     IF_DEBUG(scheduler, 
3511              sched_belch("raising exception in thread %ld.", tso->id));
3512     
3513     // Remove it from any blocking queues
3514     unblockThread(tso);
3515
3516     sp = tso->sp;
3517     
3518     // The stack freezing code assumes there's a closure pointer on
3519     // the top of the stack, so we have to arrange that this is the case...
3520     //
3521     if (sp[0] == (W_)&stg_enter_info) {
3522         sp++;
3523     } else {
3524         sp--;
3525         sp[0] = (W_)&stg_dummy_ret_closure;
3526     }
3527
3528     while (1) {
3529         nat i;
3530
3531         // 1. Let the top of the stack be the "current closure"
3532         //
3533         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3534         // CATCH_FRAME.
3535         //
3536         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3537         // current closure applied to the chunk of stack up to (but not
3538         // including) the update frame.  This closure becomes the "current
3539         // closure".  Go back to step 2.
3540         //
3541         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3542         // top of the stack applied to the exception.
3543         // 
3544         // 5. If it's a STOP_FRAME, then kill the thread.
3545         
3546         StgPtr frame;
3547         
3548         frame = sp + 1;
3549         info = get_ret_itbl((StgClosure *)frame);
3550         
3551         while (info->i.type != UPDATE_FRAME
3552                && (info->i.type != CATCH_FRAME || exception == NULL)
3553                && info->i.type != STOP_FRAME) {
3554             frame += stack_frame_sizeW((StgClosure *)frame);
3555             info = get_ret_itbl((StgClosure *)frame);
3556         }
3557         
3558         switch (info->i.type) {
3559             
3560         case CATCH_FRAME:
3561             // If we find a CATCH_FRAME, and we've got an exception to raise,
3562             // then build the THUNK raise(exception), and leave it on
3563             // top of the CATCH_FRAME ready to enter.
3564             //
3565         {
3566 #ifdef PROFILING
3567             StgCatchFrame *cf = (StgCatchFrame *)frame;
3568 #endif
3569             StgClosure *raise;
3570             
3571             // we've got an exception to raise, so let's pass it to the
3572             // handler in this frame.
3573             //
3574             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3575             TICK_ALLOC_SE_THK(1,0);
3576             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3577             raise->payload[0] = exception;
3578             
3579             // throw away the stack from Sp up to the CATCH_FRAME.
3580             //
3581             sp = frame - 1;
3582             
3583             /* Ensure that async excpetions are blocked now, so we don't get
3584              * a surprise exception before we get around to executing the
3585              * handler.
3586              */
3587             if (tso->blocked_exceptions == NULL) {
3588                 tso->blocked_exceptions = END_TSO_QUEUE;
3589             }
3590             
3591             /* Put the newly-built THUNK on top of the stack, ready to execute
3592              * when the thread restarts.
3593              */
3594             sp[0] = (W_)raise;
3595             sp[-1] = (W_)&stg_enter_info;
3596             tso->sp = sp-1;
3597             tso->what_next = ThreadRunGHC;
3598             IF_DEBUG(sanity, checkTSO(tso));
3599             return;
3600         }
3601         
3602         case UPDATE_FRAME:
3603         {
3604             StgAP_STACK * ap;
3605             nat words;
3606             
3607             // First build an AP_STACK consisting of the stack chunk above the
3608             // current update frame, with the top word on the stack as the
3609             // fun field.
3610             //
3611             words = frame - sp - 1;
3612             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3613             
3614             ap->size = words;
3615             ap->fun  = (StgClosure *)sp[0];
3616             sp++;
3617             for(i=0; i < (nat)words; ++i) {
3618                 ap->payload[i] = (StgClosure *)*sp++;
3619             }
3620             
3621             SET_HDR(ap,&stg_AP_STACK_info,
3622                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3623             TICK_ALLOC_UP_THK(words+1,0);
3624             
3625             IF_DEBUG(scheduler,
3626                      fprintf(stderr,  "scheduler: Updating ");
3627                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3628                      fprintf(stderr,  " with ");
3629                      printObj((StgClosure *)ap);
3630                 );
3631
3632             // Replace the updatee with an indirection - happily
3633             // this will also wake up any threads currently
3634             // waiting on the result.
3635             //
3636             // Warning: if we're in a loop, more than one update frame on
3637             // the stack may point to the same object.  Be careful not to
3638             // overwrite an IND_OLDGEN in this case, because we'll screw
3639             // up the mutable lists.  To be on the safe side, don't
3640             // overwrite any kind of indirection at all.  See also
3641             // threadSqueezeStack in GC.c, where we have to make a similar
3642             // check.
3643             //
3644             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3645                 // revert the black hole
3646                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3647             }
3648             sp += sizeofW(StgUpdateFrame) - 1;
3649             sp[0] = (W_)ap; // push onto stack
3650             break;
3651         }
3652         
3653         case STOP_FRAME:
3654             // We've stripped the entire stack, the thread is now dead.
3655             sp += sizeofW(StgStopFrame);
3656             tso->what_next = ThreadKilled;
3657             tso->sp = sp;
3658             return;
3659             
3660         default:
3661             barf("raiseAsync");
3662         }
3663     }
3664     barf("raiseAsync");
3665 }
3666
3667 /* -----------------------------------------------------------------------------
3668    resurrectThreads is called after garbage collection on the list of
3669    threads found to be garbage.  Each of these threads will be woken
3670    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3671    on an MVar, or NonTermination if the thread was blocked on a Black
3672    Hole.
3673
3674    Locks: sched_mutex isn't held upon entry nor exit.
3675    -------------------------------------------------------------------------- */
3676
3677 void
3678 resurrectThreads( StgTSO *threads )
3679 {
3680   StgTSO *tso, *next;
3681
3682   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3683     next = tso->global_link;
3684     tso->global_link = all_threads;
3685     all_threads = tso;
3686     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3687
3688     switch (tso->why_blocked) {
3689     case BlockedOnMVar:
3690     case BlockedOnException:
3691       /* Called by GC - sched_mutex lock is currently held. */
3692       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3693       break;
3694     case BlockedOnBlackHole:
3695       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3696       break;
3697     case NotBlocked:
3698       /* This might happen if the thread was blocked on a black hole
3699        * belonging to a thread that we've just woken up (raiseAsync
3700        * can wake up threads, remember...).
3701        */
3702       continue;
3703     default:
3704       barf("resurrectThreads: thread blocked in a strange way");
3705     }
3706   }
3707 }
3708
3709 /* -----------------------------------------------------------------------------
3710  * Blackhole detection: if we reach a deadlock, test whether any
3711  * threads are blocked on themselves.  Any threads which are found to
3712  * be self-blocked get sent a NonTermination exception.
3713  *
3714  * This is only done in a deadlock situation in order to avoid
3715  * performance overhead in the normal case.
3716  *
3717  * Locks: sched_mutex is held upon entry and exit.
3718  * -------------------------------------------------------------------------- */
3719
3720 static void
3721 detectBlackHoles( void )
3722 {
3723     StgTSO *tso = all_threads;
3724     StgClosure *frame;
3725     StgClosure *blocked_on;
3726     StgRetInfoTable *info;
3727
3728     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3729
3730         while (tso->what_next == ThreadRelocated) {
3731             tso = tso->link;
3732             ASSERT(get_itbl(tso)->type == TSO);
3733         }
3734       
3735         if (tso->why_blocked != BlockedOnBlackHole) {
3736             continue;
3737         }
3738         blocked_on = tso->block_info.closure;
3739
3740         frame = (StgClosure *)tso->sp;
3741
3742         while(1) {
3743             info = get_ret_itbl(frame);
3744             switch (info->i.type) {
3745             case UPDATE_FRAME:
3746                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3747                     /* We are blocking on one of our own computations, so
3748                      * send this thread the NonTermination exception.  
3749                      */
3750                     IF_DEBUG(scheduler, 
3751                              sched_belch("thread %d is blocked on itself", tso->id));
3752                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3753                     goto done;
3754                 }
3755                 
3756                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3757                 continue;
3758
3759             case STOP_FRAME:
3760                 goto done;
3761
3762                 // normal stack frames; do nothing except advance the pointer
3763             default:
3764                 (StgPtr)frame += stack_frame_sizeW(frame);
3765             }
3766         }   
3767         done: ;
3768     }
3769 }
3770
3771 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3772 //@subsection Debugging Routines
3773
3774 /* -----------------------------------------------------------------------------
3775  * Debugging: why is a thread blocked
3776  * [Also provides useful information when debugging threaded programs
3777  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3778    -------------------------------------------------------------------------- */
3779
3780 static
3781 void
3782 printThreadBlockage(StgTSO *tso)
3783 {
3784   switch (tso->why_blocked) {
3785   case BlockedOnRead:
3786     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3787     break;
3788   case BlockedOnWrite:
3789     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3790     break;
3791 #if defined(mingw32_TARGET_OS)
3792     case BlockedOnDoProc:
3793     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3794     break;
3795 #endif
3796   case BlockedOnDelay:
3797     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3798     break;
3799   case BlockedOnMVar:
3800     fprintf(stderr,"is blocked on an MVar");
3801     break;
3802   case BlockedOnException:
3803     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3804             tso->block_info.tso->id);
3805     break;
3806   case BlockedOnBlackHole:
3807     fprintf(stderr,"is blocked on a black hole");
3808     break;
3809   case NotBlocked:
3810     fprintf(stderr,"is not blocked");
3811     break;
3812 #if defined(PAR)
3813   case BlockedOnGA:
3814     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3815             tso->block_info.closure, info_type(tso->block_info.closure));
3816     break;
3817   case BlockedOnGA_NoSend:
3818     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3819             tso->block_info.closure, info_type(tso->block_info.closure));
3820     break;
3821 #endif
3822 #if defined(RTS_SUPPORTS_THREADS)
3823   case BlockedOnCCall:
3824     fprintf(stderr,"is blocked on an external call");
3825     break;
3826   case BlockedOnCCall_NoUnblockExc:
3827     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3828     break;
3829 #endif
3830   default:
3831     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3832          tso->why_blocked, tso->id, tso);
3833   }
3834 }
3835
3836 static
3837 void
3838 printThreadStatus(StgTSO *tso)
3839 {
3840   switch (tso->what_next) {
3841   case ThreadKilled:
3842     fprintf(stderr,"has been killed");
3843     break;
3844   case ThreadComplete:
3845     fprintf(stderr,"has completed");
3846     break;
3847   default:
3848     printThreadBlockage(tso);
3849   }
3850 }
3851
3852 void
3853 printAllThreads(void)
3854 {
3855   StgTSO *t;
3856   void *label;
3857
3858 # if defined(GRAN)
3859   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3860   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3861                        time_string, rtsFalse/*no commas!*/);
3862
3863   fprintf(stderr, "all threads at [%s]:\n", time_string);
3864 # elif defined(PAR)
3865   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3866   ullong_format_string(CURRENT_TIME,
3867                        time_string, rtsFalse/*no commas!*/);
3868
3869   fprintf(stderr,"all threads at [%s]:\n", time_string);
3870 # else
3871   fprintf(stderr,"all threads:\n");
3872 # endif
3873
3874   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3875     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3876     label = lookupThreadLabel((StgWord)t);
3877     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3878     printThreadStatus(t);
3879     fprintf(stderr,"\n");
3880   }
3881 }
3882     
3883 #ifdef DEBUG
3884
3885 /* 
3886    Print a whole blocking queue attached to node (debugging only).
3887 */
3888 //@cindex print_bq
3889 # if defined(PAR)
3890 void 
3891 print_bq (StgClosure *node)
3892 {
3893   StgBlockingQueueElement *bqe;
3894   StgTSO *tso;
3895   rtsBool end;
3896
3897   fprintf(stderr,"## BQ of closure %p (%s): ",
3898           node, info_type(node));
3899
3900   /* should cover all closures that may have a blocking queue */
3901   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3902          get_itbl(node)->type == FETCH_ME_BQ ||
3903          get_itbl(node)->type == RBH ||
3904          get_itbl(node)->type == MVAR);
3905     
3906   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3907
3908   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3909 }
3910
3911 /* 
3912    Print a whole blocking queue starting with the element bqe.
3913 */
3914 void 
3915 print_bqe (StgBlockingQueueElement *bqe)
3916 {
3917   rtsBool end;
3918
3919   /* 
3920      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3921   */
3922   for (end = (bqe==END_BQ_QUEUE);
3923        !end; // iterate until bqe points to a CONSTR
3924        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3925        bqe = end ? END_BQ_QUEUE : bqe->link) {
3926     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3927     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3928     /* types of closures that may appear in a blocking queue */
3929     ASSERT(get_itbl(bqe)->type == TSO ||           
3930            get_itbl(bqe)->type == BLOCKED_FETCH || 
3931            get_itbl(bqe)->type == CONSTR); 
3932     /* only BQs of an RBH end with an RBH_Save closure */
3933     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3934
3935     switch (get_itbl(bqe)->type) {
3936     case TSO:
3937       fprintf(stderr," TSO %u (%x),",
3938               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3939       break;
3940     case BLOCKED_FETCH:
3941       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3942               ((StgBlockedFetch *)bqe)->node, 
3943               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3944               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3945               ((StgBlockedFetch *)bqe)->ga.weight);
3946       break;
3947     case CONSTR:
3948       fprintf(stderr," %s (IP %p),",
3949               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3950                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3951                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3952                "RBH_Save_?"), get_itbl(bqe));
3953       break;
3954     default:
3955       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3956            info_type((StgClosure *)bqe)); // , node, info_type(node));
3957       break;
3958     }
3959   } /* for */
3960   fputc('\n', stderr);
3961 }
3962 # elif defined(GRAN)
3963 void 
3964 print_bq (StgClosure *node)
3965 {
3966   StgBlockingQueueElement *bqe;
3967   PEs node_loc, tso_loc;
3968   rtsBool end;
3969
3970   /* should cover all closures that may have a blocking queue */
3971   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3972          get_itbl(node)->type == FETCH_ME_BQ ||
3973          get_itbl(node)->type == RBH);
3974     
3975   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3976   node_loc = where_is(node);
3977
3978   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3979           node, info_type(node), node_loc);
3980
3981   /* 
3982      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3983   */
3984   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3985        !end; // iterate until bqe points to a CONSTR
3986        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3987     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3988     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3989     /* types of closures that may appear in a blocking queue */
3990     ASSERT(get_itbl(bqe)->type == TSO ||           
3991            get_itbl(bqe)->type == CONSTR); 
3992     /* only BQs of an RBH end with an RBH_Save closure */
3993     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3994
3995     tso_loc = where_is((StgClosure *)bqe);
3996     switch (get_itbl(bqe)->type) {
3997     case TSO:
3998       fprintf(stderr," TSO %d (%p) on [PE %d],",
3999               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4000       break;
4001     case CONSTR:
4002       fprintf(stderr," %s (IP %p),",
4003               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4004                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4005                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4006                "RBH_Save_?"), get_itbl(bqe));
4007       break;
4008     default:
4009       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4010            info_type((StgClosure *)bqe), node, info_type(node));
4011       break;
4012     }
4013   } /* for */
4014   fputc('\n', stderr);
4015 }
4016 #else
4017 /* 
4018    Nice and easy: only TSOs on the blocking queue
4019 */
4020 void 
4021 print_bq (StgClosure *node)
4022 {
4023   StgTSO *tso;
4024
4025   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4026   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
4027        tso != END_TSO_QUEUE; 
4028        tso=tso->link) {
4029     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
4030     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
4031     fprintf(stderr," TSO %d (%p),", tso->id, tso);
4032   }
4033   fputc('\n', stderr);
4034 }
4035 # endif
4036
4037 #if defined(PAR)
4038 static nat
4039 run_queue_len(void)
4040 {
4041   nat i;
4042   StgTSO *tso;
4043
4044   for (i=0, tso=run_queue_hd; 
4045        tso != END_TSO_QUEUE;
4046        i++, tso=tso->link)
4047     /* nothing */
4048
4049   return i;
4050 }
4051 #endif
4052
4053 static void
4054 sched_belch(char *s, ...)
4055 {
4056   va_list ap;
4057   va_start(ap,s);
4058 #ifdef SMP
4059   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
4060 #elif defined(PAR)
4061   fprintf(stderr, "== ");
4062 #else
4063   fprintf(stderr, "scheduler: ");
4064 #endif
4065   vfprintf(stderr, s, ap);
4066   fprintf(stderr, "\n");
4067   va_end(ap);
4068 }
4069
4070 #endif /* DEBUG */
4071
4072
4073 //@node Index,  , Debugging Routines, Main scheduling code
4074 //@subsection Index
4075
4076 //@index
4077 //* StgMainThread::  @cindex\s-+StgMainThread
4078 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
4079 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
4080 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
4081 //* context_switch::  @cindex\s-+context_switch
4082 //* createThread::  @cindex\s-+createThread
4083 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
4084 //* initScheduler::  @cindex\s-+initScheduler
4085 //* interrupted::  @cindex\s-+interrupted
4086 //* next_thread_id::  @cindex\s-+next_thread_id
4087 //* print_bq::  @cindex\s-+print_bq
4088 //* run_queue_hd::  @cindex\s-+run_queue_hd
4089 //* run_queue_tl::  @cindex\s-+run_queue_tl
4090 //* sched_mutex::  @cindex\s-+sched_mutex
4091 //* schedule::  @cindex\s-+schedule
4092 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
4093 //* term_mutex::  @cindex\s-+term_mutex
4094 //@end index