[project @ 2002-02-15 14:49:08 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.125 2002/02/15 14:49:08 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runnable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268
269 # if defined(SMP)
270 static Condition gc_pending_cond = INIT_COND_VAR;
271 nat await_death;
272 # endif
273
274 #endif /* RTS_SUPPORTS_THREADS */
275
276 #if defined(PAR)
277 StgTSO *LastTSO;
278 rtsTime TimeOfLastYield;
279 rtsBool emitSchedule = rtsTrue;
280 #endif
281
282 #if DEBUG
283 char *whatNext_strs[] = {
284   "ThreadEnterGHC",
285   "ThreadRunGHC",
286   "ThreadEnterInterp",
287   "ThreadKilled",
288   "ThreadComplete"
289 };
290
291 char *threadReturnCode_strs[] = {
292   "HeapOverflow",                       /* might also be StackOverflow */
293   "StackOverflow",
294   "ThreadYielding",
295   "ThreadBlocked",
296   "ThreadFinished"
297 };
298 #endif
299
300 #if defined(PAR)
301 StgTSO * createSparkThread(rtsSpark spark);
302 StgTSO * activateSpark (rtsSpark spark);  
303 #endif
304
305 /*
306  * The thread state for the main thread.
307 // ToDo: check whether not needed any more
308 StgTSO   *MainTSO;
309  */
310
311 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
312 static void taskStart(void);
313 static void
314 taskStart(void)
315 {
316   schedule();
317 }
318 #endif
319
320
321
322
323 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
324 //@subsection Main scheduling loop
325
326 /* ---------------------------------------------------------------------------
327    Main scheduling loop.
328
329    We use round-robin scheduling, each thread returning to the
330    scheduler loop when one of these conditions is detected:
331
332       * out of heap space
333       * timer expires (thread yields)
334       * thread blocks
335       * thread ends
336       * stack overflow
337
338    Locking notes:  we acquire the scheduler lock once at the beginning
339    of the scheduler loop, and release it when
340     
341       * running a thread, or
342       * waiting for work, or
343       * waiting for a GC to complete.
344
345    GRAN version:
346      In a GranSim setup this loop iterates over the global event queue.
347      This revolves around the global event queue, which determines what 
348      to do next. Therefore, it's more complicated than either the 
349      concurrent or the parallel (GUM) setup.
350
351    GUM version:
352      GUM iterates over incoming messages.
353      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
354      and sends out a fish whenever it has nothing to do; in-between
355      doing the actual reductions (shared code below) it processes the
356      incoming messages and deals with delayed operations 
357      (see PendingFetches).
358      This is not the ugliest code you could imagine, but it's bloody close.
359
360    ------------------------------------------------------------------------ */
361 //@cindex schedule
362 static void
363 schedule( void )
364 {
365   StgTSO *t;
366   Capability *cap;
367   StgThreadReturnCode ret;
368 #if defined(GRAN)
369   rtsEvent *event;
370 #elif defined(PAR)
371   StgSparkPool *pool;
372   rtsSpark spark;
373   StgTSO *tso;
374   GlobalTaskId pe;
375   rtsBool receivedFinish = rtsFalse;
376 # if defined(DEBUG)
377   nat tp_size, sp_size; // stats only
378 # endif
379 #endif
380   rtsBool was_interrupted = rtsFalse;
381   
382   ACQUIRE_LOCK(&sched_mutex);
383  
384 #if defined(RTS_SUPPORTS_THREADS)
385   /* Check to see whether there are any worker threads
386      waiting to deposit external call results. If so,
387      yield our capability */
388   yieldToReturningWorker(&sched_mutex, cap);
389
390   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
391 #endif
392
393 #if defined(GRAN)
394   /* set up first event to get things going */
395   /* ToDo: assign costs for system setup and init MainTSO ! */
396   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
397             ContinueThread, 
398             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
399
400   IF_DEBUG(gran,
401            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
402            G_TSO(CurrentTSO, 5));
403
404   if (RtsFlags.GranFlags.Light) {
405     /* Save current time; GranSim Light only */
406     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
407   }      
408
409   event = get_next_event();
410
411   while (event!=(rtsEvent*)NULL) {
412     /* Choose the processor with the next event */
413     CurrentProc = event->proc;
414     CurrentTSO = event->tso;
415
416 #elif defined(PAR)
417
418   while (!receivedFinish) {    /* set by processMessages */
419                                /* when receiving PP_FINISH message         */ 
420 #else
421
422   while (1) {
423
424 #endif
425
426     IF_DEBUG(scheduler, printAllThreads());
427
428     /* If we're interrupted (the user pressed ^C, or some other
429      * termination condition occurred), kill all the currently running
430      * threads.
431      */
432     if (interrupted) {
433       IF_DEBUG(scheduler, sched_belch("interrupted"));
434       deleteAllThreads();
435       interrupted = rtsFalse;
436       was_interrupted = rtsTrue;
437     }
438
439     /* Go through the list of main threads and wake up any
440      * clients whose computations have finished.  ToDo: this
441      * should be done more efficiently without a linear scan
442      * of the main threads list, somehow...
443      */
444 #if defined(RTS_SUPPORTS_THREADS)
445     { 
446       StgMainThread *m, **prev;
447       prev = &main_threads;
448       for (m = main_threads; m != NULL; m = m->link) {
449         switch (m->tso->what_next) {
450         case ThreadComplete:
451           if (m->ret) {
452             *(m->ret) = (StgClosure *)m->tso->sp[0];
453           }
454           *prev = m->link;
455           m->stat = Success;
456           broadcastCondition(&m->wakeup);
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467           break;
468         default:
469           break;
470         }
471       }
472     }
473
474 #else /* not threaded */
475
476 # if defined(PAR)
477     /* in GUM do this only on the Main PE */
478     if (IAmMainThread)
479 # endif
480     /* If our main thread has finished or been killed, return.
481      */
482     {
483       StgMainThread *m = main_threads;
484       if (m->tso->what_next == ThreadComplete
485           || m->tso->what_next == ThreadKilled) {
486         main_threads = main_threads->link;
487         if (m->tso->what_next == ThreadComplete) {
488           /* we finished successfully, fill in the return value */
489           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
490           m->stat = Success;
491           return;
492         } else {
493           if (m->ret) { *(m->ret) = NULL; };
494           if (was_interrupted) {
495             m->stat = Interrupted;
496           } else {
497             m->stat = Killed;
498           }
499           return;
500         }
501       }
502     }
503 #endif
504
505     /* Top up the run queue from our spark pool.  We try to make the
506      * number of threads in the run queue equal to the number of
507      * free capabilities.
508      *
509      * Disable spark support in SMP for now, non-essential & requires
510      * a little bit of work to make it compile cleanly. -- sof 1/02.
511      */
512 #if 0 /* defined(SMP) */
513     {
514       nat n = getFreeCapabilities();
515       StgTSO *tso = run_queue_hd;
516
517       /* Count the run queue */
518       while (n > 0 && tso != END_TSO_QUEUE) {
519         tso = tso->link;
520         n--;
521       }
522
523       for (; n > 0; n--) {
524         StgClosure *spark;
525         spark = findSpark(rtsFalse);
526         if (spark == NULL) {
527           break; /* no more sparks in the pool */
528         } else {
529           /* I'd prefer this to be done in activateSpark -- HWL */
530           /* tricky - it needs to hold the scheduler lock and
531            * not try to re-acquire it -- SDM */
532           createSparkThread(spark);       
533           IF_DEBUG(scheduler,
534                    sched_belch("==^^ turning spark of closure %p into a thread",
535                                (StgClosure *)spark));
536         }
537       }
538       /* We need to wake up the other tasks if we just created some
539        * work for them.
540        */
541       if (getFreeCapabilities() - n > 1) {
542           signalCondition( &thread_ready_cond );
543       }
544     }
545 #endif // SMP
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       startSignalHandlers();
551     }
552 #endif
553
554     /* Check whether any waiting threads need to be woken up.  If the
555      * run queue is empty, and there are no other tasks running, we
556      * can wait indefinitely for something to happen.
557      * ToDo: what if another client comes along & requests another
558      * main thread?
559      */
560     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
561       awaitEvent( EMPTY_RUN_QUEUE()
562 #if defined(SMP)
563         && allFreeCapabilities()
564 #endif
565         );
566     }
567     /* we can be interrupted while waiting for I/O... */
568     if (interrupted) continue;
569
570     /* 
571      * Detect deadlock: when we have no threads to run, there are no
572      * threads waiting on I/O or sleeping, and all the other tasks are
573      * waiting for work, we must have a deadlock of some description.
574      *
575      * We first try to find threads blocked on themselves (ie. black
576      * holes), and generate NonTermination exceptions where necessary.
577      *
578      * If no threads are black holed, we have a deadlock situation, so
579      * inform all the main threads.
580      */
581 #ifndef PAR
582     if (   EMPTY_RUN_QUEUE()
583         && EMPTY_QUEUE(blocked_queue_hd)
584         && EMPTY_QUEUE(sleeping_queue)
585 #if defined(RTS_SUPPORTS_THREADS)
586         && EMPTY_QUEUE(suspended_ccalling_threads)
587 #endif
588 #ifdef SMP
589         && allFreeCapabilities()
590 #endif
591         )
592     {
593         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595         /* and SMP mode ..? */
596         releaseCapability(cap);
597 #endif
598         RELEASE_LOCK(&sched_mutex);
599         GarbageCollect(GetRoots,rtsTrue);
600         ACQUIRE_LOCK(&sched_mutex);
601         if (   EMPTY_QUEUE(blocked_queue_hd)
602             && EMPTY_RUN_QUEUE()
603             && EMPTY_QUEUE(sleeping_queue) ) {
604
605             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
606             detectBlackHoles();
607
608             /* No black holes, so probably a real deadlock.  Send the
609              * current main thread the Deadlock exception (or in the SMP
610              * build, send *all* main threads the deadlock exception,
611              * since none of them can make progress).
612              */
613             if ( EMPTY_RUN_QUEUE() ) {
614                 StgMainThread *m;
615 #if defined(RTS_SUPPORTS_THREADS)
616                 for (m = main_threads; m != NULL; m = m->link) {
617                     switch (m->tso->why_blocked) {
618                     case BlockedOnBlackHole:
619                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
620                         break;
621                     case BlockedOnException:
622                     case BlockedOnMVar:
623                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
624                         break;
625                     default:
626                         barf("deadlock: main thread blocked in a strange way");
627                     }
628                 }
629 #else
630                 m = main_threads;
631                 switch (m->tso->why_blocked) {
632                 case BlockedOnBlackHole:
633                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
634                     break;
635                 case BlockedOnException:
636                 case BlockedOnMVar:
637                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
638                     break;
639                 default:
640                     barf("deadlock: main thread blocked in a strange way");
641                 }
642 #endif
643             }
644 #if defined(RTS_SUPPORTS_THREADS)
645             /* ToDo: revisit conditions (and mechanism) for shutting
646                down a multi-threaded world  */
647             if ( EMPTY_RUN_QUEUE() ) {
648               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
649               shutdownHaskellAndExit(0);
650             }
651 #endif
652             ASSERT( !EMPTY_RUN_QUEUE() );
653         }
654     }
655 #elif defined(PAR)
656     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
657 #endif
658
659 #if defined(SMP)
660     /* If there's a GC pending, don't do anything until it has
661      * completed.
662      */
663     if (ready_to_gc) {
664       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
665       waitCondition( &gc_pending_cond, &sched_mutex );
666     }
667 #endif    
668
669 #if defined(RTS_SUPPORTS_THREADS)
670     /* block until we've got a thread on the run queue and a free
671      * capability.
672      *
673      */
674     if ( EMPTY_RUN_QUEUE() ) {
675       /* Give up our capability */
676       releaseCapability(cap);
677       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
678       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
679       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
680 #if 0
681       while ( EMPTY_RUN_QUEUE() ) {
682         waitForWorkCapability(&sched_mutex, &cap);
683         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
684       }
685 #endif
686     }
687 #endif
688
689 #if defined(GRAN)
690     if (RtsFlags.GranFlags.Light)
691       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
692
693     /* adjust time based on time-stamp */
694     if (event->time > CurrentTime[CurrentProc] &&
695         event->evttype != ContinueThread)
696       CurrentTime[CurrentProc] = event->time;
697     
698     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
699     if (!RtsFlags.GranFlags.Light)
700       handleIdlePEs();
701
702     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
703
704     /* main event dispatcher in GranSim */
705     switch (event->evttype) {
706       /* Should just be continuing execution */
707     case ContinueThread:
708       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
709       /* ToDo: check assertion
710       ASSERT(run_queue_hd != (StgTSO*)NULL &&
711              run_queue_hd != END_TSO_QUEUE);
712       */
713       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
714       if (!RtsFlags.GranFlags.DoAsyncFetch &&
715           procStatus[CurrentProc]==Fetching) {
716         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
717               CurrentTSO->id, CurrentTSO, CurrentProc);
718         goto next_thread;
719       } 
720       /* Ignore ContinueThreads for completed threads */
721       if (CurrentTSO->what_next == ThreadComplete) {
722         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
723               CurrentTSO->id, CurrentTSO, CurrentProc);
724         goto next_thread;
725       } 
726       /* Ignore ContinueThreads for threads that are being migrated */
727       if (PROCS(CurrentTSO)==Nowhere) { 
728         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
729               CurrentTSO->id, CurrentTSO, CurrentProc);
730         goto next_thread;
731       }
732       /* The thread should be at the beginning of the run queue */
733       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
734         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
735               CurrentTSO->id, CurrentTSO, CurrentProc);
736         break; // run the thread anyway
737       }
738       /*
739       new_event(proc, proc, CurrentTime[proc],
740                 FindWork,
741                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
742       goto next_thread; 
743       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
744       break; // now actually run the thread; DaH Qu'vam yImuHbej 
745
746     case FetchNode:
747       do_the_fetchnode(event);
748       goto next_thread;             /* handle next event in event queue  */
749       
750     case GlobalBlock:
751       do_the_globalblock(event);
752       goto next_thread;             /* handle next event in event queue  */
753       
754     case FetchReply:
755       do_the_fetchreply(event);
756       goto next_thread;             /* handle next event in event queue  */
757       
758     case UnblockThread:   /* Move from the blocked queue to the tail of */
759       do_the_unblock(event);
760       goto next_thread;             /* handle next event in event queue  */
761       
762     case ResumeThread:  /* Move from the blocked queue to the tail of */
763       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
764       event->tso->gran.blocktime += 
765         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
766       do_the_startthread(event);
767       goto next_thread;             /* handle next event in event queue  */
768       
769     case StartThread:
770       do_the_startthread(event);
771       goto next_thread;             /* handle next event in event queue  */
772       
773     case MoveThread:
774       do_the_movethread(event);
775       goto next_thread;             /* handle next event in event queue  */
776       
777     case MoveSpark:
778       do_the_movespark(event);
779       goto next_thread;             /* handle next event in event queue  */
780       
781     case FindWork:
782       do_the_findwork(event);
783       goto next_thread;             /* handle next event in event queue  */
784       
785     default:
786       barf("Illegal event type %u\n", event->evttype);
787     }  /* switch */
788     
789     /* This point was scheduler_loop in the old RTS */
790
791     IF_DEBUG(gran, belch("GRAN: after main switch"));
792
793     TimeOfLastEvent = CurrentTime[CurrentProc];
794     TimeOfNextEvent = get_time_of_next_event();
795     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
796     // CurrentTSO = ThreadQueueHd;
797
798     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
799                          TimeOfNextEvent));
800
801     if (RtsFlags.GranFlags.Light) 
802       GranSimLight_leave_system(event, &ActiveTSO); 
803
804     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
805
806     IF_DEBUG(gran, 
807              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
808
809     /* in a GranSim setup the TSO stays on the run queue */
810     t = CurrentTSO;
811     /* Take a thread from the run queue. */
812     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
813
814     IF_DEBUG(gran, 
815              fprintf(stderr, "GRAN: About to run current thread, which is\n");
816              G_TSO(t,5));
817
818     context_switch = 0; // turned on via GranYield, checking events and time slice
819
820     IF_DEBUG(gran, 
821              DumpGranEvent(GR_SCHEDULE, t));
822
823     procStatus[CurrentProc] = Busy;
824
825 #elif defined(PAR)
826     if (PendingFetches != END_BF_QUEUE) {
827         processFetches();
828     }
829
830     /* ToDo: phps merge with spark activation above */
831     /* check whether we have local work and send requests if we have none */
832     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
833       /* :-[  no local threads => look out for local sparks */
834       /* the spark pool for the current PE */
835       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
836       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
837           pool->hd < pool->tl) {
838         /* 
839          * ToDo: add GC code check that we really have enough heap afterwards!!
840          * Old comment:
841          * If we're here (no runnable threads) and we have pending
842          * sparks, we must have a space problem.  Get enough space
843          * to turn one of those pending sparks into a
844          * thread... 
845          */
846
847         spark = findSpark(rtsFalse);                /* get a spark */
848         if (spark != (rtsSpark) NULL) {
849           tso = activateSpark(spark);       /* turn the spark into a thread */
850           IF_PAR_DEBUG(schedule,
851                        belch("==== schedule: Created TSO %d (%p); %d threads active",
852                              tso->id, tso, advisory_thread_count));
853
854           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
855             belch("==^^ failed to activate spark");
856             goto next_thread;
857           }               /* otherwise fall through & pick-up new tso */
858         } else {
859           IF_PAR_DEBUG(verbose,
860                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
861                              spark_queue_len(pool)));
862           goto next_thread;
863         }
864       }
865
866       /* If we still have no work we need to send a FISH to get a spark
867          from another PE 
868       */
869       if (EMPTY_RUN_QUEUE()) {
870       /* =8-[  no local sparks => look for work on other PEs */
871         /*
872          * We really have absolutely no work.  Send out a fish
873          * (there may be some out there already), and wait for
874          * something to arrive.  We clearly can't run any threads
875          * until a SCHEDULE or RESUME arrives, and so that's what
876          * we're hoping to see.  (Of course, we still have to
877          * respond to other types of messages.)
878          */
879         TIME now = msTime() /*CURRENT_TIME*/;
880         IF_PAR_DEBUG(verbose, 
881                      belch("--  now=%ld", now));
882         IF_PAR_DEBUG(verbose,
883                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
884                          (last_fish_arrived_at!=0 &&
885                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
886                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
887                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
888                              last_fish_arrived_at,
889                              RtsFlags.ParFlags.fishDelay, now);
890                      });
891         
892         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
893             (last_fish_arrived_at==0 ||
894              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
895           /* outstandingFishes is set in sendFish, processFish;
896              avoid flooding system with fishes via delay */
897           pe = choosePE();
898           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
899                    NEW_FISH_HUNGER);
900
901           // Global statistics: count no. of fishes
902           if (RtsFlags.ParFlags.ParStats.Global &&
903               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
904             globalParStats.tot_fish_mess++;
905           }
906         }
907       
908         receivedFinish = processMessages();
909         goto next_thread;
910       }
911     } else if (PacketsWaiting()) {  /* Look for incoming messages */
912       receivedFinish = processMessages();
913     }
914
915     /* Now we are sure that we have some work available */
916     ASSERT(run_queue_hd != END_TSO_QUEUE);
917
918     /* Take a thread from the run queue, if we have work */
919     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
920     IF_DEBUG(sanity,checkTSO(t));
921
922     /* ToDo: write something to the log-file
923     if (RTSflags.ParFlags.granSimStats && !sameThread)
924         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
925
926     CurrentTSO = t;
927     */
928     /* the spark pool for the current PE */
929     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
930
931     IF_DEBUG(scheduler, 
932              belch("--=^ %d threads, %d sparks on [%#x]", 
933                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
934
935 # if 1
936     if (0 && RtsFlags.ParFlags.ParStats.Full && 
937         t && LastTSO && t->id != LastTSO->id && 
938         LastTSO->why_blocked == NotBlocked && 
939         LastTSO->what_next != ThreadComplete) {
940       // if previously scheduled TSO not blocked we have to record the context switch
941       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
942                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
943     }
944
945     if (RtsFlags.ParFlags.ParStats.Full && 
946         (emitSchedule /* forced emit */ ||
947         (t && LastTSO && t->id != LastTSO->id))) {
948       /* 
949          we are running a different TSO, so write a schedule event to log file
950          NB: If we use fair scheduling we also have to write  a deschedule 
951              event for LastTSO; with unfair scheduling we know that the
952              previous tso has blocked whenever we switch to another tso, so
953              we don't need it in GUM for now
954       */
955       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
956                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
957       emitSchedule = rtsFalse;
958     }
959      
960 # endif
961 #else /* !GRAN && !PAR */
962   
963     /* grab a thread from the run queue */
964     ASSERT(run_queue_hd != END_TSO_QUEUE);
965     t = POP_RUN_QUEUE();
966     // Sanity check the thread we're about to run.  This can be
967     // expensive if there is lots of thread switching going on...
968     IF_DEBUG(sanity,checkTSO(t));
969 #endif
970     
971     grabCapability(&cap);
972     cap->r.rCurrentTSO = t;
973     
974     /* context switches are now initiated by the timer signal, unless
975      * the user specified "context switch as often as possible", with
976      * +RTS -C0
977      */
978     if (
979 #ifdef PROFILING
980         RtsFlags.ProfFlags.profileInterval == 0 ||
981 #endif
982         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
983          && (run_queue_hd != END_TSO_QUEUE
984              || blocked_queue_hd != END_TSO_QUEUE
985              || sleeping_queue != END_TSO_QUEUE)))
986         context_switch = 1;
987     else
988         context_switch = 0;
989
990     RELEASE_LOCK(&sched_mutex);
991
992     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
993                               t->id, t, whatNext_strs[t->what_next]));
994
995 #ifdef PROFILING
996     startHeapProfTimer();
997 #endif
998
999     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000     /* Run the current thread 
1001      */
1002     switch (cap->r.rCurrentTSO->what_next) {
1003     case ThreadKilled:
1004     case ThreadComplete:
1005         /* Thread already finished, return to scheduler. */
1006         ret = ThreadFinished;
1007         break;
1008     case ThreadEnterGHC:
1009         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1010         break;
1011     case ThreadRunGHC:
1012         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1013         break;
1014     case ThreadEnterInterp:
1015         ret = interpretBCO(cap);
1016         break;
1017     default:
1018       barf("schedule: invalid what_next field");
1019     }
1020     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1021     
1022     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1023 #ifdef PROFILING
1024     stopHeapProfTimer();
1025     CCCS = CCS_SYSTEM;
1026 #endif
1027     
1028     ACQUIRE_LOCK(&sched_mutex);
1029
1030 #ifdef SMP
1031     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1034 #endif
1035     t = cap->r.rCurrentTSO;
1036     
1037 #if defined(PAR)
1038     /* HACK 675: if the last thread didn't yield, make sure to print a 
1039        SCHEDULE event to the log file when StgRunning the next thread, even
1040        if it is the same one as before */
1041     LastTSO = t; 
1042     TimeOfLastYield = CURRENT_TIME;
1043 #endif
1044
1045     switch (ret) {
1046     case HeapOverflow:
1047 #if defined(GRAN)
1048       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049       globalGranStats.tot_heapover++;
1050 #elif defined(PAR)
1051       globalParStats.tot_heapover++;
1052 #endif
1053
1054       // did the task ask for a large block?
1055       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056           // if so, get one and push it on the front of the nursery.
1057           bdescr *bd;
1058           nat blocks;
1059           
1060           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1061
1062           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1063                                    t->id, t,
1064                                    whatNext_strs[t->what_next], blocks));
1065
1066           // don't do this if it would push us over the
1067           // alloc_blocks_lim limit; we'll GC first.
1068           if (alloc_blocks + blocks < alloc_blocks_lim) {
1069
1070               alloc_blocks += blocks;
1071               bd = allocGroup( blocks );
1072
1073               // link the new group into the list
1074               bd->link = cap->r.rCurrentNursery;
1075               bd->u.back = cap->r.rCurrentNursery->u.back;
1076               if (cap->r.rCurrentNursery->u.back != NULL) {
1077                   cap->r.rCurrentNursery->u.back->link = bd;
1078               } else {
1079                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080                          g0s0->blocks == cap->r.rNursery);
1081                   cap->r.rNursery = g0s0->blocks = bd;
1082               }           
1083               cap->r.rCurrentNursery->u.back = bd;
1084
1085               // initialise it as a nursery block
1086               bd->step = g0s0;
1087               bd->gen_no = 0;
1088               bd->flags = 0;
1089               bd->free = bd->start;
1090
1091               // don't forget to update the block count in g0s0.
1092               g0s0->n_blocks += blocks;
1093               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1094
1095               // now update the nursery to point to the new block
1096               cap->r.rCurrentNursery = bd;
1097
1098               // we might be unlucky and have another thread get on the
1099               // run queue before us and steal the large block, but in that
1100               // case the thread will just end up requesting another large
1101               // block.
1102               PUSH_ON_RUN_QUEUE(t);
1103               break;
1104           }
1105       }
1106
1107       /* make all the running tasks block on a condition variable,
1108        * maybe set context_switch and wait till they all pile in,
1109        * then have them wait on a GC condition variable.
1110        */
1111       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1112                                t->id, t, whatNext_strs[t->what_next]));
1113       threadPaused(t);
1114 #if defined(GRAN)
1115       ASSERT(!is_on_queue(t,CurrentProc));
1116 #elif defined(PAR)
1117       /* Currently we emit a DESCHEDULE event before GC in GUM.
1118          ToDo: either add separate event to distinguish SYSTEM time from rest
1119                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1120       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1121         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1122                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1123         emitSchedule = rtsTrue;
1124       }
1125 #endif
1126       
1127       ready_to_gc = rtsTrue;
1128       context_switch = 1;               /* stop other threads ASAP */
1129       PUSH_ON_RUN_QUEUE(t);
1130       /* actual GC is done at the end of the while loop */
1131       break;
1132       
1133     case StackOverflow:
1134 #if defined(GRAN)
1135       IF_DEBUG(gran, 
1136                DumpGranEvent(GR_DESCHEDULE, t));
1137       globalGranStats.tot_stackover++;
1138 #elif defined(PAR)
1139       // IF_DEBUG(par, 
1140       // DumpGranEvent(GR_DESCHEDULE, t);
1141       globalParStats.tot_stackover++;
1142 #endif
1143       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1144                                t->id, t, whatNext_strs[t->what_next]));
1145       /* just adjust the stack for this thread, then pop it back
1146        * on the run queue.
1147        */
1148       threadPaused(t);
1149       { 
1150         StgMainThread *m;
1151         /* enlarge the stack */
1152         StgTSO *new_t = threadStackOverflow(t);
1153         
1154         /* This TSO has moved, so update any pointers to it from the
1155          * main thread stack.  It better not be on any other queues...
1156          * (it shouldn't be).
1157          */
1158         for (m = main_threads; m != NULL; m = m->link) {
1159           if (m->tso == t) {
1160             m->tso = new_t;
1161           }
1162         }
1163         threadPaused(new_t);
1164         PUSH_ON_RUN_QUEUE(new_t);
1165       }
1166       break;
1167
1168     case ThreadYielding:
1169 #if defined(GRAN)
1170       IF_DEBUG(gran, 
1171                DumpGranEvent(GR_DESCHEDULE, t));
1172       globalGranStats.tot_yields++;
1173 #elif defined(PAR)
1174       // IF_DEBUG(par, 
1175       // DumpGranEvent(GR_DESCHEDULE, t);
1176       globalParStats.tot_yields++;
1177 #endif
1178       /* put the thread back on the run queue.  Then, if we're ready to
1179        * GC, check whether this is the last task to stop.  If so, wake
1180        * up the GC thread.  getThread will block during a GC until the
1181        * GC is finished.
1182        */
1183       IF_DEBUG(scheduler,
1184                if (t->what_next == ThreadEnterInterp) {
1185                    /* ToDo: or maybe a timer expired when we were in Hugs?
1186                     * or maybe someone hit ctrl-C
1187                     */
1188                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1189                          t->id, t, whatNext_strs[t->what_next]);
1190                } else {
1191                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1192                          t->id, t, whatNext_strs[t->what_next]);
1193                }
1194                );
1195
1196       threadPaused(t);
1197
1198       IF_DEBUG(sanity,
1199                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1200                checkTSO(t));
1201       ASSERT(t->link == END_TSO_QUEUE);
1202 #if defined(GRAN)
1203       ASSERT(!is_on_queue(t,CurrentProc));
1204
1205       IF_DEBUG(sanity,
1206                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1207                checkThreadQsSanity(rtsTrue));
1208 #endif
1209 #if defined(PAR)
1210       if (RtsFlags.ParFlags.doFairScheduling) { 
1211         /* this does round-robin scheduling; good for concurrency */
1212         APPEND_TO_RUN_QUEUE(t);
1213       } else {
1214         /* this does unfair scheduling; good for parallelism */
1215         PUSH_ON_RUN_QUEUE(t);
1216       }
1217 #else
1218       /* this does round-robin scheduling; good for concurrency */
1219       APPEND_TO_RUN_QUEUE(t);
1220 #endif
1221 #if defined(GRAN)
1222       /* add a ContinueThread event to actually process the thread */
1223       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1224                 ContinueThread,
1225                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1226       IF_GRAN_DEBUG(bq, 
1227                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1228                G_EVENTQ(0);
1229                G_CURR_THREADQ(0));
1230 #endif /* GRAN */
1231       break;
1232       
1233     case ThreadBlocked:
1234 #if defined(GRAN)
1235       IF_DEBUG(scheduler,
1236                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1237                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1238                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1239
1240       // ??? needed; should emit block before
1241       IF_DEBUG(gran, 
1242                DumpGranEvent(GR_DESCHEDULE, t)); 
1243       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1244       /*
1245         ngoq Dogh!
1246       ASSERT(procStatus[CurrentProc]==Busy || 
1247               ((procStatus[CurrentProc]==Fetching) && 
1248               (t->block_info.closure!=(StgClosure*)NULL)));
1249       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1250           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1251             procStatus[CurrentProc]==Fetching)) 
1252         procStatus[CurrentProc] = Idle;
1253       */
1254 #elif defined(PAR)
1255       IF_DEBUG(scheduler,
1256                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1257                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1258       IF_PAR_DEBUG(bq,
1259
1260                    if (t->block_info.closure!=(StgClosure*)NULL) 
1261                      print_bq(t->block_info.closure));
1262
1263       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1264       blockThread(t);
1265
1266       /* whatever we schedule next, we must log that schedule */
1267       emitSchedule = rtsTrue;
1268
1269 #else /* !GRAN */
1270       /* don't need to do anything.  Either the thread is blocked on
1271        * I/O, in which case we'll have called addToBlockedQueue
1272        * previously, or it's blocked on an MVar or Blackhole, in which
1273        * case it'll be on the relevant queue already.
1274        */
1275       IF_DEBUG(scheduler,
1276                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1277                printThreadBlockage(t);
1278                fprintf(stderr, "\n"));
1279
1280       /* Only for dumping event to log file 
1281          ToDo: do I need this in GranSim, too?
1282       blockThread(t);
1283       */
1284 #endif
1285       threadPaused(t);
1286       break;
1287       
1288     case ThreadFinished:
1289       /* Need to check whether this was a main thread, and if so, signal
1290        * the task that started it with the return value.  If we have no
1291        * more main threads, we probably need to stop all the tasks until
1292        * we get a new one.
1293        */
1294       /* We also end up here if the thread kills itself with an
1295        * uncaught exception, see Exception.hc.
1296        */
1297       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1298 #if defined(GRAN)
1299       endThread(t, CurrentProc); // clean-up the thread
1300 #elif defined(PAR)
1301       /* For now all are advisory -- HWL */
1302       //if(t->priority==AdvisoryPriority) ??
1303       advisory_thread_count--;
1304       
1305 # ifdef DIST
1306       if(t->dist.priority==RevalPriority)
1307         FinishReval(t);
1308 # endif
1309       
1310       if (RtsFlags.ParFlags.ParStats.Full &&
1311           !RtsFlags.ParFlags.ParStats.Suppressed) 
1312         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1313 #endif
1314       break;
1315       
1316     default:
1317       barf("schedule: invalid thread return code %d", (int)ret);
1318     }
1319     
1320 #if defined(RTS_SUPPORTS_THREADS)
1321     /* I don't understand what this re-grab is doing -- sof */
1322     grabCapability(&cap);
1323 #endif
1324
1325 #ifdef PROFILING
1326     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1327         GarbageCollect(GetRoots, rtsTrue);
1328         heapCensus();
1329         performHeapProfile = rtsFalse;
1330         ready_to_gc = rtsFalse; // we already GC'd
1331     }
1332 #endif
1333
1334     if (ready_to_gc 
1335 #ifdef SMP
1336         && allFreeCapabilities() 
1337 #endif
1338         ) {
1339       /* everybody back, start the GC.
1340        * Could do it in this thread, or signal a condition var
1341        * to do it in another thread.  Either way, we need to
1342        * broadcast on gc_pending_cond afterward.
1343        */
1344 #if defined(RTS_SUPPORTS_THREADS)
1345       IF_DEBUG(scheduler,sched_belch("doing GC"));
1346 #endif
1347       GarbageCollect(GetRoots,rtsFalse);
1348       ready_to_gc = rtsFalse;
1349 #ifdef SMP
1350       broadcastCondition(&gc_pending_cond);
1351 #endif
1352 #if defined(GRAN)
1353       /* add a ContinueThread event to continue execution of current thread */
1354       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1355                 ContinueThread,
1356                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1357       IF_GRAN_DEBUG(bq, 
1358                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1359                G_EVENTQ(0);
1360                G_CURR_THREADQ(0));
1361 #endif /* GRAN */
1362     }
1363
1364 #if defined(GRAN)
1365   next_thread:
1366     IF_GRAN_DEBUG(unused,
1367                   print_eventq(EventHd));
1368
1369     event = get_next_event();
1370 #elif defined(PAR)
1371   next_thread:
1372     /* ToDo: wait for next message to arrive rather than busy wait */
1373 #endif /* GRAN */
1374
1375   } /* end of while(1) */
1376
1377   IF_PAR_DEBUG(verbose,
1378                belch("== Leaving schedule() after having received Finish"));
1379 }
1380
1381 /* ---------------------------------------------------------------------------
1382  * deleteAllThreads():  kill all the live threads.
1383  *
1384  * This is used when we catch a user interrupt (^C), before performing
1385  * any necessary cleanups and running finalizers.
1386  * ------------------------------------------------------------------------- */
1387    
1388 void deleteAllThreads ( void )
1389 {
1390   StgTSO* t, *next;
1391   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1392   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1393       next = t->link;
1394       deleteThread(t);
1395   }
1396   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1397       next = t->link;
1398       deleteThread(t);
1399   }
1400   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1401       next = t->link;
1402       deleteThread(t);
1403   }
1404   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1405   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1406   sleeping_queue = END_TSO_QUEUE;
1407 }
1408
1409 /* startThread and  insertThread are now in GranSim.c -- HWL */
1410
1411
1412 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1413 //@subsection Suspend and Resume
1414
1415 /* ---------------------------------------------------------------------------
1416  * Suspending & resuming Haskell threads.
1417  * 
1418  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1419  * its capability before calling the C function.  This allows another
1420  * task to pick up the capability and carry on running Haskell
1421  * threads.  It also means that if the C call blocks, it won't lock
1422  * the whole system.
1423  *
1424  * The Haskell thread making the C call is put to sleep for the
1425  * duration of the call, on the susepended_ccalling_threads queue.  We
1426  * give out a token to the task, which it can use to resume the thread
1427  * on return from the C function.
1428  * ------------------------------------------------------------------------- */
1429    
1430 StgInt
1431 suspendThread( StgRegTable *reg )
1432 {
1433   nat tok;
1434   Capability *cap;
1435
1436   /* assume that *reg is a pointer to the StgRegTable part
1437    * of a Capability.
1438    */
1439   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1440
1441   ACQUIRE_LOCK(&sched_mutex);
1442
1443   IF_DEBUG(scheduler,
1444            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1445
1446   threadPaused(cap->r.rCurrentTSO);
1447   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1448   suspended_ccalling_threads = cap->r.rCurrentTSO;
1449
1450 #if defined(RTS_SUPPORTS_THREADS)
1451   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1452 #endif
1453
1454   /* Use the thread ID as the token; it should be unique */
1455   tok = cap->r.rCurrentTSO->id;
1456
1457   /* Hand back capability */
1458   releaseCapability(cap);
1459   
1460 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1461   /* Preparing to leave the RTS, so ensure there's a native thread/task
1462      waiting to take over.
1463      
1464      ToDo: optimise this and only create a new task if there's a need
1465      for one (i.e., if there's only one Concurrent Haskell thread alive,
1466      there's no need to create a new task).
1467   */
1468   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1469   startTask(taskStart);
1470 #endif
1471
1472   THREAD_RUNNABLE();
1473   RELEASE_LOCK(&sched_mutex);
1474   return tok; 
1475 }
1476
1477 StgRegTable *
1478 resumeThread( StgInt tok )
1479 {
1480   StgTSO *tso, **prev;
1481   Capability *cap;
1482
1483 #if defined(RTS_SUPPORTS_THREADS)
1484   /* Wait for permission to re-enter the RTS with the result. */
1485   grabReturnCapability(&sched_mutex, &cap);
1486 #else
1487   grabCapability(&cap);
1488 #endif
1489
1490   /* Remove the thread off of the suspended list */
1491   prev = &suspended_ccalling_threads;
1492   for (tso = suspended_ccalling_threads; 
1493        tso != END_TSO_QUEUE; 
1494        prev = &tso->link, tso = tso->link) {
1495     if (tso->id == (StgThreadID)tok) {
1496       *prev = tso->link;
1497       break;
1498     }
1499   }
1500   if (tso == END_TSO_QUEUE) {
1501     barf("resumeThread: thread not found");
1502   }
1503   tso->link = END_TSO_QUEUE;
1504   /* Reset blocking status */
1505   tso->why_blocked  = NotBlocked;
1506
1507   RELEASE_LOCK(&sched_mutex);
1508
1509   cap->r.rCurrentTSO = tso;
1510   return &cap->r;
1511 }
1512
1513
1514 /* ---------------------------------------------------------------------------
1515  * Static functions
1516  * ------------------------------------------------------------------------ */
1517 static void unblockThread(StgTSO *tso);
1518
1519 /* ---------------------------------------------------------------------------
1520  * Comparing Thread ids.
1521  *
1522  * This is used from STG land in the implementation of the
1523  * instances of Eq/Ord for ThreadIds.
1524  * ------------------------------------------------------------------------ */
1525
1526 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1527
1528   StgThreadID id1 = tso1->id; 
1529   StgThreadID id2 = tso2->id;
1530  
1531   if (id1 < id2) return (-1);
1532   if (id1 > id2) return 1;
1533   return 0;
1534 }
1535
1536 /* ---------------------------------------------------------------------------
1537  * Fetching the ThreadID from an StgTSO.
1538  *
1539  * This is used in the implementation of Show for ThreadIds.
1540  * ------------------------------------------------------------------------ */
1541 int rts_getThreadId(const StgTSO *tso) 
1542 {
1543   return tso->id;
1544 }
1545
1546 /* ---------------------------------------------------------------------------
1547    Create a new thread.
1548
1549    The new thread starts with the given stack size.  Before the
1550    scheduler can run, however, this thread needs to have a closure
1551    (and possibly some arguments) pushed on its stack.  See
1552    pushClosure() in Schedule.h.
1553
1554    createGenThread() and createIOThread() (in SchedAPI.h) are
1555    convenient packaged versions of this function.
1556
1557    currently pri (priority) is only used in a GRAN setup -- HWL
1558    ------------------------------------------------------------------------ */
1559 //@cindex createThread
1560 #if defined(GRAN)
1561 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1562 StgTSO *
1563 createThread(nat stack_size, StgInt pri)
1564 {
1565   return createThread_(stack_size, rtsFalse, pri);
1566 }
1567
1568 static StgTSO *
1569 createThread_(nat size, rtsBool have_lock, StgInt pri)
1570 {
1571 #else
1572 StgTSO *
1573 createThread(nat stack_size)
1574 {
1575   return createThread_(stack_size, rtsFalse);
1576 }
1577
1578 static StgTSO *
1579 createThread_(nat size, rtsBool have_lock)
1580 {
1581 #endif
1582
1583     StgTSO *tso;
1584     nat stack_size;
1585
1586     /* First check whether we should create a thread at all */
1587 #if defined(PAR)
1588   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1589   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1590     threadsIgnored++;
1591     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1592           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1593     return END_TSO_QUEUE;
1594   }
1595   threadsCreated++;
1596 #endif
1597
1598 #if defined(GRAN)
1599   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1600 #endif
1601
1602   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1603
1604   /* catch ridiculously small stack sizes */
1605   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1606     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1607   }
1608
1609   stack_size = size - TSO_STRUCT_SIZEW;
1610
1611   tso = (StgTSO *)allocate(size);
1612   TICK_ALLOC_TSO(stack_size, 0);
1613
1614   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1615 #if defined(GRAN)
1616   SET_GRAN_HDR(tso, ThisPE);
1617 #endif
1618   tso->what_next     = ThreadEnterGHC;
1619
1620   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1621    * protect the increment operation on next_thread_id.
1622    * In future, we could use an atomic increment instead.
1623    */
1624   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1625   tso->id = next_thread_id++; 
1626   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1627
1628   tso->why_blocked  = NotBlocked;
1629   tso->blocked_exceptions = NULL;
1630
1631   tso->stack_size   = stack_size;
1632   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1633                               - TSO_STRUCT_SIZEW;
1634   tso->sp           = (P_)&(tso->stack) + stack_size;
1635
1636 #ifdef PROFILING
1637   tso->prof.CCCS = CCS_MAIN;
1638 #endif
1639
1640   /* put a stop frame on the stack */
1641   tso->sp -= sizeofW(StgStopFrame);
1642   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1643   tso->su = (StgUpdateFrame*)tso->sp;
1644
1645   // ToDo: check this
1646 #if defined(GRAN)
1647   tso->link = END_TSO_QUEUE;
1648   /* uses more flexible routine in GranSim */
1649   insertThread(tso, CurrentProc);
1650 #else
1651   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1652    * from its creation
1653    */
1654 #endif
1655
1656 #if defined(GRAN) 
1657   if (RtsFlags.GranFlags.GranSimStats.Full) 
1658     DumpGranEvent(GR_START,tso);
1659 #elif defined(PAR)
1660   if (RtsFlags.ParFlags.ParStats.Full) 
1661     DumpGranEvent(GR_STARTQ,tso);
1662   /* HACk to avoid SCHEDULE 
1663      LastTSO = tso; */
1664 #endif
1665
1666   /* Link the new thread on the global thread list.
1667    */
1668   tso->global_link = all_threads;
1669   all_threads = tso;
1670
1671 #if defined(DIST)
1672   tso->dist.priority = MandatoryPriority; //by default that is...
1673 #endif
1674
1675 #if defined(GRAN)
1676   tso->gran.pri = pri;
1677 # if defined(DEBUG)
1678   tso->gran.magic = TSO_MAGIC; // debugging only
1679 # endif
1680   tso->gran.sparkname   = 0;
1681   tso->gran.startedat   = CURRENT_TIME; 
1682   tso->gran.exported    = 0;
1683   tso->gran.basicblocks = 0;
1684   tso->gran.allocs      = 0;
1685   tso->gran.exectime    = 0;
1686   tso->gran.fetchtime   = 0;
1687   tso->gran.fetchcount  = 0;
1688   tso->gran.blocktime   = 0;
1689   tso->gran.blockcount  = 0;
1690   tso->gran.blockedat   = 0;
1691   tso->gran.globalsparks = 0;
1692   tso->gran.localsparks  = 0;
1693   if (RtsFlags.GranFlags.Light)
1694     tso->gran.clock  = Now; /* local clock */
1695   else
1696     tso->gran.clock  = 0;
1697
1698   IF_DEBUG(gran,printTSO(tso));
1699 #elif defined(PAR)
1700 # if defined(DEBUG)
1701   tso->par.magic = TSO_MAGIC; // debugging only
1702 # endif
1703   tso->par.sparkname   = 0;
1704   tso->par.startedat   = CURRENT_TIME; 
1705   tso->par.exported    = 0;
1706   tso->par.basicblocks = 0;
1707   tso->par.allocs      = 0;
1708   tso->par.exectime    = 0;
1709   tso->par.fetchtime   = 0;
1710   tso->par.fetchcount  = 0;
1711   tso->par.blocktime   = 0;
1712   tso->par.blockcount  = 0;
1713   tso->par.blockedat   = 0;
1714   tso->par.globalsparks = 0;
1715   tso->par.localsparks  = 0;
1716 #endif
1717
1718 #if defined(GRAN)
1719   globalGranStats.tot_threads_created++;
1720   globalGranStats.threads_created_on_PE[CurrentProc]++;
1721   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1722   globalGranStats.tot_sq_probes++;
1723 #elif defined(PAR)
1724   // collect parallel global statistics (currently done together with GC stats)
1725   if (RtsFlags.ParFlags.ParStats.Global &&
1726       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1727     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1728     globalParStats.tot_threads_created++;
1729   }
1730 #endif 
1731
1732 #if defined(GRAN)
1733   IF_GRAN_DEBUG(pri,
1734                 belch("==__ schedule: Created TSO %d (%p);",
1735                       CurrentProc, tso, tso->id));
1736 #elif defined(PAR)
1737     IF_PAR_DEBUG(verbose,
1738                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1739                        tso->id, tso, advisory_thread_count));
1740 #else
1741   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1742                                  tso->id, tso->stack_size));
1743 #endif    
1744   return tso;
1745 }
1746
1747 #if defined(PAR)
1748 /* RFP:
1749    all parallel thread creation calls should fall through the following routine.
1750 */
1751 StgTSO *
1752 createSparkThread(rtsSpark spark) 
1753 { StgTSO *tso;
1754   ASSERT(spark != (rtsSpark)NULL);
1755   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1756   { threadsIgnored++;
1757     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1758           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1759     return END_TSO_QUEUE;
1760   }
1761   else
1762   { threadsCreated++;
1763     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1764     if (tso==END_TSO_QUEUE)     
1765       barf("createSparkThread: Cannot create TSO");
1766 #if defined(DIST)
1767     tso->priority = AdvisoryPriority;
1768 #endif
1769     pushClosure(tso,spark);
1770     PUSH_ON_RUN_QUEUE(tso);
1771     advisory_thread_count++;    
1772   }
1773   return tso;
1774 }
1775 #endif
1776
1777 /*
1778   Turn a spark into a thread.
1779   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1780 */
1781 #if defined(PAR)
1782 //@cindex activateSpark
1783 StgTSO *
1784 activateSpark (rtsSpark spark) 
1785 {
1786   StgTSO *tso;
1787
1788   tso = createSparkThread(spark);
1789   if (RtsFlags.ParFlags.ParStats.Full) {   
1790     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1791     IF_PAR_DEBUG(verbose,
1792                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1793                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1794   }
1795   // ToDo: fwd info on local/global spark to thread -- HWL
1796   // tso->gran.exported =  spark->exported;
1797   // tso->gran.locked =   !spark->global;
1798   // tso->gran.sparkname = spark->name;
1799
1800   return tso;
1801 }
1802 #endif
1803
1804 /* ---------------------------------------------------------------------------
1805  * scheduleThread()
1806  *
1807  * scheduleThread puts a thread on the head of the runnable queue.
1808  * This will usually be done immediately after a thread is created.
1809  * The caller of scheduleThread must create the thread using e.g.
1810  * createThread and push an appropriate closure
1811  * on this thread's stack before the scheduler is invoked.
1812  * ------------------------------------------------------------------------ */
1813
1814 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1815
1816 void
1817 scheduleThread_(StgTSO *tso
1818                , rtsBool createTask
1819 #if !defined(THREADED_RTS)
1820                  STG_UNUSED
1821 #endif
1822               )
1823 {
1824   ACQUIRE_LOCK(&sched_mutex);
1825
1826   /* Put the new thread on the head of the runnable queue.  The caller
1827    * better push an appropriate closure on this thread's stack
1828    * beforehand.  In the SMP case, the thread may start running as
1829    * soon as we release the scheduler lock below.
1830    */
1831   PUSH_ON_RUN_QUEUE(tso);
1832 #if defined(THREADED_RTS)
1833   /* If main() is scheduling a thread, don't bother creating a 
1834    * new task.
1835    */
1836   if ( createTask ) {
1837     startTask(taskStart);
1838   }
1839 #endif
1840   THREAD_RUNNABLE();
1841
1842 #if 0
1843   IF_DEBUG(scheduler,printTSO(tso));
1844 #endif
1845   RELEASE_LOCK(&sched_mutex);
1846 }
1847
1848 void scheduleThread(StgTSO* tso)
1849 {
1850   return scheduleThread_(tso, rtsFalse);
1851 }
1852
1853 void scheduleExtThread(StgTSO* tso)
1854 {
1855   return scheduleThread_(tso, rtsTrue);
1856 }
1857
1858 /* ---------------------------------------------------------------------------
1859  * initScheduler()
1860  *
1861  * Initialise the scheduler.  This resets all the queues - if the
1862  * queues contained any threads, they'll be garbage collected at the
1863  * next pass.
1864  *
1865  * ------------------------------------------------------------------------ */
1866
1867 #ifdef SMP
1868 static void
1869 term_handler(int sig STG_UNUSED)
1870 {
1871   stat_workerStop();
1872   ACQUIRE_LOCK(&term_mutex);
1873   await_death--;
1874   RELEASE_LOCK(&term_mutex);
1875   shutdownThread();
1876 }
1877 #endif
1878
1879 void 
1880 initScheduler(void)
1881 {
1882 #if defined(GRAN)
1883   nat i;
1884
1885   for (i=0; i<=MAX_PROC; i++) {
1886     run_queue_hds[i]      = END_TSO_QUEUE;
1887     run_queue_tls[i]      = END_TSO_QUEUE;
1888     blocked_queue_hds[i]  = END_TSO_QUEUE;
1889     blocked_queue_tls[i]  = END_TSO_QUEUE;
1890     ccalling_threadss[i]  = END_TSO_QUEUE;
1891     sleeping_queue        = END_TSO_QUEUE;
1892   }
1893 #else
1894   run_queue_hd      = END_TSO_QUEUE;
1895   run_queue_tl      = END_TSO_QUEUE;
1896   blocked_queue_hd  = END_TSO_QUEUE;
1897   blocked_queue_tl  = END_TSO_QUEUE;
1898   sleeping_queue    = END_TSO_QUEUE;
1899 #endif 
1900
1901   suspended_ccalling_threads  = END_TSO_QUEUE;
1902
1903   main_threads = NULL;
1904   all_threads  = END_TSO_QUEUE;
1905
1906   context_switch = 0;
1907   interrupted    = 0;
1908
1909   RtsFlags.ConcFlags.ctxtSwitchTicks =
1910       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1911       
1912 #if defined(RTS_SUPPORTS_THREADS)
1913   /* Initialise the mutex and condition variables used by
1914    * the scheduler. */
1915   initMutex(&sched_mutex);
1916   initMutex(&term_mutex);
1917
1918   initCondition(&thread_ready_cond);
1919 #endif
1920   
1921 #if defined(SMP)
1922   initCondition(&gc_pending_cond);
1923 #endif
1924
1925 #if defined(RTS_SUPPORTS_THREADS)
1926   ACQUIRE_LOCK(&sched_mutex);
1927 #endif
1928
1929   /* Install the SIGHUP handler */
1930 #if defined(SMP)
1931   {
1932     struct sigaction action,oact;
1933
1934     action.sa_handler = term_handler;
1935     sigemptyset(&action.sa_mask);
1936     action.sa_flags = 0;
1937     if (sigaction(SIGTERM, &action, &oact) != 0) {
1938       barf("can't install TERM handler");
1939     }
1940   }
1941 #endif
1942
1943   /* A capability holds the state a native thread needs in
1944    * order to execute STG code. At least one capability is
1945    * floating around (only SMP builds have more than one).
1946    */
1947   initCapabilities();
1948   
1949 #if defined(RTS_SUPPORTS_THREADS)
1950     /* start our haskell execution tasks */
1951 # if defined(SMP)
1952     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1953 # else
1954     startTaskManager(0,taskStart);
1955 # endif
1956 #endif
1957
1958 #if /* defined(SMP) ||*/ defined(PAR)
1959   initSparkPools();
1960 #endif
1961
1962 #if defined(RTS_SUPPORTS_THREADS)
1963   RELEASE_LOCK(&sched_mutex);
1964 #endif
1965
1966 }
1967
1968 void
1969 exitScheduler( void )
1970 {
1971 #if defined(RTS_SUPPORTS_THREADS)
1972   stopTaskManager();
1973 #endif
1974 }
1975
1976 /* -----------------------------------------------------------------------------
1977    Managing the per-task allocation areas.
1978    
1979    Each capability comes with an allocation area.  These are
1980    fixed-length block lists into which allocation can be done.
1981
1982    ToDo: no support for two-space collection at the moment???
1983    -------------------------------------------------------------------------- */
1984
1985 /* -----------------------------------------------------------------------------
1986  * waitThread is the external interface for running a new computation
1987  * and waiting for the result.
1988  *
1989  * In the non-SMP case, we create a new main thread, push it on the 
1990  * main-thread stack, and invoke the scheduler to run it.  The
1991  * scheduler will return when the top main thread on the stack has
1992  * completed or died, and fill in the necessary fields of the
1993  * main_thread structure.
1994  *
1995  * In the SMP case, we create a main thread as before, but we then
1996  * create a new condition variable and sleep on it.  When our new
1997  * main thread has completed, we'll be woken up and the status/result
1998  * will be in the main_thread struct.
1999  * -------------------------------------------------------------------------- */
2000
2001 int 
2002 howManyThreadsAvail ( void )
2003 {
2004    int i = 0;
2005    StgTSO* q;
2006    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2007       i++;
2008    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2009       i++;
2010    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2011       i++;
2012    return i;
2013 }
2014
2015 void
2016 finishAllThreads ( void )
2017 {
2018    do {
2019       while (run_queue_hd != END_TSO_QUEUE) {
2020          waitThread ( run_queue_hd, NULL);
2021       }
2022       while (blocked_queue_hd != END_TSO_QUEUE) {
2023          waitThread ( blocked_queue_hd, NULL);
2024       }
2025       while (sleeping_queue != END_TSO_QUEUE) {
2026          waitThread ( blocked_queue_hd, NULL);
2027       }
2028    } while 
2029       (blocked_queue_hd != END_TSO_QUEUE || 
2030        run_queue_hd     != END_TSO_QUEUE ||
2031        sleeping_queue   != END_TSO_QUEUE);
2032 }
2033
2034 SchedulerStatus
2035 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2036
2037 #if defined(THREADED_RTS)
2038   return waitThread_(tso,ret, rtsFalse);
2039 #else
2040   return waitThread_(tso,ret);
2041 #endif
2042 }
2043
2044 SchedulerStatus
2045 waitThread_(StgTSO *tso,
2046             /*out*/StgClosure **ret
2047 #if defined(THREADED_RTS)
2048             , rtsBool blockWaiting
2049 #endif
2050            )
2051 {
2052   StgMainThread *m;
2053   SchedulerStatus stat;
2054
2055   ACQUIRE_LOCK(&sched_mutex);
2056   
2057   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2058
2059   m->tso = tso;
2060   m->ret = ret;
2061   m->stat = NoStatus;
2062 #if defined(RTS_SUPPORTS_THREADS)
2063   initCondition(&m->wakeup);
2064 #endif
2065
2066   m->link = main_threads;
2067   main_threads = m;
2068
2069   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2070
2071 #if defined(RTS_SUPPORTS_THREADS)
2072
2073 # if defined(THREADED_RTS)
2074   if (!blockWaiting) {
2075     /* In the threaded case, the OS thread that called main()
2076      * gets to enter the RTS directly without going via another
2077      * task/thread.
2078      */
2079     RELEASE_LOCK(&sched_mutex);
2080     schedule();
2081     ASSERT(m->stat != NoStatus);
2082   } else 
2083 # endif
2084   {
2085     IF_DEBUG(scheduler, sched_belch("sfoo"));
2086     do {
2087       waitCondition(&m->wakeup, &sched_mutex);
2088     } while (m->stat == NoStatus);
2089   }
2090 #elif defined(GRAN)
2091   /* GranSim specific init */
2092   CurrentTSO = m->tso;                // the TSO to run
2093   procStatus[MainProc] = Busy;        // status of main PE
2094   CurrentProc = MainProc;             // PE to run it on
2095
2096   schedule();
2097 #else
2098   RELEASE_LOCK(&sched_mutex);
2099   schedule();
2100   ASSERT(m->stat != NoStatus);
2101 #endif
2102
2103   stat = m->stat;
2104
2105 #if defined(RTS_SUPPORTS_THREADS)
2106   closeCondition(&m->wakeup);
2107 #endif
2108
2109   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2110                               m->tso->id));
2111   free(m);
2112
2113 #if defined(THREADED_RTS)
2114   if (blockWaiting) 
2115 #endif
2116     RELEASE_LOCK(&sched_mutex);
2117
2118   return stat;
2119 }
2120
2121 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2122 //@subsection Run queue code 
2123
2124 #if 0
2125 /* 
2126    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2127        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2128        implicit global variable that has to be correct when calling these
2129        fcts -- HWL 
2130 */
2131
2132 /* Put the new thread on the head of the runnable queue.
2133  * The caller of createThread better push an appropriate closure
2134  * on this thread's stack before the scheduler is invoked.
2135  */
2136 static /* inline */ void
2137 add_to_run_queue(tso)
2138 StgTSO* tso; 
2139 {
2140   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2141   tso->link = run_queue_hd;
2142   run_queue_hd = tso;
2143   if (run_queue_tl == END_TSO_QUEUE) {
2144     run_queue_tl = tso;
2145   }
2146 }
2147
2148 /* Put the new thread at the end of the runnable queue. */
2149 static /* inline */ void
2150 push_on_run_queue(tso)
2151 StgTSO* tso; 
2152 {
2153   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2154   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2155   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2156   if (run_queue_hd == END_TSO_QUEUE) {
2157     run_queue_hd = tso;
2158   } else {
2159     run_queue_tl->link = tso;
2160   }
2161   run_queue_tl = tso;
2162 }
2163
2164 /* 
2165    Should be inlined because it's used very often in schedule.  The tso
2166    argument is actually only needed in GranSim, where we want to have the
2167    possibility to schedule *any* TSO on the run queue, irrespective of the
2168    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2169    the run queue and dequeue the tso, adjusting the links in the queue. 
2170 */
2171 //@cindex take_off_run_queue
2172 static /* inline */ StgTSO*
2173 take_off_run_queue(StgTSO *tso) {
2174   StgTSO *t, *prev;
2175
2176   /* 
2177      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2178
2179      if tso is specified, unlink that tso from the run_queue (doesn't have
2180      to be at the beginning of the queue); GranSim only 
2181   */
2182   if (tso!=END_TSO_QUEUE) {
2183     /* find tso in queue */
2184     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2185          t!=END_TSO_QUEUE && t!=tso;
2186          prev=t, t=t->link) 
2187       /* nothing */ ;
2188     ASSERT(t==tso);
2189     /* now actually dequeue the tso */
2190     if (prev!=END_TSO_QUEUE) {
2191       ASSERT(run_queue_hd!=t);
2192       prev->link = t->link;
2193     } else {
2194       /* t is at beginning of thread queue */
2195       ASSERT(run_queue_hd==t);
2196       run_queue_hd = t->link;
2197     }
2198     /* t is at end of thread queue */
2199     if (t->link==END_TSO_QUEUE) {
2200       ASSERT(t==run_queue_tl);
2201       run_queue_tl = prev;
2202     } else {
2203       ASSERT(run_queue_tl!=t);
2204     }
2205     t->link = END_TSO_QUEUE;
2206   } else {
2207     /* take tso from the beginning of the queue; std concurrent code */
2208     t = run_queue_hd;
2209     if (t != END_TSO_QUEUE) {
2210       run_queue_hd = t->link;
2211       t->link = END_TSO_QUEUE;
2212       if (run_queue_hd == END_TSO_QUEUE) {
2213         run_queue_tl = END_TSO_QUEUE;
2214       }
2215     }
2216   }
2217   return t;
2218 }
2219
2220 #endif /* 0 */
2221
2222 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2223 //@subsection Garbage Collextion Routines
2224
2225 /* ---------------------------------------------------------------------------
2226    Where are the roots that we know about?
2227
2228         - all the threads on the runnable queue
2229         - all the threads on the blocked queue
2230         - all the threads on the sleeping queue
2231         - all the thread currently executing a _ccall_GC
2232         - all the "main threads"
2233      
2234    ------------------------------------------------------------------------ */
2235
2236 /* This has to be protected either by the scheduler monitor, or by the
2237         garbage collection monitor (probably the latter).
2238         KH @ 25/10/99
2239 */
2240
2241 void
2242 GetRoots(evac_fn evac)
2243 {
2244   StgMainThread *m;
2245
2246 #if defined(GRAN)
2247   {
2248     nat i;
2249     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2250       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2251           evac((StgClosure **)&run_queue_hds[i]);
2252       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2253           evac((StgClosure **)&run_queue_tls[i]);
2254       
2255       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2256           evac((StgClosure **)&blocked_queue_hds[i]);
2257       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2258           evac((StgClosure **)&blocked_queue_tls[i]);
2259       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2260           evac((StgClosure **)&ccalling_threads[i]);
2261     }
2262   }
2263
2264   markEventQueue();
2265
2266 #else /* !GRAN */
2267   if (run_queue_hd != END_TSO_QUEUE) {
2268       ASSERT(run_queue_tl != END_TSO_QUEUE);
2269       evac((StgClosure **)&run_queue_hd);
2270       evac((StgClosure **)&run_queue_tl);
2271   }
2272   
2273   if (blocked_queue_hd != END_TSO_QUEUE) {
2274       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2275       evac((StgClosure **)&blocked_queue_hd);
2276       evac((StgClosure **)&blocked_queue_tl);
2277   }
2278   
2279   if (sleeping_queue != END_TSO_QUEUE) {
2280       evac((StgClosure **)&sleeping_queue);
2281   }
2282 #endif 
2283
2284   for (m = main_threads; m != NULL; m = m->link) {
2285       evac((StgClosure **)&m->tso);
2286   }
2287   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2288       evac((StgClosure **)&suspended_ccalling_threads);
2289   }
2290
2291 #if defined(PAR) || defined(GRAN)
2292   markSparkQueue(evac);
2293 #endif
2294 }
2295
2296 /* -----------------------------------------------------------------------------
2297    performGC
2298
2299    This is the interface to the garbage collector from Haskell land.
2300    We provide this so that external C code can allocate and garbage
2301    collect when called from Haskell via _ccall_GC.
2302
2303    It might be useful to provide an interface whereby the programmer
2304    can specify more roots (ToDo).
2305    
2306    This needs to be protected by the GC condition variable above.  KH.
2307    -------------------------------------------------------------------------- */
2308
2309 void (*extra_roots)(evac_fn);
2310
2311 void
2312 performGC(void)
2313 {
2314   GarbageCollect(GetRoots,rtsFalse);
2315 }
2316
2317 void
2318 performMajorGC(void)
2319 {
2320   GarbageCollect(GetRoots,rtsTrue);
2321 }
2322
2323 static void
2324 AllRoots(evac_fn evac)
2325 {
2326     GetRoots(evac);             // the scheduler's roots
2327     extra_roots(evac);          // the user's roots
2328 }
2329
2330 void
2331 performGCWithRoots(void (*get_roots)(evac_fn))
2332 {
2333   extra_roots = get_roots;
2334   GarbageCollect(AllRoots,rtsFalse);
2335 }
2336
2337 /* -----------------------------------------------------------------------------
2338    Stack overflow
2339
2340    If the thread has reached its maximum stack size, then raise the
2341    StackOverflow exception in the offending thread.  Otherwise
2342    relocate the TSO into a larger chunk of memory and adjust its stack
2343    size appropriately.
2344    -------------------------------------------------------------------------- */
2345
2346 static StgTSO *
2347 threadStackOverflow(StgTSO *tso)
2348 {
2349   nat new_stack_size, new_tso_size, diff, stack_words;
2350   StgPtr new_sp;
2351   StgTSO *dest;
2352
2353   IF_DEBUG(sanity,checkTSO(tso));
2354   if (tso->stack_size >= tso->max_stack_size) {
2355
2356     IF_DEBUG(gc,
2357              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2358                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2359              /* If we're debugging, just print out the top of the stack */
2360              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2361                                               tso->sp+64)));
2362
2363     /* Send this thread the StackOverflow exception */
2364     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2365     return tso;
2366   }
2367
2368   /* Try to double the current stack size.  If that takes us over the
2369    * maximum stack size for this thread, then use the maximum instead.
2370    * Finally round up so the TSO ends up as a whole number of blocks.
2371    */
2372   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2373   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2374                                        TSO_STRUCT_SIZE)/sizeof(W_);
2375   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2376   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2377
2378   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2379
2380   dest = (StgTSO *)allocate(new_tso_size);
2381   TICK_ALLOC_TSO(new_stack_size,0);
2382
2383   /* copy the TSO block and the old stack into the new area */
2384   memcpy(dest,tso,TSO_STRUCT_SIZE);
2385   stack_words = tso->stack + tso->stack_size - tso->sp;
2386   new_sp = (P_)dest + new_tso_size - stack_words;
2387   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2388
2389   /* relocate the stack pointers... */
2390   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2391   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2392   dest->sp    = new_sp;
2393   dest->stack_size = new_stack_size;
2394         
2395   /* and relocate the update frame list */
2396   relocate_stack(dest, diff);
2397
2398   /* Mark the old TSO as relocated.  We have to check for relocated
2399    * TSOs in the garbage collector and any primops that deal with TSOs.
2400    *
2401    * It's important to set the sp and su values to just beyond the end
2402    * of the stack, so we don't attempt to scavenge any part of the
2403    * dead TSO's stack.
2404    */
2405   tso->what_next = ThreadRelocated;
2406   tso->link = dest;
2407   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2408   tso->su = (StgUpdateFrame *)tso->sp;
2409   tso->why_blocked = NotBlocked;
2410   dest->mut_link = NULL;
2411
2412   IF_PAR_DEBUG(verbose,
2413                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2414                      tso->id, tso, tso->stack_size);
2415                /* If we're debugging, just print out the top of the stack */
2416                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2417                                                 tso->sp+64)));
2418   
2419   IF_DEBUG(sanity,checkTSO(tso));
2420 #if 0
2421   IF_DEBUG(scheduler,printTSO(dest));
2422 #endif
2423
2424   return dest;
2425 }
2426
2427 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2428 //@subsection Blocking Queue Routines
2429
2430 /* ---------------------------------------------------------------------------
2431    Wake up a queue that was blocked on some resource.
2432    ------------------------------------------------------------------------ */
2433
2434 #if defined(GRAN)
2435 static inline void
2436 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2437 {
2438 }
2439 #elif defined(PAR)
2440 static inline void
2441 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2442 {
2443   /* write RESUME events to log file and
2444      update blocked and fetch time (depending on type of the orig closure) */
2445   if (RtsFlags.ParFlags.ParStats.Full) {
2446     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2447                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2448                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2449     if (EMPTY_RUN_QUEUE())
2450       emitSchedule = rtsTrue;
2451
2452     switch (get_itbl(node)->type) {
2453         case FETCH_ME_BQ:
2454           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2455           break;
2456         case RBH:
2457         case FETCH_ME:
2458         case BLACKHOLE_BQ:
2459           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2460           break;
2461 #ifdef DIST
2462         case MVAR:
2463           break;
2464 #endif    
2465         default:
2466           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2467         }
2468       }
2469 }
2470 #endif
2471
2472 #if defined(GRAN)
2473 static StgBlockingQueueElement *
2474 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2475 {
2476     StgTSO *tso;
2477     PEs node_loc, tso_loc;
2478
2479     node_loc = where_is(node); // should be lifted out of loop
2480     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2481     tso_loc = where_is((StgClosure *)tso);
2482     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2483       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2484       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2485       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2486       // insertThread(tso, node_loc);
2487       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2488                 ResumeThread,
2489                 tso, node, (rtsSpark*)NULL);
2490       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2491       // len_local++;
2492       // len++;
2493     } else { // TSO is remote (actually should be FMBQ)
2494       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2495                                   RtsFlags.GranFlags.Costs.gunblocktime +
2496                                   RtsFlags.GranFlags.Costs.latency;
2497       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2498                 UnblockThread,
2499                 tso, node, (rtsSpark*)NULL);
2500       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2501       // len++;
2502     }
2503     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2504     IF_GRAN_DEBUG(bq,
2505                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2506                           (node_loc==tso_loc ? "Local" : "Global"), 
2507                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2508     tso->block_info.closure = NULL;
2509     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2510                              tso->id, tso));
2511 }
2512 #elif defined(PAR)
2513 static StgBlockingQueueElement *
2514 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2515 {
2516     StgBlockingQueueElement *next;
2517
2518     switch (get_itbl(bqe)->type) {
2519     case TSO:
2520       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2521       /* if it's a TSO just push it onto the run_queue */
2522       next = bqe->link;
2523       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2524       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2525       THREAD_RUNNABLE();
2526       unblockCount(bqe, node);
2527       /* reset blocking status after dumping event */
2528       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2529       break;
2530
2531     case BLOCKED_FETCH:
2532       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2533       next = bqe->link;
2534       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2535       PendingFetches = (StgBlockedFetch *)bqe;
2536       break;
2537
2538 # if defined(DEBUG)
2539       /* can ignore this case in a non-debugging setup; 
2540          see comments on RBHSave closures above */
2541     case CONSTR:
2542       /* check that the closure is an RBHSave closure */
2543       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2544              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2545              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2546       break;
2547
2548     default:
2549       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2550            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2551            (StgClosure *)bqe);
2552 # endif
2553     }
2554   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2555   return next;
2556 }
2557
2558 #else /* !GRAN && !PAR */
2559 static StgTSO *
2560 unblockOneLocked(StgTSO *tso)
2561 {
2562   StgTSO *next;
2563
2564   ASSERT(get_itbl(tso)->type == TSO);
2565   ASSERT(tso->why_blocked != NotBlocked);
2566   tso->why_blocked = NotBlocked;
2567   next = tso->link;
2568   PUSH_ON_RUN_QUEUE(tso);
2569   THREAD_RUNNABLE();
2570   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2571   return next;
2572 }
2573 #endif
2574
2575 #if defined(GRAN) || defined(PAR)
2576 inline StgBlockingQueueElement *
2577 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2578 {
2579   ACQUIRE_LOCK(&sched_mutex);
2580   bqe = unblockOneLocked(bqe, node);
2581   RELEASE_LOCK(&sched_mutex);
2582   return bqe;
2583 }
2584 #else
2585 inline StgTSO *
2586 unblockOne(StgTSO *tso)
2587 {
2588   ACQUIRE_LOCK(&sched_mutex);
2589   tso = unblockOneLocked(tso);
2590   RELEASE_LOCK(&sched_mutex);
2591   return tso;
2592 }
2593 #endif
2594
2595 #if defined(GRAN)
2596 void 
2597 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2598 {
2599   StgBlockingQueueElement *bqe;
2600   PEs node_loc;
2601   nat len = 0; 
2602
2603   IF_GRAN_DEBUG(bq, 
2604                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2605                       node, CurrentProc, CurrentTime[CurrentProc], 
2606                       CurrentTSO->id, CurrentTSO));
2607
2608   node_loc = where_is(node);
2609
2610   ASSERT(q == END_BQ_QUEUE ||
2611          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2612          get_itbl(q)->type == CONSTR); // closure (type constructor)
2613   ASSERT(is_unique(node));
2614
2615   /* FAKE FETCH: magically copy the node to the tso's proc;
2616      no Fetch necessary because in reality the node should not have been 
2617      moved to the other PE in the first place
2618   */
2619   if (CurrentProc!=node_loc) {
2620     IF_GRAN_DEBUG(bq, 
2621                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2622                         node, node_loc, CurrentProc, CurrentTSO->id, 
2623                         // CurrentTSO, where_is(CurrentTSO),
2624                         node->header.gran.procs));
2625     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2626     IF_GRAN_DEBUG(bq, 
2627                   belch("## new bitmask of node %p is %#x",
2628                         node, node->header.gran.procs));
2629     if (RtsFlags.GranFlags.GranSimStats.Global) {
2630       globalGranStats.tot_fake_fetches++;
2631     }
2632   }
2633
2634   bqe = q;
2635   // ToDo: check: ASSERT(CurrentProc==node_loc);
2636   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2637     //next = bqe->link;
2638     /* 
2639        bqe points to the current element in the queue
2640        next points to the next element in the queue
2641     */
2642     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2643     //tso_loc = where_is(tso);
2644     len++;
2645     bqe = unblockOneLocked(bqe, node);
2646   }
2647
2648   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2649      the closure to make room for the anchor of the BQ */
2650   if (bqe!=END_BQ_QUEUE) {
2651     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2652     /*
2653     ASSERT((info_ptr==&RBH_Save_0_info) ||
2654            (info_ptr==&RBH_Save_1_info) ||
2655            (info_ptr==&RBH_Save_2_info));
2656     */
2657     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2658     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2659     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2660
2661     IF_GRAN_DEBUG(bq,
2662                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2663                         node, info_type(node)));
2664   }
2665
2666   /* statistics gathering */
2667   if (RtsFlags.GranFlags.GranSimStats.Global) {
2668     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2669     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2670     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2671     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2672   }
2673   IF_GRAN_DEBUG(bq,
2674                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2675                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2676 }
2677 #elif defined(PAR)
2678 void 
2679 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2680 {
2681   StgBlockingQueueElement *bqe;
2682
2683   ACQUIRE_LOCK(&sched_mutex);
2684
2685   IF_PAR_DEBUG(verbose, 
2686                belch("##-_ AwBQ for node %p on [%x]: ",
2687                      node, mytid));
2688 #ifdef DIST  
2689   //RFP
2690   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2691     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2692     return;
2693   }
2694 #endif
2695   
2696   ASSERT(q == END_BQ_QUEUE ||
2697          get_itbl(q)->type == TSO ||           
2698          get_itbl(q)->type == BLOCKED_FETCH || 
2699          get_itbl(q)->type == CONSTR); 
2700
2701   bqe = q;
2702   while (get_itbl(bqe)->type==TSO || 
2703          get_itbl(bqe)->type==BLOCKED_FETCH) {
2704     bqe = unblockOneLocked(bqe, node);
2705   }
2706   RELEASE_LOCK(&sched_mutex);
2707 }
2708
2709 #else   /* !GRAN && !PAR */
2710 void
2711 awakenBlockedQueue(StgTSO *tso)
2712 {
2713   ACQUIRE_LOCK(&sched_mutex);
2714   while (tso != END_TSO_QUEUE) {
2715     tso = unblockOneLocked(tso);
2716   }
2717   RELEASE_LOCK(&sched_mutex);
2718 }
2719 #endif
2720
2721 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2722 //@subsection Exception Handling Routines
2723
2724 /* ---------------------------------------------------------------------------
2725    Interrupt execution
2726    - usually called inside a signal handler so it mustn't do anything fancy.   
2727    ------------------------------------------------------------------------ */
2728
2729 void
2730 interruptStgRts(void)
2731 {
2732     interrupted    = 1;
2733     context_switch = 1;
2734 }
2735
2736 /* -----------------------------------------------------------------------------
2737    Unblock a thread
2738
2739    This is for use when we raise an exception in another thread, which
2740    may be blocked.
2741    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2742    -------------------------------------------------------------------------- */
2743
2744 #if defined(GRAN) || defined(PAR)
2745 /*
2746   NB: only the type of the blocking queue is different in GranSim and GUM
2747       the operations on the queue-elements are the same
2748       long live polymorphism!
2749 */
2750 static void
2751 unblockThread(StgTSO *tso)
2752 {
2753   StgBlockingQueueElement *t, **last;
2754
2755   ACQUIRE_LOCK(&sched_mutex);
2756   switch (tso->why_blocked) {
2757
2758   case NotBlocked:
2759     return;  /* not blocked */
2760
2761   case BlockedOnMVar:
2762     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2763     {
2764       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2765       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2766
2767       last = (StgBlockingQueueElement **)&mvar->head;
2768       for (t = (StgBlockingQueueElement *)mvar->head; 
2769            t != END_BQ_QUEUE; 
2770            last = &t->link, last_tso = t, t = t->link) {
2771         if (t == (StgBlockingQueueElement *)tso) {
2772           *last = (StgBlockingQueueElement *)tso->link;
2773           if (mvar->tail == tso) {
2774             mvar->tail = (StgTSO *)last_tso;
2775           }
2776           goto done;
2777         }
2778       }
2779       barf("unblockThread (MVAR): TSO not found");
2780     }
2781
2782   case BlockedOnBlackHole:
2783     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2784     {
2785       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2786
2787       last = &bq->blocking_queue;
2788       for (t = bq->blocking_queue; 
2789            t != END_BQ_QUEUE; 
2790            last = &t->link, t = t->link) {
2791         if (t == (StgBlockingQueueElement *)tso) {
2792           *last = (StgBlockingQueueElement *)tso->link;
2793           goto done;
2794         }
2795       }
2796       barf("unblockThread (BLACKHOLE): TSO not found");
2797     }
2798
2799   case BlockedOnException:
2800     {
2801       StgTSO *target  = tso->block_info.tso;
2802
2803       ASSERT(get_itbl(target)->type == TSO);
2804
2805       if (target->what_next == ThreadRelocated) {
2806           target = target->link;
2807           ASSERT(get_itbl(target)->type == TSO);
2808       }
2809
2810       ASSERT(target->blocked_exceptions != NULL);
2811
2812       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2813       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2814            t != END_BQ_QUEUE; 
2815            last = &t->link, t = t->link) {
2816         ASSERT(get_itbl(t)->type == TSO);
2817         if (t == (StgBlockingQueueElement *)tso) {
2818           *last = (StgBlockingQueueElement *)tso->link;
2819           goto done;
2820         }
2821       }
2822       barf("unblockThread (Exception): TSO not found");
2823     }
2824
2825   case BlockedOnRead:
2826   case BlockedOnWrite:
2827     {
2828       /* take TSO off blocked_queue */
2829       StgBlockingQueueElement *prev = NULL;
2830       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2831            prev = t, t = t->link) {
2832         if (t == (StgBlockingQueueElement *)tso) {
2833           if (prev == NULL) {
2834             blocked_queue_hd = (StgTSO *)t->link;
2835             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2836               blocked_queue_tl = END_TSO_QUEUE;
2837             }
2838           } else {
2839             prev->link = t->link;
2840             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2841               blocked_queue_tl = (StgTSO *)prev;
2842             }
2843           }
2844           goto done;
2845         }
2846       }
2847       barf("unblockThread (I/O): TSO not found");
2848     }
2849
2850   case BlockedOnDelay:
2851     {
2852       /* take TSO off sleeping_queue */
2853       StgBlockingQueueElement *prev = NULL;
2854       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2855            prev = t, t = t->link) {
2856         if (t == (StgBlockingQueueElement *)tso) {
2857           if (prev == NULL) {
2858             sleeping_queue = (StgTSO *)t->link;
2859           } else {
2860             prev->link = t->link;
2861           }
2862           goto done;
2863         }
2864       }
2865       barf("unblockThread (I/O): TSO not found");
2866     }
2867
2868   default:
2869     barf("unblockThread");
2870   }
2871
2872  done:
2873   tso->link = END_TSO_QUEUE;
2874   tso->why_blocked = NotBlocked;
2875   tso->block_info.closure = NULL;
2876   PUSH_ON_RUN_QUEUE(tso);
2877   RELEASE_LOCK(&sched_mutex);
2878 }
2879 #else
2880 static void
2881 unblockThread(StgTSO *tso)
2882 {
2883   StgTSO *t, **last;
2884
2885   ACQUIRE_LOCK(&sched_mutex);
2886   switch (tso->why_blocked) {
2887
2888   case NotBlocked:
2889     return;  /* not blocked */
2890
2891   case BlockedOnMVar:
2892     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2893     {
2894       StgTSO *last_tso = END_TSO_QUEUE;
2895       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2896
2897       last = &mvar->head;
2898       for (t = mvar->head; t != END_TSO_QUEUE; 
2899            last = &t->link, last_tso = t, t = t->link) {
2900         if (t == tso) {
2901           *last = tso->link;
2902           if (mvar->tail == tso) {
2903             mvar->tail = last_tso;
2904           }
2905           goto done;
2906         }
2907       }
2908       barf("unblockThread (MVAR): TSO not found");
2909     }
2910
2911   case BlockedOnBlackHole:
2912     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2913     {
2914       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2915
2916       last = &bq->blocking_queue;
2917       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2918            last = &t->link, t = t->link) {
2919         if (t == tso) {
2920           *last = tso->link;
2921           goto done;
2922         }
2923       }
2924       barf("unblockThread (BLACKHOLE): TSO not found");
2925     }
2926
2927   case BlockedOnException:
2928     {
2929       StgTSO *target  = tso->block_info.tso;
2930
2931       ASSERT(get_itbl(target)->type == TSO);
2932
2933       while (target->what_next == ThreadRelocated) {
2934           target = target->link;
2935           ASSERT(get_itbl(target)->type == TSO);
2936       }
2937       
2938       ASSERT(target->blocked_exceptions != NULL);
2939
2940       last = &target->blocked_exceptions;
2941       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2942            last = &t->link, t = t->link) {
2943         ASSERT(get_itbl(t)->type == TSO);
2944         if (t == tso) {
2945           *last = tso->link;
2946           goto done;
2947         }
2948       }
2949       barf("unblockThread (Exception): TSO not found");
2950     }
2951
2952   case BlockedOnRead:
2953   case BlockedOnWrite:
2954     {
2955       StgTSO *prev = NULL;
2956       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2957            prev = t, t = t->link) {
2958         if (t == tso) {
2959           if (prev == NULL) {
2960             blocked_queue_hd = t->link;
2961             if (blocked_queue_tl == t) {
2962               blocked_queue_tl = END_TSO_QUEUE;
2963             }
2964           } else {
2965             prev->link = t->link;
2966             if (blocked_queue_tl == t) {
2967               blocked_queue_tl = prev;
2968             }
2969           }
2970           goto done;
2971         }
2972       }
2973       barf("unblockThread (I/O): TSO not found");
2974     }
2975
2976   case BlockedOnDelay:
2977     {
2978       StgTSO *prev = NULL;
2979       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2980            prev = t, t = t->link) {
2981         if (t == tso) {
2982           if (prev == NULL) {
2983             sleeping_queue = t->link;
2984           } else {
2985             prev->link = t->link;
2986           }
2987           goto done;
2988         }
2989       }
2990       barf("unblockThread (I/O): TSO not found");
2991     }
2992
2993   default:
2994     barf("unblockThread");
2995   }
2996
2997  done:
2998   tso->link = END_TSO_QUEUE;
2999   tso->why_blocked = NotBlocked;
3000   tso->block_info.closure = NULL;
3001   PUSH_ON_RUN_QUEUE(tso);
3002   RELEASE_LOCK(&sched_mutex);
3003 }
3004 #endif
3005
3006 /* -----------------------------------------------------------------------------
3007  * raiseAsync()
3008  *
3009  * The following function implements the magic for raising an
3010  * asynchronous exception in an existing thread.
3011  *
3012  * We first remove the thread from any queue on which it might be
3013  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3014  *
3015  * We strip the stack down to the innermost CATCH_FRAME, building
3016  * thunks in the heap for all the active computations, so they can 
3017  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3018  * an application of the handler to the exception, and push it on
3019  * the top of the stack.
3020  * 
3021  * How exactly do we save all the active computations?  We create an
3022  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3023  * AP_UPDs pushes everything from the corresponding update frame
3024  * upwards onto the stack.  (Actually, it pushes everything up to the
3025  * next update frame plus a pointer to the next AP_UPD object.
3026  * Entering the next AP_UPD object pushes more onto the stack until we
3027  * reach the last AP_UPD object - at which point the stack should look
3028  * exactly as it did when we killed the TSO and we can continue
3029  * execution by entering the closure on top of the stack.
3030  *
3031  * We can also kill a thread entirely - this happens if either (a) the 
3032  * exception passed to raiseAsync is NULL, or (b) there's no
3033  * CATCH_FRAME on the stack.  In either case, we strip the entire
3034  * stack and replace the thread with a zombie.
3035  *
3036  * -------------------------------------------------------------------------- */
3037  
3038 void 
3039 deleteThread(StgTSO *tso)
3040 {
3041   raiseAsync(tso,NULL);
3042 }
3043
3044 void
3045 raiseAsync(StgTSO *tso, StgClosure *exception)
3046 {
3047   StgUpdateFrame* su = tso->su;
3048   StgPtr          sp = tso->sp;
3049   
3050   /* Thread already dead? */
3051   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3052     return;
3053   }
3054
3055   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3056
3057   /* Remove it from any blocking queues */
3058   unblockThread(tso);
3059
3060   /* The stack freezing code assumes there's a closure pointer on
3061    * the top of the stack.  This isn't always the case with compiled
3062    * code, so we have to push a dummy closure on the top which just
3063    * returns to the next return address on the stack.
3064    */
3065   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3066     *(--sp) = (W_)&stg_dummy_ret_closure;
3067   }
3068
3069   while (1) {
3070     nat words = ((P_)su - (P_)sp) - 1;
3071     nat i;
3072     StgAP_UPD * ap;
3073
3074     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3075      * then build the THUNK raise(exception), and leave it on
3076      * top of the CATCH_FRAME ready to enter.
3077      */
3078     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3079       StgClosure *raise;
3080
3081       /* we've got an exception to raise, so let's pass it to the
3082        * handler in this frame.
3083        */
3084       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3085       TICK_ALLOC_SE_THK(1,0);
3086       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3087       raise->payload[0] = exception;
3088
3089       /* throw away the stack from Sp up to the CATCH_FRAME.
3090        */
3091       sp = (P_)su - 1;
3092
3093       /* Put the newly-built THUNK on top of the stack, ready to execute
3094        * when the thread restarts.
3095        */
3096       sp[0] = (W_)raise;
3097       tso->sp = sp;
3098       tso->su = su;
3099       tso->what_next = ThreadEnterGHC;
3100       IF_DEBUG(sanity, checkTSO(tso));
3101       return;
3102     }
3103
3104     /* First build an AP_UPD consisting of the stack chunk above the
3105      * current update frame, with the top word on the stack as the
3106      * fun field.
3107      */
3108     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3109     
3110     ASSERT(words >= 0);
3111     
3112     ap->n_args = words;
3113     ap->fun    = (StgClosure *)sp[0];
3114     sp++;
3115     for(i=0; i < (nat)words; ++i) {
3116       ap->payload[i] = (StgClosure *)*sp++;
3117     }
3118     
3119     switch (get_itbl(su)->type) {
3120       
3121     case UPDATE_FRAME:
3122       {
3123         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3124         TICK_ALLOC_UP_THK(words+1,0);
3125         
3126         IF_DEBUG(scheduler,
3127                  fprintf(stderr,  "scheduler: Updating ");
3128                  printPtr((P_)su->updatee); 
3129                  fprintf(stderr,  " with ");
3130                  printObj((StgClosure *)ap);
3131                  );
3132         
3133         /* Replace the updatee with an indirection - happily
3134          * this will also wake up any threads currently
3135          * waiting on the result.
3136          *
3137          * Warning: if we're in a loop, more than one update frame on
3138          * the stack may point to the same object.  Be careful not to
3139          * overwrite an IND_OLDGEN in this case, because we'll screw
3140          * up the mutable lists.  To be on the safe side, don't
3141          * overwrite any kind of indirection at all.  See also
3142          * threadSqueezeStack in GC.c, where we have to make a similar
3143          * check.
3144          */
3145         if (!closure_IND(su->updatee)) {
3146             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3147         }
3148         su = su->link;
3149         sp += sizeofW(StgUpdateFrame) -1;
3150         sp[0] = (W_)ap; /* push onto stack */
3151         break;
3152       }
3153
3154     case CATCH_FRAME:
3155       {
3156         StgCatchFrame *cf = (StgCatchFrame *)su;
3157         StgClosure* o;
3158         
3159         /* We want a PAP, not an AP_UPD.  Fortunately, the
3160          * layout's the same.
3161          */
3162         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3163         TICK_ALLOC_UPD_PAP(words+1,0);
3164         
3165         /* now build o = FUN(catch,ap,handler) */
3166         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3167         TICK_ALLOC_FUN(2,0);
3168         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3169         o->payload[0] = (StgClosure *)ap;
3170         o->payload[1] = cf->handler;
3171         
3172         IF_DEBUG(scheduler,
3173                  fprintf(stderr,  "scheduler: Built ");
3174                  printObj((StgClosure *)o);
3175                  );
3176         
3177         /* pop the old handler and put o on the stack */
3178         su = cf->link;
3179         sp += sizeofW(StgCatchFrame) - 1;
3180         sp[0] = (W_)o;
3181         break;
3182       }
3183       
3184     case SEQ_FRAME:
3185       {
3186         StgSeqFrame *sf = (StgSeqFrame *)su;
3187         StgClosure* o;
3188         
3189         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3190         TICK_ALLOC_UPD_PAP(words+1,0);
3191         
3192         /* now build o = FUN(seq,ap) */
3193         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3194         TICK_ALLOC_SE_THK(1,0);
3195         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3196         o->payload[0] = (StgClosure *)ap;
3197         
3198         IF_DEBUG(scheduler,
3199                  fprintf(stderr,  "scheduler: Built ");
3200                  printObj((StgClosure *)o);
3201                  );
3202         
3203         /* pop the old handler and put o on the stack */
3204         su = sf->link;
3205         sp += sizeofW(StgSeqFrame) - 1;
3206         sp[0] = (W_)o;
3207         break;
3208       }
3209       
3210     case STOP_FRAME:
3211       /* We've stripped the entire stack, the thread is now dead. */
3212       sp += sizeofW(StgStopFrame) - 1;
3213       sp[0] = (W_)exception;    /* save the exception */
3214       tso->what_next = ThreadKilled;
3215       tso->su = (StgUpdateFrame *)(sp+1);
3216       tso->sp = sp;
3217       return;
3218
3219     default:
3220       barf("raiseAsync");
3221     }
3222   }
3223   barf("raiseAsync");
3224 }
3225
3226 /* -----------------------------------------------------------------------------
3227    resurrectThreads is called after garbage collection on the list of
3228    threads found to be garbage.  Each of these threads will be woken
3229    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3230    on an MVar, or NonTermination if the thread was blocked on a Black
3231    Hole.
3232    -------------------------------------------------------------------------- */
3233
3234 void
3235 resurrectThreads( StgTSO *threads )
3236 {
3237   StgTSO *tso, *next;
3238
3239   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3240     next = tso->global_link;
3241     tso->global_link = all_threads;
3242     all_threads = tso;
3243     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3244
3245     switch (tso->why_blocked) {
3246     case BlockedOnMVar:
3247     case BlockedOnException:
3248       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3249       break;
3250     case BlockedOnBlackHole:
3251       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3252       break;
3253     case NotBlocked:
3254       /* This might happen if the thread was blocked on a black hole
3255        * belonging to a thread that we've just woken up (raiseAsync
3256        * can wake up threads, remember...).
3257        */
3258       continue;
3259     default:
3260       barf("resurrectThreads: thread blocked in a strange way");
3261     }
3262   }
3263 }
3264
3265 /* -----------------------------------------------------------------------------
3266  * Blackhole detection: if we reach a deadlock, test whether any
3267  * threads are blocked on themselves.  Any threads which are found to
3268  * be self-blocked get sent a NonTermination exception.
3269  *
3270  * This is only done in a deadlock situation in order to avoid
3271  * performance overhead in the normal case.
3272  * -------------------------------------------------------------------------- */
3273
3274 static void
3275 detectBlackHoles( void )
3276 {
3277     StgTSO *t = all_threads;
3278     StgUpdateFrame *frame;
3279     StgClosure *blocked_on;
3280
3281     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3282
3283         while (t->what_next == ThreadRelocated) {
3284             t = t->link;
3285             ASSERT(get_itbl(t)->type == TSO);
3286         }
3287       
3288         if (t->why_blocked != BlockedOnBlackHole) {
3289             continue;
3290         }
3291
3292         blocked_on = t->block_info.closure;
3293
3294         for (frame = t->su; ; frame = frame->link) {
3295             switch (get_itbl(frame)->type) {
3296
3297             case UPDATE_FRAME:
3298                 if (frame->updatee == blocked_on) {
3299                     /* We are blocking on one of our own computations, so
3300                      * send this thread the NonTermination exception.  
3301                      */
3302                     IF_DEBUG(scheduler, 
3303                              sched_belch("thread %d is blocked on itself", t->id));
3304                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3305                     goto done;
3306                 }
3307                 else {
3308                     continue;
3309                 }
3310
3311             case CATCH_FRAME:
3312             case SEQ_FRAME:
3313                 continue;
3314                 
3315             case STOP_FRAME:
3316                 break;
3317             }
3318             break;
3319         }
3320
3321     done: ;
3322     }   
3323 }
3324
3325 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3326 //@subsection Debugging Routines
3327
3328 /* -----------------------------------------------------------------------------
3329    Debugging: why is a thread blocked
3330    -------------------------------------------------------------------------- */
3331
3332 #ifdef DEBUG
3333
3334 void
3335 printThreadBlockage(StgTSO *tso)
3336 {
3337   switch (tso->why_blocked) {
3338   case BlockedOnRead:
3339     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3340     break;
3341   case BlockedOnWrite:
3342     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3343     break;
3344   case BlockedOnDelay:
3345     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3346     break;
3347   case BlockedOnMVar:
3348     fprintf(stderr,"is blocked on an MVar");
3349     break;
3350   case BlockedOnException:
3351     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3352             tso->block_info.tso->id);
3353     break;
3354   case BlockedOnBlackHole:
3355     fprintf(stderr,"is blocked on a black hole");
3356     break;
3357   case NotBlocked:
3358     fprintf(stderr,"is not blocked");
3359     break;
3360 #if defined(PAR)
3361   case BlockedOnGA:
3362     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3363             tso->block_info.closure, info_type(tso->block_info.closure));
3364     break;
3365   case BlockedOnGA_NoSend:
3366     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3367             tso->block_info.closure, info_type(tso->block_info.closure));
3368     break;
3369 #endif
3370 #if defined(RTS_SUPPORTS_THREADS)
3371   case BlockedOnCCall:
3372     fprintf(stderr,"is blocked on an external call");
3373     break;
3374 #endif
3375   default:
3376     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3377          tso->why_blocked, tso->id, tso);
3378   }
3379 }
3380
3381 void
3382 printThreadStatus(StgTSO *tso)
3383 {
3384   switch (tso->what_next) {
3385   case ThreadKilled:
3386     fprintf(stderr,"has been killed");
3387     break;
3388   case ThreadComplete:
3389     fprintf(stderr,"has completed");
3390     break;
3391   default:
3392     printThreadBlockage(tso);
3393   }
3394 }
3395
3396 void
3397 printAllThreads(void)
3398 {
3399   StgTSO *t;
3400
3401 # if defined(GRAN)
3402   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3403   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3404                        time_string, rtsFalse/*no commas!*/);
3405
3406   sched_belch("all threads at [%s]:", time_string);
3407 # elif defined(PAR)
3408   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3409   ullong_format_string(CURRENT_TIME,
3410                        time_string, rtsFalse/*no commas!*/);
3411
3412   sched_belch("all threads at [%s]:", time_string);
3413 # else
3414   sched_belch("all threads:");
3415 # endif
3416
3417   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3418     fprintf(stderr, "\tthread %d ", t->id);
3419     printThreadStatus(t);
3420     fprintf(stderr,"\n");
3421   }
3422 }
3423     
3424 /* 
3425    Print a whole blocking queue attached to node (debugging only).
3426 */
3427 //@cindex print_bq
3428 # if defined(PAR)
3429 void 
3430 print_bq (StgClosure *node)
3431 {
3432   StgBlockingQueueElement *bqe;
3433   StgTSO *tso;
3434   rtsBool end;
3435
3436   fprintf(stderr,"## BQ of closure %p (%s): ",
3437           node, info_type(node));
3438
3439   /* should cover all closures that may have a blocking queue */
3440   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3441          get_itbl(node)->type == FETCH_ME_BQ ||
3442          get_itbl(node)->type == RBH ||
3443          get_itbl(node)->type == MVAR);
3444     
3445   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3446
3447   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3448 }
3449
3450 /* 
3451    Print a whole blocking queue starting with the element bqe.
3452 */
3453 void 
3454 print_bqe (StgBlockingQueueElement *bqe)
3455 {
3456   rtsBool end;
3457
3458   /* 
3459      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3460   */
3461   for (end = (bqe==END_BQ_QUEUE);
3462        !end; // iterate until bqe points to a CONSTR
3463        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3464        bqe = end ? END_BQ_QUEUE : bqe->link) {
3465     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3466     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3467     /* types of closures that may appear in a blocking queue */
3468     ASSERT(get_itbl(bqe)->type == TSO ||           
3469            get_itbl(bqe)->type == BLOCKED_FETCH || 
3470            get_itbl(bqe)->type == CONSTR); 
3471     /* only BQs of an RBH end with an RBH_Save closure */
3472     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3473
3474     switch (get_itbl(bqe)->type) {
3475     case TSO:
3476       fprintf(stderr," TSO %u (%x),",
3477               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3478       break;
3479     case BLOCKED_FETCH:
3480       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3481               ((StgBlockedFetch *)bqe)->node, 
3482               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3483               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3484               ((StgBlockedFetch *)bqe)->ga.weight);
3485       break;
3486     case CONSTR:
3487       fprintf(stderr," %s (IP %p),",
3488               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3489                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3490                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3491                "RBH_Save_?"), get_itbl(bqe));
3492       break;
3493     default:
3494       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3495            info_type((StgClosure *)bqe)); // , node, info_type(node));
3496       break;
3497     }
3498   } /* for */
3499   fputc('\n', stderr);
3500 }
3501 # elif defined(GRAN)
3502 void 
3503 print_bq (StgClosure *node)
3504 {
3505   StgBlockingQueueElement *bqe;
3506   PEs node_loc, tso_loc;
3507   rtsBool end;
3508
3509   /* should cover all closures that may have a blocking queue */
3510   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3511          get_itbl(node)->type == FETCH_ME_BQ ||
3512          get_itbl(node)->type == RBH);
3513     
3514   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3515   node_loc = where_is(node);
3516
3517   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3518           node, info_type(node), node_loc);
3519
3520   /* 
3521      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3522   */
3523   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3524        !end; // iterate until bqe points to a CONSTR
3525        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3526     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3527     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3528     /* types of closures that may appear in a blocking queue */
3529     ASSERT(get_itbl(bqe)->type == TSO ||           
3530            get_itbl(bqe)->type == CONSTR); 
3531     /* only BQs of an RBH end with an RBH_Save closure */
3532     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3533
3534     tso_loc = where_is((StgClosure *)bqe);
3535     switch (get_itbl(bqe)->type) {
3536     case TSO:
3537       fprintf(stderr," TSO %d (%p) on [PE %d],",
3538               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3539       break;
3540     case CONSTR:
3541       fprintf(stderr," %s (IP %p),",
3542               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3543                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3544                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3545                "RBH_Save_?"), get_itbl(bqe));
3546       break;
3547     default:
3548       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3549            info_type((StgClosure *)bqe), node, info_type(node));
3550       break;
3551     }
3552   } /* for */
3553   fputc('\n', stderr);
3554 }
3555 #else
3556 /* 
3557    Nice and easy: only TSOs on the blocking queue
3558 */
3559 void 
3560 print_bq (StgClosure *node)
3561 {
3562   StgTSO *tso;
3563
3564   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3565   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3566        tso != END_TSO_QUEUE; 
3567        tso=tso->link) {
3568     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3569     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3570     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3571   }
3572   fputc('\n', stderr);
3573 }
3574 # endif
3575
3576 #if defined(PAR)
3577 static nat
3578 run_queue_len(void)
3579 {
3580   nat i;
3581   StgTSO *tso;
3582
3583   for (i=0, tso=run_queue_hd; 
3584        tso != END_TSO_QUEUE;
3585        i++, tso=tso->link)
3586     /* nothing */
3587
3588   return i;
3589 }
3590 #endif
3591
3592 static void
3593 sched_belch(char *s, ...)
3594 {
3595   va_list ap;
3596   va_start(ap,s);
3597 #ifdef SMP
3598   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3599 #elif defined(PAR)
3600   fprintf(stderr, "== ");
3601 #else
3602   fprintf(stderr, "scheduler: ");
3603 #endif
3604   vfprintf(stderr, s, ap);
3605   fprintf(stderr, "\n");
3606 }
3607
3608 #endif /* DEBUG */
3609
3610
3611 //@node Index,  , Debugging Routines, Main scheduling code
3612 //@subsection Index
3613
3614 //@index
3615 //* StgMainThread::  @cindex\s-+StgMainThread
3616 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3617 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3618 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3619 //* context_switch::  @cindex\s-+context_switch
3620 //* createThread::  @cindex\s-+createThread
3621 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3622 //* initScheduler::  @cindex\s-+initScheduler
3623 //* interrupted::  @cindex\s-+interrupted
3624 //* next_thread_id::  @cindex\s-+next_thread_id
3625 //* print_bq::  @cindex\s-+print_bq
3626 //* run_queue_hd::  @cindex\s-+run_queue_hd
3627 //* run_queue_tl::  @cindex\s-+run_queue_tl
3628 //* sched_mutex::  @cindex\s-+sched_mutex
3629 //* schedule::  @cindex\s-+schedule
3630 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3631 //* term_mutex::  @cindex\s-+term_mutex
3632 //@end index