[project @ 2004-08-09 14:27:53 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.199 2004/08/09 14:27:53 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   startingWorkerThread = rtsFalse;
263   schedule(NULL,NULL);
264   RELEASE_LOCK(&sched_mutex);
265 }
266
267 void
268 startSchedulerTaskIfNecessary(void)
269 {
270   if(run_queue_hd != END_TSO_QUEUE
271     || blocked_queue_hd != END_TSO_QUEUE
272     || sleeping_queue != END_TSO_QUEUE)
273   {
274     if(!startingWorkerThread)
275     { // we don't want to start another worker thread
276       // just because the last one hasn't yet reached the
277       // "waiting for capability" state
278       startingWorkerThread = rtsTrue;
279       if(!startTask(taskStart))
280       {
281         startingWorkerThread = rtsFalse;
282       }
283     }
284   }
285 }
286 #endif
287
288 /* ---------------------------------------------------------------------------
289    Main scheduling loop.
290
291    We use round-robin scheduling, each thread returning to the
292    scheduler loop when one of these conditions is detected:
293
294       * out of heap space
295       * timer expires (thread yields)
296       * thread blocks
297       * thread ends
298       * stack overflow
299
300    Locking notes:  we acquire the scheduler lock once at the beginning
301    of the scheduler loop, and release it when
302     
303       * running a thread, or
304       * waiting for work, or
305       * waiting for a GC to complete.
306
307    GRAN version:
308      In a GranSim setup this loop iterates over the global event queue.
309      This revolves around the global event queue, which determines what 
310      to do next. Therefore, it's more complicated than either the 
311      concurrent or the parallel (GUM) setup.
312
313    GUM version:
314      GUM iterates over incoming messages.
315      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
316      and sends out a fish whenever it has nothing to do; in-between
317      doing the actual reductions (shared code below) it processes the
318      incoming messages and deals with delayed operations 
319      (see PendingFetches).
320      This is not the ugliest code you could imagine, but it's bloody close.
321
322    ------------------------------------------------------------------------ */
323 static void
324 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
325           Capability *initialCapability )
326 {
327   StgTSO *t;
328   Capability *cap;
329   StgThreadReturnCode ret;
330 #if defined(GRAN)
331   rtsEvent *event;
332 #elif defined(PAR)
333   StgSparkPool *pool;
334   rtsSpark spark;
335   StgTSO *tso;
336   GlobalTaskId pe;
337   rtsBool receivedFinish = rtsFalse;
338 # if defined(DEBUG)
339   nat tp_size, sp_size; // stats only
340 # endif
341 #endif
342   rtsBool was_interrupted = rtsFalse;
343   StgTSOWhatNext prev_what_next;
344   
345   // Pre-condition: sched_mutex is held.
346   // We might have a capability, passed in as initialCapability.
347   cap = initialCapability;
348
349 #if defined(RTS_SUPPORTS_THREADS)
350   //
351   // in the threaded case, the capability is either passed in via the
352   // initialCapability parameter, or initialized inside the scheduler
353   // loop 
354   //
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357                        mainThread, initialCapability);
358       );
359 #else
360   // simply initialise it in the non-threaded case
361   grabCapability(&cap);
362 #endif
363
364 #if defined(GRAN)
365   /* set up first event to get things going */
366   /* ToDo: assign costs for system setup and init MainTSO ! */
367   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
368             ContinueThread, 
369             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
370
371   IF_DEBUG(gran,
372            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373            G_TSO(CurrentTSO, 5));
374
375   if (RtsFlags.GranFlags.Light) {
376     /* Save current time; GranSim Light only */
377     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
378   }      
379
380   event = get_next_event();
381
382   while (event!=(rtsEvent*)NULL) {
383     /* Choose the processor with the next event */
384     CurrentProc = event->proc;
385     CurrentTSO = event->tso;
386
387 #elif defined(PAR)
388
389   while (!receivedFinish) {    /* set by processMessages */
390                                /* when receiving PP_FINISH message         */ 
391
392 #else // everything except GRAN and PAR
393
394   while (1) {
395
396 #endif
397
398      IF_DEBUG(scheduler, printAllThreads());
399
400 #if defined(RTS_SUPPORTS_THREADS)
401       // Yield the capability to higher-priority tasks if necessary.
402       //
403       if (cap != NULL) {
404           yieldCapability(&cap);
405       }
406
407       // If we do not currently hold a capability, we wait for one
408       //
409       if (cap == NULL) {
410           waitForCapability(&sched_mutex, &cap,
411                             mainThread ? &mainThread->bound_thread_cond : NULL);
412       }
413
414       // We now have a capability...
415 #endif
416
417     //
418     // If we're interrupted (the user pressed ^C, or some other
419     // termination condition occurred), kill all the currently running
420     // threads.
421     //
422     if (interrupted) {
423         IF_DEBUG(scheduler, sched_belch("interrupted"));
424         interrupted = rtsFalse;
425         was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427         // In the threaded RTS, deadlock detection doesn't work,
428         // so just exit right away.
429         prog_belch("interrupted");
430         releaseCapability(cap);
431         RELEASE_LOCK(&sched_mutex);
432         shutdownHaskellAndExit(EXIT_SUCCESS);
433 #else
434         deleteAllThreads();
435 #endif
436     }
437
438 #if defined(RTS_USER_SIGNALS)
439     // check for signals each time around the scheduler
440     if (signals_pending()) {
441       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
442       startSignalHandlers();
443       ACQUIRE_LOCK(&sched_mutex);
444     }
445 #endif
446
447     //
448     // Check whether any waiting threads need to be woken up.  If the
449     // run queue is empty, and there are no other tasks running, we
450     // can wait indefinitely for something to happen.
451     //
452     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue)
453 #if defined(RTS_SUPPORTS_THREADS)
454                 || EMPTY_RUN_QUEUE()
455 #endif
456         )
457     {
458       awaitEvent( EMPTY_RUN_QUEUE() );
459     }
460     // we can be interrupted while waiting for I/O...
461     if (interrupted) continue;
462
463     /* 
464      * Detect deadlock: when we have no threads to run, there are no
465      * threads waiting on I/O or sleeping, and all the other tasks are
466      * waiting for work, we must have a deadlock of some description.
467      *
468      * We first try to find threads blocked on themselves (ie. black
469      * holes), and generate NonTermination exceptions where necessary.
470      *
471      * If no threads are black holed, we have a deadlock situation, so
472      * inform all the main threads.
473      */
474 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
475     if (   EMPTY_THREAD_QUEUES() )
476     {
477         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
478         // Garbage collection can release some new threads due to
479         // either (a) finalizers or (b) threads resurrected because
480         // they are about to be send BlockedOnDeadMVar.  Any threads
481         // thus released will be immediately runnable.
482         GarbageCollect(GetRoots,rtsTrue);
483
484         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
485
486         IF_DEBUG(scheduler, 
487                  sched_belch("still deadlocked, checking for black holes..."));
488         detectBlackHoles();
489
490         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
491
492 #if defined(RTS_USER_SIGNALS)
493         /* If we have user-installed signal handlers, then wait
494          * for signals to arrive rather then bombing out with a
495          * deadlock.
496          */
497         if ( anyUserHandlers() ) {
498             IF_DEBUG(scheduler, 
499                      sched_belch("still deadlocked, waiting for signals..."));
500
501             awaitUserSignals();
502
503             // we might be interrupted...
504             if (interrupted) { continue; }
505
506             if (signals_pending()) {
507                 RELEASE_LOCK(&sched_mutex);
508                 startSignalHandlers();
509                 ACQUIRE_LOCK(&sched_mutex);
510             }
511             ASSERT(!EMPTY_RUN_QUEUE());
512             goto not_deadlocked;
513         }
514 #endif
515
516         /* Probably a real deadlock.  Send the current main thread the
517          * Deadlock exception (or in the SMP build, send *all* main
518          * threads the deadlock exception, since none of them can make
519          * progress).
520          */
521         {
522             StgMainThread *m;
523             m = main_threads;
524             switch (m->tso->why_blocked) {
525             case BlockedOnBlackHole:
526             case BlockedOnException:
527             case BlockedOnMVar:
528                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
529                 break;
530             default:
531                 barf("deadlock: main thread blocked in a strange way");
532             }
533         }
534     }
535   not_deadlocked:
536
537 #elif defined(RTS_SUPPORTS_THREADS)
538     // ToDo: add deadlock detection in threaded RTS
539 #elif defined(PAR)
540     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
541 #endif
542
543 #if defined(RTS_SUPPORTS_THREADS)
544     if ( EMPTY_RUN_QUEUE() ) {
545         continue; // nothing to do
546     }
547 #endif
548
549 #if defined(GRAN)
550     if (RtsFlags.GranFlags.Light)
551       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
552
553     /* adjust time based on time-stamp */
554     if (event->time > CurrentTime[CurrentProc] &&
555         event->evttype != ContinueThread)
556       CurrentTime[CurrentProc] = event->time;
557     
558     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
559     if (!RtsFlags.GranFlags.Light)
560       handleIdlePEs();
561
562     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
563
564     /* main event dispatcher in GranSim */
565     switch (event->evttype) {
566       /* Should just be continuing execution */
567     case ContinueThread:
568       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
569       /* ToDo: check assertion
570       ASSERT(run_queue_hd != (StgTSO*)NULL &&
571              run_queue_hd != END_TSO_QUEUE);
572       */
573       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
574       if (!RtsFlags.GranFlags.DoAsyncFetch &&
575           procStatus[CurrentProc]==Fetching) {
576         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
577               CurrentTSO->id, CurrentTSO, CurrentProc);
578         goto next_thread;
579       } 
580       /* Ignore ContinueThreads for completed threads */
581       if (CurrentTSO->what_next == ThreadComplete) {
582         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
583               CurrentTSO->id, CurrentTSO, CurrentProc);
584         goto next_thread;
585       } 
586       /* Ignore ContinueThreads for threads that are being migrated */
587       if (PROCS(CurrentTSO)==Nowhere) { 
588         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
589               CurrentTSO->id, CurrentTSO, CurrentProc);
590         goto next_thread;
591       }
592       /* The thread should be at the beginning of the run queue */
593       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
594         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
595               CurrentTSO->id, CurrentTSO, CurrentProc);
596         break; // run the thread anyway
597       }
598       /*
599       new_event(proc, proc, CurrentTime[proc],
600                 FindWork,
601                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
602       goto next_thread; 
603       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
604       break; // now actually run the thread; DaH Qu'vam yImuHbej 
605
606     case FetchNode:
607       do_the_fetchnode(event);
608       goto next_thread;             /* handle next event in event queue  */
609       
610     case GlobalBlock:
611       do_the_globalblock(event);
612       goto next_thread;             /* handle next event in event queue  */
613       
614     case FetchReply:
615       do_the_fetchreply(event);
616       goto next_thread;             /* handle next event in event queue  */
617       
618     case UnblockThread:   /* Move from the blocked queue to the tail of */
619       do_the_unblock(event);
620       goto next_thread;             /* handle next event in event queue  */
621       
622     case ResumeThread:  /* Move from the blocked queue to the tail of */
623       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
624       event->tso->gran.blocktime += 
625         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
626       do_the_startthread(event);
627       goto next_thread;             /* handle next event in event queue  */
628       
629     case StartThread:
630       do_the_startthread(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case MoveThread:
634       do_the_movethread(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case MoveSpark:
638       do_the_movespark(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case FindWork:
642       do_the_findwork(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     default:
646       barf("Illegal event type %u\n", event->evttype);
647     }  /* switch */
648     
649     /* This point was scheduler_loop in the old RTS */
650
651     IF_DEBUG(gran, belch("GRAN: after main switch"));
652
653     TimeOfLastEvent = CurrentTime[CurrentProc];
654     TimeOfNextEvent = get_time_of_next_event();
655     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
656     // CurrentTSO = ThreadQueueHd;
657
658     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
659                          TimeOfNextEvent));
660
661     if (RtsFlags.GranFlags.Light) 
662       GranSimLight_leave_system(event, &ActiveTSO); 
663
664     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
665
666     IF_DEBUG(gran, 
667              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
668
669     /* in a GranSim setup the TSO stays on the run queue */
670     t = CurrentTSO;
671     /* Take a thread from the run queue. */
672     POP_RUN_QUEUE(t); // take_off_run_queue(t);
673
674     IF_DEBUG(gran, 
675              fprintf(stderr, "GRAN: About to run current thread, which is\n");
676              G_TSO(t,5));
677
678     context_switch = 0; // turned on via GranYield, checking events and time slice
679
680     IF_DEBUG(gran, 
681              DumpGranEvent(GR_SCHEDULE, t));
682
683     procStatus[CurrentProc] = Busy;
684
685 #elif defined(PAR)
686     if (PendingFetches != END_BF_QUEUE) {
687         processFetches();
688     }
689
690     /* ToDo: phps merge with spark activation above */
691     /* check whether we have local work and send requests if we have none */
692     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
693       /* :-[  no local threads => look out for local sparks */
694       /* the spark pool for the current PE */
695       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
696       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
697           pool->hd < pool->tl) {
698         /* 
699          * ToDo: add GC code check that we really have enough heap afterwards!!
700          * Old comment:
701          * If we're here (no runnable threads) and we have pending
702          * sparks, we must have a space problem.  Get enough space
703          * to turn one of those pending sparks into a
704          * thread... 
705          */
706
707         spark = findSpark(rtsFalse);                /* get a spark */
708         if (spark != (rtsSpark) NULL) {
709           tso = activateSpark(spark);       /* turn the spark into a thread */
710           IF_PAR_DEBUG(schedule,
711                        belch("==== schedule: Created TSO %d (%p); %d threads active",
712                              tso->id, tso, advisory_thread_count));
713
714           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
715             belch("==^^ failed to activate spark");
716             goto next_thread;
717           }               /* otherwise fall through & pick-up new tso */
718         } else {
719           IF_PAR_DEBUG(verbose,
720                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
721                              spark_queue_len(pool)));
722           goto next_thread;
723         }
724       }
725
726       /* If we still have no work we need to send a FISH to get a spark
727          from another PE 
728       */
729       if (EMPTY_RUN_QUEUE()) {
730       /* =8-[  no local sparks => look for work on other PEs */
731         /*
732          * We really have absolutely no work.  Send out a fish
733          * (there may be some out there already), and wait for
734          * something to arrive.  We clearly can't run any threads
735          * until a SCHEDULE or RESUME arrives, and so that's what
736          * we're hoping to see.  (Of course, we still have to
737          * respond to other types of messages.)
738          */
739         TIME now = msTime() /*CURRENT_TIME*/;
740         IF_PAR_DEBUG(verbose, 
741                      belch("--  now=%ld", now));
742         IF_PAR_DEBUG(verbose,
743                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
744                          (last_fish_arrived_at!=0 &&
745                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
746                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
747                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
748                              last_fish_arrived_at,
749                              RtsFlags.ParFlags.fishDelay, now);
750                      });
751         
752         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
753             (last_fish_arrived_at==0 ||
754              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
755           /* outstandingFishes is set in sendFish, processFish;
756              avoid flooding system with fishes via delay */
757           pe = choosePE();
758           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
759                    NEW_FISH_HUNGER);
760
761           // Global statistics: count no. of fishes
762           if (RtsFlags.ParFlags.ParStats.Global &&
763               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
764             globalParStats.tot_fish_mess++;
765           }
766         }
767       
768         receivedFinish = processMessages();
769         goto next_thread;
770       }
771     } else if (PacketsWaiting()) {  /* Look for incoming messages */
772       receivedFinish = processMessages();
773     }
774
775     /* Now we are sure that we have some work available */
776     ASSERT(run_queue_hd != END_TSO_QUEUE);
777
778     /* Take a thread from the run queue, if we have work */
779     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
780     IF_DEBUG(sanity,checkTSO(t));
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, 
792              belch("--=^ %d threads, %d sparks on [%#x]", 
793                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
794
795 # if 1
796     if (0 && RtsFlags.ParFlags.ParStats.Full && 
797         t && LastTSO && t->id != LastTSO->id && 
798         LastTSO->why_blocked == NotBlocked && 
799         LastTSO->what_next != ThreadComplete) {
800       // if previously scheduled TSO not blocked we have to record the context switch
801       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
802                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
803     }
804
805     if (RtsFlags.ParFlags.ParStats.Full && 
806         (emitSchedule /* forced emit */ ||
807         (t && LastTSO && t->id != LastTSO->id))) {
808       /* 
809          we are running a different TSO, so write a schedule event to log file
810          NB: If we use fair scheduling we also have to write  a deschedule 
811              event for LastTSO; with unfair scheduling we know that the
812              previous tso has blocked whenever we switch to another tso, so
813              we don't need it in GUM for now
814       */
815       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817       emitSchedule = rtsFalse;
818     }
819      
820 # endif
821 #else /* !GRAN && !PAR */
822   
823     // grab a thread from the run queue
824     ASSERT(run_queue_hd != END_TSO_QUEUE);
825     POP_RUN_QUEUE(t);
826
827     // Sanity check the thread we're about to run.  This can be
828     // expensive if there is lots of thread switching going on...
829     IF_DEBUG(sanity,checkTSO(t));
830 #endif
831
832 #ifdef THREADED_RTS
833     {
834       StgMainThread *m = t->main;
835       
836       if(m)
837       {
838         if(m == mainThread)
839         {
840           IF_DEBUG(scheduler,
841             sched_belch("### Running thread %d in bound thread", t->id));
842           // yes, the Haskell thread is bound to the current native thread
843         }
844         else
845         {
846           IF_DEBUG(scheduler,
847             sched_belch("### thread %d bound to another OS thread", t->id));
848           // no, bound to a different Haskell thread: pass to that thread
849           PUSH_ON_RUN_QUEUE(t);
850           passCapability(&m->bound_thread_cond);
851           continue;
852         }
853       }
854       else
855       {
856         if(mainThread != NULL)
857         // The thread we want to run is bound.
858         {
859           IF_DEBUG(scheduler,
860             sched_belch("### this OS thread cannot run thread %d", t->id));
861           // no, the current native thread is bound to a different
862           // Haskell thread, so pass it to any worker thread
863           PUSH_ON_RUN_QUEUE(t);
864           passCapabilityToWorker();
865           continue; 
866         }
867       }
868     }
869 #endif
870
871     cap->r.rCurrentTSO = t;
872     
873     /* context switches are now initiated by the timer signal, unless
874      * the user specified "context switch as often as possible", with
875      * +RTS -C0
876      */
877     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
878          && (run_queue_hd != END_TSO_QUEUE
879              || blocked_queue_hd != END_TSO_QUEUE
880              || sleeping_queue != END_TSO_QUEUE)))
881         context_switch = 1;
882
883 run_thread:
884
885     RELEASE_LOCK(&sched_mutex);
886
887     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
888                               t->id, whatNext_strs[t->what_next]));
889
890 #ifdef PROFILING
891     startHeapProfTimer();
892 #endif
893
894     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
895     /* Run the current thread 
896      */
897     prev_what_next = t->what_next;
898
899     errno = t->saved_errno;
900
901     switch (prev_what_next) {
902
903     case ThreadKilled:
904     case ThreadComplete:
905         /* Thread already finished, return to scheduler. */
906         ret = ThreadFinished;
907         break;
908
909     case ThreadRunGHC:
910         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
911         break;
912
913     case ThreadInterpret:
914         ret = interpretBCO(cap);
915         break;
916
917     default:
918       barf("schedule: invalid what_next field");
919     }
920
921     // The TSO might have moved, so find the new location:
922     t = cap->r.rCurrentTSO;
923
924     // And save the current errno in this thread.
925     t->saved_errno = errno;
926
927     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
928     
929     /* Costs for the scheduler are assigned to CCS_SYSTEM */
930 #ifdef PROFILING
931     stopHeapProfTimer();
932     CCCS = CCS_SYSTEM;
933 #endif
934     
935     ACQUIRE_LOCK(&sched_mutex);
936     
937 #ifdef RTS_SUPPORTS_THREADS
938     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
939 #elif !defined(GRAN) && !defined(PAR)
940     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
941 #endif
942     
943 #if defined(PAR)
944     /* HACK 675: if the last thread didn't yield, make sure to print a 
945        SCHEDULE event to the log file when StgRunning the next thread, even
946        if it is the same one as before */
947     LastTSO = t; 
948     TimeOfLastYield = CURRENT_TIME;
949 #endif
950
951     switch (ret) {
952     case HeapOverflow:
953 #if defined(GRAN)
954       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
955       globalGranStats.tot_heapover++;
956 #elif defined(PAR)
957       globalParStats.tot_heapover++;
958 #endif
959
960       // did the task ask for a large block?
961       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
962           // if so, get one and push it on the front of the nursery.
963           bdescr *bd;
964           nat blocks;
965           
966           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
967
968           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
969                                    t->id, whatNext_strs[t->what_next], blocks));
970
971           // don't do this if it would push us over the
972           // alloc_blocks_lim limit; we'll GC first.
973           if (alloc_blocks + blocks < alloc_blocks_lim) {
974
975               alloc_blocks += blocks;
976               bd = allocGroup( blocks );
977
978               // link the new group into the list
979               bd->link = cap->r.rCurrentNursery;
980               bd->u.back = cap->r.rCurrentNursery->u.back;
981               if (cap->r.rCurrentNursery->u.back != NULL) {
982                   cap->r.rCurrentNursery->u.back->link = bd;
983               } else {
984                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
985                          g0s0->blocks == cap->r.rNursery);
986                   cap->r.rNursery = g0s0->blocks = bd;
987               }           
988               cap->r.rCurrentNursery->u.back = bd;
989
990               // initialise it as a nursery block.  We initialise the
991               // step, gen_no, and flags field of *every* sub-block in
992               // this large block, because this is easier than making
993               // sure that we always find the block head of a large
994               // block whenever we call Bdescr() (eg. evacuate() and
995               // isAlive() in the GC would both have to do this, at
996               // least).
997               { 
998                   bdescr *x;
999                   for (x = bd; x < bd + blocks; x++) {
1000                       x->step = g0s0;
1001                       x->gen_no = 0;
1002                       x->flags = 0;
1003                   }
1004               }
1005
1006               // don't forget to update the block count in g0s0.
1007               g0s0->n_blocks += blocks;
1008               // This assert can be a killer if the app is doing lots
1009               // of large block allocations.
1010               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1011
1012               // now update the nursery to point to the new block
1013               cap->r.rCurrentNursery = bd;
1014
1015               // we might be unlucky and have another thread get on the
1016               // run queue before us and steal the large block, but in that
1017               // case the thread will just end up requesting another large
1018               // block.
1019               PUSH_ON_RUN_QUEUE(t);
1020               break;
1021           }
1022       }
1023
1024       /* make all the running tasks block on a condition variable,
1025        * maybe set context_switch and wait till they all pile in,
1026        * then have them wait on a GC condition variable.
1027        */
1028       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1029                                t->id, whatNext_strs[t->what_next]));
1030       threadPaused(t);
1031 #if defined(GRAN)
1032       ASSERT(!is_on_queue(t,CurrentProc));
1033 #elif defined(PAR)
1034       /* Currently we emit a DESCHEDULE event before GC in GUM.
1035          ToDo: either add separate event to distinguish SYSTEM time from rest
1036                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1037       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1038         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1039                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1040         emitSchedule = rtsTrue;
1041       }
1042 #endif
1043       
1044       ready_to_gc = rtsTrue;
1045       context_switch = 1;               /* stop other threads ASAP */
1046       PUSH_ON_RUN_QUEUE(t);
1047       /* actual GC is done at the end of the while loop */
1048       break;
1049       
1050     case StackOverflow:
1051 #if defined(GRAN)
1052       IF_DEBUG(gran, 
1053                DumpGranEvent(GR_DESCHEDULE, t));
1054       globalGranStats.tot_stackover++;
1055 #elif defined(PAR)
1056       // IF_DEBUG(par, 
1057       // DumpGranEvent(GR_DESCHEDULE, t);
1058       globalParStats.tot_stackover++;
1059 #endif
1060       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1061                                t->id, whatNext_strs[t->what_next]));
1062       /* just adjust the stack for this thread, then pop it back
1063        * on the run queue.
1064        */
1065       threadPaused(t);
1066       { 
1067         /* enlarge the stack */
1068         StgTSO *new_t = threadStackOverflow(t);
1069         
1070         /* This TSO has moved, so update any pointers to it from the
1071          * main thread stack.  It better not be on any other queues...
1072          * (it shouldn't be).
1073          */
1074         if (t->main != NULL) {
1075             t->main->tso = new_t;
1076         }
1077         PUSH_ON_RUN_QUEUE(new_t);
1078       }
1079       break;
1080
1081     case ThreadYielding:
1082       // Reset the context switch flag.  We don't do this just before
1083       // running the thread, because that would mean we would lose ticks
1084       // during GC, which can lead to unfair scheduling (a thread hogs
1085       // the CPU because the tick always arrives during GC).  This way
1086       // penalises threads that do a lot of allocation, but that seems
1087       // better than the alternative.
1088       context_switch = 0;
1089
1090 #if defined(GRAN)
1091       IF_DEBUG(gran, 
1092                DumpGranEvent(GR_DESCHEDULE, t));
1093       globalGranStats.tot_yields++;
1094 #elif defined(PAR)
1095       // IF_DEBUG(par, 
1096       // DumpGranEvent(GR_DESCHEDULE, t);
1097       globalParStats.tot_yields++;
1098 #endif
1099       /* put the thread back on the run queue.  Then, if we're ready to
1100        * GC, check whether this is the last task to stop.  If so, wake
1101        * up the GC thread.  getThread will block during a GC until the
1102        * GC is finished.
1103        */
1104       IF_DEBUG(scheduler,
1105                if (t->what_next != prev_what_next) {
1106                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1107                          t->id, whatNext_strs[t->what_next]);
1108                } else {
1109                    belch("--<< thread %ld (%s) stopped, yielding", 
1110                          t->id, whatNext_strs[t->what_next]);
1111                }
1112                );
1113
1114       IF_DEBUG(sanity,
1115                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1116                checkTSO(t));
1117       ASSERT(t->link == END_TSO_QUEUE);
1118
1119       // Shortcut if we're just switching evaluators: don't bother
1120       // doing stack squeezing (which can be expensive), just run the
1121       // thread.
1122       if (t->what_next != prev_what_next) {
1123           goto run_thread;
1124       }
1125
1126       threadPaused(t);
1127
1128 #if defined(GRAN)
1129       ASSERT(!is_on_queue(t,CurrentProc));
1130
1131       IF_DEBUG(sanity,
1132                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1133                checkThreadQsSanity(rtsTrue));
1134 #endif
1135
1136 #if defined(PAR)
1137       if (RtsFlags.ParFlags.doFairScheduling) { 
1138         /* this does round-robin scheduling; good for concurrency */
1139         APPEND_TO_RUN_QUEUE(t);
1140       } else {
1141         /* this does unfair scheduling; good for parallelism */
1142         PUSH_ON_RUN_QUEUE(t);
1143       }
1144 #else
1145       // this does round-robin scheduling; good for concurrency
1146       APPEND_TO_RUN_QUEUE(t);
1147 #endif
1148
1149 #if defined(GRAN)
1150       /* add a ContinueThread event to actually process the thread */
1151       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1152                 ContinueThread,
1153                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1154       IF_GRAN_DEBUG(bq, 
1155                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1156                G_EVENTQ(0);
1157                G_CURR_THREADQ(0));
1158 #endif /* GRAN */
1159       break;
1160
1161     case ThreadBlocked:
1162 #if defined(GRAN)
1163       IF_DEBUG(scheduler,
1164                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1165                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1166                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1167
1168       // ??? needed; should emit block before
1169       IF_DEBUG(gran, 
1170                DumpGranEvent(GR_DESCHEDULE, t)); 
1171       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1172       /*
1173         ngoq Dogh!
1174       ASSERT(procStatus[CurrentProc]==Busy || 
1175               ((procStatus[CurrentProc]==Fetching) && 
1176               (t->block_info.closure!=(StgClosure*)NULL)));
1177       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1178           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1179             procStatus[CurrentProc]==Fetching)) 
1180         procStatus[CurrentProc] = Idle;
1181       */
1182 #elif defined(PAR)
1183       IF_DEBUG(scheduler,
1184                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1185                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1186       IF_PAR_DEBUG(bq,
1187
1188                    if (t->block_info.closure!=(StgClosure*)NULL) 
1189                      print_bq(t->block_info.closure));
1190
1191       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1192       blockThread(t);
1193
1194       /* whatever we schedule next, we must log that schedule */
1195       emitSchedule = rtsTrue;
1196
1197 #else /* !GRAN */
1198       /* don't need to do anything.  Either the thread is blocked on
1199        * I/O, in which case we'll have called addToBlockedQueue
1200        * previously, or it's blocked on an MVar or Blackhole, in which
1201        * case it'll be on the relevant queue already.
1202        */
1203       IF_DEBUG(scheduler,
1204                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1205                        t->id, whatNext_strs[t->what_next]);
1206                printThreadBlockage(t);
1207                fprintf(stderr, "\n"));
1208       fflush(stderr);
1209
1210       /* Only for dumping event to log file 
1211          ToDo: do I need this in GranSim, too?
1212       blockThread(t);
1213       */
1214 #endif
1215       threadPaused(t);
1216       break;
1217
1218     case ThreadFinished:
1219       /* Need to check whether this was a main thread, and if so, signal
1220        * the task that started it with the return value.  If we have no
1221        * more main threads, we probably need to stop all the tasks until
1222        * we get a new one.
1223        */
1224       /* We also end up here if the thread kills itself with an
1225        * uncaught exception, see Exception.hc.
1226        */
1227       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1228                                t->id, whatNext_strs[t->what_next]));
1229 #if defined(GRAN)
1230       endThread(t, CurrentProc); // clean-up the thread
1231 #elif defined(PAR)
1232       /* For now all are advisory -- HWL */
1233       //if(t->priority==AdvisoryPriority) ??
1234       advisory_thread_count--;
1235       
1236 # ifdef DIST
1237       if(t->dist.priority==RevalPriority)
1238         FinishReval(t);
1239 # endif
1240       
1241       if (RtsFlags.ParFlags.ParStats.Full &&
1242           !RtsFlags.ParFlags.ParStats.Suppressed) 
1243         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1244 #endif
1245
1246       //
1247       // Check whether the thread that just completed was a main
1248       // thread, and if so return with the result.  
1249       //
1250       // There is an assumption here that all thread completion goes
1251       // through this point; we need to make sure that if a thread
1252       // ends up in the ThreadKilled state, that it stays on the run
1253       // queue so it can be dealt with here.
1254       //
1255       if (
1256 #if defined(RTS_SUPPORTS_THREADS)
1257           mainThread != NULL
1258 #else
1259           mainThread->tso == t
1260 #endif
1261           )
1262       {
1263           // We are a bound thread: this must be our thread that just
1264           // completed.
1265           ASSERT(mainThread->tso == t);
1266
1267           if (t->what_next == ThreadComplete) {
1268               if (mainThread->ret) {
1269                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1270                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1271               }
1272               mainThread->stat = Success;
1273           } else {
1274               if (mainThread->ret) {
1275                   *(mainThread->ret) = NULL;
1276               }
1277               if (was_interrupted) {
1278                   mainThread->stat = Interrupted;
1279               } else {
1280                   mainThread->stat = Killed;
1281               }
1282           }
1283 #ifdef DEBUG
1284           removeThreadLabel((StgWord)mainThread->tso->id);
1285 #endif
1286           if (mainThread->prev == NULL) {
1287               main_threads = mainThread->link;
1288           } else {
1289               mainThread->prev->link = mainThread->link;
1290           }
1291           if (mainThread->link != NULL) {
1292               mainThread->link->prev = NULL;
1293           }
1294           releaseCapability(cap);
1295           return;
1296       }
1297
1298 #ifdef RTS_SUPPORTS_THREADS
1299       ASSERT(t->main == NULL);
1300 #else
1301       if (t->main != NULL) {
1302           // Must be a main thread that is not the topmost one.  Leave
1303           // it on the run queue until the stack has unwound to the
1304           // point where we can deal with this.  Leaving it on the run
1305           // queue also ensures that the garbage collector knows about
1306           // this thread and its return value (it gets dropped from the
1307           // all_threads list so there's no other way to find it).
1308           APPEND_TO_RUN_QUEUE(t);
1309       }
1310 #endif
1311       break;
1312
1313     default:
1314       barf("schedule: invalid thread return code %d", (int)ret);
1315     }
1316
1317 #ifdef PROFILING
1318     // When we have +RTS -i0 and we're heap profiling, do a census at
1319     // every GC.  This lets us get repeatable runs for debugging.
1320     if (performHeapProfile ||
1321         (RtsFlags.ProfFlags.profileInterval==0 &&
1322          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1323         GarbageCollect(GetRoots, rtsTrue);
1324         heapCensus();
1325         performHeapProfile = rtsFalse;
1326         ready_to_gc = rtsFalse; // we already GC'd
1327     }
1328 #endif
1329
1330     if (ready_to_gc) {
1331       /* everybody back, start the GC.
1332        * Could do it in this thread, or signal a condition var
1333        * to do it in another thread.  Either way, we need to
1334        * broadcast on gc_pending_cond afterward.
1335        */
1336 #if defined(RTS_SUPPORTS_THREADS)
1337       IF_DEBUG(scheduler,sched_belch("doing GC"));
1338 #endif
1339       GarbageCollect(GetRoots,rtsFalse);
1340       ready_to_gc = rtsFalse;
1341 #if defined(GRAN)
1342       /* add a ContinueThread event to continue execution of current thread */
1343       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1344                 ContinueThread,
1345                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1346       IF_GRAN_DEBUG(bq, 
1347                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1348                G_EVENTQ(0);
1349                G_CURR_THREADQ(0));
1350 #endif /* GRAN */
1351     }
1352
1353 #if defined(GRAN)
1354   next_thread:
1355     IF_GRAN_DEBUG(unused,
1356                   print_eventq(EventHd));
1357
1358     event = get_next_event();
1359 #elif defined(PAR)
1360   next_thread:
1361     /* ToDo: wait for next message to arrive rather than busy wait */
1362 #endif /* GRAN */
1363
1364   } /* end of while(1) */
1365
1366   IF_PAR_DEBUG(verbose,
1367                belch("== Leaving schedule() after having received Finish"));
1368 }
1369
1370 /* ---------------------------------------------------------------------------
1371  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1372  * used by Control.Concurrent for error checking.
1373  * ------------------------------------------------------------------------- */
1374  
1375 StgBool
1376 rtsSupportsBoundThreads(void)
1377 {
1378 #ifdef THREADED_RTS
1379   return rtsTrue;
1380 #else
1381   return rtsFalse;
1382 #endif
1383 }
1384
1385 /* ---------------------------------------------------------------------------
1386  * isThreadBound(tso): check whether tso is bound to an OS thread.
1387  * ------------------------------------------------------------------------- */
1388  
1389 StgBool
1390 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1391 {
1392 #ifdef THREADED_RTS
1393   return (tso->main != NULL);
1394 #endif
1395   return rtsFalse;
1396 }
1397
1398 /* ---------------------------------------------------------------------------
1399  * Singleton fork(). Do not copy any running threads.
1400  * ------------------------------------------------------------------------- */
1401
1402 #ifndef mingw32_TARGET_OS
1403 #define FORKPROCESS_PRIMOP_SUPPORTED
1404 #endif
1405
1406 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1407 static void 
1408 deleteThreadImmediately(StgTSO *tso);
1409 #endif
1410 StgInt
1411 forkProcess(HsStablePtr *entry
1412 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1413             STG_UNUSED
1414 #endif
1415            )
1416 {
1417 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1418   pid_t pid;
1419   StgTSO* t,*next;
1420   StgMainThread *m;
1421   SchedulerStatus rc;
1422
1423   IF_DEBUG(scheduler,sched_belch("forking!"));
1424   rts_lock(); // This not only acquires sched_mutex, it also
1425               // makes sure that no other threads are running
1426
1427   pid = fork();
1428
1429   if (pid) { /* parent */
1430
1431   /* just return the pid */
1432     rts_unlock();
1433     return pid;
1434     
1435   } else { /* child */
1436     
1437     
1438       // delete all threads
1439     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1440     
1441     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1442       next = t->link;
1443
1444         // don't allow threads to catch the ThreadKilled exception
1445       deleteThreadImmediately(t);
1446     }
1447     
1448       // wipe the main thread list
1449     while((m = main_threads) != NULL) {
1450       main_threads = m->link;
1451 # ifdef THREADED_RTS
1452       closeCondition(&m->bound_thread_cond);
1453 # endif
1454       stgFree(m);
1455     }
1456     
1457 # ifdef RTS_SUPPORTS_THREADS
1458     resetTaskManagerAfterFork();      // tell startTask() and friends that
1459     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1460     resetWorkerWakeupPipeAfterFork();
1461 # endif
1462     
1463     rc = rts_evalStableIO(entry, NULL);  // run the action
1464     rts_checkSchedStatus("forkProcess",rc);
1465     
1466     rts_unlock();
1467     
1468     hs_exit();                      // clean up and exit
1469     stg_exit(0);
1470   }
1471 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
1472   barf("forkProcess#: primop not supported, sorry!\n");
1473   return -1;
1474 #endif
1475 }
1476
1477 /* ---------------------------------------------------------------------------
1478  * deleteAllThreads():  kill all the live threads.
1479  *
1480  * This is used when we catch a user interrupt (^C), before performing
1481  * any necessary cleanups and running finalizers.
1482  *
1483  * Locks: sched_mutex held.
1484  * ------------------------------------------------------------------------- */
1485    
1486 void
1487 deleteAllThreads ( void )
1488 {
1489   StgTSO* t, *next;
1490   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1491   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1492       next = t->global_link;
1493       deleteThread(t);
1494   }      
1495
1496   // The run queue now contains a bunch of ThreadKilled threads.  We
1497   // must not throw these away: the main thread(s) will be in there
1498   // somewhere, and the main scheduler loop has to deal with it.
1499   // Also, the run queue is the only thing keeping these threads from
1500   // being GC'd, and we don't want the "main thread has been GC'd" panic.
1501
1502   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
1503   ASSERT(sleeping_queue == END_TSO_QUEUE);
1504 }
1505
1506 /* startThread and  insertThread are now in GranSim.c -- HWL */
1507
1508
1509 /* ---------------------------------------------------------------------------
1510  * Suspending & resuming Haskell threads.
1511  * 
1512  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1513  * its capability before calling the C function.  This allows another
1514  * task to pick up the capability and carry on running Haskell
1515  * threads.  It also means that if the C call blocks, it won't lock
1516  * the whole system.
1517  *
1518  * The Haskell thread making the C call is put to sleep for the
1519  * duration of the call, on the susepended_ccalling_threads queue.  We
1520  * give out a token to the task, which it can use to resume the thread
1521  * on return from the C function.
1522  * ------------------------------------------------------------------------- */
1523    
1524 StgInt
1525 suspendThread( StgRegTable *reg, 
1526                rtsBool concCall
1527 #if !defined(DEBUG)
1528                STG_UNUSED
1529 #endif
1530                )
1531 {
1532   nat tok;
1533   Capability *cap;
1534   int saved_errno = errno;
1535
1536   /* assume that *reg is a pointer to the StgRegTable part
1537    * of a Capability.
1538    */
1539   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1540
1541   ACQUIRE_LOCK(&sched_mutex);
1542
1543   IF_DEBUG(scheduler,
1544            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1545
1546   // XXX this might not be necessary --SDM
1547   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1548
1549   threadPaused(cap->r.rCurrentTSO);
1550   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1551   suspended_ccalling_threads = cap->r.rCurrentTSO;
1552
1553   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
1554       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1555       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1556   } else {
1557       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1558   }
1559
1560   /* Use the thread ID as the token; it should be unique */
1561   tok = cap->r.rCurrentTSO->id;
1562
1563   /* Hand back capability */
1564   releaseCapability(cap);
1565   
1566 #if defined(RTS_SUPPORTS_THREADS)
1567   /* Preparing to leave the RTS, so ensure there's a native thread/task
1568      waiting to take over.
1569   */
1570   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1571 #endif
1572
1573   /* Other threads _might_ be available for execution; signal this */
1574   THREAD_RUNNABLE();
1575   RELEASE_LOCK(&sched_mutex);
1576   
1577   errno = saved_errno;
1578   return tok; 
1579 }
1580
1581 StgRegTable *
1582 resumeThread( StgInt tok,
1583               rtsBool concCall STG_UNUSED )
1584 {
1585   StgTSO *tso, **prev;
1586   Capability *cap;
1587   int saved_errno = errno;
1588
1589 #if defined(RTS_SUPPORTS_THREADS)
1590   /* Wait for permission to re-enter the RTS with the result. */
1591   ACQUIRE_LOCK(&sched_mutex);
1592   waitForReturnCapability(&sched_mutex, &cap);
1593
1594   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1595 #else
1596   grabCapability(&cap);
1597 #endif
1598
1599   /* Remove the thread off of the suspended list */
1600   prev = &suspended_ccalling_threads;
1601   for (tso = suspended_ccalling_threads; 
1602        tso != END_TSO_QUEUE; 
1603        prev = &tso->link, tso = tso->link) {
1604     if (tso->id == (StgThreadID)tok) {
1605       *prev = tso->link;
1606       break;
1607     }
1608   }
1609   if (tso == END_TSO_QUEUE) {
1610     barf("resumeThread: thread not found");
1611   }
1612   tso->link = END_TSO_QUEUE;
1613   
1614   if(tso->why_blocked == BlockedOnCCall) {
1615       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1616       tso->blocked_exceptions = NULL;
1617   }
1618   
1619   /* Reset blocking status */
1620   tso->why_blocked  = NotBlocked;
1621
1622   cap->r.rCurrentTSO = tso;
1623   RELEASE_LOCK(&sched_mutex);
1624   errno = saved_errno;
1625   return &cap->r;
1626 }
1627
1628
1629 /* ---------------------------------------------------------------------------
1630  * Static functions
1631  * ------------------------------------------------------------------------ */
1632 static void unblockThread(StgTSO *tso);
1633
1634 /* ---------------------------------------------------------------------------
1635  * Comparing Thread ids.
1636  *
1637  * This is used from STG land in the implementation of the
1638  * instances of Eq/Ord for ThreadIds.
1639  * ------------------------------------------------------------------------ */
1640
1641 int
1642 cmp_thread(StgPtr tso1, StgPtr tso2) 
1643
1644   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1645   StgThreadID id2 = ((StgTSO *)tso2)->id;
1646  
1647   if (id1 < id2) return (-1);
1648   if (id1 > id2) return 1;
1649   return 0;
1650 }
1651
1652 /* ---------------------------------------------------------------------------
1653  * Fetching the ThreadID from an StgTSO.
1654  *
1655  * This is used in the implementation of Show for ThreadIds.
1656  * ------------------------------------------------------------------------ */
1657 int
1658 rts_getThreadId(StgPtr tso) 
1659 {
1660   return ((StgTSO *)tso)->id;
1661 }
1662
1663 #ifdef DEBUG
1664 void
1665 labelThread(StgPtr tso, char *label)
1666 {
1667   int len;
1668   void *buf;
1669
1670   /* Caveat: Once set, you can only set the thread name to "" */
1671   len = strlen(label)+1;
1672   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1673   strncpy(buf,label,len);
1674   /* Update will free the old memory for us */
1675   updateThreadLabel(((StgTSO *)tso)->id,buf);
1676 }
1677 #endif /* DEBUG */
1678
1679 /* ---------------------------------------------------------------------------
1680    Create a new thread.
1681
1682    The new thread starts with the given stack size.  Before the
1683    scheduler can run, however, this thread needs to have a closure
1684    (and possibly some arguments) pushed on its stack.  See
1685    pushClosure() in Schedule.h.
1686
1687    createGenThread() and createIOThread() (in SchedAPI.h) are
1688    convenient packaged versions of this function.
1689
1690    currently pri (priority) is only used in a GRAN setup -- HWL
1691    ------------------------------------------------------------------------ */
1692 #if defined(GRAN)
1693 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1694 StgTSO *
1695 createThread(nat size, StgInt pri)
1696 #else
1697 StgTSO *
1698 createThread(nat size)
1699 #endif
1700 {
1701
1702     StgTSO *tso;
1703     nat stack_size;
1704
1705     /* First check whether we should create a thread at all */
1706 #if defined(PAR)
1707   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1708   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1709     threadsIgnored++;
1710     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1711           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1712     return END_TSO_QUEUE;
1713   }
1714   threadsCreated++;
1715 #endif
1716
1717 #if defined(GRAN)
1718   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1719 #endif
1720
1721   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1722
1723   /* catch ridiculously small stack sizes */
1724   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1725     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1726   }
1727
1728   stack_size = size - TSO_STRUCT_SIZEW;
1729
1730   tso = (StgTSO *)allocate(size);
1731   TICK_ALLOC_TSO(stack_size, 0);
1732
1733   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1734 #if defined(GRAN)
1735   SET_GRAN_HDR(tso, ThisPE);
1736 #endif
1737
1738   // Always start with the compiled code evaluator
1739   tso->what_next = ThreadRunGHC;
1740
1741   tso->id = next_thread_id++; 
1742   tso->why_blocked  = NotBlocked;
1743   tso->blocked_exceptions = NULL;
1744
1745   tso->saved_errno = 0;
1746   tso->main = NULL;
1747   
1748   tso->stack_size   = stack_size;
1749   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1750                               - TSO_STRUCT_SIZEW;
1751   tso->sp           = (P_)&(tso->stack) + stack_size;
1752
1753 #ifdef PROFILING
1754   tso->prof.CCCS = CCS_MAIN;
1755 #endif
1756
1757   /* put a stop frame on the stack */
1758   tso->sp -= sizeofW(StgStopFrame);
1759   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1760   tso->link = END_TSO_QUEUE;
1761
1762   // ToDo: check this
1763 #if defined(GRAN)
1764   /* uses more flexible routine in GranSim */
1765   insertThread(tso, CurrentProc);
1766 #else
1767   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1768    * from its creation
1769    */
1770 #endif
1771
1772 #if defined(GRAN) 
1773   if (RtsFlags.GranFlags.GranSimStats.Full) 
1774     DumpGranEvent(GR_START,tso);
1775 #elif defined(PAR)
1776   if (RtsFlags.ParFlags.ParStats.Full) 
1777     DumpGranEvent(GR_STARTQ,tso);
1778   /* HACk to avoid SCHEDULE 
1779      LastTSO = tso; */
1780 #endif
1781
1782   /* Link the new thread on the global thread list.
1783    */
1784   tso->global_link = all_threads;
1785   all_threads = tso;
1786
1787 #if defined(DIST)
1788   tso->dist.priority = MandatoryPriority; //by default that is...
1789 #endif
1790
1791 #if defined(GRAN)
1792   tso->gran.pri = pri;
1793 # if defined(DEBUG)
1794   tso->gran.magic = TSO_MAGIC; // debugging only
1795 # endif
1796   tso->gran.sparkname   = 0;
1797   tso->gran.startedat   = CURRENT_TIME; 
1798   tso->gran.exported    = 0;
1799   tso->gran.basicblocks = 0;
1800   tso->gran.allocs      = 0;
1801   tso->gran.exectime    = 0;
1802   tso->gran.fetchtime   = 0;
1803   tso->gran.fetchcount  = 0;
1804   tso->gran.blocktime   = 0;
1805   tso->gran.blockcount  = 0;
1806   tso->gran.blockedat   = 0;
1807   tso->gran.globalsparks = 0;
1808   tso->gran.localsparks  = 0;
1809   if (RtsFlags.GranFlags.Light)
1810     tso->gran.clock  = Now; /* local clock */
1811   else
1812     tso->gran.clock  = 0;
1813
1814   IF_DEBUG(gran,printTSO(tso));
1815 #elif defined(PAR)
1816 # if defined(DEBUG)
1817   tso->par.magic = TSO_MAGIC; // debugging only
1818 # endif
1819   tso->par.sparkname   = 0;
1820   tso->par.startedat   = CURRENT_TIME; 
1821   tso->par.exported    = 0;
1822   tso->par.basicblocks = 0;
1823   tso->par.allocs      = 0;
1824   tso->par.exectime    = 0;
1825   tso->par.fetchtime   = 0;
1826   tso->par.fetchcount  = 0;
1827   tso->par.blocktime   = 0;
1828   tso->par.blockcount  = 0;
1829   tso->par.blockedat   = 0;
1830   tso->par.globalsparks = 0;
1831   tso->par.localsparks  = 0;
1832 #endif
1833
1834 #if defined(GRAN)
1835   globalGranStats.tot_threads_created++;
1836   globalGranStats.threads_created_on_PE[CurrentProc]++;
1837   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1838   globalGranStats.tot_sq_probes++;
1839 #elif defined(PAR)
1840   // collect parallel global statistics (currently done together with GC stats)
1841   if (RtsFlags.ParFlags.ParStats.Global &&
1842       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1843     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1844     globalParStats.tot_threads_created++;
1845   }
1846 #endif 
1847
1848 #if defined(GRAN)
1849   IF_GRAN_DEBUG(pri,
1850                 belch("==__ schedule: Created TSO %d (%p);",
1851                       CurrentProc, tso, tso->id));
1852 #elif defined(PAR)
1853     IF_PAR_DEBUG(verbose,
1854                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1855                        tso->id, tso, advisory_thread_count));
1856 #else
1857   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1858                                  tso->id, tso->stack_size));
1859 #endif    
1860   return tso;
1861 }
1862
1863 #if defined(PAR)
1864 /* RFP:
1865    all parallel thread creation calls should fall through the following routine.
1866 */
1867 StgTSO *
1868 createSparkThread(rtsSpark spark) 
1869 { StgTSO *tso;
1870   ASSERT(spark != (rtsSpark)NULL);
1871   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1872   { threadsIgnored++;
1873     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1874           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1875     return END_TSO_QUEUE;
1876   }
1877   else
1878   { threadsCreated++;
1879     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1880     if (tso==END_TSO_QUEUE)     
1881       barf("createSparkThread: Cannot create TSO");
1882 #if defined(DIST)
1883     tso->priority = AdvisoryPriority;
1884 #endif
1885     pushClosure(tso,spark);
1886     PUSH_ON_RUN_QUEUE(tso);
1887     advisory_thread_count++;    
1888   }
1889   return tso;
1890 }
1891 #endif
1892
1893 /*
1894   Turn a spark into a thread.
1895   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1896 */
1897 #if defined(PAR)
1898 StgTSO *
1899 activateSpark (rtsSpark spark) 
1900 {
1901   StgTSO *tso;
1902
1903   tso = createSparkThread(spark);
1904   if (RtsFlags.ParFlags.ParStats.Full) {   
1905     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1906     IF_PAR_DEBUG(verbose,
1907                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1908                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1909   }
1910   // ToDo: fwd info on local/global spark to thread -- HWL
1911   // tso->gran.exported =  spark->exported;
1912   // tso->gran.locked =   !spark->global;
1913   // tso->gran.sparkname = spark->name;
1914
1915   return tso;
1916 }
1917 #endif
1918
1919 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1920                                    Capability *initialCapability
1921                                    );
1922
1923
1924 /* ---------------------------------------------------------------------------
1925  * scheduleThread()
1926  *
1927  * scheduleThread puts a thread on the head of the runnable queue.
1928  * This will usually be done immediately after a thread is created.
1929  * The caller of scheduleThread must create the thread using e.g.
1930  * createThread and push an appropriate closure
1931  * on this thread's stack before the scheduler is invoked.
1932  * ------------------------------------------------------------------------ */
1933
1934 static void scheduleThread_ (StgTSO* tso);
1935
1936 void
1937 scheduleThread_(StgTSO *tso)
1938 {
1939   // Precondition: sched_mutex must be held.
1940   // The thread goes at the *end* of the run-queue, to avoid possible
1941   // starvation of any threads already on the queue.
1942   APPEND_TO_RUN_QUEUE(tso);
1943   THREAD_RUNNABLE();
1944 }
1945
1946 void
1947 scheduleThread(StgTSO* tso)
1948 {
1949   ACQUIRE_LOCK(&sched_mutex);
1950   scheduleThread_(tso);
1951   RELEASE_LOCK(&sched_mutex);
1952 }
1953
1954 #if defined(RTS_SUPPORTS_THREADS)
1955 static Condition bound_cond_cache;
1956 static int bound_cond_cache_full = 0;
1957 #endif
1958
1959
1960 SchedulerStatus
1961 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1962                    Capability *initialCapability)
1963 {
1964     // Precondition: sched_mutex must be held
1965     StgMainThread *m;
1966
1967     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1968     m->tso = tso;
1969     tso->main = m;
1970     m->ret = ret;
1971     m->stat = NoStatus;
1972     m->link = main_threads;
1973     m->prev = NULL;
1974     if (main_threads != NULL) {
1975         main_threads->prev = m;
1976     }
1977     main_threads = m;
1978
1979 #if defined(RTS_SUPPORTS_THREADS)
1980     // Allocating a new condition for each thread is expensive, so we
1981     // cache one.  This is a pretty feeble hack, but it helps speed up
1982     // consecutive call-ins quite a bit.
1983     if (bound_cond_cache_full) {
1984         m->bound_thread_cond = bound_cond_cache;
1985         bound_cond_cache_full = 0;
1986     } else {
1987         initCondition(&m->bound_thread_cond);
1988     }
1989 #endif
1990
1991     /* Put the thread on the main-threads list prior to scheduling the TSO.
1992        Failure to do so introduces a race condition in the MT case (as
1993        identified by Wolfgang Thaller), whereby the new task/OS thread 
1994        created by scheduleThread_() would complete prior to the thread
1995        that spawned it managed to put 'itself' on the main-threads list.
1996        The upshot of it all being that the worker thread wouldn't get to
1997        signal the completion of the its work item for the main thread to
1998        see (==> it got stuck waiting.)    -- sof 6/02.
1999     */
2000     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2001     
2002     APPEND_TO_RUN_QUEUE(tso);
2003     // NB. Don't call THREAD_RUNNABLE() here, because the thread is
2004     // bound and only runnable by *this* OS thread, so waking up other
2005     // workers will just slow things down.
2006
2007     return waitThread_(m, initialCapability);
2008 }
2009
2010 /* ---------------------------------------------------------------------------
2011  * initScheduler()
2012  *
2013  * Initialise the scheduler.  This resets all the queues - if the
2014  * queues contained any threads, they'll be garbage collected at the
2015  * next pass.
2016  *
2017  * ------------------------------------------------------------------------ */
2018
2019 void 
2020 initScheduler(void)
2021 {
2022 #if defined(GRAN)
2023   nat i;
2024
2025   for (i=0; i<=MAX_PROC; i++) {
2026     run_queue_hds[i]      = END_TSO_QUEUE;
2027     run_queue_tls[i]      = END_TSO_QUEUE;
2028     blocked_queue_hds[i]  = END_TSO_QUEUE;
2029     blocked_queue_tls[i]  = END_TSO_QUEUE;
2030     ccalling_threadss[i]  = END_TSO_QUEUE;
2031     sleeping_queue        = END_TSO_QUEUE;
2032   }
2033 #else
2034   run_queue_hd      = END_TSO_QUEUE;
2035   run_queue_tl      = END_TSO_QUEUE;
2036   blocked_queue_hd  = END_TSO_QUEUE;
2037   blocked_queue_tl  = END_TSO_QUEUE;
2038   sleeping_queue    = END_TSO_QUEUE;
2039 #endif 
2040
2041   suspended_ccalling_threads  = END_TSO_QUEUE;
2042
2043   main_threads = NULL;
2044   all_threads  = END_TSO_QUEUE;
2045
2046   context_switch = 0;
2047   interrupted    = 0;
2048
2049   RtsFlags.ConcFlags.ctxtSwitchTicks =
2050       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2051       
2052 #if defined(RTS_SUPPORTS_THREADS)
2053   /* Initialise the mutex and condition variables used by
2054    * the scheduler. */
2055   initMutex(&sched_mutex);
2056   initMutex(&term_mutex);
2057 #endif
2058   
2059   ACQUIRE_LOCK(&sched_mutex);
2060
2061   /* A capability holds the state a native thread needs in
2062    * order to execute STG code. At least one capability is
2063    * floating around (only SMP builds have more than one).
2064    */
2065   initCapabilities();
2066   
2067 #if defined(RTS_SUPPORTS_THREADS)
2068     /* start our haskell execution tasks */
2069     startTaskManager(0,taskStart);
2070 #endif
2071
2072 #if /* defined(SMP) ||*/ defined(PAR)
2073   initSparkPools();
2074 #endif
2075
2076   RELEASE_LOCK(&sched_mutex);
2077 }
2078
2079 void
2080 exitScheduler( void )
2081 {
2082 #if defined(RTS_SUPPORTS_THREADS)
2083   stopTaskManager();
2084 #endif
2085   shutting_down_scheduler = rtsTrue;
2086 }
2087
2088 /* ----------------------------------------------------------------------------
2089    Managing the per-task allocation areas.
2090    
2091    Each capability comes with an allocation area.  These are
2092    fixed-length block lists into which allocation can be done.
2093
2094    ToDo: no support for two-space collection at the moment???
2095    ------------------------------------------------------------------------- */
2096
2097 static
2098 SchedulerStatus
2099 waitThread_(StgMainThread* m, Capability *initialCapability)
2100 {
2101   SchedulerStatus stat;
2102
2103   // Precondition: sched_mutex must be held.
2104   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2105
2106 #if defined(GRAN)
2107   /* GranSim specific init */
2108   CurrentTSO = m->tso;                // the TSO to run
2109   procStatus[MainProc] = Busy;        // status of main PE
2110   CurrentProc = MainProc;             // PE to run it on
2111   schedule(m,initialCapability);
2112 #else
2113   schedule(m,initialCapability);
2114   ASSERT(m->stat != NoStatus);
2115 #endif
2116
2117   stat = m->stat;
2118
2119 #if defined(RTS_SUPPORTS_THREADS)
2120   // Free the condition variable, returning it to the cache if possible.
2121   if (!bound_cond_cache_full) {
2122       bound_cond_cache = m->bound_thread_cond;
2123       bound_cond_cache_full = 1;
2124   } else {
2125       closeCondition(&m->bound_thread_cond);
2126   }
2127 #endif
2128
2129   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2130   stgFree(m);
2131
2132   // Postcondition: sched_mutex still held
2133   return stat;
2134 }
2135
2136 /* ---------------------------------------------------------------------------
2137    Where are the roots that we know about?
2138
2139         - all the threads on the runnable queue
2140         - all the threads on the blocked queue
2141         - all the threads on the sleeping queue
2142         - all the thread currently executing a _ccall_GC
2143         - all the "main threads"
2144      
2145    ------------------------------------------------------------------------ */
2146
2147 /* This has to be protected either by the scheduler monitor, or by the
2148         garbage collection monitor (probably the latter).
2149         KH @ 25/10/99
2150 */
2151
2152 void
2153 GetRoots( evac_fn evac )
2154 {
2155 #if defined(GRAN)
2156   {
2157     nat i;
2158     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2159       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2160           evac((StgClosure **)&run_queue_hds[i]);
2161       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2162           evac((StgClosure **)&run_queue_tls[i]);
2163       
2164       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2165           evac((StgClosure **)&blocked_queue_hds[i]);
2166       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2167           evac((StgClosure **)&blocked_queue_tls[i]);
2168       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2169           evac((StgClosure **)&ccalling_threads[i]);
2170     }
2171   }
2172
2173   markEventQueue();
2174
2175 #else /* !GRAN */
2176   if (run_queue_hd != END_TSO_QUEUE) {
2177       ASSERT(run_queue_tl != END_TSO_QUEUE);
2178       evac((StgClosure **)&run_queue_hd);
2179       evac((StgClosure **)&run_queue_tl);
2180   }
2181   
2182   if (blocked_queue_hd != END_TSO_QUEUE) {
2183       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2184       evac((StgClosure **)&blocked_queue_hd);
2185       evac((StgClosure **)&blocked_queue_tl);
2186   }
2187   
2188   if (sleeping_queue != END_TSO_QUEUE) {
2189       evac((StgClosure **)&sleeping_queue);
2190   }
2191 #endif 
2192
2193   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2194       evac((StgClosure **)&suspended_ccalling_threads);
2195   }
2196
2197 #if defined(PAR) || defined(GRAN)
2198   markSparkQueue(evac);
2199 #endif
2200
2201 #if defined(RTS_USER_SIGNALS)
2202   // mark the signal handlers (signals should be already blocked)
2203   markSignalHandlers(evac);
2204 #endif
2205 }
2206
2207 /* -----------------------------------------------------------------------------
2208    performGC
2209
2210    This is the interface to the garbage collector from Haskell land.
2211    We provide this so that external C code can allocate and garbage
2212    collect when called from Haskell via _ccall_GC.
2213
2214    It might be useful to provide an interface whereby the programmer
2215    can specify more roots (ToDo).
2216    
2217    This needs to be protected by the GC condition variable above.  KH.
2218    -------------------------------------------------------------------------- */
2219
2220 static void (*extra_roots)(evac_fn);
2221
2222 void
2223 performGC(void)
2224 {
2225   /* Obligated to hold this lock upon entry */
2226   ACQUIRE_LOCK(&sched_mutex);
2227   GarbageCollect(GetRoots,rtsFalse);
2228   RELEASE_LOCK(&sched_mutex);
2229 }
2230
2231 void
2232 performMajorGC(void)
2233 {
2234   ACQUIRE_LOCK(&sched_mutex);
2235   GarbageCollect(GetRoots,rtsTrue);
2236   RELEASE_LOCK(&sched_mutex);
2237 }
2238
2239 static void
2240 AllRoots(evac_fn evac)
2241 {
2242     GetRoots(evac);             // the scheduler's roots
2243     extra_roots(evac);          // the user's roots
2244 }
2245
2246 void
2247 performGCWithRoots(void (*get_roots)(evac_fn))
2248 {
2249   ACQUIRE_LOCK(&sched_mutex);
2250   extra_roots = get_roots;
2251   GarbageCollect(AllRoots,rtsFalse);
2252   RELEASE_LOCK(&sched_mutex);
2253 }
2254
2255 /* -----------------------------------------------------------------------------
2256    Stack overflow
2257
2258    If the thread has reached its maximum stack size, then raise the
2259    StackOverflow exception in the offending thread.  Otherwise
2260    relocate the TSO into a larger chunk of memory and adjust its stack
2261    size appropriately.
2262    -------------------------------------------------------------------------- */
2263
2264 static StgTSO *
2265 threadStackOverflow(StgTSO *tso)
2266 {
2267   nat new_stack_size, new_tso_size, stack_words;
2268   StgPtr new_sp;
2269   StgTSO *dest;
2270
2271   IF_DEBUG(sanity,checkTSO(tso));
2272   if (tso->stack_size >= tso->max_stack_size) {
2273
2274     IF_DEBUG(gc,
2275              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2276                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2277              /* If we're debugging, just print out the top of the stack */
2278              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2279                                               tso->sp+64)));
2280
2281     /* Send this thread the StackOverflow exception */
2282     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2283     return tso;
2284   }
2285
2286   /* Try to double the current stack size.  If that takes us over the
2287    * maximum stack size for this thread, then use the maximum instead.
2288    * Finally round up so the TSO ends up as a whole number of blocks.
2289    */
2290   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2291   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2292                                        TSO_STRUCT_SIZE)/sizeof(W_);
2293   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2294   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2295
2296   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2297
2298   dest = (StgTSO *)allocate(new_tso_size);
2299   TICK_ALLOC_TSO(new_stack_size,0);
2300
2301   /* copy the TSO block and the old stack into the new area */
2302   memcpy(dest,tso,TSO_STRUCT_SIZE);
2303   stack_words = tso->stack + tso->stack_size - tso->sp;
2304   new_sp = (P_)dest + new_tso_size - stack_words;
2305   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2306
2307   /* relocate the stack pointers... */
2308   dest->sp         = new_sp;
2309   dest->stack_size = new_stack_size;
2310         
2311   /* Mark the old TSO as relocated.  We have to check for relocated
2312    * TSOs in the garbage collector and any primops that deal with TSOs.
2313    *
2314    * It's important to set the sp value to just beyond the end
2315    * of the stack, so we don't attempt to scavenge any part of the
2316    * dead TSO's stack.
2317    */
2318   tso->what_next = ThreadRelocated;
2319   tso->link = dest;
2320   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2321   tso->why_blocked = NotBlocked;
2322   dest->mut_link = NULL;
2323
2324   IF_PAR_DEBUG(verbose,
2325                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2326                      tso->id, tso, tso->stack_size);
2327                /* If we're debugging, just print out the top of the stack */
2328                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2329                                                 tso->sp+64)));
2330   
2331   IF_DEBUG(sanity,checkTSO(tso));
2332 #if 0
2333   IF_DEBUG(scheduler,printTSO(dest));
2334 #endif
2335
2336   return dest;
2337 }
2338
2339 /* ---------------------------------------------------------------------------
2340    Wake up a queue that was blocked on some resource.
2341    ------------------------------------------------------------------------ */
2342
2343 #if defined(GRAN)
2344 STATIC_INLINE void
2345 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2346 {
2347 }
2348 #elif defined(PAR)
2349 STATIC_INLINE void
2350 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2351 {
2352   /* write RESUME events to log file and
2353      update blocked and fetch time (depending on type of the orig closure) */
2354   if (RtsFlags.ParFlags.ParStats.Full) {
2355     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2356                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2357                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2358     if (EMPTY_RUN_QUEUE())
2359       emitSchedule = rtsTrue;
2360
2361     switch (get_itbl(node)->type) {
2362         case FETCH_ME_BQ:
2363           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2364           break;
2365         case RBH:
2366         case FETCH_ME:
2367         case BLACKHOLE_BQ:
2368           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2369           break;
2370 #ifdef DIST
2371         case MVAR:
2372           break;
2373 #endif    
2374         default:
2375           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2376         }
2377       }
2378 }
2379 #endif
2380
2381 #if defined(GRAN)
2382 static StgBlockingQueueElement *
2383 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2384 {
2385     StgTSO *tso;
2386     PEs node_loc, tso_loc;
2387
2388     node_loc = where_is(node); // should be lifted out of loop
2389     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2390     tso_loc = where_is((StgClosure *)tso);
2391     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2392       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2393       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2394       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2395       // insertThread(tso, node_loc);
2396       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2397                 ResumeThread,
2398                 tso, node, (rtsSpark*)NULL);
2399       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2400       // len_local++;
2401       // len++;
2402     } else { // TSO is remote (actually should be FMBQ)
2403       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2404                                   RtsFlags.GranFlags.Costs.gunblocktime +
2405                                   RtsFlags.GranFlags.Costs.latency;
2406       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2407                 UnblockThread,
2408                 tso, node, (rtsSpark*)NULL);
2409       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2410       // len++;
2411     }
2412     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2413     IF_GRAN_DEBUG(bq,
2414                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2415                           (node_loc==tso_loc ? "Local" : "Global"), 
2416                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2417     tso->block_info.closure = NULL;
2418     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2419                              tso->id, tso));
2420 }
2421 #elif defined(PAR)
2422 static StgBlockingQueueElement *
2423 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2424 {
2425     StgBlockingQueueElement *next;
2426
2427     switch (get_itbl(bqe)->type) {
2428     case TSO:
2429       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2430       /* if it's a TSO just push it onto the run_queue */
2431       next = bqe->link;
2432       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2433       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2434       THREAD_RUNNABLE();
2435       unblockCount(bqe, node);
2436       /* reset blocking status after dumping event */
2437       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2438       break;
2439
2440     case BLOCKED_FETCH:
2441       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2442       next = bqe->link;
2443       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2444       PendingFetches = (StgBlockedFetch *)bqe;
2445       break;
2446
2447 # if defined(DEBUG)
2448       /* can ignore this case in a non-debugging setup; 
2449          see comments on RBHSave closures above */
2450     case CONSTR:
2451       /* check that the closure is an RBHSave closure */
2452       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2453              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2454              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2455       break;
2456
2457     default:
2458       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2459            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2460            (StgClosure *)bqe);
2461 # endif
2462     }
2463   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2464   return next;
2465 }
2466
2467 #else /* !GRAN && !PAR */
2468 static StgTSO *
2469 unblockOneLocked(StgTSO *tso)
2470 {
2471   StgTSO *next;
2472
2473   ASSERT(get_itbl(tso)->type == TSO);
2474   ASSERT(tso->why_blocked != NotBlocked);
2475   tso->why_blocked = NotBlocked;
2476   next = tso->link;
2477   APPEND_TO_RUN_QUEUE(tso);
2478   THREAD_RUNNABLE();
2479   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2480   return next;
2481 }
2482 #endif
2483
2484 #if defined(GRAN) || defined(PAR)
2485 INLINE_ME StgBlockingQueueElement *
2486 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2487 {
2488   ACQUIRE_LOCK(&sched_mutex);
2489   bqe = unblockOneLocked(bqe, node);
2490   RELEASE_LOCK(&sched_mutex);
2491   return bqe;
2492 }
2493 #else
2494 INLINE_ME StgTSO *
2495 unblockOne(StgTSO *tso)
2496 {
2497   ACQUIRE_LOCK(&sched_mutex);
2498   tso = unblockOneLocked(tso);
2499   RELEASE_LOCK(&sched_mutex);
2500   return tso;
2501 }
2502 #endif
2503
2504 #if defined(GRAN)
2505 void 
2506 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2507 {
2508   StgBlockingQueueElement *bqe;
2509   PEs node_loc;
2510   nat len = 0; 
2511
2512   IF_GRAN_DEBUG(bq, 
2513                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2514                       node, CurrentProc, CurrentTime[CurrentProc], 
2515                       CurrentTSO->id, CurrentTSO));
2516
2517   node_loc = where_is(node);
2518
2519   ASSERT(q == END_BQ_QUEUE ||
2520          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2521          get_itbl(q)->type == CONSTR); // closure (type constructor)
2522   ASSERT(is_unique(node));
2523
2524   /* FAKE FETCH: magically copy the node to the tso's proc;
2525      no Fetch necessary because in reality the node should not have been 
2526      moved to the other PE in the first place
2527   */
2528   if (CurrentProc!=node_loc) {
2529     IF_GRAN_DEBUG(bq, 
2530                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2531                         node, node_loc, CurrentProc, CurrentTSO->id, 
2532                         // CurrentTSO, where_is(CurrentTSO),
2533                         node->header.gran.procs));
2534     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2535     IF_GRAN_DEBUG(bq, 
2536                   belch("## new bitmask of node %p is %#x",
2537                         node, node->header.gran.procs));
2538     if (RtsFlags.GranFlags.GranSimStats.Global) {
2539       globalGranStats.tot_fake_fetches++;
2540     }
2541   }
2542
2543   bqe = q;
2544   // ToDo: check: ASSERT(CurrentProc==node_loc);
2545   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2546     //next = bqe->link;
2547     /* 
2548        bqe points to the current element in the queue
2549        next points to the next element in the queue
2550     */
2551     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2552     //tso_loc = where_is(tso);
2553     len++;
2554     bqe = unblockOneLocked(bqe, node);
2555   }
2556
2557   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2558      the closure to make room for the anchor of the BQ */
2559   if (bqe!=END_BQ_QUEUE) {
2560     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2561     /*
2562     ASSERT((info_ptr==&RBH_Save_0_info) ||
2563            (info_ptr==&RBH_Save_1_info) ||
2564            (info_ptr==&RBH_Save_2_info));
2565     */
2566     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2567     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2568     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2569
2570     IF_GRAN_DEBUG(bq,
2571                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2572                         node, info_type(node)));
2573   }
2574
2575   /* statistics gathering */
2576   if (RtsFlags.GranFlags.GranSimStats.Global) {
2577     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2578     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2579     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2580     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2581   }
2582   IF_GRAN_DEBUG(bq,
2583                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2584                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2585 }
2586 #elif defined(PAR)
2587 void 
2588 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2589 {
2590   StgBlockingQueueElement *bqe;
2591
2592   ACQUIRE_LOCK(&sched_mutex);
2593
2594   IF_PAR_DEBUG(verbose, 
2595                belch("##-_ AwBQ for node %p on [%x]: ",
2596                      node, mytid));
2597 #ifdef DIST  
2598   //RFP
2599   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2600     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2601     return;
2602   }
2603 #endif
2604   
2605   ASSERT(q == END_BQ_QUEUE ||
2606          get_itbl(q)->type == TSO ||           
2607          get_itbl(q)->type == BLOCKED_FETCH || 
2608          get_itbl(q)->type == CONSTR); 
2609
2610   bqe = q;
2611   while (get_itbl(bqe)->type==TSO || 
2612          get_itbl(bqe)->type==BLOCKED_FETCH) {
2613     bqe = unblockOneLocked(bqe, node);
2614   }
2615   RELEASE_LOCK(&sched_mutex);
2616 }
2617
2618 #else   /* !GRAN && !PAR */
2619
2620 void
2621 awakenBlockedQueueNoLock(StgTSO *tso)
2622 {
2623   while (tso != END_TSO_QUEUE) {
2624     tso = unblockOneLocked(tso);
2625   }
2626 }
2627
2628 void
2629 awakenBlockedQueue(StgTSO *tso)
2630 {
2631   ACQUIRE_LOCK(&sched_mutex);
2632   while (tso != END_TSO_QUEUE) {
2633     tso = unblockOneLocked(tso);
2634   }
2635   RELEASE_LOCK(&sched_mutex);
2636 }
2637 #endif
2638
2639 /* ---------------------------------------------------------------------------
2640    Interrupt execution
2641    - usually called inside a signal handler so it mustn't do anything fancy.   
2642    ------------------------------------------------------------------------ */
2643
2644 void
2645 interruptStgRts(void)
2646 {
2647     interrupted    = 1;
2648     context_switch = 1;
2649 #ifdef RTS_SUPPORTS_THREADS
2650     wakeBlockedWorkerThread();
2651 #endif
2652 }
2653
2654 /* -----------------------------------------------------------------------------
2655    Unblock a thread
2656
2657    This is for use when we raise an exception in another thread, which
2658    may be blocked.
2659    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2660    -------------------------------------------------------------------------- */
2661
2662 #if defined(GRAN) || defined(PAR)
2663 /*
2664   NB: only the type of the blocking queue is different in GranSim and GUM
2665       the operations on the queue-elements are the same
2666       long live polymorphism!
2667
2668   Locks: sched_mutex is held upon entry and exit.
2669
2670 */
2671 static void
2672 unblockThread(StgTSO *tso)
2673 {
2674   StgBlockingQueueElement *t, **last;
2675
2676   switch (tso->why_blocked) {
2677
2678   case NotBlocked:
2679     return;  /* not blocked */
2680
2681   case BlockedOnMVar:
2682     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2683     {
2684       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2685       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2686
2687       last = (StgBlockingQueueElement **)&mvar->head;
2688       for (t = (StgBlockingQueueElement *)mvar->head; 
2689            t != END_BQ_QUEUE; 
2690            last = &t->link, last_tso = t, t = t->link) {
2691         if (t == (StgBlockingQueueElement *)tso) {
2692           *last = (StgBlockingQueueElement *)tso->link;
2693           if (mvar->tail == tso) {
2694             mvar->tail = (StgTSO *)last_tso;
2695           }
2696           goto done;
2697         }
2698       }
2699       barf("unblockThread (MVAR): TSO not found");
2700     }
2701
2702   case BlockedOnBlackHole:
2703     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2704     {
2705       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2706
2707       last = &bq->blocking_queue;
2708       for (t = bq->blocking_queue; 
2709            t != END_BQ_QUEUE; 
2710            last = &t->link, t = t->link) {
2711         if (t == (StgBlockingQueueElement *)tso) {
2712           *last = (StgBlockingQueueElement *)tso->link;
2713           goto done;
2714         }
2715       }
2716       barf("unblockThread (BLACKHOLE): TSO not found");
2717     }
2718
2719   case BlockedOnException:
2720     {
2721       StgTSO *target  = tso->block_info.tso;
2722
2723       ASSERT(get_itbl(target)->type == TSO);
2724
2725       if (target->what_next == ThreadRelocated) {
2726           target = target->link;
2727           ASSERT(get_itbl(target)->type == TSO);
2728       }
2729
2730       ASSERT(target->blocked_exceptions != NULL);
2731
2732       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2733       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2734            t != END_BQ_QUEUE; 
2735            last = &t->link, t = t->link) {
2736         ASSERT(get_itbl(t)->type == TSO);
2737         if (t == (StgBlockingQueueElement *)tso) {
2738           *last = (StgBlockingQueueElement *)tso->link;
2739           goto done;
2740         }
2741       }
2742       barf("unblockThread (Exception): TSO not found");
2743     }
2744
2745   case BlockedOnRead:
2746   case BlockedOnWrite:
2747 #if defined(mingw32_TARGET_OS)
2748   case BlockedOnDoProc:
2749 #endif
2750     {
2751       /* take TSO off blocked_queue */
2752       StgBlockingQueueElement *prev = NULL;
2753       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2754            prev = t, t = t->link) {
2755         if (t == (StgBlockingQueueElement *)tso) {
2756           if (prev == NULL) {
2757             blocked_queue_hd = (StgTSO *)t->link;
2758             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2759               blocked_queue_tl = END_TSO_QUEUE;
2760             }
2761           } else {
2762             prev->link = t->link;
2763             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2764               blocked_queue_tl = (StgTSO *)prev;
2765             }
2766           }
2767           goto done;
2768         }
2769       }
2770       barf("unblockThread (I/O): TSO not found");
2771     }
2772
2773   case BlockedOnDelay:
2774     {
2775       /* take TSO off sleeping_queue */
2776       StgBlockingQueueElement *prev = NULL;
2777       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2778            prev = t, t = t->link) {
2779         if (t == (StgBlockingQueueElement *)tso) {
2780           if (prev == NULL) {
2781             sleeping_queue = (StgTSO *)t->link;
2782           } else {
2783             prev->link = t->link;
2784           }
2785           goto done;
2786         }
2787       }
2788       barf("unblockThread (delay): TSO not found");
2789     }
2790
2791   default:
2792     barf("unblockThread");
2793   }
2794
2795  done:
2796   tso->link = END_TSO_QUEUE;
2797   tso->why_blocked = NotBlocked;
2798   tso->block_info.closure = NULL;
2799   PUSH_ON_RUN_QUEUE(tso);
2800 }
2801 #else
2802 static void
2803 unblockThread(StgTSO *tso)
2804 {
2805   StgTSO *t, **last;
2806   
2807   /* To avoid locking unnecessarily. */
2808   if (tso->why_blocked == NotBlocked) {
2809     return;
2810   }
2811
2812   switch (tso->why_blocked) {
2813
2814   case BlockedOnMVar:
2815     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2816     {
2817       StgTSO *last_tso = END_TSO_QUEUE;
2818       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2819
2820       last = &mvar->head;
2821       for (t = mvar->head; t != END_TSO_QUEUE; 
2822            last = &t->link, last_tso = t, t = t->link) {
2823         if (t == tso) {
2824           *last = tso->link;
2825           if (mvar->tail == tso) {
2826             mvar->tail = last_tso;
2827           }
2828           goto done;
2829         }
2830       }
2831       barf("unblockThread (MVAR): TSO not found");
2832     }
2833
2834   case BlockedOnBlackHole:
2835     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2836     {
2837       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2838
2839       last = &bq->blocking_queue;
2840       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2841            last = &t->link, t = t->link) {
2842         if (t == tso) {
2843           *last = tso->link;
2844           goto done;
2845         }
2846       }
2847       barf("unblockThread (BLACKHOLE): TSO not found");
2848     }
2849
2850   case BlockedOnException:
2851     {
2852       StgTSO *target  = tso->block_info.tso;
2853
2854       ASSERT(get_itbl(target)->type == TSO);
2855
2856       while (target->what_next == ThreadRelocated) {
2857           target = target->link;
2858           ASSERT(get_itbl(target)->type == TSO);
2859       }
2860       
2861       ASSERT(target->blocked_exceptions != NULL);
2862
2863       last = &target->blocked_exceptions;
2864       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2865            last = &t->link, t = t->link) {
2866         ASSERT(get_itbl(t)->type == TSO);
2867         if (t == tso) {
2868           *last = tso->link;
2869           goto done;
2870         }
2871       }
2872       barf("unblockThread (Exception): TSO not found");
2873     }
2874
2875   case BlockedOnRead:
2876   case BlockedOnWrite:
2877 #if defined(mingw32_TARGET_OS)
2878   case BlockedOnDoProc:
2879 #endif
2880     {
2881       StgTSO *prev = NULL;
2882       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2883            prev = t, t = t->link) {
2884         if (t == tso) {
2885           if (prev == NULL) {
2886             blocked_queue_hd = t->link;
2887             if (blocked_queue_tl == t) {
2888               blocked_queue_tl = END_TSO_QUEUE;
2889             }
2890           } else {
2891             prev->link = t->link;
2892             if (blocked_queue_tl == t) {
2893               blocked_queue_tl = prev;
2894             }
2895           }
2896           goto done;
2897         }
2898       }
2899       barf("unblockThread (I/O): TSO not found");
2900     }
2901
2902   case BlockedOnDelay:
2903     {
2904       StgTSO *prev = NULL;
2905       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2906            prev = t, t = t->link) {
2907         if (t == tso) {
2908           if (prev == NULL) {
2909             sleeping_queue = t->link;
2910           } else {
2911             prev->link = t->link;
2912           }
2913           goto done;
2914         }
2915       }
2916       barf("unblockThread (delay): TSO not found");
2917     }
2918
2919   default:
2920     barf("unblockThread");
2921   }
2922
2923  done:
2924   tso->link = END_TSO_QUEUE;
2925   tso->why_blocked = NotBlocked;
2926   tso->block_info.closure = NULL;
2927   APPEND_TO_RUN_QUEUE(tso);
2928 }
2929 #endif
2930
2931 /* -----------------------------------------------------------------------------
2932  * raiseAsync()
2933  *
2934  * The following function implements the magic for raising an
2935  * asynchronous exception in an existing thread.
2936  *
2937  * We first remove the thread from any queue on which it might be
2938  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2939  *
2940  * We strip the stack down to the innermost CATCH_FRAME, building
2941  * thunks in the heap for all the active computations, so they can 
2942  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2943  * an application of the handler to the exception, and push it on
2944  * the top of the stack.
2945  * 
2946  * How exactly do we save all the active computations?  We create an
2947  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2948  * AP_STACKs pushes everything from the corresponding update frame
2949  * upwards onto the stack.  (Actually, it pushes everything up to the
2950  * next update frame plus a pointer to the next AP_STACK object.
2951  * Entering the next AP_STACK object pushes more onto the stack until we
2952  * reach the last AP_STACK object - at which point the stack should look
2953  * exactly as it did when we killed the TSO and we can continue
2954  * execution by entering the closure on top of the stack.
2955  *
2956  * We can also kill a thread entirely - this happens if either (a) the 
2957  * exception passed to raiseAsync is NULL, or (b) there's no
2958  * CATCH_FRAME on the stack.  In either case, we strip the entire
2959  * stack and replace the thread with a zombie.
2960  *
2961  * Locks: sched_mutex held upon entry nor exit.
2962  *
2963  * -------------------------------------------------------------------------- */
2964  
2965 void 
2966 deleteThread(StgTSO *tso)
2967 {
2968   raiseAsync(tso,NULL);
2969 }
2970
2971 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2972 static void 
2973 deleteThreadImmediately(StgTSO *tso)
2974 { // for forkProcess only:
2975   // delete thread without giving it a chance to catch the KillThread exception
2976
2977   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2978       return;
2979   }
2980
2981   if (tso->why_blocked != BlockedOnCCall &&
2982       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
2983     unblockThread(tso);
2984   }
2985
2986   tso->what_next = ThreadKilled;
2987 }
2988 #endif
2989
2990 void
2991 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
2992 {
2993   /* When raising async exs from contexts where sched_mutex isn't held;
2994      use raiseAsyncWithLock(). */
2995   ACQUIRE_LOCK(&sched_mutex);
2996   raiseAsync(tso,exception);
2997   RELEASE_LOCK(&sched_mutex);
2998 }
2999
3000 void
3001 raiseAsync(StgTSO *tso, StgClosure *exception)
3002 {
3003     StgRetInfoTable *info;
3004     StgPtr sp;
3005   
3006     // Thread already dead?
3007     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3008         return;
3009     }
3010
3011     IF_DEBUG(scheduler, 
3012              sched_belch("raising exception in thread %ld.", tso->id));
3013     
3014     // Remove it from any blocking queues
3015     unblockThread(tso);
3016
3017     sp = tso->sp;
3018     
3019     // The stack freezing code assumes there's a closure pointer on
3020     // the top of the stack, so we have to arrange that this is the case...
3021     //
3022     if (sp[0] == (W_)&stg_enter_info) {
3023         sp++;
3024     } else {
3025         sp--;
3026         sp[0] = (W_)&stg_dummy_ret_closure;
3027     }
3028
3029     while (1) {
3030         nat i;
3031
3032         // 1. Let the top of the stack be the "current closure"
3033         //
3034         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3035         // CATCH_FRAME.
3036         //
3037         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3038         // current closure applied to the chunk of stack up to (but not
3039         // including) the update frame.  This closure becomes the "current
3040         // closure".  Go back to step 2.
3041         //
3042         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3043         // top of the stack applied to the exception.
3044         // 
3045         // 5. If it's a STOP_FRAME, then kill the thread.
3046         
3047         StgPtr frame;
3048         
3049         frame = sp + 1;
3050         info = get_ret_itbl((StgClosure *)frame);
3051         
3052         while (info->i.type != UPDATE_FRAME
3053                && (info->i.type != CATCH_FRAME || exception == NULL)
3054                && info->i.type != STOP_FRAME) {
3055             frame += stack_frame_sizeW((StgClosure *)frame);
3056             info = get_ret_itbl((StgClosure *)frame);
3057         }
3058         
3059         switch (info->i.type) {
3060             
3061         case CATCH_FRAME:
3062             // If we find a CATCH_FRAME, and we've got an exception to raise,
3063             // then build the THUNK raise(exception), and leave it on
3064             // top of the CATCH_FRAME ready to enter.
3065             //
3066         {
3067 #ifdef PROFILING
3068             StgCatchFrame *cf = (StgCatchFrame *)frame;
3069 #endif
3070             StgClosure *raise;
3071             
3072             // we've got an exception to raise, so let's pass it to the
3073             // handler in this frame.
3074             //
3075             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3076             TICK_ALLOC_SE_THK(1,0);
3077             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3078             raise->payload[0] = exception;
3079             
3080             // throw away the stack from Sp up to the CATCH_FRAME.
3081             //
3082             sp = frame - 1;
3083             
3084             /* Ensure that async excpetions are blocked now, so we don't get
3085              * a surprise exception before we get around to executing the
3086              * handler.
3087              */
3088             if (tso->blocked_exceptions == NULL) {
3089                 tso->blocked_exceptions = END_TSO_QUEUE;
3090             }
3091             
3092             /* Put the newly-built THUNK on top of the stack, ready to execute
3093              * when the thread restarts.
3094              */
3095             sp[0] = (W_)raise;
3096             sp[-1] = (W_)&stg_enter_info;
3097             tso->sp = sp-1;
3098             tso->what_next = ThreadRunGHC;
3099             IF_DEBUG(sanity, checkTSO(tso));
3100             return;
3101         }
3102         
3103         case UPDATE_FRAME:
3104         {
3105             StgAP_STACK * ap;
3106             nat words;
3107             
3108             // First build an AP_STACK consisting of the stack chunk above the
3109             // current update frame, with the top word on the stack as the
3110             // fun field.
3111             //
3112             words = frame - sp - 1;
3113             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3114             
3115             ap->size = words;
3116             ap->fun  = (StgClosure *)sp[0];
3117             sp++;
3118             for(i=0; i < (nat)words; ++i) {
3119                 ap->payload[i] = (StgClosure *)*sp++;
3120             }
3121             
3122             SET_HDR(ap,&stg_AP_STACK_info,
3123                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3124             TICK_ALLOC_UP_THK(words+1,0);
3125             
3126             IF_DEBUG(scheduler,
3127                      fprintf(stderr,  "sched: Updating ");
3128                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3129                      fprintf(stderr,  " with ");
3130                      printObj((StgClosure *)ap);
3131                 );
3132
3133             // Replace the updatee with an indirection - happily
3134             // this will also wake up any threads currently
3135             // waiting on the result.
3136             //
3137             // Warning: if we're in a loop, more than one update frame on
3138             // the stack may point to the same object.  Be careful not to
3139             // overwrite an IND_OLDGEN in this case, because we'll screw
3140             // up the mutable lists.  To be on the safe side, don't
3141             // overwrite any kind of indirection at all.  See also
3142             // threadSqueezeStack in GC.c, where we have to make a similar
3143             // check.
3144             //
3145             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3146                 // revert the black hole
3147                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3148             }
3149             sp += sizeofW(StgUpdateFrame) - 1;
3150             sp[0] = (W_)ap; // push onto stack
3151             break;
3152         }
3153         
3154         case STOP_FRAME:
3155             // We've stripped the entire stack, the thread is now dead.
3156             sp += sizeofW(StgStopFrame);
3157             tso->what_next = ThreadKilled;
3158             tso->sp = sp;
3159             return;
3160             
3161         default:
3162             barf("raiseAsync");
3163         }
3164     }
3165     barf("raiseAsync");
3166 }
3167
3168 /* -----------------------------------------------------------------------------
3169    resurrectThreads is called after garbage collection on the list of
3170    threads found to be garbage.  Each of these threads will be woken
3171    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3172    on an MVar, or NonTermination if the thread was blocked on a Black
3173    Hole.
3174
3175    Locks: sched_mutex isn't held upon entry nor exit.
3176    -------------------------------------------------------------------------- */
3177
3178 void
3179 resurrectThreads( StgTSO *threads )
3180 {
3181   StgTSO *tso, *next;
3182
3183   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3184     next = tso->global_link;
3185     tso->global_link = all_threads;
3186     all_threads = tso;
3187     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3188
3189     switch (tso->why_blocked) {
3190     case BlockedOnMVar:
3191     case BlockedOnException:
3192       /* Called by GC - sched_mutex lock is currently held. */
3193       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3194       break;
3195     case BlockedOnBlackHole:
3196       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3197       break;
3198     case NotBlocked:
3199       /* This might happen if the thread was blocked on a black hole
3200        * belonging to a thread that we've just woken up (raiseAsync
3201        * can wake up threads, remember...).
3202        */
3203       continue;
3204     default:
3205       barf("resurrectThreads: thread blocked in a strange way");
3206     }
3207   }
3208 }
3209
3210 /* -----------------------------------------------------------------------------
3211  * Blackhole detection: if we reach a deadlock, test whether any
3212  * threads are blocked on themselves.  Any threads which are found to
3213  * be self-blocked get sent a NonTermination exception.
3214  *
3215  * This is only done in a deadlock situation in order to avoid
3216  * performance overhead in the normal case.
3217  *
3218  * Locks: sched_mutex is held upon entry and exit.
3219  * -------------------------------------------------------------------------- */
3220
3221 static void
3222 detectBlackHoles( void )
3223 {
3224     StgTSO *tso = all_threads;
3225     StgClosure *frame;
3226     StgClosure *blocked_on;
3227     StgRetInfoTable *info;
3228
3229     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3230
3231         while (tso->what_next == ThreadRelocated) {
3232             tso = tso->link;
3233             ASSERT(get_itbl(tso)->type == TSO);
3234         }
3235       
3236         if (tso->why_blocked != BlockedOnBlackHole) {
3237             continue;
3238         }
3239         blocked_on = tso->block_info.closure;
3240
3241         frame = (StgClosure *)tso->sp;
3242
3243         while(1) {
3244             info = get_ret_itbl(frame);
3245             switch (info->i.type) {
3246             case UPDATE_FRAME:
3247                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3248                     /* We are blocking on one of our own computations, so
3249                      * send this thread the NonTermination exception.  
3250                      */
3251                     IF_DEBUG(scheduler, 
3252                              sched_belch("thread %d is blocked on itself", tso->id));
3253                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3254                     goto done;
3255                 }
3256                 
3257                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3258                 continue;
3259
3260             case STOP_FRAME:
3261                 goto done;
3262
3263                 // normal stack frames; do nothing except advance the pointer
3264             default:
3265                 (StgPtr)frame += stack_frame_sizeW(frame);
3266             }
3267         }   
3268         done: ;
3269     }
3270 }
3271
3272 /* ----------------------------------------------------------------------------
3273  * Debugging: why is a thread blocked
3274  * [Also provides useful information when debugging threaded programs
3275  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3276    ------------------------------------------------------------------------- */
3277
3278 static
3279 void
3280 printThreadBlockage(StgTSO *tso)
3281 {
3282   switch (tso->why_blocked) {
3283   case BlockedOnRead:
3284     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3285     break;
3286   case BlockedOnWrite:
3287     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3288     break;
3289 #if defined(mingw32_TARGET_OS)
3290     case BlockedOnDoProc:
3291     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3292     break;
3293 #endif
3294   case BlockedOnDelay:
3295     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3296     break;
3297   case BlockedOnMVar:
3298     fprintf(stderr,"is blocked on an MVar");
3299     break;
3300   case BlockedOnException:
3301     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3302             tso->block_info.tso->id);
3303     break;
3304   case BlockedOnBlackHole:
3305     fprintf(stderr,"is blocked on a black hole");
3306     break;
3307   case NotBlocked:
3308     fprintf(stderr,"is not blocked");
3309     break;
3310 #if defined(PAR)
3311   case BlockedOnGA:
3312     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3313             tso->block_info.closure, info_type(tso->block_info.closure));
3314     break;
3315   case BlockedOnGA_NoSend:
3316     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3317             tso->block_info.closure, info_type(tso->block_info.closure));
3318     break;
3319 #endif
3320   case BlockedOnCCall:
3321     fprintf(stderr,"is blocked on an external call");
3322     break;
3323   case BlockedOnCCall_NoUnblockExc:
3324     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3325     break;
3326   default:
3327     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3328          tso->why_blocked, tso->id, tso);
3329   }
3330 }
3331
3332 static
3333 void
3334 printThreadStatus(StgTSO *tso)
3335 {
3336   switch (tso->what_next) {
3337   case ThreadKilled:
3338     fprintf(stderr,"has been killed");
3339     break;
3340   case ThreadComplete:
3341     fprintf(stderr,"has completed");
3342     break;
3343   default:
3344     printThreadBlockage(tso);
3345   }
3346 }
3347
3348 void
3349 printAllThreads(void)
3350 {
3351   StgTSO *t;
3352   void *label;
3353
3354 # if defined(GRAN)
3355   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3356   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3357                        time_string, rtsFalse/*no commas!*/);
3358
3359   fprintf(stderr, "all threads at [%s]:\n", time_string);
3360 # elif defined(PAR)
3361   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3362   ullong_format_string(CURRENT_TIME,
3363                        time_string, rtsFalse/*no commas!*/);
3364
3365   fprintf(stderr,"all threads at [%s]:\n", time_string);
3366 # else
3367   fprintf(stderr,"all threads:\n");
3368 # endif
3369
3370   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3371     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3372     label = lookupThreadLabel(t->id);
3373     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3374     printThreadStatus(t);
3375     fprintf(stderr,"\n");
3376   }
3377 }
3378     
3379 #ifdef DEBUG
3380
3381 /* 
3382    Print a whole blocking queue attached to node (debugging only).
3383 */
3384 # if defined(PAR)
3385 void 
3386 print_bq (StgClosure *node)
3387 {
3388   StgBlockingQueueElement *bqe;
3389   StgTSO *tso;
3390   rtsBool end;
3391
3392   fprintf(stderr,"## BQ of closure %p (%s): ",
3393           node, info_type(node));
3394
3395   /* should cover all closures that may have a blocking queue */
3396   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3397          get_itbl(node)->type == FETCH_ME_BQ ||
3398          get_itbl(node)->type == RBH ||
3399          get_itbl(node)->type == MVAR);
3400     
3401   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3402
3403   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3404 }
3405
3406 /* 
3407    Print a whole blocking queue starting with the element bqe.
3408 */
3409 void 
3410 print_bqe (StgBlockingQueueElement *bqe)
3411 {
3412   rtsBool end;
3413
3414   /* 
3415      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3416   */
3417   for (end = (bqe==END_BQ_QUEUE);
3418        !end; // iterate until bqe points to a CONSTR
3419        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3420        bqe = end ? END_BQ_QUEUE : bqe->link) {
3421     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3422     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3423     /* types of closures that may appear in a blocking queue */
3424     ASSERT(get_itbl(bqe)->type == TSO ||           
3425            get_itbl(bqe)->type == BLOCKED_FETCH || 
3426            get_itbl(bqe)->type == CONSTR); 
3427     /* only BQs of an RBH end with an RBH_Save closure */
3428     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3429
3430     switch (get_itbl(bqe)->type) {
3431     case TSO:
3432       fprintf(stderr," TSO %u (%x),",
3433               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3434       break;
3435     case BLOCKED_FETCH:
3436       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3437               ((StgBlockedFetch *)bqe)->node, 
3438               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3439               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3440               ((StgBlockedFetch *)bqe)->ga.weight);
3441       break;
3442     case CONSTR:
3443       fprintf(stderr," %s (IP %p),",
3444               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3445                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3446                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3447                "RBH_Save_?"), get_itbl(bqe));
3448       break;
3449     default:
3450       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3451            info_type((StgClosure *)bqe)); // , node, info_type(node));
3452       break;
3453     }
3454   } /* for */
3455   fputc('\n', stderr);
3456 }
3457 # elif defined(GRAN)
3458 void 
3459 print_bq (StgClosure *node)
3460 {
3461   StgBlockingQueueElement *bqe;
3462   PEs node_loc, tso_loc;
3463   rtsBool end;
3464
3465   /* should cover all closures that may have a blocking queue */
3466   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3467          get_itbl(node)->type == FETCH_ME_BQ ||
3468          get_itbl(node)->type == RBH);
3469     
3470   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3471   node_loc = where_is(node);
3472
3473   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3474           node, info_type(node), node_loc);
3475
3476   /* 
3477      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3478   */
3479   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3480        !end; // iterate until bqe points to a CONSTR
3481        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3482     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3483     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3484     /* types of closures that may appear in a blocking queue */
3485     ASSERT(get_itbl(bqe)->type == TSO ||           
3486            get_itbl(bqe)->type == CONSTR); 
3487     /* only BQs of an RBH end with an RBH_Save closure */
3488     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3489
3490     tso_loc = where_is((StgClosure *)bqe);
3491     switch (get_itbl(bqe)->type) {
3492     case TSO:
3493       fprintf(stderr," TSO %d (%p) on [PE %d],",
3494               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3495       break;
3496     case CONSTR:
3497       fprintf(stderr," %s (IP %p),",
3498               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3499                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3500                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3501                "RBH_Save_?"), get_itbl(bqe));
3502       break;
3503     default:
3504       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3505            info_type((StgClosure *)bqe), node, info_type(node));
3506       break;
3507     }
3508   } /* for */
3509   fputc('\n', stderr);
3510 }
3511 #else
3512 /* 
3513    Nice and easy: only TSOs on the blocking queue
3514 */
3515 void 
3516 print_bq (StgClosure *node)
3517 {
3518   StgTSO *tso;
3519
3520   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3521   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3522        tso != END_TSO_QUEUE; 
3523        tso=tso->link) {
3524     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3525     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3526     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3527   }
3528   fputc('\n', stderr);
3529 }
3530 # endif
3531
3532 #if defined(PAR)
3533 static nat
3534 run_queue_len(void)
3535 {
3536   nat i;
3537   StgTSO *tso;
3538
3539   for (i=0, tso=run_queue_hd; 
3540        tso != END_TSO_QUEUE;
3541        i++, tso=tso->link)
3542     /* nothing */
3543
3544   return i;
3545 }
3546 #endif
3547
3548 void
3549 sched_belch(char *s, ...)
3550 {
3551   va_list ap;
3552   va_start(ap,s);
3553 #ifdef RTS_SUPPORTS_THREADS
3554   fprintf(stderr, "sched (task %p): ", osThreadId());
3555 #elif defined(PAR)
3556   fprintf(stderr, "== ");
3557 #else
3558   fprintf(stderr, "sched: ");
3559 #endif
3560   vfprintf(stderr, s, ap);
3561   fprintf(stderr, "\n");
3562   fflush(stderr);
3563   va_end(ap);
3564 }
3565
3566 #endif /* DEBUG */