[project @ 2002-04-23 14:20:18 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.140 2002/04/23 14:20:18 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
368 #else
369   /* simply initialise it in the non-threaded case */
370   grabCapability(&cap);
371 #endif
372
373 #if defined(GRAN)
374   /* set up first event to get things going */
375   /* ToDo: assign costs for system setup and init MainTSO ! */
376   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
377             ContinueThread, 
378             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
379
380   IF_DEBUG(gran,
381            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382            G_TSO(CurrentTSO, 5));
383
384   if (RtsFlags.GranFlags.Light) {
385     /* Save current time; GranSim Light only */
386     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
387   }      
388
389   event = get_next_event();
390
391   while (event!=(rtsEvent*)NULL) {
392     /* Choose the processor with the next event */
393     CurrentProc = event->proc;
394     CurrentTSO = event->tso;
395
396 #elif defined(PAR)
397
398   while (!receivedFinish) {    /* set by processMessages */
399                                /* when receiving PP_FINISH message         */ 
400 #else
401
402   while (1) {
403
404 #endif
405
406     IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409     /* Check to see whether there are any worker threads
410        waiting to deposit external call results. If so,
411        yield our capability */
412     yieldToReturningWorker(&sched_mutex, &cap);
413 #endif
414
415     /* If we're interrupted (the user pressed ^C, or some other
416      * termination condition occurred), kill all the currently running
417      * threads.
418      */
419     if (interrupted) {
420       IF_DEBUG(scheduler, sched_belch("interrupted"));
421       deleteAllThreads();
422       interrupted = rtsFalse;
423       was_interrupted = rtsTrue;
424     }
425
426     /* Go through the list of main threads and wake up any
427      * clients whose computations have finished.  ToDo: this
428      * should be done more efficiently without a linear scan
429      * of the main threads list, somehow...
430      */
431 #if defined(RTS_SUPPORTS_THREADS)
432     { 
433       StgMainThread *m, **prev;
434       prev = &main_threads;
435       for (m = main_threads; m != NULL; m = m->link) {
436         switch (m->tso->what_next) {
437         case ThreadComplete:
438           if (m->ret) {
439             *(m->ret) = (StgClosure *)m->tso->sp[0];
440           }
441           *prev = m->link;
442           m->stat = Success;
443           broadcastCondition(&m->wakeup);
444 #ifdef DEBUG
445           free(m->tso->label);
446           m->tso->label = NULL;
447 #endif
448           break;
449         case ThreadKilled:
450           if (m->ret) *(m->ret) = NULL;
451           *prev = m->link;
452           if (was_interrupted) {
453             m->stat = Interrupted;
454           } else {
455             m->stat = Killed;
456           }
457           broadcastCondition(&m->wakeup);
458 #ifdef DEBUG
459           free(m->tso->label);
460           m->tso->label = NULL;
461 #endif
462           break;
463         default:
464           break;
465         }
466       }
467     }
468
469 #else /* not threaded */
470
471 # if defined(PAR)
472     /* in GUM do this only on the Main PE */
473     if (IAmMainThread)
474 # endif
475     /* If our main thread has finished or been killed, return.
476      */
477     {
478       StgMainThread *m = main_threads;
479       if (m->tso->what_next == ThreadComplete
480           || m->tso->what_next == ThreadKilled) {
481 #ifdef DEBUG
482         free(m->tso->label);
483         m->tso->label = NULL;
484 #endif
485         main_threads = main_threads->link;
486         if (m->tso->what_next == ThreadComplete) {
487           /* we finished successfully, fill in the return value */
488           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
489           m->stat = Success;
490           return;
491         } else {
492           if (m->ret) { *(m->ret) = NULL; };
493           if (was_interrupted) {
494             m->stat = Interrupted;
495           } else {
496             m->stat = Killed;
497           }
498           return;
499         }
500       }
501     }
502 #endif
503
504     /* Top up the run queue from our spark pool.  We try to make the
505      * number of threads in the run queue equal to the number of
506      * free capabilities.
507      *
508      * Disable spark support in SMP for now, non-essential & requires
509      * a little bit of work to make it compile cleanly. -- sof 1/02.
510      */
511 #if 0 /* defined(SMP) */
512     {
513       nat n = getFreeCapabilities();
514       StgTSO *tso = run_queue_hd;
515
516       /* Count the run queue */
517       while (n > 0 && tso != END_TSO_QUEUE) {
518         tso = tso->link;
519         n--;
520       }
521
522       for (; n > 0; n--) {
523         StgClosure *spark;
524         spark = findSpark(rtsFalse);
525         if (spark == NULL) {
526           break; /* no more sparks in the pool */
527         } else {
528           /* I'd prefer this to be done in activateSpark -- HWL */
529           /* tricky - it needs to hold the scheduler lock and
530            * not try to re-acquire it -- SDM */
531           createSparkThread(spark);       
532           IF_DEBUG(scheduler,
533                    sched_belch("==^^ turning spark of closure %p into a thread",
534                                (StgClosure *)spark));
535         }
536       }
537       /* We need to wake up the other tasks if we just created some
538        * work for them.
539        */
540       if (getFreeCapabilities() - n > 1) {
541           signalCondition( &thread_ready_cond );
542       }
543     }
544 #endif // SMP
545
546     /* check for signals each time around the scheduler */
547 #ifndef mingw32_TARGET_OS
548     if (signals_pending()) {
549       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
550       startSignalHandlers();
551       ACQUIRE_LOCK(&sched_mutex);
552     }
553 #endif
554
555     /* Check whether any waiting threads need to be woken up.  If the
556      * run queue is empty, and there are no other tasks running, we
557      * can wait indefinitely for something to happen.
558      * ToDo: what if another client comes along & requests another
559      * main thread?
560      */
561     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
562       awaitEvent( EMPTY_RUN_QUEUE()
563 #if defined(SMP)
564         && allFreeCapabilities()
565 #endif
566         );
567     }
568     /* we can be interrupted while waiting for I/O... */
569     if (interrupted) continue;
570
571     /* 
572      * Detect deadlock: when we have no threads to run, there are no
573      * threads waiting on I/O or sleeping, and all the other tasks are
574      * waiting for work, we must have a deadlock of some description.
575      *
576      * We first try to find threads blocked on themselves (ie. black
577      * holes), and generate NonTermination exceptions where necessary.
578      *
579      * If no threads are black holed, we have a deadlock situation, so
580      * inform all the main threads.
581      */
582 #ifndef PAR
583     if (   EMPTY_THREAD_QUEUES()
584 #if defined(RTS_SUPPORTS_THREADS)
585         && EMPTY_QUEUE(suspended_ccalling_threads)
586 #endif
587 #ifdef SMP
588         && allFreeCapabilities()
589 #endif
590         )
591     {
592         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
593 #if defined(THREADED_RTS)
594         /* and SMP mode ..? */
595         releaseCapability(cap);
596 #endif
597         // Garbage collection can release some new threads due to
598         // either (a) finalizers or (b) threads resurrected because
599         // they are about to be send BlockedOnDeadMVar.  Any threads
600         // thus released will be immediately runnable.
601         GarbageCollect(GetRoots,rtsTrue);
602
603         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
604
605         IF_DEBUG(scheduler, 
606                  sched_belch("still deadlocked, checking for black holes..."));
607         detectBlackHoles();
608
609         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
610
611 #ifndef mingw32_TARGET_OS
612         /* If we have user-installed signal handlers, then wait
613          * for signals to arrive rather then bombing out with a
614          * deadlock.
615          */
616 #if defined(RTS_SUPPORTS_THREADS)
617         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
618                       a signal with no runnable threads (or I/O
619                       suspended ones) leads nowhere quick.
620                       For now, simply shut down when we reach this
621                       condition.
622                       
623                       ToDo: define precisely under what conditions
624                       the Scheduler should shut down in an MT setting.
625                    */
626 #else
627         if ( anyUserHandlers() ) {
628 #endif
629             IF_DEBUG(scheduler, 
630                      sched_belch("still deadlocked, waiting for signals..."));
631
632             awaitUserSignals();
633
634             // we might be interrupted...
635             if (interrupted) { continue; }
636
637             if (signals_pending()) {
638                 RELEASE_LOCK(&sched_mutex);
639                 startSignalHandlers();
640                 ACQUIRE_LOCK(&sched_mutex);
641             }
642             ASSERT(!EMPTY_RUN_QUEUE());
643             goto not_deadlocked;
644         }
645 #endif
646
647         /* Probably a real deadlock.  Send the current main thread the
648          * Deadlock exception (or in the SMP build, send *all* main
649          * threads the deadlock exception, since none of them can make
650          * progress).
651          */
652         {
653             StgMainThread *m;
654 #if defined(RTS_SUPPORTS_THREADS)
655             for (m = main_threads; m != NULL; m = m->link) {
656                 switch (m->tso->why_blocked) {
657                 case BlockedOnBlackHole:
658                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
659                     break;
660                 case BlockedOnException:
661                 case BlockedOnMVar:
662                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
663                     break;
664                 default:
665                     barf("deadlock: main thread blocked in a strange way");
666                 }
667             }
668 #else
669             m = main_threads;
670             switch (m->tso->why_blocked) {
671             case BlockedOnBlackHole:
672                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
673                 break;
674             case BlockedOnException:
675             case BlockedOnMVar:
676                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
677                 break;
678             default:
679                 barf("deadlock: main thread blocked in a strange way");
680             }
681 #endif
682         }
683
684 #if defined(RTS_SUPPORTS_THREADS)
685         /* ToDo: revisit conditions (and mechanism) for shutting
686            down a multi-threaded world  */
687         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
688         shutdownHaskellAndExit(0);
689 #endif
690     }
691   not_deadlocked:
692
693 #elif defined(PAR)
694     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
695 #endif
696
697 #if defined(SMP)
698     /* If there's a GC pending, don't do anything until it has
699      * completed.
700      */
701     if (ready_to_gc) {
702       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
703       waitCondition( &gc_pending_cond, &sched_mutex );
704     }
705 #endif    
706
707 #if defined(RTS_SUPPORTS_THREADS)
708     /* block until we've got a thread on the run queue and a free
709      * capability.
710      *
711      */
712     if ( EMPTY_RUN_QUEUE() ) {
713       /* Give up our capability */
714       releaseCapability(cap);
715       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
716       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
717       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
718 #if 0
719       while ( EMPTY_RUN_QUEUE() ) {
720         waitForWorkCapability(&sched_mutex, &cap);
721         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
722       }
723 #endif
724     }
725 #endif
726
727 #if defined(GRAN)
728     if (RtsFlags.GranFlags.Light)
729       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
730
731     /* adjust time based on time-stamp */
732     if (event->time > CurrentTime[CurrentProc] &&
733         event->evttype != ContinueThread)
734       CurrentTime[CurrentProc] = event->time;
735     
736     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
737     if (!RtsFlags.GranFlags.Light)
738       handleIdlePEs();
739
740     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
741
742     /* main event dispatcher in GranSim */
743     switch (event->evttype) {
744       /* Should just be continuing execution */
745     case ContinueThread:
746       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
747       /* ToDo: check assertion
748       ASSERT(run_queue_hd != (StgTSO*)NULL &&
749              run_queue_hd != END_TSO_QUEUE);
750       */
751       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
752       if (!RtsFlags.GranFlags.DoAsyncFetch &&
753           procStatus[CurrentProc]==Fetching) {
754         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
755               CurrentTSO->id, CurrentTSO, CurrentProc);
756         goto next_thread;
757       } 
758       /* Ignore ContinueThreads for completed threads */
759       if (CurrentTSO->what_next == ThreadComplete) {
760         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
761               CurrentTSO->id, CurrentTSO, CurrentProc);
762         goto next_thread;
763       } 
764       /* Ignore ContinueThreads for threads that are being migrated */
765       if (PROCS(CurrentTSO)==Nowhere) { 
766         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
767               CurrentTSO->id, CurrentTSO, CurrentProc);
768         goto next_thread;
769       }
770       /* The thread should be at the beginning of the run queue */
771       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
772         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
773               CurrentTSO->id, CurrentTSO, CurrentProc);
774         break; // run the thread anyway
775       }
776       /*
777       new_event(proc, proc, CurrentTime[proc],
778                 FindWork,
779                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
780       goto next_thread; 
781       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
782       break; // now actually run the thread; DaH Qu'vam yImuHbej 
783
784     case FetchNode:
785       do_the_fetchnode(event);
786       goto next_thread;             /* handle next event in event queue  */
787       
788     case GlobalBlock:
789       do_the_globalblock(event);
790       goto next_thread;             /* handle next event in event queue  */
791       
792     case FetchReply:
793       do_the_fetchreply(event);
794       goto next_thread;             /* handle next event in event queue  */
795       
796     case UnblockThread:   /* Move from the blocked queue to the tail of */
797       do_the_unblock(event);
798       goto next_thread;             /* handle next event in event queue  */
799       
800     case ResumeThread:  /* Move from the blocked queue to the tail of */
801       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
802       event->tso->gran.blocktime += 
803         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
804       do_the_startthread(event);
805       goto next_thread;             /* handle next event in event queue  */
806       
807     case StartThread:
808       do_the_startthread(event);
809       goto next_thread;             /* handle next event in event queue  */
810       
811     case MoveThread:
812       do_the_movethread(event);
813       goto next_thread;             /* handle next event in event queue  */
814       
815     case MoveSpark:
816       do_the_movespark(event);
817       goto next_thread;             /* handle next event in event queue  */
818       
819     case FindWork:
820       do_the_findwork(event);
821       goto next_thread;             /* handle next event in event queue  */
822       
823     default:
824       barf("Illegal event type %u\n", event->evttype);
825     }  /* switch */
826     
827     /* This point was scheduler_loop in the old RTS */
828
829     IF_DEBUG(gran, belch("GRAN: after main switch"));
830
831     TimeOfLastEvent = CurrentTime[CurrentProc];
832     TimeOfNextEvent = get_time_of_next_event();
833     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
834     // CurrentTSO = ThreadQueueHd;
835
836     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
837                          TimeOfNextEvent));
838
839     if (RtsFlags.GranFlags.Light) 
840       GranSimLight_leave_system(event, &ActiveTSO); 
841
842     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
843
844     IF_DEBUG(gran, 
845              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
846
847     /* in a GranSim setup the TSO stays on the run queue */
848     t = CurrentTSO;
849     /* Take a thread from the run queue. */
850     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
851
852     IF_DEBUG(gran, 
853              fprintf(stderr, "GRAN: About to run current thread, which is\n");
854              G_TSO(t,5));
855
856     context_switch = 0; // turned on via GranYield, checking events and time slice
857
858     IF_DEBUG(gran, 
859              DumpGranEvent(GR_SCHEDULE, t));
860
861     procStatus[CurrentProc] = Busy;
862
863 #elif defined(PAR)
864     if (PendingFetches != END_BF_QUEUE) {
865         processFetches();
866     }
867
868     /* ToDo: phps merge with spark activation above */
869     /* check whether we have local work and send requests if we have none */
870     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
871       /* :-[  no local threads => look out for local sparks */
872       /* the spark pool for the current PE */
873       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
874       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
875           pool->hd < pool->tl) {
876         /* 
877          * ToDo: add GC code check that we really have enough heap afterwards!!
878          * Old comment:
879          * If we're here (no runnable threads) and we have pending
880          * sparks, we must have a space problem.  Get enough space
881          * to turn one of those pending sparks into a
882          * thread... 
883          */
884
885         spark = findSpark(rtsFalse);                /* get a spark */
886         if (spark != (rtsSpark) NULL) {
887           tso = activateSpark(spark);       /* turn the spark into a thread */
888           IF_PAR_DEBUG(schedule,
889                        belch("==== schedule: Created TSO %d (%p); %d threads active",
890                              tso->id, tso, advisory_thread_count));
891
892           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
893             belch("==^^ failed to activate spark");
894             goto next_thread;
895           }               /* otherwise fall through & pick-up new tso */
896         } else {
897           IF_PAR_DEBUG(verbose,
898                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
899                              spark_queue_len(pool)));
900           goto next_thread;
901         }
902       }
903
904       /* If we still have no work we need to send a FISH to get a spark
905          from another PE 
906       */
907       if (EMPTY_RUN_QUEUE()) {
908       /* =8-[  no local sparks => look for work on other PEs */
909         /*
910          * We really have absolutely no work.  Send out a fish
911          * (there may be some out there already), and wait for
912          * something to arrive.  We clearly can't run any threads
913          * until a SCHEDULE or RESUME arrives, and so that's what
914          * we're hoping to see.  (Of course, we still have to
915          * respond to other types of messages.)
916          */
917         TIME now = msTime() /*CURRENT_TIME*/;
918         IF_PAR_DEBUG(verbose, 
919                      belch("--  now=%ld", now));
920         IF_PAR_DEBUG(verbose,
921                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
922                          (last_fish_arrived_at!=0 &&
923                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
924                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
925                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
926                              last_fish_arrived_at,
927                              RtsFlags.ParFlags.fishDelay, now);
928                      });
929         
930         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
931             (last_fish_arrived_at==0 ||
932              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
933           /* outstandingFishes is set in sendFish, processFish;
934              avoid flooding system with fishes via delay */
935           pe = choosePE();
936           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
937                    NEW_FISH_HUNGER);
938
939           // Global statistics: count no. of fishes
940           if (RtsFlags.ParFlags.ParStats.Global &&
941               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
942             globalParStats.tot_fish_mess++;
943           }
944         }
945       
946         receivedFinish = processMessages();
947         goto next_thread;
948       }
949     } else if (PacketsWaiting()) {  /* Look for incoming messages */
950       receivedFinish = processMessages();
951     }
952
953     /* Now we are sure that we have some work available */
954     ASSERT(run_queue_hd != END_TSO_QUEUE);
955
956     /* Take a thread from the run queue, if we have work */
957     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
958     IF_DEBUG(sanity,checkTSO(t));
959
960     /* ToDo: write something to the log-file
961     if (RTSflags.ParFlags.granSimStats && !sameThread)
962         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
963
964     CurrentTSO = t;
965     */
966     /* the spark pool for the current PE */
967     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
968
969     IF_DEBUG(scheduler, 
970              belch("--=^ %d threads, %d sparks on [%#x]", 
971                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
972
973 # if 1
974     if (0 && RtsFlags.ParFlags.ParStats.Full && 
975         t && LastTSO && t->id != LastTSO->id && 
976         LastTSO->why_blocked == NotBlocked && 
977         LastTSO->what_next != ThreadComplete) {
978       // if previously scheduled TSO not blocked we have to record the context switch
979       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
980                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
981     }
982
983     if (RtsFlags.ParFlags.ParStats.Full && 
984         (emitSchedule /* forced emit */ ||
985         (t && LastTSO && t->id != LastTSO->id))) {
986       /* 
987          we are running a different TSO, so write a schedule event to log file
988          NB: If we use fair scheduling we also have to write  a deschedule 
989              event for LastTSO; with unfair scheduling we know that the
990              previous tso has blocked whenever we switch to another tso, so
991              we don't need it in GUM for now
992       */
993       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
994                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
995       emitSchedule = rtsFalse;
996     }
997      
998 # endif
999 #else /* !GRAN && !PAR */
1000   
1001     /* grab a thread from the run queue */
1002     ASSERT(run_queue_hd != END_TSO_QUEUE);
1003     t = POP_RUN_QUEUE();
1004     // Sanity check the thread we're about to run.  This can be
1005     // expensive if there is lots of thread switching going on...
1006     IF_DEBUG(sanity,checkTSO(t));
1007 #endif
1008     
1009     cap->r.rCurrentTSO = t;
1010     
1011     /* context switches are now initiated by the timer signal, unless
1012      * the user specified "context switch as often as possible", with
1013      * +RTS -C0
1014      */
1015     if (
1016 #ifdef PROFILING
1017         RtsFlags.ProfFlags.profileInterval == 0 ||
1018 #endif
1019         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1020          && (run_queue_hd != END_TSO_QUEUE
1021              || blocked_queue_hd != END_TSO_QUEUE
1022              || sleeping_queue != END_TSO_QUEUE)))
1023         context_switch = 1;
1024     else
1025         context_switch = 0;
1026
1027     RELEASE_LOCK(&sched_mutex);
1028
1029     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1030                               t->id, t, whatNext_strs[t->what_next]));
1031
1032 #ifdef PROFILING
1033     startHeapProfTimer();
1034 #endif
1035
1036     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1037     /* Run the current thread 
1038      */
1039     switch (cap->r.rCurrentTSO->what_next) {
1040     case ThreadKilled:
1041     case ThreadComplete:
1042         /* Thread already finished, return to scheduler. */
1043         ret = ThreadFinished;
1044         break;
1045     case ThreadEnterGHC:
1046         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1047         break;
1048     case ThreadRunGHC:
1049         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1050         break;
1051     case ThreadEnterInterp:
1052         ret = interpretBCO(cap);
1053         break;
1054     default:
1055       barf("schedule: invalid what_next field");
1056     }
1057     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1058     
1059     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1060 #ifdef PROFILING
1061     stopHeapProfTimer();
1062     CCCS = CCS_SYSTEM;
1063 #endif
1064     
1065     ACQUIRE_LOCK(&sched_mutex);
1066
1067 #ifdef SMP
1068     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1069 #elif !defined(GRAN) && !defined(PAR)
1070     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1071 #endif
1072     t = cap->r.rCurrentTSO;
1073     
1074 #if defined(PAR)
1075     /* HACK 675: if the last thread didn't yield, make sure to print a 
1076        SCHEDULE event to the log file when StgRunning the next thread, even
1077        if it is the same one as before */
1078     LastTSO = t; 
1079     TimeOfLastYield = CURRENT_TIME;
1080 #endif
1081
1082     switch (ret) {
1083     case HeapOverflow:
1084 #if defined(GRAN)
1085       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1086       globalGranStats.tot_heapover++;
1087 #elif defined(PAR)
1088       globalParStats.tot_heapover++;
1089 #endif
1090
1091       // did the task ask for a large block?
1092       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1093           // if so, get one and push it on the front of the nursery.
1094           bdescr *bd;
1095           nat blocks;
1096           
1097           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1098
1099           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1100                                    t->id, t,
1101                                    whatNext_strs[t->what_next], blocks));
1102
1103           // don't do this if it would push us over the
1104           // alloc_blocks_lim limit; we'll GC first.
1105           if (alloc_blocks + blocks < alloc_blocks_lim) {
1106
1107               alloc_blocks += blocks;
1108               bd = allocGroup( blocks );
1109
1110               // link the new group into the list
1111               bd->link = cap->r.rCurrentNursery;
1112               bd->u.back = cap->r.rCurrentNursery->u.back;
1113               if (cap->r.rCurrentNursery->u.back != NULL) {
1114                   cap->r.rCurrentNursery->u.back->link = bd;
1115               } else {
1116                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1117                          g0s0->blocks == cap->r.rNursery);
1118                   cap->r.rNursery = g0s0->blocks = bd;
1119               }           
1120               cap->r.rCurrentNursery->u.back = bd;
1121
1122               // initialise it as a nursery block
1123               bd->step = g0s0;
1124               bd->gen_no = 0;
1125               bd->flags = 0;
1126               bd->free = bd->start;
1127
1128               // don't forget to update the block count in g0s0.
1129               g0s0->n_blocks += blocks;
1130               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1131
1132               // now update the nursery to point to the new block
1133               cap->r.rCurrentNursery = bd;
1134
1135               // we might be unlucky and have another thread get on the
1136               // run queue before us and steal the large block, but in that
1137               // case the thread will just end up requesting another large
1138               // block.
1139               PUSH_ON_RUN_QUEUE(t);
1140               break;
1141           }
1142       }
1143
1144       /* make all the running tasks block on a condition variable,
1145        * maybe set context_switch and wait till they all pile in,
1146        * then have them wait on a GC condition variable.
1147        */
1148       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1149                                t->id, t, whatNext_strs[t->what_next]));
1150       threadPaused(t);
1151 #if defined(GRAN)
1152       ASSERT(!is_on_queue(t,CurrentProc));
1153 #elif defined(PAR)
1154       /* Currently we emit a DESCHEDULE event before GC in GUM.
1155          ToDo: either add separate event to distinguish SYSTEM time from rest
1156                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1157       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1158         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1159                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1160         emitSchedule = rtsTrue;
1161       }
1162 #endif
1163       
1164       ready_to_gc = rtsTrue;
1165       context_switch = 1;               /* stop other threads ASAP */
1166       PUSH_ON_RUN_QUEUE(t);
1167       /* actual GC is done at the end of the while loop */
1168       break;
1169       
1170     case StackOverflow:
1171 #if defined(GRAN)
1172       IF_DEBUG(gran, 
1173                DumpGranEvent(GR_DESCHEDULE, t));
1174       globalGranStats.tot_stackover++;
1175 #elif defined(PAR)
1176       // IF_DEBUG(par, 
1177       // DumpGranEvent(GR_DESCHEDULE, t);
1178       globalParStats.tot_stackover++;
1179 #endif
1180       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1181                                t->id, t, whatNext_strs[t->what_next]));
1182       /* just adjust the stack for this thread, then pop it back
1183        * on the run queue.
1184        */
1185       threadPaused(t);
1186       { 
1187         StgMainThread *m;
1188         /* enlarge the stack */
1189         StgTSO *new_t = threadStackOverflow(t);
1190         
1191         /* This TSO has moved, so update any pointers to it from the
1192          * main thread stack.  It better not be on any other queues...
1193          * (it shouldn't be).
1194          */
1195         for (m = main_threads; m != NULL; m = m->link) {
1196           if (m->tso == t) {
1197             m->tso = new_t;
1198           }
1199         }
1200         threadPaused(new_t);
1201         PUSH_ON_RUN_QUEUE(new_t);
1202       }
1203       break;
1204
1205     case ThreadYielding:
1206 #if defined(GRAN)
1207       IF_DEBUG(gran, 
1208                DumpGranEvent(GR_DESCHEDULE, t));
1209       globalGranStats.tot_yields++;
1210 #elif defined(PAR)
1211       // IF_DEBUG(par, 
1212       // DumpGranEvent(GR_DESCHEDULE, t);
1213       globalParStats.tot_yields++;
1214 #endif
1215       /* put the thread back on the run queue.  Then, if we're ready to
1216        * GC, check whether this is the last task to stop.  If so, wake
1217        * up the GC thread.  getThread will block during a GC until the
1218        * GC is finished.
1219        */
1220       IF_DEBUG(scheduler,
1221                if (t->what_next == ThreadEnterInterp) {
1222                    /* ToDo: or maybe a timer expired when we were in Hugs?
1223                     * or maybe someone hit ctrl-C
1224                     */
1225                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1226                          t->id, t, whatNext_strs[t->what_next]);
1227                } else {
1228                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1229                          t->id, t, whatNext_strs[t->what_next]);
1230                }
1231                );
1232
1233       threadPaused(t);
1234
1235       IF_DEBUG(sanity,
1236                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237                checkTSO(t));
1238       ASSERT(t->link == END_TSO_QUEUE);
1239 #if defined(GRAN)
1240       ASSERT(!is_on_queue(t,CurrentProc));
1241
1242       IF_DEBUG(sanity,
1243                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1244                checkThreadQsSanity(rtsTrue));
1245 #endif
1246 #if defined(PAR)
1247       if (RtsFlags.ParFlags.doFairScheduling) { 
1248         /* this does round-robin scheduling; good for concurrency */
1249         APPEND_TO_RUN_QUEUE(t);
1250       } else {
1251         /* this does unfair scheduling; good for parallelism */
1252         PUSH_ON_RUN_QUEUE(t);
1253       }
1254 #else
1255       /* this does round-robin scheduling; good for concurrency */
1256       APPEND_TO_RUN_QUEUE(t);
1257 #endif
1258 #if defined(GRAN)
1259       /* add a ContinueThread event to actually process the thread */
1260       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1261                 ContinueThread,
1262                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1263       IF_GRAN_DEBUG(bq, 
1264                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1265                G_EVENTQ(0);
1266                G_CURR_THREADQ(0));
1267 #endif /* GRAN */
1268       break;
1269       
1270     case ThreadBlocked:
1271 #if defined(GRAN)
1272       IF_DEBUG(scheduler,
1273                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1274                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1275                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1276
1277       // ??? needed; should emit block before
1278       IF_DEBUG(gran, 
1279                DumpGranEvent(GR_DESCHEDULE, t)); 
1280       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1281       /*
1282         ngoq Dogh!
1283       ASSERT(procStatus[CurrentProc]==Busy || 
1284               ((procStatus[CurrentProc]==Fetching) && 
1285               (t->block_info.closure!=(StgClosure*)NULL)));
1286       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1287           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1288             procStatus[CurrentProc]==Fetching)) 
1289         procStatus[CurrentProc] = Idle;
1290       */
1291 #elif defined(PAR)
1292       IF_DEBUG(scheduler,
1293                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1294                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1295       IF_PAR_DEBUG(bq,
1296
1297                    if (t->block_info.closure!=(StgClosure*)NULL) 
1298                      print_bq(t->block_info.closure));
1299
1300       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1301       blockThread(t);
1302
1303       /* whatever we schedule next, we must log that schedule */
1304       emitSchedule = rtsTrue;
1305
1306 #else /* !GRAN */
1307       /* don't need to do anything.  Either the thread is blocked on
1308        * I/O, in which case we'll have called addToBlockedQueue
1309        * previously, or it's blocked on an MVar or Blackhole, in which
1310        * case it'll be on the relevant queue already.
1311        */
1312       IF_DEBUG(scheduler,
1313                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1314                printThreadBlockage(t);
1315                fprintf(stderr, "\n"));
1316
1317       /* Only for dumping event to log file 
1318          ToDo: do I need this in GranSim, too?
1319       blockThread(t);
1320       */
1321 #endif
1322       threadPaused(t);
1323       break;
1324       
1325     case ThreadFinished:
1326       /* Need to check whether this was a main thread, and if so, signal
1327        * the task that started it with the return value.  If we have no
1328        * more main threads, we probably need to stop all the tasks until
1329        * we get a new one.
1330        */
1331       /* We also end up here if the thread kills itself with an
1332        * uncaught exception, see Exception.hc.
1333        */
1334       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1335 #if defined(GRAN)
1336       endThread(t, CurrentProc); // clean-up the thread
1337 #elif defined(PAR)
1338       /* For now all are advisory -- HWL */
1339       //if(t->priority==AdvisoryPriority) ??
1340       advisory_thread_count--;
1341       
1342 # ifdef DIST
1343       if(t->dist.priority==RevalPriority)
1344         FinishReval(t);
1345 # endif
1346       
1347       if (RtsFlags.ParFlags.ParStats.Full &&
1348           !RtsFlags.ParFlags.ParStats.Suppressed) 
1349         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1350 #endif
1351       break;
1352       
1353     default:
1354       barf("schedule: invalid thread return code %d", (int)ret);
1355     }
1356
1357 #ifdef PROFILING
1358     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1359         GarbageCollect(GetRoots, rtsTrue);
1360         heapCensus();
1361         performHeapProfile = rtsFalse;
1362         ready_to_gc = rtsFalse; // we already GC'd
1363     }
1364 #endif
1365
1366     if (ready_to_gc 
1367 #ifdef SMP
1368         && allFreeCapabilities() 
1369 #endif
1370         ) {
1371       /* everybody back, start the GC.
1372        * Could do it in this thread, or signal a condition var
1373        * to do it in another thread.  Either way, we need to
1374        * broadcast on gc_pending_cond afterward.
1375        */
1376 #if defined(RTS_SUPPORTS_THREADS)
1377       IF_DEBUG(scheduler,sched_belch("doing GC"));
1378 #endif
1379       GarbageCollect(GetRoots,rtsFalse);
1380       ready_to_gc = rtsFalse;
1381 #ifdef SMP
1382       broadcastCondition(&gc_pending_cond);
1383 #endif
1384 #if defined(GRAN)
1385       /* add a ContinueThread event to continue execution of current thread */
1386       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1387                 ContinueThread,
1388                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1389       IF_GRAN_DEBUG(bq, 
1390                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1391                G_EVENTQ(0);
1392                G_CURR_THREADQ(0));
1393 #endif /* GRAN */
1394     }
1395
1396 #if defined(GRAN)
1397   next_thread:
1398     IF_GRAN_DEBUG(unused,
1399                   print_eventq(EventHd));
1400
1401     event = get_next_event();
1402 #elif defined(PAR)
1403   next_thread:
1404     /* ToDo: wait for next message to arrive rather than busy wait */
1405 #endif /* GRAN */
1406
1407   } /* end of while(1) */
1408
1409   IF_PAR_DEBUG(verbose,
1410                belch("== Leaving schedule() after having received Finish"));
1411 }
1412
1413 /* ---------------------------------------------------------------------------
1414  * Singleton fork(). Do not copy any running threads.
1415  * ------------------------------------------------------------------------- */
1416
1417 StgInt forkProcess(StgTSO* tso) {
1418
1419 #ifndef mingw32_TARGET_OS
1420   pid_t pid;
1421   StgTSO* t,*next;
1422
1423   IF_DEBUG(scheduler,sched_belch("forking!"));
1424
1425   pid = fork();
1426   if (pid) { /* parent */
1427
1428   /* just return the pid */
1429     
1430   } else { /* child */
1431   /* wipe all other threads */
1432   run_queue_hd = tso;
1433   tso->link = END_TSO_QUEUE;
1434
1435   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1436      us is picky about finding the threat still in its queue when
1437      handling the deleteThread() */
1438
1439   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1440     next = t->link;
1441     if (t->id != tso->id) {
1442       deleteThread(t);
1443     }
1444   }
1445   }
1446   return pid;
1447 #else /* mingw32 */
1448   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1449   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1450   return -1;
1451 #endif /* mingw32 */
1452 }
1453
1454 /* ---------------------------------------------------------------------------
1455  * deleteAllThreads():  kill all the live threads.
1456  *
1457  * This is used when we catch a user interrupt (^C), before performing
1458  * any necessary cleanups and running finalizers.
1459  *
1460  * Locks: sched_mutex held.
1461  * ------------------------------------------------------------------------- */
1462    
1463 void deleteAllThreads ( void )
1464 {
1465   StgTSO* t, *next;
1466   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1467   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1468       next = t->global_link;
1469       deleteThread(t);
1470   }      
1471   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1472   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1473   sleeping_queue = END_TSO_QUEUE;
1474 }
1475
1476 /* startThread and  insertThread are now in GranSim.c -- HWL */
1477
1478
1479 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1480 //@subsection Suspend and Resume
1481
1482 /* ---------------------------------------------------------------------------
1483  * Suspending & resuming Haskell threads.
1484  * 
1485  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1486  * its capability before calling the C function.  This allows another
1487  * task to pick up the capability and carry on running Haskell
1488  * threads.  It also means that if the C call blocks, it won't lock
1489  * the whole system.
1490  *
1491  * The Haskell thread making the C call is put to sleep for the
1492  * duration of the call, on the susepended_ccalling_threads queue.  We
1493  * give out a token to the task, which it can use to resume the thread
1494  * on return from the C function.
1495  * ------------------------------------------------------------------------- */
1496    
1497 StgInt
1498 suspendThread( StgRegTable *reg, 
1499                rtsBool concCall
1500 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1501                STG_UNUSED
1502 #endif
1503                )
1504 {
1505   nat tok;
1506   Capability *cap;
1507
1508   /* assume that *reg is a pointer to the StgRegTable part
1509    * of a Capability.
1510    */
1511   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1512
1513   ACQUIRE_LOCK(&sched_mutex);
1514
1515   IF_DEBUG(scheduler,
1516            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1517
1518   threadPaused(cap->r.rCurrentTSO);
1519   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1520   suspended_ccalling_threads = cap->r.rCurrentTSO;
1521
1522 #if defined(RTS_SUPPORTS_THREADS)
1523   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1524 #endif
1525
1526   /* Use the thread ID as the token; it should be unique */
1527   tok = cap->r.rCurrentTSO->id;
1528
1529   /* Hand back capability */
1530   releaseCapability(cap);
1531   
1532 #if defined(RTS_SUPPORTS_THREADS)
1533   /* Preparing to leave the RTS, so ensure there's a native thread/task
1534      waiting to take over.
1535      
1536      ToDo: optimise this and only create a new task if there's a need
1537      for one (i.e., if there's only one Concurrent Haskell thread alive,
1538      there's no need to create a new task).
1539   */
1540   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1541   if (concCall) {
1542     startTask(taskStart);
1543   }
1544 #endif
1545
1546   /* Other threads _might_ be available for execution; signal this */
1547   THREAD_RUNNABLE();
1548   RELEASE_LOCK(&sched_mutex);
1549   return tok; 
1550 }
1551
1552 StgRegTable *
1553 resumeThread( StgInt tok,
1554               rtsBool concCall
1555 #if !defined(RTS_SUPPORTS_THREADS)
1556                STG_UNUSED
1557 #endif
1558               )
1559 {
1560   StgTSO *tso, **prev;
1561   Capability *cap;
1562
1563 #if defined(RTS_SUPPORTS_THREADS)
1564   /* Wait for permission to re-enter the RTS with the result. */
1565   if ( concCall ) {
1566     ACQUIRE_LOCK(&sched_mutex);
1567     grabReturnCapability(&sched_mutex, &cap);
1568   } else {
1569     grabCapability(&cap);
1570   }
1571 #else
1572   grabCapability(&cap);
1573 #endif
1574
1575   /* Remove the thread off of the suspended list */
1576   prev = &suspended_ccalling_threads;
1577   for (tso = suspended_ccalling_threads; 
1578        tso != END_TSO_QUEUE; 
1579        prev = &tso->link, tso = tso->link) {
1580     if (tso->id == (StgThreadID)tok) {
1581       *prev = tso->link;
1582       break;
1583     }
1584   }
1585   if (tso == END_TSO_QUEUE) {
1586     barf("resumeThread: thread not found");
1587   }
1588   tso->link = END_TSO_QUEUE;
1589   /* Reset blocking status */
1590   tso->why_blocked  = NotBlocked;
1591
1592   cap->r.rCurrentTSO = tso;
1593   RELEASE_LOCK(&sched_mutex);
1594   return &cap->r;
1595 }
1596
1597
1598 /* ---------------------------------------------------------------------------
1599  * Static functions
1600  * ------------------------------------------------------------------------ */
1601 static void unblockThread(StgTSO *tso);
1602
1603 /* ---------------------------------------------------------------------------
1604  * Comparing Thread ids.
1605  *
1606  * This is used from STG land in the implementation of the
1607  * instances of Eq/Ord for ThreadIds.
1608  * ------------------------------------------------------------------------ */
1609
1610 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1611
1612   StgThreadID id1 = tso1->id; 
1613   StgThreadID id2 = tso2->id;
1614  
1615   if (id1 < id2) return (-1);
1616   if (id1 > id2) return 1;
1617   return 0;
1618 }
1619
1620 /* ---------------------------------------------------------------------------
1621  * Fetching the ThreadID from an StgTSO.
1622  *
1623  * This is used in the implementation of Show for ThreadIds.
1624  * ------------------------------------------------------------------------ */
1625 int rts_getThreadId(const StgTSO *tso) 
1626 {
1627   return tso->id;
1628 }
1629
1630 #ifdef DEBUG
1631 void labelThread(StgTSO *tso, char *label)
1632 {
1633   int len;
1634   void *buf;
1635
1636   /* Caveat: Once set, you can only set the thread name to "" */
1637   len = strlen(label)+1;
1638   buf = realloc(tso->label,len);
1639   if (buf == NULL) {
1640     fprintf(stderr,"insufficient memory for labelThread!\n");
1641     free(tso->label);
1642     tso->label = NULL;
1643   } else
1644     strncpy(buf,label,len);
1645   tso->label = buf;
1646 }
1647 #endif /* DEBUG */
1648
1649 /* ---------------------------------------------------------------------------
1650    Create a new thread.
1651
1652    The new thread starts with the given stack size.  Before the
1653    scheduler can run, however, this thread needs to have a closure
1654    (and possibly some arguments) pushed on its stack.  See
1655    pushClosure() in Schedule.h.
1656
1657    createGenThread() and createIOThread() (in SchedAPI.h) are
1658    convenient packaged versions of this function.
1659
1660    currently pri (priority) is only used in a GRAN setup -- HWL
1661    ------------------------------------------------------------------------ */
1662 //@cindex createThread
1663 #if defined(GRAN)
1664 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1665 StgTSO *
1666 createThread(nat stack_size, StgInt pri)
1667 {
1668   return createThread_(stack_size, rtsFalse, pri);
1669 }
1670
1671 static StgTSO *
1672 createThread_(nat size, rtsBool have_lock, StgInt pri)
1673 {
1674 #else
1675 StgTSO *
1676 createThread(nat stack_size)
1677 {
1678   return createThread_(stack_size, rtsFalse);
1679 }
1680
1681 static StgTSO *
1682 createThread_(nat size, rtsBool have_lock)
1683 {
1684 #endif
1685
1686     StgTSO *tso;
1687     nat stack_size;
1688
1689     /* First check whether we should create a thread at all */
1690 #if defined(PAR)
1691   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1692   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1693     threadsIgnored++;
1694     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1695           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1696     return END_TSO_QUEUE;
1697   }
1698   threadsCreated++;
1699 #endif
1700
1701 #if defined(GRAN)
1702   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1703 #endif
1704
1705   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1706
1707   /* catch ridiculously small stack sizes */
1708   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1709     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1710   }
1711
1712   stack_size = size - TSO_STRUCT_SIZEW;
1713
1714   tso = (StgTSO *)allocate(size);
1715   TICK_ALLOC_TSO(stack_size, 0);
1716
1717   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1718 #if defined(GRAN)
1719   SET_GRAN_HDR(tso, ThisPE);
1720 #endif
1721   tso->what_next     = ThreadEnterGHC;
1722
1723 #ifdef DEBUG
1724   tso->label = NULL;
1725 #endif
1726
1727   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1728    * protect the increment operation on next_thread_id.
1729    * In future, we could use an atomic increment instead.
1730    */
1731 #ifdef SMP
1732   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1733 #endif
1734   tso->id = next_thread_id++; 
1735 #ifdef SMP
1736   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1737 #endif
1738
1739   tso->why_blocked  = NotBlocked;
1740   tso->blocked_exceptions = NULL;
1741
1742   tso->stack_size   = stack_size;
1743   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1744                               - TSO_STRUCT_SIZEW;
1745   tso->sp           = (P_)&(tso->stack) + stack_size;
1746
1747 #ifdef PROFILING
1748   tso->prof.CCCS = CCS_MAIN;
1749 #endif
1750
1751   /* put a stop frame on the stack */
1752   tso->sp -= sizeofW(StgStopFrame);
1753   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1754   tso->su = (StgUpdateFrame*)tso->sp;
1755
1756   // ToDo: check this
1757 #if defined(GRAN)
1758   tso->link = END_TSO_QUEUE;
1759   /* uses more flexible routine in GranSim */
1760   insertThread(tso, CurrentProc);
1761 #else
1762   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1763    * from its creation
1764    */
1765 #endif
1766
1767 #if defined(GRAN) 
1768   if (RtsFlags.GranFlags.GranSimStats.Full) 
1769     DumpGranEvent(GR_START,tso);
1770 #elif defined(PAR)
1771   if (RtsFlags.ParFlags.ParStats.Full) 
1772     DumpGranEvent(GR_STARTQ,tso);
1773   /* HACk to avoid SCHEDULE 
1774      LastTSO = tso; */
1775 #endif
1776
1777   /* Link the new thread on the global thread list.
1778    */
1779   tso->global_link = all_threads;
1780   all_threads = tso;
1781
1782 #if defined(DIST)
1783   tso->dist.priority = MandatoryPriority; //by default that is...
1784 #endif
1785
1786 #if defined(GRAN)
1787   tso->gran.pri = pri;
1788 # if defined(DEBUG)
1789   tso->gran.magic = TSO_MAGIC; // debugging only
1790 # endif
1791   tso->gran.sparkname   = 0;
1792   tso->gran.startedat   = CURRENT_TIME; 
1793   tso->gran.exported    = 0;
1794   tso->gran.basicblocks = 0;
1795   tso->gran.allocs      = 0;
1796   tso->gran.exectime    = 0;
1797   tso->gran.fetchtime   = 0;
1798   tso->gran.fetchcount  = 0;
1799   tso->gran.blocktime   = 0;
1800   tso->gran.blockcount  = 0;
1801   tso->gran.blockedat   = 0;
1802   tso->gran.globalsparks = 0;
1803   tso->gran.localsparks  = 0;
1804   if (RtsFlags.GranFlags.Light)
1805     tso->gran.clock  = Now; /* local clock */
1806   else
1807     tso->gran.clock  = 0;
1808
1809   IF_DEBUG(gran,printTSO(tso));
1810 #elif defined(PAR)
1811 # if defined(DEBUG)
1812   tso->par.magic = TSO_MAGIC; // debugging only
1813 # endif
1814   tso->par.sparkname   = 0;
1815   tso->par.startedat   = CURRENT_TIME; 
1816   tso->par.exported    = 0;
1817   tso->par.basicblocks = 0;
1818   tso->par.allocs      = 0;
1819   tso->par.exectime    = 0;
1820   tso->par.fetchtime   = 0;
1821   tso->par.fetchcount  = 0;
1822   tso->par.blocktime   = 0;
1823   tso->par.blockcount  = 0;
1824   tso->par.blockedat   = 0;
1825   tso->par.globalsparks = 0;
1826   tso->par.localsparks  = 0;
1827 #endif
1828
1829 #if defined(GRAN)
1830   globalGranStats.tot_threads_created++;
1831   globalGranStats.threads_created_on_PE[CurrentProc]++;
1832   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1833   globalGranStats.tot_sq_probes++;
1834 #elif defined(PAR)
1835   // collect parallel global statistics (currently done together with GC stats)
1836   if (RtsFlags.ParFlags.ParStats.Global &&
1837       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1838     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1839     globalParStats.tot_threads_created++;
1840   }
1841 #endif 
1842
1843 #if defined(GRAN)
1844   IF_GRAN_DEBUG(pri,
1845                 belch("==__ schedule: Created TSO %d (%p);",
1846                       CurrentProc, tso, tso->id));
1847 #elif defined(PAR)
1848     IF_PAR_DEBUG(verbose,
1849                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1850                        tso->id, tso, advisory_thread_count));
1851 #else
1852   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1853                                  tso->id, tso->stack_size));
1854 #endif    
1855   return tso;
1856 }
1857
1858 #if defined(PAR)
1859 /* RFP:
1860    all parallel thread creation calls should fall through the following routine.
1861 */
1862 StgTSO *
1863 createSparkThread(rtsSpark spark) 
1864 { StgTSO *tso;
1865   ASSERT(spark != (rtsSpark)NULL);
1866   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1867   { threadsIgnored++;
1868     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1869           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1870     return END_TSO_QUEUE;
1871   }
1872   else
1873   { threadsCreated++;
1874     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1875     if (tso==END_TSO_QUEUE)     
1876       barf("createSparkThread: Cannot create TSO");
1877 #if defined(DIST)
1878     tso->priority = AdvisoryPriority;
1879 #endif
1880     pushClosure(tso,spark);
1881     PUSH_ON_RUN_QUEUE(tso);
1882     advisory_thread_count++;    
1883   }
1884   return tso;
1885 }
1886 #endif
1887
1888 /*
1889   Turn a spark into a thread.
1890   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1891 */
1892 #if defined(PAR)
1893 //@cindex activateSpark
1894 StgTSO *
1895 activateSpark (rtsSpark spark) 
1896 {
1897   StgTSO *tso;
1898
1899   tso = createSparkThread(spark);
1900   if (RtsFlags.ParFlags.ParStats.Full) {   
1901     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1902     IF_PAR_DEBUG(verbose,
1903                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1904                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1905   }
1906   // ToDo: fwd info on local/global spark to thread -- HWL
1907   // tso->gran.exported =  spark->exported;
1908   // tso->gran.locked =   !spark->global;
1909   // tso->gran.sparkname = spark->name;
1910
1911   return tso;
1912 }
1913 #endif
1914
1915 /* ---------------------------------------------------------------------------
1916  * scheduleThread()
1917  *
1918  * scheduleThread puts a thread on the head of the runnable queue.
1919  * This will usually be done immediately after a thread is created.
1920  * The caller of scheduleThread must create the thread using e.g.
1921  * createThread and push an appropriate closure
1922  * on this thread's stack before the scheduler is invoked.
1923  * ------------------------------------------------------------------------ */
1924
1925 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1926
1927 void
1928 scheduleThread_(StgTSO *tso
1929                , rtsBool createTask
1930 #if !defined(THREADED_RTS)
1931                  STG_UNUSED
1932 #endif
1933               )
1934 {
1935   ACQUIRE_LOCK(&sched_mutex);
1936
1937   /* Put the new thread on the head of the runnable queue.  The caller
1938    * better push an appropriate closure on this thread's stack
1939    * beforehand.  In the SMP case, the thread may start running as
1940    * soon as we release the scheduler lock below.
1941    */
1942   PUSH_ON_RUN_QUEUE(tso);
1943 #if defined(THREADED_RTS)
1944   /* If main() is scheduling a thread, don't bother creating a 
1945    * new task.
1946    */
1947   if ( createTask ) {
1948     startTask(taskStart);
1949   }
1950 #endif
1951   THREAD_RUNNABLE();
1952
1953 #if 0
1954   IF_DEBUG(scheduler,printTSO(tso));
1955 #endif
1956   RELEASE_LOCK(&sched_mutex);
1957 }
1958
1959 void scheduleThread(StgTSO* tso)
1960 {
1961   return scheduleThread_(tso, rtsFalse);
1962 }
1963
1964 void scheduleExtThread(StgTSO* tso)
1965 {
1966   return scheduleThread_(tso, rtsTrue);
1967 }
1968
1969 /* ---------------------------------------------------------------------------
1970  * initScheduler()
1971  *
1972  * Initialise the scheduler.  This resets all the queues - if the
1973  * queues contained any threads, they'll be garbage collected at the
1974  * next pass.
1975  *
1976  * ------------------------------------------------------------------------ */
1977
1978 #ifdef SMP
1979 static void
1980 term_handler(int sig STG_UNUSED)
1981 {
1982   stat_workerStop();
1983   ACQUIRE_LOCK(&term_mutex);
1984   await_death--;
1985   RELEASE_LOCK(&term_mutex);
1986   shutdownThread();
1987 }
1988 #endif
1989
1990 void 
1991 initScheduler(void)
1992 {
1993 #if defined(GRAN)
1994   nat i;
1995
1996   for (i=0; i<=MAX_PROC; i++) {
1997     run_queue_hds[i]      = END_TSO_QUEUE;
1998     run_queue_tls[i]      = END_TSO_QUEUE;
1999     blocked_queue_hds[i]  = END_TSO_QUEUE;
2000     blocked_queue_tls[i]  = END_TSO_QUEUE;
2001     ccalling_threadss[i]  = END_TSO_QUEUE;
2002     sleeping_queue        = END_TSO_QUEUE;
2003   }
2004 #else
2005   run_queue_hd      = END_TSO_QUEUE;
2006   run_queue_tl      = END_TSO_QUEUE;
2007   blocked_queue_hd  = END_TSO_QUEUE;
2008   blocked_queue_tl  = END_TSO_QUEUE;
2009   sleeping_queue    = END_TSO_QUEUE;
2010 #endif 
2011
2012   suspended_ccalling_threads  = END_TSO_QUEUE;
2013
2014   main_threads = NULL;
2015   all_threads  = END_TSO_QUEUE;
2016
2017   context_switch = 0;
2018   interrupted    = 0;
2019
2020   RtsFlags.ConcFlags.ctxtSwitchTicks =
2021       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2022       
2023 #if defined(RTS_SUPPORTS_THREADS)
2024   /* Initialise the mutex and condition variables used by
2025    * the scheduler. */
2026   initMutex(&sched_mutex);
2027   initMutex(&term_mutex);
2028
2029   initCondition(&thread_ready_cond);
2030 #endif
2031   
2032 #if defined(SMP)
2033   initCondition(&gc_pending_cond);
2034 #endif
2035
2036 #if defined(RTS_SUPPORTS_THREADS)
2037   ACQUIRE_LOCK(&sched_mutex);
2038 #endif
2039
2040   /* Install the SIGHUP handler */
2041 #if defined(SMP)
2042   {
2043     struct sigaction action,oact;
2044
2045     action.sa_handler = term_handler;
2046     sigemptyset(&action.sa_mask);
2047     action.sa_flags = 0;
2048     if (sigaction(SIGTERM, &action, &oact) != 0) {
2049       barf("can't install TERM handler");
2050     }
2051   }
2052 #endif
2053
2054   /* A capability holds the state a native thread needs in
2055    * order to execute STG code. At least one capability is
2056    * floating around (only SMP builds have more than one).
2057    */
2058   initCapabilities();
2059   
2060 #if defined(RTS_SUPPORTS_THREADS)
2061     /* start our haskell execution tasks */
2062 # if defined(SMP)
2063     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2064 # else
2065     startTaskManager(0,taskStart);
2066 # endif
2067 #endif
2068
2069 #if /* defined(SMP) ||*/ defined(PAR)
2070   initSparkPools();
2071 #endif
2072
2073 #if defined(RTS_SUPPORTS_THREADS)
2074   RELEASE_LOCK(&sched_mutex);
2075 #endif
2076
2077 }
2078
2079 void
2080 exitScheduler( void )
2081 {
2082 #if defined(RTS_SUPPORTS_THREADS)
2083   stopTaskManager();
2084 #endif
2085 }
2086
2087 /* -----------------------------------------------------------------------------
2088    Managing the per-task allocation areas.
2089    
2090    Each capability comes with an allocation area.  These are
2091    fixed-length block lists into which allocation can be done.
2092
2093    ToDo: no support for two-space collection at the moment???
2094    -------------------------------------------------------------------------- */
2095
2096 /* -----------------------------------------------------------------------------
2097  * waitThread is the external interface for running a new computation
2098  * and waiting for the result.
2099  *
2100  * In the non-SMP case, we create a new main thread, push it on the 
2101  * main-thread stack, and invoke the scheduler to run it.  The
2102  * scheduler will return when the top main thread on the stack has
2103  * completed or died, and fill in the necessary fields of the
2104  * main_thread structure.
2105  *
2106  * In the SMP case, we create a main thread as before, but we then
2107  * create a new condition variable and sleep on it.  When our new
2108  * main thread has completed, we'll be woken up and the status/result
2109  * will be in the main_thread struct.
2110  * -------------------------------------------------------------------------- */
2111
2112 int 
2113 howManyThreadsAvail ( void )
2114 {
2115    int i = 0;
2116    StgTSO* q;
2117    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2118       i++;
2119    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2120       i++;
2121    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2122       i++;
2123    return i;
2124 }
2125
2126 void
2127 finishAllThreads ( void )
2128 {
2129    do {
2130       while (run_queue_hd != END_TSO_QUEUE) {
2131          waitThread ( run_queue_hd, NULL);
2132       }
2133       while (blocked_queue_hd != END_TSO_QUEUE) {
2134          waitThread ( blocked_queue_hd, NULL);
2135       }
2136       while (sleeping_queue != END_TSO_QUEUE) {
2137          waitThread ( blocked_queue_hd, NULL);
2138       }
2139    } while 
2140       (blocked_queue_hd != END_TSO_QUEUE || 
2141        run_queue_hd     != END_TSO_QUEUE ||
2142        sleeping_queue   != END_TSO_QUEUE);
2143 }
2144
2145 SchedulerStatus
2146 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2147
2148   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2149 #if defined(THREADED_RTS)
2150   return waitThread_(tso,ret, rtsFalse);
2151 #else
2152   return waitThread_(tso,ret);
2153 #endif
2154 }
2155
2156 SchedulerStatus
2157 waitThread_(StgTSO *tso,
2158             /*out*/StgClosure **ret
2159 #if defined(THREADED_RTS)
2160             , rtsBool blockWaiting
2161 #endif
2162            )
2163 {
2164   StgMainThread *m;
2165   SchedulerStatus stat;
2166
2167   ACQUIRE_LOCK(&sched_mutex);
2168   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2169   
2170   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2171
2172   m->tso = tso;
2173   m->ret = ret;
2174   m->stat = NoStatus;
2175 #if defined(RTS_SUPPORTS_THREADS)
2176   initCondition(&m->wakeup);
2177 #endif
2178
2179   m->link = main_threads;
2180   main_threads = m;
2181
2182   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2183
2184 #if defined(RTS_SUPPORTS_THREADS)
2185
2186 # if defined(THREADED_RTS)
2187   if (!blockWaiting) {
2188     /* In the threaded case, the OS thread that called main()
2189      * gets to enter the RTS directly without going via another
2190      * task/thread.
2191      */
2192     RELEASE_LOCK(&sched_mutex);
2193     schedule();
2194     ASSERT(m->stat != NoStatus);
2195   } else 
2196 # endif
2197   {
2198     IF_DEBUG(scheduler, sched_belch("sfoo"));
2199     do {
2200       waitCondition(&m->wakeup, &sched_mutex);
2201     } while (m->stat == NoStatus);
2202   }
2203 #elif defined(GRAN)
2204   /* GranSim specific init */
2205   CurrentTSO = m->tso;                // the TSO to run
2206   procStatus[MainProc] = Busy;        // status of main PE
2207   CurrentProc = MainProc;             // PE to run it on
2208
2209   schedule();
2210 #else
2211   RELEASE_LOCK(&sched_mutex);
2212   schedule();
2213   ASSERT(m->stat != NoStatus);
2214 #endif
2215
2216   stat = m->stat;
2217
2218 #if defined(RTS_SUPPORTS_THREADS)
2219   closeCondition(&m->wakeup);
2220 #endif
2221
2222   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2223                               m->tso->id));
2224   free(m);
2225
2226 #if defined(THREADED_RTS)
2227   if (blockWaiting) 
2228 #endif
2229     RELEASE_LOCK(&sched_mutex);
2230
2231   return stat;
2232 }
2233
2234 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2235 //@subsection Run queue code 
2236
2237 #if 0
2238 /* 
2239    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2240        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2241        implicit global variable that has to be correct when calling these
2242        fcts -- HWL 
2243 */
2244
2245 /* Put the new thread on the head of the runnable queue.
2246  * The caller of createThread better push an appropriate closure
2247  * on this thread's stack before the scheduler is invoked.
2248  */
2249 static /* inline */ void
2250 add_to_run_queue(tso)
2251 StgTSO* tso; 
2252 {
2253   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2254   tso->link = run_queue_hd;
2255   run_queue_hd = tso;
2256   if (run_queue_tl == END_TSO_QUEUE) {
2257     run_queue_tl = tso;
2258   }
2259 }
2260
2261 /* Put the new thread at the end of the runnable queue. */
2262 static /* inline */ void
2263 push_on_run_queue(tso)
2264 StgTSO* tso; 
2265 {
2266   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2267   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2268   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2269   if (run_queue_hd == END_TSO_QUEUE) {
2270     run_queue_hd = tso;
2271   } else {
2272     run_queue_tl->link = tso;
2273   }
2274   run_queue_tl = tso;
2275 }
2276
2277 /* 
2278    Should be inlined because it's used very often in schedule.  The tso
2279    argument is actually only needed in GranSim, where we want to have the
2280    possibility to schedule *any* TSO on the run queue, irrespective of the
2281    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2282    the run queue and dequeue the tso, adjusting the links in the queue. 
2283 */
2284 //@cindex take_off_run_queue
2285 static /* inline */ StgTSO*
2286 take_off_run_queue(StgTSO *tso) {
2287   StgTSO *t, *prev;
2288
2289   /* 
2290      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2291
2292      if tso is specified, unlink that tso from the run_queue (doesn't have
2293      to be at the beginning of the queue); GranSim only 
2294   */
2295   if (tso!=END_TSO_QUEUE) {
2296     /* find tso in queue */
2297     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2298          t!=END_TSO_QUEUE && t!=tso;
2299          prev=t, t=t->link) 
2300       /* nothing */ ;
2301     ASSERT(t==tso);
2302     /* now actually dequeue the tso */
2303     if (prev!=END_TSO_QUEUE) {
2304       ASSERT(run_queue_hd!=t);
2305       prev->link = t->link;
2306     } else {
2307       /* t is at beginning of thread queue */
2308       ASSERT(run_queue_hd==t);
2309       run_queue_hd = t->link;
2310     }
2311     /* t is at end of thread queue */
2312     if (t->link==END_TSO_QUEUE) {
2313       ASSERT(t==run_queue_tl);
2314       run_queue_tl = prev;
2315     } else {
2316       ASSERT(run_queue_tl!=t);
2317     }
2318     t->link = END_TSO_QUEUE;
2319   } else {
2320     /* take tso from the beginning of the queue; std concurrent code */
2321     t = run_queue_hd;
2322     if (t != END_TSO_QUEUE) {
2323       run_queue_hd = t->link;
2324       t->link = END_TSO_QUEUE;
2325       if (run_queue_hd == END_TSO_QUEUE) {
2326         run_queue_tl = END_TSO_QUEUE;
2327       }
2328     }
2329   }
2330   return t;
2331 }
2332
2333 #endif /* 0 */
2334
2335 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2336 //@subsection Garbage Collextion Routines
2337
2338 /* ---------------------------------------------------------------------------
2339    Where are the roots that we know about?
2340
2341         - all the threads on the runnable queue
2342         - all the threads on the blocked queue
2343         - all the threads on the sleeping queue
2344         - all the thread currently executing a _ccall_GC
2345         - all the "main threads"
2346      
2347    ------------------------------------------------------------------------ */
2348
2349 /* This has to be protected either by the scheduler monitor, or by the
2350         garbage collection monitor (probably the latter).
2351         KH @ 25/10/99
2352 */
2353
2354 void
2355 GetRoots(evac_fn evac)
2356 {
2357 #if defined(GRAN)
2358   {
2359     nat i;
2360     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2361       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2362           evac((StgClosure **)&run_queue_hds[i]);
2363       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2364           evac((StgClosure **)&run_queue_tls[i]);
2365       
2366       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2367           evac((StgClosure **)&blocked_queue_hds[i]);
2368       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2369           evac((StgClosure **)&blocked_queue_tls[i]);
2370       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2371           evac((StgClosure **)&ccalling_threads[i]);
2372     }
2373   }
2374
2375   markEventQueue();
2376
2377 #else /* !GRAN */
2378   if (run_queue_hd != END_TSO_QUEUE) {
2379       ASSERT(run_queue_tl != END_TSO_QUEUE);
2380       evac((StgClosure **)&run_queue_hd);
2381       evac((StgClosure **)&run_queue_tl);
2382   }
2383   
2384   if (blocked_queue_hd != END_TSO_QUEUE) {
2385       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2386       evac((StgClosure **)&blocked_queue_hd);
2387       evac((StgClosure **)&blocked_queue_tl);
2388   }
2389   
2390   if (sleeping_queue != END_TSO_QUEUE) {
2391       evac((StgClosure **)&sleeping_queue);
2392   }
2393 #endif 
2394
2395   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2396       evac((StgClosure **)&suspended_ccalling_threads);
2397   }
2398
2399 #if defined(PAR) || defined(GRAN)
2400   markSparkQueue(evac);
2401 #endif
2402 }
2403
2404 /* -----------------------------------------------------------------------------
2405    performGC
2406
2407    This is the interface to the garbage collector from Haskell land.
2408    We provide this so that external C code can allocate and garbage
2409    collect when called from Haskell via _ccall_GC.
2410
2411    It might be useful to provide an interface whereby the programmer
2412    can specify more roots (ToDo).
2413    
2414    This needs to be protected by the GC condition variable above.  KH.
2415    -------------------------------------------------------------------------- */
2416
2417 void (*extra_roots)(evac_fn);
2418
2419 void
2420 performGC(void)
2421 {
2422   /* Obligated to hold this lock upon entry */
2423   ACQUIRE_LOCK(&sched_mutex);
2424   GarbageCollect(GetRoots,rtsFalse);
2425   RELEASE_LOCK(&sched_mutex);
2426 }
2427
2428 void
2429 performMajorGC(void)
2430 {
2431   ACQUIRE_LOCK(&sched_mutex);
2432   GarbageCollect(GetRoots,rtsTrue);
2433   RELEASE_LOCK(&sched_mutex);
2434 }
2435
2436 static void
2437 AllRoots(evac_fn evac)
2438 {
2439     GetRoots(evac);             // the scheduler's roots
2440     extra_roots(evac);          // the user's roots
2441 }
2442
2443 void
2444 performGCWithRoots(void (*get_roots)(evac_fn))
2445 {
2446   ACQUIRE_LOCK(&sched_mutex);
2447   extra_roots = get_roots;
2448   GarbageCollect(AllRoots,rtsFalse);
2449   RELEASE_LOCK(&sched_mutex);
2450 }
2451
2452 /* -----------------------------------------------------------------------------
2453    Stack overflow
2454
2455    If the thread has reached its maximum stack size, then raise the
2456    StackOverflow exception in the offending thread.  Otherwise
2457    relocate the TSO into a larger chunk of memory and adjust its stack
2458    size appropriately.
2459    -------------------------------------------------------------------------- */
2460
2461 static StgTSO *
2462 threadStackOverflow(StgTSO *tso)
2463 {
2464   nat new_stack_size, new_tso_size, diff, stack_words;
2465   StgPtr new_sp;
2466   StgTSO *dest;
2467
2468   IF_DEBUG(sanity,checkTSO(tso));
2469   if (tso->stack_size >= tso->max_stack_size) {
2470
2471     IF_DEBUG(gc,
2472              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2473                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2474              /* If we're debugging, just print out the top of the stack */
2475              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2476                                               tso->sp+64)));
2477
2478     /* Send this thread the StackOverflow exception */
2479     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2480     return tso;
2481   }
2482
2483   /* Try to double the current stack size.  If that takes us over the
2484    * maximum stack size for this thread, then use the maximum instead.
2485    * Finally round up so the TSO ends up as a whole number of blocks.
2486    */
2487   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2488   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2489                                        TSO_STRUCT_SIZE)/sizeof(W_);
2490   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2491   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2492
2493   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2494
2495   dest = (StgTSO *)allocate(new_tso_size);
2496   TICK_ALLOC_TSO(new_stack_size,0);
2497
2498   /* copy the TSO block and the old stack into the new area */
2499   memcpy(dest,tso,TSO_STRUCT_SIZE);
2500   stack_words = tso->stack + tso->stack_size - tso->sp;
2501   new_sp = (P_)dest + new_tso_size - stack_words;
2502   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2503
2504   /* relocate the stack pointers... */
2505   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2506   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2507   dest->sp    = new_sp;
2508   dest->stack_size = new_stack_size;
2509         
2510   /* and relocate the update frame list */
2511   relocate_stack(dest, diff);
2512
2513   /* Mark the old TSO as relocated.  We have to check for relocated
2514    * TSOs in the garbage collector and any primops that deal with TSOs.
2515    *
2516    * It's important to set the sp and su values to just beyond the end
2517    * of the stack, so we don't attempt to scavenge any part of the
2518    * dead TSO's stack.
2519    */
2520   tso->what_next = ThreadRelocated;
2521   tso->link = dest;
2522   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2523   tso->su = (StgUpdateFrame *)tso->sp;
2524   tso->why_blocked = NotBlocked;
2525   dest->mut_link = NULL;
2526
2527   IF_PAR_DEBUG(verbose,
2528                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2529                      tso->id, tso, tso->stack_size);
2530                /* If we're debugging, just print out the top of the stack */
2531                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2532                                                 tso->sp+64)));
2533   
2534   IF_DEBUG(sanity,checkTSO(tso));
2535 #if 0
2536   IF_DEBUG(scheduler,printTSO(dest));
2537 #endif
2538
2539   return dest;
2540 }
2541
2542 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2543 //@subsection Blocking Queue Routines
2544
2545 /* ---------------------------------------------------------------------------
2546    Wake up a queue that was blocked on some resource.
2547    ------------------------------------------------------------------------ */
2548
2549 #if defined(GRAN)
2550 static inline void
2551 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2552 {
2553 }
2554 #elif defined(PAR)
2555 static inline void
2556 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2557 {
2558   /* write RESUME events to log file and
2559      update blocked and fetch time (depending on type of the orig closure) */
2560   if (RtsFlags.ParFlags.ParStats.Full) {
2561     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2562                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2563                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2564     if (EMPTY_RUN_QUEUE())
2565       emitSchedule = rtsTrue;
2566
2567     switch (get_itbl(node)->type) {
2568         case FETCH_ME_BQ:
2569           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2570           break;
2571         case RBH:
2572         case FETCH_ME:
2573         case BLACKHOLE_BQ:
2574           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2575           break;
2576 #ifdef DIST
2577         case MVAR:
2578           break;
2579 #endif    
2580         default:
2581           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2582         }
2583       }
2584 }
2585 #endif
2586
2587 #if defined(GRAN)
2588 static StgBlockingQueueElement *
2589 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2590 {
2591     StgTSO *tso;
2592     PEs node_loc, tso_loc;
2593
2594     node_loc = where_is(node); // should be lifted out of loop
2595     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2596     tso_loc = where_is((StgClosure *)tso);
2597     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2598       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2599       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2600       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2601       // insertThread(tso, node_loc);
2602       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2603                 ResumeThread,
2604                 tso, node, (rtsSpark*)NULL);
2605       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2606       // len_local++;
2607       // len++;
2608     } else { // TSO is remote (actually should be FMBQ)
2609       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2610                                   RtsFlags.GranFlags.Costs.gunblocktime +
2611                                   RtsFlags.GranFlags.Costs.latency;
2612       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2613                 UnblockThread,
2614                 tso, node, (rtsSpark*)NULL);
2615       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2616       // len++;
2617     }
2618     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2619     IF_GRAN_DEBUG(bq,
2620                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2621                           (node_loc==tso_loc ? "Local" : "Global"), 
2622                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2623     tso->block_info.closure = NULL;
2624     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2625                              tso->id, tso));
2626 }
2627 #elif defined(PAR)
2628 static StgBlockingQueueElement *
2629 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2630 {
2631     StgBlockingQueueElement *next;
2632
2633     switch (get_itbl(bqe)->type) {
2634     case TSO:
2635       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2636       /* if it's a TSO just push it onto the run_queue */
2637       next = bqe->link;
2638       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2639       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2640       THREAD_RUNNABLE();
2641       unblockCount(bqe, node);
2642       /* reset blocking status after dumping event */
2643       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2644       break;
2645
2646     case BLOCKED_FETCH:
2647       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2648       next = bqe->link;
2649       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2650       PendingFetches = (StgBlockedFetch *)bqe;
2651       break;
2652
2653 # if defined(DEBUG)
2654       /* can ignore this case in a non-debugging setup; 
2655          see comments on RBHSave closures above */
2656     case CONSTR:
2657       /* check that the closure is an RBHSave closure */
2658       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2659              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2660              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2661       break;
2662
2663     default:
2664       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2665            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2666            (StgClosure *)bqe);
2667 # endif
2668     }
2669   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2670   return next;
2671 }
2672
2673 #else /* !GRAN && !PAR */
2674 static StgTSO *
2675 unblockOneLocked(StgTSO *tso)
2676 {
2677   StgTSO *next;
2678
2679   ASSERT(get_itbl(tso)->type == TSO);
2680   ASSERT(tso->why_blocked != NotBlocked);
2681   tso->why_blocked = NotBlocked;
2682   next = tso->link;
2683   PUSH_ON_RUN_QUEUE(tso);
2684   THREAD_RUNNABLE();
2685   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2686   return next;
2687 }
2688 #endif
2689
2690 #if defined(GRAN) || defined(PAR)
2691 inline StgBlockingQueueElement *
2692 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2693 {
2694   ACQUIRE_LOCK(&sched_mutex);
2695   bqe = unblockOneLocked(bqe, node);
2696   RELEASE_LOCK(&sched_mutex);
2697   return bqe;
2698 }
2699 #else
2700 inline StgTSO *
2701 unblockOne(StgTSO *tso)
2702 {
2703   ACQUIRE_LOCK(&sched_mutex);
2704   tso = unblockOneLocked(tso);
2705   RELEASE_LOCK(&sched_mutex);
2706   return tso;
2707 }
2708 #endif
2709
2710 #if defined(GRAN)
2711 void 
2712 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2713 {
2714   StgBlockingQueueElement *bqe;
2715   PEs node_loc;
2716   nat len = 0; 
2717
2718   IF_GRAN_DEBUG(bq, 
2719                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2720                       node, CurrentProc, CurrentTime[CurrentProc], 
2721                       CurrentTSO->id, CurrentTSO));
2722
2723   node_loc = where_is(node);
2724
2725   ASSERT(q == END_BQ_QUEUE ||
2726          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2727          get_itbl(q)->type == CONSTR); // closure (type constructor)
2728   ASSERT(is_unique(node));
2729
2730   /* FAKE FETCH: magically copy the node to the tso's proc;
2731      no Fetch necessary because in reality the node should not have been 
2732      moved to the other PE in the first place
2733   */
2734   if (CurrentProc!=node_loc) {
2735     IF_GRAN_DEBUG(bq, 
2736                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2737                         node, node_loc, CurrentProc, CurrentTSO->id, 
2738                         // CurrentTSO, where_is(CurrentTSO),
2739                         node->header.gran.procs));
2740     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2741     IF_GRAN_DEBUG(bq, 
2742                   belch("## new bitmask of node %p is %#x",
2743                         node, node->header.gran.procs));
2744     if (RtsFlags.GranFlags.GranSimStats.Global) {
2745       globalGranStats.tot_fake_fetches++;
2746     }
2747   }
2748
2749   bqe = q;
2750   // ToDo: check: ASSERT(CurrentProc==node_loc);
2751   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2752     //next = bqe->link;
2753     /* 
2754        bqe points to the current element in the queue
2755        next points to the next element in the queue
2756     */
2757     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2758     //tso_loc = where_is(tso);
2759     len++;
2760     bqe = unblockOneLocked(bqe, node);
2761   }
2762
2763   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2764      the closure to make room for the anchor of the BQ */
2765   if (bqe!=END_BQ_QUEUE) {
2766     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2767     /*
2768     ASSERT((info_ptr==&RBH_Save_0_info) ||
2769            (info_ptr==&RBH_Save_1_info) ||
2770            (info_ptr==&RBH_Save_2_info));
2771     */
2772     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2773     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2774     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2775
2776     IF_GRAN_DEBUG(bq,
2777                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2778                         node, info_type(node)));
2779   }
2780
2781   /* statistics gathering */
2782   if (RtsFlags.GranFlags.GranSimStats.Global) {
2783     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2784     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2785     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2786     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2787   }
2788   IF_GRAN_DEBUG(bq,
2789                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2790                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2791 }
2792 #elif defined(PAR)
2793 void 
2794 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2795 {
2796   StgBlockingQueueElement *bqe;
2797
2798   ACQUIRE_LOCK(&sched_mutex);
2799
2800   IF_PAR_DEBUG(verbose, 
2801                belch("##-_ AwBQ for node %p on [%x]: ",
2802                      node, mytid));
2803 #ifdef DIST  
2804   //RFP
2805   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2806     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2807     return;
2808   }
2809 #endif
2810   
2811   ASSERT(q == END_BQ_QUEUE ||
2812          get_itbl(q)->type == TSO ||           
2813          get_itbl(q)->type == BLOCKED_FETCH || 
2814          get_itbl(q)->type == CONSTR); 
2815
2816   bqe = q;
2817   while (get_itbl(bqe)->type==TSO || 
2818          get_itbl(bqe)->type==BLOCKED_FETCH) {
2819     bqe = unblockOneLocked(bqe, node);
2820   }
2821   RELEASE_LOCK(&sched_mutex);
2822 }
2823
2824 #else   /* !GRAN && !PAR */
2825 void
2826 awakenBlockedQueue(StgTSO *tso)
2827 {
2828   ACQUIRE_LOCK(&sched_mutex);
2829   while (tso != END_TSO_QUEUE) {
2830     tso = unblockOneLocked(tso);
2831   }
2832   RELEASE_LOCK(&sched_mutex);
2833 }
2834 #endif
2835
2836 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2837 //@subsection Exception Handling Routines
2838
2839 /* ---------------------------------------------------------------------------
2840    Interrupt execution
2841    - usually called inside a signal handler so it mustn't do anything fancy.   
2842    ------------------------------------------------------------------------ */
2843
2844 void
2845 interruptStgRts(void)
2846 {
2847     interrupted    = 1;
2848     context_switch = 1;
2849 }
2850
2851 /* -----------------------------------------------------------------------------
2852    Unblock a thread
2853
2854    This is for use when we raise an exception in another thread, which
2855    may be blocked.
2856    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2857    -------------------------------------------------------------------------- */
2858
2859 #if defined(GRAN) || defined(PAR)
2860 /*
2861   NB: only the type of the blocking queue is different in GranSim and GUM
2862       the operations on the queue-elements are the same
2863       long live polymorphism!
2864
2865   Locks: sched_mutex is held upon entry and exit.
2866
2867 */
2868 static void
2869 unblockThread(StgTSO *tso)
2870 {
2871   StgBlockingQueueElement *t, **last;
2872
2873   switch (tso->why_blocked) {
2874
2875   case NotBlocked:
2876     return;  /* not blocked */
2877
2878   case BlockedOnMVar:
2879     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2880     {
2881       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2882       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2883
2884       last = (StgBlockingQueueElement **)&mvar->head;
2885       for (t = (StgBlockingQueueElement *)mvar->head; 
2886            t != END_BQ_QUEUE; 
2887            last = &t->link, last_tso = t, t = t->link) {
2888         if (t == (StgBlockingQueueElement *)tso) {
2889           *last = (StgBlockingQueueElement *)tso->link;
2890           if (mvar->tail == tso) {
2891             mvar->tail = (StgTSO *)last_tso;
2892           }
2893           goto done;
2894         }
2895       }
2896       barf("unblockThread (MVAR): TSO not found");
2897     }
2898
2899   case BlockedOnBlackHole:
2900     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2901     {
2902       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2903
2904       last = &bq->blocking_queue;
2905       for (t = bq->blocking_queue; 
2906            t != END_BQ_QUEUE; 
2907            last = &t->link, t = t->link) {
2908         if (t == (StgBlockingQueueElement *)tso) {
2909           *last = (StgBlockingQueueElement *)tso->link;
2910           goto done;
2911         }
2912       }
2913       barf("unblockThread (BLACKHOLE): TSO not found");
2914     }
2915
2916   case BlockedOnException:
2917     {
2918       StgTSO *target  = tso->block_info.tso;
2919
2920       ASSERT(get_itbl(target)->type == TSO);
2921
2922       if (target->what_next == ThreadRelocated) {
2923           target = target->link;
2924           ASSERT(get_itbl(target)->type == TSO);
2925       }
2926
2927       ASSERT(target->blocked_exceptions != NULL);
2928
2929       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2930       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2931            t != END_BQ_QUEUE; 
2932            last = &t->link, t = t->link) {
2933         ASSERT(get_itbl(t)->type == TSO);
2934         if (t == (StgBlockingQueueElement *)tso) {
2935           *last = (StgBlockingQueueElement *)tso->link;
2936           goto done;
2937         }
2938       }
2939       barf("unblockThread (Exception): TSO not found");
2940     }
2941
2942   case BlockedOnRead:
2943   case BlockedOnWrite:
2944     {
2945       /* take TSO off blocked_queue */
2946       StgBlockingQueueElement *prev = NULL;
2947       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2948            prev = t, t = t->link) {
2949         if (t == (StgBlockingQueueElement *)tso) {
2950           if (prev == NULL) {
2951             blocked_queue_hd = (StgTSO *)t->link;
2952             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2953               blocked_queue_tl = END_TSO_QUEUE;
2954             }
2955           } else {
2956             prev->link = t->link;
2957             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2958               blocked_queue_tl = (StgTSO *)prev;
2959             }
2960           }
2961           goto done;
2962         }
2963       }
2964       barf("unblockThread (I/O): TSO not found");
2965     }
2966
2967   case BlockedOnDelay:
2968     {
2969       /* take TSO off sleeping_queue */
2970       StgBlockingQueueElement *prev = NULL;
2971       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2972            prev = t, t = t->link) {
2973         if (t == (StgBlockingQueueElement *)tso) {
2974           if (prev == NULL) {
2975             sleeping_queue = (StgTSO *)t->link;
2976           } else {
2977             prev->link = t->link;
2978           }
2979           goto done;
2980         }
2981       }
2982       barf("unblockThread (I/O): TSO not found");
2983     }
2984
2985   default:
2986     barf("unblockThread");
2987   }
2988
2989  done:
2990   tso->link = END_TSO_QUEUE;
2991   tso->why_blocked = NotBlocked;
2992   tso->block_info.closure = NULL;
2993   PUSH_ON_RUN_QUEUE(tso);
2994 }
2995 #else
2996 static void
2997 unblockThread(StgTSO *tso)
2998 {
2999   StgTSO *t, **last;
3000   
3001   /* To avoid locking unnecessarily. */
3002   if (tso->why_blocked == NotBlocked) {
3003     return;
3004   }
3005
3006   switch (tso->why_blocked) {
3007
3008   case BlockedOnMVar:
3009     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3010     {
3011       StgTSO *last_tso = END_TSO_QUEUE;
3012       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3013
3014       last = &mvar->head;
3015       for (t = mvar->head; t != END_TSO_QUEUE; 
3016            last = &t->link, last_tso = t, t = t->link) {
3017         if (t == tso) {
3018           *last = tso->link;
3019           if (mvar->tail == tso) {
3020             mvar->tail = last_tso;
3021           }
3022           goto done;
3023         }
3024       }
3025       barf("unblockThread (MVAR): TSO not found");
3026     }
3027
3028   case BlockedOnBlackHole:
3029     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3030     {
3031       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3032
3033       last = &bq->blocking_queue;
3034       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3035            last = &t->link, t = t->link) {
3036         if (t == tso) {
3037           *last = tso->link;
3038           goto done;
3039         }
3040       }
3041       barf("unblockThread (BLACKHOLE): TSO not found");
3042     }
3043
3044   case BlockedOnException:
3045     {
3046       StgTSO *target  = tso->block_info.tso;
3047
3048       ASSERT(get_itbl(target)->type == TSO);
3049
3050       while (target->what_next == ThreadRelocated) {
3051           target = target->link;
3052           ASSERT(get_itbl(target)->type == TSO);
3053       }
3054       
3055       ASSERT(target->blocked_exceptions != NULL);
3056
3057       last = &target->blocked_exceptions;
3058       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3059            last = &t->link, t = t->link) {
3060         ASSERT(get_itbl(t)->type == TSO);
3061         if (t == tso) {
3062           *last = tso->link;
3063           goto done;
3064         }
3065       }
3066       barf("unblockThread (Exception): TSO not found");
3067     }
3068
3069   case BlockedOnRead:
3070   case BlockedOnWrite:
3071     {
3072       StgTSO *prev = NULL;
3073       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3074            prev = t, t = t->link) {
3075         if (t == tso) {
3076           if (prev == NULL) {
3077             blocked_queue_hd = t->link;
3078             if (blocked_queue_tl == t) {
3079               blocked_queue_tl = END_TSO_QUEUE;
3080             }
3081           } else {
3082             prev->link = t->link;
3083             if (blocked_queue_tl == t) {
3084               blocked_queue_tl = prev;
3085             }
3086           }
3087           goto done;
3088         }
3089       }
3090       barf("unblockThread (I/O): TSO not found");
3091     }
3092
3093   case BlockedOnDelay:
3094     {
3095       StgTSO *prev = NULL;
3096       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3097            prev = t, t = t->link) {
3098         if (t == tso) {
3099           if (prev == NULL) {
3100             sleeping_queue = t->link;
3101           } else {
3102             prev->link = t->link;
3103           }
3104           goto done;
3105         }
3106       }
3107       barf("unblockThread (I/O): TSO not found");
3108     }
3109
3110   default:
3111     barf("unblockThread");
3112   }
3113
3114  done:
3115   tso->link = END_TSO_QUEUE;
3116   tso->why_blocked = NotBlocked;
3117   tso->block_info.closure = NULL;
3118   PUSH_ON_RUN_QUEUE(tso);
3119 }
3120 #endif
3121
3122 /* -----------------------------------------------------------------------------
3123  * raiseAsync()
3124  *
3125  * The following function implements the magic for raising an
3126  * asynchronous exception in an existing thread.
3127  *
3128  * We first remove the thread from any queue on which it might be
3129  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3130  *
3131  * We strip the stack down to the innermost CATCH_FRAME, building
3132  * thunks in the heap for all the active computations, so they can 
3133  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3134  * an application of the handler to the exception, and push it on
3135  * the top of the stack.
3136  * 
3137  * How exactly do we save all the active computations?  We create an
3138  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3139  * AP_UPDs pushes everything from the corresponding update frame
3140  * upwards onto the stack.  (Actually, it pushes everything up to the
3141  * next update frame plus a pointer to the next AP_UPD object.
3142  * Entering the next AP_UPD object pushes more onto the stack until we
3143  * reach the last AP_UPD object - at which point the stack should look
3144  * exactly as it did when we killed the TSO and we can continue
3145  * execution by entering the closure on top of the stack.
3146  *
3147  * We can also kill a thread entirely - this happens if either (a) the 
3148  * exception passed to raiseAsync is NULL, or (b) there's no
3149  * CATCH_FRAME on the stack.  In either case, we strip the entire
3150  * stack and replace the thread with a zombie.
3151  *
3152  * Locks: sched_mutex held upon entry nor exit.
3153  *
3154  * -------------------------------------------------------------------------- */
3155  
3156 void 
3157 deleteThread(StgTSO *tso)
3158 {
3159   raiseAsync(tso,NULL);
3160 }
3161
3162 void
3163 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3164 {
3165   /* When raising async exs from contexts where sched_mutex isn't held;
3166      use raiseAsyncWithLock(). */
3167   ACQUIRE_LOCK(&sched_mutex);
3168   raiseAsync(tso,exception);
3169   RELEASE_LOCK(&sched_mutex);
3170 }
3171
3172 void
3173 raiseAsync(StgTSO *tso, StgClosure *exception)
3174 {
3175   StgUpdateFrame* su = tso->su;
3176   StgPtr          sp = tso->sp;
3177   
3178   /* Thread already dead? */
3179   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3180     return;
3181   }
3182
3183   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3184
3185   /* Remove it from any blocking queues */
3186   unblockThread(tso);
3187
3188   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3189   /* The stack freezing code assumes there's a closure pointer on
3190    * the top of the stack.  This isn't always the case with compiled
3191    * code, so we have to push a dummy closure on the top which just
3192    * returns to the next return address on the stack.
3193    */
3194   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3195     *(--sp) = (W_)&stg_dummy_ret_closure;
3196   }
3197
3198   while (1) {
3199     nat words = ((P_)su - (P_)sp) - 1;
3200     nat i;
3201     StgAP_UPD * ap;
3202
3203     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3204      * then build the THUNK raise(exception), and leave it on
3205      * top of the CATCH_FRAME ready to enter.
3206      */
3207     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3208 #ifdef PROFILING
3209       StgCatchFrame *cf = (StgCatchFrame *)su;
3210 #endif
3211       StgClosure *raise;
3212
3213       /* we've got an exception to raise, so let's pass it to the
3214        * handler in this frame.
3215        */
3216       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3217       TICK_ALLOC_SE_THK(1,0);
3218       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3219       raise->payload[0] = exception;
3220
3221       /* throw away the stack from Sp up to the CATCH_FRAME.
3222        */
3223       sp = (P_)su - 1;
3224
3225       /* Ensure that async excpetions are blocked now, so we don't get
3226        * a surprise exception before we get around to executing the
3227        * handler.
3228        */
3229       if (tso->blocked_exceptions == NULL) {
3230           tso->blocked_exceptions = END_TSO_QUEUE;
3231       }
3232
3233       /* Put the newly-built THUNK on top of the stack, ready to execute
3234        * when the thread restarts.
3235        */
3236       sp[0] = (W_)raise;
3237       tso->sp = sp;
3238       tso->su = su;
3239       tso->what_next = ThreadEnterGHC;
3240       IF_DEBUG(sanity, checkTSO(tso));
3241       return;
3242     }
3243
3244     /* First build an AP_UPD consisting of the stack chunk above the
3245      * current update frame, with the top word on the stack as the
3246      * fun field.
3247      */
3248     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3249     
3250     ASSERT(words >= 0);
3251     
3252     ap->n_args = words;
3253     ap->fun    = (StgClosure *)sp[0];
3254     sp++;
3255     for(i=0; i < (nat)words; ++i) {
3256       ap->payload[i] = (StgClosure *)*sp++;
3257     }
3258     
3259     switch (get_itbl(su)->type) {
3260       
3261     case UPDATE_FRAME:
3262       {
3263         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3264         TICK_ALLOC_UP_THK(words+1,0);
3265         
3266         IF_DEBUG(scheduler,
3267                  fprintf(stderr,  "scheduler: Updating ");
3268                  printPtr((P_)su->updatee); 
3269                  fprintf(stderr,  " with ");
3270                  printObj((StgClosure *)ap);
3271                  );
3272         
3273         /* Replace the updatee with an indirection - happily
3274          * this will also wake up any threads currently
3275          * waiting on the result.
3276          *
3277          * Warning: if we're in a loop, more than one update frame on
3278          * the stack may point to the same object.  Be careful not to
3279          * overwrite an IND_OLDGEN in this case, because we'll screw
3280          * up the mutable lists.  To be on the safe side, don't
3281          * overwrite any kind of indirection at all.  See also
3282          * threadSqueezeStack in GC.c, where we have to make a similar
3283          * check.
3284          */
3285         if (!closure_IND(su->updatee)) {
3286             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3287         }
3288         su = su->link;
3289         sp += sizeofW(StgUpdateFrame) -1;
3290         sp[0] = (W_)ap; /* push onto stack */
3291         break;
3292       }
3293
3294     case CATCH_FRAME:
3295       {
3296         StgCatchFrame *cf = (StgCatchFrame *)su;
3297         StgClosure* o;
3298         
3299         /* We want a PAP, not an AP_UPD.  Fortunately, the
3300          * layout's the same.
3301          */
3302         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3303         TICK_ALLOC_UPD_PAP(words+1,0);
3304         
3305         /* now build o = FUN(catch,ap,handler) */
3306         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3307         TICK_ALLOC_FUN(2,0);
3308         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3309         o->payload[0] = (StgClosure *)ap;
3310         o->payload[1] = cf->handler;
3311         
3312         IF_DEBUG(scheduler,
3313                  fprintf(stderr,  "scheduler: Built ");
3314                  printObj((StgClosure *)o);
3315                  );
3316         
3317         /* pop the old handler and put o on the stack */
3318         su = cf->link;
3319         sp += sizeofW(StgCatchFrame) - 1;
3320         sp[0] = (W_)o;
3321         break;
3322       }
3323       
3324     case SEQ_FRAME:
3325       {
3326         StgSeqFrame *sf = (StgSeqFrame *)su;
3327         StgClosure* o;
3328         
3329         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3330         TICK_ALLOC_UPD_PAP(words+1,0);
3331         
3332         /* now build o = FUN(seq,ap) */
3333         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3334         TICK_ALLOC_SE_THK(1,0);
3335         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3336         o->payload[0] = (StgClosure *)ap;
3337         
3338         IF_DEBUG(scheduler,
3339                  fprintf(stderr,  "scheduler: Built ");
3340                  printObj((StgClosure *)o);
3341                  );
3342         
3343         /* pop the old handler and put o on the stack */
3344         su = sf->link;
3345         sp += sizeofW(StgSeqFrame) - 1;
3346         sp[0] = (W_)o;
3347         break;
3348       }
3349       
3350     case STOP_FRAME:
3351       /* We've stripped the entire stack, the thread is now dead. */
3352       sp += sizeofW(StgStopFrame) - 1;
3353       sp[0] = (W_)exception;    /* save the exception */
3354       tso->what_next = ThreadKilled;
3355       tso->su = (StgUpdateFrame *)(sp+1);
3356       tso->sp = sp;
3357       return;
3358
3359     default:
3360       barf("raiseAsync");
3361     }
3362   }
3363   barf("raiseAsync");
3364 }
3365
3366 /* -----------------------------------------------------------------------------
3367    resurrectThreads is called after garbage collection on the list of
3368    threads found to be garbage.  Each of these threads will be woken
3369    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3370    on an MVar, or NonTermination if the thread was blocked on a Black
3371    Hole.
3372
3373    Locks: sched_mutex isn't held upon entry nor exit.
3374    -------------------------------------------------------------------------- */
3375
3376 void
3377 resurrectThreads( StgTSO *threads )
3378 {
3379   StgTSO *tso, *next;
3380
3381   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3382     next = tso->global_link;
3383     tso->global_link = all_threads;
3384     all_threads = tso;
3385     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3386
3387     switch (tso->why_blocked) {
3388     case BlockedOnMVar:
3389     case BlockedOnException:
3390       /* Called by GC - sched_mutex lock is currently held. */
3391       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3392       break;
3393     case BlockedOnBlackHole:
3394       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3395       break;
3396     case NotBlocked:
3397       /* This might happen if the thread was blocked on a black hole
3398        * belonging to a thread that we've just woken up (raiseAsync
3399        * can wake up threads, remember...).
3400        */
3401       continue;
3402     default:
3403       barf("resurrectThreads: thread blocked in a strange way");
3404     }
3405   }
3406 }
3407
3408 /* -----------------------------------------------------------------------------
3409  * Blackhole detection: if we reach a deadlock, test whether any
3410  * threads are blocked on themselves.  Any threads which are found to
3411  * be self-blocked get sent a NonTermination exception.
3412  *
3413  * This is only done in a deadlock situation in order to avoid
3414  * performance overhead in the normal case.
3415  *
3416  * Locks: sched_mutex is held upon entry and exit.
3417  * -------------------------------------------------------------------------- */
3418
3419 static void
3420 detectBlackHoles( void )
3421 {
3422     StgTSO *t = all_threads;
3423     StgUpdateFrame *frame;
3424     StgClosure *blocked_on;
3425
3426     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3427
3428         while (t->what_next == ThreadRelocated) {
3429             t = t->link;
3430             ASSERT(get_itbl(t)->type == TSO);
3431         }
3432       
3433         if (t->why_blocked != BlockedOnBlackHole) {
3434             continue;
3435         }
3436
3437         blocked_on = t->block_info.closure;
3438
3439         for (frame = t->su; ; frame = frame->link) {
3440             switch (get_itbl(frame)->type) {
3441
3442             case UPDATE_FRAME:
3443                 if (frame->updatee == blocked_on) {
3444                     /* We are blocking on one of our own computations, so
3445                      * send this thread the NonTermination exception.  
3446                      */
3447                     IF_DEBUG(scheduler, 
3448                              sched_belch("thread %d is blocked on itself", t->id));
3449                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3450                     goto done;
3451                 }
3452                 else {
3453                     continue;
3454                 }
3455
3456             case CATCH_FRAME:
3457             case SEQ_FRAME:
3458                 continue;
3459                 
3460             case STOP_FRAME:
3461                 break;
3462             }
3463             break;
3464         }
3465
3466     done: ;
3467     }   
3468 }
3469
3470 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3471 //@subsection Debugging Routines
3472
3473 /* -----------------------------------------------------------------------------
3474    Debugging: why is a thread blocked
3475    -------------------------------------------------------------------------- */
3476
3477 #ifdef DEBUG
3478
3479 void
3480 printThreadBlockage(StgTSO *tso)
3481 {
3482   switch (tso->why_blocked) {
3483   case BlockedOnRead:
3484     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3485     break;
3486   case BlockedOnWrite:
3487     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3488     break;
3489   case BlockedOnDelay:
3490     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3491     break;
3492   case BlockedOnMVar:
3493     fprintf(stderr,"is blocked on an MVar");
3494     break;
3495   case BlockedOnException:
3496     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3497             tso->block_info.tso->id);
3498     break;
3499   case BlockedOnBlackHole:
3500     fprintf(stderr,"is blocked on a black hole");
3501     break;
3502   case NotBlocked:
3503     fprintf(stderr,"is not blocked");
3504     break;
3505 #if defined(PAR)
3506   case BlockedOnGA:
3507     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3508             tso->block_info.closure, info_type(tso->block_info.closure));
3509     break;
3510   case BlockedOnGA_NoSend:
3511     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3512             tso->block_info.closure, info_type(tso->block_info.closure));
3513     break;
3514 #endif
3515 #if defined(RTS_SUPPORTS_THREADS)
3516   case BlockedOnCCall:
3517     fprintf(stderr,"is blocked on an external call");
3518     break;
3519 #endif
3520   default:
3521     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3522          tso->why_blocked, tso->id, tso);
3523   }
3524 }
3525
3526 void
3527 printThreadStatus(StgTSO *tso)
3528 {
3529   switch (tso->what_next) {
3530   case ThreadKilled:
3531     fprintf(stderr,"has been killed");
3532     break;
3533   case ThreadComplete:
3534     fprintf(stderr,"has completed");
3535     break;
3536   default:
3537     printThreadBlockage(tso);
3538   }
3539 }
3540
3541 void
3542 printAllThreads(void)
3543 {
3544   StgTSO *t;
3545
3546 # if defined(GRAN)
3547   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3548   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3549                        time_string, rtsFalse/*no commas!*/);
3550
3551   sched_belch("all threads at [%s]:", time_string);
3552 # elif defined(PAR)
3553   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3554   ullong_format_string(CURRENT_TIME,
3555                        time_string, rtsFalse/*no commas!*/);
3556
3557   sched_belch("all threads at [%s]:", time_string);
3558 # else
3559   sched_belch("all threads:");
3560 # endif
3561
3562   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3563     fprintf(stderr, "\tthread %d ", t->id);
3564     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3565     printThreadStatus(t);
3566     fprintf(stderr,"\n");
3567   }
3568 }
3569     
3570 /* 
3571    Print a whole blocking queue attached to node (debugging only).
3572 */
3573 //@cindex print_bq
3574 # if defined(PAR)
3575 void 
3576 print_bq (StgClosure *node)
3577 {
3578   StgBlockingQueueElement *bqe;
3579   StgTSO *tso;
3580   rtsBool end;
3581
3582   fprintf(stderr,"## BQ of closure %p (%s): ",
3583           node, info_type(node));
3584
3585   /* should cover all closures that may have a blocking queue */
3586   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3587          get_itbl(node)->type == FETCH_ME_BQ ||
3588          get_itbl(node)->type == RBH ||
3589          get_itbl(node)->type == MVAR);
3590     
3591   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3592
3593   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3594 }
3595
3596 /* 
3597    Print a whole blocking queue starting with the element bqe.
3598 */
3599 void 
3600 print_bqe (StgBlockingQueueElement *bqe)
3601 {
3602   rtsBool end;
3603
3604   /* 
3605      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3606   */
3607   for (end = (bqe==END_BQ_QUEUE);
3608        !end; // iterate until bqe points to a CONSTR
3609        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3610        bqe = end ? END_BQ_QUEUE : bqe->link) {
3611     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3612     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3613     /* types of closures that may appear in a blocking queue */
3614     ASSERT(get_itbl(bqe)->type == TSO ||           
3615            get_itbl(bqe)->type == BLOCKED_FETCH || 
3616            get_itbl(bqe)->type == CONSTR); 
3617     /* only BQs of an RBH end with an RBH_Save closure */
3618     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3619
3620     switch (get_itbl(bqe)->type) {
3621     case TSO:
3622       fprintf(stderr," TSO %u (%x),",
3623               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3624       break;
3625     case BLOCKED_FETCH:
3626       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3627               ((StgBlockedFetch *)bqe)->node, 
3628               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3629               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3630               ((StgBlockedFetch *)bqe)->ga.weight);
3631       break;
3632     case CONSTR:
3633       fprintf(stderr," %s (IP %p),",
3634               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3635                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3636                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3637                "RBH_Save_?"), get_itbl(bqe));
3638       break;
3639     default:
3640       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3641            info_type((StgClosure *)bqe)); // , node, info_type(node));
3642       break;
3643     }
3644   } /* for */
3645   fputc('\n', stderr);
3646 }
3647 # elif defined(GRAN)
3648 void 
3649 print_bq (StgClosure *node)
3650 {
3651   StgBlockingQueueElement *bqe;
3652   PEs node_loc, tso_loc;
3653   rtsBool end;
3654
3655   /* should cover all closures that may have a blocking queue */
3656   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3657          get_itbl(node)->type == FETCH_ME_BQ ||
3658          get_itbl(node)->type == RBH);
3659     
3660   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3661   node_loc = where_is(node);
3662
3663   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3664           node, info_type(node), node_loc);
3665
3666   /* 
3667      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3668   */
3669   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3670        !end; // iterate until bqe points to a CONSTR
3671        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3672     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3673     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3674     /* types of closures that may appear in a blocking queue */
3675     ASSERT(get_itbl(bqe)->type == TSO ||           
3676            get_itbl(bqe)->type == CONSTR); 
3677     /* only BQs of an RBH end with an RBH_Save closure */
3678     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3679
3680     tso_loc = where_is((StgClosure *)bqe);
3681     switch (get_itbl(bqe)->type) {
3682     case TSO:
3683       fprintf(stderr," TSO %d (%p) on [PE %d],",
3684               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3685       break;
3686     case CONSTR:
3687       fprintf(stderr," %s (IP %p),",
3688               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3689                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3690                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3691                "RBH_Save_?"), get_itbl(bqe));
3692       break;
3693     default:
3694       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3695            info_type((StgClosure *)bqe), node, info_type(node));
3696       break;
3697     }
3698   } /* for */
3699   fputc('\n', stderr);
3700 }
3701 #else
3702 /* 
3703    Nice and easy: only TSOs on the blocking queue
3704 */
3705 void 
3706 print_bq (StgClosure *node)
3707 {
3708   StgTSO *tso;
3709
3710   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3711   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3712        tso != END_TSO_QUEUE; 
3713        tso=tso->link) {
3714     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3715     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3716     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3717   }
3718   fputc('\n', stderr);
3719 }
3720 # endif
3721
3722 #if defined(PAR)
3723 static nat
3724 run_queue_len(void)
3725 {
3726   nat i;
3727   StgTSO *tso;
3728
3729   for (i=0, tso=run_queue_hd; 
3730        tso != END_TSO_QUEUE;
3731        i++, tso=tso->link)
3732     /* nothing */
3733
3734   return i;
3735 }
3736 #endif
3737
3738 static void
3739 sched_belch(char *s, ...)
3740 {
3741   va_list ap;
3742   va_start(ap,s);
3743 #ifdef SMP
3744   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3745 #elif defined(PAR)
3746   fprintf(stderr, "== ");
3747 #else
3748   fprintf(stderr, "scheduler: ");
3749 #endif
3750   vfprintf(stderr, s, ap);
3751   fprintf(stderr, "\n");
3752 }
3753
3754 #endif /* DEBUG */
3755
3756
3757 //@node Index,  , Debugging Routines, Main scheduling code
3758 //@subsection Index
3759
3760 //@index
3761 //* StgMainThread::  @cindex\s-+StgMainThread
3762 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3763 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3764 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3765 //* context_switch::  @cindex\s-+context_switch
3766 //* createThread::  @cindex\s-+createThread
3767 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3768 //* initScheduler::  @cindex\s-+initScheduler
3769 //* interrupted::  @cindex\s-+interrupted
3770 //* next_thread_id::  @cindex\s-+next_thread_id
3771 //* print_bq::  @cindex\s-+print_bq
3772 //* run_queue_hd::  @cindex\s-+run_queue_hd
3773 //* run_queue_tl::  @cindex\s-+run_queue_tl
3774 //* sched_mutex::  @cindex\s-+sched_mutex
3775 //* schedule::  @cindex\s-+schedule
3776 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3777 //* term_mutex::  @cindex\s-+term_mutex
3778 //@end index