[project @ 2002-02-13 08:48:06 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runnable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268
269
270 /*
271  * When a native thread has completed executing an external
272  * call, it needs to communicate the result back to the
273  * (Haskell) thread that made the call. Do this as follows:
274  *
275  *  - in resumeThread(), the thread increments the counter
276  *    rts_n_returning_workers, and then blocks waiting on the
277  *    condition returning_worker_cond.
278  *  - upon entry to the scheduler, a worker/task checks 
279  *    rts_n_returning_workers. If it is > 0, worker threads
280  *    are waiting to return, so it gives up its capability
281  *    to let a worker deposit its result.
282  *  - the worker thread that gave up its capability then tries
283  *    to re-grab a capability and re-enter the Scheduler.
284  */
285
286
287 /* thread_ready_cond: when signalled, a thread has become runnable for a
288  * task to execute.
289  *
290  * In the non-SMP case, it also implies that the thread that is woken up has
291  * exclusive access to the RTS and all its data structures (that are not
292  * under sched_mutex's control).
293  *
294  * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold.
295  *
296  */
297 Condition thread_ready_cond = INIT_COND_VAR;
298 #if 0
299 /* For documentation purposes only */
300 #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE())
301 #endif
302
303 /*
304  * To be able to make an informed decision about whether or not 
305  * to create a new task when making an external call, keep track of
306  * the number of tasks currently blocked waiting on thread_ready_cond.
307  * (if > 0 => no need for a new task, just unblock an existing one).
308  */
309 nat rts_n_waiting_tasks = 0;
310
311 /* returning_worker_cond: when a worker thread returns from executing an
312  * external call, it needs to wait for an RTS Capability before passing
313  * on the result of the call to the Haskell thread that made it.
314  * 
315  * returning_worker_cond is signalled in Capability.releaseCapability().
316  *
317  */
318 Condition returning_worker_cond = INIT_COND_VAR;
319
320 /*
321  * To avoid starvation of threads blocked on worker_thread_cond,
322  * the task(s) that enter the Scheduler will check to see whether
323  * there are one or more worker threads blocked waiting on
324  * returning_worker_cond.
325  *
326  * Locks needed: sched_mutex
327  */
328 nat rts_n_waiting_workers = 0;
329
330
331 # if defined(SMP)
332 static Condition gc_pending_cond = INIT_COND_VAR;
333 nat await_death;
334 # endif
335
336 #endif /* RTS_SUPPORTS_THREADS */
337
338 #if defined(PAR)
339 StgTSO *LastTSO;
340 rtsTime TimeOfLastYield;
341 rtsBool emitSchedule = rtsTrue;
342 #endif
343
344 #if DEBUG
345 char *whatNext_strs[] = {
346   "ThreadEnterGHC",
347   "ThreadRunGHC",
348   "ThreadEnterInterp",
349   "ThreadKilled",
350   "ThreadComplete"
351 };
352
353 char *threadReturnCode_strs[] = {
354   "HeapOverflow",                       /* might also be StackOverflow */
355   "StackOverflow",
356   "ThreadYielding",
357   "ThreadBlocked",
358   "ThreadFinished"
359 };
360 #endif
361
362 #if defined(PAR)
363 StgTSO * createSparkThread(rtsSpark spark);
364 StgTSO * activateSpark (rtsSpark spark);  
365 #endif
366
367 /*
368  * The thread state for the main thread.
369 // ToDo: check whether not needed any more
370 StgTSO   *MainTSO;
371  */
372
373 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
374 static void taskStart(void);
375 static void
376 taskStart(void)
377 {
378   schedule();
379 }
380 #endif
381
382
383
384
385 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
386 //@subsection Main scheduling loop
387
388 /* ---------------------------------------------------------------------------
389    Main scheduling loop.
390
391    We use round-robin scheduling, each thread returning to the
392    scheduler loop when one of these conditions is detected:
393
394       * out of heap space
395       * timer expires (thread yields)
396       * thread blocks
397       * thread ends
398       * stack overflow
399
400    Locking notes:  we acquire the scheduler lock once at the beginning
401    of the scheduler loop, and release it when
402     
403       * running a thread, or
404       * waiting for work, or
405       * waiting for a GC to complete.
406
407    GRAN version:
408      In a GranSim setup this loop iterates over the global event queue.
409      This revolves around the global event queue, which determines what 
410      to do next. Therefore, it's more complicated than either the 
411      concurrent or the parallel (GUM) setup.
412
413    GUM version:
414      GUM iterates over incoming messages.
415      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
416      and sends out a fish whenever it has nothing to do; in-between
417      doing the actual reductions (shared code below) it processes the
418      incoming messages and deals with delayed operations 
419      (see PendingFetches).
420      This is not the ugliest code you could imagine, but it's bloody close.
421
422    ------------------------------------------------------------------------ */
423 //@cindex schedule
424 static void
425 schedule( void )
426 {
427   StgTSO *t;
428   Capability *cap;
429   StgThreadReturnCode ret;
430 #if defined(GRAN)
431   rtsEvent *event;
432 #elif defined(PAR)
433   StgSparkPool *pool;
434   rtsSpark spark;
435   StgTSO *tso;
436   GlobalTaskId pe;
437   rtsBool receivedFinish = rtsFalse;
438 # if defined(DEBUG)
439   nat tp_size, sp_size; // stats only
440 # endif
441 #endif
442   rtsBool was_interrupted = rtsFalse;
443
444 #if defined(RTS_SUPPORTS_THREADS)
445 schedule_start:
446 #endif
447   
448 #if defined(RTS_SUPPORTS_THREADS)
449   ACQUIRE_LOCK(&sched_mutex);
450 #endif
451  
452 #if defined(RTS_SUPPORTS_THREADS)
453   /* ToDo: consider SMP support */
454   if ( rts_n_waiting_workers > 0 && noCapabilities() ) {
455     /* (At least) one native thread is waiting to
456      * deposit the result of an external call. So,
457      * be nice and hand over our capability.
458      */
459     IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers));
460     releaseCapability(cap);
461     RELEASE_LOCK(&sched_mutex);
462
463     yieldThread();
464     goto schedule_start;
465   }
466 #endif
467
468 #if defined(RTS_SUPPORTS_THREADS)
469   while ( noCapabilities() ) {
470     rts_n_waiting_tasks++;
471     waitCondition(&thread_ready_cond, &sched_mutex);
472     rts_n_waiting_tasks--;
473   }
474 #endif
475
476 #if defined(GRAN)
477
478   /* set up first event to get things going */
479   /* ToDo: assign costs for system setup and init MainTSO ! */
480   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
481             ContinueThread, 
482             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
483
484   IF_DEBUG(gran,
485            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
486            G_TSO(CurrentTSO, 5));
487
488   if (RtsFlags.GranFlags.Light) {
489     /* Save current time; GranSim Light only */
490     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
491   }      
492
493   event = get_next_event();
494
495   while (event!=(rtsEvent*)NULL) {
496     /* Choose the processor with the next event */
497     CurrentProc = event->proc;
498     CurrentTSO = event->tso;
499
500 #elif defined(PAR)
501
502   while (!receivedFinish) {    /* set by processMessages */
503                                /* when receiving PP_FINISH message         */ 
504 #else
505
506   while (1) {
507
508 #endif
509
510     IF_DEBUG(scheduler, printAllThreads());
511
512     /* If we're interrupted (the user pressed ^C, or some other
513      * termination condition occurred), kill all the currently running
514      * threads.
515      */
516     if (interrupted) {
517       IF_DEBUG(scheduler, sched_belch("interrupted"));
518       deleteAllThreads();
519       interrupted = rtsFalse;
520       was_interrupted = rtsTrue;
521     }
522
523     /* Go through the list of main threads and wake up any
524      * clients whose computations have finished.  ToDo: this
525      * should be done more efficiently without a linear scan
526      * of the main threads list, somehow...
527      */
528 #if defined(RTS_SUPPORTS_THREADS)
529     { 
530       StgMainThread *m, **prev;
531       prev = &main_threads;
532       for (m = main_threads; m != NULL; m = m->link) {
533         switch (m->tso->what_next) {
534         case ThreadComplete:
535           if (m->ret) {
536             *(m->ret) = (StgClosure *)m->tso->sp[0];
537           }
538           *prev = m->link;
539           m->stat = Success;
540           broadcastCondition(&m->wakeup);
541           break;
542         case ThreadKilled:
543           if (m->ret) *(m->ret) = NULL;
544           *prev = m->link;
545           if (was_interrupted) {
546             m->stat = Interrupted;
547           } else {
548             m->stat = Killed;
549           }
550           broadcastCondition(&m->wakeup);
551           break;
552         default:
553           break;
554         }
555       }
556     }
557
558 #else /* not threaded */
559
560 # if defined(PAR)
561     /* in GUM do this only on the Main PE */
562     if (IAmMainThread)
563 # endif
564     /* If our main thread has finished or been killed, return.
565      */
566     {
567       StgMainThread *m = main_threads;
568       if (m->tso->what_next == ThreadComplete
569           || m->tso->what_next == ThreadKilled) {
570         main_threads = main_threads->link;
571         if (m->tso->what_next == ThreadComplete) {
572           /* we finished successfully, fill in the return value */
573           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
574           m->stat = Success;
575           return;
576         } else {
577           if (m->ret) { *(m->ret) = NULL; };
578           if (was_interrupted) {
579             m->stat = Interrupted;
580           } else {
581             m->stat = Killed;
582           }
583           return;
584         }
585       }
586     }
587 #endif
588
589     /* Top up the run queue from our spark pool.  We try to make the
590      * number of threads in the run queue equal to the number of
591      * free capabilities.
592      *
593      * Disable spark support in SMP for now, non-essential & requires
594      * a little bit of work to make it compile cleanly. -- sof 1/02.
595      */
596 #if 0 /* defined(SMP) */
597     {
598       nat n = getFreeCapabilities();
599       StgTSO *tso = run_queue_hd;
600
601       /* Count the run queue */
602       while (n > 0 && tso != END_TSO_QUEUE) {
603         tso = tso->link;
604         n--;
605       }
606
607       for (; n > 0; n--) {
608         StgClosure *spark;
609         spark = findSpark(rtsFalse);
610         if (spark == NULL) {
611           break; /* no more sparks in the pool */
612         } else {
613           /* I'd prefer this to be done in activateSpark -- HWL */
614           /* tricky - it needs to hold the scheduler lock and
615            * not try to re-acquire it -- SDM */
616           createSparkThread(spark);       
617           IF_DEBUG(scheduler,
618                    sched_belch("==^^ turning spark of closure %p into a thread",
619                                (StgClosure *)spark));
620         }
621       }
622       /* We need to wake up the other tasks if we just created some
623        * work for them.
624        */
625       if (getFreeCapabilities() - n > 1) {
626           signalCondition( &thread_ready_cond );
627       }
628     }
629 #endif // SMP
630
631     /* check for signals each time around the scheduler */
632 #ifndef mingw32_TARGET_OS
633     if (signals_pending()) {
634       startSignalHandlers();
635     }
636 #endif
637
638     /* Check whether any waiting threads need to be woken up.  If the
639      * run queue is empty, and there are no other tasks running, we
640      * can wait indefinitely for something to happen.
641      * ToDo: what if another client comes along & requests another
642      * main thread?
643      */
644     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
645       awaitEvent( EMPTY_RUN_QUEUE()
646 #if defined(SMP)
647         && allFreeCapabilities()
648 #endif
649         );
650     }
651     /* we can be interrupted while waiting for I/O... */
652     if (interrupted) continue;
653
654     /* 
655      * Detect deadlock: when we have no threads to run, there are no
656      * threads waiting on I/O or sleeping, and all the other tasks are
657      * waiting for work, we must have a deadlock of some description.
658      *
659      * We first try to find threads blocked on themselves (ie. black
660      * holes), and generate NonTermination exceptions where necessary.
661      *
662      * If no threads are black holed, we have a deadlock situation, so
663      * inform all the main threads.
664      */
665 #ifndef PAR
666     if (   EMPTY_RUN_QUEUE()
667         && EMPTY_QUEUE(blocked_queue_hd)
668         && EMPTY_QUEUE(sleeping_queue)
669 #if defined(RTS_SUPPORTS_THREADS)
670         && EMPTY_QUEUE(suspended_ccalling_threads)
671 #endif
672 #ifdef SMP
673         && allFreeCapabilities()
674 #endif
675         )
676     {
677         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
678 #if defined(THREADED_RTS)
679         /* and SMP mode ..? */
680         releaseCapability(cap);
681 #endif
682         RELEASE_LOCK(&sched_mutex);
683         GarbageCollect(GetRoots,rtsTrue);
684         ACQUIRE_LOCK(&sched_mutex);
685         if (   EMPTY_QUEUE(blocked_queue_hd)
686             && EMPTY_RUN_QUEUE()
687             && EMPTY_QUEUE(sleeping_queue) ) {
688
689             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
690             detectBlackHoles();
691
692             /* No black holes, so probably a real deadlock.  Send the
693              * current main thread the Deadlock exception (or in the SMP
694              * build, send *all* main threads the deadlock exception,
695              * since none of them can make progress).
696              */
697             if ( EMPTY_RUN_QUEUE() ) {
698                 StgMainThread *m;
699 #if defined(RTS_SUPPORTS_THREADS)
700                 for (m = main_threads; m != NULL; m = m->link) {
701                     switch (m->tso->why_blocked) {
702                     case BlockedOnBlackHole:
703                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
704                         break;
705                     case BlockedOnException:
706                     case BlockedOnMVar:
707                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
708                         break;
709                     default:
710                         barf("deadlock: main thread blocked in a strange way");
711                     }
712                 }
713 #else
714                 m = main_threads;
715                 switch (m->tso->why_blocked) {
716                 case BlockedOnBlackHole:
717                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
718                     break;
719                 case BlockedOnException:
720                 case BlockedOnMVar:
721                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
722                     break;
723                 default:
724                     barf("deadlock: main thread blocked in a strange way");
725                 }
726 #endif
727             }
728 #if defined(RTS_SUPPORTS_THREADS)
729             /* ToDo: revisit conditions (and mechanism) for shutting
730                down a multi-threaded world  */
731             if ( EMPTY_RUN_QUEUE() ) {
732               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
733               shutdownHaskellAndExit(0);
734             
735             }
736 #endif
737             ASSERT( !EMPTY_RUN_QUEUE() );
738         }
739     }
740 #elif defined(PAR)
741     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
742 #endif
743
744 #if defined(SMP)
745     /* If there's a GC pending, don't do anything until it has
746      * completed.
747      */
748     if (ready_to_gc) {
749       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
750       waitCondition( &gc_pending_cond, &sched_mutex );
751     }
752 #endif    
753
754 #if defined(RTS_SUPPORTS_THREADS)
755     /* block until we've got a thread on the run queue and a free
756      * capability.
757      *
758      */
759     if ( EMPTY_RUN_QUEUE() ) {
760       /* Give up our capability */
761       releaseCapability(cap);
762       while ( noCapabilities() || EMPTY_RUN_QUEUE() ) {
763         IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
764         rts_n_waiting_tasks++;
765         waitCondition( &thread_ready_cond, &sched_mutex );
766         rts_n_waiting_tasks--;
767         IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE()));
768       }
769     }
770 #endif
771
772 #if defined(GRAN)
773
774     if (RtsFlags.GranFlags.Light)
775       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
776
777     /* adjust time based on time-stamp */
778     if (event->time > CurrentTime[CurrentProc] &&
779         event->evttype != ContinueThread)
780       CurrentTime[CurrentProc] = event->time;
781     
782     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
783     if (!RtsFlags.GranFlags.Light)
784       handleIdlePEs();
785
786     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
787
788     /* main event dispatcher in GranSim */
789     switch (event->evttype) {
790       /* Should just be continuing execution */
791     case ContinueThread:
792       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
793       /* ToDo: check assertion
794       ASSERT(run_queue_hd != (StgTSO*)NULL &&
795              run_queue_hd != END_TSO_QUEUE);
796       */
797       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
798       if (!RtsFlags.GranFlags.DoAsyncFetch &&
799           procStatus[CurrentProc]==Fetching) {
800         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
801               CurrentTSO->id, CurrentTSO, CurrentProc);
802         goto next_thread;
803       } 
804       /* Ignore ContinueThreads for completed threads */
805       if (CurrentTSO->what_next == ThreadComplete) {
806         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
807               CurrentTSO->id, CurrentTSO, CurrentProc);
808         goto next_thread;
809       } 
810       /* Ignore ContinueThreads for threads that are being migrated */
811       if (PROCS(CurrentTSO)==Nowhere) { 
812         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
813               CurrentTSO->id, CurrentTSO, CurrentProc);
814         goto next_thread;
815       }
816       /* The thread should be at the beginning of the run queue */
817       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
818         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
819               CurrentTSO->id, CurrentTSO, CurrentProc);
820         break; // run the thread anyway
821       }
822       /*
823       new_event(proc, proc, CurrentTime[proc],
824                 FindWork,
825                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
826       goto next_thread; 
827       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
828       break; // now actually run the thread; DaH Qu'vam yImuHbej 
829
830     case FetchNode:
831       do_the_fetchnode(event);
832       goto next_thread;             /* handle next event in event queue  */
833       
834     case GlobalBlock:
835       do_the_globalblock(event);
836       goto next_thread;             /* handle next event in event queue  */
837       
838     case FetchReply:
839       do_the_fetchreply(event);
840       goto next_thread;             /* handle next event in event queue  */
841       
842     case UnblockThread:   /* Move from the blocked queue to the tail of */
843       do_the_unblock(event);
844       goto next_thread;             /* handle next event in event queue  */
845       
846     case ResumeThread:  /* Move from the blocked queue to the tail of */
847       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
848       event->tso->gran.blocktime += 
849         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
850       do_the_startthread(event);
851       goto next_thread;             /* handle next event in event queue  */
852       
853     case StartThread:
854       do_the_startthread(event);
855       goto next_thread;             /* handle next event in event queue  */
856       
857     case MoveThread:
858       do_the_movethread(event);
859       goto next_thread;             /* handle next event in event queue  */
860       
861     case MoveSpark:
862       do_the_movespark(event);
863       goto next_thread;             /* handle next event in event queue  */
864       
865     case FindWork:
866       do_the_findwork(event);
867       goto next_thread;             /* handle next event in event queue  */
868       
869     default:
870       barf("Illegal event type %u\n", event->evttype);
871     }  /* switch */
872     
873     /* This point was scheduler_loop in the old RTS */
874
875     IF_DEBUG(gran, belch("GRAN: after main switch"));
876
877     TimeOfLastEvent = CurrentTime[CurrentProc];
878     TimeOfNextEvent = get_time_of_next_event();
879     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
880     // CurrentTSO = ThreadQueueHd;
881
882     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
883                          TimeOfNextEvent));
884
885     if (RtsFlags.GranFlags.Light) 
886       GranSimLight_leave_system(event, &ActiveTSO); 
887
888     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
889
890     IF_DEBUG(gran, 
891              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
892
893     /* in a GranSim setup the TSO stays on the run queue */
894     t = CurrentTSO;
895     /* Take a thread from the run queue. */
896     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
897
898     IF_DEBUG(gran, 
899              fprintf(stderr, "GRAN: About to run current thread, which is\n");
900              G_TSO(t,5));
901
902     context_switch = 0; // turned on via GranYield, checking events and time slice
903
904     IF_DEBUG(gran, 
905              DumpGranEvent(GR_SCHEDULE, t));
906
907     procStatus[CurrentProc] = Busy;
908
909 #elif defined(PAR)
910     if (PendingFetches != END_BF_QUEUE) {
911         processFetches();
912     }
913
914     /* ToDo: phps merge with spark activation above */
915     /* check whether we have local work and send requests if we have none */
916     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
917       /* :-[  no local threads => look out for local sparks */
918       /* the spark pool for the current PE */
919       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
920       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
921           pool->hd < pool->tl) {
922         /* 
923          * ToDo: add GC code check that we really have enough heap afterwards!!
924          * Old comment:
925          * If we're here (no runnable threads) and we have pending
926          * sparks, we must have a space problem.  Get enough space
927          * to turn one of those pending sparks into a
928          * thread... 
929          */
930
931         spark = findSpark(rtsFalse);                /* get a spark */
932         if (spark != (rtsSpark) NULL) {
933           tso = activateSpark(spark);       /* turn the spark into a thread */
934           IF_PAR_DEBUG(schedule,
935                        belch("==== schedule: Created TSO %d (%p); %d threads active",
936                              tso->id, tso, advisory_thread_count));
937
938           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
939             belch("==^^ failed to activate spark");
940             goto next_thread;
941           }               /* otherwise fall through & pick-up new tso */
942         } else {
943           IF_PAR_DEBUG(verbose,
944                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
945                              spark_queue_len(pool)));
946           goto next_thread;
947         }
948       }
949
950       /* If we still have no work we need to send a FISH to get a spark
951          from another PE 
952       */
953       if (EMPTY_RUN_QUEUE()) {
954       /* =8-[  no local sparks => look for work on other PEs */
955         /*
956          * We really have absolutely no work.  Send out a fish
957          * (there may be some out there already), and wait for
958          * something to arrive.  We clearly can't run any threads
959          * until a SCHEDULE or RESUME arrives, and so that's what
960          * we're hoping to see.  (Of course, we still have to
961          * respond to other types of messages.)
962          */
963         TIME now = msTime() /*CURRENT_TIME*/;
964         IF_PAR_DEBUG(verbose, 
965                      belch("--  now=%ld", now));
966         IF_PAR_DEBUG(verbose,
967                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
968                          (last_fish_arrived_at!=0 &&
969                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
970                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
971                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
972                              last_fish_arrived_at,
973                              RtsFlags.ParFlags.fishDelay, now);
974                      });
975         
976         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
977             (last_fish_arrived_at==0 ||
978              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
979           /* outstandingFishes is set in sendFish, processFish;
980              avoid flooding system with fishes via delay */
981           pe = choosePE();
982           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
983                    NEW_FISH_HUNGER);
984
985           // Global statistics: count no. of fishes
986           if (RtsFlags.ParFlags.ParStats.Global &&
987               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
988             globalParStats.tot_fish_mess++;
989           }
990         }
991       
992         receivedFinish = processMessages();
993         goto next_thread;
994       }
995     } else if (PacketsWaiting()) {  /* Look for incoming messages */
996       receivedFinish = processMessages();
997     }
998
999     /* Now we are sure that we have some work available */
1000     ASSERT(run_queue_hd != END_TSO_QUEUE);
1001
1002     /* Take a thread from the run queue, if we have work */
1003     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
1004     IF_DEBUG(sanity,checkTSO(t));
1005
1006     /* ToDo: write something to the log-file
1007     if (RTSflags.ParFlags.granSimStats && !sameThread)
1008         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1009
1010     CurrentTSO = t;
1011     */
1012     /* the spark pool for the current PE */
1013     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1014
1015     IF_DEBUG(scheduler, 
1016              belch("--=^ %d threads, %d sparks on [%#x]", 
1017                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1018
1019 #if 1
1020     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1021         t && LastTSO && t->id != LastTSO->id && 
1022         LastTSO->why_blocked == NotBlocked && 
1023         LastTSO->what_next != ThreadComplete) {
1024       // if previously scheduled TSO not blocked we have to record the context switch
1025       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1026                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1027     }
1028
1029     if (RtsFlags.ParFlags.ParStats.Full && 
1030         (emitSchedule /* forced emit */ ||
1031         (t && LastTSO && t->id != LastTSO->id))) {
1032       /* 
1033          we are running a different TSO, so write a schedule event to log file
1034          NB: If we use fair scheduling we also have to write  a deschedule 
1035              event for LastTSO; with unfair scheduling we know that the
1036              previous tso has blocked whenever we switch to another tso, so
1037              we don't need it in GUM for now
1038       */
1039       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1040                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1041       emitSchedule = rtsFalse;
1042     }
1043      
1044 #endif
1045 #else /* !GRAN && !PAR */
1046   
1047     /* grab a thread from the run queue */
1048     ASSERT(run_queue_hd != END_TSO_QUEUE);
1049     t = POP_RUN_QUEUE();
1050     // Sanity check the thread we're about to run.  This can be
1051     // expensive if there is lots of thread switching going on...
1052     IF_DEBUG(sanity,checkTSO(t));
1053 #endif
1054     
1055     grabCapability(&cap);
1056     cap->r.rCurrentTSO = t;
1057     
1058     /* context switches are now initiated by the timer signal, unless
1059      * the user specified "context switch as often as possible", with
1060      * +RTS -C0
1061      */
1062     if (
1063 #ifdef PROFILING
1064         RtsFlags.ProfFlags.profileInterval == 0 ||
1065 #endif
1066         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1067          && (run_queue_hd != END_TSO_QUEUE
1068              || blocked_queue_hd != END_TSO_QUEUE
1069              || sleeping_queue != END_TSO_QUEUE)))
1070         context_switch = 1;
1071     else
1072         context_switch = 0;
1073
1074     RELEASE_LOCK(&sched_mutex);
1075
1076     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1077                               t->id, t, whatNext_strs[t->what_next]));
1078
1079 #ifdef PROFILING
1080     startHeapProfTimer();
1081 #endif
1082
1083     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1084     /* Run the current thread 
1085      */
1086     switch (cap->r.rCurrentTSO->what_next) {
1087     case ThreadKilled:
1088     case ThreadComplete:
1089         /* Thread already finished, return to scheduler. */
1090         ret = ThreadFinished;
1091         break;
1092     case ThreadEnterGHC:
1093         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1094         break;
1095     case ThreadRunGHC:
1096         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1097         break;
1098     case ThreadEnterInterp:
1099         ret = interpretBCO(cap);
1100         break;
1101     default:
1102       barf("schedule: invalid what_next field");
1103     }
1104     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1105     
1106     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1107 #ifdef PROFILING
1108     stopHeapProfTimer();
1109     CCCS = CCS_SYSTEM;
1110 #endif
1111     
1112     ACQUIRE_LOCK(&sched_mutex);
1113
1114 #ifdef SMP
1115     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1116 #elif !defined(GRAN) && !defined(PAR)
1117     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1118 #endif
1119     t = cap->r.rCurrentTSO;
1120     
1121 #if defined(PAR)
1122     /* HACK 675: if the last thread didn't yield, make sure to print a 
1123        SCHEDULE event to the log file when StgRunning the next thread, even
1124        if it is the same one as before */
1125     LastTSO = t; 
1126     TimeOfLastYield = CURRENT_TIME;
1127 #endif
1128
1129     switch (ret) {
1130     case HeapOverflow:
1131 #if defined(GRAN)
1132       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1133       globalGranStats.tot_heapover++;
1134 #elif defined(PAR)
1135       globalParStats.tot_heapover++;
1136 #endif
1137
1138       // did the task ask for a large block?
1139       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1140           // if so, get one and push it on the front of the nursery.
1141           bdescr *bd;
1142           nat blocks;
1143           
1144           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1145
1146           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1147                                    t->id, t,
1148                                    whatNext_strs[t->what_next], blocks));
1149
1150           // don't do this if it would push us over the
1151           // alloc_blocks_lim limit; we'll GC first.
1152           if (alloc_blocks + blocks < alloc_blocks_lim) {
1153
1154               alloc_blocks += blocks;
1155               bd = allocGroup( blocks );
1156
1157               // link the new group into the list
1158               bd->link = cap->r.rCurrentNursery;
1159               bd->u.back = cap->r.rCurrentNursery->u.back;
1160               if (cap->r.rCurrentNursery->u.back != NULL) {
1161                   cap->r.rCurrentNursery->u.back->link = bd;
1162               } else {
1163                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1164                          g0s0->blocks == cap->r.rNursery);
1165                   cap->r.rNursery = g0s0->blocks = bd;
1166               }           
1167               cap->r.rCurrentNursery->u.back = bd;
1168
1169               // initialise it as a nursery block
1170               bd->step = g0s0;
1171               bd->gen_no = 0;
1172               bd->flags = 0;
1173               bd->free = bd->start;
1174
1175               // don't forget to update the block count in g0s0.
1176               g0s0->n_blocks += blocks;
1177               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1178
1179               // now update the nursery to point to the new block
1180               cap->r.rCurrentNursery = bd;
1181
1182               // we might be unlucky and have another thread get on the
1183               // run queue before us and steal the large block, but in that
1184               // case the thread will just end up requesting another large
1185               // block.
1186               PUSH_ON_RUN_QUEUE(t);
1187               break;
1188           }
1189       }
1190
1191       /* make all the running tasks block on a condition variable,
1192        * maybe set context_switch and wait till they all pile in,
1193        * then have them wait on a GC condition variable.
1194        */
1195       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1196                                t->id, t, whatNext_strs[t->what_next]));
1197       threadPaused(t);
1198 #if defined(GRAN)
1199       ASSERT(!is_on_queue(t,CurrentProc));
1200 #elif defined(PAR)
1201       /* Currently we emit a DESCHEDULE event before GC in GUM.
1202          ToDo: either add separate event to distinguish SYSTEM time from rest
1203                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1204       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1205         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1206                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1207         emitSchedule = rtsTrue;
1208       }
1209 #endif
1210       
1211       ready_to_gc = rtsTrue;
1212       context_switch = 1;               /* stop other threads ASAP */
1213       PUSH_ON_RUN_QUEUE(t);
1214       /* actual GC is done at the end of the while loop */
1215       break;
1216       
1217     case StackOverflow:
1218 #if defined(GRAN)
1219       IF_DEBUG(gran, 
1220                DumpGranEvent(GR_DESCHEDULE, t));
1221       globalGranStats.tot_stackover++;
1222 #elif defined(PAR)
1223       // IF_DEBUG(par, 
1224       // DumpGranEvent(GR_DESCHEDULE, t);
1225       globalParStats.tot_stackover++;
1226 #endif
1227       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1228                                t->id, t, whatNext_strs[t->what_next]));
1229       /* just adjust the stack for this thread, then pop it back
1230        * on the run queue.
1231        */
1232       threadPaused(t);
1233       { 
1234         StgMainThread *m;
1235         /* enlarge the stack */
1236         StgTSO *new_t = threadStackOverflow(t);
1237         
1238         /* This TSO has moved, so update any pointers to it from the
1239          * main thread stack.  It better not be on any other queues...
1240          * (it shouldn't be).
1241          */
1242         for (m = main_threads; m != NULL; m = m->link) {
1243           if (m->tso == t) {
1244             m->tso = new_t;
1245           }
1246         }
1247         threadPaused(new_t);
1248         PUSH_ON_RUN_QUEUE(new_t);
1249       }
1250       break;
1251
1252     case ThreadYielding:
1253 #if defined(GRAN)
1254       IF_DEBUG(gran, 
1255                DumpGranEvent(GR_DESCHEDULE, t));
1256       globalGranStats.tot_yields++;
1257 #elif defined(PAR)
1258       // IF_DEBUG(par, 
1259       // DumpGranEvent(GR_DESCHEDULE, t);
1260       globalParStats.tot_yields++;
1261 #endif
1262       /* put the thread back on the run queue.  Then, if we're ready to
1263        * GC, check whether this is the last task to stop.  If so, wake
1264        * up the GC thread.  getThread will block during a GC until the
1265        * GC is finished.
1266        */
1267       IF_DEBUG(scheduler,
1268                if (t->what_next == ThreadEnterInterp) {
1269                    /* ToDo: or maybe a timer expired when we were in Hugs?
1270                     * or maybe someone hit ctrl-C
1271                     */
1272                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1273                          t->id, t, whatNext_strs[t->what_next]);
1274                } else {
1275                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1276                          t->id, t, whatNext_strs[t->what_next]);
1277                }
1278                );
1279
1280       threadPaused(t);
1281
1282       IF_DEBUG(sanity,
1283                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1284                checkTSO(t));
1285       ASSERT(t->link == END_TSO_QUEUE);
1286 #if defined(GRAN)
1287       ASSERT(!is_on_queue(t,CurrentProc));
1288
1289       IF_DEBUG(sanity,
1290                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1291                checkThreadQsSanity(rtsTrue));
1292 #endif
1293 #if defined(PAR)
1294       if (RtsFlags.ParFlags.doFairScheduling) { 
1295         /* this does round-robin scheduling; good for concurrency */
1296         APPEND_TO_RUN_QUEUE(t);
1297       } else {
1298         /* this does unfair scheduling; good for parallelism */
1299         PUSH_ON_RUN_QUEUE(t);
1300       }
1301 #else
1302       /* this does round-robin scheduling; good for concurrency */
1303       APPEND_TO_RUN_QUEUE(t);
1304 #endif
1305 #if defined(GRAN)
1306       /* add a ContinueThread event to actually process the thread */
1307       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1308                 ContinueThread,
1309                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1310       IF_GRAN_DEBUG(bq, 
1311                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1312                G_EVENTQ(0);
1313                G_CURR_THREADQ(0));
1314 #endif /* GRAN */
1315       break;
1316       
1317     case ThreadBlocked:
1318 #if defined(GRAN)
1319       IF_DEBUG(scheduler,
1320                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1321                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1322                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1323
1324       // ??? needed; should emit block before
1325       IF_DEBUG(gran, 
1326                DumpGranEvent(GR_DESCHEDULE, t)); 
1327       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1328       /*
1329         ngoq Dogh!
1330       ASSERT(procStatus[CurrentProc]==Busy || 
1331               ((procStatus[CurrentProc]==Fetching) && 
1332               (t->block_info.closure!=(StgClosure*)NULL)));
1333       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1334           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1335             procStatus[CurrentProc]==Fetching)) 
1336         procStatus[CurrentProc] = Idle;
1337       */
1338 #elif defined(PAR)
1339       IF_DEBUG(scheduler,
1340                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1341                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1342       IF_PAR_DEBUG(bq,
1343
1344                    if (t->block_info.closure!=(StgClosure*)NULL) 
1345                      print_bq(t->block_info.closure));
1346
1347       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1348       blockThread(t);
1349
1350       /* whatever we schedule next, we must log that schedule */
1351       emitSchedule = rtsTrue;
1352
1353 #else /* !GRAN */
1354       /* don't need to do anything.  Either the thread is blocked on
1355        * I/O, in which case we'll have called addToBlockedQueue
1356        * previously, or it's blocked on an MVar or Blackhole, in which
1357        * case it'll be on the relevant queue already.
1358        */
1359       IF_DEBUG(scheduler,
1360                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1361                printThreadBlockage(t);
1362                fprintf(stderr, "\n"));
1363
1364       /* Only for dumping event to log file 
1365          ToDo: do I need this in GranSim, too?
1366       blockThread(t);
1367       */
1368 #endif
1369       threadPaused(t);
1370       break;
1371       
1372     case ThreadFinished:
1373       /* Need to check whether this was a main thread, and if so, signal
1374        * the task that started it with the return value.  If we have no
1375        * more main threads, we probably need to stop all the tasks until
1376        * we get a new one.
1377        */
1378       /* We also end up here if the thread kills itself with an
1379        * uncaught exception, see Exception.hc.
1380        */
1381       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1382 #if defined(GRAN)
1383       endThread(t, CurrentProc); // clean-up the thread
1384 #elif defined(PAR)
1385       /* For now all are advisory -- HWL */
1386       //if(t->priority==AdvisoryPriority) ??
1387       advisory_thread_count--;
1388       
1389 # ifdef DIST
1390       if(t->dist.priority==RevalPriority)
1391         FinishReval(t);
1392 # endif
1393       
1394       if (RtsFlags.ParFlags.ParStats.Full &&
1395           !RtsFlags.ParFlags.ParStats.Suppressed) 
1396         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1397 #endif
1398       break;
1399       
1400     default:
1401       barf("schedule: invalid thread return code %d", (int)ret);
1402     }
1403     
1404 #if defined(RTS_SUPPORTS_THREADS)
1405     /* I don't understand what this re-grab is doing -- sof */
1406     grabCapability(&cap);
1407 #endif
1408
1409 #ifdef PROFILING
1410     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1411         GarbageCollect(GetRoots, rtsTrue);
1412         heapCensus();
1413         performHeapProfile = rtsFalse;
1414         ready_to_gc = rtsFalse; // we already GC'd
1415     }
1416 #endif
1417
1418 #ifdef SMP
1419     if (ready_to_gc && allFreeCapabilities() )
1420 #else
1421     if (ready_to_gc) 
1422 #endif
1423       {
1424       /* everybody back, start the GC.
1425        * Could do it in this thread, or signal a condition var
1426        * to do it in another thread.  Either way, we need to
1427        * broadcast on gc_pending_cond afterward.
1428        */
1429 #if defined(RTS_SUPPORTS_THREADS)
1430       IF_DEBUG(scheduler,sched_belch("doing GC"));
1431 #endif
1432       GarbageCollect(GetRoots,rtsFalse);
1433       ready_to_gc = rtsFalse;
1434 #ifdef SMP
1435       broadcastCondition(&gc_pending_cond);
1436 #endif
1437 #if defined(GRAN)
1438       /* add a ContinueThread event to continue execution of current thread */
1439       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1440                 ContinueThread,
1441                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1442       IF_GRAN_DEBUG(bq, 
1443                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1444                G_EVENTQ(0);
1445                G_CURR_THREADQ(0));
1446 #endif /* GRAN */
1447     }
1448
1449 #if defined(GRAN)
1450   next_thread:
1451     IF_GRAN_DEBUG(unused,
1452                   print_eventq(EventHd));
1453
1454     event = get_next_event();
1455 #elif defined(PAR)
1456   next_thread:
1457     /* ToDo: wait for next message to arrive rather than busy wait */
1458 #endif /* GRAN */
1459
1460   } /* end of while(1) */
1461
1462   IF_PAR_DEBUG(verbose,
1463                belch("== Leaving schedule() after having received Finish"));
1464 }
1465
1466 /* ---------------------------------------------------------------------------
1467  * deleteAllThreads():  kill all the live threads.
1468  *
1469  * This is used when we catch a user interrupt (^C), before performing
1470  * any necessary cleanups and running finalizers.
1471  * ------------------------------------------------------------------------- */
1472    
1473 void deleteAllThreads ( void )
1474 {
1475   StgTSO* t, *next;
1476   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1477   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1478       next = t->link;
1479       deleteThread(t);
1480   }
1481   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1482       next = t->link;
1483       deleteThread(t);
1484   }
1485   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1486       next = t->link;
1487       deleteThread(t);
1488   }
1489   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1490   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1491   sleeping_queue = END_TSO_QUEUE;
1492 }
1493
1494 /* startThread and  insertThread are now in GranSim.c -- HWL */
1495
1496
1497 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1498 //@subsection Suspend and Resume
1499
1500 /* ---------------------------------------------------------------------------
1501  * Suspending & resuming Haskell threads.
1502  * 
1503  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1504  * its capability before calling the C function.  This allows another
1505  * task to pick up the capability and carry on running Haskell
1506  * threads.  It also means that if the C call blocks, it won't lock
1507  * the whole system.
1508  *
1509  * The Haskell thread making the C call is put to sleep for the
1510  * duration of the call, on the susepended_ccalling_threads queue.  We
1511  * give out a token to the task, which it can use to resume the thread
1512  * on return from the C function.
1513  * ------------------------------------------------------------------------- */
1514    
1515 StgInt
1516 suspendThread( StgRegTable *reg )
1517 {
1518   nat tok;
1519   Capability *cap;
1520
1521   /* assume that *reg is a pointer to the StgRegTable part
1522    * of a Capability.
1523    */
1524   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1525
1526   ACQUIRE_LOCK(&sched_mutex);
1527
1528   IF_DEBUG(scheduler,
1529            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1530
1531   threadPaused(cap->r.rCurrentTSO);
1532   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1533   suspended_ccalling_threads = cap->r.rCurrentTSO;
1534
1535 #if defined(RTS_SUPPORTS_THREADS)
1536   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1537 #endif
1538
1539   /* Use the thread ID as the token; it should be unique */
1540   tok = cap->r.rCurrentTSO->id;
1541
1542   /* Hand back capability */
1543   releaseCapability(cap);
1544   
1545 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1546   /* Preparing to leave the RTS, so ensure there's a native thread/task
1547      waiting to take over.
1548      
1549      ToDo: optimise this and only create a new task if there's a need
1550      for one (i.e., if there's only one Concurrent Haskell thread alive,
1551      there's no need to create a new task).
1552   */
1553   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1554   startTask(taskStart);
1555 #endif
1556
1557   THREAD_RUNNABLE();
1558   RELEASE_LOCK(&sched_mutex);
1559   return tok; 
1560 }
1561
1562 StgRegTable *
1563 resumeThread( StgInt tok )
1564 {
1565   StgTSO *tso, **prev;
1566   Capability *cap;
1567
1568 #if defined(RTS_SUPPORTS_THREADS)
1569   IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok));
1570   ACQUIRE_LOCK(&sched_mutex);
1571   rts_n_waiting_workers++;
1572   IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers));
1573
1574   /*
1575    * Wait for the go ahead
1576    */
1577   IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities));
1578   while ( noCapabilities() ) {
1579     waitCondition(&returning_worker_cond, &sched_mutex);
1580   }
1581   rts_n_waiting_workers--;
1582
1583   IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok));
1584 #endif
1585
1586   /* Remove the thread off of the suspended list */
1587   prev = &suspended_ccalling_threads;
1588   for (tso = suspended_ccalling_threads; 
1589        tso != END_TSO_QUEUE; 
1590        prev = &tso->link, tso = tso->link) {
1591     if (tso->id == (StgThreadID)tok) {
1592       *prev = tso->link;
1593       break;
1594     }
1595   }
1596   if (tso == END_TSO_QUEUE) {
1597     barf("resumeThread: thread not found");
1598   }
1599   tso->link = END_TSO_QUEUE;
1600
1601 #if defined(RTS_SUPPORTS_THREADS)
1602   /* Is it clever to block here with the TSO off the list,
1603    * but not hooked up to a capability?
1604    */
1605   while ( noCapabilities() ) {
1606     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1607     rts_n_waiting_tasks++;
1608     waitCondition(&thread_ready_cond, &sched_mutex);
1609     rts_n_waiting_tasks--;
1610     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1611   }
1612 #endif
1613
1614   grabCapability(&cap);
1615   RELEASE_LOCK(&sched_mutex);
1616
1617   /* Reset blocking status */
1618   tso->why_blocked  = NotBlocked;
1619
1620   cap->r.rCurrentTSO = tso;
1621
1622   return &cap->r;
1623 }
1624
1625
1626 /* ---------------------------------------------------------------------------
1627  * Static functions
1628  * ------------------------------------------------------------------------ */
1629 static void unblockThread(StgTSO *tso);
1630
1631 /* ---------------------------------------------------------------------------
1632  * Comparing Thread ids.
1633  *
1634  * This is used from STG land in the implementation of the
1635  * instances of Eq/Ord for ThreadIds.
1636  * ------------------------------------------------------------------------ */
1637
1638 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1639
1640   StgThreadID id1 = tso1->id; 
1641   StgThreadID id2 = tso2->id;
1642  
1643   if (id1 < id2) return (-1);
1644   if (id1 > id2) return 1;
1645   return 0;
1646 }
1647
1648 /* ---------------------------------------------------------------------------
1649  * Fetching the ThreadID from an StgTSO.
1650  *
1651  * This is used in the implementation of Show for ThreadIds.
1652  * ------------------------------------------------------------------------ */
1653 int rts_getThreadId(const StgTSO *tso) 
1654 {
1655   return tso->id;
1656 }
1657
1658 /* ---------------------------------------------------------------------------
1659    Create a new thread.
1660
1661    The new thread starts with the given stack size.  Before the
1662    scheduler can run, however, this thread needs to have a closure
1663    (and possibly some arguments) pushed on its stack.  See
1664    pushClosure() in Schedule.h.
1665
1666    createGenThread() and createIOThread() (in SchedAPI.h) are
1667    convenient packaged versions of this function.
1668
1669    currently pri (priority) is only used in a GRAN setup -- HWL
1670    ------------------------------------------------------------------------ */
1671 //@cindex createThread
1672 #if defined(GRAN)
1673 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1674 StgTSO *
1675 createThread(nat stack_size, StgInt pri)
1676 {
1677   return createThread_(stack_size, rtsFalse, pri);
1678 }
1679
1680 static StgTSO *
1681 createThread_(nat size, rtsBool have_lock, StgInt pri)
1682 {
1683 #else
1684 StgTSO *
1685 createThread(nat stack_size)
1686 {
1687   return createThread_(stack_size, rtsFalse);
1688 }
1689
1690 static StgTSO *
1691 createThread_(nat size, rtsBool have_lock)
1692 {
1693 #endif
1694
1695     StgTSO *tso;
1696     nat stack_size;
1697
1698     /* First check whether we should create a thread at all */
1699 #if defined(PAR)
1700   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1701   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1702     threadsIgnored++;
1703     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1704           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1705     return END_TSO_QUEUE;
1706   }
1707   threadsCreated++;
1708 #endif
1709
1710 #if defined(GRAN)
1711   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1712 #endif
1713
1714   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1715
1716   /* catch ridiculously small stack sizes */
1717   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1718     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1719   }
1720
1721   stack_size = size - TSO_STRUCT_SIZEW;
1722
1723   tso = (StgTSO *)allocate(size);
1724   TICK_ALLOC_TSO(stack_size, 0);
1725
1726   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1727 #if defined(GRAN)
1728   SET_GRAN_HDR(tso, ThisPE);
1729 #endif
1730   tso->what_next     = ThreadEnterGHC;
1731
1732   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1733    * protect the increment operation on next_thread_id.
1734    * In future, we could use an atomic increment instead.
1735    */
1736   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1737   tso->id = next_thread_id++; 
1738   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1739
1740   tso->why_blocked  = NotBlocked;
1741   tso->blocked_exceptions = NULL;
1742
1743   tso->stack_size   = stack_size;
1744   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1745                               - TSO_STRUCT_SIZEW;
1746   tso->sp           = (P_)&(tso->stack) + stack_size;
1747
1748 #ifdef PROFILING
1749   tso->prof.CCCS = CCS_MAIN;
1750 #endif
1751
1752   /* put a stop frame on the stack */
1753   tso->sp -= sizeofW(StgStopFrame);
1754   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1755   tso->su = (StgUpdateFrame*)tso->sp;
1756
1757   // ToDo: check this
1758 #if defined(GRAN)
1759   tso->link = END_TSO_QUEUE;
1760   /* uses more flexible routine in GranSim */
1761   insertThread(tso, CurrentProc);
1762 #else
1763   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1764    * from its creation
1765    */
1766 #endif
1767
1768 #if defined(GRAN) 
1769   if (RtsFlags.GranFlags.GranSimStats.Full) 
1770     DumpGranEvent(GR_START,tso);
1771 #elif defined(PAR)
1772   if (RtsFlags.ParFlags.ParStats.Full) 
1773     DumpGranEvent(GR_STARTQ,tso);
1774   /* HACk to avoid SCHEDULE 
1775      LastTSO = tso; */
1776 #endif
1777
1778   /* Link the new thread on the global thread list.
1779    */
1780   tso->global_link = all_threads;
1781   all_threads = tso;
1782
1783 #if defined(DIST)
1784   tso->dist.priority = MandatoryPriority; //by default that is...
1785 #endif
1786
1787 #if defined(GRAN)
1788   tso->gran.pri = pri;
1789 # if defined(DEBUG)
1790   tso->gran.magic = TSO_MAGIC; // debugging only
1791 # endif
1792   tso->gran.sparkname   = 0;
1793   tso->gran.startedat   = CURRENT_TIME; 
1794   tso->gran.exported    = 0;
1795   tso->gran.basicblocks = 0;
1796   tso->gran.allocs      = 0;
1797   tso->gran.exectime    = 0;
1798   tso->gran.fetchtime   = 0;
1799   tso->gran.fetchcount  = 0;
1800   tso->gran.blocktime   = 0;
1801   tso->gran.blockcount  = 0;
1802   tso->gran.blockedat   = 0;
1803   tso->gran.globalsparks = 0;
1804   tso->gran.localsparks  = 0;
1805   if (RtsFlags.GranFlags.Light)
1806     tso->gran.clock  = Now; /* local clock */
1807   else
1808     tso->gran.clock  = 0;
1809
1810   IF_DEBUG(gran,printTSO(tso));
1811 #elif defined(PAR)
1812 # if defined(DEBUG)
1813   tso->par.magic = TSO_MAGIC; // debugging only
1814 # endif
1815   tso->par.sparkname   = 0;
1816   tso->par.startedat   = CURRENT_TIME; 
1817   tso->par.exported    = 0;
1818   tso->par.basicblocks = 0;
1819   tso->par.allocs      = 0;
1820   tso->par.exectime    = 0;
1821   tso->par.fetchtime   = 0;
1822   tso->par.fetchcount  = 0;
1823   tso->par.blocktime   = 0;
1824   tso->par.blockcount  = 0;
1825   tso->par.blockedat   = 0;
1826   tso->par.globalsparks = 0;
1827   tso->par.localsparks  = 0;
1828 #endif
1829
1830 #if defined(GRAN)
1831   globalGranStats.tot_threads_created++;
1832   globalGranStats.threads_created_on_PE[CurrentProc]++;
1833   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1834   globalGranStats.tot_sq_probes++;
1835 #elif defined(PAR)
1836   // collect parallel global statistics (currently done together with GC stats)
1837   if (RtsFlags.ParFlags.ParStats.Global &&
1838       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1839     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1840     globalParStats.tot_threads_created++;
1841   }
1842 #endif 
1843
1844 #if defined(GRAN)
1845   IF_GRAN_DEBUG(pri,
1846                 belch("==__ schedule: Created TSO %d (%p);",
1847                       CurrentProc, tso, tso->id));
1848 #elif defined(PAR)
1849     IF_PAR_DEBUG(verbose,
1850                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1851                        tso->id, tso, advisory_thread_count));
1852 #else
1853   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1854                                  tso->id, tso->stack_size));
1855 #endif    
1856   return tso;
1857 }
1858
1859 #if defined(PAR)
1860 /* RFP:
1861    all parallel thread creation calls should fall through the following routine.
1862 */
1863 StgTSO *
1864 createSparkThread(rtsSpark spark) 
1865 { StgTSO *tso;
1866   ASSERT(spark != (rtsSpark)NULL);
1867   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1868   { threadsIgnored++;
1869     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1870           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1871     return END_TSO_QUEUE;
1872   }
1873   else
1874   { threadsCreated++;
1875     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1876     if (tso==END_TSO_QUEUE)     
1877       barf("createSparkThread: Cannot create TSO");
1878 #if defined(DIST)
1879     tso->priority = AdvisoryPriority;
1880 #endif
1881     pushClosure(tso,spark);
1882     PUSH_ON_RUN_QUEUE(tso);
1883     advisory_thread_count++;    
1884   }
1885   return tso;
1886 }
1887 #endif
1888
1889 /*
1890   Turn a spark into a thread.
1891   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1892 */
1893 #if defined(PAR)
1894 //@cindex activateSpark
1895 StgTSO *
1896 activateSpark (rtsSpark spark) 
1897 {
1898   StgTSO *tso;
1899
1900   tso = createSparkThread(spark);
1901   if (RtsFlags.ParFlags.ParStats.Full) {   
1902     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1903     IF_PAR_DEBUG(verbose,
1904                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1905                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1906   }
1907   // ToDo: fwd info on local/global spark to thread -- HWL
1908   // tso->gran.exported =  spark->exported;
1909   // tso->gran.locked =   !spark->global;
1910   // tso->gran.sparkname = spark->name;
1911
1912   return tso;
1913 }
1914 #endif
1915
1916 /* ---------------------------------------------------------------------------
1917  * scheduleThread()
1918  *
1919  * scheduleThread puts a thread on the head of the runnable queue.
1920  * This will usually be done immediately after a thread is created.
1921  * The caller of scheduleThread must create the thread using e.g.
1922  * createThread and push an appropriate closure
1923  * on this thread's stack before the scheduler is invoked.
1924  * ------------------------------------------------------------------------ */
1925
1926 void
1927 scheduleThread_(StgTSO *tso
1928 #if defined(THREADED_RTS)
1929                , rtsBool createTask
1930 #endif
1931               )
1932 {
1933   ACQUIRE_LOCK(&sched_mutex);
1934
1935   /* Put the new thread on the head of the runnable queue.  The caller
1936    * better push an appropriate closure on this thread's stack
1937    * beforehand.  In the SMP case, the thread may start running as
1938    * soon as we release the scheduler lock below.
1939    */
1940   PUSH_ON_RUN_QUEUE(tso);
1941 #if defined(THREADED_RTS)
1942   /* If main() is scheduling a thread, don't bother creating a 
1943    * new task.
1944    */
1945   if ( createTask ) {
1946     startTask(taskStart);
1947   }
1948 #endif
1949   THREAD_RUNNABLE();
1950
1951 #if 0
1952   IF_DEBUG(scheduler,printTSO(tso));
1953 #endif
1954   RELEASE_LOCK(&sched_mutex);
1955 }
1956
1957 void scheduleThread(StgTSO* tso)
1958 {
1959 #if defined(THREADED_RTS)
1960   return scheduleThread_(tso, rtsTrue);
1961 #else
1962   return scheduleThread_(tso);
1963 #endif
1964 }
1965
1966 /* ---------------------------------------------------------------------------
1967  * initScheduler()
1968  *
1969  * Initialise the scheduler.  This resets all the queues - if the
1970  * queues contained any threads, they'll be garbage collected at the
1971  * next pass.
1972  *
1973  * ------------------------------------------------------------------------ */
1974
1975 #ifdef SMP
1976 static void
1977 term_handler(int sig STG_UNUSED)
1978 {
1979   stat_workerStop();
1980   ACQUIRE_LOCK(&term_mutex);
1981   await_death--;
1982   RELEASE_LOCK(&term_mutex);
1983   shutdownThread();
1984 }
1985 #endif
1986
1987 void 
1988 initScheduler(void)
1989 {
1990 #if defined(GRAN)
1991   nat i;
1992
1993   for (i=0; i<=MAX_PROC; i++) {
1994     run_queue_hds[i]      = END_TSO_QUEUE;
1995     run_queue_tls[i]      = END_TSO_QUEUE;
1996     blocked_queue_hds[i]  = END_TSO_QUEUE;
1997     blocked_queue_tls[i]  = END_TSO_QUEUE;
1998     ccalling_threadss[i]  = END_TSO_QUEUE;
1999     sleeping_queue        = END_TSO_QUEUE;
2000   }
2001 #else
2002   run_queue_hd      = END_TSO_QUEUE;
2003   run_queue_tl      = END_TSO_QUEUE;
2004   blocked_queue_hd  = END_TSO_QUEUE;
2005   blocked_queue_tl  = END_TSO_QUEUE;
2006   sleeping_queue    = END_TSO_QUEUE;
2007 #endif 
2008
2009   suspended_ccalling_threads  = END_TSO_QUEUE;
2010
2011   main_threads = NULL;
2012   all_threads  = END_TSO_QUEUE;
2013
2014   context_switch = 0;
2015   interrupted    = 0;
2016
2017   RtsFlags.ConcFlags.ctxtSwitchTicks =
2018       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2019       
2020 #if defined(RTS_SUPPORTS_THREADS)
2021   /* Initialise the mutex and condition variables used by
2022    * the scheduler. */
2023   initMutex(&sched_mutex);
2024   initMutex(&term_mutex);
2025
2026   initCondition(&thread_ready_cond);
2027   initCondition(&returning_worker_cond);
2028 #endif
2029   
2030 #if defined(SMP)
2031   initCondition(&gc_pending_cond);
2032 #endif
2033
2034 #if defined(RTS_SUPPORTS_THREADS)
2035   ACQUIRE_LOCK(&sched_mutex);
2036 #endif
2037
2038   /* Install the SIGHUP handler */
2039 #if defined(SMP)
2040   {
2041     struct sigaction action,oact;
2042
2043     action.sa_handler = term_handler;
2044     sigemptyset(&action.sa_mask);
2045     action.sa_flags = 0;
2046     if (sigaction(SIGTERM, &action, &oact) != 0) {
2047       barf("can't install TERM handler");
2048     }
2049   }
2050 #endif
2051
2052   /* A capability holds the state a native thread needs in
2053    * order to execute STG code. At least one capability is
2054    * floating around (only SMP builds have more than one).
2055    */
2056   initCapabilities();
2057   
2058 #if defined(RTS_SUPPORTS_THREADS)
2059     /* start our haskell execution tasks */
2060 # if defined(SMP)
2061     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2062 # else
2063     startTaskManager(0,taskStart);
2064 # endif
2065 #endif
2066
2067 #if /* defined(SMP) ||*/ defined(PAR)
2068   initSparkPools();
2069 #endif
2070
2071 #if defined(RTS_SUPPORTS_THREADS)
2072   RELEASE_LOCK(&sched_mutex);
2073 #endif
2074
2075 }
2076
2077 void
2078 exitScheduler( void )
2079 {
2080 #if defined(RTS_SUPPORTS_THREADS)
2081   stopTaskManager();
2082 #endif
2083 }
2084
2085 /* -----------------------------------------------------------------------------
2086    Managing the per-task allocation areas.
2087    
2088    Each capability comes with an allocation area.  These are
2089    fixed-length block lists into which allocation can be done.
2090
2091    ToDo: no support for two-space collection at the moment???
2092    -------------------------------------------------------------------------- */
2093
2094 /* -----------------------------------------------------------------------------
2095  * waitThread is the external interface for running a new computation
2096  * and waiting for the result.
2097  *
2098  * In the non-SMP case, we create a new main thread, push it on the 
2099  * main-thread stack, and invoke the scheduler to run it.  The
2100  * scheduler will return when the top main thread on the stack has
2101  * completed or died, and fill in the necessary fields of the
2102  * main_thread structure.
2103  *
2104  * In the SMP case, we create a main thread as before, but we then
2105  * create a new condition variable and sleep on it.  When our new
2106  * main thread has completed, we'll be woken up and the status/result
2107  * will be in the main_thread struct.
2108  * -------------------------------------------------------------------------- */
2109
2110 int 
2111 howManyThreadsAvail ( void )
2112 {
2113    int i = 0;
2114    StgTSO* q;
2115    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2116       i++;
2117    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2118       i++;
2119    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2120       i++;
2121    return i;
2122 }
2123
2124 void
2125 finishAllThreads ( void )
2126 {
2127    do {
2128       while (run_queue_hd != END_TSO_QUEUE) {
2129          waitThread ( run_queue_hd, NULL);
2130       }
2131       while (blocked_queue_hd != END_TSO_QUEUE) {
2132          waitThread ( blocked_queue_hd, NULL);
2133       }
2134       while (sleeping_queue != END_TSO_QUEUE) {
2135          waitThread ( blocked_queue_hd, NULL);
2136       }
2137    } while 
2138       (blocked_queue_hd != END_TSO_QUEUE || 
2139        run_queue_hd     != END_TSO_QUEUE ||
2140        sleeping_queue   != END_TSO_QUEUE);
2141 }
2142
2143 SchedulerStatus
2144 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2145
2146 #if defined(THREADED_RTS)
2147   return waitThread_(tso,ret, rtsFalse);
2148 #else
2149   return waitThread_(tso,ret);
2150 #endif
2151 }
2152
2153 SchedulerStatus
2154 waitThread_(StgTSO *tso,
2155             /*out*/StgClosure **ret
2156 #if defined(THREADED_RTS)
2157             , rtsBool blockWaiting
2158 #endif
2159            )
2160 {
2161   StgMainThread *m;
2162   SchedulerStatus stat;
2163
2164   ACQUIRE_LOCK(&sched_mutex);
2165   
2166   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2167
2168   m->tso = tso;
2169   m->ret = ret;
2170   m->stat = NoStatus;
2171 #if defined(RTS_SUPPORTS_THREADS)
2172   initCondition(&m->wakeup);
2173 #endif
2174
2175   m->link = main_threads;
2176   main_threads = m;
2177
2178   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2179
2180 #if defined(RTS_SUPPORTS_THREADS)
2181
2182 # if defined(THREADED_RTS)
2183   if (!blockWaiting) {
2184     /* In the threaded case, the OS thread that called main()
2185      * gets to enter the RTS directly without going via another
2186      * task/thread.
2187      */
2188     RELEASE_LOCK(&sched_mutex);
2189     schedule();
2190     ASSERT(m->stat != NoStatus);
2191   } else 
2192 # endif
2193   {
2194     IF_DEBUG(scheduler, sched_belch("sfoo"));
2195     do {
2196       waitCondition(&m->wakeup, &sched_mutex);
2197     } while (m->stat == NoStatus);
2198   }
2199 #elif defined(GRAN)
2200   /* GranSim specific init */
2201   CurrentTSO = m->tso;                // the TSO to run
2202   procStatus[MainProc] = Busy;        // status of main PE
2203   CurrentProc = MainProc;             // PE to run it on
2204
2205   schedule();
2206 #else
2207   RELEASE_LOCK(&sched_mutex);
2208   schedule();
2209   ASSERT(m->stat != NoStatus);
2210 #endif
2211
2212   stat = m->stat;
2213
2214 #if defined(RTS_SUPPORTS_THREADS)
2215   closeCondition(&m->wakeup);
2216 #endif
2217
2218   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2219                               m->tso->id));
2220   free(m);
2221
2222 #if defined(THREADED_RTS)
2223   if (blockWaiting) 
2224 #endif
2225     RELEASE_LOCK(&sched_mutex);
2226
2227   return stat;
2228 }
2229
2230 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2231 //@subsection Run queue code 
2232
2233 #if 0
2234 /* 
2235    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2236        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2237        implicit global variable that has to be correct when calling these
2238        fcts -- HWL 
2239 */
2240
2241 /* Put the new thread on the head of the runnable queue.
2242  * The caller of createThread better push an appropriate closure
2243  * on this thread's stack before the scheduler is invoked.
2244  */
2245 static /* inline */ void
2246 add_to_run_queue(tso)
2247 StgTSO* tso; 
2248 {
2249   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2250   tso->link = run_queue_hd;
2251   run_queue_hd = tso;
2252   if (run_queue_tl == END_TSO_QUEUE) {
2253     run_queue_tl = tso;
2254   }
2255 }
2256
2257 /* Put the new thread at the end of the runnable queue. */
2258 static /* inline */ void
2259 push_on_run_queue(tso)
2260 StgTSO* tso; 
2261 {
2262   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2263   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2264   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2265   if (run_queue_hd == END_TSO_QUEUE) {
2266     run_queue_hd = tso;
2267   } else {
2268     run_queue_tl->link = tso;
2269   }
2270   run_queue_tl = tso;
2271 }
2272
2273 /* 
2274    Should be inlined because it's used very often in schedule.  The tso
2275    argument is actually only needed in GranSim, where we want to have the
2276    possibility to schedule *any* TSO on the run queue, irrespective of the
2277    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2278    the run queue and dequeue the tso, adjusting the links in the queue. 
2279 */
2280 //@cindex take_off_run_queue
2281 static /* inline */ StgTSO*
2282 take_off_run_queue(StgTSO *tso) {
2283   StgTSO *t, *prev;
2284
2285   /* 
2286      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2287
2288      if tso is specified, unlink that tso from the run_queue (doesn't have
2289      to be at the beginning of the queue); GranSim only 
2290   */
2291   if (tso!=END_TSO_QUEUE) {
2292     /* find tso in queue */
2293     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2294          t!=END_TSO_QUEUE && t!=tso;
2295          prev=t, t=t->link) 
2296       /* nothing */ ;
2297     ASSERT(t==tso);
2298     /* now actually dequeue the tso */
2299     if (prev!=END_TSO_QUEUE) {
2300       ASSERT(run_queue_hd!=t);
2301       prev->link = t->link;
2302     } else {
2303       /* t is at beginning of thread queue */
2304       ASSERT(run_queue_hd==t);
2305       run_queue_hd = t->link;
2306     }
2307     /* t is at end of thread queue */
2308     if (t->link==END_TSO_QUEUE) {
2309       ASSERT(t==run_queue_tl);
2310       run_queue_tl = prev;
2311     } else {
2312       ASSERT(run_queue_tl!=t);
2313     }
2314     t->link = END_TSO_QUEUE;
2315   } else {
2316     /* take tso from the beginning of the queue; std concurrent code */
2317     t = run_queue_hd;
2318     if (t != END_TSO_QUEUE) {
2319       run_queue_hd = t->link;
2320       t->link = END_TSO_QUEUE;
2321       if (run_queue_hd == END_TSO_QUEUE) {
2322         run_queue_tl = END_TSO_QUEUE;
2323       }
2324     }
2325   }
2326   return t;
2327 }
2328
2329 #endif /* 0 */
2330
2331 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2332 //@subsection Garbage Collextion Routines
2333
2334 /* ---------------------------------------------------------------------------
2335    Where are the roots that we know about?
2336
2337         - all the threads on the runnable queue
2338         - all the threads on the blocked queue
2339         - all the threads on the sleeping queue
2340         - all the thread currently executing a _ccall_GC
2341         - all the "main threads"
2342      
2343    ------------------------------------------------------------------------ */
2344
2345 /* This has to be protected either by the scheduler monitor, or by the
2346         garbage collection monitor (probably the latter).
2347         KH @ 25/10/99
2348 */
2349
2350 void
2351 GetRoots(evac_fn evac)
2352 {
2353   StgMainThread *m;
2354
2355 #if defined(GRAN)
2356   {
2357     nat i;
2358     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2359       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2360           evac((StgClosure **)&run_queue_hds[i]);
2361       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2362           evac((StgClosure **)&run_queue_tls[i]);
2363       
2364       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2365           evac((StgClosure **)&blocked_queue_hds[i]);
2366       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2367           evac((StgClosure **)&blocked_queue_tls[i]);
2368       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2369           evac((StgClosure **)&ccalling_threads[i]);
2370     }
2371   }
2372
2373   markEventQueue();
2374
2375 #else /* !GRAN */
2376   if (run_queue_hd != END_TSO_QUEUE) {
2377       ASSERT(run_queue_tl != END_TSO_QUEUE);
2378       evac((StgClosure **)&run_queue_hd);
2379       evac((StgClosure **)&run_queue_tl);
2380   }
2381   
2382   if (blocked_queue_hd != END_TSO_QUEUE) {
2383       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2384       evac((StgClosure **)&blocked_queue_hd);
2385       evac((StgClosure **)&blocked_queue_tl);
2386   }
2387   
2388   if (sleeping_queue != END_TSO_QUEUE) {
2389       evac((StgClosure **)&sleeping_queue);
2390   }
2391 #endif 
2392
2393   for (m = main_threads; m != NULL; m = m->link) {
2394       evac((StgClosure **)&m->tso);
2395   }
2396   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2397       evac((StgClosure **)&suspended_ccalling_threads);
2398   }
2399
2400 #if defined(PAR) || defined(GRAN)
2401   markSparkQueue(evac);
2402 #endif
2403 }
2404
2405 /* -----------------------------------------------------------------------------
2406    performGC
2407
2408    This is the interface to the garbage collector from Haskell land.
2409    We provide this so that external C code can allocate and garbage
2410    collect when called from Haskell via _ccall_GC.
2411
2412    It might be useful to provide an interface whereby the programmer
2413    can specify more roots (ToDo).
2414    
2415    This needs to be protected by the GC condition variable above.  KH.
2416    -------------------------------------------------------------------------- */
2417
2418 void (*extra_roots)(evac_fn);
2419
2420 void
2421 performGC(void)
2422 {
2423   GarbageCollect(GetRoots,rtsFalse);
2424 }
2425
2426 void
2427 performMajorGC(void)
2428 {
2429   GarbageCollect(GetRoots,rtsTrue);
2430 }
2431
2432 static void
2433 AllRoots(evac_fn evac)
2434 {
2435     GetRoots(evac);             // the scheduler's roots
2436     extra_roots(evac);          // the user's roots
2437 }
2438
2439 void
2440 performGCWithRoots(void (*get_roots)(evac_fn))
2441 {
2442   extra_roots = get_roots;
2443   GarbageCollect(AllRoots,rtsFalse);
2444 }
2445
2446 /* -----------------------------------------------------------------------------
2447    Stack overflow
2448
2449    If the thread has reached its maximum stack size, then raise the
2450    StackOverflow exception in the offending thread.  Otherwise
2451    relocate the TSO into a larger chunk of memory and adjust its stack
2452    size appropriately.
2453    -------------------------------------------------------------------------- */
2454
2455 static StgTSO *
2456 threadStackOverflow(StgTSO *tso)
2457 {
2458   nat new_stack_size, new_tso_size, diff, stack_words;
2459   StgPtr new_sp;
2460   StgTSO *dest;
2461
2462   IF_DEBUG(sanity,checkTSO(tso));
2463   if (tso->stack_size >= tso->max_stack_size) {
2464
2465     IF_DEBUG(gc,
2466              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2467                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2468              /* If we're debugging, just print out the top of the stack */
2469              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2470                                               tso->sp+64)));
2471
2472     /* Send this thread the StackOverflow exception */
2473     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2474     return tso;
2475   }
2476
2477   /* Try to double the current stack size.  If that takes us over the
2478    * maximum stack size for this thread, then use the maximum instead.
2479    * Finally round up so the TSO ends up as a whole number of blocks.
2480    */
2481   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2482   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2483                                        TSO_STRUCT_SIZE)/sizeof(W_);
2484   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2485   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2486
2487   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2488
2489   dest = (StgTSO *)allocate(new_tso_size);
2490   TICK_ALLOC_TSO(new_stack_size,0);
2491
2492   /* copy the TSO block and the old stack into the new area */
2493   memcpy(dest,tso,TSO_STRUCT_SIZE);
2494   stack_words = tso->stack + tso->stack_size - tso->sp;
2495   new_sp = (P_)dest + new_tso_size - stack_words;
2496   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2497
2498   /* relocate the stack pointers... */
2499   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2500   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2501   dest->sp    = new_sp;
2502   dest->stack_size = new_stack_size;
2503         
2504   /* and relocate the update frame list */
2505   relocate_stack(dest, diff);
2506
2507   /* Mark the old TSO as relocated.  We have to check for relocated
2508    * TSOs in the garbage collector and any primops that deal with TSOs.
2509    *
2510    * It's important to set the sp and su values to just beyond the end
2511    * of the stack, so we don't attempt to scavenge any part of the
2512    * dead TSO's stack.
2513    */
2514   tso->what_next = ThreadRelocated;
2515   tso->link = dest;
2516   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2517   tso->su = (StgUpdateFrame *)tso->sp;
2518   tso->why_blocked = NotBlocked;
2519   dest->mut_link = NULL;
2520
2521   IF_PAR_DEBUG(verbose,
2522                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2523                      tso->id, tso, tso->stack_size);
2524                /* If we're debugging, just print out the top of the stack */
2525                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2526                                                 tso->sp+64)));
2527   
2528   IF_DEBUG(sanity,checkTSO(tso));
2529 #if 0
2530   IF_DEBUG(scheduler,printTSO(dest));
2531 #endif
2532
2533   return dest;
2534 }
2535
2536 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2537 //@subsection Blocking Queue Routines
2538
2539 /* ---------------------------------------------------------------------------
2540    Wake up a queue that was blocked on some resource.
2541    ------------------------------------------------------------------------ */
2542
2543 #if defined(GRAN)
2544 static inline void
2545 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2546 {
2547 }
2548 #elif defined(PAR)
2549 static inline void
2550 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2551 {
2552   /* write RESUME events to log file and
2553      update blocked and fetch time (depending on type of the orig closure) */
2554   if (RtsFlags.ParFlags.ParStats.Full) {
2555     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2556                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2557                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2558     if (EMPTY_RUN_QUEUE())
2559       emitSchedule = rtsTrue;
2560
2561     switch (get_itbl(node)->type) {
2562         case FETCH_ME_BQ:
2563           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2564           break;
2565         case RBH:
2566         case FETCH_ME:
2567         case BLACKHOLE_BQ:
2568           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2569           break;
2570 #ifdef DIST
2571         case MVAR:
2572           break;
2573 #endif    
2574         default:
2575           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2576         }
2577       }
2578 }
2579 #endif
2580
2581 #if defined(GRAN)
2582 static StgBlockingQueueElement *
2583 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2584 {
2585     StgTSO *tso;
2586     PEs node_loc, tso_loc;
2587
2588     node_loc = where_is(node); // should be lifted out of loop
2589     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2590     tso_loc = where_is((StgClosure *)tso);
2591     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2592       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2593       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2594       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2595       // insertThread(tso, node_loc);
2596       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2597                 ResumeThread,
2598                 tso, node, (rtsSpark*)NULL);
2599       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2600       // len_local++;
2601       // len++;
2602     } else { // TSO is remote (actually should be FMBQ)
2603       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2604                                   RtsFlags.GranFlags.Costs.gunblocktime +
2605                                   RtsFlags.GranFlags.Costs.latency;
2606       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2607                 UnblockThread,
2608                 tso, node, (rtsSpark*)NULL);
2609       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2610       // len++;
2611     }
2612     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2613     IF_GRAN_DEBUG(bq,
2614                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2615                           (node_loc==tso_loc ? "Local" : "Global"), 
2616                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2617     tso->block_info.closure = NULL;
2618     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2619                              tso->id, tso));
2620 }
2621 #elif defined(PAR)
2622 static StgBlockingQueueElement *
2623 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2624 {
2625     StgBlockingQueueElement *next;
2626
2627     switch (get_itbl(bqe)->type) {
2628     case TSO:
2629       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2630       /* if it's a TSO just push it onto the run_queue */
2631       next = bqe->link;
2632       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2633       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2634       THREAD_RUNNABLE();
2635       unblockCount(bqe, node);
2636       /* reset blocking status after dumping event */
2637       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2638       break;
2639
2640     case BLOCKED_FETCH:
2641       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2642       next = bqe->link;
2643       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2644       PendingFetches = (StgBlockedFetch *)bqe;
2645       break;
2646
2647 # if defined(DEBUG)
2648       /* can ignore this case in a non-debugging setup; 
2649          see comments on RBHSave closures above */
2650     case CONSTR:
2651       /* check that the closure is an RBHSave closure */
2652       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2653              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2654              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2655       break;
2656
2657     default:
2658       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2659            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2660            (StgClosure *)bqe);
2661 # endif
2662     }
2663   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2664   return next;
2665 }
2666
2667 #else /* !GRAN && !PAR */
2668 static StgTSO *
2669 unblockOneLocked(StgTSO *tso)
2670 {
2671   StgTSO *next;
2672
2673   ASSERT(get_itbl(tso)->type == TSO);
2674   ASSERT(tso->why_blocked != NotBlocked);
2675   tso->why_blocked = NotBlocked;
2676   next = tso->link;
2677   PUSH_ON_RUN_QUEUE(tso);
2678   THREAD_RUNNABLE();
2679   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2680   return next;
2681 }
2682 #endif
2683
2684 #if defined(GRAN) || defined(PAR)
2685 inline StgBlockingQueueElement *
2686 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2687 {
2688   ACQUIRE_LOCK(&sched_mutex);
2689   bqe = unblockOneLocked(bqe, node);
2690   RELEASE_LOCK(&sched_mutex);
2691   return bqe;
2692 }
2693 #else
2694 inline StgTSO *
2695 unblockOne(StgTSO *tso)
2696 {
2697   ACQUIRE_LOCK(&sched_mutex);
2698   tso = unblockOneLocked(tso);
2699   RELEASE_LOCK(&sched_mutex);
2700   return tso;
2701 }
2702 #endif
2703
2704 #if defined(GRAN)
2705 void 
2706 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2707 {
2708   StgBlockingQueueElement *bqe;
2709   PEs node_loc;
2710   nat len = 0; 
2711
2712   IF_GRAN_DEBUG(bq, 
2713                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2714                       node, CurrentProc, CurrentTime[CurrentProc], 
2715                       CurrentTSO->id, CurrentTSO));
2716
2717   node_loc = where_is(node);
2718
2719   ASSERT(q == END_BQ_QUEUE ||
2720          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2721          get_itbl(q)->type == CONSTR); // closure (type constructor)
2722   ASSERT(is_unique(node));
2723
2724   /* FAKE FETCH: magically copy the node to the tso's proc;
2725      no Fetch necessary because in reality the node should not have been 
2726      moved to the other PE in the first place
2727   */
2728   if (CurrentProc!=node_loc) {
2729     IF_GRAN_DEBUG(bq, 
2730                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2731                         node, node_loc, CurrentProc, CurrentTSO->id, 
2732                         // CurrentTSO, where_is(CurrentTSO),
2733                         node->header.gran.procs));
2734     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2735     IF_GRAN_DEBUG(bq, 
2736                   belch("## new bitmask of node %p is %#x",
2737                         node, node->header.gran.procs));
2738     if (RtsFlags.GranFlags.GranSimStats.Global) {
2739       globalGranStats.tot_fake_fetches++;
2740     }
2741   }
2742
2743   bqe = q;
2744   // ToDo: check: ASSERT(CurrentProc==node_loc);
2745   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2746     //next = bqe->link;
2747     /* 
2748        bqe points to the current element in the queue
2749        next points to the next element in the queue
2750     */
2751     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2752     //tso_loc = where_is(tso);
2753     len++;
2754     bqe = unblockOneLocked(bqe, node);
2755   }
2756
2757   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2758      the closure to make room for the anchor of the BQ */
2759   if (bqe!=END_BQ_QUEUE) {
2760     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2761     /*
2762     ASSERT((info_ptr==&RBH_Save_0_info) ||
2763            (info_ptr==&RBH_Save_1_info) ||
2764            (info_ptr==&RBH_Save_2_info));
2765     */
2766     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2767     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2768     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2769
2770     IF_GRAN_DEBUG(bq,
2771                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2772                         node, info_type(node)));
2773   }
2774
2775   /* statistics gathering */
2776   if (RtsFlags.GranFlags.GranSimStats.Global) {
2777     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2778     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2779     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2780     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2781   }
2782   IF_GRAN_DEBUG(bq,
2783                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2784                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2785 }
2786 #elif defined(PAR)
2787 void 
2788 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2789 {
2790   StgBlockingQueueElement *bqe;
2791
2792   ACQUIRE_LOCK(&sched_mutex);
2793
2794   IF_PAR_DEBUG(verbose, 
2795                belch("##-_ AwBQ for node %p on [%x]: ",
2796                      node, mytid));
2797 #ifdef DIST  
2798   //RFP
2799   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2800     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2801     return;
2802   }
2803 #endif
2804   
2805   ASSERT(q == END_BQ_QUEUE ||
2806          get_itbl(q)->type == TSO ||           
2807          get_itbl(q)->type == BLOCKED_FETCH || 
2808          get_itbl(q)->type == CONSTR); 
2809
2810   bqe = q;
2811   while (get_itbl(bqe)->type==TSO || 
2812          get_itbl(bqe)->type==BLOCKED_FETCH) {
2813     bqe = unblockOneLocked(bqe, node);
2814   }
2815   RELEASE_LOCK(&sched_mutex);
2816 }
2817
2818 #else   /* !GRAN && !PAR */
2819 void
2820 awakenBlockedQueue(StgTSO *tso)
2821 {
2822   ACQUIRE_LOCK(&sched_mutex);
2823   while (tso != END_TSO_QUEUE) {
2824     tso = unblockOneLocked(tso);
2825   }
2826   RELEASE_LOCK(&sched_mutex);
2827 }
2828 #endif
2829
2830 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2831 //@subsection Exception Handling Routines
2832
2833 /* ---------------------------------------------------------------------------
2834    Interrupt execution
2835    - usually called inside a signal handler so it mustn't do anything fancy.   
2836    ------------------------------------------------------------------------ */
2837
2838 void
2839 interruptStgRts(void)
2840 {
2841     interrupted    = 1;
2842     context_switch = 1;
2843 }
2844
2845 /* -----------------------------------------------------------------------------
2846    Unblock a thread
2847
2848    This is for use when we raise an exception in another thread, which
2849    may be blocked.
2850    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2851    -------------------------------------------------------------------------- */
2852
2853 #if defined(GRAN) || defined(PAR)
2854 /*
2855   NB: only the type of the blocking queue is different in GranSim and GUM
2856       the operations on the queue-elements are the same
2857       long live polymorphism!
2858 */
2859 static void
2860 unblockThread(StgTSO *tso)
2861 {
2862   StgBlockingQueueElement *t, **last;
2863
2864   ACQUIRE_LOCK(&sched_mutex);
2865   switch (tso->why_blocked) {
2866
2867   case NotBlocked:
2868     return;  /* not blocked */
2869
2870   case BlockedOnMVar:
2871     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2872     {
2873       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2874       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2875
2876       last = (StgBlockingQueueElement **)&mvar->head;
2877       for (t = (StgBlockingQueueElement *)mvar->head; 
2878            t != END_BQ_QUEUE; 
2879            last = &t->link, last_tso = t, t = t->link) {
2880         if (t == (StgBlockingQueueElement *)tso) {
2881           *last = (StgBlockingQueueElement *)tso->link;
2882           if (mvar->tail == tso) {
2883             mvar->tail = (StgTSO *)last_tso;
2884           }
2885           goto done;
2886         }
2887       }
2888       barf("unblockThread (MVAR): TSO not found");
2889     }
2890
2891   case BlockedOnBlackHole:
2892     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2893     {
2894       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2895
2896       last = &bq->blocking_queue;
2897       for (t = bq->blocking_queue; 
2898            t != END_BQ_QUEUE; 
2899            last = &t->link, t = t->link) {
2900         if (t == (StgBlockingQueueElement *)tso) {
2901           *last = (StgBlockingQueueElement *)tso->link;
2902           goto done;
2903         }
2904       }
2905       barf("unblockThread (BLACKHOLE): TSO not found");
2906     }
2907
2908   case BlockedOnException:
2909     {
2910       StgTSO *target  = tso->block_info.tso;
2911
2912       ASSERT(get_itbl(target)->type == TSO);
2913
2914       if (target->what_next == ThreadRelocated) {
2915           target = target->link;
2916           ASSERT(get_itbl(target)->type == TSO);
2917       }
2918
2919       ASSERT(target->blocked_exceptions != NULL);
2920
2921       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2922       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2923            t != END_BQ_QUEUE; 
2924            last = &t->link, t = t->link) {
2925         ASSERT(get_itbl(t)->type == TSO);
2926         if (t == (StgBlockingQueueElement *)tso) {
2927           *last = (StgBlockingQueueElement *)tso->link;
2928           goto done;
2929         }
2930       }
2931       barf("unblockThread (Exception): TSO not found");
2932     }
2933
2934   case BlockedOnRead:
2935   case BlockedOnWrite:
2936     {
2937       /* take TSO off blocked_queue */
2938       StgBlockingQueueElement *prev = NULL;
2939       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2940            prev = t, t = t->link) {
2941         if (t == (StgBlockingQueueElement *)tso) {
2942           if (prev == NULL) {
2943             blocked_queue_hd = (StgTSO *)t->link;
2944             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2945               blocked_queue_tl = END_TSO_QUEUE;
2946             }
2947           } else {
2948             prev->link = t->link;
2949             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2950               blocked_queue_tl = (StgTSO *)prev;
2951             }
2952           }
2953           goto done;
2954         }
2955       }
2956       barf("unblockThread (I/O): TSO not found");
2957     }
2958
2959   case BlockedOnDelay:
2960     {
2961       /* take TSO off sleeping_queue */
2962       StgBlockingQueueElement *prev = NULL;
2963       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2964            prev = t, t = t->link) {
2965         if (t == (StgBlockingQueueElement *)tso) {
2966           if (prev == NULL) {
2967             sleeping_queue = (StgTSO *)t->link;
2968           } else {
2969             prev->link = t->link;
2970           }
2971           goto done;
2972         }
2973       }
2974       barf("unblockThread (I/O): TSO not found");
2975     }
2976
2977   default:
2978     barf("unblockThread");
2979   }
2980
2981  done:
2982   tso->link = END_TSO_QUEUE;
2983   tso->why_blocked = NotBlocked;
2984   tso->block_info.closure = NULL;
2985   PUSH_ON_RUN_QUEUE(tso);
2986   RELEASE_LOCK(&sched_mutex);
2987 }
2988 #else
2989 static void
2990 unblockThread(StgTSO *tso)
2991 {
2992   StgTSO *t, **last;
2993
2994   ACQUIRE_LOCK(&sched_mutex);
2995   switch (tso->why_blocked) {
2996
2997   case NotBlocked:
2998     return;  /* not blocked */
2999
3000   case BlockedOnMVar:
3001     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3002     {
3003       StgTSO *last_tso = END_TSO_QUEUE;
3004       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3005
3006       last = &mvar->head;
3007       for (t = mvar->head; t != END_TSO_QUEUE; 
3008            last = &t->link, last_tso = t, t = t->link) {
3009         if (t == tso) {
3010           *last = tso->link;
3011           if (mvar->tail == tso) {
3012             mvar->tail = last_tso;
3013           }
3014           goto done;
3015         }
3016       }
3017       barf("unblockThread (MVAR): TSO not found");
3018     }
3019
3020   case BlockedOnBlackHole:
3021     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3022     {
3023       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3024
3025       last = &bq->blocking_queue;
3026       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3027            last = &t->link, t = t->link) {
3028         if (t == tso) {
3029           *last = tso->link;
3030           goto done;
3031         }
3032       }
3033       barf("unblockThread (BLACKHOLE): TSO not found");
3034     }
3035
3036   case BlockedOnException:
3037     {
3038       StgTSO *target  = tso->block_info.tso;
3039
3040       ASSERT(get_itbl(target)->type == TSO);
3041
3042       while (target->what_next == ThreadRelocated) {
3043           target = target->link;
3044           ASSERT(get_itbl(target)->type == TSO);
3045       }
3046       
3047       ASSERT(target->blocked_exceptions != NULL);
3048
3049       last = &target->blocked_exceptions;
3050       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3051            last = &t->link, t = t->link) {
3052         ASSERT(get_itbl(t)->type == TSO);
3053         if (t == tso) {
3054           *last = tso->link;
3055           goto done;
3056         }
3057       }
3058       barf("unblockThread (Exception): TSO not found");
3059     }
3060
3061   case BlockedOnRead:
3062   case BlockedOnWrite:
3063     {
3064       StgTSO *prev = NULL;
3065       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3066            prev = t, t = t->link) {
3067         if (t == tso) {
3068           if (prev == NULL) {
3069             blocked_queue_hd = t->link;
3070             if (blocked_queue_tl == t) {
3071               blocked_queue_tl = END_TSO_QUEUE;
3072             }
3073           } else {
3074             prev->link = t->link;
3075             if (blocked_queue_tl == t) {
3076               blocked_queue_tl = prev;
3077             }
3078           }
3079           goto done;
3080         }
3081       }
3082       barf("unblockThread (I/O): TSO not found");
3083     }
3084
3085   case BlockedOnDelay:
3086     {
3087       StgTSO *prev = NULL;
3088       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3089            prev = t, t = t->link) {
3090         if (t == tso) {
3091           if (prev == NULL) {
3092             sleeping_queue = t->link;
3093           } else {
3094             prev->link = t->link;
3095           }
3096           goto done;
3097         }
3098       }
3099       barf("unblockThread (I/O): TSO not found");
3100     }
3101
3102   default:
3103     barf("unblockThread");
3104   }
3105
3106  done:
3107   tso->link = END_TSO_QUEUE;
3108   tso->why_blocked = NotBlocked;
3109   tso->block_info.closure = NULL;
3110   PUSH_ON_RUN_QUEUE(tso);
3111   RELEASE_LOCK(&sched_mutex);
3112 }
3113 #endif
3114
3115 /* -----------------------------------------------------------------------------
3116  * raiseAsync()
3117  *
3118  * The following function implements the magic for raising an
3119  * asynchronous exception in an existing thread.
3120  *
3121  * We first remove the thread from any queue on which it might be
3122  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3123  *
3124  * We strip the stack down to the innermost CATCH_FRAME, building
3125  * thunks in the heap for all the active computations, so they can 
3126  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3127  * an application of the handler to the exception, and push it on
3128  * the top of the stack.
3129  * 
3130  * How exactly do we save all the active computations?  We create an
3131  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3132  * AP_UPDs pushes everything from the corresponding update frame
3133  * upwards onto the stack.  (Actually, it pushes everything up to the
3134  * next update frame plus a pointer to the next AP_UPD object.
3135  * Entering the next AP_UPD object pushes more onto the stack until we
3136  * reach the last AP_UPD object - at which point the stack should look
3137  * exactly as it did when we killed the TSO and we can continue
3138  * execution by entering the closure on top of the stack.
3139  *
3140  * We can also kill a thread entirely - this happens if either (a) the 
3141  * exception passed to raiseAsync is NULL, or (b) there's no
3142  * CATCH_FRAME on the stack.  In either case, we strip the entire
3143  * stack and replace the thread with a zombie.
3144  *
3145  * -------------------------------------------------------------------------- */
3146  
3147 void 
3148 deleteThread(StgTSO *tso)
3149 {
3150   raiseAsync(tso,NULL);
3151 }
3152
3153 void
3154 raiseAsync(StgTSO *tso, StgClosure *exception)
3155 {
3156   StgUpdateFrame* su = tso->su;
3157   StgPtr          sp = tso->sp;
3158   
3159   /* Thread already dead? */
3160   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3161     return;
3162   }
3163
3164   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3165
3166   /* Remove it from any blocking queues */
3167   unblockThread(tso);
3168
3169   /* The stack freezing code assumes there's a closure pointer on
3170    * the top of the stack.  This isn't always the case with compiled
3171    * code, so we have to push a dummy closure on the top which just
3172    * returns to the next return address on the stack.
3173    */
3174   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3175     *(--sp) = (W_)&stg_dummy_ret_closure;
3176   }
3177
3178   while (1) {
3179     nat words = ((P_)su - (P_)sp) - 1;
3180     nat i;
3181     StgAP_UPD * ap;
3182
3183     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3184      * then build PAP(handler,exception,realworld#), and leave it on
3185      * top of the stack ready to enter.
3186      */
3187     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3188       StgCatchFrame *cf = (StgCatchFrame *)su;
3189       /* we've got an exception to raise, so let's pass it to the
3190        * handler in this frame.
3191        */
3192       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3193       TICK_ALLOC_UPD_PAP(3,0);
3194       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3195               
3196       ap->n_args = 2;
3197       ap->fun = cf->handler;    /* :: Exception -> IO a */
3198       ap->payload[0] = exception;
3199       ap->payload[1] = ARG_TAG(0); /* realworld token */
3200
3201       /* throw away the stack from Sp up to and including the
3202        * CATCH_FRAME.
3203        */
3204       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3205       tso->su = cf->link;
3206
3207       /* Restore the blocked/unblocked state for asynchronous exceptions
3208        * at the CATCH_FRAME.  
3209        *
3210        * If exceptions were unblocked at the catch, arrange that they
3211        * are unblocked again after executing the handler by pushing an
3212        * unblockAsyncExceptions_ret stack frame.
3213        */
3214       if (!cf->exceptions_blocked) {
3215         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3216       }
3217       
3218       /* Ensure that async exceptions are blocked when running the handler.
3219        */
3220       if (tso->blocked_exceptions == NULL) {
3221         tso->blocked_exceptions = END_TSO_QUEUE;
3222       }
3223       
3224       /* Put the newly-built PAP on top of the stack, ready to execute
3225        * when the thread restarts.
3226        */
3227       sp[0] = (W_)ap;
3228       tso->sp = sp;
3229       tso->what_next = ThreadEnterGHC;
3230       IF_DEBUG(sanity, checkTSO(tso));
3231       return;
3232     }
3233
3234     /* First build an AP_UPD consisting of the stack chunk above the
3235      * current update frame, with the top word on the stack as the
3236      * fun field.
3237      */
3238     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3239     
3240     ASSERT(words >= 0);
3241     
3242     ap->n_args = words;
3243     ap->fun    = (StgClosure *)sp[0];
3244     sp++;
3245     for(i=0; i < (nat)words; ++i) {
3246       ap->payload[i] = (StgClosure *)*sp++;
3247     }
3248     
3249     switch (get_itbl(su)->type) {
3250       
3251     case UPDATE_FRAME:
3252       {
3253         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3254         TICK_ALLOC_UP_THK(words+1,0);
3255         
3256         IF_DEBUG(scheduler,
3257                  fprintf(stderr,  "scheduler: Updating ");
3258                  printPtr((P_)su->updatee); 
3259                  fprintf(stderr,  " with ");
3260                  printObj((StgClosure *)ap);
3261                  );
3262         
3263         /* Replace the updatee with an indirection - happily
3264          * this will also wake up any threads currently
3265          * waiting on the result.
3266          *
3267          * Warning: if we're in a loop, more than one update frame on
3268          * the stack may point to the same object.  Be careful not to
3269          * overwrite an IND_OLDGEN in this case, because we'll screw
3270          * up the mutable lists.  To be on the safe side, don't
3271          * overwrite any kind of indirection at all.  See also
3272          * threadSqueezeStack in GC.c, where we have to make a similar
3273          * check.
3274          */
3275         if (!closure_IND(su->updatee)) {
3276             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3277         }
3278         su = su->link;
3279         sp += sizeofW(StgUpdateFrame) -1;
3280         sp[0] = (W_)ap; /* push onto stack */
3281         break;
3282       }
3283
3284     case CATCH_FRAME:
3285       {
3286         StgCatchFrame *cf = (StgCatchFrame *)su;
3287         StgClosure* o;
3288         
3289         /* We want a PAP, not an AP_UPD.  Fortunately, the
3290          * layout's the same.
3291          */
3292         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3293         TICK_ALLOC_UPD_PAP(words+1,0);
3294         
3295         /* now build o = FUN(catch,ap,handler) */
3296         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3297         TICK_ALLOC_FUN(2,0);
3298         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3299         o->payload[0] = (StgClosure *)ap;
3300         o->payload[1] = cf->handler;
3301         
3302         IF_DEBUG(scheduler,
3303                  fprintf(stderr,  "scheduler: Built ");
3304                  printObj((StgClosure *)o);
3305                  );
3306         
3307         /* pop the old handler and put o on the stack */
3308         su = cf->link;
3309         sp += sizeofW(StgCatchFrame) - 1;
3310         sp[0] = (W_)o;
3311         break;
3312       }
3313       
3314     case SEQ_FRAME:
3315       {
3316         StgSeqFrame *sf = (StgSeqFrame *)su;
3317         StgClosure* o;
3318         
3319         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3320         TICK_ALLOC_UPD_PAP(words+1,0);
3321         
3322         /* now build o = FUN(seq,ap) */
3323         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3324         TICK_ALLOC_SE_THK(1,0);
3325         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3326         o->payload[0] = (StgClosure *)ap;
3327         
3328         IF_DEBUG(scheduler,
3329                  fprintf(stderr,  "scheduler: Built ");
3330                  printObj((StgClosure *)o);
3331                  );
3332         
3333         /* pop the old handler and put o on the stack */
3334         su = sf->link;
3335         sp += sizeofW(StgSeqFrame) - 1;
3336         sp[0] = (W_)o;
3337         break;
3338       }
3339       
3340     case STOP_FRAME:
3341       /* We've stripped the entire stack, the thread is now dead. */
3342       sp += sizeofW(StgStopFrame) - 1;
3343       sp[0] = (W_)exception;    /* save the exception */
3344       tso->what_next = ThreadKilled;
3345       tso->su = (StgUpdateFrame *)(sp+1);
3346       tso->sp = sp;
3347       return;
3348
3349     default:
3350       barf("raiseAsync");
3351     }
3352   }
3353   barf("raiseAsync");
3354 }
3355
3356 /* -----------------------------------------------------------------------------
3357    resurrectThreads is called after garbage collection on the list of
3358    threads found to be garbage.  Each of these threads will be woken
3359    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3360    on an MVar, or NonTermination if the thread was blocked on a Black
3361    Hole.
3362    -------------------------------------------------------------------------- */
3363
3364 void
3365 resurrectThreads( StgTSO *threads )
3366 {
3367   StgTSO *tso, *next;
3368
3369   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3370     next = tso->global_link;
3371     tso->global_link = all_threads;
3372     all_threads = tso;
3373     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3374
3375     switch (tso->why_blocked) {
3376     case BlockedOnMVar:
3377     case BlockedOnException:
3378       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3379       break;
3380     case BlockedOnBlackHole:
3381       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3382       break;
3383     case NotBlocked:
3384       /* This might happen if the thread was blocked on a black hole
3385        * belonging to a thread that we've just woken up (raiseAsync
3386        * can wake up threads, remember...).
3387        */
3388       continue;
3389     default:
3390       barf("resurrectThreads: thread blocked in a strange way");
3391     }
3392   }
3393 }
3394
3395 /* -----------------------------------------------------------------------------
3396  * Blackhole detection: if we reach a deadlock, test whether any
3397  * threads are blocked on themselves.  Any threads which are found to
3398  * be self-blocked get sent a NonTermination exception.
3399  *
3400  * This is only done in a deadlock situation in order to avoid
3401  * performance overhead in the normal case.
3402  * -------------------------------------------------------------------------- */
3403
3404 static void
3405 detectBlackHoles( void )
3406 {
3407     StgTSO *t = all_threads;
3408     StgUpdateFrame *frame;
3409     StgClosure *blocked_on;
3410
3411     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3412
3413         while (t->what_next == ThreadRelocated) {
3414             t = t->link;
3415             ASSERT(get_itbl(t)->type == TSO);
3416         }
3417       
3418         if (t->why_blocked != BlockedOnBlackHole) {
3419             continue;
3420         }
3421
3422         blocked_on = t->block_info.closure;
3423
3424         for (frame = t->su; ; frame = frame->link) {
3425             switch (get_itbl(frame)->type) {
3426
3427             case UPDATE_FRAME:
3428                 if (frame->updatee == blocked_on) {
3429                     /* We are blocking on one of our own computations, so
3430                      * send this thread the NonTermination exception.  
3431                      */
3432                     IF_DEBUG(scheduler, 
3433                              sched_belch("thread %d is blocked on itself", t->id));
3434                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3435                     goto done;
3436                 }
3437                 else {
3438                     continue;
3439                 }
3440
3441             case CATCH_FRAME:
3442             case SEQ_FRAME:
3443                 continue;
3444                 
3445             case STOP_FRAME:
3446                 break;
3447             }
3448             break;
3449         }
3450
3451     done: ;
3452     }   
3453 }
3454
3455 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3456 //@subsection Debugging Routines
3457
3458 /* -----------------------------------------------------------------------------
3459    Debugging: why is a thread blocked
3460    -------------------------------------------------------------------------- */
3461
3462 #ifdef DEBUG
3463
3464 void
3465 printThreadBlockage(StgTSO *tso)
3466 {
3467   switch (tso->why_blocked) {
3468   case BlockedOnRead:
3469     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3470     break;
3471   case BlockedOnWrite:
3472     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3473     break;
3474   case BlockedOnDelay:
3475     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3476     break;
3477   case BlockedOnMVar:
3478     fprintf(stderr,"is blocked on an MVar");
3479     break;
3480   case BlockedOnException:
3481     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3482             tso->block_info.tso->id);
3483     break;
3484   case BlockedOnBlackHole:
3485     fprintf(stderr,"is blocked on a black hole");
3486     break;
3487   case NotBlocked:
3488     fprintf(stderr,"is not blocked");
3489     break;
3490 #if defined(PAR)
3491   case BlockedOnGA:
3492     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3493             tso->block_info.closure, info_type(tso->block_info.closure));
3494     break;
3495   case BlockedOnGA_NoSend:
3496     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3497             tso->block_info.closure, info_type(tso->block_info.closure));
3498     break;
3499 #endif
3500 #if defined(RTS_SUPPORTS_THREADS)
3501   case BlockedOnCCall:
3502     fprintf(stderr,"is blocked on an external call");
3503     break;
3504 #endif
3505   default:
3506     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3507          tso->why_blocked, tso->id, tso);
3508   }
3509 }
3510
3511 void
3512 printThreadStatus(StgTSO *tso)
3513 {
3514   switch (tso->what_next) {
3515   case ThreadKilled:
3516     fprintf(stderr,"has been killed");
3517     break;
3518   case ThreadComplete:
3519     fprintf(stderr,"has completed");
3520     break;
3521   default:
3522     printThreadBlockage(tso);
3523   }
3524 }
3525
3526 void
3527 printAllThreads(void)
3528 {
3529   StgTSO *t;
3530
3531 # if defined(GRAN)
3532   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3533   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3534                        time_string, rtsFalse/*no commas!*/);
3535
3536   sched_belch("all threads at [%s]:", time_string);
3537 # elif defined(PAR)
3538   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3539   ullong_format_string(CURRENT_TIME,
3540                        time_string, rtsFalse/*no commas!*/);
3541
3542   sched_belch("all threads at [%s]:", time_string);
3543 # else
3544   sched_belch("all threads:");
3545 # endif
3546
3547   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3548     fprintf(stderr, "\tthread %d ", t->id);
3549     printThreadStatus(t);
3550     fprintf(stderr,"\n");
3551   }
3552 }
3553     
3554 /* 
3555    Print a whole blocking queue attached to node (debugging only).
3556 */
3557 //@cindex print_bq
3558 # if defined(PAR)
3559 void 
3560 print_bq (StgClosure *node)
3561 {
3562   StgBlockingQueueElement *bqe;
3563   StgTSO *tso;
3564   rtsBool end;
3565
3566   fprintf(stderr,"## BQ of closure %p (%s): ",
3567           node, info_type(node));
3568
3569   /* should cover all closures that may have a blocking queue */
3570   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3571          get_itbl(node)->type == FETCH_ME_BQ ||
3572          get_itbl(node)->type == RBH ||
3573          get_itbl(node)->type == MVAR);
3574     
3575   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3576
3577   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3578 }
3579
3580 /* 
3581    Print a whole blocking queue starting with the element bqe.
3582 */
3583 void 
3584 print_bqe (StgBlockingQueueElement *bqe)
3585 {
3586   rtsBool end;
3587
3588   /* 
3589      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3590   */
3591   for (end = (bqe==END_BQ_QUEUE);
3592        !end; // iterate until bqe points to a CONSTR
3593        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3594        bqe = end ? END_BQ_QUEUE : bqe->link) {
3595     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3596     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3597     /* types of closures that may appear in a blocking queue */
3598     ASSERT(get_itbl(bqe)->type == TSO ||           
3599            get_itbl(bqe)->type == BLOCKED_FETCH || 
3600            get_itbl(bqe)->type == CONSTR); 
3601     /* only BQs of an RBH end with an RBH_Save closure */
3602     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3603
3604     switch (get_itbl(bqe)->type) {
3605     case TSO:
3606       fprintf(stderr," TSO %u (%x),",
3607               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3608       break;
3609     case BLOCKED_FETCH:
3610       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3611               ((StgBlockedFetch *)bqe)->node, 
3612               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3613               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3614               ((StgBlockedFetch *)bqe)->ga.weight);
3615       break;
3616     case CONSTR:
3617       fprintf(stderr," %s (IP %p),",
3618               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3619                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3620                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3621                "RBH_Save_?"), get_itbl(bqe));
3622       break;
3623     default:
3624       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3625            info_type((StgClosure *)bqe)); // , node, info_type(node));
3626       break;
3627     }
3628   } /* for */
3629   fputc('\n', stderr);
3630 }
3631 # elif defined(GRAN)
3632 void 
3633 print_bq (StgClosure *node)
3634 {
3635   StgBlockingQueueElement *bqe;
3636   PEs node_loc, tso_loc;
3637   rtsBool end;
3638
3639   /* should cover all closures that may have a blocking queue */
3640   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3641          get_itbl(node)->type == FETCH_ME_BQ ||
3642          get_itbl(node)->type == RBH);
3643     
3644   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3645   node_loc = where_is(node);
3646
3647   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3648           node, info_type(node), node_loc);
3649
3650   /* 
3651      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3652   */
3653   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3654        !end; // iterate until bqe points to a CONSTR
3655        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3656     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3657     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3658     /* types of closures that may appear in a blocking queue */
3659     ASSERT(get_itbl(bqe)->type == TSO ||           
3660            get_itbl(bqe)->type == CONSTR); 
3661     /* only BQs of an RBH end with an RBH_Save closure */
3662     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3663
3664     tso_loc = where_is((StgClosure *)bqe);
3665     switch (get_itbl(bqe)->type) {
3666     case TSO:
3667       fprintf(stderr," TSO %d (%p) on [PE %d],",
3668               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3669       break;
3670     case CONSTR:
3671       fprintf(stderr," %s (IP %p),",
3672               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3673                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3674                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3675                "RBH_Save_?"), get_itbl(bqe));
3676       break;
3677     default:
3678       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3679            info_type((StgClosure *)bqe), node, info_type(node));
3680       break;
3681     }
3682   } /* for */
3683   fputc('\n', stderr);
3684 }
3685 #else
3686 /* 
3687    Nice and easy: only TSOs on the blocking queue
3688 */
3689 void 
3690 print_bq (StgClosure *node)
3691 {
3692   StgTSO *tso;
3693
3694   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3695   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3696        tso != END_TSO_QUEUE; 
3697        tso=tso->link) {
3698     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3699     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3700     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3701   }
3702   fputc('\n', stderr);
3703 }
3704 # endif
3705
3706 #if defined(PAR)
3707 static nat
3708 run_queue_len(void)
3709 {
3710   nat i;
3711   StgTSO *tso;
3712
3713   for (i=0, tso=run_queue_hd; 
3714        tso != END_TSO_QUEUE;
3715        i++, tso=tso->link)
3716     /* nothing */
3717
3718   return i;
3719 }
3720 #endif
3721
3722 static void
3723 sched_belch(char *s, ...)
3724 {
3725   va_list ap;
3726   va_start(ap,s);
3727 #ifdef SMP
3728   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3729 #elif defined(PAR)
3730   fprintf(stderr, "== ");
3731 #else
3732   fprintf(stderr, "scheduler: ");
3733 #endif
3734   vfprintf(stderr, s, ap);
3735   fprintf(stderr, "\n");
3736 }
3737
3738 #endif /* DEBUG */
3739
3740
3741 //@node Index,  , Debugging Routines, Main scheduling code
3742 //@subsection Index
3743
3744 //@index
3745 //* MainRegTable::  @cindex\s-+MainRegTable
3746 //* StgMainThread::  @cindex\s-+StgMainThread
3747 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3748 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3749 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3750 //* context_switch::  @cindex\s-+context_switch
3751 //* createThread::  @cindex\s-+createThread
3752 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3753 //* initScheduler::  @cindex\s-+initScheduler
3754 //* interrupted::  @cindex\s-+interrupted
3755 //* next_thread_id::  @cindex\s-+next_thread_id
3756 //* print_bq::  @cindex\s-+print_bq
3757 //* run_queue_hd::  @cindex\s-+run_queue_hd
3758 //* run_queue_tl::  @cindex\s-+run_queue_tl
3759 //* sched_mutex::  @cindex\s-+sched_mutex
3760 //* schedule::  @cindex\s-+schedule
3761 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3762 //* term_mutex::  @cindex\s-+term_mutex
3763 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3764 //@end index