ec0ac22b9f2e8a29ce69cecfcb53dca10c656877
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.106 2001/11/08 16:17:35 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "PosixSource.h"
78 #include "Rts.h"
79 #include "SchedAPI.h"
80 #include "RtsUtils.h"
81 #include "RtsFlags.h"
82 #include "Storage.h"
83 #include "StgRun.h"
84 #include "StgStartup.h"
85 #include "Hooks.h"
86 #include "Schedule.h"
87 #include "StgMiscClosures.h"
88 #include "Storage.h"
89 #include "Interpreter.h"
90 #include "Exception.h"
91 #include "Printer.h"
92 #include "Main.h"
93 #include "Signals.h"
94 #include "Sanity.h"
95 #include "Stats.h"
96 #include "Itimer.h"
97 #include "Prelude.h"
98 #if defined(GRAN) || defined(PAR)
99 # include "GranSimRts.h"
100 # include "GranSim.h"
101 # include "ParallelRts.h"
102 # include "Parallel.h"
103 # include "ParallelDebug.h"
104 # include "FetchMe.h"
105 # include "HLC.h"
106 #endif
107 #include "Sparks.h"
108
109 #include <stdarg.h>
110
111 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
112 //@subsection Variables and Data structures
113
114 /* Main threads:
115  *
116  * These are the threads which clients have requested that we run.  
117  *
118  * In an SMP build, we might have several concurrent clients all
119  * waiting for results, and each one will wait on a condition variable
120  * until the result is available.
121  *
122  * In non-SMP, clients are strictly nested: the first client calls
123  * into the RTS, which might call out again to C with a _ccall_GC, and
124  * eventually re-enter the RTS.
125  *
126  * Main threads information is kept in a linked list:
127  */
128 //@cindex StgMainThread
129 typedef struct StgMainThread_ {
130   StgTSO *         tso;
131   SchedulerStatus  stat;
132   StgClosure **    ret;
133 #ifdef SMP
134   pthread_cond_t wakeup;
135 #endif
136   struct StgMainThread_ *link;
137 } StgMainThread;
138
139 /* Main thread queue.
140  * Locks required: sched_mutex.
141  */
142 static StgMainThread *main_threads;
143
144 /* Thread queues.
145  * Locks required: sched_mutex.
146  */
147 #if defined(GRAN)
148
149 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
150 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
151
152 /* 
153    In GranSim we have a runable and a blocked queue for each processor.
154    In order to minimise code changes new arrays run_queue_hds/tls
155    are created. run_queue_hd is then a short cut (macro) for
156    run_queue_hds[CurrentProc] (see GranSim.h).
157    -- HWL
158 */
159 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
160 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
161 StgTSO *ccalling_threadss[MAX_PROC];
162 /* We use the same global list of threads (all_threads) in GranSim as in
163    the std RTS (i.e. we are cheating). However, we don't use this list in
164    the GranSim specific code at the moment (so we are only potentially
165    cheating).  */
166
167 #else /* !GRAN */
168
169 StgTSO *run_queue_hd, *run_queue_tl;
170 StgTSO *blocked_queue_hd, *blocked_queue_tl;
171 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
172
173 #endif
174
175 /* Linked list of all threads.
176  * Used for detecting garbage collected threads.
177  */
178 StgTSO *all_threads;
179
180 /* Threads suspended in _ccall_GC.
181  */
182 static StgTSO *suspended_ccalling_threads;
183
184 static void GetRoots(evac_fn);
185 static StgTSO *threadStackOverflow(StgTSO *tso);
186
187 /* KH: The following two flags are shared memory locations.  There is no need
188        to lock them, since they are only unset at the end of a scheduler
189        operation.
190 */
191
192 /* flag set by signal handler to precipitate a context switch */
193 //@cindex context_switch
194 nat context_switch;
195
196 /* if this flag is set as well, give up execution */
197 //@cindex interrupted
198 rtsBool interrupted;
199
200 /* Next thread ID to allocate.
201  * Locks required: sched_mutex
202  */
203 //@cindex next_thread_id
204 StgThreadID next_thread_id = 1;
205
206 /*
207  * Pointers to the state of the current thread.
208  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
209  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
210  */
211  
212 /* The smallest stack size that makes any sense is:
213  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
214  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
215  *  + 1                       (the realworld token for an IO thread)
216  *  + 1                       (the closure to enter)
217  *
218  * A thread with this stack will bomb immediately with a stack
219  * overflow, which will increase its stack size.  
220  */
221
222 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
223
224 /* Free capability list.
225  * Locks required: sched_mutex.
226  */
227 #ifdef SMP
228 Capability *free_capabilities; /* Available capabilities for running threads */
229 nat n_free_capabilities;       /* total number of available capabilities */
230 #else
231 Capability MainCapability;     /* for non-SMP, we have one global capability */
232 #endif
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 /* All our current task ids, saved in case we need to kill them later.
247  */
248 #ifdef SMP
249 //@cindex task_ids
250 task_info *task_ids;
251 #endif
252
253 void            addToBlockedQueue ( StgTSO *tso );
254
255 static void     schedule          ( void );
256        void     interruptStgRts   ( void );
257 #if defined(GRAN)
258 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
259 #else
260 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
261 #endif
262
263 static void     detectBlackHoles  ( void );
264
265 #ifdef DEBUG
266 static void sched_belch(char *s, ...);
267 #endif
268
269 #ifdef SMP
270 //@cindex sched_mutex
271 //@cindex term_mutex
272 //@cindex thread_ready_cond
273 //@cindex gc_pending_cond
274 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
275 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
276 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
277 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
278
279 nat await_death;
280 #endif
281
282 #if defined(PAR)
283 StgTSO *LastTSO;
284 rtsTime TimeOfLastYield;
285 rtsBool emitSchedule = rtsTrue;
286 #endif
287
288 #if DEBUG
289 char *whatNext_strs[] = {
290   "ThreadEnterGHC",
291   "ThreadRunGHC",
292   "ThreadEnterInterp",
293   "ThreadKilled",
294   "ThreadComplete"
295 };
296
297 char *threadReturnCode_strs[] = {
298   "HeapOverflow",                       /* might also be StackOverflow */
299   "StackOverflow",
300   "ThreadYielding",
301   "ThreadBlocked",
302   "ThreadFinished"
303 };
304 #endif
305
306 #ifdef PAR
307 StgTSO * createSparkThread(rtsSpark spark);
308 StgTSO * activateSpark (rtsSpark spark);  
309 #endif
310
311 /*
312  * The thread state for the main thread.
313 // ToDo: check whether not needed any more
314 StgTSO   *MainTSO;
315  */
316
317 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
318 //@subsection Main scheduling loop
319
320 /* ---------------------------------------------------------------------------
321    Main scheduling loop.
322
323    We use round-robin scheduling, each thread returning to the
324    scheduler loop when one of these conditions is detected:
325
326       * out of heap space
327       * timer expires (thread yields)
328       * thread blocks
329       * thread ends
330       * stack overflow
331
332    Locking notes:  we acquire the scheduler lock once at the beginning
333    of the scheduler loop, and release it when
334     
335       * running a thread, or
336       * waiting for work, or
337       * waiting for a GC to complete.
338
339    GRAN version:
340      In a GranSim setup this loop iterates over the global event queue.
341      This revolves around the global event queue, which determines what 
342      to do next. Therefore, it's more complicated than either the 
343      concurrent or the parallel (GUM) setup.
344
345    GUM version:
346      GUM iterates over incoming messages.
347      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
348      and sends out a fish whenever it has nothing to do; in-between
349      doing the actual reductions (shared code below) it processes the
350      incoming messages and deals with delayed operations 
351      (see PendingFetches).
352      This is not the ugliest code you could imagine, but it's bloody close.
353
354    ------------------------------------------------------------------------ */
355 //@cindex schedule
356 static void
357 schedule( void )
358 {
359   StgTSO *t;
360   Capability *cap;
361   StgThreadReturnCode ret;
362 #if defined(GRAN)
363   rtsEvent *event;
364 #elif defined(PAR)
365   StgSparkPool *pool;
366   rtsSpark spark;
367   StgTSO *tso;
368   GlobalTaskId pe;
369   rtsBool receivedFinish = rtsFalse;
370 # if defined(DEBUG)
371   nat tp_size, sp_size; // stats only
372 # endif
373 #endif
374   rtsBool was_interrupted = rtsFalse;
375   
376   ACQUIRE_LOCK(&sched_mutex);
377
378 #if defined(GRAN)
379
380   /* set up first event to get things going */
381   /* ToDo: assign costs for system setup and init MainTSO ! */
382   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
383             ContinueThread, 
384             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
385
386   IF_DEBUG(gran,
387            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
388            G_TSO(CurrentTSO, 5));
389
390   if (RtsFlags.GranFlags.Light) {
391     /* Save current time; GranSim Light only */
392     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
393   }      
394
395   event = get_next_event();
396
397   while (event!=(rtsEvent*)NULL) {
398     /* Choose the processor with the next event */
399     CurrentProc = event->proc;
400     CurrentTSO = event->tso;
401
402 #elif defined(PAR)
403
404   while (!receivedFinish) {    /* set by processMessages */
405                                /* when receiving PP_FINISH message         */ 
406 #else
407
408   while (1) {
409
410 #endif
411
412     IF_DEBUG(scheduler, printAllThreads());
413
414     /* If we're interrupted (the user pressed ^C, or some other
415      * termination condition occurred), kill all the currently running
416      * threads.
417      */
418     if (interrupted) {
419       IF_DEBUG(scheduler, sched_belch("interrupted"));
420       deleteAllThreads();
421       interrupted = rtsFalse;
422       was_interrupted = rtsTrue;
423     }
424
425     /* Go through the list of main threads and wake up any
426      * clients whose computations have finished.  ToDo: this
427      * should be done more efficiently without a linear scan
428      * of the main threads list, somehow...
429      */
430 #ifdef SMP
431     { 
432       StgMainThread *m, **prev;
433       prev = &main_threads;
434       for (m = main_threads; m != NULL; m = m->link) {
435         switch (m->tso->what_next) {
436         case ThreadComplete:
437           if (m->ret) {
438             *(m->ret) = (StgClosure *)m->tso->sp[0];
439           }
440           *prev = m->link;
441           m->stat = Success;
442           pthread_cond_broadcast(&m->wakeup);
443           break;
444         case ThreadKilled:
445           if (m->ret) *(m->ret) = NULL;
446           *prev = m->link;
447           if (was_interrupted) {
448             m->stat = Interrupted;
449           } else {
450             m->stat = Killed;
451           }
452           pthread_cond_broadcast(&m->wakeup);
453           break;
454         default:
455           break;
456         }
457       }
458     }
459
460 #else // not SMP
461
462 # if defined(PAR)
463     /* in GUM do this only on the Main PE */
464     if (IAmMainThread)
465 # endif
466     /* If our main thread has finished or been killed, return.
467      */
468     {
469       StgMainThread *m = main_threads;
470       if (m->tso->what_next == ThreadComplete
471           || m->tso->what_next == ThreadKilled) {
472         main_threads = main_threads->link;
473         if (m->tso->what_next == ThreadComplete) {
474           /* we finished successfully, fill in the return value */
475           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
476           m->stat = Success;
477           return;
478         } else {
479           if (m->ret) { *(m->ret) = NULL; };
480           if (was_interrupted) {
481             m->stat = Interrupted;
482           } else {
483             m->stat = Killed;
484           }
485           return;
486         }
487       }
488     }
489 #endif
490
491     /* Top up the run queue from our spark pool.  We try to make the
492      * number of threads in the run queue equal to the number of
493      * free capabilities.
494      */
495 #if defined(SMP)
496     {
497       nat n = n_free_capabilities;
498       StgTSO *tso = run_queue_hd;
499
500       /* Count the run queue */
501       while (n > 0 && tso != END_TSO_QUEUE) {
502         tso = tso->link;
503         n--;
504       }
505
506       for (; n > 0; n--) {
507         StgClosure *spark;
508         spark = findSpark(rtsFalse);
509         if (spark == NULL) {
510           break; /* no more sparks in the pool */
511         } else {
512           /* I'd prefer this to be done in activateSpark -- HWL */
513           /* tricky - it needs to hold the scheduler lock and
514            * not try to re-acquire it -- SDM */
515           createSparkThread(spark);       
516           IF_DEBUG(scheduler,
517                    sched_belch("==^^ turning spark of closure %p into a thread",
518                                (StgClosure *)spark));
519         }
520       }
521       /* We need to wake up the other tasks if we just created some
522        * work for them.
523        */
524       if (n_free_capabilities - n > 1) {
525           pthread_cond_signal(&thread_ready_cond);
526       }
527     }
528 #endif // SMP
529
530     /* check for signals each time around the scheduler */
531 #ifndef mingw32_TARGET_OS
532     if (signals_pending()) {
533       startSignalHandlers();
534     }
535 #endif
536
537     /* Check whether any waiting threads need to be woken up.  If the
538      * run queue is empty, and there are no other tasks running, we
539      * can wait indefinitely for something to happen.
540      * ToDo: what if another client comes along & requests another
541      * main thread?
542      */
543     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
544       awaitEvent(
545            (run_queue_hd == END_TSO_QUEUE)
546 #ifdef SMP
547         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
548 #endif
549         );
550     }
551     /* we can be interrupted while waiting for I/O... */
552     if (interrupted) continue;
553
554     /* 
555      * Detect deadlock: when we have no threads to run, there are no
556      * threads waiting on I/O or sleeping, and all the other tasks are
557      * waiting for work, we must have a deadlock of some description.
558      *
559      * We first try to find threads blocked on themselves (ie. black
560      * holes), and generate NonTermination exceptions where necessary.
561      *
562      * If no threads are black holed, we have a deadlock situation, so
563      * inform all the main threads.
564      */
565 #ifndef PAR
566     if (blocked_queue_hd == END_TSO_QUEUE
567         && run_queue_hd == END_TSO_QUEUE
568         && sleeping_queue == END_TSO_QUEUE
569 #ifdef SMP
570         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
571 #endif
572         )
573     {
574         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
575         GarbageCollect(GetRoots,rtsTrue);
576         if (blocked_queue_hd == END_TSO_QUEUE
577             && run_queue_hd == END_TSO_QUEUE
578             && sleeping_queue == END_TSO_QUEUE) {
579             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
580             detectBlackHoles();
581             if (run_queue_hd == END_TSO_QUEUE) {
582                 StgMainThread *m = main_threads;
583 #ifdef SMP
584                 for (; m != NULL; m = m->link) {
585                     deleteThread(m->tso);
586                     m->ret = NULL;
587                     m->stat = Deadlock;
588                     pthread_cond_broadcast(&m->wakeup);
589                 }
590                 main_threads = NULL;
591 #else
592                 deleteThread(m->tso);
593                 m->ret = NULL;
594                 m->stat = Deadlock;
595                 main_threads = m->link;
596                 return;
597 #endif
598             }
599         }
600     }
601 #elif defined(PAR)
602     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
603 #endif
604
605 #ifdef SMP
606     /* If there's a GC pending, don't do anything until it has
607      * completed.
608      */
609     if (ready_to_gc) {
610       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
611       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
612     }
613     
614     /* block until we've got a thread on the run queue and a free
615      * capability.
616      */
617     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
618       IF_DEBUG(scheduler, sched_belch("waiting for work"));
619       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
620       IF_DEBUG(scheduler, sched_belch("work now available"));
621     }
622 #endif
623
624 #if defined(GRAN)
625
626     if (RtsFlags.GranFlags.Light)
627       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
628
629     /* adjust time based on time-stamp */
630     if (event->time > CurrentTime[CurrentProc] &&
631         event->evttype != ContinueThread)
632       CurrentTime[CurrentProc] = event->time;
633     
634     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
635     if (!RtsFlags.GranFlags.Light)
636       handleIdlePEs();
637
638     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
639
640     /* main event dispatcher in GranSim */
641     switch (event->evttype) {
642       /* Should just be continuing execution */
643     case ContinueThread:
644       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
645       /* ToDo: check assertion
646       ASSERT(run_queue_hd != (StgTSO*)NULL &&
647              run_queue_hd != END_TSO_QUEUE);
648       */
649       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
650       if (!RtsFlags.GranFlags.DoAsyncFetch &&
651           procStatus[CurrentProc]==Fetching) {
652         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
653               CurrentTSO->id, CurrentTSO, CurrentProc);
654         goto next_thread;
655       } 
656       /* Ignore ContinueThreads for completed threads */
657       if (CurrentTSO->what_next == ThreadComplete) {
658         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
659               CurrentTSO->id, CurrentTSO, CurrentProc);
660         goto next_thread;
661       } 
662       /* Ignore ContinueThreads for threads that are being migrated */
663       if (PROCS(CurrentTSO)==Nowhere) { 
664         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
665               CurrentTSO->id, CurrentTSO, CurrentProc);
666         goto next_thread;
667       }
668       /* The thread should be at the beginning of the run queue */
669       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
670         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
671               CurrentTSO->id, CurrentTSO, CurrentProc);
672         break; // run the thread anyway
673       }
674       /*
675       new_event(proc, proc, CurrentTime[proc],
676                 FindWork,
677                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
678       goto next_thread; 
679       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
680       break; // now actually run the thread; DaH Qu'vam yImuHbej 
681
682     case FetchNode:
683       do_the_fetchnode(event);
684       goto next_thread;             /* handle next event in event queue  */
685       
686     case GlobalBlock:
687       do_the_globalblock(event);
688       goto next_thread;             /* handle next event in event queue  */
689       
690     case FetchReply:
691       do_the_fetchreply(event);
692       goto next_thread;             /* handle next event in event queue  */
693       
694     case UnblockThread:   /* Move from the blocked queue to the tail of */
695       do_the_unblock(event);
696       goto next_thread;             /* handle next event in event queue  */
697       
698     case ResumeThread:  /* Move from the blocked queue to the tail of */
699       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
700       event->tso->gran.blocktime += 
701         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
702       do_the_startthread(event);
703       goto next_thread;             /* handle next event in event queue  */
704       
705     case StartThread:
706       do_the_startthread(event);
707       goto next_thread;             /* handle next event in event queue  */
708       
709     case MoveThread:
710       do_the_movethread(event);
711       goto next_thread;             /* handle next event in event queue  */
712       
713     case MoveSpark:
714       do_the_movespark(event);
715       goto next_thread;             /* handle next event in event queue  */
716       
717     case FindWork:
718       do_the_findwork(event);
719       goto next_thread;             /* handle next event in event queue  */
720       
721     default:
722       barf("Illegal event type %u\n", event->evttype);
723     }  /* switch */
724     
725     /* This point was scheduler_loop in the old RTS */
726
727     IF_DEBUG(gran, belch("GRAN: after main switch"));
728
729     TimeOfLastEvent = CurrentTime[CurrentProc];
730     TimeOfNextEvent = get_time_of_next_event();
731     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
732     // CurrentTSO = ThreadQueueHd;
733
734     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
735                          TimeOfNextEvent));
736
737     if (RtsFlags.GranFlags.Light) 
738       GranSimLight_leave_system(event, &ActiveTSO); 
739
740     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
741
742     IF_DEBUG(gran, 
743              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
744
745     /* in a GranSim setup the TSO stays on the run queue */
746     t = CurrentTSO;
747     /* Take a thread from the run queue. */
748     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
749
750     IF_DEBUG(gran, 
751              fprintf(stderr, "GRAN: About to run current thread, which is\n");
752              G_TSO(t,5));
753
754     context_switch = 0; // turned on via GranYield, checking events and time slice
755
756     IF_DEBUG(gran, 
757              DumpGranEvent(GR_SCHEDULE, t));
758
759     procStatus[CurrentProc] = Busy;
760
761 #elif defined(PAR)
762     if (PendingFetches != END_BF_QUEUE) {
763         processFetches();
764     }
765
766     /* ToDo: phps merge with spark activation above */
767     /* check whether we have local work and send requests if we have none */
768     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
769       /* :-[  no local threads => look out for local sparks */
770       /* the spark pool for the current PE */
771       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
772       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
773           pool->hd < pool->tl) {
774         /* 
775          * ToDo: add GC code check that we really have enough heap afterwards!!
776          * Old comment:
777          * If we're here (no runnable threads) and we have pending
778          * sparks, we must have a space problem.  Get enough space
779          * to turn one of those pending sparks into a
780          * thread... 
781          */
782
783         spark = findSpark(rtsFalse);                /* get a spark */
784         if (spark != (rtsSpark) NULL) {
785           tso = activateSpark(spark);       /* turn the spark into a thread */
786           IF_PAR_DEBUG(schedule,
787                        belch("==== schedule: Created TSO %d (%p); %d threads active",
788                              tso->id, tso, advisory_thread_count));
789
790           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
791             belch("==^^ failed to activate spark");
792             goto next_thread;
793           }               /* otherwise fall through & pick-up new tso */
794         } else {
795           IF_PAR_DEBUG(verbose,
796                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
797                              spark_queue_len(pool)));
798           goto next_thread;
799         }
800       }
801
802       /* If we still have no work we need to send a FISH to get a spark
803          from another PE 
804       */
805       if (EMPTY_RUN_QUEUE()) {
806       /* =8-[  no local sparks => look for work on other PEs */
807         /*
808          * We really have absolutely no work.  Send out a fish
809          * (there may be some out there already), and wait for
810          * something to arrive.  We clearly can't run any threads
811          * until a SCHEDULE or RESUME arrives, and so that's what
812          * we're hoping to see.  (Of course, we still have to
813          * respond to other types of messages.)
814          */
815         TIME now = msTime() /*CURRENT_TIME*/;
816         IF_PAR_DEBUG(verbose, 
817                      belch("--  now=%ld", now));
818         IF_PAR_DEBUG(verbose,
819                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
820                          (last_fish_arrived_at!=0 &&
821                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
822                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
823                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
824                              last_fish_arrived_at,
825                              RtsFlags.ParFlags.fishDelay, now);
826                      });
827         
828         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
829             (last_fish_arrived_at==0 ||
830              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
831           /* outstandingFishes is set in sendFish, processFish;
832              avoid flooding system with fishes via delay */
833           pe = choosePE();
834           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
835                    NEW_FISH_HUNGER);
836
837           // Global statistics: count no. of fishes
838           if (RtsFlags.ParFlags.ParStats.Global &&
839               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
840             globalParStats.tot_fish_mess++;
841           }
842         }
843       
844         receivedFinish = processMessages();
845         goto next_thread;
846       }
847     } else if (PacketsWaiting()) {  /* Look for incoming messages */
848       receivedFinish = processMessages();
849     }
850
851     /* Now we are sure that we have some work available */
852     ASSERT(run_queue_hd != END_TSO_QUEUE);
853
854     /* Take a thread from the run queue, if we have work */
855     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
856     IF_DEBUG(sanity,checkTSO(t));
857
858     /* ToDo: write something to the log-file
859     if (RTSflags.ParFlags.granSimStats && !sameThread)
860         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
861
862     CurrentTSO = t;
863     */
864     /* the spark pool for the current PE */
865     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
866
867     IF_DEBUG(scheduler, 
868              belch("--=^ %d threads, %d sparks on [%#x]", 
869                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
870
871 #if 1
872     if (0 && RtsFlags.ParFlags.ParStats.Full && 
873         t && LastTSO && t->id != LastTSO->id && 
874         LastTSO->why_blocked == NotBlocked && 
875         LastTSO->what_next != ThreadComplete) {
876       // if previously scheduled TSO not blocked we have to record the context switch
877       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
878                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
879     }
880
881     if (RtsFlags.ParFlags.ParStats.Full && 
882         (emitSchedule /* forced emit */ ||
883         (t && LastTSO && t->id != LastTSO->id))) {
884       /* 
885          we are running a different TSO, so write a schedule event to log file
886          NB: If we use fair scheduling we also have to write  a deschedule 
887              event for LastTSO; with unfair scheduling we know that the
888              previous tso has blocked whenever we switch to another tso, so
889              we don't need it in GUM for now
890       */
891       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
892                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
893       emitSchedule = rtsFalse;
894     }
895      
896 #endif
897 #else /* !GRAN && !PAR */
898   
899     /* grab a thread from the run queue
900      */
901     ASSERT(run_queue_hd != END_TSO_QUEUE);
902     t = POP_RUN_QUEUE();
903
904     // Sanity check the thread we're about to run.  This can be
905     // expensive if there is lots of thread switching going on...
906     IF_DEBUG(sanity,checkTSO(t));
907
908 #endif
909     
910     /* grab a capability
911      */
912 #ifdef SMP
913     cap = free_capabilities;
914     free_capabilities = cap->link;
915     n_free_capabilities--;
916 #else
917     cap = &MainCapability;
918 #endif
919
920     cap->r.rCurrentTSO = t;
921     
922     /* context switches are now initiated by the timer signal, unless
923      * the user specified "context switch as often as possible", with
924      * +RTS -C0
925      */
926     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
927         && (run_queue_hd != END_TSO_QUEUE
928             || blocked_queue_hd != END_TSO_QUEUE
929             || sleeping_queue != END_TSO_QUEUE))
930         context_switch = 1;
931     else
932         context_switch = 0;
933
934     RELEASE_LOCK(&sched_mutex);
935
936     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
937                               t->id, t, whatNext_strs[t->what_next]));
938
939     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
940     /* Run the current thread 
941      */
942     switch (cap->r.rCurrentTSO->what_next) {
943     case ThreadKilled:
944     case ThreadComplete:
945         /* Thread already finished, return to scheduler. */
946         ret = ThreadFinished;
947         break;
948     case ThreadEnterGHC:
949         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
950         break;
951     case ThreadRunGHC:
952         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
953         break;
954     case ThreadEnterInterp:
955         ret = interpretBCO(cap);
956         break;
957     default:
958       barf("schedule: invalid what_next field");
959     }
960     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
961     
962     /* Costs for the scheduler are assigned to CCS_SYSTEM */
963 #ifdef PROFILING
964     CCCS = CCS_SYSTEM;
965 #endif
966     
967     ACQUIRE_LOCK(&sched_mutex);
968
969 #ifdef SMP
970     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
971 #elif !defined(GRAN) && !defined(PAR)
972     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
973 #endif
974     t = cap->r.rCurrentTSO;
975     
976 #if defined(PAR)
977     /* HACK 675: if the last thread didn't yield, make sure to print a 
978        SCHEDULE event to the log file when StgRunning the next thread, even
979        if it is the same one as before */
980     LastTSO = t; 
981     TimeOfLastYield = CURRENT_TIME;
982 #endif
983
984     switch (ret) {
985     case HeapOverflow:
986 #if defined(GRAN)
987       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
988       globalGranStats.tot_heapover++;
989 #elif defined(PAR)
990       globalParStats.tot_heapover++;
991 #endif
992
993       // did the task ask for a large block?
994       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
995           // if so, get one and push it on the front of the nursery.
996           bdescr *bd;
997           nat blocks;
998           
999           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1000
1001           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1002                                    t->id, t,
1003                                    whatNext_strs[t->what_next], blocks));
1004
1005           // don't do this if it would push us over the
1006           // alloc_blocks_lim limit; we'll GC first.
1007           if (alloc_blocks + blocks < alloc_blocks_lim) {
1008
1009               alloc_blocks += blocks;
1010               bd = allocGroup( blocks );
1011
1012               // link the new group into the list
1013               bd->link = cap->r.rCurrentNursery;
1014               bd->u.back = cap->r.rCurrentNursery->u.back;
1015               if (cap->r.rCurrentNursery->u.back != NULL) {
1016                   cap->r.rCurrentNursery->u.back->link = bd;
1017               } else {
1018                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1019                          g0s0->blocks == cap->r.rNursery);
1020                   cap->r.rNursery = g0s0->blocks = bd;
1021               }           
1022               cap->r.rCurrentNursery->u.back = bd;
1023
1024               // initialise it as a nursery block
1025               bd->step = g0s0;
1026               bd->gen_no = 0;
1027               bd->flags = 0;
1028               bd->free = bd->start;
1029
1030               // don't forget to update the block count in g0s0.
1031               g0s0->n_blocks += blocks;
1032               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1033
1034               // now update the nursery to point to the new block
1035               cap->r.rCurrentNursery = bd;
1036
1037               // we might be unlucky and have another thread get on the
1038               // run queue before us and steal the large block, but in that
1039               // case the thread will just end up requesting another large
1040               // block.
1041               PUSH_ON_RUN_QUEUE(t);
1042               break;
1043           }
1044       }
1045
1046       /* make all the running tasks block on a condition variable,
1047        * maybe set context_switch and wait till they all pile in,
1048        * then have them wait on a GC condition variable.
1049        */
1050       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1051                                t->id, t, whatNext_strs[t->what_next]));
1052       threadPaused(t);
1053 #if defined(GRAN)
1054       ASSERT(!is_on_queue(t,CurrentProc));
1055 #elif defined(PAR)
1056       /* Currently we emit a DESCHEDULE event before GC in GUM.
1057          ToDo: either add separate event to distinguish SYSTEM time from rest
1058                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1059       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1060         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1061                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1062         emitSchedule = rtsTrue;
1063       }
1064 #endif
1065       
1066       ready_to_gc = rtsTrue;
1067       context_switch = 1;               /* stop other threads ASAP */
1068       PUSH_ON_RUN_QUEUE(t);
1069       /* actual GC is done at the end of the while loop */
1070       break;
1071       
1072     case StackOverflow:
1073 #if defined(GRAN)
1074       IF_DEBUG(gran, 
1075                DumpGranEvent(GR_DESCHEDULE, t));
1076       globalGranStats.tot_stackover++;
1077 #elif defined(PAR)
1078       // IF_DEBUG(par, 
1079       // DumpGranEvent(GR_DESCHEDULE, t);
1080       globalParStats.tot_stackover++;
1081 #endif
1082       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1083                                t->id, t, whatNext_strs[t->what_next]));
1084       /* just adjust the stack for this thread, then pop it back
1085        * on the run queue.
1086        */
1087       threadPaused(t);
1088       { 
1089         StgMainThread *m;
1090         /* enlarge the stack */
1091         StgTSO *new_t = threadStackOverflow(t);
1092         
1093         /* This TSO has moved, so update any pointers to it from the
1094          * main thread stack.  It better not be on any other queues...
1095          * (it shouldn't be).
1096          */
1097         for (m = main_threads; m != NULL; m = m->link) {
1098           if (m->tso == t) {
1099             m->tso = new_t;
1100           }
1101         }
1102         threadPaused(new_t);
1103         PUSH_ON_RUN_QUEUE(new_t);
1104       }
1105       break;
1106
1107     case ThreadYielding:
1108 #if defined(GRAN)
1109       IF_DEBUG(gran, 
1110                DumpGranEvent(GR_DESCHEDULE, t));
1111       globalGranStats.tot_yields++;
1112 #elif defined(PAR)
1113       // IF_DEBUG(par, 
1114       // DumpGranEvent(GR_DESCHEDULE, t);
1115       globalParStats.tot_yields++;
1116 #endif
1117       /* put the thread back on the run queue.  Then, if we're ready to
1118        * GC, check whether this is the last task to stop.  If so, wake
1119        * up the GC thread.  getThread will block during a GC until the
1120        * GC is finished.
1121        */
1122       IF_DEBUG(scheduler,
1123                if (t->what_next == ThreadEnterInterp) {
1124                    /* ToDo: or maybe a timer expired when we were in Hugs?
1125                     * or maybe someone hit ctrl-C
1126                     */
1127                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1128                          t->id, t, whatNext_strs[t->what_next]);
1129                } else {
1130                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1131                          t->id, t, whatNext_strs[t->what_next]);
1132                }
1133                );
1134
1135       threadPaused(t);
1136
1137       IF_DEBUG(sanity,
1138                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1139                checkTSO(t));
1140       ASSERT(t->link == END_TSO_QUEUE);
1141 #if defined(GRAN)
1142       ASSERT(!is_on_queue(t,CurrentProc));
1143
1144       IF_DEBUG(sanity,
1145                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1146                checkThreadQsSanity(rtsTrue));
1147 #endif
1148 #if defined(PAR)
1149       if (RtsFlags.ParFlags.doFairScheduling) { 
1150         /* this does round-robin scheduling; good for concurrency */
1151         APPEND_TO_RUN_QUEUE(t);
1152       } else {
1153         /* this does unfair scheduling; good for parallelism */
1154         PUSH_ON_RUN_QUEUE(t);
1155       }
1156 #else
1157       /* this does round-robin scheduling; good for concurrency */
1158       APPEND_TO_RUN_QUEUE(t);
1159 #endif
1160 #if defined(GRAN)
1161       /* add a ContinueThread event to actually process the thread */
1162       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1163                 ContinueThread,
1164                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1165       IF_GRAN_DEBUG(bq, 
1166                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1167                G_EVENTQ(0);
1168                G_CURR_THREADQ(0));
1169 #endif /* GRAN */
1170       break;
1171       
1172     case ThreadBlocked:
1173 #if defined(GRAN)
1174       IF_DEBUG(scheduler,
1175                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1176                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1177                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1178
1179       // ??? needed; should emit block before
1180       IF_DEBUG(gran, 
1181                DumpGranEvent(GR_DESCHEDULE, t)); 
1182       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1183       /*
1184         ngoq Dogh!
1185       ASSERT(procStatus[CurrentProc]==Busy || 
1186               ((procStatus[CurrentProc]==Fetching) && 
1187               (t->block_info.closure!=(StgClosure*)NULL)));
1188       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1189           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1190             procStatus[CurrentProc]==Fetching)) 
1191         procStatus[CurrentProc] = Idle;
1192       */
1193 #elif defined(PAR)
1194       IF_DEBUG(scheduler,
1195                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1196                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1197       IF_PAR_DEBUG(bq,
1198
1199                    if (t->block_info.closure!=(StgClosure*)NULL) 
1200                      print_bq(t->block_info.closure));
1201
1202       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1203       blockThread(t);
1204
1205       /* whatever we schedule next, we must log that schedule */
1206       emitSchedule = rtsTrue;
1207
1208 #else /* !GRAN */
1209       /* don't need to do anything.  Either the thread is blocked on
1210        * I/O, in which case we'll have called addToBlockedQueue
1211        * previously, or it's blocked on an MVar or Blackhole, in which
1212        * case it'll be on the relevant queue already.
1213        */
1214       IF_DEBUG(scheduler,
1215                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1216                printThreadBlockage(t);
1217                fprintf(stderr, "\n"));
1218
1219       /* Only for dumping event to log file 
1220          ToDo: do I need this in GranSim, too?
1221       blockThread(t);
1222       */
1223 #endif
1224       threadPaused(t);
1225       break;
1226       
1227     case ThreadFinished:
1228       /* Need to check whether this was a main thread, and if so, signal
1229        * the task that started it with the return value.  If we have no
1230        * more main threads, we probably need to stop all the tasks until
1231        * we get a new one.
1232        */
1233       /* We also end up here if the thread kills itself with an
1234        * uncaught exception, see Exception.hc.
1235        */
1236       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1237 #if defined(GRAN)
1238       endThread(t, CurrentProc); // clean-up the thread
1239 #elif defined(PAR)
1240       /* For now all are advisory -- HWL */
1241       //if(t->priority==AdvisoryPriority) ??
1242       advisory_thread_count--;
1243       
1244 # ifdef DIST
1245       if(t->dist.priority==RevalPriority)
1246         FinishReval(t);
1247 # endif
1248       
1249       if (RtsFlags.ParFlags.ParStats.Full &&
1250           !RtsFlags.ParFlags.ParStats.Suppressed) 
1251         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1252 #endif
1253       break;
1254       
1255     default:
1256       barf("schedule: invalid thread return code %d", (int)ret);
1257     }
1258     
1259 #ifdef SMP
1260     cap->link = free_capabilities;
1261     free_capabilities = cap;
1262     n_free_capabilities++;
1263 #endif
1264
1265 #ifdef SMP
1266     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1267 #else
1268     if (ready_to_gc) 
1269 #endif
1270       {
1271       /* everybody back, start the GC.
1272        * Could do it in this thread, or signal a condition var
1273        * to do it in another thread.  Either way, we need to
1274        * broadcast on gc_pending_cond afterward.
1275        */
1276 #ifdef SMP
1277       IF_DEBUG(scheduler,sched_belch("doing GC"));
1278 #endif
1279       GarbageCollect(GetRoots,rtsFalse);
1280       ready_to_gc = rtsFalse;
1281 #ifdef SMP
1282       pthread_cond_broadcast(&gc_pending_cond);
1283 #endif
1284 #if defined(GRAN)
1285       /* add a ContinueThread event to continue execution of current thread */
1286       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1287                 ContinueThread,
1288                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1289       IF_GRAN_DEBUG(bq, 
1290                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1291                G_EVENTQ(0);
1292                G_CURR_THREADQ(0));
1293 #endif /* GRAN */
1294     }
1295
1296 #if defined(GRAN)
1297   next_thread:
1298     IF_GRAN_DEBUG(unused,
1299                   print_eventq(EventHd));
1300
1301     event = get_next_event();
1302 #elif defined(PAR)
1303   next_thread:
1304     /* ToDo: wait for next message to arrive rather than busy wait */
1305 #endif /* GRAN */
1306
1307   } /* end of while(1) */
1308
1309   IF_PAR_DEBUG(verbose,
1310                belch("== Leaving schedule() after having received Finish"));
1311 }
1312
1313 /* ---------------------------------------------------------------------------
1314  * deleteAllThreads():  kill all the live threads.
1315  *
1316  * This is used when we catch a user interrupt (^C), before performing
1317  * any necessary cleanups and running finalizers.
1318  * ------------------------------------------------------------------------- */
1319    
1320 void deleteAllThreads ( void )
1321 {
1322   StgTSO* t;
1323   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1324   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1325       deleteThread(t);
1326   }
1327   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1328       deleteThread(t);
1329   }
1330   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1331       deleteThread(t);
1332   }
1333   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1334   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1335   sleeping_queue = END_TSO_QUEUE;
1336 }
1337
1338 /* startThread and  insertThread are now in GranSim.c -- HWL */
1339
1340 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1341 //@subsection Suspend and Resume
1342
1343 /* ---------------------------------------------------------------------------
1344  * Suspending & resuming Haskell threads.
1345  * 
1346  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1347  * its capability before calling the C function.  This allows another
1348  * task to pick up the capability and carry on running Haskell
1349  * threads.  It also means that if the C call blocks, it won't lock
1350  * the whole system.
1351  *
1352  * The Haskell thread making the C call is put to sleep for the
1353  * duration of the call, on the susepended_ccalling_threads queue.  We
1354  * give out a token to the task, which it can use to resume the thread
1355  * on return from the C function.
1356  * ------------------------------------------------------------------------- */
1357    
1358 StgInt
1359 suspendThread( StgRegTable *reg )
1360 {
1361   nat tok;
1362   Capability *cap;
1363
1364   // assume that *reg is a pointer to the StgRegTable part of a Capability
1365   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1366
1367   ACQUIRE_LOCK(&sched_mutex);
1368
1369   IF_DEBUG(scheduler,
1370            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1371
1372   threadPaused(cap->r.rCurrentTSO);
1373   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1374   suspended_ccalling_threads = cap->r.rCurrentTSO;
1375
1376   /* Use the thread ID as the token; it should be unique */
1377   tok = cap->r.rCurrentTSO->id;
1378
1379 #ifdef SMP
1380   cap->link = free_capabilities;
1381   free_capabilities = cap;
1382   n_free_capabilities++;
1383 #endif
1384
1385   RELEASE_LOCK(&sched_mutex);
1386   return tok; 
1387 }
1388
1389 StgRegTable *
1390 resumeThread( StgInt tok )
1391 {
1392   StgTSO *tso, **prev;
1393   Capability *cap;
1394
1395   ACQUIRE_LOCK(&sched_mutex);
1396
1397   prev = &suspended_ccalling_threads;
1398   for (tso = suspended_ccalling_threads; 
1399        tso != END_TSO_QUEUE; 
1400        prev = &tso->link, tso = tso->link) {
1401     if (tso->id == (StgThreadID)tok) {
1402       *prev = tso->link;
1403       break;
1404     }
1405   }
1406   if (tso == END_TSO_QUEUE) {
1407     barf("resumeThread: thread not found");
1408   }
1409   tso->link = END_TSO_QUEUE;
1410
1411 #ifdef SMP
1412   while (free_capabilities == NULL) {
1413     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1414     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1415     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1416   }
1417   cap = free_capabilities;
1418   free_capabilities = cap->link;
1419   n_free_capabilities--;
1420 #else  
1421   cap = &MainCapability;
1422 #endif
1423
1424   cap->r.rCurrentTSO = tso;
1425
1426   RELEASE_LOCK(&sched_mutex);
1427   return &cap->r;
1428 }
1429
1430
1431 /* ---------------------------------------------------------------------------
1432  * Static functions
1433  * ------------------------------------------------------------------------ */
1434 static void unblockThread(StgTSO *tso);
1435
1436 /* ---------------------------------------------------------------------------
1437  * Comparing Thread ids.
1438  *
1439  * This is used from STG land in the implementation of the
1440  * instances of Eq/Ord for ThreadIds.
1441  * ------------------------------------------------------------------------ */
1442
1443 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1444
1445   StgThreadID id1 = tso1->id; 
1446   StgThreadID id2 = tso2->id;
1447  
1448   if (id1 < id2) return (-1);
1449   if (id1 > id2) return 1;
1450   return 0;
1451 }
1452
1453 /* ---------------------------------------------------------------------------
1454  * Fetching the ThreadID from an StgTSO.
1455  *
1456  * This is used in the implementation of Show for ThreadIds.
1457  * ------------------------------------------------------------------------ */
1458 int rts_getThreadId(const StgTSO *tso) 
1459 {
1460   return tso->id;
1461 }
1462
1463 /* ---------------------------------------------------------------------------
1464    Create a new thread.
1465
1466    The new thread starts with the given stack size.  Before the
1467    scheduler can run, however, this thread needs to have a closure
1468    (and possibly some arguments) pushed on its stack.  See
1469    pushClosure() in Schedule.h.
1470
1471    createGenThread() and createIOThread() (in SchedAPI.h) are
1472    convenient packaged versions of this function.
1473
1474    currently pri (priority) is only used in a GRAN setup -- HWL
1475    ------------------------------------------------------------------------ */
1476 //@cindex createThread
1477 #if defined(GRAN)
1478 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1479 StgTSO *
1480 createThread(nat stack_size, StgInt pri)
1481 {
1482   return createThread_(stack_size, rtsFalse, pri);
1483 }
1484
1485 static StgTSO *
1486 createThread_(nat size, rtsBool have_lock, StgInt pri)
1487 {
1488 #else
1489 StgTSO *
1490 createThread(nat stack_size)
1491 {
1492   return createThread_(stack_size, rtsFalse);
1493 }
1494
1495 static StgTSO *
1496 createThread_(nat size, rtsBool have_lock)
1497 {
1498 #endif
1499
1500     StgTSO *tso;
1501     nat stack_size;
1502
1503     /* First check whether we should create a thread at all */
1504 #if defined(PAR)
1505   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1506   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1507     threadsIgnored++;
1508     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1509           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1510     return END_TSO_QUEUE;
1511   }
1512   threadsCreated++;
1513 #endif
1514
1515 #if defined(GRAN)
1516   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1517 #endif
1518
1519   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1520
1521   /* catch ridiculously small stack sizes */
1522   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1523     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1524   }
1525
1526   stack_size = size - TSO_STRUCT_SIZEW;
1527
1528   tso = (StgTSO *)allocate(size);
1529   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1530
1531   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1532 #if defined(GRAN)
1533   SET_GRAN_HDR(tso, ThisPE);
1534 #endif
1535   tso->what_next     = ThreadEnterGHC;
1536
1537   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1538    * protect the increment operation on next_thread_id.
1539    * In future, we could use an atomic increment instead.
1540    */
1541   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1542   tso->id = next_thread_id++; 
1543   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1544
1545   tso->why_blocked  = NotBlocked;
1546   tso->blocked_exceptions = NULL;
1547
1548   tso->stack_size   = stack_size;
1549   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1550                               - TSO_STRUCT_SIZEW;
1551   tso->sp           = (P_)&(tso->stack) + stack_size;
1552
1553 #ifdef PROFILING
1554   tso->prof.CCCS = CCS_MAIN;
1555 #endif
1556
1557   /* put a stop frame on the stack */
1558   tso->sp -= sizeofW(StgStopFrame);
1559   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1560   tso->su = (StgUpdateFrame*)tso->sp;
1561
1562   // ToDo: check this
1563 #if defined(GRAN)
1564   tso->link = END_TSO_QUEUE;
1565   /* uses more flexible routine in GranSim */
1566   insertThread(tso, CurrentProc);
1567 #else
1568   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1569    * from its creation
1570    */
1571 #endif
1572
1573 #if defined(GRAN) 
1574   if (RtsFlags.GranFlags.GranSimStats.Full) 
1575     DumpGranEvent(GR_START,tso);
1576 #elif defined(PAR)
1577   if (RtsFlags.ParFlags.ParStats.Full) 
1578     DumpGranEvent(GR_STARTQ,tso);
1579   /* HACk to avoid SCHEDULE 
1580      LastTSO = tso; */
1581 #endif
1582
1583   /* Link the new thread on the global thread list.
1584    */
1585   tso->global_link = all_threads;
1586   all_threads = tso;
1587
1588 #if defined(DIST)
1589   tso->dist.priority = MandatoryPriority; //by default that is...
1590 #endif
1591
1592 #if defined(GRAN)
1593   tso->gran.pri = pri;
1594 # if defined(DEBUG)
1595   tso->gran.magic = TSO_MAGIC; // debugging only
1596 # endif
1597   tso->gran.sparkname   = 0;
1598   tso->gran.startedat   = CURRENT_TIME; 
1599   tso->gran.exported    = 0;
1600   tso->gran.basicblocks = 0;
1601   tso->gran.allocs      = 0;
1602   tso->gran.exectime    = 0;
1603   tso->gran.fetchtime   = 0;
1604   tso->gran.fetchcount  = 0;
1605   tso->gran.blocktime   = 0;
1606   tso->gran.blockcount  = 0;
1607   tso->gran.blockedat   = 0;
1608   tso->gran.globalsparks = 0;
1609   tso->gran.localsparks  = 0;
1610   if (RtsFlags.GranFlags.Light)
1611     tso->gran.clock  = Now; /* local clock */
1612   else
1613     tso->gran.clock  = 0;
1614
1615   IF_DEBUG(gran,printTSO(tso));
1616 #elif defined(PAR)
1617 # if defined(DEBUG)
1618   tso->par.magic = TSO_MAGIC; // debugging only
1619 # endif
1620   tso->par.sparkname   = 0;
1621   tso->par.startedat   = CURRENT_TIME; 
1622   tso->par.exported    = 0;
1623   tso->par.basicblocks = 0;
1624   tso->par.allocs      = 0;
1625   tso->par.exectime    = 0;
1626   tso->par.fetchtime   = 0;
1627   tso->par.fetchcount  = 0;
1628   tso->par.blocktime   = 0;
1629   tso->par.blockcount  = 0;
1630   tso->par.blockedat   = 0;
1631   tso->par.globalsparks = 0;
1632   tso->par.localsparks  = 0;
1633 #endif
1634
1635 #if defined(GRAN)
1636   globalGranStats.tot_threads_created++;
1637   globalGranStats.threads_created_on_PE[CurrentProc]++;
1638   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1639   globalGranStats.tot_sq_probes++;
1640 #elif defined(PAR)
1641   // collect parallel global statistics (currently done together with GC stats)
1642   if (RtsFlags.ParFlags.ParStats.Global &&
1643       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1644     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1645     globalParStats.tot_threads_created++;
1646   }
1647 #endif 
1648
1649 #if defined(GRAN)
1650   IF_GRAN_DEBUG(pri,
1651                 belch("==__ schedule: Created TSO %d (%p);",
1652                       CurrentProc, tso, tso->id));
1653 #elif defined(PAR)
1654     IF_PAR_DEBUG(verbose,
1655                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1656                        tso->id, tso, advisory_thread_count));
1657 #else
1658   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1659                                  tso->id, tso->stack_size));
1660 #endif    
1661   return tso;
1662 }
1663
1664 #if defined(PAR)
1665 /* RFP:
1666    all parallel thread creation calls should fall through the following routine.
1667 */
1668 StgTSO *
1669 createSparkThread(rtsSpark spark) 
1670 { StgTSO *tso;
1671   ASSERT(spark != (rtsSpark)NULL);
1672   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1673   { threadsIgnored++;
1674     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1675           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1676     return END_TSO_QUEUE;
1677   }
1678   else
1679   { threadsCreated++;
1680     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1681     if (tso==END_TSO_QUEUE)     
1682       barf("createSparkThread: Cannot create TSO");
1683 #if defined(DIST)
1684     tso->priority = AdvisoryPriority;
1685 #endif
1686     pushClosure(tso,spark);
1687     PUSH_ON_RUN_QUEUE(tso);
1688     advisory_thread_count++;    
1689   }
1690   return tso;
1691 }
1692 #endif
1693
1694 /*
1695   Turn a spark into a thread.
1696   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1697 */
1698 #if defined(PAR)
1699 //@cindex activateSpark
1700 StgTSO *
1701 activateSpark (rtsSpark spark) 
1702 {
1703   StgTSO *tso;
1704
1705   tso = createSparkThread(spark);
1706   if (RtsFlags.ParFlags.ParStats.Full) {   
1707     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1708     IF_PAR_DEBUG(verbose,
1709                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1710                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1711   }
1712   // ToDo: fwd info on local/global spark to thread -- HWL
1713   // tso->gran.exported =  spark->exported;
1714   // tso->gran.locked =   !spark->global;
1715   // tso->gran.sparkname = spark->name;
1716
1717   return tso;
1718 }
1719 #endif
1720
1721 /* ---------------------------------------------------------------------------
1722  * scheduleThread()
1723  *
1724  * scheduleThread puts a thread on the head of the runnable queue.
1725  * This will usually be done immediately after a thread is created.
1726  * The caller of scheduleThread must create the thread using e.g.
1727  * createThread and push an appropriate closure
1728  * on this thread's stack before the scheduler is invoked.
1729  * ------------------------------------------------------------------------ */
1730
1731 void
1732 scheduleThread(StgTSO *tso)
1733 {
1734   if (tso==END_TSO_QUEUE){    
1735     schedule();
1736     return;
1737   }
1738
1739   ACQUIRE_LOCK(&sched_mutex);
1740
1741   /* Put the new thread on the head of the runnable queue.  The caller
1742    * better push an appropriate closure on this thread's stack
1743    * beforehand.  In the SMP case, the thread may start running as
1744    * soon as we release the scheduler lock below.
1745    */
1746   PUSH_ON_RUN_QUEUE(tso);
1747   THREAD_RUNNABLE();
1748
1749 #if 0
1750   IF_DEBUG(scheduler,printTSO(tso));
1751 #endif
1752   RELEASE_LOCK(&sched_mutex);
1753 }
1754
1755 /* ---------------------------------------------------------------------------
1756  * startTasks()
1757  *
1758  * Start up Posix threads to run each of the scheduler tasks.
1759  * I believe the task ids are not needed in the system as defined.
1760  *  KH @ 25/10/99
1761  * ------------------------------------------------------------------------ */
1762
1763 #if defined(PAR) || defined(SMP)
1764 void
1765 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1766 {
1767   scheduleThread(END_TSO_QUEUE);
1768 }
1769 #endif
1770
1771 /* ---------------------------------------------------------------------------
1772  * initScheduler()
1773  *
1774  * Initialise the scheduler.  This resets all the queues - if the
1775  * queues contained any threads, they'll be garbage collected at the
1776  * next pass.
1777  *
1778  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1779  * ------------------------------------------------------------------------ */
1780
1781 #ifdef SMP
1782 static void
1783 term_handler(int sig STG_UNUSED)
1784 {
1785   stat_workerStop();
1786   ACQUIRE_LOCK(&term_mutex);
1787   await_death--;
1788   RELEASE_LOCK(&term_mutex);
1789   pthread_exit(NULL);
1790 }
1791 #endif
1792
1793 static void
1794 initCapability( Capability *cap )
1795 {
1796     cap->f.stgChk0         = (F_)__stg_chk_0;
1797     cap->f.stgChk1         = (F_)__stg_chk_1;
1798     cap->f.stgGCEnter1     = (F_)__stg_gc_enter_1;
1799     cap->f.stgUpdatePAP    = (F_)__stg_update_PAP;
1800 }
1801
1802 void 
1803 initScheduler(void)
1804 {
1805 #if defined(GRAN)
1806   nat i;
1807
1808   for (i=0; i<=MAX_PROC; i++) {
1809     run_queue_hds[i]      = END_TSO_QUEUE;
1810     run_queue_tls[i]      = END_TSO_QUEUE;
1811     blocked_queue_hds[i]  = END_TSO_QUEUE;
1812     blocked_queue_tls[i]  = END_TSO_QUEUE;
1813     ccalling_threadss[i]  = END_TSO_QUEUE;
1814     sleeping_queue        = END_TSO_QUEUE;
1815   }
1816 #else
1817   run_queue_hd      = END_TSO_QUEUE;
1818   run_queue_tl      = END_TSO_QUEUE;
1819   blocked_queue_hd  = END_TSO_QUEUE;
1820   blocked_queue_tl  = END_TSO_QUEUE;
1821   sleeping_queue    = END_TSO_QUEUE;
1822 #endif 
1823
1824   suspended_ccalling_threads  = END_TSO_QUEUE;
1825
1826   main_threads = NULL;
1827   all_threads  = END_TSO_QUEUE;
1828
1829   context_switch = 0;
1830   interrupted    = 0;
1831
1832   RtsFlags.ConcFlags.ctxtSwitchTicks =
1833       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1834
1835   /* Install the SIGHUP handler */
1836 #ifdef SMP
1837   {
1838     struct sigaction action,oact;
1839
1840     action.sa_handler = term_handler;
1841     sigemptyset(&action.sa_mask);
1842     action.sa_flags = 0;
1843     if (sigaction(SIGTERM, &action, &oact) != 0) {
1844       barf("can't install TERM handler");
1845     }
1846   }
1847 #endif
1848
1849 #ifdef SMP
1850   /* Allocate N Capabilities */
1851   {
1852     nat i;
1853     Capability *cap, *prev;
1854     cap  = NULL;
1855     prev = NULL;
1856     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1857       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1858       initCapability(cap);
1859       cap->link = prev;
1860       prev = cap;
1861     }
1862     free_capabilities = cap;
1863     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1864   }
1865   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1866                              n_free_capabilities););
1867 #else
1868   initCapability(&MainCapability);
1869 #endif
1870
1871 #if defined(SMP) || defined(PAR)
1872   initSparkPools();
1873 #endif
1874 }
1875
1876 #ifdef SMP
1877 void
1878 startTasks( void )
1879 {
1880   nat i;
1881   int r;
1882   pthread_t tid;
1883   
1884   /* make some space for saving all the thread ids */
1885   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1886                             "initScheduler:task_ids");
1887   
1888   /* and create all the threads */
1889   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1890     r = pthread_create(&tid,NULL,taskStart,NULL);
1891     if (r != 0) {
1892       barf("startTasks: Can't create new Posix thread");
1893     }
1894     task_ids[i].id = tid;
1895     task_ids[i].mut_time = 0.0;
1896     task_ids[i].mut_etime = 0.0;
1897     task_ids[i].gc_time = 0.0;
1898     task_ids[i].gc_etime = 0.0;
1899     task_ids[i].elapsedtimestart = elapsedtime();
1900     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1901   }
1902 }
1903 #endif
1904
1905 void
1906 exitScheduler( void )
1907 {
1908 #ifdef SMP
1909   nat i;
1910
1911   /* Don't want to use pthread_cancel, since we'd have to install
1912    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1913    * all our locks.
1914    */
1915 #if 0
1916   /* Cancel all our tasks */
1917   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1918     pthread_cancel(task_ids[i].id);
1919   }
1920   
1921   /* Wait for all the tasks to terminate */
1922   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1923     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1924                                task_ids[i].id));
1925     pthread_join(task_ids[i].id, NULL);
1926   }
1927 #endif
1928
1929   /* Send 'em all a SIGHUP.  That should shut 'em up.
1930    */
1931   await_death = RtsFlags.ParFlags.nNodes;
1932   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1933     pthread_kill(task_ids[i].id,SIGTERM);
1934   }
1935   while (await_death > 0) {
1936     sched_yield();
1937   }
1938 #endif
1939 }
1940
1941 /* -----------------------------------------------------------------------------
1942    Managing the per-task allocation areas.
1943    
1944    Each capability comes with an allocation area.  These are
1945    fixed-length block lists into which allocation can be done.
1946
1947    ToDo: no support for two-space collection at the moment???
1948    -------------------------------------------------------------------------- */
1949
1950 /* -----------------------------------------------------------------------------
1951  * waitThread is the external interface for running a new computation
1952  * and waiting for the result.
1953  *
1954  * In the non-SMP case, we create a new main thread, push it on the 
1955  * main-thread stack, and invoke the scheduler to run it.  The
1956  * scheduler will return when the top main thread on the stack has
1957  * completed or died, and fill in the necessary fields of the
1958  * main_thread structure.
1959  *
1960  * In the SMP case, we create a main thread as before, but we then
1961  * create a new condition variable and sleep on it.  When our new
1962  * main thread has completed, we'll be woken up and the status/result
1963  * will be in the main_thread struct.
1964  * -------------------------------------------------------------------------- */
1965
1966 int 
1967 howManyThreadsAvail ( void )
1968 {
1969    int i = 0;
1970    StgTSO* q;
1971    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1972       i++;
1973    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1974       i++;
1975    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1976       i++;
1977    return i;
1978 }
1979
1980 void
1981 finishAllThreads ( void )
1982 {
1983    do {
1984       while (run_queue_hd != END_TSO_QUEUE) {
1985          waitThread ( run_queue_hd, NULL );
1986       }
1987       while (blocked_queue_hd != END_TSO_QUEUE) {
1988          waitThread ( blocked_queue_hd, NULL );
1989       }
1990       while (sleeping_queue != END_TSO_QUEUE) {
1991          waitThread ( blocked_queue_hd, NULL );
1992       }
1993    } while 
1994       (blocked_queue_hd != END_TSO_QUEUE || 
1995        run_queue_hd     != END_TSO_QUEUE ||
1996        sleeping_queue   != END_TSO_QUEUE);
1997 }
1998
1999 SchedulerStatus
2000 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2001 {
2002   StgMainThread *m;
2003   SchedulerStatus stat;
2004
2005   ACQUIRE_LOCK(&sched_mutex);
2006   
2007   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2008
2009   m->tso = tso;
2010   m->ret = ret;
2011   m->stat = NoStatus;
2012 #ifdef SMP
2013   pthread_cond_init(&m->wakeup, NULL);
2014 #endif
2015
2016   m->link = main_threads;
2017   main_threads = m;
2018
2019   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2020                               m->tso->id));
2021
2022 #ifdef SMP
2023   do {
2024     pthread_cond_wait(&m->wakeup, &sched_mutex);
2025   } while (m->stat == NoStatus);
2026 #elif defined(GRAN)
2027   /* GranSim specific init */
2028   CurrentTSO = m->tso;                // the TSO to run
2029   procStatus[MainProc] = Busy;        // status of main PE
2030   CurrentProc = MainProc;             // PE to run it on
2031
2032   schedule();
2033 #else
2034   schedule();
2035   ASSERT(m->stat != NoStatus);
2036 #endif
2037
2038   stat = m->stat;
2039
2040 #ifdef SMP
2041   pthread_cond_destroy(&m->wakeup);
2042 #endif
2043
2044   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2045                               m->tso->id));
2046   free(m);
2047
2048   RELEASE_LOCK(&sched_mutex);
2049
2050   return stat;
2051 }
2052
2053 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2054 //@subsection Run queue code 
2055
2056 #if 0
2057 /* 
2058    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2059        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2060        implicit global variable that has to be correct when calling these
2061        fcts -- HWL 
2062 */
2063
2064 /* Put the new thread on the head of the runnable queue.
2065  * The caller of createThread better push an appropriate closure
2066  * on this thread's stack before the scheduler is invoked.
2067  */
2068 static /* inline */ void
2069 add_to_run_queue(tso)
2070 StgTSO* tso; 
2071 {
2072   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2073   tso->link = run_queue_hd;
2074   run_queue_hd = tso;
2075   if (run_queue_tl == END_TSO_QUEUE) {
2076     run_queue_tl = tso;
2077   }
2078 }
2079
2080 /* Put the new thread at the end of the runnable queue. */
2081 static /* inline */ void
2082 push_on_run_queue(tso)
2083 StgTSO* tso; 
2084 {
2085   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2086   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2087   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2088   if (run_queue_hd == END_TSO_QUEUE) {
2089     run_queue_hd = tso;
2090   } else {
2091     run_queue_tl->link = tso;
2092   }
2093   run_queue_tl = tso;
2094 }
2095
2096 /* 
2097    Should be inlined because it's used very often in schedule.  The tso
2098    argument is actually only needed in GranSim, where we want to have the
2099    possibility to schedule *any* TSO on the run queue, irrespective of the
2100    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2101    the run queue and dequeue the tso, adjusting the links in the queue. 
2102 */
2103 //@cindex take_off_run_queue
2104 static /* inline */ StgTSO*
2105 take_off_run_queue(StgTSO *tso) {
2106   StgTSO *t, *prev;
2107
2108   /* 
2109      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2110
2111      if tso is specified, unlink that tso from the run_queue (doesn't have
2112      to be at the beginning of the queue); GranSim only 
2113   */
2114   if (tso!=END_TSO_QUEUE) {
2115     /* find tso in queue */
2116     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2117          t!=END_TSO_QUEUE && t!=tso;
2118          prev=t, t=t->link) 
2119       /* nothing */ ;
2120     ASSERT(t==tso);
2121     /* now actually dequeue the tso */
2122     if (prev!=END_TSO_QUEUE) {
2123       ASSERT(run_queue_hd!=t);
2124       prev->link = t->link;
2125     } else {
2126       /* t is at beginning of thread queue */
2127       ASSERT(run_queue_hd==t);
2128       run_queue_hd = t->link;
2129     }
2130     /* t is at end of thread queue */
2131     if (t->link==END_TSO_QUEUE) {
2132       ASSERT(t==run_queue_tl);
2133       run_queue_tl = prev;
2134     } else {
2135       ASSERT(run_queue_tl!=t);
2136     }
2137     t->link = END_TSO_QUEUE;
2138   } else {
2139     /* take tso from the beginning of the queue; std concurrent code */
2140     t = run_queue_hd;
2141     if (t != END_TSO_QUEUE) {
2142       run_queue_hd = t->link;
2143       t->link = END_TSO_QUEUE;
2144       if (run_queue_hd == END_TSO_QUEUE) {
2145         run_queue_tl = END_TSO_QUEUE;
2146       }
2147     }
2148   }
2149   return t;
2150 }
2151
2152 #endif /* 0 */
2153
2154 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2155 //@subsection Garbage Collextion Routines
2156
2157 /* ---------------------------------------------------------------------------
2158    Where are the roots that we know about?
2159
2160         - all the threads on the runnable queue
2161         - all the threads on the blocked queue
2162         - all the threads on the sleeping queue
2163         - all the thread currently executing a _ccall_GC
2164         - all the "main threads"
2165      
2166    ------------------------------------------------------------------------ */
2167
2168 /* This has to be protected either by the scheduler monitor, or by the
2169         garbage collection monitor (probably the latter).
2170         KH @ 25/10/99
2171 */
2172
2173 static void
2174 GetRoots(evac_fn evac)
2175 {
2176   StgMainThread *m;
2177
2178 #if defined(GRAN)
2179   {
2180     nat i;
2181     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2182       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2183           evac((StgClosure **)&run_queue_hds[i]);
2184       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2185           evac((StgClosure **)&run_queue_tls[i]);
2186       
2187       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2188           evac((StgClosure **)&blocked_queue_hds[i]);
2189       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2190           evac((StgClosure **)&blocked_queue_tls[i]);
2191       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2192           evac((StgClosure **)&ccalling_threads[i]);
2193     }
2194   }
2195
2196   markEventQueue();
2197
2198 #else /* !GRAN */
2199   if (run_queue_hd != END_TSO_QUEUE) {
2200       ASSERT(run_queue_tl != END_TSO_QUEUE);
2201       evac((StgClosure **)&run_queue_hd);
2202       evac((StgClosure **)&run_queue_tl);
2203   }
2204   
2205   if (blocked_queue_hd != END_TSO_QUEUE) {
2206       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2207       evac((StgClosure **)&blocked_queue_hd);
2208       evac((StgClosure **)&blocked_queue_tl);
2209   }
2210   
2211   if (sleeping_queue != END_TSO_QUEUE) {
2212       evac((StgClosure **)&sleeping_queue);
2213   }
2214 #endif 
2215
2216   for (m = main_threads; m != NULL; m = m->link) {
2217       evac((StgClosure **)&m->tso);
2218   }
2219   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2220       evac((StgClosure **)&suspended_ccalling_threads);
2221   }
2222
2223 #if defined(SMP) || defined(PAR) || defined(GRAN)
2224   markSparkQueue(evac);
2225 #endif
2226 }
2227
2228 /* -----------------------------------------------------------------------------
2229    performGC
2230
2231    This is the interface to the garbage collector from Haskell land.
2232    We provide this so that external C code can allocate and garbage
2233    collect when called from Haskell via _ccall_GC.
2234
2235    It might be useful to provide an interface whereby the programmer
2236    can specify more roots (ToDo).
2237    
2238    This needs to be protected by the GC condition variable above.  KH.
2239    -------------------------------------------------------------------------- */
2240
2241 void (*extra_roots)(evac_fn);
2242
2243 void
2244 performGC(void)
2245 {
2246   GarbageCollect(GetRoots,rtsFalse);
2247 }
2248
2249 void
2250 performMajorGC(void)
2251 {
2252   GarbageCollect(GetRoots,rtsTrue);
2253 }
2254
2255 static void
2256 AllRoots(evac_fn evac)
2257 {
2258     GetRoots(evac);             // the scheduler's roots
2259     extra_roots(evac);          // the user's roots
2260 }
2261
2262 void
2263 performGCWithRoots(void (*get_roots)(evac_fn))
2264 {
2265   extra_roots = get_roots;
2266   GarbageCollect(AllRoots,rtsFalse);
2267 }
2268
2269 /* -----------------------------------------------------------------------------
2270    Stack overflow
2271
2272    If the thread has reached its maximum stack size, then raise the
2273    StackOverflow exception in the offending thread.  Otherwise
2274    relocate the TSO into a larger chunk of memory and adjust its stack
2275    size appropriately.
2276    -------------------------------------------------------------------------- */
2277
2278 static StgTSO *
2279 threadStackOverflow(StgTSO *tso)
2280 {
2281   nat new_stack_size, new_tso_size, diff, stack_words;
2282   StgPtr new_sp;
2283   StgTSO *dest;
2284
2285   IF_DEBUG(sanity,checkTSO(tso));
2286   if (tso->stack_size >= tso->max_stack_size) {
2287
2288     IF_DEBUG(gc,
2289              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2290                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2291              /* If we're debugging, just print out the top of the stack */
2292              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2293                                               tso->sp+64)));
2294
2295     /* Send this thread the StackOverflow exception */
2296     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2297     return tso;
2298   }
2299
2300   /* Try to double the current stack size.  If that takes us over the
2301    * maximum stack size for this thread, then use the maximum instead.
2302    * Finally round up so the TSO ends up as a whole number of blocks.
2303    */
2304   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2305   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2306                                        TSO_STRUCT_SIZE)/sizeof(W_);
2307   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2308   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2309
2310   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2311
2312   dest = (StgTSO *)allocate(new_tso_size);
2313   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2314
2315   /* copy the TSO block and the old stack into the new area */
2316   memcpy(dest,tso,TSO_STRUCT_SIZE);
2317   stack_words = tso->stack + tso->stack_size - tso->sp;
2318   new_sp = (P_)dest + new_tso_size - stack_words;
2319   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2320
2321   /* relocate the stack pointers... */
2322   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2323   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2324   dest->sp    = new_sp;
2325   dest->stack_size = new_stack_size;
2326         
2327   /* and relocate the update frame list */
2328   relocate_stack(dest, diff);
2329
2330   /* Mark the old TSO as relocated.  We have to check for relocated
2331    * TSOs in the garbage collector and any primops that deal with TSOs.
2332    *
2333    * It's important to set the sp and su values to just beyond the end
2334    * of the stack, so we don't attempt to scavenge any part of the
2335    * dead TSO's stack.
2336    */
2337   tso->what_next = ThreadRelocated;
2338   tso->link = dest;
2339   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2340   tso->su = (StgUpdateFrame *)tso->sp;
2341   tso->why_blocked = NotBlocked;
2342   dest->mut_link = NULL;
2343
2344   IF_PAR_DEBUG(verbose,
2345                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2346                      tso->id, tso, tso->stack_size);
2347                /* If we're debugging, just print out the top of the stack */
2348                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2349                                                 tso->sp+64)));
2350   
2351   IF_DEBUG(sanity,checkTSO(tso));
2352 #if 0
2353   IF_DEBUG(scheduler,printTSO(dest));
2354 #endif
2355
2356   return dest;
2357 }
2358
2359 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2360 //@subsection Blocking Queue Routines
2361
2362 /* ---------------------------------------------------------------------------
2363    Wake up a queue that was blocked on some resource.
2364    ------------------------------------------------------------------------ */
2365
2366 #if defined(GRAN)
2367 static inline void
2368 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2369 {
2370 }
2371 #elif defined(PAR)
2372 static inline void
2373 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2374 {
2375   /* write RESUME events to log file and
2376      update blocked and fetch time (depending on type of the orig closure) */
2377   if (RtsFlags.ParFlags.ParStats.Full) {
2378     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2379                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2380                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2381     if (EMPTY_RUN_QUEUE())
2382       emitSchedule = rtsTrue;
2383
2384     switch (get_itbl(node)->type) {
2385         case FETCH_ME_BQ:
2386           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2387           break;
2388         case RBH:
2389         case FETCH_ME:
2390         case BLACKHOLE_BQ:
2391           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2392           break;
2393 #ifdef DIST
2394         case MVAR:
2395           break;
2396 #endif    
2397         default:
2398           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2399         }
2400       }
2401 }
2402 #endif
2403
2404 #if defined(GRAN)
2405 static StgBlockingQueueElement *
2406 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2407 {
2408     StgTSO *tso;
2409     PEs node_loc, tso_loc;
2410
2411     node_loc = where_is(node); // should be lifted out of loop
2412     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2413     tso_loc = where_is((StgClosure *)tso);
2414     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2415       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2416       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2417       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2418       // insertThread(tso, node_loc);
2419       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2420                 ResumeThread,
2421                 tso, node, (rtsSpark*)NULL);
2422       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2423       // len_local++;
2424       // len++;
2425     } else { // TSO is remote (actually should be FMBQ)
2426       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2427                                   RtsFlags.GranFlags.Costs.gunblocktime +
2428                                   RtsFlags.GranFlags.Costs.latency;
2429       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2430                 UnblockThread,
2431                 tso, node, (rtsSpark*)NULL);
2432       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2433       // len++;
2434     }
2435     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2436     IF_GRAN_DEBUG(bq,
2437                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2438                           (node_loc==tso_loc ? "Local" : "Global"), 
2439                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2440     tso->block_info.closure = NULL;
2441     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2442                              tso->id, tso));
2443 }
2444 #elif defined(PAR)
2445 static StgBlockingQueueElement *
2446 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2447 {
2448     StgBlockingQueueElement *next;
2449
2450     switch (get_itbl(bqe)->type) {
2451     case TSO:
2452       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2453       /* if it's a TSO just push it onto the run_queue */
2454       next = bqe->link;
2455       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2456       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2457       THREAD_RUNNABLE();
2458       unblockCount(bqe, node);
2459       /* reset blocking status after dumping event */
2460       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2461       break;
2462
2463     case BLOCKED_FETCH:
2464       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2465       next = bqe->link;
2466       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2467       PendingFetches = (StgBlockedFetch *)bqe;
2468       break;
2469
2470 # if defined(DEBUG)
2471       /* can ignore this case in a non-debugging setup; 
2472          see comments on RBHSave closures above */
2473     case CONSTR:
2474       /* check that the closure is an RBHSave closure */
2475       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2476              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2477              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2478       break;
2479
2480     default:
2481       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2482            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2483            (StgClosure *)bqe);
2484 # endif
2485     }
2486   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2487   return next;
2488 }
2489
2490 #else /* !GRAN && !PAR */
2491 static StgTSO *
2492 unblockOneLocked(StgTSO *tso)
2493 {
2494   StgTSO *next;
2495
2496   ASSERT(get_itbl(tso)->type == TSO);
2497   ASSERT(tso->why_blocked != NotBlocked);
2498   tso->why_blocked = NotBlocked;
2499   next = tso->link;
2500   PUSH_ON_RUN_QUEUE(tso);
2501   THREAD_RUNNABLE();
2502   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2503   return next;
2504 }
2505 #endif
2506
2507 #if defined(GRAN) || defined(PAR)
2508 inline StgBlockingQueueElement *
2509 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2510 {
2511   ACQUIRE_LOCK(&sched_mutex);
2512   bqe = unblockOneLocked(bqe, node);
2513   RELEASE_LOCK(&sched_mutex);
2514   return bqe;
2515 }
2516 #else
2517 inline StgTSO *
2518 unblockOne(StgTSO *tso)
2519 {
2520   ACQUIRE_LOCK(&sched_mutex);
2521   tso = unblockOneLocked(tso);
2522   RELEASE_LOCK(&sched_mutex);
2523   return tso;
2524 }
2525 #endif
2526
2527 #if defined(GRAN)
2528 void 
2529 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2530 {
2531   StgBlockingQueueElement *bqe;
2532   PEs node_loc;
2533   nat len = 0; 
2534
2535   IF_GRAN_DEBUG(bq, 
2536                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2537                       node, CurrentProc, CurrentTime[CurrentProc], 
2538                       CurrentTSO->id, CurrentTSO));
2539
2540   node_loc = where_is(node);
2541
2542   ASSERT(q == END_BQ_QUEUE ||
2543          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2544          get_itbl(q)->type == CONSTR); // closure (type constructor)
2545   ASSERT(is_unique(node));
2546
2547   /* FAKE FETCH: magically copy the node to the tso's proc;
2548      no Fetch necessary because in reality the node should not have been 
2549      moved to the other PE in the first place
2550   */
2551   if (CurrentProc!=node_loc) {
2552     IF_GRAN_DEBUG(bq, 
2553                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2554                         node, node_loc, CurrentProc, CurrentTSO->id, 
2555                         // CurrentTSO, where_is(CurrentTSO),
2556                         node->header.gran.procs));
2557     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2558     IF_GRAN_DEBUG(bq, 
2559                   belch("## new bitmask of node %p is %#x",
2560                         node, node->header.gran.procs));
2561     if (RtsFlags.GranFlags.GranSimStats.Global) {
2562       globalGranStats.tot_fake_fetches++;
2563     }
2564   }
2565
2566   bqe = q;
2567   // ToDo: check: ASSERT(CurrentProc==node_loc);
2568   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2569     //next = bqe->link;
2570     /* 
2571        bqe points to the current element in the queue
2572        next points to the next element in the queue
2573     */
2574     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2575     //tso_loc = where_is(tso);
2576     len++;
2577     bqe = unblockOneLocked(bqe, node);
2578   }
2579
2580   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2581      the closure to make room for the anchor of the BQ */
2582   if (bqe!=END_BQ_QUEUE) {
2583     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2584     /*
2585     ASSERT((info_ptr==&RBH_Save_0_info) ||
2586            (info_ptr==&RBH_Save_1_info) ||
2587            (info_ptr==&RBH_Save_2_info));
2588     */
2589     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2590     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2591     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2592
2593     IF_GRAN_DEBUG(bq,
2594                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2595                         node, info_type(node)));
2596   }
2597
2598   /* statistics gathering */
2599   if (RtsFlags.GranFlags.GranSimStats.Global) {
2600     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2601     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2602     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2603     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2604   }
2605   IF_GRAN_DEBUG(bq,
2606                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2607                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2608 }
2609 #elif defined(PAR)
2610 void 
2611 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2612 {
2613   StgBlockingQueueElement *bqe;
2614
2615   ACQUIRE_LOCK(&sched_mutex);
2616
2617   IF_PAR_DEBUG(verbose, 
2618                belch("##-_ AwBQ for node %p on [%x]: ",
2619                      node, mytid));
2620 #ifdef DIST  
2621   //RFP
2622   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2623     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2624     return;
2625   }
2626 #endif
2627   
2628   ASSERT(q == END_BQ_QUEUE ||
2629          get_itbl(q)->type == TSO ||           
2630          get_itbl(q)->type == BLOCKED_FETCH || 
2631          get_itbl(q)->type == CONSTR); 
2632
2633   bqe = q;
2634   while (get_itbl(bqe)->type==TSO || 
2635          get_itbl(bqe)->type==BLOCKED_FETCH) {
2636     bqe = unblockOneLocked(bqe, node);
2637   }
2638   RELEASE_LOCK(&sched_mutex);
2639 }
2640
2641 #else   /* !GRAN && !PAR */
2642 void
2643 awakenBlockedQueue(StgTSO *tso)
2644 {
2645   ACQUIRE_LOCK(&sched_mutex);
2646   while (tso != END_TSO_QUEUE) {
2647     tso = unblockOneLocked(tso);
2648   }
2649   RELEASE_LOCK(&sched_mutex);
2650 }
2651 #endif
2652
2653 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2654 //@subsection Exception Handling Routines
2655
2656 /* ---------------------------------------------------------------------------
2657    Interrupt execution
2658    - usually called inside a signal handler so it mustn't do anything fancy.   
2659    ------------------------------------------------------------------------ */
2660
2661 void
2662 interruptStgRts(void)
2663 {
2664     interrupted    = 1;
2665     context_switch = 1;
2666 }
2667
2668 /* -----------------------------------------------------------------------------
2669    Unblock a thread
2670
2671    This is for use when we raise an exception in another thread, which
2672    may be blocked.
2673    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2674    -------------------------------------------------------------------------- */
2675
2676 #if defined(GRAN) || defined(PAR)
2677 /*
2678   NB: only the type of the blocking queue is different in GranSim and GUM
2679       the operations on the queue-elements are the same
2680       long live polymorphism!
2681 */
2682 static void
2683 unblockThread(StgTSO *tso)
2684 {
2685   StgBlockingQueueElement *t, **last;
2686
2687   ACQUIRE_LOCK(&sched_mutex);
2688   switch (tso->why_blocked) {
2689
2690   case NotBlocked:
2691     return;  /* not blocked */
2692
2693   case BlockedOnMVar:
2694     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2695     {
2696       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2697       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2698
2699       last = (StgBlockingQueueElement **)&mvar->head;
2700       for (t = (StgBlockingQueueElement *)mvar->head; 
2701            t != END_BQ_QUEUE; 
2702            last = &t->link, last_tso = t, t = t->link) {
2703         if (t == (StgBlockingQueueElement *)tso) {
2704           *last = (StgBlockingQueueElement *)tso->link;
2705           if (mvar->tail == tso) {
2706             mvar->tail = (StgTSO *)last_tso;
2707           }
2708           goto done;
2709         }
2710       }
2711       barf("unblockThread (MVAR): TSO not found");
2712     }
2713
2714   case BlockedOnBlackHole:
2715     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2716     {
2717       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2718
2719       last = &bq->blocking_queue;
2720       for (t = bq->blocking_queue; 
2721            t != END_BQ_QUEUE; 
2722            last = &t->link, t = t->link) {
2723         if (t == (StgBlockingQueueElement *)tso) {
2724           *last = (StgBlockingQueueElement *)tso->link;
2725           goto done;
2726         }
2727       }
2728       barf("unblockThread (BLACKHOLE): TSO not found");
2729     }
2730
2731   case BlockedOnException:
2732     {
2733       StgTSO *target  = tso->block_info.tso;
2734
2735       ASSERT(get_itbl(target)->type == TSO);
2736
2737       if (target->what_next == ThreadRelocated) {
2738           target = target->link;
2739           ASSERT(get_itbl(target)->type == TSO);
2740       }
2741
2742       ASSERT(target->blocked_exceptions != NULL);
2743
2744       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2745       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2746            t != END_BQ_QUEUE; 
2747            last = &t->link, t = t->link) {
2748         ASSERT(get_itbl(t)->type == TSO);
2749         if (t == (StgBlockingQueueElement *)tso) {
2750           *last = (StgBlockingQueueElement *)tso->link;
2751           goto done;
2752         }
2753       }
2754       barf("unblockThread (Exception): TSO not found");
2755     }
2756
2757   case BlockedOnRead:
2758   case BlockedOnWrite:
2759     {
2760       /* take TSO off blocked_queue */
2761       StgBlockingQueueElement *prev = NULL;
2762       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2763            prev = t, t = t->link) {
2764         if (t == (StgBlockingQueueElement *)tso) {
2765           if (prev == NULL) {
2766             blocked_queue_hd = (StgTSO *)t->link;
2767             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2768               blocked_queue_tl = END_TSO_QUEUE;
2769             }
2770           } else {
2771             prev->link = t->link;
2772             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2773               blocked_queue_tl = (StgTSO *)prev;
2774             }
2775           }
2776           goto done;
2777         }
2778       }
2779       barf("unblockThread (I/O): TSO not found");
2780     }
2781
2782   case BlockedOnDelay:
2783     {
2784       /* take TSO off sleeping_queue */
2785       StgBlockingQueueElement *prev = NULL;
2786       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2787            prev = t, t = t->link) {
2788         if (t == (StgBlockingQueueElement *)tso) {
2789           if (prev == NULL) {
2790             sleeping_queue = (StgTSO *)t->link;
2791           } else {
2792             prev->link = t->link;
2793           }
2794           goto done;
2795         }
2796       }
2797       barf("unblockThread (I/O): TSO not found");
2798     }
2799
2800   default:
2801     barf("unblockThread");
2802   }
2803
2804  done:
2805   tso->link = END_TSO_QUEUE;
2806   tso->why_blocked = NotBlocked;
2807   tso->block_info.closure = NULL;
2808   PUSH_ON_RUN_QUEUE(tso);
2809   RELEASE_LOCK(&sched_mutex);
2810 }
2811 #else
2812 static void
2813 unblockThread(StgTSO *tso)
2814 {
2815   StgTSO *t, **last;
2816
2817   ACQUIRE_LOCK(&sched_mutex);
2818   switch (tso->why_blocked) {
2819
2820   case NotBlocked:
2821     return;  /* not blocked */
2822
2823   case BlockedOnMVar:
2824     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2825     {
2826       StgTSO *last_tso = END_TSO_QUEUE;
2827       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2828
2829       last = &mvar->head;
2830       for (t = mvar->head; t != END_TSO_QUEUE; 
2831            last = &t->link, last_tso = t, t = t->link) {
2832         if (t == tso) {
2833           *last = tso->link;
2834           if (mvar->tail == tso) {
2835             mvar->tail = last_tso;
2836           }
2837           goto done;
2838         }
2839       }
2840       barf("unblockThread (MVAR): TSO not found");
2841     }
2842
2843   case BlockedOnBlackHole:
2844     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2845     {
2846       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2847
2848       last = &bq->blocking_queue;
2849       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2850            last = &t->link, t = t->link) {
2851         if (t == tso) {
2852           *last = tso->link;
2853           goto done;
2854         }
2855       }
2856       barf("unblockThread (BLACKHOLE): TSO not found");
2857     }
2858
2859   case BlockedOnException:
2860     {
2861       StgTSO *target  = tso->block_info.tso;
2862
2863       ASSERT(get_itbl(target)->type == TSO);
2864
2865       while (target->what_next == ThreadRelocated) {
2866           target = target->link;
2867           ASSERT(get_itbl(target)->type == TSO);
2868       }
2869       
2870       ASSERT(target->blocked_exceptions != NULL);
2871
2872       last = &target->blocked_exceptions;
2873       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2874            last = &t->link, t = t->link) {
2875         ASSERT(get_itbl(t)->type == TSO);
2876         if (t == tso) {
2877           *last = tso->link;
2878           goto done;
2879         }
2880       }
2881       barf("unblockThread (Exception): TSO not found");
2882     }
2883
2884   case BlockedOnRead:
2885   case BlockedOnWrite:
2886     {
2887       StgTSO *prev = NULL;
2888       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2889            prev = t, t = t->link) {
2890         if (t == tso) {
2891           if (prev == NULL) {
2892             blocked_queue_hd = t->link;
2893             if (blocked_queue_tl == t) {
2894               blocked_queue_tl = END_TSO_QUEUE;
2895             }
2896           } else {
2897             prev->link = t->link;
2898             if (blocked_queue_tl == t) {
2899               blocked_queue_tl = prev;
2900             }
2901           }
2902           goto done;
2903         }
2904       }
2905       barf("unblockThread (I/O): TSO not found");
2906     }
2907
2908   case BlockedOnDelay:
2909     {
2910       StgTSO *prev = NULL;
2911       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2912            prev = t, t = t->link) {
2913         if (t == tso) {
2914           if (prev == NULL) {
2915             sleeping_queue = t->link;
2916           } else {
2917             prev->link = t->link;
2918           }
2919           goto done;
2920         }
2921       }
2922       barf("unblockThread (I/O): TSO not found");
2923     }
2924
2925   default:
2926     barf("unblockThread");
2927   }
2928
2929  done:
2930   tso->link = END_TSO_QUEUE;
2931   tso->why_blocked = NotBlocked;
2932   tso->block_info.closure = NULL;
2933   PUSH_ON_RUN_QUEUE(tso);
2934   RELEASE_LOCK(&sched_mutex);
2935 }
2936 #endif
2937
2938 /* -----------------------------------------------------------------------------
2939  * raiseAsync()
2940  *
2941  * The following function implements the magic for raising an
2942  * asynchronous exception in an existing thread.
2943  *
2944  * We first remove the thread from any queue on which it might be
2945  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2946  *
2947  * We strip the stack down to the innermost CATCH_FRAME, building
2948  * thunks in the heap for all the active computations, so they can 
2949  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2950  * an application of the handler to the exception, and push it on
2951  * the top of the stack.
2952  * 
2953  * How exactly do we save all the active computations?  We create an
2954  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2955  * AP_UPDs pushes everything from the corresponding update frame
2956  * upwards onto the stack.  (Actually, it pushes everything up to the
2957  * next update frame plus a pointer to the next AP_UPD object.
2958  * Entering the next AP_UPD object pushes more onto the stack until we
2959  * reach the last AP_UPD object - at which point the stack should look
2960  * exactly as it did when we killed the TSO and we can continue
2961  * execution by entering the closure on top of the stack.
2962  *
2963  * We can also kill a thread entirely - this happens if either (a) the 
2964  * exception passed to raiseAsync is NULL, or (b) there's no
2965  * CATCH_FRAME on the stack.  In either case, we strip the entire
2966  * stack and replace the thread with a zombie.
2967  *
2968  * -------------------------------------------------------------------------- */
2969  
2970 void 
2971 deleteThread(StgTSO *tso)
2972 {
2973   raiseAsync(tso,NULL);
2974 }
2975
2976 void
2977 raiseAsync(StgTSO *tso, StgClosure *exception)
2978 {
2979   StgUpdateFrame* su = tso->su;
2980   StgPtr          sp = tso->sp;
2981   
2982   /* Thread already dead? */
2983   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2984     return;
2985   }
2986
2987   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2988
2989   /* Remove it from any blocking queues */
2990   unblockThread(tso);
2991
2992   /* The stack freezing code assumes there's a closure pointer on
2993    * the top of the stack.  This isn't always the case with compiled
2994    * code, so we have to push a dummy closure on the top which just
2995    * returns to the next return address on the stack.
2996    */
2997   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2998     *(--sp) = (W_)&stg_dummy_ret_closure;
2999   }
3000
3001   while (1) {
3002     nat words = ((P_)su - (P_)sp) - 1;
3003     nat i;
3004     StgAP_UPD * ap;
3005
3006     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3007      * then build PAP(handler,exception,realworld#), and leave it on
3008      * top of the stack ready to enter.
3009      */
3010     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3011       StgCatchFrame *cf = (StgCatchFrame *)su;
3012       /* we've got an exception to raise, so let's pass it to the
3013        * handler in this frame.
3014        */
3015       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3016       TICK_ALLOC_UPD_PAP(3,0);
3017       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3018               
3019       ap->n_args = 2;
3020       ap->fun = cf->handler;    /* :: Exception -> IO a */
3021       ap->payload[0] = exception;
3022       ap->payload[1] = ARG_TAG(0); /* realworld token */
3023
3024       /* throw away the stack from Sp up to and including the
3025        * CATCH_FRAME.
3026        */
3027       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3028       tso->su = cf->link;
3029
3030       /* Restore the blocked/unblocked state for asynchronous exceptions
3031        * at the CATCH_FRAME.  
3032        *
3033        * If exceptions were unblocked at the catch, arrange that they
3034        * are unblocked again after executing the handler by pushing an
3035        * unblockAsyncExceptions_ret stack frame.
3036        */
3037       if (!cf->exceptions_blocked) {
3038         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3039       }
3040       
3041       /* Ensure that async exceptions are blocked when running the handler.
3042        */
3043       if (tso->blocked_exceptions == NULL) {
3044         tso->blocked_exceptions = END_TSO_QUEUE;
3045       }
3046       
3047       /* Put the newly-built PAP on top of the stack, ready to execute
3048        * when the thread restarts.
3049        */
3050       sp[0] = (W_)ap;
3051       tso->sp = sp;
3052       tso->what_next = ThreadEnterGHC;
3053       IF_DEBUG(sanity, checkTSO(tso));
3054       return;
3055     }
3056
3057     /* First build an AP_UPD consisting of the stack chunk above the
3058      * current update frame, with the top word on the stack as the
3059      * fun field.
3060      */
3061     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3062     
3063     ASSERT(words >= 0);
3064     
3065     ap->n_args = words;
3066     ap->fun    = (StgClosure *)sp[0];
3067     sp++;
3068     for(i=0; i < (nat)words; ++i) {
3069       ap->payload[i] = (StgClosure *)*sp++;
3070     }
3071     
3072     switch (get_itbl(su)->type) {
3073       
3074     case UPDATE_FRAME:
3075       {
3076         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3077         TICK_ALLOC_UP_THK(words+1,0);
3078         
3079         IF_DEBUG(scheduler,
3080                  fprintf(stderr,  "scheduler: Updating ");
3081                  printPtr((P_)su->updatee); 
3082                  fprintf(stderr,  " with ");
3083                  printObj((StgClosure *)ap);
3084                  );
3085         
3086         /* Replace the updatee with an indirection - happily
3087          * this will also wake up any threads currently
3088          * waiting on the result.
3089          *
3090          * Warning: if we're in a loop, more than one update frame on
3091          * the stack may point to the same object.  Be careful not to
3092          * overwrite an IND_OLDGEN in this case, because we'll screw
3093          * up the mutable lists.  To be on the safe side, don't
3094          * overwrite any kind of indirection at all.  See also
3095          * threadSqueezeStack in GC.c, where we have to make a similar
3096          * check.
3097          */
3098         if (!closure_IND(su->updatee)) {
3099             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3100         }
3101         su = su->link;
3102         sp += sizeofW(StgUpdateFrame) -1;
3103         sp[0] = (W_)ap; /* push onto stack */
3104         break;
3105       }
3106
3107     case CATCH_FRAME:
3108       {
3109         StgCatchFrame *cf = (StgCatchFrame *)su;
3110         StgClosure* o;
3111         
3112         /* We want a PAP, not an AP_UPD.  Fortunately, the
3113          * layout's the same.
3114          */
3115         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3116         TICK_ALLOC_UPD_PAP(words+1,0);
3117         
3118         /* now build o = FUN(catch,ap,handler) */
3119         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3120         TICK_ALLOC_FUN(2,0);
3121         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3122         o->payload[0] = (StgClosure *)ap;
3123         o->payload[1] = cf->handler;
3124         
3125         IF_DEBUG(scheduler,
3126                  fprintf(stderr,  "scheduler: Built ");
3127                  printObj((StgClosure *)o);
3128                  );
3129         
3130         /* pop the old handler and put o on the stack */
3131         su = cf->link;
3132         sp += sizeofW(StgCatchFrame) - 1;
3133         sp[0] = (W_)o;
3134         break;
3135       }
3136       
3137     case SEQ_FRAME:
3138       {
3139         StgSeqFrame *sf = (StgSeqFrame *)su;
3140         StgClosure* o;
3141         
3142         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3143         TICK_ALLOC_UPD_PAP(words+1,0);
3144         
3145         /* now build o = FUN(seq,ap) */
3146         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3147         TICK_ALLOC_SE_THK(1,0);
3148         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3149         o->payload[0] = (StgClosure *)ap;
3150         
3151         IF_DEBUG(scheduler,
3152                  fprintf(stderr,  "scheduler: Built ");
3153                  printObj((StgClosure *)o);
3154                  );
3155         
3156         /* pop the old handler and put o on the stack */
3157         su = sf->link;
3158         sp += sizeofW(StgSeqFrame) - 1;
3159         sp[0] = (W_)o;
3160         break;
3161       }
3162       
3163     case STOP_FRAME:
3164       /* We've stripped the entire stack, the thread is now dead. */
3165       sp += sizeofW(StgStopFrame) - 1;
3166       sp[0] = (W_)exception;    /* save the exception */
3167       tso->what_next = ThreadKilled;
3168       tso->su = (StgUpdateFrame *)(sp+1);
3169       tso->sp = sp;
3170       return;
3171
3172     default:
3173       barf("raiseAsync");
3174     }
3175   }
3176   barf("raiseAsync");
3177 }
3178
3179 /* -----------------------------------------------------------------------------
3180    resurrectThreads is called after garbage collection on the list of
3181    threads found to be garbage.  Each of these threads will be woken
3182    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3183    on an MVar, or NonTermination if the thread was blocked on a Black
3184    Hole.
3185    -------------------------------------------------------------------------- */
3186
3187 void
3188 resurrectThreads( StgTSO *threads )
3189 {
3190   StgTSO *tso, *next;
3191
3192   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3193     next = tso->global_link;
3194     tso->global_link = all_threads;
3195     all_threads = tso;
3196     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3197
3198     switch (tso->why_blocked) {
3199     case BlockedOnMVar:
3200     case BlockedOnException:
3201       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3202       break;
3203     case BlockedOnBlackHole:
3204       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3205       break;
3206     case NotBlocked:
3207       /* This might happen if the thread was blocked on a black hole
3208        * belonging to a thread that we've just woken up (raiseAsync
3209        * can wake up threads, remember...).
3210        */
3211       continue;
3212     default:
3213       barf("resurrectThreads: thread blocked in a strange way");
3214     }
3215   }
3216 }
3217
3218 /* -----------------------------------------------------------------------------
3219  * Blackhole detection: if we reach a deadlock, test whether any
3220  * threads are blocked on themselves.  Any threads which are found to
3221  * be self-blocked get sent a NonTermination exception.
3222  *
3223  * This is only done in a deadlock situation in order to avoid
3224  * performance overhead in the normal case.
3225  * -------------------------------------------------------------------------- */
3226
3227 static void
3228 detectBlackHoles( void )
3229 {
3230     StgTSO *t = all_threads;
3231     StgUpdateFrame *frame;
3232     StgClosure *blocked_on;
3233
3234     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3235
3236         while (t->what_next == ThreadRelocated) {
3237             t = t->link;
3238             ASSERT(get_itbl(t)->type == TSO);
3239         }
3240       
3241         if (t->why_blocked != BlockedOnBlackHole) {
3242             continue;
3243         }
3244
3245         blocked_on = t->block_info.closure;
3246
3247         for (frame = t->su; ; frame = frame->link) {
3248             switch (get_itbl(frame)->type) {
3249
3250             case UPDATE_FRAME:
3251                 if (frame->updatee == blocked_on) {
3252                     /* We are blocking on one of our own computations, so
3253                      * send this thread the NonTermination exception.  
3254                      */
3255                     IF_DEBUG(scheduler, 
3256                              sched_belch("thread %d is blocked on itself", t->id));
3257                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3258                     goto done;
3259                 }
3260                 else {
3261                     continue;
3262                 }
3263
3264             case CATCH_FRAME:
3265             case SEQ_FRAME:
3266                 continue;
3267                 
3268             case STOP_FRAME:
3269                 break;
3270             }
3271             break;
3272         }
3273
3274     done: ;
3275     }   
3276 }
3277
3278 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3279 //@subsection Debugging Routines
3280
3281 /* -----------------------------------------------------------------------------
3282    Debugging: why is a thread blocked
3283    -------------------------------------------------------------------------- */
3284
3285 #ifdef DEBUG
3286
3287 void
3288 printThreadBlockage(StgTSO *tso)
3289 {
3290   switch (tso->why_blocked) {
3291   case BlockedOnRead:
3292     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3293     break;
3294   case BlockedOnWrite:
3295     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3296     break;
3297   case BlockedOnDelay:
3298     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3299     break;
3300   case BlockedOnMVar:
3301     fprintf(stderr,"is blocked on an MVar");
3302     break;
3303   case BlockedOnException:
3304     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3305             tso->block_info.tso->id);
3306     break;
3307   case BlockedOnBlackHole:
3308     fprintf(stderr,"is blocked on a black hole");
3309     break;
3310   case NotBlocked:
3311     fprintf(stderr,"is not blocked");
3312     break;
3313 #if defined(PAR)
3314   case BlockedOnGA:
3315     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3316             tso->block_info.closure, info_type(tso->block_info.closure));
3317     break;
3318   case BlockedOnGA_NoSend:
3319     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3320             tso->block_info.closure, info_type(tso->block_info.closure));
3321     break;
3322 #endif
3323   default:
3324     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3325          tso->why_blocked, tso->id, tso);
3326   }
3327 }
3328
3329 void
3330 printThreadStatus(StgTSO *tso)
3331 {
3332   switch (tso->what_next) {
3333   case ThreadKilled:
3334     fprintf(stderr,"has been killed");
3335     break;
3336   case ThreadComplete:
3337     fprintf(stderr,"has completed");
3338     break;
3339   default:
3340     printThreadBlockage(tso);
3341   }
3342 }
3343
3344 void
3345 printAllThreads(void)
3346 {
3347   StgTSO *t;
3348
3349 # if defined(GRAN)
3350   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3351   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3352                        time_string, rtsFalse/*no commas!*/);
3353
3354   sched_belch("all threads at [%s]:", time_string);
3355 # elif defined(PAR)
3356   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3357   ullong_format_string(CURRENT_TIME,
3358                        time_string, rtsFalse/*no commas!*/);
3359
3360   sched_belch("all threads at [%s]:", time_string);
3361 # else
3362   sched_belch("all threads:");
3363 # endif
3364
3365   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3366     fprintf(stderr, "\tthread %d ", t->id);
3367     printThreadStatus(t);
3368     fprintf(stderr,"\n");
3369   }
3370 }
3371     
3372 /* 
3373    Print a whole blocking queue attached to node (debugging only).
3374 */
3375 //@cindex print_bq
3376 # if defined(PAR)
3377 void 
3378 print_bq (StgClosure *node)
3379 {
3380   StgBlockingQueueElement *bqe;
3381   StgTSO *tso;
3382   rtsBool end;
3383
3384   fprintf(stderr,"## BQ of closure %p (%s): ",
3385           node, info_type(node));
3386
3387   /* should cover all closures that may have a blocking queue */
3388   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3389          get_itbl(node)->type == FETCH_ME_BQ ||
3390          get_itbl(node)->type == RBH ||
3391          get_itbl(node)->type == MVAR);
3392     
3393   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3394
3395   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3396 }
3397
3398 /* 
3399    Print a whole blocking queue starting with the element bqe.
3400 */
3401 void 
3402 print_bqe (StgBlockingQueueElement *bqe)
3403 {
3404   rtsBool end;
3405
3406   /* 
3407      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3408   */
3409   for (end = (bqe==END_BQ_QUEUE);
3410        !end; // iterate until bqe points to a CONSTR
3411        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3412        bqe = end ? END_BQ_QUEUE : bqe->link) {
3413     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3414     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3415     /* types of closures that may appear in a blocking queue */
3416     ASSERT(get_itbl(bqe)->type == TSO ||           
3417            get_itbl(bqe)->type == BLOCKED_FETCH || 
3418            get_itbl(bqe)->type == CONSTR); 
3419     /* only BQs of an RBH end with an RBH_Save closure */
3420     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3421
3422     switch (get_itbl(bqe)->type) {
3423     case TSO:
3424       fprintf(stderr," TSO %u (%x),",
3425               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3426       break;
3427     case BLOCKED_FETCH:
3428       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3429               ((StgBlockedFetch *)bqe)->node, 
3430               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3431               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3432               ((StgBlockedFetch *)bqe)->ga.weight);
3433       break;
3434     case CONSTR:
3435       fprintf(stderr," %s (IP %p),",
3436               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3437                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3438                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3439                "RBH_Save_?"), get_itbl(bqe));
3440       break;
3441     default:
3442       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3443            info_type((StgClosure *)bqe)); // , node, info_type(node));
3444       break;
3445     }
3446   } /* for */
3447   fputc('\n', stderr);
3448 }
3449 # elif defined(GRAN)
3450 void 
3451 print_bq (StgClosure *node)
3452 {
3453   StgBlockingQueueElement *bqe;
3454   PEs node_loc, tso_loc;
3455   rtsBool end;
3456
3457   /* should cover all closures that may have a blocking queue */
3458   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3459          get_itbl(node)->type == FETCH_ME_BQ ||
3460          get_itbl(node)->type == RBH);
3461     
3462   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3463   node_loc = where_is(node);
3464
3465   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3466           node, info_type(node), node_loc);
3467
3468   /* 
3469      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3470   */
3471   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3472        !end; // iterate until bqe points to a CONSTR
3473        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3474     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3475     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3476     /* types of closures that may appear in a blocking queue */
3477     ASSERT(get_itbl(bqe)->type == TSO ||           
3478            get_itbl(bqe)->type == CONSTR); 
3479     /* only BQs of an RBH end with an RBH_Save closure */
3480     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3481
3482     tso_loc = where_is((StgClosure *)bqe);
3483     switch (get_itbl(bqe)->type) {
3484     case TSO:
3485       fprintf(stderr," TSO %d (%p) on [PE %d],",
3486               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3487       break;
3488     case CONSTR:
3489       fprintf(stderr," %s (IP %p),",
3490               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3491                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3492                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3493                "RBH_Save_?"), get_itbl(bqe));
3494       break;
3495     default:
3496       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3497            info_type((StgClosure *)bqe), node, info_type(node));
3498       break;
3499     }
3500   } /* for */
3501   fputc('\n', stderr);
3502 }
3503 #else
3504 /* 
3505    Nice and easy: only TSOs on the blocking queue
3506 */
3507 void 
3508 print_bq (StgClosure *node)
3509 {
3510   StgTSO *tso;
3511
3512   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3513   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3514        tso != END_TSO_QUEUE; 
3515        tso=tso->link) {
3516     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3517     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3518     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3519   }
3520   fputc('\n', stderr);
3521 }
3522 # endif
3523
3524 #if defined(PAR)
3525 static nat
3526 run_queue_len(void)
3527 {
3528   nat i;
3529   StgTSO *tso;
3530
3531   for (i=0, tso=run_queue_hd; 
3532        tso != END_TSO_QUEUE;
3533        i++, tso=tso->link)
3534     /* nothing */
3535
3536   return i;
3537 }
3538 #endif
3539
3540 static void
3541 sched_belch(char *s, ...)
3542 {
3543   va_list ap;
3544   va_start(ap,s);
3545 #ifdef SMP
3546   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3547 #elif defined(PAR)
3548   fprintf(stderr, "== ");
3549 #else
3550   fprintf(stderr, "scheduler: ");
3551 #endif
3552   vfprintf(stderr, s, ap);
3553   fprintf(stderr, "\n");
3554 }
3555
3556 #endif /* DEBUG */
3557
3558
3559 //@node Index,  , Debugging Routines, Main scheduling code
3560 //@subsection Index
3561
3562 //@index
3563 //* MainRegTable::  @cindex\s-+MainRegTable
3564 //* StgMainThread::  @cindex\s-+StgMainThread
3565 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3566 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3567 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3568 //* context_switch::  @cindex\s-+context_switch
3569 //* createThread::  @cindex\s-+createThread
3570 //* free_capabilities::  @cindex\s-+free_capabilities
3571 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3572 //* initScheduler::  @cindex\s-+initScheduler
3573 //* interrupted::  @cindex\s-+interrupted
3574 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3575 //* next_thread_id::  @cindex\s-+next_thread_id
3576 //* print_bq::  @cindex\s-+print_bq
3577 //* run_queue_hd::  @cindex\s-+run_queue_hd
3578 //* run_queue_tl::  @cindex\s-+run_queue_tl
3579 //* sched_mutex::  @cindex\s-+sched_mutex
3580 //* schedule::  @cindex\s-+schedule
3581 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3582 //* task_ids::  @cindex\s-+task_ids
3583 //* term_mutex::  @cindex\s-+term_mutex
3584 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3585 //@end index