[project @ 2002-04-26 22:35:54 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.141 2002/04/26 22:35:54 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
368 #else
369   /* simply initialise it in the non-threaded case */
370   grabCapability(&cap);
371 #endif
372
373 #if defined(GRAN)
374   /* set up first event to get things going */
375   /* ToDo: assign costs for system setup and init MainTSO ! */
376   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
377             ContinueThread, 
378             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
379
380   IF_DEBUG(gran,
381            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382            G_TSO(CurrentTSO, 5));
383
384   if (RtsFlags.GranFlags.Light) {
385     /* Save current time; GranSim Light only */
386     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
387   }      
388
389   event = get_next_event();
390
391   while (event!=(rtsEvent*)NULL) {
392     /* Choose the processor with the next event */
393     CurrentProc = event->proc;
394     CurrentTSO = event->tso;
395
396 #elif defined(PAR)
397
398   while (!receivedFinish) {    /* set by processMessages */
399                                /* when receiving PP_FINISH message         */ 
400 #else
401
402   while (1) {
403
404 #endif
405
406     IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409     /* Check to see whether there are any worker threads
410        waiting to deposit external call results. If so,
411        yield our capability */
412     yieldToReturningWorker(&sched_mutex, &cap);
413 #endif
414
415     /* If we're interrupted (the user pressed ^C, or some other
416      * termination condition occurred), kill all the currently running
417      * threads.
418      */
419     if (interrupted) {
420       IF_DEBUG(scheduler, sched_belch("interrupted"));
421       deleteAllThreads();
422       interrupted = rtsFalse;
423       was_interrupted = rtsTrue;
424     }
425
426     /* Go through the list of main threads and wake up any
427      * clients whose computations have finished.  ToDo: this
428      * should be done more efficiently without a linear scan
429      * of the main threads list, somehow...
430      */
431 #if defined(RTS_SUPPORTS_THREADS)
432     { 
433       StgMainThread *m, **prev;
434       prev = &main_threads;
435       for (m = main_threads; m != NULL; m = m->link) {
436         switch (m->tso->what_next) {
437         case ThreadComplete:
438           if (m->ret) {
439             *(m->ret) = (StgClosure *)m->tso->sp[0];
440           }
441           *prev = m->link;
442           m->stat = Success;
443           broadcastCondition(&m->wakeup);
444 #ifdef DEBUG
445           free(m->tso->label);
446           m->tso->label = NULL;
447 #endif
448           break;
449         case ThreadKilled:
450           if (m->ret) *(m->ret) = NULL;
451           *prev = m->link;
452           if (was_interrupted) {
453             m->stat = Interrupted;
454           } else {
455             m->stat = Killed;
456           }
457           broadcastCondition(&m->wakeup);
458 #ifdef DEBUG
459           free(m->tso->label);
460           m->tso->label = NULL;
461 #endif
462           break;
463         default:
464           break;
465         }
466       }
467     }
468
469 #else /* not threaded */
470
471 # if defined(PAR)
472     /* in GUM do this only on the Main PE */
473     if (IAmMainThread)
474 # endif
475     /* If our main thread has finished or been killed, return.
476      */
477     {
478       StgMainThread *m = main_threads;
479       if (m->tso->what_next == ThreadComplete
480           || m->tso->what_next == ThreadKilled) {
481 #ifdef DEBUG
482         free(m->tso->label);
483         m->tso->label = NULL;
484 #endif
485         main_threads = main_threads->link;
486         if (m->tso->what_next == ThreadComplete) {
487           /* we finished successfully, fill in the return value */
488           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
489           m->stat = Success;
490           return;
491         } else {
492           if (m->ret) { *(m->ret) = NULL; };
493           if (was_interrupted) {
494             m->stat = Interrupted;
495           } else {
496             m->stat = Killed;
497           }
498           return;
499         }
500       }
501     }
502 #endif
503
504     /* Top up the run queue from our spark pool.  We try to make the
505      * number of threads in the run queue equal to the number of
506      * free capabilities.
507      *
508      * Disable spark support in SMP for now, non-essential & requires
509      * a little bit of work to make it compile cleanly. -- sof 1/02.
510      */
511 #if 0 /* defined(SMP) */
512     {
513       nat n = getFreeCapabilities();
514       StgTSO *tso = run_queue_hd;
515
516       /* Count the run queue */
517       while (n > 0 && tso != END_TSO_QUEUE) {
518         tso = tso->link;
519         n--;
520       }
521
522       for (; n > 0; n--) {
523         StgClosure *spark;
524         spark = findSpark(rtsFalse);
525         if (spark == NULL) {
526           break; /* no more sparks in the pool */
527         } else {
528           /* I'd prefer this to be done in activateSpark -- HWL */
529           /* tricky - it needs to hold the scheduler lock and
530            * not try to re-acquire it -- SDM */
531           createSparkThread(spark);       
532           IF_DEBUG(scheduler,
533                    sched_belch("==^^ turning spark of closure %p into a thread",
534                                (StgClosure *)spark));
535         }
536       }
537       /* We need to wake up the other tasks if we just created some
538        * work for them.
539        */
540       if (getFreeCapabilities() - n > 1) {
541           signalCondition( &thread_ready_cond );
542       }
543     }
544 #endif // SMP
545
546     /* check for signals each time around the scheduler */
547 #ifndef mingw32_TARGET_OS
548     if (signals_pending()) {
549       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
550       startSignalHandlers();
551       ACQUIRE_LOCK(&sched_mutex);
552     }
553 #endif
554
555     /* Check whether any waiting threads need to be woken up.  If the
556      * run queue is empty, and there are no other tasks running, we
557      * can wait indefinitely for something to happen.
558      * ToDo: what if another client comes along & requests another
559      * main thread?
560      */
561     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
562       awaitEvent( EMPTY_RUN_QUEUE()
563 #if defined(SMP)
564         && allFreeCapabilities()
565 #endif
566         );
567     }
568     /* we can be interrupted while waiting for I/O... */
569     if (interrupted) continue;
570
571     /* 
572      * Detect deadlock: when we have no threads to run, there are no
573      * threads waiting on I/O or sleeping, and all the other tasks are
574      * waiting for work, we must have a deadlock of some description.
575      *
576      * We first try to find threads blocked on themselves (ie. black
577      * holes), and generate NonTermination exceptions where necessary.
578      *
579      * If no threads are black holed, we have a deadlock situation, so
580      * inform all the main threads.
581      */
582 #ifndef PAR
583     if (   EMPTY_THREAD_QUEUES()
584 #if defined(RTS_SUPPORTS_THREADS)
585         && EMPTY_QUEUE(suspended_ccalling_threads)
586 #endif
587 #ifdef SMP
588         && allFreeCapabilities()
589 #endif
590         )
591     {
592         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
593 #if defined(THREADED_RTS)
594         /* and SMP mode ..? */
595         releaseCapability(cap);
596 #endif
597         // Garbage collection can release some new threads due to
598         // either (a) finalizers or (b) threads resurrected because
599         // they are about to be send BlockedOnDeadMVar.  Any threads
600         // thus released will be immediately runnable.
601         GarbageCollect(GetRoots,rtsTrue);
602
603         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
604
605         IF_DEBUG(scheduler, 
606                  sched_belch("still deadlocked, checking for black holes..."));
607         detectBlackHoles();
608
609         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
610
611 #ifndef mingw32_TARGET_OS
612         /* If we have user-installed signal handlers, then wait
613          * for signals to arrive rather then bombing out with a
614          * deadlock.
615          */
616 #if defined(RTS_SUPPORTS_THREADS)
617         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
618                       a signal with no runnable threads (or I/O
619                       suspended ones) leads nowhere quick.
620                       For now, simply shut down when we reach this
621                       condition.
622                       
623                       ToDo: define precisely under what conditions
624                       the Scheduler should shut down in an MT setting.
625                    */
626 #else
627         if ( anyUserHandlers() ) {
628 #endif
629             IF_DEBUG(scheduler, 
630                      sched_belch("still deadlocked, waiting for signals..."));
631
632             awaitUserSignals();
633
634             // we might be interrupted...
635             if (interrupted) { continue; }
636
637             if (signals_pending()) {
638                 RELEASE_LOCK(&sched_mutex);
639                 startSignalHandlers();
640                 ACQUIRE_LOCK(&sched_mutex);
641             }
642             ASSERT(!EMPTY_RUN_QUEUE());
643             goto not_deadlocked;
644         }
645 #endif
646
647         /* Probably a real deadlock.  Send the current main thread the
648          * Deadlock exception (or in the SMP build, send *all* main
649          * threads the deadlock exception, since none of them can make
650          * progress).
651          */
652         {
653             StgMainThread *m;
654 #if defined(RTS_SUPPORTS_THREADS)
655             for (m = main_threads; m != NULL; m = m->link) {
656                 switch (m->tso->why_blocked) {
657                 case BlockedOnBlackHole:
658                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
659                     break;
660                 case BlockedOnException:
661                 case BlockedOnMVar:
662                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
663                     break;
664                 default:
665                     barf("deadlock: main thread blocked in a strange way");
666                 }
667             }
668 #else
669             m = main_threads;
670             switch (m->tso->why_blocked) {
671             case BlockedOnBlackHole:
672                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
673                 break;
674             case BlockedOnException:
675             case BlockedOnMVar:
676                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
677                 break;
678             default:
679                 barf("deadlock: main thread blocked in a strange way");
680             }
681 #endif
682         }
683
684 #if defined(RTS_SUPPORTS_THREADS)
685         /* ToDo: revisit conditions (and mechanism) for shutting
686            down a multi-threaded world  */
687         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
688         RELEASE_LOCK(&sched_mutex);
689         shutdownHaskell();
690         return;
691 #endif
692     }
693   not_deadlocked:
694
695 #elif defined(PAR)
696     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
697 #endif
698
699 #if defined(SMP)
700     /* If there's a GC pending, don't do anything until it has
701      * completed.
702      */
703     if (ready_to_gc) {
704       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
705       waitCondition( &gc_pending_cond, &sched_mutex );
706     }
707 #endif    
708
709 #if defined(RTS_SUPPORTS_THREADS)
710     /* block until we've got a thread on the run queue and a free
711      * capability.
712      *
713      */
714     if ( EMPTY_RUN_QUEUE() ) {
715       /* Give up our capability */
716       releaseCapability(cap);
717       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
718       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
719       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
720 #if 0
721       while ( EMPTY_RUN_QUEUE() ) {
722         waitForWorkCapability(&sched_mutex, &cap);
723         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
724       }
725 #endif
726     }
727 #endif
728
729 #if defined(GRAN)
730     if (RtsFlags.GranFlags.Light)
731       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
732
733     /* adjust time based on time-stamp */
734     if (event->time > CurrentTime[CurrentProc] &&
735         event->evttype != ContinueThread)
736       CurrentTime[CurrentProc] = event->time;
737     
738     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
739     if (!RtsFlags.GranFlags.Light)
740       handleIdlePEs();
741
742     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
743
744     /* main event dispatcher in GranSim */
745     switch (event->evttype) {
746       /* Should just be continuing execution */
747     case ContinueThread:
748       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
749       /* ToDo: check assertion
750       ASSERT(run_queue_hd != (StgTSO*)NULL &&
751              run_queue_hd != END_TSO_QUEUE);
752       */
753       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
754       if (!RtsFlags.GranFlags.DoAsyncFetch &&
755           procStatus[CurrentProc]==Fetching) {
756         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
757               CurrentTSO->id, CurrentTSO, CurrentProc);
758         goto next_thread;
759       } 
760       /* Ignore ContinueThreads for completed threads */
761       if (CurrentTSO->what_next == ThreadComplete) {
762         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
763               CurrentTSO->id, CurrentTSO, CurrentProc);
764         goto next_thread;
765       } 
766       /* Ignore ContinueThreads for threads that are being migrated */
767       if (PROCS(CurrentTSO)==Nowhere) { 
768         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
769               CurrentTSO->id, CurrentTSO, CurrentProc);
770         goto next_thread;
771       }
772       /* The thread should be at the beginning of the run queue */
773       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
774         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
775               CurrentTSO->id, CurrentTSO, CurrentProc);
776         break; // run the thread anyway
777       }
778       /*
779       new_event(proc, proc, CurrentTime[proc],
780                 FindWork,
781                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
782       goto next_thread; 
783       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
784       break; // now actually run the thread; DaH Qu'vam yImuHbej 
785
786     case FetchNode:
787       do_the_fetchnode(event);
788       goto next_thread;             /* handle next event in event queue  */
789       
790     case GlobalBlock:
791       do_the_globalblock(event);
792       goto next_thread;             /* handle next event in event queue  */
793       
794     case FetchReply:
795       do_the_fetchreply(event);
796       goto next_thread;             /* handle next event in event queue  */
797       
798     case UnblockThread:   /* Move from the blocked queue to the tail of */
799       do_the_unblock(event);
800       goto next_thread;             /* handle next event in event queue  */
801       
802     case ResumeThread:  /* Move from the blocked queue to the tail of */
803       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
804       event->tso->gran.blocktime += 
805         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
806       do_the_startthread(event);
807       goto next_thread;             /* handle next event in event queue  */
808       
809     case StartThread:
810       do_the_startthread(event);
811       goto next_thread;             /* handle next event in event queue  */
812       
813     case MoveThread:
814       do_the_movethread(event);
815       goto next_thread;             /* handle next event in event queue  */
816       
817     case MoveSpark:
818       do_the_movespark(event);
819       goto next_thread;             /* handle next event in event queue  */
820       
821     case FindWork:
822       do_the_findwork(event);
823       goto next_thread;             /* handle next event in event queue  */
824       
825     default:
826       barf("Illegal event type %u\n", event->evttype);
827     }  /* switch */
828     
829     /* This point was scheduler_loop in the old RTS */
830
831     IF_DEBUG(gran, belch("GRAN: after main switch"));
832
833     TimeOfLastEvent = CurrentTime[CurrentProc];
834     TimeOfNextEvent = get_time_of_next_event();
835     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
836     // CurrentTSO = ThreadQueueHd;
837
838     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
839                          TimeOfNextEvent));
840
841     if (RtsFlags.GranFlags.Light) 
842       GranSimLight_leave_system(event, &ActiveTSO); 
843
844     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
845
846     IF_DEBUG(gran, 
847              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
848
849     /* in a GranSim setup the TSO stays on the run queue */
850     t = CurrentTSO;
851     /* Take a thread from the run queue. */
852     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
853
854     IF_DEBUG(gran, 
855              fprintf(stderr, "GRAN: About to run current thread, which is\n");
856              G_TSO(t,5));
857
858     context_switch = 0; // turned on via GranYield, checking events and time slice
859
860     IF_DEBUG(gran, 
861              DumpGranEvent(GR_SCHEDULE, t));
862
863     procStatus[CurrentProc] = Busy;
864
865 #elif defined(PAR)
866     if (PendingFetches != END_BF_QUEUE) {
867         processFetches();
868     }
869
870     /* ToDo: phps merge with spark activation above */
871     /* check whether we have local work and send requests if we have none */
872     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
873       /* :-[  no local threads => look out for local sparks */
874       /* the spark pool for the current PE */
875       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
876       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
877           pool->hd < pool->tl) {
878         /* 
879          * ToDo: add GC code check that we really have enough heap afterwards!!
880          * Old comment:
881          * If we're here (no runnable threads) and we have pending
882          * sparks, we must have a space problem.  Get enough space
883          * to turn one of those pending sparks into a
884          * thread... 
885          */
886
887         spark = findSpark(rtsFalse);                /* get a spark */
888         if (spark != (rtsSpark) NULL) {
889           tso = activateSpark(spark);       /* turn the spark into a thread */
890           IF_PAR_DEBUG(schedule,
891                        belch("==== schedule: Created TSO %d (%p); %d threads active",
892                              tso->id, tso, advisory_thread_count));
893
894           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
895             belch("==^^ failed to activate spark");
896             goto next_thread;
897           }               /* otherwise fall through & pick-up new tso */
898         } else {
899           IF_PAR_DEBUG(verbose,
900                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
901                              spark_queue_len(pool)));
902           goto next_thread;
903         }
904       }
905
906       /* If we still have no work we need to send a FISH to get a spark
907          from another PE 
908       */
909       if (EMPTY_RUN_QUEUE()) {
910       /* =8-[  no local sparks => look for work on other PEs */
911         /*
912          * We really have absolutely no work.  Send out a fish
913          * (there may be some out there already), and wait for
914          * something to arrive.  We clearly can't run any threads
915          * until a SCHEDULE or RESUME arrives, and so that's what
916          * we're hoping to see.  (Of course, we still have to
917          * respond to other types of messages.)
918          */
919         TIME now = msTime() /*CURRENT_TIME*/;
920         IF_PAR_DEBUG(verbose, 
921                      belch("--  now=%ld", now));
922         IF_PAR_DEBUG(verbose,
923                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
924                          (last_fish_arrived_at!=0 &&
925                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
926                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
927                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
928                              last_fish_arrived_at,
929                              RtsFlags.ParFlags.fishDelay, now);
930                      });
931         
932         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
933             (last_fish_arrived_at==0 ||
934              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
935           /* outstandingFishes is set in sendFish, processFish;
936              avoid flooding system with fishes via delay */
937           pe = choosePE();
938           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
939                    NEW_FISH_HUNGER);
940
941           // Global statistics: count no. of fishes
942           if (RtsFlags.ParFlags.ParStats.Global &&
943               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
944             globalParStats.tot_fish_mess++;
945           }
946         }
947       
948         receivedFinish = processMessages();
949         goto next_thread;
950       }
951     } else if (PacketsWaiting()) {  /* Look for incoming messages */
952       receivedFinish = processMessages();
953     }
954
955     /* Now we are sure that we have some work available */
956     ASSERT(run_queue_hd != END_TSO_QUEUE);
957
958     /* Take a thread from the run queue, if we have work */
959     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
960     IF_DEBUG(sanity,checkTSO(t));
961
962     /* ToDo: write something to the log-file
963     if (RTSflags.ParFlags.granSimStats && !sameThread)
964         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
965
966     CurrentTSO = t;
967     */
968     /* the spark pool for the current PE */
969     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
970
971     IF_DEBUG(scheduler, 
972              belch("--=^ %d threads, %d sparks on [%#x]", 
973                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
974
975 # if 1
976     if (0 && RtsFlags.ParFlags.ParStats.Full && 
977         t && LastTSO && t->id != LastTSO->id && 
978         LastTSO->why_blocked == NotBlocked && 
979         LastTSO->what_next != ThreadComplete) {
980       // if previously scheduled TSO not blocked we have to record the context switch
981       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
982                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
983     }
984
985     if (RtsFlags.ParFlags.ParStats.Full && 
986         (emitSchedule /* forced emit */ ||
987         (t && LastTSO && t->id != LastTSO->id))) {
988       /* 
989          we are running a different TSO, so write a schedule event to log file
990          NB: If we use fair scheduling we also have to write  a deschedule 
991              event for LastTSO; with unfair scheduling we know that the
992              previous tso has blocked whenever we switch to another tso, so
993              we don't need it in GUM for now
994       */
995       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
996                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
997       emitSchedule = rtsFalse;
998     }
999      
1000 # endif
1001 #else /* !GRAN && !PAR */
1002   
1003     /* grab a thread from the run queue */
1004     ASSERT(run_queue_hd != END_TSO_QUEUE);
1005     t = POP_RUN_QUEUE();
1006     // Sanity check the thread we're about to run.  This can be
1007     // expensive if there is lots of thread switching going on...
1008     IF_DEBUG(sanity,checkTSO(t));
1009 #endif
1010     
1011     cap->r.rCurrentTSO = t;
1012     
1013     /* context switches are now initiated by the timer signal, unless
1014      * the user specified "context switch as often as possible", with
1015      * +RTS -C0
1016      */
1017     if (
1018 #ifdef PROFILING
1019         RtsFlags.ProfFlags.profileInterval == 0 ||
1020 #endif
1021         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1022          && (run_queue_hd != END_TSO_QUEUE
1023              || blocked_queue_hd != END_TSO_QUEUE
1024              || sleeping_queue != END_TSO_QUEUE)))
1025         context_switch = 1;
1026     else
1027         context_switch = 0;
1028
1029     RELEASE_LOCK(&sched_mutex);
1030
1031     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1032                               t->id, t, whatNext_strs[t->what_next]));
1033
1034 #ifdef PROFILING
1035     startHeapProfTimer();
1036 #endif
1037
1038     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1039     /* Run the current thread 
1040      */
1041     switch (cap->r.rCurrentTSO->what_next) {
1042     case ThreadKilled:
1043     case ThreadComplete:
1044         /* Thread already finished, return to scheduler. */
1045         ret = ThreadFinished;
1046         break;
1047     case ThreadEnterGHC:
1048         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1049         break;
1050     case ThreadRunGHC:
1051         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1052         break;
1053     case ThreadEnterInterp:
1054         ret = interpretBCO(cap);
1055         break;
1056     default:
1057       barf("schedule: invalid what_next field");
1058     }
1059     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1060     
1061     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1062 #ifdef PROFILING
1063     stopHeapProfTimer();
1064     CCCS = CCS_SYSTEM;
1065 #endif
1066     
1067     ACQUIRE_LOCK(&sched_mutex);
1068
1069 #ifdef SMP
1070     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1071 #elif !defined(GRAN) && !defined(PAR)
1072     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1073 #endif
1074     t = cap->r.rCurrentTSO;
1075     
1076 #if defined(PAR)
1077     /* HACK 675: if the last thread didn't yield, make sure to print a 
1078        SCHEDULE event to the log file when StgRunning the next thread, even
1079        if it is the same one as before */
1080     LastTSO = t; 
1081     TimeOfLastYield = CURRENT_TIME;
1082 #endif
1083
1084     switch (ret) {
1085     case HeapOverflow:
1086 #if defined(GRAN)
1087       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1088       globalGranStats.tot_heapover++;
1089 #elif defined(PAR)
1090       globalParStats.tot_heapover++;
1091 #endif
1092
1093       // did the task ask for a large block?
1094       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1095           // if so, get one and push it on the front of the nursery.
1096           bdescr *bd;
1097           nat blocks;
1098           
1099           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1100
1101           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1102                                    t->id, t,
1103                                    whatNext_strs[t->what_next], blocks));
1104
1105           // don't do this if it would push us over the
1106           // alloc_blocks_lim limit; we'll GC first.
1107           if (alloc_blocks + blocks < alloc_blocks_lim) {
1108
1109               alloc_blocks += blocks;
1110               bd = allocGroup( blocks );
1111
1112               // link the new group into the list
1113               bd->link = cap->r.rCurrentNursery;
1114               bd->u.back = cap->r.rCurrentNursery->u.back;
1115               if (cap->r.rCurrentNursery->u.back != NULL) {
1116                   cap->r.rCurrentNursery->u.back->link = bd;
1117               } else {
1118                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1119                          g0s0->blocks == cap->r.rNursery);
1120                   cap->r.rNursery = g0s0->blocks = bd;
1121               }           
1122               cap->r.rCurrentNursery->u.back = bd;
1123
1124               // initialise it as a nursery block
1125               bd->step = g0s0;
1126               bd->gen_no = 0;
1127               bd->flags = 0;
1128               bd->free = bd->start;
1129
1130               // don't forget to update the block count in g0s0.
1131               g0s0->n_blocks += blocks;
1132               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1133
1134               // now update the nursery to point to the new block
1135               cap->r.rCurrentNursery = bd;
1136
1137               // we might be unlucky and have another thread get on the
1138               // run queue before us and steal the large block, but in that
1139               // case the thread will just end up requesting another large
1140               // block.
1141               PUSH_ON_RUN_QUEUE(t);
1142               break;
1143           }
1144       }
1145
1146       /* make all the running tasks block on a condition variable,
1147        * maybe set context_switch and wait till they all pile in,
1148        * then have them wait on a GC condition variable.
1149        */
1150       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1151                                t->id, t, whatNext_strs[t->what_next]));
1152       threadPaused(t);
1153 #if defined(GRAN)
1154       ASSERT(!is_on_queue(t,CurrentProc));
1155 #elif defined(PAR)
1156       /* Currently we emit a DESCHEDULE event before GC in GUM.
1157          ToDo: either add separate event to distinguish SYSTEM time from rest
1158                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1159       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1160         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1161                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1162         emitSchedule = rtsTrue;
1163       }
1164 #endif
1165       
1166       ready_to_gc = rtsTrue;
1167       context_switch = 1;               /* stop other threads ASAP */
1168       PUSH_ON_RUN_QUEUE(t);
1169       /* actual GC is done at the end of the while loop */
1170       break;
1171       
1172     case StackOverflow:
1173 #if defined(GRAN)
1174       IF_DEBUG(gran, 
1175                DumpGranEvent(GR_DESCHEDULE, t));
1176       globalGranStats.tot_stackover++;
1177 #elif defined(PAR)
1178       // IF_DEBUG(par, 
1179       // DumpGranEvent(GR_DESCHEDULE, t);
1180       globalParStats.tot_stackover++;
1181 #endif
1182       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1183                                t->id, t, whatNext_strs[t->what_next]));
1184       /* just adjust the stack for this thread, then pop it back
1185        * on the run queue.
1186        */
1187       threadPaused(t);
1188       { 
1189         StgMainThread *m;
1190         /* enlarge the stack */
1191         StgTSO *new_t = threadStackOverflow(t);
1192         
1193         /* This TSO has moved, so update any pointers to it from the
1194          * main thread stack.  It better not be on any other queues...
1195          * (it shouldn't be).
1196          */
1197         for (m = main_threads; m != NULL; m = m->link) {
1198           if (m->tso == t) {
1199             m->tso = new_t;
1200           }
1201         }
1202         threadPaused(new_t);
1203         PUSH_ON_RUN_QUEUE(new_t);
1204       }
1205       break;
1206
1207     case ThreadYielding:
1208 #if defined(GRAN)
1209       IF_DEBUG(gran, 
1210                DumpGranEvent(GR_DESCHEDULE, t));
1211       globalGranStats.tot_yields++;
1212 #elif defined(PAR)
1213       // IF_DEBUG(par, 
1214       // DumpGranEvent(GR_DESCHEDULE, t);
1215       globalParStats.tot_yields++;
1216 #endif
1217       /* put the thread back on the run queue.  Then, if we're ready to
1218        * GC, check whether this is the last task to stop.  If so, wake
1219        * up the GC thread.  getThread will block during a GC until the
1220        * GC is finished.
1221        */
1222       IF_DEBUG(scheduler,
1223                if (t->what_next == ThreadEnterInterp) {
1224                    /* ToDo: or maybe a timer expired when we were in Hugs?
1225                     * or maybe someone hit ctrl-C
1226                     */
1227                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1228                          t->id, t, whatNext_strs[t->what_next]);
1229                } else {
1230                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1231                          t->id, t, whatNext_strs[t->what_next]);
1232                }
1233                );
1234
1235       threadPaused(t);
1236
1237       IF_DEBUG(sanity,
1238                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1239                checkTSO(t));
1240       ASSERT(t->link == END_TSO_QUEUE);
1241 #if defined(GRAN)
1242       ASSERT(!is_on_queue(t,CurrentProc));
1243
1244       IF_DEBUG(sanity,
1245                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1246                checkThreadQsSanity(rtsTrue));
1247 #endif
1248 #if defined(PAR)
1249       if (RtsFlags.ParFlags.doFairScheduling) { 
1250         /* this does round-robin scheduling; good for concurrency */
1251         APPEND_TO_RUN_QUEUE(t);
1252       } else {
1253         /* this does unfair scheduling; good for parallelism */
1254         PUSH_ON_RUN_QUEUE(t);
1255       }
1256 #else
1257       /* this does round-robin scheduling; good for concurrency */
1258       APPEND_TO_RUN_QUEUE(t);
1259 #endif
1260 #if defined(GRAN)
1261       /* add a ContinueThread event to actually process the thread */
1262       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1263                 ContinueThread,
1264                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1265       IF_GRAN_DEBUG(bq, 
1266                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1267                G_EVENTQ(0);
1268                G_CURR_THREADQ(0));
1269 #endif /* GRAN */
1270       break;
1271       
1272     case ThreadBlocked:
1273 #if defined(GRAN)
1274       IF_DEBUG(scheduler,
1275                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1276                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1277                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1278
1279       // ??? needed; should emit block before
1280       IF_DEBUG(gran, 
1281                DumpGranEvent(GR_DESCHEDULE, t)); 
1282       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1283       /*
1284         ngoq Dogh!
1285       ASSERT(procStatus[CurrentProc]==Busy || 
1286               ((procStatus[CurrentProc]==Fetching) && 
1287               (t->block_info.closure!=(StgClosure*)NULL)));
1288       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1289           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1290             procStatus[CurrentProc]==Fetching)) 
1291         procStatus[CurrentProc] = Idle;
1292       */
1293 #elif defined(PAR)
1294       IF_DEBUG(scheduler,
1295                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1296                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1297       IF_PAR_DEBUG(bq,
1298
1299                    if (t->block_info.closure!=(StgClosure*)NULL) 
1300                      print_bq(t->block_info.closure));
1301
1302       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1303       blockThread(t);
1304
1305       /* whatever we schedule next, we must log that schedule */
1306       emitSchedule = rtsTrue;
1307
1308 #else /* !GRAN */
1309       /* don't need to do anything.  Either the thread is blocked on
1310        * I/O, in which case we'll have called addToBlockedQueue
1311        * previously, or it's blocked on an MVar or Blackhole, in which
1312        * case it'll be on the relevant queue already.
1313        */
1314       IF_DEBUG(scheduler,
1315                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1316                printThreadBlockage(t);
1317                fprintf(stderr, "\n"));
1318
1319       /* Only for dumping event to log file 
1320          ToDo: do I need this in GranSim, too?
1321       blockThread(t);
1322       */
1323 #endif
1324       threadPaused(t);
1325       break;
1326       
1327     case ThreadFinished:
1328       /* Need to check whether this was a main thread, and if so, signal
1329        * the task that started it with the return value.  If we have no
1330        * more main threads, we probably need to stop all the tasks until
1331        * we get a new one.
1332        */
1333       /* We also end up here if the thread kills itself with an
1334        * uncaught exception, see Exception.hc.
1335        */
1336       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1337 #if defined(GRAN)
1338       endThread(t, CurrentProc); // clean-up the thread
1339 #elif defined(PAR)
1340       /* For now all are advisory -- HWL */
1341       //if(t->priority==AdvisoryPriority) ??
1342       advisory_thread_count--;
1343       
1344 # ifdef DIST
1345       if(t->dist.priority==RevalPriority)
1346         FinishReval(t);
1347 # endif
1348       
1349       if (RtsFlags.ParFlags.ParStats.Full &&
1350           !RtsFlags.ParFlags.ParStats.Suppressed) 
1351         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1352 #endif
1353       break;
1354       
1355     default:
1356       barf("schedule: invalid thread return code %d", (int)ret);
1357     }
1358
1359 #ifdef PROFILING
1360     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1361         GarbageCollect(GetRoots, rtsTrue);
1362         heapCensus();
1363         performHeapProfile = rtsFalse;
1364         ready_to_gc = rtsFalse; // we already GC'd
1365     }
1366 #endif
1367
1368     if (ready_to_gc 
1369 #ifdef SMP
1370         && allFreeCapabilities() 
1371 #endif
1372         ) {
1373       /* everybody back, start the GC.
1374        * Could do it in this thread, or signal a condition var
1375        * to do it in another thread.  Either way, we need to
1376        * broadcast on gc_pending_cond afterward.
1377        */
1378 #if defined(RTS_SUPPORTS_THREADS)
1379       IF_DEBUG(scheduler,sched_belch("doing GC"));
1380 #endif
1381       GarbageCollect(GetRoots,rtsFalse);
1382       ready_to_gc = rtsFalse;
1383 #ifdef SMP
1384       broadcastCondition(&gc_pending_cond);
1385 #endif
1386 #if defined(GRAN)
1387       /* add a ContinueThread event to continue execution of current thread */
1388       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1389                 ContinueThread,
1390                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1391       IF_GRAN_DEBUG(bq, 
1392                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1393                G_EVENTQ(0);
1394                G_CURR_THREADQ(0));
1395 #endif /* GRAN */
1396     }
1397
1398 #if defined(GRAN)
1399   next_thread:
1400     IF_GRAN_DEBUG(unused,
1401                   print_eventq(EventHd));
1402
1403     event = get_next_event();
1404 #elif defined(PAR)
1405   next_thread:
1406     /* ToDo: wait for next message to arrive rather than busy wait */
1407 #endif /* GRAN */
1408
1409   } /* end of while(1) */
1410
1411   IF_PAR_DEBUG(verbose,
1412                belch("== Leaving schedule() after having received Finish"));
1413 }
1414
1415 /* ---------------------------------------------------------------------------
1416  * Singleton fork(). Do not copy any running threads.
1417  * ------------------------------------------------------------------------- */
1418
1419 StgInt forkProcess(StgTSO* tso) {
1420
1421 #ifndef mingw32_TARGET_OS
1422   pid_t pid;
1423   StgTSO* t,*next;
1424
1425   IF_DEBUG(scheduler,sched_belch("forking!"));
1426
1427   pid = fork();
1428   if (pid) { /* parent */
1429
1430   /* just return the pid */
1431     
1432   } else { /* child */
1433   /* wipe all other threads */
1434   run_queue_hd = tso;
1435   tso->link = END_TSO_QUEUE;
1436
1437   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1438      us is picky about finding the threat still in its queue when
1439      handling the deleteThread() */
1440
1441   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1442     next = t->link;
1443     if (t->id != tso->id) {
1444       deleteThread(t);
1445     }
1446   }
1447   }
1448   return pid;
1449 #else /* mingw32 */
1450   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1451   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1452   return -1;
1453 #endif /* mingw32 */
1454 }
1455
1456 /* ---------------------------------------------------------------------------
1457  * deleteAllThreads():  kill all the live threads.
1458  *
1459  * This is used when we catch a user interrupt (^C), before performing
1460  * any necessary cleanups and running finalizers.
1461  *
1462  * Locks: sched_mutex held.
1463  * ------------------------------------------------------------------------- */
1464    
1465 void deleteAllThreads ( void )
1466 {
1467   StgTSO* t, *next;
1468   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1469   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1470       next = t->global_link;
1471       deleteThread(t);
1472   }      
1473   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1474   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1475   sleeping_queue = END_TSO_QUEUE;
1476 }
1477
1478 /* startThread and  insertThread are now in GranSim.c -- HWL */
1479
1480
1481 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1482 //@subsection Suspend and Resume
1483
1484 /* ---------------------------------------------------------------------------
1485  * Suspending & resuming Haskell threads.
1486  * 
1487  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1488  * its capability before calling the C function.  This allows another
1489  * task to pick up the capability and carry on running Haskell
1490  * threads.  It also means that if the C call blocks, it won't lock
1491  * the whole system.
1492  *
1493  * The Haskell thread making the C call is put to sleep for the
1494  * duration of the call, on the susepended_ccalling_threads queue.  We
1495  * give out a token to the task, which it can use to resume the thread
1496  * on return from the C function.
1497  * ------------------------------------------------------------------------- */
1498    
1499 StgInt
1500 suspendThread( StgRegTable *reg, 
1501                rtsBool concCall
1502 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1503                STG_UNUSED
1504 #endif
1505                )
1506 {
1507   nat tok;
1508   Capability *cap;
1509
1510   /* assume that *reg is a pointer to the StgRegTable part
1511    * of a Capability.
1512    */
1513   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1514
1515   ACQUIRE_LOCK(&sched_mutex);
1516
1517   IF_DEBUG(scheduler,
1518            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1519
1520   threadPaused(cap->r.rCurrentTSO);
1521   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1522   suspended_ccalling_threads = cap->r.rCurrentTSO;
1523
1524 #if defined(RTS_SUPPORTS_THREADS)
1525   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1526 #endif
1527
1528   /* Use the thread ID as the token; it should be unique */
1529   tok = cap->r.rCurrentTSO->id;
1530
1531   /* Hand back capability */
1532   releaseCapability(cap);
1533   
1534 #if defined(RTS_SUPPORTS_THREADS)
1535   /* Preparing to leave the RTS, so ensure there's a native thread/task
1536      waiting to take over.
1537      
1538      ToDo: optimise this and only create a new task if there's a need
1539      for one (i.e., if there's only one Concurrent Haskell thread alive,
1540      there's no need to create a new task).
1541   */
1542   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1543   if (concCall) {
1544     startTask(taskStart);
1545   }
1546 #endif
1547
1548   /* Other threads _might_ be available for execution; signal this */
1549   THREAD_RUNNABLE();
1550   RELEASE_LOCK(&sched_mutex);
1551   return tok; 
1552 }
1553
1554 StgRegTable *
1555 resumeThread( StgInt tok,
1556               rtsBool concCall
1557 #if !defined(RTS_SUPPORTS_THREADS)
1558                STG_UNUSED
1559 #endif
1560               )
1561 {
1562   StgTSO *tso, **prev;
1563   Capability *cap;
1564
1565 #if defined(RTS_SUPPORTS_THREADS)
1566   /* Wait for permission to re-enter the RTS with the result. */
1567   if ( concCall ) {
1568     ACQUIRE_LOCK(&sched_mutex);
1569     grabReturnCapability(&sched_mutex, &cap);
1570   } else {
1571     grabCapability(&cap);
1572   }
1573 #else
1574   grabCapability(&cap);
1575 #endif
1576
1577   /* Remove the thread off of the suspended list */
1578   prev = &suspended_ccalling_threads;
1579   for (tso = suspended_ccalling_threads; 
1580        tso != END_TSO_QUEUE; 
1581        prev = &tso->link, tso = tso->link) {
1582     if (tso->id == (StgThreadID)tok) {
1583       *prev = tso->link;
1584       break;
1585     }
1586   }
1587   if (tso == END_TSO_QUEUE) {
1588     barf("resumeThread: thread not found");
1589   }
1590   tso->link = END_TSO_QUEUE;
1591   /* Reset blocking status */
1592   tso->why_blocked  = NotBlocked;
1593
1594   cap->r.rCurrentTSO = tso;
1595   RELEASE_LOCK(&sched_mutex);
1596   return &cap->r;
1597 }
1598
1599
1600 /* ---------------------------------------------------------------------------
1601  * Static functions
1602  * ------------------------------------------------------------------------ */
1603 static void unblockThread(StgTSO *tso);
1604
1605 /* ---------------------------------------------------------------------------
1606  * Comparing Thread ids.
1607  *
1608  * This is used from STG land in the implementation of the
1609  * instances of Eq/Ord for ThreadIds.
1610  * ------------------------------------------------------------------------ */
1611
1612 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1613
1614   StgThreadID id1 = tso1->id; 
1615   StgThreadID id2 = tso2->id;
1616  
1617   if (id1 < id2) return (-1);
1618   if (id1 > id2) return 1;
1619   return 0;
1620 }
1621
1622 /* ---------------------------------------------------------------------------
1623  * Fetching the ThreadID from an StgTSO.
1624  *
1625  * This is used in the implementation of Show for ThreadIds.
1626  * ------------------------------------------------------------------------ */
1627 int rts_getThreadId(const StgTSO *tso) 
1628 {
1629   return tso->id;
1630 }
1631
1632 #ifdef DEBUG
1633 void labelThread(StgTSO *tso, char *label)
1634 {
1635   int len;
1636   void *buf;
1637
1638   /* Caveat: Once set, you can only set the thread name to "" */
1639   len = strlen(label)+1;
1640   buf = realloc(tso->label,len);
1641   if (buf == NULL) {
1642     fprintf(stderr,"insufficient memory for labelThread!\n");
1643     free(tso->label);
1644     tso->label = NULL;
1645   } else
1646     strncpy(buf,label,len);
1647   tso->label = buf;
1648 }
1649 #endif /* DEBUG */
1650
1651 /* ---------------------------------------------------------------------------
1652    Create a new thread.
1653
1654    The new thread starts with the given stack size.  Before the
1655    scheduler can run, however, this thread needs to have a closure
1656    (and possibly some arguments) pushed on its stack.  See
1657    pushClosure() in Schedule.h.
1658
1659    createGenThread() and createIOThread() (in SchedAPI.h) are
1660    convenient packaged versions of this function.
1661
1662    currently pri (priority) is only used in a GRAN setup -- HWL
1663    ------------------------------------------------------------------------ */
1664 //@cindex createThread
1665 #if defined(GRAN)
1666 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1667 StgTSO *
1668 createThread(nat stack_size, StgInt pri)
1669 {
1670   return createThread_(stack_size, rtsFalse, pri);
1671 }
1672
1673 static StgTSO *
1674 createThread_(nat size, rtsBool have_lock, StgInt pri)
1675 {
1676 #else
1677 StgTSO *
1678 createThread(nat stack_size)
1679 {
1680   return createThread_(stack_size, rtsFalse);
1681 }
1682
1683 static StgTSO *
1684 createThread_(nat size, rtsBool have_lock)
1685 {
1686 #endif
1687
1688     StgTSO *tso;
1689     nat stack_size;
1690
1691     /* First check whether we should create a thread at all */
1692 #if defined(PAR)
1693   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1694   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1695     threadsIgnored++;
1696     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1697           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1698     return END_TSO_QUEUE;
1699   }
1700   threadsCreated++;
1701 #endif
1702
1703 #if defined(GRAN)
1704   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1705 #endif
1706
1707   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1708
1709   /* catch ridiculously small stack sizes */
1710   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1711     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1712   }
1713
1714   stack_size = size - TSO_STRUCT_SIZEW;
1715
1716   tso = (StgTSO *)allocate(size);
1717   TICK_ALLOC_TSO(stack_size, 0);
1718
1719   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1720 #if defined(GRAN)
1721   SET_GRAN_HDR(tso, ThisPE);
1722 #endif
1723   tso->what_next     = ThreadEnterGHC;
1724
1725 #ifdef DEBUG
1726   tso->label = NULL;
1727 #endif
1728
1729   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1730    * protect the increment operation on next_thread_id.
1731    * In future, we could use an atomic increment instead.
1732    */
1733 #ifdef SMP
1734   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1735 #endif
1736   tso->id = next_thread_id++; 
1737 #ifdef SMP
1738   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1739 #endif
1740
1741   tso->why_blocked  = NotBlocked;
1742   tso->blocked_exceptions = NULL;
1743
1744   tso->stack_size   = stack_size;
1745   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1746                               - TSO_STRUCT_SIZEW;
1747   tso->sp           = (P_)&(tso->stack) + stack_size;
1748
1749 #ifdef PROFILING
1750   tso->prof.CCCS = CCS_MAIN;
1751 #endif
1752
1753   /* put a stop frame on the stack */
1754   tso->sp -= sizeofW(StgStopFrame);
1755   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1756   tso->su = (StgUpdateFrame*)tso->sp;
1757
1758   // ToDo: check this
1759 #if defined(GRAN)
1760   tso->link = END_TSO_QUEUE;
1761   /* uses more flexible routine in GranSim */
1762   insertThread(tso, CurrentProc);
1763 #else
1764   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1765    * from its creation
1766    */
1767 #endif
1768
1769 #if defined(GRAN) 
1770   if (RtsFlags.GranFlags.GranSimStats.Full) 
1771     DumpGranEvent(GR_START,tso);
1772 #elif defined(PAR)
1773   if (RtsFlags.ParFlags.ParStats.Full) 
1774     DumpGranEvent(GR_STARTQ,tso);
1775   /* HACk to avoid SCHEDULE 
1776      LastTSO = tso; */
1777 #endif
1778
1779   /* Link the new thread on the global thread list.
1780    */
1781   tso->global_link = all_threads;
1782   all_threads = tso;
1783
1784 #if defined(DIST)
1785   tso->dist.priority = MandatoryPriority; //by default that is...
1786 #endif
1787
1788 #if defined(GRAN)
1789   tso->gran.pri = pri;
1790 # if defined(DEBUG)
1791   tso->gran.magic = TSO_MAGIC; // debugging only
1792 # endif
1793   tso->gran.sparkname   = 0;
1794   tso->gran.startedat   = CURRENT_TIME; 
1795   tso->gran.exported    = 0;
1796   tso->gran.basicblocks = 0;
1797   tso->gran.allocs      = 0;
1798   tso->gran.exectime    = 0;
1799   tso->gran.fetchtime   = 0;
1800   tso->gran.fetchcount  = 0;
1801   tso->gran.blocktime   = 0;
1802   tso->gran.blockcount  = 0;
1803   tso->gran.blockedat   = 0;
1804   tso->gran.globalsparks = 0;
1805   tso->gran.localsparks  = 0;
1806   if (RtsFlags.GranFlags.Light)
1807     tso->gran.clock  = Now; /* local clock */
1808   else
1809     tso->gran.clock  = 0;
1810
1811   IF_DEBUG(gran,printTSO(tso));
1812 #elif defined(PAR)
1813 # if defined(DEBUG)
1814   tso->par.magic = TSO_MAGIC; // debugging only
1815 # endif
1816   tso->par.sparkname   = 0;
1817   tso->par.startedat   = CURRENT_TIME; 
1818   tso->par.exported    = 0;
1819   tso->par.basicblocks = 0;
1820   tso->par.allocs      = 0;
1821   tso->par.exectime    = 0;
1822   tso->par.fetchtime   = 0;
1823   tso->par.fetchcount  = 0;
1824   tso->par.blocktime   = 0;
1825   tso->par.blockcount  = 0;
1826   tso->par.blockedat   = 0;
1827   tso->par.globalsparks = 0;
1828   tso->par.localsparks  = 0;
1829 #endif
1830
1831 #if defined(GRAN)
1832   globalGranStats.tot_threads_created++;
1833   globalGranStats.threads_created_on_PE[CurrentProc]++;
1834   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1835   globalGranStats.tot_sq_probes++;
1836 #elif defined(PAR)
1837   // collect parallel global statistics (currently done together with GC stats)
1838   if (RtsFlags.ParFlags.ParStats.Global &&
1839       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1840     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1841     globalParStats.tot_threads_created++;
1842   }
1843 #endif 
1844
1845 #if defined(GRAN)
1846   IF_GRAN_DEBUG(pri,
1847                 belch("==__ schedule: Created TSO %d (%p);",
1848                       CurrentProc, tso, tso->id));
1849 #elif defined(PAR)
1850     IF_PAR_DEBUG(verbose,
1851                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1852                        tso->id, tso, advisory_thread_count));
1853 #else
1854   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1855                                  tso->id, tso->stack_size));
1856 #endif    
1857   return tso;
1858 }
1859
1860 #if defined(PAR)
1861 /* RFP:
1862    all parallel thread creation calls should fall through the following routine.
1863 */
1864 StgTSO *
1865 createSparkThread(rtsSpark spark) 
1866 { StgTSO *tso;
1867   ASSERT(spark != (rtsSpark)NULL);
1868   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1869   { threadsIgnored++;
1870     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1871           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1872     return END_TSO_QUEUE;
1873   }
1874   else
1875   { threadsCreated++;
1876     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1877     if (tso==END_TSO_QUEUE)     
1878       barf("createSparkThread: Cannot create TSO");
1879 #if defined(DIST)
1880     tso->priority = AdvisoryPriority;
1881 #endif
1882     pushClosure(tso,spark);
1883     PUSH_ON_RUN_QUEUE(tso);
1884     advisory_thread_count++;    
1885   }
1886   return tso;
1887 }
1888 #endif
1889
1890 /*
1891   Turn a spark into a thread.
1892   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1893 */
1894 #if defined(PAR)
1895 //@cindex activateSpark
1896 StgTSO *
1897 activateSpark (rtsSpark spark) 
1898 {
1899   StgTSO *tso;
1900
1901   tso = createSparkThread(spark);
1902   if (RtsFlags.ParFlags.ParStats.Full) {   
1903     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1904     IF_PAR_DEBUG(verbose,
1905                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1906                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1907   }
1908   // ToDo: fwd info on local/global spark to thread -- HWL
1909   // tso->gran.exported =  spark->exported;
1910   // tso->gran.locked =   !spark->global;
1911   // tso->gran.sparkname = spark->name;
1912
1913   return tso;
1914 }
1915 #endif
1916
1917 /* ---------------------------------------------------------------------------
1918  * scheduleThread()
1919  *
1920  * scheduleThread puts a thread on the head of the runnable queue.
1921  * This will usually be done immediately after a thread is created.
1922  * The caller of scheduleThread must create the thread using e.g.
1923  * createThread and push an appropriate closure
1924  * on this thread's stack before the scheduler is invoked.
1925  * ------------------------------------------------------------------------ */
1926
1927 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1928
1929 void
1930 scheduleThread_(StgTSO *tso
1931                , rtsBool createTask
1932 #if !defined(THREADED_RTS)
1933                  STG_UNUSED
1934 #endif
1935               )
1936 {
1937   ACQUIRE_LOCK(&sched_mutex);
1938
1939   /* Put the new thread on the head of the runnable queue.  The caller
1940    * better push an appropriate closure on this thread's stack
1941    * beforehand.  In the SMP case, the thread may start running as
1942    * soon as we release the scheduler lock below.
1943    */
1944   PUSH_ON_RUN_QUEUE(tso);
1945 #if defined(THREADED_RTS)
1946   /* If main() is scheduling a thread, don't bother creating a 
1947    * new task.
1948    */
1949   if ( createTask ) {
1950     startTask(taskStart);
1951   }
1952 #endif
1953   THREAD_RUNNABLE();
1954
1955 #if 0
1956   IF_DEBUG(scheduler,printTSO(tso));
1957 #endif
1958   RELEASE_LOCK(&sched_mutex);
1959 }
1960
1961 void scheduleThread(StgTSO* tso)
1962 {
1963   return scheduleThread_(tso, rtsFalse);
1964 }
1965
1966 void scheduleExtThread(StgTSO* tso)
1967 {
1968   return scheduleThread_(tso, rtsTrue);
1969 }
1970
1971 /* ---------------------------------------------------------------------------
1972  * initScheduler()
1973  *
1974  * Initialise the scheduler.  This resets all the queues - if the
1975  * queues contained any threads, they'll be garbage collected at the
1976  * next pass.
1977  *
1978  * ------------------------------------------------------------------------ */
1979
1980 #ifdef SMP
1981 static void
1982 term_handler(int sig STG_UNUSED)
1983 {
1984   stat_workerStop();
1985   ACQUIRE_LOCK(&term_mutex);
1986   await_death--;
1987   RELEASE_LOCK(&term_mutex);
1988   shutdownThread();
1989 }
1990 #endif
1991
1992 void 
1993 initScheduler(void)
1994 {
1995 #if defined(GRAN)
1996   nat i;
1997
1998   for (i=0; i<=MAX_PROC; i++) {
1999     run_queue_hds[i]      = END_TSO_QUEUE;
2000     run_queue_tls[i]      = END_TSO_QUEUE;
2001     blocked_queue_hds[i]  = END_TSO_QUEUE;
2002     blocked_queue_tls[i]  = END_TSO_QUEUE;
2003     ccalling_threadss[i]  = END_TSO_QUEUE;
2004     sleeping_queue        = END_TSO_QUEUE;
2005   }
2006 #else
2007   run_queue_hd      = END_TSO_QUEUE;
2008   run_queue_tl      = END_TSO_QUEUE;
2009   blocked_queue_hd  = END_TSO_QUEUE;
2010   blocked_queue_tl  = END_TSO_QUEUE;
2011   sleeping_queue    = END_TSO_QUEUE;
2012 #endif 
2013
2014   suspended_ccalling_threads  = END_TSO_QUEUE;
2015
2016   main_threads = NULL;
2017   all_threads  = END_TSO_QUEUE;
2018
2019   context_switch = 0;
2020   interrupted    = 0;
2021
2022   RtsFlags.ConcFlags.ctxtSwitchTicks =
2023       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2024       
2025 #if defined(RTS_SUPPORTS_THREADS)
2026   /* Initialise the mutex and condition variables used by
2027    * the scheduler. */
2028   initMutex(&sched_mutex);
2029   initMutex(&term_mutex);
2030
2031   initCondition(&thread_ready_cond);
2032 #endif
2033   
2034 #if defined(SMP)
2035   initCondition(&gc_pending_cond);
2036 #endif
2037
2038 #if defined(RTS_SUPPORTS_THREADS)
2039   ACQUIRE_LOCK(&sched_mutex);
2040 #endif
2041
2042   /* Install the SIGHUP handler */
2043 #if defined(SMP)
2044   {
2045     struct sigaction action,oact;
2046
2047     action.sa_handler = term_handler;
2048     sigemptyset(&action.sa_mask);
2049     action.sa_flags = 0;
2050     if (sigaction(SIGTERM, &action, &oact) != 0) {
2051       barf("can't install TERM handler");
2052     }
2053   }
2054 #endif
2055
2056   /* A capability holds the state a native thread needs in
2057    * order to execute STG code. At least one capability is
2058    * floating around (only SMP builds have more than one).
2059    */
2060   initCapabilities();
2061   
2062 #if defined(RTS_SUPPORTS_THREADS)
2063     /* start our haskell execution tasks */
2064 # if defined(SMP)
2065     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2066 # else
2067     startTaskManager(0,taskStart);
2068 # endif
2069 #endif
2070
2071 #if /* defined(SMP) ||*/ defined(PAR)
2072   initSparkPools();
2073 #endif
2074
2075 #if defined(RTS_SUPPORTS_THREADS)
2076   RELEASE_LOCK(&sched_mutex);
2077 #endif
2078
2079 }
2080
2081 void
2082 exitScheduler( void )
2083 {
2084 #if defined(RTS_SUPPORTS_THREADS)
2085   stopTaskManager();
2086 #endif
2087 }
2088
2089 /* -----------------------------------------------------------------------------
2090    Managing the per-task allocation areas.
2091    
2092    Each capability comes with an allocation area.  These are
2093    fixed-length block lists into which allocation can be done.
2094
2095    ToDo: no support for two-space collection at the moment???
2096    -------------------------------------------------------------------------- */
2097
2098 /* -----------------------------------------------------------------------------
2099  * waitThread is the external interface for running a new computation
2100  * and waiting for the result.
2101  *
2102  * In the non-SMP case, we create a new main thread, push it on the 
2103  * main-thread stack, and invoke the scheduler to run it.  The
2104  * scheduler will return when the top main thread on the stack has
2105  * completed or died, and fill in the necessary fields of the
2106  * main_thread structure.
2107  *
2108  * In the SMP case, we create a main thread as before, but we then
2109  * create a new condition variable and sleep on it.  When our new
2110  * main thread has completed, we'll be woken up and the status/result
2111  * will be in the main_thread struct.
2112  * -------------------------------------------------------------------------- */
2113
2114 int 
2115 howManyThreadsAvail ( void )
2116 {
2117    int i = 0;
2118    StgTSO* q;
2119    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2120       i++;
2121    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2122       i++;
2123    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2124       i++;
2125    return i;
2126 }
2127
2128 void
2129 finishAllThreads ( void )
2130 {
2131    do {
2132       while (run_queue_hd != END_TSO_QUEUE) {
2133          waitThread ( run_queue_hd, NULL);
2134       }
2135       while (blocked_queue_hd != END_TSO_QUEUE) {
2136          waitThread ( blocked_queue_hd, NULL);
2137       }
2138       while (sleeping_queue != END_TSO_QUEUE) {
2139          waitThread ( blocked_queue_hd, NULL);
2140       }
2141    } while 
2142       (blocked_queue_hd != END_TSO_QUEUE || 
2143        run_queue_hd     != END_TSO_QUEUE ||
2144        sleeping_queue   != END_TSO_QUEUE);
2145 }
2146
2147 SchedulerStatus
2148 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2149
2150   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2151 #if defined(THREADED_RTS)
2152   return waitThread_(tso,ret, rtsFalse);
2153 #else
2154   return waitThread_(tso,ret);
2155 #endif
2156 }
2157
2158 SchedulerStatus
2159 waitThread_(StgTSO *tso,
2160             /*out*/StgClosure **ret
2161 #if defined(THREADED_RTS)
2162             , rtsBool blockWaiting
2163 #endif
2164            )
2165 {
2166   StgMainThread *m;
2167   SchedulerStatus stat;
2168
2169   ACQUIRE_LOCK(&sched_mutex);
2170   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2171   
2172   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2173
2174   m->tso = tso;
2175   m->ret = ret;
2176   m->stat = NoStatus;
2177 #if defined(RTS_SUPPORTS_THREADS)
2178   initCondition(&m->wakeup);
2179 #endif
2180
2181   m->link = main_threads;
2182   main_threads = m;
2183
2184   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2185
2186 #if defined(RTS_SUPPORTS_THREADS)
2187
2188 # if defined(THREADED_RTS)
2189   if (!blockWaiting) {
2190     /* In the threaded case, the OS thread that called main()
2191      * gets to enter the RTS directly without going via another
2192      * task/thread.
2193      */
2194     RELEASE_LOCK(&sched_mutex);
2195     schedule();
2196     ASSERT(m->stat != NoStatus);
2197   } else 
2198 # endif
2199   {
2200     do {
2201       waitCondition(&m->wakeup, &sched_mutex);
2202     } while (m->stat == NoStatus);
2203   }
2204 #elif defined(GRAN)
2205   /* GranSim specific init */
2206   CurrentTSO = m->tso;                // the TSO to run
2207   procStatus[MainProc] = Busy;        // status of main PE
2208   CurrentProc = MainProc;             // PE to run it on
2209
2210   schedule();
2211 #else
2212   RELEASE_LOCK(&sched_mutex);
2213   schedule();
2214   ASSERT(m->stat != NoStatus);
2215 #endif
2216
2217   stat = m->stat;
2218
2219 #if defined(RTS_SUPPORTS_THREADS)
2220   closeCondition(&m->wakeup);
2221 #endif
2222
2223   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2224                               m->tso->id));
2225   free(m);
2226
2227 #if defined(THREADED_RTS)
2228   if (blockWaiting) 
2229 #endif
2230     RELEASE_LOCK(&sched_mutex);
2231
2232   return stat;
2233 }
2234
2235 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2236 //@subsection Run queue code 
2237
2238 #if 0
2239 /* 
2240    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2241        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2242        implicit global variable that has to be correct when calling these
2243        fcts -- HWL 
2244 */
2245
2246 /* Put the new thread on the head of the runnable queue.
2247  * The caller of createThread better push an appropriate closure
2248  * on this thread's stack before the scheduler is invoked.
2249  */
2250 static /* inline */ void
2251 add_to_run_queue(tso)
2252 StgTSO* tso; 
2253 {
2254   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2255   tso->link = run_queue_hd;
2256   run_queue_hd = tso;
2257   if (run_queue_tl == END_TSO_QUEUE) {
2258     run_queue_tl = tso;
2259   }
2260 }
2261
2262 /* Put the new thread at the end of the runnable queue. */
2263 static /* inline */ void
2264 push_on_run_queue(tso)
2265 StgTSO* tso; 
2266 {
2267   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2268   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2269   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2270   if (run_queue_hd == END_TSO_QUEUE) {
2271     run_queue_hd = tso;
2272   } else {
2273     run_queue_tl->link = tso;
2274   }
2275   run_queue_tl = tso;
2276 }
2277
2278 /* 
2279    Should be inlined because it's used very often in schedule.  The tso
2280    argument is actually only needed in GranSim, where we want to have the
2281    possibility to schedule *any* TSO on the run queue, irrespective of the
2282    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2283    the run queue and dequeue the tso, adjusting the links in the queue. 
2284 */
2285 //@cindex take_off_run_queue
2286 static /* inline */ StgTSO*
2287 take_off_run_queue(StgTSO *tso) {
2288   StgTSO *t, *prev;
2289
2290   /* 
2291      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2292
2293      if tso is specified, unlink that tso from the run_queue (doesn't have
2294      to be at the beginning of the queue); GranSim only 
2295   */
2296   if (tso!=END_TSO_QUEUE) {
2297     /* find tso in queue */
2298     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2299          t!=END_TSO_QUEUE && t!=tso;
2300          prev=t, t=t->link) 
2301       /* nothing */ ;
2302     ASSERT(t==tso);
2303     /* now actually dequeue the tso */
2304     if (prev!=END_TSO_QUEUE) {
2305       ASSERT(run_queue_hd!=t);
2306       prev->link = t->link;
2307     } else {
2308       /* t is at beginning of thread queue */
2309       ASSERT(run_queue_hd==t);
2310       run_queue_hd = t->link;
2311     }
2312     /* t is at end of thread queue */
2313     if (t->link==END_TSO_QUEUE) {
2314       ASSERT(t==run_queue_tl);
2315       run_queue_tl = prev;
2316     } else {
2317       ASSERT(run_queue_tl!=t);
2318     }
2319     t->link = END_TSO_QUEUE;
2320   } else {
2321     /* take tso from the beginning of the queue; std concurrent code */
2322     t = run_queue_hd;
2323     if (t != END_TSO_QUEUE) {
2324       run_queue_hd = t->link;
2325       t->link = END_TSO_QUEUE;
2326       if (run_queue_hd == END_TSO_QUEUE) {
2327         run_queue_tl = END_TSO_QUEUE;
2328       }
2329     }
2330   }
2331   return t;
2332 }
2333
2334 #endif /* 0 */
2335
2336 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2337 //@subsection Garbage Collextion Routines
2338
2339 /* ---------------------------------------------------------------------------
2340    Where are the roots that we know about?
2341
2342         - all the threads on the runnable queue
2343         - all the threads on the blocked queue
2344         - all the threads on the sleeping queue
2345         - all the thread currently executing a _ccall_GC
2346         - all the "main threads"
2347      
2348    ------------------------------------------------------------------------ */
2349
2350 /* This has to be protected either by the scheduler monitor, or by the
2351         garbage collection monitor (probably the latter).
2352         KH @ 25/10/99
2353 */
2354
2355 void
2356 GetRoots(evac_fn evac)
2357 {
2358 #if defined(GRAN)
2359   {
2360     nat i;
2361     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2362       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2363           evac((StgClosure **)&run_queue_hds[i]);
2364       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2365           evac((StgClosure **)&run_queue_tls[i]);
2366       
2367       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2368           evac((StgClosure **)&blocked_queue_hds[i]);
2369       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2370           evac((StgClosure **)&blocked_queue_tls[i]);
2371       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2372           evac((StgClosure **)&ccalling_threads[i]);
2373     }
2374   }
2375
2376   markEventQueue();
2377
2378 #else /* !GRAN */
2379   if (run_queue_hd != END_TSO_QUEUE) {
2380       ASSERT(run_queue_tl != END_TSO_QUEUE);
2381       evac((StgClosure **)&run_queue_hd);
2382       evac((StgClosure **)&run_queue_tl);
2383   }
2384   
2385   if (blocked_queue_hd != END_TSO_QUEUE) {
2386       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2387       evac((StgClosure **)&blocked_queue_hd);
2388       evac((StgClosure **)&blocked_queue_tl);
2389   }
2390   
2391   if (sleeping_queue != END_TSO_QUEUE) {
2392       evac((StgClosure **)&sleeping_queue);
2393   }
2394 #endif 
2395
2396   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2397       evac((StgClosure **)&suspended_ccalling_threads);
2398   }
2399
2400 #if defined(PAR) || defined(GRAN)
2401   markSparkQueue(evac);
2402 #endif
2403 }
2404
2405 /* -----------------------------------------------------------------------------
2406    performGC
2407
2408    This is the interface to the garbage collector from Haskell land.
2409    We provide this so that external C code can allocate and garbage
2410    collect when called from Haskell via _ccall_GC.
2411
2412    It might be useful to provide an interface whereby the programmer
2413    can specify more roots (ToDo).
2414    
2415    This needs to be protected by the GC condition variable above.  KH.
2416    -------------------------------------------------------------------------- */
2417
2418 void (*extra_roots)(evac_fn);
2419
2420 void
2421 performGC(void)
2422 {
2423   /* Obligated to hold this lock upon entry */
2424   ACQUIRE_LOCK(&sched_mutex);
2425   GarbageCollect(GetRoots,rtsFalse);
2426   RELEASE_LOCK(&sched_mutex);
2427 }
2428
2429 void
2430 performMajorGC(void)
2431 {
2432   ACQUIRE_LOCK(&sched_mutex);
2433   GarbageCollect(GetRoots,rtsTrue);
2434   RELEASE_LOCK(&sched_mutex);
2435 }
2436
2437 static void
2438 AllRoots(evac_fn evac)
2439 {
2440     GetRoots(evac);             // the scheduler's roots
2441     extra_roots(evac);          // the user's roots
2442 }
2443
2444 void
2445 performGCWithRoots(void (*get_roots)(evac_fn))
2446 {
2447   ACQUIRE_LOCK(&sched_mutex);
2448   extra_roots = get_roots;
2449   GarbageCollect(AllRoots,rtsFalse);
2450   RELEASE_LOCK(&sched_mutex);
2451 }
2452
2453 /* -----------------------------------------------------------------------------
2454    Stack overflow
2455
2456    If the thread has reached its maximum stack size, then raise the
2457    StackOverflow exception in the offending thread.  Otherwise
2458    relocate the TSO into a larger chunk of memory and adjust its stack
2459    size appropriately.
2460    -------------------------------------------------------------------------- */
2461
2462 static StgTSO *
2463 threadStackOverflow(StgTSO *tso)
2464 {
2465   nat new_stack_size, new_tso_size, diff, stack_words;
2466   StgPtr new_sp;
2467   StgTSO *dest;
2468
2469   IF_DEBUG(sanity,checkTSO(tso));
2470   if (tso->stack_size >= tso->max_stack_size) {
2471
2472     IF_DEBUG(gc,
2473              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2474                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2475              /* If we're debugging, just print out the top of the stack */
2476              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2477                                               tso->sp+64)));
2478
2479     /* Send this thread the StackOverflow exception */
2480     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2481     return tso;
2482   }
2483
2484   /* Try to double the current stack size.  If that takes us over the
2485    * maximum stack size for this thread, then use the maximum instead.
2486    * Finally round up so the TSO ends up as a whole number of blocks.
2487    */
2488   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2489   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2490                                        TSO_STRUCT_SIZE)/sizeof(W_);
2491   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2492   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2493
2494   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2495
2496   dest = (StgTSO *)allocate(new_tso_size);
2497   TICK_ALLOC_TSO(new_stack_size,0);
2498
2499   /* copy the TSO block and the old stack into the new area */
2500   memcpy(dest,tso,TSO_STRUCT_SIZE);
2501   stack_words = tso->stack + tso->stack_size - tso->sp;
2502   new_sp = (P_)dest + new_tso_size - stack_words;
2503   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2504
2505   /* relocate the stack pointers... */
2506   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2507   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2508   dest->sp    = new_sp;
2509   dest->stack_size = new_stack_size;
2510         
2511   /* and relocate the update frame list */
2512   relocate_stack(dest, diff);
2513
2514   /* Mark the old TSO as relocated.  We have to check for relocated
2515    * TSOs in the garbage collector and any primops that deal with TSOs.
2516    *
2517    * It's important to set the sp and su values to just beyond the end
2518    * of the stack, so we don't attempt to scavenge any part of the
2519    * dead TSO's stack.
2520    */
2521   tso->what_next = ThreadRelocated;
2522   tso->link = dest;
2523   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2524   tso->su = (StgUpdateFrame *)tso->sp;
2525   tso->why_blocked = NotBlocked;
2526   dest->mut_link = NULL;
2527
2528   IF_PAR_DEBUG(verbose,
2529                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2530                      tso->id, tso, tso->stack_size);
2531                /* If we're debugging, just print out the top of the stack */
2532                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2533                                                 tso->sp+64)));
2534   
2535   IF_DEBUG(sanity,checkTSO(tso));
2536 #if 0
2537   IF_DEBUG(scheduler,printTSO(dest));
2538 #endif
2539
2540   return dest;
2541 }
2542
2543 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2544 //@subsection Blocking Queue Routines
2545
2546 /* ---------------------------------------------------------------------------
2547    Wake up a queue that was blocked on some resource.
2548    ------------------------------------------------------------------------ */
2549
2550 #if defined(GRAN)
2551 static inline void
2552 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2553 {
2554 }
2555 #elif defined(PAR)
2556 static inline void
2557 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2558 {
2559   /* write RESUME events to log file and
2560      update blocked and fetch time (depending on type of the orig closure) */
2561   if (RtsFlags.ParFlags.ParStats.Full) {
2562     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2563                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2564                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2565     if (EMPTY_RUN_QUEUE())
2566       emitSchedule = rtsTrue;
2567
2568     switch (get_itbl(node)->type) {
2569         case FETCH_ME_BQ:
2570           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2571           break;
2572         case RBH:
2573         case FETCH_ME:
2574         case BLACKHOLE_BQ:
2575           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2576           break;
2577 #ifdef DIST
2578         case MVAR:
2579           break;
2580 #endif    
2581         default:
2582           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2583         }
2584       }
2585 }
2586 #endif
2587
2588 #if defined(GRAN)
2589 static StgBlockingQueueElement *
2590 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2591 {
2592     StgTSO *tso;
2593     PEs node_loc, tso_loc;
2594
2595     node_loc = where_is(node); // should be lifted out of loop
2596     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2597     tso_loc = where_is((StgClosure *)tso);
2598     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2599       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2600       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2601       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2602       // insertThread(tso, node_loc);
2603       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2604                 ResumeThread,
2605                 tso, node, (rtsSpark*)NULL);
2606       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2607       // len_local++;
2608       // len++;
2609     } else { // TSO is remote (actually should be FMBQ)
2610       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2611                                   RtsFlags.GranFlags.Costs.gunblocktime +
2612                                   RtsFlags.GranFlags.Costs.latency;
2613       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2614                 UnblockThread,
2615                 tso, node, (rtsSpark*)NULL);
2616       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2617       // len++;
2618     }
2619     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2620     IF_GRAN_DEBUG(bq,
2621                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2622                           (node_loc==tso_loc ? "Local" : "Global"), 
2623                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2624     tso->block_info.closure = NULL;
2625     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2626                              tso->id, tso));
2627 }
2628 #elif defined(PAR)
2629 static StgBlockingQueueElement *
2630 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2631 {
2632     StgBlockingQueueElement *next;
2633
2634     switch (get_itbl(bqe)->type) {
2635     case TSO:
2636       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2637       /* if it's a TSO just push it onto the run_queue */
2638       next = bqe->link;
2639       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2640       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2641       THREAD_RUNNABLE();
2642       unblockCount(bqe, node);
2643       /* reset blocking status after dumping event */
2644       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2645       break;
2646
2647     case BLOCKED_FETCH:
2648       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2649       next = bqe->link;
2650       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2651       PendingFetches = (StgBlockedFetch *)bqe;
2652       break;
2653
2654 # if defined(DEBUG)
2655       /* can ignore this case in a non-debugging setup; 
2656          see comments on RBHSave closures above */
2657     case CONSTR:
2658       /* check that the closure is an RBHSave closure */
2659       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2660              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2661              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2662       break;
2663
2664     default:
2665       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2666            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2667            (StgClosure *)bqe);
2668 # endif
2669     }
2670   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2671   return next;
2672 }
2673
2674 #else /* !GRAN && !PAR */
2675 static StgTSO *
2676 unblockOneLocked(StgTSO *tso)
2677 {
2678   StgTSO *next;
2679
2680   ASSERT(get_itbl(tso)->type == TSO);
2681   ASSERT(tso->why_blocked != NotBlocked);
2682   tso->why_blocked = NotBlocked;
2683   next = tso->link;
2684   PUSH_ON_RUN_QUEUE(tso);
2685   THREAD_RUNNABLE();
2686   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2687   return next;
2688 }
2689 #endif
2690
2691 #if defined(GRAN) || defined(PAR)
2692 inline StgBlockingQueueElement *
2693 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2694 {
2695   ACQUIRE_LOCK(&sched_mutex);
2696   bqe = unblockOneLocked(bqe, node);
2697   RELEASE_LOCK(&sched_mutex);
2698   return bqe;
2699 }
2700 #else
2701 inline StgTSO *
2702 unblockOne(StgTSO *tso)
2703 {
2704   ACQUIRE_LOCK(&sched_mutex);
2705   tso = unblockOneLocked(tso);
2706   RELEASE_LOCK(&sched_mutex);
2707   return tso;
2708 }
2709 #endif
2710
2711 #if defined(GRAN)
2712 void 
2713 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2714 {
2715   StgBlockingQueueElement *bqe;
2716   PEs node_loc;
2717   nat len = 0; 
2718
2719   IF_GRAN_DEBUG(bq, 
2720                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2721                       node, CurrentProc, CurrentTime[CurrentProc], 
2722                       CurrentTSO->id, CurrentTSO));
2723
2724   node_loc = where_is(node);
2725
2726   ASSERT(q == END_BQ_QUEUE ||
2727          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2728          get_itbl(q)->type == CONSTR); // closure (type constructor)
2729   ASSERT(is_unique(node));
2730
2731   /* FAKE FETCH: magically copy the node to the tso's proc;
2732      no Fetch necessary because in reality the node should not have been 
2733      moved to the other PE in the first place
2734   */
2735   if (CurrentProc!=node_loc) {
2736     IF_GRAN_DEBUG(bq, 
2737                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2738                         node, node_loc, CurrentProc, CurrentTSO->id, 
2739                         // CurrentTSO, where_is(CurrentTSO),
2740                         node->header.gran.procs));
2741     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2742     IF_GRAN_DEBUG(bq, 
2743                   belch("## new bitmask of node %p is %#x",
2744                         node, node->header.gran.procs));
2745     if (RtsFlags.GranFlags.GranSimStats.Global) {
2746       globalGranStats.tot_fake_fetches++;
2747     }
2748   }
2749
2750   bqe = q;
2751   // ToDo: check: ASSERT(CurrentProc==node_loc);
2752   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2753     //next = bqe->link;
2754     /* 
2755        bqe points to the current element in the queue
2756        next points to the next element in the queue
2757     */
2758     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2759     //tso_loc = where_is(tso);
2760     len++;
2761     bqe = unblockOneLocked(bqe, node);
2762   }
2763
2764   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2765      the closure to make room for the anchor of the BQ */
2766   if (bqe!=END_BQ_QUEUE) {
2767     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2768     /*
2769     ASSERT((info_ptr==&RBH_Save_0_info) ||
2770            (info_ptr==&RBH_Save_1_info) ||
2771            (info_ptr==&RBH_Save_2_info));
2772     */
2773     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2774     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2775     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2776
2777     IF_GRAN_DEBUG(bq,
2778                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2779                         node, info_type(node)));
2780   }
2781
2782   /* statistics gathering */
2783   if (RtsFlags.GranFlags.GranSimStats.Global) {
2784     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2785     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2786     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2787     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2788   }
2789   IF_GRAN_DEBUG(bq,
2790                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2791                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2792 }
2793 #elif defined(PAR)
2794 void 
2795 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2796 {
2797   StgBlockingQueueElement *bqe;
2798
2799   ACQUIRE_LOCK(&sched_mutex);
2800
2801   IF_PAR_DEBUG(verbose, 
2802                belch("##-_ AwBQ for node %p on [%x]: ",
2803                      node, mytid));
2804 #ifdef DIST  
2805   //RFP
2806   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2807     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2808     return;
2809   }
2810 #endif
2811   
2812   ASSERT(q == END_BQ_QUEUE ||
2813          get_itbl(q)->type == TSO ||           
2814          get_itbl(q)->type == BLOCKED_FETCH || 
2815          get_itbl(q)->type == CONSTR); 
2816
2817   bqe = q;
2818   while (get_itbl(bqe)->type==TSO || 
2819          get_itbl(bqe)->type==BLOCKED_FETCH) {
2820     bqe = unblockOneLocked(bqe, node);
2821   }
2822   RELEASE_LOCK(&sched_mutex);
2823 }
2824
2825 #else   /* !GRAN && !PAR */
2826 void
2827 awakenBlockedQueue(StgTSO *tso)
2828 {
2829   ACQUIRE_LOCK(&sched_mutex);
2830   while (tso != END_TSO_QUEUE) {
2831     tso = unblockOneLocked(tso);
2832   }
2833   RELEASE_LOCK(&sched_mutex);
2834 }
2835 #endif
2836
2837 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2838 //@subsection Exception Handling Routines
2839
2840 /* ---------------------------------------------------------------------------
2841    Interrupt execution
2842    - usually called inside a signal handler so it mustn't do anything fancy.   
2843    ------------------------------------------------------------------------ */
2844
2845 void
2846 interruptStgRts(void)
2847 {
2848     interrupted    = 1;
2849     context_switch = 1;
2850 }
2851
2852 /* -----------------------------------------------------------------------------
2853    Unblock a thread
2854
2855    This is for use when we raise an exception in another thread, which
2856    may be blocked.
2857    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2858    -------------------------------------------------------------------------- */
2859
2860 #if defined(GRAN) || defined(PAR)
2861 /*
2862   NB: only the type of the blocking queue is different in GranSim and GUM
2863       the operations on the queue-elements are the same
2864       long live polymorphism!
2865
2866   Locks: sched_mutex is held upon entry and exit.
2867
2868 */
2869 static void
2870 unblockThread(StgTSO *tso)
2871 {
2872   StgBlockingQueueElement *t, **last;
2873
2874   switch (tso->why_blocked) {
2875
2876   case NotBlocked:
2877     return;  /* not blocked */
2878
2879   case BlockedOnMVar:
2880     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2881     {
2882       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2883       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2884
2885       last = (StgBlockingQueueElement **)&mvar->head;
2886       for (t = (StgBlockingQueueElement *)mvar->head; 
2887            t != END_BQ_QUEUE; 
2888            last = &t->link, last_tso = t, t = t->link) {
2889         if (t == (StgBlockingQueueElement *)tso) {
2890           *last = (StgBlockingQueueElement *)tso->link;
2891           if (mvar->tail == tso) {
2892             mvar->tail = (StgTSO *)last_tso;
2893           }
2894           goto done;
2895         }
2896       }
2897       barf("unblockThread (MVAR): TSO not found");
2898     }
2899
2900   case BlockedOnBlackHole:
2901     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2902     {
2903       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2904
2905       last = &bq->blocking_queue;
2906       for (t = bq->blocking_queue; 
2907            t != END_BQ_QUEUE; 
2908            last = &t->link, t = t->link) {
2909         if (t == (StgBlockingQueueElement *)tso) {
2910           *last = (StgBlockingQueueElement *)tso->link;
2911           goto done;
2912         }
2913       }
2914       barf("unblockThread (BLACKHOLE): TSO not found");
2915     }
2916
2917   case BlockedOnException:
2918     {
2919       StgTSO *target  = tso->block_info.tso;
2920
2921       ASSERT(get_itbl(target)->type == TSO);
2922
2923       if (target->what_next == ThreadRelocated) {
2924           target = target->link;
2925           ASSERT(get_itbl(target)->type == TSO);
2926       }
2927
2928       ASSERT(target->blocked_exceptions != NULL);
2929
2930       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2931       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2932            t != END_BQ_QUEUE; 
2933            last = &t->link, t = t->link) {
2934         ASSERT(get_itbl(t)->type == TSO);
2935         if (t == (StgBlockingQueueElement *)tso) {
2936           *last = (StgBlockingQueueElement *)tso->link;
2937           goto done;
2938         }
2939       }
2940       barf("unblockThread (Exception): TSO not found");
2941     }
2942
2943   case BlockedOnRead:
2944   case BlockedOnWrite:
2945     {
2946       /* take TSO off blocked_queue */
2947       StgBlockingQueueElement *prev = NULL;
2948       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2949            prev = t, t = t->link) {
2950         if (t == (StgBlockingQueueElement *)tso) {
2951           if (prev == NULL) {
2952             blocked_queue_hd = (StgTSO *)t->link;
2953             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2954               blocked_queue_tl = END_TSO_QUEUE;
2955             }
2956           } else {
2957             prev->link = t->link;
2958             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2959               blocked_queue_tl = (StgTSO *)prev;
2960             }
2961           }
2962           goto done;
2963         }
2964       }
2965       barf("unblockThread (I/O): TSO not found");
2966     }
2967
2968   case BlockedOnDelay:
2969     {
2970       /* take TSO off sleeping_queue */
2971       StgBlockingQueueElement *prev = NULL;
2972       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2973            prev = t, t = t->link) {
2974         if (t == (StgBlockingQueueElement *)tso) {
2975           if (prev == NULL) {
2976             sleeping_queue = (StgTSO *)t->link;
2977           } else {
2978             prev->link = t->link;
2979           }
2980           goto done;
2981         }
2982       }
2983       barf("unblockThread (I/O): TSO not found");
2984     }
2985
2986   default:
2987     barf("unblockThread");
2988   }
2989
2990  done:
2991   tso->link = END_TSO_QUEUE;
2992   tso->why_blocked = NotBlocked;
2993   tso->block_info.closure = NULL;
2994   PUSH_ON_RUN_QUEUE(tso);
2995 }
2996 #else
2997 static void
2998 unblockThread(StgTSO *tso)
2999 {
3000   StgTSO *t, **last;
3001   
3002   /* To avoid locking unnecessarily. */
3003   if (tso->why_blocked == NotBlocked) {
3004     return;
3005   }
3006
3007   switch (tso->why_blocked) {
3008
3009   case BlockedOnMVar:
3010     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3011     {
3012       StgTSO *last_tso = END_TSO_QUEUE;
3013       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3014
3015       last = &mvar->head;
3016       for (t = mvar->head; t != END_TSO_QUEUE; 
3017            last = &t->link, last_tso = t, t = t->link) {
3018         if (t == tso) {
3019           *last = tso->link;
3020           if (mvar->tail == tso) {
3021             mvar->tail = last_tso;
3022           }
3023           goto done;
3024         }
3025       }
3026       barf("unblockThread (MVAR): TSO not found");
3027     }
3028
3029   case BlockedOnBlackHole:
3030     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3031     {
3032       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3033
3034       last = &bq->blocking_queue;
3035       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3036            last = &t->link, t = t->link) {
3037         if (t == tso) {
3038           *last = tso->link;
3039           goto done;
3040         }
3041       }
3042       barf("unblockThread (BLACKHOLE): TSO not found");
3043     }
3044
3045   case BlockedOnException:
3046     {
3047       StgTSO *target  = tso->block_info.tso;
3048
3049       ASSERT(get_itbl(target)->type == TSO);
3050
3051       while (target->what_next == ThreadRelocated) {
3052           target = target->link;
3053           ASSERT(get_itbl(target)->type == TSO);
3054       }
3055       
3056       ASSERT(target->blocked_exceptions != NULL);
3057
3058       last = &target->blocked_exceptions;
3059       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3060            last = &t->link, t = t->link) {
3061         ASSERT(get_itbl(t)->type == TSO);
3062         if (t == tso) {
3063           *last = tso->link;
3064           goto done;
3065         }
3066       }
3067       barf("unblockThread (Exception): TSO not found");
3068     }
3069
3070   case BlockedOnRead:
3071   case BlockedOnWrite:
3072     {
3073       StgTSO *prev = NULL;
3074       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3075            prev = t, t = t->link) {
3076         if (t == tso) {
3077           if (prev == NULL) {
3078             blocked_queue_hd = t->link;
3079             if (blocked_queue_tl == t) {
3080               blocked_queue_tl = END_TSO_QUEUE;
3081             }
3082           } else {
3083             prev->link = t->link;
3084             if (blocked_queue_tl == t) {
3085               blocked_queue_tl = prev;
3086             }
3087           }
3088           goto done;
3089         }
3090       }
3091       barf("unblockThread (I/O): TSO not found");
3092     }
3093
3094   case BlockedOnDelay:
3095     {
3096       StgTSO *prev = NULL;
3097       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3098            prev = t, t = t->link) {
3099         if (t == tso) {
3100           if (prev == NULL) {
3101             sleeping_queue = t->link;
3102           } else {
3103             prev->link = t->link;
3104           }
3105           goto done;
3106         }
3107       }
3108       barf("unblockThread (I/O): TSO not found");
3109     }
3110
3111   default:
3112     barf("unblockThread");
3113   }
3114
3115  done:
3116   tso->link = END_TSO_QUEUE;
3117   tso->why_blocked = NotBlocked;
3118   tso->block_info.closure = NULL;
3119   PUSH_ON_RUN_QUEUE(tso);
3120 }
3121 #endif
3122
3123 /* -----------------------------------------------------------------------------
3124  * raiseAsync()
3125  *
3126  * The following function implements the magic for raising an
3127  * asynchronous exception in an existing thread.
3128  *
3129  * We first remove the thread from any queue on which it might be
3130  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3131  *
3132  * We strip the stack down to the innermost CATCH_FRAME, building
3133  * thunks in the heap for all the active computations, so they can 
3134  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3135  * an application of the handler to the exception, and push it on
3136  * the top of the stack.
3137  * 
3138  * How exactly do we save all the active computations?  We create an
3139  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3140  * AP_UPDs pushes everything from the corresponding update frame
3141  * upwards onto the stack.  (Actually, it pushes everything up to the
3142  * next update frame plus a pointer to the next AP_UPD object.
3143  * Entering the next AP_UPD object pushes more onto the stack until we
3144  * reach the last AP_UPD object - at which point the stack should look
3145  * exactly as it did when we killed the TSO and we can continue
3146  * execution by entering the closure on top of the stack.
3147  *
3148  * We can also kill a thread entirely - this happens if either (a) the 
3149  * exception passed to raiseAsync is NULL, or (b) there's no
3150  * CATCH_FRAME on the stack.  In either case, we strip the entire
3151  * stack and replace the thread with a zombie.
3152  *
3153  * Locks: sched_mutex held upon entry nor exit.
3154  *
3155  * -------------------------------------------------------------------------- */
3156  
3157 void 
3158 deleteThread(StgTSO *tso)
3159 {
3160   raiseAsync(tso,NULL);
3161 }
3162
3163 void
3164 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3165 {
3166   /* When raising async exs from contexts where sched_mutex isn't held;
3167      use raiseAsyncWithLock(). */
3168   ACQUIRE_LOCK(&sched_mutex);
3169   raiseAsync(tso,exception);
3170   RELEASE_LOCK(&sched_mutex);
3171 }
3172
3173 void
3174 raiseAsync(StgTSO *tso, StgClosure *exception)
3175 {
3176   StgUpdateFrame* su = tso->su;
3177   StgPtr          sp = tso->sp;
3178   
3179   /* Thread already dead? */
3180   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3181     return;
3182   }
3183
3184   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3185
3186   /* Remove it from any blocking queues */
3187   unblockThread(tso);
3188
3189   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3190   /* The stack freezing code assumes there's a closure pointer on
3191    * the top of the stack.  This isn't always the case with compiled
3192    * code, so we have to push a dummy closure on the top which just
3193    * returns to the next return address on the stack.
3194    */
3195   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3196     *(--sp) = (W_)&stg_dummy_ret_closure;
3197   }
3198
3199   while (1) {
3200     nat words = ((P_)su - (P_)sp) - 1;
3201     nat i;
3202     StgAP_UPD * ap;
3203
3204     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3205      * then build the THUNK raise(exception), and leave it on
3206      * top of the CATCH_FRAME ready to enter.
3207      */
3208     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3209 #ifdef PROFILING
3210       StgCatchFrame *cf = (StgCatchFrame *)su;
3211 #endif
3212       StgClosure *raise;
3213
3214       /* we've got an exception to raise, so let's pass it to the
3215        * handler in this frame.
3216        */
3217       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3218       TICK_ALLOC_SE_THK(1,0);
3219       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3220       raise->payload[0] = exception;
3221
3222       /* throw away the stack from Sp up to the CATCH_FRAME.
3223        */
3224       sp = (P_)su - 1;
3225
3226       /* Ensure that async excpetions are blocked now, so we don't get
3227        * a surprise exception before we get around to executing the
3228        * handler.
3229        */
3230       if (tso->blocked_exceptions == NULL) {
3231           tso->blocked_exceptions = END_TSO_QUEUE;
3232       }
3233
3234       /* Put the newly-built THUNK on top of the stack, ready to execute
3235        * when the thread restarts.
3236        */
3237       sp[0] = (W_)raise;
3238       tso->sp = sp;
3239       tso->su = su;
3240       tso->what_next = ThreadEnterGHC;
3241       IF_DEBUG(sanity, checkTSO(tso));
3242       return;
3243     }
3244
3245     /* First build an AP_UPD consisting of the stack chunk above the
3246      * current update frame, with the top word on the stack as the
3247      * fun field.
3248      */
3249     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3250     
3251     ASSERT(words >= 0);
3252     
3253     ap->n_args = words;
3254     ap->fun    = (StgClosure *)sp[0];
3255     sp++;
3256     for(i=0; i < (nat)words; ++i) {
3257       ap->payload[i] = (StgClosure *)*sp++;
3258     }
3259     
3260     switch (get_itbl(su)->type) {
3261       
3262     case UPDATE_FRAME:
3263       {
3264         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3265         TICK_ALLOC_UP_THK(words+1,0);
3266         
3267         IF_DEBUG(scheduler,
3268                  fprintf(stderr,  "scheduler: Updating ");
3269                  printPtr((P_)su->updatee); 
3270                  fprintf(stderr,  " with ");
3271                  printObj((StgClosure *)ap);
3272                  );
3273         
3274         /* Replace the updatee with an indirection - happily
3275          * this will also wake up any threads currently
3276          * waiting on the result.
3277          *
3278          * Warning: if we're in a loop, more than one update frame on
3279          * the stack may point to the same object.  Be careful not to
3280          * overwrite an IND_OLDGEN in this case, because we'll screw
3281          * up the mutable lists.  To be on the safe side, don't
3282          * overwrite any kind of indirection at all.  See also
3283          * threadSqueezeStack in GC.c, where we have to make a similar
3284          * check.
3285          */
3286         if (!closure_IND(su->updatee)) {
3287             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3288         }
3289         su = su->link;
3290         sp += sizeofW(StgUpdateFrame) -1;
3291         sp[0] = (W_)ap; /* push onto stack */
3292         break;
3293       }
3294
3295     case CATCH_FRAME:
3296       {
3297         StgCatchFrame *cf = (StgCatchFrame *)su;
3298         StgClosure* o;
3299         
3300         /* We want a PAP, not an AP_UPD.  Fortunately, the
3301          * layout's the same.
3302          */
3303         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3304         TICK_ALLOC_UPD_PAP(words+1,0);
3305         
3306         /* now build o = FUN(catch,ap,handler) */
3307         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3308         TICK_ALLOC_FUN(2,0);
3309         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3310         o->payload[0] = (StgClosure *)ap;
3311         o->payload[1] = cf->handler;
3312         
3313         IF_DEBUG(scheduler,
3314                  fprintf(stderr,  "scheduler: Built ");
3315                  printObj((StgClosure *)o);
3316                  );
3317         
3318         /* pop the old handler and put o on the stack */
3319         su = cf->link;
3320         sp += sizeofW(StgCatchFrame) - 1;
3321         sp[0] = (W_)o;
3322         break;
3323       }
3324       
3325     case SEQ_FRAME:
3326       {
3327         StgSeqFrame *sf = (StgSeqFrame *)su;
3328         StgClosure* o;
3329         
3330         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3331         TICK_ALLOC_UPD_PAP(words+1,0);
3332         
3333         /* now build o = FUN(seq,ap) */
3334         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3335         TICK_ALLOC_SE_THK(1,0);
3336         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3337         o->payload[0] = (StgClosure *)ap;
3338         
3339         IF_DEBUG(scheduler,
3340                  fprintf(stderr,  "scheduler: Built ");
3341                  printObj((StgClosure *)o);
3342                  );
3343         
3344         /* pop the old handler and put o on the stack */
3345         su = sf->link;
3346         sp += sizeofW(StgSeqFrame) - 1;
3347         sp[0] = (W_)o;
3348         break;
3349       }
3350       
3351     case STOP_FRAME:
3352       /* We've stripped the entire stack, the thread is now dead. */
3353       sp += sizeofW(StgStopFrame) - 1;
3354       sp[0] = (W_)exception;    /* save the exception */
3355       tso->what_next = ThreadKilled;
3356       tso->su = (StgUpdateFrame *)(sp+1);
3357       tso->sp = sp;
3358       return;
3359
3360     default:
3361       barf("raiseAsync");
3362     }
3363   }
3364   barf("raiseAsync");
3365 }
3366
3367 /* -----------------------------------------------------------------------------
3368    resurrectThreads is called after garbage collection on the list of
3369    threads found to be garbage.  Each of these threads will be woken
3370    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3371    on an MVar, or NonTermination if the thread was blocked on a Black
3372    Hole.
3373
3374    Locks: sched_mutex isn't held upon entry nor exit.
3375    -------------------------------------------------------------------------- */
3376
3377 void
3378 resurrectThreads( StgTSO *threads )
3379 {
3380   StgTSO *tso, *next;
3381
3382   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3383     next = tso->global_link;
3384     tso->global_link = all_threads;
3385     all_threads = tso;
3386     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3387
3388     switch (tso->why_blocked) {
3389     case BlockedOnMVar:
3390     case BlockedOnException:
3391       /* Called by GC - sched_mutex lock is currently held. */
3392       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3393       break;
3394     case BlockedOnBlackHole:
3395       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3396       break;
3397     case NotBlocked:
3398       /* This might happen if the thread was blocked on a black hole
3399        * belonging to a thread that we've just woken up (raiseAsync
3400        * can wake up threads, remember...).
3401        */
3402       continue;
3403     default:
3404       barf("resurrectThreads: thread blocked in a strange way");
3405     }
3406   }
3407 }
3408
3409 /* -----------------------------------------------------------------------------
3410  * Blackhole detection: if we reach a deadlock, test whether any
3411  * threads are blocked on themselves.  Any threads which are found to
3412  * be self-blocked get sent a NonTermination exception.
3413  *
3414  * This is only done in a deadlock situation in order to avoid
3415  * performance overhead in the normal case.
3416  *
3417  * Locks: sched_mutex is held upon entry and exit.
3418  * -------------------------------------------------------------------------- */
3419
3420 static void
3421 detectBlackHoles( void )
3422 {
3423     StgTSO *t = all_threads;
3424     StgUpdateFrame *frame;
3425     StgClosure *blocked_on;
3426
3427     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3428
3429         while (t->what_next == ThreadRelocated) {
3430             t = t->link;
3431             ASSERT(get_itbl(t)->type == TSO);
3432         }
3433       
3434         if (t->why_blocked != BlockedOnBlackHole) {
3435             continue;
3436         }
3437
3438         blocked_on = t->block_info.closure;
3439
3440         for (frame = t->su; ; frame = frame->link) {
3441             switch (get_itbl(frame)->type) {
3442
3443             case UPDATE_FRAME:
3444                 if (frame->updatee == blocked_on) {
3445                     /* We are blocking on one of our own computations, so
3446                      * send this thread the NonTermination exception.  
3447                      */
3448                     IF_DEBUG(scheduler, 
3449                              sched_belch("thread %d is blocked on itself", t->id));
3450                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3451                     goto done;
3452                 }
3453                 else {
3454                     continue;
3455                 }
3456
3457             case CATCH_FRAME:
3458             case SEQ_FRAME:
3459                 continue;
3460                 
3461             case STOP_FRAME:
3462                 break;
3463             }
3464             break;
3465         }
3466
3467     done: ;
3468     }   
3469 }
3470
3471 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3472 //@subsection Debugging Routines
3473
3474 /* -----------------------------------------------------------------------------
3475    Debugging: why is a thread blocked
3476    -------------------------------------------------------------------------- */
3477
3478 #ifdef DEBUG
3479
3480 void
3481 printThreadBlockage(StgTSO *tso)
3482 {
3483   switch (tso->why_blocked) {
3484   case BlockedOnRead:
3485     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3486     break;
3487   case BlockedOnWrite:
3488     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3489     break;
3490   case BlockedOnDelay:
3491     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3492     break;
3493   case BlockedOnMVar:
3494     fprintf(stderr,"is blocked on an MVar");
3495     break;
3496   case BlockedOnException:
3497     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3498             tso->block_info.tso->id);
3499     break;
3500   case BlockedOnBlackHole:
3501     fprintf(stderr,"is blocked on a black hole");
3502     break;
3503   case NotBlocked:
3504     fprintf(stderr,"is not blocked");
3505     break;
3506 #if defined(PAR)
3507   case BlockedOnGA:
3508     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3509             tso->block_info.closure, info_type(tso->block_info.closure));
3510     break;
3511   case BlockedOnGA_NoSend:
3512     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3513             tso->block_info.closure, info_type(tso->block_info.closure));
3514     break;
3515 #endif
3516 #if defined(RTS_SUPPORTS_THREADS)
3517   case BlockedOnCCall:
3518     fprintf(stderr,"is blocked on an external call");
3519     break;
3520 #endif
3521   default:
3522     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3523          tso->why_blocked, tso->id, tso);
3524   }
3525 }
3526
3527 void
3528 printThreadStatus(StgTSO *tso)
3529 {
3530   switch (tso->what_next) {
3531   case ThreadKilled:
3532     fprintf(stderr,"has been killed");
3533     break;
3534   case ThreadComplete:
3535     fprintf(stderr,"has completed");
3536     break;
3537   default:
3538     printThreadBlockage(tso);
3539   }
3540 }
3541
3542 void
3543 printAllThreads(void)
3544 {
3545   StgTSO *t;
3546
3547 # if defined(GRAN)
3548   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3549   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3550                        time_string, rtsFalse/*no commas!*/);
3551
3552   sched_belch("all threads at [%s]:", time_string);
3553 # elif defined(PAR)
3554   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3555   ullong_format_string(CURRENT_TIME,
3556                        time_string, rtsFalse/*no commas!*/);
3557
3558   sched_belch("all threads at [%s]:", time_string);
3559 # else
3560   sched_belch("all threads:");
3561 # endif
3562
3563   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3564     fprintf(stderr, "\tthread %d ", t->id);
3565     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3566     printThreadStatus(t);
3567     fprintf(stderr,"\n");
3568   }
3569 }
3570     
3571 /* 
3572    Print a whole blocking queue attached to node (debugging only).
3573 */
3574 //@cindex print_bq
3575 # if defined(PAR)
3576 void 
3577 print_bq (StgClosure *node)
3578 {
3579   StgBlockingQueueElement *bqe;
3580   StgTSO *tso;
3581   rtsBool end;
3582
3583   fprintf(stderr,"## BQ of closure %p (%s): ",
3584           node, info_type(node));
3585
3586   /* should cover all closures that may have a blocking queue */
3587   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3588          get_itbl(node)->type == FETCH_ME_BQ ||
3589          get_itbl(node)->type == RBH ||
3590          get_itbl(node)->type == MVAR);
3591     
3592   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3593
3594   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3595 }
3596
3597 /* 
3598    Print a whole blocking queue starting with the element bqe.
3599 */
3600 void 
3601 print_bqe (StgBlockingQueueElement *bqe)
3602 {
3603   rtsBool end;
3604
3605   /* 
3606      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3607   */
3608   for (end = (bqe==END_BQ_QUEUE);
3609        !end; // iterate until bqe points to a CONSTR
3610        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3611        bqe = end ? END_BQ_QUEUE : bqe->link) {
3612     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3613     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3614     /* types of closures that may appear in a blocking queue */
3615     ASSERT(get_itbl(bqe)->type == TSO ||           
3616            get_itbl(bqe)->type == BLOCKED_FETCH || 
3617            get_itbl(bqe)->type == CONSTR); 
3618     /* only BQs of an RBH end with an RBH_Save closure */
3619     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3620
3621     switch (get_itbl(bqe)->type) {
3622     case TSO:
3623       fprintf(stderr," TSO %u (%x),",
3624               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3625       break;
3626     case BLOCKED_FETCH:
3627       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3628               ((StgBlockedFetch *)bqe)->node, 
3629               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3630               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3631               ((StgBlockedFetch *)bqe)->ga.weight);
3632       break;
3633     case CONSTR:
3634       fprintf(stderr," %s (IP %p),",
3635               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3636                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3637                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3638                "RBH_Save_?"), get_itbl(bqe));
3639       break;
3640     default:
3641       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3642            info_type((StgClosure *)bqe)); // , node, info_type(node));
3643       break;
3644     }
3645   } /* for */
3646   fputc('\n', stderr);
3647 }
3648 # elif defined(GRAN)
3649 void 
3650 print_bq (StgClosure *node)
3651 {
3652   StgBlockingQueueElement *bqe;
3653   PEs node_loc, tso_loc;
3654   rtsBool end;
3655
3656   /* should cover all closures that may have a blocking queue */
3657   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3658          get_itbl(node)->type == FETCH_ME_BQ ||
3659          get_itbl(node)->type == RBH);
3660     
3661   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3662   node_loc = where_is(node);
3663
3664   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3665           node, info_type(node), node_loc);
3666
3667   /* 
3668      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3669   */
3670   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3671        !end; // iterate until bqe points to a CONSTR
3672        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3673     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3674     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3675     /* types of closures that may appear in a blocking queue */
3676     ASSERT(get_itbl(bqe)->type == TSO ||           
3677            get_itbl(bqe)->type == CONSTR); 
3678     /* only BQs of an RBH end with an RBH_Save closure */
3679     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3680
3681     tso_loc = where_is((StgClosure *)bqe);
3682     switch (get_itbl(bqe)->type) {
3683     case TSO:
3684       fprintf(stderr," TSO %d (%p) on [PE %d],",
3685               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3686       break;
3687     case CONSTR:
3688       fprintf(stderr," %s (IP %p),",
3689               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3690                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3691                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3692                "RBH_Save_?"), get_itbl(bqe));
3693       break;
3694     default:
3695       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3696            info_type((StgClosure *)bqe), node, info_type(node));
3697       break;
3698     }
3699   } /* for */
3700   fputc('\n', stderr);
3701 }
3702 #else
3703 /* 
3704    Nice and easy: only TSOs on the blocking queue
3705 */
3706 void 
3707 print_bq (StgClosure *node)
3708 {
3709   StgTSO *tso;
3710
3711   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3712   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3713        tso != END_TSO_QUEUE; 
3714        tso=tso->link) {
3715     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3716     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3717     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3718   }
3719   fputc('\n', stderr);
3720 }
3721 # endif
3722
3723 #if defined(PAR)
3724 static nat
3725 run_queue_len(void)
3726 {
3727   nat i;
3728   StgTSO *tso;
3729
3730   for (i=0, tso=run_queue_hd; 
3731        tso != END_TSO_QUEUE;
3732        i++, tso=tso->link)
3733     /* nothing */
3734
3735   return i;
3736 }
3737 #endif
3738
3739 static void
3740 sched_belch(char *s, ...)
3741 {
3742   va_list ap;
3743   va_start(ap,s);
3744 #ifdef SMP
3745   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3746 #elif defined(PAR)
3747   fprintf(stderr, "== ");
3748 #else
3749   fprintf(stderr, "scheduler: ");
3750 #endif
3751   vfprintf(stderr, s, ap);
3752   fprintf(stderr, "\n");
3753 }
3754
3755 #endif /* DEBUG */
3756
3757
3758 //@node Index,  , Debugging Routines, Main scheduling code
3759 //@subsection Index
3760
3761 //@index
3762 //* StgMainThread::  @cindex\s-+StgMainThread
3763 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3764 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3765 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3766 //* context_switch::  @cindex\s-+context_switch
3767 //* createThread::  @cindex\s-+createThread
3768 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3769 //* initScheduler::  @cindex\s-+initScheduler
3770 //* interrupted::  @cindex\s-+interrupted
3771 //* next_thread_id::  @cindex\s-+next_thread_id
3772 //* print_bq::  @cindex\s-+print_bq
3773 //* run_queue_hd::  @cindex\s-+run_queue_hd
3774 //* run_queue_tl::  @cindex\s-+run_queue_tl
3775 //* sched_mutex::  @cindex\s-+sched_mutex
3776 //* schedule::  @cindex\s-+schedule
3777 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3778 //* term_mutex::  @cindex\s-+term_mutex
3779 //@end index