[project @ 2004-02-26 16:19:32 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.188 2004/02/26 16:19:32 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 #endif /* RTS_SUPPORTS_THREADS */
228
229 #if defined(PAR)
230 StgTSO *LastTSO;
231 rtsTime TimeOfLastYield;
232 rtsBool emitSchedule = rtsTrue;
233 #endif
234
235 #if DEBUG
236 static char *whatNext_strs[] = {
237   "ThreadRunGHC",
238   "ThreadInterpret",
239   "ThreadKilled",
240   "ThreadRelocated",
241   "ThreadComplete"
242 };
243 #endif
244
245 #if defined(PAR)
246 StgTSO * createSparkThread(rtsSpark spark);
247 StgTSO * activateSpark (rtsSpark spark);  
248 #endif
249
250 /* ----------------------------------------------------------------------------
251  * Starting Tasks
252  * ------------------------------------------------------------------------- */
253
254 #if defined(RTS_SUPPORTS_THREADS)
255 static rtsBool startingWorkerThread = rtsFalse;
256
257 static void taskStart(void);
258 static void
259 taskStart(void)
260 {
261   ACQUIRE_LOCK(&sched_mutex);
262   schedule(NULL,NULL);
263   RELEASE_LOCK(&sched_mutex);
264 }
265
266 void
267 startSchedulerTaskIfNecessary(void)
268 {
269   if(run_queue_hd != END_TSO_QUEUE
270     || blocked_queue_hd != END_TSO_QUEUE
271     || sleeping_queue != END_TSO_QUEUE)
272   {
273     if(!startingWorkerThread)
274     { // we don't want to start another worker thread
275       // just because the last one hasn't yet reached the
276       // "waiting for capability" state
277       startingWorkerThread = rtsTrue;
278       startTask(taskStart);
279     }
280   }
281 }
282 #endif
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 static void
320 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
321           Capability *initialCapability )
322 {
323   StgTSO *t;
324   Capability *cap = initialCapability;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333   rtsBool receivedFinish = rtsFalse;
334 # if defined(DEBUG)
335   nat tp_size, sp_size; // stats only
336 # endif
337 #endif
338   rtsBool was_interrupted = rtsFalse;
339   StgTSOWhatNext prev_what_next;
340   
341   // Pre-condition: sched_mutex is held.
342  
343 #if defined(RTS_SUPPORTS_THREADS)
344   //
345   // in the threaded case, the capability is either passed in via the
346   // initialCapability parameter, or initialized inside the scheduler
347   // loop 
348   //
349   IF_DEBUG(scheduler,
350            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
351                        mainThread, initialCapability);
352       );
353 #else
354   // simply initialise it in the non-threaded case
355   grabCapability(&cap);
356 #endif
357
358 #if defined(GRAN)
359   /* set up first event to get things going */
360   /* ToDo: assign costs for system setup and init MainTSO ! */
361   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
362             ContinueThread, 
363             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
364
365   IF_DEBUG(gran,
366            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
367            G_TSO(CurrentTSO, 5));
368
369   if (RtsFlags.GranFlags.Light) {
370     /* Save current time; GranSim Light only */
371     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
372   }      
373
374   event = get_next_event();
375
376   while (event!=(rtsEvent*)NULL) {
377     /* Choose the processor with the next event */
378     CurrentProc = event->proc;
379     CurrentTSO = event->tso;
380
381 #elif defined(PAR)
382
383   while (!receivedFinish) {    /* set by processMessages */
384                                /* when receiving PP_FINISH message         */ 
385
386 #else // everything except GRAN and PAR
387
388   while (1) {
389
390 #endif
391
392      IF_DEBUG(scheduler, printAllThreads());
393
394 #if defined(RTS_SUPPORTS_THREADS)
395       // Yield the capability to higher-priority tasks if necessary.
396       //
397       if (cap != NULL) {
398           yieldCapability(&cap);
399       }
400
401       // If we do not currently hold a capability, we wait for one
402       //
403       if (cap == NULL) {
404           waitForCapability(&sched_mutex, &cap,
405                             mainThread ? &mainThread->bound_thread_cond : NULL);
406       }
407
408       // We now have a capability...
409 #endif
410
411     //
412     // If we're interrupted (the user pressed ^C, or some other
413     // termination condition occurred), kill all the currently running
414     // threads.
415     //
416     if (interrupted) {
417         IF_DEBUG(scheduler, sched_belch("interrupted"));
418         interrupted = rtsFalse;
419         was_interrupted = rtsTrue;
420 #if defined(RTS_SUPPORTS_THREADS)
421         // In the threaded RTS, deadlock detection doesn't work,
422         // so just exit right away.
423         prog_belch("interrupted");
424         releaseCapability(cap);
425         RELEASE_LOCK(&sched_mutex);
426         shutdownHaskellAndExit(EXIT_SUCCESS);
427 #else
428         deleteAllThreads();
429 #endif
430     }
431
432     //
433     // Go through the list of main threads and wake up any
434     // clients whose computations have finished.  ToDo: this
435     // should be done more efficiently without a linear scan
436     // of the main threads list, somehow...
437     //
438 #if defined(RTS_SUPPORTS_THREADS)
439     { 
440         StgMainThread *m, **prev;
441         prev = &main_threads;
442         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
443           if (m->tso->what_next == ThreadComplete
444               || m->tso->what_next == ThreadKilled)
445           {
446             if (m == mainThread)
447             {
448               if (m->tso->what_next == ThreadComplete)
449               {
450                 if (m->ret)
451                 {
452                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
453                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
454                 }
455                 m->stat = Success;
456               }
457               else
458               {
459                 if (m->ret)
460                 {
461                   *(m->ret) = NULL;
462                 }
463                 if (was_interrupted)
464                 {
465                   m->stat = Interrupted;
466                 }
467                 else
468                 {
469                   m->stat = Killed;
470                 }
471               }
472               *prev = m->link;
473             
474 #ifdef DEBUG
475               removeThreadLabel((StgWord)m->tso->id);
476 #endif
477               releaseCapability(cap);
478               return;
479             }
480             else
481             {
482                 // The current OS thread can not handle the fact that
483                 // the Haskell thread "m" has ended.  "m" is bound;
484                 // the scheduler loop in it's bound OS thread has to
485                 // return, so let's pass our capability directly to
486                 // that thread.
487                 passCapability(&m->bound_thread_cond);
488                 continue;
489             }
490           }
491         }
492     }
493     
494 #else /* not threaded */
495
496 # if defined(PAR)
497     /* in GUM do this only on the Main PE */
498     if (IAmMainThread)
499 # endif
500     /* If our main thread has finished or been killed, return.
501      */
502     {
503       StgMainThread *m = main_threads;
504       if (m->tso->what_next == ThreadComplete
505           || m->tso->what_next == ThreadKilled) {
506 #ifdef DEBUG
507         removeThreadLabel((StgWord)m->tso->id);
508 #endif
509         main_threads = main_threads->link;
510         if (m->tso->what_next == ThreadComplete) {
511             // We finished successfully, fill in the return value
512             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
513             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
514             m->stat = Success;
515             return;
516         } else {
517           if (m->ret) { *(m->ret) = NULL; };
518           if (was_interrupted) {
519             m->stat = Interrupted;
520           } else {
521             m->stat = Killed;
522           }
523           return;
524         }
525       }
526     }
527 #endif
528
529
530 #if defined(RTS_USER_SIGNALS)
531     // check for signals each time around the scheduler
532     if (signals_pending()) {
533       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
534       startSignalHandlers();
535       ACQUIRE_LOCK(&sched_mutex);
536     }
537 #endif
538
539     /* Check whether any waiting threads need to be woken up.  If the
540      * run queue is empty, and there are no other tasks running, we
541      * can wait indefinitely for something to happen.
542      */
543     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
544 #if defined(RTS_SUPPORTS_THREADS)
545                 || EMPTY_RUN_QUEUE()
546 #endif
547         )
548     {
549       awaitEvent( EMPTY_RUN_QUEUE() );
550     }
551     /* we can be interrupted while waiting for I/O... */
552     if (interrupted) continue;
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
566     if (   EMPTY_THREAD_QUEUES() )
567     {
568         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
569         // Garbage collection can release some new threads due to
570         // either (a) finalizers or (b) threads resurrected because
571         // they are about to be send BlockedOnDeadMVar.  Any threads
572         // thus released will be immediately runnable.
573         GarbageCollect(GetRoots,rtsTrue);
574
575         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
576
577         IF_DEBUG(scheduler, 
578                  sched_belch("still deadlocked, checking for black holes..."));
579         detectBlackHoles();
580
581         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
582
583 #if defined(RTS_USER_SIGNALS)
584         /* If we have user-installed signal handlers, then wait
585          * for signals to arrive rather then bombing out with a
586          * deadlock.
587          */
588         if ( anyUserHandlers() ) {
589             IF_DEBUG(scheduler, 
590                      sched_belch("still deadlocked, waiting for signals..."));
591
592             awaitUserSignals();
593
594             // we might be interrupted...
595             if (interrupted) { continue; }
596
597             if (signals_pending()) {
598                 RELEASE_LOCK(&sched_mutex);
599                 startSignalHandlers();
600                 ACQUIRE_LOCK(&sched_mutex);
601             }
602             ASSERT(!EMPTY_RUN_QUEUE());
603             goto not_deadlocked;
604         }
605 #endif
606
607         /* Probably a real deadlock.  Send the current main thread the
608          * Deadlock exception (or in the SMP build, send *all* main
609          * threads the deadlock exception, since none of them can make
610          * progress).
611          */
612         {
613             StgMainThread *m;
614             m = main_threads;
615             switch (m->tso->why_blocked) {
616             case BlockedOnBlackHole:
617                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
618                 break;
619             case BlockedOnException:
620             case BlockedOnMVar:
621                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
622                 break;
623             default:
624                 barf("deadlock: main thread blocked in a strange way");
625             }
626         }
627     }
628   not_deadlocked:
629
630 #elif defined(RTS_SUPPORTS_THREADS)
631     // ToDo: add deadlock detection in threaded RTS
632 #elif defined(PAR)
633     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
634 #endif
635
636 #if defined(RTS_SUPPORTS_THREADS)
637     if ( EMPTY_RUN_QUEUE() ) {
638       continue; // nothing to do
639     }
640 #endif
641
642 #if defined(GRAN)
643     if (RtsFlags.GranFlags.Light)
644       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
645
646     /* adjust time based on time-stamp */
647     if (event->time > CurrentTime[CurrentProc] &&
648         event->evttype != ContinueThread)
649       CurrentTime[CurrentProc] = event->time;
650     
651     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
652     if (!RtsFlags.GranFlags.Light)
653       handleIdlePEs();
654
655     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
656
657     /* main event dispatcher in GranSim */
658     switch (event->evttype) {
659       /* Should just be continuing execution */
660     case ContinueThread:
661       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
662       /* ToDo: check assertion
663       ASSERT(run_queue_hd != (StgTSO*)NULL &&
664              run_queue_hd != END_TSO_QUEUE);
665       */
666       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
667       if (!RtsFlags.GranFlags.DoAsyncFetch &&
668           procStatus[CurrentProc]==Fetching) {
669         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
670               CurrentTSO->id, CurrentTSO, CurrentProc);
671         goto next_thread;
672       } 
673       /* Ignore ContinueThreads for completed threads */
674       if (CurrentTSO->what_next == ThreadComplete) {
675         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
676               CurrentTSO->id, CurrentTSO, CurrentProc);
677         goto next_thread;
678       } 
679       /* Ignore ContinueThreads for threads that are being migrated */
680       if (PROCS(CurrentTSO)==Nowhere) { 
681         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
682               CurrentTSO->id, CurrentTSO, CurrentProc);
683         goto next_thread;
684       }
685       /* The thread should be at the beginning of the run queue */
686       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
687         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
688               CurrentTSO->id, CurrentTSO, CurrentProc);
689         break; // run the thread anyway
690       }
691       /*
692       new_event(proc, proc, CurrentTime[proc],
693                 FindWork,
694                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
695       goto next_thread; 
696       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
697       break; // now actually run the thread; DaH Qu'vam yImuHbej 
698
699     case FetchNode:
700       do_the_fetchnode(event);
701       goto next_thread;             /* handle next event in event queue  */
702       
703     case GlobalBlock:
704       do_the_globalblock(event);
705       goto next_thread;             /* handle next event in event queue  */
706       
707     case FetchReply:
708       do_the_fetchreply(event);
709       goto next_thread;             /* handle next event in event queue  */
710       
711     case UnblockThread:   /* Move from the blocked queue to the tail of */
712       do_the_unblock(event);
713       goto next_thread;             /* handle next event in event queue  */
714       
715     case ResumeThread:  /* Move from the blocked queue to the tail of */
716       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
717       event->tso->gran.blocktime += 
718         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
719       do_the_startthread(event);
720       goto next_thread;             /* handle next event in event queue  */
721       
722     case StartThread:
723       do_the_startthread(event);
724       goto next_thread;             /* handle next event in event queue  */
725       
726     case MoveThread:
727       do_the_movethread(event);
728       goto next_thread;             /* handle next event in event queue  */
729       
730     case MoveSpark:
731       do_the_movespark(event);
732       goto next_thread;             /* handle next event in event queue  */
733       
734     case FindWork:
735       do_the_findwork(event);
736       goto next_thread;             /* handle next event in event queue  */
737       
738     default:
739       barf("Illegal event type %u\n", event->evttype);
740     }  /* switch */
741     
742     /* This point was scheduler_loop in the old RTS */
743
744     IF_DEBUG(gran, belch("GRAN: after main switch"));
745
746     TimeOfLastEvent = CurrentTime[CurrentProc];
747     TimeOfNextEvent = get_time_of_next_event();
748     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
749     // CurrentTSO = ThreadQueueHd;
750
751     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
752                          TimeOfNextEvent));
753
754     if (RtsFlags.GranFlags.Light) 
755       GranSimLight_leave_system(event, &ActiveTSO); 
756
757     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
758
759     IF_DEBUG(gran, 
760              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
761
762     /* in a GranSim setup the TSO stays on the run queue */
763     t = CurrentTSO;
764     /* Take a thread from the run queue. */
765     POP_RUN_QUEUE(t); // take_off_run_queue(t);
766
767     IF_DEBUG(gran, 
768              fprintf(stderr, "GRAN: About to run current thread, which is\n");
769              G_TSO(t,5));
770
771     context_switch = 0; // turned on via GranYield, checking events and time slice
772
773     IF_DEBUG(gran, 
774              DumpGranEvent(GR_SCHEDULE, t));
775
776     procStatus[CurrentProc] = Busy;
777
778 #elif defined(PAR)
779     if (PendingFetches != END_BF_QUEUE) {
780         processFetches();
781     }
782
783     /* ToDo: phps merge with spark activation above */
784     /* check whether we have local work and send requests if we have none */
785     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
786       /* :-[  no local threads => look out for local sparks */
787       /* the spark pool for the current PE */
788       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
789       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
790           pool->hd < pool->tl) {
791         /* 
792          * ToDo: add GC code check that we really have enough heap afterwards!!
793          * Old comment:
794          * If we're here (no runnable threads) and we have pending
795          * sparks, we must have a space problem.  Get enough space
796          * to turn one of those pending sparks into a
797          * thread... 
798          */
799
800         spark = findSpark(rtsFalse);                /* get a spark */
801         if (spark != (rtsSpark) NULL) {
802           tso = activateSpark(spark);       /* turn the spark into a thread */
803           IF_PAR_DEBUG(schedule,
804                        belch("==== schedule: Created TSO %d (%p); %d threads active",
805                              tso->id, tso, advisory_thread_count));
806
807           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
808             belch("==^^ failed to activate spark");
809             goto next_thread;
810           }               /* otherwise fall through & pick-up new tso */
811         } else {
812           IF_PAR_DEBUG(verbose,
813                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
814                              spark_queue_len(pool)));
815           goto next_thread;
816         }
817       }
818
819       /* If we still have no work we need to send a FISH to get a spark
820          from another PE 
821       */
822       if (EMPTY_RUN_QUEUE()) {
823       /* =8-[  no local sparks => look for work on other PEs */
824         /*
825          * We really have absolutely no work.  Send out a fish
826          * (there may be some out there already), and wait for
827          * something to arrive.  We clearly can't run any threads
828          * until a SCHEDULE or RESUME arrives, and so that's what
829          * we're hoping to see.  (Of course, we still have to
830          * respond to other types of messages.)
831          */
832         TIME now = msTime() /*CURRENT_TIME*/;
833         IF_PAR_DEBUG(verbose, 
834                      belch("--  now=%ld", now));
835         IF_PAR_DEBUG(verbose,
836                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
837                          (last_fish_arrived_at!=0 &&
838                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
839                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
840                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
841                              last_fish_arrived_at,
842                              RtsFlags.ParFlags.fishDelay, now);
843                      });
844         
845         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
846             (last_fish_arrived_at==0 ||
847              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
848           /* outstandingFishes is set in sendFish, processFish;
849              avoid flooding system with fishes via delay */
850           pe = choosePE();
851           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
852                    NEW_FISH_HUNGER);
853
854           // Global statistics: count no. of fishes
855           if (RtsFlags.ParFlags.ParStats.Global &&
856               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
857             globalParStats.tot_fish_mess++;
858           }
859         }
860       
861         receivedFinish = processMessages();
862         goto next_thread;
863       }
864     } else if (PacketsWaiting()) {  /* Look for incoming messages */
865       receivedFinish = processMessages();
866     }
867
868     /* Now we are sure that we have some work available */
869     ASSERT(run_queue_hd != END_TSO_QUEUE);
870
871     /* Take a thread from the run queue, if we have work */
872     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
873     IF_DEBUG(sanity,checkTSO(t));
874
875     /* ToDo: write something to the log-file
876     if (RTSflags.ParFlags.granSimStats && !sameThread)
877         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
878
879     CurrentTSO = t;
880     */
881     /* the spark pool for the current PE */
882     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
883
884     IF_DEBUG(scheduler, 
885              belch("--=^ %d threads, %d sparks on [%#x]", 
886                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
887
888 # if 1
889     if (0 && RtsFlags.ParFlags.ParStats.Full && 
890         t && LastTSO && t->id != LastTSO->id && 
891         LastTSO->why_blocked == NotBlocked && 
892         LastTSO->what_next != ThreadComplete) {
893       // if previously scheduled TSO not blocked we have to record the context switch
894       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
895                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
896     }
897
898     if (RtsFlags.ParFlags.ParStats.Full && 
899         (emitSchedule /* forced emit */ ||
900         (t && LastTSO && t->id != LastTSO->id))) {
901       /* 
902          we are running a different TSO, so write a schedule event to log file
903          NB: If we use fair scheduling we also have to write  a deschedule 
904              event for LastTSO; with unfair scheduling we know that the
905              previous tso has blocked whenever we switch to another tso, so
906              we don't need it in GUM for now
907       */
908       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
909                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
910       emitSchedule = rtsFalse;
911     }
912      
913 # endif
914 #else /* !GRAN && !PAR */
915   
916     // grab a thread from the run queue
917     ASSERT(run_queue_hd != END_TSO_QUEUE);
918     POP_RUN_QUEUE(t);
919
920     // Sanity check the thread we're about to run.  This can be
921     // expensive if there is lots of thread switching going on...
922     IF_DEBUG(sanity,checkTSO(t));
923 #endif
924
925 #ifdef THREADED_RTS
926     {
927       StgMainThread *m;
928       for(m = main_threads; m; m = m->link)
929       {
930         if(m->tso == t)
931           break;
932       }
933       
934       if(m)
935       {
936         if(m == mainThread)
937         {
938           IF_DEBUG(scheduler,
939             sched_belch("### Running thread %d in bound thread", t->id));
940           // yes, the Haskell thread is bound to the current native thread
941         }
942         else
943         {
944           IF_DEBUG(scheduler,
945             sched_belch("### thread %d bound to another OS thread", t->id));
946           // no, bound to a different Haskell thread: pass to that thread
947           PUSH_ON_RUN_QUEUE(t);
948           passCapability(&m->bound_thread_cond);
949           continue;
950         }
951       }
952       else
953       {
954         if(mainThread != NULL)
955         // The thread we want to run is bound.
956         {
957           IF_DEBUG(scheduler,
958             sched_belch("### this OS thread cannot run thread %d", t->id));
959           // no, the current native thread is bound to a different
960           // Haskell thread, so pass it to any worker thread
961           PUSH_ON_RUN_QUEUE(t);
962           passCapabilityToWorker();
963           continue; 
964         }
965       }
966     }
967 #endif
968
969     cap->r.rCurrentTSO = t;
970     
971     /* context switches are now initiated by the timer signal, unless
972      * the user specified "context switch as often as possible", with
973      * +RTS -C0
974      */
975     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
976          && (run_queue_hd != END_TSO_QUEUE
977              || blocked_queue_hd != END_TSO_QUEUE
978              || sleeping_queue != END_TSO_QUEUE)))
979         context_switch = 1;
980     else
981         context_switch = 0;
982
983 run_thread:
984
985     RELEASE_LOCK(&sched_mutex);
986
987     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
988                               t->id, whatNext_strs[t->what_next]));
989
990 #ifdef PROFILING
991     startHeapProfTimer();
992 #endif
993
994     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
995     /* Run the current thread 
996      */
997     prev_what_next = t->what_next;
998     switch (prev_what_next) {
999     case ThreadKilled:
1000     case ThreadComplete:
1001         /* Thread already finished, return to scheduler. */
1002         ret = ThreadFinished;
1003         break;
1004     case ThreadRunGHC:
1005         errno = t->saved_errno;
1006         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1007         t->saved_errno = errno;
1008         break;
1009     case ThreadInterpret:
1010         ret = interpretBCO(cap);
1011         break;
1012     default:
1013       barf("schedule: invalid what_next field");
1014     }
1015     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1016     
1017     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1018 #ifdef PROFILING
1019     stopHeapProfTimer();
1020     CCCS = CCS_SYSTEM;
1021 #endif
1022     
1023     ACQUIRE_LOCK(&sched_mutex);
1024     
1025 #ifdef RTS_SUPPORTS_THREADS
1026     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1027 #elif !defined(GRAN) && !defined(PAR)
1028     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1029 #endif
1030     t = cap->r.rCurrentTSO;
1031     
1032 #if defined(PAR)
1033     /* HACK 675: if the last thread didn't yield, make sure to print a 
1034        SCHEDULE event to the log file when StgRunning the next thread, even
1035        if it is the same one as before */
1036     LastTSO = t; 
1037     TimeOfLastYield = CURRENT_TIME;
1038 #endif
1039
1040     switch (ret) {
1041     case HeapOverflow:
1042 #if defined(GRAN)
1043       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1044       globalGranStats.tot_heapover++;
1045 #elif defined(PAR)
1046       globalParStats.tot_heapover++;
1047 #endif
1048
1049       // did the task ask for a large block?
1050       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1051           // if so, get one and push it on the front of the nursery.
1052           bdescr *bd;
1053           nat blocks;
1054           
1055           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1056
1057           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1058                                    t->id, whatNext_strs[t->what_next], blocks));
1059
1060           // don't do this if it would push us over the
1061           // alloc_blocks_lim limit; we'll GC first.
1062           if (alloc_blocks + blocks < alloc_blocks_lim) {
1063
1064               alloc_blocks += blocks;
1065               bd = allocGroup( blocks );
1066
1067               // link the new group into the list
1068               bd->link = cap->r.rCurrentNursery;
1069               bd->u.back = cap->r.rCurrentNursery->u.back;
1070               if (cap->r.rCurrentNursery->u.back != NULL) {
1071                   cap->r.rCurrentNursery->u.back->link = bd;
1072               } else {
1073                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1074                          g0s0->blocks == cap->r.rNursery);
1075                   cap->r.rNursery = g0s0->blocks = bd;
1076               }           
1077               cap->r.rCurrentNursery->u.back = bd;
1078
1079               // initialise it as a nursery block.  We initialise the
1080               // step, gen_no, and flags field of *every* sub-block in
1081               // this large block, because this is easier than making
1082               // sure that we always find the block head of a large
1083               // block whenever we call Bdescr() (eg. evacuate() and
1084               // isAlive() in the GC would both have to do this, at
1085               // least).
1086               { 
1087                   bdescr *x;
1088                   for (x = bd; x < bd + blocks; x++) {
1089                       x->step = g0s0;
1090                       x->gen_no = 0;
1091                       x->flags = 0;
1092                   }
1093               }
1094
1095               // don't forget to update the block count in g0s0.
1096               g0s0->n_blocks += blocks;
1097               // This assert can be a killer if the app is doing lots
1098               // of large block allocations.
1099               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1100
1101               // now update the nursery to point to the new block
1102               cap->r.rCurrentNursery = bd;
1103
1104               // we might be unlucky and have another thread get on the
1105               // run queue before us and steal the large block, but in that
1106               // case the thread will just end up requesting another large
1107               // block.
1108               PUSH_ON_RUN_QUEUE(t);
1109               break;
1110           }
1111       }
1112
1113       /* make all the running tasks block on a condition variable,
1114        * maybe set context_switch and wait till they all pile in,
1115        * then have them wait on a GC condition variable.
1116        */
1117       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1118                                t->id, whatNext_strs[t->what_next]));
1119       threadPaused(t);
1120 #if defined(GRAN)
1121       ASSERT(!is_on_queue(t,CurrentProc));
1122 #elif defined(PAR)
1123       /* Currently we emit a DESCHEDULE event before GC in GUM.
1124          ToDo: either add separate event to distinguish SYSTEM time from rest
1125                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1126       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1127         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1128                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1129         emitSchedule = rtsTrue;
1130       }
1131 #endif
1132       
1133       ready_to_gc = rtsTrue;
1134       context_switch = 1;               /* stop other threads ASAP */
1135       PUSH_ON_RUN_QUEUE(t);
1136       /* actual GC is done at the end of the while loop */
1137       break;
1138       
1139     case StackOverflow:
1140 #if defined(GRAN)
1141       IF_DEBUG(gran, 
1142                DumpGranEvent(GR_DESCHEDULE, t));
1143       globalGranStats.tot_stackover++;
1144 #elif defined(PAR)
1145       // IF_DEBUG(par, 
1146       // DumpGranEvent(GR_DESCHEDULE, t);
1147       globalParStats.tot_stackover++;
1148 #endif
1149       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1150                                t->id, whatNext_strs[t->what_next]));
1151       /* just adjust the stack for this thread, then pop it back
1152        * on the run queue.
1153        */
1154       threadPaused(t);
1155       { 
1156         StgMainThread *m;
1157         /* enlarge the stack */
1158         StgTSO *new_t = threadStackOverflow(t);
1159         
1160         /* This TSO has moved, so update any pointers to it from the
1161          * main thread stack.  It better not be on any other queues...
1162          * (it shouldn't be).
1163          */
1164         for (m = main_threads; m != NULL; m = m->link) {
1165           if (m->tso == t) {
1166             m->tso = new_t;
1167           }
1168         }
1169         threadPaused(new_t);
1170         PUSH_ON_RUN_QUEUE(new_t);
1171       }
1172       break;
1173
1174     case ThreadYielding:
1175 #if defined(GRAN)
1176       IF_DEBUG(gran, 
1177                DumpGranEvent(GR_DESCHEDULE, t));
1178       globalGranStats.tot_yields++;
1179 #elif defined(PAR)
1180       // IF_DEBUG(par, 
1181       // DumpGranEvent(GR_DESCHEDULE, t);
1182       globalParStats.tot_yields++;
1183 #endif
1184       /* put the thread back on the run queue.  Then, if we're ready to
1185        * GC, check whether this is the last task to stop.  If so, wake
1186        * up the GC thread.  getThread will block during a GC until the
1187        * GC is finished.
1188        */
1189       IF_DEBUG(scheduler,
1190                if (t->what_next != prev_what_next) {
1191                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1192                          t->id, whatNext_strs[t->what_next]);
1193                } else {
1194                    belch("--<< thread %ld (%s) stopped, yielding", 
1195                          t->id, whatNext_strs[t->what_next]);
1196                }
1197                );
1198
1199       IF_DEBUG(sanity,
1200                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1201                checkTSO(t));
1202       ASSERT(t->link == END_TSO_QUEUE);
1203
1204       // Shortcut if we're just switching evaluators: don't bother
1205       // doing stack squeezing (which can be expensive), just run the
1206       // thread.
1207       if (t->what_next != prev_what_next) {
1208           goto run_thread;
1209       }
1210
1211       threadPaused(t);
1212
1213 #if defined(GRAN)
1214       ASSERT(!is_on_queue(t,CurrentProc));
1215
1216       IF_DEBUG(sanity,
1217                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1218                checkThreadQsSanity(rtsTrue));
1219 #endif
1220
1221 #if defined(PAR)
1222       if (RtsFlags.ParFlags.doFairScheduling) { 
1223         /* this does round-robin scheduling; good for concurrency */
1224         APPEND_TO_RUN_QUEUE(t);
1225       } else {
1226         /* this does unfair scheduling; good for parallelism */
1227         PUSH_ON_RUN_QUEUE(t);
1228       }
1229 #else
1230       // this does round-robin scheduling; good for concurrency
1231       APPEND_TO_RUN_QUEUE(t);
1232 #endif
1233
1234 #if defined(GRAN)
1235       /* add a ContinueThread event to actually process the thread */
1236       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1237                 ContinueThread,
1238                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1239       IF_GRAN_DEBUG(bq, 
1240                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1241                G_EVENTQ(0);
1242                G_CURR_THREADQ(0));
1243 #endif /* GRAN */
1244       break;
1245
1246     case ThreadBlocked:
1247 #if defined(GRAN)
1248       IF_DEBUG(scheduler,
1249                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1250                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1251                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1252
1253       // ??? needed; should emit block before
1254       IF_DEBUG(gran, 
1255                DumpGranEvent(GR_DESCHEDULE, t)); 
1256       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1257       /*
1258         ngoq Dogh!
1259       ASSERT(procStatus[CurrentProc]==Busy || 
1260               ((procStatus[CurrentProc]==Fetching) && 
1261               (t->block_info.closure!=(StgClosure*)NULL)));
1262       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1263           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1264             procStatus[CurrentProc]==Fetching)) 
1265         procStatus[CurrentProc] = Idle;
1266       */
1267 #elif defined(PAR)
1268       IF_DEBUG(scheduler,
1269                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1270                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1271       IF_PAR_DEBUG(bq,
1272
1273                    if (t->block_info.closure!=(StgClosure*)NULL) 
1274                      print_bq(t->block_info.closure));
1275
1276       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1277       blockThread(t);
1278
1279       /* whatever we schedule next, we must log that schedule */
1280       emitSchedule = rtsTrue;
1281
1282 #else /* !GRAN */
1283       /* don't need to do anything.  Either the thread is blocked on
1284        * I/O, in which case we'll have called addToBlockedQueue
1285        * previously, or it's blocked on an MVar or Blackhole, in which
1286        * case it'll be on the relevant queue already.
1287        */
1288       IF_DEBUG(scheduler,
1289                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1290                        t->id, whatNext_strs[t->what_next]);
1291                printThreadBlockage(t);
1292                fprintf(stderr, "\n"));
1293       fflush(stderr);
1294
1295       /* Only for dumping event to log file 
1296          ToDo: do I need this in GranSim, too?
1297       blockThread(t);
1298       */
1299 #endif
1300       threadPaused(t);
1301       break;
1302
1303     case ThreadFinished:
1304       /* Need to check whether this was a main thread, and if so, signal
1305        * the task that started it with the return value.  If we have no
1306        * more main threads, we probably need to stop all the tasks until
1307        * we get a new one.
1308        */
1309       /* We also end up here if the thread kills itself with an
1310        * uncaught exception, see Exception.hc.
1311        */
1312       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1313                                t->id, whatNext_strs[t->what_next]));
1314 #if defined(GRAN)
1315       endThread(t, CurrentProc); // clean-up the thread
1316 #elif defined(PAR)
1317       /* For now all are advisory -- HWL */
1318       //if(t->priority==AdvisoryPriority) ??
1319       advisory_thread_count--;
1320       
1321 # ifdef DIST
1322       if(t->dist.priority==RevalPriority)
1323         FinishReval(t);
1324 # endif
1325       
1326       if (RtsFlags.ParFlags.ParStats.Full &&
1327           !RtsFlags.ParFlags.ParStats.Suppressed) 
1328         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1329 #endif
1330       break;
1331       
1332     default:
1333       barf("schedule: invalid thread return code %d", (int)ret);
1334     }
1335
1336 #ifdef PROFILING
1337     // When we have +RTS -i0 and we're heap profiling, do a census at
1338     // every GC.  This lets us get repeatable runs for debugging.
1339     if (performHeapProfile ||
1340         (RtsFlags.ProfFlags.profileInterval==0 &&
1341          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1342         GarbageCollect(GetRoots, rtsTrue);
1343         heapCensus();
1344         performHeapProfile = rtsFalse;
1345         ready_to_gc = rtsFalse; // we already GC'd
1346     }
1347 #endif
1348
1349     if (ready_to_gc) {
1350       /* everybody back, start the GC.
1351        * Could do it in this thread, or signal a condition var
1352        * to do it in another thread.  Either way, we need to
1353        * broadcast on gc_pending_cond afterward.
1354        */
1355 #if defined(RTS_SUPPORTS_THREADS)
1356       IF_DEBUG(scheduler,sched_belch("doing GC"));
1357 #endif
1358       GarbageCollect(GetRoots,rtsFalse);
1359       ready_to_gc = rtsFalse;
1360 #if defined(GRAN)
1361       /* add a ContinueThread event to continue execution of current thread */
1362       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1363                 ContinueThread,
1364                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1365       IF_GRAN_DEBUG(bq, 
1366                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1367                G_EVENTQ(0);
1368                G_CURR_THREADQ(0));
1369 #endif /* GRAN */
1370     }
1371
1372 #if defined(GRAN)
1373   next_thread:
1374     IF_GRAN_DEBUG(unused,
1375                   print_eventq(EventHd));
1376
1377     event = get_next_event();
1378 #elif defined(PAR)
1379   next_thread:
1380     /* ToDo: wait for next message to arrive rather than busy wait */
1381 #endif /* GRAN */
1382
1383   } /* end of while(1) */
1384
1385   IF_PAR_DEBUG(verbose,
1386                belch("== Leaving schedule() after having received Finish"));
1387 }
1388
1389 /* ---------------------------------------------------------------------------
1390  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1391  * used by Control.Concurrent for error checking.
1392  * ------------------------------------------------------------------------- */
1393  
1394 StgBool
1395 rtsSupportsBoundThreads(void)
1396 {
1397 #ifdef THREADED_RTS
1398   return rtsTrue;
1399 #else
1400   return rtsFalse;
1401 #endif
1402 }
1403
1404 /* ---------------------------------------------------------------------------
1405  * isThreadBound(tso): check whether tso is bound to an OS thread.
1406  * ------------------------------------------------------------------------- */
1407  
1408 StgBool
1409 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1410 {
1411 #ifdef THREADED_RTS
1412   StgMainThread *m;
1413   for(m = main_threads; m; m = m->link)
1414   {
1415     if(m->tso == tso)
1416       return rtsTrue;
1417   }
1418 #endif
1419   return rtsFalse;
1420 }
1421
1422 /* ---------------------------------------------------------------------------
1423  * Singleton fork(). Do not copy any running threads.
1424  * ------------------------------------------------------------------------- */
1425
1426 static void 
1427 deleteThreadImmediately(StgTSO *tso);
1428
1429 StgInt
1430 forkProcess(HsStablePtr *entry)
1431 {
1432 #ifndef mingw32_TARGET_OS
1433   pid_t pid;
1434   StgTSO* t,*next;
1435   StgMainThread *m;
1436   SchedulerStatus rc;
1437
1438   IF_DEBUG(scheduler,sched_belch("forking!"));
1439   rts_lock(); // This not only acquires sched_mutex, it also
1440               // makes sure that no other threads are running
1441
1442   pid = fork();
1443
1444   if (pid) { /* parent */
1445
1446   /* just return the pid */
1447     rts_unlock();
1448     return pid;
1449     
1450   } else { /* child */
1451     
1452     
1453       // delete all threads
1454     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1455     
1456     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1457       next = t->link;
1458
1459         // don't allow threads to catch the ThreadKilled exception
1460       deleteThreadImmediately(t);
1461     }
1462     
1463       // wipe the main thread list
1464     while((m = main_threads) != NULL) {
1465       main_threads = m->link;
1466 #ifdef THREADED_RTS
1467       closeCondition(&m->bound_thread_cond);
1468 #endif
1469       stgFree(m);
1470     }
1471     
1472 #ifdef RTS_SUPPORTS_THREADS
1473     resetTaskManagerAfterFork();      // tell startTask() and friends that
1474     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1475     resetWorkerWakeupPipeAfterFork();
1476 #endif
1477     
1478     rc = rts_evalStableIO(entry, NULL);  // run the action
1479     rts_checkSchedStatus("forkProcess",rc);
1480     
1481     rts_unlock();
1482     
1483     hs_exit();                      // clean up and exit
1484     stg_exit(0);
1485   }
1486 #else /* mingw32 */
1487   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1488   return -1;
1489 #endif /* mingw32 */
1490 }
1491
1492 /* ---------------------------------------------------------------------------
1493  * deleteAllThreads():  kill all the live threads.
1494  *
1495  * This is used when we catch a user interrupt (^C), before performing
1496  * any necessary cleanups and running finalizers.
1497  *
1498  * Locks: sched_mutex held.
1499  * ------------------------------------------------------------------------- */
1500    
1501 void
1502 deleteAllThreads ( void )
1503 {
1504   StgTSO* t, *next;
1505   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1506   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1507       next = t->global_link;
1508       deleteThread(t);
1509   }      
1510   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1511   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1512   sleeping_queue = END_TSO_QUEUE;
1513 }
1514
1515 /* startThread and  insertThread are now in GranSim.c -- HWL */
1516
1517
1518 /* ---------------------------------------------------------------------------
1519  * Suspending & resuming Haskell threads.
1520  * 
1521  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1522  * its capability before calling the C function.  This allows another
1523  * task to pick up the capability and carry on running Haskell
1524  * threads.  It also means that if the C call blocks, it won't lock
1525  * the whole system.
1526  *
1527  * The Haskell thread making the C call is put to sleep for the
1528  * duration of the call, on the susepended_ccalling_threads queue.  We
1529  * give out a token to the task, which it can use to resume the thread
1530  * on return from the C function.
1531  * ------------------------------------------------------------------------- */
1532    
1533 StgInt
1534 suspendThread( StgRegTable *reg, 
1535                rtsBool concCall
1536 #if !defined(DEBUG)
1537                STG_UNUSED
1538 #endif
1539                )
1540 {
1541   nat tok;
1542   Capability *cap;
1543   int saved_errno = errno;
1544
1545   /* assume that *reg is a pointer to the StgRegTable part
1546    * of a Capability.
1547    */
1548   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1549
1550   ACQUIRE_LOCK(&sched_mutex);
1551
1552   IF_DEBUG(scheduler,
1553            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1554
1555   // XXX this might not be necessary --SDM
1556   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1557
1558   threadPaused(cap->r.rCurrentTSO);
1559   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1560   suspended_ccalling_threads = cap->r.rCurrentTSO;
1561
1562 #if defined(RTS_SUPPORTS_THREADS)
1563   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1564   {
1565       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1566       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1567   }
1568   else
1569   {
1570       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1571   }
1572 #endif
1573
1574   /* Use the thread ID as the token; it should be unique */
1575   tok = cap->r.rCurrentTSO->id;
1576
1577   /* Hand back capability */
1578   releaseCapability(cap);
1579   
1580 #if defined(RTS_SUPPORTS_THREADS)
1581   /* Preparing to leave the RTS, so ensure there's a native thread/task
1582      waiting to take over.
1583   */
1584   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1585 #endif
1586
1587   /* Other threads _might_ be available for execution; signal this */
1588   THREAD_RUNNABLE();
1589   RELEASE_LOCK(&sched_mutex);
1590   
1591   errno = saved_errno;
1592   return tok; 
1593 }
1594
1595 StgRegTable *
1596 resumeThread( StgInt tok,
1597               rtsBool concCall STG_UNUSED )
1598 {
1599   StgTSO *tso, **prev;
1600   Capability *cap;
1601   int saved_errno = errno;
1602
1603 #if defined(RTS_SUPPORTS_THREADS)
1604   /* Wait for permission to re-enter the RTS with the result. */
1605   ACQUIRE_LOCK(&sched_mutex);
1606   waitForReturnCapability(&sched_mutex, &cap);
1607
1608   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1609 #else
1610   grabCapability(&cap);
1611 #endif
1612
1613   /* Remove the thread off of the suspended list */
1614   prev = &suspended_ccalling_threads;
1615   for (tso = suspended_ccalling_threads; 
1616        tso != END_TSO_QUEUE; 
1617        prev = &tso->link, tso = tso->link) {
1618     if (tso->id == (StgThreadID)tok) {
1619       *prev = tso->link;
1620       break;
1621     }
1622   }
1623   if (tso == END_TSO_QUEUE) {
1624     barf("resumeThread: thread not found");
1625   }
1626   tso->link = END_TSO_QUEUE;
1627   
1628 #if defined(RTS_SUPPORTS_THREADS)
1629   if(tso->why_blocked == BlockedOnCCall)
1630   {
1631       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1632       tso->blocked_exceptions = NULL;
1633   }
1634 #endif
1635   
1636   /* Reset blocking status */
1637   tso->why_blocked  = NotBlocked;
1638
1639   cap->r.rCurrentTSO = tso;
1640   RELEASE_LOCK(&sched_mutex);
1641   errno = saved_errno;
1642   return &cap->r;
1643 }
1644
1645
1646 /* ---------------------------------------------------------------------------
1647  * Static functions
1648  * ------------------------------------------------------------------------ */
1649 static void unblockThread(StgTSO *tso);
1650
1651 /* ---------------------------------------------------------------------------
1652  * Comparing Thread ids.
1653  *
1654  * This is used from STG land in the implementation of the
1655  * instances of Eq/Ord for ThreadIds.
1656  * ------------------------------------------------------------------------ */
1657
1658 int
1659 cmp_thread(StgPtr tso1, StgPtr tso2) 
1660
1661   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1662   StgThreadID id2 = ((StgTSO *)tso2)->id;
1663  
1664   if (id1 < id2) return (-1);
1665   if (id1 > id2) return 1;
1666   return 0;
1667 }
1668
1669 /* ---------------------------------------------------------------------------
1670  * Fetching the ThreadID from an StgTSO.
1671  *
1672  * This is used in the implementation of Show for ThreadIds.
1673  * ------------------------------------------------------------------------ */
1674 int
1675 rts_getThreadId(StgPtr tso) 
1676 {
1677   return ((StgTSO *)tso)->id;
1678 }
1679
1680 #ifdef DEBUG
1681 void
1682 labelThread(StgPtr tso, char *label)
1683 {
1684   int len;
1685   void *buf;
1686
1687   /* Caveat: Once set, you can only set the thread name to "" */
1688   len = strlen(label)+1;
1689   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1690   strncpy(buf,label,len);
1691   /* Update will free the old memory for us */
1692   updateThreadLabel(((StgTSO *)tso)->id,buf);
1693 }
1694 #endif /* DEBUG */
1695
1696 /* ---------------------------------------------------------------------------
1697    Create a new thread.
1698
1699    The new thread starts with the given stack size.  Before the
1700    scheduler can run, however, this thread needs to have a closure
1701    (and possibly some arguments) pushed on its stack.  See
1702    pushClosure() in Schedule.h.
1703
1704    createGenThread() and createIOThread() (in SchedAPI.h) are
1705    convenient packaged versions of this function.
1706
1707    currently pri (priority) is only used in a GRAN setup -- HWL
1708    ------------------------------------------------------------------------ */
1709 #if defined(GRAN)
1710 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1711 StgTSO *
1712 createThread(nat size, StgInt pri)
1713 #else
1714 StgTSO *
1715 createThread(nat size)
1716 #endif
1717 {
1718
1719     StgTSO *tso;
1720     nat stack_size;
1721
1722     /* First check whether we should create a thread at all */
1723 #if defined(PAR)
1724   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1725   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1726     threadsIgnored++;
1727     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1728           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1729     return END_TSO_QUEUE;
1730   }
1731   threadsCreated++;
1732 #endif
1733
1734 #if defined(GRAN)
1735   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1736 #endif
1737
1738   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1739
1740   /* catch ridiculously small stack sizes */
1741   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1742     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1743   }
1744
1745   stack_size = size - TSO_STRUCT_SIZEW;
1746
1747   tso = (StgTSO *)allocate(size);
1748   TICK_ALLOC_TSO(stack_size, 0);
1749
1750   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1751 #if defined(GRAN)
1752   SET_GRAN_HDR(tso, ThisPE);
1753 #endif
1754
1755   // Always start with the compiled code evaluator
1756   tso->what_next = ThreadRunGHC;
1757
1758   tso->id = next_thread_id++; 
1759   tso->why_blocked  = NotBlocked;
1760   tso->blocked_exceptions = NULL;
1761
1762   tso->saved_errno = 0;
1763   
1764   tso->stack_size   = stack_size;
1765   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1766                               - TSO_STRUCT_SIZEW;
1767   tso->sp           = (P_)&(tso->stack) + stack_size;
1768
1769 #ifdef PROFILING
1770   tso->prof.CCCS = CCS_MAIN;
1771 #endif
1772
1773   /* put a stop frame on the stack */
1774   tso->sp -= sizeofW(StgStopFrame);
1775   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1776   // ToDo: check this
1777 #if defined(GRAN)
1778   tso->link = END_TSO_QUEUE;
1779   /* uses more flexible routine in GranSim */
1780   insertThread(tso, CurrentProc);
1781 #else
1782   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1783    * from its creation
1784    */
1785 #endif
1786
1787 #if defined(GRAN) 
1788   if (RtsFlags.GranFlags.GranSimStats.Full) 
1789     DumpGranEvent(GR_START,tso);
1790 #elif defined(PAR)
1791   if (RtsFlags.ParFlags.ParStats.Full) 
1792     DumpGranEvent(GR_STARTQ,tso);
1793   /* HACk to avoid SCHEDULE 
1794      LastTSO = tso; */
1795 #endif
1796
1797   /* Link the new thread on the global thread list.
1798    */
1799   tso->global_link = all_threads;
1800   all_threads = tso;
1801
1802 #if defined(DIST)
1803   tso->dist.priority = MandatoryPriority; //by default that is...
1804 #endif
1805
1806 #if defined(GRAN)
1807   tso->gran.pri = pri;
1808 # if defined(DEBUG)
1809   tso->gran.magic = TSO_MAGIC; // debugging only
1810 # endif
1811   tso->gran.sparkname   = 0;
1812   tso->gran.startedat   = CURRENT_TIME; 
1813   tso->gran.exported    = 0;
1814   tso->gran.basicblocks = 0;
1815   tso->gran.allocs      = 0;
1816   tso->gran.exectime    = 0;
1817   tso->gran.fetchtime   = 0;
1818   tso->gran.fetchcount  = 0;
1819   tso->gran.blocktime   = 0;
1820   tso->gran.blockcount  = 0;
1821   tso->gran.blockedat   = 0;
1822   tso->gran.globalsparks = 0;
1823   tso->gran.localsparks  = 0;
1824   if (RtsFlags.GranFlags.Light)
1825     tso->gran.clock  = Now; /* local clock */
1826   else
1827     tso->gran.clock  = 0;
1828
1829   IF_DEBUG(gran,printTSO(tso));
1830 #elif defined(PAR)
1831 # if defined(DEBUG)
1832   tso->par.magic = TSO_MAGIC; // debugging only
1833 # endif
1834   tso->par.sparkname   = 0;
1835   tso->par.startedat   = CURRENT_TIME; 
1836   tso->par.exported    = 0;
1837   tso->par.basicblocks = 0;
1838   tso->par.allocs      = 0;
1839   tso->par.exectime    = 0;
1840   tso->par.fetchtime   = 0;
1841   tso->par.fetchcount  = 0;
1842   tso->par.blocktime   = 0;
1843   tso->par.blockcount  = 0;
1844   tso->par.blockedat   = 0;
1845   tso->par.globalsparks = 0;
1846   tso->par.localsparks  = 0;
1847 #endif
1848
1849 #if defined(GRAN)
1850   globalGranStats.tot_threads_created++;
1851   globalGranStats.threads_created_on_PE[CurrentProc]++;
1852   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1853   globalGranStats.tot_sq_probes++;
1854 #elif defined(PAR)
1855   // collect parallel global statistics (currently done together with GC stats)
1856   if (RtsFlags.ParFlags.ParStats.Global &&
1857       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1858     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1859     globalParStats.tot_threads_created++;
1860   }
1861 #endif 
1862
1863 #if defined(GRAN)
1864   IF_GRAN_DEBUG(pri,
1865                 belch("==__ schedule: Created TSO %d (%p);",
1866                       CurrentProc, tso, tso->id));
1867 #elif defined(PAR)
1868     IF_PAR_DEBUG(verbose,
1869                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1870                        tso->id, tso, advisory_thread_count));
1871 #else
1872   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1873                                  tso->id, tso->stack_size));
1874 #endif    
1875   return tso;
1876 }
1877
1878 #if defined(PAR)
1879 /* RFP:
1880    all parallel thread creation calls should fall through the following routine.
1881 */
1882 StgTSO *
1883 createSparkThread(rtsSpark spark) 
1884 { StgTSO *tso;
1885   ASSERT(spark != (rtsSpark)NULL);
1886   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1887   { threadsIgnored++;
1888     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1889           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1890     return END_TSO_QUEUE;
1891   }
1892   else
1893   { threadsCreated++;
1894     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1895     if (tso==END_TSO_QUEUE)     
1896       barf("createSparkThread: Cannot create TSO");
1897 #if defined(DIST)
1898     tso->priority = AdvisoryPriority;
1899 #endif
1900     pushClosure(tso,spark);
1901     PUSH_ON_RUN_QUEUE(tso);
1902     advisory_thread_count++;    
1903   }
1904   return tso;
1905 }
1906 #endif
1907
1908 /*
1909   Turn a spark into a thread.
1910   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1911 */
1912 #if defined(PAR)
1913 StgTSO *
1914 activateSpark (rtsSpark spark) 
1915 {
1916   StgTSO *tso;
1917
1918   tso = createSparkThread(spark);
1919   if (RtsFlags.ParFlags.ParStats.Full) {   
1920     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1921     IF_PAR_DEBUG(verbose,
1922                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1923                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1924   }
1925   // ToDo: fwd info on local/global spark to thread -- HWL
1926   // tso->gran.exported =  spark->exported;
1927   // tso->gran.locked =   !spark->global;
1928   // tso->gran.sparkname = spark->name;
1929
1930   return tso;
1931 }
1932 #endif
1933
1934 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1935                                    Capability *initialCapability
1936                                    );
1937
1938
1939 /* ---------------------------------------------------------------------------
1940  * scheduleThread()
1941  *
1942  * scheduleThread puts a thread on the head of the runnable queue.
1943  * This will usually be done immediately after a thread is created.
1944  * The caller of scheduleThread must create the thread using e.g.
1945  * createThread and push an appropriate closure
1946  * on this thread's stack before the scheduler is invoked.
1947  * ------------------------------------------------------------------------ */
1948
1949 static void scheduleThread_ (StgTSO* tso);
1950
1951 void
1952 scheduleThread_(StgTSO *tso)
1953 {
1954   // Precondition: sched_mutex must be held.
1955
1956   /* Put the new thread on the head of the runnable queue.  The caller
1957    * better push an appropriate closure on this thread's stack
1958    * beforehand.  In the SMP case, the thread may start running as
1959    * soon as we release the scheduler lock below.
1960    */
1961   PUSH_ON_RUN_QUEUE(tso);
1962   THREAD_RUNNABLE();
1963
1964 #if 0
1965   IF_DEBUG(scheduler,printTSO(tso));
1966 #endif
1967 }
1968
1969 void scheduleThread(StgTSO* tso)
1970 {
1971   ACQUIRE_LOCK(&sched_mutex);
1972   scheduleThread_(tso);
1973   RELEASE_LOCK(&sched_mutex);
1974 }
1975
1976 #if defined(RTS_SUPPORTS_THREADS)
1977 static Condition *bound_cond_cache = NULL;
1978 #endif
1979
1980 SchedulerStatus
1981 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1982                    Capability *initialCapability)
1983 {
1984     // Precondition: sched_mutex must be held
1985     StgMainThread *m;
1986
1987     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1988     m->tso = tso;
1989     m->ret = ret;
1990     m->stat = NoStatus;
1991 #if defined(RTS_SUPPORTS_THREADS)
1992     // Allocating a new condition for each thread is expensive, so we
1993     // cache one.  This is a pretty feeble hack, but it helps speed up
1994     // consecutive call-ins quite a bit.
1995     if (bound_cond_cache != NULL) {
1996         m->bound_thread_cond = *bound_cond_cache;
1997         bound_cond_cache = NULL;
1998     } else {
1999         initCondition(&m->bound_thread_cond);
2000     }
2001 #endif
2002
2003     /* Put the thread on the main-threads list prior to scheduling the TSO.
2004        Failure to do so introduces a race condition in the MT case (as
2005        identified by Wolfgang Thaller), whereby the new task/OS thread 
2006        created by scheduleThread_() would complete prior to the thread
2007        that spawned it managed to put 'itself' on the main-threads list.
2008        The upshot of it all being that the worker thread wouldn't get to
2009        signal the completion of the its work item for the main thread to
2010        see (==> it got stuck waiting.)    -- sof 6/02.
2011     */
2012     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2013     
2014     m->link = main_threads;
2015     main_threads = m;
2016     
2017     scheduleThread_(tso);
2018
2019     return waitThread_(m, initialCapability);
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * initScheduler()
2024  *
2025  * Initialise the scheduler.  This resets all the queues - if the
2026  * queues contained any threads, they'll be garbage collected at the
2027  * next pass.
2028  *
2029  * ------------------------------------------------------------------------ */
2030
2031 void 
2032 initScheduler(void)
2033 {
2034 #if defined(GRAN)
2035   nat i;
2036
2037   for (i=0; i<=MAX_PROC; i++) {
2038     run_queue_hds[i]      = END_TSO_QUEUE;
2039     run_queue_tls[i]      = END_TSO_QUEUE;
2040     blocked_queue_hds[i]  = END_TSO_QUEUE;
2041     blocked_queue_tls[i]  = END_TSO_QUEUE;
2042     ccalling_threadss[i]  = END_TSO_QUEUE;
2043     sleeping_queue        = END_TSO_QUEUE;
2044   }
2045 #else
2046   run_queue_hd      = END_TSO_QUEUE;
2047   run_queue_tl      = END_TSO_QUEUE;
2048   blocked_queue_hd  = END_TSO_QUEUE;
2049   blocked_queue_tl  = END_TSO_QUEUE;
2050   sleeping_queue    = END_TSO_QUEUE;
2051 #endif 
2052
2053   suspended_ccalling_threads  = END_TSO_QUEUE;
2054
2055   main_threads = NULL;
2056   all_threads  = END_TSO_QUEUE;
2057
2058   context_switch = 0;
2059   interrupted    = 0;
2060
2061   RtsFlags.ConcFlags.ctxtSwitchTicks =
2062       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2063       
2064 #if defined(RTS_SUPPORTS_THREADS)
2065   /* Initialise the mutex and condition variables used by
2066    * the scheduler. */
2067   initMutex(&sched_mutex);
2068   initMutex(&term_mutex);
2069 #endif
2070   
2071 #if defined(RTS_SUPPORTS_THREADS)
2072   ACQUIRE_LOCK(&sched_mutex);
2073 #endif
2074
2075   /* A capability holds the state a native thread needs in
2076    * order to execute STG code. At least one capability is
2077    * floating around (only SMP builds have more than one).
2078    */
2079   initCapabilities();
2080   
2081 #if defined(RTS_SUPPORTS_THREADS)
2082     /* start our haskell execution tasks */
2083     startTaskManager(0,taskStart);
2084 #endif
2085
2086 #if /* defined(SMP) ||*/ defined(PAR)
2087   initSparkPools();
2088 #endif
2089
2090   RELEASE_LOCK(&sched_mutex);
2091
2092 }
2093
2094 void
2095 exitScheduler( void )
2096 {
2097 #if defined(RTS_SUPPORTS_THREADS)
2098   stopTaskManager();
2099 #endif
2100   shutting_down_scheduler = rtsTrue;
2101 }
2102
2103 /* ----------------------------------------------------------------------------
2104    Managing the per-task allocation areas.
2105    
2106    Each capability comes with an allocation area.  These are
2107    fixed-length block lists into which allocation can be done.
2108
2109    ToDo: no support for two-space collection at the moment???
2110    ------------------------------------------------------------------------- */
2111
2112 static
2113 SchedulerStatus
2114 waitThread_(StgMainThread* m, Capability *initialCapability)
2115 {
2116   SchedulerStatus stat;
2117
2118   // Precondition: sched_mutex must be held.
2119   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2120
2121 #if defined(GRAN)
2122   /* GranSim specific init */
2123   CurrentTSO = m->tso;                // the TSO to run
2124   procStatus[MainProc] = Busy;        // status of main PE
2125   CurrentProc = MainProc;             // PE to run it on
2126   schedule(m,initialCapability);
2127 #else
2128   schedule(m,initialCapability);
2129   ASSERT(m->stat != NoStatus);
2130 #endif
2131
2132   stat = m->stat;
2133
2134 #if defined(RTS_SUPPORTS_THREADS)
2135   // Free the condition variable, returning it to the cache if possible.
2136   if (bound_cond_cache == NULL) {
2137       *bound_cond_cache = m->bound_thread_cond;
2138   } else {
2139       closeCondition(&m->bound_thread_cond);
2140   }
2141 #endif
2142
2143   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2144   stgFree(m);
2145
2146   // Postcondition: sched_mutex still held
2147   return stat;
2148 }
2149
2150 /* ---------------------------------------------------------------------------
2151    Where are the roots that we know about?
2152
2153         - all the threads on the runnable queue
2154         - all the threads on the blocked queue
2155         - all the threads on the sleeping queue
2156         - all the thread currently executing a _ccall_GC
2157         - all the "main threads"
2158      
2159    ------------------------------------------------------------------------ */
2160
2161 /* This has to be protected either by the scheduler monitor, or by the
2162         garbage collection monitor (probably the latter).
2163         KH @ 25/10/99
2164 */
2165
2166 void
2167 GetRoots( evac_fn evac )
2168 {
2169 #if defined(GRAN)
2170   {
2171     nat i;
2172     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2173       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2174           evac((StgClosure **)&run_queue_hds[i]);
2175       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2176           evac((StgClosure **)&run_queue_tls[i]);
2177       
2178       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2179           evac((StgClosure **)&blocked_queue_hds[i]);
2180       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2181           evac((StgClosure **)&blocked_queue_tls[i]);
2182       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2183           evac((StgClosure **)&ccalling_threads[i]);
2184     }
2185   }
2186
2187   markEventQueue();
2188
2189 #else /* !GRAN */
2190   if (run_queue_hd != END_TSO_QUEUE) {
2191       ASSERT(run_queue_tl != END_TSO_QUEUE);
2192       evac((StgClosure **)&run_queue_hd);
2193       evac((StgClosure **)&run_queue_tl);
2194   }
2195   
2196   if (blocked_queue_hd != END_TSO_QUEUE) {
2197       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2198       evac((StgClosure **)&blocked_queue_hd);
2199       evac((StgClosure **)&blocked_queue_tl);
2200   }
2201   
2202   if (sleeping_queue != END_TSO_QUEUE) {
2203       evac((StgClosure **)&sleeping_queue);
2204   }
2205 #endif 
2206
2207   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2208       evac((StgClosure **)&suspended_ccalling_threads);
2209   }
2210
2211 #if defined(PAR) || defined(GRAN)
2212   markSparkQueue(evac);
2213 #endif
2214
2215 #if defined(RTS_USER_SIGNALS)
2216   // mark the signal handlers (signals should be already blocked)
2217   markSignalHandlers(evac);
2218 #endif
2219
2220   // main threads which have completed need to be retained until they
2221   // are dealt with in the main scheduler loop.  They won't be
2222   // retained any other way: the GC will drop them from the
2223   // all_threads list, so we have to be careful to treat them as roots
2224   // here.
2225   { 
2226       StgMainThread *m;
2227       for (m = main_threads; m != NULL; m = m->link) {
2228           switch (m->tso->what_next) {
2229           case ThreadComplete:
2230           case ThreadKilled:
2231               evac((StgClosure **)&m->tso);
2232               break;
2233           default:
2234               break;
2235           }
2236       }
2237   }
2238 }
2239
2240 /* -----------------------------------------------------------------------------
2241    performGC
2242
2243    This is the interface to the garbage collector from Haskell land.
2244    We provide this so that external C code can allocate and garbage
2245    collect when called from Haskell via _ccall_GC.
2246
2247    It might be useful to provide an interface whereby the programmer
2248    can specify more roots (ToDo).
2249    
2250    This needs to be protected by the GC condition variable above.  KH.
2251    -------------------------------------------------------------------------- */
2252
2253 static void (*extra_roots)(evac_fn);
2254
2255 void
2256 performGC(void)
2257 {
2258   /* Obligated to hold this lock upon entry */
2259   ACQUIRE_LOCK(&sched_mutex);
2260   GarbageCollect(GetRoots,rtsFalse);
2261   RELEASE_LOCK(&sched_mutex);
2262 }
2263
2264 void
2265 performMajorGC(void)
2266 {
2267   ACQUIRE_LOCK(&sched_mutex);
2268   GarbageCollect(GetRoots,rtsTrue);
2269   RELEASE_LOCK(&sched_mutex);
2270 }
2271
2272 static void
2273 AllRoots(evac_fn evac)
2274 {
2275     GetRoots(evac);             // the scheduler's roots
2276     extra_roots(evac);          // the user's roots
2277 }
2278
2279 void
2280 performGCWithRoots(void (*get_roots)(evac_fn))
2281 {
2282   ACQUIRE_LOCK(&sched_mutex);
2283   extra_roots = get_roots;
2284   GarbageCollect(AllRoots,rtsFalse);
2285   RELEASE_LOCK(&sched_mutex);
2286 }
2287
2288 /* -----------------------------------------------------------------------------
2289    Stack overflow
2290
2291    If the thread has reached its maximum stack size, then raise the
2292    StackOverflow exception in the offending thread.  Otherwise
2293    relocate the TSO into a larger chunk of memory and adjust its stack
2294    size appropriately.
2295    -------------------------------------------------------------------------- */
2296
2297 static StgTSO *
2298 threadStackOverflow(StgTSO *tso)
2299 {
2300   nat new_stack_size, new_tso_size, stack_words;
2301   StgPtr new_sp;
2302   StgTSO *dest;
2303
2304   IF_DEBUG(sanity,checkTSO(tso));
2305   if (tso->stack_size >= tso->max_stack_size) {
2306
2307     IF_DEBUG(gc,
2308              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2309                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2310              /* If we're debugging, just print out the top of the stack */
2311              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2312                                               tso->sp+64)));
2313
2314     /* Send this thread the StackOverflow exception */
2315     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2316     return tso;
2317   }
2318
2319   /* Try to double the current stack size.  If that takes us over the
2320    * maximum stack size for this thread, then use the maximum instead.
2321    * Finally round up so the TSO ends up as a whole number of blocks.
2322    */
2323   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2324   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2325                                        TSO_STRUCT_SIZE)/sizeof(W_);
2326   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2327   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2328
2329   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2330
2331   dest = (StgTSO *)allocate(new_tso_size);
2332   TICK_ALLOC_TSO(new_stack_size,0);
2333
2334   /* copy the TSO block and the old stack into the new area */
2335   memcpy(dest,tso,TSO_STRUCT_SIZE);
2336   stack_words = tso->stack + tso->stack_size - tso->sp;
2337   new_sp = (P_)dest + new_tso_size - stack_words;
2338   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2339
2340   /* relocate the stack pointers... */
2341   dest->sp         = new_sp;
2342   dest->stack_size = new_stack_size;
2343         
2344   /* Mark the old TSO as relocated.  We have to check for relocated
2345    * TSOs in the garbage collector and any primops that deal with TSOs.
2346    *
2347    * It's important to set the sp value to just beyond the end
2348    * of the stack, so we don't attempt to scavenge any part of the
2349    * dead TSO's stack.
2350    */
2351   tso->what_next = ThreadRelocated;
2352   tso->link = dest;
2353   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2354   tso->why_blocked = NotBlocked;
2355   dest->mut_link = NULL;
2356
2357   IF_PAR_DEBUG(verbose,
2358                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2359                      tso->id, tso, tso->stack_size);
2360                /* If we're debugging, just print out the top of the stack */
2361                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2362                                                 tso->sp+64)));
2363   
2364   IF_DEBUG(sanity,checkTSO(tso));
2365 #if 0
2366   IF_DEBUG(scheduler,printTSO(dest));
2367 #endif
2368
2369   return dest;
2370 }
2371
2372 /* ---------------------------------------------------------------------------
2373    Wake up a queue that was blocked on some resource.
2374    ------------------------------------------------------------------------ */
2375
2376 #if defined(GRAN)
2377 STATIC_INLINE void
2378 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2379 {
2380 }
2381 #elif defined(PAR)
2382 STATIC_INLINE void
2383 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2384 {
2385   /* write RESUME events to log file and
2386      update blocked and fetch time (depending on type of the orig closure) */
2387   if (RtsFlags.ParFlags.ParStats.Full) {
2388     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2389                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2390                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2391     if (EMPTY_RUN_QUEUE())
2392       emitSchedule = rtsTrue;
2393
2394     switch (get_itbl(node)->type) {
2395         case FETCH_ME_BQ:
2396           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2397           break;
2398         case RBH:
2399         case FETCH_ME:
2400         case BLACKHOLE_BQ:
2401           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2402           break;
2403 #ifdef DIST
2404         case MVAR:
2405           break;
2406 #endif    
2407         default:
2408           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2409         }
2410       }
2411 }
2412 #endif
2413
2414 #if defined(GRAN)
2415 static StgBlockingQueueElement *
2416 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2417 {
2418     StgTSO *tso;
2419     PEs node_loc, tso_loc;
2420
2421     node_loc = where_is(node); // should be lifted out of loop
2422     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2423     tso_loc = where_is((StgClosure *)tso);
2424     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2425       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2426       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2427       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2428       // insertThread(tso, node_loc);
2429       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2430                 ResumeThread,
2431                 tso, node, (rtsSpark*)NULL);
2432       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2433       // len_local++;
2434       // len++;
2435     } else { // TSO is remote (actually should be FMBQ)
2436       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2437                                   RtsFlags.GranFlags.Costs.gunblocktime +
2438                                   RtsFlags.GranFlags.Costs.latency;
2439       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2440                 UnblockThread,
2441                 tso, node, (rtsSpark*)NULL);
2442       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2443       // len++;
2444     }
2445     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2446     IF_GRAN_DEBUG(bq,
2447                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2448                           (node_loc==tso_loc ? "Local" : "Global"), 
2449                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2450     tso->block_info.closure = NULL;
2451     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2452                              tso->id, tso));
2453 }
2454 #elif defined(PAR)
2455 static StgBlockingQueueElement *
2456 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2457 {
2458     StgBlockingQueueElement *next;
2459
2460     switch (get_itbl(bqe)->type) {
2461     case TSO:
2462       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2463       /* if it's a TSO just push it onto the run_queue */
2464       next = bqe->link;
2465       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2466       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2467       THREAD_RUNNABLE();
2468       unblockCount(bqe, node);
2469       /* reset blocking status after dumping event */
2470       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2471       break;
2472
2473     case BLOCKED_FETCH:
2474       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2475       next = bqe->link;
2476       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2477       PendingFetches = (StgBlockedFetch *)bqe;
2478       break;
2479
2480 # if defined(DEBUG)
2481       /* can ignore this case in a non-debugging setup; 
2482          see comments on RBHSave closures above */
2483     case CONSTR:
2484       /* check that the closure is an RBHSave closure */
2485       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2486              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2487              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2488       break;
2489
2490     default:
2491       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2492            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2493            (StgClosure *)bqe);
2494 # endif
2495     }
2496   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2497   return next;
2498 }
2499
2500 #else /* !GRAN && !PAR */
2501 static StgTSO *
2502 unblockOneLocked(StgTSO *tso)
2503 {
2504   StgTSO *next;
2505
2506   ASSERT(get_itbl(tso)->type == TSO);
2507   ASSERT(tso->why_blocked != NotBlocked);
2508   tso->why_blocked = NotBlocked;
2509   next = tso->link;
2510   PUSH_ON_RUN_QUEUE(tso);
2511   THREAD_RUNNABLE();
2512   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2513   return next;
2514 }
2515 #endif
2516
2517 #if defined(GRAN) || defined(PAR)
2518 INLINE_ME StgBlockingQueueElement *
2519 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2520 {
2521   ACQUIRE_LOCK(&sched_mutex);
2522   bqe = unblockOneLocked(bqe, node);
2523   RELEASE_LOCK(&sched_mutex);
2524   return bqe;
2525 }
2526 #else
2527 INLINE_ME StgTSO *
2528 unblockOne(StgTSO *tso)
2529 {
2530   ACQUIRE_LOCK(&sched_mutex);
2531   tso = unblockOneLocked(tso);
2532   RELEASE_LOCK(&sched_mutex);
2533   return tso;
2534 }
2535 #endif
2536
2537 #if defined(GRAN)
2538 void 
2539 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2540 {
2541   StgBlockingQueueElement *bqe;
2542   PEs node_loc;
2543   nat len = 0; 
2544
2545   IF_GRAN_DEBUG(bq, 
2546                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2547                       node, CurrentProc, CurrentTime[CurrentProc], 
2548                       CurrentTSO->id, CurrentTSO));
2549
2550   node_loc = where_is(node);
2551
2552   ASSERT(q == END_BQ_QUEUE ||
2553          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2554          get_itbl(q)->type == CONSTR); // closure (type constructor)
2555   ASSERT(is_unique(node));
2556
2557   /* FAKE FETCH: magically copy the node to the tso's proc;
2558      no Fetch necessary because in reality the node should not have been 
2559      moved to the other PE in the first place
2560   */
2561   if (CurrentProc!=node_loc) {
2562     IF_GRAN_DEBUG(bq, 
2563                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2564                         node, node_loc, CurrentProc, CurrentTSO->id, 
2565                         // CurrentTSO, where_is(CurrentTSO),
2566                         node->header.gran.procs));
2567     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2568     IF_GRAN_DEBUG(bq, 
2569                   belch("## new bitmask of node %p is %#x",
2570                         node, node->header.gran.procs));
2571     if (RtsFlags.GranFlags.GranSimStats.Global) {
2572       globalGranStats.tot_fake_fetches++;
2573     }
2574   }
2575
2576   bqe = q;
2577   // ToDo: check: ASSERT(CurrentProc==node_loc);
2578   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2579     //next = bqe->link;
2580     /* 
2581        bqe points to the current element in the queue
2582        next points to the next element in the queue
2583     */
2584     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2585     //tso_loc = where_is(tso);
2586     len++;
2587     bqe = unblockOneLocked(bqe, node);
2588   }
2589
2590   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2591      the closure to make room for the anchor of the BQ */
2592   if (bqe!=END_BQ_QUEUE) {
2593     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2594     /*
2595     ASSERT((info_ptr==&RBH_Save_0_info) ||
2596            (info_ptr==&RBH_Save_1_info) ||
2597            (info_ptr==&RBH_Save_2_info));
2598     */
2599     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2600     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2601     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2602
2603     IF_GRAN_DEBUG(bq,
2604                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2605                         node, info_type(node)));
2606   }
2607
2608   /* statistics gathering */
2609   if (RtsFlags.GranFlags.GranSimStats.Global) {
2610     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2611     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2612     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2613     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2614   }
2615   IF_GRAN_DEBUG(bq,
2616                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2617                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2618 }
2619 #elif defined(PAR)
2620 void 
2621 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2622 {
2623   StgBlockingQueueElement *bqe;
2624
2625   ACQUIRE_LOCK(&sched_mutex);
2626
2627   IF_PAR_DEBUG(verbose, 
2628                belch("##-_ AwBQ for node %p on [%x]: ",
2629                      node, mytid));
2630 #ifdef DIST  
2631   //RFP
2632   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2633     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2634     return;
2635   }
2636 #endif
2637   
2638   ASSERT(q == END_BQ_QUEUE ||
2639          get_itbl(q)->type == TSO ||           
2640          get_itbl(q)->type == BLOCKED_FETCH || 
2641          get_itbl(q)->type == CONSTR); 
2642
2643   bqe = q;
2644   while (get_itbl(bqe)->type==TSO || 
2645          get_itbl(bqe)->type==BLOCKED_FETCH) {
2646     bqe = unblockOneLocked(bqe, node);
2647   }
2648   RELEASE_LOCK(&sched_mutex);
2649 }
2650
2651 #else   /* !GRAN && !PAR */
2652
2653 #ifdef RTS_SUPPORTS_THREADS
2654 void
2655 awakenBlockedQueueNoLock(StgTSO *tso)
2656 {
2657   while (tso != END_TSO_QUEUE) {
2658     tso = unblockOneLocked(tso);
2659   }
2660 }
2661 #endif
2662
2663 void
2664 awakenBlockedQueue(StgTSO *tso)
2665 {
2666   ACQUIRE_LOCK(&sched_mutex);
2667   while (tso != END_TSO_QUEUE) {
2668     tso = unblockOneLocked(tso);
2669   }
2670   RELEASE_LOCK(&sched_mutex);
2671 }
2672 #endif
2673
2674 /* ---------------------------------------------------------------------------
2675    Interrupt execution
2676    - usually called inside a signal handler so it mustn't do anything fancy.   
2677    ------------------------------------------------------------------------ */
2678
2679 void
2680 interruptStgRts(void)
2681 {
2682     interrupted    = 1;
2683     context_switch = 1;
2684 #ifdef RTS_SUPPORTS_THREADS
2685     wakeBlockedWorkerThread();
2686 #endif
2687 }
2688
2689 /* -----------------------------------------------------------------------------
2690    Unblock a thread
2691
2692    This is for use when we raise an exception in another thread, which
2693    may be blocked.
2694    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2695    -------------------------------------------------------------------------- */
2696
2697 #if defined(GRAN) || defined(PAR)
2698 /*
2699   NB: only the type of the blocking queue is different in GranSim and GUM
2700       the operations on the queue-elements are the same
2701       long live polymorphism!
2702
2703   Locks: sched_mutex is held upon entry and exit.
2704
2705 */
2706 static void
2707 unblockThread(StgTSO *tso)
2708 {
2709   StgBlockingQueueElement *t, **last;
2710
2711   switch (tso->why_blocked) {
2712
2713   case NotBlocked:
2714     return;  /* not blocked */
2715
2716   case BlockedOnMVar:
2717     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2718     {
2719       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2720       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2721
2722       last = (StgBlockingQueueElement **)&mvar->head;
2723       for (t = (StgBlockingQueueElement *)mvar->head; 
2724            t != END_BQ_QUEUE; 
2725            last = &t->link, last_tso = t, t = t->link) {
2726         if (t == (StgBlockingQueueElement *)tso) {
2727           *last = (StgBlockingQueueElement *)tso->link;
2728           if (mvar->tail == tso) {
2729             mvar->tail = (StgTSO *)last_tso;
2730           }
2731           goto done;
2732         }
2733       }
2734       barf("unblockThread (MVAR): TSO not found");
2735     }
2736
2737   case BlockedOnBlackHole:
2738     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2739     {
2740       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2741
2742       last = &bq->blocking_queue;
2743       for (t = bq->blocking_queue; 
2744            t != END_BQ_QUEUE; 
2745            last = &t->link, t = t->link) {
2746         if (t == (StgBlockingQueueElement *)tso) {
2747           *last = (StgBlockingQueueElement *)tso->link;
2748           goto done;
2749         }
2750       }
2751       barf("unblockThread (BLACKHOLE): TSO not found");
2752     }
2753
2754   case BlockedOnException:
2755     {
2756       StgTSO *target  = tso->block_info.tso;
2757
2758       ASSERT(get_itbl(target)->type == TSO);
2759
2760       if (target->what_next == ThreadRelocated) {
2761           target = target->link;
2762           ASSERT(get_itbl(target)->type == TSO);
2763       }
2764
2765       ASSERT(target->blocked_exceptions != NULL);
2766
2767       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2768       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2769            t != END_BQ_QUEUE; 
2770            last = &t->link, t = t->link) {
2771         ASSERT(get_itbl(t)->type == TSO);
2772         if (t == (StgBlockingQueueElement *)tso) {
2773           *last = (StgBlockingQueueElement *)tso->link;
2774           goto done;
2775         }
2776       }
2777       barf("unblockThread (Exception): TSO not found");
2778     }
2779
2780   case BlockedOnRead:
2781   case BlockedOnWrite:
2782 #if defined(mingw32_TARGET_OS)
2783   case BlockedOnDoProc:
2784 #endif
2785     {
2786       /* take TSO off blocked_queue */
2787       StgBlockingQueueElement *prev = NULL;
2788       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2789            prev = t, t = t->link) {
2790         if (t == (StgBlockingQueueElement *)tso) {
2791           if (prev == NULL) {
2792             blocked_queue_hd = (StgTSO *)t->link;
2793             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2794               blocked_queue_tl = END_TSO_QUEUE;
2795             }
2796           } else {
2797             prev->link = t->link;
2798             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2799               blocked_queue_tl = (StgTSO *)prev;
2800             }
2801           }
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (I/O): TSO not found");
2806     }
2807
2808   case BlockedOnDelay:
2809     {
2810       /* take TSO off sleeping_queue */
2811       StgBlockingQueueElement *prev = NULL;
2812       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2813            prev = t, t = t->link) {
2814         if (t == (StgBlockingQueueElement *)tso) {
2815           if (prev == NULL) {
2816             sleeping_queue = (StgTSO *)t->link;
2817           } else {
2818             prev->link = t->link;
2819           }
2820           goto done;
2821         }
2822       }
2823       barf("unblockThread (delay): TSO not found");
2824     }
2825
2826   default:
2827     barf("unblockThread");
2828   }
2829
2830  done:
2831   tso->link = END_TSO_QUEUE;
2832   tso->why_blocked = NotBlocked;
2833   tso->block_info.closure = NULL;
2834   PUSH_ON_RUN_QUEUE(tso);
2835 }
2836 #else
2837 static void
2838 unblockThread(StgTSO *tso)
2839 {
2840   StgTSO *t, **last;
2841   
2842   /* To avoid locking unnecessarily. */
2843   if (tso->why_blocked == NotBlocked) {
2844     return;
2845   }
2846
2847   switch (tso->why_blocked) {
2848
2849   case BlockedOnMVar:
2850     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2851     {
2852       StgTSO *last_tso = END_TSO_QUEUE;
2853       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2854
2855       last = &mvar->head;
2856       for (t = mvar->head; t != END_TSO_QUEUE; 
2857            last = &t->link, last_tso = t, t = t->link) {
2858         if (t == tso) {
2859           *last = tso->link;
2860           if (mvar->tail == tso) {
2861             mvar->tail = last_tso;
2862           }
2863           goto done;
2864         }
2865       }
2866       barf("unblockThread (MVAR): TSO not found");
2867     }
2868
2869   case BlockedOnBlackHole:
2870     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2871     {
2872       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2873
2874       last = &bq->blocking_queue;
2875       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2876            last = &t->link, t = t->link) {
2877         if (t == tso) {
2878           *last = tso->link;
2879           goto done;
2880         }
2881       }
2882       barf("unblockThread (BLACKHOLE): TSO not found");
2883     }
2884
2885   case BlockedOnException:
2886     {
2887       StgTSO *target  = tso->block_info.tso;
2888
2889       ASSERT(get_itbl(target)->type == TSO);
2890
2891       while (target->what_next == ThreadRelocated) {
2892           target = target->link;
2893           ASSERT(get_itbl(target)->type == TSO);
2894       }
2895       
2896       ASSERT(target->blocked_exceptions != NULL);
2897
2898       last = &target->blocked_exceptions;
2899       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2900            last = &t->link, t = t->link) {
2901         ASSERT(get_itbl(t)->type == TSO);
2902         if (t == tso) {
2903           *last = tso->link;
2904           goto done;
2905         }
2906       }
2907       barf("unblockThread (Exception): TSO not found");
2908     }
2909
2910   case BlockedOnRead:
2911   case BlockedOnWrite:
2912 #if defined(mingw32_TARGET_OS)
2913   case BlockedOnDoProc:
2914 #endif
2915     {
2916       StgTSO *prev = NULL;
2917       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2918            prev = t, t = t->link) {
2919         if (t == tso) {
2920           if (prev == NULL) {
2921             blocked_queue_hd = t->link;
2922             if (blocked_queue_tl == t) {
2923               blocked_queue_tl = END_TSO_QUEUE;
2924             }
2925           } else {
2926             prev->link = t->link;
2927             if (blocked_queue_tl == t) {
2928               blocked_queue_tl = prev;
2929             }
2930           }
2931           goto done;
2932         }
2933       }
2934       barf("unblockThread (I/O): TSO not found");
2935     }
2936
2937   case BlockedOnDelay:
2938     {
2939       StgTSO *prev = NULL;
2940       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2941            prev = t, t = t->link) {
2942         if (t == tso) {
2943           if (prev == NULL) {
2944             sleeping_queue = t->link;
2945           } else {
2946             prev->link = t->link;
2947           }
2948           goto done;
2949         }
2950       }
2951       barf("unblockThread (delay): TSO not found");
2952     }
2953
2954   default:
2955     barf("unblockThread");
2956   }
2957
2958  done:
2959   tso->link = END_TSO_QUEUE;
2960   tso->why_blocked = NotBlocked;
2961   tso->block_info.closure = NULL;
2962   PUSH_ON_RUN_QUEUE(tso);
2963 }
2964 #endif
2965
2966 /* -----------------------------------------------------------------------------
2967  * raiseAsync()
2968  *
2969  * The following function implements the magic for raising an
2970  * asynchronous exception in an existing thread.
2971  *
2972  * We first remove the thread from any queue on which it might be
2973  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2974  *
2975  * We strip the stack down to the innermost CATCH_FRAME, building
2976  * thunks in the heap for all the active computations, so they can 
2977  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2978  * an application of the handler to the exception, and push it on
2979  * the top of the stack.
2980  * 
2981  * How exactly do we save all the active computations?  We create an
2982  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2983  * AP_STACKs pushes everything from the corresponding update frame
2984  * upwards onto the stack.  (Actually, it pushes everything up to the
2985  * next update frame plus a pointer to the next AP_STACK object.
2986  * Entering the next AP_STACK object pushes more onto the stack until we
2987  * reach the last AP_STACK object - at which point the stack should look
2988  * exactly as it did when we killed the TSO and we can continue
2989  * execution by entering the closure on top of the stack.
2990  *
2991  * We can also kill a thread entirely - this happens if either (a) the 
2992  * exception passed to raiseAsync is NULL, or (b) there's no
2993  * CATCH_FRAME on the stack.  In either case, we strip the entire
2994  * stack and replace the thread with a zombie.
2995  *
2996  * Locks: sched_mutex held upon entry nor exit.
2997  *
2998  * -------------------------------------------------------------------------- */
2999  
3000 void 
3001 deleteThread(StgTSO *tso)
3002 {
3003   raiseAsync(tso,NULL);
3004 }
3005
3006 static void 
3007 deleteThreadImmediately(StgTSO *tso)
3008 { // for forkProcess only:
3009   // delete thread without giving it a chance to catch the KillThread exception
3010
3011   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3012       return;
3013   }
3014 #if defined(RTS_SUPPORTS_THREADS)
3015   if (tso->why_blocked != BlockedOnCCall
3016       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3017 #endif
3018     unblockThread(tso);
3019   tso->what_next = ThreadKilled;
3020 }
3021
3022 void
3023 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3024 {
3025   /* When raising async exs from contexts where sched_mutex isn't held;
3026      use raiseAsyncWithLock(). */
3027   ACQUIRE_LOCK(&sched_mutex);
3028   raiseAsync(tso,exception);
3029   RELEASE_LOCK(&sched_mutex);
3030 }
3031
3032 void
3033 raiseAsync(StgTSO *tso, StgClosure *exception)
3034 {
3035     StgRetInfoTable *info;
3036     StgPtr sp;
3037   
3038     // Thread already dead?
3039     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3040         return;
3041     }
3042
3043     IF_DEBUG(scheduler, 
3044              sched_belch("raising exception in thread %ld.", tso->id));
3045     
3046     // Remove it from any blocking queues
3047     unblockThread(tso);
3048
3049     sp = tso->sp;
3050     
3051     // The stack freezing code assumes there's a closure pointer on
3052     // the top of the stack, so we have to arrange that this is the case...
3053     //
3054     if (sp[0] == (W_)&stg_enter_info) {
3055         sp++;
3056     } else {
3057         sp--;
3058         sp[0] = (W_)&stg_dummy_ret_closure;
3059     }
3060
3061     while (1) {
3062         nat i;
3063
3064         // 1. Let the top of the stack be the "current closure"
3065         //
3066         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3067         // CATCH_FRAME.
3068         //
3069         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3070         // current closure applied to the chunk of stack up to (but not
3071         // including) the update frame.  This closure becomes the "current
3072         // closure".  Go back to step 2.
3073         //
3074         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3075         // top of the stack applied to the exception.
3076         // 
3077         // 5. If it's a STOP_FRAME, then kill the thread.
3078         
3079         StgPtr frame;
3080         
3081         frame = sp + 1;
3082         info = get_ret_itbl((StgClosure *)frame);
3083         
3084         while (info->i.type != UPDATE_FRAME
3085                && (info->i.type != CATCH_FRAME || exception == NULL)
3086                && info->i.type != STOP_FRAME) {
3087             frame += stack_frame_sizeW((StgClosure *)frame);
3088             info = get_ret_itbl((StgClosure *)frame);
3089         }
3090         
3091         switch (info->i.type) {
3092             
3093         case CATCH_FRAME:
3094             // If we find a CATCH_FRAME, and we've got an exception to raise,
3095             // then build the THUNK raise(exception), and leave it on
3096             // top of the CATCH_FRAME ready to enter.
3097             //
3098         {
3099 #ifdef PROFILING
3100             StgCatchFrame *cf = (StgCatchFrame *)frame;
3101 #endif
3102             StgClosure *raise;
3103             
3104             // we've got an exception to raise, so let's pass it to the
3105             // handler in this frame.
3106             //
3107             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3108             TICK_ALLOC_SE_THK(1,0);
3109             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3110             raise->payload[0] = exception;
3111             
3112             // throw away the stack from Sp up to the CATCH_FRAME.
3113             //
3114             sp = frame - 1;
3115             
3116             /* Ensure that async excpetions are blocked now, so we don't get
3117              * a surprise exception before we get around to executing the
3118              * handler.
3119              */
3120             if (tso->blocked_exceptions == NULL) {
3121                 tso->blocked_exceptions = END_TSO_QUEUE;
3122             }
3123             
3124             /* Put the newly-built THUNK on top of the stack, ready to execute
3125              * when the thread restarts.
3126              */
3127             sp[0] = (W_)raise;
3128             sp[-1] = (W_)&stg_enter_info;
3129             tso->sp = sp-1;
3130             tso->what_next = ThreadRunGHC;
3131             IF_DEBUG(sanity, checkTSO(tso));
3132             return;
3133         }
3134         
3135         case UPDATE_FRAME:
3136         {
3137             StgAP_STACK * ap;
3138             nat words;
3139             
3140             // First build an AP_STACK consisting of the stack chunk above the
3141             // current update frame, with the top word on the stack as the
3142             // fun field.
3143             //
3144             words = frame - sp - 1;
3145             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3146             
3147             ap->size = words;
3148             ap->fun  = (StgClosure *)sp[0];
3149             sp++;
3150             for(i=0; i < (nat)words; ++i) {
3151                 ap->payload[i] = (StgClosure *)*sp++;
3152             }
3153             
3154             SET_HDR(ap,&stg_AP_STACK_info,
3155                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3156             TICK_ALLOC_UP_THK(words+1,0);
3157             
3158             IF_DEBUG(scheduler,
3159                      fprintf(stderr,  "sched: Updating ");
3160                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3161                      fprintf(stderr,  " with ");
3162                      printObj((StgClosure *)ap);
3163                 );
3164
3165             // Replace the updatee with an indirection - happily
3166             // this will also wake up any threads currently
3167             // waiting on the result.
3168             //
3169             // Warning: if we're in a loop, more than one update frame on
3170             // the stack may point to the same object.  Be careful not to
3171             // overwrite an IND_OLDGEN in this case, because we'll screw
3172             // up the mutable lists.  To be on the safe side, don't
3173             // overwrite any kind of indirection at all.  See also
3174             // threadSqueezeStack in GC.c, where we have to make a similar
3175             // check.
3176             //
3177             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3178                 // revert the black hole
3179                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3180             }
3181             sp += sizeofW(StgUpdateFrame) - 1;
3182             sp[0] = (W_)ap; // push onto stack
3183             break;
3184         }
3185         
3186         case STOP_FRAME:
3187             // We've stripped the entire stack, the thread is now dead.
3188             sp += sizeofW(StgStopFrame);
3189             tso->what_next = ThreadKilled;
3190             tso->sp = sp;
3191             return;
3192             
3193         default:
3194             barf("raiseAsync");
3195         }
3196     }
3197     barf("raiseAsync");
3198 }
3199
3200 /* -----------------------------------------------------------------------------
3201    resurrectThreads is called after garbage collection on the list of
3202    threads found to be garbage.  Each of these threads will be woken
3203    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3204    on an MVar, or NonTermination if the thread was blocked on a Black
3205    Hole.
3206
3207    Locks: sched_mutex isn't held upon entry nor exit.
3208    -------------------------------------------------------------------------- */
3209
3210 void
3211 resurrectThreads( StgTSO *threads )
3212 {
3213   StgTSO *tso, *next;
3214
3215   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3216     next = tso->global_link;
3217     tso->global_link = all_threads;
3218     all_threads = tso;
3219     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3220
3221     switch (tso->why_blocked) {
3222     case BlockedOnMVar:
3223     case BlockedOnException:
3224       /* Called by GC - sched_mutex lock is currently held. */
3225       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3226       break;
3227     case BlockedOnBlackHole:
3228       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3229       break;
3230     case NotBlocked:
3231       /* This might happen if the thread was blocked on a black hole
3232        * belonging to a thread that we've just woken up (raiseAsync
3233        * can wake up threads, remember...).
3234        */
3235       continue;
3236     default:
3237       barf("resurrectThreads: thread blocked in a strange way");
3238     }
3239   }
3240 }
3241
3242 /* -----------------------------------------------------------------------------
3243  * Blackhole detection: if we reach a deadlock, test whether any
3244  * threads are blocked on themselves.  Any threads which are found to
3245  * be self-blocked get sent a NonTermination exception.
3246  *
3247  * This is only done in a deadlock situation in order to avoid
3248  * performance overhead in the normal case.
3249  *
3250  * Locks: sched_mutex is held upon entry and exit.
3251  * -------------------------------------------------------------------------- */
3252
3253 static void
3254 detectBlackHoles( void )
3255 {
3256     StgTSO *tso = all_threads;
3257     StgClosure *frame;
3258     StgClosure *blocked_on;
3259     StgRetInfoTable *info;
3260
3261     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3262
3263         while (tso->what_next == ThreadRelocated) {
3264             tso = tso->link;
3265             ASSERT(get_itbl(tso)->type == TSO);
3266         }
3267       
3268         if (tso->why_blocked != BlockedOnBlackHole) {
3269             continue;
3270         }
3271         blocked_on = tso->block_info.closure;
3272
3273         frame = (StgClosure *)tso->sp;
3274
3275         while(1) {
3276             info = get_ret_itbl(frame);
3277             switch (info->i.type) {
3278             case UPDATE_FRAME:
3279                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3280                     /* We are blocking on one of our own computations, so
3281                      * send this thread the NonTermination exception.  
3282                      */
3283                     IF_DEBUG(scheduler, 
3284                              sched_belch("thread %d is blocked on itself", tso->id));
3285                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3286                     goto done;
3287                 }
3288                 
3289                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3290                 continue;
3291
3292             case STOP_FRAME:
3293                 goto done;
3294
3295                 // normal stack frames; do nothing except advance the pointer
3296             default:
3297                 (StgPtr)frame += stack_frame_sizeW(frame);
3298             }
3299         }   
3300         done: ;
3301     }
3302 }
3303
3304 /* ----------------------------------------------------------------------------
3305  * Debugging: why is a thread blocked
3306  * [Also provides useful information when debugging threaded programs
3307  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3308    ------------------------------------------------------------------------- */
3309
3310 static
3311 void
3312 printThreadBlockage(StgTSO *tso)
3313 {
3314   switch (tso->why_blocked) {
3315   case BlockedOnRead:
3316     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3317     break;
3318   case BlockedOnWrite:
3319     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3320     break;
3321 #if defined(mingw32_TARGET_OS)
3322     case BlockedOnDoProc:
3323     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3324     break;
3325 #endif
3326   case BlockedOnDelay:
3327     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3328     break;
3329   case BlockedOnMVar:
3330     fprintf(stderr,"is blocked on an MVar");
3331     break;
3332   case BlockedOnException:
3333     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3334             tso->block_info.tso->id);
3335     break;
3336   case BlockedOnBlackHole:
3337     fprintf(stderr,"is blocked on a black hole");
3338     break;
3339   case NotBlocked:
3340     fprintf(stderr,"is not blocked");
3341     break;
3342 #if defined(PAR)
3343   case BlockedOnGA:
3344     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3345             tso->block_info.closure, info_type(tso->block_info.closure));
3346     break;
3347   case BlockedOnGA_NoSend:
3348     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3349             tso->block_info.closure, info_type(tso->block_info.closure));
3350     break;
3351 #endif
3352 #if defined(RTS_SUPPORTS_THREADS)
3353   case BlockedOnCCall:
3354     fprintf(stderr,"is blocked on an external call");
3355     break;
3356   case BlockedOnCCall_NoUnblockExc:
3357     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3358     break;
3359 #endif
3360   default:
3361     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3362          tso->why_blocked, tso->id, tso);
3363   }
3364 }
3365
3366 static
3367 void
3368 printThreadStatus(StgTSO *tso)
3369 {
3370   switch (tso->what_next) {
3371   case ThreadKilled:
3372     fprintf(stderr,"has been killed");
3373     break;
3374   case ThreadComplete:
3375     fprintf(stderr,"has completed");
3376     break;
3377   default:
3378     printThreadBlockage(tso);
3379   }
3380 }
3381
3382 void
3383 printAllThreads(void)
3384 {
3385   StgTSO *t;
3386   void *label;
3387
3388 # if defined(GRAN)
3389   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3390   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3391                        time_string, rtsFalse/*no commas!*/);
3392
3393   fprintf(stderr, "all threads at [%s]:\n", time_string);
3394 # elif defined(PAR)
3395   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3396   ullong_format_string(CURRENT_TIME,
3397                        time_string, rtsFalse/*no commas!*/);
3398
3399   fprintf(stderr,"all threads at [%s]:\n", time_string);
3400 # else
3401   fprintf(stderr,"all threads:\n");
3402 # endif
3403
3404   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3405     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3406     label = lookupThreadLabel(t->id);
3407     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3408     printThreadStatus(t);
3409     fprintf(stderr,"\n");
3410   }
3411 }
3412     
3413 #ifdef DEBUG
3414
3415 /* 
3416    Print a whole blocking queue attached to node (debugging only).
3417 */
3418 # if defined(PAR)
3419 void 
3420 print_bq (StgClosure *node)
3421 {
3422   StgBlockingQueueElement *bqe;
3423   StgTSO *tso;
3424   rtsBool end;
3425
3426   fprintf(stderr,"## BQ of closure %p (%s): ",
3427           node, info_type(node));
3428
3429   /* should cover all closures that may have a blocking queue */
3430   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3431          get_itbl(node)->type == FETCH_ME_BQ ||
3432          get_itbl(node)->type == RBH ||
3433          get_itbl(node)->type == MVAR);
3434     
3435   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3436
3437   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3438 }
3439
3440 /* 
3441    Print a whole blocking queue starting with the element bqe.
3442 */
3443 void 
3444 print_bqe (StgBlockingQueueElement *bqe)
3445 {
3446   rtsBool end;
3447
3448   /* 
3449      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3450   */
3451   for (end = (bqe==END_BQ_QUEUE);
3452        !end; // iterate until bqe points to a CONSTR
3453        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3454        bqe = end ? END_BQ_QUEUE : bqe->link) {
3455     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3456     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3457     /* types of closures that may appear in a blocking queue */
3458     ASSERT(get_itbl(bqe)->type == TSO ||           
3459            get_itbl(bqe)->type == BLOCKED_FETCH || 
3460            get_itbl(bqe)->type == CONSTR); 
3461     /* only BQs of an RBH end with an RBH_Save closure */
3462     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3463
3464     switch (get_itbl(bqe)->type) {
3465     case TSO:
3466       fprintf(stderr," TSO %u (%x),",
3467               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3468       break;
3469     case BLOCKED_FETCH:
3470       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3471               ((StgBlockedFetch *)bqe)->node, 
3472               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3473               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3474               ((StgBlockedFetch *)bqe)->ga.weight);
3475       break;
3476     case CONSTR:
3477       fprintf(stderr," %s (IP %p),",
3478               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3479                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3480                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3481                "RBH_Save_?"), get_itbl(bqe));
3482       break;
3483     default:
3484       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3485            info_type((StgClosure *)bqe)); // , node, info_type(node));
3486       break;
3487     }
3488   } /* for */
3489   fputc('\n', stderr);
3490 }
3491 # elif defined(GRAN)
3492 void 
3493 print_bq (StgClosure *node)
3494 {
3495   StgBlockingQueueElement *bqe;
3496   PEs node_loc, tso_loc;
3497   rtsBool end;
3498
3499   /* should cover all closures that may have a blocking queue */
3500   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3501          get_itbl(node)->type == FETCH_ME_BQ ||
3502          get_itbl(node)->type == RBH);
3503     
3504   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3505   node_loc = where_is(node);
3506
3507   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3508           node, info_type(node), node_loc);
3509
3510   /* 
3511      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3512   */
3513   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3514        !end; // iterate until bqe points to a CONSTR
3515        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3516     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3517     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3518     /* types of closures that may appear in a blocking queue */
3519     ASSERT(get_itbl(bqe)->type == TSO ||           
3520            get_itbl(bqe)->type == CONSTR); 
3521     /* only BQs of an RBH end with an RBH_Save closure */
3522     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3523
3524     tso_loc = where_is((StgClosure *)bqe);
3525     switch (get_itbl(bqe)->type) {
3526     case TSO:
3527       fprintf(stderr," TSO %d (%p) on [PE %d],",
3528               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3529       break;
3530     case CONSTR:
3531       fprintf(stderr," %s (IP %p),",
3532               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3533                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3534                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3535                "RBH_Save_?"), get_itbl(bqe));
3536       break;
3537     default:
3538       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3539            info_type((StgClosure *)bqe), node, info_type(node));
3540       break;
3541     }
3542   } /* for */
3543   fputc('\n', stderr);
3544 }
3545 #else
3546 /* 
3547    Nice and easy: only TSOs on the blocking queue
3548 */
3549 void 
3550 print_bq (StgClosure *node)
3551 {
3552   StgTSO *tso;
3553
3554   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3555   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3556        tso != END_TSO_QUEUE; 
3557        tso=tso->link) {
3558     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3559     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3560     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3561   }
3562   fputc('\n', stderr);
3563 }
3564 # endif
3565
3566 #if defined(PAR)
3567 static nat
3568 run_queue_len(void)
3569 {
3570   nat i;
3571   StgTSO *tso;
3572
3573   for (i=0, tso=run_queue_hd; 
3574        tso != END_TSO_QUEUE;
3575        i++, tso=tso->link)
3576     /* nothing */
3577
3578   return i;
3579 }
3580 #endif
3581
3582 void
3583 sched_belch(char *s, ...)
3584 {
3585   va_list ap;
3586   va_start(ap,s);
3587 #ifdef RTS_SUPPORTS_THREADS
3588   fprintf(stderr, "sched (task %p): ", osThreadId());
3589 #elif defined(PAR)
3590   fprintf(stderr, "== ");
3591 #else
3592   fprintf(stderr, "sched: ");
3593 #endif
3594   vfprintf(stderr, s, ap);
3595   fprintf(stderr, "\n");
3596   fflush(stderr);
3597   va_end(ap);
3598 }
3599
3600 #endif /* DEBUG */