[project @ 2002-07-10 09:28:54 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.147 2002/07/10 09:28:56 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <stdarg.h>
126
127 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
128 //@subsection Variables and Data structures
129
130 /* Main thread queue.
131  * Locks required: sched_mutex.
132  */
133 StgMainThread *main_threads;
134
135 /* Thread queues.
136  * Locks required: sched_mutex.
137  */
138 #if defined(GRAN)
139
140 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
141 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
142
143 /* 
144    In GranSim we have a runnable and a blocked queue for each processor.
145    In order to minimise code changes new arrays run_queue_hds/tls
146    are created. run_queue_hd is then a short cut (macro) for
147    run_queue_hds[CurrentProc] (see GranSim.h).
148    -- HWL
149 */
150 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
151 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
152 StgTSO *ccalling_threadss[MAX_PROC];
153 /* We use the same global list of threads (all_threads) in GranSim as in
154    the std RTS (i.e. we are cheating). However, we don't use this list in
155    the GranSim specific code at the moment (so we are only potentially
156    cheating).  */
157
158 #else /* !GRAN */
159
160 StgTSO *run_queue_hd, *run_queue_tl;
161 StgTSO *blocked_queue_hd, *blocked_queue_tl;
162 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
163
164 #endif
165
166 /* Linked list of all threads.
167  * Used for detecting garbage collected threads.
168  */
169 StgTSO *all_threads;
170
171 /* When a thread performs a safe C call (_ccall_GC, using old
172  * terminology), it gets put on the suspended_ccalling_threads
173  * list. Used by the garbage collector.
174  */
175 static StgTSO *suspended_ccalling_threads;
176
177 static StgTSO *threadStackOverflow(StgTSO *tso);
178
179 /* KH: The following two flags are shared memory locations.  There is no need
180        to lock them, since they are only unset at the end of a scheduler
181        operation.
182 */
183
184 /* flag set by signal handler to precipitate a context switch */
185 //@cindex context_switch
186 nat context_switch;
187
188 /* if this flag is set as well, give up execution */
189 //@cindex interrupted
190 rtsBool interrupted;
191
192 /* Next thread ID to allocate.
193  * Locks required: thread_id_mutex
194  */
195 //@cindex next_thread_id
196 StgThreadID next_thread_id = 1;
197
198 /*
199  * Pointers to the state of the current thread.
200  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
201  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
202  */
203  
204 /* The smallest stack size that makes any sense is:
205  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
206  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
207  *  + 1                       (the realworld token for an IO thread)
208  *  + 1                       (the closure to enter)
209  *
210  * A thread with this stack will bomb immediately with a stack
211  * overflow, which will increase its stack size.  
212  */
213
214 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
215
216
217 #if defined(GRAN)
218 StgTSO *CurrentTSO;
219 #endif
220
221 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
222  *  exists - earlier gccs apparently didn't.
223  *  -= chak
224  */
225 StgTSO dummy_tso;
226
227 rtsBool ready_to_gc;
228
229 /*
230  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
231  * in an MT setting, needed to signal that a worker thread shouldn't hang around
232  * in the scheduler when it is out of work.
233  */
234 static rtsBool shutting_down_scheduler = rtsFalse;
235
236 void            addToBlockedQueue ( StgTSO *tso );
237
238 static void     schedule          ( void );
239        void     interruptStgRts   ( void );
240
241 static void     detectBlackHoles  ( void );
242
243 #ifdef DEBUG
244 static void sched_belch(char *s, ...);
245 #endif
246
247 #if defined(RTS_SUPPORTS_THREADS)
248 /* ToDo: carefully document the invariants that go together
249  *       with these synchronisation objects.
250  */
251 Mutex     sched_mutex       = INIT_MUTEX_VAR;
252 Mutex     term_mutex        = INIT_MUTEX_VAR;
253
254 /*
255  * A heavyweight solution to the problem of protecting
256  * the thread_id from concurrent update.
257  */
258 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
259
260
261 # if defined(SMP)
262 static Condition gc_pending_cond = INIT_COND_VAR;
263 nat await_death;
264 # endif
265
266 #endif /* RTS_SUPPORTS_THREADS */
267
268 #if defined(PAR)
269 StgTSO *LastTSO;
270 rtsTime TimeOfLastYield;
271 rtsBool emitSchedule = rtsTrue;
272 #endif
273
274 #if DEBUG
275 char *whatNext_strs[] = {
276   "ThreadEnterGHC",
277   "ThreadRunGHC",
278   "ThreadEnterInterp",
279   "ThreadKilled",
280   "ThreadComplete"
281 };
282
283 char *threadReturnCode_strs[] = {
284   "HeapOverflow",                       /* might also be StackOverflow */
285   "StackOverflow",
286   "ThreadYielding",
287   "ThreadBlocked",
288   "ThreadFinished"
289 };
290 #endif
291
292 #if defined(PAR)
293 StgTSO * createSparkThread(rtsSpark spark);
294 StgTSO * activateSpark (rtsSpark spark);  
295 #endif
296
297 /*
298  * The thread state for the main thread.
299 // ToDo: check whether not needed any more
300 StgTSO   *MainTSO;
301  */
302
303 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
304 static void taskStart(void);
305 static void
306 taskStart(void)
307 {
308   schedule();
309 }
310 #endif
311
312
313
314
315 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
316 //@subsection Main scheduling loop
317
318 /* ---------------------------------------------------------------------------
319    Main scheduling loop.
320
321    We use round-robin scheduling, each thread returning to the
322    scheduler loop when one of these conditions is detected:
323
324       * out of heap space
325       * timer expires (thread yields)
326       * thread blocks
327       * thread ends
328       * stack overflow
329
330    Locking notes:  we acquire the scheduler lock once at the beginning
331    of the scheduler loop, and release it when
332     
333       * running a thread, or
334       * waiting for work, or
335       * waiting for a GC to complete.
336
337    GRAN version:
338      In a GranSim setup this loop iterates over the global event queue.
339      This revolves around the global event queue, which determines what 
340      to do next. Therefore, it's more complicated than either the 
341      concurrent or the parallel (GUM) setup.
342
343    GUM version:
344      GUM iterates over incoming messages.
345      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
346      and sends out a fish whenever it has nothing to do; in-between
347      doing the actual reductions (shared code below) it processes the
348      incoming messages and deals with delayed operations 
349      (see PendingFetches).
350      This is not the ugliest code you could imagine, but it's bloody close.
351
352    ------------------------------------------------------------------------ */
353 //@cindex schedule
354 static void
355 schedule( void )
356 {
357   StgTSO *t;
358   Capability *cap;
359   StgThreadReturnCode ret;
360 #if defined(GRAN)
361   rtsEvent *event;
362 #elif defined(PAR)
363   StgSparkPool *pool;
364   rtsSpark spark;
365   StgTSO *tso;
366   GlobalTaskId pe;
367   rtsBool receivedFinish = rtsFalse;
368 # if defined(DEBUG)
369   nat tp_size, sp_size; // stats only
370 # endif
371 #endif
372   rtsBool was_interrupted = rtsFalse;
373   
374   ACQUIRE_LOCK(&sched_mutex);
375  
376 #if defined(RTS_SUPPORTS_THREADS)
377   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
378 #else
379   /* simply initialise it in the non-threaded case */
380   grabCapability(&cap);
381 #endif
382
383 #if defined(GRAN)
384   /* set up first event to get things going */
385   /* ToDo: assign costs for system setup and init MainTSO ! */
386   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
387             ContinueThread, 
388             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
389
390   IF_DEBUG(gran,
391            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
392            G_TSO(CurrentTSO, 5));
393
394   if (RtsFlags.GranFlags.Light) {
395     /* Save current time; GranSim Light only */
396     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
397   }      
398
399   event = get_next_event();
400
401   while (event!=(rtsEvent*)NULL) {
402     /* Choose the processor with the next event */
403     CurrentProc = event->proc;
404     CurrentTSO = event->tso;
405
406 #elif defined(PAR)
407
408   while (!receivedFinish) {    /* set by processMessages */
409                                /* when receiving PP_FINISH message         */ 
410 #else
411
412   while (1) {
413
414 #endif
415
416     IF_DEBUG(scheduler, printAllThreads());
417
418 #if defined(RTS_SUPPORTS_THREADS)
419     /* Check to see whether there are any worker threads
420        waiting to deposit external call results. If so,
421        yield our capability */
422     yieldToReturningWorker(&sched_mutex, &cap);
423 #endif
424
425     /* If we're interrupted (the user pressed ^C, or some other
426      * termination condition occurred), kill all the currently running
427      * threads.
428      */
429     if (interrupted) {
430       IF_DEBUG(scheduler, sched_belch("interrupted"));
431       deleteAllThreads();
432       interrupted = rtsFalse;
433       was_interrupted = rtsTrue;
434     }
435
436     /* Go through the list of main threads and wake up any
437      * clients whose computations have finished.  ToDo: this
438      * should be done more efficiently without a linear scan
439      * of the main threads list, somehow...
440      */
441 #if defined(RTS_SUPPORTS_THREADS)
442     { 
443       StgMainThread *m, **prev;
444       prev = &main_threads;
445       for (m = main_threads; m != NULL; m = m->link) {
446         switch (m->tso->what_next) {
447         case ThreadComplete:
448           if (m->ret) {
449             *(m->ret) = (StgClosure *)m->tso->sp[0];
450           }
451           *prev = m->link;
452           m->stat = Success;
453           broadcastCondition(&m->wakeup);
454 #ifdef DEBUG
455           removeThreadLabel(m->tso);
456 #endif
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467 #ifdef DEBUG
468           removeThreadLabel(m->tso);
469 #endif
470           break;
471         default:
472           break;
473         }
474       }
475     }
476
477 #else /* not threaded */
478
479 # if defined(PAR)
480     /* in GUM do this only on the Main PE */
481     if (IAmMainThread)
482 # endif
483     /* If our main thread has finished or been killed, return.
484      */
485     {
486       StgMainThread *m = main_threads;
487       if (m->tso->what_next == ThreadComplete
488           || m->tso->what_next == ThreadKilled) {
489 #ifdef DEBUG
490         removeThreadLabel((StgWord)m->tso);
491 #endif
492         main_threads = main_threads->link;
493         if (m->tso->what_next == ThreadComplete) {
494           /* we finished successfully, fill in the return value */
495           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
496           m->stat = Success;
497           return;
498         } else {
499           if (m->ret) { *(m->ret) = NULL; };
500           if (was_interrupted) {
501             m->stat = Interrupted;
502           } else {
503             m->stat = Killed;
504           }
505           return;
506         }
507       }
508     }
509 #endif
510
511     /* Top up the run queue from our spark pool.  We try to make the
512      * number of threads in the run queue equal to the number of
513      * free capabilities.
514      *
515      * Disable spark support in SMP for now, non-essential & requires
516      * a little bit of work to make it compile cleanly. -- sof 1/02.
517      */
518 #if 0 /* defined(SMP) */
519     {
520       nat n = getFreeCapabilities();
521       StgTSO *tso = run_queue_hd;
522
523       /* Count the run queue */
524       while (n > 0 && tso != END_TSO_QUEUE) {
525         tso = tso->link;
526         n--;
527       }
528
529       for (; n > 0; n--) {
530         StgClosure *spark;
531         spark = findSpark(rtsFalse);
532         if (spark == NULL) {
533           break; /* no more sparks in the pool */
534         } else {
535           /* I'd prefer this to be done in activateSpark -- HWL */
536           /* tricky - it needs to hold the scheduler lock and
537            * not try to re-acquire it -- SDM */
538           createSparkThread(spark);       
539           IF_DEBUG(scheduler,
540                    sched_belch("==^^ turning spark of closure %p into a thread",
541                                (StgClosure *)spark));
542         }
543       }
544       /* We need to wake up the other tasks if we just created some
545        * work for them.
546        */
547       if (getFreeCapabilities() - n > 1) {
548           signalCondition( &thread_ready_cond );
549       }
550     }
551 #endif // SMP
552
553     /* check for signals each time around the scheduler */
554 #ifndef mingw32_TARGET_OS
555     if (signals_pending()) {
556       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
557       startSignalHandlers();
558       ACQUIRE_LOCK(&sched_mutex);
559     }
560 #endif
561
562     /* Check whether any waiting threads need to be woken up.  If the
563      * run queue is empty, and there are no other tasks running, we
564      * can wait indefinitely for something to happen.
565      * ToDo: what if another client comes along & requests another
566      * main thread?
567      */
568     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
569       awaitEvent( EMPTY_RUN_QUEUE()
570 #if defined(SMP)
571         && allFreeCapabilities()
572 #endif
573         );
574     }
575     /* we can be interrupted while waiting for I/O... */
576     if (interrupted) continue;
577
578     /* 
579      * Detect deadlock: when we have no threads to run, there are no
580      * threads waiting on I/O or sleeping, and all the other tasks are
581      * waiting for work, we must have a deadlock of some description.
582      *
583      * We first try to find threads blocked on themselves (ie. black
584      * holes), and generate NonTermination exceptions where necessary.
585      *
586      * If no threads are black holed, we have a deadlock situation, so
587      * inform all the main threads.
588      */
589 #ifndef PAR
590     if (   EMPTY_THREAD_QUEUES()
591 #if defined(RTS_SUPPORTS_THREADS)
592         && EMPTY_QUEUE(suspended_ccalling_threads)
593 #endif
594 #ifdef SMP
595         && allFreeCapabilities()
596 #endif
597         )
598     {
599         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
600 #if defined(THREADED_RTS)
601         /* and SMP mode ..? */
602         releaseCapability(cap);
603 #endif
604         // Garbage collection can release some new threads due to
605         // either (a) finalizers or (b) threads resurrected because
606         // they are about to be send BlockedOnDeadMVar.  Any threads
607         // thus released will be immediately runnable.
608         GarbageCollect(GetRoots,rtsTrue);
609
610         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
611
612         IF_DEBUG(scheduler, 
613                  sched_belch("still deadlocked, checking for black holes..."));
614         detectBlackHoles();
615
616         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
617
618 #ifndef mingw32_TARGET_OS
619         /* If we have user-installed signal handlers, then wait
620          * for signals to arrive rather then bombing out with a
621          * deadlock.
622          */
623 #if defined(RTS_SUPPORTS_THREADS)
624         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
625                       a signal with no runnable threads (or I/O
626                       suspended ones) leads nowhere quick.
627                       For now, simply shut down when we reach this
628                       condition.
629                       
630                       ToDo: define precisely under what conditions
631                       the Scheduler should shut down in an MT setting.
632                    */
633 #else
634         if ( anyUserHandlers() ) {
635 #endif
636             IF_DEBUG(scheduler, 
637                      sched_belch("still deadlocked, waiting for signals..."));
638
639             awaitUserSignals();
640
641             // we might be interrupted...
642             if (interrupted) { continue; }
643
644             if (signals_pending()) {
645                 RELEASE_LOCK(&sched_mutex);
646                 startSignalHandlers();
647                 ACQUIRE_LOCK(&sched_mutex);
648             }
649             ASSERT(!EMPTY_RUN_QUEUE());
650             goto not_deadlocked;
651         }
652 #endif
653
654         /* Probably a real deadlock.  Send the current main thread the
655          * Deadlock exception (or in the SMP build, send *all* main
656          * threads the deadlock exception, since none of them can make
657          * progress).
658          */
659         {
660             StgMainThread *m;
661 #if defined(RTS_SUPPORTS_THREADS)
662             for (m = main_threads; m != NULL; m = m->link) {
663                 switch (m->tso->why_blocked) {
664                 case BlockedOnBlackHole:
665                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
666                     break;
667                 case BlockedOnException:
668                 case BlockedOnMVar:
669                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
670                     break;
671                 default:
672                     barf("deadlock: main thread blocked in a strange way");
673                 }
674             }
675 #else
676             m = main_threads;
677             switch (m->tso->why_blocked) {
678             case BlockedOnBlackHole:
679                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
680                 break;
681             case BlockedOnException:
682             case BlockedOnMVar:
683                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
684                 break;
685             default:
686                 barf("deadlock: main thread blocked in a strange way");
687             }
688 #endif
689         }
690
691 #if defined(RTS_SUPPORTS_THREADS)
692         /* ToDo: revisit conditions (and mechanism) for shutting
693            down a multi-threaded world  */
694         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
695         RELEASE_LOCK(&sched_mutex);
696         shutdownHaskell();
697         return;
698 #endif
699     }
700   not_deadlocked:
701
702 #elif defined(PAR)
703     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
704 #endif
705
706 #if defined(SMP)
707     /* If there's a GC pending, don't do anything until it has
708      * completed.
709      */
710     if (ready_to_gc) {
711       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
712       waitCondition( &gc_pending_cond, &sched_mutex );
713     }
714 #endif    
715
716 #if defined(RTS_SUPPORTS_THREADS)
717     /* block until we've got a thread on the run queue and a free
718      * capability.
719      *
720      */
721     if ( EMPTY_RUN_QUEUE() ) {
722       /* Give up our capability */
723       releaseCapability(cap);
724
725       /* If we're in the process of shutting down (& running the
726        * a batch of finalisers), don't wait around.
727        */
728       if ( shutting_down_scheduler ) {
729         RELEASE_LOCK(&sched_mutex);
730         return;
731       }
732       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
733       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
734       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
735     }
736 #endif
737
738 #if defined(GRAN)
739     if (RtsFlags.GranFlags.Light)
740       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
741
742     /* adjust time based on time-stamp */
743     if (event->time > CurrentTime[CurrentProc] &&
744         event->evttype != ContinueThread)
745       CurrentTime[CurrentProc] = event->time;
746     
747     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
748     if (!RtsFlags.GranFlags.Light)
749       handleIdlePEs();
750
751     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
752
753     /* main event dispatcher in GranSim */
754     switch (event->evttype) {
755       /* Should just be continuing execution */
756     case ContinueThread:
757       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
758       /* ToDo: check assertion
759       ASSERT(run_queue_hd != (StgTSO*)NULL &&
760              run_queue_hd != END_TSO_QUEUE);
761       */
762       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
763       if (!RtsFlags.GranFlags.DoAsyncFetch &&
764           procStatus[CurrentProc]==Fetching) {
765         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
766               CurrentTSO->id, CurrentTSO, CurrentProc);
767         goto next_thread;
768       } 
769       /* Ignore ContinueThreads for completed threads */
770       if (CurrentTSO->what_next == ThreadComplete) {
771         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
772               CurrentTSO->id, CurrentTSO, CurrentProc);
773         goto next_thread;
774       } 
775       /* Ignore ContinueThreads for threads that are being migrated */
776       if (PROCS(CurrentTSO)==Nowhere) { 
777         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
778               CurrentTSO->id, CurrentTSO, CurrentProc);
779         goto next_thread;
780       }
781       /* The thread should be at the beginning of the run queue */
782       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
783         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
784               CurrentTSO->id, CurrentTSO, CurrentProc);
785         break; // run the thread anyway
786       }
787       /*
788       new_event(proc, proc, CurrentTime[proc],
789                 FindWork,
790                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
791       goto next_thread; 
792       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
793       break; // now actually run the thread; DaH Qu'vam yImuHbej 
794
795     case FetchNode:
796       do_the_fetchnode(event);
797       goto next_thread;             /* handle next event in event queue  */
798       
799     case GlobalBlock:
800       do_the_globalblock(event);
801       goto next_thread;             /* handle next event in event queue  */
802       
803     case FetchReply:
804       do_the_fetchreply(event);
805       goto next_thread;             /* handle next event in event queue  */
806       
807     case UnblockThread:   /* Move from the blocked queue to the tail of */
808       do_the_unblock(event);
809       goto next_thread;             /* handle next event in event queue  */
810       
811     case ResumeThread:  /* Move from the blocked queue to the tail of */
812       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
813       event->tso->gran.blocktime += 
814         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
815       do_the_startthread(event);
816       goto next_thread;             /* handle next event in event queue  */
817       
818     case StartThread:
819       do_the_startthread(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case MoveThread:
823       do_the_movethread(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case MoveSpark:
827       do_the_movespark(event);
828       goto next_thread;             /* handle next event in event queue  */
829       
830     case FindWork:
831       do_the_findwork(event);
832       goto next_thread;             /* handle next event in event queue  */
833       
834     default:
835       barf("Illegal event type %u\n", event->evttype);
836     }  /* switch */
837     
838     /* This point was scheduler_loop in the old RTS */
839
840     IF_DEBUG(gran, belch("GRAN: after main switch"));
841
842     TimeOfLastEvent = CurrentTime[CurrentProc];
843     TimeOfNextEvent = get_time_of_next_event();
844     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
845     // CurrentTSO = ThreadQueueHd;
846
847     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
848                          TimeOfNextEvent));
849
850     if (RtsFlags.GranFlags.Light) 
851       GranSimLight_leave_system(event, &ActiveTSO); 
852
853     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
854
855     IF_DEBUG(gran, 
856              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
857
858     /* in a GranSim setup the TSO stays on the run queue */
859     t = CurrentTSO;
860     /* Take a thread from the run queue. */
861     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
862
863     IF_DEBUG(gran, 
864              fprintf(stderr, "GRAN: About to run current thread, which is\n");
865              G_TSO(t,5));
866
867     context_switch = 0; // turned on via GranYield, checking events and time slice
868
869     IF_DEBUG(gran, 
870              DumpGranEvent(GR_SCHEDULE, t));
871
872     procStatus[CurrentProc] = Busy;
873
874 #elif defined(PAR)
875     if (PendingFetches != END_BF_QUEUE) {
876         processFetches();
877     }
878
879     /* ToDo: phps merge with spark activation above */
880     /* check whether we have local work and send requests if we have none */
881     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
882       /* :-[  no local threads => look out for local sparks */
883       /* the spark pool for the current PE */
884       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
885       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
886           pool->hd < pool->tl) {
887         /* 
888          * ToDo: add GC code check that we really have enough heap afterwards!!
889          * Old comment:
890          * If we're here (no runnable threads) and we have pending
891          * sparks, we must have a space problem.  Get enough space
892          * to turn one of those pending sparks into a
893          * thread... 
894          */
895
896         spark = findSpark(rtsFalse);                /* get a spark */
897         if (spark != (rtsSpark) NULL) {
898           tso = activateSpark(spark);       /* turn the spark into a thread */
899           IF_PAR_DEBUG(schedule,
900                        belch("==== schedule: Created TSO %d (%p); %d threads active",
901                              tso->id, tso, advisory_thread_count));
902
903           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
904             belch("==^^ failed to activate spark");
905             goto next_thread;
906           }               /* otherwise fall through & pick-up new tso */
907         } else {
908           IF_PAR_DEBUG(verbose,
909                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
910                              spark_queue_len(pool)));
911           goto next_thread;
912         }
913       }
914
915       /* If we still have no work we need to send a FISH to get a spark
916          from another PE 
917       */
918       if (EMPTY_RUN_QUEUE()) {
919       /* =8-[  no local sparks => look for work on other PEs */
920         /*
921          * We really have absolutely no work.  Send out a fish
922          * (there may be some out there already), and wait for
923          * something to arrive.  We clearly can't run any threads
924          * until a SCHEDULE or RESUME arrives, and so that's what
925          * we're hoping to see.  (Of course, we still have to
926          * respond to other types of messages.)
927          */
928         TIME now = msTime() /*CURRENT_TIME*/;
929         IF_PAR_DEBUG(verbose, 
930                      belch("--  now=%ld", now));
931         IF_PAR_DEBUG(verbose,
932                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
933                          (last_fish_arrived_at!=0 &&
934                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
935                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
936                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
937                              last_fish_arrived_at,
938                              RtsFlags.ParFlags.fishDelay, now);
939                      });
940         
941         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
942             (last_fish_arrived_at==0 ||
943              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
944           /* outstandingFishes is set in sendFish, processFish;
945              avoid flooding system with fishes via delay */
946           pe = choosePE();
947           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
948                    NEW_FISH_HUNGER);
949
950           // Global statistics: count no. of fishes
951           if (RtsFlags.ParFlags.ParStats.Global &&
952               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
953             globalParStats.tot_fish_mess++;
954           }
955         }
956       
957         receivedFinish = processMessages();
958         goto next_thread;
959       }
960     } else if (PacketsWaiting()) {  /* Look for incoming messages */
961       receivedFinish = processMessages();
962     }
963
964     /* Now we are sure that we have some work available */
965     ASSERT(run_queue_hd != END_TSO_QUEUE);
966
967     /* Take a thread from the run queue, if we have work */
968     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
969     IF_DEBUG(sanity,checkTSO(t));
970
971     /* ToDo: write something to the log-file
972     if (RTSflags.ParFlags.granSimStats && !sameThread)
973         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
974
975     CurrentTSO = t;
976     */
977     /* the spark pool for the current PE */
978     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
979
980     IF_DEBUG(scheduler, 
981              belch("--=^ %d threads, %d sparks on [%#x]", 
982                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
983
984 # if 1
985     if (0 && RtsFlags.ParFlags.ParStats.Full && 
986         t && LastTSO && t->id != LastTSO->id && 
987         LastTSO->why_blocked == NotBlocked && 
988         LastTSO->what_next != ThreadComplete) {
989       // if previously scheduled TSO not blocked we have to record the context switch
990       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
991                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
992     }
993
994     if (RtsFlags.ParFlags.ParStats.Full && 
995         (emitSchedule /* forced emit */ ||
996         (t && LastTSO && t->id != LastTSO->id))) {
997       /* 
998          we are running a different TSO, so write a schedule event to log file
999          NB: If we use fair scheduling we also have to write  a deschedule 
1000              event for LastTSO; with unfair scheduling we know that the
1001              previous tso has blocked whenever we switch to another tso, so
1002              we don't need it in GUM for now
1003       */
1004       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1005                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1006       emitSchedule = rtsFalse;
1007     }
1008      
1009 # endif
1010 #else /* !GRAN && !PAR */
1011   
1012     /* grab a thread from the run queue */
1013     ASSERT(run_queue_hd != END_TSO_QUEUE);
1014     t = POP_RUN_QUEUE();
1015     // Sanity check the thread we're about to run.  This can be
1016     // expensive if there is lots of thread switching going on...
1017     IF_DEBUG(sanity,checkTSO(t));
1018 #endif
1019     
1020     cap->r.rCurrentTSO = t;
1021     
1022     /* context switches are now initiated by the timer signal, unless
1023      * the user specified "context switch as often as possible", with
1024      * +RTS -C0
1025      */
1026     if (
1027 #ifdef PROFILING
1028         RtsFlags.ProfFlags.profileInterval == 0 ||
1029 #endif
1030         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1031          && (run_queue_hd != END_TSO_QUEUE
1032              || blocked_queue_hd != END_TSO_QUEUE
1033              || sleeping_queue != END_TSO_QUEUE)))
1034         context_switch = 1;
1035     else
1036         context_switch = 0;
1037
1038     RELEASE_LOCK(&sched_mutex);
1039
1040     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1041                               t->id, t, whatNext_strs[t->what_next]));
1042
1043 #ifdef PROFILING
1044     startHeapProfTimer();
1045 #endif
1046
1047     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1048     /* Run the current thread 
1049      */
1050     switch (cap->r.rCurrentTSO->what_next) {
1051     case ThreadKilled:
1052     case ThreadComplete:
1053         /* Thread already finished, return to scheduler. */
1054         ret = ThreadFinished;
1055         break;
1056     case ThreadEnterGHC:
1057         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1058         break;
1059     case ThreadRunGHC:
1060         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1061         break;
1062     case ThreadEnterInterp:
1063         ret = interpretBCO(cap);
1064         break;
1065     default:
1066       barf("schedule: invalid what_next field");
1067     }
1068     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1069     
1070     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1071 #ifdef PROFILING
1072     stopHeapProfTimer();
1073     CCCS = CCS_SYSTEM;
1074 #endif
1075     
1076     ACQUIRE_LOCK(&sched_mutex);
1077
1078 #ifdef SMP
1079     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1080 #elif !defined(GRAN) && !defined(PAR)
1081     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1082 #endif
1083     t = cap->r.rCurrentTSO;
1084     
1085 #if defined(PAR)
1086     /* HACK 675: if the last thread didn't yield, make sure to print a 
1087        SCHEDULE event to the log file when StgRunning the next thread, even
1088        if it is the same one as before */
1089     LastTSO = t; 
1090     TimeOfLastYield = CURRENT_TIME;
1091 #endif
1092
1093     switch (ret) {
1094     case HeapOverflow:
1095 #if defined(GRAN)
1096       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1097       globalGranStats.tot_heapover++;
1098 #elif defined(PAR)
1099       globalParStats.tot_heapover++;
1100 #endif
1101
1102       // did the task ask for a large block?
1103       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1104           // if so, get one and push it on the front of the nursery.
1105           bdescr *bd;
1106           nat blocks;
1107           
1108           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1109
1110           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1111                                    t->id, t,
1112                                    whatNext_strs[t->what_next], blocks));
1113
1114           // don't do this if it would push us over the
1115           // alloc_blocks_lim limit; we'll GC first.
1116           if (alloc_blocks + blocks < alloc_blocks_lim) {
1117
1118               alloc_blocks += blocks;
1119               bd = allocGroup( blocks );
1120
1121               // link the new group into the list
1122               bd->link = cap->r.rCurrentNursery;
1123               bd->u.back = cap->r.rCurrentNursery->u.back;
1124               if (cap->r.rCurrentNursery->u.back != NULL) {
1125                   cap->r.rCurrentNursery->u.back->link = bd;
1126               } else {
1127                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1128                          g0s0->blocks == cap->r.rNursery);
1129                   cap->r.rNursery = g0s0->blocks = bd;
1130               }           
1131               cap->r.rCurrentNursery->u.back = bd;
1132
1133               // initialise it as a nursery block.  We initialise the
1134               // step, gen_no, and flags field of *every* sub-block in
1135               // this large block, because this is easier than making
1136               // sure that we always find the block head of a large
1137               // block whenever we call Bdescr() (eg. evacuate() and
1138               // isAlive() in the GC would both have to do this, at
1139               // least).
1140               { 
1141                   bdescr *x;
1142                   for (x = bd; x < bd + blocks; x++) {
1143                       x->step = g0s0;
1144                       x->gen_no = 0;
1145                       x->flags = 0;
1146                       x->free = x->start;
1147                   }
1148               }
1149
1150               // don't forget to update the block count in g0s0.
1151               g0s0->n_blocks += blocks;
1152               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1153
1154               // now update the nursery to point to the new block
1155               cap->r.rCurrentNursery = bd;
1156
1157               // we might be unlucky and have another thread get on the
1158               // run queue before us and steal the large block, but in that
1159               // case the thread will just end up requesting another large
1160               // block.
1161               PUSH_ON_RUN_QUEUE(t);
1162               break;
1163           }
1164       }
1165
1166       /* make all the running tasks block on a condition variable,
1167        * maybe set context_switch and wait till they all pile in,
1168        * then have them wait on a GC condition variable.
1169        */
1170       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1171                                t->id, t, whatNext_strs[t->what_next]));
1172       threadPaused(t);
1173 #if defined(GRAN)
1174       ASSERT(!is_on_queue(t,CurrentProc));
1175 #elif defined(PAR)
1176       /* Currently we emit a DESCHEDULE event before GC in GUM.
1177          ToDo: either add separate event to distinguish SYSTEM time from rest
1178                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1179       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1180         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1181                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1182         emitSchedule = rtsTrue;
1183       }
1184 #endif
1185       
1186       ready_to_gc = rtsTrue;
1187       context_switch = 1;               /* stop other threads ASAP */
1188       PUSH_ON_RUN_QUEUE(t);
1189       /* actual GC is done at the end of the while loop */
1190       break;
1191       
1192     case StackOverflow:
1193 #if defined(GRAN)
1194       IF_DEBUG(gran, 
1195                DumpGranEvent(GR_DESCHEDULE, t));
1196       globalGranStats.tot_stackover++;
1197 #elif defined(PAR)
1198       // IF_DEBUG(par, 
1199       // DumpGranEvent(GR_DESCHEDULE, t);
1200       globalParStats.tot_stackover++;
1201 #endif
1202       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1203                                t->id, t, whatNext_strs[t->what_next]));
1204       /* just adjust the stack for this thread, then pop it back
1205        * on the run queue.
1206        */
1207       threadPaused(t);
1208       { 
1209         StgMainThread *m;
1210         /* enlarge the stack */
1211         StgTSO *new_t = threadStackOverflow(t);
1212         
1213         /* This TSO has moved, so update any pointers to it from the
1214          * main thread stack.  It better not be on any other queues...
1215          * (it shouldn't be).
1216          */
1217         for (m = main_threads; m != NULL; m = m->link) {
1218           if (m->tso == t) {
1219             m->tso = new_t;
1220           }
1221         }
1222         threadPaused(new_t);
1223         PUSH_ON_RUN_QUEUE(new_t);
1224       }
1225       break;
1226
1227     case ThreadYielding:
1228 #if defined(GRAN)
1229       IF_DEBUG(gran, 
1230                DumpGranEvent(GR_DESCHEDULE, t));
1231       globalGranStats.tot_yields++;
1232 #elif defined(PAR)
1233       // IF_DEBUG(par, 
1234       // DumpGranEvent(GR_DESCHEDULE, t);
1235       globalParStats.tot_yields++;
1236 #endif
1237       /* put the thread back on the run queue.  Then, if we're ready to
1238        * GC, check whether this is the last task to stop.  If so, wake
1239        * up the GC thread.  getThread will block during a GC until the
1240        * GC is finished.
1241        */
1242       IF_DEBUG(scheduler,
1243                if (t->what_next == ThreadEnterInterp) {
1244                    /* ToDo: or maybe a timer expired when we were in Hugs?
1245                     * or maybe someone hit ctrl-C
1246                     */
1247                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1248                          t->id, t, whatNext_strs[t->what_next]);
1249                } else {
1250                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1251                          t->id, t, whatNext_strs[t->what_next]);
1252                }
1253                );
1254
1255       threadPaused(t);
1256
1257       IF_DEBUG(sanity,
1258                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1259                checkTSO(t));
1260       ASSERT(t->link == END_TSO_QUEUE);
1261 #if defined(GRAN)
1262       ASSERT(!is_on_queue(t,CurrentProc));
1263
1264       IF_DEBUG(sanity,
1265                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1266                checkThreadQsSanity(rtsTrue));
1267 #endif
1268 #if defined(PAR)
1269       if (RtsFlags.ParFlags.doFairScheduling) { 
1270         /* this does round-robin scheduling; good for concurrency */
1271         APPEND_TO_RUN_QUEUE(t);
1272       } else {
1273         /* this does unfair scheduling; good for parallelism */
1274         PUSH_ON_RUN_QUEUE(t);
1275       }
1276 #else
1277       /* this does round-robin scheduling; good for concurrency */
1278       APPEND_TO_RUN_QUEUE(t);
1279 #endif
1280 #if defined(GRAN)
1281       /* add a ContinueThread event to actually process the thread */
1282       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1283                 ContinueThread,
1284                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1285       IF_GRAN_DEBUG(bq, 
1286                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1287                G_EVENTQ(0);
1288                G_CURR_THREADQ(0));
1289 #endif /* GRAN */
1290       break;
1291       
1292     case ThreadBlocked:
1293 #if defined(GRAN)
1294       IF_DEBUG(scheduler,
1295                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1296                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1297                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1298
1299       // ??? needed; should emit block before
1300       IF_DEBUG(gran, 
1301                DumpGranEvent(GR_DESCHEDULE, t)); 
1302       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1303       /*
1304         ngoq Dogh!
1305       ASSERT(procStatus[CurrentProc]==Busy || 
1306               ((procStatus[CurrentProc]==Fetching) && 
1307               (t->block_info.closure!=(StgClosure*)NULL)));
1308       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1309           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1310             procStatus[CurrentProc]==Fetching)) 
1311         procStatus[CurrentProc] = Idle;
1312       */
1313 #elif defined(PAR)
1314       IF_DEBUG(scheduler,
1315                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1316                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1317       IF_PAR_DEBUG(bq,
1318
1319                    if (t->block_info.closure!=(StgClosure*)NULL) 
1320                      print_bq(t->block_info.closure));
1321
1322       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1323       blockThread(t);
1324
1325       /* whatever we schedule next, we must log that schedule */
1326       emitSchedule = rtsTrue;
1327
1328 #else /* !GRAN */
1329       /* don't need to do anything.  Either the thread is blocked on
1330        * I/O, in which case we'll have called addToBlockedQueue
1331        * previously, or it's blocked on an MVar or Blackhole, in which
1332        * case it'll be on the relevant queue already.
1333        */
1334       IF_DEBUG(scheduler,
1335                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1336                printThreadBlockage(t);
1337                fprintf(stderr, "\n"));
1338
1339       /* Only for dumping event to log file 
1340          ToDo: do I need this in GranSim, too?
1341       blockThread(t);
1342       */
1343 #endif
1344       threadPaused(t);
1345       break;
1346       
1347     case ThreadFinished:
1348       /* Need to check whether this was a main thread, and if so, signal
1349        * the task that started it with the return value.  If we have no
1350        * more main threads, we probably need to stop all the tasks until
1351        * we get a new one.
1352        */
1353       /* We also end up here if the thread kills itself with an
1354        * uncaught exception, see Exception.hc.
1355        */
1356       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1357 #if defined(GRAN)
1358       endThread(t, CurrentProc); // clean-up the thread
1359 #elif defined(PAR)
1360       /* For now all are advisory -- HWL */
1361       //if(t->priority==AdvisoryPriority) ??
1362       advisory_thread_count--;
1363       
1364 # ifdef DIST
1365       if(t->dist.priority==RevalPriority)
1366         FinishReval(t);
1367 # endif
1368       
1369       if (RtsFlags.ParFlags.ParStats.Full &&
1370           !RtsFlags.ParFlags.ParStats.Suppressed) 
1371         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1372 #endif
1373       break;
1374       
1375     default:
1376       barf("schedule: invalid thread return code %d", (int)ret);
1377     }
1378
1379 #ifdef PROFILING
1380     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1381         GarbageCollect(GetRoots, rtsTrue);
1382         heapCensus();
1383         performHeapProfile = rtsFalse;
1384         ready_to_gc = rtsFalse; // we already GC'd
1385     }
1386 #endif
1387
1388     if (ready_to_gc 
1389 #ifdef SMP
1390         && allFreeCapabilities() 
1391 #endif
1392         ) {
1393       /* everybody back, start the GC.
1394        * Could do it in this thread, or signal a condition var
1395        * to do it in another thread.  Either way, we need to
1396        * broadcast on gc_pending_cond afterward.
1397        */
1398 #if defined(RTS_SUPPORTS_THREADS)
1399       IF_DEBUG(scheduler,sched_belch("doing GC"));
1400 #endif
1401       GarbageCollect(GetRoots,rtsFalse);
1402       ready_to_gc = rtsFalse;
1403 #ifdef SMP
1404       broadcastCondition(&gc_pending_cond);
1405 #endif
1406 #if defined(GRAN)
1407       /* add a ContinueThread event to continue execution of current thread */
1408       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1409                 ContinueThread,
1410                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1411       IF_GRAN_DEBUG(bq, 
1412                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1413                G_EVENTQ(0);
1414                G_CURR_THREADQ(0));
1415 #endif /* GRAN */
1416     }
1417
1418 #if defined(GRAN)
1419   next_thread:
1420     IF_GRAN_DEBUG(unused,
1421                   print_eventq(EventHd));
1422
1423     event = get_next_event();
1424 #elif defined(PAR)
1425   next_thread:
1426     /* ToDo: wait for next message to arrive rather than busy wait */
1427 #endif /* GRAN */
1428
1429   } /* end of while(1) */
1430
1431   IF_PAR_DEBUG(verbose,
1432                belch("== Leaving schedule() after having received Finish"));
1433 }
1434
1435 /* ---------------------------------------------------------------------------
1436  * Singleton fork(). Do not copy any running threads.
1437  * ------------------------------------------------------------------------- */
1438
1439 StgInt forkProcess(StgTSO* tso) {
1440
1441 #ifndef mingw32_TARGET_OS
1442   pid_t pid;
1443   StgTSO* t,*next;
1444
1445   IF_DEBUG(scheduler,sched_belch("forking!"));
1446
1447   pid = fork();
1448   if (pid) { /* parent */
1449
1450   /* just return the pid */
1451     
1452   } else { /* child */
1453   /* wipe all other threads */
1454   run_queue_hd = tso;
1455   tso->link = END_TSO_QUEUE;
1456
1457   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1458      us is picky about finding the threat still in its queue when
1459      handling the deleteThread() */
1460
1461   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1462     next = t->link;
1463     if (t->id != tso->id) {
1464       deleteThread(t);
1465     }
1466   }
1467   }
1468   return pid;
1469 #else /* mingw32 */
1470   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1471   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1472   return -1;
1473 #endif /* mingw32 */
1474 }
1475
1476 /* ---------------------------------------------------------------------------
1477  * deleteAllThreads():  kill all the live threads.
1478  *
1479  * This is used when we catch a user interrupt (^C), before performing
1480  * any necessary cleanups and running finalizers.
1481  *
1482  * Locks: sched_mutex held.
1483  * ------------------------------------------------------------------------- */
1484    
1485 void deleteAllThreads ( void )
1486 {
1487   StgTSO* t, *next;
1488   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1489   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1490       next = t->global_link;
1491       deleteThread(t);
1492   }      
1493   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1494   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1495   sleeping_queue = END_TSO_QUEUE;
1496 }
1497
1498 /* startThread and  insertThread are now in GranSim.c -- HWL */
1499
1500
1501 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1502 //@subsection Suspend and Resume
1503
1504 /* ---------------------------------------------------------------------------
1505  * Suspending & resuming Haskell threads.
1506  * 
1507  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1508  * its capability before calling the C function.  This allows another
1509  * task to pick up the capability and carry on running Haskell
1510  * threads.  It also means that if the C call blocks, it won't lock
1511  * the whole system.
1512  *
1513  * The Haskell thread making the C call is put to sleep for the
1514  * duration of the call, on the susepended_ccalling_threads queue.  We
1515  * give out a token to the task, which it can use to resume the thread
1516  * on return from the C function.
1517  * ------------------------------------------------------------------------- */
1518    
1519 StgInt
1520 suspendThread( StgRegTable *reg, 
1521                rtsBool concCall
1522 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1523                STG_UNUSED
1524 #endif
1525                )
1526 {
1527   nat tok;
1528   Capability *cap;
1529
1530   /* assume that *reg is a pointer to the StgRegTable part
1531    * of a Capability.
1532    */
1533   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1534
1535   ACQUIRE_LOCK(&sched_mutex);
1536
1537   IF_DEBUG(scheduler,
1538            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1539
1540   threadPaused(cap->r.rCurrentTSO);
1541   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1542   suspended_ccalling_threads = cap->r.rCurrentTSO;
1543
1544 #if defined(RTS_SUPPORTS_THREADS)
1545   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1546 #endif
1547
1548   /* Use the thread ID as the token; it should be unique */
1549   tok = cap->r.rCurrentTSO->id;
1550
1551   /* Hand back capability */
1552   releaseCapability(cap);
1553   
1554 #if defined(RTS_SUPPORTS_THREADS)
1555   /* Preparing to leave the RTS, so ensure there's a native thread/task
1556      waiting to take over.
1557      
1558      ToDo: optimise this and only create a new task if there's a need
1559      for one (i.e., if there's only one Concurrent Haskell thread alive,
1560      there's no need to create a new task).
1561   */
1562   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1563   if (concCall) {
1564     startTask(taskStart);
1565   }
1566 #endif
1567
1568   /* Other threads _might_ be available for execution; signal this */
1569   THREAD_RUNNABLE();
1570   RELEASE_LOCK(&sched_mutex);
1571   return tok; 
1572 }
1573
1574 StgRegTable *
1575 resumeThread( StgInt tok,
1576               rtsBool concCall
1577 #if !defined(RTS_SUPPORTS_THREADS)
1578                STG_UNUSED
1579 #endif
1580               )
1581 {
1582   StgTSO *tso, **prev;
1583   Capability *cap;
1584
1585 #if defined(RTS_SUPPORTS_THREADS)
1586   /* Wait for permission to re-enter the RTS with the result. */
1587   if ( concCall ) {
1588     ACQUIRE_LOCK(&sched_mutex);
1589     grabReturnCapability(&sched_mutex, &cap);
1590   } else {
1591     grabCapability(&cap);
1592   }
1593 #else
1594   grabCapability(&cap);
1595 #endif
1596
1597   /* Remove the thread off of the suspended list */
1598   prev = &suspended_ccalling_threads;
1599   for (tso = suspended_ccalling_threads; 
1600        tso != END_TSO_QUEUE; 
1601        prev = &tso->link, tso = tso->link) {
1602     if (tso->id == (StgThreadID)tok) {
1603       *prev = tso->link;
1604       break;
1605     }
1606   }
1607   if (tso == END_TSO_QUEUE) {
1608     barf("resumeThread: thread not found");
1609   }
1610   tso->link = END_TSO_QUEUE;
1611   /* Reset blocking status */
1612   tso->why_blocked  = NotBlocked;
1613
1614   cap->r.rCurrentTSO = tso;
1615   RELEASE_LOCK(&sched_mutex);
1616   return &cap->r;
1617 }
1618
1619
1620 /* ---------------------------------------------------------------------------
1621  * Static functions
1622  * ------------------------------------------------------------------------ */
1623 static void unblockThread(StgTSO *tso);
1624
1625 /* ---------------------------------------------------------------------------
1626  * Comparing Thread ids.
1627  *
1628  * This is used from STG land in the implementation of the
1629  * instances of Eq/Ord for ThreadIds.
1630  * ------------------------------------------------------------------------ */
1631
1632 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1633
1634   StgThreadID id1 = tso1->id; 
1635   StgThreadID id2 = tso2->id;
1636  
1637   if (id1 < id2) return (-1);
1638   if (id1 > id2) return 1;
1639   return 0;
1640 }
1641
1642 /* ---------------------------------------------------------------------------
1643  * Fetching the ThreadID from an StgTSO.
1644  *
1645  * This is used in the implementation of Show for ThreadIds.
1646  * ------------------------------------------------------------------------ */
1647 int rts_getThreadId(const StgTSO *tso) 
1648 {
1649   return tso->id;
1650 }
1651
1652 #ifdef DEBUG
1653 void labelThread(StgTSO *tso, char *label)
1654 {
1655   int len;
1656   void *buf;
1657
1658   /* Caveat: Once set, you can only set the thread name to "" */
1659   len = strlen(label)+1;
1660   buf = malloc(len);
1661   if (buf == NULL) {
1662     fprintf(stderr,"insufficient memory for labelThread!\n");
1663   } else
1664     strncpy(buf,label,len);
1665   /* Update will free the old memory for us */
1666   updateThreadLabel((StgWord)tso,buf);
1667 }
1668 #endif /* DEBUG */
1669
1670 /* ---------------------------------------------------------------------------
1671    Create a new thread.
1672
1673    The new thread starts with the given stack size.  Before the
1674    scheduler can run, however, this thread needs to have a closure
1675    (and possibly some arguments) pushed on its stack.  See
1676    pushClosure() in Schedule.h.
1677
1678    createGenThread() and createIOThread() (in SchedAPI.h) are
1679    convenient packaged versions of this function.
1680
1681    currently pri (priority) is only used in a GRAN setup -- HWL
1682    ------------------------------------------------------------------------ */
1683 //@cindex createThread
1684 #if defined(GRAN)
1685 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1686 StgTSO *
1687 createThread(nat size, StgInt pri)
1688 #else
1689 StgTSO *
1690 createThread(nat size)
1691 #endif
1692 {
1693
1694     StgTSO *tso;
1695     nat stack_size;
1696
1697     /* First check whether we should create a thread at all */
1698 #if defined(PAR)
1699   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1700   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1701     threadsIgnored++;
1702     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1703           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1704     return END_TSO_QUEUE;
1705   }
1706   threadsCreated++;
1707 #endif
1708
1709 #if defined(GRAN)
1710   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1711 #endif
1712
1713   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1714
1715   /* catch ridiculously small stack sizes */
1716   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1717     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1718   }
1719
1720   stack_size = size - TSO_STRUCT_SIZEW;
1721
1722   tso = (StgTSO *)allocate(size);
1723   TICK_ALLOC_TSO(stack_size, 0);
1724
1725   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1726 #if defined(GRAN)
1727   SET_GRAN_HDR(tso, ThisPE);
1728 #endif
1729   tso->what_next     = ThreadEnterGHC;
1730
1731   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1732    * protect the increment operation on next_thread_id.
1733    * In future, we could use an atomic increment instead.
1734    */
1735   ACQUIRE_LOCK(&thread_id_mutex);
1736   tso->id = next_thread_id++; 
1737   RELEASE_LOCK(&thread_id_mutex);
1738
1739   tso->why_blocked  = NotBlocked;
1740   tso->blocked_exceptions = NULL;
1741
1742   tso->stack_size   = stack_size;
1743   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1744                               - TSO_STRUCT_SIZEW;
1745   tso->sp           = (P_)&(tso->stack) + stack_size;
1746
1747 #ifdef PROFILING
1748   tso->prof.CCCS = CCS_MAIN;
1749 #endif
1750
1751   /* put a stop frame on the stack */
1752   tso->sp -= sizeofW(StgStopFrame);
1753   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1754   tso->su = (StgUpdateFrame*)tso->sp;
1755
1756   // ToDo: check this
1757 #if defined(GRAN)
1758   tso->link = END_TSO_QUEUE;
1759   /* uses more flexible routine in GranSim */
1760   insertThread(tso, CurrentProc);
1761 #else
1762   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1763    * from its creation
1764    */
1765 #endif
1766
1767 #if defined(GRAN) 
1768   if (RtsFlags.GranFlags.GranSimStats.Full) 
1769     DumpGranEvent(GR_START,tso);
1770 #elif defined(PAR)
1771   if (RtsFlags.ParFlags.ParStats.Full) 
1772     DumpGranEvent(GR_STARTQ,tso);
1773   /* HACk to avoid SCHEDULE 
1774      LastTSO = tso; */
1775 #endif
1776
1777   /* Link the new thread on the global thread list.
1778    */
1779   tso->global_link = all_threads;
1780   all_threads = tso;
1781
1782 #if defined(DIST)
1783   tso->dist.priority = MandatoryPriority; //by default that is...
1784 #endif
1785
1786 #if defined(GRAN)
1787   tso->gran.pri = pri;
1788 # if defined(DEBUG)
1789   tso->gran.magic = TSO_MAGIC; // debugging only
1790 # endif
1791   tso->gran.sparkname   = 0;
1792   tso->gran.startedat   = CURRENT_TIME; 
1793   tso->gran.exported    = 0;
1794   tso->gran.basicblocks = 0;
1795   tso->gran.allocs      = 0;
1796   tso->gran.exectime    = 0;
1797   tso->gran.fetchtime   = 0;
1798   tso->gran.fetchcount  = 0;
1799   tso->gran.blocktime   = 0;
1800   tso->gran.blockcount  = 0;
1801   tso->gran.blockedat   = 0;
1802   tso->gran.globalsparks = 0;
1803   tso->gran.localsparks  = 0;
1804   if (RtsFlags.GranFlags.Light)
1805     tso->gran.clock  = Now; /* local clock */
1806   else
1807     tso->gran.clock  = 0;
1808
1809   IF_DEBUG(gran,printTSO(tso));
1810 #elif defined(PAR)
1811 # if defined(DEBUG)
1812   tso->par.magic = TSO_MAGIC; // debugging only
1813 # endif
1814   tso->par.sparkname   = 0;
1815   tso->par.startedat   = CURRENT_TIME; 
1816   tso->par.exported    = 0;
1817   tso->par.basicblocks = 0;
1818   tso->par.allocs      = 0;
1819   tso->par.exectime    = 0;
1820   tso->par.fetchtime   = 0;
1821   tso->par.fetchcount  = 0;
1822   tso->par.blocktime   = 0;
1823   tso->par.blockcount  = 0;
1824   tso->par.blockedat   = 0;
1825   tso->par.globalsparks = 0;
1826   tso->par.localsparks  = 0;
1827 #endif
1828
1829 #if defined(GRAN)
1830   globalGranStats.tot_threads_created++;
1831   globalGranStats.threads_created_on_PE[CurrentProc]++;
1832   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1833   globalGranStats.tot_sq_probes++;
1834 #elif defined(PAR)
1835   // collect parallel global statistics (currently done together with GC stats)
1836   if (RtsFlags.ParFlags.ParStats.Global &&
1837       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1838     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1839     globalParStats.tot_threads_created++;
1840   }
1841 #endif 
1842
1843 #if defined(GRAN)
1844   IF_GRAN_DEBUG(pri,
1845                 belch("==__ schedule: Created TSO %d (%p);",
1846                       CurrentProc, tso, tso->id));
1847 #elif defined(PAR)
1848     IF_PAR_DEBUG(verbose,
1849                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1850                        tso->id, tso, advisory_thread_count));
1851 #else
1852   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1853                                  tso->id, tso->stack_size));
1854 #endif    
1855   return tso;
1856 }
1857
1858 #if defined(PAR)
1859 /* RFP:
1860    all parallel thread creation calls should fall through the following routine.
1861 */
1862 StgTSO *
1863 createSparkThread(rtsSpark spark) 
1864 { StgTSO *tso;
1865   ASSERT(spark != (rtsSpark)NULL);
1866   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1867   { threadsIgnored++;
1868     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1869           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1870     return END_TSO_QUEUE;
1871   }
1872   else
1873   { threadsCreated++;
1874     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1875     if (tso==END_TSO_QUEUE)     
1876       barf("createSparkThread: Cannot create TSO");
1877 #if defined(DIST)
1878     tso->priority = AdvisoryPriority;
1879 #endif
1880     pushClosure(tso,spark);
1881     PUSH_ON_RUN_QUEUE(tso);
1882     advisory_thread_count++;    
1883   }
1884   return tso;
1885 }
1886 #endif
1887
1888 /*
1889   Turn a spark into a thread.
1890   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1891 */
1892 #if defined(PAR)
1893 //@cindex activateSpark
1894 StgTSO *
1895 activateSpark (rtsSpark spark) 
1896 {
1897   StgTSO *tso;
1898
1899   tso = createSparkThread(spark);
1900   if (RtsFlags.ParFlags.ParStats.Full) {   
1901     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1902     IF_PAR_DEBUG(verbose,
1903                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1904                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1905   }
1906   // ToDo: fwd info on local/global spark to thread -- HWL
1907   // tso->gran.exported =  spark->exported;
1908   // tso->gran.locked =   !spark->global;
1909   // tso->gran.sparkname = spark->name;
1910
1911   return tso;
1912 }
1913 #endif
1914
1915 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
1916 #if defined(THREADED_RTS)
1917                                    , rtsBool blockWaiting
1918 #endif
1919                                    );
1920
1921
1922 /* ---------------------------------------------------------------------------
1923  * scheduleThread()
1924  *
1925  * scheduleThread puts a thread on the head of the runnable queue.
1926  * This will usually be done immediately after a thread is created.
1927  * The caller of scheduleThread must create the thread using e.g.
1928  * createThread and push an appropriate closure
1929  * on this thread's stack before the scheduler is invoked.
1930  * ------------------------------------------------------------------------ */
1931
1932 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1933
1934 void
1935 scheduleThread_(StgTSO *tso
1936                , rtsBool createTask
1937 #if !defined(THREADED_RTS)
1938                  STG_UNUSED
1939 #endif
1940               )
1941 {
1942   ACQUIRE_LOCK(&sched_mutex);
1943
1944   /* Put the new thread on the head of the runnable queue.  The caller
1945    * better push an appropriate closure on this thread's stack
1946    * beforehand.  In the SMP case, the thread may start running as
1947    * soon as we release the scheduler lock below.
1948    */
1949   PUSH_ON_RUN_QUEUE(tso);
1950 #if defined(THREADED_RTS)
1951   /* If main() is scheduling a thread, don't bother creating a 
1952    * new task.
1953    */
1954   if ( createTask ) {
1955     startTask(taskStart);
1956   }
1957 #endif
1958   THREAD_RUNNABLE();
1959
1960 #if 0
1961   IF_DEBUG(scheduler,printTSO(tso));
1962 #endif
1963   RELEASE_LOCK(&sched_mutex);
1964 }
1965
1966 void scheduleThread(StgTSO* tso)
1967 {
1968   scheduleThread_(tso, rtsFalse);
1969 }
1970
1971 SchedulerStatus
1972 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
1973 {
1974   StgMainThread *m;
1975
1976   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1977   m->tso = tso;
1978   m->ret = ret;
1979   m->stat = NoStatus;
1980 #if defined(RTS_SUPPORTS_THREADS)
1981   initCondition(&m->wakeup);
1982 #endif
1983
1984   /* Put the thread on the main-threads list prior to scheduling the TSO.
1985      Failure to do so introduces a race condition in the MT case (as
1986      identified by Wolfgang Thaller), whereby the new task/OS thread 
1987      created by scheduleThread_() would complete prior to the thread
1988      that spawned it managed to put 'itself' on the main-threads list.
1989      The upshot of it all being that the worker thread wouldn't get to
1990      signal the completion of the its work item for the main thread to
1991      see (==> it got stuck waiting.)    -- sof 6/02.
1992   */
1993   ACQUIRE_LOCK(&sched_mutex);
1994   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
1995   
1996   m->link = main_threads;
1997   main_threads = m;
1998
1999   /* Inefficient (scheduleThread_() acquires it again right away),
2000    * but obviously correct.
2001    */
2002   RELEASE_LOCK(&sched_mutex);
2003
2004   scheduleThread_(tso, rtsTrue);
2005 #if defined(THREADED_RTS)
2006   return waitThread_(m, rtsTrue);
2007 #else
2008   return waitThread_(m);
2009 #endif
2010 }
2011
2012 /* ---------------------------------------------------------------------------
2013  * initScheduler()
2014  *
2015  * Initialise the scheduler.  This resets all the queues - if the
2016  * queues contained any threads, they'll be garbage collected at the
2017  * next pass.
2018  *
2019  * ------------------------------------------------------------------------ */
2020
2021 #ifdef SMP
2022 static void
2023 term_handler(int sig STG_UNUSED)
2024 {
2025   stat_workerStop();
2026   ACQUIRE_LOCK(&term_mutex);
2027   await_death--;
2028   RELEASE_LOCK(&term_mutex);
2029   shutdownThread();
2030 }
2031 #endif
2032
2033 void 
2034 initScheduler(void)
2035 {
2036 #if defined(GRAN)
2037   nat i;
2038
2039   for (i=0; i<=MAX_PROC; i++) {
2040     run_queue_hds[i]      = END_TSO_QUEUE;
2041     run_queue_tls[i]      = END_TSO_QUEUE;
2042     blocked_queue_hds[i]  = END_TSO_QUEUE;
2043     blocked_queue_tls[i]  = END_TSO_QUEUE;
2044     ccalling_threadss[i]  = END_TSO_QUEUE;
2045     sleeping_queue        = END_TSO_QUEUE;
2046   }
2047 #else
2048   run_queue_hd      = END_TSO_QUEUE;
2049   run_queue_tl      = END_TSO_QUEUE;
2050   blocked_queue_hd  = END_TSO_QUEUE;
2051   blocked_queue_tl  = END_TSO_QUEUE;
2052   sleeping_queue    = END_TSO_QUEUE;
2053 #endif 
2054
2055   suspended_ccalling_threads  = END_TSO_QUEUE;
2056
2057   main_threads = NULL;
2058   all_threads  = END_TSO_QUEUE;
2059
2060   context_switch = 0;
2061   interrupted    = 0;
2062
2063   RtsFlags.ConcFlags.ctxtSwitchTicks =
2064       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2065       
2066 #if defined(RTS_SUPPORTS_THREADS)
2067   /* Initialise the mutex and condition variables used by
2068    * the scheduler. */
2069   initMutex(&sched_mutex);
2070   initMutex(&term_mutex);
2071   initMutex(&thread_id_mutex);
2072
2073   initCondition(&thread_ready_cond);
2074 #endif
2075   
2076 #if defined(SMP)
2077   initCondition(&gc_pending_cond);
2078 #endif
2079
2080 #if defined(RTS_SUPPORTS_THREADS)
2081   ACQUIRE_LOCK(&sched_mutex);
2082 #endif
2083
2084   /* Install the SIGHUP handler */
2085 #if defined(SMP)
2086   {
2087     struct sigaction action,oact;
2088
2089     action.sa_handler = term_handler;
2090     sigemptyset(&action.sa_mask);
2091     action.sa_flags = 0;
2092     if (sigaction(SIGTERM, &action, &oact) != 0) {
2093       barf("can't install TERM handler");
2094     }
2095   }
2096 #endif
2097
2098   /* A capability holds the state a native thread needs in
2099    * order to execute STG code. At least one capability is
2100    * floating around (only SMP builds have more than one).
2101    */
2102   initCapabilities();
2103   
2104 #if defined(RTS_SUPPORTS_THREADS)
2105     /* start our haskell execution tasks */
2106 # if defined(SMP)
2107     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2108 # else
2109     startTaskManager(0,taskStart);
2110 # endif
2111 #endif
2112
2113 #if /* defined(SMP) ||*/ defined(PAR)
2114   initSparkPools();
2115 #endif
2116
2117 #if defined(RTS_SUPPORTS_THREADS)
2118   RELEASE_LOCK(&sched_mutex);
2119 #endif
2120
2121 }
2122
2123 void
2124 exitScheduler( void )
2125 {
2126 #if defined(RTS_SUPPORTS_THREADS)
2127   stopTaskManager();
2128 #endif
2129   shutting_down_scheduler = rtsTrue;
2130 }
2131
2132 /* -----------------------------------------------------------------------------
2133    Managing the per-task allocation areas.
2134    
2135    Each capability comes with an allocation area.  These are
2136    fixed-length block lists into which allocation can be done.
2137
2138    ToDo: no support for two-space collection at the moment???
2139    -------------------------------------------------------------------------- */
2140
2141 /* -----------------------------------------------------------------------------
2142  * waitThread is the external interface for running a new computation
2143  * and waiting for the result.
2144  *
2145  * In the non-SMP case, we create a new main thread, push it on the 
2146  * main-thread stack, and invoke the scheduler to run it.  The
2147  * scheduler will return when the top main thread on the stack has
2148  * completed or died, and fill in the necessary fields of the
2149  * main_thread structure.
2150  *
2151  * In the SMP case, we create a main thread as before, but we then
2152  * create a new condition variable and sleep on it.  When our new
2153  * main thread has completed, we'll be woken up and the status/result
2154  * will be in the main_thread struct.
2155  * -------------------------------------------------------------------------- */
2156
2157 int 
2158 howManyThreadsAvail ( void )
2159 {
2160    int i = 0;
2161    StgTSO* q;
2162    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2163       i++;
2164    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2165       i++;
2166    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2167       i++;
2168    return i;
2169 }
2170
2171 void
2172 finishAllThreads ( void )
2173 {
2174    do {
2175       while (run_queue_hd != END_TSO_QUEUE) {
2176          waitThread ( run_queue_hd, NULL);
2177       }
2178       while (blocked_queue_hd != END_TSO_QUEUE) {
2179          waitThread ( blocked_queue_hd, NULL);
2180       }
2181       while (sleeping_queue != END_TSO_QUEUE) {
2182          waitThread ( blocked_queue_hd, NULL);
2183       }
2184    } while 
2185       (blocked_queue_hd != END_TSO_QUEUE || 
2186        run_queue_hd     != END_TSO_QUEUE ||
2187        sleeping_queue   != END_TSO_QUEUE);
2188 }
2189
2190 SchedulerStatus
2191 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2192
2193   StgMainThread *m;
2194
2195   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2196   m->tso = tso;
2197   m->ret = ret;
2198   m->stat = NoStatus;
2199 #if defined(RTS_SUPPORTS_THREADS)
2200   initCondition(&m->wakeup);
2201 #endif
2202
2203   /* see scheduleWaitThread() comment */
2204   ACQUIRE_LOCK(&sched_mutex);
2205   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2206   m->link = main_threads;
2207   main_threads = m;
2208   RELEASE_LOCK(&sched_mutex);
2209
2210   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2211 #if defined(THREADED_RTS)
2212   return waitThread_(m, rtsFalse);
2213 #else
2214   return waitThread_(m);
2215 #endif
2216 }
2217
2218 static
2219 SchedulerStatus
2220 waitThread_(StgMainThread* m
2221 #if defined(THREADED_RTS)
2222             , rtsBool blockWaiting
2223 #endif
2224            )
2225 {
2226   SchedulerStatus stat;
2227
2228   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2229
2230 #if defined(RTS_SUPPORTS_THREADS)
2231
2232 # if defined(THREADED_RTS)
2233   if (!blockWaiting) {
2234     /* In the threaded case, the OS thread that called main()
2235      * gets to enter the RTS directly without going via another
2236      * task/thread.
2237      */
2238     schedule();
2239     ASSERT(m->stat != NoStatus);
2240   } else 
2241 # endif
2242   {
2243     ACQUIRE_LOCK(&sched_mutex);
2244     do {
2245       waitCondition(&m->wakeup, &sched_mutex);
2246     } while (m->stat == NoStatus);
2247   }
2248 #elif defined(GRAN)
2249   /* GranSim specific init */
2250   CurrentTSO = m->tso;                // the TSO to run
2251   procStatus[MainProc] = Busy;        // status of main PE
2252   CurrentProc = MainProc;             // PE to run it on
2253
2254   schedule();
2255 #else
2256   RELEASE_LOCK(&sched_mutex);
2257   schedule();
2258   ASSERT(m->stat != NoStatus);
2259 #endif
2260
2261   stat = m->stat;
2262
2263 #if defined(RTS_SUPPORTS_THREADS)
2264   closeCondition(&m->wakeup);
2265 #endif
2266
2267   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2268                               m->tso->id));
2269   free(m);
2270
2271 #if defined(THREADED_RTS)
2272   if (blockWaiting) 
2273 #endif
2274     RELEASE_LOCK(&sched_mutex);
2275
2276   return stat;
2277 }
2278
2279 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2280 //@subsection Run queue code 
2281
2282 #if 0
2283 /* 
2284    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2285        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2286        implicit global variable that has to be correct when calling these
2287        fcts -- HWL 
2288 */
2289
2290 /* Put the new thread on the head of the runnable queue.
2291  * The caller of createThread better push an appropriate closure
2292  * on this thread's stack before the scheduler is invoked.
2293  */
2294 static /* inline */ void
2295 add_to_run_queue(tso)
2296 StgTSO* tso; 
2297 {
2298   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2299   tso->link = run_queue_hd;
2300   run_queue_hd = tso;
2301   if (run_queue_tl == END_TSO_QUEUE) {
2302     run_queue_tl = tso;
2303   }
2304 }
2305
2306 /* Put the new thread at the end of the runnable queue. */
2307 static /* inline */ void
2308 push_on_run_queue(tso)
2309 StgTSO* tso; 
2310 {
2311   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2312   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2313   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2314   if (run_queue_hd == END_TSO_QUEUE) {
2315     run_queue_hd = tso;
2316   } else {
2317     run_queue_tl->link = tso;
2318   }
2319   run_queue_tl = tso;
2320 }
2321
2322 /* 
2323    Should be inlined because it's used very often in schedule.  The tso
2324    argument is actually only needed in GranSim, where we want to have the
2325    possibility to schedule *any* TSO on the run queue, irrespective of the
2326    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2327    the run queue and dequeue the tso, adjusting the links in the queue. 
2328 */
2329 //@cindex take_off_run_queue
2330 static /* inline */ StgTSO*
2331 take_off_run_queue(StgTSO *tso) {
2332   StgTSO *t, *prev;
2333
2334   /* 
2335      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2336
2337      if tso is specified, unlink that tso from the run_queue (doesn't have
2338      to be at the beginning of the queue); GranSim only 
2339   */
2340   if (tso!=END_TSO_QUEUE) {
2341     /* find tso in queue */
2342     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2343          t!=END_TSO_QUEUE && t!=tso;
2344          prev=t, t=t->link) 
2345       /* nothing */ ;
2346     ASSERT(t==tso);
2347     /* now actually dequeue the tso */
2348     if (prev!=END_TSO_QUEUE) {
2349       ASSERT(run_queue_hd!=t);
2350       prev->link = t->link;
2351     } else {
2352       /* t is at beginning of thread queue */
2353       ASSERT(run_queue_hd==t);
2354       run_queue_hd = t->link;
2355     }
2356     /* t is at end of thread queue */
2357     if (t->link==END_TSO_QUEUE) {
2358       ASSERT(t==run_queue_tl);
2359       run_queue_tl = prev;
2360     } else {
2361       ASSERT(run_queue_tl!=t);
2362     }
2363     t->link = END_TSO_QUEUE;
2364   } else {
2365     /* take tso from the beginning of the queue; std concurrent code */
2366     t = run_queue_hd;
2367     if (t != END_TSO_QUEUE) {
2368       run_queue_hd = t->link;
2369       t->link = END_TSO_QUEUE;
2370       if (run_queue_hd == END_TSO_QUEUE) {
2371         run_queue_tl = END_TSO_QUEUE;
2372       }
2373     }
2374   }
2375   return t;
2376 }
2377
2378 #endif /* 0 */
2379
2380 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2381 //@subsection Garbage Collextion Routines
2382
2383 /* ---------------------------------------------------------------------------
2384    Where are the roots that we know about?
2385
2386         - all the threads on the runnable queue
2387         - all the threads on the blocked queue
2388         - all the threads on the sleeping queue
2389         - all the thread currently executing a _ccall_GC
2390         - all the "main threads"
2391      
2392    ------------------------------------------------------------------------ */
2393
2394 /* This has to be protected either by the scheduler monitor, or by the
2395         garbage collection monitor (probably the latter).
2396         KH @ 25/10/99
2397 */
2398
2399 void
2400 GetRoots(evac_fn evac)
2401 {
2402 #if defined(GRAN)
2403   {
2404     nat i;
2405     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2406       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2407           evac((StgClosure **)&run_queue_hds[i]);
2408       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2409           evac((StgClosure **)&run_queue_tls[i]);
2410       
2411       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2412           evac((StgClosure **)&blocked_queue_hds[i]);
2413       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2414           evac((StgClosure **)&blocked_queue_tls[i]);
2415       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2416           evac((StgClosure **)&ccalling_threads[i]);
2417     }
2418   }
2419
2420   markEventQueue();
2421
2422 #else /* !GRAN */
2423   if (run_queue_hd != END_TSO_QUEUE) {
2424       ASSERT(run_queue_tl != END_TSO_QUEUE);
2425       evac((StgClosure **)&run_queue_hd);
2426       evac((StgClosure **)&run_queue_tl);
2427   }
2428   
2429   if (blocked_queue_hd != END_TSO_QUEUE) {
2430       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2431       evac((StgClosure **)&blocked_queue_hd);
2432       evac((StgClosure **)&blocked_queue_tl);
2433   }
2434   
2435   if (sleeping_queue != END_TSO_QUEUE) {
2436       evac((StgClosure **)&sleeping_queue);
2437   }
2438 #endif 
2439
2440   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2441       evac((StgClosure **)&suspended_ccalling_threads);
2442   }
2443
2444 #if defined(PAR) || defined(GRAN)
2445   markSparkQueue(evac);
2446 #endif
2447 }
2448
2449 /* -----------------------------------------------------------------------------
2450    performGC
2451
2452    This is the interface to the garbage collector from Haskell land.
2453    We provide this so that external C code can allocate and garbage
2454    collect when called from Haskell via _ccall_GC.
2455
2456    It might be useful to provide an interface whereby the programmer
2457    can specify more roots (ToDo).
2458    
2459    This needs to be protected by the GC condition variable above.  KH.
2460    -------------------------------------------------------------------------- */
2461
2462 void (*extra_roots)(evac_fn);
2463
2464 void
2465 performGC(void)
2466 {
2467   /* Obligated to hold this lock upon entry */
2468   ACQUIRE_LOCK(&sched_mutex);
2469   GarbageCollect(GetRoots,rtsFalse);
2470   RELEASE_LOCK(&sched_mutex);
2471 }
2472
2473 void
2474 performMajorGC(void)
2475 {
2476   ACQUIRE_LOCK(&sched_mutex);
2477   GarbageCollect(GetRoots,rtsTrue);
2478   RELEASE_LOCK(&sched_mutex);
2479 }
2480
2481 static void
2482 AllRoots(evac_fn evac)
2483 {
2484     GetRoots(evac);             // the scheduler's roots
2485     extra_roots(evac);          // the user's roots
2486 }
2487
2488 void
2489 performGCWithRoots(void (*get_roots)(evac_fn))
2490 {
2491   ACQUIRE_LOCK(&sched_mutex);
2492   extra_roots = get_roots;
2493   GarbageCollect(AllRoots,rtsFalse);
2494   RELEASE_LOCK(&sched_mutex);
2495 }
2496
2497 /* -----------------------------------------------------------------------------
2498    Stack overflow
2499
2500    If the thread has reached its maximum stack size, then raise the
2501    StackOverflow exception in the offending thread.  Otherwise
2502    relocate the TSO into a larger chunk of memory and adjust its stack
2503    size appropriately.
2504    -------------------------------------------------------------------------- */
2505
2506 static StgTSO *
2507 threadStackOverflow(StgTSO *tso)
2508 {
2509   nat new_stack_size, new_tso_size, diff, stack_words;
2510   StgPtr new_sp;
2511   StgTSO *dest;
2512
2513   IF_DEBUG(sanity,checkTSO(tso));
2514   if (tso->stack_size >= tso->max_stack_size) {
2515
2516     IF_DEBUG(gc,
2517              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2518                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2519              /* If we're debugging, just print out the top of the stack */
2520              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2521                                               tso->sp+64)));
2522
2523     /* Send this thread the StackOverflow exception */
2524     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2525     return tso;
2526   }
2527
2528   /* Try to double the current stack size.  If that takes us over the
2529    * maximum stack size for this thread, then use the maximum instead.
2530    * Finally round up so the TSO ends up as a whole number of blocks.
2531    */
2532   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2533   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2534                                        TSO_STRUCT_SIZE)/sizeof(W_);
2535   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2536   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2537
2538   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2539
2540   dest = (StgTSO *)allocate(new_tso_size);
2541   TICK_ALLOC_TSO(new_stack_size,0);
2542
2543   /* copy the TSO block and the old stack into the new area */
2544   memcpy(dest,tso,TSO_STRUCT_SIZE);
2545   stack_words = tso->stack + tso->stack_size - tso->sp;
2546   new_sp = (P_)dest + new_tso_size - stack_words;
2547   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2548
2549   /* relocate the stack pointers... */
2550   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2551   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2552   dest->sp    = new_sp;
2553   dest->stack_size = new_stack_size;
2554         
2555   /* and relocate the update frame list */
2556   relocate_stack(dest, diff);
2557
2558   /* Mark the old TSO as relocated.  We have to check for relocated
2559    * TSOs in the garbage collector and any primops that deal with TSOs.
2560    *
2561    * It's important to set the sp and su values to just beyond the end
2562    * of the stack, so we don't attempt to scavenge any part of the
2563    * dead TSO's stack.
2564    */
2565   tso->what_next = ThreadRelocated;
2566   tso->link = dest;
2567   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2568   tso->su = (StgUpdateFrame *)tso->sp;
2569   tso->why_blocked = NotBlocked;
2570   dest->mut_link = NULL;
2571
2572   IF_PAR_DEBUG(verbose,
2573                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2574                      tso->id, tso, tso->stack_size);
2575                /* If we're debugging, just print out the top of the stack */
2576                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2577                                                 tso->sp+64)));
2578   
2579   IF_DEBUG(sanity,checkTSO(tso));
2580 #if 0
2581   IF_DEBUG(scheduler,printTSO(dest));
2582 #endif
2583
2584   return dest;
2585 }
2586
2587 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2588 //@subsection Blocking Queue Routines
2589
2590 /* ---------------------------------------------------------------------------
2591    Wake up a queue that was blocked on some resource.
2592    ------------------------------------------------------------------------ */
2593
2594 #if defined(GRAN)
2595 static inline void
2596 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2597 {
2598 }
2599 #elif defined(PAR)
2600 static inline void
2601 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2602 {
2603   /* write RESUME events to log file and
2604      update blocked and fetch time (depending on type of the orig closure) */
2605   if (RtsFlags.ParFlags.ParStats.Full) {
2606     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2607                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2608                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2609     if (EMPTY_RUN_QUEUE())
2610       emitSchedule = rtsTrue;
2611
2612     switch (get_itbl(node)->type) {
2613         case FETCH_ME_BQ:
2614           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2615           break;
2616         case RBH:
2617         case FETCH_ME:
2618         case BLACKHOLE_BQ:
2619           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2620           break;
2621 #ifdef DIST
2622         case MVAR:
2623           break;
2624 #endif    
2625         default:
2626           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2627         }
2628       }
2629 }
2630 #endif
2631
2632 #if defined(GRAN)
2633 static StgBlockingQueueElement *
2634 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2635 {
2636     StgTSO *tso;
2637     PEs node_loc, tso_loc;
2638
2639     node_loc = where_is(node); // should be lifted out of loop
2640     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2641     tso_loc = where_is((StgClosure *)tso);
2642     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2643       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2644       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2645       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2646       // insertThread(tso, node_loc);
2647       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2648                 ResumeThread,
2649                 tso, node, (rtsSpark*)NULL);
2650       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2651       // len_local++;
2652       // len++;
2653     } else { // TSO is remote (actually should be FMBQ)
2654       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2655                                   RtsFlags.GranFlags.Costs.gunblocktime +
2656                                   RtsFlags.GranFlags.Costs.latency;
2657       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2658                 UnblockThread,
2659                 tso, node, (rtsSpark*)NULL);
2660       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2661       // len++;
2662     }
2663     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2664     IF_GRAN_DEBUG(bq,
2665                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2666                           (node_loc==tso_loc ? "Local" : "Global"), 
2667                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2668     tso->block_info.closure = NULL;
2669     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2670                              tso->id, tso));
2671 }
2672 #elif defined(PAR)
2673 static StgBlockingQueueElement *
2674 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2675 {
2676     StgBlockingQueueElement *next;
2677
2678     switch (get_itbl(bqe)->type) {
2679     case TSO:
2680       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2681       /* if it's a TSO just push it onto the run_queue */
2682       next = bqe->link;
2683       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2684       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2685       THREAD_RUNNABLE();
2686       unblockCount(bqe, node);
2687       /* reset blocking status after dumping event */
2688       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2689       break;
2690
2691     case BLOCKED_FETCH:
2692       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2693       next = bqe->link;
2694       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2695       PendingFetches = (StgBlockedFetch *)bqe;
2696       break;
2697
2698 # if defined(DEBUG)
2699       /* can ignore this case in a non-debugging setup; 
2700          see comments on RBHSave closures above */
2701     case CONSTR:
2702       /* check that the closure is an RBHSave closure */
2703       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2704              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2705              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2706       break;
2707
2708     default:
2709       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2710            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2711            (StgClosure *)bqe);
2712 # endif
2713     }
2714   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2715   return next;
2716 }
2717
2718 #else /* !GRAN && !PAR */
2719 static StgTSO *
2720 unblockOneLocked(StgTSO *tso)
2721 {
2722   StgTSO *next;
2723
2724   ASSERT(get_itbl(tso)->type == TSO);
2725   ASSERT(tso->why_blocked != NotBlocked);
2726   tso->why_blocked = NotBlocked;
2727   next = tso->link;
2728   PUSH_ON_RUN_QUEUE(tso);
2729   THREAD_RUNNABLE();
2730   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2731   return next;
2732 }
2733 #endif
2734
2735 #if defined(GRAN) || defined(PAR)
2736 inline StgBlockingQueueElement *
2737 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2738 {
2739   ACQUIRE_LOCK(&sched_mutex);
2740   bqe = unblockOneLocked(bqe, node);
2741   RELEASE_LOCK(&sched_mutex);
2742   return bqe;
2743 }
2744 #else
2745 inline StgTSO *
2746 unblockOne(StgTSO *tso)
2747 {
2748   ACQUIRE_LOCK(&sched_mutex);
2749   tso = unblockOneLocked(tso);
2750   RELEASE_LOCK(&sched_mutex);
2751   return tso;
2752 }
2753 #endif
2754
2755 #if defined(GRAN)
2756 void 
2757 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2758 {
2759   StgBlockingQueueElement *bqe;
2760   PEs node_loc;
2761   nat len = 0; 
2762
2763   IF_GRAN_DEBUG(bq, 
2764                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2765                       node, CurrentProc, CurrentTime[CurrentProc], 
2766                       CurrentTSO->id, CurrentTSO));
2767
2768   node_loc = where_is(node);
2769
2770   ASSERT(q == END_BQ_QUEUE ||
2771          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2772          get_itbl(q)->type == CONSTR); // closure (type constructor)
2773   ASSERT(is_unique(node));
2774
2775   /* FAKE FETCH: magically copy the node to the tso's proc;
2776      no Fetch necessary because in reality the node should not have been 
2777      moved to the other PE in the first place
2778   */
2779   if (CurrentProc!=node_loc) {
2780     IF_GRAN_DEBUG(bq, 
2781                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2782                         node, node_loc, CurrentProc, CurrentTSO->id, 
2783                         // CurrentTSO, where_is(CurrentTSO),
2784                         node->header.gran.procs));
2785     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2786     IF_GRAN_DEBUG(bq, 
2787                   belch("## new bitmask of node %p is %#x",
2788                         node, node->header.gran.procs));
2789     if (RtsFlags.GranFlags.GranSimStats.Global) {
2790       globalGranStats.tot_fake_fetches++;
2791     }
2792   }
2793
2794   bqe = q;
2795   // ToDo: check: ASSERT(CurrentProc==node_loc);
2796   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2797     //next = bqe->link;
2798     /* 
2799        bqe points to the current element in the queue
2800        next points to the next element in the queue
2801     */
2802     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2803     //tso_loc = where_is(tso);
2804     len++;
2805     bqe = unblockOneLocked(bqe, node);
2806   }
2807
2808   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2809      the closure to make room for the anchor of the BQ */
2810   if (bqe!=END_BQ_QUEUE) {
2811     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2812     /*
2813     ASSERT((info_ptr==&RBH_Save_0_info) ||
2814            (info_ptr==&RBH_Save_1_info) ||
2815            (info_ptr==&RBH_Save_2_info));
2816     */
2817     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2818     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2819     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2820
2821     IF_GRAN_DEBUG(bq,
2822                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2823                         node, info_type(node)));
2824   }
2825
2826   /* statistics gathering */
2827   if (RtsFlags.GranFlags.GranSimStats.Global) {
2828     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2829     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2830     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2831     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2832   }
2833   IF_GRAN_DEBUG(bq,
2834                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2835                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2836 }
2837 #elif defined(PAR)
2838 void 
2839 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2840 {
2841   StgBlockingQueueElement *bqe;
2842
2843   ACQUIRE_LOCK(&sched_mutex);
2844
2845   IF_PAR_DEBUG(verbose, 
2846                belch("##-_ AwBQ for node %p on [%x]: ",
2847                      node, mytid));
2848 #ifdef DIST  
2849   //RFP
2850   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2851     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2852     return;
2853   }
2854 #endif
2855   
2856   ASSERT(q == END_BQ_QUEUE ||
2857          get_itbl(q)->type == TSO ||           
2858          get_itbl(q)->type == BLOCKED_FETCH || 
2859          get_itbl(q)->type == CONSTR); 
2860
2861   bqe = q;
2862   while (get_itbl(bqe)->type==TSO || 
2863          get_itbl(bqe)->type==BLOCKED_FETCH) {
2864     bqe = unblockOneLocked(bqe, node);
2865   }
2866   RELEASE_LOCK(&sched_mutex);
2867 }
2868
2869 #else   /* !GRAN && !PAR */
2870 void
2871 awakenBlockedQueue(StgTSO *tso)
2872 {
2873   ACQUIRE_LOCK(&sched_mutex);
2874   while (tso != END_TSO_QUEUE) {
2875     tso = unblockOneLocked(tso);
2876   }
2877   RELEASE_LOCK(&sched_mutex);
2878 }
2879 #endif
2880
2881 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2882 //@subsection Exception Handling Routines
2883
2884 /* ---------------------------------------------------------------------------
2885    Interrupt execution
2886    - usually called inside a signal handler so it mustn't do anything fancy.   
2887    ------------------------------------------------------------------------ */
2888
2889 void
2890 interruptStgRts(void)
2891 {
2892     interrupted    = 1;
2893     context_switch = 1;
2894 }
2895
2896 /* -----------------------------------------------------------------------------
2897    Unblock a thread
2898
2899    This is for use when we raise an exception in another thread, which
2900    may be blocked.
2901    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2902    -------------------------------------------------------------------------- */
2903
2904 #if defined(GRAN) || defined(PAR)
2905 /*
2906   NB: only the type of the blocking queue is different in GranSim and GUM
2907       the operations on the queue-elements are the same
2908       long live polymorphism!
2909
2910   Locks: sched_mutex is held upon entry and exit.
2911
2912 */
2913 static void
2914 unblockThread(StgTSO *tso)
2915 {
2916   StgBlockingQueueElement *t, **last;
2917
2918   switch (tso->why_blocked) {
2919
2920   case NotBlocked:
2921     return;  /* not blocked */
2922
2923   case BlockedOnMVar:
2924     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2925     {
2926       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2927       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2928
2929       last = (StgBlockingQueueElement **)&mvar->head;
2930       for (t = (StgBlockingQueueElement *)mvar->head; 
2931            t != END_BQ_QUEUE; 
2932            last = &t->link, last_tso = t, t = t->link) {
2933         if (t == (StgBlockingQueueElement *)tso) {
2934           *last = (StgBlockingQueueElement *)tso->link;
2935           if (mvar->tail == tso) {
2936             mvar->tail = (StgTSO *)last_tso;
2937           }
2938           goto done;
2939         }
2940       }
2941       barf("unblockThread (MVAR): TSO not found");
2942     }
2943
2944   case BlockedOnBlackHole:
2945     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2946     {
2947       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2948
2949       last = &bq->blocking_queue;
2950       for (t = bq->blocking_queue; 
2951            t != END_BQ_QUEUE; 
2952            last = &t->link, t = t->link) {
2953         if (t == (StgBlockingQueueElement *)tso) {
2954           *last = (StgBlockingQueueElement *)tso->link;
2955           goto done;
2956         }
2957       }
2958       barf("unblockThread (BLACKHOLE): TSO not found");
2959     }
2960
2961   case BlockedOnException:
2962     {
2963       StgTSO *target  = tso->block_info.tso;
2964
2965       ASSERT(get_itbl(target)->type == TSO);
2966
2967       if (target->what_next == ThreadRelocated) {
2968           target = target->link;
2969           ASSERT(get_itbl(target)->type == TSO);
2970       }
2971
2972       ASSERT(target->blocked_exceptions != NULL);
2973
2974       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2975       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2976            t != END_BQ_QUEUE; 
2977            last = &t->link, t = t->link) {
2978         ASSERT(get_itbl(t)->type == TSO);
2979         if (t == (StgBlockingQueueElement *)tso) {
2980           *last = (StgBlockingQueueElement *)tso->link;
2981           goto done;
2982         }
2983       }
2984       barf("unblockThread (Exception): TSO not found");
2985     }
2986
2987   case BlockedOnRead:
2988   case BlockedOnWrite:
2989     {
2990       /* take TSO off blocked_queue */
2991       StgBlockingQueueElement *prev = NULL;
2992       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2993            prev = t, t = t->link) {
2994         if (t == (StgBlockingQueueElement *)tso) {
2995           if (prev == NULL) {
2996             blocked_queue_hd = (StgTSO *)t->link;
2997             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2998               blocked_queue_tl = END_TSO_QUEUE;
2999             }
3000           } else {
3001             prev->link = t->link;
3002             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3003               blocked_queue_tl = (StgTSO *)prev;
3004             }
3005           }
3006           goto done;
3007         }
3008       }
3009       barf("unblockThread (I/O): TSO not found");
3010     }
3011
3012   case BlockedOnDelay:
3013     {
3014       /* take TSO off sleeping_queue */
3015       StgBlockingQueueElement *prev = NULL;
3016       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3017            prev = t, t = t->link) {
3018         if (t == (StgBlockingQueueElement *)tso) {
3019           if (prev == NULL) {
3020             sleeping_queue = (StgTSO *)t->link;
3021           } else {
3022             prev->link = t->link;
3023           }
3024           goto done;
3025         }
3026       }
3027       barf("unblockThread (I/O): TSO not found");
3028     }
3029
3030   default:
3031     barf("unblockThread");
3032   }
3033
3034  done:
3035   tso->link = END_TSO_QUEUE;
3036   tso->why_blocked = NotBlocked;
3037   tso->block_info.closure = NULL;
3038   PUSH_ON_RUN_QUEUE(tso);
3039 }
3040 #else
3041 static void
3042 unblockThread(StgTSO *tso)
3043 {
3044   StgTSO *t, **last;
3045   
3046   /* To avoid locking unnecessarily. */
3047   if (tso->why_blocked == NotBlocked) {
3048     return;
3049   }
3050
3051   switch (tso->why_blocked) {
3052
3053   case BlockedOnMVar:
3054     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3055     {
3056       StgTSO *last_tso = END_TSO_QUEUE;
3057       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3058
3059       last = &mvar->head;
3060       for (t = mvar->head; t != END_TSO_QUEUE; 
3061            last = &t->link, last_tso = t, t = t->link) {
3062         if (t == tso) {
3063           *last = tso->link;
3064           if (mvar->tail == tso) {
3065             mvar->tail = last_tso;
3066           }
3067           goto done;
3068         }
3069       }
3070       barf("unblockThread (MVAR): TSO not found");
3071     }
3072
3073   case BlockedOnBlackHole:
3074     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3075     {
3076       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3077
3078       last = &bq->blocking_queue;
3079       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3080            last = &t->link, t = t->link) {
3081         if (t == tso) {
3082           *last = tso->link;
3083           goto done;
3084         }
3085       }
3086       barf("unblockThread (BLACKHOLE): TSO not found");
3087     }
3088
3089   case BlockedOnException:
3090     {
3091       StgTSO *target  = tso->block_info.tso;
3092
3093       ASSERT(get_itbl(target)->type == TSO);
3094
3095       while (target->what_next == ThreadRelocated) {
3096           target = target->link;
3097           ASSERT(get_itbl(target)->type == TSO);
3098       }
3099       
3100       ASSERT(target->blocked_exceptions != NULL);
3101
3102       last = &target->blocked_exceptions;
3103       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3104            last = &t->link, t = t->link) {
3105         ASSERT(get_itbl(t)->type == TSO);
3106         if (t == tso) {
3107           *last = tso->link;
3108           goto done;
3109         }
3110       }
3111       barf("unblockThread (Exception): TSO not found");
3112     }
3113
3114   case BlockedOnRead:
3115   case BlockedOnWrite:
3116     {
3117       StgTSO *prev = NULL;
3118       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3119            prev = t, t = t->link) {
3120         if (t == tso) {
3121           if (prev == NULL) {
3122             blocked_queue_hd = t->link;
3123             if (blocked_queue_tl == t) {
3124               blocked_queue_tl = END_TSO_QUEUE;
3125             }
3126           } else {
3127             prev->link = t->link;
3128             if (blocked_queue_tl == t) {
3129               blocked_queue_tl = prev;
3130             }
3131           }
3132           goto done;
3133         }
3134       }
3135       barf("unblockThread (I/O): TSO not found");
3136     }
3137
3138   case BlockedOnDelay:
3139     {
3140       StgTSO *prev = NULL;
3141       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3142            prev = t, t = t->link) {
3143         if (t == tso) {
3144           if (prev == NULL) {
3145             sleeping_queue = t->link;
3146           } else {
3147             prev->link = t->link;
3148           }
3149           goto done;
3150         }
3151       }
3152       barf("unblockThread (I/O): TSO not found");
3153     }
3154
3155   default:
3156     barf("unblockThread");
3157   }
3158
3159  done:
3160   tso->link = END_TSO_QUEUE;
3161   tso->why_blocked = NotBlocked;
3162   tso->block_info.closure = NULL;
3163   PUSH_ON_RUN_QUEUE(tso);
3164 }
3165 #endif
3166
3167 /* -----------------------------------------------------------------------------
3168  * raiseAsync()
3169  *
3170  * The following function implements the magic for raising an
3171  * asynchronous exception in an existing thread.
3172  *
3173  * We first remove the thread from any queue on which it might be
3174  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3175  *
3176  * We strip the stack down to the innermost CATCH_FRAME, building
3177  * thunks in the heap for all the active computations, so they can 
3178  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3179  * an application of the handler to the exception, and push it on
3180  * the top of the stack.
3181  * 
3182  * How exactly do we save all the active computations?  We create an
3183  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3184  * AP_UPDs pushes everything from the corresponding update frame
3185  * upwards onto the stack.  (Actually, it pushes everything up to the
3186  * next update frame plus a pointer to the next AP_UPD object.
3187  * Entering the next AP_UPD object pushes more onto the stack until we
3188  * reach the last AP_UPD object - at which point the stack should look
3189  * exactly as it did when we killed the TSO and we can continue
3190  * execution by entering the closure on top of the stack.
3191  *
3192  * We can also kill a thread entirely - this happens if either (a) the 
3193  * exception passed to raiseAsync is NULL, or (b) there's no
3194  * CATCH_FRAME on the stack.  In either case, we strip the entire
3195  * stack and replace the thread with a zombie.
3196  *
3197  * Locks: sched_mutex held upon entry nor exit.
3198  *
3199  * -------------------------------------------------------------------------- */
3200  
3201 void 
3202 deleteThread(StgTSO *tso)
3203 {
3204   raiseAsync(tso,NULL);
3205 }
3206
3207 void
3208 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3209 {
3210   /* When raising async exs from contexts where sched_mutex isn't held;
3211      use raiseAsyncWithLock(). */
3212   ACQUIRE_LOCK(&sched_mutex);
3213   raiseAsync(tso,exception);
3214   RELEASE_LOCK(&sched_mutex);
3215 }
3216
3217 void
3218 raiseAsync(StgTSO *tso, StgClosure *exception)
3219 {
3220   StgUpdateFrame* su = tso->su;
3221   StgPtr          sp = tso->sp;
3222   
3223   /* Thread already dead? */
3224   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3225     return;
3226   }
3227
3228   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3229
3230   /* Remove it from any blocking queues */
3231   unblockThread(tso);
3232
3233   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3234   /* The stack freezing code assumes there's a closure pointer on
3235    * the top of the stack.  This isn't always the case with compiled
3236    * code, so we have to push a dummy closure on the top which just
3237    * returns to the next return address on the stack.
3238    */
3239   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3240     *(--sp) = (W_)&stg_dummy_ret_closure;
3241   }
3242
3243   while (1) {
3244     nat words = ((P_)su - (P_)sp) - 1;
3245     nat i;
3246     StgAP_UPD * ap;
3247
3248     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3249      * then build the THUNK raise(exception), and leave it on
3250      * top of the CATCH_FRAME ready to enter.
3251      */
3252     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3253 #ifdef PROFILING
3254       StgCatchFrame *cf = (StgCatchFrame *)su;
3255 #endif
3256       StgClosure *raise;
3257
3258       /* we've got an exception to raise, so let's pass it to the
3259        * handler in this frame.
3260        */
3261       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3262       TICK_ALLOC_SE_THK(1,0);
3263       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3264       raise->payload[0] = exception;
3265
3266       /* throw away the stack from Sp up to the CATCH_FRAME.
3267        */
3268       sp = (P_)su - 1;
3269
3270       /* Ensure that async excpetions are blocked now, so we don't get
3271        * a surprise exception before we get around to executing the
3272        * handler.
3273        */
3274       if (tso->blocked_exceptions == NULL) {
3275           tso->blocked_exceptions = END_TSO_QUEUE;
3276       }
3277
3278       /* Put the newly-built THUNK on top of the stack, ready to execute
3279        * when the thread restarts.
3280        */
3281       sp[0] = (W_)raise;
3282       tso->sp = sp;
3283       tso->su = su;
3284       tso->what_next = ThreadEnterGHC;
3285       IF_DEBUG(sanity, checkTSO(tso));
3286       return;
3287     }
3288
3289     /* First build an AP_UPD consisting of the stack chunk above the
3290      * current update frame, with the top word on the stack as the
3291      * fun field.
3292      */
3293     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3294     
3295     ASSERT(words >= 0);
3296     
3297     ap->n_args = words;
3298     ap->fun    = (StgClosure *)sp[0];
3299     sp++;
3300     for(i=0; i < (nat)words; ++i) {
3301       ap->payload[i] = (StgClosure *)*sp++;
3302     }
3303     
3304     switch (get_itbl(su)->type) {
3305       
3306     case UPDATE_FRAME:
3307       {
3308         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3309         TICK_ALLOC_UP_THK(words+1,0);
3310         
3311         IF_DEBUG(scheduler,
3312                  fprintf(stderr,  "scheduler: Updating ");
3313                  printPtr((P_)su->updatee); 
3314                  fprintf(stderr,  " with ");
3315                  printObj((StgClosure *)ap);
3316                  );
3317         
3318         /* Replace the updatee with an indirection - happily
3319          * this will also wake up any threads currently
3320          * waiting on the result.
3321          *
3322          * Warning: if we're in a loop, more than one update frame on
3323          * the stack may point to the same object.  Be careful not to
3324          * overwrite an IND_OLDGEN in this case, because we'll screw
3325          * up the mutable lists.  To be on the safe side, don't
3326          * overwrite any kind of indirection at all.  See also
3327          * threadSqueezeStack in GC.c, where we have to make a similar
3328          * check.
3329          */
3330         if (!closure_IND(su->updatee)) {
3331             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3332         }
3333         su = su->link;
3334         sp += sizeofW(StgUpdateFrame) -1;
3335         sp[0] = (W_)ap; /* push onto stack */
3336         break;
3337       }
3338
3339     case CATCH_FRAME:
3340       {
3341         StgCatchFrame *cf = (StgCatchFrame *)su;
3342         StgClosure* o;
3343         
3344         /* We want a PAP, not an AP_UPD.  Fortunately, the
3345          * layout's the same.
3346          */
3347         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3348         TICK_ALLOC_UPD_PAP(words+1,0);
3349         
3350         /* now build o = FUN(catch,ap,handler) */
3351         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3352         TICK_ALLOC_FUN(2,0);
3353         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3354         o->payload[0] = (StgClosure *)ap;
3355         o->payload[1] = cf->handler;
3356         
3357         IF_DEBUG(scheduler,
3358                  fprintf(stderr,  "scheduler: Built ");
3359                  printObj((StgClosure *)o);
3360                  );
3361         
3362         /* pop the old handler and put o on the stack */
3363         su = cf->link;
3364         sp += sizeofW(StgCatchFrame) - 1;
3365         sp[0] = (W_)o;
3366         break;
3367       }
3368       
3369     case SEQ_FRAME:
3370       {
3371         StgSeqFrame *sf = (StgSeqFrame *)su;
3372         StgClosure* o;
3373         
3374         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3375         TICK_ALLOC_UPD_PAP(words+1,0);
3376         
3377         /* now build o = FUN(seq,ap) */
3378         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3379         TICK_ALLOC_SE_THK(1,0);
3380         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3381         o->payload[0] = (StgClosure *)ap;
3382         
3383         IF_DEBUG(scheduler,
3384                  fprintf(stderr,  "scheduler: Built ");
3385                  printObj((StgClosure *)o);
3386                  );
3387         
3388         /* pop the old handler and put o on the stack */
3389         su = sf->link;
3390         sp += sizeofW(StgSeqFrame) - 1;
3391         sp[0] = (W_)o;
3392         break;
3393       }
3394       
3395     case STOP_FRAME:
3396       /* We've stripped the entire stack, the thread is now dead. */
3397       sp += sizeofW(StgStopFrame) - 1;
3398       sp[0] = (W_)exception;    /* save the exception */
3399       tso->what_next = ThreadKilled;
3400       tso->su = (StgUpdateFrame *)(sp+1);
3401       tso->sp = sp;
3402       return;
3403
3404     default:
3405       barf("raiseAsync");
3406     }
3407   }
3408   barf("raiseAsync");
3409 }
3410
3411 /* -----------------------------------------------------------------------------
3412    resurrectThreads is called after garbage collection on the list of
3413    threads found to be garbage.  Each of these threads will be woken
3414    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3415    on an MVar, or NonTermination if the thread was blocked on a Black
3416    Hole.
3417
3418    Locks: sched_mutex isn't held upon entry nor exit.
3419    -------------------------------------------------------------------------- */
3420
3421 void
3422 resurrectThreads( StgTSO *threads )
3423 {
3424   StgTSO *tso, *next;
3425
3426   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3427     next = tso->global_link;
3428     tso->global_link = all_threads;
3429     all_threads = tso;
3430     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3431
3432     switch (tso->why_blocked) {
3433     case BlockedOnMVar:
3434     case BlockedOnException:
3435       /* Called by GC - sched_mutex lock is currently held. */
3436       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3437       break;
3438     case BlockedOnBlackHole:
3439       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3440       break;
3441     case NotBlocked:
3442       /* This might happen if the thread was blocked on a black hole
3443        * belonging to a thread that we've just woken up (raiseAsync
3444        * can wake up threads, remember...).
3445        */
3446       continue;
3447     default:
3448       barf("resurrectThreads: thread blocked in a strange way");
3449     }
3450   }
3451 }
3452
3453 /* -----------------------------------------------------------------------------
3454  * Blackhole detection: if we reach a deadlock, test whether any
3455  * threads are blocked on themselves.  Any threads which are found to
3456  * be self-blocked get sent a NonTermination exception.
3457  *
3458  * This is only done in a deadlock situation in order to avoid
3459  * performance overhead in the normal case.
3460  *
3461  * Locks: sched_mutex is held upon entry and exit.
3462  * -------------------------------------------------------------------------- */
3463
3464 static void
3465 detectBlackHoles( void )
3466 {
3467     StgTSO *t = all_threads;
3468     StgUpdateFrame *frame;
3469     StgClosure *blocked_on;
3470
3471     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3472
3473         while (t->what_next == ThreadRelocated) {
3474             t = t->link;
3475             ASSERT(get_itbl(t)->type == TSO);
3476         }
3477       
3478         if (t->why_blocked != BlockedOnBlackHole) {
3479             continue;
3480         }
3481
3482         blocked_on = t->block_info.closure;
3483
3484         for (frame = t->su; ; frame = frame->link) {
3485             switch (get_itbl(frame)->type) {
3486
3487             case UPDATE_FRAME:
3488                 if (frame->updatee == blocked_on) {
3489                     /* We are blocking on one of our own computations, so
3490                      * send this thread the NonTermination exception.  
3491                      */
3492                     IF_DEBUG(scheduler, 
3493                              sched_belch("thread %d is blocked on itself", t->id));
3494                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3495                     goto done;
3496                 }
3497                 else {
3498                     continue;
3499                 }
3500
3501             case CATCH_FRAME:
3502             case SEQ_FRAME:
3503                 continue;
3504                 
3505             case STOP_FRAME:
3506                 break;
3507             }
3508             break;
3509         }
3510
3511     done: ;
3512     }   
3513 }
3514
3515 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3516 //@subsection Debugging Routines
3517
3518 /* -----------------------------------------------------------------------------
3519    Debugging: why is a thread blocked
3520    -------------------------------------------------------------------------- */
3521
3522 #ifdef DEBUG
3523
3524 void
3525 printThreadBlockage(StgTSO *tso)
3526 {
3527   switch (tso->why_blocked) {
3528   case BlockedOnRead:
3529     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3530     break;
3531   case BlockedOnWrite:
3532     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3533     break;
3534   case BlockedOnDelay:
3535     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3536     break;
3537   case BlockedOnMVar:
3538     fprintf(stderr,"is blocked on an MVar");
3539     break;
3540   case BlockedOnException:
3541     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3542             tso->block_info.tso->id);
3543     break;
3544   case BlockedOnBlackHole:
3545     fprintf(stderr,"is blocked on a black hole");
3546     break;
3547   case NotBlocked:
3548     fprintf(stderr,"is not blocked");
3549     break;
3550 #if defined(PAR)
3551   case BlockedOnGA:
3552     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3553             tso->block_info.closure, info_type(tso->block_info.closure));
3554     break;
3555   case BlockedOnGA_NoSend:
3556     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3557             tso->block_info.closure, info_type(tso->block_info.closure));
3558     break;
3559 #endif
3560 #if defined(RTS_SUPPORTS_THREADS)
3561   case BlockedOnCCall:
3562     fprintf(stderr,"is blocked on an external call");
3563     break;
3564 #endif
3565   default:
3566     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3567          tso->why_blocked, tso->id, tso);
3568   }
3569 }
3570
3571 void
3572 printThreadStatus(StgTSO *tso)
3573 {
3574   switch (tso->what_next) {
3575   case ThreadKilled:
3576     fprintf(stderr,"has been killed");
3577     break;
3578   case ThreadComplete:
3579     fprintf(stderr,"has completed");
3580     break;
3581   default:
3582     printThreadBlockage(tso);
3583   }
3584 }
3585
3586 void
3587 printAllThreads(void)
3588 {
3589   StgTSO *t;
3590   void *label;
3591
3592 # if defined(GRAN)
3593   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3594   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3595                        time_string, rtsFalse/*no commas!*/);
3596
3597   sched_belch("all threads at [%s]:", time_string);
3598 # elif defined(PAR)
3599   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3600   ullong_format_string(CURRENT_TIME,
3601                        time_string, rtsFalse/*no commas!*/);
3602
3603   sched_belch("all threads at [%s]:", time_string);
3604 # else
3605   sched_belch("all threads:");
3606 # endif
3607
3608   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3609     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3610     label = lookupThreadLabel((StgWord)t);
3611     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3612     printThreadStatus(t);
3613     fprintf(stderr,"\n");
3614   }
3615 }
3616     
3617 /* 
3618    Print a whole blocking queue attached to node (debugging only).
3619 */
3620 //@cindex print_bq
3621 # if defined(PAR)
3622 void 
3623 print_bq (StgClosure *node)
3624 {
3625   StgBlockingQueueElement *bqe;
3626   StgTSO *tso;
3627   rtsBool end;
3628
3629   fprintf(stderr,"## BQ of closure %p (%s): ",
3630           node, info_type(node));
3631
3632   /* should cover all closures that may have a blocking queue */
3633   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3634          get_itbl(node)->type == FETCH_ME_BQ ||
3635          get_itbl(node)->type == RBH ||
3636          get_itbl(node)->type == MVAR);
3637     
3638   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3639
3640   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3641 }
3642
3643 /* 
3644    Print a whole blocking queue starting with the element bqe.
3645 */
3646 void 
3647 print_bqe (StgBlockingQueueElement *bqe)
3648 {
3649   rtsBool end;
3650
3651   /* 
3652      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3653   */
3654   for (end = (bqe==END_BQ_QUEUE);
3655        !end; // iterate until bqe points to a CONSTR
3656        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3657        bqe = end ? END_BQ_QUEUE : bqe->link) {
3658     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3659     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3660     /* types of closures that may appear in a blocking queue */
3661     ASSERT(get_itbl(bqe)->type == TSO ||           
3662            get_itbl(bqe)->type == BLOCKED_FETCH || 
3663            get_itbl(bqe)->type == CONSTR); 
3664     /* only BQs of an RBH end with an RBH_Save closure */
3665     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3666
3667     switch (get_itbl(bqe)->type) {
3668     case TSO:
3669       fprintf(stderr," TSO %u (%x),",
3670               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3671       break;
3672     case BLOCKED_FETCH:
3673       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3674               ((StgBlockedFetch *)bqe)->node, 
3675               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3676               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3677               ((StgBlockedFetch *)bqe)->ga.weight);
3678       break;
3679     case CONSTR:
3680       fprintf(stderr," %s (IP %p),",
3681               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3682                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3683                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3684                "RBH_Save_?"), get_itbl(bqe));
3685       break;
3686     default:
3687       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3688            info_type((StgClosure *)bqe)); // , node, info_type(node));
3689       break;
3690     }
3691   } /* for */
3692   fputc('\n', stderr);
3693 }
3694 # elif defined(GRAN)
3695 void 
3696 print_bq (StgClosure *node)
3697 {
3698   StgBlockingQueueElement *bqe;
3699   PEs node_loc, tso_loc;
3700   rtsBool end;
3701
3702   /* should cover all closures that may have a blocking queue */
3703   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3704          get_itbl(node)->type == FETCH_ME_BQ ||
3705          get_itbl(node)->type == RBH);
3706     
3707   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3708   node_loc = where_is(node);
3709
3710   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3711           node, info_type(node), node_loc);
3712
3713   /* 
3714      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3715   */
3716   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3717        !end; // iterate until bqe points to a CONSTR
3718        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3719     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3720     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3721     /* types of closures that may appear in a blocking queue */
3722     ASSERT(get_itbl(bqe)->type == TSO ||           
3723            get_itbl(bqe)->type == CONSTR); 
3724     /* only BQs of an RBH end with an RBH_Save closure */
3725     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3726
3727     tso_loc = where_is((StgClosure *)bqe);
3728     switch (get_itbl(bqe)->type) {
3729     case TSO:
3730       fprintf(stderr," TSO %d (%p) on [PE %d],",
3731               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3732       break;
3733     case CONSTR:
3734       fprintf(stderr," %s (IP %p),",
3735               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3736                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3737                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3738                "RBH_Save_?"), get_itbl(bqe));
3739       break;
3740     default:
3741       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3742            info_type((StgClosure *)bqe), node, info_type(node));
3743       break;
3744     }
3745   } /* for */
3746   fputc('\n', stderr);
3747 }
3748 #else
3749 /* 
3750    Nice and easy: only TSOs on the blocking queue
3751 */
3752 void 
3753 print_bq (StgClosure *node)
3754 {
3755   StgTSO *tso;
3756
3757   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3758   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3759        tso != END_TSO_QUEUE; 
3760        tso=tso->link) {
3761     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3762     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3763     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3764   }
3765   fputc('\n', stderr);
3766 }
3767 # endif
3768
3769 #if defined(PAR)
3770 static nat
3771 run_queue_len(void)
3772 {
3773   nat i;
3774   StgTSO *tso;
3775
3776   for (i=0, tso=run_queue_hd; 
3777        tso != END_TSO_QUEUE;
3778        i++, tso=tso->link)
3779     /* nothing */
3780
3781   return i;
3782 }
3783 #endif
3784
3785 static void
3786 sched_belch(char *s, ...)
3787 {
3788   va_list ap;
3789   va_start(ap,s);
3790 #ifdef SMP
3791   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3792 #elif defined(PAR)
3793   fprintf(stderr, "== ");
3794 #else
3795   fprintf(stderr, "scheduler: ");
3796 #endif
3797   vfprintf(stderr, s, ap);
3798   fprintf(stderr, "\n");
3799   va_end(ap);
3800 }
3801
3802 #endif /* DEBUG */
3803
3804
3805 //@node Index,  , Debugging Routines, Main scheduling code
3806 //@subsection Index
3807
3808 //@index
3809 //* StgMainThread::  @cindex\s-+StgMainThread
3810 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3811 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3812 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3813 //* context_switch::  @cindex\s-+context_switch
3814 //* createThread::  @cindex\s-+createThread
3815 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3816 //* initScheduler::  @cindex\s-+initScheduler
3817 //* interrupted::  @cindex\s-+interrupted
3818 //* next_thread_id::  @cindex\s-+next_thread_id
3819 //* print_bq::  @cindex\s-+print_bq
3820 //* run_queue_hd::  @cindex\s-+run_queue_hd
3821 //* run_queue_tl::  @cindex\s-+run_queue_tl
3822 //* sched_mutex::  @cindex\s-+sched_mutex
3823 //* schedule::  @cindex\s-+schedule
3824 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3825 //* term_mutex::  @cindex\s-+term_mutex
3826 //@end index