bc091be70b17fbfff773fe5cd7bad7f9f3c73114
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.183 2003/12/16 13:27:32 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2003
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distrib. memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 /* 
21  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
22
23    The main scheduling loop in GUM iterates until a finish message is received.
24    In that case a global flag @receivedFinish@ is set and this instance of
25    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
26    for the handling of incoming messages, such as PP_FINISH.
27    Note that in the parallel case we have a system manager that coordinates
28    different PEs, each of which are running one instance of the RTS.
29    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
30    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
31
32  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
33
34    The main scheduling code in GranSim is quite different from that in std
35    (concurrent) Haskell: while concurrent Haskell just iterates over the
36    threads in the runnable queue, GranSim is event driven, i.e. it iterates
37    over the events in the global event queue.  -- HWL
38 */
39
40 #include "PosixSource.h"
41 #include "Rts.h"
42 #include "SchedAPI.h"
43 #include "RtsUtils.h"
44 #include "RtsFlags.h"
45 #include "Storage.h"
46 #include "StgRun.h"
47 #include "StgStartup.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Storage.h"
53 #include "Interpreter.h"
54 #include "Exception.h"
55 #include "Printer.h"
56 #include "Signals.h"
57 #include "Sanity.h"
58 #include "Stats.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #ifdef PROFILING
63 #include "Proftimer.h"
64 #include "ProfHeap.h"
65 #endif
66 #if defined(GRAN) || defined(PAR)
67 # include "GranSimRts.h"
68 # include "GranSim.h"
69 # include "ParallelRts.h"
70 # include "Parallel.h"
71 # include "ParallelDebug.h"
72 # include "FetchMe.h"
73 # include "HLC.h"
74 #endif
75 #include "Sparks.h"
76 #include "Capability.h"
77 #include "OSThreads.h"
78 #include  "Task.h"
79
80 #ifdef HAVE_SYS_TYPES_H
81 #include <sys/types.h>
82 #endif
83 #ifdef HAVE_UNISTD_H
84 #include <unistd.h>
85 #endif
86
87 #include <string.h>
88 #include <stdlib.h>
89 #include <stdarg.h>
90
91 #ifdef HAVE_ERRNO_H
92 #include <errno.h>
93 #endif
94
95 #ifdef THREADED_RTS
96 #define USED_IN_THREADED_RTS
97 #else
98 #define USED_IN_THREADED_RTS STG_UNUSED
99 #endif
100
101 #ifdef RTS_SUPPORTS_THREADS
102 #define USED_WHEN_RTS_SUPPORTS_THREADS
103 #else
104 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
105 #endif
106
107 /* Main thread queue.
108  * Locks required: sched_mutex.
109  */
110 StgMainThread *main_threads = NULL;
111
112 /* Thread queues.
113  * Locks required: sched_mutex.
114  */
115 #if defined(GRAN)
116
117 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
118 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
119
120 /* 
121    In GranSim we have a runnable and a blocked queue for each processor.
122    In order to minimise code changes new arrays run_queue_hds/tls
123    are created. run_queue_hd is then a short cut (macro) for
124    run_queue_hds[CurrentProc] (see GranSim.h).
125    -- HWL
126 */
127 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
128 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
129 StgTSO *ccalling_threadss[MAX_PROC];
130 /* We use the same global list of threads (all_threads) in GranSim as in
131    the std RTS (i.e. we are cheating). However, we don't use this list in
132    the GranSim specific code at the moment (so we are only potentially
133    cheating).  */
134
135 #else /* !GRAN */
136
137 StgTSO *run_queue_hd = NULL;
138 StgTSO *run_queue_tl = NULL;
139 StgTSO *blocked_queue_hd = NULL;
140 StgTSO *blocked_queue_tl = NULL;
141 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
142
143 #endif
144
145 /* Linked list of all threads.
146  * Used for detecting garbage collected threads.
147  */
148 StgTSO *all_threads = NULL;
149
150 /* When a thread performs a safe C call (_ccall_GC, using old
151  * terminology), it gets put on the suspended_ccalling_threads
152  * list. Used by the garbage collector.
153  */
154 static StgTSO *suspended_ccalling_threads;
155
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 nat context_switch = 0;
165
166 /* if this flag is set as well, give up execution */
167 rtsBool interrupted = rtsFalse;
168
169 /* Next thread ID to allocate.
170  * Locks required: thread_id_mutex
171  */
172 static StgThreadID next_thread_id = 1;
173
174 /*
175  * Pointers to the state of the current thread.
176  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
177  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
178  */
179  
180 /* The smallest stack size that makes any sense is:
181  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
182  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
183  *  + 1                       (the closure to enter)
184  *  + 1                       (stg_ap_v_ret)
185  *  + 1                       (spare slot req'd by stg_ap_v_ret)
186  *
187  * A thread with this stack will bomb immediately with a stack
188  * overflow, which will increase its stack size.  
189  */
190
191 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
192
193
194 #if defined(GRAN)
195 StgTSO *CurrentTSO;
196 #endif
197
198 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
199  *  exists - earlier gccs apparently didn't.
200  *  -= chak
201  */
202 StgTSO dummy_tso;
203
204 static rtsBool ready_to_gc;
205
206 /*
207  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
208  * in an MT setting, needed to signal that a worker thread shouldn't hang around
209  * in the scheduler when it is out of work.
210  */
211 static rtsBool shutting_down_scheduler = rtsFalse;
212
213 void            addToBlockedQueue ( StgTSO *tso );
214
215 static void     schedule          ( StgMainThread *mainThread, Capability *initialCapability );
216        void     interruptStgRts   ( void );
217
218 static void     detectBlackHoles  ( void );
219
220 #if defined(RTS_SUPPORTS_THREADS)
221 /* ToDo: carefully document the invariants that go together
222  *       with these synchronisation objects.
223  */
224 Mutex     sched_mutex       = INIT_MUTEX_VAR;
225 Mutex     term_mutex        = INIT_MUTEX_VAR;
226
227 /*
228  * A heavyweight solution to the problem of protecting
229  * the thread_id from concurrent update.
230  */
231 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
232
233 #endif /* RTS_SUPPORTS_THREADS */
234
235 #if defined(PAR)
236 StgTSO *LastTSO;
237 rtsTime TimeOfLastYield;
238 rtsBool emitSchedule = rtsTrue;
239 #endif
240
241 #if DEBUG
242 static char *whatNext_strs[] = {
243   "ThreadRunGHC",
244   "ThreadInterpret",
245   "ThreadKilled",
246   "ThreadRelocated",
247   "ThreadComplete"
248 };
249 #endif
250
251 #if defined(PAR)
252 StgTSO * createSparkThread(rtsSpark spark);
253 StgTSO * activateSpark (rtsSpark spark);  
254 #endif
255
256 /* ----------------------------------------------------------------------------
257  * Starting Tasks
258  * ------------------------------------------------------------------------- */
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 static rtsBool startingWorkerThread = rtsFalse;
262
263 static void taskStart(void);
264 static void
265 taskStart(void)
266 {
267   ACQUIRE_LOCK(&sched_mutex);
268   schedule(NULL,NULL);
269   RELEASE_LOCK(&sched_mutex);
270 }
271
272 void
273 startSchedulerTaskIfNecessary(void)
274 {
275   if(run_queue_hd != END_TSO_QUEUE
276     || blocked_queue_hd != END_TSO_QUEUE
277     || sleeping_queue != END_TSO_QUEUE)
278   {
279     if(!startingWorkerThread)
280     { // we don't want to start another worker thread
281       // just because the last one hasn't yet reached the
282       // "waiting for capability" state
283       startingWorkerThread = rtsTrue;
284       startTask(taskStart);
285     }
286   }
287 }
288 #endif
289
290 /* ---------------------------------------------------------------------------
291    Main scheduling loop.
292
293    We use round-robin scheduling, each thread returning to the
294    scheduler loop when one of these conditions is detected:
295
296       * out of heap space
297       * timer expires (thread yields)
298       * thread blocks
299       * thread ends
300       * stack overflow
301
302    Locking notes:  we acquire the scheduler lock once at the beginning
303    of the scheduler loop, and release it when
304     
305       * running a thread, or
306       * waiting for work, or
307       * waiting for a GC to complete.
308
309    GRAN version:
310      In a GranSim setup this loop iterates over the global event queue.
311      This revolves around the global event queue, which determines what 
312      to do next. Therefore, it's more complicated than either the 
313      concurrent or the parallel (GUM) setup.
314
315    GUM version:
316      GUM iterates over incoming messages.
317      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
318      and sends out a fish whenever it has nothing to do; in-between
319      doing the actual reductions (shared code below) it processes the
320      incoming messages and deals with delayed operations 
321      (see PendingFetches).
322      This is not the ugliest code you could imagine, but it's bloody close.
323
324    ------------------------------------------------------------------------ */
325 static void
326 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
327           Capability *initialCapability )
328 {
329   StgTSO *t;
330   Capability *cap = initialCapability;
331   StgThreadReturnCode ret;
332 #if defined(GRAN)
333   rtsEvent *event;
334 #elif defined(PAR)
335   StgSparkPool *pool;
336   rtsSpark spark;
337   StgTSO *tso;
338   GlobalTaskId pe;
339   rtsBool receivedFinish = rtsFalse;
340 # if defined(DEBUG)
341   nat tp_size, sp_size; // stats only
342 # endif
343 #endif
344   rtsBool was_interrupted = rtsFalse;
345   StgTSOWhatNext prev_what_next;
346   
347   // Pre-condition: sched_mutex is held.
348  
349 #if defined(RTS_SUPPORTS_THREADS)
350   //
351   // in the threaded case, the capability is either passed in via the
352   // initialCapability parameter, or initialized inside the scheduler
353   // loop 
354   //
355   IF_DEBUG(scheduler,
356            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
357                        mainThread, initialCapability);
358       );
359 #else
360   // simply initialise it in the non-threaded case
361   grabCapability(&cap);
362 #endif
363
364 #if defined(GRAN)
365   /* set up first event to get things going */
366   /* ToDo: assign costs for system setup and init MainTSO ! */
367   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
368             ContinueThread, 
369             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
370
371   IF_DEBUG(gran,
372            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
373            G_TSO(CurrentTSO, 5));
374
375   if (RtsFlags.GranFlags.Light) {
376     /* Save current time; GranSim Light only */
377     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
378   }      
379
380   event = get_next_event();
381
382   while (event!=(rtsEvent*)NULL) {
383     /* Choose the processor with the next event */
384     CurrentProc = event->proc;
385     CurrentTSO = event->tso;
386
387 #elif defined(PAR)
388
389   while (!receivedFinish) {    /* set by processMessages */
390                                /* when receiving PP_FINISH message         */ 
391
392 #else // everything except GRAN and PAR
393
394   while (1) {
395
396 #endif
397
398      IF_DEBUG(scheduler, printAllThreads());
399
400 #if defined(RTS_SUPPORTS_THREADS)
401       // Yield the capability to higher-priority tasks if necessary.
402       //
403       if (cap != NULL) {
404           yieldCapability(&cap);
405       }
406
407       // If we do not currently hold a capability, we wait for one
408       //
409       if (cap == NULL) {
410           waitForCapability(&sched_mutex, &cap,
411                             mainThread ? &mainThread->bound_thread_cond : NULL);
412       }
413
414       // We now have a capability...
415 #endif
416
417     //
418     // If we're interrupted (the user pressed ^C, or some other
419     // termination condition occurred), kill all the currently running
420     // threads.
421     //
422     if (interrupted) {
423         IF_DEBUG(scheduler, sched_belch("interrupted"));
424         interrupted = rtsFalse;
425         was_interrupted = rtsTrue;
426 #if defined(RTS_SUPPORTS_THREADS)
427         // In the threaded RTS, deadlock detection doesn't work,
428         // so just exit right away.
429         prog_belch("interrupted");
430         releaseCapability(cap);
431         RELEASE_LOCK(&sched_mutex);
432         shutdownHaskellAndExit(EXIT_SUCCESS);
433 #else
434         deleteAllThreads();
435 #endif
436     }
437
438     //
439     // Go through the list of main threads and wake up any
440     // clients whose computations have finished.  ToDo: this
441     // should be done more efficiently without a linear scan
442     // of the main threads list, somehow...
443     //
444 #if defined(RTS_SUPPORTS_THREADS)
445     { 
446         StgMainThread *m, **prev;
447         prev = &main_threads;
448         for (m = main_threads; m != NULL; prev = &m->link, m = m->link) {
449           if (m->tso->what_next == ThreadComplete
450               || m->tso->what_next == ThreadKilled)
451           {
452             if (m == mainThread)
453             {
454               if (m->tso->what_next == ThreadComplete)
455               {
456                 if (m->ret)
457                 {
458                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
459                   *(m->ret) = (StgClosure *)m->tso->sp[1]; 
460                 }
461                 m->stat = Success;
462               }
463               else
464               {
465                 if (m->ret)
466                 {
467                   *(m->ret) = NULL;
468                 }
469                 if (was_interrupted)
470                 {
471                   m->stat = Interrupted;
472                 }
473                 else
474                 {
475                   m->stat = Killed;
476                 }
477               }
478               *prev = m->link;
479             
480 #ifdef DEBUG
481               removeThreadLabel((StgWord)m->tso->id);
482 #endif
483               releaseCapability(cap);
484               return;
485             }
486             else
487             {
488                 // The current OS thread can not handle the fact that
489                 // the Haskell thread "m" has ended.  "m" is bound;
490                 // the scheduler loop in it's bound OS thread has to
491                 // return, so let's pass our capability directly to
492                 // that thread.
493                 passCapability(&m->bound_thread_cond);
494             }
495           }
496         }
497     }
498     
499 #else /* not threaded */
500
501 # if defined(PAR)
502     /* in GUM do this only on the Main PE */
503     if (IAmMainThread)
504 # endif
505     /* If our main thread has finished or been killed, return.
506      */
507     {
508       StgMainThread *m = main_threads;
509       if (m->tso->what_next == ThreadComplete
510           || m->tso->what_next == ThreadKilled) {
511 #ifdef DEBUG
512         removeThreadLabel((StgWord)m->tso->id);
513 #endif
514         main_threads = main_threads->link;
515         if (m->tso->what_next == ThreadComplete) {
516             // We finished successfully, fill in the return value
517             // NOTE: return val is tso->sp[1] (see StgStartup.hc)
518             if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[1]; };
519             m->stat = Success;
520             return;
521         } else {
522           if (m->ret) { *(m->ret) = NULL; };
523           if (was_interrupted) {
524             m->stat = Interrupted;
525           } else {
526             m->stat = Killed;
527           }
528           return;
529         }
530       }
531     }
532 #endif
533
534
535 #if defined(RTS_USER_SIGNALS)
536     // check for signals each time around the scheduler
537     if (signals_pending()) {
538       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
539       startSignalHandlers();
540       ACQUIRE_LOCK(&sched_mutex);
541     }
542 #endif
543
544     /* Check whether any waiting threads need to be woken up.  If the
545      * run queue is empty, and there are no other tasks running, we
546      * can wait indefinitely for something to happen.
547      */
548     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) 
549 #if defined(RTS_SUPPORTS_THREADS)
550                 || EMPTY_RUN_QUEUE()
551 #endif
552         )
553     {
554       awaitEvent( EMPTY_RUN_QUEUE() );
555     }
556     /* we can be interrupted while waiting for I/O... */
557     if (interrupted) continue;
558
559     /* 
560      * Detect deadlock: when we have no threads to run, there are no
561      * threads waiting on I/O or sleeping, and all the other tasks are
562      * waiting for work, we must have a deadlock of some description.
563      *
564      * We first try to find threads blocked on themselves (ie. black
565      * holes), and generate NonTermination exceptions where necessary.
566      *
567      * If no threads are black holed, we have a deadlock situation, so
568      * inform all the main threads.
569      */
570 #if !defined(PAR) && !defined(RTS_SUPPORTS_THREADS)
571     if (   EMPTY_THREAD_QUEUES() )
572     {
573         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
574         // Garbage collection can release some new threads due to
575         // either (a) finalizers or (b) threads resurrected because
576         // they are about to be send BlockedOnDeadMVar.  Any threads
577         // thus released will be immediately runnable.
578         GarbageCollect(GetRoots,rtsTrue);
579
580         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
581
582         IF_DEBUG(scheduler, 
583                  sched_belch("still deadlocked, checking for black holes..."));
584         detectBlackHoles();
585
586         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
587
588 #if defined(RTS_USER_SIGNALS)
589         /* If we have user-installed signal handlers, then wait
590          * for signals to arrive rather then bombing out with a
591          * deadlock.
592          */
593         if ( anyUserHandlers() ) {
594             IF_DEBUG(scheduler, 
595                      sched_belch("still deadlocked, waiting for signals..."));
596
597             awaitUserSignals();
598
599             // we might be interrupted...
600             if (interrupted) { continue; }
601
602             if (signals_pending()) {
603                 RELEASE_LOCK(&sched_mutex);
604                 startSignalHandlers();
605                 ACQUIRE_LOCK(&sched_mutex);
606             }
607             ASSERT(!EMPTY_RUN_QUEUE());
608             goto not_deadlocked;
609         }
610 #endif
611
612         /* Probably a real deadlock.  Send the current main thread the
613          * Deadlock exception (or in the SMP build, send *all* main
614          * threads the deadlock exception, since none of them can make
615          * progress).
616          */
617         {
618             StgMainThread *m;
619             m = main_threads;
620             switch (m->tso->why_blocked) {
621             case BlockedOnBlackHole:
622                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
623                 break;
624             case BlockedOnException:
625             case BlockedOnMVar:
626                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
627                 break;
628             default:
629                 barf("deadlock: main thread blocked in a strange way");
630             }
631         }
632     }
633   not_deadlocked:
634
635 #elif defined(RTS_SUPPORTS_THREADS)
636     // ToDo: add deadlock detection in threaded RTS
637 #elif defined(PAR)
638     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
639 #endif
640
641 #if defined(RTS_SUPPORTS_THREADS)
642     if ( EMPTY_RUN_QUEUE() ) {
643       continue; // nothing to do
644     }
645 #endif
646
647 #if defined(GRAN)
648     if (RtsFlags.GranFlags.Light)
649       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
650
651     /* adjust time based on time-stamp */
652     if (event->time > CurrentTime[CurrentProc] &&
653         event->evttype != ContinueThread)
654       CurrentTime[CurrentProc] = event->time;
655     
656     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
657     if (!RtsFlags.GranFlags.Light)
658       handleIdlePEs();
659
660     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
661
662     /* main event dispatcher in GranSim */
663     switch (event->evttype) {
664       /* Should just be continuing execution */
665     case ContinueThread:
666       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
667       /* ToDo: check assertion
668       ASSERT(run_queue_hd != (StgTSO*)NULL &&
669              run_queue_hd != END_TSO_QUEUE);
670       */
671       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
672       if (!RtsFlags.GranFlags.DoAsyncFetch &&
673           procStatus[CurrentProc]==Fetching) {
674         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
675               CurrentTSO->id, CurrentTSO, CurrentProc);
676         goto next_thread;
677       } 
678       /* Ignore ContinueThreads for completed threads */
679       if (CurrentTSO->what_next == ThreadComplete) {
680         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
681               CurrentTSO->id, CurrentTSO, CurrentProc);
682         goto next_thread;
683       } 
684       /* Ignore ContinueThreads for threads that are being migrated */
685       if (PROCS(CurrentTSO)==Nowhere) { 
686         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
687               CurrentTSO->id, CurrentTSO, CurrentProc);
688         goto next_thread;
689       }
690       /* The thread should be at the beginning of the run queue */
691       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
692         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
693               CurrentTSO->id, CurrentTSO, CurrentProc);
694         break; // run the thread anyway
695       }
696       /*
697       new_event(proc, proc, CurrentTime[proc],
698                 FindWork,
699                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
700       goto next_thread; 
701       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
702       break; // now actually run the thread; DaH Qu'vam yImuHbej 
703
704     case FetchNode:
705       do_the_fetchnode(event);
706       goto next_thread;             /* handle next event in event queue  */
707       
708     case GlobalBlock:
709       do_the_globalblock(event);
710       goto next_thread;             /* handle next event in event queue  */
711       
712     case FetchReply:
713       do_the_fetchreply(event);
714       goto next_thread;             /* handle next event in event queue  */
715       
716     case UnblockThread:   /* Move from the blocked queue to the tail of */
717       do_the_unblock(event);
718       goto next_thread;             /* handle next event in event queue  */
719       
720     case ResumeThread:  /* Move from the blocked queue to the tail of */
721       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
722       event->tso->gran.blocktime += 
723         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
724       do_the_startthread(event);
725       goto next_thread;             /* handle next event in event queue  */
726       
727     case StartThread:
728       do_the_startthread(event);
729       goto next_thread;             /* handle next event in event queue  */
730       
731     case MoveThread:
732       do_the_movethread(event);
733       goto next_thread;             /* handle next event in event queue  */
734       
735     case MoveSpark:
736       do_the_movespark(event);
737       goto next_thread;             /* handle next event in event queue  */
738       
739     case FindWork:
740       do_the_findwork(event);
741       goto next_thread;             /* handle next event in event queue  */
742       
743     default:
744       barf("Illegal event type %u\n", event->evttype);
745     }  /* switch */
746     
747     /* This point was scheduler_loop in the old RTS */
748
749     IF_DEBUG(gran, belch("GRAN: after main switch"));
750
751     TimeOfLastEvent = CurrentTime[CurrentProc];
752     TimeOfNextEvent = get_time_of_next_event();
753     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
754     // CurrentTSO = ThreadQueueHd;
755
756     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
757                          TimeOfNextEvent));
758
759     if (RtsFlags.GranFlags.Light) 
760       GranSimLight_leave_system(event, &ActiveTSO); 
761
762     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
763
764     IF_DEBUG(gran, 
765              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
766
767     /* in a GranSim setup the TSO stays on the run queue */
768     t = CurrentTSO;
769     /* Take a thread from the run queue. */
770     POP_RUN_QUEUE(t); // take_off_run_queue(t);
771
772     IF_DEBUG(gran, 
773              fprintf(stderr, "GRAN: About to run current thread, which is\n");
774              G_TSO(t,5));
775
776     context_switch = 0; // turned on via GranYield, checking events and time slice
777
778     IF_DEBUG(gran, 
779              DumpGranEvent(GR_SCHEDULE, t));
780
781     procStatus[CurrentProc] = Busy;
782
783 #elif defined(PAR)
784     if (PendingFetches != END_BF_QUEUE) {
785         processFetches();
786     }
787
788     /* ToDo: phps merge with spark activation above */
789     /* check whether we have local work and send requests if we have none */
790     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
791       /* :-[  no local threads => look out for local sparks */
792       /* the spark pool for the current PE */
793       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
794       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
795           pool->hd < pool->tl) {
796         /* 
797          * ToDo: add GC code check that we really have enough heap afterwards!!
798          * Old comment:
799          * If we're here (no runnable threads) and we have pending
800          * sparks, we must have a space problem.  Get enough space
801          * to turn one of those pending sparks into a
802          * thread... 
803          */
804
805         spark = findSpark(rtsFalse);                /* get a spark */
806         if (spark != (rtsSpark) NULL) {
807           tso = activateSpark(spark);       /* turn the spark into a thread */
808           IF_PAR_DEBUG(schedule,
809                        belch("==== schedule: Created TSO %d (%p); %d threads active",
810                              tso->id, tso, advisory_thread_count));
811
812           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
813             belch("==^^ failed to activate spark");
814             goto next_thread;
815           }               /* otherwise fall through & pick-up new tso */
816         } else {
817           IF_PAR_DEBUG(verbose,
818                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
819                              spark_queue_len(pool)));
820           goto next_thread;
821         }
822       }
823
824       /* If we still have no work we need to send a FISH to get a spark
825          from another PE 
826       */
827       if (EMPTY_RUN_QUEUE()) {
828       /* =8-[  no local sparks => look for work on other PEs */
829         /*
830          * We really have absolutely no work.  Send out a fish
831          * (there may be some out there already), and wait for
832          * something to arrive.  We clearly can't run any threads
833          * until a SCHEDULE or RESUME arrives, and so that's what
834          * we're hoping to see.  (Of course, we still have to
835          * respond to other types of messages.)
836          */
837         TIME now = msTime() /*CURRENT_TIME*/;
838         IF_PAR_DEBUG(verbose, 
839                      belch("--  now=%ld", now));
840         IF_PAR_DEBUG(verbose,
841                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
842                          (last_fish_arrived_at!=0 &&
843                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
844                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
845                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
846                              last_fish_arrived_at,
847                              RtsFlags.ParFlags.fishDelay, now);
848                      });
849         
850         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
851             (last_fish_arrived_at==0 ||
852              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
853           /* outstandingFishes is set in sendFish, processFish;
854              avoid flooding system with fishes via delay */
855           pe = choosePE();
856           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
857                    NEW_FISH_HUNGER);
858
859           // Global statistics: count no. of fishes
860           if (RtsFlags.ParFlags.ParStats.Global &&
861               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
862             globalParStats.tot_fish_mess++;
863           }
864         }
865       
866         receivedFinish = processMessages();
867         goto next_thread;
868       }
869     } else if (PacketsWaiting()) {  /* Look for incoming messages */
870       receivedFinish = processMessages();
871     }
872
873     /* Now we are sure that we have some work available */
874     ASSERT(run_queue_hd != END_TSO_QUEUE);
875
876     /* Take a thread from the run queue, if we have work */
877     POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
878     IF_DEBUG(sanity,checkTSO(t));
879
880     /* ToDo: write something to the log-file
881     if (RTSflags.ParFlags.granSimStats && !sameThread)
882         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
883
884     CurrentTSO = t;
885     */
886     /* the spark pool for the current PE */
887     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
888
889     IF_DEBUG(scheduler, 
890              belch("--=^ %d threads, %d sparks on [%#x]", 
891                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
892
893 # if 1
894     if (0 && RtsFlags.ParFlags.ParStats.Full && 
895         t && LastTSO && t->id != LastTSO->id && 
896         LastTSO->why_blocked == NotBlocked && 
897         LastTSO->what_next != ThreadComplete) {
898       // if previously scheduled TSO not blocked we have to record the context switch
899       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
900                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
901     }
902
903     if (RtsFlags.ParFlags.ParStats.Full && 
904         (emitSchedule /* forced emit */ ||
905         (t && LastTSO && t->id != LastTSO->id))) {
906       /* 
907          we are running a different TSO, so write a schedule event to log file
908          NB: If we use fair scheduling we also have to write  a deschedule 
909              event for LastTSO; with unfair scheduling we know that the
910              previous tso has blocked whenever we switch to another tso, so
911              we don't need it in GUM for now
912       */
913       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
914                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
915       emitSchedule = rtsFalse;
916     }
917      
918 # endif
919 #else /* !GRAN && !PAR */
920   
921     // grab a thread from the run queue
922     ASSERT(run_queue_hd != END_TSO_QUEUE);
923     POP_RUN_QUEUE(t);
924
925     // Sanity check the thread we're about to run.  This can be
926     // expensive if there is lots of thread switching going on...
927     IF_DEBUG(sanity,checkTSO(t));
928 #endif
929
930 #ifdef THREADED_RTS
931     {
932       StgMainThread *m;
933       for(m = main_threads; m; m = m->link)
934       {
935         if(m->tso == t)
936           break;
937       }
938       
939       if(m)
940       {
941         if(m == mainThread)
942         {
943           IF_DEBUG(scheduler,
944             sched_belch("### Running thread %d in bound thread", t->id));
945           // yes, the Haskell thread is bound to the current native thread
946         }
947         else
948         {
949           IF_DEBUG(scheduler,
950             sched_belch("### thread %d bound to another OS thread", t->id));
951           // no, bound to a different Haskell thread: pass to that thread
952           PUSH_ON_RUN_QUEUE(t);
953           passCapability(&m->bound_thread_cond);
954           continue;
955         }
956       }
957       else
958       {
959         if(mainThread != NULL)
960         // The thread we want to run is bound.
961         {
962           IF_DEBUG(scheduler,
963             sched_belch("### this OS thread cannot run thread %d", t->id));
964           // no, the current native thread is bound to a different
965           // Haskell thread, so pass it to any worker thread
966           PUSH_ON_RUN_QUEUE(t);
967           passCapabilityToWorker();
968           continue; 
969         }
970       }
971     }
972 #endif
973
974     cap->r.rCurrentTSO = t;
975     
976     /* context switches are now initiated by the timer signal, unless
977      * the user specified "context switch as often as possible", with
978      * +RTS -C0
979      */
980     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
981          && (run_queue_hd != END_TSO_QUEUE
982              || blocked_queue_hd != END_TSO_QUEUE
983              || sleeping_queue != END_TSO_QUEUE)))
984         context_switch = 1;
985     else
986         context_switch = 0;
987
988 run_thread:
989
990     RELEASE_LOCK(&sched_mutex);
991
992     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
993                               t->id, whatNext_strs[t->what_next]));
994
995 #ifdef PROFILING
996     startHeapProfTimer();
997 #endif
998
999     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000     /* Run the current thread 
1001      */
1002     prev_what_next = t->what_next;
1003     switch (prev_what_next) {
1004     case ThreadKilled:
1005     case ThreadComplete:
1006         /* Thread already finished, return to scheduler. */
1007         ret = ThreadFinished;
1008         break;
1009     case ThreadRunGHC:
1010         errno = t->saved_errno;
1011         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1012         t->saved_errno = errno;
1013         break;
1014     case ThreadInterpret:
1015         ret = interpretBCO(cap);
1016         break;
1017     default:
1018       barf("schedule: invalid what_next field");
1019     }
1020     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1021     
1022     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1023 #ifdef PROFILING
1024     stopHeapProfTimer();
1025     CCCS = CCS_SYSTEM;
1026 #endif
1027     
1028     ACQUIRE_LOCK(&sched_mutex);
1029     
1030 #ifdef RTS_SUPPORTS_THREADS
1031     IF_DEBUG(scheduler,fprintf(stderr,"sched (task %p): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033     IF_DEBUG(scheduler,fprintf(stderr,"sched: "););
1034 #endif
1035     t = cap->r.rCurrentTSO;
1036     
1037 #if defined(PAR)
1038     /* HACK 675: if the last thread didn't yield, make sure to print a 
1039        SCHEDULE event to the log file when StgRunning the next thread, even
1040        if it is the same one as before */
1041     LastTSO = t; 
1042     TimeOfLastYield = CURRENT_TIME;
1043 #endif
1044
1045     switch (ret) {
1046     case HeapOverflow:
1047 #if defined(GRAN)
1048       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049       globalGranStats.tot_heapover++;
1050 #elif defined(PAR)
1051       globalParStats.tot_heapover++;
1052 #endif
1053
1054       // did the task ask for a large block?
1055       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056           // if so, get one and push it on the front of the nursery.
1057           bdescr *bd;
1058           nat blocks;
1059           
1060           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1061
1062           IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: requesting a large block (size %d)", 
1063                                    t->id, whatNext_strs[t->what_next], blocks));
1064
1065           // don't do this if it would push us over the
1066           // alloc_blocks_lim limit; we'll GC first.
1067           if (alloc_blocks + blocks < alloc_blocks_lim) {
1068
1069               alloc_blocks += blocks;
1070               bd = allocGroup( blocks );
1071
1072               // link the new group into the list
1073               bd->link = cap->r.rCurrentNursery;
1074               bd->u.back = cap->r.rCurrentNursery->u.back;
1075               if (cap->r.rCurrentNursery->u.back != NULL) {
1076                   cap->r.rCurrentNursery->u.back->link = bd;
1077               } else {
1078                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1079                          g0s0->blocks == cap->r.rNursery);
1080                   cap->r.rNursery = g0s0->blocks = bd;
1081               }           
1082               cap->r.rCurrentNursery->u.back = bd;
1083
1084               // initialise it as a nursery block.  We initialise the
1085               // step, gen_no, and flags field of *every* sub-block in
1086               // this large block, because this is easier than making
1087               // sure that we always find the block head of a large
1088               // block whenever we call Bdescr() (eg. evacuate() and
1089               // isAlive() in the GC would both have to do this, at
1090               // least).
1091               { 
1092                   bdescr *x;
1093                   for (x = bd; x < bd + blocks; x++) {
1094                       x->step = g0s0;
1095                       x->gen_no = 0;
1096                       x->flags = 0;
1097                   }
1098               }
1099
1100               // don't forget to update the block count in g0s0.
1101               g0s0->n_blocks += blocks;
1102               // This assert can be a killer if the app is doing lots
1103               // of large block allocations.
1104               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1105
1106               // now update the nursery to point to the new block
1107               cap->r.rCurrentNursery = bd;
1108
1109               // we might be unlucky and have another thread get on the
1110               // run queue before us and steal the large block, but in that
1111               // case the thread will just end up requesting another large
1112               // block.
1113               PUSH_ON_RUN_QUEUE(t);
1114               break;
1115           }
1116       }
1117
1118       /* make all the running tasks block on a condition variable,
1119        * maybe set context_switch and wait till they all pile in,
1120        * then have them wait on a GC condition variable.
1121        */
1122       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped: HeapOverflow", 
1123                                t->id, whatNext_strs[t->what_next]));
1124       threadPaused(t);
1125 #if defined(GRAN)
1126       ASSERT(!is_on_queue(t,CurrentProc));
1127 #elif defined(PAR)
1128       /* Currently we emit a DESCHEDULE event before GC in GUM.
1129          ToDo: either add separate event to distinguish SYSTEM time from rest
1130                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1131       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1132         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1133                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1134         emitSchedule = rtsTrue;
1135       }
1136 #endif
1137       
1138       ready_to_gc = rtsTrue;
1139       context_switch = 1;               /* stop other threads ASAP */
1140       PUSH_ON_RUN_QUEUE(t);
1141       /* actual GC is done at the end of the while loop */
1142       break;
1143       
1144     case StackOverflow:
1145 #if defined(GRAN)
1146       IF_DEBUG(gran, 
1147                DumpGranEvent(GR_DESCHEDULE, t));
1148       globalGranStats.tot_stackover++;
1149 #elif defined(PAR)
1150       // IF_DEBUG(par, 
1151       // DumpGranEvent(GR_DESCHEDULE, t);
1152       globalParStats.tot_stackover++;
1153 #endif
1154       IF_DEBUG(scheduler,belch("--<< thread %ld (%s) stopped, StackOverflow", 
1155                                t->id, whatNext_strs[t->what_next]));
1156       /* just adjust the stack for this thread, then pop it back
1157        * on the run queue.
1158        */
1159       threadPaused(t);
1160       { 
1161         StgMainThread *m;
1162         /* enlarge the stack */
1163         StgTSO *new_t = threadStackOverflow(t);
1164         
1165         /* This TSO has moved, so update any pointers to it from the
1166          * main thread stack.  It better not be on any other queues...
1167          * (it shouldn't be).
1168          */
1169         for (m = main_threads; m != NULL; m = m->link) {
1170           if (m->tso == t) {
1171             m->tso = new_t;
1172           }
1173         }
1174         threadPaused(new_t);
1175         PUSH_ON_RUN_QUEUE(new_t);
1176       }
1177       break;
1178
1179     case ThreadYielding:
1180 #if defined(GRAN)
1181       IF_DEBUG(gran, 
1182                DumpGranEvent(GR_DESCHEDULE, t));
1183       globalGranStats.tot_yields++;
1184 #elif defined(PAR)
1185       // IF_DEBUG(par, 
1186       // DumpGranEvent(GR_DESCHEDULE, t);
1187       globalParStats.tot_yields++;
1188 #endif
1189       /* put the thread back on the run queue.  Then, if we're ready to
1190        * GC, check whether this is the last task to stop.  If so, wake
1191        * up the GC thread.  getThread will block during a GC until the
1192        * GC is finished.
1193        */
1194       IF_DEBUG(scheduler,
1195                if (t->what_next != prev_what_next) {
1196                    belch("--<< thread %ld (%s) stopped to switch evaluators", 
1197                          t->id, whatNext_strs[t->what_next]);
1198                } else {
1199                    belch("--<< thread %ld (%s) stopped, yielding", 
1200                          t->id, whatNext_strs[t->what_next]);
1201                }
1202                );
1203
1204       IF_DEBUG(sanity,
1205                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1206                checkTSO(t));
1207       ASSERT(t->link == END_TSO_QUEUE);
1208
1209       // Shortcut if we're just switching evaluators: don't bother
1210       // doing stack squeezing (which can be expensive), just run the
1211       // thread.
1212       if (t->what_next != prev_what_next) {
1213           goto run_thread;
1214       }
1215
1216       threadPaused(t);
1217
1218 #if defined(GRAN)
1219       ASSERT(!is_on_queue(t,CurrentProc));
1220
1221       IF_DEBUG(sanity,
1222                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1223                checkThreadQsSanity(rtsTrue));
1224 #endif
1225
1226 #if defined(PAR)
1227       if (RtsFlags.ParFlags.doFairScheduling) { 
1228         /* this does round-robin scheduling; good for concurrency */
1229         APPEND_TO_RUN_QUEUE(t);
1230       } else {
1231         /* this does unfair scheduling; good for parallelism */
1232         PUSH_ON_RUN_QUEUE(t);
1233       }
1234 #else
1235       // this does round-robin scheduling; good for concurrency
1236       APPEND_TO_RUN_QUEUE(t);
1237 #endif
1238
1239 #if defined(GRAN)
1240       /* add a ContinueThread event to actually process the thread */
1241       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1242                 ContinueThread,
1243                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1244       IF_GRAN_DEBUG(bq, 
1245                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1246                G_EVENTQ(0);
1247                G_CURR_THREADQ(0));
1248 #endif /* GRAN */
1249       break;
1250
1251     case ThreadBlocked:
1252 #if defined(GRAN)
1253       IF_DEBUG(scheduler,
1254                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1255                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1256                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1257
1258       // ??? needed; should emit block before
1259       IF_DEBUG(gran, 
1260                DumpGranEvent(GR_DESCHEDULE, t)); 
1261       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1262       /*
1263         ngoq Dogh!
1264       ASSERT(procStatus[CurrentProc]==Busy || 
1265               ((procStatus[CurrentProc]==Fetching) && 
1266               (t->block_info.closure!=(StgClosure*)NULL)));
1267       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1268           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1269             procStatus[CurrentProc]==Fetching)) 
1270         procStatus[CurrentProc] = Idle;
1271       */
1272 #elif defined(PAR)
1273       IF_DEBUG(scheduler,
1274                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1275                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1276       IF_PAR_DEBUG(bq,
1277
1278                    if (t->block_info.closure!=(StgClosure*)NULL) 
1279                      print_bq(t->block_info.closure));
1280
1281       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1282       blockThread(t);
1283
1284       /* whatever we schedule next, we must log that schedule */
1285       emitSchedule = rtsTrue;
1286
1287 #else /* !GRAN */
1288       /* don't need to do anything.  Either the thread is blocked on
1289        * I/O, in which case we'll have called addToBlockedQueue
1290        * previously, or it's blocked on an MVar or Blackhole, in which
1291        * case it'll be on the relevant queue already.
1292        */
1293       IF_DEBUG(scheduler,
1294                fprintf(stderr, "--<< thread %d (%s) stopped: ", 
1295                        t->id, whatNext_strs[t->what_next]);
1296                printThreadBlockage(t);
1297                fprintf(stderr, "\n"));
1298       fflush(stderr);
1299
1300       /* Only for dumping event to log file 
1301          ToDo: do I need this in GranSim, too?
1302       blockThread(t);
1303       */
1304 #endif
1305       threadPaused(t);
1306       break;
1307
1308     case ThreadFinished:
1309       /* Need to check whether this was a main thread, and if so, signal
1310        * the task that started it with the return value.  If we have no
1311        * more main threads, we probably need to stop all the tasks until
1312        * we get a new one.
1313        */
1314       /* We also end up here if the thread kills itself with an
1315        * uncaught exception, see Exception.hc.
1316        */
1317       IF_DEBUG(scheduler,belch("--++ thread %d (%s) finished", 
1318                                t->id, whatNext_strs[t->what_next]));
1319 #if defined(GRAN)
1320       endThread(t, CurrentProc); // clean-up the thread
1321 #elif defined(PAR)
1322       /* For now all are advisory -- HWL */
1323       //if(t->priority==AdvisoryPriority) ??
1324       advisory_thread_count--;
1325       
1326 # ifdef DIST
1327       if(t->dist.priority==RevalPriority)
1328         FinishReval(t);
1329 # endif
1330       
1331       if (RtsFlags.ParFlags.ParStats.Full &&
1332           !RtsFlags.ParFlags.ParStats.Suppressed) 
1333         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1334 #endif
1335       break;
1336       
1337     default:
1338       barf("schedule: invalid thread return code %d", (int)ret);
1339     }
1340
1341 #ifdef PROFILING
1342     // When we have +RTS -i0 and we're heap profiling, do a census at
1343     // every GC.  This lets us get repeatable runs for debugging.
1344     if (performHeapProfile ||
1345         (RtsFlags.ProfFlags.profileInterval==0 &&
1346          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1347         GarbageCollect(GetRoots, rtsTrue);
1348         heapCensus();
1349         performHeapProfile = rtsFalse;
1350         ready_to_gc = rtsFalse; // we already GC'd
1351     }
1352 #endif
1353
1354     if (ready_to_gc) {
1355       /* everybody back, start the GC.
1356        * Could do it in this thread, or signal a condition var
1357        * to do it in another thread.  Either way, we need to
1358        * broadcast on gc_pending_cond afterward.
1359        */
1360 #if defined(RTS_SUPPORTS_THREADS)
1361       IF_DEBUG(scheduler,sched_belch("doing GC"));
1362 #endif
1363       GarbageCollect(GetRoots,rtsFalse);
1364       ready_to_gc = rtsFalse;
1365 #if defined(GRAN)
1366       /* add a ContinueThread event to continue execution of current thread */
1367       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1368                 ContinueThread,
1369                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1370       IF_GRAN_DEBUG(bq, 
1371                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1372                G_EVENTQ(0);
1373                G_CURR_THREADQ(0));
1374 #endif /* GRAN */
1375     }
1376
1377 #if defined(GRAN)
1378   next_thread:
1379     IF_GRAN_DEBUG(unused,
1380                   print_eventq(EventHd));
1381
1382     event = get_next_event();
1383 #elif defined(PAR)
1384   next_thread:
1385     /* ToDo: wait for next message to arrive rather than busy wait */
1386 #endif /* GRAN */
1387
1388   } /* end of while(1) */
1389
1390   IF_PAR_DEBUG(verbose,
1391                belch("== Leaving schedule() after having received Finish"));
1392 }
1393
1394 /* ---------------------------------------------------------------------------
1395  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1396  * used by Control.Concurrent for error checking.
1397  * ------------------------------------------------------------------------- */
1398  
1399 StgBool
1400 rtsSupportsBoundThreads(void)
1401 {
1402 #ifdef THREADED_RTS
1403   return rtsTrue;
1404 #else
1405   return rtsFalse;
1406 #endif
1407 }
1408
1409 /* ---------------------------------------------------------------------------
1410  * isThreadBound(tso): check whether tso is bound to an OS thread.
1411  * ------------------------------------------------------------------------- */
1412  
1413 StgBool
1414 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1415 {
1416 #ifdef THREADED_RTS
1417   StgMainThread *m;
1418   for(m = main_threads; m; m = m->link)
1419   {
1420     if(m->tso == tso)
1421       return rtsTrue;
1422   }
1423 #endif
1424   return rtsFalse;
1425 }
1426
1427 /* ---------------------------------------------------------------------------
1428  * Singleton fork(). Do not copy any running threads.
1429  * ------------------------------------------------------------------------- */
1430
1431 static void 
1432 deleteThreadImmediately(StgTSO *tso);
1433
1434 StgInt
1435 forkProcess(HsStablePtr *entry)
1436 {
1437 #ifndef mingw32_TARGET_OS
1438   pid_t pid;
1439   StgTSO* t,*next;
1440   StgMainThread *m;
1441   SchedulerStatus rc;
1442
1443   IF_DEBUG(scheduler,sched_belch("forking!"));
1444   rts_lock(); // This not only acquires sched_mutex, it also
1445               // makes sure that no other threads are running
1446
1447   pid = fork();
1448
1449   if (pid) { /* parent */
1450
1451   /* just return the pid */
1452     rts_unlock();
1453     return pid;
1454     
1455   } else { /* child */
1456     
1457     
1458       // delete all threads
1459     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1460     
1461     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1462       next = t->link;
1463
1464         // don't allow threads to catch the ThreadKilled exception
1465       deleteThreadImmediately(t);
1466     }
1467     
1468       // wipe the main thread list
1469     while((m = main_threads) != NULL) {
1470       main_threads = m->link;
1471 #ifdef THREADED_RTS
1472       closeCondition(&m->bound_thread_cond);
1473 #endif
1474       stgFree(m);
1475     }
1476     
1477 #ifdef RTS_SUPPORTS_THREADS
1478     resetTaskManagerAfterFork();      // tell startTask() and friends that
1479     startingWorkerThread = rtsFalse;  // we have no worker threads any more
1480     resetWorkerWakeupPipeAfterFork();
1481 #endif
1482     
1483     rc = rts_evalStableIO(entry, NULL);  // run the action
1484     rts_checkSchedStatus("forkProcess",rc);
1485     
1486     rts_unlock();
1487     
1488     hs_exit();                      // clean up and exit
1489     stg_exit(0);
1490   }
1491 #else /* mingw32 */
1492   barf("forkProcess#: primop not implemented for mingw32, sorry!\n");
1493   return -1;
1494 #endif /* mingw32 */
1495 }
1496
1497 /* ---------------------------------------------------------------------------
1498  * deleteAllThreads():  kill all the live threads.
1499  *
1500  * This is used when we catch a user interrupt (^C), before performing
1501  * any necessary cleanups and running finalizers.
1502  *
1503  * Locks: sched_mutex held.
1504  * ------------------------------------------------------------------------- */
1505    
1506 void
1507 deleteAllThreads ( void )
1508 {
1509   StgTSO* t, *next;
1510   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1511   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1512       next = t->global_link;
1513       deleteThread(t);
1514   }      
1515   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1516   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1517   sleeping_queue = END_TSO_QUEUE;
1518 }
1519
1520 /* startThread and  insertThread are now in GranSim.c -- HWL */
1521
1522
1523 /* ---------------------------------------------------------------------------
1524  * Suspending & resuming Haskell threads.
1525  * 
1526  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1527  * its capability before calling the C function.  This allows another
1528  * task to pick up the capability and carry on running Haskell
1529  * threads.  It also means that if the C call blocks, it won't lock
1530  * the whole system.
1531  *
1532  * The Haskell thread making the C call is put to sleep for the
1533  * duration of the call, on the susepended_ccalling_threads queue.  We
1534  * give out a token to the task, which it can use to resume the thread
1535  * on return from the C function.
1536  * ------------------------------------------------------------------------- */
1537    
1538 StgInt
1539 suspendThread( StgRegTable *reg, 
1540                rtsBool concCall
1541 #if !defined(DEBUG)
1542                STG_UNUSED
1543 #endif
1544                )
1545 {
1546   nat tok;
1547   Capability *cap;
1548   int saved_errno = errno;
1549
1550   /* assume that *reg is a pointer to the StgRegTable part
1551    * of a Capability.
1552    */
1553   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
1554
1555   ACQUIRE_LOCK(&sched_mutex);
1556
1557   IF_DEBUG(scheduler,
1558            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1559
1560   // XXX this might not be necessary --SDM
1561   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
1562
1563   threadPaused(cap->r.rCurrentTSO);
1564   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1565   suspended_ccalling_threads = cap->r.rCurrentTSO;
1566
1567 #if defined(RTS_SUPPORTS_THREADS)
1568   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)
1569   {
1570       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
1571       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
1572   }
1573   else
1574   {
1575       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
1576   }
1577 #endif
1578
1579   /* Use the thread ID as the token; it should be unique */
1580   tok = cap->r.rCurrentTSO->id;
1581
1582   /* Hand back capability */
1583   releaseCapability(cap);
1584   
1585 #if defined(RTS_SUPPORTS_THREADS)
1586   /* Preparing to leave the RTS, so ensure there's a native thread/task
1587      waiting to take over.
1588   */
1589   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
1590 #endif
1591
1592   /* Other threads _might_ be available for execution; signal this */
1593   THREAD_RUNNABLE();
1594   RELEASE_LOCK(&sched_mutex);
1595   
1596   errno = saved_errno;
1597   return tok; 
1598 }
1599
1600 StgRegTable *
1601 resumeThread( StgInt tok,
1602               rtsBool concCall STG_UNUSED )
1603 {
1604   StgTSO *tso, **prev;
1605   Capability *cap;
1606   int saved_errno = errno;
1607
1608 #if defined(RTS_SUPPORTS_THREADS)
1609   /* Wait for permission to re-enter the RTS with the result. */
1610   ACQUIRE_LOCK(&sched_mutex);
1611   waitForReturnCapability(&sched_mutex, &cap);
1612
1613   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
1614 #else
1615   grabCapability(&cap);
1616 #endif
1617
1618   /* Remove the thread off of the suspended list */
1619   prev = &suspended_ccalling_threads;
1620   for (tso = suspended_ccalling_threads; 
1621        tso != END_TSO_QUEUE; 
1622        prev = &tso->link, tso = tso->link) {
1623     if (tso->id == (StgThreadID)tok) {
1624       *prev = tso->link;
1625       break;
1626     }
1627   }
1628   if (tso == END_TSO_QUEUE) {
1629     barf("resumeThread: thread not found");
1630   }
1631   tso->link = END_TSO_QUEUE;
1632   
1633 #if defined(RTS_SUPPORTS_THREADS)
1634   if(tso->why_blocked == BlockedOnCCall)
1635   {
1636       awakenBlockedQueueNoLock(tso->blocked_exceptions);
1637       tso->blocked_exceptions = NULL;
1638   }
1639 #endif
1640   
1641   /* Reset blocking status */
1642   tso->why_blocked  = NotBlocked;
1643
1644   cap->r.rCurrentTSO = tso;
1645   RELEASE_LOCK(&sched_mutex);
1646   errno = saved_errno;
1647   return &cap->r;
1648 }
1649
1650
1651 /* ---------------------------------------------------------------------------
1652  * Static functions
1653  * ------------------------------------------------------------------------ */
1654 static void unblockThread(StgTSO *tso);
1655
1656 /* ---------------------------------------------------------------------------
1657  * Comparing Thread ids.
1658  *
1659  * This is used from STG land in the implementation of the
1660  * instances of Eq/Ord for ThreadIds.
1661  * ------------------------------------------------------------------------ */
1662
1663 int
1664 cmp_thread(StgPtr tso1, StgPtr tso2) 
1665
1666   StgThreadID id1 = ((StgTSO *)tso1)->id; 
1667   StgThreadID id2 = ((StgTSO *)tso2)->id;
1668  
1669   if (id1 < id2) return (-1);
1670   if (id1 > id2) return 1;
1671   return 0;
1672 }
1673
1674 /* ---------------------------------------------------------------------------
1675  * Fetching the ThreadID from an StgTSO.
1676  *
1677  * This is used in the implementation of Show for ThreadIds.
1678  * ------------------------------------------------------------------------ */
1679 int
1680 rts_getThreadId(StgPtr tso) 
1681 {
1682   return ((StgTSO *)tso)->id;
1683 }
1684
1685 #ifdef DEBUG
1686 void
1687 labelThread(StgPtr tso, char *label)
1688 {
1689   int len;
1690   void *buf;
1691
1692   /* Caveat: Once set, you can only set the thread name to "" */
1693   len = strlen(label)+1;
1694   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
1695   strncpy(buf,label,len);
1696   /* Update will free the old memory for us */
1697   updateThreadLabel(((StgTSO *)tso)->id,buf);
1698 }
1699 #endif /* DEBUG */
1700
1701 /* ---------------------------------------------------------------------------
1702    Create a new thread.
1703
1704    The new thread starts with the given stack size.  Before the
1705    scheduler can run, however, this thread needs to have a closure
1706    (and possibly some arguments) pushed on its stack.  See
1707    pushClosure() in Schedule.h.
1708
1709    createGenThread() and createIOThread() (in SchedAPI.h) are
1710    convenient packaged versions of this function.
1711
1712    currently pri (priority) is only used in a GRAN setup -- HWL
1713    ------------------------------------------------------------------------ */
1714 #if defined(GRAN)
1715 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1716 StgTSO *
1717 createThread(nat size, StgInt pri)
1718 #else
1719 StgTSO *
1720 createThread(nat size)
1721 #endif
1722 {
1723
1724     StgTSO *tso;
1725     nat stack_size;
1726
1727     /* First check whether we should create a thread at all */
1728 #if defined(PAR)
1729   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1730   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1731     threadsIgnored++;
1732     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1733           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1734     return END_TSO_QUEUE;
1735   }
1736   threadsCreated++;
1737 #endif
1738
1739 #if defined(GRAN)
1740   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1741 #endif
1742
1743   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1744
1745   /* catch ridiculously small stack sizes */
1746   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1747     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1748   }
1749
1750   stack_size = size - TSO_STRUCT_SIZEW;
1751
1752   tso = (StgTSO *)allocate(size);
1753   TICK_ALLOC_TSO(stack_size, 0);
1754
1755   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1756 #if defined(GRAN)
1757   SET_GRAN_HDR(tso, ThisPE);
1758 #endif
1759
1760   // Always start with the compiled code evaluator
1761   tso->what_next = ThreadRunGHC;
1762
1763   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1764    * protect the increment operation on next_thread_id.
1765    * In future, we could use an atomic increment instead.
1766    */
1767   ACQUIRE_LOCK(&thread_id_mutex);
1768   tso->id = next_thread_id++; 
1769   RELEASE_LOCK(&thread_id_mutex);
1770
1771   tso->why_blocked  = NotBlocked;
1772   tso->blocked_exceptions = NULL;
1773
1774   tso->saved_errno = 0;
1775   
1776   tso->stack_size   = stack_size;
1777   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1778                               - TSO_STRUCT_SIZEW;
1779   tso->sp           = (P_)&(tso->stack) + stack_size;
1780
1781 #ifdef PROFILING
1782   tso->prof.CCCS = CCS_MAIN;
1783 #endif
1784
1785   /* put a stop frame on the stack */
1786   tso->sp -= sizeofW(StgStopFrame);
1787   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1788   // ToDo: check this
1789 #if defined(GRAN)
1790   tso->link = END_TSO_QUEUE;
1791   /* uses more flexible routine in GranSim */
1792   insertThread(tso, CurrentProc);
1793 #else
1794   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1795    * from its creation
1796    */
1797 #endif
1798
1799 #if defined(GRAN) 
1800   if (RtsFlags.GranFlags.GranSimStats.Full) 
1801     DumpGranEvent(GR_START,tso);
1802 #elif defined(PAR)
1803   if (RtsFlags.ParFlags.ParStats.Full) 
1804     DumpGranEvent(GR_STARTQ,tso);
1805   /* HACk to avoid SCHEDULE 
1806      LastTSO = tso; */
1807 #endif
1808
1809   /* Link the new thread on the global thread list.
1810    */
1811   tso->global_link = all_threads;
1812   all_threads = tso;
1813
1814 #if defined(DIST)
1815   tso->dist.priority = MandatoryPriority; //by default that is...
1816 #endif
1817
1818 #if defined(GRAN)
1819   tso->gran.pri = pri;
1820 # if defined(DEBUG)
1821   tso->gran.magic = TSO_MAGIC; // debugging only
1822 # endif
1823   tso->gran.sparkname   = 0;
1824   tso->gran.startedat   = CURRENT_TIME; 
1825   tso->gran.exported    = 0;
1826   tso->gran.basicblocks = 0;
1827   tso->gran.allocs      = 0;
1828   tso->gran.exectime    = 0;
1829   tso->gran.fetchtime   = 0;
1830   tso->gran.fetchcount  = 0;
1831   tso->gran.blocktime   = 0;
1832   tso->gran.blockcount  = 0;
1833   tso->gran.blockedat   = 0;
1834   tso->gran.globalsparks = 0;
1835   tso->gran.localsparks  = 0;
1836   if (RtsFlags.GranFlags.Light)
1837     tso->gran.clock  = Now; /* local clock */
1838   else
1839     tso->gran.clock  = 0;
1840
1841   IF_DEBUG(gran,printTSO(tso));
1842 #elif defined(PAR)
1843 # if defined(DEBUG)
1844   tso->par.magic = TSO_MAGIC; // debugging only
1845 # endif
1846   tso->par.sparkname   = 0;
1847   tso->par.startedat   = CURRENT_TIME; 
1848   tso->par.exported    = 0;
1849   tso->par.basicblocks = 0;
1850   tso->par.allocs      = 0;
1851   tso->par.exectime    = 0;
1852   tso->par.fetchtime   = 0;
1853   tso->par.fetchcount  = 0;
1854   tso->par.blocktime   = 0;
1855   tso->par.blockcount  = 0;
1856   tso->par.blockedat   = 0;
1857   tso->par.globalsparks = 0;
1858   tso->par.localsparks  = 0;
1859 #endif
1860
1861 #if defined(GRAN)
1862   globalGranStats.tot_threads_created++;
1863   globalGranStats.threads_created_on_PE[CurrentProc]++;
1864   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1865   globalGranStats.tot_sq_probes++;
1866 #elif defined(PAR)
1867   // collect parallel global statistics (currently done together with GC stats)
1868   if (RtsFlags.ParFlags.ParStats.Global &&
1869       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1870     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1871     globalParStats.tot_threads_created++;
1872   }
1873 #endif 
1874
1875 #if defined(GRAN)
1876   IF_GRAN_DEBUG(pri,
1877                 belch("==__ schedule: Created TSO %d (%p);",
1878                       CurrentProc, tso, tso->id));
1879 #elif defined(PAR)
1880     IF_PAR_DEBUG(verbose,
1881                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1882                        tso->id, tso, advisory_thread_count));
1883 #else
1884   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1885                                  tso->id, tso->stack_size));
1886 #endif    
1887   return tso;
1888 }
1889
1890 #if defined(PAR)
1891 /* RFP:
1892    all parallel thread creation calls should fall through the following routine.
1893 */
1894 StgTSO *
1895 createSparkThread(rtsSpark spark) 
1896 { StgTSO *tso;
1897   ASSERT(spark != (rtsSpark)NULL);
1898   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1899   { threadsIgnored++;
1900     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1901           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1902     return END_TSO_QUEUE;
1903   }
1904   else
1905   { threadsCreated++;
1906     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1907     if (tso==END_TSO_QUEUE)     
1908       barf("createSparkThread: Cannot create TSO");
1909 #if defined(DIST)
1910     tso->priority = AdvisoryPriority;
1911 #endif
1912     pushClosure(tso,spark);
1913     PUSH_ON_RUN_QUEUE(tso);
1914     advisory_thread_count++;    
1915   }
1916   return tso;
1917 }
1918 #endif
1919
1920 /*
1921   Turn a spark into a thread.
1922   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1923 */
1924 #if defined(PAR)
1925 StgTSO *
1926 activateSpark (rtsSpark spark) 
1927 {
1928   StgTSO *tso;
1929
1930   tso = createSparkThread(spark);
1931   if (RtsFlags.ParFlags.ParStats.Full) {   
1932     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1933     IF_PAR_DEBUG(verbose,
1934                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1935                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1936   }
1937   // ToDo: fwd info on local/global spark to thread -- HWL
1938   // tso->gran.exported =  spark->exported;
1939   // tso->gran.locked =   !spark->global;
1940   // tso->gran.sparkname = spark->name;
1941
1942   return tso;
1943 }
1944 #endif
1945
1946 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
1947                                    Capability *initialCapability
1948                                    );
1949
1950
1951 /* ---------------------------------------------------------------------------
1952  * scheduleThread()
1953  *
1954  * scheduleThread puts a thread on the head of the runnable queue.
1955  * This will usually be done immediately after a thread is created.
1956  * The caller of scheduleThread must create the thread using e.g.
1957  * createThread and push an appropriate closure
1958  * on this thread's stack before the scheduler is invoked.
1959  * ------------------------------------------------------------------------ */
1960
1961 static void scheduleThread_ (StgTSO* tso);
1962
1963 void
1964 scheduleThread_(StgTSO *tso)
1965 {
1966   // Precondition: sched_mutex must be held.
1967
1968   /* Put the new thread on the head of the runnable queue.  The caller
1969    * better push an appropriate closure on this thread's stack
1970    * beforehand.  In the SMP case, the thread may start running as
1971    * soon as we release the scheduler lock below.
1972    */
1973   PUSH_ON_RUN_QUEUE(tso);
1974   THREAD_RUNNABLE();
1975
1976 #if 0
1977   IF_DEBUG(scheduler,printTSO(tso));
1978 #endif
1979 }
1980
1981 void scheduleThread(StgTSO* tso)
1982 {
1983   ACQUIRE_LOCK(&sched_mutex);
1984   scheduleThread_(tso);
1985   RELEASE_LOCK(&sched_mutex);
1986 }
1987
1988 SchedulerStatus
1989 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
1990                    Capability *initialCapability)
1991 {
1992     // Precondition: sched_mutex must be held
1993     StgMainThread *m;
1994
1995     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1996     m->tso = tso;
1997     m->ret = ret;
1998     m->stat = NoStatus;
1999 #if defined(RTS_SUPPORTS_THREADS)
2000     initCondition(&m->bound_thread_cond);
2001 #endif
2002
2003     /* Put the thread on the main-threads list prior to scheduling the TSO.
2004        Failure to do so introduces a race condition in the MT case (as
2005        identified by Wolfgang Thaller), whereby the new task/OS thread 
2006        created by scheduleThread_() would complete prior to the thread
2007        that spawned it managed to put 'itself' on the main-threads list.
2008        The upshot of it all being that the worker thread wouldn't get to
2009        signal the completion of the its work item for the main thread to
2010        see (==> it got stuck waiting.)    -- sof 6/02.
2011     */
2012     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2013     
2014     m->link = main_threads;
2015     main_threads = m;
2016     
2017     scheduleThread_(tso);
2018
2019     return waitThread_(m, initialCapability);
2020 }
2021
2022 /* ---------------------------------------------------------------------------
2023  * initScheduler()
2024  *
2025  * Initialise the scheduler.  This resets all the queues - if the
2026  * queues contained any threads, they'll be garbage collected at the
2027  * next pass.
2028  *
2029  * ------------------------------------------------------------------------ */
2030
2031 void 
2032 initScheduler(void)
2033 {
2034 #if defined(GRAN)
2035   nat i;
2036
2037   for (i=0; i<=MAX_PROC; i++) {
2038     run_queue_hds[i]      = END_TSO_QUEUE;
2039     run_queue_tls[i]      = END_TSO_QUEUE;
2040     blocked_queue_hds[i]  = END_TSO_QUEUE;
2041     blocked_queue_tls[i]  = END_TSO_QUEUE;
2042     ccalling_threadss[i]  = END_TSO_QUEUE;
2043     sleeping_queue        = END_TSO_QUEUE;
2044   }
2045 #else
2046   run_queue_hd      = END_TSO_QUEUE;
2047   run_queue_tl      = END_TSO_QUEUE;
2048   blocked_queue_hd  = END_TSO_QUEUE;
2049   blocked_queue_tl  = END_TSO_QUEUE;
2050   sleeping_queue    = END_TSO_QUEUE;
2051 #endif 
2052
2053   suspended_ccalling_threads  = END_TSO_QUEUE;
2054
2055   main_threads = NULL;
2056   all_threads  = END_TSO_QUEUE;
2057
2058   context_switch = 0;
2059   interrupted    = 0;
2060
2061   RtsFlags.ConcFlags.ctxtSwitchTicks =
2062       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2063       
2064 #if defined(RTS_SUPPORTS_THREADS)
2065   /* Initialise the mutex and condition variables used by
2066    * the scheduler. */
2067   initMutex(&sched_mutex);
2068   initMutex(&term_mutex);
2069   initMutex(&thread_id_mutex);
2070
2071   initCondition(&thread_ready_cond);
2072 #endif
2073   
2074 #if defined(RTS_SUPPORTS_THREADS)
2075   ACQUIRE_LOCK(&sched_mutex);
2076 #endif
2077
2078   /* A capability holds the state a native thread needs in
2079    * order to execute STG code. At least one capability is
2080    * floating around (only SMP builds have more than one).
2081    */
2082   initCapabilities();
2083   
2084 #if defined(RTS_SUPPORTS_THREADS)
2085     /* start our haskell execution tasks */
2086     startTaskManager(0,taskStart);
2087 #endif
2088
2089 #if /* defined(SMP) ||*/ defined(PAR)
2090   initSparkPools();
2091 #endif
2092
2093   RELEASE_LOCK(&sched_mutex);
2094
2095 }
2096
2097 void
2098 exitScheduler( void )
2099 {
2100 #if defined(RTS_SUPPORTS_THREADS)
2101   stopTaskManager();
2102 #endif
2103   shutting_down_scheduler = rtsTrue;
2104 }
2105
2106 /* ----------------------------------------------------------------------------
2107    Managing the per-task allocation areas.
2108    
2109    Each capability comes with an allocation area.  These are
2110    fixed-length block lists into which allocation can be done.
2111
2112    ToDo: no support for two-space collection at the moment???
2113    ------------------------------------------------------------------------- */
2114
2115 static
2116 SchedulerStatus
2117 waitThread_(StgMainThread* m, Capability *initialCapability)
2118 {
2119   SchedulerStatus stat;
2120
2121   // Precondition: sched_mutex must be held.
2122   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2123
2124 #if defined(GRAN)
2125   /* GranSim specific init */
2126   CurrentTSO = m->tso;                // the TSO to run
2127   procStatus[MainProc] = Busy;        // status of main PE
2128   CurrentProc = MainProc;             // PE to run it on
2129   schedule(m,initialCapability);
2130 #else
2131   schedule(m,initialCapability);
2132   ASSERT(m->stat != NoStatus);
2133 #endif
2134
2135   stat = m->stat;
2136
2137 #if defined(RTS_SUPPORTS_THREADS)
2138   closeCondition(&m->bound_thread_cond);
2139 #endif
2140
2141   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2142   stgFree(m);
2143
2144   // Postcondition: sched_mutex still held
2145   return stat;
2146 }
2147
2148 /* ---------------------------------------------------------------------------
2149    Where are the roots that we know about?
2150
2151         - all the threads on the runnable queue
2152         - all the threads on the blocked queue
2153         - all the threads on the sleeping queue
2154         - all the thread currently executing a _ccall_GC
2155         - all the "main threads"
2156      
2157    ------------------------------------------------------------------------ */
2158
2159 /* This has to be protected either by the scheduler monitor, or by the
2160         garbage collection monitor (probably the latter).
2161         KH @ 25/10/99
2162 */
2163
2164 void
2165 GetRoots( evac_fn evac )
2166 {
2167 #if defined(GRAN)
2168   {
2169     nat i;
2170     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2171       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2172           evac((StgClosure **)&run_queue_hds[i]);
2173       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2174           evac((StgClosure **)&run_queue_tls[i]);
2175       
2176       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2177           evac((StgClosure **)&blocked_queue_hds[i]);
2178       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2179           evac((StgClosure **)&blocked_queue_tls[i]);
2180       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2181           evac((StgClosure **)&ccalling_threads[i]);
2182     }
2183   }
2184
2185   markEventQueue();
2186
2187 #else /* !GRAN */
2188   if (run_queue_hd != END_TSO_QUEUE) {
2189       ASSERT(run_queue_tl != END_TSO_QUEUE);
2190       evac((StgClosure **)&run_queue_hd);
2191       evac((StgClosure **)&run_queue_tl);
2192   }
2193   
2194   if (blocked_queue_hd != END_TSO_QUEUE) {
2195       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2196       evac((StgClosure **)&blocked_queue_hd);
2197       evac((StgClosure **)&blocked_queue_tl);
2198   }
2199   
2200   if (sleeping_queue != END_TSO_QUEUE) {
2201       evac((StgClosure **)&sleeping_queue);
2202   }
2203 #endif 
2204
2205   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2206       evac((StgClosure **)&suspended_ccalling_threads);
2207   }
2208
2209 #if defined(PAR) || defined(GRAN)
2210   markSparkQueue(evac);
2211 #endif
2212
2213 #if defined(RTS_USER_SIGNALS)
2214   // mark the signal handlers (signals should be already blocked)
2215   markSignalHandlers(evac);
2216 #endif
2217
2218   // main threads which have completed need to be retained until they
2219   // are dealt with in the main scheduler loop.  They won't be
2220   // retained any other way: the GC will drop them from the
2221   // all_threads list, so we have to be careful to treat them as roots
2222   // here.
2223   { 
2224       StgMainThread *m;
2225       for (m = main_threads; m != NULL; m = m->link) {
2226           switch (m->tso->what_next) {
2227           case ThreadComplete:
2228           case ThreadKilled:
2229               evac((StgClosure **)&m->tso);
2230               break;
2231           default:
2232               break;
2233           }
2234       }
2235   }
2236 }
2237
2238 /* -----------------------------------------------------------------------------
2239    performGC
2240
2241    This is the interface to the garbage collector from Haskell land.
2242    We provide this so that external C code can allocate and garbage
2243    collect when called from Haskell via _ccall_GC.
2244
2245    It might be useful to provide an interface whereby the programmer
2246    can specify more roots (ToDo).
2247    
2248    This needs to be protected by the GC condition variable above.  KH.
2249    -------------------------------------------------------------------------- */
2250
2251 static void (*extra_roots)(evac_fn);
2252
2253 void
2254 performGC(void)
2255 {
2256   /* Obligated to hold this lock upon entry */
2257   ACQUIRE_LOCK(&sched_mutex);
2258   GarbageCollect(GetRoots,rtsFalse);
2259   RELEASE_LOCK(&sched_mutex);
2260 }
2261
2262 void
2263 performMajorGC(void)
2264 {
2265   ACQUIRE_LOCK(&sched_mutex);
2266   GarbageCollect(GetRoots,rtsTrue);
2267   RELEASE_LOCK(&sched_mutex);
2268 }
2269
2270 static void
2271 AllRoots(evac_fn evac)
2272 {
2273     GetRoots(evac);             // the scheduler's roots
2274     extra_roots(evac);          // the user's roots
2275 }
2276
2277 void
2278 performGCWithRoots(void (*get_roots)(evac_fn))
2279 {
2280   ACQUIRE_LOCK(&sched_mutex);
2281   extra_roots = get_roots;
2282   GarbageCollect(AllRoots,rtsFalse);
2283   RELEASE_LOCK(&sched_mutex);
2284 }
2285
2286 /* -----------------------------------------------------------------------------
2287    Stack overflow
2288
2289    If the thread has reached its maximum stack size, then raise the
2290    StackOverflow exception in the offending thread.  Otherwise
2291    relocate the TSO into a larger chunk of memory and adjust its stack
2292    size appropriately.
2293    -------------------------------------------------------------------------- */
2294
2295 static StgTSO *
2296 threadStackOverflow(StgTSO *tso)
2297 {
2298   nat new_stack_size, new_tso_size, stack_words;
2299   StgPtr new_sp;
2300   StgTSO *dest;
2301
2302   IF_DEBUG(sanity,checkTSO(tso));
2303   if (tso->stack_size >= tso->max_stack_size) {
2304
2305     IF_DEBUG(gc,
2306              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld)",
2307                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2308              /* If we're debugging, just print out the top of the stack */
2309              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2310                                               tso->sp+64)));
2311
2312     /* Send this thread the StackOverflow exception */
2313     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2314     return tso;
2315   }
2316
2317   /* Try to double the current stack size.  If that takes us over the
2318    * maximum stack size for this thread, then use the maximum instead.
2319    * Finally round up so the TSO ends up as a whole number of blocks.
2320    */
2321   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2322   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2323                                        TSO_STRUCT_SIZE)/sizeof(W_);
2324   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2325   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2326
2327   IF_DEBUG(scheduler, fprintf(stderr,"== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2328
2329   dest = (StgTSO *)allocate(new_tso_size);
2330   TICK_ALLOC_TSO(new_stack_size,0);
2331
2332   /* copy the TSO block and the old stack into the new area */
2333   memcpy(dest,tso,TSO_STRUCT_SIZE);
2334   stack_words = tso->stack + tso->stack_size - tso->sp;
2335   new_sp = (P_)dest + new_tso_size - stack_words;
2336   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2337
2338   /* relocate the stack pointers... */
2339   dest->sp         = new_sp;
2340   dest->stack_size = new_stack_size;
2341         
2342   /* Mark the old TSO as relocated.  We have to check for relocated
2343    * TSOs in the garbage collector and any primops that deal with TSOs.
2344    *
2345    * It's important to set the sp value to just beyond the end
2346    * of the stack, so we don't attempt to scavenge any part of the
2347    * dead TSO's stack.
2348    */
2349   tso->what_next = ThreadRelocated;
2350   tso->link = dest;
2351   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2352   tso->why_blocked = NotBlocked;
2353   dest->mut_link = NULL;
2354
2355   IF_PAR_DEBUG(verbose,
2356                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2357                      tso->id, tso, tso->stack_size);
2358                /* If we're debugging, just print out the top of the stack */
2359                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2360                                                 tso->sp+64)));
2361   
2362   IF_DEBUG(sanity,checkTSO(tso));
2363 #if 0
2364   IF_DEBUG(scheduler,printTSO(dest));
2365 #endif
2366
2367   return dest;
2368 }
2369
2370 /* ---------------------------------------------------------------------------
2371    Wake up a queue that was blocked on some resource.
2372    ------------------------------------------------------------------------ */
2373
2374 #if defined(GRAN)
2375 STATIC_INLINE void
2376 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2377 {
2378 }
2379 #elif defined(PAR)
2380 STATIC_INLINE void
2381 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2382 {
2383   /* write RESUME events to log file and
2384      update blocked and fetch time (depending on type of the orig closure) */
2385   if (RtsFlags.ParFlags.ParStats.Full) {
2386     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2387                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2388                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2389     if (EMPTY_RUN_QUEUE())
2390       emitSchedule = rtsTrue;
2391
2392     switch (get_itbl(node)->type) {
2393         case FETCH_ME_BQ:
2394           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2395           break;
2396         case RBH:
2397         case FETCH_ME:
2398         case BLACKHOLE_BQ:
2399           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2400           break;
2401 #ifdef DIST
2402         case MVAR:
2403           break;
2404 #endif    
2405         default:
2406           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2407         }
2408       }
2409 }
2410 #endif
2411
2412 #if defined(GRAN)
2413 static StgBlockingQueueElement *
2414 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2415 {
2416     StgTSO *tso;
2417     PEs node_loc, tso_loc;
2418
2419     node_loc = where_is(node); // should be lifted out of loop
2420     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2421     tso_loc = where_is((StgClosure *)tso);
2422     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2423       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2424       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2425       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2426       // insertThread(tso, node_loc);
2427       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2428                 ResumeThread,
2429                 tso, node, (rtsSpark*)NULL);
2430       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2431       // len_local++;
2432       // len++;
2433     } else { // TSO is remote (actually should be FMBQ)
2434       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2435                                   RtsFlags.GranFlags.Costs.gunblocktime +
2436                                   RtsFlags.GranFlags.Costs.latency;
2437       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2438                 UnblockThread,
2439                 tso, node, (rtsSpark*)NULL);
2440       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2441       // len++;
2442     }
2443     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2444     IF_GRAN_DEBUG(bq,
2445                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2446                           (node_loc==tso_loc ? "Local" : "Global"), 
2447                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2448     tso->block_info.closure = NULL;
2449     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2450                              tso->id, tso));
2451 }
2452 #elif defined(PAR)
2453 static StgBlockingQueueElement *
2454 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2455 {
2456     StgBlockingQueueElement *next;
2457
2458     switch (get_itbl(bqe)->type) {
2459     case TSO:
2460       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2461       /* if it's a TSO just push it onto the run_queue */
2462       next = bqe->link;
2463       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2464       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2465       THREAD_RUNNABLE();
2466       unblockCount(bqe, node);
2467       /* reset blocking status after dumping event */
2468       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2469       break;
2470
2471     case BLOCKED_FETCH:
2472       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2473       next = bqe->link;
2474       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2475       PendingFetches = (StgBlockedFetch *)bqe;
2476       break;
2477
2478 # if defined(DEBUG)
2479       /* can ignore this case in a non-debugging setup; 
2480          see comments on RBHSave closures above */
2481     case CONSTR:
2482       /* check that the closure is an RBHSave closure */
2483       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2484              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2485              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2486       break;
2487
2488     default:
2489       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2490            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2491            (StgClosure *)bqe);
2492 # endif
2493     }
2494   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2495   return next;
2496 }
2497
2498 #else /* !GRAN && !PAR */
2499 static StgTSO *
2500 unblockOneLocked(StgTSO *tso)
2501 {
2502   StgTSO *next;
2503
2504   ASSERT(get_itbl(tso)->type == TSO);
2505   ASSERT(tso->why_blocked != NotBlocked);
2506   tso->why_blocked = NotBlocked;
2507   next = tso->link;
2508   PUSH_ON_RUN_QUEUE(tso);
2509   THREAD_RUNNABLE();
2510   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2511   return next;
2512 }
2513 #endif
2514
2515 #if defined(GRAN) || defined(PAR)
2516 INLINE_ME StgBlockingQueueElement *
2517 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2518 {
2519   ACQUIRE_LOCK(&sched_mutex);
2520   bqe = unblockOneLocked(bqe, node);
2521   RELEASE_LOCK(&sched_mutex);
2522   return bqe;
2523 }
2524 #else
2525 INLINE_ME StgTSO *
2526 unblockOne(StgTSO *tso)
2527 {
2528   ACQUIRE_LOCK(&sched_mutex);
2529   tso = unblockOneLocked(tso);
2530   RELEASE_LOCK(&sched_mutex);
2531   return tso;
2532 }
2533 #endif
2534
2535 #if defined(GRAN)
2536 void 
2537 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2538 {
2539   StgBlockingQueueElement *bqe;
2540   PEs node_loc;
2541   nat len = 0; 
2542
2543   IF_GRAN_DEBUG(bq, 
2544                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2545                       node, CurrentProc, CurrentTime[CurrentProc], 
2546                       CurrentTSO->id, CurrentTSO));
2547
2548   node_loc = where_is(node);
2549
2550   ASSERT(q == END_BQ_QUEUE ||
2551          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2552          get_itbl(q)->type == CONSTR); // closure (type constructor)
2553   ASSERT(is_unique(node));
2554
2555   /* FAKE FETCH: magically copy the node to the tso's proc;
2556      no Fetch necessary because in reality the node should not have been 
2557      moved to the other PE in the first place
2558   */
2559   if (CurrentProc!=node_loc) {
2560     IF_GRAN_DEBUG(bq, 
2561                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2562                         node, node_loc, CurrentProc, CurrentTSO->id, 
2563                         // CurrentTSO, where_is(CurrentTSO),
2564                         node->header.gran.procs));
2565     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2566     IF_GRAN_DEBUG(bq, 
2567                   belch("## new bitmask of node %p is %#x",
2568                         node, node->header.gran.procs));
2569     if (RtsFlags.GranFlags.GranSimStats.Global) {
2570       globalGranStats.tot_fake_fetches++;
2571     }
2572   }
2573
2574   bqe = q;
2575   // ToDo: check: ASSERT(CurrentProc==node_loc);
2576   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2577     //next = bqe->link;
2578     /* 
2579        bqe points to the current element in the queue
2580        next points to the next element in the queue
2581     */
2582     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2583     //tso_loc = where_is(tso);
2584     len++;
2585     bqe = unblockOneLocked(bqe, node);
2586   }
2587
2588   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2589      the closure to make room for the anchor of the BQ */
2590   if (bqe!=END_BQ_QUEUE) {
2591     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2592     /*
2593     ASSERT((info_ptr==&RBH_Save_0_info) ||
2594            (info_ptr==&RBH_Save_1_info) ||
2595            (info_ptr==&RBH_Save_2_info));
2596     */
2597     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2598     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2599     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2600
2601     IF_GRAN_DEBUG(bq,
2602                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2603                         node, info_type(node)));
2604   }
2605
2606   /* statistics gathering */
2607   if (RtsFlags.GranFlags.GranSimStats.Global) {
2608     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2609     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2610     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2611     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2612   }
2613   IF_GRAN_DEBUG(bq,
2614                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2615                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2616 }
2617 #elif defined(PAR)
2618 void 
2619 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2620 {
2621   StgBlockingQueueElement *bqe;
2622
2623   ACQUIRE_LOCK(&sched_mutex);
2624
2625   IF_PAR_DEBUG(verbose, 
2626                belch("##-_ AwBQ for node %p on [%x]: ",
2627                      node, mytid));
2628 #ifdef DIST  
2629   //RFP
2630   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2631     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2632     return;
2633   }
2634 #endif
2635   
2636   ASSERT(q == END_BQ_QUEUE ||
2637          get_itbl(q)->type == TSO ||           
2638          get_itbl(q)->type == BLOCKED_FETCH || 
2639          get_itbl(q)->type == CONSTR); 
2640
2641   bqe = q;
2642   while (get_itbl(bqe)->type==TSO || 
2643          get_itbl(bqe)->type==BLOCKED_FETCH) {
2644     bqe = unblockOneLocked(bqe, node);
2645   }
2646   RELEASE_LOCK(&sched_mutex);
2647 }
2648
2649 #else   /* !GRAN && !PAR */
2650
2651 #ifdef RTS_SUPPORTS_THREADS
2652 void
2653 awakenBlockedQueueNoLock(StgTSO *tso)
2654 {
2655   while (tso != END_TSO_QUEUE) {
2656     tso = unblockOneLocked(tso);
2657   }
2658 }
2659 #endif
2660
2661 void
2662 awakenBlockedQueue(StgTSO *tso)
2663 {
2664   ACQUIRE_LOCK(&sched_mutex);
2665   while (tso != END_TSO_QUEUE) {
2666     tso = unblockOneLocked(tso);
2667   }
2668   RELEASE_LOCK(&sched_mutex);
2669 }
2670 #endif
2671
2672 /* ---------------------------------------------------------------------------
2673    Interrupt execution
2674    - usually called inside a signal handler so it mustn't do anything fancy.   
2675    ------------------------------------------------------------------------ */
2676
2677 void
2678 interruptStgRts(void)
2679 {
2680     interrupted    = 1;
2681     context_switch = 1;
2682 #ifdef RTS_SUPPORTS_THREADS
2683     wakeBlockedWorkerThread();
2684 #endif
2685 }
2686
2687 /* -----------------------------------------------------------------------------
2688    Unblock a thread
2689
2690    This is for use when we raise an exception in another thread, which
2691    may be blocked.
2692    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2693    -------------------------------------------------------------------------- */
2694
2695 #if defined(GRAN) || defined(PAR)
2696 /*
2697   NB: only the type of the blocking queue is different in GranSim and GUM
2698       the operations on the queue-elements are the same
2699       long live polymorphism!
2700
2701   Locks: sched_mutex is held upon entry and exit.
2702
2703 */
2704 static void
2705 unblockThread(StgTSO *tso)
2706 {
2707   StgBlockingQueueElement *t, **last;
2708
2709   switch (tso->why_blocked) {
2710
2711   case NotBlocked:
2712     return;  /* not blocked */
2713
2714   case BlockedOnMVar:
2715     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2716     {
2717       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2718       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2719
2720       last = (StgBlockingQueueElement **)&mvar->head;
2721       for (t = (StgBlockingQueueElement *)mvar->head; 
2722            t != END_BQ_QUEUE; 
2723            last = &t->link, last_tso = t, t = t->link) {
2724         if (t == (StgBlockingQueueElement *)tso) {
2725           *last = (StgBlockingQueueElement *)tso->link;
2726           if (mvar->tail == tso) {
2727             mvar->tail = (StgTSO *)last_tso;
2728           }
2729           goto done;
2730         }
2731       }
2732       barf("unblockThread (MVAR): TSO not found");
2733     }
2734
2735   case BlockedOnBlackHole:
2736     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2737     {
2738       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2739
2740       last = &bq->blocking_queue;
2741       for (t = bq->blocking_queue; 
2742            t != END_BQ_QUEUE; 
2743            last = &t->link, t = t->link) {
2744         if (t == (StgBlockingQueueElement *)tso) {
2745           *last = (StgBlockingQueueElement *)tso->link;
2746           goto done;
2747         }
2748       }
2749       barf("unblockThread (BLACKHOLE): TSO not found");
2750     }
2751
2752   case BlockedOnException:
2753     {
2754       StgTSO *target  = tso->block_info.tso;
2755
2756       ASSERT(get_itbl(target)->type == TSO);
2757
2758       if (target->what_next == ThreadRelocated) {
2759           target = target->link;
2760           ASSERT(get_itbl(target)->type == TSO);
2761       }
2762
2763       ASSERT(target->blocked_exceptions != NULL);
2764
2765       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2766       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2767            t != END_BQ_QUEUE; 
2768            last = &t->link, t = t->link) {
2769         ASSERT(get_itbl(t)->type == TSO);
2770         if (t == (StgBlockingQueueElement *)tso) {
2771           *last = (StgBlockingQueueElement *)tso->link;
2772           goto done;
2773         }
2774       }
2775       barf("unblockThread (Exception): TSO not found");
2776     }
2777
2778   case BlockedOnRead:
2779   case BlockedOnWrite:
2780 #if defined(mingw32_TARGET_OS)
2781   case BlockedOnDoProc:
2782 #endif
2783     {
2784       /* take TSO off blocked_queue */
2785       StgBlockingQueueElement *prev = NULL;
2786       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2787            prev = t, t = t->link) {
2788         if (t == (StgBlockingQueueElement *)tso) {
2789           if (prev == NULL) {
2790             blocked_queue_hd = (StgTSO *)t->link;
2791             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2792               blocked_queue_tl = END_TSO_QUEUE;
2793             }
2794           } else {
2795             prev->link = t->link;
2796             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2797               blocked_queue_tl = (StgTSO *)prev;
2798             }
2799           }
2800           goto done;
2801         }
2802       }
2803       barf("unblockThread (I/O): TSO not found");
2804     }
2805
2806   case BlockedOnDelay:
2807     {
2808       /* take TSO off sleeping_queue */
2809       StgBlockingQueueElement *prev = NULL;
2810       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2811            prev = t, t = t->link) {
2812         if (t == (StgBlockingQueueElement *)tso) {
2813           if (prev == NULL) {
2814             sleeping_queue = (StgTSO *)t->link;
2815           } else {
2816             prev->link = t->link;
2817           }
2818           goto done;
2819         }
2820       }
2821       barf("unblockThread (delay): TSO not found");
2822     }
2823
2824   default:
2825     barf("unblockThread");
2826   }
2827
2828  done:
2829   tso->link = END_TSO_QUEUE;
2830   tso->why_blocked = NotBlocked;
2831   tso->block_info.closure = NULL;
2832   PUSH_ON_RUN_QUEUE(tso);
2833 }
2834 #else
2835 static void
2836 unblockThread(StgTSO *tso)
2837 {
2838   StgTSO *t, **last;
2839   
2840   /* To avoid locking unnecessarily. */
2841   if (tso->why_blocked == NotBlocked) {
2842     return;
2843   }
2844
2845   switch (tso->why_blocked) {
2846
2847   case BlockedOnMVar:
2848     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2849     {
2850       StgTSO *last_tso = END_TSO_QUEUE;
2851       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2852
2853       last = &mvar->head;
2854       for (t = mvar->head; t != END_TSO_QUEUE; 
2855            last = &t->link, last_tso = t, t = t->link) {
2856         if (t == tso) {
2857           *last = tso->link;
2858           if (mvar->tail == tso) {
2859             mvar->tail = last_tso;
2860           }
2861           goto done;
2862         }
2863       }
2864       barf("unblockThread (MVAR): TSO not found");
2865     }
2866
2867   case BlockedOnBlackHole:
2868     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2869     {
2870       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2871
2872       last = &bq->blocking_queue;
2873       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2874            last = &t->link, t = t->link) {
2875         if (t == tso) {
2876           *last = tso->link;
2877           goto done;
2878         }
2879       }
2880       barf("unblockThread (BLACKHOLE): TSO not found");
2881     }
2882
2883   case BlockedOnException:
2884     {
2885       StgTSO *target  = tso->block_info.tso;
2886
2887       ASSERT(get_itbl(target)->type == TSO);
2888
2889       while (target->what_next == ThreadRelocated) {
2890           target = target->link;
2891           ASSERT(get_itbl(target)->type == TSO);
2892       }
2893       
2894       ASSERT(target->blocked_exceptions != NULL);
2895
2896       last = &target->blocked_exceptions;
2897       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2898            last = &t->link, t = t->link) {
2899         ASSERT(get_itbl(t)->type == TSO);
2900         if (t == tso) {
2901           *last = tso->link;
2902           goto done;
2903         }
2904       }
2905       barf("unblockThread (Exception): TSO not found");
2906     }
2907
2908   case BlockedOnRead:
2909   case BlockedOnWrite:
2910 #if defined(mingw32_TARGET_OS)
2911   case BlockedOnDoProc:
2912 #endif
2913     {
2914       StgTSO *prev = NULL;
2915       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2916            prev = t, t = t->link) {
2917         if (t == tso) {
2918           if (prev == NULL) {
2919             blocked_queue_hd = t->link;
2920             if (blocked_queue_tl == t) {
2921               blocked_queue_tl = END_TSO_QUEUE;
2922             }
2923           } else {
2924             prev->link = t->link;
2925             if (blocked_queue_tl == t) {
2926               blocked_queue_tl = prev;
2927             }
2928           }
2929           goto done;
2930         }
2931       }
2932       barf("unblockThread (I/O): TSO not found");
2933     }
2934
2935   case BlockedOnDelay:
2936     {
2937       StgTSO *prev = NULL;
2938       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2939            prev = t, t = t->link) {
2940         if (t == tso) {
2941           if (prev == NULL) {
2942             sleeping_queue = t->link;
2943           } else {
2944             prev->link = t->link;
2945           }
2946           goto done;
2947         }
2948       }
2949       barf("unblockThread (delay): TSO not found");
2950     }
2951
2952   default:
2953     barf("unblockThread");
2954   }
2955
2956  done:
2957   tso->link = END_TSO_QUEUE;
2958   tso->why_blocked = NotBlocked;
2959   tso->block_info.closure = NULL;
2960   PUSH_ON_RUN_QUEUE(tso);
2961 }
2962 #endif
2963
2964 /* -----------------------------------------------------------------------------
2965  * raiseAsync()
2966  *
2967  * The following function implements the magic for raising an
2968  * asynchronous exception in an existing thread.
2969  *
2970  * We first remove the thread from any queue on which it might be
2971  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2972  *
2973  * We strip the stack down to the innermost CATCH_FRAME, building
2974  * thunks in the heap for all the active computations, so they can 
2975  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2976  * an application of the handler to the exception, and push it on
2977  * the top of the stack.
2978  * 
2979  * How exactly do we save all the active computations?  We create an
2980  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
2981  * AP_STACKs pushes everything from the corresponding update frame
2982  * upwards onto the stack.  (Actually, it pushes everything up to the
2983  * next update frame plus a pointer to the next AP_STACK object.
2984  * Entering the next AP_STACK object pushes more onto the stack until we
2985  * reach the last AP_STACK object - at which point the stack should look
2986  * exactly as it did when we killed the TSO and we can continue
2987  * execution by entering the closure on top of the stack.
2988  *
2989  * We can also kill a thread entirely - this happens if either (a) the 
2990  * exception passed to raiseAsync is NULL, or (b) there's no
2991  * CATCH_FRAME on the stack.  In either case, we strip the entire
2992  * stack and replace the thread with a zombie.
2993  *
2994  * Locks: sched_mutex held upon entry nor exit.
2995  *
2996  * -------------------------------------------------------------------------- */
2997  
2998 void 
2999 deleteThread(StgTSO *tso)
3000 {
3001   raiseAsync(tso,NULL);
3002 }
3003
3004 static void 
3005 deleteThreadImmediately(StgTSO *tso)
3006 { // for forkProcess only:
3007   // delete thread without giving it a chance to catch the KillThread exception
3008
3009   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3010       return;
3011   }
3012 #if defined(RTS_SUPPORTS_THREADS)
3013   if (tso->why_blocked != BlockedOnCCall
3014       && tso->why_blocked != BlockedOnCCall_NoUnblockExc)
3015 #endif
3016     unblockThread(tso);
3017   tso->what_next = ThreadKilled;
3018 }
3019
3020 void
3021 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3022 {
3023   /* When raising async exs from contexts where sched_mutex isn't held;
3024      use raiseAsyncWithLock(). */
3025   ACQUIRE_LOCK(&sched_mutex);
3026   raiseAsync(tso,exception);
3027   RELEASE_LOCK(&sched_mutex);
3028 }
3029
3030 void
3031 raiseAsync(StgTSO *tso, StgClosure *exception)
3032 {
3033     StgRetInfoTable *info;
3034     StgPtr sp;
3035   
3036     // Thread already dead?
3037     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3038         return;
3039     }
3040
3041     IF_DEBUG(scheduler, 
3042              sched_belch("raising exception in thread %ld.", tso->id));
3043     
3044     // Remove it from any blocking queues
3045     unblockThread(tso);
3046
3047     sp = tso->sp;
3048     
3049     // The stack freezing code assumes there's a closure pointer on
3050     // the top of the stack, so we have to arrange that this is the case...
3051     //
3052     if (sp[0] == (W_)&stg_enter_info) {
3053         sp++;
3054     } else {
3055         sp--;
3056         sp[0] = (W_)&stg_dummy_ret_closure;
3057     }
3058
3059     while (1) {
3060         nat i;
3061
3062         // 1. Let the top of the stack be the "current closure"
3063         //
3064         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3065         // CATCH_FRAME.
3066         //
3067         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3068         // current closure applied to the chunk of stack up to (but not
3069         // including) the update frame.  This closure becomes the "current
3070         // closure".  Go back to step 2.
3071         //
3072         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3073         // top of the stack applied to the exception.
3074         // 
3075         // 5. If it's a STOP_FRAME, then kill the thread.
3076         
3077         StgPtr frame;
3078         
3079         frame = sp + 1;
3080         info = get_ret_itbl((StgClosure *)frame);
3081         
3082         while (info->i.type != UPDATE_FRAME
3083                && (info->i.type != CATCH_FRAME || exception == NULL)
3084                && info->i.type != STOP_FRAME) {
3085             frame += stack_frame_sizeW((StgClosure *)frame);
3086             info = get_ret_itbl((StgClosure *)frame);
3087         }
3088         
3089         switch (info->i.type) {
3090             
3091         case CATCH_FRAME:
3092             // If we find a CATCH_FRAME, and we've got an exception to raise,
3093             // then build the THUNK raise(exception), and leave it on
3094             // top of the CATCH_FRAME ready to enter.
3095             //
3096         {
3097 #ifdef PROFILING
3098             StgCatchFrame *cf = (StgCatchFrame *)frame;
3099 #endif
3100             StgClosure *raise;
3101             
3102             // we've got an exception to raise, so let's pass it to the
3103             // handler in this frame.
3104             //
3105             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3106             TICK_ALLOC_SE_THK(1,0);
3107             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3108             raise->payload[0] = exception;
3109             
3110             // throw away the stack from Sp up to the CATCH_FRAME.
3111             //
3112             sp = frame - 1;
3113             
3114             /* Ensure that async excpetions are blocked now, so we don't get
3115              * a surprise exception before we get around to executing the
3116              * handler.
3117              */
3118             if (tso->blocked_exceptions == NULL) {
3119                 tso->blocked_exceptions = END_TSO_QUEUE;
3120             }
3121             
3122             /* Put the newly-built THUNK on top of the stack, ready to execute
3123              * when the thread restarts.
3124              */
3125             sp[0] = (W_)raise;
3126             sp[-1] = (W_)&stg_enter_info;
3127             tso->sp = sp-1;
3128             tso->what_next = ThreadRunGHC;
3129             IF_DEBUG(sanity, checkTSO(tso));
3130             return;
3131         }
3132         
3133         case UPDATE_FRAME:
3134         {
3135             StgAP_STACK * ap;
3136             nat words;
3137             
3138             // First build an AP_STACK consisting of the stack chunk above the
3139             // current update frame, with the top word on the stack as the
3140             // fun field.
3141             //
3142             words = frame - sp - 1;
3143             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3144             
3145             ap->size = words;
3146             ap->fun  = (StgClosure *)sp[0];
3147             sp++;
3148             for(i=0; i < (nat)words; ++i) {
3149                 ap->payload[i] = (StgClosure *)*sp++;
3150             }
3151             
3152             SET_HDR(ap,&stg_AP_STACK_info,
3153                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3154             TICK_ALLOC_UP_THK(words+1,0);
3155             
3156             IF_DEBUG(scheduler,
3157                      fprintf(stderr,  "sched: Updating ");
3158                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3159                      fprintf(stderr,  " with ");
3160                      printObj((StgClosure *)ap);
3161                 );
3162
3163             // Replace the updatee with an indirection - happily
3164             // this will also wake up any threads currently
3165             // waiting on the result.
3166             //
3167             // Warning: if we're in a loop, more than one update frame on
3168             // the stack may point to the same object.  Be careful not to
3169             // overwrite an IND_OLDGEN in this case, because we'll screw
3170             // up the mutable lists.  To be on the safe side, don't
3171             // overwrite any kind of indirection at all.  See also
3172             // threadSqueezeStack in GC.c, where we have to make a similar
3173             // check.
3174             //
3175             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3176                 // revert the black hole
3177                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,ap);
3178             }
3179             sp += sizeofW(StgUpdateFrame) - 1;
3180             sp[0] = (W_)ap; // push onto stack
3181             break;
3182         }
3183         
3184         case STOP_FRAME:
3185             // We've stripped the entire stack, the thread is now dead.
3186             sp += sizeofW(StgStopFrame);
3187             tso->what_next = ThreadKilled;
3188             tso->sp = sp;
3189             return;
3190             
3191         default:
3192             barf("raiseAsync");
3193         }
3194     }
3195     barf("raiseAsync");
3196 }
3197
3198 /* -----------------------------------------------------------------------------
3199    resurrectThreads is called after garbage collection on the list of
3200    threads found to be garbage.  Each of these threads will be woken
3201    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3202    on an MVar, or NonTermination if the thread was blocked on a Black
3203    Hole.
3204
3205    Locks: sched_mutex isn't held upon entry nor exit.
3206    -------------------------------------------------------------------------- */
3207
3208 void
3209 resurrectThreads( StgTSO *threads )
3210 {
3211   StgTSO *tso, *next;
3212
3213   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3214     next = tso->global_link;
3215     tso->global_link = all_threads;
3216     all_threads = tso;
3217     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3218
3219     switch (tso->why_blocked) {
3220     case BlockedOnMVar:
3221     case BlockedOnException:
3222       /* Called by GC - sched_mutex lock is currently held. */
3223       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3224       break;
3225     case BlockedOnBlackHole:
3226       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3227       break;
3228     case NotBlocked:
3229       /* This might happen if the thread was blocked on a black hole
3230        * belonging to a thread that we've just woken up (raiseAsync
3231        * can wake up threads, remember...).
3232        */
3233       continue;
3234     default:
3235       barf("resurrectThreads: thread blocked in a strange way");
3236     }
3237   }
3238 }
3239
3240 /* -----------------------------------------------------------------------------
3241  * Blackhole detection: if we reach a deadlock, test whether any
3242  * threads are blocked on themselves.  Any threads which are found to
3243  * be self-blocked get sent a NonTermination exception.
3244  *
3245  * This is only done in a deadlock situation in order to avoid
3246  * performance overhead in the normal case.
3247  *
3248  * Locks: sched_mutex is held upon entry and exit.
3249  * -------------------------------------------------------------------------- */
3250
3251 static void
3252 detectBlackHoles( void )
3253 {
3254     StgTSO *tso = all_threads;
3255     StgClosure *frame;
3256     StgClosure *blocked_on;
3257     StgRetInfoTable *info;
3258
3259     for (tso = all_threads; tso != END_TSO_QUEUE; tso = tso->global_link) {
3260
3261         while (tso->what_next == ThreadRelocated) {
3262             tso = tso->link;
3263             ASSERT(get_itbl(tso)->type == TSO);
3264         }
3265       
3266         if (tso->why_blocked != BlockedOnBlackHole) {
3267             continue;
3268         }
3269         blocked_on = tso->block_info.closure;
3270
3271         frame = (StgClosure *)tso->sp;
3272
3273         while(1) {
3274             info = get_ret_itbl(frame);
3275             switch (info->i.type) {
3276             case UPDATE_FRAME:
3277                 if (((StgUpdateFrame *)frame)->updatee == blocked_on) {
3278                     /* We are blocking on one of our own computations, so
3279                      * send this thread the NonTermination exception.  
3280                      */
3281                     IF_DEBUG(scheduler, 
3282                              sched_belch("thread %d is blocked on itself", tso->id));
3283                     raiseAsync(tso, (StgClosure *)NonTermination_closure);
3284                     goto done;
3285                 }
3286                 
3287                 frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
3288                 continue;
3289
3290             case STOP_FRAME:
3291                 goto done;
3292
3293                 // normal stack frames; do nothing except advance the pointer
3294             default:
3295                 (StgPtr)frame += stack_frame_sizeW(frame);
3296             }
3297         }   
3298         done: ;
3299     }
3300 }
3301
3302 /* ----------------------------------------------------------------------------
3303  * Debugging: why is a thread blocked
3304  * [Also provides useful information when debugging threaded programs
3305  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
3306    ------------------------------------------------------------------------- */
3307
3308 static
3309 void
3310 printThreadBlockage(StgTSO *tso)
3311 {
3312   switch (tso->why_blocked) {
3313   case BlockedOnRead:
3314     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3315     break;
3316   case BlockedOnWrite:
3317     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3318     break;
3319 #if defined(mingw32_TARGET_OS)
3320     case BlockedOnDoProc:
3321     fprintf(stderr,"is blocked on proc (request: %d)", tso->block_info.async_result->reqID);
3322     break;
3323 #endif
3324   case BlockedOnDelay:
3325     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3326     break;
3327   case BlockedOnMVar:
3328     fprintf(stderr,"is blocked on an MVar");
3329     break;
3330   case BlockedOnException:
3331     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3332             tso->block_info.tso->id);
3333     break;
3334   case BlockedOnBlackHole:
3335     fprintf(stderr,"is blocked on a black hole");
3336     break;
3337   case NotBlocked:
3338     fprintf(stderr,"is not blocked");
3339     break;
3340 #if defined(PAR)
3341   case BlockedOnGA:
3342     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3343             tso->block_info.closure, info_type(tso->block_info.closure));
3344     break;
3345   case BlockedOnGA_NoSend:
3346     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3347             tso->block_info.closure, info_type(tso->block_info.closure));
3348     break;
3349 #endif
3350 #if defined(RTS_SUPPORTS_THREADS)
3351   case BlockedOnCCall:
3352     fprintf(stderr,"is blocked on an external call");
3353     break;
3354   case BlockedOnCCall_NoUnblockExc:
3355     fprintf(stderr,"is blocked on an external call (exceptions were already blocked)");
3356     break;
3357 #endif
3358   default:
3359     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3360          tso->why_blocked, tso->id, tso);
3361   }
3362 }
3363
3364 static
3365 void
3366 printThreadStatus(StgTSO *tso)
3367 {
3368   switch (tso->what_next) {
3369   case ThreadKilled:
3370     fprintf(stderr,"has been killed");
3371     break;
3372   case ThreadComplete:
3373     fprintf(stderr,"has completed");
3374     break;
3375   default:
3376     printThreadBlockage(tso);
3377   }
3378 }
3379
3380 void
3381 printAllThreads(void)
3382 {
3383   StgTSO *t;
3384   void *label;
3385
3386 # if defined(GRAN)
3387   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3388   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3389                        time_string, rtsFalse/*no commas!*/);
3390
3391   fprintf(stderr, "all threads at [%s]:\n", time_string);
3392 # elif defined(PAR)
3393   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3394   ullong_format_string(CURRENT_TIME,
3395                        time_string, rtsFalse/*no commas!*/);
3396
3397   fprintf(stderr,"all threads at [%s]:\n", time_string);
3398 # else
3399   fprintf(stderr,"all threads:\n");
3400 # endif
3401
3402   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3403     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3404     label = lookupThreadLabel(t->id);
3405     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3406     printThreadStatus(t);
3407     fprintf(stderr,"\n");
3408   }
3409 }
3410     
3411 #ifdef DEBUG
3412
3413 /* 
3414    Print a whole blocking queue attached to node (debugging only).
3415 */
3416 # if defined(PAR)
3417 void 
3418 print_bq (StgClosure *node)
3419 {
3420   StgBlockingQueueElement *bqe;
3421   StgTSO *tso;
3422   rtsBool end;
3423
3424   fprintf(stderr,"## BQ of closure %p (%s): ",
3425           node, info_type(node));
3426
3427   /* should cover all closures that may have a blocking queue */
3428   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3429          get_itbl(node)->type == FETCH_ME_BQ ||
3430          get_itbl(node)->type == RBH ||
3431          get_itbl(node)->type == MVAR);
3432     
3433   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3434
3435   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3436 }
3437
3438 /* 
3439    Print a whole blocking queue starting with the element bqe.
3440 */
3441 void 
3442 print_bqe (StgBlockingQueueElement *bqe)
3443 {
3444   rtsBool end;
3445
3446   /* 
3447      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3448   */
3449   for (end = (bqe==END_BQ_QUEUE);
3450        !end; // iterate until bqe points to a CONSTR
3451        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3452        bqe = end ? END_BQ_QUEUE : bqe->link) {
3453     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3454     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3455     /* types of closures that may appear in a blocking queue */
3456     ASSERT(get_itbl(bqe)->type == TSO ||           
3457            get_itbl(bqe)->type == BLOCKED_FETCH || 
3458            get_itbl(bqe)->type == CONSTR); 
3459     /* only BQs of an RBH end with an RBH_Save closure */
3460     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3461
3462     switch (get_itbl(bqe)->type) {
3463     case TSO:
3464       fprintf(stderr," TSO %u (%x),",
3465               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3466       break;
3467     case BLOCKED_FETCH:
3468       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3469               ((StgBlockedFetch *)bqe)->node, 
3470               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3471               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3472               ((StgBlockedFetch *)bqe)->ga.weight);
3473       break;
3474     case CONSTR:
3475       fprintf(stderr," %s (IP %p),",
3476               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3477                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3478                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3479                "RBH_Save_?"), get_itbl(bqe));
3480       break;
3481     default:
3482       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3483            info_type((StgClosure *)bqe)); // , node, info_type(node));
3484       break;
3485     }
3486   } /* for */
3487   fputc('\n', stderr);
3488 }
3489 # elif defined(GRAN)
3490 void 
3491 print_bq (StgClosure *node)
3492 {
3493   StgBlockingQueueElement *bqe;
3494   PEs node_loc, tso_loc;
3495   rtsBool end;
3496
3497   /* should cover all closures that may have a blocking queue */
3498   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3499          get_itbl(node)->type == FETCH_ME_BQ ||
3500          get_itbl(node)->type == RBH);
3501     
3502   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3503   node_loc = where_is(node);
3504
3505   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3506           node, info_type(node), node_loc);
3507
3508   /* 
3509      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3510   */
3511   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3512        !end; // iterate until bqe points to a CONSTR
3513        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3514     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3515     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3516     /* types of closures that may appear in a blocking queue */
3517     ASSERT(get_itbl(bqe)->type == TSO ||           
3518            get_itbl(bqe)->type == CONSTR); 
3519     /* only BQs of an RBH end with an RBH_Save closure */
3520     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3521
3522     tso_loc = where_is((StgClosure *)bqe);
3523     switch (get_itbl(bqe)->type) {
3524     case TSO:
3525       fprintf(stderr," TSO %d (%p) on [PE %d],",
3526               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3527       break;
3528     case CONSTR:
3529       fprintf(stderr," %s (IP %p),",
3530               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3531                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3532                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3533                "RBH_Save_?"), get_itbl(bqe));
3534       break;
3535     default:
3536       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3537            info_type((StgClosure *)bqe), node, info_type(node));
3538       break;
3539     }
3540   } /* for */
3541   fputc('\n', stderr);
3542 }
3543 #else
3544 /* 
3545    Nice and easy: only TSOs on the blocking queue
3546 */
3547 void 
3548 print_bq (StgClosure *node)
3549 {
3550   StgTSO *tso;
3551
3552   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3553   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3554        tso != END_TSO_QUEUE; 
3555        tso=tso->link) {
3556     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3557     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3558     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3559   }
3560   fputc('\n', stderr);
3561 }
3562 # endif
3563
3564 #if defined(PAR)
3565 static nat
3566 run_queue_len(void)
3567 {
3568   nat i;
3569   StgTSO *tso;
3570
3571   for (i=0, tso=run_queue_hd; 
3572        tso != END_TSO_QUEUE;
3573        i++, tso=tso->link)
3574     /* nothing */
3575
3576   return i;
3577 }
3578 #endif
3579
3580 void
3581 sched_belch(char *s, ...)
3582 {
3583   va_list ap;
3584   va_start(ap,s);
3585 #ifdef RTS_SUPPORTS_THREADS
3586   fprintf(stderr, "sched (task %p): ", osThreadId());
3587 #elif defined(PAR)
3588   fprintf(stderr, "== ");
3589 #else
3590   fprintf(stderr, "sched: ");
3591 #endif
3592   vfprintf(stderr, s, ap);
3593   fprintf(stderr, "\n");
3594   fflush(stderr);
3595   va_end(ap);
3596 }
3597
3598 #endif /* DEBUG */