[project @ 2002-02-16 00:30:05 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.130 2002/02/16 00:30:05 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runnable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268
269 # if defined(SMP)
270 static Condition gc_pending_cond = INIT_COND_VAR;
271 nat await_death;
272 # endif
273
274 #endif /* RTS_SUPPORTS_THREADS */
275
276 #if defined(PAR)
277 StgTSO *LastTSO;
278 rtsTime TimeOfLastYield;
279 rtsBool emitSchedule = rtsTrue;
280 #endif
281
282 #if DEBUG
283 char *whatNext_strs[] = {
284   "ThreadEnterGHC",
285   "ThreadRunGHC",
286   "ThreadEnterInterp",
287   "ThreadKilled",
288   "ThreadComplete"
289 };
290
291 char *threadReturnCode_strs[] = {
292   "HeapOverflow",                       /* might also be StackOverflow */
293   "StackOverflow",
294   "ThreadYielding",
295   "ThreadBlocked",
296   "ThreadFinished"
297 };
298 #endif
299
300 #if defined(PAR)
301 StgTSO * createSparkThread(rtsSpark spark);
302 StgTSO * activateSpark (rtsSpark spark);  
303 #endif
304
305 /*
306  * The thread state for the main thread.
307 // ToDo: check whether not needed any more
308 StgTSO   *MainTSO;
309  */
310
311 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
312 static void taskStart(void);
313 static void
314 taskStart(void)
315 {
316   schedule();
317 }
318 #endif
319
320
321
322
323 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
324 //@subsection Main scheduling loop
325
326 /* ---------------------------------------------------------------------------
327    Main scheduling loop.
328
329    We use round-robin scheduling, each thread returning to the
330    scheduler loop when one of these conditions is detected:
331
332       * out of heap space
333       * timer expires (thread yields)
334       * thread blocks
335       * thread ends
336       * stack overflow
337
338    Locking notes:  we acquire the scheduler lock once at the beginning
339    of the scheduler loop, and release it when
340     
341       * running a thread, or
342       * waiting for work, or
343       * waiting for a GC to complete.
344
345    GRAN version:
346      In a GranSim setup this loop iterates over the global event queue.
347      This revolves around the global event queue, which determines what 
348      to do next. Therefore, it's more complicated than either the 
349      concurrent or the parallel (GUM) setup.
350
351    GUM version:
352      GUM iterates over incoming messages.
353      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
354      and sends out a fish whenever it has nothing to do; in-between
355      doing the actual reductions (shared code below) it processes the
356      incoming messages and deals with delayed operations 
357      (see PendingFetches).
358      This is not the ugliest code you could imagine, but it's bloody close.
359
360    ------------------------------------------------------------------------ */
361 //@cindex schedule
362 static void
363 schedule( void )
364 {
365   StgTSO *t;
366   Capability *cap;
367   StgThreadReturnCode ret;
368 #if defined(GRAN)
369   rtsEvent *event;
370 #elif defined(PAR)
371   StgSparkPool *pool;
372   rtsSpark spark;
373   StgTSO *tso;
374   GlobalTaskId pe;
375   rtsBool receivedFinish = rtsFalse;
376 # if defined(DEBUG)
377   nat tp_size, sp_size; // stats only
378 # endif
379 #endif
380   rtsBool was_interrupted = rtsFalse;
381   
382   ACQUIRE_LOCK(&sched_mutex);
383  
384 #if defined(RTS_SUPPORTS_THREADS)
385   /* Check to see whether there are any worker threads
386      waiting to deposit external call results. If so,
387      yield our capability */
388   yieldToReturningWorker(&sched_mutex, cap);
389
390   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
391 #endif
392
393 #if defined(GRAN)
394   /* set up first event to get things going */
395   /* ToDo: assign costs for system setup and init MainTSO ! */
396   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
397             ContinueThread, 
398             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
399
400   IF_DEBUG(gran,
401            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
402            G_TSO(CurrentTSO, 5));
403
404   if (RtsFlags.GranFlags.Light) {
405     /* Save current time; GranSim Light only */
406     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
407   }      
408
409   event = get_next_event();
410
411   while (event!=(rtsEvent*)NULL) {
412     /* Choose the processor with the next event */
413     CurrentProc = event->proc;
414     CurrentTSO = event->tso;
415
416 #elif defined(PAR)
417
418   while (!receivedFinish) {    /* set by processMessages */
419                                /* when receiving PP_FINISH message         */ 
420 #else
421
422   while (1) {
423
424 #endif
425
426     IF_DEBUG(scheduler, printAllThreads());
427
428     /* If we're interrupted (the user pressed ^C, or some other
429      * termination condition occurred), kill all the currently running
430      * threads.
431      */
432     if (interrupted) {
433       IF_DEBUG(scheduler, sched_belch("interrupted"));
434       deleteAllThreads();
435       interrupted = rtsFalse;
436       was_interrupted = rtsTrue;
437     }
438
439     /* Go through the list of main threads and wake up any
440      * clients whose computations have finished.  ToDo: this
441      * should be done more efficiently without a linear scan
442      * of the main threads list, somehow...
443      */
444 #if defined(RTS_SUPPORTS_THREADS)
445     { 
446       StgMainThread *m, **prev;
447       prev = &main_threads;
448       for (m = main_threads; m != NULL; m = m->link) {
449         switch (m->tso->what_next) {
450         case ThreadComplete:
451           if (m->ret) {
452             *(m->ret) = (StgClosure *)m->tso->sp[0];
453           }
454           *prev = m->link;
455           m->stat = Success;
456           broadcastCondition(&m->wakeup);
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467           break;
468         default:
469           break;
470         }
471       }
472     }
473
474 #else /* not threaded */
475
476 # if defined(PAR)
477     /* in GUM do this only on the Main PE */
478     if (IAmMainThread)
479 # endif
480     /* If our main thread has finished or been killed, return.
481      */
482     {
483       StgMainThread *m = main_threads;
484       if (m->tso->what_next == ThreadComplete
485           || m->tso->what_next == ThreadKilled) {
486         main_threads = main_threads->link;
487         if (m->tso->what_next == ThreadComplete) {
488           /* we finished successfully, fill in the return value */
489           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
490           m->stat = Success;
491           return;
492         } else {
493           if (m->ret) { *(m->ret) = NULL; };
494           if (was_interrupted) {
495             m->stat = Interrupted;
496           } else {
497             m->stat = Killed;
498           }
499           return;
500         }
501       }
502     }
503 #endif
504
505     /* Top up the run queue from our spark pool.  We try to make the
506      * number of threads in the run queue equal to the number of
507      * free capabilities.
508      *
509      * Disable spark support in SMP for now, non-essential & requires
510      * a little bit of work to make it compile cleanly. -- sof 1/02.
511      */
512 #if 0 /* defined(SMP) */
513     {
514       nat n = getFreeCapabilities();
515       StgTSO *tso = run_queue_hd;
516
517       /* Count the run queue */
518       while (n > 0 && tso != END_TSO_QUEUE) {
519         tso = tso->link;
520         n--;
521       }
522
523       for (; n > 0; n--) {
524         StgClosure *spark;
525         spark = findSpark(rtsFalse);
526         if (spark == NULL) {
527           break; /* no more sparks in the pool */
528         } else {
529           /* I'd prefer this to be done in activateSpark -- HWL */
530           /* tricky - it needs to hold the scheduler lock and
531            * not try to re-acquire it -- SDM */
532           createSparkThread(spark);       
533           IF_DEBUG(scheduler,
534                    sched_belch("==^^ turning spark of closure %p into a thread",
535                                (StgClosure *)spark));
536         }
537       }
538       /* We need to wake up the other tasks if we just created some
539        * work for them.
540        */
541       if (getFreeCapabilities() - n > 1) {
542           signalCondition( &thread_ready_cond );
543       }
544     }
545 #endif // SMP
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       startSignalHandlers();
551     }
552 #endif
553
554     /* Check whether any waiting threads need to be woken up.  If the
555      * run queue is empty, and there are no other tasks running, we
556      * can wait indefinitely for something to happen.
557      * ToDo: what if another client comes along & requests another
558      * main thread?
559      */
560     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
561       awaitEvent( EMPTY_RUN_QUEUE()
562 #if defined(SMP)
563         && allFreeCapabilities()
564 #endif
565         );
566     }
567     /* we can be interrupted while waiting for I/O... */
568     if (interrupted) continue;
569
570     /* 
571      * Detect deadlock: when we have no threads to run, there are no
572      * threads waiting on I/O or sleeping, and all the other tasks are
573      * waiting for work, we must have a deadlock of some description.
574      *
575      * We first try to find threads blocked on themselves (ie. black
576      * holes), and generate NonTermination exceptions where necessary.
577      *
578      * If no threads are black holed, we have a deadlock situation, so
579      * inform all the main threads.
580      */
581 #ifndef PAR
582     if (   EMPTY_RUN_QUEUE()
583         && EMPTY_QUEUE(blocked_queue_hd)
584         && EMPTY_QUEUE(sleeping_queue)
585 #if defined(RTS_SUPPORTS_THREADS)
586         && EMPTY_QUEUE(suspended_ccalling_threads)
587 #endif
588 #ifdef SMP
589         && allFreeCapabilities()
590 #endif
591         )
592     {
593         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595         /* and SMP mode ..? */
596         releaseCapability(cap);
597 #endif
598         RELEASE_LOCK(&sched_mutex);
599         GarbageCollect(GetRoots,rtsTrue);
600         ACQUIRE_LOCK(&sched_mutex);
601         if (   EMPTY_QUEUE(blocked_queue_hd)
602             && EMPTY_RUN_QUEUE()
603             && EMPTY_QUEUE(sleeping_queue) ) {
604
605             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
606             detectBlackHoles();
607
608             /* No black holes, so probably a real deadlock.  Send the
609              * current main thread the Deadlock exception (or in the SMP
610              * build, send *all* main threads the deadlock exception,
611              * since none of them can make progress).
612              */
613             if ( EMPTY_RUN_QUEUE() ) {
614                 StgMainThread *m;
615 #if defined(RTS_SUPPORTS_THREADS)
616                 for (m = main_threads; m != NULL; m = m->link) {
617                     switch (m->tso->why_blocked) {
618                     case BlockedOnBlackHole:
619                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
620                         break;
621                     case BlockedOnException:
622                     case BlockedOnMVar:
623                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
624                         break;
625                     default:
626                         barf("deadlock: main thread blocked in a strange way");
627                     }
628                 }
629 #else
630                 m = main_threads;
631                 switch (m->tso->why_blocked) {
632                 case BlockedOnBlackHole:
633                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
634                     break;
635                 case BlockedOnException:
636                 case BlockedOnMVar:
637                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
638                     break;
639                 default:
640                     barf("deadlock: main thread blocked in a strange way");
641                 }
642 #endif
643             }
644 #if defined(RTS_SUPPORTS_THREADS)
645             /* ToDo: revisit conditions (and mechanism) for shutting
646                down a multi-threaded world  */
647             if ( EMPTY_RUN_QUEUE() ) {
648               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
649               shutdownHaskellAndExit(0);
650             }
651 #endif
652             ASSERT( !EMPTY_RUN_QUEUE() );
653         }
654     }
655 #elif defined(PAR)
656     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
657 #endif
658
659 #if defined(SMP)
660     /* If there's a GC pending, don't do anything until it has
661      * completed.
662      */
663     if (ready_to_gc) {
664       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
665       waitCondition( &gc_pending_cond, &sched_mutex );
666     }
667 #endif    
668
669 #if defined(RTS_SUPPORTS_THREADS)
670     /* block until we've got a thread on the run queue and a free
671      * capability.
672      *
673      */
674     if ( EMPTY_RUN_QUEUE() ) {
675       /* Give up our capability */
676       releaseCapability(cap);
677       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
678       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
679       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
680 #if 0
681       while ( EMPTY_RUN_QUEUE() ) {
682         waitForWorkCapability(&sched_mutex, &cap);
683         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
684       }
685 #endif
686     }
687 #endif
688
689 #if defined(GRAN)
690     if (RtsFlags.GranFlags.Light)
691       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
692
693     /* adjust time based on time-stamp */
694     if (event->time > CurrentTime[CurrentProc] &&
695         event->evttype != ContinueThread)
696       CurrentTime[CurrentProc] = event->time;
697     
698     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
699     if (!RtsFlags.GranFlags.Light)
700       handleIdlePEs();
701
702     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
703
704     /* main event dispatcher in GranSim */
705     switch (event->evttype) {
706       /* Should just be continuing execution */
707     case ContinueThread:
708       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
709       /* ToDo: check assertion
710       ASSERT(run_queue_hd != (StgTSO*)NULL &&
711              run_queue_hd != END_TSO_QUEUE);
712       */
713       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
714       if (!RtsFlags.GranFlags.DoAsyncFetch &&
715           procStatus[CurrentProc]==Fetching) {
716         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
717               CurrentTSO->id, CurrentTSO, CurrentProc);
718         goto next_thread;
719       } 
720       /* Ignore ContinueThreads for completed threads */
721       if (CurrentTSO->what_next == ThreadComplete) {
722         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
723               CurrentTSO->id, CurrentTSO, CurrentProc);
724         goto next_thread;
725       } 
726       /* Ignore ContinueThreads for threads that are being migrated */
727       if (PROCS(CurrentTSO)==Nowhere) { 
728         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
729               CurrentTSO->id, CurrentTSO, CurrentProc);
730         goto next_thread;
731       }
732       /* The thread should be at the beginning of the run queue */
733       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
734         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
735               CurrentTSO->id, CurrentTSO, CurrentProc);
736         break; // run the thread anyway
737       }
738       /*
739       new_event(proc, proc, CurrentTime[proc],
740                 FindWork,
741                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
742       goto next_thread; 
743       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
744       break; // now actually run the thread; DaH Qu'vam yImuHbej 
745
746     case FetchNode:
747       do_the_fetchnode(event);
748       goto next_thread;             /* handle next event in event queue  */
749       
750     case GlobalBlock:
751       do_the_globalblock(event);
752       goto next_thread;             /* handle next event in event queue  */
753       
754     case FetchReply:
755       do_the_fetchreply(event);
756       goto next_thread;             /* handle next event in event queue  */
757       
758     case UnblockThread:   /* Move from the blocked queue to the tail of */
759       do_the_unblock(event);
760       goto next_thread;             /* handle next event in event queue  */
761       
762     case ResumeThread:  /* Move from the blocked queue to the tail of */
763       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
764       event->tso->gran.blocktime += 
765         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
766       do_the_startthread(event);
767       goto next_thread;             /* handle next event in event queue  */
768       
769     case StartThread:
770       do_the_startthread(event);
771       goto next_thread;             /* handle next event in event queue  */
772       
773     case MoveThread:
774       do_the_movethread(event);
775       goto next_thread;             /* handle next event in event queue  */
776       
777     case MoveSpark:
778       do_the_movespark(event);
779       goto next_thread;             /* handle next event in event queue  */
780       
781     case FindWork:
782       do_the_findwork(event);
783       goto next_thread;             /* handle next event in event queue  */
784       
785     default:
786       barf("Illegal event type %u\n", event->evttype);
787     }  /* switch */
788     
789     /* This point was scheduler_loop in the old RTS */
790
791     IF_DEBUG(gran, belch("GRAN: after main switch"));
792
793     TimeOfLastEvent = CurrentTime[CurrentProc];
794     TimeOfNextEvent = get_time_of_next_event();
795     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
796     // CurrentTSO = ThreadQueueHd;
797
798     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
799                          TimeOfNextEvent));
800
801     if (RtsFlags.GranFlags.Light) 
802       GranSimLight_leave_system(event, &ActiveTSO); 
803
804     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
805
806     IF_DEBUG(gran, 
807              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
808
809     /* in a GranSim setup the TSO stays on the run queue */
810     t = CurrentTSO;
811     /* Take a thread from the run queue. */
812     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
813
814     IF_DEBUG(gran, 
815              fprintf(stderr, "GRAN: About to run current thread, which is\n");
816              G_TSO(t,5));
817
818     context_switch = 0; // turned on via GranYield, checking events and time slice
819
820     IF_DEBUG(gran, 
821              DumpGranEvent(GR_SCHEDULE, t));
822
823     procStatus[CurrentProc] = Busy;
824
825 #elif defined(PAR)
826     if (PendingFetches != END_BF_QUEUE) {
827         processFetches();
828     }
829
830     /* ToDo: phps merge with spark activation above */
831     /* check whether we have local work and send requests if we have none */
832     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
833       /* :-[  no local threads => look out for local sparks */
834       /* the spark pool for the current PE */
835       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
836       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
837           pool->hd < pool->tl) {
838         /* 
839          * ToDo: add GC code check that we really have enough heap afterwards!!
840          * Old comment:
841          * If we're here (no runnable threads) and we have pending
842          * sparks, we must have a space problem.  Get enough space
843          * to turn one of those pending sparks into a
844          * thread... 
845          */
846
847         spark = findSpark(rtsFalse);                /* get a spark */
848         if (spark != (rtsSpark) NULL) {
849           tso = activateSpark(spark);       /* turn the spark into a thread */
850           IF_PAR_DEBUG(schedule,
851                        belch("==== schedule: Created TSO %d (%p); %d threads active",
852                              tso->id, tso, advisory_thread_count));
853
854           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
855             belch("==^^ failed to activate spark");
856             goto next_thread;
857           }               /* otherwise fall through & pick-up new tso */
858         } else {
859           IF_PAR_DEBUG(verbose,
860                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
861                              spark_queue_len(pool)));
862           goto next_thread;
863         }
864       }
865
866       /* If we still have no work we need to send a FISH to get a spark
867          from another PE 
868       */
869       if (EMPTY_RUN_QUEUE()) {
870       /* =8-[  no local sparks => look for work on other PEs */
871         /*
872          * We really have absolutely no work.  Send out a fish
873          * (there may be some out there already), and wait for
874          * something to arrive.  We clearly can't run any threads
875          * until a SCHEDULE or RESUME arrives, and so that's what
876          * we're hoping to see.  (Of course, we still have to
877          * respond to other types of messages.)
878          */
879         TIME now = msTime() /*CURRENT_TIME*/;
880         IF_PAR_DEBUG(verbose, 
881                      belch("--  now=%ld", now));
882         IF_PAR_DEBUG(verbose,
883                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
884                          (last_fish_arrived_at!=0 &&
885                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
886                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
887                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
888                              last_fish_arrived_at,
889                              RtsFlags.ParFlags.fishDelay, now);
890                      });
891         
892         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
893             (last_fish_arrived_at==0 ||
894              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
895           /* outstandingFishes is set in sendFish, processFish;
896              avoid flooding system with fishes via delay */
897           pe = choosePE();
898           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
899                    NEW_FISH_HUNGER);
900
901           // Global statistics: count no. of fishes
902           if (RtsFlags.ParFlags.ParStats.Global &&
903               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
904             globalParStats.tot_fish_mess++;
905           }
906         }
907       
908         receivedFinish = processMessages();
909         goto next_thread;
910       }
911     } else if (PacketsWaiting()) {  /* Look for incoming messages */
912       receivedFinish = processMessages();
913     }
914
915     /* Now we are sure that we have some work available */
916     ASSERT(run_queue_hd != END_TSO_QUEUE);
917
918     /* Take a thread from the run queue, if we have work */
919     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
920     IF_DEBUG(sanity,checkTSO(t));
921
922     /* ToDo: write something to the log-file
923     if (RTSflags.ParFlags.granSimStats && !sameThread)
924         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
925
926     CurrentTSO = t;
927     */
928     /* the spark pool for the current PE */
929     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
930
931     IF_DEBUG(scheduler, 
932              belch("--=^ %d threads, %d sparks on [%#x]", 
933                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
934
935 # if 1
936     if (0 && RtsFlags.ParFlags.ParStats.Full && 
937         t && LastTSO && t->id != LastTSO->id && 
938         LastTSO->why_blocked == NotBlocked && 
939         LastTSO->what_next != ThreadComplete) {
940       // if previously scheduled TSO not blocked we have to record the context switch
941       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
942                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
943     }
944
945     if (RtsFlags.ParFlags.ParStats.Full && 
946         (emitSchedule /* forced emit */ ||
947         (t && LastTSO && t->id != LastTSO->id))) {
948       /* 
949          we are running a different TSO, so write a schedule event to log file
950          NB: If we use fair scheduling we also have to write  a deschedule 
951              event for LastTSO; with unfair scheduling we know that the
952              previous tso has blocked whenever we switch to another tso, so
953              we don't need it in GUM for now
954       */
955       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
956                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
957       emitSchedule = rtsFalse;
958     }
959      
960 # endif
961 #else /* !GRAN && !PAR */
962   
963     /* grab a thread from the run queue */
964     ASSERT(run_queue_hd != END_TSO_QUEUE);
965     t = POP_RUN_QUEUE();
966     // Sanity check the thread we're about to run.  This can be
967     // expensive if there is lots of thread switching going on...
968     IF_DEBUG(sanity,checkTSO(t));
969 #endif
970     
971     grabCapability(&cap);
972     cap->r.rCurrentTSO = t;
973     
974     /* context switches are now initiated by the timer signal, unless
975      * the user specified "context switch as often as possible", with
976      * +RTS -C0
977      */
978     if (
979 #ifdef PROFILING
980         RtsFlags.ProfFlags.profileInterval == 0 ||
981 #endif
982         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
983          && (run_queue_hd != END_TSO_QUEUE
984              || blocked_queue_hd != END_TSO_QUEUE
985              || sleeping_queue != END_TSO_QUEUE)))
986         context_switch = 1;
987     else
988         context_switch = 0;
989
990     RELEASE_LOCK(&sched_mutex);
991
992     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
993                               t->id, t, whatNext_strs[t->what_next]));
994
995 #ifdef PROFILING
996     startHeapProfTimer();
997 #endif
998
999     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000     /* Run the current thread 
1001      */
1002     switch (cap->r.rCurrentTSO->what_next) {
1003     case ThreadKilled:
1004     case ThreadComplete:
1005         /* Thread already finished, return to scheduler. */
1006         ret = ThreadFinished;
1007         break;
1008     case ThreadEnterGHC:
1009         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1010         break;
1011     case ThreadRunGHC:
1012         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1013         break;
1014     case ThreadEnterInterp:
1015         ret = interpretBCO(cap);
1016         break;
1017     default:
1018       barf("schedule: invalid what_next field");
1019     }
1020     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1021     
1022     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1023 #ifdef PROFILING
1024     stopHeapProfTimer();
1025     CCCS = CCS_SYSTEM;
1026 #endif
1027     
1028     ACQUIRE_LOCK(&sched_mutex);
1029
1030 #ifdef SMP
1031     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1034 #endif
1035     t = cap->r.rCurrentTSO;
1036     
1037 #if defined(PAR)
1038     /* HACK 675: if the last thread didn't yield, make sure to print a 
1039        SCHEDULE event to the log file when StgRunning the next thread, even
1040        if it is the same one as before */
1041     LastTSO = t; 
1042     TimeOfLastYield = CURRENT_TIME;
1043 #endif
1044
1045     switch (ret) {
1046     case HeapOverflow:
1047 #if defined(GRAN)
1048       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049       globalGranStats.tot_heapover++;
1050 #elif defined(PAR)
1051       globalParStats.tot_heapover++;
1052 #endif
1053
1054       // did the task ask for a large block?
1055       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056           // if so, get one and push it on the front of the nursery.
1057           bdescr *bd;
1058           nat blocks;
1059           
1060           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1061
1062           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1063                                    t->id, t,
1064                                    whatNext_strs[t->what_next], blocks));
1065
1066           // don't do this if it would push us over the
1067           // alloc_blocks_lim limit; we'll GC first.
1068           if (alloc_blocks + blocks < alloc_blocks_lim) {
1069
1070               alloc_blocks += blocks;
1071               bd = allocGroup( blocks );
1072
1073               // link the new group into the list
1074               bd->link = cap->r.rCurrentNursery;
1075               bd->u.back = cap->r.rCurrentNursery->u.back;
1076               if (cap->r.rCurrentNursery->u.back != NULL) {
1077                   cap->r.rCurrentNursery->u.back->link = bd;
1078               } else {
1079                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080                          g0s0->blocks == cap->r.rNursery);
1081                   cap->r.rNursery = g0s0->blocks = bd;
1082               }           
1083               cap->r.rCurrentNursery->u.back = bd;
1084
1085               // initialise it as a nursery block
1086               bd->step = g0s0;
1087               bd->gen_no = 0;
1088               bd->flags = 0;
1089               bd->free = bd->start;
1090
1091               // don't forget to update the block count in g0s0.
1092               g0s0->n_blocks += blocks;
1093               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1094
1095               // now update the nursery to point to the new block
1096               cap->r.rCurrentNursery = bd;
1097
1098               // we might be unlucky and have another thread get on the
1099               // run queue before us and steal the large block, but in that
1100               // case the thread will just end up requesting another large
1101               // block.
1102               PUSH_ON_RUN_QUEUE(t);
1103               break;
1104           }
1105       }
1106
1107       /* make all the running tasks block on a condition variable,
1108        * maybe set context_switch and wait till they all pile in,
1109        * then have them wait on a GC condition variable.
1110        */
1111       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1112                                t->id, t, whatNext_strs[t->what_next]));
1113       threadPaused(t);
1114 #if defined(GRAN)
1115       ASSERT(!is_on_queue(t,CurrentProc));
1116 #elif defined(PAR)
1117       /* Currently we emit a DESCHEDULE event before GC in GUM.
1118          ToDo: either add separate event to distinguish SYSTEM time from rest
1119                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1120       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1121         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1122                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1123         emitSchedule = rtsTrue;
1124       }
1125 #endif
1126       
1127       ready_to_gc = rtsTrue;
1128       context_switch = 1;               /* stop other threads ASAP */
1129       PUSH_ON_RUN_QUEUE(t);
1130       /* actual GC is done at the end of the while loop */
1131       break;
1132       
1133     case StackOverflow:
1134 #if defined(GRAN)
1135       IF_DEBUG(gran, 
1136                DumpGranEvent(GR_DESCHEDULE, t));
1137       globalGranStats.tot_stackover++;
1138 #elif defined(PAR)
1139       // IF_DEBUG(par, 
1140       // DumpGranEvent(GR_DESCHEDULE, t);
1141       globalParStats.tot_stackover++;
1142 #endif
1143       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1144                                t->id, t, whatNext_strs[t->what_next]));
1145       /* just adjust the stack for this thread, then pop it back
1146        * on the run queue.
1147        */
1148       threadPaused(t);
1149       { 
1150         StgMainThread *m;
1151         /* enlarge the stack */
1152         StgTSO *new_t = threadStackOverflow(t);
1153         
1154         /* This TSO has moved, so update any pointers to it from the
1155          * main thread stack.  It better not be on any other queues...
1156          * (it shouldn't be).
1157          */
1158         for (m = main_threads; m != NULL; m = m->link) {
1159           if (m->tso == t) {
1160             m->tso = new_t;
1161           }
1162         }
1163         threadPaused(new_t);
1164         PUSH_ON_RUN_QUEUE(new_t);
1165       }
1166       break;
1167
1168     case ThreadYielding:
1169 #if defined(GRAN)
1170       IF_DEBUG(gran, 
1171                DumpGranEvent(GR_DESCHEDULE, t));
1172       globalGranStats.tot_yields++;
1173 #elif defined(PAR)
1174       // IF_DEBUG(par, 
1175       // DumpGranEvent(GR_DESCHEDULE, t);
1176       globalParStats.tot_yields++;
1177 #endif
1178       /* put the thread back on the run queue.  Then, if we're ready to
1179        * GC, check whether this is the last task to stop.  If so, wake
1180        * up the GC thread.  getThread will block during a GC until the
1181        * GC is finished.
1182        */
1183       IF_DEBUG(scheduler,
1184                if (t->what_next == ThreadEnterInterp) {
1185                    /* ToDo: or maybe a timer expired when we were in Hugs?
1186                     * or maybe someone hit ctrl-C
1187                     */
1188                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1189                          t->id, t, whatNext_strs[t->what_next]);
1190                } else {
1191                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1192                          t->id, t, whatNext_strs[t->what_next]);
1193                }
1194                );
1195
1196       threadPaused(t);
1197
1198       IF_DEBUG(sanity,
1199                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1200                checkTSO(t));
1201       ASSERT(t->link == END_TSO_QUEUE);
1202 #if defined(GRAN)
1203       ASSERT(!is_on_queue(t,CurrentProc));
1204
1205       IF_DEBUG(sanity,
1206                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1207                checkThreadQsSanity(rtsTrue));
1208 #endif
1209 #if defined(PAR)
1210       if (RtsFlags.ParFlags.doFairScheduling) { 
1211         /* this does round-robin scheduling; good for concurrency */
1212         APPEND_TO_RUN_QUEUE(t);
1213       } else {
1214         /* this does unfair scheduling; good for parallelism */
1215         PUSH_ON_RUN_QUEUE(t);
1216       }
1217 #else
1218       /* this does round-robin scheduling; good for concurrency */
1219       APPEND_TO_RUN_QUEUE(t);
1220 #endif
1221 #if defined(GRAN)
1222       /* add a ContinueThread event to actually process the thread */
1223       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1224                 ContinueThread,
1225                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1226       IF_GRAN_DEBUG(bq, 
1227                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1228                G_EVENTQ(0);
1229                G_CURR_THREADQ(0));
1230 #endif /* GRAN */
1231       break;
1232       
1233     case ThreadBlocked:
1234 #if defined(GRAN)
1235       IF_DEBUG(scheduler,
1236                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1237                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1238                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1239
1240       // ??? needed; should emit block before
1241       IF_DEBUG(gran, 
1242                DumpGranEvent(GR_DESCHEDULE, t)); 
1243       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1244       /*
1245         ngoq Dogh!
1246       ASSERT(procStatus[CurrentProc]==Busy || 
1247               ((procStatus[CurrentProc]==Fetching) && 
1248               (t->block_info.closure!=(StgClosure*)NULL)));
1249       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1250           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1251             procStatus[CurrentProc]==Fetching)) 
1252         procStatus[CurrentProc] = Idle;
1253       */
1254 #elif defined(PAR)
1255       IF_DEBUG(scheduler,
1256                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1257                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1258       IF_PAR_DEBUG(bq,
1259
1260                    if (t->block_info.closure!=(StgClosure*)NULL) 
1261                      print_bq(t->block_info.closure));
1262
1263       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1264       blockThread(t);
1265
1266       /* whatever we schedule next, we must log that schedule */
1267       emitSchedule = rtsTrue;
1268
1269 #else /* !GRAN */
1270       /* don't need to do anything.  Either the thread is blocked on
1271        * I/O, in which case we'll have called addToBlockedQueue
1272        * previously, or it's blocked on an MVar or Blackhole, in which
1273        * case it'll be on the relevant queue already.
1274        */
1275       IF_DEBUG(scheduler,
1276                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1277                printThreadBlockage(t);
1278                fprintf(stderr, "\n"));
1279
1280       /* Only for dumping event to log file 
1281          ToDo: do I need this in GranSim, too?
1282       blockThread(t);
1283       */
1284 #endif
1285       threadPaused(t);
1286       break;
1287       
1288     case ThreadFinished:
1289       /* Need to check whether this was a main thread, and if so, signal
1290        * the task that started it with the return value.  If we have no
1291        * more main threads, we probably need to stop all the tasks until
1292        * we get a new one.
1293        */
1294       /* We also end up here if the thread kills itself with an
1295        * uncaught exception, see Exception.hc.
1296        */
1297       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1298 #if defined(GRAN)
1299       endThread(t, CurrentProc); // clean-up the thread
1300 #elif defined(PAR)
1301       /* For now all are advisory -- HWL */
1302       //if(t->priority==AdvisoryPriority) ??
1303       advisory_thread_count--;
1304       
1305 # ifdef DIST
1306       if(t->dist.priority==RevalPriority)
1307         FinishReval(t);
1308 # endif
1309       
1310       if (RtsFlags.ParFlags.ParStats.Full &&
1311           !RtsFlags.ParFlags.ParStats.Suppressed) 
1312         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1313 #endif
1314       break;
1315       
1316     default:
1317       barf("schedule: invalid thread return code %d", (int)ret);
1318     }
1319     
1320 #if defined(RTS_SUPPORTS_THREADS)
1321     /* I don't understand what this re-grab is doing -- sof */
1322     grabCapability(&cap);
1323 #endif
1324
1325 #ifdef PROFILING
1326     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1327         GarbageCollect(GetRoots, rtsTrue);
1328         heapCensus();
1329         performHeapProfile = rtsFalse;
1330         ready_to_gc = rtsFalse; // we already GC'd
1331     }
1332 #endif
1333
1334     if (ready_to_gc 
1335 #ifdef SMP
1336         && allFreeCapabilities() 
1337 #endif
1338         ) {
1339       /* everybody back, start the GC.
1340        * Could do it in this thread, or signal a condition var
1341        * to do it in another thread.  Either way, we need to
1342        * broadcast on gc_pending_cond afterward.
1343        */
1344 #if defined(RTS_SUPPORTS_THREADS)
1345       IF_DEBUG(scheduler,sched_belch("doing GC"));
1346 #endif
1347       RELEASE_LOCK(&sched_mutex);
1348       GarbageCollect(GetRoots,rtsFalse);
1349       ACQUIRE_LOCK(&sched_mutex);
1350       ready_to_gc = rtsFalse;
1351 #ifdef SMP
1352       broadcastCondition(&gc_pending_cond);
1353 #endif
1354 #if defined(GRAN)
1355       /* add a ContinueThread event to continue execution of current thread */
1356       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1357                 ContinueThread,
1358                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1359       IF_GRAN_DEBUG(bq, 
1360                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1361                G_EVENTQ(0);
1362                G_CURR_THREADQ(0));
1363 #endif /* GRAN */
1364     }
1365
1366 #if defined(GRAN)
1367   next_thread:
1368     IF_GRAN_DEBUG(unused,
1369                   print_eventq(EventHd));
1370
1371     event = get_next_event();
1372 #elif defined(PAR)
1373   next_thread:
1374     /* ToDo: wait for next message to arrive rather than busy wait */
1375 #endif /* GRAN */
1376
1377   } /* end of while(1) */
1378
1379   IF_PAR_DEBUG(verbose,
1380                belch("== Leaving schedule() after having received Finish"));
1381 }
1382
1383 /* ---------------------------------------------------------------------------
1384  * deleteAllThreads():  kill all the live threads.
1385  *
1386  * This is used when we catch a user interrupt (^C), before performing
1387  * any necessary cleanups and running finalizers.
1388  * ------------------------------------------------------------------------- */
1389    
1390 void deleteAllThreads ( void )
1391 {
1392   StgTSO* t, *next;
1393   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1394   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1395       next = t->link;
1396       deleteThread(t);
1397   }
1398   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1399       next = t->link;
1400       deleteThread(t);
1401   }
1402   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1403       next = t->link;
1404       deleteThread(t);
1405   }
1406   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1407   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1408   sleeping_queue = END_TSO_QUEUE;
1409 }
1410
1411 /* startThread and  insertThread are now in GranSim.c -- HWL */
1412
1413
1414 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1415 //@subsection Suspend and Resume
1416
1417 /* ---------------------------------------------------------------------------
1418  * Suspending & resuming Haskell threads.
1419  * 
1420  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1421  * its capability before calling the C function.  This allows another
1422  * task to pick up the capability and carry on running Haskell
1423  * threads.  It also means that if the C call blocks, it won't lock
1424  * the whole system.
1425  *
1426  * The Haskell thread making the C call is put to sleep for the
1427  * duration of the call, on the susepended_ccalling_threads queue.  We
1428  * give out a token to the task, which it can use to resume the thread
1429  * on return from the C function.
1430  * ------------------------------------------------------------------------- */
1431    
1432 StgInt
1433 suspendThread( StgRegTable *reg, rtsBool concCall )
1434 {
1435   nat tok;
1436   Capability *cap;
1437
1438   /* assume that *reg is a pointer to the StgRegTable part
1439    * of a Capability.
1440    */
1441   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1442
1443   ACQUIRE_LOCK(&sched_mutex);
1444
1445   IF_DEBUG(scheduler,
1446            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1447
1448   threadPaused(cap->r.rCurrentTSO);
1449   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1450   suspended_ccalling_threads = cap->r.rCurrentTSO;
1451
1452 #if defined(RTS_SUPPORTS_THREADS)
1453   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1454 #endif
1455
1456   /* Use the thread ID as the token; it should be unique */
1457   tok = cap->r.rCurrentTSO->id;
1458
1459   /* Hand back capability */
1460   releaseCapability(cap);
1461   
1462 #if defined(RTS_SUPPORTS_THREADS)
1463   /* Preparing to leave the RTS, so ensure there's a native thread/task
1464      waiting to take over.
1465      
1466      ToDo: optimise this and only create a new task if there's a need
1467      for one (i.e., if there's only one Concurrent Haskell thread alive,
1468      there's no need to create a new task).
1469   */
1470   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1471   if (concCall) {
1472     startTask(taskStart);
1473   }
1474 #endif
1475
1476   /* Other threads _might_ be available for execution; signal this */
1477   THREAD_RUNNABLE();
1478   RELEASE_LOCK(&sched_mutex);
1479   return tok; 
1480 }
1481
1482 StgRegTable *
1483 resumeThread( StgInt tok, rtsBool concCall )
1484 {
1485   StgTSO *tso, **prev;
1486   Capability *cap;
1487
1488 #if defined(RTS_SUPPORTS_THREADS)
1489   /* Wait for permission to re-enter the RTS with the result. */
1490   if ( concCall ) {
1491     grabReturnCapability(&sched_mutex, &cap);
1492   } else {
1493     grabCapability(&cap);
1494   }
1495 #else
1496   grabCapability(&cap);
1497 #endif
1498
1499   /* Remove the thread off of the suspended list */
1500   prev = &suspended_ccalling_threads;
1501   for (tso = suspended_ccalling_threads; 
1502        tso != END_TSO_QUEUE; 
1503        prev = &tso->link, tso = tso->link) {
1504     if (tso->id == (StgThreadID)tok) {
1505       *prev = tso->link;
1506       break;
1507     }
1508   }
1509   if (tso == END_TSO_QUEUE) {
1510     barf("resumeThread: thread not found");
1511   }
1512   tso->link = END_TSO_QUEUE;
1513   /* Reset blocking status */
1514   tso->why_blocked  = NotBlocked;
1515
1516   RELEASE_LOCK(&sched_mutex);
1517
1518   cap->r.rCurrentTSO = tso;
1519   return &cap->r;
1520 }
1521
1522
1523 /* ---------------------------------------------------------------------------
1524  * Static functions
1525  * ------------------------------------------------------------------------ */
1526 static void unblockThread(StgTSO *tso);
1527
1528 /* ---------------------------------------------------------------------------
1529  * Comparing Thread ids.
1530  *
1531  * This is used from STG land in the implementation of the
1532  * instances of Eq/Ord for ThreadIds.
1533  * ------------------------------------------------------------------------ */
1534
1535 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1536
1537   StgThreadID id1 = tso1->id; 
1538   StgThreadID id2 = tso2->id;
1539  
1540   if (id1 < id2) return (-1);
1541   if (id1 > id2) return 1;
1542   return 0;
1543 }
1544
1545 /* ---------------------------------------------------------------------------
1546  * Fetching the ThreadID from an StgTSO.
1547  *
1548  * This is used in the implementation of Show for ThreadIds.
1549  * ------------------------------------------------------------------------ */
1550 int rts_getThreadId(const StgTSO *tso) 
1551 {
1552   return tso->id;
1553 }
1554
1555 /* ---------------------------------------------------------------------------
1556    Create a new thread.
1557
1558    The new thread starts with the given stack size.  Before the
1559    scheduler can run, however, this thread needs to have a closure
1560    (and possibly some arguments) pushed on its stack.  See
1561    pushClosure() in Schedule.h.
1562
1563    createGenThread() and createIOThread() (in SchedAPI.h) are
1564    convenient packaged versions of this function.
1565
1566    currently pri (priority) is only used in a GRAN setup -- HWL
1567    ------------------------------------------------------------------------ */
1568 //@cindex createThread
1569 #if defined(GRAN)
1570 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1571 StgTSO *
1572 createThread(nat stack_size, StgInt pri)
1573 {
1574   return createThread_(stack_size, rtsFalse, pri);
1575 }
1576
1577 static StgTSO *
1578 createThread_(nat size, rtsBool have_lock, StgInt pri)
1579 {
1580 #else
1581 StgTSO *
1582 createThread(nat stack_size)
1583 {
1584   return createThread_(stack_size, rtsFalse);
1585 }
1586
1587 static StgTSO *
1588 createThread_(nat size, rtsBool have_lock)
1589 {
1590 #endif
1591
1592     StgTSO *tso;
1593     nat stack_size;
1594
1595     /* First check whether we should create a thread at all */
1596 #if defined(PAR)
1597   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1598   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1599     threadsIgnored++;
1600     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1601           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1602     return END_TSO_QUEUE;
1603   }
1604   threadsCreated++;
1605 #endif
1606
1607 #if defined(GRAN)
1608   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1609 #endif
1610
1611   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1612
1613   /* catch ridiculously small stack sizes */
1614   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1615     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1616   }
1617
1618   stack_size = size - TSO_STRUCT_SIZEW;
1619
1620   tso = (StgTSO *)allocate(size);
1621   TICK_ALLOC_TSO(stack_size, 0);
1622
1623   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1624 #if defined(GRAN)
1625   SET_GRAN_HDR(tso, ThisPE);
1626 #endif
1627   tso->what_next     = ThreadEnterGHC;
1628
1629   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1630    * protect the increment operation on next_thread_id.
1631    * In future, we could use an atomic increment instead.
1632    */
1633   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1634   tso->id = next_thread_id++; 
1635   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1636
1637   tso->why_blocked  = NotBlocked;
1638   tso->blocked_exceptions = NULL;
1639
1640   tso->stack_size   = stack_size;
1641   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1642                               - TSO_STRUCT_SIZEW;
1643   tso->sp           = (P_)&(tso->stack) + stack_size;
1644
1645 #ifdef PROFILING
1646   tso->prof.CCCS = CCS_MAIN;
1647 #endif
1648
1649   /* put a stop frame on the stack */
1650   tso->sp -= sizeofW(StgStopFrame);
1651   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1652   tso->su = (StgUpdateFrame*)tso->sp;
1653
1654   // ToDo: check this
1655 #if defined(GRAN)
1656   tso->link = END_TSO_QUEUE;
1657   /* uses more flexible routine in GranSim */
1658   insertThread(tso, CurrentProc);
1659 #else
1660   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1661    * from its creation
1662    */
1663 #endif
1664
1665 #if defined(GRAN) 
1666   if (RtsFlags.GranFlags.GranSimStats.Full) 
1667     DumpGranEvent(GR_START,tso);
1668 #elif defined(PAR)
1669   if (RtsFlags.ParFlags.ParStats.Full) 
1670     DumpGranEvent(GR_STARTQ,tso);
1671   /* HACk to avoid SCHEDULE 
1672      LastTSO = tso; */
1673 #endif
1674
1675   /* Link the new thread on the global thread list.
1676    */
1677   tso->global_link = all_threads;
1678   all_threads = tso;
1679
1680 #if defined(DIST)
1681   tso->dist.priority = MandatoryPriority; //by default that is...
1682 #endif
1683
1684 #if defined(GRAN)
1685   tso->gran.pri = pri;
1686 # if defined(DEBUG)
1687   tso->gran.magic = TSO_MAGIC; // debugging only
1688 # endif
1689   tso->gran.sparkname   = 0;
1690   tso->gran.startedat   = CURRENT_TIME; 
1691   tso->gran.exported    = 0;
1692   tso->gran.basicblocks = 0;
1693   tso->gran.allocs      = 0;
1694   tso->gran.exectime    = 0;
1695   tso->gran.fetchtime   = 0;
1696   tso->gran.fetchcount  = 0;
1697   tso->gran.blocktime   = 0;
1698   tso->gran.blockcount  = 0;
1699   tso->gran.blockedat   = 0;
1700   tso->gran.globalsparks = 0;
1701   tso->gran.localsparks  = 0;
1702   if (RtsFlags.GranFlags.Light)
1703     tso->gran.clock  = Now; /* local clock */
1704   else
1705     tso->gran.clock  = 0;
1706
1707   IF_DEBUG(gran,printTSO(tso));
1708 #elif defined(PAR)
1709 # if defined(DEBUG)
1710   tso->par.magic = TSO_MAGIC; // debugging only
1711 # endif
1712   tso->par.sparkname   = 0;
1713   tso->par.startedat   = CURRENT_TIME; 
1714   tso->par.exported    = 0;
1715   tso->par.basicblocks = 0;
1716   tso->par.allocs      = 0;
1717   tso->par.exectime    = 0;
1718   tso->par.fetchtime   = 0;
1719   tso->par.fetchcount  = 0;
1720   tso->par.blocktime   = 0;
1721   tso->par.blockcount  = 0;
1722   tso->par.blockedat   = 0;
1723   tso->par.globalsparks = 0;
1724   tso->par.localsparks  = 0;
1725 #endif
1726
1727 #if defined(GRAN)
1728   globalGranStats.tot_threads_created++;
1729   globalGranStats.threads_created_on_PE[CurrentProc]++;
1730   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1731   globalGranStats.tot_sq_probes++;
1732 #elif defined(PAR)
1733   // collect parallel global statistics (currently done together with GC stats)
1734   if (RtsFlags.ParFlags.ParStats.Global &&
1735       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1736     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1737     globalParStats.tot_threads_created++;
1738   }
1739 #endif 
1740
1741 #if defined(GRAN)
1742   IF_GRAN_DEBUG(pri,
1743                 belch("==__ schedule: Created TSO %d (%p);",
1744                       CurrentProc, tso, tso->id));
1745 #elif defined(PAR)
1746     IF_PAR_DEBUG(verbose,
1747                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1748                        tso->id, tso, advisory_thread_count));
1749 #else
1750   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1751                                  tso->id, tso->stack_size));
1752 #endif    
1753   return tso;
1754 }
1755
1756 #if defined(PAR)
1757 /* RFP:
1758    all parallel thread creation calls should fall through the following routine.
1759 */
1760 StgTSO *
1761 createSparkThread(rtsSpark spark) 
1762 { StgTSO *tso;
1763   ASSERT(spark != (rtsSpark)NULL);
1764   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1765   { threadsIgnored++;
1766     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1767           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1768     return END_TSO_QUEUE;
1769   }
1770   else
1771   { threadsCreated++;
1772     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1773     if (tso==END_TSO_QUEUE)     
1774       barf("createSparkThread: Cannot create TSO");
1775 #if defined(DIST)
1776     tso->priority = AdvisoryPriority;
1777 #endif
1778     pushClosure(tso,spark);
1779     PUSH_ON_RUN_QUEUE(tso);
1780     advisory_thread_count++;    
1781   }
1782   return tso;
1783 }
1784 #endif
1785
1786 /*
1787   Turn a spark into a thread.
1788   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1789 */
1790 #if defined(PAR)
1791 //@cindex activateSpark
1792 StgTSO *
1793 activateSpark (rtsSpark spark) 
1794 {
1795   StgTSO *tso;
1796
1797   tso = createSparkThread(spark);
1798   if (RtsFlags.ParFlags.ParStats.Full) {   
1799     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1800     IF_PAR_DEBUG(verbose,
1801                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1802                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1803   }
1804   // ToDo: fwd info on local/global spark to thread -- HWL
1805   // tso->gran.exported =  spark->exported;
1806   // tso->gran.locked =   !spark->global;
1807   // tso->gran.sparkname = spark->name;
1808
1809   return tso;
1810 }
1811 #endif
1812
1813 /* ---------------------------------------------------------------------------
1814  * scheduleThread()
1815  *
1816  * scheduleThread puts a thread on the head of the runnable queue.
1817  * This will usually be done immediately after a thread is created.
1818  * The caller of scheduleThread must create the thread using e.g.
1819  * createThread and push an appropriate closure
1820  * on this thread's stack before the scheduler is invoked.
1821  * ------------------------------------------------------------------------ */
1822
1823 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1824
1825 void
1826 scheduleThread_(StgTSO *tso
1827                , rtsBool createTask
1828 #if !defined(THREADED_RTS)
1829                  STG_UNUSED
1830 #endif
1831               )
1832 {
1833   ACQUIRE_LOCK(&sched_mutex);
1834
1835   /* Put the new thread on the head of the runnable queue.  The caller
1836    * better push an appropriate closure on this thread's stack
1837    * beforehand.  In the SMP case, the thread may start running as
1838    * soon as we release the scheduler lock below.
1839    */
1840   PUSH_ON_RUN_QUEUE(tso);
1841 #if defined(THREADED_RTS)
1842   /* If main() is scheduling a thread, don't bother creating a 
1843    * new task.
1844    */
1845   if ( createTask ) {
1846     startTask(taskStart);
1847   }
1848 #endif
1849   THREAD_RUNNABLE();
1850
1851 #if 0
1852   IF_DEBUG(scheduler,printTSO(tso));
1853 #endif
1854   RELEASE_LOCK(&sched_mutex);
1855 }
1856
1857 void scheduleThread(StgTSO* tso)
1858 {
1859   return scheduleThread_(tso, rtsFalse);
1860 }
1861
1862 void scheduleExtThread(StgTSO* tso)
1863 {
1864   return scheduleThread_(tso, rtsTrue);
1865 }
1866
1867 /* ---------------------------------------------------------------------------
1868  * initScheduler()
1869  *
1870  * Initialise the scheduler.  This resets all the queues - if the
1871  * queues contained any threads, they'll be garbage collected at the
1872  * next pass.
1873  *
1874  * ------------------------------------------------------------------------ */
1875
1876 #ifdef SMP
1877 static void
1878 term_handler(int sig STG_UNUSED)
1879 {
1880   stat_workerStop();
1881   ACQUIRE_LOCK(&term_mutex);
1882   await_death--;
1883   RELEASE_LOCK(&term_mutex);
1884   shutdownThread();
1885 }
1886 #endif
1887
1888 void 
1889 initScheduler(void)
1890 {
1891 #if defined(GRAN)
1892   nat i;
1893
1894   for (i=0; i<=MAX_PROC; i++) {
1895     run_queue_hds[i]      = END_TSO_QUEUE;
1896     run_queue_tls[i]      = END_TSO_QUEUE;
1897     blocked_queue_hds[i]  = END_TSO_QUEUE;
1898     blocked_queue_tls[i]  = END_TSO_QUEUE;
1899     ccalling_threadss[i]  = END_TSO_QUEUE;
1900     sleeping_queue        = END_TSO_QUEUE;
1901   }
1902 #else
1903   run_queue_hd      = END_TSO_QUEUE;
1904   run_queue_tl      = END_TSO_QUEUE;
1905   blocked_queue_hd  = END_TSO_QUEUE;
1906   blocked_queue_tl  = END_TSO_QUEUE;
1907   sleeping_queue    = END_TSO_QUEUE;
1908 #endif 
1909
1910   suspended_ccalling_threads  = END_TSO_QUEUE;
1911
1912   main_threads = NULL;
1913   all_threads  = END_TSO_QUEUE;
1914
1915   context_switch = 0;
1916   interrupted    = 0;
1917
1918   RtsFlags.ConcFlags.ctxtSwitchTicks =
1919       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1920       
1921 #if defined(RTS_SUPPORTS_THREADS)
1922   /* Initialise the mutex and condition variables used by
1923    * the scheduler. */
1924   initMutex(&sched_mutex);
1925   initMutex(&term_mutex);
1926
1927   initCondition(&thread_ready_cond);
1928 #endif
1929   
1930 #if defined(SMP)
1931   initCondition(&gc_pending_cond);
1932 #endif
1933
1934 #if defined(RTS_SUPPORTS_THREADS)
1935   ACQUIRE_LOCK(&sched_mutex);
1936 #endif
1937
1938   /* Install the SIGHUP handler */
1939 #if defined(SMP)
1940   {
1941     struct sigaction action,oact;
1942
1943     action.sa_handler = term_handler;
1944     sigemptyset(&action.sa_mask);
1945     action.sa_flags = 0;
1946     if (sigaction(SIGTERM, &action, &oact) != 0) {
1947       barf("can't install TERM handler");
1948     }
1949   }
1950 #endif
1951
1952   /* A capability holds the state a native thread needs in
1953    * order to execute STG code. At least one capability is
1954    * floating around (only SMP builds have more than one).
1955    */
1956   initCapabilities();
1957   
1958 #if defined(RTS_SUPPORTS_THREADS)
1959     /* start our haskell execution tasks */
1960 # if defined(SMP)
1961     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1962 # else
1963     startTaskManager(0,taskStart);
1964 # endif
1965 #endif
1966
1967 #if /* defined(SMP) ||*/ defined(PAR)
1968   initSparkPools();
1969 #endif
1970
1971 #if defined(RTS_SUPPORTS_THREADS)
1972   RELEASE_LOCK(&sched_mutex);
1973 #endif
1974
1975 }
1976
1977 void
1978 exitScheduler( void )
1979 {
1980 #if defined(RTS_SUPPORTS_THREADS)
1981   stopTaskManager();
1982 #endif
1983 }
1984
1985 /* -----------------------------------------------------------------------------
1986    Managing the per-task allocation areas.
1987    
1988    Each capability comes with an allocation area.  These are
1989    fixed-length block lists into which allocation can be done.
1990
1991    ToDo: no support for two-space collection at the moment???
1992    -------------------------------------------------------------------------- */
1993
1994 /* -----------------------------------------------------------------------------
1995  * waitThread is the external interface for running a new computation
1996  * and waiting for the result.
1997  *
1998  * In the non-SMP case, we create a new main thread, push it on the 
1999  * main-thread stack, and invoke the scheduler to run it.  The
2000  * scheduler will return when the top main thread on the stack has
2001  * completed or died, and fill in the necessary fields of the
2002  * main_thread structure.
2003  *
2004  * In the SMP case, we create a main thread as before, but we then
2005  * create a new condition variable and sleep on it.  When our new
2006  * main thread has completed, we'll be woken up and the status/result
2007  * will be in the main_thread struct.
2008  * -------------------------------------------------------------------------- */
2009
2010 int 
2011 howManyThreadsAvail ( void )
2012 {
2013    int i = 0;
2014    StgTSO* q;
2015    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2016       i++;
2017    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2018       i++;
2019    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2020       i++;
2021    return i;
2022 }
2023
2024 void
2025 finishAllThreads ( void )
2026 {
2027    do {
2028       while (run_queue_hd != END_TSO_QUEUE) {
2029          waitThread ( run_queue_hd, NULL);
2030       }
2031       while (blocked_queue_hd != END_TSO_QUEUE) {
2032          waitThread ( blocked_queue_hd, NULL);
2033       }
2034       while (sleeping_queue != END_TSO_QUEUE) {
2035          waitThread ( blocked_queue_hd, NULL);
2036       }
2037    } while 
2038       (blocked_queue_hd != END_TSO_QUEUE || 
2039        run_queue_hd     != END_TSO_QUEUE ||
2040        sleeping_queue   != END_TSO_QUEUE);
2041 }
2042
2043 SchedulerStatus
2044 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2045
2046 #if defined(THREADED_RTS)
2047   return waitThread_(tso,ret, rtsFalse);
2048 #else
2049   return waitThread_(tso,ret);
2050 #endif
2051 }
2052
2053 SchedulerStatus
2054 waitThread_(StgTSO *tso,
2055             /*out*/StgClosure **ret
2056 #if defined(THREADED_RTS)
2057             , rtsBool blockWaiting
2058 #endif
2059            )
2060 {
2061   StgMainThread *m;
2062   SchedulerStatus stat;
2063
2064   ACQUIRE_LOCK(&sched_mutex);
2065   
2066   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2067
2068   m->tso = tso;
2069   m->ret = ret;
2070   m->stat = NoStatus;
2071 #if defined(RTS_SUPPORTS_THREADS)
2072   initCondition(&m->wakeup);
2073 #endif
2074
2075   m->link = main_threads;
2076   main_threads = m;
2077
2078   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2079
2080 #if defined(RTS_SUPPORTS_THREADS)
2081
2082 # if defined(THREADED_RTS)
2083   if (!blockWaiting) {
2084     /* In the threaded case, the OS thread that called main()
2085      * gets to enter the RTS directly without going via another
2086      * task/thread.
2087      */
2088     RELEASE_LOCK(&sched_mutex);
2089     schedule();
2090     ASSERT(m->stat != NoStatus);
2091   } else 
2092 # endif
2093   {
2094     IF_DEBUG(scheduler, sched_belch("sfoo"));
2095     do {
2096       waitCondition(&m->wakeup, &sched_mutex);
2097     } while (m->stat == NoStatus);
2098   }
2099 #elif defined(GRAN)
2100   /* GranSim specific init */
2101   CurrentTSO = m->tso;                // the TSO to run
2102   procStatus[MainProc] = Busy;        // status of main PE
2103   CurrentProc = MainProc;             // PE to run it on
2104
2105   schedule();
2106 #else
2107   RELEASE_LOCK(&sched_mutex);
2108   schedule();
2109   ASSERT(m->stat != NoStatus);
2110 #endif
2111
2112   stat = m->stat;
2113
2114 #if defined(RTS_SUPPORTS_THREADS)
2115   closeCondition(&m->wakeup);
2116 #endif
2117
2118   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2119                               m->tso->id));
2120   free(m);
2121
2122 #if defined(THREADED_RTS)
2123   if (blockWaiting) 
2124 #endif
2125     RELEASE_LOCK(&sched_mutex);
2126
2127   return stat;
2128 }
2129
2130 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2131 //@subsection Run queue code 
2132
2133 #if 0
2134 /* 
2135    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2136        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2137        implicit global variable that has to be correct when calling these
2138        fcts -- HWL 
2139 */
2140
2141 /* Put the new thread on the head of the runnable queue.
2142  * The caller of createThread better push an appropriate closure
2143  * on this thread's stack before the scheduler is invoked.
2144  */
2145 static /* inline */ void
2146 add_to_run_queue(tso)
2147 StgTSO* tso; 
2148 {
2149   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2150   tso->link = run_queue_hd;
2151   run_queue_hd = tso;
2152   if (run_queue_tl == END_TSO_QUEUE) {
2153     run_queue_tl = tso;
2154   }
2155 }
2156
2157 /* Put the new thread at the end of the runnable queue. */
2158 static /* inline */ void
2159 push_on_run_queue(tso)
2160 StgTSO* tso; 
2161 {
2162   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2163   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2164   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2165   if (run_queue_hd == END_TSO_QUEUE) {
2166     run_queue_hd = tso;
2167   } else {
2168     run_queue_tl->link = tso;
2169   }
2170   run_queue_tl = tso;
2171 }
2172
2173 /* 
2174    Should be inlined because it's used very often in schedule.  The tso
2175    argument is actually only needed in GranSim, where we want to have the
2176    possibility to schedule *any* TSO on the run queue, irrespective of the
2177    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2178    the run queue and dequeue the tso, adjusting the links in the queue. 
2179 */
2180 //@cindex take_off_run_queue
2181 static /* inline */ StgTSO*
2182 take_off_run_queue(StgTSO *tso) {
2183   StgTSO *t, *prev;
2184
2185   /* 
2186      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2187
2188      if tso is specified, unlink that tso from the run_queue (doesn't have
2189      to be at the beginning of the queue); GranSim only 
2190   */
2191   if (tso!=END_TSO_QUEUE) {
2192     /* find tso in queue */
2193     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2194          t!=END_TSO_QUEUE && t!=tso;
2195          prev=t, t=t->link) 
2196       /* nothing */ ;
2197     ASSERT(t==tso);
2198     /* now actually dequeue the tso */
2199     if (prev!=END_TSO_QUEUE) {
2200       ASSERT(run_queue_hd!=t);
2201       prev->link = t->link;
2202     } else {
2203       /* t is at beginning of thread queue */
2204       ASSERT(run_queue_hd==t);
2205       run_queue_hd = t->link;
2206     }
2207     /* t is at end of thread queue */
2208     if (t->link==END_TSO_QUEUE) {
2209       ASSERT(t==run_queue_tl);
2210       run_queue_tl = prev;
2211     } else {
2212       ASSERT(run_queue_tl!=t);
2213     }
2214     t->link = END_TSO_QUEUE;
2215   } else {
2216     /* take tso from the beginning of the queue; std concurrent code */
2217     t = run_queue_hd;
2218     if (t != END_TSO_QUEUE) {
2219       run_queue_hd = t->link;
2220       t->link = END_TSO_QUEUE;
2221       if (run_queue_hd == END_TSO_QUEUE) {
2222         run_queue_tl = END_TSO_QUEUE;
2223       }
2224     }
2225   }
2226   return t;
2227 }
2228
2229 #endif /* 0 */
2230
2231 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2232 //@subsection Garbage Collextion Routines
2233
2234 /* ---------------------------------------------------------------------------
2235    Where are the roots that we know about?
2236
2237         - all the threads on the runnable queue
2238         - all the threads on the blocked queue
2239         - all the threads on the sleeping queue
2240         - all the thread currently executing a _ccall_GC
2241         - all the "main threads"
2242      
2243    ------------------------------------------------------------------------ */
2244
2245 /* This has to be protected either by the scheduler monitor, or by the
2246         garbage collection monitor (probably the latter).
2247         KH @ 25/10/99
2248 */
2249
2250 void
2251 GetRoots(evac_fn evac)
2252 {
2253   StgMainThread *m;
2254
2255 #if defined(GRAN)
2256   {
2257     nat i;
2258     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2259       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2260           evac((StgClosure **)&run_queue_hds[i]);
2261       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2262           evac((StgClosure **)&run_queue_tls[i]);
2263       
2264       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2265           evac((StgClosure **)&blocked_queue_hds[i]);
2266       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2267           evac((StgClosure **)&blocked_queue_tls[i]);
2268       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2269           evac((StgClosure **)&ccalling_threads[i]);
2270     }
2271   }
2272
2273   markEventQueue();
2274
2275 #else /* !GRAN */
2276   if (run_queue_hd != END_TSO_QUEUE) {
2277       ASSERT(run_queue_tl != END_TSO_QUEUE);
2278       evac((StgClosure **)&run_queue_hd);
2279       evac((StgClosure **)&run_queue_tl);
2280   }
2281   
2282   if (blocked_queue_hd != END_TSO_QUEUE) {
2283       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2284       evac((StgClosure **)&blocked_queue_hd);
2285       evac((StgClosure **)&blocked_queue_tl);
2286   }
2287   
2288   if (sleeping_queue != END_TSO_QUEUE) {
2289       evac((StgClosure **)&sleeping_queue);
2290   }
2291 #endif 
2292
2293   for (m = main_threads; m != NULL; m = m->link) {
2294       evac((StgClosure **)&m->tso);
2295   }
2296   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2297       evac((StgClosure **)&suspended_ccalling_threads);
2298   }
2299
2300 #if defined(PAR) || defined(GRAN)
2301   markSparkQueue(evac);
2302 #endif
2303 }
2304
2305 /* -----------------------------------------------------------------------------
2306    performGC
2307
2308    This is the interface to the garbage collector from Haskell land.
2309    We provide this so that external C code can allocate and garbage
2310    collect when called from Haskell via _ccall_GC.
2311
2312    It might be useful to provide an interface whereby the programmer
2313    can specify more roots (ToDo).
2314    
2315    This needs to be protected by the GC condition variable above.  KH.
2316    -------------------------------------------------------------------------- */
2317
2318 void (*extra_roots)(evac_fn);
2319
2320 void
2321 performGC(void)
2322 {
2323   GarbageCollect(GetRoots,rtsFalse);
2324 }
2325
2326 void
2327 performMajorGC(void)
2328 {
2329   GarbageCollect(GetRoots,rtsTrue);
2330 }
2331
2332 static void
2333 AllRoots(evac_fn evac)
2334 {
2335     GetRoots(evac);             // the scheduler's roots
2336     extra_roots(evac);          // the user's roots
2337 }
2338
2339 void
2340 performGCWithRoots(void (*get_roots)(evac_fn))
2341 {
2342   extra_roots = get_roots;
2343   GarbageCollect(AllRoots,rtsFalse);
2344 }
2345
2346 /* -----------------------------------------------------------------------------
2347    Stack overflow
2348
2349    If the thread has reached its maximum stack size, then raise the
2350    StackOverflow exception in the offending thread.  Otherwise
2351    relocate the TSO into a larger chunk of memory and adjust its stack
2352    size appropriately.
2353    -------------------------------------------------------------------------- */
2354
2355 static StgTSO *
2356 threadStackOverflow(StgTSO *tso)
2357 {
2358   nat new_stack_size, new_tso_size, diff, stack_words;
2359   StgPtr new_sp;
2360   StgTSO *dest;
2361
2362   IF_DEBUG(sanity,checkTSO(tso));
2363   if (tso->stack_size >= tso->max_stack_size) {
2364
2365     IF_DEBUG(gc,
2366              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2367                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2368              /* If we're debugging, just print out the top of the stack */
2369              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2370                                               tso->sp+64)));
2371
2372     /* Send this thread the StackOverflow exception */
2373     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2374     return tso;
2375   }
2376
2377   /* Try to double the current stack size.  If that takes us over the
2378    * maximum stack size for this thread, then use the maximum instead.
2379    * Finally round up so the TSO ends up as a whole number of blocks.
2380    */
2381   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2382   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2383                                        TSO_STRUCT_SIZE)/sizeof(W_);
2384   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2385   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2386
2387   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2388
2389   dest = (StgTSO *)allocate(new_tso_size);
2390   TICK_ALLOC_TSO(new_stack_size,0);
2391
2392   /* copy the TSO block and the old stack into the new area */
2393   memcpy(dest,tso,TSO_STRUCT_SIZE);
2394   stack_words = tso->stack + tso->stack_size - tso->sp;
2395   new_sp = (P_)dest + new_tso_size - stack_words;
2396   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2397
2398   /* relocate the stack pointers... */
2399   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2400   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2401   dest->sp    = new_sp;
2402   dest->stack_size = new_stack_size;
2403         
2404   /* and relocate the update frame list */
2405   relocate_stack(dest, diff);
2406
2407   /* Mark the old TSO as relocated.  We have to check for relocated
2408    * TSOs in the garbage collector and any primops that deal with TSOs.
2409    *
2410    * It's important to set the sp and su values to just beyond the end
2411    * of the stack, so we don't attempt to scavenge any part of the
2412    * dead TSO's stack.
2413    */
2414   tso->what_next = ThreadRelocated;
2415   tso->link = dest;
2416   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2417   tso->su = (StgUpdateFrame *)tso->sp;
2418   tso->why_blocked = NotBlocked;
2419   dest->mut_link = NULL;
2420
2421   IF_PAR_DEBUG(verbose,
2422                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2423                      tso->id, tso, tso->stack_size);
2424                /* If we're debugging, just print out the top of the stack */
2425                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2426                                                 tso->sp+64)));
2427   
2428   IF_DEBUG(sanity,checkTSO(tso));
2429 #if 0
2430   IF_DEBUG(scheduler,printTSO(dest));
2431 #endif
2432
2433   return dest;
2434 }
2435
2436 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2437 //@subsection Blocking Queue Routines
2438
2439 /* ---------------------------------------------------------------------------
2440    Wake up a queue that was blocked on some resource.
2441    ------------------------------------------------------------------------ */
2442
2443 #if defined(GRAN)
2444 static inline void
2445 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2446 {
2447 }
2448 #elif defined(PAR)
2449 static inline void
2450 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2451 {
2452   /* write RESUME events to log file and
2453      update blocked and fetch time (depending on type of the orig closure) */
2454   if (RtsFlags.ParFlags.ParStats.Full) {
2455     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2456                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2457                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2458     if (EMPTY_RUN_QUEUE())
2459       emitSchedule = rtsTrue;
2460
2461     switch (get_itbl(node)->type) {
2462         case FETCH_ME_BQ:
2463           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2464           break;
2465         case RBH:
2466         case FETCH_ME:
2467         case BLACKHOLE_BQ:
2468           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2469           break;
2470 #ifdef DIST
2471         case MVAR:
2472           break;
2473 #endif    
2474         default:
2475           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2476         }
2477       }
2478 }
2479 #endif
2480
2481 #if defined(GRAN)
2482 static StgBlockingQueueElement *
2483 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2484 {
2485     StgTSO *tso;
2486     PEs node_loc, tso_loc;
2487
2488     node_loc = where_is(node); // should be lifted out of loop
2489     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2490     tso_loc = where_is((StgClosure *)tso);
2491     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2492       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2493       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2494       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2495       // insertThread(tso, node_loc);
2496       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2497                 ResumeThread,
2498                 tso, node, (rtsSpark*)NULL);
2499       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2500       // len_local++;
2501       // len++;
2502     } else { // TSO is remote (actually should be FMBQ)
2503       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2504                                   RtsFlags.GranFlags.Costs.gunblocktime +
2505                                   RtsFlags.GranFlags.Costs.latency;
2506       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2507                 UnblockThread,
2508                 tso, node, (rtsSpark*)NULL);
2509       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2510       // len++;
2511     }
2512     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2513     IF_GRAN_DEBUG(bq,
2514                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2515                           (node_loc==tso_loc ? "Local" : "Global"), 
2516                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2517     tso->block_info.closure = NULL;
2518     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2519                              tso->id, tso));
2520 }
2521 #elif defined(PAR)
2522 static StgBlockingQueueElement *
2523 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2524 {
2525     StgBlockingQueueElement *next;
2526
2527     switch (get_itbl(bqe)->type) {
2528     case TSO:
2529       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2530       /* if it's a TSO just push it onto the run_queue */
2531       next = bqe->link;
2532       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2533       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2534       THREAD_RUNNABLE();
2535       unblockCount(bqe, node);
2536       /* reset blocking status after dumping event */
2537       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2538       break;
2539
2540     case BLOCKED_FETCH:
2541       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2542       next = bqe->link;
2543       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2544       PendingFetches = (StgBlockedFetch *)bqe;
2545       break;
2546
2547 # if defined(DEBUG)
2548       /* can ignore this case in a non-debugging setup; 
2549          see comments on RBHSave closures above */
2550     case CONSTR:
2551       /* check that the closure is an RBHSave closure */
2552       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2553              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2554              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2555       break;
2556
2557     default:
2558       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2559            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2560            (StgClosure *)bqe);
2561 # endif
2562     }
2563   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2564   return next;
2565 }
2566
2567 #else /* !GRAN && !PAR */
2568 static StgTSO *
2569 unblockOneLocked(StgTSO *tso)
2570 {
2571   StgTSO *next;
2572
2573   ASSERT(get_itbl(tso)->type == TSO);
2574   ASSERT(tso->why_blocked != NotBlocked);
2575   tso->why_blocked = NotBlocked;
2576   next = tso->link;
2577   PUSH_ON_RUN_QUEUE(tso);
2578   THREAD_RUNNABLE();
2579   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2580   return next;
2581 }
2582 #endif
2583
2584 #if defined(GRAN) || defined(PAR)
2585 inline StgBlockingQueueElement *
2586 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2587 {
2588   ACQUIRE_LOCK(&sched_mutex);
2589   bqe = unblockOneLocked(bqe, node);
2590   RELEASE_LOCK(&sched_mutex);
2591   return bqe;
2592 }
2593 #else
2594 inline StgTSO *
2595 unblockOne(StgTSO *tso)
2596 {
2597   ACQUIRE_LOCK(&sched_mutex);
2598   tso = unblockOneLocked(tso);
2599   RELEASE_LOCK(&sched_mutex);
2600   return tso;
2601 }
2602 #endif
2603
2604 #if defined(GRAN)
2605 void 
2606 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2607 {
2608   StgBlockingQueueElement *bqe;
2609   PEs node_loc;
2610   nat len = 0; 
2611
2612   IF_GRAN_DEBUG(bq, 
2613                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2614                       node, CurrentProc, CurrentTime[CurrentProc], 
2615                       CurrentTSO->id, CurrentTSO));
2616
2617   node_loc = where_is(node);
2618
2619   ASSERT(q == END_BQ_QUEUE ||
2620          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2621          get_itbl(q)->type == CONSTR); // closure (type constructor)
2622   ASSERT(is_unique(node));
2623
2624   /* FAKE FETCH: magically copy the node to the tso's proc;
2625      no Fetch necessary because in reality the node should not have been 
2626      moved to the other PE in the first place
2627   */
2628   if (CurrentProc!=node_loc) {
2629     IF_GRAN_DEBUG(bq, 
2630                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2631                         node, node_loc, CurrentProc, CurrentTSO->id, 
2632                         // CurrentTSO, where_is(CurrentTSO),
2633                         node->header.gran.procs));
2634     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2635     IF_GRAN_DEBUG(bq, 
2636                   belch("## new bitmask of node %p is %#x",
2637                         node, node->header.gran.procs));
2638     if (RtsFlags.GranFlags.GranSimStats.Global) {
2639       globalGranStats.tot_fake_fetches++;
2640     }
2641   }
2642
2643   bqe = q;
2644   // ToDo: check: ASSERT(CurrentProc==node_loc);
2645   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2646     //next = bqe->link;
2647     /* 
2648        bqe points to the current element in the queue
2649        next points to the next element in the queue
2650     */
2651     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2652     //tso_loc = where_is(tso);
2653     len++;
2654     bqe = unblockOneLocked(bqe, node);
2655   }
2656
2657   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2658      the closure to make room for the anchor of the BQ */
2659   if (bqe!=END_BQ_QUEUE) {
2660     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2661     /*
2662     ASSERT((info_ptr==&RBH_Save_0_info) ||
2663            (info_ptr==&RBH_Save_1_info) ||
2664            (info_ptr==&RBH_Save_2_info));
2665     */
2666     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2667     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2668     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2669
2670     IF_GRAN_DEBUG(bq,
2671                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2672                         node, info_type(node)));
2673   }
2674
2675   /* statistics gathering */
2676   if (RtsFlags.GranFlags.GranSimStats.Global) {
2677     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2678     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2679     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2680     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2681   }
2682   IF_GRAN_DEBUG(bq,
2683                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2684                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2685 }
2686 #elif defined(PAR)
2687 void 
2688 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2689 {
2690   StgBlockingQueueElement *bqe;
2691
2692   ACQUIRE_LOCK(&sched_mutex);
2693
2694   IF_PAR_DEBUG(verbose, 
2695                belch("##-_ AwBQ for node %p on [%x]: ",
2696                      node, mytid));
2697 #ifdef DIST  
2698   //RFP
2699   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2700     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2701     return;
2702   }
2703 #endif
2704   
2705   ASSERT(q == END_BQ_QUEUE ||
2706          get_itbl(q)->type == TSO ||           
2707          get_itbl(q)->type == BLOCKED_FETCH || 
2708          get_itbl(q)->type == CONSTR); 
2709
2710   bqe = q;
2711   while (get_itbl(bqe)->type==TSO || 
2712          get_itbl(bqe)->type==BLOCKED_FETCH) {
2713     bqe = unblockOneLocked(bqe, node);
2714   }
2715   RELEASE_LOCK(&sched_mutex);
2716 }
2717
2718 #else   /* !GRAN && !PAR */
2719 void
2720 awakenBlockedQueue(StgTSO *tso)
2721 {
2722   ACQUIRE_LOCK(&sched_mutex);
2723   while (tso != END_TSO_QUEUE) {
2724     tso = unblockOneLocked(tso);
2725   }
2726   RELEASE_LOCK(&sched_mutex);
2727 }
2728 #endif
2729
2730 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2731 //@subsection Exception Handling Routines
2732
2733 /* ---------------------------------------------------------------------------
2734    Interrupt execution
2735    - usually called inside a signal handler so it mustn't do anything fancy.   
2736    ------------------------------------------------------------------------ */
2737
2738 void
2739 interruptStgRts(void)
2740 {
2741     interrupted    = 1;
2742     context_switch = 1;
2743 }
2744
2745 /* -----------------------------------------------------------------------------
2746    Unblock a thread
2747
2748    This is for use when we raise an exception in another thread, which
2749    may be blocked.
2750    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2751    -------------------------------------------------------------------------- */
2752
2753 #if defined(GRAN) || defined(PAR)
2754 /*
2755   NB: only the type of the blocking queue is different in GranSim and GUM
2756       the operations on the queue-elements are the same
2757       long live polymorphism!
2758 */
2759 static void
2760 unblockThread(StgTSO *tso)
2761 {
2762   StgBlockingQueueElement *t, **last;
2763
2764   ACQUIRE_LOCK(&sched_mutex);
2765   switch (tso->why_blocked) {
2766
2767   case NotBlocked:
2768     return;  /* not blocked */
2769
2770   case BlockedOnMVar:
2771     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2772     {
2773       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2774       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2775
2776       last = (StgBlockingQueueElement **)&mvar->head;
2777       for (t = (StgBlockingQueueElement *)mvar->head; 
2778            t != END_BQ_QUEUE; 
2779            last = &t->link, last_tso = t, t = t->link) {
2780         if (t == (StgBlockingQueueElement *)tso) {
2781           *last = (StgBlockingQueueElement *)tso->link;
2782           if (mvar->tail == tso) {
2783             mvar->tail = (StgTSO *)last_tso;
2784           }
2785           goto done;
2786         }
2787       }
2788       barf("unblockThread (MVAR): TSO not found");
2789     }
2790
2791   case BlockedOnBlackHole:
2792     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2793     {
2794       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2795
2796       last = &bq->blocking_queue;
2797       for (t = bq->blocking_queue; 
2798            t != END_BQ_QUEUE; 
2799            last = &t->link, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           *last = (StgBlockingQueueElement *)tso->link;
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (BLACKHOLE): TSO not found");
2806     }
2807
2808   case BlockedOnException:
2809     {
2810       StgTSO *target  = tso->block_info.tso;
2811
2812       ASSERT(get_itbl(target)->type == TSO);
2813
2814       if (target->what_next == ThreadRelocated) {
2815           target = target->link;
2816           ASSERT(get_itbl(target)->type == TSO);
2817       }
2818
2819       ASSERT(target->blocked_exceptions != NULL);
2820
2821       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2822       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2823            t != END_BQ_QUEUE; 
2824            last = &t->link, t = t->link) {
2825         ASSERT(get_itbl(t)->type == TSO);
2826         if (t == (StgBlockingQueueElement *)tso) {
2827           *last = (StgBlockingQueueElement *)tso->link;
2828           goto done;
2829         }
2830       }
2831       barf("unblockThread (Exception): TSO not found");
2832     }
2833
2834   case BlockedOnRead:
2835   case BlockedOnWrite:
2836     {
2837       /* take TSO off blocked_queue */
2838       StgBlockingQueueElement *prev = NULL;
2839       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2840            prev = t, t = t->link) {
2841         if (t == (StgBlockingQueueElement *)tso) {
2842           if (prev == NULL) {
2843             blocked_queue_hd = (StgTSO *)t->link;
2844             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2845               blocked_queue_tl = END_TSO_QUEUE;
2846             }
2847           } else {
2848             prev->link = t->link;
2849             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2850               blocked_queue_tl = (StgTSO *)prev;
2851             }
2852           }
2853           goto done;
2854         }
2855       }
2856       barf("unblockThread (I/O): TSO not found");
2857     }
2858
2859   case BlockedOnDelay:
2860     {
2861       /* take TSO off sleeping_queue */
2862       StgBlockingQueueElement *prev = NULL;
2863       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2864            prev = t, t = t->link) {
2865         if (t == (StgBlockingQueueElement *)tso) {
2866           if (prev == NULL) {
2867             sleeping_queue = (StgTSO *)t->link;
2868           } else {
2869             prev->link = t->link;
2870           }
2871           goto done;
2872         }
2873       }
2874       barf("unblockThread (I/O): TSO not found");
2875     }
2876
2877   default:
2878     barf("unblockThread");
2879   }
2880
2881  done:
2882   tso->link = END_TSO_QUEUE;
2883   tso->why_blocked = NotBlocked;
2884   tso->block_info.closure = NULL;
2885   PUSH_ON_RUN_QUEUE(tso);
2886   RELEASE_LOCK(&sched_mutex);
2887 }
2888 #else
2889 static void
2890 unblockThread(StgTSO *tso)
2891 {
2892   StgTSO *t, **last;
2893
2894   ACQUIRE_LOCK(&sched_mutex);
2895   switch (tso->why_blocked) {
2896
2897   case NotBlocked:
2898     return;  /* not blocked */
2899
2900   case BlockedOnMVar:
2901     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2902     {
2903       StgTSO *last_tso = END_TSO_QUEUE;
2904       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2905
2906       last = &mvar->head;
2907       for (t = mvar->head; t != END_TSO_QUEUE; 
2908            last = &t->link, last_tso = t, t = t->link) {
2909         if (t == tso) {
2910           *last = tso->link;
2911           if (mvar->tail == tso) {
2912             mvar->tail = last_tso;
2913           }
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (MVAR): TSO not found");
2918     }
2919
2920   case BlockedOnBlackHole:
2921     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2922     {
2923       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2924
2925       last = &bq->blocking_queue;
2926       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2927            last = &t->link, t = t->link) {
2928         if (t == tso) {
2929           *last = tso->link;
2930           goto done;
2931         }
2932       }
2933       barf("unblockThread (BLACKHOLE): TSO not found");
2934     }
2935
2936   case BlockedOnException:
2937     {
2938       StgTSO *target  = tso->block_info.tso;
2939
2940       ASSERT(get_itbl(target)->type == TSO);
2941
2942       while (target->what_next == ThreadRelocated) {
2943           target = target->link;
2944           ASSERT(get_itbl(target)->type == TSO);
2945       }
2946       
2947       ASSERT(target->blocked_exceptions != NULL);
2948
2949       last = &target->blocked_exceptions;
2950       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2951            last = &t->link, t = t->link) {
2952         ASSERT(get_itbl(t)->type == TSO);
2953         if (t == tso) {
2954           *last = tso->link;
2955           goto done;
2956         }
2957       }
2958       barf("unblockThread (Exception): TSO not found");
2959     }
2960
2961   case BlockedOnRead:
2962   case BlockedOnWrite:
2963     {
2964       StgTSO *prev = NULL;
2965       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2966            prev = t, t = t->link) {
2967         if (t == tso) {
2968           if (prev == NULL) {
2969             blocked_queue_hd = t->link;
2970             if (blocked_queue_tl == t) {
2971               blocked_queue_tl = END_TSO_QUEUE;
2972             }
2973           } else {
2974             prev->link = t->link;
2975             if (blocked_queue_tl == t) {
2976               blocked_queue_tl = prev;
2977             }
2978           }
2979           goto done;
2980         }
2981       }
2982       barf("unblockThread (I/O): TSO not found");
2983     }
2984
2985   case BlockedOnDelay:
2986     {
2987       StgTSO *prev = NULL;
2988       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2989            prev = t, t = t->link) {
2990         if (t == tso) {
2991           if (prev == NULL) {
2992             sleeping_queue = t->link;
2993           } else {
2994             prev->link = t->link;
2995           }
2996           goto done;
2997         }
2998       }
2999       barf("unblockThread (I/O): TSO not found");
3000     }
3001
3002   default:
3003     barf("unblockThread");
3004   }
3005
3006  done:
3007   tso->link = END_TSO_QUEUE;
3008   tso->why_blocked = NotBlocked;
3009   tso->block_info.closure = NULL;
3010   PUSH_ON_RUN_QUEUE(tso);
3011   RELEASE_LOCK(&sched_mutex);
3012 }
3013 #endif
3014
3015 /* -----------------------------------------------------------------------------
3016  * raiseAsync()
3017  *
3018  * The following function implements the magic for raising an
3019  * asynchronous exception in an existing thread.
3020  *
3021  * We first remove the thread from any queue on which it might be
3022  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3023  *
3024  * We strip the stack down to the innermost CATCH_FRAME, building
3025  * thunks in the heap for all the active computations, so they can 
3026  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3027  * an application of the handler to the exception, and push it on
3028  * the top of the stack.
3029  * 
3030  * How exactly do we save all the active computations?  We create an
3031  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3032  * AP_UPDs pushes everything from the corresponding update frame
3033  * upwards onto the stack.  (Actually, it pushes everything up to the
3034  * next update frame plus a pointer to the next AP_UPD object.
3035  * Entering the next AP_UPD object pushes more onto the stack until we
3036  * reach the last AP_UPD object - at which point the stack should look
3037  * exactly as it did when we killed the TSO and we can continue
3038  * execution by entering the closure on top of the stack.
3039  *
3040  * We can also kill a thread entirely - this happens if either (a) the 
3041  * exception passed to raiseAsync is NULL, or (b) there's no
3042  * CATCH_FRAME on the stack.  In either case, we strip the entire
3043  * stack and replace the thread with a zombie.
3044  *
3045  * -------------------------------------------------------------------------- */
3046  
3047 void 
3048 deleteThread(StgTSO *tso)
3049 {
3050   raiseAsync(tso,NULL);
3051 }
3052
3053 void
3054 raiseAsync(StgTSO *tso, StgClosure *exception)
3055 {
3056   StgUpdateFrame* su = tso->su;
3057   StgPtr          sp = tso->sp;
3058   
3059   /* Thread already dead? */
3060   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3061     return;
3062   }
3063
3064   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3065
3066   /* Remove it from any blocking queues */
3067   unblockThread(tso);
3068
3069   /* The stack freezing code assumes there's a closure pointer on
3070    * the top of the stack.  This isn't always the case with compiled
3071    * code, so we have to push a dummy closure on the top which just
3072    * returns to the next return address on the stack.
3073    */
3074   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3075     *(--sp) = (W_)&stg_dummy_ret_closure;
3076   }
3077
3078   while (1) {
3079     nat words = ((P_)su - (P_)sp) - 1;
3080     nat i;
3081     StgAP_UPD * ap;
3082
3083     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3084      * then build the THUNK raise(exception), and leave it on
3085      * top of the CATCH_FRAME ready to enter.
3086      */
3087     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3088       StgCatchFrame *cf = (StgCatchFrame *)su;
3089       StgClosure *raise;
3090
3091       /* we've got an exception to raise, so let's pass it to the
3092        * handler in this frame.
3093        */
3094       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3095       TICK_ALLOC_SE_THK(1,0);
3096       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3097       raise->payload[0] = exception;
3098
3099       /* throw away the stack from Sp up to the CATCH_FRAME.
3100        */
3101       sp = (P_)su - 1;
3102
3103       /* Ensure that async excpetions are blocked now, so we don't get
3104        * a surprise exception before we get around to executing the
3105        * handler.
3106        */
3107       if (tso->blocked_exceptions == NULL) {
3108           tso->blocked_exceptions = END_TSO_QUEUE;
3109       }
3110
3111       /* Put the newly-built THUNK on top of the stack, ready to execute
3112        * when the thread restarts.
3113        */
3114       sp[0] = (W_)raise;
3115       tso->sp = sp;
3116       tso->su = su;
3117       tso->what_next = ThreadEnterGHC;
3118       IF_DEBUG(sanity, checkTSO(tso));
3119       return;
3120     }
3121
3122     /* First build an AP_UPD consisting of the stack chunk above the
3123      * current update frame, with the top word on the stack as the
3124      * fun field.
3125      */
3126     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3127     
3128     ASSERT(words >= 0);
3129     
3130     ap->n_args = words;
3131     ap->fun    = (StgClosure *)sp[0];
3132     sp++;
3133     for(i=0; i < (nat)words; ++i) {
3134       ap->payload[i] = (StgClosure *)*sp++;
3135     }
3136     
3137     switch (get_itbl(su)->type) {
3138       
3139     case UPDATE_FRAME:
3140       {
3141         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3142         TICK_ALLOC_UP_THK(words+1,0);
3143         
3144         IF_DEBUG(scheduler,
3145                  fprintf(stderr,  "scheduler: Updating ");
3146                  printPtr((P_)su->updatee); 
3147                  fprintf(stderr,  " with ");
3148                  printObj((StgClosure *)ap);
3149                  );
3150         
3151         /* Replace the updatee with an indirection - happily
3152          * this will also wake up any threads currently
3153          * waiting on the result.
3154          *
3155          * Warning: if we're in a loop, more than one update frame on
3156          * the stack may point to the same object.  Be careful not to
3157          * overwrite an IND_OLDGEN in this case, because we'll screw
3158          * up the mutable lists.  To be on the safe side, don't
3159          * overwrite any kind of indirection at all.  See also
3160          * threadSqueezeStack in GC.c, where we have to make a similar
3161          * check.
3162          */
3163         if (!closure_IND(su->updatee)) {
3164             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3165         }
3166         su = su->link;
3167         sp += sizeofW(StgUpdateFrame) -1;
3168         sp[0] = (W_)ap; /* push onto stack */
3169         break;
3170       }
3171
3172     case CATCH_FRAME:
3173       {
3174         StgCatchFrame *cf = (StgCatchFrame *)su;
3175         StgClosure* o;
3176         
3177         /* We want a PAP, not an AP_UPD.  Fortunately, the
3178          * layout's the same.
3179          */
3180         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3181         TICK_ALLOC_UPD_PAP(words+1,0);
3182         
3183         /* now build o = FUN(catch,ap,handler) */
3184         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3185         TICK_ALLOC_FUN(2,0);
3186         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3187         o->payload[0] = (StgClosure *)ap;
3188         o->payload[1] = cf->handler;
3189         
3190         IF_DEBUG(scheduler,
3191                  fprintf(stderr,  "scheduler: Built ");
3192                  printObj((StgClosure *)o);
3193                  );
3194         
3195         /* pop the old handler and put o on the stack */
3196         su = cf->link;
3197         sp += sizeofW(StgCatchFrame) - 1;
3198         sp[0] = (W_)o;
3199         break;
3200       }
3201       
3202     case SEQ_FRAME:
3203       {
3204         StgSeqFrame *sf = (StgSeqFrame *)su;
3205         StgClosure* o;
3206         
3207         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3208         TICK_ALLOC_UPD_PAP(words+1,0);
3209         
3210         /* now build o = FUN(seq,ap) */
3211         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3212         TICK_ALLOC_SE_THK(1,0);
3213         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3214         o->payload[0] = (StgClosure *)ap;
3215         
3216         IF_DEBUG(scheduler,
3217                  fprintf(stderr,  "scheduler: Built ");
3218                  printObj((StgClosure *)o);
3219                  );
3220         
3221         /* pop the old handler and put o on the stack */
3222         su = sf->link;
3223         sp += sizeofW(StgSeqFrame) - 1;
3224         sp[0] = (W_)o;
3225         break;
3226       }
3227       
3228     case STOP_FRAME:
3229       /* We've stripped the entire stack, the thread is now dead. */
3230       sp += sizeofW(StgStopFrame) - 1;
3231       sp[0] = (W_)exception;    /* save the exception */
3232       tso->what_next = ThreadKilled;
3233       tso->su = (StgUpdateFrame *)(sp+1);
3234       tso->sp = sp;
3235       return;
3236
3237     default:
3238       barf("raiseAsync");
3239     }
3240   }
3241   barf("raiseAsync");
3242 }
3243
3244 /* -----------------------------------------------------------------------------
3245    resurrectThreads is called after garbage collection on the list of
3246    threads found to be garbage.  Each of these threads will be woken
3247    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3248    on an MVar, or NonTermination if the thread was blocked on a Black
3249    Hole.
3250    -------------------------------------------------------------------------- */
3251
3252 void
3253 resurrectThreads( StgTSO *threads )
3254 {
3255   StgTSO *tso, *next;
3256
3257   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3258     next = tso->global_link;
3259     tso->global_link = all_threads;
3260     all_threads = tso;
3261     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3262
3263     switch (tso->why_blocked) {
3264     case BlockedOnMVar:
3265     case BlockedOnException:
3266       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3267       break;
3268     case BlockedOnBlackHole:
3269       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3270       break;
3271     case NotBlocked:
3272       /* This might happen if the thread was blocked on a black hole
3273        * belonging to a thread that we've just woken up (raiseAsync
3274        * can wake up threads, remember...).
3275        */
3276       continue;
3277     default:
3278       barf("resurrectThreads: thread blocked in a strange way");
3279     }
3280   }
3281 }
3282
3283 /* -----------------------------------------------------------------------------
3284  * Blackhole detection: if we reach a deadlock, test whether any
3285  * threads are blocked on themselves.  Any threads which are found to
3286  * be self-blocked get sent a NonTermination exception.
3287  *
3288  * This is only done in a deadlock situation in order to avoid
3289  * performance overhead in the normal case.
3290  * -------------------------------------------------------------------------- */
3291
3292 static void
3293 detectBlackHoles( void )
3294 {
3295     StgTSO *t = all_threads;
3296     StgUpdateFrame *frame;
3297     StgClosure *blocked_on;
3298
3299     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3300
3301         while (t->what_next == ThreadRelocated) {
3302             t = t->link;
3303             ASSERT(get_itbl(t)->type == TSO);
3304         }
3305       
3306         if (t->why_blocked != BlockedOnBlackHole) {
3307             continue;
3308         }
3309
3310         blocked_on = t->block_info.closure;
3311
3312         for (frame = t->su; ; frame = frame->link) {
3313             switch (get_itbl(frame)->type) {
3314
3315             case UPDATE_FRAME:
3316                 if (frame->updatee == blocked_on) {
3317                     /* We are blocking on one of our own computations, so
3318                      * send this thread the NonTermination exception.  
3319                      */
3320                     IF_DEBUG(scheduler, 
3321                              sched_belch("thread %d is blocked on itself", t->id));
3322                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3323                     goto done;
3324                 }
3325                 else {
3326                     continue;
3327                 }
3328
3329             case CATCH_FRAME:
3330             case SEQ_FRAME:
3331                 continue;
3332                 
3333             case STOP_FRAME:
3334                 break;
3335             }
3336             break;
3337         }
3338
3339     done: ;
3340     }   
3341 }
3342
3343 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3344 //@subsection Debugging Routines
3345
3346 /* -----------------------------------------------------------------------------
3347    Debugging: why is a thread blocked
3348    -------------------------------------------------------------------------- */
3349
3350 #ifdef DEBUG
3351
3352 void
3353 printThreadBlockage(StgTSO *tso)
3354 {
3355   switch (tso->why_blocked) {
3356   case BlockedOnRead:
3357     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3358     break;
3359   case BlockedOnWrite:
3360     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3361     break;
3362   case BlockedOnDelay:
3363     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3364     break;
3365   case BlockedOnMVar:
3366     fprintf(stderr,"is blocked on an MVar");
3367     break;
3368   case BlockedOnException:
3369     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3370             tso->block_info.tso->id);
3371     break;
3372   case BlockedOnBlackHole:
3373     fprintf(stderr,"is blocked on a black hole");
3374     break;
3375   case NotBlocked:
3376     fprintf(stderr,"is not blocked");
3377     break;
3378 #if defined(PAR)
3379   case BlockedOnGA:
3380     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3381             tso->block_info.closure, info_type(tso->block_info.closure));
3382     break;
3383   case BlockedOnGA_NoSend:
3384     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3385             tso->block_info.closure, info_type(tso->block_info.closure));
3386     break;
3387 #endif
3388 #if defined(RTS_SUPPORTS_THREADS)
3389   case BlockedOnCCall:
3390     fprintf(stderr,"is blocked on an external call");
3391     break;
3392 #endif
3393   default:
3394     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3395          tso->why_blocked, tso->id, tso);
3396   }
3397 }
3398
3399 void
3400 printThreadStatus(StgTSO *tso)
3401 {
3402   switch (tso->what_next) {
3403   case ThreadKilled:
3404     fprintf(stderr,"has been killed");
3405     break;
3406   case ThreadComplete:
3407     fprintf(stderr,"has completed");
3408     break;
3409   default:
3410     printThreadBlockage(tso);
3411   }
3412 }
3413
3414 void
3415 printAllThreads(void)
3416 {
3417   StgTSO *t;
3418
3419 # if defined(GRAN)
3420   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3421   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3422                        time_string, rtsFalse/*no commas!*/);
3423
3424   sched_belch("all threads at [%s]:", time_string);
3425 # elif defined(PAR)
3426   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3427   ullong_format_string(CURRENT_TIME,
3428                        time_string, rtsFalse/*no commas!*/);
3429
3430   sched_belch("all threads at [%s]:", time_string);
3431 # else
3432   sched_belch("all threads:");
3433 # endif
3434
3435   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3436     fprintf(stderr, "\tthread %d ", t->id);
3437     printThreadStatus(t);
3438     fprintf(stderr,"\n");
3439   }
3440 }
3441     
3442 /* 
3443    Print a whole blocking queue attached to node (debugging only).
3444 */
3445 //@cindex print_bq
3446 # if defined(PAR)
3447 void 
3448 print_bq (StgClosure *node)
3449 {
3450   StgBlockingQueueElement *bqe;
3451   StgTSO *tso;
3452   rtsBool end;
3453
3454   fprintf(stderr,"## BQ of closure %p (%s): ",
3455           node, info_type(node));
3456
3457   /* should cover all closures that may have a blocking queue */
3458   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3459          get_itbl(node)->type == FETCH_ME_BQ ||
3460          get_itbl(node)->type == RBH ||
3461          get_itbl(node)->type == MVAR);
3462     
3463   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3464
3465   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3466 }
3467
3468 /* 
3469    Print a whole blocking queue starting with the element bqe.
3470 */
3471 void 
3472 print_bqe (StgBlockingQueueElement *bqe)
3473 {
3474   rtsBool end;
3475
3476   /* 
3477      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3478   */
3479   for (end = (bqe==END_BQ_QUEUE);
3480        !end; // iterate until bqe points to a CONSTR
3481        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3482        bqe = end ? END_BQ_QUEUE : bqe->link) {
3483     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3484     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3485     /* types of closures that may appear in a blocking queue */
3486     ASSERT(get_itbl(bqe)->type == TSO ||           
3487            get_itbl(bqe)->type == BLOCKED_FETCH || 
3488            get_itbl(bqe)->type == CONSTR); 
3489     /* only BQs of an RBH end with an RBH_Save closure */
3490     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3491
3492     switch (get_itbl(bqe)->type) {
3493     case TSO:
3494       fprintf(stderr," TSO %u (%x),",
3495               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3496       break;
3497     case BLOCKED_FETCH:
3498       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3499               ((StgBlockedFetch *)bqe)->node, 
3500               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3501               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3502               ((StgBlockedFetch *)bqe)->ga.weight);
3503       break;
3504     case CONSTR:
3505       fprintf(stderr," %s (IP %p),",
3506               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3507                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3508                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3509                "RBH_Save_?"), get_itbl(bqe));
3510       break;
3511     default:
3512       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3513            info_type((StgClosure *)bqe)); // , node, info_type(node));
3514       break;
3515     }
3516   } /* for */
3517   fputc('\n', stderr);
3518 }
3519 # elif defined(GRAN)
3520 void 
3521 print_bq (StgClosure *node)
3522 {
3523   StgBlockingQueueElement *bqe;
3524   PEs node_loc, tso_loc;
3525   rtsBool end;
3526
3527   /* should cover all closures that may have a blocking queue */
3528   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3529          get_itbl(node)->type == FETCH_ME_BQ ||
3530          get_itbl(node)->type == RBH);
3531     
3532   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3533   node_loc = where_is(node);
3534
3535   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3536           node, info_type(node), node_loc);
3537
3538   /* 
3539      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3540   */
3541   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3542        !end; // iterate until bqe points to a CONSTR
3543        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3544     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3545     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3546     /* types of closures that may appear in a blocking queue */
3547     ASSERT(get_itbl(bqe)->type == TSO ||           
3548            get_itbl(bqe)->type == CONSTR); 
3549     /* only BQs of an RBH end with an RBH_Save closure */
3550     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3551
3552     tso_loc = where_is((StgClosure *)bqe);
3553     switch (get_itbl(bqe)->type) {
3554     case TSO:
3555       fprintf(stderr," TSO %d (%p) on [PE %d],",
3556               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3557       break;
3558     case CONSTR:
3559       fprintf(stderr," %s (IP %p),",
3560               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3561                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3562                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3563                "RBH_Save_?"), get_itbl(bqe));
3564       break;
3565     default:
3566       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3567            info_type((StgClosure *)bqe), node, info_type(node));
3568       break;
3569     }
3570   } /* for */
3571   fputc('\n', stderr);
3572 }
3573 #else
3574 /* 
3575    Nice and easy: only TSOs on the blocking queue
3576 */
3577 void 
3578 print_bq (StgClosure *node)
3579 {
3580   StgTSO *tso;
3581
3582   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3583   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3584        tso != END_TSO_QUEUE; 
3585        tso=tso->link) {
3586     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3587     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3588     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3589   }
3590   fputc('\n', stderr);
3591 }
3592 # endif
3593
3594 #if defined(PAR)
3595 static nat
3596 run_queue_len(void)
3597 {
3598   nat i;
3599   StgTSO *tso;
3600
3601   for (i=0, tso=run_queue_hd; 
3602        tso != END_TSO_QUEUE;
3603        i++, tso=tso->link)
3604     /* nothing */
3605
3606   return i;
3607 }
3608 #endif
3609
3610 static void
3611 sched_belch(char *s, ...)
3612 {
3613   va_list ap;
3614   va_start(ap,s);
3615 #ifdef SMP
3616   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3617 #elif defined(PAR)
3618   fprintf(stderr, "== ");
3619 #else
3620   fprintf(stderr, "scheduler: ");
3621 #endif
3622   vfprintf(stderr, s, ap);
3623   fprintf(stderr, "\n");
3624 }
3625
3626 #endif /* DEBUG */
3627
3628
3629 //@node Index,  , Debugging Routines, Main scheduling code
3630 //@subsection Index
3631
3632 //@index
3633 //* StgMainThread::  @cindex\s-+StgMainThread
3634 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3635 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3636 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3637 //* context_switch::  @cindex\s-+context_switch
3638 //* createThread::  @cindex\s-+createThread
3639 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3640 //* initScheduler::  @cindex\s-+initScheduler
3641 //* interrupted::  @cindex\s-+interrupted
3642 //* next_thread_id::  @cindex\s-+next_thread_id
3643 //* print_bq::  @cindex\s-+print_bq
3644 //* run_queue_hd::  @cindex\s-+run_queue_hd
3645 //* run_queue_tl::  @cindex\s-+run_queue_tl
3646 //* sched_mutex::  @cindex\s-+sched_mutex
3647 //* schedule::  @cindex\s-+schedule
3648 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3649 //* term_mutex::  @cindex\s-+term_mutex
3650 //@end index