[project @ 2002-04-23 09:56:28 by stolz]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.139 2002/04/23 09:56:28 stolz Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
368 #else
369   /* simply initialise it in the non-threaded case */
370   grabCapability(&cap);
371 #endif
372
373 #if defined(GRAN)
374   /* set up first event to get things going */
375   /* ToDo: assign costs for system setup and init MainTSO ! */
376   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
377             ContinueThread, 
378             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
379
380   IF_DEBUG(gran,
381            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382            G_TSO(CurrentTSO, 5));
383
384   if (RtsFlags.GranFlags.Light) {
385     /* Save current time; GranSim Light only */
386     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
387   }      
388
389   event = get_next_event();
390
391   while (event!=(rtsEvent*)NULL) {
392     /* Choose the processor with the next event */
393     CurrentProc = event->proc;
394     CurrentTSO = event->tso;
395
396 #elif defined(PAR)
397
398   while (!receivedFinish) {    /* set by processMessages */
399                                /* when receiving PP_FINISH message         */ 
400 #else
401
402   while (1) {
403
404 #endif
405
406     IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409     /* Check to see whether there are any worker threads
410        waiting to deposit external call results. If so,
411        yield our capability */
412     yieldToReturningWorker(&sched_mutex, &cap);
413 #endif
414
415     /* If we're interrupted (the user pressed ^C, or some other
416      * termination condition occurred), kill all the currently running
417      * threads.
418      */
419     if (interrupted) {
420       IF_DEBUG(scheduler, sched_belch("interrupted"));
421       deleteAllThreads();
422       interrupted = rtsFalse;
423       was_interrupted = rtsTrue;
424     }
425
426     /* Go through the list of main threads and wake up any
427      * clients whose computations have finished.  ToDo: this
428      * should be done more efficiently without a linear scan
429      * of the main threads list, somehow...
430      */
431 #if defined(RTS_SUPPORTS_THREADS)
432     { 
433       StgMainThread *m, **prev;
434       prev = &main_threads;
435       for (m = main_threads; m != NULL; m = m->link) {
436         switch (m->tso->what_next) {
437         case ThreadComplete:
438           if (m->ret) {
439             *(m->ret) = (StgClosure *)m->tso->sp[0];
440           }
441           *prev = m->link;
442           m->stat = Success;
443           broadcastCondition(&m->wakeup);
444 #ifdef DEBUG
445           free(m->tso->label);
446           m->tso->label = NULL;
447 #endif
448           break;
449         case ThreadKilled:
450           if (m->ret) *(m->ret) = NULL;
451           *prev = m->link;
452           if (was_interrupted) {
453             m->stat = Interrupted;
454           } else {
455             m->stat = Killed;
456           }
457           broadcastCondition(&m->wakeup);
458 #ifdef DEBUG
459           free(m->tso->label);
460           m->tso->label = NULL;
461 #endif
462           break;
463         default:
464           break;
465         }
466       }
467     }
468
469 #else /* not threaded */
470
471 # if defined(PAR)
472     /* in GUM do this only on the Main PE */
473     if (IAmMainThread)
474 # endif
475     /* If our main thread has finished or been killed, return.
476      */
477     {
478       StgMainThread *m = main_threads;
479       if (m->tso->what_next == ThreadComplete
480           || m->tso->what_next == ThreadKilled) {
481 #ifdef DEBUG
482         free(m->tso->label);
483         m->tso->label = NULL;
484 #endif
485         main_threads = main_threads->link;
486         if (m->tso->what_next == ThreadComplete) {
487           /* we finished successfully, fill in the return value */
488           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
489           m->stat = Success;
490           return;
491         } else {
492           if (m->ret) { *(m->ret) = NULL; };
493           if (was_interrupted) {
494             m->stat = Interrupted;
495           } else {
496             m->stat = Killed;
497           }
498           return;
499         }
500       }
501     }
502 #endif
503
504     /* Top up the run queue from our spark pool.  We try to make the
505      * number of threads in the run queue equal to the number of
506      * free capabilities.
507      *
508      * Disable spark support in SMP for now, non-essential & requires
509      * a little bit of work to make it compile cleanly. -- sof 1/02.
510      */
511 #if 0 /* defined(SMP) */
512     {
513       nat n = getFreeCapabilities();
514       StgTSO *tso = run_queue_hd;
515
516       /* Count the run queue */
517       while (n > 0 && tso != END_TSO_QUEUE) {
518         tso = tso->link;
519         n--;
520       }
521
522       for (; n > 0; n--) {
523         StgClosure *spark;
524         spark = findSpark(rtsFalse);
525         if (spark == NULL) {
526           break; /* no more sparks in the pool */
527         } else {
528           /* I'd prefer this to be done in activateSpark -- HWL */
529           /* tricky - it needs to hold the scheduler lock and
530            * not try to re-acquire it -- SDM */
531           createSparkThread(spark);       
532           IF_DEBUG(scheduler,
533                    sched_belch("==^^ turning spark of closure %p into a thread",
534                                (StgClosure *)spark));
535         }
536       }
537       /* We need to wake up the other tasks if we just created some
538        * work for them.
539        */
540       if (getFreeCapabilities() - n > 1) {
541           signalCondition( &thread_ready_cond );
542       }
543     }
544 #endif // SMP
545
546     /* check for signals each time around the scheduler */
547 #ifndef mingw32_TARGET_OS
548     if (signals_pending()) {
549       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
550       startSignalHandlers();
551       ACQUIRE_LOCK(&sched_mutex);
552     }
553 #endif
554
555     /* Check whether any waiting threads need to be woken up.  If the
556      * run queue is empty, and there are no other tasks running, we
557      * can wait indefinitely for something to happen.
558      * ToDo: what if another client comes along & requests another
559      * main thread?
560      */
561     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
562       awaitEvent( EMPTY_RUN_QUEUE()
563 #if defined(SMP)
564         && allFreeCapabilities()
565 #endif
566         );
567     }
568     /* we can be interrupted while waiting for I/O... */
569     if (interrupted) continue;
570
571     /* 
572      * Detect deadlock: when we have no threads to run, there are no
573      * threads waiting on I/O or sleeping, and all the other tasks are
574      * waiting for work, we must have a deadlock of some description.
575      *
576      * We first try to find threads blocked on themselves (ie. black
577      * holes), and generate NonTermination exceptions where necessary.
578      *
579      * If no threads are black holed, we have a deadlock situation, so
580      * inform all the main threads.
581      */
582 #ifndef PAR
583     if (   EMPTY_THREAD_QUEUES()
584 #if defined(RTS_SUPPORTS_THREADS)
585         && EMPTY_QUEUE(suspended_ccalling_threads)
586 #endif
587 #ifdef SMP
588         && allFreeCapabilities()
589 #endif
590         )
591     {
592         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
593 #if defined(THREADED_RTS)
594         /* and SMP mode ..? */
595         releaseCapability(cap);
596 #endif
597         // Garbage collection can release some new threads due to
598         // either (a) finalizers or (b) threads resurrected because
599         // they are about to be send BlockedOnDeadMVar.  Any threads
600         // thus released will be immediately runnable.
601         GarbageCollect(GetRoots,rtsTrue);
602
603         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
604
605         IF_DEBUG(scheduler, 
606                  sched_belch("still deadlocked, checking for black holes..."));
607         detectBlackHoles();
608
609         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
610
611 #ifndef mingw32_TARGET_OS
612         /* If we have user-installed signal handlers, then wait
613          * for signals to arrive rather then bombing out with a
614          * deadlock.
615          */
616 #if defined(RTS_SUPPORTS_THREADS)
617         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
618                       a signal with no runnable threads (or I/O
619                       suspended ones) leads nowhere quick.
620                       For now, simply shut down when we reach this
621                       condition.
622                       
623                       ToDo: define precisely under what conditions
624                       the Scheduler should shut down in an MT setting.
625                    */
626 #else
627         if ( anyUserHandlers() ) {
628 #endif
629             IF_DEBUG(scheduler, 
630                      sched_belch("still deadlocked, waiting for signals..."));
631
632             awaitUserSignals();
633
634             // we might be interrupted...
635             if (interrupted) { continue; }
636
637             if (signals_pending()) {
638                 RELEASE_LOCK(&sched_mutex);
639                 startSignalHandlers();
640                 ACQUIRE_LOCK(&sched_mutex);
641             }
642             ASSERT(!EMPTY_RUN_QUEUE());
643             goto not_deadlocked;
644         }
645 #endif
646
647         /* Probably a real deadlock.  Send the current main thread the
648          * Deadlock exception (or in the SMP build, send *all* main
649          * threads the deadlock exception, since none of them can make
650          * progress).
651          */
652         {
653             StgMainThread *m;
654 #if defined(RTS_SUPPORTS_THREADS)
655             for (m = main_threads; m != NULL; m = m->link) {
656                 switch (m->tso->why_blocked) {
657                 case BlockedOnBlackHole:
658                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
659                     break;
660                 case BlockedOnException:
661                 case BlockedOnMVar:
662                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
663                     break;
664                 default:
665                     barf("deadlock: main thread blocked in a strange way");
666                 }
667             }
668 #else
669             m = main_threads;
670             switch (m->tso->why_blocked) {
671             case BlockedOnBlackHole:
672                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
673                 break;
674             case BlockedOnException:
675             case BlockedOnMVar:
676                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
677                 break;
678             default:
679                 barf("deadlock: main thread blocked in a strange way");
680             }
681 #endif
682         }
683
684 #if defined(RTS_SUPPORTS_THREADS)
685         /* ToDo: revisit conditions (and mechanism) for shutting
686            down a multi-threaded world  */
687         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
688         shutdownHaskellAndExit(0);
689 #endif
690     }
691   not_deadlocked:
692
693 #elif defined(PAR)
694     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
695 #endif
696
697 #if defined(SMP)
698     /* If there's a GC pending, don't do anything until it has
699      * completed.
700      */
701     if (ready_to_gc) {
702       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
703       waitCondition( &gc_pending_cond, &sched_mutex );
704     }
705 #endif    
706
707 #if defined(RTS_SUPPORTS_THREADS)
708     /* block until we've got a thread on the run queue and a free
709      * capability.
710      *
711      */
712     if ( EMPTY_RUN_QUEUE() ) {
713       /* Give up our capability */
714       releaseCapability(cap);
715       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
716       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
717       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
718 #if 0
719       while ( EMPTY_RUN_QUEUE() ) {
720         waitForWorkCapability(&sched_mutex, &cap);
721         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
722       }
723 #endif
724     }
725 #endif
726
727 #if defined(GRAN)
728     if (RtsFlags.GranFlags.Light)
729       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
730
731     /* adjust time based on time-stamp */
732     if (event->time > CurrentTime[CurrentProc] &&
733         event->evttype != ContinueThread)
734       CurrentTime[CurrentProc] = event->time;
735     
736     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
737     if (!RtsFlags.GranFlags.Light)
738       handleIdlePEs();
739
740     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
741
742     /* main event dispatcher in GranSim */
743     switch (event->evttype) {
744       /* Should just be continuing execution */
745     case ContinueThread:
746       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
747       /* ToDo: check assertion
748       ASSERT(run_queue_hd != (StgTSO*)NULL &&
749              run_queue_hd != END_TSO_QUEUE);
750       */
751       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
752       if (!RtsFlags.GranFlags.DoAsyncFetch &&
753           procStatus[CurrentProc]==Fetching) {
754         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
755               CurrentTSO->id, CurrentTSO, CurrentProc);
756         goto next_thread;
757       } 
758       /* Ignore ContinueThreads for completed threads */
759       if (CurrentTSO->what_next == ThreadComplete) {
760         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
761               CurrentTSO->id, CurrentTSO, CurrentProc);
762         goto next_thread;
763       } 
764       /* Ignore ContinueThreads for threads that are being migrated */
765       if (PROCS(CurrentTSO)==Nowhere) { 
766         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
767               CurrentTSO->id, CurrentTSO, CurrentProc);
768         goto next_thread;
769       }
770       /* The thread should be at the beginning of the run queue */
771       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
772         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
773               CurrentTSO->id, CurrentTSO, CurrentProc);
774         break; // run the thread anyway
775       }
776       /*
777       new_event(proc, proc, CurrentTime[proc],
778                 FindWork,
779                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
780       goto next_thread; 
781       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
782       break; // now actually run the thread; DaH Qu'vam yImuHbej 
783
784     case FetchNode:
785       do_the_fetchnode(event);
786       goto next_thread;             /* handle next event in event queue  */
787       
788     case GlobalBlock:
789       do_the_globalblock(event);
790       goto next_thread;             /* handle next event in event queue  */
791       
792     case FetchReply:
793       do_the_fetchreply(event);
794       goto next_thread;             /* handle next event in event queue  */
795       
796     case UnblockThread:   /* Move from the blocked queue to the tail of */
797       do_the_unblock(event);
798       goto next_thread;             /* handle next event in event queue  */
799       
800     case ResumeThread:  /* Move from the blocked queue to the tail of */
801       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
802       event->tso->gran.blocktime += 
803         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
804       do_the_startthread(event);
805       goto next_thread;             /* handle next event in event queue  */
806       
807     case StartThread:
808       do_the_startthread(event);
809       goto next_thread;             /* handle next event in event queue  */
810       
811     case MoveThread:
812       do_the_movethread(event);
813       goto next_thread;             /* handle next event in event queue  */
814       
815     case MoveSpark:
816       do_the_movespark(event);
817       goto next_thread;             /* handle next event in event queue  */
818       
819     case FindWork:
820       do_the_findwork(event);
821       goto next_thread;             /* handle next event in event queue  */
822       
823     default:
824       barf("Illegal event type %u\n", event->evttype);
825     }  /* switch */
826     
827     /* This point was scheduler_loop in the old RTS */
828
829     IF_DEBUG(gran, belch("GRAN: after main switch"));
830
831     TimeOfLastEvent = CurrentTime[CurrentProc];
832     TimeOfNextEvent = get_time_of_next_event();
833     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
834     // CurrentTSO = ThreadQueueHd;
835
836     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
837                          TimeOfNextEvent));
838
839     if (RtsFlags.GranFlags.Light) 
840       GranSimLight_leave_system(event, &ActiveTSO); 
841
842     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
843
844     IF_DEBUG(gran, 
845              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
846
847     /* in a GranSim setup the TSO stays on the run queue */
848     t = CurrentTSO;
849     /* Take a thread from the run queue. */
850     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
851
852     IF_DEBUG(gran, 
853              fprintf(stderr, "GRAN: About to run current thread, which is\n");
854              G_TSO(t,5));
855
856     context_switch = 0; // turned on via GranYield, checking events and time slice
857
858     IF_DEBUG(gran, 
859              DumpGranEvent(GR_SCHEDULE, t));
860
861     procStatus[CurrentProc] = Busy;
862
863 #elif defined(PAR)
864     if (PendingFetches != END_BF_QUEUE) {
865         processFetches();
866     }
867
868     /* ToDo: phps merge with spark activation above */
869     /* check whether we have local work and send requests if we have none */
870     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
871       /* :-[  no local threads => look out for local sparks */
872       /* the spark pool for the current PE */
873       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
874       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
875           pool->hd < pool->tl) {
876         /* 
877          * ToDo: add GC code check that we really have enough heap afterwards!!
878          * Old comment:
879          * If we're here (no runnable threads) and we have pending
880          * sparks, we must have a space problem.  Get enough space
881          * to turn one of those pending sparks into a
882          * thread... 
883          */
884
885         spark = findSpark(rtsFalse);                /* get a spark */
886         if (spark != (rtsSpark) NULL) {
887           tso = activateSpark(spark);       /* turn the spark into a thread */
888           IF_PAR_DEBUG(schedule,
889                        belch("==== schedule: Created TSO %d (%p); %d threads active",
890                              tso->id, tso, advisory_thread_count));
891
892           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
893             belch("==^^ failed to activate spark");
894             goto next_thread;
895           }               /* otherwise fall through & pick-up new tso */
896         } else {
897           IF_PAR_DEBUG(verbose,
898                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
899                              spark_queue_len(pool)));
900           goto next_thread;
901         }
902       }
903
904       /* If we still have no work we need to send a FISH to get a spark
905          from another PE 
906       */
907       if (EMPTY_RUN_QUEUE()) {
908       /* =8-[  no local sparks => look for work on other PEs */
909         /*
910          * We really have absolutely no work.  Send out a fish
911          * (there may be some out there already), and wait for
912          * something to arrive.  We clearly can't run any threads
913          * until a SCHEDULE or RESUME arrives, and so that's what
914          * we're hoping to see.  (Of course, we still have to
915          * respond to other types of messages.)
916          */
917         TIME now = msTime() /*CURRENT_TIME*/;
918         IF_PAR_DEBUG(verbose, 
919                      belch("--  now=%ld", now));
920         IF_PAR_DEBUG(verbose,
921                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
922                          (last_fish_arrived_at!=0 &&
923                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
924                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
925                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
926                              last_fish_arrived_at,
927                              RtsFlags.ParFlags.fishDelay, now);
928                      });
929         
930         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
931             (last_fish_arrived_at==0 ||
932              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
933           /* outstandingFishes is set in sendFish, processFish;
934              avoid flooding system with fishes via delay */
935           pe = choosePE();
936           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
937                    NEW_FISH_HUNGER);
938
939           // Global statistics: count no. of fishes
940           if (RtsFlags.ParFlags.ParStats.Global &&
941               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
942             globalParStats.tot_fish_mess++;
943           }
944         }
945       
946         receivedFinish = processMessages();
947         goto next_thread;
948       }
949     } else if (PacketsWaiting()) {  /* Look for incoming messages */
950       receivedFinish = processMessages();
951     }
952
953     /* Now we are sure that we have some work available */
954     ASSERT(run_queue_hd != END_TSO_QUEUE);
955
956     /* Take a thread from the run queue, if we have work */
957     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
958     IF_DEBUG(sanity,checkTSO(t));
959
960     /* ToDo: write something to the log-file
961     if (RTSflags.ParFlags.granSimStats && !sameThread)
962         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
963
964     CurrentTSO = t;
965     */
966     /* the spark pool for the current PE */
967     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
968
969     IF_DEBUG(scheduler, 
970              belch("--=^ %d threads, %d sparks on [%#x]", 
971                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
972
973 # if 1
974     if (0 && RtsFlags.ParFlags.ParStats.Full && 
975         t && LastTSO && t->id != LastTSO->id && 
976         LastTSO->why_blocked == NotBlocked && 
977         LastTSO->what_next != ThreadComplete) {
978       // if previously scheduled TSO not blocked we have to record the context switch
979       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
980                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
981     }
982
983     if (RtsFlags.ParFlags.ParStats.Full && 
984         (emitSchedule /* forced emit */ ||
985         (t && LastTSO && t->id != LastTSO->id))) {
986       /* 
987          we are running a different TSO, so write a schedule event to log file
988          NB: If we use fair scheduling we also have to write  a deschedule 
989              event for LastTSO; with unfair scheduling we know that the
990              previous tso has blocked whenever we switch to another tso, so
991              we don't need it in GUM for now
992       */
993       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
994                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
995       emitSchedule = rtsFalse;
996     }
997      
998 # endif
999 #else /* !GRAN && !PAR */
1000   
1001     /* grab a thread from the run queue */
1002     ASSERT(run_queue_hd != END_TSO_QUEUE);
1003     t = POP_RUN_QUEUE();
1004     // Sanity check the thread we're about to run.  This can be
1005     // expensive if there is lots of thread switching going on...
1006     IF_DEBUG(sanity,checkTSO(t));
1007 #endif
1008     
1009     cap->r.rCurrentTSO = t;
1010     
1011     /* context switches are now initiated by the timer signal, unless
1012      * the user specified "context switch as often as possible", with
1013      * +RTS -C0
1014      */
1015     if (
1016 #ifdef PROFILING
1017         RtsFlags.ProfFlags.profileInterval == 0 ||
1018 #endif
1019         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1020          && (run_queue_hd != END_TSO_QUEUE
1021              || blocked_queue_hd != END_TSO_QUEUE
1022              || sleeping_queue != END_TSO_QUEUE)))
1023         context_switch = 1;
1024     else
1025         context_switch = 0;
1026
1027     RELEASE_LOCK(&sched_mutex);
1028
1029     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1030                               t->id, t, whatNext_strs[t->what_next]));
1031
1032 #ifdef PROFILING
1033     startHeapProfTimer();
1034 #endif
1035
1036     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1037     /* Run the current thread 
1038      */
1039     switch (cap->r.rCurrentTSO->what_next) {
1040     case ThreadKilled:
1041     case ThreadComplete:
1042         /* Thread already finished, return to scheduler. */
1043         ret = ThreadFinished;
1044         break;
1045     case ThreadEnterGHC:
1046         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1047         break;
1048     case ThreadRunGHC:
1049         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1050         break;
1051     case ThreadEnterInterp:
1052         ret = interpretBCO(cap);
1053         break;
1054     default:
1055       barf("schedule: invalid what_next field");
1056     }
1057     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1058     
1059     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1060 #ifdef PROFILING
1061     stopHeapProfTimer();
1062     CCCS = CCS_SYSTEM;
1063 #endif
1064     
1065     ACQUIRE_LOCK(&sched_mutex);
1066
1067 #ifdef SMP
1068     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1069 #elif !defined(GRAN) && !defined(PAR)
1070     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1071 #endif
1072     t = cap->r.rCurrentTSO;
1073     
1074 #if defined(PAR)
1075     /* HACK 675: if the last thread didn't yield, make sure to print a 
1076        SCHEDULE event to the log file when StgRunning the next thread, even
1077        if it is the same one as before */
1078     LastTSO = t; 
1079     TimeOfLastYield = CURRENT_TIME;
1080 #endif
1081
1082     switch (ret) {
1083     case HeapOverflow:
1084 #if defined(GRAN)
1085       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1086       globalGranStats.tot_heapover++;
1087 #elif defined(PAR)
1088       globalParStats.tot_heapover++;
1089 #endif
1090
1091       // did the task ask for a large block?
1092       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1093           // if so, get one and push it on the front of the nursery.
1094           bdescr *bd;
1095           nat blocks;
1096           
1097           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1098
1099           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1100                                    t->id, t,
1101                                    whatNext_strs[t->what_next], blocks));
1102
1103           // don't do this if it would push us over the
1104           // alloc_blocks_lim limit; we'll GC first.
1105           if (alloc_blocks + blocks < alloc_blocks_lim) {
1106
1107               alloc_blocks += blocks;
1108               bd = allocGroup( blocks );
1109
1110               // link the new group into the list
1111               bd->link = cap->r.rCurrentNursery;
1112               bd->u.back = cap->r.rCurrentNursery->u.back;
1113               if (cap->r.rCurrentNursery->u.back != NULL) {
1114                   cap->r.rCurrentNursery->u.back->link = bd;
1115               } else {
1116                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1117                          g0s0->blocks == cap->r.rNursery);
1118                   cap->r.rNursery = g0s0->blocks = bd;
1119               }           
1120               cap->r.rCurrentNursery->u.back = bd;
1121
1122               // initialise it as a nursery block
1123               bd->step = g0s0;
1124               bd->gen_no = 0;
1125               bd->flags = 0;
1126               bd->free = bd->start;
1127
1128               // don't forget to update the block count in g0s0.
1129               g0s0->n_blocks += blocks;
1130               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1131
1132               // now update the nursery to point to the new block
1133               cap->r.rCurrentNursery = bd;
1134
1135               // we might be unlucky and have another thread get on the
1136               // run queue before us and steal the large block, but in that
1137               // case the thread will just end up requesting another large
1138               // block.
1139               PUSH_ON_RUN_QUEUE(t);
1140               break;
1141           }
1142       }
1143
1144       /* make all the running tasks block on a condition variable,
1145        * maybe set context_switch and wait till they all pile in,
1146        * then have them wait on a GC condition variable.
1147        */
1148       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1149                                t->id, t, whatNext_strs[t->what_next]));
1150       threadPaused(t);
1151 #if defined(GRAN)
1152       ASSERT(!is_on_queue(t,CurrentProc));
1153 #elif defined(PAR)
1154       /* Currently we emit a DESCHEDULE event before GC in GUM.
1155          ToDo: either add separate event to distinguish SYSTEM time from rest
1156                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1157       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1158         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1159                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1160         emitSchedule = rtsTrue;
1161       }
1162 #endif
1163       
1164       ready_to_gc = rtsTrue;
1165       context_switch = 1;               /* stop other threads ASAP */
1166       PUSH_ON_RUN_QUEUE(t);
1167       /* actual GC is done at the end of the while loop */
1168       break;
1169       
1170     case StackOverflow:
1171 #if defined(GRAN)
1172       IF_DEBUG(gran, 
1173                DumpGranEvent(GR_DESCHEDULE, t));
1174       globalGranStats.tot_stackover++;
1175 #elif defined(PAR)
1176       // IF_DEBUG(par, 
1177       // DumpGranEvent(GR_DESCHEDULE, t);
1178       globalParStats.tot_stackover++;
1179 #endif
1180       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1181                                t->id, t, whatNext_strs[t->what_next]));
1182       /* just adjust the stack for this thread, then pop it back
1183        * on the run queue.
1184        */
1185       threadPaused(t);
1186       { 
1187         StgMainThread *m;
1188         /* enlarge the stack */
1189         StgTSO *new_t = threadStackOverflow(t);
1190         
1191         /* This TSO has moved, so update any pointers to it from the
1192          * main thread stack.  It better not be on any other queues...
1193          * (it shouldn't be).
1194          */
1195         for (m = main_threads; m != NULL; m = m->link) {
1196           if (m->tso == t) {
1197             m->tso = new_t;
1198           }
1199         }
1200         threadPaused(new_t);
1201         PUSH_ON_RUN_QUEUE(new_t);
1202       }
1203       break;
1204
1205     case ThreadYielding:
1206 #if defined(GRAN)
1207       IF_DEBUG(gran, 
1208                DumpGranEvent(GR_DESCHEDULE, t));
1209       globalGranStats.tot_yields++;
1210 #elif defined(PAR)
1211       // IF_DEBUG(par, 
1212       // DumpGranEvent(GR_DESCHEDULE, t);
1213       globalParStats.tot_yields++;
1214 #endif
1215       /* put the thread back on the run queue.  Then, if we're ready to
1216        * GC, check whether this is the last task to stop.  If so, wake
1217        * up the GC thread.  getThread will block during a GC until the
1218        * GC is finished.
1219        */
1220       IF_DEBUG(scheduler,
1221                if (t->what_next == ThreadEnterInterp) {
1222                    /* ToDo: or maybe a timer expired when we were in Hugs?
1223                     * or maybe someone hit ctrl-C
1224                     */
1225                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1226                          t->id, t, whatNext_strs[t->what_next]);
1227                } else {
1228                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1229                          t->id, t, whatNext_strs[t->what_next]);
1230                }
1231                );
1232
1233       threadPaused(t);
1234
1235       IF_DEBUG(sanity,
1236                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1237                checkTSO(t));
1238       ASSERT(t->link == END_TSO_QUEUE);
1239 #if defined(GRAN)
1240       ASSERT(!is_on_queue(t,CurrentProc));
1241
1242       IF_DEBUG(sanity,
1243                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1244                checkThreadQsSanity(rtsTrue));
1245 #endif
1246 #if defined(PAR)
1247       if (RtsFlags.ParFlags.doFairScheduling) { 
1248         /* this does round-robin scheduling; good for concurrency */
1249         APPEND_TO_RUN_QUEUE(t);
1250       } else {
1251         /* this does unfair scheduling; good for parallelism */
1252         PUSH_ON_RUN_QUEUE(t);
1253       }
1254 #else
1255       /* this does round-robin scheduling; good for concurrency */
1256       APPEND_TO_RUN_QUEUE(t);
1257 #endif
1258 #if defined(GRAN)
1259       /* add a ContinueThread event to actually process the thread */
1260       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1261                 ContinueThread,
1262                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1263       IF_GRAN_DEBUG(bq, 
1264                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1265                G_EVENTQ(0);
1266                G_CURR_THREADQ(0));
1267 #endif /* GRAN */
1268       break;
1269       
1270     case ThreadBlocked:
1271 #if defined(GRAN)
1272       IF_DEBUG(scheduler,
1273                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1274                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1275                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1276
1277       // ??? needed; should emit block before
1278       IF_DEBUG(gran, 
1279                DumpGranEvent(GR_DESCHEDULE, t)); 
1280       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1281       /*
1282         ngoq Dogh!
1283       ASSERT(procStatus[CurrentProc]==Busy || 
1284               ((procStatus[CurrentProc]==Fetching) && 
1285               (t->block_info.closure!=(StgClosure*)NULL)));
1286       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1287           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1288             procStatus[CurrentProc]==Fetching)) 
1289         procStatus[CurrentProc] = Idle;
1290       */
1291 #elif defined(PAR)
1292       IF_DEBUG(scheduler,
1293                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1294                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1295       IF_PAR_DEBUG(bq,
1296
1297                    if (t->block_info.closure!=(StgClosure*)NULL) 
1298                      print_bq(t->block_info.closure));
1299
1300       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1301       blockThread(t);
1302
1303       /* whatever we schedule next, we must log that schedule */
1304       emitSchedule = rtsTrue;
1305
1306 #else /* !GRAN */
1307       /* don't need to do anything.  Either the thread is blocked on
1308        * I/O, in which case we'll have called addToBlockedQueue
1309        * previously, or it's blocked on an MVar or Blackhole, in which
1310        * case it'll be on the relevant queue already.
1311        */
1312       IF_DEBUG(scheduler,
1313                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1314                printThreadBlockage(t);
1315                fprintf(stderr, "\n"));
1316
1317       /* Only for dumping event to log file 
1318          ToDo: do I need this in GranSim, too?
1319       blockThread(t);
1320       */
1321 #endif
1322       threadPaused(t);
1323       break;
1324       
1325     case ThreadFinished:
1326       /* Need to check whether this was a main thread, and if so, signal
1327        * the task that started it with the return value.  If we have no
1328        * more main threads, we probably need to stop all the tasks until
1329        * we get a new one.
1330        */
1331       /* We also end up here if the thread kills itself with an
1332        * uncaught exception, see Exception.hc.
1333        */
1334       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1335 #if defined(GRAN)
1336       endThread(t, CurrentProc); // clean-up the thread
1337 #elif defined(PAR)
1338       /* For now all are advisory -- HWL */
1339       //if(t->priority==AdvisoryPriority) ??
1340       advisory_thread_count--;
1341       
1342 # ifdef DIST
1343       if(t->dist.priority==RevalPriority)
1344         FinishReval(t);
1345 # endif
1346       
1347       if (RtsFlags.ParFlags.ParStats.Full &&
1348           !RtsFlags.ParFlags.ParStats.Suppressed) 
1349         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1350 #endif
1351       break;
1352       
1353     default:
1354       barf("schedule: invalid thread return code %d", (int)ret);
1355     }
1356
1357 #ifdef PROFILING
1358     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1359         GarbageCollect(GetRoots, rtsTrue);
1360         heapCensus();
1361         performHeapProfile = rtsFalse;
1362         ready_to_gc = rtsFalse; // we already GC'd
1363     }
1364 #endif
1365
1366     if (ready_to_gc 
1367 #ifdef SMP
1368         && allFreeCapabilities() 
1369 #endif
1370         ) {
1371       /* everybody back, start the GC.
1372        * Could do it in this thread, or signal a condition var
1373        * to do it in another thread.  Either way, we need to
1374        * broadcast on gc_pending_cond afterward.
1375        */
1376 #if defined(RTS_SUPPORTS_THREADS)
1377       IF_DEBUG(scheduler,sched_belch("doing GC"));
1378 #endif
1379       GarbageCollect(GetRoots,rtsFalse);
1380       ready_to_gc = rtsFalse;
1381 #ifdef SMP
1382       broadcastCondition(&gc_pending_cond);
1383 #endif
1384 #if defined(GRAN)
1385       /* add a ContinueThread event to continue execution of current thread */
1386       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1387                 ContinueThread,
1388                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1389       IF_GRAN_DEBUG(bq, 
1390                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1391                G_EVENTQ(0);
1392                G_CURR_THREADQ(0));
1393 #endif /* GRAN */
1394     }
1395
1396 #if defined(GRAN)
1397   next_thread:
1398     IF_GRAN_DEBUG(unused,
1399                   print_eventq(EventHd));
1400
1401     event = get_next_event();
1402 #elif defined(PAR)
1403   next_thread:
1404     /* ToDo: wait for next message to arrive rather than busy wait */
1405 #endif /* GRAN */
1406
1407   } /* end of while(1) */
1408
1409   IF_PAR_DEBUG(verbose,
1410                belch("== Leaving schedule() after having received Finish"));
1411 }
1412
1413 /* ---------------------------------------------------------------------------
1414  * Singleton fork(). Do not copy any running threads.
1415  * ------------------------------------------------------------------------- */
1416
1417 StgInt forkProcess(StgTSO* tso) {
1418
1419 #ifndef mingw32_TARGET_OS
1420   pid_t pid;
1421   StgTSO* t,*next;
1422
1423   IF_DEBUG(scheduler,sched_belch("forking!"));
1424
1425   pid = fork();
1426   if (pid) { /* parent */
1427
1428   /* just return the pid */
1429     
1430   } else { /* child */
1431   /* wipe all other threads */
1432   run_queue_hd = tso;
1433   tso->link = END_TSO_QUEUE;
1434
1435   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1436      us is picky about finding the threat still in its queue when
1437      handling the deleteThread() */
1438
1439   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1440     next = t->link;
1441     if (t->id != tso->id) {
1442       deleteThread(t);
1443     }
1444   }
1445   }
1446   return pid;
1447 #else /* mingw32 */
1448   barf("forkProcess#: primop not implemented for mingw32, sorry!");
1449   return -1;
1450 #endif /* mingw32 */
1451 }
1452
1453 /* ---------------------------------------------------------------------------
1454  * deleteAllThreads():  kill all the live threads.
1455  *
1456  * This is used when we catch a user interrupt (^C), before performing
1457  * any necessary cleanups and running finalizers.
1458  *
1459  * Locks: sched_mutex held.
1460  * ------------------------------------------------------------------------- */
1461    
1462 void deleteAllThreads ( void )
1463 {
1464   StgTSO* t, *next;
1465   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1466   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1467       next = t->global_link;
1468       deleteThread(t);
1469   }      
1470   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1471   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1472   sleeping_queue = END_TSO_QUEUE;
1473 }
1474
1475 /* startThread and  insertThread are now in GranSim.c -- HWL */
1476
1477
1478 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1479 //@subsection Suspend and Resume
1480
1481 /* ---------------------------------------------------------------------------
1482  * Suspending & resuming Haskell threads.
1483  * 
1484  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1485  * its capability before calling the C function.  This allows another
1486  * task to pick up the capability and carry on running Haskell
1487  * threads.  It also means that if the C call blocks, it won't lock
1488  * the whole system.
1489  *
1490  * The Haskell thread making the C call is put to sleep for the
1491  * duration of the call, on the susepended_ccalling_threads queue.  We
1492  * give out a token to the task, which it can use to resume the thread
1493  * on return from the C function.
1494  * ------------------------------------------------------------------------- */
1495    
1496 StgInt
1497 suspendThread( StgRegTable *reg, 
1498                rtsBool concCall
1499 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1500                STG_UNUSED
1501 #endif
1502                )
1503 {
1504   nat tok;
1505   Capability *cap;
1506
1507   /* assume that *reg is a pointer to the StgRegTable part
1508    * of a Capability.
1509    */
1510   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1511
1512   ACQUIRE_LOCK(&sched_mutex);
1513
1514   IF_DEBUG(scheduler,
1515            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1516
1517   threadPaused(cap->r.rCurrentTSO);
1518   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1519   suspended_ccalling_threads = cap->r.rCurrentTSO;
1520
1521 #if defined(RTS_SUPPORTS_THREADS)
1522   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1523 #endif
1524
1525   /* Use the thread ID as the token; it should be unique */
1526   tok = cap->r.rCurrentTSO->id;
1527
1528   /* Hand back capability */
1529   releaseCapability(cap);
1530   
1531 #if defined(RTS_SUPPORTS_THREADS)
1532   /* Preparing to leave the RTS, so ensure there's a native thread/task
1533      waiting to take over.
1534      
1535      ToDo: optimise this and only create a new task if there's a need
1536      for one (i.e., if there's only one Concurrent Haskell thread alive,
1537      there's no need to create a new task).
1538   */
1539   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1540   if (concCall) {
1541     startTask(taskStart);
1542   }
1543 #endif
1544
1545   /* Other threads _might_ be available for execution; signal this */
1546   THREAD_RUNNABLE();
1547   RELEASE_LOCK(&sched_mutex);
1548   return tok; 
1549 }
1550
1551 StgRegTable *
1552 resumeThread( StgInt tok,
1553               rtsBool concCall
1554 #if !defined(RTS_SUPPORTS_THREADS)
1555                STG_UNUSED
1556 #endif
1557               )
1558 {
1559   StgTSO *tso, **prev;
1560   Capability *cap;
1561
1562 #if defined(RTS_SUPPORTS_THREADS)
1563   /* Wait for permission to re-enter the RTS with the result. */
1564   if ( concCall ) {
1565     ACQUIRE_LOCK(&sched_mutex);
1566     grabReturnCapability(&sched_mutex, &cap);
1567   } else {
1568     grabCapability(&cap);
1569   }
1570 #else
1571   grabCapability(&cap);
1572 #endif
1573
1574   /* Remove the thread off of the suspended list */
1575   prev = &suspended_ccalling_threads;
1576   for (tso = suspended_ccalling_threads; 
1577        tso != END_TSO_QUEUE; 
1578        prev = &tso->link, tso = tso->link) {
1579     if (tso->id == (StgThreadID)tok) {
1580       *prev = tso->link;
1581       break;
1582     }
1583   }
1584   if (tso == END_TSO_QUEUE) {
1585     barf("resumeThread: thread not found");
1586   }
1587   tso->link = END_TSO_QUEUE;
1588   /* Reset blocking status */
1589   tso->why_blocked  = NotBlocked;
1590
1591   cap->r.rCurrentTSO = tso;
1592   RELEASE_LOCK(&sched_mutex);
1593   return &cap->r;
1594 }
1595
1596
1597 /* ---------------------------------------------------------------------------
1598  * Static functions
1599  * ------------------------------------------------------------------------ */
1600 static void unblockThread(StgTSO *tso);
1601
1602 /* ---------------------------------------------------------------------------
1603  * Comparing Thread ids.
1604  *
1605  * This is used from STG land in the implementation of the
1606  * instances of Eq/Ord for ThreadIds.
1607  * ------------------------------------------------------------------------ */
1608
1609 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1610
1611   StgThreadID id1 = tso1->id; 
1612   StgThreadID id2 = tso2->id;
1613  
1614   if (id1 < id2) return (-1);
1615   if (id1 > id2) return 1;
1616   return 0;
1617 }
1618
1619 /* ---------------------------------------------------------------------------
1620  * Fetching the ThreadID from an StgTSO.
1621  *
1622  * This is used in the implementation of Show for ThreadIds.
1623  * ------------------------------------------------------------------------ */
1624 int rts_getThreadId(const StgTSO *tso) 
1625 {
1626   return tso->id;
1627 }
1628
1629 #ifdef DEBUG
1630 void labelThread(StgTSO *tso, char *label)
1631 {
1632   int len;
1633   void *buf;
1634
1635   /* Caveat: Once set, you can only set the thread name to "" */
1636   len = strlen(label)+1;
1637   buf = realloc(tso->label,len);
1638   if (buf == NULL) {
1639     fprintf(stderr,"insufficient memory for labelThread!\n");
1640     free(tso->label);
1641     tso->label = NULL;
1642   } else
1643     strncpy(buf,label,len);
1644   tso->label = buf;
1645 }
1646 #endif /* DEBUG */
1647
1648 /* ---------------------------------------------------------------------------
1649    Create a new thread.
1650
1651    The new thread starts with the given stack size.  Before the
1652    scheduler can run, however, this thread needs to have a closure
1653    (and possibly some arguments) pushed on its stack.  See
1654    pushClosure() in Schedule.h.
1655
1656    createGenThread() and createIOThread() (in SchedAPI.h) are
1657    convenient packaged versions of this function.
1658
1659    currently pri (priority) is only used in a GRAN setup -- HWL
1660    ------------------------------------------------------------------------ */
1661 //@cindex createThread
1662 #if defined(GRAN)
1663 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1664 StgTSO *
1665 createThread(nat stack_size, StgInt pri)
1666 {
1667   return createThread_(stack_size, rtsFalse, pri);
1668 }
1669
1670 static StgTSO *
1671 createThread_(nat size, rtsBool have_lock, StgInt pri)
1672 {
1673 #else
1674 StgTSO *
1675 createThread(nat stack_size)
1676 {
1677   return createThread_(stack_size, rtsFalse);
1678 }
1679
1680 static StgTSO *
1681 createThread_(nat size, rtsBool have_lock)
1682 {
1683 #endif
1684
1685     StgTSO *tso;
1686     nat stack_size;
1687
1688     /* First check whether we should create a thread at all */
1689 #if defined(PAR)
1690   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1691   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1692     threadsIgnored++;
1693     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1694           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1695     return END_TSO_QUEUE;
1696   }
1697   threadsCreated++;
1698 #endif
1699
1700 #if defined(GRAN)
1701   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1702 #endif
1703
1704   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1705
1706   /* catch ridiculously small stack sizes */
1707   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1708     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1709   }
1710
1711   stack_size = size - TSO_STRUCT_SIZEW;
1712
1713   tso = (StgTSO *)allocate(size);
1714   TICK_ALLOC_TSO(stack_size, 0);
1715
1716   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1717 #if defined(GRAN)
1718   SET_GRAN_HDR(tso, ThisPE);
1719 #endif
1720   tso->what_next     = ThreadEnterGHC;
1721
1722 #ifdef DEBUG
1723   tso->label = NULL;
1724 #endif
1725
1726   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1727    * protect the increment operation on next_thread_id.
1728    * In future, we could use an atomic increment instead.
1729    */
1730 #ifdef SMP
1731   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1732 #endif
1733   tso->id = next_thread_id++; 
1734 #ifdef SMP
1735   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1736 #endif
1737
1738   tso->why_blocked  = NotBlocked;
1739   tso->blocked_exceptions = NULL;
1740
1741   tso->stack_size   = stack_size;
1742   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1743                               - TSO_STRUCT_SIZEW;
1744   tso->sp           = (P_)&(tso->stack) + stack_size;
1745
1746 #ifdef PROFILING
1747   tso->prof.CCCS = CCS_MAIN;
1748 #endif
1749
1750   /* put a stop frame on the stack */
1751   tso->sp -= sizeofW(StgStopFrame);
1752   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1753   tso->su = (StgUpdateFrame*)tso->sp;
1754
1755   // ToDo: check this
1756 #if defined(GRAN)
1757   tso->link = END_TSO_QUEUE;
1758   /* uses more flexible routine in GranSim */
1759   insertThread(tso, CurrentProc);
1760 #else
1761   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1762    * from its creation
1763    */
1764 #endif
1765
1766 #if defined(GRAN) 
1767   if (RtsFlags.GranFlags.GranSimStats.Full) 
1768     DumpGranEvent(GR_START,tso);
1769 #elif defined(PAR)
1770   if (RtsFlags.ParFlags.ParStats.Full) 
1771     DumpGranEvent(GR_STARTQ,tso);
1772   /* HACk to avoid SCHEDULE 
1773      LastTSO = tso; */
1774 #endif
1775
1776   /* Link the new thread on the global thread list.
1777    */
1778   tso->global_link = all_threads;
1779   all_threads = tso;
1780
1781 #if defined(DIST)
1782   tso->dist.priority = MandatoryPriority; //by default that is...
1783 #endif
1784
1785 #if defined(GRAN)
1786   tso->gran.pri = pri;
1787 # if defined(DEBUG)
1788   tso->gran.magic = TSO_MAGIC; // debugging only
1789 # endif
1790   tso->gran.sparkname   = 0;
1791   tso->gran.startedat   = CURRENT_TIME; 
1792   tso->gran.exported    = 0;
1793   tso->gran.basicblocks = 0;
1794   tso->gran.allocs      = 0;
1795   tso->gran.exectime    = 0;
1796   tso->gran.fetchtime   = 0;
1797   tso->gran.fetchcount  = 0;
1798   tso->gran.blocktime   = 0;
1799   tso->gran.blockcount  = 0;
1800   tso->gran.blockedat   = 0;
1801   tso->gran.globalsparks = 0;
1802   tso->gran.localsparks  = 0;
1803   if (RtsFlags.GranFlags.Light)
1804     tso->gran.clock  = Now; /* local clock */
1805   else
1806     tso->gran.clock  = 0;
1807
1808   IF_DEBUG(gran,printTSO(tso));
1809 #elif defined(PAR)
1810 # if defined(DEBUG)
1811   tso->par.magic = TSO_MAGIC; // debugging only
1812 # endif
1813   tso->par.sparkname   = 0;
1814   tso->par.startedat   = CURRENT_TIME; 
1815   tso->par.exported    = 0;
1816   tso->par.basicblocks = 0;
1817   tso->par.allocs      = 0;
1818   tso->par.exectime    = 0;
1819   tso->par.fetchtime   = 0;
1820   tso->par.fetchcount  = 0;
1821   tso->par.blocktime   = 0;
1822   tso->par.blockcount  = 0;
1823   tso->par.blockedat   = 0;
1824   tso->par.globalsparks = 0;
1825   tso->par.localsparks  = 0;
1826 #endif
1827
1828 #if defined(GRAN)
1829   globalGranStats.tot_threads_created++;
1830   globalGranStats.threads_created_on_PE[CurrentProc]++;
1831   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1832   globalGranStats.tot_sq_probes++;
1833 #elif defined(PAR)
1834   // collect parallel global statistics (currently done together with GC stats)
1835   if (RtsFlags.ParFlags.ParStats.Global &&
1836       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1837     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1838     globalParStats.tot_threads_created++;
1839   }
1840 #endif 
1841
1842 #if defined(GRAN)
1843   IF_GRAN_DEBUG(pri,
1844                 belch("==__ schedule: Created TSO %d (%p);",
1845                       CurrentProc, tso, tso->id));
1846 #elif defined(PAR)
1847     IF_PAR_DEBUG(verbose,
1848                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1849                        tso->id, tso, advisory_thread_count));
1850 #else
1851   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1852                                  tso->id, tso->stack_size));
1853 #endif    
1854   return tso;
1855 }
1856
1857 #if defined(PAR)
1858 /* RFP:
1859    all parallel thread creation calls should fall through the following routine.
1860 */
1861 StgTSO *
1862 createSparkThread(rtsSpark spark) 
1863 { StgTSO *tso;
1864   ASSERT(spark != (rtsSpark)NULL);
1865   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1866   { threadsIgnored++;
1867     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1868           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1869     return END_TSO_QUEUE;
1870   }
1871   else
1872   { threadsCreated++;
1873     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1874     if (tso==END_TSO_QUEUE)     
1875       barf("createSparkThread: Cannot create TSO");
1876 #if defined(DIST)
1877     tso->priority = AdvisoryPriority;
1878 #endif
1879     pushClosure(tso,spark);
1880     PUSH_ON_RUN_QUEUE(tso);
1881     advisory_thread_count++;    
1882   }
1883   return tso;
1884 }
1885 #endif
1886
1887 /*
1888   Turn a spark into a thread.
1889   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1890 */
1891 #if defined(PAR)
1892 //@cindex activateSpark
1893 StgTSO *
1894 activateSpark (rtsSpark spark) 
1895 {
1896   StgTSO *tso;
1897
1898   tso = createSparkThread(spark);
1899   if (RtsFlags.ParFlags.ParStats.Full) {   
1900     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1901     IF_PAR_DEBUG(verbose,
1902                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1903                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1904   }
1905   // ToDo: fwd info on local/global spark to thread -- HWL
1906   // tso->gran.exported =  spark->exported;
1907   // tso->gran.locked =   !spark->global;
1908   // tso->gran.sparkname = spark->name;
1909
1910   return tso;
1911 }
1912 #endif
1913
1914 /* ---------------------------------------------------------------------------
1915  * scheduleThread()
1916  *
1917  * scheduleThread puts a thread on the head of the runnable queue.
1918  * This will usually be done immediately after a thread is created.
1919  * The caller of scheduleThread must create the thread using e.g.
1920  * createThread and push an appropriate closure
1921  * on this thread's stack before the scheduler is invoked.
1922  * ------------------------------------------------------------------------ */
1923
1924 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1925
1926 void
1927 scheduleThread_(StgTSO *tso
1928                , rtsBool createTask
1929 #if !defined(THREADED_RTS)
1930                  STG_UNUSED
1931 #endif
1932               )
1933 {
1934   ACQUIRE_LOCK(&sched_mutex);
1935
1936   /* Put the new thread on the head of the runnable queue.  The caller
1937    * better push an appropriate closure on this thread's stack
1938    * beforehand.  In the SMP case, the thread may start running as
1939    * soon as we release the scheduler lock below.
1940    */
1941   PUSH_ON_RUN_QUEUE(tso);
1942 #if defined(THREADED_RTS)
1943   /* If main() is scheduling a thread, don't bother creating a 
1944    * new task.
1945    */
1946   if ( createTask ) {
1947     startTask(taskStart);
1948   }
1949 #endif
1950   THREAD_RUNNABLE();
1951
1952 #if 0
1953   IF_DEBUG(scheduler,printTSO(tso));
1954 #endif
1955   RELEASE_LOCK(&sched_mutex);
1956 }
1957
1958 void scheduleThread(StgTSO* tso)
1959 {
1960   return scheduleThread_(tso, rtsFalse);
1961 }
1962
1963 void scheduleExtThread(StgTSO* tso)
1964 {
1965   return scheduleThread_(tso, rtsTrue);
1966 }
1967
1968 /* ---------------------------------------------------------------------------
1969  * initScheduler()
1970  *
1971  * Initialise the scheduler.  This resets all the queues - if the
1972  * queues contained any threads, they'll be garbage collected at the
1973  * next pass.
1974  *
1975  * ------------------------------------------------------------------------ */
1976
1977 #ifdef SMP
1978 static void
1979 term_handler(int sig STG_UNUSED)
1980 {
1981   stat_workerStop();
1982   ACQUIRE_LOCK(&term_mutex);
1983   await_death--;
1984   RELEASE_LOCK(&term_mutex);
1985   shutdownThread();
1986 }
1987 #endif
1988
1989 void 
1990 initScheduler(void)
1991 {
1992 #if defined(GRAN)
1993   nat i;
1994
1995   for (i=0; i<=MAX_PROC; i++) {
1996     run_queue_hds[i]      = END_TSO_QUEUE;
1997     run_queue_tls[i]      = END_TSO_QUEUE;
1998     blocked_queue_hds[i]  = END_TSO_QUEUE;
1999     blocked_queue_tls[i]  = END_TSO_QUEUE;
2000     ccalling_threadss[i]  = END_TSO_QUEUE;
2001     sleeping_queue        = END_TSO_QUEUE;
2002   }
2003 #else
2004   run_queue_hd      = END_TSO_QUEUE;
2005   run_queue_tl      = END_TSO_QUEUE;
2006   blocked_queue_hd  = END_TSO_QUEUE;
2007   blocked_queue_tl  = END_TSO_QUEUE;
2008   sleeping_queue    = END_TSO_QUEUE;
2009 #endif 
2010
2011   suspended_ccalling_threads  = END_TSO_QUEUE;
2012
2013   main_threads = NULL;
2014   all_threads  = END_TSO_QUEUE;
2015
2016   context_switch = 0;
2017   interrupted    = 0;
2018
2019   RtsFlags.ConcFlags.ctxtSwitchTicks =
2020       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2021       
2022 #if defined(RTS_SUPPORTS_THREADS)
2023   /* Initialise the mutex and condition variables used by
2024    * the scheduler. */
2025   initMutex(&sched_mutex);
2026   initMutex(&term_mutex);
2027
2028   initCondition(&thread_ready_cond);
2029 #endif
2030   
2031 #if defined(SMP)
2032   initCondition(&gc_pending_cond);
2033 #endif
2034
2035 #if defined(RTS_SUPPORTS_THREADS)
2036   ACQUIRE_LOCK(&sched_mutex);
2037 #endif
2038
2039   /* Install the SIGHUP handler */
2040 #if defined(SMP)
2041   {
2042     struct sigaction action,oact;
2043
2044     action.sa_handler = term_handler;
2045     sigemptyset(&action.sa_mask);
2046     action.sa_flags = 0;
2047     if (sigaction(SIGTERM, &action, &oact) != 0) {
2048       barf("can't install TERM handler");
2049     }
2050   }
2051 #endif
2052
2053   /* A capability holds the state a native thread needs in
2054    * order to execute STG code. At least one capability is
2055    * floating around (only SMP builds have more than one).
2056    */
2057   initCapabilities();
2058   
2059 #if defined(RTS_SUPPORTS_THREADS)
2060     /* start our haskell execution tasks */
2061 # if defined(SMP)
2062     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2063 # else
2064     startTaskManager(0,taskStart);
2065 # endif
2066 #endif
2067
2068 #if /* defined(SMP) ||*/ defined(PAR)
2069   initSparkPools();
2070 #endif
2071
2072 #if defined(RTS_SUPPORTS_THREADS)
2073   RELEASE_LOCK(&sched_mutex);
2074 #endif
2075
2076 }
2077
2078 void
2079 exitScheduler( void )
2080 {
2081 #if defined(RTS_SUPPORTS_THREADS)
2082   stopTaskManager();
2083 #endif
2084 }
2085
2086 /* -----------------------------------------------------------------------------
2087    Managing the per-task allocation areas.
2088    
2089    Each capability comes with an allocation area.  These are
2090    fixed-length block lists into which allocation can be done.
2091
2092    ToDo: no support for two-space collection at the moment???
2093    -------------------------------------------------------------------------- */
2094
2095 /* -----------------------------------------------------------------------------
2096  * waitThread is the external interface for running a new computation
2097  * and waiting for the result.
2098  *
2099  * In the non-SMP case, we create a new main thread, push it on the 
2100  * main-thread stack, and invoke the scheduler to run it.  The
2101  * scheduler will return when the top main thread on the stack has
2102  * completed or died, and fill in the necessary fields of the
2103  * main_thread structure.
2104  *
2105  * In the SMP case, we create a main thread as before, but we then
2106  * create a new condition variable and sleep on it.  When our new
2107  * main thread has completed, we'll be woken up and the status/result
2108  * will be in the main_thread struct.
2109  * -------------------------------------------------------------------------- */
2110
2111 int 
2112 howManyThreadsAvail ( void )
2113 {
2114    int i = 0;
2115    StgTSO* q;
2116    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2117       i++;
2118    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2119       i++;
2120    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2121       i++;
2122    return i;
2123 }
2124
2125 void
2126 finishAllThreads ( void )
2127 {
2128    do {
2129       while (run_queue_hd != END_TSO_QUEUE) {
2130          waitThread ( run_queue_hd, NULL);
2131       }
2132       while (blocked_queue_hd != END_TSO_QUEUE) {
2133          waitThread ( blocked_queue_hd, NULL);
2134       }
2135       while (sleeping_queue != END_TSO_QUEUE) {
2136          waitThread ( blocked_queue_hd, NULL);
2137       }
2138    } while 
2139       (blocked_queue_hd != END_TSO_QUEUE || 
2140        run_queue_hd     != END_TSO_QUEUE ||
2141        sleeping_queue   != END_TSO_QUEUE);
2142 }
2143
2144 SchedulerStatus
2145 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2146
2147   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2148 #if defined(THREADED_RTS)
2149   return waitThread_(tso,ret, rtsFalse);
2150 #else
2151   return waitThread_(tso,ret);
2152 #endif
2153 }
2154
2155 SchedulerStatus
2156 waitThread_(StgTSO *tso,
2157             /*out*/StgClosure **ret
2158 #if defined(THREADED_RTS)
2159             , rtsBool blockWaiting
2160 #endif
2161            )
2162 {
2163   StgMainThread *m;
2164   SchedulerStatus stat;
2165
2166   ACQUIRE_LOCK(&sched_mutex);
2167   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2168   
2169   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2170
2171   m->tso = tso;
2172   m->ret = ret;
2173   m->stat = NoStatus;
2174 #if defined(RTS_SUPPORTS_THREADS)
2175   initCondition(&m->wakeup);
2176 #endif
2177
2178   m->link = main_threads;
2179   main_threads = m;
2180
2181   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2182
2183 #if defined(RTS_SUPPORTS_THREADS)
2184
2185 # if defined(THREADED_RTS)
2186   if (!blockWaiting) {
2187     /* In the threaded case, the OS thread that called main()
2188      * gets to enter the RTS directly without going via another
2189      * task/thread.
2190      */
2191     RELEASE_LOCK(&sched_mutex);
2192     schedule();
2193     ASSERT(m->stat != NoStatus);
2194   } else 
2195 # endif
2196   {
2197     IF_DEBUG(scheduler, sched_belch("sfoo"));
2198     do {
2199       waitCondition(&m->wakeup, &sched_mutex);
2200     } while (m->stat == NoStatus);
2201   }
2202 #elif defined(GRAN)
2203   /* GranSim specific init */
2204   CurrentTSO = m->tso;                // the TSO to run
2205   procStatus[MainProc] = Busy;        // status of main PE
2206   CurrentProc = MainProc;             // PE to run it on
2207
2208   schedule();
2209 #else
2210   RELEASE_LOCK(&sched_mutex);
2211   schedule();
2212   ASSERT(m->stat != NoStatus);
2213 #endif
2214
2215   stat = m->stat;
2216
2217 #if defined(RTS_SUPPORTS_THREADS)
2218   closeCondition(&m->wakeup);
2219 #endif
2220
2221   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2222                               m->tso->id));
2223   free(m);
2224
2225 #if defined(THREADED_RTS)
2226   if (blockWaiting) 
2227 #endif
2228     RELEASE_LOCK(&sched_mutex);
2229
2230   return stat;
2231 }
2232
2233 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2234 //@subsection Run queue code 
2235
2236 #if 0
2237 /* 
2238    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2239        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2240        implicit global variable that has to be correct when calling these
2241        fcts -- HWL 
2242 */
2243
2244 /* Put the new thread on the head of the runnable queue.
2245  * The caller of createThread better push an appropriate closure
2246  * on this thread's stack before the scheduler is invoked.
2247  */
2248 static /* inline */ void
2249 add_to_run_queue(tso)
2250 StgTSO* tso; 
2251 {
2252   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2253   tso->link = run_queue_hd;
2254   run_queue_hd = tso;
2255   if (run_queue_tl == END_TSO_QUEUE) {
2256     run_queue_tl = tso;
2257   }
2258 }
2259
2260 /* Put the new thread at the end of the runnable queue. */
2261 static /* inline */ void
2262 push_on_run_queue(tso)
2263 StgTSO* tso; 
2264 {
2265   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2266   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2267   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2268   if (run_queue_hd == END_TSO_QUEUE) {
2269     run_queue_hd = tso;
2270   } else {
2271     run_queue_tl->link = tso;
2272   }
2273   run_queue_tl = tso;
2274 }
2275
2276 /* 
2277    Should be inlined because it's used very often in schedule.  The tso
2278    argument is actually only needed in GranSim, where we want to have the
2279    possibility to schedule *any* TSO on the run queue, irrespective of the
2280    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2281    the run queue and dequeue the tso, adjusting the links in the queue. 
2282 */
2283 //@cindex take_off_run_queue
2284 static /* inline */ StgTSO*
2285 take_off_run_queue(StgTSO *tso) {
2286   StgTSO *t, *prev;
2287
2288   /* 
2289      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2290
2291      if tso is specified, unlink that tso from the run_queue (doesn't have
2292      to be at the beginning of the queue); GranSim only 
2293   */
2294   if (tso!=END_TSO_QUEUE) {
2295     /* find tso in queue */
2296     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2297          t!=END_TSO_QUEUE && t!=tso;
2298          prev=t, t=t->link) 
2299       /* nothing */ ;
2300     ASSERT(t==tso);
2301     /* now actually dequeue the tso */
2302     if (prev!=END_TSO_QUEUE) {
2303       ASSERT(run_queue_hd!=t);
2304       prev->link = t->link;
2305     } else {
2306       /* t is at beginning of thread queue */
2307       ASSERT(run_queue_hd==t);
2308       run_queue_hd = t->link;
2309     }
2310     /* t is at end of thread queue */
2311     if (t->link==END_TSO_QUEUE) {
2312       ASSERT(t==run_queue_tl);
2313       run_queue_tl = prev;
2314     } else {
2315       ASSERT(run_queue_tl!=t);
2316     }
2317     t->link = END_TSO_QUEUE;
2318   } else {
2319     /* take tso from the beginning of the queue; std concurrent code */
2320     t = run_queue_hd;
2321     if (t != END_TSO_QUEUE) {
2322       run_queue_hd = t->link;
2323       t->link = END_TSO_QUEUE;
2324       if (run_queue_hd == END_TSO_QUEUE) {
2325         run_queue_tl = END_TSO_QUEUE;
2326       }
2327     }
2328   }
2329   return t;
2330 }
2331
2332 #endif /* 0 */
2333
2334 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2335 //@subsection Garbage Collextion Routines
2336
2337 /* ---------------------------------------------------------------------------
2338    Where are the roots that we know about?
2339
2340         - all the threads on the runnable queue
2341         - all the threads on the blocked queue
2342         - all the threads on the sleeping queue
2343         - all the thread currently executing a _ccall_GC
2344         - all the "main threads"
2345      
2346    ------------------------------------------------------------------------ */
2347
2348 /* This has to be protected either by the scheduler monitor, or by the
2349         garbage collection monitor (probably the latter).
2350         KH @ 25/10/99
2351 */
2352
2353 void
2354 GetRoots(evac_fn evac)
2355 {
2356 #if defined(GRAN)
2357   {
2358     nat i;
2359     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2360       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2361           evac((StgClosure **)&run_queue_hds[i]);
2362       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2363           evac((StgClosure **)&run_queue_tls[i]);
2364       
2365       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2366           evac((StgClosure **)&blocked_queue_hds[i]);
2367       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2368           evac((StgClosure **)&blocked_queue_tls[i]);
2369       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2370           evac((StgClosure **)&ccalling_threads[i]);
2371     }
2372   }
2373
2374   markEventQueue();
2375
2376 #else /* !GRAN */
2377   if (run_queue_hd != END_TSO_QUEUE) {
2378       ASSERT(run_queue_tl != END_TSO_QUEUE);
2379       evac((StgClosure **)&run_queue_hd);
2380       evac((StgClosure **)&run_queue_tl);
2381   }
2382   
2383   if (blocked_queue_hd != END_TSO_QUEUE) {
2384       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2385       evac((StgClosure **)&blocked_queue_hd);
2386       evac((StgClosure **)&blocked_queue_tl);
2387   }
2388   
2389   if (sleeping_queue != END_TSO_QUEUE) {
2390       evac((StgClosure **)&sleeping_queue);
2391   }
2392 #endif 
2393
2394   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2395       evac((StgClosure **)&suspended_ccalling_threads);
2396   }
2397
2398 #if defined(PAR) || defined(GRAN)
2399   markSparkQueue(evac);
2400 #endif
2401 }
2402
2403 /* -----------------------------------------------------------------------------
2404    performGC
2405
2406    This is the interface to the garbage collector from Haskell land.
2407    We provide this so that external C code can allocate and garbage
2408    collect when called from Haskell via _ccall_GC.
2409
2410    It might be useful to provide an interface whereby the programmer
2411    can specify more roots (ToDo).
2412    
2413    This needs to be protected by the GC condition variable above.  KH.
2414    -------------------------------------------------------------------------- */
2415
2416 void (*extra_roots)(evac_fn);
2417
2418 void
2419 performGC(void)
2420 {
2421   /* Obligated to hold this lock upon entry */
2422   ACQUIRE_LOCK(&sched_mutex);
2423   GarbageCollect(GetRoots,rtsFalse);
2424   RELEASE_LOCK(&sched_mutex);
2425 }
2426
2427 void
2428 performMajorGC(void)
2429 {
2430   ACQUIRE_LOCK(&sched_mutex);
2431   GarbageCollect(GetRoots,rtsTrue);
2432   RELEASE_LOCK(&sched_mutex);
2433 }
2434
2435 static void
2436 AllRoots(evac_fn evac)
2437 {
2438     GetRoots(evac);             // the scheduler's roots
2439     extra_roots(evac);          // the user's roots
2440 }
2441
2442 void
2443 performGCWithRoots(void (*get_roots)(evac_fn))
2444 {
2445   ACQUIRE_LOCK(&sched_mutex);
2446   extra_roots = get_roots;
2447   GarbageCollect(AllRoots,rtsFalse);
2448   RELEASE_LOCK(&sched_mutex);
2449 }
2450
2451 /* -----------------------------------------------------------------------------
2452    Stack overflow
2453
2454    If the thread has reached its maximum stack size, then raise the
2455    StackOverflow exception in the offending thread.  Otherwise
2456    relocate the TSO into a larger chunk of memory and adjust its stack
2457    size appropriately.
2458    -------------------------------------------------------------------------- */
2459
2460 static StgTSO *
2461 threadStackOverflow(StgTSO *tso)
2462 {
2463   nat new_stack_size, new_tso_size, diff, stack_words;
2464   StgPtr new_sp;
2465   StgTSO *dest;
2466
2467   IF_DEBUG(sanity,checkTSO(tso));
2468   if (tso->stack_size >= tso->max_stack_size) {
2469
2470     IF_DEBUG(gc,
2471              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2472                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2473              /* If we're debugging, just print out the top of the stack */
2474              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2475                                               tso->sp+64)));
2476
2477     /* Send this thread the StackOverflow exception */
2478     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2479     return tso;
2480   }
2481
2482   /* Try to double the current stack size.  If that takes us over the
2483    * maximum stack size for this thread, then use the maximum instead.
2484    * Finally round up so the TSO ends up as a whole number of blocks.
2485    */
2486   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2487   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2488                                        TSO_STRUCT_SIZE)/sizeof(W_);
2489   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2490   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2491
2492   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2493
2494   dest = (StgTSO *)allocate(new_tso_size);
2495   TICK_ALLOC_TSO(new_stack_size,0);
2496
2497   /* copy the TSO block and the old stack into the new area */
2498   memcpy(dest,tso,TSO_STRUCT_SIZE);
2499   stack_words = tso->stack + tso->stack_size - tso->sp;
2500   new_sp = (P_)dest + new_tso_size - stack_words;
2501   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2502
2503   /* relocate the stack pointers... */
2504   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2505   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2506   dest->sp    = new_sp;
2507   dest->stack_size = new_stack_size;
2508         
2509   /* and relocate the update frame list */
2510   relocate_stack(dest, diff);
2511
2512   /* Mark the old TSO as relocated.  We have to check for relocated
2513    * TSOs in the garbage collector and any primops that deal with TSOs.
2514    *
2515    * It's important to set the sp and su values to just beyond the end
2516    * of the stack, so we don't attempt to scavenge any part of the
2517    * dead TSO's stack.
2518    */
2519   tso->what_next = ThreadRelocated;
2520   tso->link = dest;
2521   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2522   tso->su = (StgUpdateFrame *)tso->sp;
2523   tso->why_blocked = NotBlocked;
2524   dest->mut_link = NULL;
2525
2526   IF_PAR_DEBUG(verbose,
2527                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2528                      tso->id, tso, tso->stack_size);
2529                /* If we're debugging, just print out the top of the stack */
2530                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2531                                                 tso->sp+64)));
2532   
2533   IF_DEBUG(sanity,checkTSO(tso));
2534 #if 0
2535   IF_DEBUG(scheduler,printTSO(dest));
2536 #endif
2537
2538   return dest;
2539 }
2540
2541 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2542 //@subsection Blocking Queue Routines
2543
2544 /* ---------------------------------------------------------------------------
2545    Wake up a queue that was blocked on some resource.
2546    ------------------------------------------------------------------------ */
2547
2548 #if defined(GRAN)
2549 static inline void
2550 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2551 {
2552 }
2553 #elif defined(PAR)
2554 static inline void
2555 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2556 {
2557   /* write RESUME events to log file and
2558      update blocked and fetch time (depending on type of the orig closure) */
2559   if (RtsFlags.ParFlags.ParStats.Full) {
2560     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2561                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2562                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2563     if (EMPTY_RUN_QUEUE())
2564       emitSchedule = rtsTrue;
2565
2566     switch (get_itbl(node)->type) {
2567         case FETCH_ME_BQ:
2568           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2569           break;
2570         case RBH:
2571         case FETCH_ME:
2572         case BLACKHOLE_BQ:
2573           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2574           break;
2575 #ifdef DIST
2576         case MVAR:
2577           break;
2578 #endif    
2579         default:
2580           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2581         }
2582       }
2583 }
2584 #endif
2585
2586 #if defined(GRAN)
2587 static StgBlockingQueueElement *
2588 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2589 {
2590     StgTSO *tso;
2591     PEs node_loc, tso_loc;
2592
2593     node_loc = where_is(node); // should be lifted out of loop
2594     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2595     tso_loc = where_is((StgClosure *)tso);
2596     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2597       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2598       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2599       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2600       // insertThread(tso, node_loc);
2601       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2602                 ResumeThread,
2603                 tso, node, (rtsSpark*)NULL);
2604       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2605       // len_local++;
2606       // len++;
2607     } else { // TSO is remote (actually should be FMBQ)
2608       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2609                                   RtsFlags.GranFlags.Costs.gunblocktime +
2610                                   RtsFlags.GranFlags.Costs.latency;
2611       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2612                 UnblockThread,
2613                 tso, node, (rtsSpark*)NULL);
2614       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2615       // len++;
2616     }
2617     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2618     IF_GRAN_DEBUG(bq,
2619                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2620                           (node_loc==tso_loc ? "Local" : "Global"), 
2621                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2622     tso->block_info.closure = NULL;
2623     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2624                              tso->id, tso));
2625 }
2626 #elif defined(PAR)
2627 static StgBlockingQueueElement *
2628 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2629 {
2630     StgBlockingQueueElement *next;
2631
2632     switch (get_itbl(bqe)->type) {
2633     case TSO:
2634       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2635       /* if it's a TSO just push it onto the run_queue */
2636       next = bqe->link;
2637       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2638       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2639       THREAD_RUNNABLE();
2640       unblockCount(bqe, node);
2641       /* reset blocking status after dumping event */
2642       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2643       break;
2644
2645     case BLOCKED_FETCH:
2646       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2647       next = bqe->link;
2648       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2649       PendingFetches = (StgBlockedFetch *)bqe;
2650       break;
2651
2652 # if defined(DEBUG)
2653       /* can ignore this case in a non-debugging setup; 
2654          see comments on RBHSave closures above */
2655     case CONSTR:
2656       /* check that the closure is an RBHSave closure */
2657       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2658              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2659              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2660       break;
2661
2662     default:
2663       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2664            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2665            (StgClosure *)bqe);
2666 # endif
2667     }
2668   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2669   return next;
2670 }
2671
2672 #else /* !GRAN && !PAR */
2673 static StgTSO *
2674 unblockOneLocked(StgTSO *tso)
2675 {
2676   StgTSO *next;
2677
2678   ASSERT(get_itbl(tso)->type == TSO);
2679   ASSERT(tso->why_blocked != NotBlocked);
2680   tso->why_blocked = NotBlocked;
2681   next = tso->link;
2682   PUSH_ON_RUN_QUEUE(tso);
2683   THREAD_RUNNABLE();
2684   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2685   return next;
2686 }
2687 #endif
2688
2689 #if defined(GRAN) || defined(PAR)
2690 inline StgBlockingQueueElement *
2691 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2692 {
2693   ACQUIRE_LOCK(&sched_mutex);
2694   bqe = unblockOneLocked(bqe, node);
2695   RELEASE_LOCK(&sched_mutex);
2696   return bqe;
2697 }
2698 #else
2699 inline StgTSO *
2700 unblockOne(StgTSO *tso)
2701 {
2702   ACQUIRE_LOCK(&sched_mutex);
2703   tso = unblockOneLocked(tso);
2704   RELEASE_LOCK(&sched_mutex);
2705   return tso;
2706 }
2707 #endif
2708
2709 #if defined(GRAN)
2710 void 
2711 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2712 {
2713   StgBlockingQueueElement *bqe;
2714   PEs node_loc;
2715   nat len = 0; 
2716
2717   IF_GRAN_DEBUG(bq, 
2718                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2719                       node, CurrentProc, CurrentTime[CurrentProc], 
2720                       CurrentTSO->id, CurrentTSO));
2721
2722   node_loc = where_is(node);
2723
2724   ASSERT(q == END_BQ_QUEUE ||
2725          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2726          get_itbl(q)->type == CONSTR); // closure (type constructor)
2727   ASSERT(is_unique(node));
2728
2729   /* FAKE FETCH: magically copy the node to the tso's proc;
2730      no Fetch necessary because in reality the node should not have been 
2731      moved to the other PE in the first place
2732   */
2733   if (CurrentProc!=node_loc) {
2734     IF_GRAN_DEBUG(bq, 
2735                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2736                         node, node_loc, CurrentProc, CurrentTSO->id, 
2737                         // CurrentTSO, where_is(CurrentTSO),
2738                         node->header.gran.procs));
2739     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2740     IF_GRAN_DEBUG(bq, 
2741                   belch("## new bitmask of node %p is %#x",
2742                         node, node->header.gran.procs));
2743     if (RtsFlags.GranFlags.GranSimStats.Global) {
2744       globalGranStats.tot_fake_fetches++;
2745     }
2746   }
2747
2748   bqe = q;
2749   // ToDo: check: ASSERT(CurrentProc==node_loc);
2750   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2751     //next = bqe->link;
2752     /* 
2753        bqe points to the current element in the queue
2754        next points to the next element in the queue
2755     */
2756     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2757     //tso_loc = where_is(tso);
2758     len++;
2759     bqe = unblockOneLocked(bqe, node);
2760   }
2761
2762   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2763      the closure to make room for the anchor of the BQ */
2764   if (bqe!=END_BQ_QUEUE) {
2765     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2766     /*
2767     ASSERT((info_ptr==&RBH_Save_0_info) ||
2768            (info_ptr==&RBH_Save_1_info) ||
2769            (info_ptr==&RBH_Save_2_info));
2770     */
2771     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2772     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2773     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2774
2775     IF_GRAN_DEBUG(bq,
2776                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2777                         node, info_type(node)));
2778   }
2779
2780   /* statistics gathering */
2781   if (RtsFlags.GranFlags.GranSimStats.Global) {
2782     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2783     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2784     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2785     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2786   }
2787   IF_GRAN_DEBUG(bq,
2788                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2789                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2790 }
2791 #elif defined(PAR)
2792 void 
2793 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2794 {
2795   StgBlockingQueueElement *bqe;
2796
2797   ACQUIRE_LOCK(&sched_mutex);
2798
2799   IF_PAR_DEBUG(verbose, 
2800                belch("##-_ AwBQ for node %p on [%x]: ",
2801                      node, mytid));
2802 #ifdef DIST  
2803   //RFP
2804   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2805     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2806     return;
2807   }
2808 #endif
2809   
2810   ASSERT(q == END_BQ_QUEUE ||
2811          get_itbl(q)->type == TSO ||           
2812          get_itbl(q)->type == BLOCKED_FETCH || 
2813          get_itbl(q)->type == CONSTR); 
2814
2815   bqe = q;
2816   while (get_itbl(bqe)->type==TSO || 
2817          get_itbl(bqe)->type==BLOCKED_FETCH) {
2818     bqe = unblockOneLocked(bqe, node);
2819   }
2820   RELEASE_LOCK(&sched_mutex);
2821 }
2822
2823 #else   /* !GRAN && !PAR */
2824 void
2825 awakenBlockedQueue(StgTSO *tso)
2826 {
2827   ACQUIRE_LOCK(&sched_mutex);
2828   while (tso != END_TSO_QUEUE) {
2829     tso = unblockOneLocked(tso);
2830   }
2831   RELEASE_LOCK(&sched_mutex);
2832 }
2833 #endif
2834
2835 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2836 //@subsection Exception Handling Routines
2837
2838 /* ---------------------------------------------------------------------------
2839    Interrupt execution
2840    - usually called inside a signal handler so it mustn't do anything fancy.   
2841    ------------------------------------------------------------------------ */
2842
2843 void
2844 interruptStgRts(void)
2845 {
2846     interrupted    = 1;
2847     context_switch = 1;
2848 }
2849
2850 /* -----------------------------------------------------------------------------
2851    Unblock a thread
2852
2853    This is for use when we raise an exception in another thread, which
2854    may be blocked.
2855    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2856    -------------------------------------------------------------------------- */
2857
2858 #if defined(GRAN) || defined(PAR)
2859 /*
2860   NB: only the type of the blocking queue is different in GranSim and GUM
2861       the operations on the queue-elements are the same
2862       long live polymorphism!
2863
2864   Locks: sched_mutex is held upon entry and exit.
2865
2866 */
2867 static void
2868 unblockThread(StgTSO *tso)
2869 {
2870   StgBlockingQueueElement *t, **last;
2871
2872   switch (tso->why_blocked) {
2873
2874   case NotBlocked:
2875     return;  /* not blocked */
2876
2877   case BlockedOnMVar:
2878     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2879     {
2880       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2881       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2882
2883       last = (StgBlockingQueueElement **)&mvar->head;
2884       for (t = (StgBlockingQueueElement *)mvar->head; 
2885            t != END_BQ_QUEUE; 
2886            last = &t->link, last_tso = t, t = t->link) {
2887         if (t == (StgBlockingQueueElement *)tso) {
2888           *last = (StgBlockingQueueElement *)tso->link;
2889           if (mvar->tail == tso) {
2890             mvar->tail = (StgTSO *)last_tso;
2891           }
2892           goto done;
2893         }
2894       }
2895       barf("unblockThread (MVAR): TSO not found");
2896     }
2897
2898   case BlockedOnBlackHole:
2899     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2900     {
2901       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2902
2903       last = &bq->blocking_queue;
2904       for (t = bq->blocking_queue; 
2905            t != END_BQ_QUEUE; 
2906            last = &t->link, t = t->link) {
2907         if (t == (StgBlockingQueueElement *)tso) {
2908           *last = (StgBlockingQueueElement *)tso->link;
2909           goto done;
2910         }
2911       }
2912       barf("unblockThread (BLACKHOLE): TSO not found");
2913     }
2914
2915   case BlockedOnException:
2916     {
2917       StgTSO *target  = tso->block_info.tso;
2918
2919       ASSERT(get_itbl(target)->type == TSO);
2920
2921       if (target->what_next == ThreadRelocated) {
2922           target = target->link;
2923           ASSERT(get_itbl(target)->type == TSO);
2924       }
2925
2926       ASSERT(target->blocked_exceptions != NULL);
2927
2928       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2929       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2930            t != END_BQ_QUEUE; 
2931            last = &t->link, t = t->link) {
2932         ASSERT(get_itbl(t)->type == TSO);
2933         if (t == (StgBlockingQueueElement *)tso) {
2934           *last = (StgBlockingQueueElement *)tso->link;
2935           goto done;
2936         }
2937       }
2938       barf("unblockThread (Exception): TSO not found");
2939     }
2940
2941   case BlockedOnRead:
2942   case BlockedOnWrite:
2943     {
2944       /* take TSO off blocked_queue */
2945       StgBlockingQueueElement *prev = NULL;
2946       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2947            prev = t, t = t->link) {
2948         if (t == (StgBlockingQueueElement *)tso) {
2949           if (prev == NULL) {
2950             blocked_queue_hd = (StgTSO *)t->link;
2951             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2952               blocked_queue_tl = END_TSO_QUEUE;
2953             }
2954           } else {
2955             prev->link = t->link;
2956             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2957               blocked_queue_tl = (StgTSO *)prev;
2958             }
2959           }
2960           goto done;
2961         }
2962       }
2963       barf("unblockThread (I/O): TSO not found");
2964     }
2965
2966   case BlockedOnDelay:
2967     {
2968       /* take TSO off sleeping_queue */
2969       StgBlockingQueueElement *prev = NULL;
2970       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2971            prev = t, t = t->link) {
2972         if (t == (StgBlockingQueueElement *)tso) {
2973           if (prev == NULL) {
2974             sleeping_queue = (StgTSO *)t->link;
2975           } else {
2976             prev->link = t->link;
2977           }
2978           goto done;
2979         }
2980       }
2981       barf("unblockThread (I/O): TSO not found");
2982     }
2983
2984   default:
2985     barf("unblockThread");
2986   }
2987
2988  done:
2989   tso->link = END_TSO_QUEUE;
2990   tso->why_blocked = NotBlocked;
2991   tso->block_info.closure = NULL;
2992   PUSH_ON_RUN_QUEUE(tso);
2993 }
2994 #else
2995 static void
2996 unblockThread(StgTSO *tso)
2997 {
2998   StgTSO *t, **last;
2999   
3000   /* To avoid locking unnecessarily. */
3001   if (tso->why_blocked == NotBlocked) {
3002     return;
3003   }
3004
3005   switch (tso->why_blocked) {
3006
3007   case BlockedOnMVar:
3008     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3009     {
3010       StgTSO *last_tso = END_TSO_QUEUE;
3011       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3012
3013       last = &mvar->head;
3014       for (t = mvar->head; t != END_TSO_QUEUE; 
3015            last = &t->link, last_tso = t, t = t->link) {
3016         if (t == tso) {
3017           *last = tso->link;
3018           if (mvar->tail == tso) {
3019             mvar->tail = last_tso;
3020           }
3021           goto done;
3022         }
3023       }
3024       barf("unblockThread (MVAR): TSO not found");
3025     }
3026
3027   case BlockedOnBlackHole:
3028     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3029     {
3030       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3031
3032       last = &bq->blocking_queue;
3033       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3034            last = &t->link, t = t->link) {
3035         if (t == tso) {
3036           *last = tso->link;
3037           goto done;
3038         }
3039       }
3040       barf("unblockThread (BLACKHOLE): TSO not found");
3041     }
3042
3043   case BlockedOnException:
3044     {
3045       StgTSO *target  = tso->block_info.tso;
3046
3047       ASSERT(get_itbl(target)->type == TSO);
3048
3049       while (target->what_next == ThreadRelocated) {
3050           target = target->link;
3051           ASSERT(get_itbl(target)->type == TSO);
3052       }
3053       
3054       ASSERT(target->blocked_exceptions != NULL);
3055
3056       last = &target->blocked_exceptions;
3057       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3058            last = &t->link, t = t->link) {
3059         ASSERT(get_itbl(t)->type == TSO);
3060         if (t == tso) {
3061           *last = tso->link;
3062           goto done;
3063         }
3064       }
3065       barf("unblockThread (Exception): TSO not found");
3066     }
3067
3068   case BlockedOnRead:
3069   case BlockedOnWrite:
3070     {
3071       StgTSO *prev = NULL;
3072       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3073            prev = t, t = t->link) {
3074         if (t == tso) {
3075           if (prev == NULL) {
3076             blocked_queue_hd = t->link;
3077             if (blocked_queue_tl == t) {
3078               blocked_queue_tl = END_TSO_QUEUE;
3079             }
3080           } else {
3081             prev->link = t->link;
3082             if (blocked_queue_tl == t) {
3083               blocked_queue_tl = prev;
3084             }
3085           }
3086           goto done;
3087         }
3088       }
3089       barf("unblockThread (I/O): TSO not found");
3090     }
3091
3092   case BlockedOnDelay:
3093     {
3094       StgTSO *prev = NULL;
3095       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3096            prev = t, t = t->link) {
3097         if (t == tso) {
3098           if (prev == NULL) {
3099             sleeping_queue = t->link;
3100           } else {
3101             prev->link = t->link;
3102           }
3103           goto done;
3104         }
3105       }
3106       barf("unblockThread (I/O): TSO not found");
3107     }
3108
3109   default:
3110     barf("unblockThread");
3111   }
3112
3113  done:
3114   tso->link = END_TSO_QUEUE;
3115   tso->why_blocked = NotBlocked;
3116   tso->block_info.closure = NULL;
3117   PUSH_ON_RUN_QUEUE(tso);
3118 }
3119 #endif
3120
3121 /* -----------------------------------------------------------------------------
3122  * raiseAsync()
3123  *
3124  * The following function implements the magic for raising an
3125  * asynchronous exception in an existing thread.
3126  *
3127  * We first remove the thread from any queue on which it might be
3128  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3129  *
3130  * We strip the stack down to the innermost CATCH_FRAME, building
3131  * thunks in the heap for all the active computations, so they can 
3132  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3133  * an application of the handler to the exception, and push it on
3134  * the top of the stack.
3135  * 
3136  * How exactly do we save all the active computations?  We create an
3137  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3138  * AP_UPDs pushes everything from the corresponding update frame
3139  * upwards onto the stack.  (Actually, it pushes everything up to the
3140  * next update frame plus a pointer to the next AP_UPD object.
3141  * Entering the next AP_UPD object pushes more onto the stack until we
3142  * reach the last AP_UPD object - at which point the stack should look
3143  * exactly as it did when we killed the TSO and we can continue
3144  * execution by entering the closure on top of the stack.
3145  *
3146  * We can also kill a thread entirely - this happens if either (a) the 
3147  * exception passed to raiseAsync is NULL, or (b) there's no
3148  * CATCH_FRAME on the stack.  In either case, we strip the entire
3149  * stack and replace the thread with a zombie.
3150  *
3151  * Locks: sched_mutex held upon entry nor exit.
3152  *
3153  * -------------------------------------------------------------------------- */
3154  
3155 void 
3156 deleteThread(StgTSO *tso)
3157 {
3158   raiseAsync(tso,NULL);
3159 }
3160
3161 void
3162 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3163 {
3164   /* When raising async exs from contexts where sched_mutex isn't held;
3165      use raiseAsyncWithLock(). */
3166   ACQUIRE_LOCK(&sched_mutex);
3167   raiseAsync(tso,exception);
3168   RELEASE_LOCK(&sched_mutex);
3169 }
3170
3171 void
3172 raiseAsync(StgTSO *tso, StgClosure *exception)
3173 {
3174   StgUpdateFrame* su = tso->su;
3175   StgPtr          sp = tso->sp;
3176   
3177   /* Thread already dead? */
3178   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3179     return;
3180   }
3181
3182   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3183
3184   /* Remove it from any blocking queues */
3185   unblockThread(tso);
3186
3187   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3188   /* The stack freezing code assumes there's a closure pointer on
3189    * the top of the stack.  This isn't always the case with compiled
3190    * code, so we have to push a dummy closure on the top which just
3191    * returns to the next return address on the stack.
3192    */
3193   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3194     *(--sp) = (W_)&stg_dummy_ret_closure;
3195   }
3196
3197   while (1) {
3198     nat words = ((P_)su - (P_)sp) - 1;
3199     nat i;
3200     StgAP_UPD * ap;
3201
3202     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3203      * then build the THUNK raise(exception), and leave it on
3204      * top of the CATCH_FRAME ready to enter.
3205      */
3206     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3207 #ifdef PROFILING
3208       StgCatchFrame *cf = (StgCatchFrame *)su;
3209 #endif
3210       StgClosure *raise;
3211
3212       /* we've got an exception to raise, so let's pass it to the
3213        * handler in this frame.
3214        */
3215       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3216       TICK_ALLOC_SE_THK(1,0);
3217       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3218       raise->payload[0] = exception;
3219
3220       /* throw away the stack from Sp up to the CATCH_FRAME.
3221        */
3222       sp = (P_)su - 1;
3223
3224       /* Ensure that async excpetions are blocked now, so we don't get
3225        * a surprise exception before we get around to executing the
3226        * handler.
3227        */
3228       if (tso->blocked_exceptions == NULL) {
3229           tso->blocked_exceptions = END_TSO_QUEUE;
3230       }
3231
3232       /* Put the newly-built THUNK on top of the stack, ready to execute
3233        * when the thread restarts.
3234        */
3235       sp[0] = (W_)raise;
3236       tso->sp = sp;
3237       tso->su = su;
3238       tso->what_next = ThreadEnterGHC;
3239       IF_DEBUG(sanity, checkTSO(tso));
3240       return;
3241     }
3242
3243     /* First build an AP_UPD consisting of the stack chunk above the
3244      * current update frame, with the top word on the stack as the
3245      * fun field.
3246      */
3247     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3248     
3249     ASSERT(words >= 0);
3250     
3251     ap->n_args = words;
3252     ap->fun    = (StgClosure *)sp[0];
3253     sp++;
3254     for(i=0; i < (nat)words; ++i) {
3255       ap->payload[i] = (StgClosure *)*sp++;
3256     }
3257     
3258     switch (get_itbl(su)->type) {
3259       
3260     case UPDATE_FRAME:
3261       {
3262         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3263         TICK_ALLOC_UP_THK(words+1,0);
3264         
3265         IF_DEBUG(scheduler,
3266                  fprintf(stderr,  "scheduler: Updating ");
3267                  printPtr((P_)su->updatee); 
3268                  fprintf(stderr,  " with ");
3269                  printObj((StgClosure *)ap);
3270                  );
3271         
3272         /* Replace the updatee with an indirection - happily
3273          * this will also wake up any threads currently
3274          * waiting on the result.
3275          *
3276          * Warning: if we're in a loop, more than one update frame on
3277          * the stack may point to the same object.  Be careful not to
3278          * overwrite an IND_OLDGEN in this case, because we'll screw
3279          * up the mutable lists.  To be on the safe side, don't
3280          * overwrite any kind of indirection at all.  See also
3281          * threadSqueezeStack in GC.c, where we have to make a similar
3282          * check.
3283          */
3284         if (!closure_IND(su->updatee)) {
3285             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3286         }
3287         su = su->link;
3288         sp += sizeofW(StgUpdateFrame) -1;
3289         sp[0] = (W_)ap; /* push onto stack */
3290         break;
3291       }
3292
3293     case CATCH_FRAME:
3294       {
3295         StgCatchFrame *cf = (StgCatchFrame *)su;
3296         StgClosure* o;
3297         
3298         /* We want a PAP, not an AP_UPD.  Fortunately, the
3299          * layout's the same.
3300          */
3301         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3302         TICK_ALLOC_UPD_PAP(words+1,0);
3303         
3304         /* now build o = FUN(catch,ap,handler) */
3305         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3306         TICK_ALLOC_FUN(2,0);
3307         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3308         o->payload[0] = (StgClosure *)ap;
3309         o->payload[1] = cf->handler;
3310         
3311         IF_DEBUG(scheduler,
3312                  fprintf(stderr,  "scheduler: Built ");
3313                  printObj((StgClosure *)o);
3314                  );
3315         
3316         /* pop the old handler and put o on the stack */
3317         su = cf->link;
3318         sp += sizeofW(StgCatchFrame) - 1;
3319         sp[0] = (W_)o;
3320         break;
3321       }
3322       
3323     case SEQ_FRAME:
3324       {
3325         StgSeqFrame *sf = (StgSeqFrame *)su;
3326         StgClosure* o;
3327         
3328         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3329         TICK_ALLOC_UPD_PAP(words+1,0);
3330         
3331         /* now build o = FUN(seq,ap) */
3332         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3333         TICK_ALLOC_SE_THK(1,0);
3334         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3335         o->payload[0] = (StgClosure *)ap;
3336         
3337         IF_DEBUG(scheduler,
3338                  fprintf(stderr,  "scheduler: Built ");
3339                  printObj((StgClosure *)o);
3340                  );
3341         
3342         /* pop the old handler and put o on the stack */
3343         su = sf->link;
3344         sp += sizeofW(StgSeqFrame) - 1;
3345         sp[0] = (W_)o;
3346         break;
3347       }
3348       
3349     case STOP_FRAME:
3350       /* We've stripped the entire stack, the thread is now dead. */
3351       sp += sizeofW(StgStopFrame) - 1;
3352       sp[0] = (W_)exception;    /* save the exception */
3353       tso->what_next = ThreadKilled;
3354       tso->su = (StgUpdateFrame *)(sp+1);
3355       tso->sp = sp;
3356       return;
3357
3358     default:
3359       barf("raiseAsync");
3360     }
3361   }
3362   barf("raiseAsync");
3363 }
3364
3365 /* -----------------------------------------------------------------------------
3366    resurrectThreads is called after garbage collection on the list of
3367    threads found to be garbage.  Each of these threads will be woken
3368    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3369    on an MVar, or NonTermination if the thread was blocked on a Black
3370    Hole.
3371
3372    Locks: sched_mutex isn't held upon entry nor exit.
3373    -------------------------------------------------------------------------- */
3374
3375 void
3376 resurrectThreads( StgTSO *threads )
3377 {
3378   StgTSO *tso, *next;
3379
3380   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3381     next = tso->global_link;
3382     tso->global_link = all_threads;
3383     all_threads = tso;
3384     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3385
3386     switch (tso->why_blocked) {
3387     case BlockedOnMVar:
3388     case BlockedOnException:
3389       /* Called by GC - sched_mutex lock is currently held. */
3390       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3391       break;
3392     case BlockedOnBlackHole:
3393       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3394       break;
3395     case NotBlocked:
3396       /* This might happen if the thread was blocked on a black hole
3397        * belonging to a thread that we've just woken up (raiseAsync
3398        * can wake up threads, remember...).
3399        */
3400       continue;
3401     default:
3402       barf("resurrectThreads: thread blocked in a strange way");
3403     }
3404   }
3405 }
3406
3407 /* -----------------------------------------------------------------------------
3408  * Blackhole detection: if we reach a deadlock, test whether any
3409  * threads are blocked on themselves.  Any threads which are found to
3410  * be self-blocked get sent a NonTermination exception.
3411  *
3412  * This is only done in a deadlock situation in order to avoid
3413  * performance overhead in the normal case.
3414  *
3415  * Locks: sched_mutex is held upon entry and exit.
3416  * -------------------------------------------------------------------------- */
3417
3418 static void
3419 detectBlackHoles( void )
3420 {
3421     StgTSO *t = all_threads;
3422     StgUpdateFrame *frame;
3423     StgClosure *blocked_on;
3424
3425     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3426
3427         while (t->what_next == ThreadRelocated) {
3428             t = t->link;
3429             ASSERT(get_itbl(t)->type == TSO);
3430         }
3431       
3432         if (t->why_blocked != BlockedOnBlackHole) {
3433             continue;
3434         }
3435
3436         blocked_on = t->block_info.closure;
3437
3438         for (frame = t->su; ; frame = frame->link) {
3439             switch (get_itbl(frame)->type) {
3440
3441             case UPDATE_FRAME:
3442                 if (frame->updatee == blocked_on) {
3443                     /* We are blocking on one of our own computations, so
3444                      * send this thread the NonTermination exception.  
3445                      */
3446                     IF_DEBUG(scheduler, 
3447                              sched_belch("thread %d is blocked on itself", t->id));
3448                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3449                     goto done;
3450                 }
3451                 else {
3452                     continue;
3453                 }
3454
3455             case CATCH_FRAME:
3456             case SEQ_FRAME:
3457                 continue;
3458                 
3459             case STOP_FRAME:
3460                 break;
3461             }
3462             break;
3463         }
3464
3465     done: ;
3466     }   
3467 }
3468
3469 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3470 //@subsection Debugging Routines
3471
3472 /* -----------------------------------------------------------------------------
3473    Debugging: why is a thread blocked
3474    -------------------------------------------------------------------------- */
3475
3476 #ifdef DEBUG
3477
3478 void
3479 printThreadBlockage(StgTSO *tso)
3480 {
3481   switch (tso->why_blocked) {
3482   case BlockedOnRead:
3483     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3484     break;
3485   case BlockedOnWrite:
3486     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3487     break;
3488   case BlockedOnDelay:
3489     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3490     break;
3491   case BlockedOnMVar:
3492     fprintf(stderr,"is blocked on an MVar");
3493     break;
3494   case BlockedOnException:
3495     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3496             tso->block_info.tso->id);
3497     break;
3498   case BlockedOnBlackHole:
3499     fprintf(stderr,"is blocked on a black hole");
3500     break;
3501   case NotBlocked:
3502     fprintf(stderr,"is not blocked");
3503     break;
3504 #if defined(PAR)
3505   case BlockedOnGA:
3506     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3507             tso->block_info.closure, info_type(tso->block_info.closure));
3508     break;
3509   case BlockedOnGA_NoSend:
3510     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3511             tso->block_info.closure, info_type(tso->block_info.closure));
3512     break;
3513 #endif
3514 #if defined(RTS_SUPPORTS_THREADS)
3515   case BlockedOnCCall:
3516     fprintf(stderr,"is blocked on an external call");
3517     break;
3518 #endif
3519   default:
3520     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3521          tso->why_blocked, tso->id, tso);
3522   }
3523 }
3524
3525 void
3526 printThreadStatus(StgTSO *tso)
3527 {
3528   switch (tso->what_next) {
3529   case ThreadKilled:
3530     fprintf(stderr,"has been killed");
3531     break;
3532   case ThreadComplete:
3533     fprintf(stderr,"has completed");
3534     break;
3535   default:
3536     printThreadBlockage(tso);
3537   }
3538 }
3539
3540 void
3541 printAllThreads(void)
3542 {
3543   StgTSO *t;
3544
3545 # if defined(GRAN)
3546   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3547   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3548                        time_string, rtsFalse/*no commas!*/);
3549
3550   sched_belch("all threads at [%s]:", time_string);
3551 # elif defined(PAR)
3552   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3553   ullong_format_string(CURRENT_TIME,
3554                        time_string, rtsFalse/*no commas!*/);
3555
3556   sched_belch("all threads at [%s]:", time_string);
3557 # else
3558   sched_belch("all threads:");
3559 # endif
3560
3561   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3562     fprintf(stderr, "\tthread %d ", t->id);
3563     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3564     printThreadStatus(t);
3565     fprintf(stderr,"\n");
3566   }
3567 }
3568     
3569 /* 
3570    Print a whole blocking queue attached to node (debugging only).
3571 */
3572 //@cindex print_bq
3573 # if defined(PAR)
3574 void 
3575 print_bq (StgClosure *node)
3576 {
3577   StgBlockingQueueElement *bqe;
3578   StgTSO *tso;
3579   rtsBool end;
3580
3581   fprintf(stderr,"## BQ of closure %p (%s): ",
3582           node, info_type(node));
3583
3584   /* should cover all closures that may have a blocking queue */
3585   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3586          get_itbl(node)->type == FETCH_ME_BQ ||
3587          get_itbl(node)->type == RBH ||
3588          get_itbl(node)->type == MVAR);
3589     
3590   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3591
3592   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3593 }
3594
3595 /* 
3596    Print a whole blocking queue starting with the element bqe.
3597 */
3598 void 
3599 print_bqe (StgBlockingQueueElement *bqe)
3600 {
3601   rtsBool end;
3602
3603   /* 
3604      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3605   */
3606   for (end = (bqe==END_BQ_QUEUE);
3607        !end; // iterate until bqe points to a CONSTR
3608        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3609        bqe = end ? END_BQ_QUEUE : bqe->link) {
3610     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3611     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3612     /* types of closures that may appear in a blocking queue */
3613     ASSERT(get_itbl(bqe)->type == TSO ||           
3614            get_itbl(bqe)->type == BLOCKED_FETCH || 
3615            get_itbl(bqe)->type == CONSTR); 
3616     /* only BQs of an RBH end with an RBH_Save closure */
3617     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3618
3619     switch (get_itbl(bqe)->type) {
3620     case TSO:
3621       fprintf(stderr," TSO %u (%x),",
3622               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3623       break;
3624     case BLOCKED_FETCH:
3625       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3626               ((StgBlockedFetch *)bqe)->node, 
3627               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3628               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3629               ((StgBlockedFetch *)bqe)->ga.weight);
3630       break;
3631     case CONSTR:
3632       fprintf(stderr," %s (IP %p),",
3633               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3634                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3635                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3636                "RBH_Save_?"), get_itbl(bqe));
3637       break;
3638     default:
3639       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3640            info_type((StgClosure *)bqe)); // , node, info_type(node));
3641       break;
3642     }
3643   } /* for */
3644   fputc('\n', stderr);
3645 }
3646 # elif defined(GRAN)
3647 void 
3648 print_bq (StgClosure *node)
3649 {
3650   StgBlockingQueueElement *bqe;
3651   PEs node_loc, tso_loc;
3652   rtsBool end;
3653
3654   /* should cover all closures that may have a blocking queue */
3655   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3656          get_itbl(node)->type == FETCH_ME_BQ ||
3657          get_itbl(node)->type == RBH);
3658     
3659   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3660   node_loc = where_is(node);
3661
3662   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3663           node, info_type(node), node_loc);
3664
3665   /* 
3666      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3667   */
3668   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3669        !end; // iterate until bqe points to a CONSTR
3670        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3671     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3672     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3673     /* types of closures that may appear in a blocking queue */
3674     ASSERT(get_itbl(bqe)->type == TSO ||           
3675            get_itbl(bqe)->type == CONSTR); 
3676     /* only BQs of an RBH end with an RBH_Save closure */
3677     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3678
3679     tso_loc = where_is((StgClosure *)bqe);
3680     switch (get_itbl(bqe)->type) {
3681     case TSO:
3682       fprintf(stderr," TSO %d (%p) on [PE %d],",
3683               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3684       break;
3685     case CONSTR:
3686       fprintf(stderr," %s (IP %p),",
3687               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3688                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3689                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3690                "RBH_Save_?"), get_itbl(bqe));
3691       break;
3692     default:
3693       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3694            info_type((StgClosure *)bqe), node, info_type(node));
3695       break;
3696     }
3697   } /* for */
3698   fputc('\n', stderr);
3699 }
3700 #else
3701 /* 
3702    Nice and easy: only TSOs on the blocking queue
3703 */
3704 void 
3705 print_bq (StgClosure *node)
3706 {
3707   StgTSO *tso;
3708
3709   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3710   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3711        tso != END_TSO_QUEUE; 
3712        tso=tso->link) {
3713     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3714     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3715     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3716   }
3717   fputc('\n', stderr);
3718 }
3719 # endif
3720
3721 #if defined(PAR)
3722 static nat
3723 run_queue_len(void)
3724 {
3725   nat i;
3726   StgTSO *tso;
3727
3728   for (i=0, tso=run_queue_hd; 
3729        tso != END_TSO_QUEUE;
3730        i++, tso=tso->link)
3731     /* nothing */
3732
3733   return i;
3734 }
3735 #endif
3736
3737 static void
3738 sched_belch(char *s, ...)
3739 {
3740   va_list ap;
3741   va_start(ap,s);
3742 #ifdef SMP
3743   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3744 #elif defined(PAR)
3745   fprintf(stderr, "== ");
3746 #else
3747   fprintf(stderr, "scheduler: ");
3748 #endif
3749   vfprintf(stderr, s, ap);
3750   fprintf(stderr, "\n");
3751 }
3752
3753 #endif /* DEBUG */
3754
3755
3756 //@node Index,  , Debugging Routines, Main scheduling code
3757 //@subsection Index
3758
3759 //@index
3760 //* StgMainThread::  @cindex\s-+StgMainThread
3761 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3762 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3763 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3764 //* context_switch::  @cindex\s-+context_switch
3765 //* createThread::  @cindex\s-+createThread
3766 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3767 //* initScheduler::  @cindex\s-+initScheduler
3768 //* interrupted::  @cindex\s-+interrupted
3769 //* next_thread_id::  @cindex\s-+next_thread_id
3770 //* print_bq::  @cindex\s-+print_bq
3771 //* run_queue_hd::  @cindex\s-+run_queue_hd
3772 //* run_queue_tl::  @cindex\s-+run_queue_tl
3773 //* sched_mutex::  @cindex\s-+sched_mutex
3774 //* schedule::  @cindex\s-+schedule
3775 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3776 //* term_mutex::  @cindex\s-+term_mutex
3777 //@end index