[project @ 2002-02-04 20:21:16 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.114 2002/01/31 11:18:07 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115
116 #include <stdarg.h>
117
118 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
119 //@subsection Variables and Data structures
120
121 /* Main threads:
122  *
123  * These are the threads which clients have requested that we run.  
124  *
125  * In a 'threaded' build, we might have several concurrent clients all
126  * waiting for results, and each one will wait on a condition variable
127  * until the result is available.
128  *
129  * In non-SMP, clients are strictly nested: the first client calls
130  * into the RTS, which might call out again to C with a _ccall_GC, and
131  * eventually re-enter the RTS.
132  *
133  * Main threads information is kept in a linked list:
134  */
135 //@cindex StgMainThread
136 typedef struct StgMainThread_ {
137   StgTSO *         tso;
138   SchedulerStatus  stat;
139   StgClosure **    ret;
140 #if defined(RTS_SUPPORTS_THREADS)
141   CondVar          wakeup;
142 #endif
143   struct StgMainThread_ *link;
144 } StgMainThread;
145
146 /* Main thread queue.
147  * Locks required: sched_mutex.
148  */
149 static StgMainThread *main_threads;
150
151 /* Thread queues.
152  * Locks required: sched_mutex.
153  */
154 #if defined(GRAN)
155
156 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
157 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
158
159 /* 
160    In GranSim we have a runable and a blocked queue for each processor.
161    In order to minimise code changes new arrays run_queue_hds/tls
162    are created. run_queue_hd is then a short cut (macro) for
163    run_queue_hds[CurrentProc] (see GranSim.h).
164    -- HWL
165 */
166 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
167 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
168 StgTSO *ccalling_threadss[MAX_PROC];
169 /* We use the same global list of threads (all_threads) in GranSim as in
170    the std RTS (i.e. we are cheating). However, we don't use this list in
171    the GranSim specific code at the moment (so we are only potentially
172    cheating).  */
173
174 #else /* !GRAN */
175
176 StgTSO *run_queue_hd, *run_queue_tl;
177 StgTSO *blocked_queue_hd, *blocked_queue_tl;
178 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
179
180 #endif
181
182 /* Linked list of all threads.
183  * Used for detecting garbage collected threads.
184  */
185 StgTSO *all_threads;
186
187 /* Threads suspended in _ccall_GC.
188  */
189 static StgTSO *suspended_ccalling_threads;
190
191 static StgTSO *threadStackOverflow(StgTSO *tso);
192
193 /* KH: The following two flags are shared memory locations.  There is no need
194        to lock them, since they are only unset at the end of a scheduler
195        operation.
196 */
197
198 /* flag set by signal handler to precipitate a context switch */
199 //@cindex context_switch
200 nat context_switch;
201
202 /* if this flag is set as well, give up execution */
203 //@cindex interrupted
204 rtsBool interrupted;
205
206 /* Next thread ID to allocate.
207  * Locks required: sched_mutex
208  */
209 //@cindex next_thread_id
210 StgThreadID next_thread_id = 1;
211
212 /*
213  * Pointers to the state of the current thread.
214  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
215  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
216  */
217  
218 /* The smallest stack size that makes any sense is:
219  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
220  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
221  *  + 1                       (the realworld token for an IO thread)
222  *  + 1                       (the closure to enter)
223  *
224  * A thread with this stack will bomb immediately with a stack
225  * overflow, which will increase its stack size.  
226  */
227
228 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
229
230
231 #if !defined(SMP)
232 Capability MainCapability;     /* for non-SMP, we have one global capability */
233 #endif
234
235 #if defined(GRAN)
236 StgTSO *CurrentTSO;
237 #endif
238
239 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
240  *  exists - earlier gccs apparently didn't.
241  *  -= chak
242  */
243 StgTSO dummy_tso;
244
245 rtsBool ready_to_gc;
246
247 /* All our current task ids, saved in case we need to kill them later.
248  */
249 #if defined(SMP)
250 //@cindex task_ids
251 task_info *task_ids;
252 #endif
253
254 void            addToBlockedQueue ( StgTSO *tso );
255
256 static void     schedule          ( void );
257        void     interruptStgRts   ( void );
258 #if defined(GRAN)
259 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
260 #else
261 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
262 #endif
263
264 static void     detectBlackHoles  ( void );
265
266 #ifdef DEBUG
267 static void sched_belch(char *s, ...);
268 #endif
269
270 #if defined(RTS_SUPPORTS_THREADS)
271 /* ToDo: carefully document the invariants that go together
272  *       with these synchronisation objects.
273  */
274 MutexVar  sched_mutex       = INIT_MUTEX_VAR;
275 MutexVar  term_mutex        = INIT_MUTEX_VAR;
276 CondVar   thread_ready_cond = INIT_COND_VAR;
277 CondVar   gc_pending_cond   = INIT_COND_VAR;
278
279 nat await_death;
280 #endif
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 char *whatNext_strs[] = {
290   "ThreadEnterGHC",
291   "ThreadRunGHC",
292   "ThreadEnterInterp",
293   "ThreadKilled",
294   "ThreadComplete"
295 };
296
297 char *threadReturnCode_strs[] = {
298   "HeapOverflow",                       /* might also be StackOverflow */
299   "StackOverflow",
300   "ThreadYielding",
301   "ThreadBlocked",
302   "ThreadFinished"
303 };
304 #endif
305
306 #if defined(PAR)
307 StgTSO * createSparkThread(rtsSpark spark);
308 StgTSO * activateSpark (rtsSpark spark);  
309 #endif
310
311 /*
312  * The thread state for the main thread.
313 // ToDo: check whether not needed any more
314 StgTSO   *MainTSO;
315  */
316
317 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
318 //@subsection Main scheduling loop
319
320 /* ---------------------------------------------------------------------------
321    Main scheduling loop.
322
323    We use round-robin scheduling, each thread returning to the
324    scheduler loop when one of these conditions is detected:
325
326       * out of heap space
327       * timer expires (thread yields)
328       * thread blocks
329       * thread ends
330       * stack overflow
331
332    Locking notes:  we acquire the scheduler lock once at the beginning
333    of the scheduler loop, and release it when
334     
335       * running a thread, or
336       * waiting for work, or
337       * waiting for a GC to complete.
338
339    GRAN version:
340      In a GranSim setup this loop iterates over the global event queue.
341      This revolves around the global event queue, which determines what 
342      to do next. Therefore, it's more complicated than either the 
343      concurrent or the parallel (GUM) setup.
344
345    GUM version:
346      GUM iterates over incoming messages.
347      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
348      and sends out a fish whenever it has nothing to do; in-between
349      doing the actual reductions (shared code below) it processes the
350      incoming messages and deals with delayed operations 
351      (see PendingFetches).
352      This is not the ugliest code you could imagine, but it's bloody close.
353
354    ------------------------------------------------------------------------ */
355 //@cindex schedule
356 static void
357 schedule( void )
358 {
359   StgTSO *t;
360   Capability *cap;
361   StgThreadReturnCode ret;
362 #if defined(GRAN)
363   rtsEvent *event;
364 #elif defined(PAR)
365   StgSparkPool *pool;
366   rtsSpark spark;
367   StgTSO *tso;
368   GlobalTaskId pe;
369   rtsBool receivedFinish = rtsFalse;
370 # if defined(DEBUG)
371   nat tp_size, sp_size; // stats only
372 # endif
373 #endif
374   rtsBool was_interrupted = rtsFalse;
375   
376   ACQUIRE_LOCK(&sched_mutex);
377
378 #if defined(GRAN)
379
380   /* set up first event to get things going */
381   /* ToDo: assign costs for system setup and init MainTSO ! */
382   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
383             ContinueThread, 
384             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
385
386   IF_DEBUG(gran,
387            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
388            G_TSO(CurrentTSO, 5));
389
390   if (RtsFlags.GranFlags.Light) {
391     /* Save current time; GranSim Light only */
392     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
393   }      
394
395   event = get_next_event();
396
397   while (event!=(rtsEvent*)NULL) {
398     /* Choose the processor with the next event */
399     CurrentProc = event->proc;
400     CurrentTSO = event->tso;
401
402 #elif defined(PAR)
403
404   while (!receivedFinish) {    /* set by processMessages */
405                                /* when receiving PP_FINISH message         */ 
406 #else
407
408   while (1) {
409
410 #endif
411
412     IF_DEBUG(scheduler, printAllThreads());
413
414     /* If we're interrupted (the user pressed ^C, or some other
415      * termination condition occurred), kill all the currently running
416      * threads.
417      */
418     if (interrupted) {
419       IF_DEBUG(scheduler, sched_belch("interrupted"));
420       deleteAllThreads();
421       interrupted = rtsFalse;
422       was_interrupted = rtsTrue;
423     }
424
425     /* Go through the list of main threads and wake up any
426      * clients whose computations have finished.  ToDo: this
427      * should be done more efficiently without a linear scan
428      * of the main threads list, somehow...
429      */
430 #if defined(RTS_SUPPORTS_THREADS)
431     { 
432       StgMainThread *m, **prev;
433       prev = &main_threads;
434       for (m = main_threads; m != NULL; m = m->link) {
435         switch (m->tso->what_next) {
436         case ThreadComplete:
437           if (m->ret) {
438             *(m->ret) = (StgClosure *)m->tso->sp[0];
439           }
440           *prev = m->link;
441           m->stat = Success;
442           broadcastCondVar(&m->wakeup);
443           break;
444         case ThreadKilled:
445           if (m->ret) *(m->ret) = NULL;
446           *prev = m->link;
447           if (was_interrupted) {
448             m->stat = Interrupted;
449           } else {
450             m->stat = Killed;
451           }
452           broadcastCondVar(&m->wakeup);
453           break;
454         default:
455           break;
456         }
457       }
458     }
459
460 #else /* not threaded */
461
462 # if defined(PAR)
463     /* in GUM do this only on the Main PE */
464     if (IAmMainThread)
465 # endif
466     /* If our main thread has finished or been killed, return.
467      */
468     {
469       StgMainThread *m = main_threads;
470       if (m->tso->what_next == ThreadComplete
471           || m->tso->what_next == ThreadKilled) {
472         main_threads = main_threads->link;
473         if (m->tso->what_next == ThreadComplete) {
474           /* we finished successfully, fill in the return value */
475           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
476           m->stat = Success;
477           return;
478         } else {
479           if (m->ret) { *(m->ret) = NULL; };
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      *
495      * Disable spark support in SMP for now, non-essential & requires
496      * a little bit of work to make it compile cleanly. -- sof 1/02.
497      */
498 #if 0 /* defined(SMP) */
499     {
500       nat n = getFreeCapabilities();
501       StgTSO *tso = run_queue_hd;
502
503       /* Count the run queue */
504       while (n > 0 && tso != END_TSO_QUEUE) {
505         tso = tso->link;
506         n--;
507       }
508
509       for (; n > 0; n--) {
510         StgClosure *spark;
511         spark = findSpark(rtsFalse);
512         if (spark == NULL) {
513           break; /* no more sparks in the pool */
514         } else {
515           /* I'd prefer this to be done in activateSpark -- HWL */
516           /* tricky - it needs to hold the scheduler lock and
517            * not try to re-acquire it -- SDM */
518           createSparkThread(spark);       
519           IF_DEBUG(scheduler,
520                    sched_belch("==^^ turning spark of closure %p into a thread",
521                                (StgClosure *)spark));
522         }
523       }
524       /* We need to wake up the other tasks if we just created some
525        * work for them.
526        */
527       if (getFreeCapabilities() - n > 1) {
528           signalCondVar ( &thread_ready_cond );
529       }
530     }
531 #endif // SMP
532
533     /* check for signals each time around the scheduler */
534 #ifndef mingw32_TARGET_OS
535     if (signals_pending()) {
536       startSignalHandlers();
537     }
538 #endif
539
540     /* Check whether any waiting threads need to be woken up.  If the
541      * run queue is empty, and there are no other tasks running, we
542      * can wait indefinitely for something to happen.
543      * ToDo: what if another client comes along & requests another
544      * main thread?
545      */
546     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
547       awaitEvent(
548            (run_queue_hd == END_TSO_QUEUE)
549 #if defined(SMP)
550         && allFreeCapabilities()
551 #endif
552         );
553     }
554     /* we can be interrupted while waiting for I/O... */
555     if (interrupted) continue;
556
557     /* 
558      * Detect deadlock: when we have no threads to run, there are no
559      * threads waiting on I/O or sleeping, and all the other tasks are
560      * waiting for work, we must have a deadlock of some description.
561      *
562      * We first try to find threads blocked on themselves (ie. black
563      * holes), and generate NonTermination exceptions where necessary.
564      *
565      * If no threads are black holed, we have a deadlock situation, so
566      * inform all the main threads.
567      */
568 #ifndef PAR
569     if (blocked_queue_hd == END_TSO_QUEUE
570         && run_queue_hd == END_TSO_QUEUE
571         && sleeping_queue == END_TSO_QUEUE
572 #if defined(SMP)
573         && allFreeCapabilities()
574 #endif
575         )
576     {
577         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
578         GarbageCollect(GetRoots,rtsTrue);
579         if (blocked_queue_hd == END_TSO_QUEUE
580             && run_queue_hd == END_TSO_QUEUE
581             && sleeping_queue == END_TSO_QUEUE) {
582
583             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
584             detectBlackHoles();
585
586             /* No black holes, so probably a real deadlock.  Send the
587              * current main thread the Deadlock exception (or in the SMP
588              * build, send *all* main threads the deadlock exception,
589              * since none of them can make progress).
590              */
591             if (run_queue_hd == END_TSO_QUEUE) {
592                 StgMainThread *m;
593 #if defined(RTS_SUPPORTS_THREADS)
594                 for (m = main_threads; m != NULL; m = m->link) {
595                     switch (m->tso->why_blocked) {
596                     case BlockedOnBlackHole:
597                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
598                         break;
599                     case BlockedOnException:
600                     case BlockedOnMVar:
601                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
602                         break;
603                     default:
604                         barf("deadlock: main thread blocked in a strange way");
605                     }
606                 }
607 #else
608                 m = main_threads;
609                 switch (m->tso->why_blocked) {
610                 case BlockedOnBlackHole:
611                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
612                     break;
613                 case BlockedOnException:
614                 case BlockedOnMVar:
615                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
616                     break;
617                 default:
618                     barf("deadlock: main thread blocked in a strange way");
619                 }
620 #endif
621             }
622 #if !defined(RTS_SUPPORTS_THREADS)
623             ASSERT( run_queue_hd != END_TSO_QUEUE );
624 #endif
625         }
626     }
627 #elif defined(PAR)
628     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
629 #endif
630
631 #if defined(SMP)
632     /* If there's a GC pending, don't do anything until it has
633      * completed.
634      */
635     if (ready_to_gc) {
636       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
637       waitCondVar ( &gc_pending_cond, &sched_mutex );
638     }
639 #endif    
640
641 #if defined(RTS_SUPPORTS_THREADS)
642     /* block until we've got a thread on the run queue and a free
643      * capability.
644      */
645     while ( run_queue_hd == END_TSO_QUEUE 
646 #if defined(SMP)
647             || noFreeCapabilities() 
648 #endif
649             ) {
650       IF_DEBUG(scheduler, sched_belch("waiting for work"));
651       waitCondVar ( &thread_ready_cond, &sched_mutex );
652       IF_DEBUG(scheduler, sched_belch("work now available"));
653     }
654 #endif
655
656 #if defined(GRAN)
657
658     if (RtsFlags.GranFlags.Light)
659       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
660
661     /* adjust time based on time-stamp */
662     if (event->time > CurrentTime[CurrentProc] &&
663         event->evttype != ContinueThread)
664       CurrentTime[CurrentProc] = event->time;
665     
666     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
667     if (!RtsFlags.GranFlags.Light)
668       handleIdlePEs();
669
670     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
671
672     /* main event dispatcher in GranSim */
673     switch (event->evttype) {
674       /* Should just be continuing execution */
675     case ContinueThread:
676       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
677       /* ToDo: check assertion
678       ASSERT(run_queue_hd != (StgTSO*)NULL &&
679              run_queue_hd != END_TSO_QUEUE);
680       */
681       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
682       if (!RtsFlags.GranFlags.DoAsyncFetch &&
683           procStatus[CurrentProc]==Fetching) {
684         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
685               CurrentTSO->id, CurrentTSO, CurrentProc);
686         goto next_thread;
687       } 
688       /* Ignore ContinueThreads for completed threads */
689       if (CurrentTSO->what_next == ThreadComplete) {
690         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
691               CurrentTSO->id, CurrentTSO, CurrentProc);
692         goto next_thread;
693       } 
694       /* Ignore ContinueThreads for threads that are being migrated */
695       if (PROCS(CurrentTSO)==Nowhere) { 
696         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
697               CurrentTSO->id, CurrentTSO, CurrentProc);
698         goto next_thread;
699       }
700       /* The thread should be at the beginning of the run queue */
701       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
702         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
703               CurrentTSO->id, CurrentTSO, CurrentProc);
704         break; // run the thread anyway
705       }
706       /*
707       new_event(proc, proc, CurrentTime[proc],
708                 FindWork,
709                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
710       goto next_thread; 
711       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
712       break; // now actually run the thread; DaH Qu'vam yImuHbej 
713
714     case FetchNode:
715       do_the_fetchnode(event);
716       goto next_thread;             /* handle next event in event queue  */
717       
718     case GlobalBlock:
719       do_the_globalblock(event);
720       goto next_thread;             /* handle next event in event queue  */
721       
722     case FetchReply:
723       do_the_fetchreply(event);
724       goto next_thread;             /* handle next event in event queue  */
725       
726     case UnblockThread:   /* Move from the blocked queue to the tail of */
727       do_the_unblock(event);
728       goto next_thread;             /* handle next event in event queue  */
729       
730     case ResumeThread:  /* Move from the blocked queue to the tail of */
731       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
732       event->tso->gran.blocktime += 
733         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
734       do_the_startthread(event);
735       goto next_thread;             /* handle next event in event queue  */
736       
737     case StartThread:
738       do_the_startthread(event);
739       goto next_thread;             /* handle next event in event queue  */
740       
741     case MoveThread:
742       do_the_movethread(event);
743       goto next_thread;             /* handle next event in event queue  */
744       
745     case MoveSpark:
746       do_the_movespark(event);
747       goto next_thread;             /* handle next event in event queue  */
748       
749     case FindWork:
750       do_the_findwork(event);
751       goto next_thread;             /* handle next event in event queue  */
752       
753     default:
754       barf("Illegal event type %u\n", event->evttype);
755     }  /* switch */
756     
757     /* This point was scheduler_loop in the old RTS */
758
759     IF_DEBUG(gran, belch("GRAN: after main switch"));
760
761     TimeOfLastEvent = CurrentTime[CurrentProc];
762     TimeOfNextEvent = get_time_of_next_event();
763     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
764     // CurrentTSO = ThreadQueueHd;
765
766     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
767                          TimeOfNextEvent));
768
769     if (RtsFlags.GranFlags.Light) 
770       GranSimLight_leave_system(event, &ActiveTSO); 
771
772     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
773
774     IF_DEBUG(gran, 
775              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
776
777     /* in a GranSim setup the TSO stays on the run queue */
778     t = CurrentTSO;
779     /* Take a thread from the run queue. */
780     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
781
782     IF_DEBUG(gran, 
783              fprintf(stderr, "GRAN: About to run current thread, which is\n");
784              G_TSO(t,5));
785
786     context_switch = 0; // turned on via GranYield, checking events and time slice
787
788     IF_DEBUG(gran, 
789              DumpGranEvent(GR_SCHEDULE, t));
790
791     procStatus[CurrentProc] = Busy;
792
793 #elif defined(PAR)
794     if (PendingFetches != END_BF_QUEUE) {
795         processFetches();
796     }
797
798     /* ToDo: phps merge with spark activation above */
799     /* check whether we have local work and send requests if we have none */
800     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
801       /* :-[  no local threads => look out for local sparks */
802       /* the spark pool for the current PE */
803       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
804       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
805           pool->hd < pool->tl) {
806         /* 
807          * ToDo: add GC code check that we really have enough heap afterwards!!
808          * Old comment:
809          * If we're here (no runnable threads) and we have pending
810          * sparks, we must have a space problem.  Get enough space
811          * to turn one of those pending sparks into a
812          * thread... 
813          */
814
815         spark = findSpark(rtsFalse);                /* get a spark */
816         if (spark != (rtsSpark) NULL) {
817           tso = activateSpark(spark);       /* turn the spark into a thread */
818           IF_PAR_DEBUG(schedule,
819                        belch("==== schedule: Created TSO %d (%p); %d threads active",
820                              tso->id, tso, advisory_thread_count));
821
822           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
823             belch("==^^ failed to activate spark");
824             goto next_thread;
825           }               /* otherwise fall through & pick-up new tso */
826         } else {
827           IF_PAR_DEBUG(verbose,
828                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
829                              spark_queue_len(pool)));
830           goto next_thread;
831         }
832       }
833
834       /* If we still have no work we need to send a FISH to get a spark
835          from another PE 
836       */
837       if (EMPTY_RUN_QUEUE()) {
838       /* =8-[  no local sparks => look for work on other PEs */
839         /*
840          * We really have absolutely no work.  Send out a fish
841          * (there may be some out there already), and wait for
842          * something to arrive.  We clearly can't run any threads
843          * until a SCHEDULE or RESUME arrives, and so that's what
844          * we're hoping to see.  (Of course, we still have to
845          * respond to other types of messages.)
846          */
847         TIME now = msTime() /*CURRENT_TIME*/;
848         IF_PAR_DEBUG(verbose, 
849                      belch("--  now=%ld", now));
850         IF_PAR_DEBUG(verbose,
851                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
852                          (last_fish_arrived_at!=0 &&
853                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
854                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
855                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
856                              last_fish_arrived_at,
857                              RtsFlags.ParFlags.fishDelay, now);
858                      });
859         
860         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
861             (last_fish_arrived_at==0 ||
862              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
863           /* outstandingFishes is set in sendFish, processFish;
864              avoid flooding system with fishes via delay */
865           pe = choosePE();
866           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
867                    NEW_FISH_HUNGER);
868
869           // Global statistics: count no. of fishes
870           if (RtsFlags.ParFlags.ParStats.Global &&
871               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
872             globalParStats.tot_fish_mess++;
873           }
874         }
875       
876         receivedFinish = processMessages();
877         goto next_thread;
878       }
879     } else if (PacketsWaiting()) {  /* Look for incoming messages */
880       receivedFinish = processMessages();
881     }
882
883     /* Now we are sure that we have some work available */
884     ASSERT(run_queue_hd != END_TSO_QUEUE);
885
886     /* Take a thread from the run queue, if we have work */
887     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
888     IF_DEBUG(sanity,checkTSO(t));
889
890     /* ToDo: write something to the log-file
891     if (RTSflags.ParFlags.granSimStats && !sameThread)
892         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
893
894     CurrentTSO = t;
895     */
896     /* the spark pool for the current PE */
897     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
898
899     IF_DEBUG(scheduler, 
900              belch("--=^ %d threads, %d sparks on [%#x]", 
901                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
902
903 #if 1
904     if (0 && RtsFlags.ParFlags.ParStats.Full && 
905         t && LastTSO && t->id != LastTSO->id && 
906         LastTSO->why_blocked == NotBlocked && 
907         LastTSO->what_next != ThreadComplete) {
908       // if previously scheduled TSO not blocked we have to record the context switch
909       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
910                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
911     }
912
913     if (RtsFlags.ParFlags.ParStats.Full && 
914         (emitSchedule /* forced emit */ ||
915         (t && LastTSO && t->id != LastTSO->id))) {
916       /* 
917          we are running a different TSO, so write a schedule event to log file
918          NB: If we use fair scheduling we also have to write  a deschedule 
919              event for LastTSO; with unfair scheduling we know that the
920              previous tso has blocked whenever we switch to another tso, so
921              we don't need it in GUM for now
922       */
923       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
924                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
925       emitSchedule = rtsFalse;
926     }
927      
928 #endif
929 #else /* !GRAN && !PAR */
930   
931     /* grab a thread from the run queue
932      */
933     ASSERT(run_queue_hd != END_TSO_QUEUE);
934     t = POP_RUN_QUEUE();
935
936     // Sanity check the thread we're about to run.  This can be
937     // expensive if there is lots of thread switching going on...
938     IF_DEBUG(sanity,checkTSO(t));
939
940 #endif
941     
942 #ifdef SMP
943     grabCapability(&cap);
944 #else
945     cap = &MainCapability;
946 #endif
947
948     cap->r.rCurrentTSO = t;
949     
950     /* context switches are now initiated by the timer signal, unless
951      * the user specified "context switch as often as possible", with
952      * +RTS -C0
953      */
954     if (
955 #ifdef PROFILING
956         RtsFlags.ProfFlags.profileInterval == 0 ||
957 #endif
958         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
959          && (run_queue_hd != END_TSO_QUEUE
960              || blocked_queue_hd != END_TSO_QUEUE
961              || sleeping_queue != END_TSO_QUEUE)))
962         context_switch = 1;
963     else
964         context_switch = 0;
965
966     RELEASE_LOCK(&sched_mutex);
967
968     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
969                               t->id, t, whatNext_strs[t->what_next]));
970
971 #ifdef PROFILING
972     startHeapProfTimer();
973 #endif
974
975     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
976     /* Run the current thread 
977      */
978     switch (cap->r.rCurrentTSO->what_next) {
979     case ThreadKilled:
980     case ThreadComplete:
981         /* Thread already finished, return to scheduler. */
982         ret = ThreadFinished;
983         break;
984     case ThreadEnterGHC:
985         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
986         break;
987     case ThreadRunGHC:
988         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
989         break;
990     case ThreadEnterInterp:
991         ret = interpretBCO(cap);
992         break;
993     default:
994       barf("schedule: invalid what_next field");
995     }
996     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
997     
998     /* Costs for the scheduler are assigned to CCS_SYSTEM */
999 #ifdef PROFILING
1000     stopHeapProfTimer();
1001     CCCS = CCS_SYSTEM;
1002 #endif
1003     
1004     ACQUIRE_LOCK(&sched_mutex);
1005
1006 #ifdef SMP
1007     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1008 #elif !defined(GRAN) && !defined(PAR)
1009     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1010 #endif
1011     t = cap->r.rCurrentTSO;
1012     
1013 #if defined(PAR)
1014     /* HACK 675: if the last thread didn't yield, make sure to print a 
1015        SCHEDULE event to the log file when StgRunning the next thread, even
1016        if it is the same one as before */
1017     LastTSO = t; 
1018     TimeOfLastYield = CURRENT_TIME;
1019 #endif
1020
1021     switch (ret) {
1022     case HeapOverflow:
1023 #if defined(GRAN)
1024       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1025       globalGranStats.tot_heapover++;
1026 #elif defined(PAR)
1027       globalParStats.tot_heapover++;
1028 #endif
1029
1030       // did the task ask for a large block?
1031       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1032           // if so, get one and push it on the front of the nursery.
1033           bdescr *bd;
1034           nat blocks;
1035           
1036           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1037
1038           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1039                                    t->id, t,
1040                                    whatNext_strs[t->what_next], blocks));
1041
1042           // don't do this if it would push us over the
1043           // alloc_blocks_lim limit; we'll GC first.
1044           if (alloc_blocks + blocks < alloc_blocks_lim) {
1045
1046               alloc_blocks += blocks;
1047               bd = allocGroup( blocks );
1048
1049               // link the new group into the list
1050               bd->link = cap->r.rCurrentNursery;
1051               bd->u.back = cap->r.rCurrentNursery->u.back;
1052               if (cap->r.rCurrentNursery->u.back != NULL) {
1053                   cap->r.rCurrentNursery->u.back->link = bd;
1054               } else {
1055                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1056                          g0s0->blocks == cap->r.rNursery);
1057                   cap->r.rNursery = g0s0->blocks = bd;
1058               }           
1059               cap->r.rCurrentNursery->u.back = bd;
1060
1061               // initialise it as a nursery block
1062               bd->step = g0s0;
1063               bd->gen_no = 0;
1064               bd->flags = 0;
1065               bd->free = bd->start;
1066
1067               // don't forget to update the block count in g0s0.
1068               g0s0->n_blocks += blocks;
1069               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1070
1071               // now update the nursery to point to the new block
1072               cap->r.rCurrentNursery = bd;
1073
1074               // we might be unlucky and have another thread get on the
1075               // run queue before us and steal the large block, but in that
1076               // case the thread will just end up requesting another large
1077               // block.
1078               PUSH_ON_RUN_QUEUE(t);
1079               break;
1080           }
1081       }
1082
1083       /* make all the running tasks block on a condition variable,
1084        * maybe set context_switch and wait till they all pile in,
1085        * then have them wait on a GC condition variable.
1086        */
1087       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1088                                t->id, t, whatNext_strs[t->what_next]));
1089       threadPaused(t);
1090 #if defined(GRAN)
1091       ASSERT(!is_on_queue(t,CurrentProc));
1092 #elif defined(PAR)
1093       /* Currently we emit a DESCHEDULE event before GC in GUM.
1094          ToDo: either add separate event to distinguish SYSTEM time from rest
1095                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1096       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1097         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1098                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1099         emitSchedule = rtsTrue;
1100       }
1101 #endif
1102       
1103       ready_to_gc = rtsTrue;
1104       context_switch = 1;               /* stop other threads ASAP */
1105       PUSH_ON_RUN_QUEUE(t);
1106       /* actual GC is done at the end of the while loop */
1107       break;
1108       
1109     case StackOverflow:
1110 #if defined(GRAN)
1111       IF_DEBUG(gran, 
1112                DumpGranEvent(GR_DESCHEDULE, t));
1113       globalGranStats.tot_stackover++;
1114 #elif defined(PAR)
1115       // IF_DEBUG(par, 
1116       // DumpGranEvent(GR_DESCHEDULE, t);
1117       globalParStats.tot_stackover++;
1118 #endif
1119       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1120                                t->id, t, whatNext_strs[t->what_next]));
1121       /* just adjust the stack for this thread, then pop it back
1122        * on the run queue.
1123        */
1124       threadPaused(t);
1125       { 
1126         StgMainThread *m;
1127         /* enlarge the stack */
1128         StgTSO *new_t = threadStackOverflow(t);
1129         
1130         /* This TSO has moved, so update any pointers to it from the
1131          * main thread stack.  It better not be on any other queues...
1132          * (it shouldn't be).
1133          */
1134         for (m = main_threads; m != NULL; m = m->link) {
1135           if (m->tso == t) {
1136             m->tso = new_t;
1137           }
1138         }
1139         threadPaused(new_t);
1140         PUSH_ON_RUN_QUEUE(new_t);
1141       }
1142       break;
1143
1144     case ThreadYielding:
1145 #if defined(GRAN)
1146       IF_DEBUG(gran, 
1147                DumpGranEvent(GR_DESCHEDULE, t));
1148       globalGranStats.tot_yields++;
1149 #elif defined(PAR)
1150       // IF_DEBUG(par, 
1151       // DumpGranEvent(GR_DESCHEDULE, t);
1152       globalParStats.tot_yields++;
1153 #endif
1154       /* put the thread back on the run queue.  Then, if we're ready to
1155        * GC, check whether this is the last task to stop.  If so, wake
1156        * up the GC thread.  getThread will block during a GC until the
1157        * GC is finished.
1158        */
1159       IF_DEBUG(scheduler,
1160                if (t->what_next == ThreadEnterInterp) {
1161                    /* ToDo: or maybe a timer expired when we were in Hugs?
1162                     * or maybe someone hit ctrl-C
1163                     */
1164                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1165                          t->id, t, whatNext_strs[t->what_next]);
1166                } else {
1167                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1168                          t->id, t, whatNext_strs[t->what_next]);
1169                }
1170                );
1171
1172       threadPaused(t);
1173
1174       IF_DEBUG(sanity,
1175                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1176                checkTSO(t));
1177       ASSERT(t->link == END_TSO_QUEUE);
1178 #if defined(GRAN)
1179       ASSERT(!is_on_queue(t,CurrentProc));
1180
1181       IF_DEBUG(sanity,
1182                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1183                checkThreadQsSanity(rtsTrue));
1184 #endif
1185 #if defined(PAR)
1186       if (RtsFlags.ParFlags.doFairScheduling) { 
1187         /* this does round-robin scheduling; good for concurrency */
1188         APPEND_TO_RUN_QUEUE(t);
1189       } else {
1190         /* this does unfair scheduling; good for parallelism */
1191         PUSH_ON_RUN_QUEUE(t);
1192       }
1193 #else
1194       /* this does round-robin scheduling; good for concurrency */
1195       APPEND_TO_RUN_QUEUE(t);
1196 #endif
1197 #if defined(GRAN)
1198       /* add a ContinueThread event to actually process the thread */
1199       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1200                 ContinueThread,
1201                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1202       IF_GRAN_DEBUG(bq, 
1203                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1204                G_EVENTQ(0);
1205                G_CURR_THREADQ(0));
1206 #endif /* GRAN */
1207       break;
1208       
1209     case ThreadBlocked:
1210 #if defined(GRAN)
1211       IF_DEBUG(scheduler,
1212                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1213                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1214                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1215
1216       // ??? needed; should emit block before
1217       IF_DEBUG(gran, 
1218                DumpGranEvent(GR_DESCHEDULE, t)); 
1219       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1220       /*
1221         ngoq Dogh!
1222       ASSERT(procStatus[CurrentProc]==Busy || 
1223               ((procStatus[CurrentProc]==Fetching) && 
1224               (t->block_info.closure!=(StgClosure*)NULL)));
1225       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1226           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1227             procStatus[CurrentProc]==Fetching)) 
1228         procStatus[CurrentProc] = Idle;
1229       */
1230 #elif defined(PAR)
1231       IF_DEBUG(scheduler,
1232                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1233                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1234       IF_PAR_DEBUG(bq,
1235
1236                    if (t->block_info.closure!=(StgClosure*)NULL) 
1237                      print_bq(t->block_info.closure));
1238
1239       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1240       blockThread(t);
1241
1242       /* whatever we schedule next, we must log that schedule */
1243       emitSchedule = rtsTrue;
1244
1245 #else /* !GRAN */
1246       /* don't need to do anything.  Either the thread is blocked on
1247        * I/O, in which case we'll have called addToBlockedQueue
1248        * previously, or it's blocked on an MVar or Blackhole, in which
1249        * case it'll be on the relevant queue already.
1250        */
1251       IF_DEBUG(scheduler,
1252                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1253                printThreadBlockage(t);
1254                fprintf(stderr, "\n"));
1255
1256       /* Only for dumping event to log file 
1257          ToDo: do I need this in GranSim, too?
1258       blockThread(t);
1259       */
1260 #endif
1261       threadPaused(t);
1262       break;
1263       
1264     case ThreadFinished:
1265       /* Need to check whether this was a main thread, and if so, signal
1266        * the task that started it with the return value.  If we have no
1267        * more main threads, we probably need to stop all the tasks until
1268        * we get a new one.
1269        */
1270       /* We also end up here if the thread kills itself with an
1271        * uncaught exception, see Exception.hc.
1272        */
1273       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1274 #if defined(GRAN)
1275       endThread(t, CurrentProc); // clean-up the thread
1276 #elif defined(PAR)
1277       /* For now all are advisory -- HWL */
1278       //if(t->priority==AdvisoryPriority) ??
1279       advisory_thread_count--;
1280       
1281 # ifdef DIST
1282       if(t->dist.priority==RevalPriority)
1283         FinishReval(t);
1284 # endif
1285       
1286       if (RtsFlags.ParFlags.ParStats.Full &&
1287           !RtsFlags.ParFlags.ParStats.Suppressed) 
1288         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1289 #endif
1290       break;
1291       
1292     default:
1293       barf("schedule: invalid thread return code %d", (int)ret);
1294     }
1295     
1296 #ifdef SMP
1297     grabCapability(&cap);
1298 #endif
1299
1300 #ifdef PROFILING
1301     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1302         GarbageCollect(GetRoots, rtsTrue);
1303         heapCensus();
1304         performHeapProfile = rtsFalse;
1305         ready_to_gc = rtsFalse; // we already GC'd
1306     }
1307 #endif
1308
1309 #ifdef SMP
1310     if (ready_to_gc && allFreeCapabilities() )
1311 #else
1312     if (ready_to_gc) 
1313 #endif
1314       {
1315       /* everybody back, start the GC.
1316        * Could do it in this thread, or signal a condition var
1317        * to do it in another thread.  Either way, we need to
1318        * broadcast on gc_pending_cond afterward.
1319        */
1320 #if defined(RTS_SUPPORTS_THREADS)
1321       IF_DEBUG(scheduler,sched_belch("doing GC"));
1322 #endif
1323       GarbageCollect(GetRoots,rtsFalse);
1324       ready_to_gc = rtsFalse;
1325 #ifdef SMP
1326       broadcastCondVar(&gc_pending_cond);
1327 #endif
1328 #if defined(GRAN)
1329       /* add a ContinueThread event to continue execution of current thread */
1330       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1331                 ContinueThread,
1332                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1333       IF_GRAN_DEBUG(bq, 
1334                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1335                G_EVENTQ(0);
1336                G_CURR_THREADQ(0));
1337 #endif /* GRAN */
1338     }
1339
1340 #if defined(GRAN)
1341   next_thread:
1342     IF_GRAN_DEBUG(unused,
1343                   print_eventq(EventHd));
1344
1345     event = get_next_event();
1346 #elif defined(PAR)
1347   next_thread:
1348     /* ToDo: wait for next message to arrive rather than busy wait */
1349 #endif /* GRAN */
1350
1351   } /* end of while(1) */
1352
1353   IF_PAR_DEBUG(verbose,
1354                belch("== Leaving schedule() after having received Finish"));
1355 }
1356
1357 /* ---------------------------------------------------------------------------
1358  * deleteAllThreads():  kill all the live threads.
1359  *
1360  * This is used when we catch a user interrupt (^C), before performing
1361  * any necessary cleanups and running finalizers.
1362  * ------------------------------------------------------------------------- */
1363    
1364 void deleteAllThreads ( void )
1365 {
1366   StgTSO* t;
1367   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1368   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1369       deleteThread(t);
1370   }
1371   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1372       deleteThread(t);
1373   }
1374   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1375       deleteThread(t);
1376   }
1377   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1378   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1379   sleeping_queue = END_TSO_QUEUE;
1380 }
1381
1382 /* startThread and  insertThread are now in GranSim.c -- HWL */
1383
1384 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1385 //@subsection Suspend and Resume
1386
1387 /* ---------------------------------------------------------------------------
1388  * Suspending & resuming Haskell threads.
1389  * 
1390  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1391  * its capability before calling the C function.  This allows another
1392  * task to pick up the capability and carry on running Haskell
1393  * threads.  It also means that if the C call blocks, it won't lock
1394  * the whole system.
1395  *
1396  * The Haskell thread making the C call is put to sleep for the
1397  * duration of the call, on the susepended_ccalling_threads queue.  We
1398  * give out a token to the task, which it can use to resume the thread
1399  * on return from the C function.
1400  * ------------------------------------------------------------------------- */
1401    
1402 StgInt
1403 suspendThread( StgRegTable *reg )
1404 {
1405   nat tok;
1406   Capability *cap;
1407
1408   // assume that *reg is a pointer to the StgRegTable part of a Capability
1409   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1410
1411   ACQUIRE_LOCK(&sched_mutex);
1412
1413   IF_DEBUG(scheduler,
1414            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1415
1416   threadPaused(cap->r.rCurrentTSO);
1417   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1418   suspended_ccalling_threads = cap->r.rCurrentTSO;
1419
1420   /* Use the thread ID as the token; it should be unique */
1421   tok = cap->r.rCurrentTSO->id;
1422
1423 #ifdef SMP
1424   /* Hand back capability */
1425   releaseCapability(&cap);
1426 #endif
1427
1428   RELEASE_LOCK(&sched_mutex);
1429   return tok; 
1430 }
1431
1432 StgRegTable *
1433 resumeThread( StgInt tok )
1434 {
1435   StgTSO *tso, **prev;
1436   Capability *cap;
1437
1438   ACQUIRE_LOCK(&sched_mutex);
1439
1440   prev = &suspended_ccalling_threads;
1441   for (tso = suspended_ccalling_threads; 
1442        tso != END_TSO_QUEUE; 
1443        prev = &tso->link, tso = tso->link) {
1444     if (tso->id == (StgThreadID)tok) {
1445       *prev = tso->link;
1446       break;
1447     }
1448   }
1449   if (tso == END_TSO_QUEUE) {
1450     barf("resumeThread: thread not found");
1451   }
1452   tso->link = END_TSO_QUEUE;
1453
1454 #ifdef SMP
1455   while ( noFreeCapabilities() ) {
1456     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1457     waitCondVar(&thread_ready_cond, &sched_mutex);
1458     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1459   }
1460   grabCapability(&cap);
1461 #else  
1462   cap = &MainCapability;
1463 #endif
1464
1465   cap->r.rCurrentTSO = tso;
1466
1467   RELEASE_LOCK(&sched_mutex);
1468   return &cap->r;
1469 }
1470
1471
1472 /* ---------------------------------------------------------------------------
1473  * Static functions
1474  * ------------------------------------------------------------------------ */
1475 static void unblockThread(StgTSO *tso);
1476
1477 /* ---------------------------------------------------------------------------
1478  * Comparing Thread ids.
1479  *
1480  * This is used from STG land in the implementation of the
1481  * instances of Eq/Ord for ThreadIds.
1482  * ------------------------------------------------------------------------ */
1483
1484 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1485
1486   StgThreadID id1 = tso1->id; 
1487   StgThreadID id2 = tso2->id;
1488  
1489   if (id1 < id2) return (-1);
1490   if (id1 > id2) return 1;
1491   return 0;
1492 }
1493
1494 /* ---------------------------------------------------------------------------
1495  * Fetching the ThreadID from an StgTSO.
1496  *
1497  * This is used in the implementation of Show for ThreadIds.
1498  * ------------------------------------------------------------------------ */
1499 int rts_getThreadId(const StgTSO *tso) 
1500 {
1501   return tso->id;
1502 }
1503
1504 /* ---------------------------------------------------------------------------
1505    Create a new thread.
1506
1507    The new thread starts with the given stack size.  Before the
1508    scheduler can run, however, this thread needs to have a closure
1509    (and possibly some arguments) pushed on its stack.  See
1510    pushClosure() in Schedule.h.
1511
1512    createGenThread() and createIOThread() (in SchedAPI.h) are
1513    convenient packaged versions of this function.
1514
1515    currently pri (priority) is only used in a GRAN setup -- HWL
1516    ------------------------------------------------------------------------ */
1517 //@cindex createThread
1518 #if defined(GRAN)
1519 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1520 StgTSO *
1521 createThread(nat stack_size, StgInt pri)
1522 {
1523   return createThread_(stack_size, rtsFalse, pri);
1524 }
1525
1526 static StgTSO *
1527 createThread_(nat size, rtsBool have_lock, StgInt pri)
1528 {
1529 #else
1530 StgTSO *
1531 createThread(nat stack_size)
1532 {
1533   return createThread_(stack_size, rtsFalse);
1534 }
1535
1536 static StgTSO *
1537 createThread_(nat size, rtsBool have_lock)
1538 {
1539 #endif
1540
1541     StgTSO *tso;
1542     nat stack_size;
1543
1544     /* First check whether we should create a thread at all */
1545 #if defined(PAR)
1546   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1547   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1548     threadsIgnored++;
1549     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1550           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1551     return END_TSO_QUEUE;
1552   }
1553   threadsCreated++;
1554 #endif
1555
1556 #if defined(GRAN)
1557   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1558 #endif
1559
1560   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1561
1562   /* catch ridiculously small stack sizes */
1563   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1564     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1565   }
1566
1567   stack_size = size - TSO_STRUCT_SIZEW;
1568
1569   tso = (StgTSO *)allocate(size);
1570   TICK_ALLOC_TSO(stack_size, 0);
1571
1572   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1573 #if defined(GRAN)
1574   SET_GRAN_HDR(tso, ThisPE);
1575 #endif
1576   tso->what_next     = ThreadEnterGHC;
1577
1578   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1579    * protect the increment operation on next_thread_id.
1580    * In future, we could use an atomic increment instead.
1581    */
1582   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1583   tso->id = next_thread_id++; 
1584   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1585
1586   tso->why_blocked  = NotBlocked;
1587   tso->blocked_exceptions = NULL;
1588
1589   tso->stack_size   = stack_size;
1590   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1591                               - TSO_STRUCT_SIZEW;
1592   tso->sp           = (P_)&(tso->stack) + stack_size;
1593
1594 #ifdef PROFILING
1595   tso->prof.CCCS = CCS_MAIN;
1596 #endif
1597
1598   /* put a stop frame on the stack */
1599   tso->sp -= sizeofW(StgStopFrame);
1600   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1601   tso->su = (StgUpdateFrame*)tso->sp;
1602
1603   // ToDo: check this
1604 #if defined(GRAN)
1605   tso->link = END_TSO_QUEUE;
1606   /* uses more flexible routine in GranSim */
1607   insertThread(tso, CurrentProc);
1608 #else
1609   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1610    * from its creation
1611    */
1612 #endif
1613
1614 #if defined(GRAN) 
1615   if (RtsFlags.GranFlags.GranSimStats.Full) 
1616     DumpGranEvent(GR_START,tso);
1617 #elif defined(PAR)
1618   if (RtsFlags.ParFlags.ParStats.Full) 
1619     DumpGranEvent(GR_STARTQ,tso);
1620   /* HACk to avoid SCHEDULE 
1621      LastTSO = tso; */
1622 #endif
1623
1624   /* Link the new thread on the global thread list.
1625    */
1626   tso->global_link = all_threads;
1627   all_threads = tso;
1628
1629 #if defined(DIST)
1630   tso->dist.priority = MandatoryPriority; //by default that is...
1631 #endif
1632
1633 #if defined(GRAN)
1634   tso->gran.pri = pri;
1635 # if defined(DEBUG)
1636   tso->gran.magic = TSO_MAGIC; // debugging only
1637 # endif
1638   tso->gran.sparkname   = 0;
1639   tso->gran.startedat   = CURRENT_TIME; 
1640   tso->gran.exported    = 0;
1641   tso->gran.basicblocks = 0;
1642   tso->gran.allocs      = 0;
1643   tso->gran.exectime    = 0;
1644   tso->gran.fetchtime   = 0;
1645   tso->gran.fetchcount  = 0;
1646   tso->gran.blocktime   = 0;
1647   tso->gran.blockcount  = 0;
1648   tso->gran.blockedat   = 0;
1649   tso->gran.globalsparks = 0;
1650   tso->gran.localsparks  = 0;
1651   if (RtsFlags.GranFlags.Light)
1652     tso->gran.clock  = Now; /* local clock */
1653   else
1654     tso->gran.clock  = 0;
1655
1656   IF_DEBUG(gran,printTSO(tso));
1657 #elif defined(PAR)
1658 # if defined(DEBUG)
1659   tso->par.magic = TSO_MAGIC; // debugging only
1660 # endif
1661   tso->par.sparkname   = 0;
1662   tso->par.startedat   = CURRENT_TIME; 
1663   tso->par.exported    = 0;
1664   tso->par.basicblocks = 0;
1665   tso->par.allocs      = 0;
1666   tso->par.exectime    = 0;
1667   tso->par.fetchtime   = 0;
1668   tso->par.fetchcount  = 0;
1669   tso->par.blocktime   = 0;
1670   tso->par.blockcount  = 0;
1671   tso->par.blockedat   = 0;
1672   tso->par.globalsparks = 0;
1673   tso->par.localsparks  = 0;
1674 #endif
1675
1676 #if defined(GRAN)
1677   globalGranStats.tot_threads_created++;
1678   globalGranStats.threads_created_on_PE[CurrentProc]++;
1679   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1680   globalGranStats.tot_sq_probes++;
1681 #elif defined(PAR)
1682   // collect parallel global statistics (currently done together with GC stats)
1683   if (RtsFlags.ParFlags.ParStats.Global &&
1684       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1685     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1686     globalParStats.tot_threads_created++;
1687   }
1688 #endif 
1689
1690 #if defined(GRAN)
1691   IF_GRAN_DEBUG(pri,
1692                 belch("==__ schedule: Created TSO %d (%p);",
1693                       CurrentProc, tso, tso->id));
1694 #elif defined(PAR)
1695     IF_PAR_DEBUG(verbose,
1696                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1697                        tso->id, tso, advisory_thread_count));
1698 #else
1699   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1700                                  tso->id, tso->stack_size));
1701 #endif    
1702   return tso;
1703 }
1704
1705 #if defined(PAR)
1706 /* RFP:
1707    all parallel thread creation calls should fall through the following routine.
1708 */
1709 StgTSO *
1710 createSparkThread(rtsSpark spark) 
1711 { StgTSO *tso;
1712   ASSERT(spark != (rtsSpark)NULL);
1713   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1714   { threadsIgnored++;
1715     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1716           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1717     return END_TSO_QUEUE;
1718   }
1719   else
1720   { threadsCreated++;
1721     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1722     if (tso==END_TSO_QUEUE)     
1723       barf("createSparkThread: Cannot create TSO");
1724 #if defined(DIST)
1725     tso->priority = AdvisoryPriority;
1726 #endif
1727     pushClosure(tso,spark);
1728     PUSH_ON_RUN_QUEUE(tso);
1729     advisory_thread_count++;    
1730   }
1731   return tso;
1732 }
1733 #endif
1734
1735 /*
1736   Turn a spark into a thread.
1737   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1738 */
1739 #if defined(PAR)
1740 //@cindex activateSpark
1741 StgTSO *
1742 activateSpark (rtsSpark spark) 
1743 {
1744   StgTSO *tso;
1745
1746   tso = createSparkThread(spark);
1747   if (RtsFlags.ParFlags.ParStats.Full) {   
1748     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1749     IF_PAR_DEBUG(verbose,
1750                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1751                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1752   }
1753   // ToDo: fwd info on local/global spark to thread -- HWL
1754   // tso->gran.exported =  spark->exported;
1755   // tso->gran.locked =   !spark->global;
1756   // tso->gran.sparkname = spark->name;
1757
1758   return tso;
1759 }
1760 #endif
1761
1762 /* ---------------------------------------------------------------------------
1763  * scheduleThread()
1764  *
1765  * scheduleThread puts a thread on the head of the runnable queue.
1766  * This will usually be done immediately after a thread is created.
1767  * The caller of scheduleThread must create the thread using e.g.
1768  * createThread and push an appropriate closure
1769  * on this thread's stack before the scheduler is invoked.
1770  * ------------------------------------------------------------------------ */
1771
1772 void
1773 scheduleThread(StgTSO *tso)
1774 {
1775   ACQUIRE_LOCK(&sched_mutex);
1776
1777   /* Put the new thread on the head of the runnable queue.  The caller
1778    * better push an appropriate closure on this thread's stack
1779    * beforehand.  In the SMP case, the thread may start running as
1780    * soon as we release the scheduler lock below.
1781    */
1782   PUSH_ON_RUN_QUEUE(tso);
1783   THREAD_RUNNABLE();
1784
1785 #if 0
1786   IF_DEBUG(scheduler,printTSO(tso));
1787 #endif
1788   RELEASE_LOCK(&sched_mutex);
1789 }
1790
1791 /* ---------------------------------------------------------------------------
1792  * startTasks()
1793  *
1794  * Start up Posix threads to run each of the scheduler tasks.
1795  * I believe the task ids are not needed in the system as defined.
1796  *  KH @ 25/10/99
1797  * ------------------------------------------------------------------------ */
1798
1799 #if defined(PAR) || defined(SMP)
1800 void
1801 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1802 {
1803   schedule();
1804 }
1805 #endif
1806
1807 /* ---------------------------------------------------------------------------
1808  * initScheduler()
1809  *
1810  * Initialise the scheduler.  This resets all the queues - if the
1811  * queues contained any threads, they'll be garbage collected at the
1812  * next pass.
1813  *
1814  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1815  * ------------------------------------------------------------------------ */
1816
1817 #ifdef SMP
1818 static void
1819 term_handler(int sig STG_UNUSED)
1820 {
1821   stat_workerStop();
1822   ACQUIRE_LOCK(&term_mutex);
1823   await_death--;
1824   RELEASE_LOCK(&term_mutex);
1825   shutdownThread();
1826 }
1827 #endif
1828
1829 void 
1830 initScheduler(void)
1831 {
1832 #if defined(GRAN)
1833   nat i;
1834
1835   for (i=0; i<=MAX_PROC; i++) {
1836     run_queue_hds[i]      = END_TSO_QUEUE;
1837     run_queue_tls[i]      = END_TSO_QUEUE;
1838     blocked_queue_hds[i]  = END_TSO_QUEUE;
1839     blocked_queue_tls[i]  = END_TSO_QUEUE;
1840     ccalling_threadss[i]  = END_TSO_QUEUE;
1841     sleeping_queue        = END_TSO_QUEUE;
1842   }
1843 #else
1844   run_queue_hd      = END_TSO_QUEUE;
1845   run_queue_tl      = END_TSO_QUEUE;
1846   blocked_queue_hd  = END_TSO_QUEUE;
1847   blocked_queue_tl  = END_TSO_QUEUE;
1848   sleeping_queue    = END_TSO_QUEUE;
1849 #endif 
1850
1851   suspended_ccalling_threads  = END_TSO_QUEUE;
1852
1853   main_threads = NULL;
1854   all_threads  = END_TSO_QUEUE;
1855
1856   context_switch = 0;
1857   interrupted    = 0;
1858
1859   RtsFlags.ConcFlags.ctxtSwitchTicks =
1860       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1861
1862   /* Install the SIGHUP handler */
1863 #ifdef SMP
1864   {
1865     struct sigaction action,oact;
1866
1867     action.sa_handler = term_handler;
1868     sigemptyset(&action.sa_mask);
1869     action.sa_flags = 0;
1870     if (sigaction(SIGTERM, &action, &oact) != 0) {
1871       barf("can't install TERM handler");
1872     }
1873   }
1874 #endif
1875
1876 #ifdef SMP
1877   /* Allocate N Capabilities */
1878   initCapabilities(RtsFlags.ParFlags.nNodes);
1879 #else
1880   initCapability(&MainCapability);
1881 #endif
1882
1883 #if /* defined(SMP) ||*/ defined(PAR)
1884   initSparkPools();
1885 #endif
1886 }
1887
1888 #ifdef SMP
1889 void
1890 startTasks( void )
1891 {
1892   nat i;
1893   int r;
1894   OSThreadId tid;
1895   
1896   /* make some space for saving all the thread ids */
1897   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1898                             "initScheduler:task_ids");
1899   
1900   /* and create all the threads */
1901   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1902     r = createOSThread(&tid,taskStart);
1903     if (r != 0) {
1904       barf("startTasks: Can't create new Posix thread");
1905     }
1906     task_ids[i].id = tid;
1907     task_ids[i].mut_time = 0.0;
1908     task_ids[i].mut_etime = 0.0;
1909     task_ids[i].gc_time = 0.0;
1910     task_ids[i].gc_etime = 0.0;
1911     task_ids[i].elapsedtimestart = stat_getElapsedTime();
1912     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1913   }
1914 }
1915 #endif
1916
1917 void
1918 exitScheduler( void )
1919 {
1920 #ifdef SMP
1921   nat i;
1922
1923   /* Don't want to use pthread_cancel, since we'd have to install
1924    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1925    * all our locks.
1926    */
1927 #if 0
1928   /* Cancel all our tasks */
1929   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1930     pthread_cancel(task_ids[i].id);
1931   }
1932   
1933   /* Wait for all the tasks to terminate */
1934   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1935     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1936                                task_ids[i].id));
1937     pthread_join(task_ids[i].id, NULL);
1938   }
1939 #endif
1940
1941   /* Send 'em all a SIGHUP.  That should shut 'em up.
1942    */
1943   await_death = RtsFlags.ParFlags.nNodes;
1944   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1945     pthread_kill(task_ids[i].id,SIGTERM);
1946   }
1947   while (await_death > 0) {
1948     sched_yield();
1949   }
1950 #endif
1951 }
1952
1953 /* -----------------------------------------------------------------------------
1954    Managing the per-task allocation areas.
1955    
1956    Each capability comes with an allocation area.  These are
1957    fixed-length block lists into which allocation can be done.
1958
1959    ToDo: no support for two-space collection at the moment???
1960    -------------------------------------------------------------------------- */
1961
1962 /* -----------------------------------------------------------------------------
1963  * waitThread is the external interface for running a new computation
1964  * and waiting for the result.
1965  *
1966  * In the non-SMP case, we create a new main thread, push it on the 
1967  * main-thread stack, and invoke the scheduler to run it.  The
1968  * scheduler will return when the top main thread on the stack has
1969  * completed or died, and fill in the necessary fields of the
1970  * main_thread structure.
1971  *
1972  * In the SMP case, we create a main thread as before, but we then
1973  * create a new condition variable and sleep on it.  When our new
1974  * main thread has completed, we'll be woken up and the status/result
1975  * will be in the main_thread struct.
1976  * -------------------------------------------------------------------------- */
1977
1978 int 
1979 howManyThreadsAvail ( void )
1980 {
1981    int i = 0;
1982    StgTSO* q;
1983    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1984       i++;
1985    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1986       i++;
1987    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1988       i++;
1989    return i;
1990 }
1991
1992 void
1993 finishAllThreads ( void )
1994 {
1995    do {
1996       while (run_queue_hd != END_TSO_QUEUE) {
1997          waitThread ( run_queue_hd, NULL );
1998       }
1999       while (blocked_queue_hd != END_TSO_QUEUE) {
2000          waitThread ( blocked_queue_hd, NULL );
2001       }
2002       while (sleeping_queue != END_TSO_QUEUE) {
2003          waitThread ( blocked_queue_hd, NULL );
2004       }
2005    } while 
2006       (blocked_queue_hd != END_TSO_QUEUE || 
2007        run_queue_hd     != END_TSO_QUEUE ||
2008        sleeping_queue   != END_TSO_QUEUE);
2009 }
2010
2011 SchedulerStatus
2012 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2013 {
2014   StgMainThread *m;
2015   SchedulerStatus stat;
2016
2017   ACQUIRE_LOCK(&sched_mutex);
2018   
2019   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2020
2021   m->tso = tso;
2022   m->ret = ret;
2023   m->stat = NoStatus;
2024 #if defined(RTS_SUPPORTS_THREADS)
2025   initCondVar(&m->wakeup);
2026 #endif
2027
2028   m->link = main_threads;
2029   main_threads = m;
2030
2031   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2032                               m->tso->id));
2033
2034 #ifdef SMP
2035   do {
2036     waitCondVar(&m->wakeup, &sched_mutex);
2037   } while (m->stat == NoStatus);
2038 #elif defined(GRAN)
2039   /* GranSim specific init */
2040   CurrentTSO = m->tso;                // the TSO to run
2041   procStatus[MainProc] = Busy;        // status of main PE
2042   CurrentProc = MainProc;             // PE to run it on
2043
2044   schedule();
2045 #else
2046   schedule();
2047   ASSERT(m->stat != NoStatus);
2048 #endif
2049
2050   stat = m->stat;
2051
2052 #if defined(RTS_SUPPORTS_THREADS)
2053   closeCondVar(&m->wakeup);
2054 #endif
2055
2056   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2057                               m->tso->id));
2058   free(m);
2059
2060   RELEASE_LOCK(&sched_mutex);
2061
2062   return stat;
2063 }
2064
2065 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2066 //@subsection Run queue code 
2067
2068 #if 0
2069 /* 
2070    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2071        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2072        implicit global variable that has to be correct when calling these
2073        fcts -- HWL 
2074 */
2075
2076 /* Put the new thread on the head of the runnable queue.
2077  * The caller of createThread better push an appropriate closure
2078  * on this thread's stack before the scheduler is invoked.
2079  */
2080 static /* inline */ void
2081 add_to_run_queue(tso)
2082 StgTSO* tso; 
2083 {
2084   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2085   tso->link = run_queue_hd;
2086   run_queue_hd = tso;
2087   if (run_queue_tl == END_TSO_QUEUE) {
2088     run_queue_tl = tso;
2089   }
2090 }
2091
2092 /* Put the new thread at the end of the runnable queue. */
2093 static /* inline */ void
2094 push_on_run_queue(tso)
2095 StgTSO* tso; 
2096 {
2097   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2098   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2099   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2100   if (run_queue_hd == END_TSO_QUEUE) {
2101     run_queue_hd = tso;
2102   } else {
2103     run_queue_tl->link = tso;
2104   }
2105   run_queue_tl = tso;
2106 }
2107
2108 /* 
2109    Should be inlined because it's used very often in schedule.  The tso
2110    argument is actually only needed in GranSim, where we want to have the
2111    possibility to schedule *any* TSO on the run queue, irrespective of the
2112    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2113    the run queue and dequeue the tso, adjusting the links in the queue. 
2114 */
2115 //@cindex take_off_run_queue
2116 static /* inline */ StgTSO*
2117 take_off_run_queue(StgTSO *tso) {
2118   StgTSO *t, *prev;
2119
2120   /* 
2121      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2122
2123      if tso is specified, unlink that tso from the run_queue (doesn't have
2124      to be at the beginning of the queue); GranSim only 
2125   */
2126   if (tso!=END_TSO_QUEUE) {
2127     /* find tso in queue */
2128     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2129          t!=END_TSO_QUEUE && t!=tso;
2130          prev=t, t=t->link) 
2131       /* nothing */ ;
2132     ASSERT(t==tso);
2133     /* now actually dequeue the tso */
2134     if (prev!=END_TSO_QUEUE) {
2135       ASSERT(run_queue_hd!=t);
2136       prev->link = t->link;
2137     } else {
2138       /* t is at beginning of thread queue */
2139       ASSERT(run_queue_hd==t);
2140       run_queue_hd = t->link;
2141     }
2142     /* t is at end of thread queue */
2143     if (t->link==END_TSO_QUEUE) {
2144       ASSERT(t==run_queue_tl);
2145       run_queue_tl = prev;
2146     } else {
2147       ASSERT(run_queue_tl!=t);
2148     }
2149     t->link = END_TSO_QUEUE;
2150   } else {
2151     /* take tso from the beginning of the queue; std concurrent code */
2152     t = run_queue_hd;
2153     if (t != END_TSO_QUEUE) {
2154       run_queue_hd = t->link;
2155       t->link = END_TSO_QUEUE;
2156       if (run_queue_hd == END_TSO_QUEUE) {
2157         run_queue_tl = END_TSO_QUEUE;
2158       }
2159     }
2160   }
2161   return t;
2162 }
2163
2164 #endif /* 0 */
2165
2166 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2167 //@subsection Garbage Collextion Routines
2168
2169 /* ---------------------------------------------------------------------------
2170    Where are the roots that we know about?
2171
2172         - all the threads on the runnable queue
2173         - all the threads on the blocked queue
2174         - all the threads on the sleeping queue
2175         - all the thread currently executing a _ccall_GC
2176         - all the "main threads"
2177      
2178    ------------------------------------------------------------------------ */
2179
2180 /* This has to be protected either by the scheduler monitor, or by the
2181         garbage collection monitor (probably the latter).
2182         KH @ 25/10/99
2183 */
2184
2185 void
2186 GetRoots(evac_fn evac)
2187 {
2188   StgMainThread *m;
2189
2190 #if defined(GRAN)
2191   {
2192     nat i;
2193     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2194       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2195           evac((StgClosure **)&run_queue_hds[i]);
2196       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2197           evac((StgClosure **)&run_queue_tls[i]);
2198       
2199       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2200           evac((StgClosure **)&blocked_queue_hds[i]);
2201       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2202           evac((StgClosure **)&blocked_queue_tls[i]);
2203       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2204           evac((StgClosure **)&ccalling_threads[i]);
2205     }
2206   }
2207
2208   markEventQueue();
2209
2210 #else /* !GRAN */
2211   if (run_queue_hd != END_TSO_QUEUE) {
2212       ASSERT(run_queue_tl != END_TSO_QUEUE);
2213       evac((StgClosure **)&run_queue_hd);
2214       evac((StgClosure **)&run_queue_tl);
2215   }
2216   
2217   if (blocked_queue_hd != END_TSO_QUEUE) {
2218       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2219       evac((StgClosure **)&blocked_queue_hd);
2220       evac((StgClosure **)&blocked_queue_tl);
2221   }
2222   
2223   if (sleeping_queue != END_TSO_QUEUE) {
2224       evac((StgClosure **)&sleeping_queue);
2225   }
2226 #endif 
2227
2228   for (m = main_threads; m != NULL; m = m->link) {
2229       evac((StgClosure **)&m->tso);
2230   }
2231   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2232       evac((StgClosure **)&suspended_ccalling_threads);
2233   }
2234
2235 #if defined(PAR) || defined(GRAN)
2236   markSparkQueue(evac);
2237 #endif
2238 }
2239
2240 /* -----------------------------------------------------------------------------
2241    performGC
2242
2243    This is the interface to the garbage collector from Haskell land.
2244    We provide this so that external C code can allocate and garbage
2245    collect when called from Haskell via _ccall_GC.
2246
2247    It might be useful to provide an interface whereby the programmer
2248    can specify more roots (ToDo).
2249    
2250    This needs to be protected by the GC condition variable above.  KH.
2251    -------------------------------------------------------------------------- */
2252
2253 void (*extra_roots)(evac_fn);
2254
2255 void
2256 performGC(void)
2257 {
2258   GarbageCollect(GetRoots,rtsFalse);
2259 }
2260
2261 void
2262 performMajorGC(void)
2263 {
2264   GarbageCollect(GetRoots,rtsTrue);
2265 }
2266
2267 static void
2268 AllRoots(evac_fn evac)
2269 {
2270     GetRoots(evac);             // the scheduler's roots
2271     extra_roots(evac);          // the user's roots
2272 }
2273
2274 void
2275 performGCWithRoots(void (*get_roots)(evac_fn))
2276 {
2277   extra_roots = get_roots;
2278   GarbageCollect(AllRoots,rtsFalse);
2279 }
2280
2281 /* -----------------------------------------------------------------------------
2282    Stack overflow
2283
2284    If the thread has reached its maximum stack size, then raise the
2285    StackOverflow exception in the offending thread.  Otherwise
2286    relocate the TSO into a larger chunk of memory and adjust its stack
2287    size appropriately.
2288    -------------------------------------------------------------------------- */
2289
2290 static StgTSO *
2291 threadStackOverflow(StgTSO *tso)
2292 {
2293   nat new_stack_size, new_tso_size, diff, stack_words;
2294   StgPtr new_sp;
2295   StgTSO *dest;
2296
2297   IF_DEBUG(sanity,checkTSO(tso));
2298   if (tso->stack_size >= tso->max_stack_size) {
2299
2300     IF_DEBUG(gc,
2301              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2302                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2303              /* If we're debugging, just print out the top of the stack */
2304              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2305                                               tso->sp+64)));
2306
2307     /* Send this thread the StackOverflow exception */
2308     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2309     return tso;
2310   }
2311
2312   /* Try to double the current stack size.  If that takes us over the
2313    * maximum stack size for this thread, then use the maximum instead.
2314    * Finally round up so the TSO ends up as a whole number of blocks.
2315    */
2316   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2317   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2318                                        TSO_STRUCT_SIZE)/sizeof(W_);
2319   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2320   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2321
2322   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2323
2324   dest = (StgTSO *)allocate(new_tso_size);
2325   TICK_ALLOC_TSO(new_stack_size,0);
2326
2327   /* copy the TSO block and the old stack into the new area */
2328   memcpy(dest,tso,TSO_STRUCT_SIZE);
2329   stack_words = tso->stack + tso->stack_size - tso->sp;
2330   new_sp = (P_)dest + new_tso_size - stack_words;
2331   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2332
2333   /* relocate the stack pointers... */
2334   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2335   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2336   dest->sp    = new_sp;
2337   dest->stack_size = new_stack_size;
2338         
2339   /* and relocate the update frame list */
2340   relocate_stack(dest, diff);
2341
2342   /* Mark the old TSO as relocated.  We have to check for relocated
2343    * TSOs in the garbage collector and any primops that deal with TSOs.
2344    *
2345    * It's important to set the sp and su values to just beyond the end
2346    * of the stack, so we don't attempt to scavenge any part of the
2347    * dead TSO's stack.
2348    */
2349   tso->what_next = ThreadRelocated;
2350   tso->link = dest;
2351   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2352   tso->su = (StgUpdateFrame *)tso->sp;
2353   tso->why_blocked = NotBlocked;
2354   dest->mut_link = NULL;
2355
2356   IF_PAR_DEBUG(verbose,
2357                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2358                      tso->id, tso, tso->stack_size);
2359                /* If we're debugging, just print out the top of the stack */
2360                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2361                                                 tso->sp+64)));
2362   
2363   IF_DEBUG(sanity,checkTSO(tso));
2364 #if 0
2365   IF_DEBUG(scheduler,printTSO(dest));
2366 #endif
2367
2368   return dest;
2369 }
2370
2371 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2372 //@subsection Blocking Queue Routines
2373
2374 /* ---------------------------------------------------------------------------
2375    Wake up a queue that was blocked on some resource.
2376    ------------------------------------------------------------------------ */
2377
2378 #if defined(GRAN)
2379 static inline void
2380 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2381 {
2382 }
2383 #elif defined(PAR)
2384 static inline void
2385 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2386 {
2387   /* write RESUME events to log file and
2388      update blocked and fetch time (depending on type of the orig closure) */
2389   if (RtsFlags.ParFlags.ParStats.Full) {
2390     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2391                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2392                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2393     if (EMPTY_RUN_QUEUE())
2394       emitSchedule = rtsTrue;
2395
2396     switch (get_itbl(node)->type) {
2397         case FETCH_ME_BQ:
2398           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2399           break;
2400         case RBH:
2401         case FETCH_ME:
2402         case BLACKHOLE_BQ:
2403           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2404           break;
2405 #ifdef DIST
2406         case MVAR:
2407           break;
2408 #endif    
2409         default:
2410           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2411         }
2412       }
2413 }
2414 #endif
2415
2416 #if defined(GRAN)
2417 static StgBlockingQueueElement *
2418 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2419 {
2420     StgTSO *tso;
2421     PEs node_loc, tso_loc;
2422
2423     node_loc = where_is(node); // should be lifted out of loop
2424     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2425     tso_loc = where_is((StgClosure *)tso);
2426     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2427       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2428       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2429       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2430       // insertThread(tso, node_loc);
2431       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2432                 ResumeThread,
2433                 tso, node, (rtsSpark*)NULL);
2434       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2435       // len_local++;
2436       // len++;
2437     } else { // TSO is remote (actually should be FMBQ)
2438       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2439                                   RtsFlags.GranFlags.Costs.gunblocktime +
2440                                   RtsFlags.GranFlags.Costs.latency;
2441       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2442                 UnblockThread,
2443                 tso, node, (rtsSpark*)NULL);
2444       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2445       // len++;
2446     }
2447     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2448     IF_GRAN_DEBUG(bq,
2449                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2450                           (node_loc==tso_loc ? "Local" : "Global"), 
2451                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2452     tso->block_info.closure = NULL;
2453     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2454                              tso->id, tso));
2455 }
2456 #elif defined(PAR)
2457 static StgBlockingQueueElement *
2458 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2459 {
2460     StgBlockingQueueElement *next;
2461
2462     switch (get_itbl(bqe)->type) {
2463     case TSO:
2464       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2465       /* if it's a TSO just push it onto the run_queue */
2466       next = bqe->link;
2467       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2468       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2469       THREAD_RUNNABLE();
2470       unblockCount(bqe, node);
2471       /* reset blocking status after dumping event */
2472       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2473       break;
2474
2475     case BLOCKED_FETCH:
2476       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2477       next = bqe->link;
2478       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2479       PendingFetches = (StgBlockedFetch *)bqe;
2480       break;
2481
2482 # if defined(DEBUG)
2483       /* can ignore this case in a non-debugging setup; 
2484          see comments on RBHSave closures above */
2485     case CONSTR:
2486       /* check that the closure is an RBHSave closure */
2487       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2488              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2489              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2490       break;
2491
2492     default:
2493       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2494            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2495            (StgClosure *)bqe);
2496 # endif
2497     }
2498   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2499   return next;
2500 }
2501
2502 #else /* !GRAN && !PAR */
2503 static StgTSO *
2504 unblockOneLocked(StgTSO *tso)
2505 {
2506   StgTSO *next;
2507
2508   ASSERT(get_itbl(tso)->type == TSO);
2509   ASSERT(tso->why_blocked != NotBlocked);
2510   tso->why_blocked = NotBlocked;
2511   next = tso->link;
2512   PUSH_ON_RUN_QUEUE(tso);
2513   THREAD_RUNNABLE();
2514   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2515   return next;
2516 }
2517 #endif
2518
2519 #if defined(GRAN) || defined(PAR)
2520 inline StgBlockingQueueElement *
2521 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2522 {
2523   ACQUIRE_LOCK(&sched_mutex);
2524   bqe = unblockOneLocked(bqe, node);
2525   RELEASE_LOCK(&sched_mutex);
2526   return bqe;
2527 }
2528 #else
2529 inline StgTSO *
2530 unblockOne(StgTSO *tso)
2531 {
2532   ACQUIRE_LOCK(&sched_mutex);
2533   tso = unblockOneLocked(tso);
2534   RELEASE_LOCK(&sched_mutex);
2535   return tso;
2536 }
2537 #endif
2538
2539 #if defined(GRAN)
2540 void 
2541 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2542 {
2543   StgBlockingQueueElement *bqe;
2544   PEs node_loc;
2545   nat len = 0; 
2546
2547   IF_GRAN_DEBUG(bq, 
2548                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2549                       node, CurrentProc, CurrentTime[CurrentProc], 
2550                       CurrentTSO->id, CurrentTSO));
2551
2552   node_loc = where_is(node);
2553
2554   ASSERT(q == END_BQ_QUEUE ||
2555          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2556          get_itbl(q)->type == CONSTR); // closure (type constructor)
2557   ASSERT(is_unique(node));
2558
2559   /* FAKE FETCH: magically copy the node to the tso's proc;
2560      no Fetch necessary because in reality the node should not have been 
2561      moved to the other PE in the first place
2562   */
2563   if (CurrentProc!=node_loc) {
2564     IF_GRAN_DEBUG(bq, 
2565                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2566                         node, node_loc, CurrentProc, CurrentTSO->id, 
2567                         // CurrentTSO, where_is(CurrentTSO),
2568                         node->header.gran.procs));
2569     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2570     IF_GRAN_DEBUG(bq, 
2571                   belch("## new bitmask of node %p is %#x",
2572                         node, node->header.gran.procs));
2573     if (RtsFlags.GranFlags.GranSimStats.Global) {
2574       globalGranStats.tot_fake_fetches++;
2575     }
2576   }
2577
2578   bqe = q;
2579   // ToDo: check: ASSERT(CurrentProc==node_loc);
2580   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2581     //next = bqe->link;
2582     /* 
2583        bqe points to the current element in the queue
2584        next points to the next element in the queue
2585     */
2586     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2587     //tso_loc = where_is(tso);
2588     len++;
2589     bqe = unblockOneLocked(bqe, node);
2590   }
2591
2592   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2593      the closure to make room for the anchor of the BQ */
2594   if (bqe!=END_BQ_QUEUE) {
2595     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2596     /*
2597     ASSERT((info_ptr==&RBH_Save_0_info) ||
2598            (info_ptr==&RBH_Save_1_info) ||
2599            (info_ptr==&RBH_Save_2_info));
2600     */
2601     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2602     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2603     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2604
2605     IF_GRAN_DEBUG(bq,
2606                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2607                         node, info_type(node)));
2608   }
2609
2610   /* statistics gathering */
2611   if (RtsFlags.GranFlags.GranSimStats.Global) {
2612     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2613     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2614     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2615     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2616   }
2617   IF_GRAN_DEBUG(bq,
2618                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2619                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2620 }
2621 #elif defined(PAR)
2622 void 
2623 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2624 {
2625   StgBlockingQueueElement *bqe;
2626
2627   ACQUIRE_LOCK(&sched_mutex);
2628
2629   IF_PAR_DEBUG(verbose, 
2630                belch("##-_ AwBQ for node %p on [%x]: ",
2631                      node, mytid));
2632 #ifdef DIST  
2633   //RFP
2634   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2635     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2636     return;
2637   }
2638 #endif
2639   
2640   ASSERT(q == END_BQ_QUEUE ||
2641          get_itbl(q)->type == TSO ||           
2642          get_itbl(q)->type == BLOCKED_FETCH || 
2643          get_itbl(q)->type == CONSTR); 
2644
2645   bqe = q;
2646   while (get_itbl(bqe)->type==TSO || 
2647          get_itbl(bqe)->type==BLOCKED_FETCH) {
2648     bqe = unblockOneLocked(bqe, node);
2649   }
2650   RELEASE_LOCK(&sched_mutex);
2651 }
2652
2653 #else   /* !GRAN && !PAR */
2654 void
2655 awakenBlockedQueue(StgTSO *tso)
2656 {
2657   ACQUIRE_LOCK(&sched_mutex);
2658   while (tso != END_TSO_QUEUE) {
2659     tso = unblockOneLocked(tso);
2660   }
2661   RELEASE_LOCK(&sched_mutex);
2662 }
2663 #endif
2664
2665 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2666 //@subsection Exception Handling Routines
2667
2668 /* ---------------------------------------------------------------------------
2669    Interrupt execution
2670    - usually called inside a signal handler so it mustn't do anything fancy.   
2671    ------------------------------------------------------------------------ */
2672
2673 void
2674 interruptStgRts(void)
2675 {
2676     interrupted    = 1;
2677     context_switch = 1;
2678 }
2679
2680 /* -----------------------------------------------------------------------------
2681    Unblock a thread
2682
2683    This is for use when we raise an exception in another thread, which
2684    may be blocked.
2685    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2686    -------------------------------------------------------------------------- */
2687
2688 #if defined(GRAN) || defined(PAR)
2689 /*
2690   NB: only the type of the blocking queue is different in GranSim and GUM
2691       the operations on the queue-elements are the same
2692       long live polymorphism!
2693 */
2694 static void
2695 unblockThread(StgTSO *tso)
2696 {
2697   StgBlockingQueueElement *t, **last;
2698
2699   ACQUIRE_LOCK(&sched_mutex);
2700   switch (tso->why_blocked) {
2701
2702   case NotBlocked:
2703     return;  /* not blocked */
2704
2705   case BlockedOnMVar:
2706     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2707     {
2708       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2709       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2710
2711       last = (StgBlockingQueueElement **)&mvar->head;
2712       for (t = (StgBlockingQueueElement *)mvar->head; 
2713            t != END_BQ_QUEUE; 
2714            last = &t->link, last_tso = t, t = t->link) {
2715         if (t == (StgBlockingQueueElement *)tso) {
2716           *last = (StgBlockingQueueElement *)tso->link;
2717           if (mvar->tail == tso) {
2718             mvar->tail = (StgTSO *)last_tso;
2719           }
2720           goto done;
2721         }
2722       }
2723       barf("unblockThread (MVAR): TSO not found");
2724     }
2725
2726   case BlockedOnBlackHole:
2727     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2728     {
2729       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2730
2731       last = &bq->blocking_queue;
2732       for (t = bq->blocking_queue; 
2733            t != END_BQ_QUEUE; 
2734            last = &t->link, t = t->link) {
2735         if (t == (StgBlockingQueueElement *)tso) {
2736           *last = (StgBlockingQueueElement *)tso->link;
2737           goto done;
2738         }
2739       }
2740       barf("unblockThread (BLACKHOLE): TSO not found");
2741     }
2742
2743   case BlockedOnException:
2744     {
2745       StgTSO *target  = tso->block_info.tso;
2746
2747       ASSERT(get_itbl(target)->type == TSO);
2748
2749       if (target->what_next == ThreadRelocated) {
2750           target = target->link;
2751           ASSERT(get_itbl(target)->type == TSO);
2752       }
2753
2754       ASSERT(target->blocked_exceptions != NULL);
2755
2756       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2757       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2758            t != END_BQ_QUEUE; 
2759            last = &t->link, t = t->link) {
2760         ASSERT(get_itbl(t)->type == TSO);
2761         if (t == (StgBlockingQueueElement *)tso) {
2762           *last = (StgBlockingQueueElement *)tso->link;
2763           goto done;
2764         }
2765       }
2766       barf("unblockThread (Exception): TSO not found");
2767     }
2768
2769   case BlockedOnRead:
2770   case BlockedOnWrite:
2771     {
2772       /* take TSO off blocked_queue */
2773       StgBlockingQueueElement *prev = NULL;
2774       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2775            prev = t, t = t->link) {
2776         if (t == (StgBlockingQueueElement *)tso) {
2777           if (prev == NULL) {
2778             blocked_queue_hd = (StgTSO *)t->link;
2779             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2780               blocked_queue_tl = END_TSO_QUEUE;
2781             }
2782           } else {
2783             prev->link = t->link;
2784             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2785               blocked_queue_tl = (StgTSO *)prev;
2786             }
2787           }
2788           goto done;
2789         }
2790       }
2791       barf("unblockThread (I/O): TSO not found");
2792     }
2793
2794   case BlockedOnDelay:
2795     {
2796       /* take TSO off sleeping_queue */
2797       StgBlockingQueueElement *prev = NULL;
2798       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2799            prev = t, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           if (prev == NULL) {
2802             sleeping_queue = (StgTSO *)t->link;
2803           } else {
2804             prev->link = t->link;
2805           }
2806           goto done;
2807         }
2808       }
2809       barf("unblockThread (I/O): TSO not found");
2810     }
2811
2812   default:
2813     barf("unblockThread");
2814   }
2815
2816  done:
2817   tso->link = END_TSO_QUEUE;
2818   tso->why_blocked = NotBlocked;
2819   tso->block_info.closure = NULL;
2820   PUSH_ON_RUN_QUEUE(tso);
2821   RELEASE_LOCK(&sched_mutex);
2822 }
2823 #else
2824 static void
2825 unblockThread(StgTSO *tso)
2826 {
2827   StgTSO *t, **last;
2828
2829   ACQUIRE_LOCK(&sched_mutex);
2830   switch (tso->why_blocked) {
2831
2832   case NotBlocked:
2833     return;  /* not blocked */
2834
2835   case BlockedOnMVar:
2836     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2837     {
2838       StgTSO *last_tso = END_TSO_QUEUE;
2839       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2840
2841       last = &mvar->head;
2842       for (t = mvar->head; t != END_TSO_QUEUE; 
2843            last = &t->link, last_tso = t, t = t->link) {
2844         if (t == tso) {
2845           *last = tso->link;
2846           if (mvar->tail == tso) {
2847             mvar->tail = last_tso;
2848           }
2849           goto done;
2850         }
2851       }
2852       barf("unblockThread (MVAR): TSO not found");
2853     }
2854
2855   case BlockedOnBlackHole:
2856     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2857     {
2858       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2859
2860       last = &bq->blocking_queue;
2861       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2862            last = &t->link, t = t->link) {
2863         if (t == tso) {
2864           *last = tso->link;
2865           goto done;
2866         }
2867       }
2868       barf("unblockThread (BLACKHOLE): TSO not found");
2869     }
2870
2871   case BlockedOnException:
2872     {
2873       StgTSO *target  = tso->block_info.tso;
2874
2875       ASSERT(get_itbl(target)->type == TSO);
2876
2877       while (target->what_next == ThreadRelocated) {
2878           target = target->link;
2879           ASSERT(get_itbl(target)->type == TSO);
2880       }
2881       
2882       ASSERT(target->blocked_exceptions != NULL);
2883
2884       last = &target->blocked_exceptions;
2885       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2886            last = &t->link, t = t->link) {
2887         ASSERT(get_itbl(t)->type == TSO);
2888         if (t == tso) {
2889           *last = tso->link;
2890           goto done;
2891         }
2892       }
2893       barf("unblockThread (Exception): TSO not found");
2894     }
2895
2896   case BlockedOnRead:
2897   case BlockedOnWrite:
2898     {
2899       StgTSO *prev = NULL;
2900       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2901            prev = t, t = t->link) {
2902         if (t == tso) {
2903           if (prev == NULL) {
2904             blocked_queue_hd = t->link;
2905             if (blocked_queue_tl == t) {
2906               blocked_queue_tl = END_TSO_QUEUE;
2907             }
2908           } else {
2909             prev->link = t->link;
2910             if (blocked_queue_tl == t) {
2911               blocked_queue_tl = prev;
2912             }
2913           }
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (I/O): TSO not found");
2918     }
2919
2920   case BlockedOnDelay:
2921     {
2922       StgTSO *prev = NULL;
2923       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2924            prev = t, t = t->link) {
2925         if (t == tso) {
2926           if (prev == NULL) {
2927             sleeping_queue = t->link;
2928           } else {
2929             prev->link = t->link;
2930           }
2931           goto done;
2932         }
2933       }
2934       barf("unblockThread (I/O): TSO not found");
2935     }
2936
2937   default:
2938     barf("unblockThread");
2939   }
2940
2941  done:
2942   tso->link = END_TSO_QUEUE;
2943   tso->why_blocked = NotBlocked;
2944   tso->block_info.closure = NULL;
2945   PUSH_ON_RUN_QUEUE(tso);
2946   RELEASE_LOCK(&sched_mutex);
2947 }
2948 #endif
2949
2950 /* -----------------------------------------------------------------------------
2951  * raiseAsync()
2952  *
2953  * The following function implements the magic for raising an
2954  * asynchronous exception in an existing thread.
2955  *
2956  * We first remove the thread from any queue on which it might be
2957  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2958  *
2959  * We strip the stack down to the innermost CATCH_FRAME, building
2960  * thunks in the heap for all the active computations, so they can 
2961  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2962  * an application of the handler to the exception, and push it on
2963  * the top of the stack.
2964  * 
2965  * How exactly do we save all the active computations?  We create an
2966  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2967  * AP_UPDs pushes everything from the corresponding update frame
2968  * upwards onto the stack.  (Actually, it pushes everything up to the
2969  * next update frame plus a pointer to the next AP_UPD object.
2970  * Entering the next AP_UPD object pushes more onto the stack until we
2971  * reach the last AP_UPD object - at which point the stack should look
2972  * exactly as it did when we killed the TSO and we can continue
2973  * execution by entering the closure on top of the stack.
2974  *
2975  * We can also kill a thread entirely - this happens if either (a) the 
2976  * exception passed to raiseAsync is NULL, or (b) there's no
2977  * CATCH_FRAME on the stack.  In either case, we strip the entire
2978  * stack and replace the thread with a zombie.
2979  *
2980  * -------------------------------------------------------------------------- */
2981  
2982 void 
2983 deleteThread(StgTSO *tso)
2984 {
2985   raiseAsync(tso,NULL);
2986 }
2987
2988 void
2989 raiseAsync(StgTSO *tso, StgClosure *exception)
2990 {
2991   StgUpdateFrame* su = tso->su;
2992   StgPtr          sp = tso->sp;
2993   
2994   /* Thread already dead? */
2995   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2996     return;
2997   }
2998
2999   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3000
3001   /* Remove it from any blocking queues */
3002   unblockThread(tso);
3003
3004   /* The stack freezing code assumes there's a closure pointer on
3005    * the top of the stack.  This isn't always the case with compiled
3006    * code, so we have to push a dummy closure on the top which just
3007    * returns to the next return address on the stack.
3008    */
3009   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3010     *(--sp) = (W_)&stg_dummy_ret_closure;
3011   }
3012
3013   while (1) {
3014     nat words = ((P_)su - (P_)sp) - 1;
3015     nat i;
3016     StgAP_UPD * ap;
3017
3018     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3019      * then build PAP(handler,exception,realworld#), and leave it on
3020      * top of the stack ready to enter.
3021      */
3022     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3023       StgCatchFrame *cf = (StgCatchFrame *)su;
3024       /* we've got an exception to raise, so let's pass it to the
3025        * handler in this frame.
3026        */
3027       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3028       TICK_ALLOC_UPD_PAP(3,0);
3029       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3030               
3031       ap->n_args = 2;
3032       ap->fun = cf->handler;    /* :: Exception -> IO a */
3033       ap->payload[0] = exception;
3034       ap->payload[1] = ARG_TAG(0); /* realworld token */
3035
3036       /* throw away the stack from Sp up to and including the
3037        * CATCH_FRAME.
3038        */
3039       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3040       tso->su = cf->link;
3041
3042       /* Restore the blocked/unblocked state for asynchronous exceptions
3043        * at the CATCH_FRAME.  
3044        *
3045        * If exceptions were unblocked at the catch, arrange that they
3046        * are unblocked again after executing the handler by pushing an
3047        * unblockAsyncExceptions_ret stack frame.
3048        */
3049       if (!cf->exceptions_blocked) {
3050         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3051       }
3052       
3053       /* Ensure that async exceptions are blocked when running the handler.
3054        */
3055       if (tso->blocked_exceptions == NULL) {
3056         tso->blocked_exceptions = END_TSO_QUEUE;
3057       }
3058       
3059       /* Put the newly-built PAP on top of the stack, ready to execute
3060        * when the thread restarts.
3061        */
3062       sp[0] = (W_)ap;
3063       tso->sp = sp;
3064       tso->what_next = ThreadEnterGHC;
3065       IF_DEBUG(sanity, checkTSO(tso));
3066       return;
3067     }
3068
3069     /* First build an AP_UPD consisting of the stack chunk above the
3070      * current update frame, with the top word on the stack as the
3071      * fun field.
3072      */
3073     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3074     
3075     ASSERT(words >= 0);
3076     
3077     ap->n_args = words;
3078     ap->fun    = (StgClosure *)sp[0];
3079     sp++;
3080     for(i=0; i < (nat)words; ++i) {
3081       ap->payload[i] = (StgClosure *)*sp++;
3082     }
3083     
3084     switch (get_itbl(su)->type) {
3085       
3086     case UPDATE_FRAME:
3087       {
3088         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3089         TICK_ALLOC_UP_THK(words+1,0);
3090         
3091         IF_DEBUG(scheduler,
3092                  fprintf(stderr,  "scheduler: Updating ");
3093                  printPtr((P_)su->updatee); 
3094                  fprintf(stderr,  " with ");
3095                  printObj((StgClosure *)ap);
3096                  );
3097         
3098         /* Replace the updatee with an indirection - happily
3099          * this will also wake up any threads currently
3100          * waiting on the result.
3101          *
3102          * Warning: if we're in a loop, more than one update frame on
3103          * the stack may point to the same object.  Be careful not to
3104          * overwrite an IND_OLDGEN in this case, because we'll screw
3105          * up the mutable lists.  To be on the safe side, don't
3106          * overwrite any kind of indirection at all.  See also
3107          * threadSqueezeStack in GC.c, where we have to make a similar
3108          * check.
3109          */
3110         if (!closure_IND(su->updatee)) {
3111             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3112         }
3113         su = su->link;
3114         sp += sizeofW(StgUpdateFrame) -1;
3115         sp[0] = (W_)ap; /* push onto stack */
3116         break;
3117       }
3118
3119     case CATCH_FRAME:
3120       {
3121         StgCatchFrame *cf = (StgCatchFrame *)su;
3122         StgClosure* o;
3123         
3124         /* We want a PAP, not an AP_UPD.  Fortunately, the
3125          * layout's the same.
3126          */
3127         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3128         TICK_ALLOC_UPD_PAP(words+1,0);
3129         
3130         /* now build o = FUN(catch,ap,handler) */
3131         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3132         TICK_ALLOC_FUN(2,0);
3133         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3134         o->payload[0] = (StgClosure *)ap;
3135         o->payload[1] = cf->handler;
3136         
3137         IF_DEBUG(scheduler,
3138                  fprintf(stderr,  "scheduler: Built ");
3139                  printObj((StgClosure *)o);
3140                  );
3141         
3142         /* pop the old handler and put o on the stack */
3143         su = cf->link;
3144         sp += sizeofW(StgCatchFrame) - 1;
3145         sp[0] = (W_)o;
3146         break;
3147       }
3148       
3149     case SEQ_FRAME:
3150       {
3151         StgSeqFrame *sf = (StgSeqFrame *)su;
3152         StgClosure* o;
3153         
3154         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3155         TICK_ALLOC_UPD_PAP(words+1,0);
3156         
3157         /* now build o = FUN(seq,ap) */
3158         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3159         TICK_ALLOC_SE_THK(1,0);
3160         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3161         o->payload[0] = (StgClosure *)ap;
3162         
3163         IF_DEBUG(scheduler,
3164                  fprintf(stderr,  "scheduler: Built ");
3165                  printObj((StgClosure *)o);
3166                  );
3167         
3168         /* pop the old handler and put o on the stack */
3169         su = sf->link;
3170         sp += sizeofW(StgSeqFrame) - 1;
3171         sp[0] = (W_)o;
3172         break;
3173       }
3174       
3175     case STOP_FRAME:
3176       /* We've stripped the entire stack, the thread is now dead. */
3177       sp += sizeofW(StgStopFrame) - 1;
3178       sp[0] = (W_)exception;    /* save the exception */
3179       tso->what_next = ThreadKilled;
3180       tso->su = (StgUpdateFrame *)(sp+1);
3181       tso->sp = sp;
3182       return;
3183
3184     default:
3185       barf("raiseAsync");
3186     }
3187   }
3188   barf("raiseAsync");
3189 }
3190
3191 /* -----------------------------------------------------------------------------
3192    resurrectThreads is called after garbage collection on the list of
3193    threads found to be garbage.  Each of these threads will be woken
3194    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3195    on an MVar, or NonTermination if the thread was blocked on a Black
3196    Hole.
3197    -------------------------------------------------------------------------- */
3198
3199 void
3200 resurrectThreads( StgTSO *threads )
3201 {
3202   StgTSO *tso, *next;
3203
3204   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3205     next = tso->global_link;
3206     tso->global_link = all_threads;
3207     all_threads = tso;
3208     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3209
3210     switch (tso->why_blocked) {
3211     case BlockedOnMVar:
3212     case BlockedOnException:
3213       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3214       break;
3215     case BlockedOnBlackHole:
3216       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3217       break;
3218     case NotBlocked:
3219       /* This might happen if the thread was blocked on a black hole
3220        * belonging to a thread that we've just woken up (raiseAsync
3221        * can wake up threads, remember...).
3222        */
3223       continue;
3224     default:
3225       barf("resurrectThreads: thread blocked in a strange way");
3226     }
3227   }
3228 }
3229
3230 /* -----------------------------------------------------------------------------
3231  * Blackhole detection: if we reach a deadlock, test whether any
3232  * threads are blocked on themselves.  Any threads which are found to
3233  * be self-blocked get sent a NonTermination exception.
3234  *
3235  * This is only done in a deadlock situation in order to avoid
3236  * performance overhead in the normal case.
3237  * -------------------------------------------------------------------------- */
3238
3239 static void
3240 detectBlackHoles( void )
3241 {
3242     StgTSO *t = all_threads;
3243     StgUpdateFrame *frame;
3244     StgClosure *blocked_on;
3245
3246     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3247
3248         while (t->what_next == ThreadRelocated) {
3249             t = t->link;
3250             ASSERT(get_itbl(t)->type == TSO);
3251         }
3252       
3253         if (t->why_blocked != BlockedOnBlackHole) {
3254             continue;
3255         }
3256
3257         blocked_on = t->block_info.closure;
3258
3259         for (frame = t->su; ; frame = frame->link) {
3260             switch (get_itbl(frame)->type) {
3261
3262             case UPDATE_FRAME:
3263                 if (frame->updatee == blocked_on) {
3264                     /* We are blocking on one of our own computations, so
3265                      * send this thread the NonTermination exception.  
3266                      */
3267                     IF_DEBUG(scheduler, 
3268                              sched_belch("thread %d is blocked on itself", t->id));
3269                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3270                     goto done;
3271                 }
3272                 else {
3273                     continue;
3274                 }
3275
3276             case CATCH_FRAME:
3277             case SEQ_FRAME:
3278                 continue;
3279                 
3280             case STOP_FRAME:
3281                 break;
3282             }
3283             break;
3284         }
3285
3286     done: ;
3287     }   
3288 }
3289
3290 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3291 //@subsection Debugging Routines
3292
3293 /* -----------------------------------------------------------------------------
3294    Debugging: why is a thread blocked
3295    -------------------------------------------------------------------------- */
3296
3297 #ifdef DEBUG
3298
3299 void
3300 printThreadBlockage(StgTSO *tso)
3301 {
3302   switch (tso->why_blocked) {
3303   case BlockedOnRead:
3304     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3305     break;
3306   case BlockedOnWrite:
3307     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3308     break;
3309   case BlockedOnDelay:
3310     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3311     break;
3312   case BlockedOnMVar:
3313     fprintf(stderr,"is blocked on an MVar");
3314     break;
3315   case BlockedOnException:
3316     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3317             tso->block_info.tso->id);
3318     break;
3319   case BlockedOnBlackHole:
3320     fprintf(stderr,"is blocked on a black hole");
3321     break;
3322   case NotBlocked:
3323     fprintf(stderr,"is not blocked");
3324     break;
3325 #if defined(PAR)
3326   case BlockedOnGA:
3327     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3328             tso->block_info.closure, info_type(tso->block_info.closure));
3329     break;
3330   case BlockedOnGA_NoSend:
3331     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3332             tso->block_info.closure, info_type(tso->block_info.closure));
3333     break;
3334 #endif
3335   default:
3336     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3337          tso->why_blocked, tso->id, tso);
3338   }
3339 }
3340
3341 void
3342 printThreadStatus(StgTSO *tso)
3343 {
3344   switch (tso->what_next) {
3345   case ThreadKilled:
3346     fprintf(stderr,"has been killed");
3347     break;
3348   case ThreadComplete:
3349     fprintf(stderr,"has completed");
3350     break;
3351   default:
3352     printThreadBlockage(tso);
3353   }
3354 }
3355
3356 void
3357 printAllThreads(void)
3358 {
3359   StgTSO *t;
3360
3361 # if defined(GRAN)
3362   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3363   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3364                        time_string, rtsFalse/*no commas!*/);
3365
3366   sched_belch("all threads at [%s]:", time_string);
3367 # elif defined(PAR)
3368   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3369   ullong_format_string(CURRENT_TIME,
3370                        time_string, rtsFalse/*no commas!*/);
3371
3372   sched_belch("all threads at [%s]:", time_string);
3373 # else
3374   sched_belch("all threads:");
3375 # endif
3376
3377   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3378     fprintf(stderr, "\tthread %d ", t->id);
3379     printThreadStatus(t);
3380     fprintf(stderr,"\n");
3381   }
3382 }
3383     
3384 /* 
3385    Print a whole blocking queue attached to node (debugging only).
3386 */
3387 //@cindex print_bq
3388 # if defined(PAR)
3389 void 
3390 print_bq (StgClosure *node)
3391 {
3392   StgBlockingQueueElement *bqe;
3393   StgTSO *tso;
3394   rtsBool end;
3395
3396   fprintf(stderr,"## BQ of closure %p (%s): ",
3397           node, info_type(node));
3398
3399   /* should cover all closures that may have a blocking queue */
3400   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3401          get_itbl(node)->type == FETCH_ME_BQ ||
3402          get_itbl(node)->type == RBH ||
3403          get_itbl(node)->type == MVAR);
3404     
3405   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3406
3407   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3408 }
3409
3410 /* 
3411    Print a whole blocking queue starting with the element bqe.
3412 */
3413 void 
3414 print_bqe (StgBlockingQueueElement *bqe)
3415 {
3416   rtsBool end;
3417
3418   /* 
3419      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3420   */
3421   for (end = (bqe==END_BQ_QUEUE);
3422        !end; // iterate until bqe points to a CONSTR
3423        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3424        bqe = end ? END_BQ_QUEUE : bqe->link) {
3425     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3426     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3427     /* types of closures that may appear in a blocking queue */
3428     ASSERT(get_itbl(bqe)->type == TSO ||           
3429            get_itbl(bqe)->type == BLOCKED_FETCH || 
3430            get_itbl(bqe)->type == CONSTR); 
3431     /* only BQs of an RBH end with an RBH_Save closure */
3432     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3433
3434     switch (get_itbl(bqe)->type) {
3435     case TSO:
3436       fprintf(stderr," TSO %u (%x),",
3437               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3438       break;
3439     case BLOCKED_FETCH:
3440       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3441               ((StgBlockedFetch *)bqe)->node, 
3442               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3443               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3444               ((StgBlockedFetch *)bqe)->ga.weight);
3445       break;
3446     case CONSTR:
3447       fprintf(stderr," %s (IP %p),",
3448               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3449                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3450                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3451                "RBH_Save_?"), get_itbl(bqe));
3452       break;
3453     default:
3454       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3455            info_type((StgClosure *)bqe)); // , node, info_type(node));
3456       break;
3457     }
3458   } /* for */
3459   fputc('\n', stderr);
3460 }
3461 # elif defined(GRAN)
3462 void 
3463 print_bq (StgClosure *node)
3464 {
3465   StgBlockingQueueElement *bqe;
3466   PEs node_loc, tso_loc;
3467   rtsBool end;
3468
3469   /* should cover all closures that may have a blocking queue */
3470   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3471          get_itbl(node)->type == FETCH_ME_BQ ||
3472          get_itbl(node)->type == RBH);
3473     
3474   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3475   node_loc = where_is(node);
3476
3477   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3478           node, info_type(node), node_loc);
3479
3480   /* 
3481      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3482   */
3483   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3484        !end; // iterate until bqe points to a CONSTR
3485        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3486     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3487     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3488     /* types of closures that may appear in a blocking queue */
3489     ASSERT(get_itbl(bqe)->type == TSO ||           
3490            get_itbl(bqe)->type == CONSTR); 
3491     /* only BQs of an RBH end with an RBH_Save closure */
3492     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3493
3494     tso_loc = where_is((StgClosure *)bqe);
3495     switch (get_itbl(bqe)->type) {
3496     case TSO:
3497       fprintf(stderr," TSO %d (%p) on [PE %d],",
3498               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3499       break;
3500     case CONSTR:
3501       fprintf(stderr," %s (IP %p),",
3502               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3503                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3504                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3505                "RBH_Save_?"), get_itbl(bqe));
3506       break;
3507     default:
3508       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3509            info_type((StgClosure *)bqe), node, info_type(node));
3510       break;
3511     }
3512   } /* for */
3513   fputc('\n', stderr);
3514 }
3515 #else
3516 /* 
3517    Nice and easy: only TSOs on the blocking queue
3518 */
3519 void 
3520 print_bq (StgClosure *node)
3521 {
3522   StgTSO *tso;
3523
3524   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3525   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3526        tso != END_TSO_QUEUE; 
3527        tso=tso->link) {
3528     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3529     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3530     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3531   }
3532   fputc('\n', stderr);
3533 }
3534 # endif
3535
3536 #if defined(PAR)
3537 static nat
3538 run_queue_len(void)
3539 {
3540   nat i;
3541   StgTSO *tso;
3542
3543   for (i=0, tso=run_queue_hd; 
3544        tso != END_TSO_QUEUE;
3545        i++, tso=tso->link)
3546     /* nothing */
3547
3548   return i;
3549 }
3550 #endif
3551
3552 static void
3553 sched_belch(char *s, ...)
3554 {
3555   va_list ap;
3556   va_start(ap,s);
3557 #ifdef SMP
3558   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3559 #elif defined(PAR)
3560   fprintf(stderr, "== ");
3561 #else
3562   fprintf(stderr, "scheduler: ");
3563 #endif
3564   vfprintf(stderr, s, ap);
3565   fprintf(stderr, "\n");
3566 }
3567
3568 #endif /* DEBUG */
3569
3570
3571 //@node Index,  , Debugging Routines, Main scheduling code
3572 //@subsection Index
3573
3574 //@index
3575 //* MainRegTable::  @cindex\s-+MainRegTable
3576 //* StgMainThread::  @cindex\s-+StgMainThread
3577 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3578 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3579 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3580 //* context_switch::  @cindex\s-+context_switch
3581 //* createThread::  @cindex\s-+createThread
3582 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3583 //* initScheduler::  @cindex\s-+initScheduler
3584 //* interrupted::  @cindex\s-+interrupted
3585 //* next_thread_id::  @cindex\s-+next_thread_id
3586 //* print_bq::  @cindex\s-+print_bq
3587 //* run_queue_hd::  @cindex\s-+run_queue_hd
3588 //* run_queue_tl::  @cindex\s-+run_queue_tl
3589 //* sched_mutex::  @cindex\s-+sched_mutex
3590 //* schedule::  @cindex\s-+schedule
3591 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3592 //* task_ids::  @cindex\s-+task_ids
3593 //* term_mutex::  @cindex\s-+term_mutex
3594 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3595 //@end index