[project @ 2002-02-14 07:52:05 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.123 2002/02/14 07:52:05 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runnable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268
269
270
271
272 /* thread_ready_cond: when signalled, a thread has become runnable for a
273  * task to execute.
274  *
275  * In the non-SMP case, it also implies that the thread that is woken up has
276  * exclusive access to the RTS and all its data structures (that are not
277  * under sched_mutex's control).
278  *
279  * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
280  *
281  */
282 Condition thread_ready_cond = INIT_COND_VAR;
283 #if 0
284 /* For documentation purposes only */
285 #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
286 #endif
287
288 /*
289  * To be able to make an informed decision about whether or not 
290  * to create a new task when making an external call, keep track of
291  * the number of tasks currently blocked waiting on thread_ready_cond.
292  * (if > 0 => no need for a new task, just unblock an existing one).
293  *
294  * waitForWork() takes care of keeping it up-to-date; Task.startTask()
295  * uses its current value.
296  */
297 nat rts_n_waiting_tasks = 0;
298
299 static void waitForWork(void);
300
301 # if defined(SMP)
302 static Condition gc_pending_cond = INIT_COND_VAR;
303 nat await_death;
304 # endif
305
306 #endif /* RTS_SUPPORTS_THREADS */
307
308 #if defined(PAR)
309 StgTSO *LastTSO;
310 rtsTime TimeOfLastYield;
311 rtsBool emitSchedule = rtsTrue;
312 #endif
313
314 #if DEBUG
315 char *whatNext_strs[] = {
316   "ThreadEnterGHC",
317   "ThreadRunGHC",
318   "ThreadEnterInterp",
319   "ThreadKilled",
320   "ThreadComplete"
321 };
322
323 char *threadReturnCode_strs[] = {
324   "HeapOverflow",                       /* might also be StackOverflow */
325   "StackOverflow",
326   "ThreadYielding",
327   "ThreadBlocked",
328   "ThreadFinished"
329 };
330 #endif
331
332 #if defined(PAR)
333 StgTSO * createSparkThread(rtsSpark spark);
334 StgTSO * activateSpark (rtsSpark spark);  
335 #endif
336
337 /*
338  * The thread state for the main thread.
339 // ToDo: check whether not needed any more
340 StgTSO   *MainTSO;
341  */
342
343 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
344 static void taskStart(void);
345 static void
346 taskStart(void)
347 {
348   schedule();
349 }
350 #endif
351
352
353
354
355 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
356 //@subsection Main scheduling loop
357
358 /* ---------------------------------------------------------------------------
359    Main scheduling loop.
360
361    We use round-robin scheduling, each thread returning to the
362    scheduler loop when one of these conditions is detected:
363
364       * out of heap space
365       * timer expires (thread yields)
366       * thread blocks
367       * thread ends
368       * stack overflow
369
370    Locking notes:  we acquire the scheduler lock once at the beginning
371    of the scheduler loop, and release it when
372     
373       * running a thread, or
374       * waiting for work, or
375       * waiting for a GC to complete.
376
377    GRAN version:
378      In a GranSim setup this loop iterates over the global event queue.
379      This revolves around the global event queue, which determines what 
380      to do next. Therefore, it's more complicated than either the 
381      concurrent or the parallel (GUM) setup.
382
383    GUM version:
384      GUM iterates over incoming messages.
385      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
386      and sends out a fish whenever it has nothing to do; in-between
387      doing the actual reductions (shared code below) it processes the
388      incoming messages and deals with delayed operations 
389      (see PendingFetches).
390      This is not the ugliest code you could imagine, but it's bloody close.
391
392    ------------------------------------------------------------------------ */
393 //@cindex schedule
394 static void
395 schedule( void )
396 {
397   StgTSO *t;
398   Capability *cap;
399   StgThreadReturnCode ret;
400 #if defined(GRAN)
401   rtsEvent *event;
402 #elif defined(PAR)
403   StgSparkPool *pool;
404   rtsSpark spark;
405   StgTSO *tso;
406   GlobalTaskId pe;
407   rtsBool receivedFinish = rtsFalse;
408 # if defined(DEBUG)
409   nat tp_size, sp_size; // stats only
410 # endif
411 #endif
412   rtsBool was_interrupted = rtsFalse;
413
414 #if defined(RTS_SUPPORTS_THREADS)
415 schedule_start:
416 #endif
417   
418 #if defined(RTS_SUPPORTS_THREADS)
419   ACQUIRE_LOCK(&sched_mutex);
420 #endif
421  
422 #if defined(RTS_SUPPORTS_THREADS)
423   /* ToDo: consider SMP support */
424   if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
425     /* (At least) one native thread is waiting to
426      * deposit the result of an external call. So,
427      * be nice and hand over our capability.
428      */
429     yieldCapability(cap);
430     /* Lost our sched_mutex lock, try to re-enter the scheduler. */
431     goto schedule_start;
432   }
433 #endif
434
435 #if defined(RTS_SUPPORTS_THREADS)
436   while ( noCapabilities() ) {
437     waitForWork();
438   }
439 #endif
440
441 #if defined(GRAN)
442
443   /* set up first event to get things going */
444   /* ToDo: assign costs for system setup and init MainTSO ! */
445   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
446             ContinueThread, 
447             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
448
449   IF_DEBUG(gran,
450            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
451            G_TSO(CurrentTSO, 5));
452
453   if (RtsFlags.GranFlags.Light) {
454     /* Save current time; GranSim Light only */
455     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
456   }      
457
458   event = get_next_event();
459
460   while (event!=(rtsEvent*)NULL) {
461     /* Choose the processor with the next event */
462     CurrentProc = event->proc;
463     CurrentTSO = event->tso;
464
465 #elif defined(PAR)
466
467   while (!receivedFinish) {    /* set by processMessages */
468                                /* when receiving PP_FINISH message         */ 
469 #else
470
471   while (1) {
472
473 #endif
474
475     IF_DEBUG(scheduler, printAllThreads());
476
477     /* If we're interrupted (the user pressed ^C, or some other
478      * termination condition occurred), kill all the currently running
479      * threads.
480      */
481     if (interrupted) {
482       IF_DEBUG(scheduler, sched_belch("interrupted"));
483       deleteAllThreads();
484       interrupted = rtsFalse;
485       was_interrupted = rtsTrue;
486     }
487
488     /* Go through the list of main threads and wake up any
489      * clients whose computations have finished.  ToDo: this
490      * should be done more efficiently without a linear scan
491      * of the main threads list, somehow...
492      */
493 #if defined(RTS_SUPPORTS_THREADS)
494     { 
495       StgMainThread *m, **prev;
496       prev = &main_threads;
497       for (m = main_threads; m != NULL; m = m->link) {
498         switch (m->tso->what_next) {
499         case ThreadComplete:
500           if (m->ret) {
501             *(m->ret) = (StgClosure *)m->tso->sp[0];
502           }
503           *prev = m->link;
504           m->stat = Success;
505           broadcastCondition(&m->wakeup);
506           break;
507         case ThreadKilled:
508           if (m->ret) *(m->ret) = NULL;
509           *prev = m->link;
510           if (was_interrupted) {
511             m->stat = Interrupted;
512           } else {
513             m->stat = Killed;
514           }
515           broadcastCondition(&m->wakeup);
516           break;
517         default:
518           break;
519         }
520       }
521     }
522
523 #else /* not threaded */
524
525 # if defined(PAR)
526     /* in GUM do this only on the Main PE */
527     if (IAmMainThread)
528 # endif
529     /* If our main thread has finished or been killed, return.
530      */
531     {
532       StgMainThread *m = main_threads;
533       if (m->tso->what_next == ThreadComplete
534           || m->tso->what_next == ThreadKilled) {
535         main_threads = main_threads->link;
536         if (m->tso->what_next == ThreadComplete) {
537           /* we finished successfully, fill in the return value */
538           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
539           m->stat = Success;
540           return;
541         } else {
542           if (m->ret) { *(m->ret) = NULL; };
543           if (was_interrupted) {
544             m->stat = Interrupted;
545           } else {
546             m->stat = Killed;
547           }
548           return;
549         }
550       }
551     }
552 #endif
553
554     /* Top up the run queue from our spark pool.  We try to make the
555      * number of threads in the run queue equal to the number of
556      * free capabilities.
557      *
558      * Disable spark support in SMP for now, non-essential & requires
559      * a little bit of work to make it compile cleanly. -- sof 1/02.
560      */
561 #if 0 /* defined(SMP) */
562     {
563       nat n = getFreeCapabilities();
564       StgTSO *tso = run_queue_hd;
565
566       /* Count the run queue */
567       while (n > 0 && tso != END_TSO_QUEUE) {
568         tso = tso->link;
569         n--;
570       }
571
572       for (; n > 0; n--) {
573         StgClosure *spark;
574         spark = findSpark(rtsFalse);
575         if (spark == NULL) {
576           break; /* no more sparks in the pool */
577         } else {
578           /* I'd prefer this to be done in activateSpark -- HWL */
579           /* tricky - it needs to hold the scheduler lock and
580            * not try to re-acquire it -- SDM */
581           createSparkThread(spark);       
582           IF_DEBUG(scheduler,
583                    sched_belch("==^^ turning spark of closure %p into a thread",
584                                (StgClosure *)spark));
585         }
586       }
587       /* We need to wake up the other tasks if we just created some
588        * work for them.
589        */
590       if (getFreeCapabilities() - n > 1) {
591           signalCondition( &thread_ready_cond );
592       }
593     }
594 #endif // SMP
595
596     /* check for signals each time around the scheduler */
597 #ifndef mingw32_TARGET_OS
598     if (signals_pending()) {
599       startSignalHandlers();
600     }
601 #endif
602
603     /* Check whether any waiting threads need to be woken up.  If the
604      * run queue is empty, and there are no other tasks running, we
605      * can wait indefinitely for something to happen.
606      * ToDo: what if another client comes along & requests another
607      * main thread?
608      */
609     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
610       awaitEvent( EMPTY_RUN_QUEUE()
611 #if defined(SMP)
612         && allFreeCapabilities()
613 #endif
614         );
615     }
616     /* we can be interrupted while waiting for I/O... */
617     if (interrupted) continue;
618
619     /* 
620      * Detect deadlock: when we have no threads to run, there are no
621      * threads waiting on I/O or sleeping, and all the other tasks are
622      * waiting for work, we must have a deadlock of some description.
623      *
624      * We first try to find threads blocked on themselves (ie. black
625      * holes), and generate NonTermination exceptions where necessary.
626      *
627      * If no threads are black holed, we have a deadlock situation, so
628      * inform all the main threads.
629      */
630 #ifndef PAR
631     if (   EMPTY_RUN_QUEUE()
632         && EMPTY_QUEUE(blocked_queue_hd)
633         && EMPTY_QUEUE(sleeping_queue)
634 #if defined(RTS_SUPPORTS_THREADS)
635         && EMPTY_QUEUE(suspended_ccalling_threads)
636 #endif
637 #ifdef SMP
638         && allFreeCapabilities()
639 #endif
640         )
641     {
642         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
643 #if defined(THREADED_RTS)
644         /* and SMP mode ..? */
645         releaseCapability(cap);
646 #endif
647         RELEASE_LOCK(&sched_mutex);
648         GarbageCollect(GetRoots,rtsTrue);
649         ACQUIRE_LOCK(&sched_mutex);
650         if (   EMPTY_QUEUE(blocked_queue_hd)
651             && EMPTY_RUN_QUEUE()
652             && EMPTY_QUEUE(sleeping_queue) ) {
653
654             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
655             detectBlackHoles();
656
657             /* No black holes, so probably a real deadlock.  Send the
658              * current main thread the Deadlock exception (or in the SMP
659              * build, send *all* main threads the deadlock exception,
660              * since none of them can make progress).
661              */
662             if ( EMPTY_RUN_QUEUE() ) {
663                 StgMainThread *m;
664 #if defined(RTS_SUPPORTS_THREADS)
665                 for (m = main_threads; m != NULL; m = m->link) {
666                     switch (m->tso->why_blocked) {
667                     case BlockedOnBlackHole:
668                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
669                         break;
670                     case BlockedOnException:
671                     case BlockedOnMVar:
672                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
673                         break;
674                     default:
675                         barf("deadlock: main thread blocked in a strange way");
676                     }
677                 }
678 #else
679                 m = main_threads;
680                 switch (m->tso->why_blocked) {
681                 case BlockedOnBlackHole:
682                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
683                     break;
684                 case BlockedOnException:
685                 case BlockedOnMVar:
686                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
687                     break;
688                 default:
689                     barf("deadlock: main thread blocked in a strange way");
690                 }
691 #endif
692             }
693 #if defined(RTS_SUPPORTS_THREADS)
694             /* ToDo: revisit conditions (and mechanism) for shutting
695                down a multi-threaded world  */
696             if ( EMPTY_RUN_QUEUE() ) {
697               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
698               shutdownHaskellAndExit(0);
699             }
700 #endif
701             ASSERT( !EMPTY_RUN_QUEUE() );
702         }
703     }
704 #elif defined(PAR)
705     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
706 #endif
707
708 #if defined(SMP)
709     /* If there's a GC pending, don't do anything until it has
710      * completed.
711      */
712     if (ready_to_gc) {
713       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
714       waitCondition( &gc_pending_cond, &sched_mutex );
715     }
716 #endif    
717
718 #if defined(RTS_SUPPORTS_THREADS)
719     /* block until we've got a thread on the run queue and a free
720      * capability.
721      *
722      */
723     if ( EMPTY_RUN_QUEUE() ) {
724       /* Give up our capability */
725       releaseCapability(cap);
726       while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
727         IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
728         waitForWork();
729         IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
730       }
731     }
732 #endif
733
734 #if defined(GRAN)
735
736     if (RtsFlags.GranFlags.Light)
737       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
738
739     /* adjust time based on time-stamp */
740     if (event->time > CurrentTime[CurrentProc] &&
741         event->evttype != ContinueThread)
742       CurrentTime[CurrentProc] = event->time;
743     
744     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
745     if (!RtsFlags.GranFlags.Light)
746       handleIdlePEs();
747
748     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
749
750     /* main event dispatcher in GranSim */
751     switch (event->evttype) {
752       /* Should just be continuing execution */
753     case ContinueThread:
754       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
755       /* ToDo: check assertion
756       ASSERT(run_queue_hd != (StgTSO*)NULL &&
757              run_queue_hd != END_TSO_QUEUE);
758       */
759       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
760       if (!RtsFlags.GranFlags.DoAsyncFetch &&
761           procStatus[CurrentProc]==Fetching) {
762         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
763               CurrentTSO->id, CurrentTSO, CurrentProc);
764         goto next_thread;
765       } 
766       /* Ignore ContinueThreads for completed threads */
767       if (CurrentTSO->what_next == ThreadComplete) {
768         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
769               CurrentTSO->id, CurrentTSO, CurrentProc);
770         goto next_thread;
771       } 
772       /* Ignore ContinueThreads for threads that are being migrated */
773       if (PROCS(CurrentTSO)==Nowhere) { 
774         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
775               CurrentTSO->id, CurrentTSO, CurrentProc);
776         goto next_thread;
777       }
778       /* The thread should be at the beginning of the run queue */
779       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
780         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
781               CurrentTSO->id, CurrentTSO, CurrentProc);
782         break; // run the thread anyway
783       }
784       /*
785       new_event(proc, proc, CurrentTime[proc],
786                 FindWork,
787                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
788       goto next_thread; 
789       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
790       break; // now actually run the thread; DaH Qu'vam yImuHbej 
791
792     case FetchNode:
793       do_the_fetchnode(event);
794       goto next_thread;             /* handle next event in event queue  */
795       
796     case GlobalBlock:
797       do_the_globalblock(event);
798       goto next_thread;             /* handle next event in event queue  */
799       
800     case FetchReply:
801       do_the_fetchreply(event);
802       goto next_thread;             /* handle next event in event queue  */
803       
804     case UnblockThread:   /* Move from the blocked queue to the tail of */
805       do_the_unblock(event);
806       goto next_thread;             /* handle next event in event queue  */
807       
808     case ResumeThread:  /* Move from the blocked queue to the tail of */
809       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
810       event->tso->gran.blocktime += 
811         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
812       do_the_startthread(event);
813       goto next_thread;             /* handle next event in event queue  */
814       
815     case StartThread:
816       do_the_startthread(event);
817       goto next_thread;             /* handle next event in event queue  */
818       
819     case MoveThread:
820       do_the_movethread(event);
821       goto next_thread;             /* handle next event in event queue  */
822       
823     case MoveSpark:
824       do_the_movespark(event);
825       goto next_thread;             /* handle next event in event queue  */
826       
827     case FindWork:
828       do_the_findwork(event);
829       goto next_thread;             /* handle next event in event queue  */
830       
831     default:
832       barf("Illegal event type %u\n", event->evttype);
833     }  /* switch */
834     
835     /* This point was scheduler_loop in the old RTS */
836
837     IF_DEBUG(gran, belch("GRAN: after main switch"));
838
839     TimeOfLastEvent = CurrentTime[CurrentProc];
840     TimeOfNextEvent = get_time_of_next_event();
841     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
842     // CurrentTSO = ThreadQueueHd;
843
844     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
845                          TimeOfNextEvent));
846
847     if (RtsFlags.GranFlags.Light) 
848       GranSimLight_leave_system(event, &ActiveTSO); 
849
850     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
851
852     IF_DEBUG(gran, 
853              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
854
855     /* in a GranSim setup the TSO stays on the run queue */
856     t = CurrentTSO;
857     /* Take a thread from the run queue. */
858     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
859
860     IF_DEBUG(gran, 
861              fprintf(stderr, "GRAN: About to run current thread, which is\n");
862              G_TSO(t,5));
863
864     context_switch = 0; // turned on via GranYield, checking events and time slice
865
866     IF_DEBUG(gran, 
867              DumpGranEvent(GR_SCHEDULE, t));
868
869     procStatus[CurrentProc] = Busy;
870
871 #elif defined(PAR)
872     if (PendingFetches != END_BF_QUEUE) {
873         processFetches();
874     }
875
876     /* ToDo: phps merge with spark activation above */
877     /* check whether we have local work and send requests if we have none */
878     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
879       /* :-[  no local threads => look out for local sparks */
880       /* the spark pool for the current PE */
881       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
882       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
883           pool->hd < pool->tl) {
884         /* 
885          * ToDo: add GC code check that we really have enough heap afterwards!!
886          * Old comment:
887          * If we're here (no runnable threads) and we have pending
888          * sparks, we must have a space problem.  Get enough space
889          * to turn one of those pending sparks into a
890          * thread... 
891          */
892
893         spark = findSpark(rtsFalse);                /* get a spark */
894         if (spark != (rtsSpark) NULL) {
895           tso = activateSpark(spark);       /* turn the spark into a thread */
896           IF_PAR_DEBUG(schedule,
897                        belch("==== schedule: Created TSO %d (%p); %d threads active",
898                              tso->id, tso, advisory_thread_count));
899
900           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
901             belch("==^^ failed to activate spark");
902             goto next_thread;
903           }               /* otherwise fall through & pick-up new tso */
904         } else {
905           IF_PAR_DEBUG(verbose,
906                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
907                              spark_queue_len(pool)));
908           goto next_thread;
909         }
910       }
911
912       /* If we still have no work we need to send a FISH to get a spark
913          from another PE 
914       */
915       if (EMPTY_RUN_QUEUE()) {
916       /* =8-[  no local sparks => look for work on other PEs */
917         /*
918          * We really have absolutely no work.  Send out a fish
919          * (there may be some out there already), and wait for
920          * something to arrive.  We clearly can't run any threads
921          * until a SCHEDULE or RESUME arrives, and so that's what
922          * we're hoping to see.  (Of course, we still have to
923          * respond to other types of messages.)
924          */
925         TIME now = msTime() /*CURRENT_TIME*/;
926         IF_PAR_DEBUG(verbose, 
927                      belch("--  now=%ld", now));
928         IF_PAR_DEBUG(verbose,
929                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
930                          (last_fish_arrived_at!=0 &&
931                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
932                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
933                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
934                              last_fish_arrived_at,
935                              RtsFlags.ParFlags.fishDelay, now);
936                      });
937         
938         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
939             (last_fish_arrived_at==0 ||
940              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
941           /* outstandingFishes is set in sendFish, processFish;
942              avoid flooding system with fishes via delay */
943           pe = choosePE();
944           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
945                    NEW_FISH_HUNGER);
946
947           // Global statistics: count no. of fishes
948           if (RtsFlags.ParFlags.ParStats.Global &&
949               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
950             globalParStats.tot_fish_mess++;
951           }
952         }
953       
954         receivedFinish = processMessages();
955         goto next_thread;
956       }
957     } else if (PacketsWaiting()) {  /* Look for incoming messages */
958       receivedFinish = processMessages();
959     }
960
961     /* Now we are sure that we have some work available */
962     ASSERT(run_queue_hd != END_TSO_QUEUE);
963
964     /* Take a thread from the run queue, if we have work */
965     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
966     IF_DEBUG(sanity,checkTSO(t));
967
968     /* ToDo: write something to the log-file
969     if (RTSflags.ParFlags.granSimStats && !sameThread)
970         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
971
972     CurrentTSO = t;
973     */
974     /* the spark pool for the current PE */
975     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
976
977     IF_DEBUG(scheduler, 
978              belch("--=^ %d threads, %d sparks on [%#x]", 
979                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
980
981 #if 1
982     if (0 && RtsFlags.ParFlags.ParStats.Full && 
983         t && LastTSO && t->id != LastTSO->id && 
984         LastTSO->why_blocked == NotBlocked && 
985         LastTSO->what_next != ThreadComplete) {
986       // if previously scheduled TSO not blocked we have to record the context switch
987       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
988                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
989     }
990
991     if (RtsFlags.ParFlags.ParStats.Full && 
992         (emitSchedule /* forced emit */ ||
993         (t && LastTSO && t->id != LastTSO->id))) {
994       /* 
995          we are running a different TSO, so write a schedule event to log file
996          NB: If we use fair scheduling we also have to write  a deschedule 
997              event for LastTSO; with unfair scheduling we know that the
998              previous tso has blocked whenever we switch to another tso, so
999              we don't need it in GUM for now
1000       */
1001       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1002                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1003       emitSchedule = rtsFalse;
1004     }
1005      
1006 #endif
1007 #else /* !GRAN && !PAR */
1008   
1009     /* grab a thread from the run queue */
1010     ASSERT(run_queue_hd != END_TSO_QUEUE);
1011     t = POP_RUN_QUEUE();
1012     // Sanity check the thread we're about to run.  This can be
1013     // expensive if there is lots of thread switching going on...
1014     IF_DEBUG(sanity,checkTSO(t));
1015 #endif
1016     
1017     grabCapability(&cap);
1018     cap->r.rCurrentTSO = t;
1019     
1020     /* context switches are now initiated by the timer signal, unless
1021      * the user specified "context switch as often as possible", with
1022      * +RTS -C0
1023      */
1024     if (
1025 #ifdef PROFILING
1026         RtsFlags.ProfFlags.profileInterval == 0 ||
1027 #endif
1028         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1029          && (run_queue_hd != END_TSO_QUEUE
1030              || blocked_queue_hd != END_TSO_QUEUE
1031              || sleeping_queue != END_TSO_QUEUE)))
1032         context_switch = 1;
1033     else
1034         context_switch = 0;
1035
1036     RELEASE_LOCK(&sched_mutex);
1037
1038     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1039                               t->id, t, whatNext_strs[t->what_next]));
1040
1041 #ifdef PROFILING
1042     startHeapProfTimer();
1043 #endif
1044
1045     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1046     /* Run the current thread 
1047      */
1048     switch (cap->r.rCurrentTSO->what_next) {
1049     case ThreadKilled:
1050     case ThreadComplete:
1051         /* Thread already finished, return to scheduler. */
1052         ret = ThreadFinished;
1053         break;
1054     case ThreadEnterGHC:
1055         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1056         break;
1057     case ThreadRunGHC:
1058         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1059         break;
1060     case ThreadEnterInterp:
1061         ret = interpretBCO(cap);
1062         break;
1063     default:
1064       barf("schedule: invalid what_next field");
1065     }
1066     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1067     
1068     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1069 #ifdef PROFILING
1070     stopHeapProfTimer();
1071     CCCS = CCS_SYSTEM;
1072 #endif
1073     
1074     ACQUIRE_LOCK(&sched_mutex);
1075
1076 #ifdef SMP
1077     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1078 #elif !defined(GRAN) && !defined(PAR)
1079     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1080 #endif
1081     t = cap->r.rCurrentTSO;
1082     
1083 #if defined(PAR)
1084     /* HACK 675: if the last thread didn't yield, make sure to print a 
1085        SCHEDULE event to the log file when StgRunning the next thread, even
1086        if it is the same one as before */
1087     LastTSO = t; 
1088     TimeOfLastYield = CURRENT_TIME;
1089 #endif
1090
1091     switch (ret) {
1092     case HeapOverflow:
1093 #if defined(GRAN)
1094       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1095       globalGranStats.tot_heapover++;
1096 #elif defined(PAR)
1097       globalParStats.tot_heapover++;
1098 #endif
1099
1100       // did the task ask for a large block?
1101       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1102           // if so, get one and push it on the front of the nursery.
1103           bdescr *bd;
1104           nat blocks;
1105           
1106           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1107
1108           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1109                                    t->id, t,
1110                                    whatNext_strs[t->what_next], blocks));
1111
1112           // don't do this if it would push us over the
1113           // alloc_blocks_lim limit; we'll GC first.
1114           if (alloc_blocks + blocks < alloc_blocks_lim) {
1115
1116               alloc_blocks += blocks;
1117               bd = allocGroup( blocks );
1118
1119               // link the new group into the list
1120               bd->link = cap->r.rCurrentNursery;
1121               bd->u.back = cap->r.rCurrentNursery->u.back;
1122               if (cap->r.rCurrentNursery->u.back != NULL) {
1123                   cap->r.rCurrentNursery->u.back->link = bd;
1124               } else {
1125                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1126                          g0s0->blocks == cap->r.rNursery);
1127                   cap->r.rNursery = g0s0->blocks = bd;
1128               }           
1129               cap->r.rCurrentNursery->u.back = bd;
1130
1131               // initialise it as a nursery block
1132               bd->step = g0s0;
1133               bd->gen_no = 0;
1134               bd->flags = 0;
1135               bd->free = bd->start;
1136
1137               // don't forget to update the block count in g0s0.
1138               g0s0->n_blocks += blocks;
1139               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1140
1141               // now update the nursery to point to the new block
1142               cap->r.rCurrentNursery = bd;
1143
1144               // we might be unlucky and have another thread get on the
1145               // run queue before us and steal the large block, but in that
1146               // case the thread will just end up requesting another large
1147               // block.
1148               PUSH_ON_RUN_QUEUE(t);
1149               break;
1150           }
1151       }
1152
1153       /* make all the running tasks block on a condition variable,
1154        * maybe set context_switch and wait till they all pile in,
1155        * then have them wait on a GC condition variable.
1156        */
1157       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1158                                t->id, t, whatNext_strs[t->what_next]));
1159       threadPaused(t);
1160 #if defined(GRAN)
1161       ASSERT(!is_on_queue(t,CurrentProc));
1162 #elif defined(PAR)
1163       /* Currently we emit a DESCHEDULE event before GC in GUM.
1164          ToDo: either add separate event to distinguish SYSTEM time from rest
1165                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1166       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1167         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1168                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1169         emitSchedule = rtsTrue;
1170       }
1171 #endif
1172       
1173       ready_to_gc = rtsTrue;
1174       context_switch = 1;               /* stop other threads ASAP */
1175       PUSH_ON_RUN_QUEUE(t);
1176       /* actual GC is done at the end of the while loop */
1177       break;
1178       
1179     case StackOverflow:
1180 #if defined(GRAN)
1181       IF_DEBUG(gran, 
1182                DumpGranEvent(GR_DESCHEDULE, t));
1183       globalGranStats.tot_stackover++;
1184 #elif defined(PAR)
1185       // IF_DEBUG(par, 
1186       // DumpGranEvent(GR_DESCHEDULE, t);
1187       globalParStats.tot_stackover++;
1188 #endif
1189       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1190                                t->id, t, whatNext_strs[t->what_next]));
1191       /* just adjust the stack for this thread, then pop it back
1192        * on the run queue.
1193        */
1194       threadPaused(t);
1195       { 
1196         StgMainThread *m;
1197         /* enlarge the stack */
1198         StgTSO *new_t = threadStackOverflow(t);
1199         
1200         /* This TSO has moved, so update any pointers to it from the
1201          * main thread stack.  It better not be on any other queues...
1202          * (it shouldn't be).
1203          */
1204         for (m = main_threads; m != NULL; m = m->link) {
1205           if (m->tso == t) {
1206             m->tso = new_t;
1207           }
1208         }
1209         threadPaused(new_t);
1210         PUSH_ON_RUN_QUEUE(new_t);
1211       }
1212       break;
1213
1214     case ThreadYielding:
1215 #if defined(GRAN)
1216       IF_DEBUG(gran, 
1217                DumpGranEvent(GR_DESCHEDULE, t));
1218       globalGranStats.tot_yields++;
1219 #elif defined(PAR)
1220       // IF_DEBUG(par, 
1221       // DumpGranEvent(GR_DESCHEDULE, t);
1222       globalParStats.tot_yields++;
1223 #endif
1224       /* put the thread back on the run queue.  Then, if we're ready to
1225        * GC, check whether this is the last task to stop.  If so, wake
1226        * up the GC thread.  getThread will block during a GC until the
1227        * GC is finished.
1228        */
1229       IF_DEBUG(scheduler,
1230                if (t->what_next == ThreadEnterInterp) {
1231                    /* ToDo: or maybe a timer expired when we were in Hugs?
1232                     * or maybe someone hit ctrl-C
1233                     */
1234                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1235                          t->id, t, whatNext_strs[t->what_next]);
1236                } else {
1237                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1238                          t->id, t, whatNext_strs[t->what_next]);
1239                }
1240                );
1241
1242       threadPaused(t);
1243
1244       IF_DEBUG(sanity,
1245                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1246                checkTSO(t));
1247       ASSERT(t->link == END_TSO_QUEUE);
1248 #if defined(GRAN)
1249       ASSERT(!is_on_queue(t,CurrentProc));
1250
1251       IF_DEBUG(sanity,
1252                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1253                checkThreadQsSanity(rtsTrue));
1254 #endif
1255 #if defined(PAR)
1256       if (RtsFlags.ParFlags.doFairScheduling) { 
1257         /* this does round-robin scheduling; good for concurrency */
1258         APPEND_TO_RUN_QUEUE(t);
1259       } else {
1260         /* this does unfair scheduling; good for parallelism */
1261         PUSH_ON_RUN_QUEUE(t);
1262       }
1263 #else
1264       /* this does round-robin scheduling; good for concurrency */
1265       APPEND_TO_RUN_QUEUE(t);
1266 #endif
1267 #if defined(GRAN)
1268       /* add a ContinueThread event to actually process the thread */
1269       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1270                 ContinueThread,
1271                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1272       IF_GRAN_DEBUG(bq, 
1273                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1274                G_EVENTQ(0);
1275                G_CURR_THREADQ(0));
1276 #endif /* GRAN */
1277       break;
1278       
1279     case ThreadBlocked:
1280 #if defined(GRAN)
1281       IF_DEBUG(scheduler,
1282                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1283                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1284                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1285
1286       // ??? needed; should emit block before
1287       IF_DEBUG(gran, 
1288                DumpGranEvent(GR_DESCHEDULE, t)); 
1289       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1290       /*
1291         ngoq Dogh!
1292       ASSERT(procStatus[CurrentProc]==Busy || 
1293               ((procStatus[CurrentProc]==Fetching) && 
1294               (t->block_info.closure!=(StgClosure*)NULL)));
1295       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1296           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1297             procStatus[CurrentProc]==Fetching)) 
1298         procStatus[CurrentProc] = Idle;
1299       */
1300 #elif defined(PAR)
1301       IF_DEBUG(scheduler,
1302                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1303                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1304       IF_PAR_DEBUG(bq,
1305
1306                    if (t->block_info.closure!=(StgClosure*)NULL) 
1307                      print_bq(t->block_info.closure));
1308
1309       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1310       blockThread(t);
1311
1312       /* whatever we schedule next, we must log that schedule */
1313       emitSchedule = rtsTrue;
1314
1315 #else /* !GRAN */
1316       /* don't need to do anything.  Either the thread is blocked on
1317        * I/O, in which case we'll have called addToBlockedQueue
1318        * previously, or it's blocked on an MVar or Blackhole, in which
1319        * case it'll be on the relevant queue already.
1320        */
1321       IF_DEBUG(scheduler,
1322                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1323                printThreadBlockage(t);
1324                fprintf(stderr, "\n"));
1325
1326       /* Only for dumping event to log file 
1327          ToDo: do I need this in GranSim, too?
1328       blockThread(t);
1329       */
1330 #endif
1331       threadPaused(t);
1332       break;
1333       
1334     case ThreadFinished:
1335       /* Need to check whether this was a main thread, and if so, signal
1336        * the task that started it with the return value.  If we have no
1337        * more main threads, we probably need to stop all the tasks until
1338        * we get a new one.
1339        */
1340       /* We also end up here if the thread kills itself with an
1341        * uncaught exception, see Exception.hc.
1342        */
1343       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1344 #if defined(GRAN)
1345       endThread(t, CurrentProc); // clean-up the thread
1346 #elif defined(PAR)
1347       /* For now all are advisory -- HWL */
1348       //if(t->priority==AdvisoryPriority) ??
1349       advisory_thread_count--;
1350       
1351 # ifdef DIST
1352       if(t->dist.priority==RevalPriority)
1353         FinishReval(t);
1354 # endif
1355       
1356       if (RtsFlags.ParFlags.ParStats.Full &&
1357           !RtsFlags.ParFlags.ParStats.Suppressed) 
1358         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1359 #endif
1360       break;
1361       
1362     default:
1363       barf("schedule: invalid thread return code %d", (int)ret);
1364     }
1365     
1366 #if defined(RTS_SUPPORTS_THREADS)
1367     /* I don't understand what this re-grab is doing -- sof */
1368     grabCapability(&cap);
1369 #endif
1370
1371 #ifdef PROFILING
1372     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1373         GarbageCollect(GetRoots, rtsTrue);
1374         heapCensus();
1375         performHeapProfile = rtsFalse;
1376         ready_to_gc = rtsFalse; // we already GC'd
1377     }
1378 #endif
1379
1380 #ifdef SMP
1381     if (ready_to_gc && allFreeCapabilities() )
1382 #else
1383     if (ready_to_gc) 
1384 #endif
1385       {
1386       /* everybody back, start the GC.
1387        * Could do it in this thread, or signal a condition var
1388        * to do it in another thread.  Either way, we need to
1389        * broadcast on gc_pending_cond afterward.
1390        */
1391 #if defined(RTS_SUPPORTS_THREADS)
1392       IF_DEBUG(scheduler,sched_belch("doing GC"));
1393 #endif
1394       GarbageCollect(GetRoots,rtsFalse);
1395       ready_to_gc = rtsFalse;
1396 #ifdef SMP
1397       broadcastCondition(&gc_pending_cond);
1398 #endif
1399 #if defined(GRAN)
1400       /* add a ContinueThread event to continue execution of current thread */
1401       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1402                 ContinueThread,
1403                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1404       IF_GRAN_DEBUG(bq, 
1405                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1406                G_EVENTQ(0);
1407                G_CURR_THREADQ(0));
1408 #endif /* GRAN */
1409     }
1410
1411 #if defined(GRAN)
1412   next_thread:
1413     IF_GRAN_DEBUG(unused,
1414                   print_eventq(EventHd));
1415
1416     event = get_next_event();
1417 #elif defined(PAR)
1418   next_thread:
1419     /* ToDo: wait for next message to arrive rather than busy wait */
1420 #endif /* GRAN */
1421
1422   } /* end of while(1) */
1423
1424   IF_PAR_DEBUG(verbose,
1425                belch("== Leaving schedule() after having received Finish"));
1426 }
1427
1428 /* ---------------------------------------------------------------------------
1429  * deleteAllThreads():  kill all the live threads.
1430  *
1431  * This is used when we catch a user interrupt (^C), before performing
1432  * any necessary cleanups and running finalizers.
1433  * ------------------------------------------------------------------------- */
1434    
1435 void deleteAllThreads ( void )
1436 {
1437   StgTSO* t, *next;
1438   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1439   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1440       next = t->link;
1441       deleteThread(t);
1442   }
1443   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1444       next = t->link;
1445       deleteThread(t);
1446   }
1447   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1448       next = t->link;
1449       deleteThread(t);
1450   }
1451   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1452   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1453   sleeping_queue = END_TSO_QUEUE;
1454 }
1455
1456 /* startThread and  insertThread are now in GranSim.c -- HWL */
1457
1458
1459 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1460 //@subsection Suspend and Resume
1461
1462 /* ---------------------------------------------------------------------------
1463  * Suspending & resuming Haskell threads.
1464  * 
1465  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1466  * its capability before calling the C function.  This allows another
1467  * task to pick up the capability and carry on running Haskell
1468  * threads.  It also means that if the C call blocks, it won't lock
1469  * the whole system.
1470  *
1471  * The Haskell thread making the C call is put to sleep for the
1472  * duration of the call, on the susepended_ccalling_threads queue.  We
1473  * give out a token to the task, which it can use to resume the thread
1474  * on return from the C function.
1475  * ------------------------------------------------------------------------- */
1476    
1477 StgInt
1478 suspendThread( StgRegTable *reg )
1479 {
1480   nat tok;
1481   Capability *cap;
1482
1483   /* assume that *reg is a pointer to the StgRegTable part
1484    * of a Capability.
1485    */
1486   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1487
1488   ACQUIRE_LOCK(&sched_mutex);
1489
1490   IF_DEBUG(scheduler,
1491            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1492
1493   threadPaused(cap->r.rCurrentTSO);
1494   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1495   suspended_ccalling_threads = cap->r.rCurrentTSO;
1496
1497 #if defined(RTS_SUPPORTS_THREADS)
1498   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1499 #endif
1500
1501   /* Use the thread ID as the token; it should be unique */
1502   tok = cap->r.rCurrentTSO->id;
1503
1504   /* Hand back capability */
1505   releaseCapability(cap);
1506   
1507 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1508   /* Preparing to leave the RTS, so ensure there's a native thread/task
1509      waiting to take over.
1510      
1511      ToDo: optimise this and only create a new task if there's a need
1512      for one (i.e., if there's only one Concurrent Haskell thread alive,
1513      there's no need to create a new task).
1514   */
1515   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1516   startTask(taskStart);
1517 #endif
1518
1519   THREAD_RUNNABLE();
1520   RELEASE_LOCK(&sched_mutex);
1521   return tok; 
1522 }
1523
1524 StgRegTable *
1525 resumeThread( StgInt tok )
1526 {
1527   StgTSO *tso, **prev;
1528   Capability *cap;
1529
1530 #if defined(RTS_SUPPORTS_THREADS)
1531   /* Wait for permission to re-enter the RTS with the result.. */
1532   grabReturnCapability(&cap);
1533 #else
1534   grabCapability(&cap);
1535 #endif
1536
1537   /* Remove the thread off of the suspended list */
1538   prev = &suspended_ccalling_threads;
1539   for (tso = suspended_ccalling_threads; 
1540        tso != END_TSO_QUEUE; 
1541        prev = &tso->link, tso = tso->link) {
1542     if (tso->id == (StgThreadID)tok) {
1543       *prev = tso->link;
1544       break;
1545     }
1546   }
1547   if (tso == END_TSO_QUEUE) {
1548     barf("resumeThread: thread not found");
1549   }
1550   tso->link = END_TSO_QUEUE;
1551   /* Reset blocking status */
1552   tso->why_blocked  = NotBlocked;
1553
1554   RELEASE_LOCK(&sched_mutex);
1555
1556   cap->r.rCurrentTSO = tso;
1557   return &cap->r;
1558 }
1559
1560
1561 #if defined(RTS_SUPPORTS_THREADS)
1562 static void
1563 waitForWork()
1564 {
1565   rts_n_waiting_tasks++;
1566   waitCondition(&thread_ready_cond, &sched_mutex);
1567   rts_n_waiting_tasks--;
1568   return;
1569 }
1570 #endif
1571
1572
1573 /* ---------------------------------------------------------------------------
1574  * Static functions
1575  * ------------------------------------------------------------------------ */
1576 static void unblockThread(StgTSO *tso);
1577
1578 /* ---------------------------------------------------------------------------
1579  * Comparing Thread ids.
1580  *
1581  * This is used from STG land in the implementation of the
1582  * instances of Eq/Ord for ThreadIds.
1583  * ------------------------------------------------------------------------ */
1584
1585 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1586
1587   StgThreadID id1 = tso1->id; 
1588   StgThreadID id2 = tso2->id;
1589  
1590   if (id1 < id2) return (-1);
1591   if (id1 > id2) return 1;
1592   return 0;
1593 }
1594
1595 /* ---------------------------------------------------------------------------
1596  * Fetching the ThreadID from an StgTSO.
1597  *
1598  * This is used in the implementation of Show for ThreadIds.
1599  * ------------------------------------------------------------------------ */
1600 int rts_getThreadId(const StgTSO *tso) 
1601 {
1602   return tso->id;
1603 }
1604
1605 /* ---------------------------------------------------------------------------
1606    Create a new thread.
1607
1608    The new thread starts with the given stack size.  Before the
1609    scheduler can run, however, this thread needs to have a closure
1610    (and possibly some arguments) pushed on its stack.  See
1611    pushClosure() in Schedule.h.
1612
1613    createGenThread() and createIOThread() (in SchedAPI.h) are
1614    convenient packaged versions of this function.
1615
1616    currently pri (priority) is only used in a GRAN setup -- HWL
1617    ------------------------------------------------------------------------ */
1618 //@cindex createThread
1619 #if defined(GRAN)
1620 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1621 StgTSO *
1622 createThread(nat stack_size, StgInt pri)
1623 {
1624   return createThread_(stack_size, rtsFalse, pri);
1625 }
1626
1627 static StgTSO *
1628 createThread_(nat size, rtsBool have_lock, StgInt pri)
1629 {
1630 #else
1631 StgTSO *
1632 createThread(nat stack_size)
1633 {
1634   return createThread_(stack_size, rtsFalse);
1635 }
1636
1637 static StgTSO *
1638 createThread_(nat size, rtsBool have_lock)
1639 {
1640 #endif
1641
1642     StgTSO *tso;
1643     nat stack_size;
1644
1645     /* First check whether we should create a thread at all */
1646 #if defined(PAR)
1647   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1648   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1649     threadsIgnored++;
1650     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1651           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1652     return END_TSO_QUEUE;
1653   }
1654   threadsCreated++;
1655 #endif
1656
1657 #if defined(GRAN)
1658   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1659 #endif
1660
1661   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1662
1663   /* catch ridiculously small stack sizes */
1664   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1665     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1666   }
1667
1668   stack_size = size - TSO_STRUCT_SIZEW;
1669
1670   tso = (StgTSO *)allocate(size);
1671   TICK_ALLOC_TSO(stack_size, 0);
1672
1673   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1674 #if defined(GRAN)
1675   SET_GRAN_HDR(tso, ThisPE);
1676 #endif
1677   tso->what_next     = ThreadEnterGHC;
1678
1679   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1680    * protect the increment operation on next_thread_id.
1681    * In future, we could use an atomic increment instead.
1682    */
1683   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1684   tso->id = next_thread_id++; 
1685   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1686
1687   tso->why_blocked  = NotBlocked;
1688   tso->blocked_exceptions = NULL;
1689
1690   tso->stack_size   = stack_size;
1691   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1692                               - TSO_STRUCT_SIZEW;
1693   tso->sp           = (P_)&(tso->stack) + stack_size;
1694
1695 #ifdef PROFILING
1696   tso->prof.CCCS = CCS_MAIN;
1697 #endif
1698
1699   /* put a stop frame on the stack */
1700   tso->sp -= sizeofW(StgStopFrame);
1701   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1702   tso->su = (StgUpdateFrame*)tso->sp;
1703
1704   // ToDo: check this
1705 #if defined(GRAN)
1706   tso->link = END_TSO_QUEUE;
1707   /* uses more flexible routine in GranSim */
1708   insertThread(tso, CurrentProc);
1709 #else
1710   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1711    * from its creation
1712    */
1713 #endif
1714
1715 #if defined(GRAN) 
1716   if (RtsFlags.GranFlags.GranSimStats.Full) 
1717     DumpGranEvent(GR_START,tso);
1718 #elif defined(PAR)
1719   if (RtsFlags.ParFlags.ParStats.Full) 
1720     DumpGranEvent(GR_STARTQ,tso);
1721   /* HACk to avoid SCHEDULE 
1722      LastTSO = tso; */
1723 #endif
1724
1725   /* Link the new thread on the global thread list.
1726    */
1727   tso->global_link = all_threads;
1728   all_threads = tso;
1729
1730 #if defined(DIST)
1731   tso->dist.priority = MandatoryPriority; //by default that is...
1732 #endif
1733
1734 #if defined(GRAN)
1735   tso->gran.pri = pri;
1736 # if defined(DEBUG)
1737   tso->gran.magic = TSO_MAGIC; // debugging only
1738 # endif
1739   tso->gran.sparkname   = 0;
1740   tso->gran.startedat   = CURRENT_TIME; 
1741   tso->gran.exported    = 0;
1742   tso->gran.basicblocks = 0;
1743   tso->gran.allocs      = 0;
1744   tso->gran.exectime    = 0;
1745   tso->gran.fetchtime   = 0;
1746   tso->gran.fetchcount  = 0;
1747   tso->gran.blocktime   = 0;
1748   tso->gran.blockcount  = 0;
1749   tso->gran.blockedat   = 0;
1750   tso->gran.globalsparks = 0;
1751   tso->gran.localsparks  = 0;
1752   if (RtsFlags.GranFlags.Light)
1753     tso->gran.clock  = Now; /* local clock */
1754   else
1755     tso->gran.clock  = 0;
1756
1757   IF_DEBUG(gran,printTSO(tso));
1758 #elif defined(PAR)
1759 # if defined(DEBUG)
1760   tso->par.magic = TSO_MAGIC; // debugging only
1761 # endif
1762   tso->par.sparkname   = 0;
1763   tso->par.startedat   = CURRENT_TIME; 
1764   tso->par.exported    = 0;
1765   tso->par.basicblocks = 0;
1766   tso->par.allocs      = 0;
1767   tso->par.exectime    = 0;
1768   tso->par.fetchtime   = 0;
1769   tso->par.fetchcount  = 0;
1770   tso->par.blocktime   = 0;
1771   tso->par.blockcount  = 0;
1772   tso->par.blockedat   = 0;
1773   tso->par.globalsparks = 0;
1774   tso->par.localsparks  = 0;
1775 #endif
1776
1777 #if defined(GRAN)
1778   globalGranStats.tot_threads_created++;
1779   globalGranStats.threads_created_on_PE[CurrentProc]++;
1780   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1781   globalGranStats.tot_sq_probes++;
1782 #elif defined(PAR)
1783   // collect parallel global statistics (currently done together with GC stats)
1784   if (RtsFlags.ParFlags.ParStats.Global &&
1785       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1786     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1787     globalParStats.tot_threads_created++;
1788   }
1789 #endif 
1790
1791 #if defined(GRAN)
1792   IF_GRAN_DEBUG(pri,
1793                 belch("==__ schedule: Created TSO %d (%p);",
1794                       CurrentProc, tso, tso->id));
1795 #elif defined(PAR)
1796     IF_PAR_DEBUG(verbose,
1797                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1798                        tso->id, tso, advisory_thread_count));
1799 #else
1800   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1801                                  tso->id, tso->stack_size));
1802 #endif    
1803   return tso;
1804 }
1805
1806 #if defined(PAR)
1807 /* RFP:
1808    all parallel thread creation calls should fall through the following routine.
1809 */
1810 StgTSO *
1811 createSparkThread(rtsSpark spark) 
1812 { StgTSO *tso;
1813   ASSERT(spark != (rtsSpark)NULL);
1814   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1815   { threadsIgnored++;
1816     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1817           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1818     return END_TSO_QUEUE;
1819   }
1820   else
1821   { threadsCreated++;
1822     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1823     if (tso==END_TSO_QUEUE)     
1824       barf("createSparkThread: Cannot create TSO");
1825 #if defined(DIST)
1826     tso->priority = AdvisoryPriority;
1827 #endif
1828     pushClosure(tso,spark);
1829     PUSH_ON_RUN_QUEUE(tso);
1830     advisory_thread_count++;    
1831   }
1832   return tso;
1833 }
1834 #endif
1835
1836 /*
1837   Turn a spark into a thread.
1838   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1839 */
1840 #if defined(PAR)
1841 //@cindex activateSpark
1842 StgTSO *
1843 activateSpark (rtsSpark spark) 
1844 {
1845   StgTSO *tso;
1846
1847   tso = createSparkThread(spark);
1848   if (RtsFlags.ParFlags.ParStats.Full) {   
1849     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1850     IF_PAR_DEBUG(verbose,
1851                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1852                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1853   }
1854   // ToDo: fwd info on local/global spark to thread -- HWL
1855   // tso->gran.exported =  spark->exported;
1856   // tso->gran.locked =   !spark->global;
1857   // tso->gran.sparkname = spark->name;
1858
1859   return tso;
1860 }
1861 #endif
1862
1863 /* ---------------------------------------------------------------------------
1864  * scheduleThread()
1865  *
1866  * scheduleThread puts a thread on the head of the runnable queue.
1867  * This will usually be done immediately after a thread is created.
1868  * The caller of scheduleThread must create the thread using e.g.
1869  * createThread and push an appropriate closure
1870  * on this thread's stack before the scheduler is invoked.
1871  * ------------------------------------------------------------------------ */
1872
1873 void
1874 scheduleThread_(StgTSO *tso
1875 #if defined(THREADED_RTS)
1876                , rtsBool createTask
1877 #endif
1878               )
1879 {
1880   ACQUIRE_LOCK(&sched_mutex);
1881
1882   /* Put the new thread on the head of the runnable queue.  The caller
1883    * better push an appropriate closure on this thread's stack
1884    * beforehand.  In the SMP case, the thread may start running as
1885    * soon as we release the scheduler lock below.
1886    */
1887   PUSH_ON_RUN_QUEUE(tso);
1888 #if defined(THREADED_RTS)
1889   /* If main() is scheduling a thread, don't bother creating a 
1890    * new task.
1891    */
1892   if ( createTask ) {
1893     startTask(taskStart);
1894   }
1895 #endif
1896   THREAD_RUNNABLE();
1897
1898 #if 0
1899   IF_DEBUG(scheduler,printTSO(tso));
1900 #endif
1901   RELEASE_LOCK(&sched_mutex);
1902 }
1903
1904 void scheduleThread(StgTSO* tso)
1905 {
1906 #if defined(THREADED_RTS)
1907   return scheduleThread_(tso, rtsTrue);
1908 #else
1909   return scheduleThread_(tso);
1910 #endif
1911 }
1912
1913 /* ---------------------------------------------------------------------------
1914  * initScheduler()
1915  *
1916  * Initialise the scheduler.  This resets all the queues - if the
1917  * queues contained any threads, they'll be garbage collected at the
1918  * next pass.
1919  *
1920  * ------------------------------------------------------------------------ */
1921
1922 #ifdef SMP
1923 static void
1924 term_handler(int sig STG_UNUSED)
1925 {
1926   stat_workerStop();
1927   ACQUIRE_LOCK(&term_mutex);
1928   await_death--;
1929   RELEASE_LOCK(&term_mutex);
1930   shutdownThread();
1931 }
1932 #endif
1933
1934 void 
1935 initScheduler(void)
1936 {
1937 #if defined(GRAN)
1938   nat i;
1939
1940   for (i=0; i<=MAX_PROC; i++) {
1941     run_queue_hds[i]      = END_TSO_QUEUE;
1942     run_queue_tls[i]      = END_TSO_QUEUE;
1943     blocked_queue_hds[i]  = END_TSO_QUEUE;
1944     blocked_queue_tls[i]  = END_TSO_QUEUE;
1945     ccalling_threadss[i]  = END_TSO_QUEUE;
1946     sleeping_queue        = END_TSO_QUEUE;
1947   }
1948 #else
1949   run_queue_hd      = END_TSO_QUEUE;
1950   run_queue_tl      = END_TSO_QUEUE;
1951   blocked_queue_hd  = END_TSO_QUEUE;
1952   blocked_queue_tl  = END_TSO_QUEUE;
1953   sleeping_queue    = END_TSO_QUEUE;
1954 #endif 
1955
1956   suspended_ccalling_threads  = END_TSO_QUEUE;
1957
1958   main_threads = NULL;
1959   all_threads  = END_TSO_QUEUE;
1960
1961   context_switch = 0;
1962   interrupted    = 0;
1963
1964   RtsFlags.ConcFlags.ctxtSwitchTicks =
1965       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1966       
1967 #if defined(RTS_SUPPORTS_THREADS)
1968   /* Initialise the mutex and condition variables used by
1969    * the scheduler. */
1970   initMutex(&sched_mutex);
1971   initMutex(&term_mutex);
1972
1973   initCondition(&thread_ready_cond);
1974 #endif
1975   
1976 #if defined(SMP)
1977   initCondition(&gc_pending_cond);
1978 #endif
1979
1980 #if defined(RTS_SUPPORTS_THREADS)
1981   ACQUIRE_LOCK(&sched_mutex);
1982 #endif
1983
1984   /* Install the SIGHUP handler */
1985 #if defined(SMP)
1986   {
1987     struct sigaction action,oact;
1988
1989     action.sa_handler = term_handler;
1990     sigemptyset(&action.sa_mask);
1991     action.sa_flags = 0;
1992     if (sigaction(SIGTERM, &action, &oact) != 0) {
1993       barf("can't install TERM handler");
1994     }
1995   }
1996 #endif
1997
1998   /* A capability holds the state a native thread needs in
1999    * order to execute STG code. At least one capability is
2000    * floating around (only SMP builds have more than one).
2001    */
2002   initCapabilities();
2003   
2004 #if defined(RTS_SUPPORTS_THREADS)
2005     /* start our haskell execution tasks */
2006 # if defined(SMP)
2007     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2008 # else
2009     startTaskManager(0,taskStart);
2010 # endif
2011 #endif
2012
2013 #if /* defined(SMP) ||*/ defined(PAR)
2014   initSparkPools();
2015 #endif
2016
2017 #if defined(RTS_SUPPORTS_THREADS)
2018   RELEASE_LOCK(&sched_mutex);
2019 #endif
2020
2021 }
2022
2023 void
2024 exitScheduler( void )
2025 {
2026 #if defined(RTS_SUPPORTS_THREADS)
2027   stopTaskManager();
2028 #endif
2029 }
2030
2031 /* -----------------------------------------------------------------------------
2032    Managing the per-task allocation areas.
2033    
2034    Each capability comes with an allocation area.  These are
2035    fixed-length block lists into which allocation can be done.
2036
2037    ToDo: no support for two-space collection at the moment???
2038    -------------------------------------------------------------------------- */
2039
2040 /* -----------------------------------------------------------------------------
2041  * waitThread is the external interface for running a new computation
2042  * and waiting for the result.
2043  *
2044  * In the non-SMP case, we create a new main thread, push it on the 
2045  * main-thread stack, and invoke the scheduler to run it.  The
2046  * scheduler will return when the top main thread on the stack has
2047  * completed or died, and fill in the necessary fields of the
2048  * main_thread structure.
2049  *
2050  * In the SMP case, we create a main thread as before, but we then
2051  * create a new condition variable and sleep on it.  When our new
2052  * main thread has completed, we'll be woken up and the status/result
2053  * will be in the main_thread struct.
2054  * -------------------------------------------------------------------------- */
2055
2056 int 
2057 howManyThreadsAvail ( void )
2058 {
2059    int i = 0;
2060    StgTSO* q;
2061    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2062       i++;
2063    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2064       i++;
2065    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2066       i++;
2067    return i;
2068 }
2069
2070 void
2071 finishAllThreads ( void )
2072 {
2073    do {
2074       while (run_queue_hd != END_TSO_QUEUE) {
2075          waitThread ( run_queue_hd, NULL);
2076       }
2077       while (blocked_queue_hd != END_TSO_QUEUE) {
2078          waitThread ( blocked_queue_hd, NULL);
2079       }
2080       while (sleeping_queue != END_TSO_QUEUE) {
2081          waitThread ( blocked_queue_hd, NULL);
2082       }
2083    } while 
2084       (blocked_queue_hd != END_TSO_QUEUE || 
2085        run_queue_hd     != END_TSO_QUEUE ||
2086        sleeping_queue   != END_TSO_QUEUE);
2087 }
2088
2089 SchedulerStatus
2090 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2091
2092 #if defined(THREADED_RTS)
2093   return waitThread_(tso,ret, rtsFalse);
2094 #else
2095   return waitThread_(tso,ret);
2096 #endif
2097 }
2098
2099 SchedulerStatus
2100 waitThread_(StgTSO *tso,
2101             /*out*/StgClosure **ret
2102 #if defined(THREADED_RTS)
2103             , rtsBool blockWaiting
2104 #endif
2105            )
2106 {
2107   StgMainThread *m;
2108   SchedulerStatus stat;
2109
2110   ACQUIRE_LOCK(&sched_mutex);
2111   
2112   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2113
2114   m->tso = tso;
2115   m->ret = ret;
2116   m->stat = NoStatus;
2117 #if defined(RTS_SUPPORTS_THREADS)
2118   initCondition(&m->wakeup);
2119 #endif
2120
2121   m->link = main_threads;
2122   main_threads = m;
2123
2124   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2125
2126 #if defined(RTS_SUPPORTS_THREADS)
2127
2128 # if defined(THREADED_RTS)
2129   if (!blockWaiting) {
2130     /* In the threaded case, the OS thread that called main()
2131      * gets to enter the RTS directly without going via another
2132      * task/thread.
2133      */
2134     RELEASE_LOCK(&sched_mutex);
2135     schedule();
2136     ASSERT(m->stat != NoStatus);
2137   } else 
2138 # endif
2139   {
2140     IF_DEBUG(scheduler, sched_belch("sfoo"));
2141     do {
2142       waitCondition(&m->wakeup, &sched_mutex);
2143     } while (m->stat == NoStatus);
2144   }
2145 #elif defined(GRAN)
2146   /* GranSim specific init */
2147   CurrentTSO = m->tso;                // the TSO to run
2148   procStatus[MainProc] = Busy;        // status of main PE
2149   CurrentProc = MainProc;             // PE to run it on
2150
2151   schedule();
2152 #else
2153   RELEASE_LOCK(&sched_mutex);
2154   schedule();
2155   ASSERT(m->stat != NoStatus);
2156 #endif
2157
2158   stat = m->stat;
2159
2160 #if defined(RTS_SUPPORTS_THREADS)
2161   closeCondition(&m->wakeup);
2162 #endif
2163
2164   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2165                               m->tso->id));
2166   free(m);
2167
2168 #if defined(THREADED_RTS)
2169   if (blockWaiting) 
2170 #endif
2171     RELEASE_LOCK(&sched_mutex);
2172
2173   return stat;
2174 }
2175
2176 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2177 //@subsection Run queue code 
2178
2179 #if 0
2180 /* 
2181    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2182        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2183        implicit global variable that has to be correct when calling these
2184        fcts -- HWL 
2185 */
2186
2187 /* Put the new thread on the head of the runnable queue.
2188  * The caller of createThread better push an appropriate closure
2189  * on this thread's stack before the scheduler is invoked.
2190  */
2191 static /* inline */ void
2192 add_to_run_queue(tso)
2193 StgTSO* tso; 
2194 {
2195   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2196   tso->link = run_queue_hd;
2197   run_queue_hd = tso;
2198   if (run_queue_tl == END_TSO_QUEUE) {
2199     run_queue_tl = tso;
2200   }
2201 }
2202
2203 /* Put the new thread at the end of the runnable queue. */
2204 static /* inline */ void
2205 push_on_run_queue(tso)
2206 StgTSO* tso; 
2207 {
2208   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2209   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2210   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2211   if (run_queue_hd == END_TSO_QUEUE) {
2212     run_queue_hd = tso;
2213   } else {
2214     run_queue_tl->link = tso;
2215   }
2216   run_queue_tl = tso;
2217 }
2218
2219 /* 
2220    Should be inlined because it's used very often in schedule.  The tso
2221    argument is actually only needed in GranSim, where we want to have the
2222    possibility to schedule *any* TSO on the run queue, irrespective of the
2223    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2224    the run queue and dequeue the tso, adjusting the links in the queue. 
2225 */
2226 //@cindex take_off_run_queue
2227 static /* inline */ StgTSO*
2228 take_off_run_queue(StgTSO *tso) {
2229   StgTSO *t, *prev;
2230
2231   /* 
2232      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2233
2234      if tso is specified, unlink that tso from the run_queue (doesn't have
2235      to be at the beginning of the queue); GranSim only 
2236   */
2237   if (tso!=END_TSO_QUEUE) {
2238     /* find tso in queue */
2239     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2240          t!=END_TSO_QUEUE && t!=tso;
2241          prev=t, t=t->link) 
2242       /* nothing */ ;
2243     ASSERT(t==tso);
2244     /* now actually dequeue the tso */
2245     if (prev!=END_TSO_QUEUE) {
2246       ASSERT(run_queue_hd!=t);
2247       prev->link = t->link;
2248     } else {
2249       /* t is at beginning of thread queue */
2250       ASSERT(run_queue_hd==t);
2251       run_queue_hd = t->link;
2252     }
2253     /* t is at end of thread queue */
2254     if (t->link==END_TSO_QUEUE) {
2255       ASSERT(t==run_queue_tl);
2256       run_queue_tl = prev;
2257     } else {
2258       ASSERT(run_queue_tl!=t);
2259     }
2260     t->link = END_TSO_QUEUE;
2261   } else {
2262     /* take tso from the beginning of the queue; std concurrent code */
2263     t = run_queue_hd;
2264     if (t != END_TSO_QUEUE) {
2265       run_queue_hd = t->link;
2266       t->link = END_TSO_QUEUE;
2267       if (run_queue_hd == END_TSO_QUEUE) {
2268         run_queue_tl = END_TSO_QUEUE;
2269       }
2270     }
2271   }
2272   return t;
2273 }
2274
2275 #endif /* 0 */
2276
2277 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2278 //@subsection Garbage Collextion Routines
2279
2280 /* ---------------------------------------------------------------------------
2281    Where are the roots that we know about?
2282
2283         - all the threads on the runnable queue
2284         - all the threads on the blocked queue
2285         - all the threads on the sleeping queue
2286         - all the thread currently executing a _ccall_GC
2287         - all the "main threads"
2288      
2289    ------------------------------------------------------------------------ */
2290
2291 /* This has to be protected either by the scheduler monitor, or by the
2292         garbage collection monitor (probably the latter).
2293         KH @ 25/10/99
2294 */
2295
2296 void
2297 GetRoots(evac_fn evac)
2298 {
2299   StgMainThread *m;
2300
2301 #if defined(GRAN)
2302   {
2303     nat i;
2304     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2305       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2306           evac((StgClosure **)&run_queue_hds[i]);
2307       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2308           evac((StgClosure **)&run_queue_tls[i]);
2309       
2310       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2311           evac((StgClosure **)&blocked_queue_hds[i]);
2312       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2313           evac((StgClosure **)&blocked_queue_tls[i]);
2314       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2315           evac((StgClosure **)&ccalling_threads[i]);
2316     }
2317   }
2318
2319   markEventQueue();
2320
2321 #else /* !GRAN */
2322   if (run_queue_hd != END_TSO_QUEUE) {
2323       ASSERT(run_queue_tl != END_TSO_QUEUE);
2324       evac((StgClosure **)&run_queue_hd);
2325       evac((StgClosure **)&run_queue_tl);
2326   }
2327   
2328   if (blocked_queue_hd != END_TSO_QUEUE) {
2329       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2330       evac((StgClosure **)&blocked_queue_hd);
2331       evac((StgClosure **)&blocked_queue_tl);
2332   }
2333   
2334   if (sleeping_queue != END_TSO_QUEUE) {
2335       evac((StgClosure **)&sleeping_queue);
2336   }
2337 #endif 
2338
2339   for (m = main_threads; m != NULL; m = m->link) {
2340       evac((StgClosure **)&m->tso);
2341   }
2342   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2343       evac((StgClosure **)&suspended_ccalling_threads);
2344   }
2345
2346 #if defined(PAR) || defined(GRAN)
2347   markSparkQueue(evac);
2348 #endif
2349 }
2350
2351 /* -----------------------------------------------------------------------------
2352    performGC
2353
2354    This is the interface to the garbage collector from Haskell land.
2355    We provide this so that external C code can allocate and garbage
2356    collect when called from Haskell via _ccall_GC.
2357
2358    It might be useful to provide an interface whereby the programmer
2359    can specify more roots (ToDo).
2360    
2361    This needs to be protected by the GC condition variable above.  KH.
2362    -------------------------------------------------------------------------- */
2363
2364 void (*extra_roots)(evac_fn);
2365
2366 void
2367 performGC(void)
2368 {
2369   GarbageCollect(GetRoots,rtsFalse);
2370 }
2371
2372 void
2373 performMajorGC(void)
2374 {
2375   GarbageCollect(GetRoots,rtsTrue);
2376 }
2377
2378 static void
2379 AllRoots(evac_fn evac)
2380 {
2381     GetRoots(evac);             // the scheduler's roots
2382     extra_roots(evac);          // the user's roots
2383 }
2384
2385 void
2386 performGCWithRoots(void (*get_roots)(evac_fn))
2387 {
2388   extra_roots = get_roots;
2389   GarbageCollect(AllRoots,rtsFalse);
2390 }
2391
2392 /* -----------------------------------------------------------------------------
2393    Stack overflow
2394
2395    If the thread has reached its maximum stack size, then raise the
2396    StackOverflow exception in the offending thread.  Otherwise
2397    relocate the TSO into a larger chunk of memory and adjust its stack
2398    size appropriately.
2399    -------------------------------------------------------------------------- */
2400
2401 static StgTSO *
2402 threadStackOverflow(StgTSO *tso)
2403 {
2404   nat new_stack_size, new_tso_size, diff, stack_words;
2405   StgPtr new_sp;
2406   StgTSO *dest;
2407
2408   IF_DEBUG(sanity,checkTSO(tso));
2409   if (tso->stack_size >= tso->max_stack_size) {
2410
2411     IF_DEBUG(gc,
2412              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2413                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2414              /* If we're debugging, just print out the top of the stack */
2415              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2416                                               tso->sp+64)));
2417
2418     /* Send this thread the StackOverflow exception */
2419     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2420     return tso;
2421   }
2422
2423   /* Try to double the current stack size.  If that takes us over the
2424    * maximum stack size for this thread, then use the maximum instead.
2425    * Finally round up so the TSO ends up as a whole number of blocks.
2426    */
2427   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2428   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2429                                        TSO_STRUCT_SIZE)/sizeof(W_);
2430   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2431   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2432
2433   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2434
2435   dest = (StgTSO *)allocate(new_tso_size);
2436   TICK_ALLOC_TSO(new_stack_size,0);
2437
2438   /* copy the TSO block and the old stack into the new area */
2439   memcpy(dest,tso,TSO_STRUCT_SIZE);
2440   stack_words = tso->stack + tso->stack_size - tso->sp;
2441   new_sp = (P_)dest + new_tso_size - stack_words;
2442   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2443
2444   /* relocate the stack pointers... */
2445   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2446   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2447   dest->sp    = new_sp;
2448   dest->stack_size = new_stack_size;
2449         
2450   /* and relocate the update frame list */
2451   relocate_stack(dest, diff);
2452
2453   /* Mark the old TSO as relocated.  We have to check for relocated
2454    * TSOs in the garbage collector and any primops that deal with TSOs.
2455    *
2456    * It's important to set the sp and su values to just beyond the end
2457    * of the stack, so we don't attempt to scavenge any part of the
2458    * dead TSO's stack.
2459    */
2460   tso->what_next = ThreadRelocated;
2461   tso->link = dest;
2462   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2463   tso->su = (StgUpdateFrame *)tso->sp;
2464   tso->why_blocked = NotBlocked;
2465   dest->mut_link = NULL;
2466
2467   IF_PAR_DEBUG(verbose,
2468                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2469                      tso->id, tso, tso->stack_size);
2470                /* If we're debugging, just print out the top of the stack */
2471                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2472                                                 tso->sp+64)));
2473   
2474   IF_DEBUG(sanity,checkTSO(tso));
2475 #if 0
2476   IF_DEBUG(scheduler,printTSO(dest));
2477 #endif
2478
2479   return dest;
2480 }
2481
2482 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2483 //@subsection Blocking Queue Routines
2484
2485 /* ---------------------------------------------------------------------------
2486    Wake up a queue that was blocked on some resource.
2487    ------------------------------------------------------------------------ */
2488
2489 #if defined(GRAN)
2490 static inline void
2491 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2492 {
2493 }
2494 #elif defined(PAR)
2495 static inline void
2496 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2497 {
2498   /* write RESUME events to log file and
2499      update blocked and fetch time (depending on type of the orig closure) */
2500   if (RtsFlags.ParFlags.ParStats.Full) {
2501     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2502                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2503                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2504     if (EMPTY_RUN_QUEUE())
2505       emitSchedule = rtsTrue;
2506
2507     switch (get_itbl(node)->type) {
2508         case FETCH_ME_BQ:
2509           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2510           break;
2511         case RBH:
2512         case FETCH_ME:
2513         case BLACKHOLE_BQ:
2514           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2515           break;
2516 #ifdef DIST
2517         case MVAR:
2518           break;
2519 #endif    
2520         default:
2521           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2522         }
2523       }
2524 }
2525 #endif
2526
2527 #if defined(GRAN)
2528 static StgBlockingQueueElement *
2529 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2530 {
2531     StgTSO *tso;
2532     PEs node_loc, tso_loc;
2533
2534     node_loc = where_is(node); // should be lifted out of loop
2535     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2536     tso_loc = where_is((StgClosure *)tso);
2537     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2538       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2539       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2540       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2541       // insertThread(tso, node_loc);
2542       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2543                 ResumeThread,
2544                 tso, node, (rtsSpark*)NULL);
2545       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2546       // len_local++;
2547       // len++;
2548     } else { // TSO is remote (actually should be FMBQ)
2549       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2550                                   RtsFlags.GranFlags.Costs.gunblocktime +
2551                                   RtsFlags.GranFlags.Costs.latency;
2552       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2553                 UnblockThread,
2554                 tso, node, (rtsSpark*)NULL);
2555       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2556       // len++;
2557     }
2558     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2559     IF_GRAN_DEBUG(bq,
2560                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2561                           (node_loc==tso_loc ? "Local" : "Global"), 
2562                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2563     tso->block_info.closure = NULL;
2564     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2565                              tso->id, tso));
2566 }
2567 #elif defined(PAR)
2568 static StgBlockingQueueElement *
2569 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2570 {
2571     StgBlockingQueueElement *next;
2572
2573     switch (get_itbl(bqe)->type) {
2574     case TSO:
2575       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2576       /* if it's a TSO just push it onto the run_queue */
2577       next = bqe->link;
2578       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2579       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2580       THREAD_RUNNABLE();
2581       unblockCount(bqe, node);
2582       /* reset blocking status after dumping event */
2583       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2584       break;
2585
2586     case BLOCKED_FETCH:
2587       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2588       next = bqe->link;
2589       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2590       PendingFetches = (StgBlockedFetch *)bqe;
2591       break;
2592
2593 # if defined(DEBUG)
2594       /* can ignore this case in a non-debugging setup; 
2595          see comments on RBHSave closures above */
2596     case CONSTR:
2597       /* check that the closure is an RBHSave closure */
2598       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2599              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2600              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2601       break;
2602
2603     default:
2604       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2605            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2606            (StgClosure *)bqe);
2607 # endif
2608     }
2609   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2610   return next;
2611 }
2612
2613 #else /* !GRAN && !PAR */
2614 static StgTSO *
2615 unblockOneLocked(StgTSO *tso)
2616 {
2617   StgTSO *next;
2618
2619   ASSERT(get_itbl(tso)->type == TSO);
2620   ASSERT(tso->why_blocked != NotBlocked);
2621   tso->why_blocked = NotBlocked;
2622   next = tso->link;
2623   PUSH_ON_RUN_QUEUE(tso);
2624   THREAD_RUNNABLE();
2625   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2626   return next;
2627 }
2628 #endif
2629
2630 #if defined(GRAN) || defined(PAR)
2631 inline StgBlockingQueueElement *
2632 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2633 {
2634   ACQUIRE_LOCK(&sched_mutex);
2635   bqe = unblockOneLocked(bqe, node);
2636   RELEASE_LOCK(&sched_mutex);
2637   return bqe;
2638 }
2639 #else
2640 inline StgTSO *
2641 unblockOne(StgTSO *tso)
2642 {
2643   ACQUIRE_LOCK(&sched_mutex);
2644   tso = unblockOneLocked(tso);
2645   RELEASE_LOCK(&sched_mutex);
2646   return tso;
2647 }
2648 #endif
2649
2650 #if defined(GRAN)
2651 void 
2652 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2653 {
2654   StgBlockingQueueElement *bqe;
2655   PEs node_loc;
2656   nat len = 0; 
2657
2658   IF_GRAN_DEBUG(bq, 
2659                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2660                       node, CurrentProc, CurrentTime[CurrentProc], 
2661                       CurrentTSO->id, CurrentTSO));
2662
2663   node_loc = where_is(node);
2664
2665   ASSERT(q == END_BQ_QUEUE ||
2666          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2667          get_itbl(q)->type == CONSTR); // closure (type constructor)
2668   ASSERT(is_unique(node));
2669
2670   /* FAKE FETCH: magically copy the node to the tso's proc;
2671      no Fetch necessary because in reality the node should not have been 
2672      moved to the other PE in the first place
2673   */
2674   if (CurrentProc!=node_loc) {
2675     IF_GRAN_DEBUG(bq, 
2676                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2677                         node, node_loc, CurrentProc, CurrentTSO->id, 
2678                         // CurrentTSO, where_is(CurrentTSO),
2679                         node->header.gran.procs));
2680     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2681     IF_GRAN_DEBUG(bq, 
2682                   belch("## new bitmask of node %p is %#x",
2683                         node, node->header.gran.procs));
2684     if (RtsFlags.GranFlags.GranSimStats.Global) {
2685       globalGranStats.tot_fake_fetches++;
2686     }
2687   }
2688
2689   bqe = q;
2690   // ToDo: check: ASSERT(CurrentProc==node_loc);
2691   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2692     //next = bqe->link;
2693     /* 
2694        bqe points to the current element in the queue
2695        next points to the next element in the queue
2696     */
2697     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2698     //tso_loc = where_is(tso);
2699     len++;
2700     bqe = unblockOneLocked(bqe, node);
2701   }
2702
2703   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2704      the closure to make room for the anchor of the BQ */
2705   if (bqe!=END_BQ_QUEUE) {
2706     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2707     /*
2708     ASSERT((info_ptr==&RBH_Save_0_info) ||
2709            (info_ptr==&RBH_Save_1_info) ||
2710            (info_ptr==&RBH_Save_2_info));
2711     */
2712     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2713     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2714     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2715
2716     IF_GRAN_DEBUG(bq,
2717                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2718                         node, info_type(node)));
2719   }
2720
2721   /* statistics gathering */
2722   if (RtsFlags.GranFlags.GranSimStats.Global) {
2723     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2724     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2725     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2726     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2727   }
2728   IF_GRAN_DEBUG(bq,
2729                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2730                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2731 }
2732 #elif defined(PAR)
2733 void 
2734 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2735 {
2736   StgBlockingQueueElement *bqe;
2737
2738   ACQUIRE_LOCK(&sched_mutex);
2739
2740   IF_PAR_DEBUG(verbose, 
2741                belch("##-_ AwBQ for node %p on [%x]: ",
2742                      node, mytid));
2743 #ifdef DIST  
2744   //RFP
2745   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2746     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2747     return;
2748   }
2749 #endif
2750   
2751   ASSERT(q == END_BQ_QUEUE ||
2752          get_itbl(q)->type == TSO ||           
2753          get_itbl(q)->type == BLOCKED_FETCH || 
2754          get_itbl(q)->type == CONSTR); 
2755
2756   bqe = q;
2757   while (get_itbl(bqe)->type==TSO || 
2758          get_itbl(bqe)->type==BLOCKED_FETCH) {
2759     bqe = unblockOneLocked(bqe, node);
2760   }
2761   RELEASE_LOCK(&sched_mutex);
2762 }
2763
2764 #else   /* !GRAN && !PAR */
2765 void
2766 awakenBlockedQueue(StgTSO *tso)
2767 {
2768   ACQUIRE_LOCK(&sched_mutex);
2769   while (tso != END_TSO_QUEUE) {
2770     tso = unblockOneLocked(tso);
2771   }
2772   RELEASE_LOCK(&sched_mutex);
2773 }
2774 #endif
2775
2776 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2777 //@subsection Exception Handling Routines
2778
2779 /* ---------------------------------------------------------------------------
2780    Interrupt execution
2781    - usually called inside a signal handler so it mustn't do anything fancy.   
2782    ------------------------------------------------------------------------ */
2783
2784 void
2785 interruptStgRts(void)
2786 {
2787     interrupted    = 1;
2788     context_switch = 1;
2789 }
2790
2791 /* -----------------------------------------------------------------------------
2792    Unblock a thread
2793
2794    This is for use when we raise an exception in another thread, which
2795    may be blocked.
2796    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2797    -------------------------------------------------------------------------- */
2798
2799 #if defined(GRAN) || defined(PAR)
2800 /*
2801   NB: only the type of the blocking queue is different in GranSim and GUM
2802       the operations on the queue-elements are the same
2803       long live polymorphism!
2804 */
2805 static void
2806 unblockThread(StgTSO *tso)
2807 {
2808   StgBlockingQueueElement *t, **last;
2809
2810   ACQUIRE_LOCK(&sched_mutex);
2811   switch (tso->why_blocked) {
2812
2813   case NotBlocked:
2814     return;  /* not blocked */
2815
2816   case BlockedOnMVar:
2817     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2818     {
2819       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2820       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2821
2822       last = (StgBlockingQueueElement **)&mvar->head;
2823       for (t = (StgBlockingQueueElement *)mvar->head; 
2824            t != END_BQ_QUEUE; 
2825            last = &t->link, last_tso = t, t = t->link) {
2826         if (t == (StgBlockingQueueElement *)tso) {
2827           *last = (StgBlockingQueueElement *)tso->link;
2828           if (mvar->tail == tso) {
2829             mvar->tail = (StgTSO *)last_tso;
2830           }
2831           goto done;
2832         }
2833       }
2834       barf("unblockThread (MVAR): TSO not found");
2835     }
2836
2837   case BlockedOnBlackHole:
2838     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2839     {
2840       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2841
2842       last = &bq->blocking_queue;
2843       for (t = bq->blocking_queue; 
2844            t != END_BQ_QUEUE; 
2845            last = &t->link, t = t->link) {
2846         if (t == (StgBlockingQueueElement *)tso) {
2847           *last = (StgBlockingQueueElement *)tso->link;
2848           goto done;
2849         }
2850       }
2851       barf("unblockThread (BLACKHOLE): TSO not found");
2852     }
2853
2854   case BlockedOnException:
2855     {
2856       StgTSO *target  = tso->block_info.tso;
2857
2858       ASSERT(get_itbl(target)->type == TSO);
2859
2860       if (target->what_next == ThreadRelocated) {
2861           target = target->link;
2862           ASSERT(get_itbl(target)->type == TSO);
2863       }
2864
2865       ASSERT(target->blocked_exceptions != NULL);
2866
2867       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2868       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2869            t != END_BQ_QUEUE; 
2870            last = &t->link, t = t->link) {
2871         ASSERT(get_itbl(t)->type == TSO);
2872         if (t == (StgBlockingQueueElement *)tso) {
2873           *last = (StgBlockingQueueElement *)tso->link;
2874           goto done;
2875         }
2876       }
2877       barf("unblockThread (Exception): TSO not found");
2878     }
2879
2880   case BlockedOnRead:
2881   case BlockedOnWrite:
2882     {
2883       /* take TSO off blocked_queue */
2884       StgBlockingQueueElement *prev = NULL;
2885       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2886            prev = t, t = t->link) {
2887         if (t == (StgBlockingQueueElement *)tso) {
2888           if (prev == NULL) {
2889             blocked_queue_hd = (StgTSO *)t->link;
2890             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2891               blocked_queue_tl = END_TSO_QUEUE;
2892             }
2893           } else {
2894             prev->link = t->link;
2895             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2896               blocked_queue_tl = (StgTSO *)prev;
2897             }
2898           }
2899           goto done;
2900         }
2901       }
2902       barf("unblockThread (I/O): TSO not found");
2903     }
2904
2905   case BlockedOnDelay:
2906     {
2907       /* take TSO off sleeping_queue */
2908       StgBlockingQueueElement *prev = NULL;
2909       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2910            prev = t, t = t->link) {
2911         if (t == (StgBlockingQueueElement *)tso) {
2912           if (prev == NULL) {
2913             sleeping_queue = (StgTSO *)t->link;
2914           } else {
2915             prev->link = t->link;
2916           }
2917           goto done;
2918         }
2919       }
2920       barf("unblockThread (I/O): TSO not found");
2921     }
2922
2923   default:
2924     barf("unblockThread");
2925   }
2926
2927  done:
2928   tso->link = END_TSO_QUEUE;
2929   tso->why_blocked = NotBlocked;
2930   tso->block_info.closure = NULL;
2931   PUSH_ON_RUN_QUEUE(tso);
2932   RELEASE_LOCK(&sched_mutex);
2933 }
2934 #else
2935 static void
2936 unblockThread(StgTSO *tso)
2937 {
2938   StgTSO *t, **last;
2939
2940   ACQUIRE_LOCK(&sched_mutex);
2941   switch (tso->why_blocked) {
2942
2943   case NotBlocked:
2944     return;  /* not blocked */
2945
2946   case BlockedOnMVar:
2947     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2948     {
2949       StgTSO *last_tso = END_TSO_QUEUE;
2950       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2951
2952       last = &mvar->head;
2953       for (t = mvar->head; t != END_TSO_QUEUE; 
2954            last = &t->link, last_tso = t, t = t->link) {
2955         if (t == tso) {
2956           *last = tso->link;
2957           if (mvar->tail == tso) {
2958             mvar->tail = last_tso;
2959           }
2960           goto done;
2961         }
2962       }
2963       barf("unblockThread (MVAR): TSO not found");
2964     }
2965
2966   case BlockedOnBlackHole:
2967     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2968     {
2969       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2970
2971       last = &bq->blocking_queue;
2972       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2973            last = &t->link, t = t->link) {
2974         if (t == tso) {
2975           *last = tso->link;
2976           goto done;
2977         }
2978       }
2979       barf("unblockThread (BLACKHOLE): TSO not found");
2980     }
2981
2982   case BlockedOnException:
2983     {
2984       StgTSO *target  = tso->block_info.tso;
2985
2986       ASSERT(get_itbl(target)->type == TSO);
2987
2988       while (target->what_next == ThreadRelocated) {
2989           target = target->link;
2990           ASSERT(get_itbl(target)->type == TSO);
2991       }
2992       
2993       ASSERT(target->blocked_exceptions != NULL);
2994
2995       last = &target->blocked_exceptions;
2996       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2997            last = &t->link, t = t->link) {
2998         ASSERT(get_itbl(t)->type == TSO);
2999         if (t == tso) {
3000           *last = tso->link;
3001           goto done;
3002         }
3003       }
3004       barf("unblockThread (Exception): TSO not found");
3005     }
3006
3007   case BlockedOnRead:
3008   case BlockedOnWrite:
3009     {
3010       StgTSO *prev = NULL;
3011       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3012            prev = t, t = t->link) {
3013         if (t == tso) {
3014           if (prev == NULL) {
3015             blocked_queue_hd = t->link;
3016             if (blocked_queue_tl == t) {
3017               blocked_queue_tl = END_TSO_QUEUE;
3018             }
3019           } else {
3020             prev->link = t->link;
3021             if (blocked_queue_tl == t) {
3022               blocked_queue_tl = prev;
3023             }
3024           }
3025           goto done;
3026         }
3027       }
3028       barf("unblockThread (I/O): TSO not found");
3029     }
3030
3031   case BlockedOnDelay:
3032     {
3033       StgTSO *prev = NULL;
3034       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3035            prev = t, t = t->link) {
3036         if (t == tso) {
3037           if (prev == NULL) {
3038             sleeping_queue = t->link;
3039           } else {
3040             prev->link = t->link;
3041           }
3042           goto done;
3043         }
3044       }
3045       barf("unblockThread (I/O): TSO not found");
3046     }
3047
3048   default:
3049     barf("unblockThread");
3050   }
3051
3052  done:
3053   tso->link = END_TSO_QUEUE;
3054   tso->why_blocked = NotBlocked;
3055   tso->block_info.closure = NULL;
3056   PUSH_ON_RUN_QUEUE(tso);
3057   RELEASE_LOCK(&sched_mutex);
3058 }
3059 #endif
3060
3061 /* -----------------------------------------------------------------------------
3062  * raiseAsync()
3063  *
3064  * The following function implements the magic for raising an
3065  * asynchronous exception in an existing thread.
3066  *
3067  * We first remove the thread from any queue on which it might be
3068  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3069  *
3070  * We strip the stack down to the innermost CATCH_FRAME, building
3071  * thunks in the heap for all the active computations, so they can 
3072  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3073  * an application of the handler to the exception, and push it on
3074  * the top of the stack.
3075  * 
3076  * How exactly do we save all the active computations?  We create an
3077  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3078  * AP_UPDs pushes everything from the corresponding update frame
3079  * upwards onto the stack.  (Actually, it pushes everything up to the
3080  * next update frame plus a pointer to the next AP_UPD object.
3081  * Entering the next AP_UPD object pushes more onto the stack until we
3082  * reach the last AP_UPD object - at which point the stack should look
3083  * exactly as it did when we killed the TSO and we can continue
3084  * execution by entering the closure on top of the stack.
3085  *
3086  * We can also kill a thread entirely - this happens if either (a) the 
3087  * exception passed to raiseAsync is NULL, or (b) there's no
3088  * CATCH_FRAME on the stack.  In either case, we strip the entire
3089  * stack and replace the thread with a zombie.
3090  *
3091  * -------------------------------------------------------------------------- */
3092  
3093 void 
3094 deleteThread(StgTSO *tso)
3095 {
3096   raiseAsync(tso,NULL);
3097 }
3098
3099 void
3100 raiseAsync(StgTSO *tso, StgClosure *exception)
3101 {
3102   StgUpdateFrame* su = tso->su;
3103   StgPtr          sp = tso->sp;
3104   
3105   /* Thread already dead? */
3106   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3107     return;
3108   }
3109
3110   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3111
3112   /* Remove it from any blocking queues */
3113   unblockThread(tso);
3114
3115   /* The stack freezing code assumes there's a closure pointer on
3116    * the top of the stack.  This isn't always the case with compiled
3117    * code, so we have to push a dummy closure on the top which just
3118    * returns to the next return address on the stack.
3119    */
3120   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3121     *(--sp) = (W_)&stg_dummy_ret_closure;
3122   }
3123
3124   while (1) {
3125     nat words = ((P_)su - (P_)sp) - 1;
3126     nat i;
3127     StgAP_UPD * ap;
3128
3129     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3130      * then build PAP(handler,exception,realworld#), and leave it on
3131      * top of the stack ready to enter.
3132      */
3133     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3134       StgCatchFrame *cf = (StgCatchFrame *)su;
3135       /* we've got an exception to raise, so let's pass it to the
3136        * handler in this frame.
3137        */
3138       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3139       TICK_ALLOC_UPD_PAP(3,0);
3140       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3141               
3142       ap->n_args = 2;
3143       ap->fun = cf->handler;    /* :: Exception -> IO a */
3144       ap->payload[0] = exception;
3145       ap->payload[1] = ARG_TAG(0); /* realworld token */
3146
3147       /* throw away the stack from Sp up to and including the
3148        * CATCH_FRAME.
3149        */
3150       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3151       tso->su = cf->link;
3152
3153       /* Restore the blocked/unblocked state for asynchronous exceptions
3154        * at the CATCH_FRAME.  
3155        *
3156        * If exceptions were unblocked at the catch, arrange that they
3157        * are unblocked again after executing the handler by pushing an
3158        * unblockAsyncExceptions_ret stack frame.
3159        */
3160       if (!cf->exceptions_blocked) {
3161         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3162       }
3163       
3164       /* Ensure that async exceptions are blocked when running the handler.
3165        */
3166       if (tso->blocked_exceptions == NULL) {
3167         tso->blocked_exceptions = END_TSO_QUEUE;
3168       }
3169       
3170       /* Put the newly-built PAP on top of the stack, ready to execute
3171        * when the thread restarts.
3172        */
3173       sp[0] = (W_)ap;
3174       tso->sp = sp;
3175       tso->what_next = ThreadEnterGHC;
3176       IF_DEBUG(sanity, checkTSO(tso));
3177       return;
3178     }
3179
3180     /* First build an AP_UPD consisting of the stack chunk above the
3181      * current update frame, with the top word on the stack as the
3182      * fun field.
3183      */
3184     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3185     
3186     ASSERT(words >= 0);
3187     
3188     ap->n_args = words;
3189     ap->fun    = (StgClosure *)sp[0];
3190     sp++;
3191     for(i=0; i < (nat)words; ++i) {
3192       ap->payload[i] = (StgClosure *)*sp++;
3193     }
3194     
3195     switch (get_itbl(su)->type) {
3196       
3197     case UPDATE_FRAME:
3198       {
3199         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3200         TICK_ALLOC_UP_THK(words+1,0);
3201         
3202         IF_DEBUG(scheduler,
3203                  fprintf(stderr,  "scheduler: Updating ");
3204                  printPtr((P_)su->updatee); 
3205                  fprintf(stderr,  " with ");
3206                  printObj((StgClosure *)ap);
3207                  );
3208         
3209         /* Replace the updatee with an indirection - happily
3210          * this will also wake up any threads currently
3211          * waiting on the result.
3212          *
3213          * Warning: if we're in a loop, more than one update frame on
3214          * the stack may point to the same object.  Be careful not to
3215          * overwrite an IND_OLDGEN in this case, because we'll screw
3216          * up the mutable lists.  To be on the safe side, don't
3217          * overwrite any kind of indirection at all.  See also
3218          * threadSqueezeStack in GC.c, where we have to make a similar
3219          * check.
3220          */
3221         if (!closure_IND(su->updatee)) {
3222             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3223         }
3224         su = su->link;
3225         sp += sizeofW(StgUpdateFrame) -1;
3226         sp[0] = (W_)ap; /* push onto stack */
3227         break;
3228       }
3229
3230     case CATCH_FRAME:
3231       {
3232         StgCatchFrame *cf = (StgCatchFrame *)su;
3233         StgClosure* o;
3234         
3235         /* We want a PAP, not an AP_UPD.  Fortunately, the
3236          * layout's the same.
3237          */
3238         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3239         TICK_ALLOC_UPD_PAP(words+1,0);
3240         
3241         /* now build o = FUN(catch,ap,handler) */
3242         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3243         TICK_ALLOC_FUN(2,0);
3244         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3245         o->payload[0] = (StgClosure *)ap;
3246         o->payload[1] = cf->handler;
3247         
3248         IF_DEBUG(scheduler,
3249                  fprintf(stderr,  "scheduler: Built ");
3250                  printObj((StgClosure *)o);
3251                  );
3252         
3253         /* pop the old handler and put o on the stack */
3254         su = cf->link;
3255         sp += sizeofW(StgCatchFrame) - 1;
3256         sp[0] = (W_)o;
3257         break;
3258       }
3259       
3260     case SEQ_FRAME:
3261       {
3262         StgSeqFrame *sf = (StgSeqFrame *)su;
3263         StgClosure* o;
3264         
3265         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3266         TICK_ALLOC_UPD_PAP(words+1,0);
3267         
3268         /* now build o = FUN(seq,ap) */
3269         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3270         TICK_ALLOC_SE_THK(1,0);
3271         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3272         o->payload[0] = (StgClosure *)ap;
3273         
3274         IF_DEBUG(scheduler,
3275                  fprintf(stderr,  "scheduler: Built ");
3276                  printObj((StgClosure *)o);
3277                  );
3278         
3279         /* pop the old handler and put o on the stack */
3280         su = sf->link;
3281         sp += sizeofW(StgSeqFrame) - 1;
3282         sp[0] = (W_)o;
3283         break;
3284       }
3285       
3286     case STOP_FRAME:
3287       /* We've stripped the entire stack, the thread is now dead. */
3288       sp += sizeofW(StgStopFrame) - 1;
3289       sp[0] = (W_)exception;    /* save the exception */
3290       tso->what_next = ThreadKilled;
3291       tso->su = (StgUpdateFrame *)(sp+1);
3292       tso->sp = sp;
3293       return;
3294
3295     default:
3296       barf("raiseAsync");
3297     }
3298   }
3299   barf("raiseAsync");
3300 }
3301
3302 /* -----------------------------------------------------------------------------
3303    resurrectThreads is called after garbage collection on the list of
3304    threads found to be garbage.  Each of these threads will be woken
3305    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3306    on an MVar, or NonTermination if the thread was blocked on a Black
3307    Hole.
3308    -------------------------------------------------------------------------- */
3309
3310 void
3311 resurrectThreads( StgTSO *threads )
3312 {
3313   StgTSO *tso, *next;
3314
3315   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3316     next = tso->global_link;
3317     tso->global_link = all_threads;
3318     all_threads = tso;
3319     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3320
3321     switch (tso->why_blocked) {
3322     case BlockedOnMVar:
3323     case BlockedOnException:
3324       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3325       break;
3326     case BlockedOnBlackHole:
3327       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3328       break;
3329     case NotBlocked:
3330       /* This might happen if the thread was blocked on a black hole
3331        * belonging to a thread that we've just woken up (raiseAsync
3332        * can wake up threads, remember...).
3333        */
3334       continue;
3335     default:
3336       barf("resurrectThreads: thread blocked in a strange way");
3337     }
3338   }
3339 }
3340
3341 /* -----------------------------------------------------------------------------
3342  * Blackhole detection: if we reach a deadlock, test whether any
3343  * threads are blocked on themselves.  Any threads which are found to
3344  * be self-blocked get sent a NonTermination exception.
3345  *
3346  * This is only done in a deadlock situation in order to avoid
3347  * performance overhead in the normal case.
3348  * -------------------------------------------------------------------------- */
3349
3350 static void
3351 detectBlackHoles( void )
3352 {
3353     StgTSO *t = all_threads;
3354     StgUpdateFrame *frame;
3355     StgClosure *blocked_on;
3356
3357     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3358
3359         while (t->what_next == ThreadRelocated) {
3360             t = t->link;
3361             ASSERT(get_itbl(t)->type == TSO);
3362         }
3363       
3364         if (t->why_blocked != BlockedOnBlackHole) {
3365             continue;
3366         }
3367
3368         blocked_on = t->block_info.closure;
3369
3370         for (frame = t->su; ; frame = frame->link) {
3371             switch (get_itbl(frame)->type) {
3372
3373             case UPDATE_FRAME:
3374                 if (frame->updatee == blocked_on) {
3375                     /* We are blocking on one of our own computations, so
3376                      * send this thread the NonTermination exception.  
3377                      */
3378                     IF_DEBUG(scheduler, 
3379                              sched_belch("thread %d is blocked on itself", t->id));
3380                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3381                     goto done;
3382                 }
3383                 else {
3384                     continue;
3385                 }
3386
3387             case CATCH_FRAME:
3388             case SEQ_FRAME:
3389                 continue;
3390                 
3391             case STOP_FRAME:
3392                 break;
3393             }
3394             break;
3395         }
3396
3397     done: ;
3398     }   
3399 }
3400
3401 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3402 //@subsection Debugging Routines
3403
3404 /* -----------------------------------------------------------------------------
3405    Debugging: why is a thread blocked
3406    -------------------------------------------------------------------------- */
3407
3408 #ifdef DEBUG
3409
3410 void
3411 printThreadBlockage(StgTSO *tso)
3412 {
3413   switch (tso->why_blocked) {
3414   case BlockedOnRead:
3415     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3416     break;
3417   case BlockedOnWrite:
3418     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3419     break;
3420   case BlockedOnDelay:
3421     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3422     break;
3423   case BlockedOnMVar:
3424     fprintf(stderr,"is blocked on an MVar");
3425     break;
3426   case BlockedOnException:
3427     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3428             tso->block_info.tso->id);
3429     break;
3430   case BlockedOnBlackHole:
3431     fprintf(stderr,"is blocked on a black hole");
3432     break;
3433   case NotBlocked:
3434     fprintf(stderr,"is not blocked");
3435     break;
3436 #if defined(PAR)
3437   case BlockedOnGA:
3438     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3439             tso->block_info.closure, info_type(tso->block_info.closure));
3440     break;
3441   case BlockedOnGA_NoSend:
3442     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3443             tso->block_info.closure, info_type(tso->block_info.closure));
3444     break;
3445 #endif
3446 #if defined(RTS_SUPPORTS_THREADS)
3447   case BlockedOnCCall:
3448     fprintf(stderr,"is blocked on an external call");
3449     break;
3450 #endif
3451   default:
3452     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3453          tso->why_blocked, tso->id, tso);
3454   }
3455 }
3456
3457 void
3458 printThreadStatus(StgTSO *tso)
3459 {
3460   switch (tso->what_next) {
3461   case ThreadKilled:
3462     fprintf(stderr,"has been killed");
3463     break;
3464   case ThreadComplete:
3465     fprintf(stderr,"has completed");
3466     break;
3467   default:
3468     printThreadBlockage(tso);
3469   }
3470 }
3471
3472 void
3473 printAllThreads(void)
3474 {
3475   StgTSO *t;
3476
3477 # if defined(GRAN)
3478   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3479   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3480                        time_string, rtsFalse/*no commas!*/);
3481
3482   sched_belch("all threads at [%s]:", time_string);
3483 # elif defined(PAR)
3484   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3485   ullong_format_string(CURRENT_TIME,
3486                        time_string, rtsFalse/*no commas!*/);
3487
3488   sched_belch("all threads at [%s]:", time_string);
3489 # else
3490   sched_belch("all threads:");
3491 # endif
3492
3493   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3494     fprintf(stderr, "\tthread %d ", t->id);
3495     printThreadStatus(t);
3496     fprintf(stderr,"\n");
3497   }
3498 }
3499     
3500 /* 
3501    Print a whole blocking queue attached to node (debugging only).
3502 */
3503 //@cindex print_bq
3504 # if defined(PAR)
3505 void 
3506 print_bq (StgClosure *node)
3507 {
3508   StgBlockingQueueElement *bqe;
3509   StgTSO *tso;
3510   rtsBool end;
3511
3512   fprintf(stderr,"## BQ of closure %p (%s): ",
3513           node, info_type(node));
3514
3515   /* should cover all closures that may have a blocking queue */
3516   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3517          get_itbl(node)->type == FETCH_ME_BQ ||
3518          get_itbl(node)->type == RBH ||
3519          get_itbl(node)->type == MVAR);
3520     
3521   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3522
3523   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3524 }
3525
3526 /* 
3527    Print a whole blocking queue starting with the element bqe.
3528 */
3529 void 
3530 print_bqe (StgBlockingQueueElement *bqe)
3531 {
3532   rtsBool end;
3533
3534   /* 
3535      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3536   */
3537   for (end = (bqe==END_BQ_QUEUE);
3538        !end; // iterate until bqe points to a CONSTR
3539        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3540        bqe = end ? END_BQ_QUEUE : bqe->link) {
3541     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3542     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3543     /* types of closures that may appear in a blocking queue */
3544     ASSERT(get_itbl(bqe)->type == TSO ||           
3545            get_itbl(bqe)->type == BLOCKED_FETCH || 
3546            get_itbl(bqe)->type == CONSTR); 
3547     /* only BQs of an RBH end with an RBH_Save closure */
3548     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3549
3550     switch (get_itbl(bqe)->type) {
3551     case TSO:
3552       fprintf(stderr," TSO %u (%x),",
3553               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3554       break;
3555     case BLOCKED_FETCH:
3556       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3557               ((StgBlockedFetch *)bqe)->node, 
3558               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3559               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3560               ((StgBlockedFetch *)bqe)->ga.weight);
3561       break;
3562     case CONSTR:
3563       fprintf(stderr," %s (IP %p),",
3564               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3565                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3566                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3567                "RBH_Save_?"), get_itbl(bqe));
3568       break;
3569     default:
3570       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3571            info_type((StgClosure *)bqe)); // , node, info_type(node));
3572       break;
3573     }
3574   } /* for */
3575   fputc('\n', stderr);
3576 }
3577 # elif defined(GRAN)
3578 void 
3579 print_bq (StgClosure *node)
3580 {
3581   StgBlockingQueueElement *bqe;
3582   PEs node_loc, tso_loc;
3583   rtsBool end;
3584
3585   /* should cover all closures that may have a blocking queue */
3586   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3587          get_itbl(node)->type == FETCH_ME_BQ ||
3588          get_itbl(node)->type == RBH);
3589     
3590   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3591   node_loc = where_is(node);
3592
3593   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3594           node, info_type(node), node_loc);
3595
3596   /* 
3597      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3598   */
3599   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3600        !end; // iterate until bqe points to a CONSTR
3601        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3602     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3603     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3604     /* types of closures that may appear in a blocking queue */
3605     ASSERT(get_itbl(bqe)->type == TSO ||           
3606            get_itbl(bqe)->type == CONSTR); 
3607     /* only BQs of an RBH end with an RBH_Save closure */
3608     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3609
3610     tso_loc = where_is((StgClosure *)bqe);
3611     switch (get_itbl(bqe)->type) {
3612     case TSO:
3613       fprintf(stderr," TSO %d (%p) on [PE %d],",
3614               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3615       break;
3616     case CONSTR:
3617       fprintf(stderr," %s (IP %p),",
3618               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3619                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3620                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3621                "RBH_Save_?"), get_itbl(bqe));
3622       break;
3623     default:
3624       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3625            info_type((StgClosure *)bqe), node, info_type(node));
3626       break;
3627     }
3628   } /* for */
3629   fputc('\n', stderr);
3630 }
3631 #else
3632 /* 
3633    Nice and easy: only TSOs on the blocking queue
3634 */
3635 void 
3636 print_bq (StgClosure *node)
3637 {
3638   StgTSO *tso;
3639
3640   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3641   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3642        tso != END_TSO_QUEUE; 
3643        tso=tso->link) {
3644     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3645     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3646     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3647   }
3648   fputc('\n', stderr);
3649 }
3650 # endif
3651
3652 #if defined(PAR)
3653 static nat
3654 run_queue_len(void)
3655 {
3656   nat i;
3657   StgTSO *tso;
3658
3659   for (i=0, tso=run_queue_hd; 
3660        tso != END_TSO_QUEUE;
3661        i++, tso=tso->link)
3662     /* nothing */
3663
3664   return i;
3665 }
3666 #endif
3667
3668 static void
3669 sched_belch(char *s, ...)
3670 {
3671   va_list ap;
3672   va_start(ap,s);
3673 #ifdef SMP
3674   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3675 #elif defined(PAR)
3676   fprintf(stderr, "== ");
3677 #else
3678   fprintf(stderr, "scheduler: ");
3679 #endif
3680   vfprintf(stderr, s, ap);
3681   fprintf(stderr, "\n");
3682 }
3683
3684 #endif /* DEBUG */
3685
3686
3687 //@node Index,  , Debugging Routines, Main scheduling code
3688 //@subsection Index
3689
3690 //@index
3691 //* MainRegTable::  @cindex\s-+MainRegTable
3692 //* StgMainThread::  @cindex\s-+StgMainThread
3693 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3694 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3695 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3696 //* context_switch::  @cindex\s-+context_switch
3697 //* createThread::  @cindex\s-+createThread
3698 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3699 //* initScheduler::  @cindex\s-+initScheduler
3700 //* interrupted::  @cindex\s-+interrupted
3701 //* next_thread_id::  @cindex\s-+next_thread_id
3702 //* print_bq::  @cindex\s-+print_bq
3703 //* run_queue_hd::  @cindex\s-+run_queue_hd
3704 //* run_queue_tl::  @cindex\s-+run_queue_tl
3705 //* sched_mutex::  @cindex\s-+sched_mutex
3706 //* schedule::  @cindex\s-+schedule
3707 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3708 //* term_mutex::  @cindex\s-+term_mutex
3709 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3710 //@end index