[project @ 2004-02-26 14:49:05 by simonpj]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.186 2004/02/26 11:41:22 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 /*
228  * A heavyweight solution to the problem of protecting
229  * the thread_id from concurrent update.
230  */
231 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
232
233 #endif /* RTS_SUPPORTS_THREADS */
234
235 #if defined(PAR)
236 StgTSO *LastTSO;
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
239 #endif
240
241 #if DEBUG
242 static char *whatNext_strs[] = {
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 #if defined(PAR)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 /* ----------------------------------------------------------------------------
257  * Starting Tasks
258  * ------------------------------------------------------------------------- */
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 static rtsBool startingWorkerThread = rtsFalse;
262
263 static void taskStart(void);
264 static void
265 taskStart(void)
266 {
267   ACQUIRE_LOCK(&sched_mutex);
268   schedule(NULL,NULL);
269   RELEASE_LOCK(&sched_mutex);
270 }
271
272 void
273 startSchedulerTaskIfNecessary(void)
274 {
275   if(run_queue_hd != END_TSO_QUEUE
276     || blocked_queue_hd != END_TSO_QUEUE
277     || sleeping_queue != END_TSO_QUEUE)
278   {
279     if(!startingWorkerThread)
280     { // we don't want to start another worker thread
281       // just because the last one hasn't yet reached the
282       // "waiting for capability" state
283       startingWorkerThread = rtsTrue;
284       startTask(taskStart);
285     }
286   }
287 }
288 #endif
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    Locking notes:  we acquire the scheduler lock once at the beginning
303    of the scheduler loop, and release it when
304     
305       * running a thread, or
306       * waiting for work, or
307       * waiting for a GC to complete.
308
309    GRAN version:
310      In a GranSim setup this loop iterates over the global event queue.
311      This revolves around the global event queue, which determines what 
312      to do next. Therefore, it's more complicated than either the 
313      concurrent or the parallel (GUM) setup.
314
315    GUM version:
316      GUM iterates over incoming messages.
317      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318      and sends out a fish whenever it has nothing to do; in-between
319      doing the actual reductions (shared code below) it processes the
320      incoming messages and deals with delayed operations 
321      (see PendingFetches).
322      This is not the ugliest code you could imagine, but it's bloody close.
323
324    ------------------------------------------------------------------------ */
325 static void
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327           Capability *initialCapability )
328 {
329   StgTSO *t;
330   Capability *cap = initialCapability;
331   StgThreadReturnCode ret;
332 #if defined(GRAN)
333   rtsEvent *event;
334 #elif defined(PAR)
335   StgSparkPool *pool;
336   rtsSpark spark;
337   StgTSO *tso;
338   GlobalTaskId pe;
339   rtsBool receivedFinish = rtsFalse;
340 # if defined(DEBUG)
341   nat tp_size, sp_size; // stats only
342 # endif
343 #endif
344   rtsBool was_interrupted = rtsFalse;
345   StgTSOWhatNext prev_what_next;
346   
347   // Pre-condition: sched_mutex is held.
348  
349 #if defined(RTS_SUPPORTS_THREADS)
350   //
351   // in the threaded case, the capability is either passed in via the
352   // initialCapability parameter, or initialized inside the scheduler
353   // loop 
354   //
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357                        mainThread, initialCapability);
358       );
359 #else
360   // simply initialise it in the non-threaded case
361   grabCapability(&cap);
362 #endif
363
364 #if defined(GRAN)
365   /* set up first event to get things going */
366   /* ToDo: assign costs for system setup and init MainTSO ! */
367   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
368             ContinueThread, 
369             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
370
371   IF_DEBUG(gran,
372            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373            G_TSO(CurrentTSO, 5));
374
375   if (RtsFlags.GranFlags.Light) {
376     /* Save current time; GranSim Light only */
377     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
378   }      
379
380   event = get_next_event();
381
382   while (event!=(rtsEvent*)NULL) {
383     /* Choose the processor with the next event */
384     CurrentProc = event->proc;
385     CurrentTSO = event->tso;
386
387 #elif defined(PAR)
388
389   while (!receivedFinish) {    /* set by processMessages */
390                                /* when receiving PP_FINISH message         */ 
391
392 #else // everything except GRAN and PAR
393
394   while (1) {
395
396 #endif
397
398      IF_DEBUG(scheduler, printAllThreads());
399
400 #if defined(RTS_SUPPORTS_THREADS)
401       // Yield the capability to higher-priority tasks if necessary.
402       //
403       if (cap != NULL) {
404           yieldCapability(&cap);
405       }
406
407       // If we do not currently hold a capability, we wait for one
408       //
409       if (cap == NULL) {
410           waitForCapability(&sched_mutex, &cap,
411                             mainThread ? &mainThread->bound_thread_cond : NULL);
412       }
413
414       // We now have a capability...
415 #endif
416
417     //
418     // If we're interrupted (the user pressed ^C, or some other
419     // termination condition occurred), kill all the currently running
420     // threads.
421     //
422     if (interrupted) {
423         IF_DEBUG(scheduler, sched_belch("interrupted"));
424         interrupted = rtsFalse;
425         was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427         // In the threaded RTS, deadlock detection doesn't work,
428         // so just exit right away.
429         prog_belch("interrupted");
430         releaseCapability(cap);
431         RELEASE_LOCK(&sched_mutex);
432         shutdownHaskellAndExit(EXIT_SUCCESS);
433 #else
434         deleteAllThreads();
435 #endif
436     }
437
438     //
439     // Go through the list of main threads and wake up any
440     // clients whose computations have finished.  ToDo: this
441     // should be done more efficiently without a linear scan
442     // of the main threads list, somehow...
443     //
444 #if defined(RTS_SUPPORTS_THREADS)
445     { 
446         StgMainThread *m, **prev;
447         prev = &main_threads;
448         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
449           if (m->tso->what_next == ThreadComplete
450               || m->tso->what_next == ThreadKilled)
451           {
452             if (m == mainThread)
453             {
454               if (m->tso->what_next == ThreadComplete)
455               {
456                 if (m->ret)
457                 {
458                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
459                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
460                 }
461                 m->stat = Success;
462               }
463               else
464               {
465                 if (m->ret)
466                 {
467                   *(m->ret) = NULL;
468                 }
469                 if (was_interrupted)
470                 {
471                   m->stat = Interrupted;
472                 }
473                 else
474                 {
475                   m->stat = Killed;
476                 }
477               }
478               *prev = m->link;
479             
480 #ifdef DEBUG
481               removeThreadLabel((StgWord)m->tso->id);
482 #endif
483               releaseCapability(cap);
484               return;
485             }
486             else
487             {
488                 // The current OS thread can not handle the fact that
489                 // the Haskell thread "m" has ended.  "m" is bound;
490                 // the scheduler loop in it's bound OS thread has to
491                 // return, so let's pass our capability directly to
492                 // that thread.
493                 passCapability(&m->bound_thread_cond);
494                 continue;
495             }
496           }
497         }
498     }
499     
500 #else /* not threaded */
501
502 # if defined(PAR)
503     /* in GUM do this only on the Main PE */
504     if (IAmMainThread)
505 # endif
506     /* If our main thread has finished or been killed, return.
507      */
508     {
509       StgMainThread *m = main_threads;
510       if (m->tso->what_next == ThreadComplete
511           || m->tso->what_next == ThreadKilled) {
512 #ifdef DEBUG
513         removeThreadLabel((StgWord)m->tso->id);
514 #endif
515         main_threads = main_threads->link;
516         if (m->tso->what_next == ThreadComplete) {
517             // We finished successfully, fill in the return value
518             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
519             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
520             m->stat = Success;
521             return;
522         } else {
523           if (m->ret) { *(m->ret) = NULL; };
524           if (was_interrupted) {
525             m->stat = Interrupted;
526           } else {
527             m->stat = Killed;
528           }
529           return;
530         }
531       }
532     }
533 #endif
534
535
536 #if defined(RTS_USER_SIGNALS)
537     // check for signals each time around the scheduler
538     if (signals_pending()) {
539       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
540       startSignalHandlers();
541       ACQUIRE_LOCK(&sched_mutex);
542     }
543 #endif
544
545     /* Check whether any waiting threads need to be woken up.  If the
546      * run queue is empty, and there are no other tasks running, we
547      * can wait indefinitely for something to happen.
548      */
549     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
550 #if defined(RTS_SUPPORTS_THREADS)
551                 || EMPTY_RUN_QUEUE()
552 #endif
553         )
554     {
555       awaitEvent( EMPTY_RUN_QUEUE() );
556     }
557     /* we can be interrupted while waiting for I/O... */
558     if (interrupted) continue;
559
560     /* 
561      * Detect deadlock: when we have no threads to run, there are no
562      * threads waiting on I/O or sleeping, and all the other tasks are
563      * waiting for work, we must have a deadlock of some description.
564      *
565      * We first try to find threads blocked on themselves (ie. black
566      * holes), and generate NonTermination exceptions where necessary.
567      *
568      * If no threads are black holed, we have a deadlock situation, so
569      * inform all the main threads.
570      */
571 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
572     if (   EMPTY_THREAD_QUEUES() )
573     {
574         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575         // Garbage collection can release some new threads due to
576         // either (a) finalizers or (b) threads resurrected because
577         // they are about to be send BlockedOnDeadMVar.  Any threads
578         // thus released will be immediately runnable.
579         GarbageCollect(GetRoots,rtsTrue);
580
581         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
582
583         IF_DEBUG(scheduler, 
584                  sched_belch("still deadlocked, checking for black holes..."));
585         detectBlackHoles();
586
587         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
588
589 #if defined(RTS_USER_SIGNALS)
590         /* If we have user-installed signal handlers, then wait
591          * for signals to arrive rather then bombing out with a
592          * deadlock.
593          */
594         if ( anyUserHandlers() ) {
595             IF_DEBUG(scheduler, 
596                      sched_belch("still deadlocked, waiting for signals..."));
597
598             awaitUserSignals();
599
600             // we might be interrupted...
601             if (interrupted) { continue; }
602
603             if (signals_pending()) {
604                 RELEASE_LOCK(&sched_mutex);
605                 startSignalHandlers();
606                 ACQUIRE_LOCK(&sched_mutex);
607             }
608             ASSERT(!EMPTY_RUN_QUEUE());
609             goto not_deadlocked;
610         }
611 #endif
612
613         /* Probably a real deadlock.  Send the current main thread the
614          * Deadlock exception (or in the SMP build, send *all* main
615          * threads the deadlock exception, since none of them can make
616          * progress).
617          */
618         {
619             StgMainThread *m;
620             m = main_threads;
621             switch (m->tso->why_blocked) {
622             case BlockedOnBlackHole:
623                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
624                 break;
625             case BlockedOnException:
626             case BlockedOnMVar:
627                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
628                 break;
629             default:
630                 barf("deadlock: main thread blocked in a strange way");
631             }
632         }
633     }
634   not_deadlocked:
635
636 #elif defined(RTS_SUPPORTS_THREADS)
637     // ToDo: add deadlock detection in threaded RTS
638 #elif defined(PAR)
639     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
640 #endif
641
642 #if defined(RTS_SUPPORTS_THREADS)
643     if ( EMPTY_RUN_QUEUE() ) {
644       continue; // nothing to do
645     }
646 #endif
647
648 #if defined(GRAN)
649     if (RtsFlags.GranFlags.Light)
650       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
651
652     /* adjust time based on time-stamp */
653     if (event->time > CurrentTime[CurrentProc] &&
654         event->evttype != ContinueThread)
655       CurrentTime[CurrentProc] = event->time;
656     
657     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
658     if (!RtsFlags.GranFlags.Light)
659       handleIdlePEs();
660
661     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
662
663     /* main event dispatcher in GranSim */
664     switch (event->evttype) {
665       /* Should just be continuing execution */
666     case ContinueThread:
667       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
668       /* ToDo: check assertion
669       ASSERT(run_queue_hd != (StgTSO*)NULL &&
670              run_queue_hd != END_TSO_QUEUE);
671       */
672       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
673       if (!RtsFlags.GranFlags.DoAsyncFetch &&
674           procStatus[CurrentProc]==Fetching) {
675         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
676               CurrentTSO->id, CurrentTSO, CurrentProc);
677         goto next_thread;
678       } 
679       /* Ignore ContinueThreads for completed threads */
680       if (CurrentTSO->what_next == ThreadComplete) {
681         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
682               CurrentTSO->id, CurrentTSO, CurrentProc);
683         goto next_thread;
684       } 
685       /* Ignore ContinueThreads for threads that are being migrated */
686       if (PROCS(CurrentTSO)==Nowhere) { 
687         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
688               CurrentTSO->id, CurrentTSO, CurrentProc);
689         goto next_thread;
690       }
691       /* The thread should be at the beginning of the run queue */
692       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
693         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
694               CurrentTSO->id, CurrentTSO, CurrentProc);
695         break; // run the thread anyway
696       }
697       /*
698       new_event(proc, proc, CurrentTime[proc],
699                 FindWork,
700                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
701       goto next_thread; 
702       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
703       break; // now actually run the thread; DaH Qu'vam yImuHbej 
704
705     case FetchNode:
706       do_the_fetchnode(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case GlobalBlock:
710       do_the_globalblock(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case FetchReply:
714       do_the_fetchreply(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case UnblockThread:   /* Move from the blocked queue to the tail of */
718       do_the_unblock(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     case ResumeThread:  /* Move from the blocked queue to the tail of */
722       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
723       event->tso->gran.blocktime += 
724         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
725       do_the_startthread(event);
726       goto next_thread;             /* handle next event in event queue  */
727       
728     case StartThread:
729       do_the_startthread(event);
730       goto next_thread;             /* handle next event in event queue  */
731       
732     case MoveThread:
733       do_the_movethread(event);
734       goto next_thread;             /* handle next event in event queue  */
735       
736     case MoveSpark:
737       do_the_movespark(event);
738       goto next_thread;             /* handle next event in event queue  */
739       
740     case FindWork:
741       do_the_findwork(event);
742       goto next_thread;             /* handle next event in event queue  */
743       
744     default:
745       barf("Illegal event type %u\n", event->evttype);
746     }  /* switch */
747     
748     /* This point was scheduler_loop in the old RTS */
749
750     IF_DEBUG(gran, belch("GRAN: after main switch"));
751
752     TimeOfLastEvent = CurrentTime[CurrentProc];
753     TimeOfNextEvent = get_time_of_next_event();
754     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
755     // CurrentTSO = ThreadQueueHd;
756
757     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
758                          TimeOfNextEvent));
759
760     if (RtsFlags.GranFlags.Light) 
761       GranSimLight_leave_system(event, &ActiveTSO); 
762
763     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
764
765     IF_DEBUG(gran, 
766              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
767
768     /* in a GranSim setup the TSO stays on the run queue */
769     t = CurrentTSO;
770     /* Take a thread from the run queue. */
771     POP_RUN_QUEUE(t); // take_off_run_queue(t);
772
773     IF_DEBUG(gran, 
774              fprintf(stderr, "GRAN: About to run current thread, which is\n");
775              G_TSO(t,5));
776
777     context_switch = 0; // turned on via GranYield, checking events and time slice
778
779     IF_DEBUG(gran, 
780              DumpGranEvent(GR_SCHEDULE, t));
781
782     procStatus[CurrentProc] = Busy;
783
784 #elif defined(PAR)
785     if (PendingFetches != END_BF_QUEUE) {
786         processFetches();
787     }
788
789     /* ToDo: phps merge with spark activation above */
790     /* check whether we have local work and send requests if we have none */
791     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
792       /* :-[  no local threads => look out for local sparks */
793       /* the spark pool for the current PE */
794       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
796           pool->hd < pool->tl) {
797         /* 
798          * ToDo: add GC code check that we really have enough heap afterwards!!
799          * Old comment:
800          * If we're here (no runnable threads) and we have pending
801          * sparks, we must have a space problem.  Get enough space
802          * to turn one of those pending sparks into a
803          * thread... 
804          */
805
806         spark = findSpark(rtsFalse);                /* get a spark */
807         if (spark != (rtsSpark) NULL) {
808           tso = activateSpark(spark);       /* turn the spark into a thread */
809           IF_PAR_DEBUG(schedule,
810                        belch("==== schedule: Created TSO %d (%p); %d threads active",
811                              tso->id, tso, advisory_thread_count));
812
813           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
814             belch("==^^ failed to activate spark");
815             goto next_thread;
816           }               /* otherwise fall through & pick-up new tso */
817         } else {
818           IF_PAR_DEBUG(verbose,
819                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
820                              spark_queue_len(pool)));
821           goto next_thread;
822         }
823       }
824
825       /* If we still have no work we need to send a FISH to get a spark
826          from another PE 
827       */
828       if (EMPTY_RUN_QUEUE()) {
829       /* =8-[  no local sparks => look for work on other PEs */
830         /*
831          * We really have absolutely no work.  Send out a fish
832          * (there may be some out there already), and wait for
833          * something to arrive.  We clearly can't run any threads
834          * until a SCHEDULE or RESUME arrives, and so that's what
835          * we're hoping to see.  (Of course, we still have to
836          * respond to other types of messages.)
837          */
838         TIME now = msTime() /*CURRENT_TIME*/;
839         IF_PAR_DEBUG(verbose, 
840                      belch("--  now=%ld", now));
841         IF_PAR_DEBUG(verbose,
842                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
843                          (last_fish_arrived_at!=0 &&
844                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
845                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
846                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
847                              last_fish_arrived_at,
848                              RtsFlags.ParFlags.fishDelay, now);
849                      });
850         
851         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
852             (last_fish_arrived_at==0 ||
853              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
854           /* outstandingFishes is set in sendFish, processFish;
855              avoid flooding system with fishes via delay */
856           pe = choosePE();
857           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
858                    NEW_FISH_HUNGER);
859
860           // Global statistics: count no. of fishes
861           if (RtsFlags.ParFlags.ParStats.Global &&
862               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
863             globalParStats.tot_fish_mess++;
864           }
865         }
866       
867         receivedFinish = processMessages();
868         goto next_thread;
869       }
870     } else if (PacketsWaiting()) {  /* Look for incoming messages */
871       receivedFinish = processMessages();
872     }
873
874     /* Now we are sure that we have some work available */
875     ASSERT(run_queue_hd != END_TSO_QUEUE);
876
877     /* Take a thread from the run queue, if we have work */
878     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
879     IF_DEBUG(sanity,checkTSO(t));
880
881     /* ToDo: write something to the log-file
882     if (RTSflags.ParFlags.granSimStats && !sameThread)
883         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
884
885     CurrentTSO = t;
886     */
887     /* the spark pool for the current PE */
888     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
889
890     IF_DEBUG(scheduler, 
891              belch("--=^ %d threads, %d sparks on [%#x]", 
892                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
893
894 # if 1
895     if (0 && RtsFlags.ParFlags.ParStats.Full && 
896         t && LastTSO && t->id != LastTSO->id && 
897         LastTSO->why_blocked == NotBlocked && 
898         LastTSO->what_next != ThreadComplete) {
899       // if previously scheduled TSO not blocked we have to record the context switch
900       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
901                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
902     }
903
904     if (RtsFlags.ParFlags.ParStats.Full && 
905         (emitSchedule /* forced emit */ ||
906         (t && LastTSO && t->id != LastTSO->id))) {
907       /* 
908          we are running a different TSO, so write a schedule event to log file
909          NB: If we use fair scheduling we also have to write  a deschedule 
910              event for LastTSO; with unfair scheduling we know that the
911              previous tso has blocked whenever we switch to another tso, so
912              we don't need it in GUM for now
913       */
914       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
915                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
916       emitSchedule = rtsFalse;
917     }
918      
919 # endif
920 #else /* !GRAN && !PAR */
921   
922     // grab a thread from the run queue
923     ASSERT(run_queue_hd != END_TSO_QUEUE);
924     POP_RUN_QUEUE(t);
925
926     // Sanity check the thread we're about to run.  This can be
927     // expensive if there is lots of thread switching going on...
928     IF_DEBUG(sanity,checkTSO(t));
929 #endif
930
931 #ifdef THREADED_RTS
932     {
933       StgMainThread *m;
934       for(m = main_threads; m; m = m->link)
935       {
936         if(m->tso == t)
937           break;
938       }
939       
940       if(m)
941       {
942         if(m == mainThread)
943         {
944           IF_DEBUG(scheduler,
945             sched_belch("### Running thread %d in bound thread", t->id));
946           // yes, the Haskell thread is bound to the current native thread
947         }
948         else
949         {
950           IF_DEBUG(scheduler,
951             sched_belch("### thread %d bound to another OS thread", t->id));
952           // no, bound to a different Haskell thread: pass to that thread
953           PUSH_ON_RUN_QUEUE(t);
954           passCapability(&m->bound_thread_cond);
955           continue;
956         }
957       }
958       else
959       {
960         if(mainThread != NULL)
961         // The thread we want to run is bound.
962         {
963           IF_DEBUG(scheduler,
964             sched_belch("### this OS thread cannot run thread %d", t->id));
965           // no, the current native thread is bound to a different
966           // Haskell thread, so pass it to any worker thread
967           PUSH_ON_RUN_QUEUE(t);
968           passCapabilityToWorker();
969           continue; 
970         }
971       }
972     }
973 #endif
974
975     cap->r.rCurrentTSO = t;
976     
977     /* context switches are now initiated by the timer signal, unless
978      * the user specified "context switch as often as possible", with
979      * +RTS -C0
980      */
981     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
982          && (run_queue_hd != END_TSO_QUEUE
983              || blocked_queue_hd != END_TSO_QUEUE
984              || sleeping_queue != END_TSO_QUEUE)))
985         context_switch = 1;
986     else
987         context_switch = 0;
988
989 run_thread:
990
991     RELEASE_LOCK(&sched_mutex);
992
993     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
994                               t->id, whatNext_strs[t->what_next]));
995
996 #ifdef PROFILING
997     startHeapProfTimer();
998 #endif
999
1000     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1001     /* Run the current thread 
1002      */
1003     prev_what_next = t->what_next;
1004     switch (prev_what_next) {
1005     case ThreadKilled:
1006     case ThreadComplete:
1007         /* Thread already finished, return to scheduler. */
1008         ret = ThreadFinished;
1009         break;
1010     case ThreadRunGHC:
1011         errno = t->saved_errno;
1012         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1013         t->saved_errno = errno;
1014         break;
1015     case ThreadInterpret:
1016         ret = interpretBCO(cap);
1017         break;
1018     default:
1019       barf("schedule: invalid what_next field");
1020     }
1021     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1022     
1023     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1024 #ifdef PROFILING
1025     stopHeapProfTimer();
1026     CCCS = CCS_SYSTEM;
1027 #endif
1028     
1029     ACQUIRE_LOCK(&sched_mutex);
1030     
1031 #ifdef RTS_SUPPORTS_THREADS
1032     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1033 #elif !defined(GRAN) && !defined(PAR)
1034     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1035 #endif
1036     t = cap->r.rCurrentTSO;
1037     
1038 #if defined(PAR)
1039     /* HACK 675: if the last thread didn't yield, make sure to print a 
1040        SCHEDULE event to the log file when StgRunning the next thread, even
1041        if it is the same one as before */
1042     LastTSO = t; 
1043     TimeOfLastYield = CURRENT_TIME;
1044 #endif
1045
1046     switch (ret) {
1047     case HeapOverflow:
1048 #if defined(GRAN)
1049       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1050       globalGranStats.tot_heapover++;
1051 #elif defined(PAR)
1052       globalParStats.tot_heapover++;
1053 #endif
1054
1055       // did the task ask for a large block?
1056       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1057           // if so, get one and push it on the front of the nursery.
1058           bdescr *bd;
1059           nat blocks;
1060           
1061           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1062
1063           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1064                                    t->id, whatNext_strs[t->what_next], blocks));
1065
1066           // don't do this if it would push us over the
1067           // alloc_blocks_lim limit; we'll GC first.
1068           if (alloc_blocks + blocks < alloc_blocks_lim) {
1069
1070               alloc_blocks += blocks;
1071               bd = allocGroup( blocks );
1072
1073               // link the new group into the list
1074               bd->link = cap->r.rCurrentNursery;
1075               bd->u.back = cap->r.rCurrentNursery->u.back;
1076               if (cap->r.rCurrentNursery->u.back != NULL) {
1077                   cap->r.rCurrentNursery->u.back->link = bd;
1078               } else {
1079                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080                          g0s0->blocks == cap->r.rNursery);
1081                   cap->r.rNursery = g0s0->blocks = bd;
1082               }           
1083               cap->r.rCurrentNursery->u.back = bd;
1084
1085               // initialise it as a nursery block.  We initialise the
1086               // step, gen_no, and flags field of *every* sub-block in
1087               // this large block, because this is easier than making
1088               // sure that we always find the block head of a large
1089               // block whenever we call Bdescr() (eg. evacuate() and
1090               // isAlive() in the GC would both have to do this, at
1091               // least).
1092               { 
1093                   bdescr *x;
1094                   for (x = bd; x < bd + blocks; x++) {
1095                       x->step = g0s0;
1096                       x->gen_no = 0;
1097                       x->flags = 0;
1098                   }
1099               }
1100
1101               // don't forget to update the block count in g0s0.
1102               g0s0->n_blocks += blocks;
1103               // This assert can be a killer if the app is doing lots
1104               // of large block allocations.
1105               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1106
1107               // now update the nursery to point to the new block
1108               cap->r.rCurrentNursery = bd;
1109
1110               // we might be unlucky and have another thread get on the
1111               // run queue before us and steal the large block, but in that
1112               // case the thread will just end up requesting another large
1113               // block.
1114               PUSH_ON_RUN_QUEUE(t);
1115               break;
1116           }
1117       }
1118
1119       /* make all the running tasks block on a condition variable,
1120        * maybe set context_switch and wait till they all pile in,
1121        * then have them wait on a GC condition variable.
1122        */
1123       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1124                                t->id, whatNext_strs[t->what_next]));
1125       threadPaused(t);
1126 #if defined(GRAN)
1127       ASSERT(!is_on_queue(t,CurrentProc));
1128 #elif defined(PAR)
1129       /* Currently we emit a DESCHEDULE event before GC in GUM.
1130          ToDo: either add separate event to distinguish SYSTEM time from rest
1131                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1132       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1133         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1134                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1135         emitSchedule = rtsTrue;
1136       }
1137 #endif
1138       
1139       ready_to_gc = rtsTrue;
1140       context_switch = 1;               /* stop other threads ASAP */
1141       PUSH_ON_RUN_QUEUE(t);
1142       /* actual GC is done at the end of the while loop */
1143       break;
1144       
1145     case StackOverflow:
1146 #if defined(GRAN)
1147       IF_DEBUG(gran, 
1148                DumpGranEvent(GR_DESCHEDULE, t));
1149       globalGranStats.tot_stackover++;
1150 #elif defined(PAR)
1151       // IF_DEBUG(par, 
1152       // DumpGranEvent(GR_DESCHEDULE, t);
1153       globalParStats.tot_stackover++;
1154 #endif
1155       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1156                                t->id, whatNext_strs[t->what_next]));
1157       /* just adjust the stack for this thread, then pop it back
1158        * on the run queue.
1159        */
1160       threadPaused(t);
1161       { 
1162         StgMainThread *m;
1163         /* enlarge the stack */
1164         StgTSO *new_t = threadStackOverflow(t);
1165         
1166         /* This TSO has moved, so update any pointers to it from the
1167          * main thread stack.  It better not be on any other queues...
1168          * (it shouldn't be).
1169          */
1170         for (m = main_threads; m != NULL; m = m->link) {
1171           if (m->tso == t) {
1172             m->tso = new_t;
1173           }
1174         }
1175         threadPaused(new_t);
1176         PUSH_ON_RUN_QUEUE(new_t);
1177       }
1178       break;
1179
1180     case ThreadYielding:
1181 #if defined(GRAN)
1182       IF_DEBUG(gran, 
1183                DumpGranEvent(GR_DESCHEDULE, t));
1184       globalGranStats.tot_yields++;
1185 #elif defined(PAR)
1186       // IF_DEBUG(par, 
1187       // DumpGranEvent(GR_DESCHEDULE, t);
1188       globalParStats.tot_yields++;
1189 #endif
1190       /* put the thread back on the run queue.  Then, if we're ready to
1191        * GC, check whether this is the last task to stop.  If so, wake
1192        * up the GC thread.  getThread will block during a GC until the
1193        * GC is finished.
1194        */
1195       IF_DEBUG(scheduler,
1196                if (t->what_next != prev_what_next) {
1197                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1198                          t->id, whatNext_strs[t->what_next]);
1199                } else {
1200                    belch("--<< thread %ld (%s) stopped, yielding", 
1201                          t->id, whatNext_strs[t->what_next]);
1202                }
1203                );
1204
1205       IF_DEBUG(sanity,
1206                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1207                checkTSO(t));
1208       ASSERT(t->link == END_TSO_QUEUE);
1209
1210       // Shortcut if we're just switching evaluators: don't bother
1211       // doing stack squeezing (which can be expensive), just run the
1212       // thread.
1213       if (t->what_next != prev_what_next) {
1214           goto run_thread;
1215       }
1216
1217       threadPaused(t);
1218
1219 #if defined(GRAN)
1220       ASSERT(!is_on_queue(t,CurrentProc));
1221
1222       IF_DEBUG(sanity,
1223                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1224                checkThreadQsSanity(rtsTrue));
1225 #endif
1226
1227 #if defined(PAR)
1228       if (RtsFlags.ParFlags.doFairScheduling) { 
1229         /* this does round-robin scheduling; good for concurrency */
1230         APPEND_TO_RUN_QUEUE(t);
1231       } else {
1232         /* this does unfair scheduling; good for parallelism */
1233         PUSH_ON_RUN_QUEUE(t);
1234       }
1235 #else
1236       // this does round-robin scheduling; good for concurrency
1237       APPEND_TO_RUN_QUEUE(t);
1238 #endif
1239
1240 #if defined(GRAN)
1241       /* add a ContinueThread event to actually process the thread */
1242       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1243                 ContinueThread,
1244                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1245       IF_GRAN_DEBUG(bq, 
1246                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1247                G_EVENTQ(0);
1248                G_CURR_THREADQ(0));
1249 #endif /* GRAN */
1250       break;
1251
1252     case ThreadBlocked:
1253 #if defined(GRAN)
1254       IF_DEBUG(scheduler,
1255                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1256                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1257                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1258
1259       // ??? needed; should emit block before
1260       IF_DEBUG(gran, 
1261                DumpGranEvent(GR_DESCHEDULE, t)); 
1262       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1263       /*
1264         ngoq Dogh!
1265       ASSERT(procStatus[CurrentProc]==Busy || 
1266               ((procStatus[CurrentProc]==Fetching) && 
1267               (t->block_info.closure!=(StgClosure*)NULL)));
1268       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1269           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1270             procStatus[CurrentProc]==Fetching)) 
1271         procStatus[CurrentProc] = Idle;
1272       */
1273 #elif defined(PAR)
1274       IF_DEBUG(scheduler,
1275                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1276                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1277       IF_PAR_DEBUG(bq,
1278
1279                    if (t->block_info.closure!=(StgClosure*)NULL) 
1280                      print_bq(t->block_info.closure));
1281
1282       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1283       blockThread(t);
1284
1285       /* whatever we schedule next, we must log that schedule */
1286       emitSchedule = rtsTrue;
1287
1288 #else /* !GRAN */
1289       /* don't need to do anything.  Either the thread is blocked on
1290        * I/O, in which case we'll have called addToBlockedQueue
1291        * previously, or it's blocked on an MVar or Blackhole, in which
1292        * case it'll be on the relevant queue already.
1293        */
1294       IF_DEBUG(scheduler,
1295                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1296                        t->id, whatNext_strs[t->what_next]);
1297                printThreadBlockage(t);
1298                fprintf(stderr, "\n"));
1299       fflush(stderr);
1300
1301       /* Only for dumping event to log file 
1302          ToDo: do I need this in GranSim, too?
1303       blockThread(t);
1304       */
1305 #endif
1306       threadPaused(t);
1307       break;
1308
1309     case ThreadFinished:
1310       /* Need to check whether this was a main thread, and if so, signal
1311        * the task that started it with the return value.  If we have no
1312        * more main threads, we probably need to stop all the tasks until
1313        * we get a new one.
1314        */
1315       /* We also end up here if the thread kills itself with an
1316        * uncaught exception, see Exception.hc.
1317        */
1318       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1319                                t->id, whatNext_strs[t->what_next]));
1320 #if defined(GRAN)
1321       endThread(t, CurrentProc); // clean-up the thread
1322 #elif defined(PAR)
1323       /* For now all are advisory -- HWL */
1324       //if(t->priority==AdvisoryPriority) ??
1325       advisory_thread_count--;
1326       
1327 # ifdef DIST
1328       if(t->dist.priority==RevalPriority)
1329         FinishReval(t);
1330 # endif
1331       
1332       if (RtsFlags.ParFlags.ParStats.Full &&
1333           !RtsFlags.ParFlags.ParStats.Suppressed) 
1334         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1335 #endif
1336       break;
1337       
1338     default:
1339       barf("schedule: invalid thread return code %d", (int)ret);
1340     }
1341
1342 #ifdef PROFILING
1343     // When we have +RTS -i0 and we're heap profiling, do a census at
1344     // every GC.  This lets us get repeatable runs for debugging.
1345     if (performHeapProfile ||
1346         (RtsFlags.ProfFlags.profileInterval==0 &&
1347          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1348         GarbageCollect(GetRoots, rtsTrue);
1349         heapCensus();
1350         performHeapProfile = rtsFalse;
1351         ready_to_gc = rtsFalse; // we already GC'd
1352     }
1353 #endif
1354
1355     if (ready_to_gc) {
1356       /* everybody back, start the GC.
1357        * Could do it in this thread, or signal a condition var
1358        * to do it in another thread.  Either way, we need to
1359        * broadcast on gc_pending_cond afterward.
1360        */
1361 #if defined(RTS_SUPPORTS_THREADS)
1362       IF_DEBUG(scheduler,sched_belch("doing GC"));
1363 #endif
1364       GarbageCollect(GetRoots,rtsFalse);
1365       ready_to_gc = rtsFalse;
1366 #if defined(GRAN)
1367       /* add a ContinueThread event to continue execution of current thread */
1368       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1369                 ContinueThread,
1370                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1371       IF_GRAN_DEBUG(bq, 
1372                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1373                G_EVENTQ(0);
1374                G_CURR_THREADQ(0));
1375 #endif /* GRAN */
1376     }
1377
1378 #if defined(GRAN)
1379   next_thread:
1380     IF_GRAN_DEBUG(unused,
1381                   print_eventq(EventHd));
1382
1383     event = get_next_event();
1384 #elif defined(PAR)
1385   next_thread:
1386     /* ToDo: wait for next message to arrive rather than busy wait */
1387 #endif /* GRAN */
1388
1389   } /* end of while(1) */
1390
1391   IF_PAR_DEBUG(verbose,
1392                belch("== Leaving schedule() after having received Finish"));
1393 }
1394
1395 /* ---------------------------------------------------------------------------
1396  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1397  * used by Control.Concurrent for error checking.
1398  * ------------------------------------------------------------------------- */
1399  
1400 StgBool
1401 rtsSupportsBoundThreads(void)
1402 {
1403 #ifdef THREADED_RTS
1404   return rtsTrue;
1405 #else
1406   return rtsFalse;
1407 #endif
1408 }
1409
1410 /* ---------------------------------------------------------------------------
1411  * isThreadBound(tso): check whether tso is bound to an OS thread.
1412  * ------------------------------------------------------------------------- */
1413  
1414 StgBool
1415 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1416 {
1417 #ifdef THREADED_RTS
1418   StgMainThread *m;
1419   for(m = main_threads; m; m = m->link)
1420   {
1421     if(m->tso == tso)
1422       return rtsTrue;
1423   }
1424 #endif
1425   return rtsFalse;
1426 }
1427
1428 /* ---------------------------------------------------------------------------
1429  * Singleton fork(). Do not copy any running threads.
1430  * ------------------------------------------------------------------------- */
1431
1432 static void 
1433 deleteThreadImmediately(StgTSO *tso);
1434
1435 StgInt
1436 forkProcess(HsStablePtr *entry)
1437 {
1438 #ifndef mingw32_TARGET_OS
1439   pid_t pid;
1440   StgTSO* t,*next;
1441   StgMainThread *m;
1442   SchedulerStatus rc;
1443
1444   IF_DEBUG(scheduler,sched_belch("forking!"));
1445   rts_lock(); // This not only acquires sched_mutex, it also
1446               // makes sure that no other threads are running
1447
1448   pid = fork();
1449
1450   if (pid) { /* parent */
1451
1452   /* just return the pid */
1453     rts_unlock();
1454     return pid;
1455     
1456   } else { /* child */
1457     
1458     
1459       // delete all threads
1460     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1461     
1462     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1463       next = t->link;
1464
1465         // don't allow threads to catch the ThreadKilled exception
1466       deleteThreadImmediately(t);
1467     }
1468     
1469       // wipe the main thread list
1470     while((m = main_threads) != NULL) {
1471       main_threads = m->link;
1472 #ifdef THREADED_RTS
1473       closeCondition(&m->bound_thread_cond);
1474 #endif
1475       stgFree(m);
1476     }
1477     
1478 #ifdef RTS_SUPPORTS_THREADS
1479     resetTaskManagerAfterFork();      // tell startTask() and friends that
1480     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1481     resetWorkerWakeupPipeAfterFork();
1482 #endif
1483     
1484     rc = rts_evalStableIO(entry, NULL);  // run the action
1485     rts_checkSchedStatus("forkProcess",rc);
1486     
1487     rts_unlock();
1488     
1489     hs_exit();                      // clean up and exit
1490     stg_exit(0);
1491   }
1492 #else /* mingw32 */
1493   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1494   return -1;
1495 #endif /* mingw32 */
1496 }
1497
1498 /* ---------------------------------------------------------------------------
1499  * deleteAllThreads():  kill all the live threads.
1500  *
1501  * This is used when we catch a user interrupt (^C), before performing
1502  * any necessary cleanups and running finalizers.
1503  *
1504  * Locks: sched_mutex held.
1505  * ------------------------------------------------------------------------- */
1506    
1507 void
1508 deleteAllThreads ( void )
1509 {
1510   StgTSO* t, *next;
1511   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1512   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1513       next = t->global_link;
1514       deleteThread(t);
1515   }      
1516   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1517   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1518   sleeping_queue = END_TSO_QUEUE;
1519 }
1520
1521 /* startThread and  insertThread are now in GranSim.c -- HWL */
1522
1523
1524 /* ---------------------------------------------------------------------------
1525  * Suspending & resuming Haskell threads.
1526  * 
1527  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1528  * its capability before calling the C function.  This allows another
1529  * task to pick up the capability and carry on running Haskell
1530  * threads.  It also means that if the C call blocks, it won't lock
1531  * the whole system.
1532  *
1533  * The Haskell thread making the C call is put to sleep for the
1534  * duration of the call, on the susepended_ccalling_threads queue.  We
1535  * give out a token to the task, which it can use to resume the thread
1536  * on return from the C function.
1537  * ------------------------------------------------------------------------- */
1538    
1539 StgInt
1540 suspendThread( StgRegTable *reg, 
1541                rtsBool concCall
1542 #if !defined(DEBUG)
1543                STG_UNUSED
1544 #endif
1545                )
1546 {
1547   nat tok;
1548   Capability *cap;
1549   int saved_errno = errno;
1550
1551   /* assume that *reg is a pointer to the StgRegTable part
1552    * of a Capability.
1553    */
1554   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1555
1556   ACQUIRE_LOCK(&sched_mutex);
1557
1558   IF_DEBUG(scheduler,
1559            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1560
1561   // XXX this might not be necessary --SDM
1562   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1563
1564   threadPaused(cap->r.rCurrentTSO);
1565   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1566   suspended_ccalling_threads = cap->r.rCurrentTSO;
1567
1568 #if defined(RTS_SUPPORTS_THREADS)
1569   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1570   {
1571       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1572       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1573   }
1574   else
1575   {
1576       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1577   }
1578 #endif
1579
1580   /* Use the thread ID as the token; it should be unique */
1581   tok = cap->r.rCurrentTSO->id;
1582
1583   /* Hand back capability */
1584   releaseCapability(cap);
1585   
1586 #if defined(RTS_SUPPORTS_THREADS)
1587   /* Preparing to leave the RTS, so ensure there's a native thread/task
1588      waiting to take over.
1589   */
1590   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1591 #endif
1592
1593   /* Other threads _might_ be available for execution; signal this */
1594   THREAD_RUNNABLE();
1595   RELEASE_LOCK(&sched_mutex);
1596   
1597   errno = saved_errno;
1598   return tok; 
1599 }
1600
1601 StgRegTable *
1602 resumeThread( StgInt tok,
1603               rtsBool concCall STG_UNUSED )
1604 {
1605   StgTSO *tso, **prev;
1606   Capability *cap;
1607   int saved_errno = errno;
1608
1609 #if defined(RTS_SUPPORTS_THREADS)
1610   /* Wait for permission to re-enter the RTS with the result. */
1611   ACQUIRE_LOCK(&sched_mutex);
1612   waitForReturnCapability(&sched_mutex, &cap);
1613
1614   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1615 #else
1616   grabCapability(&cap);
1617 #endif
1618
1619   /* Remove the thread off of the suspended list */
1620   prev = &suspended_ccalling_threads;
1621   for (tso = suspended_ccalling_threads; 
1622        tso != END_TSO_QUEUE; 
1623        prev = &tso->link, tso = tso->link) {
1624     if (tso->id == (StgThreadID)tok) {
1625       *prev = tso->link;
1626       break;
1627     }
1628   }
1629   if (tso == END_TSO_QUEUE) {
1630     barf("resumeThread: thread not found");
1631   }
1632   tso->link = END_TSO_QUEUE;
1633   
1634 #if defined(RTS_SUPPORTS_THREADS)
1635   if(tso->why_blocked == BlockedOnCCall)
1636   {
1637       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1638       tso->blocked_exceptions = NULL;
1639   }
1640 #endif
1641   
1642   /* Reset blocking status */
1643   tso->why_blocked  = NotBlocked;
1644
1645   cap->r.rCurrentTSO = tso;
1646   RELEASE_LOCK(&sched_mutex);
1647   errno = saved_errno;
1648   return &cap->r;
1649 }
1650
1651
1652 /* ---------------------------------------------------------------------------
1653  * Static functions
1654  * ------------------------------------------------------------------------ */
1655 static void unblockThread(StgTSO *tso);
1656
1657 /* ---------------------------------------------------------------------------
1658  * Comparing Thread ids.
1659  *
1660  * This is used from STG land in the implementation of the
1661  * instances of Eq/Ord for ThreadIds.
1662  * ------------------------------------------------------------------------ */
1663
1664 int
1665 cmp_thread(StgPtr tso1, StgPtr tso2) 
1666
1667   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1668   StgThreadID id2 = ((StgTSO *)tso2)->id;
1669  
1670   if (id1 < id2) return (-1);
1671   if (id1 > id2) return 1;
1672   return 0;
1673 }
1674
1675 /* ---------------------------------------------------------------------------
1676  * Fetching the ThreadID from an StgTSO.
1677  *
1678  * This is used in the implementation of Show for ThreadIds.
1679  * ------------------------------------------------------------------------ */
1680 int
1681 rts_getThreadId(StgPtr tso) 
1682 {
1683   return ((StgTSO *)tso)->id;
1684 }
1685
1686 #ifdef DEBUG
1687 void
1688 labelThread(StgPtr tso, char *label)
1689 {
1690   int len;
1691   void *buf;
1692
1693   /* Caveat: Once set, you can only set the thread name to "" */
1694   len = strlen(label)+1;
1695   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1696   strncpy(buf,label,len);
1697   /* Update will free the old memory for us */
1698   updateThreadLabel(((StgTSO *)tso)->id,buf);
1699 }
1700 #endif /* DEBUG */
1701
1702 /* ---------------------------------------------------------------------------
1703    Create a new thread.
1704
1705    The new thread starts with the given stack size.  Before the
1706    scheduler can run, however, this thread needs to have a closure
1707    (and possibly some arguments) pushed on its stack.  See
1708    pushClosure() in Schedule.h.
1709
1710    createGenThread() and createIOThread() (in SchedAPI.h) are
1711    convenient packaged versions of this function.
1712
1713    currently pri (priority) is only used in a GRAN setup -- HWL
1714    ------------------------------------------------------------------------ */
1715 #if defined(GRAN)
1716 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1717 StgTSO *
1718 createThread(nat size, StgInt pri)
1719 #else
1720 StgTSO *
1721 createThread(nat size)
1722 #endif
1723 {
1724
1725     StgTSO *tso;
1726     nat stack_size;
1727
1728     /* First check whether we should create a thread at all */
1729 #if defined(PAR)
1730   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1731   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1732     threadsIgnored++;
1733     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1734           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1735     return END_TSO_QUEUE;
1736   }
1737   threadsCreated++;
1738 #endif
1739
1740 #if defined(GRAN)
1741   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1742 #endif
1743
1744   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1745
1746   /* catch ridiculously small stack sizes */
1747   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1748     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1749   }
1750
1751   stack_size = size - TSO_STRUCT_SIZEW;
1752
1753   tso = (StgTSO *)allocate(size);
1754   TICK_ALLOC_TSO(stack_size, 0);
1755
1756   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1757 #if defined(GRAN)
1758   SET_GRAN_HDR(tso, ThisPE);
1759 #endif
1760
1761   // Always start with the compiled code evaluator
1762   tso->what_next = ThreadRunGHC;
1763
1764   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1765    * protect the increment operation on next_thread_id.
1766    * In future, we could use an atomic increment instead.
1767    */
1768   ACQUIRE_LOCK(&thread_id_mutex);
1769   tso->id = next_thread_id++; 
1770   RELEASE_LOCK(&thread_id_mutex);
1771
1772   tso->why_blocked  = NotBlocked;
1773   tso->blocked_exceptions = NULL;
1774
1775   tso->saved_errno = 0;
1776   
1777   tso->stack_size   = stack_size;
1778   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1779                               - TSO_STRUCT_SIZEW;
1780   tso->sp           = (P_)&(tso->stack) + stack_size;
1781
1782 #ifdef PROFILING
1783   tso->prof.CCCS = CCS_MAIN;
1784 #endif
1785
1786   /* put a stop frame on the stack */
1787   tso->sp -= sizeofW(StgStopFrame);
1788   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1789   // ToDo: check this
1790 #if defined(GRAN)
1791   tso->link = END_TSO_QUEUE;
1792   /* uses more flexible routine in GranSim */
1793   insertThread(tso, CurrentProc);
1794 #else
1795   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1796    * from its creation
1797    */
1798 #endif
1799
1800 #if defined(GRAN) 
1801   if (RtsFlags.GranFlags.GranSimStats.Full) 
1802     DumpGranEvent(GR_START,tso);
1803 #elif defined(PAR)
1804   if (RtsFlags.ParFlags.ParStats.Full) 
1805     DumpGranEvent(GR_STARTQ,tso);
1806   /* HACk to avoid SCHEDULE 
1807      LastTSO = tso; */
1808 #endif
1809
1810   /* Link the new thread on the global thread list.
1811    */
1812   tso->global_link = all_threads;
1813   all_threads = tso;
1814
1815 #if defined(DIST)
1816   tso->dist.priority = MandatoryPriority; //by default that is...
1817 #endif
1818
1819 #if defined(GRAN)
1820   tso->gran.pri = pri;
1821 # if defined(DEBUG)
1822   tso->gran.magic = TSO_MAGIC; // debugging only
1823 # endif
1824   tso->gran.sparkname   = 0;
1825   tso->gran.startedat   = CURRENT_TIME; 
1826   tso->gran.exported    = 0;
1827   tso->gran.basicblocks = 0;
1828   tso->gran.allocs      = 0;
1829   tso->gran.exectime    = 0;
1830   tso->gran.fetchtime   = 0;
1831   tso->gran.fetchcount  = 0;
1832   tso->gran.blocktime   = 0;
1833   tso->gran.blockcount  = 0;
1834   tso->gran.blockedat   = 0;
1835   tso->gran.globalsparks = 0;
1836   tso->gran.localsparks  = 0;
1837   if (RtsFlags.GranFlags.Light)
1838     tso->gran.clock  = Now; /* local clock */
1839   else
1840     tso->gran.clock  = 0;
1841
1842   IF_DEBUG(gran,printTSO(tso));
1843 #elif defined(PAR)
1844 # if defined(DEBUG)
1845   tso->par.magic = TSO_MAGIC; // debugging only
1846 # endif
1847   tso->par.sparkname   = 0;
1848   tso->par.startedat   = CURRENT_TIME; 
1849   tso->par.exported    = 0;
1850   tso->par.basicblocks = 0;
1851   tso->par.allocs      = 0;
1852   tso->par.exectime    = 0;
1853   tso->par.fetchtime   = 0;
1854   tso->par.fetchcount  = 0;
1855   tso->par.blocktime   = 0;
1856   tso->par.blockcount  = 0;
1857   tso->par.blockedat   = 0;
1858   tso->par.globalsparks = 0;
1859   tso->par.localsparks  = 0;
1860 #endif
1861
1862 #if defined(GRAN)
1863   globalGranStats.tot_threads_created++;
1864   globalGranStats.threads_created_on_PE[CurrentProc]++;
1865   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1866   globalGranStats.tot_sq_probes++;
1867 #elif defined(PAR)
1868   // collect parallel global statistics (currently done together with GC stats)
1869   if (RtsFlags.ParFlags.ParStats.Global &&
1870       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1871     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1872     globalParStats.tot_threads_created++;
1873   }
1874 #endif 
1875
1876 #if defined(GRAN)
1877   IF_GRAN_DEBUG(pri,
1878                 belch("==__ schedule: Created TSO %d (%p);",
1879                       CurrentProc, tso, tso->id));
1880 #elif defined(PAR)
1881     IF_PAR_DEBUG(verbose,
1882                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1883                        tso->id, tso, advisory_thread_count));
1884 #else
1885   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1886                                  tso->id, tso->stack_size));
1887 #endif    
1888   return tso;
1889 }
1890
1891 #if defined(PAR)
1892 /* RFP:
1893    all parallel thread creation calls should fall through the following routine.
1894 */
1895 StgTSO *
1896 createSparkThread(rtsSpark spark) 
1897 { StgTSO *tso;
1898   ASSERT(spark != (rtsSpark)NULL);
1899   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1900   { threadsIgnored++;
1901     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1902           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1903     return END_TSO_QUEUE;
1904   }
1905   else
1906   { threadsCreated++;
1907     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1908     if (tso==END_TSO_QUEUE)     
1909       barf("createSparkThread: Cannot create TSO");
1910 #if defined(DIST)
1911     tso->priority = AdvisoryPriority;
1912 #endif
1913     pushClosure(tso,spark);
1914     PUSH_ON_RUN_QUEUE(tso);
1915     advisory_thread_count++;    
1916   }
1917   return tso;
1918 }
1919 #endif
1920
1921 /*
1922   Turn a spark into a thread.
1923   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1924 */
1925 #if defined(PAR)
1926 StgTSO *
1927 activateSpark (rtsSpark spark) 
1928 {
1929   StgTSO *tso;
1930
1931   tso = createSparkThread(spark);
1932   if (RtsFlags.ParFlags.ParStats.Full) {   
1933     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1934     IF_PAR_DEBUG(verbose,
1935                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1936                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1937   }
1938   // ToDo: fwd info on local/global spark to thread -- HWL
1939   // tso->gran.exported =  spark->exported;
1940   // tso->gran.locked =   !spark->global;
1941   // tso->gran.sparkname = spark->name;
1942
1943   return tso;
1944 }
1945 #endif
1946
1947 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1948                                    Capability *initialCapability
1949                                    );
1950
1951
1952 /* ---------------------------------------------------------------------------
1953  * scheduleThread()
1954  *
1955  * scheduleThread puts a thread on the head of the runnable queue.
1956  * This will usually be done immediately after a thread is created.
1957  * The caller of scheduleThread must create the thread using e.g.
1958  * createThread and push an appropriate closure
1959  * on this thread's stack before the scheduler is invoked.
1960  * ------------------------------------------------------------------------ */
1961
1962 static void scheduleThread_ (StgTSO* tso);
1963
1964 void
1965 scheduleThread_(StgTSO *tso)
1966 {
1967   // Precondition: sched_mutex must be held.
1968
1969   /* Put the new thread on the head of the runnable queue.  The caller
1970    * better push an appropriate closure on this thread's stack
1971    * beforehand.  In the SMP case, the thread may start running as
1972    * soon as we release the scheduler lock below.
1973    */
1974   PUSH_ON_RUN_QUEUE(tso);
1975   THREAD_RUNNABLE();
1976
1977 #if 0
1978   IF_DEBUG(scheduler,printTSO(tso));
1979 #endif
1980 }
1981
1982 void scheduleThread(StgTSO* tso)
1983 {
1984   ACQUIRE_LOCK(&sched_mutex);
1985   scheduleThread_(tso);
1986   RELEASE_LOCK(&sched_mutex);
1987 }
1988
1989 #if defined(RTS_SUPPORTS_THREADS)
1990 static Condition *bound_cond_cache = NULL;
1991 #endif
1992
1993 SchedulerStatus
1994 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1995                    Capability *initialCapability)
1996 {
1997     // Precondition: sched_mutex must be held
1998     StgMainThread *m;
1999
2000     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2001     m->tso = tso;
2002     m->ret = ret;
2003     m->stat = NoStatus;
2004 #if defined(RTS_SUPPORTS_THREADS)
2005     // Allocating a new condition for each thread is expensive, so we
2006     // cache one.  This is a pretty feeble hack, but it helps speed up
2007     // consecutive call-ins quite a bit.
2008     if (bound_cond_cache != NULL) {
2009         m->bound_thread_cond = *bound_cond_cache;
2010         bound_cond_cache = NULL;
2011     } else {
2012         initCondition(&m->bound_thread_cond);
2013     }
2014 #endif
2015
2016     /* Put the thread on the main-threads list prior to scheduling the TSO.
2017        Failure to do so introduces a race condition in the MT case (as
2018        identified by Wolfgang Thaller), whereby the new task/OS thread 
2019        created by scheduleThread_() would complete prior to the thread
2020        that spawned it managed to put 'itself' on the main-threads list.
2021        The upshot of it all being that the worker thread wouldn't get to
2022        signal the completion of the its work item for the main thread to
2023        see (==> it got stuck waiting.)    -- sof 6/02.
2024     */
2025     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2026     
2027     m->link = main_threads;
2028     main_threads = m;
2029     
2030     scheduleThread_(tso);
2031
2032     return waitThread_(m, initialCapability);
2033 }
2034
2035 /* ---------------------------------------------------------------------------
2036  * initScheduler()
2037  *
2038  * Initialise the scheduler.  This resets all the queues - if the
2039  * queues contained any threads, they'll be garbage collected at the
2040  * next pass.
2041  *
2042  * ------------------------------------------------------------------------ */
2043
2044 void 
2045 initScheduler(void)
2046 {
2047 #if defined(GRAN)
2048   nat i;
2049
2050   for (i=0; i<=MAX_PROC; i++) {
2051     run_queue_hds[i]      = END_TSO_QUEUE;
2052     run_queue_tls[i]      = END_TSO_QUEUE;
2053     blocked_queue_hds[i]  = END_TSO_QUEUE;
2054     blocked_queue_tls[i]  = END_TSO_QUEUE;
2055     ccalling_threadss[i]  = END_TSO_QUEUE;
2056     sleeping_queue        = END_TSO_QUEUE;
2057   }
2058 #else
2059   run_queue_hd      = END_TSO_QUEUE;
2060   run_queue_tl      = END_TSO_QUEUE;
2061   blocked_queue_hd  = END_TSO_QUEUE;
2062   blocked_queue_tl  = END_TSO_QUEUE;
2063   sleeping_queue    = END_TSO_QUEUE;
2064 #endif 
2065
2066   suspended_ccalling_threads  = END_TSO_QUEUE;
2067
2068   main_threads = NULL;
2069   all_threads  = END_TSO_QUEUE;
2070
2071   context_switch = 0;
2072   interrupted    = 0;
2073
2074   RtsFlags.ConcFlags.ctxtSwitchTicks =
2075       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2076       
2077 #if defined(RTS_SUPPORTS_THREADS)
2078   /* Initialise the mutex and condition variables used by
2079    * the scheduler. */
2080   initMutex(&sched_mutex);
2081   initMutex(&term_mutex);
2082   initMutex(&thread_id_mutex);
2083
2084   initCondition(&thread_ready_cond);
2085 #endif
2086   
2087 #if defined(RTS_SUPPORTS_THREADS)
2088   ACQUIRE_LOCK(&sched_mutex);
2089 #endif
2090
2091   /* A capability holds the state a native thread needs in
2092    * order to execute STG code. At least one capability is
2093    * floating around (only SMP builds have more than one).
2094    */
2095   initCapabilities();
2096   
2097 #if defined(RTS_SUPPORTS_THREADS)
2098     /* start our haskell execution tasks */
2099     startTaskManager(0,taskStart);
2100 #endif
2101
2102 #if /* defined(SMP) ||*/ defined(PAR)
2103   initSparkPools();
2104 #endif
2105
2106   RELEASE_LOCK(&sched_mutex);
2107
2108 }
2109
2110 void
2111 exitScheduler( void )
2112 {
2113 #if defined(RTS_SUPPORTS_THREADS)
2114   stopTaskManager();
2115 #endif
2116   shutting_down_scheduler = rtsTrue;
2117 }
2118
2119 /* ----------------------------------------------------------------------------
2120    Managing the per-task allocation areas.
2121    
2122    Each capability comes with an allocation area.  These are
2123    fixed-length block lists into which allocation can be done.
2124
2125    ToDo: no support for two-space collection at the moment???
2126    ------------------------------------------------------------------------- */
2127
2128 static
2129 SchedulerStatus
2130 waitThread_(StgMainThread* m, Capability *initialCapability)
2131 {
2132   SchedulerStatus stat;
2133
2134   // Precondition: sched_mutex must be held.
2135   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2136
2137 #if defined(GRAN)
2138   /* GranSim specific init */
2139   CurrentTSO = m->tso;                // the TSO to run
2140   procStatus[MainProc] = Busy;        // status of main PE
2141   CurrentProc = MainProc;             // PE to run it on
2142   schedule(m,initialCapability);
2143 #else
2144   schedule(m,initialCapability);
2145   ASSERT(m->stat != NoStatus);
2146 #endif
2147
2148   stat = m->stat;
2149
2150 #if defined(RTS_SUPPORTS_THREADS)
2151   // Free the condition variable, returning it to the cache if possible.
2152   if (bound_cond_cache == NULL) {
2153       *bound_cond_cache = m->bound_thread_cond;
2154   } else {
2155       closeCondition(&m->bound_thread_cond);
2156   }
2157 #endif
2158
2159   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2160   stgFree(m);
2161
2162   // Postcondition: sched_mutex still held
2163   return stat;
2164 }
2165
2166 /* ---------------------------------------------------------------------------
2167    Where are the roots that we know about?
2168
2169         - all the threads on the runnable queue
2170         - all the threads on the blocked queue
2171         - all the threads on the sleeping queue
2172         - all the thread currently executing a _ccall_GC
2173         - all the "main threads"
2174      
2175    ------------------------------------------------------------------------ */
2176
2177 /* This has to be protected either by the scheduler monitor, or by the
2178         garbage collection monitor (probably the latter).
2179         KH @ 25/10/99
2180 */
2181
2182 void
2183 GetRoots( evac_fn evac )
2184 {
2185 #if defined(GRAN)
2186   {
2187     nat i;
2188     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2189       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2190           evac((StgClosure **)&run_queue_hds[i]);
2191       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2192           evac((StgClosure **)&run_queue_tls[i]);
2193       
2194       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2195           evac((StgClosure **)&blocked_queue_hds[i]);
2196       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2197           evac((StgClosure **)&blocked_queue_tls[i]);
2198       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2199           evac((StgClosure **)&ccalling_threads[i]);
2200     }
2201   }
2202
2203   markEventQueue();
2204
2205 #else /* !GRAN */
2206   if (run_queue_hd != END_TSO_QUEUE) {
2207       ASSERT(run_queue_tl != END_TSO_QUEUE);
2208       evac((StgClosure **)&run_queue_hd);
2209       evac((StgClosure **)&run_queue_tl);
2210   }
2211   
2212   if (blocked_queue_hd != END_TSO_QUEUE) {
2213       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2214       evac((StgClosure **)&blocked_queue_hd);
2215       evac((StgClosure **)&blocked_queue_tl);
2216   }
2217   
2218   if (sleeping_queue != END_TSO_QUEUE) {
2219       evac((StgClosure **)&sleeping_queue);
2220   }
2221 #endif 
2222
2223   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2224       evac((StgClosure **)&suspended_ccalling_threads);
2225   }
2226
2227 #if defined(PAR) || defined(GRAN)
2228   markSparkQueue(evac);
2229 #endif
2230
2231 #if defined(RTS_USER_SIGNALS)
2232   // mark the signal handlers (signals should be already blocked)
2233   markSignalHandlers(evac);
2234 #endif
2235
2236   // main threads which have completed need to be retained until they
2237   // are dealt with in the main scheduler loop.  They won't be
2238   // retained any other way: the GC will drop them from the
2239   // all_threads list, so we have to be careful to treat them as roots
2240   // here.
2241   { 
2242       StgMainThread *m;
2243       for (m = main_threads; m != NULL; m = m->link) {
2244           switch (m->tso->what_next) {
2245           case ThreadComplete:
2246           case ThreadKilled:
2247               evac((StgClosure **)&m->tso);
2248               break;
2249           default:
2250               break;
2251           }
2252       }
2253   }
2254 }
2255
2256 /* -----------------------------------------------------------------------------
2257    performGC
2258
2259    This is the interface to the garbage collector from Haskell land.
2260    We provide this so that external C code can allocate and garbage
2261    collect when called from Haskell via _ccall_GC.
2262
2263    It might be useful to provide an interface whereby the programmer
2264    can specify more roots (ToDo).
2265    
2266    This needs to be protected by the GC condition variable above.  KH.
2267    -------------------------------------------------------------------------- */
2268
2269 static void (*extra_roots)(evac_fn);
2270
2271 void
2272 performGC(void)
2273 {
2274   /* Obligated to hold this lock upon entry */
2275   ACQUIRE_LOCK(&sched_mutex);
2276   GarbageCollect(GetRoots,rtsFalse);
2277   RELEASE_LOCK(&sched_mutex);
2278 }
2279
2280 void
2281 performMajorGC(void)
2282 {
2283   ACQUIRE_LOCK(&sched_mutex);
2284   GarbageCollect(GetRoots,rtsTrue);
2285   RELEASE_LOCK(&sched_mutex);
2286 }
2287
2288 static void
2289 AllRoots(evac_fn evac)
2290 {
2291     GetRoots(evac);             // the scheduler's roots
2292     extra_roots(evac);          // the user's roots
2293 }
2294
2295 void
2296 performGCWithRoots(void (*get_roots)(evac_fn))
2297 {
2298   ACQUIRE_LOCK(&sched_mutex);
2299   extra_roots = get_roots;
2300   GarbageCollect(AllRoots,rtsFalse);
2301   RELEASE_LOCK(&sched_mutex);
2302 }
2303
2304 /* -----------------------------------------------------------------------------
2305    Stack overflow
2306
2307    If the thread has reached its maximum stack size, then raise the
2308    StackOverflow exception in the offending thread.  Otherwise
2309    relocate the TSO into a larger chunk of memory and adjust its stack
2310    size appropriately.
2311    -------------------------------------------------------------------------- */
2312
2313 static StgTSO *
2314 threadStackOverflow(StgTSO *tso)
2315 {
2316   nat new_stack_size, new_tso_size, stack_words;
2317   StgPtr new_sp;
2318   StgTSO *dest;
2319
2320   IF_DEBUG(sanity,checkTSO(tso));
2321   if (tso->stack_size >= tso->max_stack_size) {
2322
2323     IF_DEBUG(gc,
2324              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2325                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2326              /* If we're debugging, just print out the top of the stack */
2327              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2328                                               tso->sp+64)));
2329
2330     /* Send this thread the StackOverflow exception */
2331     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2332     return tso;
2333   }
2334
2335   /* Try to double the current stack size.  If that takes us over the
2336    * maximum stack size for this thread, then use the maximum instead.
2337    * Finally round up so the TSO ends up as a whole number of blocks.
2338    */
2339   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2340   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2341                                        TSO_STRUCT_SIZE)/sizeof(W_);
2342   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2343   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2344
2345   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2346
2347   dest = (StgTSO *)allocate(new_tso_size);
2348   TICK_ALLOC_TSO(new_stack_size,0);
2349
2350   /* copy the TSO block and the old stack into the new area */
2351   memcpy(dest,tso,TSO_STRUCT_SIZE);
2352   stack_words = tso->stack + tso->stack_size - tso->sp;
2353   new_sp = (P_)dest + new_tso_size - stack_words;
2354   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2355
2356   /* relocate the stack pointers... */
2357   dest->sp         = new_sp;
2358   dest->stack_size = new_stack_size;
2359         
2360   /* Mark the old TSO as relocated.  We have to check for relocated
2361    * TSOs in the garbage collector and any primops that deal with TSOs.
2362    *
2363    * It's important to set the sp value to just beyond the end
2364    * of the stack, so we don't attempt to scavenge any part of the
2365    * dead TSO's stack.
2366    */
2367   tso->what_next = ThreadRelocated;
2368   tso->link = dest;
2369   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2370   tso->why_blocked = NotBlocked;
2371   dest->mut_link = NULL;
2372
2373   IF_PAR_DEBUG(verbose,
2374                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2375                      tso->id, tso, tso->stack_size);
2376                /* If we're debugging, just print out the top of the stack */
2377                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2378                                                 tso->sp+64)));
2379   
2380   IF_DEBUG(sanity,checkTSO(tso));
2381 #if 0
2382   IF_DEBUG(scheduler,printTSO(dest));
2383 #endif
2384
2385   return dest;
2386 }
2387
2388 /* ---------------------------------------------------------------------------
2389    Wake up a queue that was blocked on some resource.
2390    ------------------------------------------------------------------------ */
2391
2392 #if defined(GRAN)
2393 STATIC_INLINE void
2394 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2395 {
2396 }
2397 #elif defined(PAR)
2398 STATIC_INLINE void
2399 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2400 {
2401   /* write RESUME events to log file and
2402      update blocked and fetch time (depending on type of the orig closure) */
2403   if (RtsFlags.ParFlags.ParStats.Full) {
2404     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2405                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2406                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2407     if (EMPTY_RUN_QUEUE())
2408       emitSchedule = rtsTrue;
2409
2410     switch (get_itbl(node)->type) {
2411         case FETCH_ME_BQ:
2412           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2413           break;
2414         case RBH:
2415         case FETCH_ME:
2416         case BLACKHOLE_BQ:
2417           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2418           break;
2419 #ifdef DIST
2420         case MVAR:
2421           break;
2422 #endif    
2423         default:
2424           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2425         }
2426       }
2427 }
2428 #endif
2429
2430 #if defined(GRAN)
2431 static StgBlockingQueueElement *
2432 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2433 {
2434     StgTSO *tso;
2435     PEs node_loc, tso_loc;
2436
2437     node_loc = where_is(node); // should be lifted out of loop
2438     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2439     tso_loc = where_is((StgClosure *)tso);
2440     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2441       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2442       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2443       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2444       // insertThread(tso, node_loc);
2445       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2446                 ResumeThread,
2447                 tso, node, (rtsSpark*)NULL);
2448       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2449       // len_local++;
2450       // len++;
2451     } else { // TSO is remote (actually should be FMBQ)
2452       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2453                                   RtsFlags.GranFlags.Costs.gunblocktime +
2454                                   RtsFlags.GranFlags.Costs.latency;
2455       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2456                 UnblockThread,
2457                 tso, node, (rtsSpark*)NULL);
2458       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2459       // len++;
2460     }
2461     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2462     IF_GRAN_DEBUG(bq,
2463                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2464                           (node_loc==tso_loc ? "Local" : "Global"), 
2465                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2466     tso->block_info.closure = NULL;
2467     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2468                              tso->id, tso));
2469 }
2470 #elif defined(PAR)
2471 static StgBlockingQueueElement *
2472 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2473 {
2474     StgBlockingQueueElement *next;
2475
2476     switch (get_itbl(bqe)->type) {
2477     case TSO:
2478       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2479       /* if it's a TSO just push it onto the run_queue */
2480       next = bqe->link;
2481       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2482       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2483       THREAD_RUNNABLE();
2484       unblockCount(bqe, node);
2485       /* reset blocking status after dumping event */
2486       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2487       break;
2488
2489     case BLOCKED_FETCH:
2490       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2491       next = bqe->link;
2492       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2493       PendingFetches = (StgBlockedFetch *)bqe;
2494       break;
2495
2496 # if defined(DEBUG)
2497       /* can ignore this case in a non-debugging setup; 
2498          see comments on RBHSave closures above */
2499     case CONSTR:
2500       /* check that the closure is an RBHSave closure */
2501       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2502              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2503              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2504       break;
2505
2506     default:
2507       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2508            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2509            (StgClosure *)bqe);
2510 # endif
2511     }
2512   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2513   return next;
2514 }
2515
2516 #else /* !GRAN && !PAR */
2517 static StgTSO *
2518 unblockOneLocked(StgTSO *tso)
2519 {
2520   StgTSO *next;
2521
2522   ASSERT(get_itbl(tso)->type == TSO);
2523   ASSERT(tso->why_blocked != NotBlocked);
2524   tso->why_blocked = NotBlocked;
2525   next = tso->link;
2526   PUSH_ON_RUN_QUEUE(tso);
2527   THREAD_RUNNABLE();
2528   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2529   return next;
2530 }
2531 #endif
2532
2533 #if defined(GRAN) || defined(PAR)
2534 INLINE_ME StgBlockingQueueElement *
2535 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2536 {
2537   ACQUIRE_LOCK(&sched_mutex);
2538   bqe = unblockOneLocked(bqe, node);
2539   RELEASE_LOCK(&sched_mutex);
2540   return bqe;
2541 }
2542 #else
2543 INLINE_ME StgTSO *
2544 unblockOne(StgTSO *tso)
2545 {
2546   ACQUIRE_LOCK(&sched_mutex);
2547   tso = unblockOneLocked(tso);
2548   RELEASE_LOCK(&sched_mutex);
2549   return tso;
2550 }
2551 #endif
2552
2553 #if defined(GRAN)
2554 void 
2555 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2556 {
2557   StgBlockingQueueElement *bqe;
2558   PEs node_loc;
2559   nat len = 0; 
2560
2561   IF_GRAN_DEBUG(bq, 
2562                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2563                       node, CurrentProc, CurrentTime[CurrentProc], 
2564                       CurrentTSO->id, CurrentTSO));
2565
2566   node_loc = where_is(node);
2567
2568   ASSERT(q == END_BQ_QUEUE ||
2569          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2570          get_itbl(q)->type == CONSTR); // closure (type constructor)
2571   ASSERT(is_unique(node));
2572
2573   /* FAKE FETCH: magically copy the node to the tso's proc;
2574      no Fetch necessary because in reality the node should not have been 
2575      moved to the other PE in the first place
2576   */
2577   if (CurrentProc!=node_loc) {
2578     IF_GRAN_DEBUG(bq, 
2579                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2580                         node, node_loc, CurrentProc, CurrentTSO->id, 
2581                         // CurrentTSO, where_is(CurrentTSO),
2582                         node->header.gran.procs));
2583     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2584     IF_GRAN_DEBUG(bq, 
2585                   belch("## new bitmask of node %p is %#x",
2586                         node, node->header.gran.procs));
2587     if (RtsFlags.GranFlags.GranSimStats.Global) {
2588       globalGranStats.tot_fake_fetches++;
2589     }
2590   }
2591
2592   bqe = q;
2593   // ToDo: check: ASSERT(CurrentProc==node_loc);
2594   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2595     //next = bqe->link;
2596     /* 
2597        bqe points to the current element in the queue
2598        next points to the next element in the queue
2599     */
2600     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2601     //tso_loc = where_is(tso);
2602     len++;
2603     bqe = unblockOneLocked(bqe, node);
2604   }
2605
2606   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2607      the closure to make room for the anchor of the BQ */
2608   if (bqe!=END_BQ_QUEUE) {
2609     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2610     /*
2611     ASSERT((info_ptr==&RBH_Save_0_info) ||
2612            (info_ptr==&RBH_Save_1_info) ||
2613            (info_ptr==&RBH_Save_2_info));
2614     */
2615     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2616     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2617     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2618
2619     IF_GRAN_DEBUG(bq,
2620                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2621                         node, info_type(node)));
2622   }
2623
2624   /* statistics gathering */
2625   if (RtsFlags.GranFlags.GranSimStats.Global) {
2626     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2627     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2628     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2629     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2630   }
2631   IF_GRAN_DEBUG(bq,
2632                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2633                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2634 }
2635 #elif defined(PAR)
2636 void 
2637 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2638 {
2639   StgBlockingQueueElement *bqe;
2640
2641   ACQUIRE_LOCK(&sched_mutex);
2642
2643   IF_PAR_DEBUG(verbose, 
2644                belch("##-_ AwBQ for node %p on [%x]: ",
2645                      node, mytid));
2646 #ifdef DIST  
2647   //RFP
2648   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2649     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2650     return;
2651   }
2652 #endif
2653   
2654   ASSERT(q == END_BQ_QUEUE ||
2655          get_itbl(q)->type == TSO ||           
2656          get_itbl(q)->type == BLOCKED_FETCH || 
2657          get_itbl(q)->type == CONSTR); 
2658
2659   bqe = q;
2660   while (get_itbl(bqe)->type==TSO || 
2661          get_itbl(bqe)->type==BLOCKED_FETCH) {
2662     bqe = unblockOneLocked(bqe, node);
2663   }
2664   RELEASE_LOCK(&sched_mutex);
2665 }
2666
2667 #else   /* !GRAN && !PAR */
2668
2669 #ifdef RTS_SUPPORTS_THREADS
2670 void
2671 awakenBlockedQueueNoLock(StgTSO *tso)
2672 {
2673   while (tso != END_TSO_QUEUE) {
2674     tso = unblockOneLocked(tso);
2675   }
2676 }
2677 #endif
2678
2679 void
2680 awakenBlockedQueue(StgTSO *tso)
2681 {
2682   ACQUIRE_LOCK(&sched_mutex);
2683   while (tso != END_TSO_QUEUE) {
2684     tso = unblockOneLocked(tso);
2685   }
2686   RELEASE_LOCK(&sched_mutex);
2687 }
2688 #endif
2689
2690 /* ---------------------------------------------------------------------------
2691    Interrupt execution
2692    - usually called inside a signal handler so it mustn't do anything fancy.   
2693    ------------------------------------------------------------------------ */
2694
2695 void
2696 interruptStgRts(void)
2697 {
2698     interrupted    = 1;
2699     context_switch = 1;
2700 #ifdef RTS_SUPPORTS_THREADS
2701     wakeBlockedWorkerThread();
2702 #endif
2703 }
2704
2705 /* -----------------------------------------------------------------------------
2706    Unblock a thread
2707
2708    This is for use when we raise an exception in another thread, which
2709    may be blocked.
2710    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2711    -------------------------------------------------------------------------- */
2712
2713 #if defined(GRAN) || defined(PAR)
2714 /*
2715   NB: only the type of the blocking queue is different in GranSim and GUM
2716       the operations on the queue-elements are the same
2717       long live polymorphism!
2718
2719   Locks: sched_mutex is held upon entry and exit.
2720
2721 */
2722 static void
2723 unblockThread(StgTSO *tso)
2724 {
2725   StgBlockingQueueElement *t, **last;
2726
2727   switch (tso->why_blocked) {
2728
2729   case NotBlocked:
2730     return;  /* not blocked */
2731
2732   case BlockedOnMVar:
2733     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2734     {
2735       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2736       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2737
2738       last = (StgBlockingQueueElement **)&mvar->head;
2739       for (t = (StgBlockingQueueElement *)mvar->head; 
2740            t != END_BQ_QUEUE; 
2741            last = &t->link, last_tso = t, t = t->link) {
2742         if (t == (StgBlockingQueueElement *)tso) {
2743           *last = (StgBlockingQueueElement *)tso->link;
2744           if (mvar->tail == tso) {
2745             mvar->tail = (StgTSO *)last_tso;
2746           }
2747           goto done;
2748         }
2749       }
2750       barf("unblockThread (MVAR): TSO not found");
2751     }
2752
2753   case BlockedOnBlackHole:
2754     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2755     {
2756       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2757
2758       last = &bq->blocking_queue;
2759       for (t = bq->blocking_queue; 
2760            t != END_BQ_QUEUE; 
2761            last = &t->link, t = t->link) {
2762         if (t == (StgBlockingQueueElement *)tso) {
2763           *last = (StgBlockingQueueElement *)tso->link;
2764           goto done;
2765         }
2766       }
2767       barf("unblockThread (BLACKHOLE): TSO not found");
2768     }
2769
2770   case BlockedOnException:
2771     {
2772       StgTSO *target  = tso->block_info.tso;
2773
2774       ASSERT(get_itbl(target)->type == TSO);
2775
2776       if (target->what_next == ThreadRelocated) {
2777           target = target->link;
2778           ASSERT(get_itbl(target)->type == TSO);
2779       }
2780
2781       ASSERT(target->blocked_exceptions != NULL);
2782
2783       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2784       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2785            t != END_BQ_QUEUE; 
2786            last = &t->link, t = t->link) {
2787         ASSERT(get_itbl(t)->type == TSO);
2788         if (t == (StgBlockingQueueElement *)tso) {
2789           *last = (StgBlockingQueueElement *)tso->link;
2790           goto done;
2791         }
2792       }
2793       barf("unblockThread (Exception): TSO not found");
2794     }
2795
2796   case BlockedOnRead:
2797   case BlockedOnWrite:
2798 #if defined(mingw32_TARGET_OS)
2799   case BlockedOnDoProc:
2800 #endif
2801     {
2802       /* take TSO off blocked_queue */
2803       StgBlockingQueueElement *prev = NULL;
2804       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2805            prev = t, t = t->link) {
2806         if (t == (StgBlockingQueueElement *)tso) {
2807           if (prev == NULL) {
2808             blocked_queue_hd = (StgTSO *)t->link;
2809             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2810               blocked_queue_tl = END_TSO_QUEUE;
2811             }
2812           } else {
2813             prev->link = t->link;
2814             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2815               blocked_queue_tl = (StgTSO *)prev;
2816             }
2817           }
2818           goto done;
2819         }
2820       }
2821       barf("unblockThread (I/O): TSO not found");
2822     }
2823
2824   case BlockedOnDelay:
2825     {
2826       /* take TSO off sleeping_queue */
2827       StgBlockingQueueElement *prev = NULL;
2828       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2829            prev = t, t = t->link) {
2830         if (t == (StgBlockingQueueElement *)tso) {
2831           if (prev == NULL) {
2832             sleeping_queue = (StgTSO *)t->link;
2833           } else {
2834             prev->link = t->link;
2835           }
2836           goto done;
2837         }
2838       }
2839       barf("unblockThread (delay): TSO not found");
2840     }
2841
2842   default:
2843     barf("unblockThread");
2844   }
2845
2846  done:
2847   tso->link = END_TSO_QUEUE;
2848   tso->why_blocked = NotBlocked;
2849   tso->block_info.closure = NULL;
2850   PUSH_ON_RUN_QUEUE(tso);
2851 }
2852 #else
2853 static void
2854 unblockThread(StgTSO *tso)
2855 {
2856   StgTSO *t, **last;
2857   
2858   /* To avoid locking unnecessarily. */
2859   if (tso->why_blocked == NotBlocked) {
2860     return;
2861   }
2862
2863   switch (tso->why_blocked) {
2864
2865   case BlockedOnMVar:
2866     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2867     {
2868       StgTSO *last_tso = END_TSO_QUEUE;
2869       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2870
2871       last = &mvar->head;
2872       for (t = mvar->head; t != END_TSO_QUEUE; 
2873            last = &t->link, last_tso = t, t = t->link) {
2874         if (t == tso) {
2875           *last = tso->link;
2876           if (mvar->tail == tso) {
2877             mvar->tail = last_tso;
2878           }
2879           goto done;
2880         }
2881       }
2882       barf("unblockThread (MVAR): TSO not found");
2883     }
2884
2885   case BlockedOnBlackHole:
2886     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2887     {
2888       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2889
2890       last = &bq->blocking_queue;
2891       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2892            last = &t->link, t = t->link) {
2893         if (t == tso) {
2894           *last = tso->link;
2895           goto done;
2896         }
2897       }
2898       barf("unblockThread (BLACKHOLE): TSO not found");
2899     }
2900
2901   case BlockedOnException:
2902     {
2903       StgTSO *target  = tso->block_info.tso;
2904
2905       ASSERT(get_itbl(target)->type == TSO);
2906
2907       while (target->what_next == ThreadRelocated) {
2908           target = target->link;
2909           ASSERT(get_itbl(target)->type == TSO);
2910       }
2911       
2912       ASSERT(target->blocked_exceptions != NULL);
2913
2914       last = &target->blocked_exceptions;
2915       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2916            last = &t->link, t = t->link) {
2917         ASSERT(get_itbl(t)->type == TSO);
2918         if (t == tso) {
2919           *last = tso->link;
2920           goto done;
2921         }
2922       }
2923       barf("unblockThread (Exception): TSO not found");
2924     }
2925
2926   case BlockedOnRead:
2927   case BlockedOnWrite:
2928 #if defined(mingw32_TARGET_OS)
2929   case BlockedOnDoProc:
2930 #endif
2931     {
2932       StgTSO *prev = NULL;
2933       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2934            prev = t, t = t->link) {
2935         if (t == tso) {
2936           if (prev == NULL) {
2937             blocked_queue_hd = t->link;
2938             if (blocked_queue_tl == t) {
2939               blocked_queue_tl = END_TSO_QUEUE;
2940             }
2941           } else {
2942             prev->link = t->link;
2943             if (blocked_queue_tl == t) {
2944               blocked_queue_tl = prev;
2945             }
2946           }
2947           goto done;
2948         }
2949       }
2950       barf("unblockThread (I/O): TSO not found");
2951     }
2952
2953   case BlockedOnDelay:
2954     {
2955       StgTSO *prev = NULL;
2956       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2957            prev = t, t = t->link) {
2958         if (t == tso) {
2959           if (prev == NULL) {
2960             sleeping_queue = t->link;
2961           } else {
2962             prev->link = t->link;
2963           }
2964           goto done;
2965         }
2966       }
2967       barf("unblockThread (delay): TSO not found");
2968     }
2969
2970   default:
2971     barf("unblockThread");
2972   }
2973
2974  done:
2975   tso->link = END_TSO_QUEUE;
2976   tso->why_blocked = NotBlocked;
2977   tso->block_info.closure = NULL;
2978   PUSH_ON_RUN_QUEUE(tso);
2979 }
2980 #endif
2981
2982 /* -----------------------------------------------------------------------------
2983  * raiseAsync()
2984  *
2985  * The following function implements the magic for raising an
2986  * asynchronous exception in an existing thread.
2987  *
2988  * We first remove the thread from any queue on which it might be
2989  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2990  *
2991  * We strip the stack down to the innermost CATCH_FRAME, building
2992  * thunks in the heap for all the active computations, so they can 
2993  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2994  * an application of the handler to the exception, and push it on
2995  * the top of the stack.
2996  * 
2997  * How exactly do we save all the active computations?  We create an
2998  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2999  * AP_STACKs pushes everything from the corresponding update frame
3000  * upwards onto the stack.  (Actually, it pushes everything up to the
3001  * next update frame plus a pointer to the next AP_STACK object.
3002  * Entering the next AP_STACK object pushes more onto the stack until we
3003  * reach the last AP_STACK object - at which point the stack should look
3004  * exactly as it did when we killed the TSO and we can continue
3005  * execution by entering the closure on top of the stack.
3006  *
3007  * We can also kill a thread entirely - this happens if either (a) the 
3008  * exception passed to raiseAsync is NULL, or (b) there's no
3009  * CATCH_FRAME on the stack.  In either case, we strip the entire
3010  * stack and replace the thread with a zombie.
3011  *
3012  * Locks: sched_mutex held upon entry nor exit.
3013  *
3014  * -------------------------------------------------------------------------- */
3015  
3016 void 
3017 deleteThread(StgTSO *tso)
3018 {
3019   raiseAsync(tso,NULL);
3020 }
3021
3022 static void 
3023 deleteThreadImmediately(StgTSO *tso)
3024 { // for forkProcess only:
3025   // delete thread without giving it a chance to catch the KillThread exception
3026
3027   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3028       return;
3029   }
3030 #if defined(RTS_SUPPORTS_THREADS)
3031   if (tso->why_blocked != BlockedOnCCall
3032       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3033 #endif
3034     unblockThread(tso);
3035   tso->what_next = ThreadKilled;
3036 }
3037
3038 void
3039 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3040 {
3041   /* When raising async exs from contexts where sched_mutex isn't held;
3042      use raiseAsyncWithLock(). */
3043   ACQUIRE_LOCK(&sched_mutex);
3044   raiseAsync(tso,exception);
3045   RELEASE_LOCK(&sched_mutex);
3046 }
3047
3048 void
3049 raiseAsync(StgTSO *tso, StgClosure *exception)
3050 {
3051     StgRetInfoTable *info;
3052     StgPtr sp;
3053   
3054     // Thread already dead?
3055     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3056         return;
3057     }
3058
3059     IF_DEBUG(scheduler, 
3060              sched_belch("raising exception in thread %ld.", tso->id));
3061     
3062     // Remove it from any blocking queues
3063     unblockThread(tso);
3064
3065     sp = tso->sp;
3066     
3067     // The stack freezing code assumes there's a closure pointer on
3068     // the top of the stack, so we have to arrange that this is the case...
3069     //
3070     if (sp[0] == (W_)&stg_enter_info) {
3071         sp++;
3072     } else {
3073         sp--;
3074         sp[0] = (W_)&stg_dummy_ret_closure;
3075     }
3076
3077     while (1) {
3078         nat i;
3079
3080         // 1. Let the top of the stack be the "current closure"
3081         //
3082         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3083         // CATCH_FRAME.
3084         //
3085         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3086         // current closure applied to the chunk of stack up to (but not
3087         // including) the update frame.  This closure becomes the "current
3088         // closure".  Go back to step 2.
3089         //
3090         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3091         // top of the stack applied to the exception.
3092         // 
3093         // 5. If it's a STOP_FRAME, then kill the thread.
3094         
3095         StgPtr frame;
3096         
3097         frame = sp + 1;
3098         info = get_ret_itbl((StgClosure *)frame);
3099         
3100         while (info->i.type != UPDATE_FRAME
3101                && (info->i.type != CATCH_FRAME || exception == NULL)
3102                && info->i.type != STOP_FRAME) {
3103             frame += stack_frame_sizeW((StgClosure *)frame);
3104             info = get_ret_itbl((StgClosure *)frame);
3105         }
3106         
3107         switch (info->i.type) {
3108             
3109         case CATCH_FRAME:
3110             // If we find a CATCH_FRAME, and we've got an exception to raise,
3111             // then build the THUNK raise(exception), and leave it on
3112             // top of the CATCH_FRAME ready to enter.
3113             //
3114         {
3115 #ifdef PROFILING
3116             StgCatchFrame *cf = (StgCatchFrame *)frame;
3117 #endif
3118             StgClosure *raise;
3119             
3120             // we've got an exception to raise, so let's pass it to the
3121             // handler in this frame.
3122             //
3123             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3124             TICK_ALLOC_SE_THK(1,0);
3125             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3126             raise->payload[0] = exception;
3127             
3128             // throw away the stack from Sp up to the CATCH_FRAME.
3129             //
3130             sp = frame - 1;
3131             
3132             /* Ensure that async excpetions are blocked now, so we don't get
3133              * a surprise exception before we get around to executing the
3134              * handler.
3135              */
3136             if (tso->blocked_exceptions == NULL) {
3137                 tso->blocked_exceptions = END_TSO_QUEUE;
3138             }
3139             
3140             /* Put the newly-built THUNK on top of the stack, ready to execute
3141              * when the thread restarts.
3142              */
3143             sp[0] = (W_)raise;
3144             sp[-1] = (W_)&stg_enter_info;
3145             tso->sp = sp-1;
3146             tso->what_next = ThreadRunGHC;
3147             IF_DEBUG(sanity, checkTSO(tso));
3148             return;
3149         }
3150         
3151         case UPDATE_FRAME:
3152         {
3153             StgAP_STACK * ap;
3154             nat words;
3155             
3156             // First build an AP_STACK consisting of the stack chunk above the
3157             // current update frame, with the top word on the stack as the
3158             // fun field.
3159             //
3160             words = frame - sp - 1;
3161             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3162             
3163             ap->size = words;
3164             ap->fun  = (StgClosure *)sp[0];
3165             sp++;
3166             for(i=0; i < (nat)words; ++i) {
3167                 ap->payload[i] = (StgClosure *)*sp++;
3168             }
3169             
3170             SET_HDR(ap,&stg_AP_STACK_info,
3171                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3172             TICK_ALLOC_UP_THK(words+1,0);
3173             
3174             IF_DEBUG(scheduler,
3175                      fprintf(stderr,  "sched: Updating ");
3176                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3177                      fprintf(stderr,  " with ");
3178                      printObj((StgClosure *)ap);
3179                 );
3180
3181             // Replace the updatee with an indirection - happily
3182             // this will also wake up any threads currently
3183             // waiting on the result.
3184             //
3185             // Warning: if we're in a loop, more than one update frame on
3186             // the stack may point to the same object.  Be careful not to
3187             // overwrite an IND_OLDGEN in this case, because we'll screw
3188             // up the mutable lists.  To be on the safe side, don't
3189             // overwrite any kind of indirection at all.  See also
3190             // threadSqueezeStack in GC.c, where we have to make a similar
3191             // check.
3192             //
3193             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3194                 // revert the black hole
3195                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3196             }
3197             sp += sizeofW(StgUpdateFrame) - 1;
3198             sp[0] = (W_)ap; // push onto stack
3199             break;
3200         }
3201         
3202         case STOP_FRAME:
3203             // We've stripped the entire stack, the thread is now dead.
3204             sp += sizeofW(StgStopFrame);
3205             tso->what_next = ThreadKilled;
3206             tso->sp = sp;
3207             return;
3208             
3209         default:
3210             barf("raiseAsync");
3211         }
3212     }
3213     barf("raiseAsync");
3214 }
3215
3216 /* -----------------------------------------------------------------------------
3217    resurrectThreads is called after garbage collection on the list of
3218    threads found to be garbage.  Each of these threads will be woken
3219    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3220    on an MVar, or NonTermination if the thread was blocked on a Black
3221    Hole.
3222
3223    Locks: sched_mutex isn't held upon entry nor exit.
3224    -------------------------------------------------------------------------- */
3225
3226 void
3227 resurrectThreads( StgTSO *threads )
3228 {
3229   StgTSO *tso, *next;
3230
3231   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3232     next = tso->global_link;
3233     tso->global_link = all_threads;
3234     all_threads = tso;
3235     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3236
3237     switch (tso->why_blocked) {
3238     case BlockedOnMVar:
3239     case BlockedOnException:
3240       /* Called by GC - sched_mutex lock is currently held. */
3241       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3242       break;
3243     case BlockedOnBlackHole:
3244       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3245       break;
3246     case NotBlocked:
3247       /* This might happen if the thread was blocked on a black hole
3248        * belonging to a thread that we've just woken up (raiseAsync
3249        * can wake up threads, remember...).
3250        */
3251       continue;
3252     default:
3253       barf("resurrectThreads: thread blocked in a strange way");
3254     }
3255   }
3256 }
3257
3258 /* -----------------------------------------------------------------------------
3259  * Blackhole detection: if we reach a deadlock, test whether any
3260  * threads are blocked on themselves.  Any threads which are found to
3261  * be self-blocked get sent a NonTermination exception.
3262  *
3263  * This is only done in a deadlock situation in order to avoid
3264  * performance overhead in the normal case.
3265  *
3266  * Locks: sched_mutex is held upon entry and exit.
3267  * -------------------------------------------------------------------------- */
3268
3269 static void
3270 detectBlackHoles( void )
3271 {
3272     StgTSO *tso = all_threads;
3273     StgClosure *frame;
3274     StgClosure *blocked_on;
3275     StgRetInfoTable *info;
3276
3277     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3278
3279         while (tso->what_next == ThreadRelocated) {
3280             tso = tso->link;
3281             ASSERT(get_itbl(tso)->type == TSO);
3282         }
3283       
3284         if (tso->why_blocked != BlockedOnBlackHole) {
3285             continue;
3286         }
3287         blocked_on = tso->block_info.closure;
3288
3289         frame = (StgClosure *)tso->sp;
3290
3291         while(1) {
3292             info = get_ret_itbl(frame);
3293             switch (info->i.type) {
3294             case UPDATE_FRAME:
3295                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3296                     /* We are blocking on one of our own computations, so
3297                      * send this thread the NonTermination exception.  
3298                      */
3299                     IF_DEBUG(scheduler, 
3300                              sched_belch("thread %d is blocked on itself", tso->id));
3301                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3302                     goto done;
3303                 }
3304                 
3305                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3306                 continue;
3307
3308             case STOP_FRAME:
3309                 goto done;
3310
3311                 // normal stack frames; do nothing except advance the pointer
3312             default:
3313                 (StgPtr)frame += stack_frame_sizeW(frame);
3314             }
3315         }   
3316         done: ;
3317     }
3318 }
3319
3320 /* ----------------------------------------------------------------------------
3321  * Debugging: why is a thread blocked
3322  * [Also provides useful information when debugging threaded programs
3323  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3324    ------------------------------------------------------------------------- */
3325
3326 static
3327 void
3328 printThreadBlockage(StgTSO *tso)
3329 {
3330   switch (tso->why_blocked) {
3331   case BlockedOnRead:
3332     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3333     break;
3334   case BlockedOnWrite:
3335     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3336     break;
3337 #if defined(mingw32_TARGET_OS)
3338     case BlockedOnDoProc:
3339     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3340     break;
3341 #endif
3342   case BlockedOnDelay:
3343     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3344     break;
3345   case BlockedOnMVar:
3346     fprintf(stderr,"is blocked on an MVar");
3347     break;
3348   case BlockedOnException:
3349     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3350             tso->block_info.tso->id);
3351     break;
3352   case BlockedOnBlackHole:
3353     fprintf(stderr,"is blocked on a black hole");
3354     break;
3355   case NotBlocked:
3356     fprintf(stderr,"is not blocked");
3357     break;
3358 #if defined(PAR)
3359   case BlockedOnGA:
3360     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3361             tso->block_info.closure, info_type(tso->block_info.closure));
3362     break;
3363   case BlockedOnGA_NoSend:
3364     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3365             tso->block_info.closure, info_type(tso->block_info.closure));
3366     break;
3367 #endif
3368 #if defined(RTS_SUPPORTS_THREADS)
3369   case BlockedOnCCall:
3370     fprintf(stderr,"is blocked on an external call");
3371     break;
3372   case BlockedOnCCall_NoUnblockExc:
3373     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3374     break;
3375 #endif
3376   default:
3377     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3378          tso->why_blocked, tso->id, tso);
3379   }
3380 }
3381
3382 static
3383 void
3384 printThreadStatus(StgTSO *tso)
3385 {
3386   switch (tso->what_next) {
3387   case ThreadKilled:
3388     fprintf(stderr,"has been killed");
3389     break;
3390   case ThreadComplete:
3391     fprintf(stderr,"has completed");
3392     break;
3393   default:
3394     printThreadBlockage(tso);
3395   }
3396 }
3397
3398 void
3399 printAllThreads(void)
3400 {
3401   StgTSO *t;
3402   void *label;
3403
3404 # if defined(GRAN)
3405   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3406   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3407                        time_string, rtsFalse/*no commas!*/);
3408
3409   fprintf(stderr, "all threads at [%s]:\n", time_string);
3410 # elif defined(PAR)
3411   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3412   ullong_format_string(CURRENT_TIME,
3413                        time_string, rtsFalse/*no commas!*/);
3414
3415   fprintf(stderr,"all threads at [%s]:\n", time_string);
3416 # else
3417   fprintf(stderr,"all threads:\n");
3418 # endif
3419
3420   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3421     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3422     label = lookupThreadLabel(t->id);
3423     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3424     printThreadStatus(t);
3425     fprintf(stderr,"\n");
3426   }
3427 }
3428     
3429 #ifdef DEBUG
3430
3431 /* 
3432    Print a whole blocking queue attached to node (debugging only).
3433 */
3434 # if defined(PAR)
3435 void 
3436 print_bq (StgClosure *node)
3437 {
3438   StgBlockingQueueElement *bqe;
3439   StgTSO *tso;
3440   rtsBool end;
3441
3442   fprintf(stderr,"## BQ of closure %p (%s): ",
3443           node, info_type(node));
3444
3445   /* should cover all closures that may have a blocking queue */
3446   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3447          get_itbl(node)->type == FETCH_ME_BQ ||
3448          get_itbl(node)->type == RBH ||
3449          get_itbl(node)->type == MVAR);
3450     
3451   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3452
3453   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3454 }
3455
3456 /* 
3457    Print a whole blocking queue starting with the element bqe.
3458 */
3459 void 
3460 print_bqe (StgBlockingQueueElement *bqe)
3461 {
3462   rtsBool end;
3463
3464   /* 
3465      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3466   */
3467   for (end = (bqe==END_BQ_QUEUE);
3468        !end; // iterate until bqe points to a CONSTR
3469        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3470        bqe = end ? END_BQ_QUEUE : bqe->link) {
3471     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3472     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3473     /* types of closures that may appear in a blocking queue */
3474     ASSERT(get_itbl(bqe)->type == TSO ||           
3475            get_itbl(bqe)->type == BLOCKED_FETCH || 
3476            get_itbl(bqe)->type == CONSTR); 
3477     /* only BQs of an RBH end with an RBH_Save closure */
3478     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3479
3480     switch (get_itbl(bqe)->type) {
3481     case TSO:
3482       fprintf(stderr," TSO %u (%x),",
3483               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3484       break;
3485     case BLOCKED_FETCH:
3486       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3487               ((StgBlockedFetch *)bqe)->node, 
3488               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3489               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3490               ((StgBlockedFetch *)bqe)->ga.weight);
3491       break;
3492     case CONSTR:
3493       fprintf(stderr," %s (IP %p),",
3494               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3495                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3496                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3497                "RBH_Save_?"), get_itbl(bqe));
3498       break;
3499     default:
3500       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3501            info_type((StgClosure *)bqe)); // , node, info_type(node));
3502       break;
3503     }
3504   } /* for */
3505   fputc('\n', stderr);
3506 }
3507 # elif defined(GRAN)
3508 void 
3509 print_bq (StgClosure *node)
3510 {
3511   StgBlockingQueueElement *bqe;
3512   PEs node_loc, tso_loc;
3513   rtsBool end;
3514
3515   /* should cover all closures that may have a blocking queue */
3516   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3517          get_itbl(node)->type == FETCH_ME_BQ ||
3518          get_itbl(node)->type == RBH);
3519     
3520   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3521   node_loc = where_is(node);
3522
3523   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3524           node, info_type(node), node_loc);
3525
3526   /* 
3527      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3528   */
3529   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3530        !end; // iterate until bqe points to a CONSTR
3531        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3532     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3533     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3534     /* types of closures that may appear in a blocking queue */
3535     ASSERT(get_itbl(bqe)->type == TSO ||           
3536            get_itbl(bqe)->type == CONSTR); 
3537     /* only BQs of an RBH end with an RBH_Save closure */
3538     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3539
3540     tso_loc = where_is((StgClosure *)bqe);
3541     switch (get_itbl(bqe)->type) {
3542     case TSO:
3543       fprintf(stderr," TSO %d (%p) on [PE %d],",
3544               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3545       break;
3546     case CONSTR:
3547       fprintf(stderr," %s (IP %p),",
3548               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3549                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3550                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3551                "RBH_Save_?"), get_itbl(bqe));
3552       break;
3553     default:
3554       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3555            info_type((StgClosure *)bqe), node, info_type(node));
3556       break;
3557     }
3558   } /* for */
3559   fputc('\n', stderr);
3560 }
3561 #else
3562 /* 
3563    Nice and easy: only TSOs on the blocking queue
3564 */
3565 void 
3566 print_bq (StgClosure *node)
3567 {
3568   StgTSO *tso;
3569
3570   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3571   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3572        tso != END_TSO_QUEUE; 
3573        tso=tso->link) {
3574     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3575     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3576     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3577   }
3578   fputc('\n', stderr);
3579 }
3580 # endif
3581
3582 #if defined(PAR)
3583 static nat
3584 run_queue_len(void)
3585 {
3586   nat i;
3587   StgTSO *tso;
3588
3589   for (i=0, tso=run_queue_hd; 
3590        tso != END_TSO_QUEUE;
3591        i++, tso=tso->link)
3592     /* nothing */
3593
3594   return i;
3595 }
3596 #endif
3597
3598 void
3599 sched_belch(char *s, ...)
3600 {
3601   va_list ap;
3602   va_start(ap,s);
3603 #ifdef RTS_SUPPORTS_THREADS
3604   fprintf(stderr, "sched (task %p): ", osThreadId());
3605 #elif defined(PAR)
3606   fprintf(stderr, "== ");
3607 #else
3608   fprintf(stderr, "sched: ");
3609 #endif
3610   vfprintf(stderr, s, ap);
3611   fprintf(stderr, "\n");
3612   fflush(stderr);
3613   va_end(ap);
3614 }
3615
3616 #endif /* DEBUG */