bce5b2d6dcac9440d3b42e5dc196749541d60c26
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.115 2002/02/04 20:40:36 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* Threads suspended in _ccall_GC.
189  */
190 static StgTSO *suspended_ccalling_threads;
191
192 static StgTSO *threadStackOverflow(StgTSO *tso);
193
194 /* KH: The following two flags are shared memory locations.  There is no need
195        to lock them, since they are only unset at the end of a scheduler
196        operation.
197 */
198
199 /* flag set by signal handler to precipitate a context switch */
200 //@cindex context_switch
201 nat context_switch;
202
203 /* if this flag is set as well, give up execution */
204 //@cindex interrupted
205 rtsBool interrupted;
206
207 /* Next thread ID to allocate.
208  * Locks required: sched_mutex
209  */
210 //@cindex next_thread_id
211 StgThreadID next_thread_id = 1;
212
213 /*
214  * Pointers to the state of the current thread.
215  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
216  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
217  */
218  
219 /* The smallest stack size that makes any sense is:
220  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
221  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
222  *  + 1                       (the realworld token for an IO thread)
223  *  + 1                       (the closure to enter)
224  *
225  * A thread with this stack will bomb immediately with a stack
226  * overflow, which will increase its stack size.  
227  */
228
229 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
230
231
232 #if defined(GRAN)
233 StgTSO *CurrentTSO;
234 #endif
235
236 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
237  *  exists - earlier gccs apparently didn't.
238  *  -= chak
239  */
240 StgTSO dummy_tso;
241
242 rtsBool ready_to_gc;
243
244 void            addToBlockedQueue ( StgTSO *tso );
245
246 static void     schedule          ( void );
247        void     interruptStgRts   ( void );
248 #if defined(GRAN)
249 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
250 #else
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
252 #endif
253
254 static void     detectBlackHoles  ( void );
255
256 #ifdef DEBUG
257 static void sched_belch(char *s, ...);
258 #endif
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262  *       with these synchronisation objects.
263  */
264 Mutex     sched_mutex       = INIT_MUTEX_VAR;
265 Mutex     term_mutex        = INIT_MUTEX_VAR;
266 #if defined(THREADED_RTS)
267 /*
268  * The rts_mutex is the 'big lock' that the active native
269  * thread within the RTS holds while executing code
270  * within the RTS. It is given up when the thread makes a
271  * transition out of the RTS (e.g., to perform an external
272  * C call), hopefully for another thread to enter the RTS.
273  *
274  */
275 Mutex     rts_mutex         = INIT_MUTEX_VAR;
276 /*
277  * When a native thread has completed executing an external
278  * call, it needs to communicate the result back to the
279  * (Haskell) thread that made the call. Do this as follows:
280  *
281  *  - in resumeThread(), the thread increments the counter
282  *    ext_threads_waiting, and then blocks on the
283  *    'big' RTS lock.
284  *  - upon entry to the scheduler, the thread that's currently
285  *    holding the RTS lock checks ext_threads_waiting. If there
286  *    are native threads waiting, it gives up its RTS lock
287  *    and tries to re-grab the RTS lock [perhaps after having
288  *    waited for a bit..?]
289  *  - care must be taken to deal with the case where more than
290  *    one external thread are waiting on the lock. [ToDo: more]
291  *    
292  */
293
294 static nat ext_threads_waiting = 0;
295 /*
296  * thread_ready_aux_mutex is used to handle the scenario where the
297  * the RTS executing thread runs out of work, but there are
298  * active external threads. The RTS executing thread gives up
299  * its RTS mutex, and blocks waiting for the thread_ready_cond.
300  * Unfortunately, a condition variable needs to be associated
301  * with a mutex in pthreads, so rts_thread_waiting_mutex is
302  * used for just this purpose.
303  * 
304  */
305 Mutex  thread_ready_aux_mutex = INIT_MUTEX_VAR;
306 #endif
307
308
309 /* thread_ready_cond: when signalled, a thread has
310  * become runnable. When used?
311  */
312 Condition thread_ready_cond = INIT_COND_VAR;
313 Condition gc_pending_cond   = INIT_COND_VAR;
314
315 nat await_death;
316 #endif
317
318 #if defined(PAR)
319 StgTSO *LastTSO;
320 rtsTime TimeOfLastYield;
321 rtsBool emitSchedule = rtsTrue;
322 #endif
323
324 #if DEBUG
325 char *whatNext_strs[] = {
326   "ThreadEnterGHC",
327   "ThreadRunGHC",
328   "ThreadEnterInterp",
329   "ThreadKilled",
330   "ThreadComplete"
331 };
332
333 char *threadReturnCode_strs[] = {
334   "HeapOverflow",                       /* might also be StackOverflow */
335   "StackOverflow",
336   "ThreadYielding",
337   "ThreadBlocked",
338   "ThreadFinished"
339 };
340 #endif
341
342 #if defined(PAR)
343 StgTSO * createSparkThread(rtsSpark spark);
344 StgTSO * activateSpark (rtsSpark spark);  
345 #endif
346
347 /*
348  * The thread state for the main thread.
349 // ToDo: check whether not needed any more
350 StgTSO   *MainTSO;
351  */
352
353 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
354 static void taskStart(void);
355 static void
356 taskStart(void)
357 {
358   /* threads start up using 'taskStart', so make them
359      them grab the RTS lock. */
360 #if defined(THREADED_RTS)
361   ACQUIRE_LOCK(&rts_mutex);
362 #endif
363   schedule();
364 }
365 #endif
366
367
368
369
370 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
371 //@subsection Main scheduling loop
372
373 /* ---------------------------------------------------------------------------
374    Main scheduling loop.
375
376    We use round-robin scheduling, each thread returning to the
377    scheduler loop when one of these conditions is detected:
378
379       * out of heap space
380       * timer expires (thread yields)
381       * thread blocks
382       * thread ends
383       * stack overflow
384
385    Locking notes:  we acquire the scheduler lock once at the beginning
386    of the scheduler loop, and release it when
387     
388       * running a thread, or
389       * waiting for work, or
390       * waiting for a GC to complete.
391
392    GRAN version:
393      In a GranSim setup this loop iterates over the global event queue.
394      This revolves around the global event queue, which determines what 
395      to do next. Therefore, it's more complicated than either the 
396      concurrent or the parallel (GUM) setup.
397
398    GUM version:
399      GUM iterates over incoming messages.
400      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
401      and sends out a fish whenever it has nothing to do; in-between
402      doing the actual reductions (shared code below) it processes the
403      incoming messages and deals with delayed operations 
404      (see PendingFetches).
405      This is not the ugliest code you could imagine, but it's bloody close.
406
407    ------------------------------------------------------------------------ */
408 //@cindex schedule
409 static void
410 schedule( void )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PAR)
418   StgSparkPool *pool;
419   rtsSpark spark;
420   StgTSO *tso;
421   GlobalTaskId pe;
422   rtsBool receivedFinish = rtsFalse;
423 # if defined(DEBUG)
424   nat tp_size, sp_size; // stats only
425 # endif
426 #endif
427   rtsBool was_interrupted = rtsFalse;
428   
429   ACQUIRE_LOCK(&sched_mutex);
430   
431 #if defined(THREADED_RTS)
432   /* ToDo: consider SMP support */
433   if (ext_threads_waiting > 0) {
434     /* (At least) one external thread is waiting to
435      * to deposit the result of an external call.
436      * Give way to one of them by giving up the RTS
437      * lock.
438      */
439     RELEASE_LOCK(&sched_mutex);
440     RELEASE_LOCK(&rts_mutex);
441     /* ToDo: come up with mechanism that guarantees that
442      * the main thread doesn't loop here.
443      */
444     yieldThread();
445     /* ToDo: longjmp() */
446     taskStart();
447   }
448 #endif
449
450 #if defined(GRAN)
451
452   /* set up first event to get things going */
453   /* ToDo: assign costs for system setup and init MainTSO ! */
454   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
455             ContinueThread, 
456             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
457
458   IF_DEBUG(gran,
459            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
460            G_TSO(CurrentTSO, 5));
461
462   if (RtsFlags.GranFlags.Light) {
463     /* Save current time; GranSim Light only */
464     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
465   }      
466
467   event = get_next_event();
468
469   while (event!=(rtsEvent*)NULL) {
470     /* Choose the processor with the next event */
471     CurrentProc = event->proc;
472     CurrentTSO = event->tso;
473
474 #elif defined(PAR)
475
476   while (!receivedFinish) {    /* set by processMessages */
477                                /* when receiving PP_FINISH message         */ 
478 #else
479
480   while (1) {
481
482 #endif
483
484     IF_DEBUG(scheduler, printAllThreads());
485
486     /* If we're interrupted (the user pressed ^C, or some other
487      * termination condition occurred), kill all the currently running
488      * threads.
489      */
490     if (interrupted) {
491       IF_DEBUG(scheduler, sched_belch("interrupted"));
492       deleteAllThreads();
493       interrupted = rtsFalse;
494       was_interrupted = rtsTrue;
495     }
496
497     /* Go through the list of main threads and wake up any
498      * clients whose computations have finished.  ToDo: this
499      * should be done more efficiently without a linear scan
500      * of the main threads list, somehow...
501      */
502 #if defined(RTS_SUPPORTS_THREADS)
503     { 
504       StgMainThread *m, **prev;
505       prev = &main_threads;
506       for (m = main_threads; m != NULL; m = m->link) {
507         switch (m->tso->what_next) {
508         case ThreadComplete:
509           if (m->ret) {
510             *(m->ret) = (StgClosure *)m->tso->sp[0];
511           }
512           *prev = m->link;
513           m->stat = Success;
514           broadcastCondition(&m->wakeup);
515           break;
516         case ThreadKilled:
517           if (m->ret) *(m->ret) = NULL;
518           *prev = m->link;
519           if (was_interrupted) {
520             m->stat = Interrupted;
521           } else {
522             m->stat = Killed;
523           }
524           broadcastCondition(&m->wakeup);
525           break;
526         default:
527           break;
528         }
529       }
530     }
531
532 #else /* not threaded */
533
534 # if defined(PAR)
535     /* in GUM do this only on the Main PE */
536     if (IAmMainThread)
537 # endif
538     /* If our main thread has finished or been killed, return.
539      */
540     {
541       StgMainThread *m = main_threads;
542       if (m->tso->what_next == ThreadComplete
543           || m->tso->what_next == ThreadKilled) {
544         main_threads = main_threads->link;
545         if (m->tso->what_next == ThreadComplete) {
546           /* we finished successfully, fill in the return value */
547           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
548           m->stat = Success;
549           return;
550         } else {
551           if (m->ret) { *(m->ret) = NULL; };
552           if (was_interrupted) {
553             m->stat = Interrupted;
554           } else {
555             m->stat = Killed;
556           }
557           return;
558         }
559       }
560     }
561 #endif
562
563     /* Top up the run queue from our spark pool.  We try to make the
564      * number of threads in the run queue equal to the number of
565      * free capabilities.
566      *
567      * Disable spark support in SMP for now, non-essential & requires
568      * a little bit of work to make it compile cleanly. -- sof 1/02.
569      */
570 #if 0 /* defined(SMP) */
571     {
572       nat n = getFreeCapabilities();
573       StgTSO *tso = run_queue_hd;
574
575       /* Count the run queue */
576       while (n > 0 && tso != END_TSO_QUEUE) {
577         tso = tso->link;
578         n--;
579       }
580
581       for (; n > 0; n--) {
582         StgClosure *spark;
583         spark = findSpark(rtsFalse);
584         if (spark == NULL) {
585           break; /* no more sparks in the pool */
586         } else {
587           /* I'd prefer this to be done in activateSpark -- HWL */
588           /* tricky - it needs to hold the scheduler lock and
589            * not try to re-acquire it -- SDM */
590           createSparkThread(spark);       
591           IF_DEBUG(scheduler,
592                    sched_belch("==^^ turning spark of closure %p into a thread",
593                                (StgClosure *)spark));
594         }
595       }
596       /* We need to wake up the other tasks if we just created some
597        * work for them.
598        */
599       if (getFreeCapabilities() - n > 1) {
600           signalCondition( &thread_ready_cond );
601       }
602     }
603 #endif // SMP
604
605     /* check for signals each time around the scheduler */
606 #ifndef mingw32_TARGET_OS
607     if (signals_pending()) {
608       startSignalHandlers();
609     }
610 #endif
611
612     /* Check whether any waiting threads need to be woken up.  If the
613      * run queue is empty, and there are no other tasks running, we
614      * can wait indefinitely for something to happen.
615      * ToDo: what if another client comes along & requests another
616      * main thread?
617      */
618     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
619       awaitEvent(
620            (run_queue_hd == END_TSO_QUEUE)
621 #if defined(SMP)
622         && allFreeCapabilities()
623 #endif
624         );
625     }
626     /* we can be interrupted while waiting for I/O... */
627     if (interrupted) continue;
628
629     /* 
630      * Detect deadlock: when we have no threads to run, there are no
631      * threads waiting on I/O or sleeping, and all the other tasks are
632      * waiting for work, we must have a deadlock of some description.
633      *
634      * We first try to find threads blocked on themselves (ie. black
635      * holes), and generate NonTermination exceptions where necessary.
636      *
637      * If no threads are black holed, we have a deadlock situation, so
638      * inform all the main threads.
639      */
640 #ifndef PAR
641     if (blocked_queue_hd == END_TSO_QUEUE
642         && run_queue_hd == END_TSO_QUEUE
643         && sleeping_queue == END_TSO_QUEUE
644 #if defined(SMP)
645         && allFreeCapabilities()
646 #elif defined(THREADED_RTS)
647         && suspended_ccalling_threads == END_TSO_QUEUE
648 #endif
649         )
650     {
651         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
652         RELEASE_LOCK(&sched_mutex);
653         GarbageCollect(GetRoots,rtsTrue);
654         ACQUIRE_LOCK(&sched_mutex);
655         IF_DEBUG(scheduler, sched_belch("GC done."));
656         if (blocked_queue_hd == END_TSO_QUEUE
657             && run_queue_hd == END_TSO_QUEUE
658             && sleeping_queue == END_TSO_QUEUE) {
659
660             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
661             detectBlackHoles();
662
663             /* No black holes, so probably a real deadlock.  Send the
664              * current main thread the Deadlock exception (or in the SMP
665              * build, send *all* main threads the deadlock exception,
666              * since none of them can make progress).
667              */
668             if (run_queue_hd == END_TSO_QUEUE) {
669                 StgMainThread *m;
670 #if defined(RTS_SUPPORTS_THREADS)
671                 for (m = main_threads; m != NULL; m = m->link) {
672                     switch (m->tso->why_blocked) {
673                     case BlockedOnBlackHole:
674                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
675                         break;
676                     case BlockedOnException:
677                     case BlockedOnMVar:
678                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
679                         break;
680                     default:
681                         barf("deadlock: main thread blocked in a strange way");
682                     }
683                 }
684 #else
685                 m = main_threads;
686                 switch (m->tso->why_blocked) {
687                 case BlockedOnBlackHole:
688                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
689                     break;
690                 case BlockedOnException:
691                 case BlockedOnMVar:
692                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
693                     break;
694                 default:
695                     barf("deadlock: main thread blocked in a strange way");
696                 }
697 #endif
698             }
699 #if defined(RTS_SUPPORTS_THREADS)
700             if ( run_queue_hd == END_TSO_QUEUE ) {
701               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
702               shutdownHaskellAndExit(0);
703             
704             }
705 #endif
706             ASSERT( run_queue_hd != END_TSO_QUEUE );
707         }
708     }
709 #elif defined(PAR)
710     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
711 #endif
712
713 #if defined(SMP)
714     /* If there's a GC pending, don't do anything until it has
715      * completed.
716      */
717     if (ready_to_gc) {
718       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
719       waitCondition( &gc_pending_cond, &sched_mutex );
720     }
721 #endif    
722
723 #if defined(SMP)
724     /* block until we've got a thread on the run queue and a free
725      * capability.
726      */
727     while ( run_queue_hd == END_TSO_QUEUE 
728             || noFreeCapabilities() 
729             ) {
730       IF_DEBUG(scheduler, sched_belch("waiting for work"));
731       waitCondition( &thread_ready_cond, &sched_mutex );
732       IF_DEBUG(scheduler, sched_belch("work now available"));
733     }
734 #elif defined(THREADED_RTS)
735    if ( run_queue_hd == END_TSO_QUEUE ) {
736      /* no work available, wait for external calls to complete. */
737      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
738      RELEASE_LOCK(&sched_mutex);
739      RELEASE_LOCK(&rts_mutex);
740      /* Sigh - need to have a mutex locked in order to wait on the
741         condition variable. */
742      ACQUIRE_LOCK(&thread_ready_aux_mutex);
743      waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
744      RELEASE_LOCK(&thread_ready_aux_mutex);
745      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
746      /* ToDo: longjmp() */
747      taskStart();
748    
749    }
750 #endif
751
752 #if defined(GRAN)
753
754     if (RtsFlags.GranFlags.Light)
755       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
756
757     /* adjust time based on time-stamp */
758     if (event->time > CurrentTime[CurrentProc] &&
759         event->evttype != ContinueThread)
760       CurrentTime[CurrentProc] = event->time;
761     
762     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
763     if (!RtsFlags.GranFlags.Light)
764       handleIdlePEs();
765
766     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
767
768     /* main event dispatcher in GranSim */
769     switch (event->evttype) {
770       /* Should just be continuing execution */
771     case ContinueThread:
772       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
773       /* ToDo: check assertion
774       ASSERT(run_queue_hd != (StgTSO*)NULL &&
775              run_queue_hd != END_TSO_QUEUE);
776       */
777       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
778       if (!RtsFlags.GranFlags.DoAsyncFetch &&
779           procStatus[CurrentProc]==Fetching) {
780         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
781               CurrentTSO->id, CurrentTSO, CurrentProc);
782         goto next_thread;
783       } 
784       /* Ignore ContinueThreads for completed threads */
785       if (CurrentTSO->what_next == ThreadComplete) {
786         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
787               CurrentTSO->id, CurrentTSO, CurrentProc);
788         goto next_thread;
789       } 
790       /* Ignore ContinueThreads for threads that are being migrated */
791       if (PROCS(CurrentTSO)==Nowhere) { 
792         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
793               CurrentTSO->id, CurrentTSO, CurrentProc);
794         goto next_thread;
795       }
796       /* The thread should be at the beginning of the run queue */
797       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
798         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
799               CurrentTSO->id, CurrentTSO, CurrentProc);
800         break; // run the thread anyway
801       }
802       /*
803       new_event(proc, proc, CurrentTime[proc],
804                 FindWork,
805                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
806       goto next_thread; 
807       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
808       break; // now actually run the thread; DaH Qu'vam yImuHbej 
809
810     case FetchNode:
811       do_the_fetchnode(event);
812       goto next_thread;             /* handle next event in event queue  */
813       
814     case GlobalBlock:
815       do_the_globalblock(event);
816       goto next_thread;             /* handle next event in event queue  */
817       
818     case FetchReply:
819       do_the_fetchreply(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case UnblockThread:   /* Move from the blocked queue to the tail of */
823       do_the_unblock(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case ResumeThread:  /* Move from the blocked queue to the tail of */
827       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
828       event->tso->gran.blocktime += 
829         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
830       do_the_startthread(event);
831       goto next_thread;             /* handle next event in event queue  */
832       
833     case StartThread:
834       do_the_startthread(event);
835       goto next_thread;             /* handle next event in event queue  */
836       
837     case MoveThread:
838       do_the_movethread(event);
839       goto next_thread;             /* handle next event in event queue  */
840       
841     case MoveSpark:
842       do_the_movespark(event);
843       goto next_thread;             /* handle next event in event queue  */
844       
845     case FindWork:
846       do_the_findwork(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     default:
850       barf("Illegal event type %u\n", event->evttype);
851     }  /* switch */
852     
853     /* This point was scheduler_loop in the old RTS */
854
855     IF_DEBUG(gran, belch("GRAN: after main switch"));
856
857     TimeOfLastEvent = CurrentTime[CurrentProc];
858     TimeOfNextEvent = get_time_of_next_event();
859     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
860     // CurrentTSO = ThreadQueueHd;
861
862     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
863                          TimeOfNextEvent));
864
865     if (RtsFlags.GranFlags.Light) 
866       GranSimLight_leave_system(event, &ActiveTSO); 
867
868     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
869
870     IF_DEBUG(gran, 
871              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
872
873     /* in a GranSim setup the TSO stays on the run queue */
874     t = CurrentTSO;
875     /* Take a thread from the run queue. */
876     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
877
878     IF_DEBUG(gran, 
879              fprintf(stderr, "GRAN: About to run current thread, which is\n");
880              G_TSO(t,5));
881
882     context_switch = 0; // turned on via GranYield, checking events and time slice
883
884     IF_DEBUG(gran, 
885              DumpGranEvent(GR_SCHEDULE, t));
886
887     procStatus[CurrentProc] = Busy;
888
889 #elif defined(PAR)
890     if (PendingFetches != END_BF_QUEUE) {
891         processFetches();
892     }
893
894     /* ToDo: phps merge with spark activation above */
895     /* check whether we have local work and send requests if we have none */
896     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
897       /* :-[  no local threads => look out for local sparks */
898       /* the spark pool for the current PE */
899       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
900       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
901           pool->hd < pool->tl) {
902         /* 
903          * ToDo: add GC code check that we really have enough heap afterwards!!
904          * Old comment:
905          * If we're here (no runnable threads) and we have pending
906          * sparks, we must have a space problem.  Get enough space
907          * to turn one of those pending sparks into a
908          * thread... 
909          */
910
911         spark = findSpark(rtsFalse);                /* get a spark */
912         if (spark != (rtsSpark) NULL) {
913           tso = activateSpark(spark);       /* turn the spark into a thread */
914           IF_PAR_DEBUG(schedule,
915                        belch("==== schedule: Created TSO %d (%p); %d threads active",
916                              tso->id, tso, advisory_thread_count));
917
918           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
919             belch("==^^ failed to activate spark");
920             goto next_thread;
921           }               /* otherwise fall through & pick-up new tso */
922         } else {
923           IF_PAR_DEBUG(verbose,
924                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
925                              spark_queue_len(pool)));
926           goto next_thread;
927         }
928       }
929
930       /* If we still have no work we need to send a FISH to get a spark
931          from another PE 
932       */
933       if (EMPTY_RUN_QUEUE()) {
934       /* =8-[  no local sparks => look for work on other PEs */
935         /*
936          * We really have absolutely no work.  Send out a fish
937          * (there may be some out there already), and wait for
938          * something to arrive.  We clearly can't run any threads
939          * until a SCHEDULE or RESUME arrives, and so that's what
940          * we're hoping to see.  (Of course, we still have to
941          * respond to other types of messages.)
942          */
943         TIME now = msTime() /*CURRENT_TIME*/;
944         IF_PAR_DEBUG(verbose, 
945                      belch("--  now=%ld", now));
946         IF_PAR_DEBUG(verbose,
947                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
948                          (last_fish_arrived_at!=0 &&
949                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
950                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
951                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
952                              last_fish_arrived_at,
953                              RtsFlags.ParFlags.fishDelay, now);
954                      });
955         
956         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
957             (last_fish_arrived_at==0 ||
958              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
959           /* outstandingFishes is set in sendFish, processFish;
960              avoid flooding system with fishes via delay */
961           pe = choosePE();
962           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
963                    NEW_FISH_HUNGER);
964
965           // Global statistics: count no. of fishes
966           if (RtsFlags.ParFlags.ParStats.Global &&
967               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
968             globalParStats.tot_fish_mess++;
969           }
970         }
971       
972         receivedFinish = processMessages();
973         goto next_thread;
974       }
975     } else if (PacketsWaiting()) {  /* Look for incoming messages */
976       receivedFinish = processMessages();
977     }
978
979     /* Now we are sure that we have some work available */
980     ASSERT(run_queue_hd != END_TSO_QUEUE);
981
982     /* Take a thread from the run queue, if we have work */
983     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
984     IF_DEBUG(sanity,checkTSO(t));
985
986     /* ToDo: write something to the log-file
987     if (RTSflags.ParFlags.granSimStats && !sameThread)
988         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
989
990     CurrentTSO = t;
991     */
992     /* the spark pool for the current PE */
993     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
994
995     IF_DEBUG(scheduler, 
996              belch("--=^ %d threads, %d sparks on [%#x]", 
997                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
998
999 #if 1
1000     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1001         t && LastTSO && t->id != LastTSO->id && 
1002         LastTSO->why_blocked == NotBlocked && 
1003         LastTSO->what_next != ThreadComplete) {
1004       // if previously scheduled TSO not blocked we have to record the context switch
1005       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1006                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1007     }
1008
1009     if (RtsFlags.ParFlags.ParStats.Full && 
1010         (emitSchedule /* forced emit */ ||
1011         (t && LastTSO && t->id != LastTSO->id))) {
1012       /* 
1013          we are running a different TSO, so write a schedule event to log file
1014          NB: If we use fair scheduling we also have to write  a deschedule 
1015              event for LastTSO; with unfair scheduling we know that the
1016              previous tso has blocked whenever we switch to another tso, so
1017              we don't need it in GUM for now
1018       */
1019       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1020                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1021       emitSchedule = rtsFalse;
1022     }
1023      
1024 #endif
1025 #else /* !GRAN && !PAR */
1026   
1027     /* grab a thread from the run queue
1028      */
1029     ASSERT(run_queue_hd != END_TSO_QUEUE);
1030     t = POP_RUN_QUEUE();
1031     // Sanity check the thread we're about to run.  This can be
1032     // expensive if there is lots of thread switching going on...
1033     IF_DEBUG(sanity,checkTSO(t));
1034 #endif
1035     
1036     grabCapability(&cap);
1037     cap->r.rCurrentTSO = t;
1038     
1039     /* context switches are now initiated by the timer signal, unless
1040      * the user specified "context switch as often as possible", with
1041      * +RTS -C0
1042      */
1043     if (
1044 #ifdef PROFILING
1045         RtsFlags.ProfFlags.profileInterval == 0 ||
1046 #endif
1047         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1048          && (run_queue_hd != END_TSO_QUEUE
1049              || blocked_queue_hd != END_TSO_QUEUE
1050              || sleeping_queue != END_TSO_QUEUE)))
1051         context_switch = 1;
1052     else
1053         context_switch = 0;
1054
1055     RELEASE_LOCK(&sched_mutex);
1056
1057     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1058                               t->id, t, whatNext_strs[t->what_next]));
1059
1060 #ifdef PROFILING
1061     startHeapProfTimer();
1062 #endif
1063
1064     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1065     /* Run the current thread 
1066      */
1067     switch (cap->r.rCurrentTSO->what_next) {
1068     case ThreadKilled:
1069     case ThreadComplete:
1070         /* Thread already finished, return to scheduler. */
1071         ret = ThreadFinished;
1072         break;
1073     case ThreadEnterGHC:
1074         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1075         break;
1076     case ThreadRunGHC:
1077         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1078         break;
1079     case ThreadEnterInterp:
1080         ret = interpretBCO(cap);
1081         break;
1082     default:
1083       barf("schedule: invalid what_next field");
1084     }
1085     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1086     
1087     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1088 #ifdef PROFILING
1089     stopHeapProfTimer();
1090     CCCS = CCS_SYSTEM;
1091 #endif
1092     
1093     ACQUIRE_LOCK(&sched_mutex);
1094
1095 #ifdef SMP
1096     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1097 #elif !defined(GRAN) && !defined(PAR)
1098     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1099 #endif
1100     t = cap->r.rCurrentTSO;
1101     
1102 #if defined(PAR)
1103     /* HACK 675: if the last thread didn't yield, make sure to print a 
1104        SCHEDULE event to the log file when StgRunning the next thread, even
1105        if it is the same one as before */
1106     LastTSO = t; 
1107     TimeOfLastYield = CURRENT_TIME;
1108 #endif
1109
1110     switch (ret) {
1111     case HeapOverflow:
1112 #if defined(GRAN)
1113       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1114       globalGranStats.tot_heapover++;
1115 #elif defined(PAR)
1116       globalParStats.tot_heapover++;
1117 #endif
1118
1119       // did the task ask for a large block?
1120       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1121           // if so, get one and push it on the front of the nursery.
1122           bdescr *bd;
1123           nat blocks;
1124           
1125           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1126
1127           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1128                                    t->id, t,
1129                                    whatNext_strs[t->what_next], blocks));
1130
1131           // don't do this if it would push us over the
1132           // alloc_blocks_lim limit; we'll GC first.
1133           if (alloc_blocks + blocks < alloc_blocks_lim) {
1134
1135               alloc_blocks += blocks;
1136               bd = allocGroup( blocks );
1137
1138               // link the new group into the list
1139               bd->link = cap->r.rCurrentNursery;
1140               bd->u.back = cap->r.rCurrentNursery->u.back;
1141               if (cap->r.rCurrentNursery->u.back != NULL) {
1142                   cap->r.rCurrentNursery->u.back->link = bd;
1143               } else {
1144                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1145                          g0s0->blocks == cap->r.rNursery);
1146                   cap->r.rNursery = g0s0->blocks = bd;
1147               }           
1148               cap->r.rCurrentNursery->u.back = bd;
1149
1150               // initialise it as a nursery block
1151               bd->step = g0s0;
1152               bd->gen_no = 0;
1153               bd->flags = 0;
1154               bd->free = bd->start;
1155
1156               // don't forget to update the block count in g0s0.
1157               g0s0->n_blocks += blocks;
1158               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1159
1160               // now update the nursery to point to the new block
1161               cap->r.rCurrentNursery = bd;
1162
1163               // we might be unlucky and have another thread get on the
1164               // run queue before us and steal the large block, but in that
1165               // case the thread will just end up requesting another large
1166               // block.
1167               PUSH_ON_RUN_QUEUE(t);
1168               break;
1169           }
1170       }
1171
1172       /* make all the running tasks block on a condition variable,
1173        * maybe set context_switch and wait till they all pile in,
1174        * then have them wait on a GC condition variable.
1175        */
1176       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1177                                t->id, t, whatNext_strs[t->what_next]));
1178       threadPaused(t);
1179 #if defined(GRAN)
1180       ASSERT(!is_on_queue(t,CurrentProc));
1181 #elif defined(PAR)
1182       /* Currently we emit a DESCHEDULE event before GC in GUM.
1183          ToDo: either add separate event to distinguish SYSTEM time from rest
1184                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1185       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1186         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1187                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1188         emitSchedule = rtsTrue;
1189       }
1190 #endif
1191       
1192       ready_to_gc = rtsTrue;
1193       context_switch = 1;               /* stop other threads ASAP */
1194       PUSH_ON_RUN_QUEUE(t);
1195       /* actual GC is done at the end of the while loop */
1196       break;
1197       
1198     case StackOverflow:
1199 #if defined(GRAN)
1200       IF_DEBUG(gran, 
1201                DumpGranEvent(GR_DESCHEDULE, t));
1202       globalGranStats.tot_stackover++;
1203 #elif defined(PAR)
1204       // IF_DEBUG(par, 
1205       // DumpGranEvent(GR_DESCHEDULE, t);
1206       globalParStats.tot_stackover++;
1207 #endif
1208       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1209                                t->id, t, whatNext_strs[t->what_next]));
1210       /* just adjust the stack for this thread, then pop it back
1211        * on the run queue.
1212        */
1213       threadPaused(t);
1214       { 
1215         StgMainThread *m;
1216         /* enlarge the stack */
1217         StgTSO *new_t = threadStackOverflow(t);
1218         
1219         /* This TSO has moved, so update any pointers to it from the
1220          * main thread stack.  It better not be on any other queues...
1221          * (it shouldn't be).
1222          */
1223         for (m = main_threads; m != NULL; m = m->link) {
1224           if (m->tso == t) {
1225             m->tso = new_t;
1226           }
1227         }
1228         threadPaused(new_t);
1229         PUSH_ON_RUN_QUEUE(new_t);
1230       }
1231       break;
1232
1233     case ThreadYielding:
1234 #if defined(GRAN)
1235       IF_DEBUG(gran, 
1236                DumpGranEvent(GR_DESCHEDULE, t));
1237       globalGranStats.tot_yields++;
1238 #elif defined(PAR)
1239       // IF_DEBUG(par, 
1240       // DumpGranEvent(GR_DESCHEDULE, t);
1241       globalParStats.tot_yields++;
1242 #endif
1243       /* put the thread back on the run queue.  Then, if we're ready to
1244        * GC, check whether this is the last task to stop.  If so, wake
1245        * up the GC thread.  getThread will block during a GC until the
1246        * GC is finished.
1247        */
1248       IF_DEBUG(scheduler,
1249                if (t->what_next == ThreadEnterInterp) {
1250                    /* ToDo: or maybe a timer expired when we were in Hugs?
1251                     * or maybe someone hit ctrl-C
1252                     */
1253                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1254                          t->id, t, whatNext_strs[t->what_next]);
1255                } else {
1256                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1257                          t->id, t, whatNext_strs[t->what_next]);
1258                }
1259                );
1260
1261       threadPaused(t);
1262
1263       IF_DEBUG(sanity,
1264                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1265                checkTSO(t));
1266       ASSERT(t->link == END_TSO_QUEUE);
1267 #if defined(GRAN)
1268       ASSERT(!is_on_queue(t,CurrentProc));
1269
1270       IF_DEBUG(sanity,
1271                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1272                checkThreadQsSanity(rtsTrue));
1273 #endif
1274 #if defined(PAR)
1275       if (RtsFlags.ParFlags.doFairScheduling) { 
1276         /* this does round-robin scheduling; good for concurrency */
1277         APPEND_TO_RUN_QUEUE(t);
1278       } else {
1279         /* this does unfair scheduling; good for parallelism */
1280         PUSH_ON_RUN_QUEUE(t);
1281       }
1282 #else
1283       /* this does round-robin scheduling; good for concurrency */
1284       APPEND_TO_RUN_QUEUE(t);
1285 #endif
1286 #if defined(GRAN)
1287       /* add a ContinueThread event to actually process the thread */
1288       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1289                 ContinueThread,
1290                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1291       IF_GRAN_DEBUG(bq, 
1292                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1293                G_EVENTQ(0);
1294                G_CURR_THREADQ(0));
1295 #endif /* GRAN */
1296       break;
1297       
1298     case ThreadBlocked:
1299 #if defined(GRAN)
1300       IF_DEBUG(scheduler,
1301                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1302                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1303                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1304
1305       // ??? needed; should emit block before
1306       IF_DEBUG(gran, 
1307                DumpGranEvent(GR_DESCHEDULE, t)); 
1308       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1309       /*
1310         ngoq Dogh!
1311       ASSERT(procStatus[CurrentProc]==Busy || 
1312               ((procStatus[CurrentProc]==Fetching) && 
1313               (t->block_info.closure!=(StgClosure*)NULL)));
1314       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1315           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1316             procStatus[CurrentProc]==Fetching)) 
1317         procStatus[CurrentProc] = Idle;
1318       */
1319 #elif defined(PAR)
1320       IF_DEBUG(scheduler,
1321                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1322                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1323       IF_PAR_DEBUG(bq,
1324
1325                    if (t->block_info.closure!=(StgClosure*)NULL) 
1326                      print_bq(t->block_info.closure));
1327
1328       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1329       blockThread(t);
1330
1331       /* whatever we schedule next, we must log that schedule */
1332       emitSchedule = rtsTrue;
1333
1334 #else /* !GRAN */
1335       /* don't need to do anything.  Either the thread is blocked on
1336        * I/O, in which case we'll have called addToBlockedQueue
1337        * previously, or it's blocked on an MVar or Blackhole, in which
1338        * case it'll be on the relevant queue already.
1339        */
1340       IF_DEBUG(scheduler,
1341                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1342                printThreadBlockage(t);
1343                fprintf(stderr, "\n"));
1344
1345       /* Only for dumping event to log file 
1346          ToDo: do I need this in GranSim, too?
1347       blockThread(t);
1348       */
1349 #endif
1350       threadPaused(t);
1351       break;
1352       
1353     case ThreadFinished:
1354       /* Need to check whether this was a main thread, and if so, signal
1355        * the task that started it with the return value.  If we have no
1356        * more main threads, we probably need to stop all the tasks until
1357        * we get a new one.
1358        */
1359       /* We also end up here if the thread kills itself with an
1360        * uncaught exception, see Exception.hc.
1361        */
1362       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1363 #if defined(GRAN)
1364       endThread(t, CurrentProc); // clean-up the thread
1365 #elif defined(PAR)
1366       /* For now all are advisory -- HWL */
1367       //if(t->priority==AdvisoryPriority) ??
1368       advisory_thread_count--;
1369       
1370 # ifdef DIST
1371       if(t->dist.priority==RevalPriority)
1372         FinishReval(t);
1373 # endif
1374       
1375       if (RtsFlags.ParFlags.ParStats.Full &&
1376           !RtsFlags.ParFlags.ParStats.Suppressed) 
1377         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1378 #endif
1379       break;
1380       
1381     default:
1382       barf("schedule: invalid thread return code %d", (int)ret);
1383     }
1384     
1385 #ifdef SMP
1386     grabCapability(&cap);
1387 #endif
1388
1389 #ifdef PROFILING
1390     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1391         GarbageCollect(GetRoots, rtsTrue);
1392         heapCensus();
1393         performHeapProfile = rtsFalse;
1394         ready_to_gc = rtsFalse; // we already GC'd
1395     }
1396 #endif
1397
1398 #ifdef SMP
1399     if (ready_to_gc && allFreeCapabilities() )
1400 #else
1401     if (ready_to_gc) 
1402 #endif
1403       {
1404       /* everybody back, start the GC.
1405        * Could do it in this thread, or signal a condition var
1406        * to do it in another thread.  Either way, we need to
1407        * broadcast on gc_pending_cond afterward.
1408        */
1409 #if defined(RTS_SUPPORTS_THREADS)
1410       IF_DEBUG(scheduler,sched_belch("doing GC"));
1411 #endif
1412       GarbageCollect(GetRoots,rtsFalse);
1413       ready_to_gc = rtsFalse;
1414 #ifdef SMP
1415       broadcastCondition(&gc_pending_cond);
1416 #endif
1417 #if defined(GRAN)
1418       /* add a ContinueThread event to continue execution of current thread */
1419       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1420                 ContinueThread,
1421                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1422       IF_GRAN_DEBUG(bq, 
1423                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1424                G_EVENTQ(0);
1425                G_CURR_THREADQ(0));
1426 #endif /* GRAN */
1427     }
1428
1429 #if defined(GRAN)
1430   next_thread:
1431     IF_GRAN_DEBUG(unused,
1432                   print_eventq(EventHd));
1433
1434     event = get_next_event();
1435 #elif defined(PAR)
1436   next_thread:
1437     /* ToDo: wait for next message to arrive rather than busy wait */
1438 #endif /* GRAN */
1439
1440   } /* end of while(1) */
1441
1442   IF_PAR_DEBUG(verbose,
1443                belch("== Leaving schedule() after having received Finish"));
1444 }
1445
1446 /* ---------------------------------------------------------------------------
1447  * deleteAllThreads():  kill all the live threads.
1448  *
1449  * This is used when we catch a user interrupt (^C), before performing
1450  * any necessary cleanups and running finalizers.
1451  * ------------------------------------------------------------------------- */
1452    
1453 void deleteAllThreads ( void )
1454 {
1455   StgTSO* t;
1456   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1457   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1458       deleteThread(t);
1459   }
1460   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1461       deleteThread(t);
1462   }
1463   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1464       deleteThread(t);
1465   }
1466   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1467   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1468   sleeping_queue = END_TSO_QUEUE;
1469 }
1470
1471 /* startThread and  insertThread are now in GranSim.c -- HWL */
1472
1473
1474 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1475 //@subsection Suspend and Resume
1476
1477 /* ---------------------------------------------------------------------------
1478  * Suspending & resuming Haskell threads.
1479  * 
1480  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1481  * its capability before calling the C function.  This allows another
1482  * task to pick up the capability and carry on running Haskell
1483  * threads.  It also means that if the C call blocks, it won't lock
1484  * the whole system.
1485  *
1486  * The Haskell thread making the C call is put to sleep for the
1487  * duration of the call, on the susepended_ccalling_threads queue.  We
1488  * give out a token to the task, which it can use to resume the thread
1489  * on return from the C function.
1490  * ------------------------------------------------------------------------- */
1491    
1492 StgInt
1493 suspendThread( StgRegTable *reg )
1494 {
1495   nat tok;
1496   Capability *cap;
1497
1498   /* assume that *reg is a pointer to the StgRegTable part
1499    * of a Capability.
1500    */
1501   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1502
1503   ACQUIRE_LOCK(&sched_mutex);
1504
1505   IF_DEBUG(scheduler,
1506            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1507
1508   threadPaused(cap->r.rCurrentTSO);
1509   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1510   suspended_ccalling_threads = cap->r.rCurrentTSO;
1511
1512   /* Use the thread ID as the token; it should be unique */
1513   tok = cap->r.rCurrentTSO->id;
1514
1515   /* Hand back capability */
1516   releaseCapability(&cap);
1517   
1518 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1519   IF_DEBUG(scheduler, sched_belch("thread %d leaving RTS\n", tok));
1520   startTask(taskStart);
1521 #endif
1522   
1523   RELEASE_LOCK(&sched_mutex);
1524   RELEASE_LOCK(&rts_mutex);
1525   return tok; 
1526 }
1527
1528 StgRegTable *
1529 resumeThread( StgInt tok )
1530 {
1531   StgTSO *tso, **prev;
1532   Capability *cap;
1533
1534   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1535   ACQUIRE_LOCK(&sched_mutex);
1536   ext_threads_waiting++;
1537   IF_DEBUG(scheduler, sched_belch("thread %d returning, ext_thread count: %d.\n", tok, ext_threads_waiting));
1538   RELEASE_LOCK(&sched_mutex);
1539   
1540   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1541   ACQUIRE_LOCK(&rts_mutex);
1542   ext_threads_waiting--;
1543   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1544
1545 #if defined(THREADED_RTS)
1546   /* Free up any RTS-blocked threads. */
1547   broadcastCondition(&thread_ready_cond);
1548 #endif
1549
1550   /* Remove the thread off of the suspended list */
1551   prev = &suspended_ccalling_threads;
1552   for (tso = suspended_ccalling_threads; 
1553        tso != END_TSO_QUEUE; 
1554        prev = &tso->link, tso = tso->link) {
1555     if (tso->id == (StgThreadID)tok) {
1556       *prev = tso->link;
1557       break;
1558     }
1559   }
1560   if (tso == END_TSO_QUEUE) {
1561     barf("resumeThread: thread not found");
1562   }
1563   tso->link = END_TSO_QUEUE;
1564
1565 #if defined(SMP)
1566   while ( noFreeCapabilities() ) {
1567     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1568     waitCondition(&thread_ready_cond, &sched_mutex);
1569     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1570   }
1571 #endif
1572
1573   grabCapability(&cap);
1574
1575   cap->r.rCurrentTSO = tso;
1576
1577   return &cap->r;
1578 }
1579
1580
1581 /* ---------------------------------------------------------------------------
1582  * Static functions
1583  * ------------------------------------------------------------------------ */
1584 static void unblockThread(StgTSO *tso);
1585
1586 /* ---------------------------------------------------------------------------
1587  * Comparing Thread ids.
1588  *
1589  * This is used from STG land in the implementation of the
1590  * instances of Eq/Ord for ThreadIds.
1591  * ------------------------------------------------------------------------ */
1592
1593 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1594
1595   StgThreadID id1 = tso1->id; 
1596   StgThreadID id2 = tso2->id;
1597  
1598   if (id1 < id2) return (-1);
1599   if (id1 > id2) return 1;
1600   return 0;
1601 }
1602
1603 /* ---------------------------------------------------------------------------
1604  * Fetching the ThreadID from an StgTSO.
1605  *
1606  * This is used in the implementation of Show for ThreadIds.
1607  * ------------------------------------------------------------------------ */
1608 int rts_getThreadId(const StgTSO *tso) 
1609 {
1610   return tso->id;
1611 }
1612
1613 /* ---------------------------------------------------------------------------
1614    Create a new thread.
1615
1616    The new thread starts with the given stack size.  Before the
1617    scheduler can run, however, this thread needs to have a closure
1618    (and possibly some arguments) pushed on its stack.  See
1619    pushClosure() in Schedule.h.
1620
1621    createGenThread() and createIOThread() (in SchedAPI.h) are
1622    convenient packaged versions of this function.
1623
1624    currently pri (priority) is only used in a GRAN setup -- HWL
1625    ------------------------------------------------------------------------ */
1626 //@cindex createThread
1627 #if defined(GRAN)
1628 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1629 StgTSO *
1630 createThread(nat stack_size, StgInt pri)
1631 {
1632   return createThread_(stack_size, rtsFalse, pri);
1633 }
1634
1635 static StgTSO *
1636 createThread_(nat size, rtsBool have_lock, StgInt pri)
1637 {
1638 #else
1639 StgTSO *
1640 createThread(nat stack_size)
1641 {
1642   return createThread_(stack_size, rtsFalse);
1643 }
1644
1645 static StgTSO *
1646 createThread_(nat size, rtsBool have_lock)
1647 {
1648 #endif
1649
1650     StgTSO *tso;
1651     nat stack_size;
1652
1653     /* First check whether we should create a thread at all */
1654 #if defined(PAR)
1655   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1656   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1657     threadsIgnored++;
1658     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1659           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1660     return END_TSO_QUEUE;
1661   }
1662   threadsCreated++;
1663 #endif
1664
1665 #if defined(GRAN)
1666   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1667 #endif
1668
1669   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1670
1671   /* catch ridiculously small stack sizes */
1672   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1673     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1674   }
1675
1676   stack_size = size - TSO_STRUCT_SIZEW;
1677
1678   tso = (StgTSO *)allocate(size);
1679   TICK_ALLOC_TSO(stack_size, 0);
1680
1681   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1682 #if defined(GRAN)
1683   SET_GRAN_HDR(tso, ThisPE);
1684 #endif
1685   tso->what_next     = ThreadEnterGHC;
1686
1687   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1688    * protect the increment operation on next_thread_id.
1689    * In future, we could use an atomic increment instead.
1690    */
1691   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1692   tso->id = next_thread_id++; 
1693   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1694
1695   tso->why_blocked  = NotBlocked;
1696   tso->blocked_exceptions = NULL;
1697
1698   tso->stack_size   = stack_size;
1699   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1700                               - TSO_STRUCT_SIZEW;
1701   tso->sp           = (P_)&(tso->stack) + stack_size;
1702
1703 #ifdef PROFILING
1704   tso->prof.CCCS = CCS_MAIN;
1705 #endif
1706
1707   /* put a stop frame on the stack */
1708   tso->sp -= sizeofW(StgStopFrame);
1709   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1710   tso->su = (StgUpdateFrame*)tso->sp;
1711
1712   // ToDo: check this
1713 #if defined(GRAN)
1714   tso->link = END_TSO_QUEUE;
1715   /* uses more flexible routine in GranSim */
1716   insertThread(tso, CurrentProc);
1717 #else
1718   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1719    * from its creation
1720    */
1721 #endif
1722
1723 #if defined(GRAN) 
1724   if (RtsFlags.GranFlags.GranSimStats.Full) 
1725     DumpGranEvent(GR_START,tso);
1726 #elif defined(PAR)
1727   if (RtsFlags.ParFlags.ParStats.Full) 
1728     DumpGranEvent(GR_STARTQ,tso);
1729   /* HACk to avoid SCHEDULE 
1730      LastTSO = tso; */
1731 #endif
1732
1733   /* Link the new thread on the global thread list.
1734    */
1735   tso->global_link = all_threads;
1736   all_threads = tso;
1737
1738 #if defined(DIST)
1739   tso->dist.priority = MandatoryPriority; //by default that is...
1740 #endif
1741
1742 #if defined(GRAN)
1743   tso->gran.pri = pri;
1744 # if defined(DEBUG)
1745   tso->gran.magic = TSO_MAGIC; // debugging only
1746 # endif
1747   tso->gran.sparkname   = 0;
1748   tso->gran.startedat   = CURRENT_TIME; 
1749   tso->gran.exported    = 0;
1750   tso->gran.basicblocks = 0;
1751   tso->gran.allocs      = 0;
1752   tso->gran.exectime    = 0;
1753   tso->gran.fetchtime   = 0;
1754   tso->gran.fetchcount  = 0;
1755   tso->gran.blocktime   = 0;
1756   tso->gran.blockcount  = 0;
1757   tso->gran.blockedat   = 0;
1758   tso->gran.globalsparks = 0;
1759   tso->gran.localsparks  = 0;
1760   if (RtsFlags.GranFlags.Light)
1761     tso->gran.clock  = Now; /* local clock */
1762   else
1763     tso->gran.clock  = 0;
1764
1765   IF_DEBUG(gran,printTSO(tso));
1766 #elif defined(PAR)
1767 # if defined(DEBUG)
1768   tso->par.magic = TSO_MAGIC; // debugging only
1769 # endif
1770   tso->par.sparkname   = 0;
1771   tso->par.startedat   = CURRENT_TIME; 
1772   tso->par.exported    = 0;
1773   tso->par.basicblocks = 0;
1774   tso->par.allocs      = 0;
1775   tso->par.exectime    = 0;
1776   tso->par.fetchtime   = 0;
1777   tso->par.fetchcount  = 0;
1778   tso->par.blocktime   = 0;
1779   tso->par.blockcount  = 0;
1780   tso->par.blockedat   = 0;
1781   tso->par.globalsparks = 0;
1782   tso->par.localsparks  = 0;
1783 #endif
1784
1785 #if defined(GRAN)
1786   globalGranStats.tot_threads_created++;
1787   globalGranStats.threads_created_on_PE[CurrentProc]++;
1788   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1789   globalGranStats.tot_sq_probes++;
1790 #elif defined(PAR)
1791   // collect parallel global statistics (currently done together with GC stats)
1792   if (RtsFlags.ParFlags.ParStats.Global &&
1793       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1794     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1795     globalParStats.tot_threads_created++;
1796   }
1797 #endif 
1798
1799 #if defined(GRAN)
1800   IF_GRAN_DEBUG(pri,
1801                 belch("==__ schedule: Created TSO %d (%p);",
1802                       CurrentProc, tso, tso->id));
1803 #elif defined(PAR)
1804     IF_PAR_DEBUG(verbose,
1805                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1806                        tso->id, tso, advisory_thread_count));
1807 #else
1808   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1809                                  tso->id, tso->stack_size));
1810 #endif    
1811   return tso;
1812 }
1813
1814 #if defined(PAR)
1815 /* RFP:
1816    all parallel thread creation calls should fall through the following routine.
1817 */
1818 StgTSO *
1819 createSparkThread(rtsSpark spark) 
1820 { StgTSO *tso;
1821   ASSERT(spark != (rtsSpark)NULL);
1822   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1823   { threadsIgnored++;
1824     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1825           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1826     return END_TSO_QUEUE;
1827   }
1828   else
1829   { threadsCreated++;
1830     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1831     if (tso==END_TSO_QUEUE)     
1832       barf("createSparkThread: Cannot create TSO");
1833 #if defined(DIST)
1834     tso->priority = AdvisoryPriority;
1835 #endif
1836     pushClosure(tso,spark);
1837     PUSH_ON_RUN_QUEUE(tso);
1838     advisory_thread_count++;    
1839   }
1840   return tso;
1841 }
1842 #endif
1843
1844 /*
1845   Turn a spark into a thread.
1846   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1847 */
1848 #if defined(PAR)
1849 //@cindex activateSpark
1850 StgTSO *
1851 activateSpark (rtsSpark spark) 
1852 {
1853   StgTSO *tso;
1854
1855   tso = createSparkThread(spark);
1856   if (RtsFlags.ParFlags.ParStats.Full) {   
1857     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1858     IF_PAR_DEBUG(verbose,
1859                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1860                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1861   }
1862   // ToDo: fwd info on local/global spark to thread -- HWL
1863   // tso->gran.exported =  spark->exported;
1864   // tso->gran.locked =   !spark->global;
1865   // tso->gran.sparkname = spark->name;
1866
1867   return tso;
1868 }
1869 #endif
1870
1871 /* ---------------------------------------------------------------------------
1872  * scheduleThread()
1873  *
1874  * scheduleThread puts a thread on the head of the runnable queue.
1875  * This will usually be done immediately after a thread is created.
1876  * The caller of scheduleThread must create the thread using e.g.
1877  * createThread and push an appropriate closure
1878  * on this thread's stack before the scheduler is invoked.
1879  * ------------------------------------------------------------------------ */
1880
1881 void
1882 scheduleThread(StgTSO *tso)
1883 {
1884   ACQUIRE_LOCK(&sched_mutex);
1885
1886   /* Put the new thread on the head of the runnable queue.  The caller
1887    * better push an appropriate closure on this thread's stack
1888    * beforehand.  In the SMP case, the thread may start running as
1889    * soon as we release the scheduler lock below.
1890    */
1891   PUSH_ON_RUN_QUEUE(tso);
1892   THREAD_RUNNABLE();
1893
1894 #if 0
1895   IF_DEBUG(scheduler,printTSO(tso));
1896 #endif
1897   RELEASE_LOCK(&sched_mutex);
1898 }
1899
1900 /* ---------------------------------------------------------------------------
1901  * initScheduler()
1902  *
1903  * Initialise the scheduler.  This resets all the queues - if the
1904  * queues contained any threads, they'll be garbage collected at the
1905  * next pass.
1906  *
1907  * ------------------------------------------------------------------------ */
1908
1909 #ifdef SMP
1910 static void
1911 term_handler(int sig STG_UNUSED)
1912 {
1913   stat_workerStop();
1914   ACQUIRE_LOCK(&term_mutex);
1915   await_death--;
1916   RELEASE_LOCK(&term_mutex);
1917   shutdownThread();
1918 }
1919 #endif
1920
1921 void 
1922 initScheduler(void)
1923 {
1924 #if defined(GRAN)
1925   nat i;
1926
1927   for (i=0; i<=MAX_PROC; i++) {
1928     run_queue_hds[i]      = END_TSO_QUEUE;
1929     run_queue_tls[i]      = END_TSO_QUEUE;
1930     blocked_queue_hds[i]  = END_TSO_QUEUE;
1931     blocked_queue_tls[i]  = END_TSO_QUEUE;
1932     ccalling_threadss[i]  = END_TSO_QUEUE;
1933     sleeping_queue        = END_TSO_QUEUE;
1934   }
1935 #else
1936   run_queue_hd      = END_TSO_QUEUE;
1937   run_queue_tl      = END_TSO_QUEUE;
1938   blocked_queue_hd  = END_TSO_QUEUE;
1939   blocked_queue_tl  = END_TSO_QUEUE;
1940   sleeping_queue    = END_TSO_QUEUE;
1941 #endif 
1942
1943   suspended_ccalling_threads  = END_TSO_QUEUE;
1944
1945   main_threads = NULL;
1946   all_threads  = END_TSO_QUEUE;
1947
1948   context_switch = 0;
1949   interrupted    = 0;
1950
1951   RtsFlags.ConcFlags.ctxtSwitchTicks =
1952       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1953       
1954 #if defined(RTS_SUPPORTS_THREADS)
1955   /* Initialise the mutex and condition variables used by
1956    * the scheduler. */
1957   initMutex(&rts_mutex);
1958   initMutex(&sched_mutex);
1959   initMutex(&term_mutex);
1960 #if defined(THREADED_RTS)
1961   initMutex(&thread_ready_aux_mutex);
1962 #endif
1963   
1964   initCondition(&thread_ready_cond);
1965   initCondition(&gc_pending_cond);
1966 #endif
1967
1968 #if defined(THREADED_RTS)
1969   /* Grab big lock */
1970   ACQUIRE_LOCK(&rts_mutex);
1971   IF_DEBUG(scheduler, 
1972            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1973 #endif
1974
1975   /* Install the SIGHUP handler */
1976 #ifdef SMP
1977   {
1978     struct sigaction action,oact;
1979
1980     action.sa_handler = term_handler;
1981     sigemptyset(&action.sa_mask);
1982     action.sa_flags = 0;
1983     if (sigaction(SIGTERM, &action, &oact) != 0) {
1984       barf("can't install TERM handler");
1985     }
1986   }
1987 #endif
1988
1989   /* A capability holds the state a native thread needs in
1990    * order to execute STG code. At least one capability is
1991    * floating around (only SMP builds have more than one).
1992    */
1993   initCapabilities();
1994   
1995 #if defined(RTS_SUPPORTS_THREADS)
1996     /* start our haskell execution tasks */
1997 # if defined(SMP)
1998     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1999 # else
2000     startTaskManager(0,taskStart);
2001 # endif
2002 #endif
2003
2004 #if /* defined(SMP) ||*/ defined(PAR)
2005   initSparkPools();
2006 #endif
2007 }
2008
2009 void
2010 exitScheduler( void )
2011 {
2012 #if defined(RTS_SUPPORTS_THREADS)
2013   stopTaskManager();
2014 #endif
2015 }
2016
2017 /* -----------------------------------------------------------------------------
2018    Managing the per-task allocation areas.
2019    
2020    Each capability comes with an allocation area.  These are
2021    fixed-length block lists into which allocation can be done.
2022
2023    ToDo: no support for two-space collection at the moment???
2024    -------------------------------------------------------------------------- */
2025
2026 /* -----------------------------------------------------------------------------
2027  * waitThread is the external interface for running a new computation
2028  * and waiting for the result.
2029  *
2030  * In the non-SMP case, we create a new main thread, push it on the 
2031  * main-thread stack, and invoke the scheduler to run it.  The
2032  * scheduler will return when the top main thread on the stack has
2033  * completed or died, and fill in the necessary fields of the
2034  * main_thread structure.
2035  *
2036  * In the SMP case, we create a main thread as before, but we then
2037  * create a new condition variable and sleep on it.  When our new
2038  * main thread has completed, we'll be woken up and the status/result
2039  * will be in the main_thread struct.
2040  * -------------------------------------------------------------------------- */
2041
2042 int 
2043 howManyThreadsAvail ( void )
2044 {
2045    int i = 0;
2046    StgTSO* q;
2047    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2048       i++;
2049    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2050       i++;
2051    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2052       i++;
2053    return i;
2054 }
2055
2056 void
2057 finishAllThreads ( void )
2058 {
2059    do {
2060       while (run_queue_hd != END_TSO_QUEUE) {
2061          waitThread ( run_queue_hd, NULL );
2062       }
2063       while (blocked_queue_hd != END_TSO_QUEUE) {
2064          waitThread ( blocked_queue_hd, NULL );
2065       }
2066       while (sleeping_queue != END_TSO_QUEUE) {
2067          waitThread ( blocked_queue_hd, NULL );
2068       }
2069    } while 
2070       (blocked_queue_hd != END_TSO_QUEUE || 
2071        run_queue_hd     != END_TSO_QUEUE ||
2072        sleeping_queue   != END_TSO_QUEUE);
2073 }
2074
2075 SchedulerStatus
2076 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2077 {
2078   StgMainThread *m;
2079   SchedulerStatus stat;
2080
2081   ACQUIRE_LOCK(&sched_mutex);
2082   
2083   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2084
2085   m->tso = tso;
2086   m->ret = ret;
2087   m->stat = NoStatus;
2088 #if defined(RTS_SUPPORTS_THREADS)
2089   initCondition(&m->wakeup);
2090 #endif
2091
2092   m->link = main_threads;
2093   main_threads = m;
2094
2095   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2096                               m->tso->id));
2097
2098 #ifdef SMP
2099   do {
2100     waitCondition(&m->wakeup, &sched_mutex);
2101   } while (m->stat == NoStatus);
2102 #elif defined(GRAN)
2103   /* GranSim specific init */
2104   CurrentTSO = m->tso;                // the TSO to run
2105   procStatus[MainProc] = Busy;        // status of main PE
2106   CurrentProc = MainProc;             // PE to run it on
2107
2108   schedule();
2109 #else
2110   RELEASE_LOCK(&sched_mutex);
2111   schedule();
2112   ASSERT(m->stat != NoStatus);
2113 #endif
2114
2115   stat = m->stat;
2116
2117 #if defined(RTS_SUPPORTS_THREADS)
2118   closeCondition(&m->wakeup);
2119 #endif
2120
2121   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2122                               m->tso->id));
2123   free(m);
2124
2125   RELEASE_LOCK(&sched_mutex);
2126
2127   return stat;
2128 }
2129
2130 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2131 //@subsection Run queue code 
2132
2133 #if 0
2134 /* 
2135    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2136        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2137        implicit global variable that has to be correct when calling these
2138        fcts -- HWL 
2139 */
2140
2141 /* Put the new thread on the head of the runnable queue.
2142  * The caller of createThread better push an appropriate closure
2143  * on this thread's stack before the scheduler is invoked.
2144  */
2145 static /* inline */ void
2146 add_to_run_queue(tso)
2147 StgTSO* tso; 
2148 {
2149   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2150   tso->link = run_queue_hd;
2151   run_queue_hd = tso;
2152   if (run_queue_tl == END_TSO_QUEUE) {
2153     run_queue_tl = tso;
2154   }
2155 }
2156
2157 /* Put the new thread at the end of the runnable queue. */
2158 static /* inline */ void
2159 push_on_run_queue(tso)
2160 StgTSO* tso; 
2161 {
2162   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2163   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2164   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2165   if (run_queue_hd == END_TSO_QUEUE) {
2166     run_queue_hd = tso;
2167   } else {
2168     run_queue_tl->link = tso;
2169   }
2170   run_queue_tl = tso;
2171 }
2172
2173 /* 
2174    Should be inlined because it's used very often in schedule.  The tso
2175    argument is actually only needed in GranSim, where we want to have the
2176    possibility to schedule *any* TSO on the run queue, irrespective of the
2177    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2178    the run queue and dequeue the tso, adjusting the links in the queue. 
2179 */
2180 //@cindex take_off_run_queue
2181 static /* inline */ StgTSO*
2182 take_off_run_queue(StgTSO *tso) {
2183   StgTSO *t, *prev;
2184
2185   /* 
2186      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2187
2188      if tso is specified, unlink that tso from the run_queue (doesn't have
2189      to be at the beginning of the queue); GranSim only 
2190   */
2191   if (tso!=END_TSO_QUEUE) {
2192     /* find tso in queue */
2193     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2194          t!=END_TSO_QUEUE && t!=tso;
2195          prev=t, t=t->link) 
2196       /* nothing */ ;
2197     ASSERT(t==tso);
2198     /* now actually dequeue the tso */
2199     if (prev!=END_TSO_QUEUE) {
2200       ASSERT(run_queue_hd!=t);
2201       prev->link = t->link;
2202     } else {
2203       /* t is at beginning of thread queue */
2204       ASSERT(run_queue_hd==t);
2205       run_queue_hd = t->link;
2206     }
2207     /* t is at end of thread queue */
2208     if (t->link==END_TSO_QUEUE) {
2209       ASSERT(t==run_queue_tl);
2210       run_queue_tl = prev;
2211     } else {
2212       ASSERT(run_queue_tl!=t);
2213     }
2214     t->link = END_TSO_QUEUE;
2215   } else {
2216     /* take tso from the beginning of the queue; std concurrent code */
2217     t = run_queue_hd;
2218     if (t != END_TSO_QUEUE) {
2219       run_queue_hd = t->link;
2220       t->link = END_TSO_QUEUE;
2221       if (run_queue_hd == END_TSO_QUEUE) {
2222         run_queue_tl = END_TSO_QUEUE;
2223       }
2224     }
2225   }
2226   return t;
2227 }
2228
2229 #endif /* 0 */
2230
2231 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2232 //@subsection Garbage Collextion Routines
2233
2234 /* ---------------------------------------------------------------------------
2235    Where are the roots that we know about?
2236
2237         - all the threads on the runnable queue
2238         - all the threads on the blocked queue
2239         - all the threads on the sleeping queue
2240         - all the thread currently executing a _ccall_GC
2241         - all the "main threads"
2242      
2243    ------------------------------------------------------------------------ */
2244
2245 /* This has to be protected either by the scheduler monitor, or by the
2246         garbage collection monitor (probably the latter).
2247         KH @ 25/10/99
2248 */
2249
2250 void
2251 GetRoots(evac_fn evac)
2252 {
2253   StgMainThread *m;
2254
2255 #if defined(GRAN)
2256   {
2257     nat i;
2258     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2259       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2260           evac((StgClosure **)&run_queue_hds[i]);
2261       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2262           evac((StgClosure **)&run_queue_tls[i]);
2263       
2264       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2265           evac((StgClosure **)&blocked_queue_hds[i]);
2266       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2267           evac((StgClosure **)&blocked_queue_tls[i]);
2268       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2269           evac((StgClosure **)&ccalling_threads[i]);
2270     }
2271   }
2272
2273   markEventQueue();
2274
2275 #else /* !GRAN */
2276   if (run_queue_hd != END_TSO_QUEUE) {
2277       ASSERT(run_queue_tl != END_TSO_QUEUE);
2278       evac((StgClosure **)&run_queue_hd);
2279       evac((StgClosure **)&run_queue_tl);
2280   }
2281   
2282   if (blocked_queue_hd != END_TSO_QUEUE) {
2283       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2284       evac((StgClosure **)&blocked_queue_hd);
2285       evac((StgClosure **)&blocked_queue_tl);
2286   }
2287   
2288   if (sleeping_queue != END_TSO_QUEUE) {
2289       evac((StgClosure **)&sleeping_queue);
2290   }
2291 #endif 
2292
2293   for (m = main_threads; m != NULL; m = m->link) {
2294       evac((StgClosure **)&m->tso);
2295   }
2296   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2297       evac((StgClosure **)&suspended_ccalling_threads);
2298   }
2299
2300 #if defined(PAR) || defined(GRAN)
2301   markSparkQueue(evac);
2302 #endif
2303 }
2304
2305 /* -----------------------------------------------------------------------------
2306    performGC
2307
2308    This is the interface to the garbage collector from Haskell land.
2309    We provide this so that external C code can allocate and garbage
2310    collect when called from Haskell via _ccall_GC.
2311
2312    It might be useful to provide an interface whereby the programmer
2313    can specify more roots (ToDo).
2314    
2315    This needs to be protected by the GC condition variable above.  KH.
2316    -------------------------------------------------------------------------- */
2317
2318 void (*extra_roots)(evac_fn);
2319
2320 void
2321 performGC(void)
2322 {
2323   GarbageCollect(GetRoots,rtsFalse);
2324 }
2325
2326 void
2327 performMajorGC(void)
2328 {
2329   GarbageCollect(GetRoots,rtsTrue);
2330 }
2331
2332 static void
2333 AllRoots(evac_fn evac)
2334 {
2335     GetRoots(evac);             // the scheduler's roots
2336     extra_roots(evac);          // the user's roots
2337 }
2338
2339 void
2340 performGCWithRoots(void (*get_roots)(evac_fn))
2341 {
2342   extra_roots = get_roots;
2343   GarbageCollect(AllRoots,rtsFalse);
2344 }
2345
2346 /* -----------------------------------------------------------------------------
2347    Stack overflow
2348
2349    If the thread has reached its maximum stack size, then raise the
2350    StackOverflow exception in the offending thread.  Otherwise
2351    relocate the TSO into a larger chunk of memory and adjust its stack
2352    size appropriately.
2353    -------------------------------------------------------------------------- */
2354
2355 static StgTSO *
2356 threadStackOverflow(StgTSO *tso)
2357 {
2358   nat new_stack_size, new_tso_size, diff, stack_words;
2359   StgPtr new_sp;
2360   StgTSO *dest;
2361
2362   IF_DEBUG(sanity,checkTSO(tso));
2363   if (tso->stack_size >= tso->max_stack_size) {
2364
2365     IF_DEBUG(gc,
2366              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2367                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2368              /* If we're debugging, just print out the top of the stack */
2369              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2370                                               tso->sp+64)));
2371
2372     /* Send this thread the StackOverflow exception */
2373     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2374     return tso;
2375   }
2376
2377   /* Try to double the current stack size.  If that takes us over the
2378    * maximum stack size for this thread, then use the maximum instead.
2379    * Finally round up so the TSO ends up as a whole number of blocks.
2380    */
2381   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2382   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2383                                        TSO_STRUCT_SIZE)/sizeof(W_);
2384   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2385   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2386
2387   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2388
2389   dest = (StgTSO *)allocate(new_tso_size);
2390   TICK_ALLOC_TSO(new_stack_size,0);
2391
2392   /* copy the TSO block and the old stack into the new area */
2393   memcpy(dest,tso,TSO_STRUCT_SIZE);
2394   stack_words = tso->stack + tso->stack_size - tso->sp;
2395   new_sp = (P_)dest + new_tso_size - stack_words;
2396   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2397
2398   /* relocate the stack pointers... */
2399   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2400   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2401   dest->sp    = new_sp;
2402   dest->stack_size = new_stack_size;
2403         
2404   /* and relocate the update frame list */
2405   relocate_stack(dest, diff);
2406
2407   /* Mark the old TSO as relocated.  We have to check for relocated
2408    * TSOs in the garbage collector and any primops that deal with TSOs.
2409    *
2410    * It's important to set the sp and su values to just beyond the end
2411    * of the stack, so we don't attempt to scavenge any part of the
2412    * dead TSO's stack.
2413    */
2414   tso->what_next = ThreadRelocated;
2415   tso->link = dest;
2416   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2417   tso->su = (StgUpdateFrame *)tso->sp;
2418   tso->why_blocked = NotBlocked;
2419   dest->mut_link = NULL;
2420
2421   IF_PAR_DEBUG(verbose,
2422                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2423                      tso->id, tso, tso->stack_size);
2424                /* If we're debugging, just print out the top of the stack */
2425                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2426                                                 tso->sp+64)));
2427   
2428   IF_DEBUG(sanity,checkTSO(tso));
2429 #if 0
2430   IF_DEBUG(scheduler,printTSO(dest));
2431 #endif
2432
2433   return dest;
2434 }
2435
2436 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2437 //@subsection Blocking Queue Routines
2438
2439 /* ---------------------------------------------------------------------------
2440    Wake up a queue that was blocked on some resource.
2441    ------------------------------------------------------------------------ */
2442
2443 #if defined(GRAN)
2444 static inline void
2445 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2446 {
2447 }
2448 #elif defined(PAR)
2449 static inline void
2450 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2451 {
2452   /* write RESUME events to log file and
2453      update blocked and fetch time (depending on type of the orig closure) */
2454   if (RtsFlags.ParFlags.ParStats.Full) {
2455     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2456                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2457                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2458     if (EMPTY_RUN_QUEUE())
2459       emitSchedule = rtsTrue;
2460
2461     switch (get_itbl(node)->type) {
2462         case FETCH_ME_BQ:
2463           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2464           break;
2465         case RBH:
2466         case FETCH_ME:
2467         case BLACKHOLE_BQ:
2468           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2469           break;
2470 #ifdef DIST
2471         case MVAR:
2472           break;
2473 #endif    
2474         default:
2475           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2476         }
2477       }
2478 }
2479 #endif
2480
2481 #if defined(GRAN)
2482 static StgBlockingQueueElement *
2483 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2484 {
2485     StgTSO *tso;
2486     PEs node_loc, tso_loc;
2487
2488     node_loc = where_is(node); // should be lifted out of loop
2489     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2490     tso_loc = where_is((StgClosure *)tso);
2491     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2492       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2493       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2494       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2495       // insertThread(tso, node_loc);
2496       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2497                 ResumeThread,
2498                 tso, node, (rtsSpark*)NULL);
2499       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2500       // len_local++;
2501       // len++;
2502     } else { // TSO is remote (actually should be FMBQ)
2503       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2504                                   RtsFlags.GranFlags.Costs.gunblocktime +
2505                                   RtsFlags.GranFlags.Costs.latency;
2506       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2507                 UnblockThread,
2508                 tso, node, (rtsSpark*)NULL);
2509       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2510       // len++;
2511     }
2512     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2513     IF_GRAN_DEBUG(bq,
2514                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2515                           (node_loc==tso_loc ? "Local" : "Global"), 
2516                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2517     tso->block_info.closure = NULL;
2518     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2519                              tso->id, tso));
2520 }
2521 #elif defined(PAR)
2522 static StgBlockingQueueElement *
2523 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2524 {
2525     StgBlockingQueueElement *next;
2526
2527     switch (get_itbl(bqe)->type) {
2528     case TSO:
2529       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2530       /* if it's a TSO just push it onto the run_queue */
2531       next = bqe->link;
2532       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2533       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2534       THREAD_RUNNABLE();
2535       unblockCount(bqe, node);
2536       /* reset blocking status after dumping event */
2537       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2538       break;
2539
2540     case BLOCKED_FETCH:
2541       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2542       next = bqe->link;
2543       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2544       PendingFetches = (StgBlockedFetch *)bqe;
2545       break;
2546
2547 # if defined(DEBUG)
2548       /* can ignore this case in a non-debugging setup; 
2549          see comments on RBHSave closures above */
2550     case CONSTR:
2551       /* check that the closure is an RBHSave closure */
2552       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2553              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2554              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2555       break;
2556
2557     default:
2558       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2559            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2560            (StgClosure *)bqe);
2561 # endif
2562     }
2563   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2564   return next;
2565 }
2566
2567 #else /* !GRAN && !PAR */
2568 static StgTSO *
2569 unblockOneLocked(StgTSO *tso)
2570 {
2571   StgTSO *next;
2572
2573   ASSERT(get_itbl(tso)->type == TSO);
2574   ASSERT(tso->why_blocked != NotBlocked);
2575   tso->why_blocked = NotBlocked;
2576   next = tso->link;
2577   PUSH_ON_RUN_QUEUE(tso);
2578   THREAD_RUNNABLE();
2579   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2580   return next;
2581 }
2582 #endif
2583
2584 #if defined(GRAN) || defined(PAR)
2585 inline StgBlockingQueueElement *
2586 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2587 {
2588   ACQUIRE_LOCK(&sched_mutex);
2589   bqe = unblockOneLocked(bqe, node);
2590   RELEASE_LOCK(&sched_mutex);
2591   return bqe;
2592 }
2593 #else
2594 inline StgTSO *
2595 unblockOne(StgTSO *tso)
2596 {
2597   ACQUIRE_LOCK(&sched_mutex);
2598   tso = unblockOneLocked(tso);
2599   RELEASE_LOCK(&sched_mutex);
2600   return tso;
2601 }
2602 #endif
2603
2604 #if defined(GRAN)
2605 void 
2606 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2607 {
2608   StgBlockingQueueElement *bqe;
2609   PEs node_loc;
2610   nat len = 0; 
2611
2612   IF_GRAN_DEBUG(bq, 
2613                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2614                       node, CurrentProc, CurrentTime[CurrentProc], 
2615                       CurrentTSO->id, CurrentTSO));
2616
2617   node_loc = where_is(node);
2618
2619   ASSERT(q == END_BQ_QUEUE ||
2620          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2621          get_itbl(q)->type == CONSTR); // closure (type constructor)
2622   ASSERT(is_unique(node));
2623
2624   /* FAKE FETCH: magically copy the node to the tso's proc;
2625      no Fetch necessary because in reality the node should not have been 
2626      moved to the other PE in the first place
2627   */
2628   if (CurrentProc!=node_loc) {
2629     IF_GRAN_DEBUG(bq, 
2630                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2631                         node, node_loc, CurrentProc, CurrentTSO->id, 
2632                         // CurrentTSO, where_is(CurrentTSO),
2633                         node->header.gran.procs));
2634     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2635     IF_GRAN_DEBUG(bq, 
2636                   belch("## new bitmask of node %p is %#x",
2637                         node, node->header.gran.procs));
2638     if (RtsFlags.GranFlags.GranSimStats.Global) {
2639       globalGranStats.tot_fake_fetches++;
2640     }
2641   }
2642
2643   bqe = q;
2644   // ToDo: check: ASSERT(CurrentProc==node_loc);
2645   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2646     //next = bqe->link;
2647     /* 
2648        bqe points to the current element in the queue
2649        next points to the next element in the queue
2650     */
2651     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2652     //tso_loc = where_is(tso);
2653     len++;
2654     bqe = unblockOneLocked(bqe, node);
2655   }
2656
2657   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2658      the closure to make room for the anchor of the BQ */
2659   if (bqe!=END_BQ_QUEUE) {
2660     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2661     /*
2662     ASSERT((info_ptr==&RBH_Save_0_info) ||
2663            (info_ptr==&RBH_Save_1_info) ||
2664            (info_ptr==&RBH_Save_2_info));
2665     */
2666     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2667     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2668     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2669
2670     IF_GRAN_DEBUG(bq,
2671                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2672                         node, info_type(node)));
2673   }
2674
2675   /* statistics gathering */
2676   if (RtsFlags.GranFlags.GranSimStats.Global) {
2677     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2678     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2679     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2680     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2681   }
2682   IF_GRAN_DEBUG(bq,
2683                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2684                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2685 }
2686 #elif defined(PAR)
2687 void 
2688 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2689 {
2690   StgBlockingQueueElement *bqe;
2691
2692   ACQUIRE_LOCK(&sched_mutex);
2693
2694   IF_PAR_DEBUG(verbose, 
2695                belch("##-_ AwBQ for node %p on [%x]: ",
2696                      node, mytid));
2697 #ifdef DIST  
2698   //RFP
2699   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2700     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2701     return;
2702   }
2703 #endif
2704   
2705   ASSERT(q == END_BQ_QUEUE ||
2706          get_itbl(q)->type == TSO ||           
2707          get_itbl(q)->type == BLOCKED_FETCH || 
2708          get_itbl(q)->type == CONSTR); 
2709
2710   bqe = q;
2711   while (get_itbl(bqe)->type==TSO || 
2712          get_itbl(bqe)->type==BLOCKED_FETCH) {
2713     bqe = unblockOneLocked(bqe, node);
2714   }
2715   RELEASE_LOCK(&sched_mutex);
2716 }
2717
2718 #else   /* !GRAN && !PAR */
2719 void
2720 awakenBlockedQueue(StgTSO *tso)
2721 {
2722   ACQUIRE_LOCK(&sched_mutex);
2723   while (tso != END_TSO_QUEUE) {
2724     tso = unblockOneLocked(tso);
2725   }
2726   RELEASE_LOCK(&sched_mutex);
2727 }
2728 #endif
2729
2730 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2731 //@subsection Exception Handling Routines
2732
2733 /* ---------------------------------------------------------------------------
2734    Interrupt execution
2735    - usually called inside a signal handler so it mustn't do anything fancy.   
2736    ------------------------------------------------------------------------ */
2737
2738 void
2739 interruptStgRts(void)
2740 {
2741     interrupted    = 1;
2742     context_switch = 1;
2743 }
2744
2745 /* -----------------------------------------------------------------------------
2746    Unblock a thread
2747
2748    This is for use when we raise an exception in another thread, which
2749    may be blocked.
2750    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2751    -------------------------------------------------------------------------- */
2752
2753 #if defined(GRAN) || defined(PAR)
2754 /*
2755   NB: only the type of the blocking queue is different in GranSim and GUM
2756       the operations on the queue-elements are the same
2757       long live polymorphism!
2758 */
2759 static void
2760 unblockThread(StgTSO *tso)
2761 {
2762   StgBlockingQueueElement *t, **last;
2763
2764   ACQUIRE_LOCK(&sched_mutex);
2765   switch (tso->why_blocked) {
2766
2767   case NotBlocked:
2768     return;  /* not blocked */
2769
2770   case BlockedOnMVar:
2771     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2772     {
2773       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2774       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2775
2776       last = (StgBlockingQueueElement **)&mvar->head;
2777       for (t = (StgBlockingQueueElement *)mvar->head; 
2778            t != END_BQ_QUEUE; 
2779            last = &t->link, last_tso = t, t = t->link) {
2780         if (t == (StgBlockingQueueElement *)tso) {
2781           *last = (StgBlockingQueueElement *)tso->link;
2782           if (mvar->tail == tso) {
2783             mvar->tail = (StgTSO *)last_tso;
2784           }
2785           goto done;
2786         }
2787       }
2788       barf("unblockThread (MVAR): TSO not found");
2789     }
2790
2791   case BlockedOnBlackHole:
2792     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2793     {
2794       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2795
2796       last = &bq->blocking_queue;
2797       for (t = bq->blocking_queue; 
2798            t != END_BQ_QUEUE; 
2799            last = &t->link, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           *last = (StgBlockingQueueElement *)tso->link;
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (BLACKHOLE): TSO not found");
2806     }
2807
2808   case BlockedOnException:
2809     {
2810       StgTSO *target  = tso->block_info.tso;
2811
2812       ASSERT(get_itbl(target)->type == TSO);
2813
2814       if (target->what_next == ThreadRelocated) {
2815           target = target->link;
2816           ASSERT(get_itbl(target)->type == TSO);
2817       }
2818
2819       ASSERT(target->blocked_exceptions != NULL);
2820
2821       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2822       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2823            t != END_BQ_QUEUE; 
2824            last = &t->link, t = t->link) {
2825         ASSERT(get_itbl(t)->type == TSO);
2826         if (t == (StgBlockingQueueElement *)tso) {
2827           *last = (StgBlockingQueueElement *)tso->link;
2828           goto done;
2829         }
2830       }
2831       barf("unblockThread (Exception): TSO not found");
2832     }
2833
2834   case BlockedOnRead:
2835   case BlockedOnWrite:
2836     {
2837       /* take TSO off blocked_queue */
2838       StgBlockingQueueElement *prev = NULL;
2839       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2840            prev = t, t = t->link) {
2841         if (t == (StgBlockingQueueElement *)tso) {
2842           if (prev == NULL) {
2843             blocked_queue_hd = (StgTSO *)t->link;
2844             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2845               blocked_queue_tl = END_TSO_QUEUE;
2846             }
2847           } else {
2848             prev->link = t->link;
2849             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2850               blocked_queue_tl = (StgTSO *)prev;
2851             }
2852           }
2853           goto done;
2854         }
2855       }
2856       barf("unblockThread (I/O): TSO not found");
2857     }
2858
2859   case BlockedOnDelay:
2860     {
2861       /* take TSO off sleeping_queue */
2862       StgBlockingQueueElement *prev = NULL;
2863       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2864            prev = t, t = t->link) {
2865         if (t == (StgBlockingQueueElement *)tso) {
2866           if (prev == NULL) {
2867             sleeping_queue = (StgTSO *)t->link;
2868           } else {
2869             prev->link = t->link;
2870           }
2871           goto done;
2872         }
2873       }
2874       barf("unblockThread (I/O): TSO not found");
2875     }
2876
2877   default:
2878     barf("unblockThread");
2879   }
2880
2881  done:
2882   tso->link = END_TSO_QUEUE;
2883   tso->why_blocked = NotBlocked;
2884   tso->block_info.closure = NULL;
2885   PUSH_ON_RUN_QUEUE(tso);
2886   RELEASE_LOCK(&sched_mutex);
2887 }
2888 #else
2889 static void
2890 unblockThread(StgTSO *tso)
2891 {
2892   StgTSO *t, **last;
2893
2894   ACQUIRE_LOCK(&sched_mutex);
2895   switch (tso->why_blocked) {
2896
2897   case NotBlocked:
2898     return;  /* not blocked */
2899
2900   case BlockedOnMVar:
2901     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2902     {
2903       StgTSO *last_tso = END_TSO_QUEUE;
2904       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2905
2906       last = &mvar->head;
2907       for (t = mvar->head; t != END_TSO_QUEUE; 
2908            last = &t->link, last_tso = t, t = t->link) {
2909         if (t == tso) {
2910           *last = tso->link;
2911           if (mvar->tail == tso) {
2912             mvar->tail = last_tso;
2913           }
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (MVAR): TSO not found");
2918     }
2919
2920   case BlockedOnBlackHole:
2921     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2922     {
2923       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2924
2925       last = &bq->blocking_queue;
2926       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2927            last = &t->link, t = t->link) {
2928         if (t == tso) {
2929           *last = tso->link;
2930           goto done;
2931         }
2932       }
2933       barf("unblockThread (BLACKHOLE): TSO not found");
2934     }
2935
2936   case BlockedOnException:
2937     {
2938       StgTSO *target  = tso->block_info.tso;
2939
2940       ASSERT(get_itbl(target)->type == TSO);
2941
2942       while (target->what_next == ThreadRelocated) {
2943           target = target->link;
2944           ASSERT(get_itbl(target)->type == TSO);
2945       }
2946       
2947       ASSERT(target->blocked_exceptions != NULL);
2948
2949       last = &target->blocked_exceptions;
2950       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2951            last = &t->link, t = t->link) {
2952         ASSERT(get_itbl(t)->type == TSO);
2953         if (t == tso) {
2954           *last = tso->link;
2955           goto done;
2956         }
2957       }
2958       barf("unblockThread (Exception): TSO not found");
2959     }
2960
2961   case BlockedOnRead:
2962   case BlockedOnWrite:
2963     {
2964       StgTSO *prev = NULL;
2965       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2966            prev = t, t = t->link) {
2967         if (t == tso) {
2968           if (prev == NULL) {
2969             blocked_queue_hd = t->link;
2970             if (blocked_queue_tl == t) {
2971               blocked_queue_tl = END_TSO_QUEUE;
2972             }
2973           } else {
2974             prev->link = t->link;
2975             if (blocked_queue_tl == t) {
2976               blocked_queue_tl = prev;
2977             }
2978           }
2979           goto done;
2980         }
2981       }
2982       barf("unblockThread (I/O): TSO not found");
2983     }
2984
2985   case BlockedOnDelay:
2986     {
2987       StgTSO *prev = NULL;
2988       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2989            prev = t, t = t->link) {
2990         if (t == tso) {
2991           if (prev == NULL) {
2992             sleeping_queue = t->link;
2993           } else {
2994             prev->link = t->link;
2995           }
2996           goto done;
2997         }
2998       }
2999       barf("unblockThread (I/O): TSO not found");
3000     }
3001
3002   default:
3003     barf("unblockThread");
3004   }
3005
3006  done:
3007   tso->link = END_TSO_QUEUE;
3008   tso->why_blocked = NotBlocked;
3009   tso->block_info.closure = NULL;
3010   PUSH_ON_RUN_QUEUE(tso);
3011   RELEASE_LOCK(&sched_mutex);
3012 }
3013 #endif
3014
3015 /* -----------------------------------------------------------------------------
3016  * raiseAsync()
3017  *
3018  * The following function implements the magic for raising an
3019  * asynchronous exception in an existing thread.
3020  *
3021  * We first remove the thread from any queue on which it might be
3022  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3023  *
3024  * We strip the stack down to the innermost CATCH_FRAME, building
3025  * thunks in the heap for all the active computations, so they can 
3026  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3027  * an application of the handler to the exception, and push it on
3028  * the top of the stack.
3029  * 
3030  * How exactly do we save all the active computations?  We create an
3031  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3032  * AP_UPDs pushes everything from the corresponding update frame
3033  * upwards onto the stack.  (Actually, it pushes everything up to the
3034  * next update frame plus a pointer to the next AP_UPD object.
3035  * Entering the next AP_UPD object pushes more onto the stack until we
3036  * reach the last AP_UPD object - at which point the stack should look
3037  * exactly as it did when we killed the TSO and we can continue
3038  * execution by entering the closure on top of the stack.
3039  *
3040  * We can also kill a thread entirely - this happens if either (a) the 
3041  * exception passed to raiseAsync is NULL, or (b) there's no
3042  * CATCH_FRAME on the stack.  In either case, we strip the entire
3043  * stack and replace the thread with a zombie.
3044  *
3045  * -------------------------------------------------------------------------- */
3046  
3047 void 
3048 deleteThread(StgTSO *tso)
3049 {
3050   raiseAsync(tso,NULL);
3051 }
3052
3053 void
3054 raiseAsync(StgTSO *tso, StgClosure *exception)
3055 {
3056   StgUpdateFrame* su = tso->su;
3057   StgPtr          sp = tso->sp;
3058   
3059   /* Thread already dead? */
3060   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3061     return;
3062   }
3063
3064   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3065
3066   /* Remove it from any blocking queues */
3067   unblockThread(tso);
3068
3069   /* The stack freezing code assumes there's a closure pointer on
3070    * the top of the stack.  This isn't always the case with compiled
3071    * code, so we have to push a dummy closure on the top which just
3072    * returns to the next return address on the stack.
3073    */
3074   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3075     *(--sp) = (W_)&stg_dummy_ret_closure;
3076   }
3077
3078   while (1) {
3079     nat words = ((P_)su - (P_)sp) - 1;
3080     nat i;
3081     StgAP_UPD * ap;
3082
3083     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3084      * then build PAP(handler,exception,realworld#), and leave it on
3085      * top of the stack ready to enter.
3086      */
3087     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3088       StgCatchFrame *cf = (StgCatchFrame *)su;
3089       /* we've got an exception to raise, so let's pass it to the
3090        * handler in this frame.
3091        */
3092       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3093       TICK_ALLOC_UPD_PAP(3,0);
3094       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3095               
3096       ap->n_args = 2;
3097       ap->fun = cf->handler;    /* :: Exception -> IO a */
3098       ap->payload[0] = exception;
3099       ap->payload[1] = ARG_TAG(0); /* realworld token */
3100
3101       /* throw away the stack from Sp up to and including the
3102        * CATCH_FRAME.
3103        */
3104       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3105       tso->su = cf->link;
3106
3107       /* Restore the blocked/unblocked state for asynchronous exceptions
3108        * at the CATCH_FRAME.  
3109        *
3110        * If exceptions were unblocked at the catch, arrange that they
3111        * are unblocked again after executing the handler by pushing an
3112        * unblockAsyncExceptions_ret stack frame.
3113        */
3114       if (!cf->exceptions_blocked) {
3115         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3116       }
3117       
3118       /* Ensure that async exceptions are blocked when running the handler.
3119        */
3120       if (tso->blocked_exceptions == NULL) {
3121         tso->blocked_exceptions = END_TSO_QUEUE;
3122       }
3123       
3124       /* Put the newly-built PAP on top of the stack, ready to execute
3125        * when the thread restarts.
3126        */
3127       sp[0] = (W_)ap;
3128       tso->sp = sp;
3129       tso->what_next = ThreadEnterGHC;
3130       IF_DEBUG(sanity, checkTSO(tso));
3131       return;
3132     }
3133
3134     /* First build an AP_UPD consisting of the stack chunk above the
3135      * current update frame, with the top word on the stack as the
3136      * fun field.
3137      */
3138     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3139     
3140     ASSERT(words >= 0);
3141     
3142     ap->n_args = words;
3143     ap->fun    = (StgClosure *)sp[0];
3144     sp++;
3145     for(i=0; i < (nat)words; ++i) {
3146       ap->payload[i] = (StgClosure *)*sp++;
3147     }
3148     
3149     switch (get_itbl(su)->type) {
3150       
3151     case UPDATE_FRAME:
3152       {
3153         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3154         TICK_ALLOC_UP_THK(words+1,0);
3155         
3156         IF_DEBUG(scheduler,
3157                  fprintf(stderr,  "scheduler: Updating ");
3158                  printPtr((P_)su->updatee); 
3159                  fprintf(stderr,  " with ");
3160                  printObj((StgClosure *)ap);
3161                  );
3162         
3163         /* Replace the updatee with an indirection - happily
3164          * this will also wake up any threads currently
3165          * waiting on the result.
3166          *
3167          * Warning: if we're in a loop, more than one update frame on
3168          * the stack may point to the same object.  Be careful not to
3169          * overwrite an IND_OLDGEN in this case, because we'll screw
3170          * up the mutable lists.  To be on the safe side, don't
3171          * overwrite any kind of indirection at all.  See also
3172          * threadSqueezeStack in GC.c, where we have to make a similar
3173          * check.
3174          */
3175         if (!closure_IND(su->updatee)) {
3176             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3177         }
3178         su = su->link;
3179         sp += sizeofW(StgUpdateFrame) -1;
3180         sp[0] = (W_)ap; /* push onto stack */
3181         break;
3182       }
3183
3184     case CATCH_FRAME:
3185       {
3186         StgCatchFrame *cf = (StgCatchFrame *)su;
3187         StgClosure* o;
3188         
3189         /* We want a PAP, not an AP_UPD.  Fortunately, the
3190          * layout's the same.
3191          */
3192         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3193         TICK_ALLOC_UPD_PAP(words+1,0);
3194         
3195         /* now build o = FUN(catch,ap,handler) */
3196         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3197         TICK_ALLOC_FUN(2,0);
3198         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3199         o->payload[0] = (StgClosure *)ap;
3200         o->payload[1] = cf->handler;
3201         
3202         IF_DEBUG(scheduler,
3203                  fprintf(stderr,  "scheduler: Built ");
3204                  printObj((StgClosure *)o);
3205                  );
3206         
3207         /* pop the old handler and put o on the stack */
3208         su = cf->link;
3209         sp += sizeofW(StgCatchFrame) - 1;
3210         sp[0] = (W_)o;
3211         break;
3212       }
3213       
3214     case SEQ_FRAME:
3215       {
3216         StgSeqFrame *sf = (StgSeqFrame *)su;
3217         StgClosure* o;
3218         
3219         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3220         TICK_ALLOC_UPD_PAP(words+1,0);
3221         
3222         /* now build o = FUN(seq,ap) */
3223         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3224         TICK_ALLOC_SE_THK(1,0);
3225         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3226         o->payload[0] = (StgClosure *)ap;
3227         
3228         IF_DEBUG(scheduler,
3229                  fprintf(stderr,  "scheduler: Built ");
3230                  printObj((StgClosure *)o);
3231                  );
3232         
3233         /* pop the old handler and put o on the stack */
3234         su = sf->link;
3235         sp += sizeofW(StgSeqFrame) - 1;
3236         sp[0] = (W_)o;
3237         break;
3238       }
3239       
3240     case STOP_FRAME:
3241       /* We've stripped the entire stack, the thread is now dead. */
3242       sp += sizeofW(StgStopFrame) - 1;
3243       sp[0] = (W_)exception;    /* save the exception */
3244       tso->what_next = ThreadKilled;
3245       tso->su = (StgUpdateFrame *)(sp+1);
3246       tso->sp = sp;
3247       return;
3248
3249     default:
3250       barf("raiseAsync");
3251     }
3252   }
3253   barf("raiseAsync");
3254 }
3255
3256 /* -----------------------------------------------------------------------------
3257    resurrectThreads is called after garbage collection on the list of
3258    threads found to be garbage.  Each of these threads will be woken
3259    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3260    on an MVar, or NonTermination if the thread was blocked on a Black
3261    Hole.
3262    -------------------------------------------------------------------------- */
3263
3264 void
3265 resurrectThreads( StgTSO *threads )
3266 {
3267   StgTSO *tso, *next;
3268
3269   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3270     next = tso->global_link;
3271     tso->global_link = all_threads;
3272     all_threads = tso;
3273     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3274
3275     switch (tso->why_blocked) {
3276     case BlockedOnMVar:
3277     case BlockedOnException:
3278       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3279       break;
3280     case BlockedOnBlackHole:
3281       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3282       break;
3283     case NotBlocked:
3284       /* This might happen if the thread was blocked on a black hole
3285        * belonging to a thread that we've just woken up (raiseAsync
3286        * can wake up threads, remember...).
3287        */
3288       continue;
3289     default:
3290       barf("resurrectThreads: thread blocked in a strange way");
3291     }
3292   }
3293 }
3294
3295 /* -----------------------------------------------------------------------------
3296  * Blackhole detection: if we reach a deadlock, test whether any
3297  * threads are blocked on themselves.  Any threads which are found to
3298  * be self-blocked get sent a NonTermination exception.
3299  *
3300  * This is only done in a deadlock situation in order to avoid
3301  * performance overhead in the normal case.
3302  * -------------------------------------------------------------------------- */
3303
3304 static void
3305 detectBlackHoles( void )
3306 {
3307     StgTSO *t = all_threads;
3308     StgUpdateFrame *frame;
3309     StgClosure *blocked_on;
3310
3311     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3312
3313         while (t->what_next == ThreadRelocated) {
3314             t = t->link;
3315             ASSERT(get_itbl(t)->type == TSO);
3316         }
3317       
3318         if (t->why_blocked != BlockedOnBlackHole) {
3319             continue;
3320         }
3321
3322         blocked_on = t->block_info.closure;
3323
3324         for (frame = t->su; ; frame = frame->link) {
3325             switch (get_itbl(frame)->type) {
3326
3327             case UPDATE_FRAME:
3328                 if (frame->updatee == blocked_on) {
3329                     /* We are blocking on one of our own computations, so
3330                      * send this thread the NonTermination exception.  
3331                      */
3332                     IF_DEBUG(scheduler, 
3333                              sched_belch("thread %d is blocked on itself", t->id));
3334                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3335                     goto done;
3336                 }
3337                 else {
3338                     continue;
3339                 }
3340
3341             case CATCH_FRAME:
3342             case SEQ_FRAME:
3343                 continue;
3344                 
3345             case STOP_FRAME:
3346                 break;
3347             }
3348             break;
3349         }
3350
3351     done: ;
3352     }   
3353 }
3354
3355 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3356 //@subsection Debugging Routines
3357
3358 /* -----------------------------------------------------------------------------
3359    Debugging: why is a thread blocked
3360    -------------------------------------------------------------------------- */
3361
3362 #ifdef DEBUG
3363
3364 void
3365 printThreadBlockage(StgTSO *tso)
3366 {
3367   switch (tso->why_blocked) {
3368   case BlockedOnRead:
3369     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3370     break;
3371   case BlockedOnWrite:
3372     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3373     break;
3374   case BlockedOnDelay:
3375     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3376     break;
3377   case BlockedOnMVar:
3378     fprintf(stderr,"is blocked on an MVar");
3379     break;
3380   case BlockedOnException:
3381     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3382             tso->block_info.tso->id);
3383     break;
3384   case BlockedOnBlackHole:
3385     fprintf(stderr,"is blocked on a black hole");
3386     break;
3387   case NotBlocked:
3388     fprintf(stderr,"is not blocked");
3389     break;
3390 #if defined(PAR)
3391   case BlockedOnGA:
3392     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3393             tso->block_info.closure, info_type(tso->block_info.closure));
3394     break;
3395   case BlockedOnGA_NoSend:
3396     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3397             tso->block_info.closure, info_type(tso->block_info.closure));
3398     break;
3399 #endif
3400   default:
3401     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3402          tso->why_blocked, tso->id, tso);
3403   }
3404 }
3405
3406 void
3407 printThreadStatus(StgTSO *tso)
3408 {
3409   switch (tso->what_next) {
3410   case ThreadKilled:
3411     fprintf(stderr,"has been killed");
3412     break;
3413   case ThreadComplete:
3414     fprintf(stderr,"has completed");
3415     break;
3416   default:
3417     printThreadBlockage(tso);
3418   }
3419 }
3420
3421 void
3422 printAllThreads(void)
3423 {
3424   StgTSO *t;
3425
3426 # if defined(GRAN)
3427   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3428   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3429                        time_string, rtsFalse/*no commas!*/);
3430
3431   sched_belch("all threads at [%s]:", time_string);
3432 # elif defined(PAR)
3433   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3434   ullong_format_string(CURRENT_TIME,
3435                        time_string, rtsFalse/*no commas!*/);
3436
3437   sched_belch("all threads at [%s]:", time_string);
3438 # else
3439   sched_belch("all threads:");
3440 # endif
3441
3442   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3443     fprintf(stderr, "\tthread %d ", t->id);
3444     printThreadStatus(t);
3445     fprintf(stderr,"\n");
3446   }
3447 }
3448     
3449 /* 
3450    Print a whole blocking queue attached to node (debugging only).
3451 */
3452 //@cindex print_bq
3453 # if defined(PAR)
3454 void 
3455 print_bq (StgClosure *node)
3456 {
3457   StgBlockingQueueElement *bqe;
3458   StgTSO *tso;
3459   rtsBool end;
3460
3461   fprintf(stderr,"## BQ of closure %p (%s): ",
3462           node, info_type(node));
3463
3464   /* should cover all closures that may have a blocking queue */
3465   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3466          get_itbl(node)->type == FETCH_ME_BQ ||
3467          get_itbl(node)->type == RBH ||
3468          get_itbl(node)->type == MVAR);
3469     
3470   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3471
3472   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3473 }
3474
3475 /* 
3476    Print a whole blocking queue starting with the element bqe.
3477 */
3478 void 
3479 print_bqe (StgBlockingQueueElement *bqe)
3480 {
3481   rtsBool end;
3482
3483   /* 
3484      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3485   */
3486   for (end = (bqe==END_BQ_QUEUE);
3487        !end; // iterate until bqe points to a CONSTR
3488        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3489        bqe = end ? END_BQ_QUEUE : bqe->link) {
3490     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3491     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3492     /* types of closures that may appear in a blocking queue */
3493     ASSERT(get_itbl(bqe)->type == TSO ||           
3494            get_itbl(bqe)->type == BLOCKED_FETCH || 
3495            get_itbl(bqe)->type == CONSTR); 
3496     /* only BQs of an RBH end with an RBH_Save closure */
3497     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3498
3499     switch (get_itbl(bqe)->type) {
3500     case TSO:
3501       fprintf(stderr," TSO %u (%x),",
3502               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3503       break;
3504     case BLOCKED_FETCH:
3505       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3506               ((StgBlockedFetch *)bqe)->node, 
3507               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3508               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3509               ((StgBlockedFetch *)bqe)->ga.weight);
3510       break;
3511     case CONSTR:
3512       fprintf(stderr," %s (IP %p),",
3513               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3514                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3515                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3516                "RBH_Save_?"), get_itbl(bqe));
3517       break;
3518     default:
3519       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3520            info_type((StgClosure *)bqe)); // , node, info_type(node));
3521       break;
3522     }
3523   } /* for */
3524   fputc('\n', stderr);
3525 }
3526 # elif defined(GRAN)
3527 void 
3528 print_bq (StgClosure *node)
3529 {
3530   StgBlockingQueueElement *bqe;
3531   PEs node_loc, tso_loc;
3532   rtsBool end;
3533
3534   /* should cover all closures that may have a blocking queue */
3535   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3536          get_itbl(node)->type == FETCH_ME_BQ ||
3537          get_itbl(node)->type == RBH);
3538     
3539   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3540   node_loc = where_is(node);
3541
3542   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3543           node, info_type(node), node_loc);
3544
3545   /* 
3546      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3547   */
3548   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3549        !end; // iterate until bqe points to a CONSTR
3550        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3551     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3552     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3553     /* types of closures that may appear in a blocking queue */
3554     ASSERT(get_itbl(bqe)->type == TSO ||           
3555            get_itbl(bqe)->type == CONSTR); 
3556     /* only BQs of an RBH end with an RBH_Save closure */
3557     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3558
3559     tso_loc = where_is((StgClosure *)bqe);
3560     switch (get_itbl(bqe)->type) {
3561     case TSO:
3562       fprintf(stderr," TSO %d (%p) on [PE %d],",
3563               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3564       break;
3565     case CONSTR:
3566       fprintf(stderr," %s (IP %p),",
3567               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3568                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3569                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3570                "RBH_Save_?"), get_itbl(bqe));
3571       break;
3572     default:
3573       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3574            info_type((StgClosure *)bqe), node, info_type(node));
3575       break;
3576     }
3577   } /* for */
3578   fputc('\n', stderr);
3579 }
3580 #else
3581 /* 
3582    Nice and easy: only TSOs on the blocking queue
3583 */
3584 void 
3585 print_bq (StgClosure *node)
3586 {
3587   StgTSO *tso;
3588
3589   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3590   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3591        tso != END_TSO_QUEUE; 
3592        tso=tso->link) {
3593     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3594     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3595     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3596   }
3597   fputc('\n', stderr);
3598 }
3599 # endif
3600
3601 #if defined(PAR)
3602 static nat
3603 run_queue_len(void)
3604 {
3605   nat i;
3606   StgTSO *tso;
3607
3608   for (i=0, tso=run_queue_hd; 
3609        tso != END_TSO_QUEUE;
3610        i++, tso=tso->link)
3611     /* nothing */
3612
3613   return i;
3614 }
3615 #endif
3616
3617 static void
3618 sched_belch(char *s, ...)
3619 {
3620   va_list ap;
3621   va_start(ap,s);
3622 #ifdef SMP
3623   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3624 #elif defined(PAR)
3625   fprintf(stderr, "== ");
3626 #else
3627   fprintf(stderr, "scheduler: ");
3628 #endif
3629   vfprintf(stderr, s, ap);
3630   fprintf(stderr, "\n");
3631 }
3632
3633 #endif /* DEBUG */
3634
3635
3636 //@node Index,  , Debugging Routines, Main scheduling code
3637 //@subsection Index
3638
3639 //@index
3640 //* MainRegTable::  @cindex\s-+MainRegTable
3641 //* StgMainThread::  @cindex\s-+StgMainThread
3642 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3643 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3644 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3645 //* context_switch::  @cindex\s-+context_switch
3646 //* createThread::  @cindex\s-+createThread
3647 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3648 //* initScheduler::  @cindex\s-+initScheduler
3649 //* interrupted::  @cindex\s-+interrupted
3650 //* next_thread_id::  @cindex\s-+next_thread_id
3651 //* print_bq::  @cindex\s-+print_bq
3652 //* run_queue_hd::  @cindex\s-+run_queue_hd
3653 //* run_queue_tl::  @cindex\s-+run_queue_tl
3654 //* sched_mutex::  @cindex\s-+sched_mutex
3655 //* schedule::  @cindex\s-+schedule
3656 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3657 //* term_mutex::  @cindex\s-+term_mutex
3658 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3659 //@end index