[project @ 2001-07-24 06:31:35 by ken]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.98 2001/07/24 06:31:36 ken Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR       Parallel execution on a distributed memory machine
14  * s    SMP      SMP       Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN      Simulation of parallel execution
16  * md   GUM/GdH  DIST      Distributed execution (based on GUM)
17  * --------------------------------------------------------------------------*/
18
19 //@node Main scheduling code, , ,
20 //@section Main scheduling code
21
22 /* 
23  * Version with scheduler monitor support for SMPs (WAY=s):
24
25    This design provides a high-level API to create and schedule threads etc.
26    as documented in the SMP design document.
27
28    It uses a monitor design controlled by a single mutex to exercise control
29    over accesses to shared data structures, and builds on the Posix threads
30    library.
31
32    The majority of state is shared.  In order to keep essential per-task state,
33    there is a Capability structure, which contains all the information
34    needed to run a thread: its STG registers, a pointer to its TSO, a
35    nursery etc.  During STG execution, a pointer to the capability is
36    kept in a register (BaseReg).
37
38    In a non-SMP build, there is one global capability, namely MainRegTable.
39
40    SDM & KH, 10/99
41
42  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
43
44    The main scheduling loop in GUM iterates until a finish message is received.
45    In that case a global flag @receivedFinish@ is set and this instance of
46    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
47    for the handling of incoming messages, such as PP_FINISH.
48    Note that in the parallel case we have a system manager that coordinates
49    different PEs, each of which are running one instance of the RTS.
50    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
51    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
52
53  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
54
55    The main scheduling code in GranSim is quite different from that in std
56    (concurrent) Haskell: while concurrent Haskell just iterates over the
57    threads in the runnable queue, GranSim is event driven, i.e. it iterates
58    over the events in the global event queue.  -- HWL
59 */
60
61 //@menu
62 //* Includes::                  
63 //* Variables and Data structures::  
64 //* Main scheduling loop::      
65 //* Suspend and Resume::        
66 //* Run queue code::            
67 //* Garbage Collextion Routines::  
68 //* Blocking Queue Routines::   
69 //* Exception Handling Routines::  
70 //* Debugging Routines::        
71 //* Index::                     
72 //@end menu
73
74 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
75 //@subsection Includes
76
77 #include "Rts.h"
78 #include "SchedAPI.h"
79 #include "RtsUtils.h"
80 #include "RtsFlags.h"
81 #include "Storage.h"
82 #include "StgRun.h"
83 #include "StgStartup.h"
84 #include "Hooks.h"
85 #include "Schedule.h"
86 #include "StgMiscClosures.h"
87 #include "Storage.h"
88 #include "Interpreter.h"
89 #include "Exception.h"
90 #include "Printer.h"
91 #include "Main.h"
92 #include "Signals.h"
93 #include "Sanity.h"
94 #include "Stats.h"
95 #include "Itimer.h"
96 #include "Prelude.h"
97 #if defined(GRAN) || defined(PAR)
98 # include "GranSimRts.h"
99 # include "GranSim.h"
100 # include "ParallelRts.h"
101 # include "Parallel.h"
102 # include "ParallelDebug.h"
103 # include "FetchMe.h"
104 # include "HLC.h"
105 #endif
106 #include "Sparks.h"
107
108 #include <stdarg.h>
109
110 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
111 //@subsection Variables and Data structures
112
113 /* Main threads:
114  *
115  * These are the threads which clients have requested that we run.  
116  *
117  * In an SMP build, we might have several concurrent clients all
118  * waiting for results, and each one will wait on a condition variable
119  * until the result is available.
120  *
121  * In non-SMP, clients are strictly nested: the first client calls
122  * into the RTS, which might call out again to C with a _ccall_GC, and
123  * eventually re-enter the RTS.
124  *
125  * Main threads information is kept in a linked list:
126  */
127 //@cindex StgMainThread
128 typedef struct StgMainThread_ {
129   StgTSO *         tso;
130   SchedulerStatus  stat;
131   StgClosure **    ret;
132 #ifdef SMP
133   pthread_cond_t wakeup;
134 #endif
135   struct StgMainThread_ *link;
136 } StgMainThread;
137
138 /* Main thread queue.
139  * Locks required: sched_mutex.
140  */
141 static StgMainThread *main_threads;
142
143 /* Thread queues.
144  * Locks required: sched_mutex.
145  */
146 #if defined(GRAN)
147
148 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
149 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
150
151 /* 
152    In GranSim we have a runable and a blocked queue for each processor.
153    In order to minimise code changes new arrays run_queue_hds/tls
154    are created. run_queue_hd is then a short cut (macro) for
155    run_queue_hds[CurrentProc] (see GranSim.h).
156    -- HWL
157 */
158 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
159 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
160 StgTSO *ccalling_threadss[MAX_PROC];
161 /* We use the same global list of threads (all_threads) in GranSim as in
162    the std RTS (i.e. we are cheating). However, we don't use this list in
163    the GranSim specific code at the moment (so we are only potentially
164    cheating).  */
165
166 #else /* !GRAN */
167
168 StgTSO *run_queue_hd, *run_queue_tl;
169 StgTSO *blocked_queue_hd, *blocked_queue_tl;
170 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
171
172 #endif
173
174 /* Linked list of all threads.
175  * Used for detecting garbage collected threads.
176  */
177 StgTSO *all_threads;
178
179 /* Threads suspended in _ccall_GC.
180  */
181 static StgTSO *suspended_ccalling_threads;
182
183 static void GetRoots(evac_fn);
184 static StgTSO *threadStackOverflow(StgTSO *tso);
185
186 /* KH: The following two flags are shared memory locations.  There is no need
187        to lock them, since they are only unset at the end of a scheduler
188        operation.
189 */
190
191 /* flag set by signal handler to precipitate a context switch */
192 //@cindex context_switch
193 nat context_switch;
194
195 /* if this flag is set as well, give up execution */
196 //@cindex interrupted
197 rtsBool interrupted;
198
199 /* Next thread ID to allocate.
200  * Locks required: sched_mutex
201  */
202 //@cindex next_thread_id
203 StgThreadID next_thread_id = 1;
204
205 /*
206  * Pointers to the state of the current thread.
207  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
208  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
209  */
210  
211 /* The smallest stack size that makes any sense is:
212  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
213  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
214  *  + 1                       (the realworld token for an IO thread)
215  *  + 1                       (the closure to enter)
216  *
217  * A thread with this stack will bomb immediately with a stack
218  * overflow, which will increase its stack size.  
219  */
220
221 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
222
223 /* Free capability list.
224  * Locks required: sched_mutex.
225  */
226 #ifdef SMP
227 //@cindex free_capabilities
228 //@cindex n_free_capabilities
229 Capability *free_capabilities; /* Available capabilities for running threads */
230 nat n_free_capabilities;       /* total number of available capabilities */
231 #else
232 //@cindex MainRegTable
233 Capability MainRegTable;       /* for non-SMP, we have one global capability */
234 #endif
235
236 #if defined(GRAN)
237 StgTSO *CurrentTSO;
238 #endif
239
240 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
241  *  exists - earlier gccs apparently didn't.
242  *  -= chak
243  */
244 StgTSO dummy_tso;
245
246 rtsBool ready_to_gc;
247
248 /* All our current task ids, saved in case we need to kill them later.
249  */
250 #ifdef SMP
251 //@cindex task_ids
252 task_info *task_ids;
253 #endif
254
255 void            addToBlockedQueue ( StgTSO *tso );
256
257 static void     schedule          ( void );
258        void     interruptStgRts   ( void );
259 #if defined(GRAN)
260 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
261 #else
262 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
263 #endif
264
265 static void     detectBlackHoles  ( void );
266
267 #ifdef DEBUG
268 static void sched_belch(char *s, ...);
269 #endif
270
271 #ifdef SMP
272 //@cindex sched_mutex
273 //@cindex term_mutex
274 //@cindex thread_ready_cond
275 //@cindex gc_pending_cond
276 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
277 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
278 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
279 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
280
281 nat await_death;
282 #endif
283
284 #if defined(PAR)
285 StgTSO *LastTSO;
286 rtsTime TimeOfLastYield;
287 rtsBool emitSchedule = rtsTrue;
288 #endif
289
290 #if DEBUG
291 char *whatNext_strs[] = {
292   "ThreadEnterGHC",
293   "ThreadRunGHC",
294   "ThreadEnterInterp",
295   "ThreadKilled",
296   "ThreadComplete"
297 };
298
299 char *threadReturnCode_strs[] = {
300   "HeapOverflow",                       /* might also be StackOverflow */
301   "StackOverflow",
302   "ThreadYielding",
303   "ThreadBlocked",
304   "ThreadFinished"
305 };
306 #endif
307
308 #ifdef PAR
309 StgTSO * createSparkThread(rtsSpark spark);
310 StgTSO * activateSpark (rtsSpark spark);  
311 #endif
312
313 /*
314  * The thread state for the main thread.
315 // ToDo: check whether not needed any more
316 StgTSO   *MainTSO;
317  */
318
319 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
320 //@subsection Main scheduling loop
321
322 /* ---------------------------------------------------------------------------
323    Main scheduling loop.
324
325    We use round-robin scheduling, each thread returning to the
326    scheduler loop when one of these conditions is detected:
327
328       * out of heap space
329       * timer expires (thread yields)
330       * thread blocks
331       * thread ends
332       * stack overflow
333
334    Locking notes:  we acquire the scheduler lock once at the beginning
335    of the scheduler loop, and release it when
336     
337       * running a thread, or
338       * waiting for work, or
339       * waiting for a GC to complete.
340
341    GRAN version:
342      In a GranSim setup this loop iterates over the global event queue.
343      This revolves around the global event queue, which determines what 
344      to do next. Therefore, it's more complicated than either the 
345      concurrent or the parallel (GUM) setup.
346
347    GUM version:
348      GUM iterates over incoming messages.
349      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
350      and sends out a fish whenever it has nothing to do; in-between
351      doing the actual reductions (shared code below) it processes the
352      incoming messages and deals with delayed operations 
353      (see PendingFetches).
354      This is not the ugliest code you could imagine, but it's bloody close.
355
356    ------------------------------------------------------------------------ */
357 //@cindex schedule
358 static void
359 schedule( void )
360 {
361   StgTSO *t;
362   Capability *cap;
363   StgThreadReturnCode ret;
364 #if defined(GRAN)
365   rtsEvent *event;
366 #elif defined(PAR)
367   StgSparkPool *pool;
368   rtsSpark spark;
369   StgTSO *tso;
370   GlobalTaskId pe;
371   rtsBool receivedFinish = rtsFalse;
372 # if defined(DEBUG)
373   nat tp_size, sp_size; // stats only
374 # endif
375 #endif
376   rtsBool was_interrupted = rtsFalse;
377   
378   ACQUIRE_LOCK(&sched_mutex);
379
380 #if defined(GRAN)
381
382   /* set up first event to get things going */
383   /* ToDo: assign costs for system setup and init MainTSO ! */
384   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
385             ContinueThread, 
386             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
387
388   IF_DEBUG(gran,
389            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
390            G_TSO(CurrentTSO, 5));
391
392   if (RtsFlags.GranFlags.Light) {
393     /* Save current time; GranSim Light only */
394     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
395   }      
396
397   event = get_next_event();
398
399   while (event!=(rtsEvent*)NULL) {
400     /* Choose the processor with the next event */
401     CurrentProc = event->proc;
402     CurrentTSO = event->tso;
403
404 #elif defined(PAR)
405
406   while (!receivedFinish) {    /* set by processMessages */
407                                /* when receiving PP_FINISH message         */ 
408 #else
409
410   while (1) {
411
412 #endif
413
414     IF_DEBUG(scheduler, printAllThreads());
415
416     /* If we're interrupted (the user pressed ^C, or some other
417      * termination condition occurred), kill all the currently running
418      * threads.
419      */
420     if (interrupted) {
421       IF_DEBUG(scheduler, sched_belch("interrupted"));
422       deleteAllThreads();
423       interrupted = rtsFalse;
424       was_interrupted = rtsTrue;
425     }
426
427     /* Go through the list of main threads and wake up any
428      * clients whose computations have finished.  ToDo: this
429      * should be done more efficiently without a linear scan
430      * of the main threads list, somehow...
431      */
432 #ifdef SMP
433     { 
434       StgMainThread *m, **prev;
435       prev = &main_threads;
436       for (m = main_threads; m != NULL; m = m->link) {
437         switch (m->tso->what_next) {
438         case ThreadComplete:
439           if (m->ret) {
440             *(m->ret) = (StgClosure *)m->tso->sp[0];
441           }
442           *prev = m->link;
443           m->stat = Success;
444           pthread_cond_broadcast(&m->wakeup);
445           break;
446         case ThreadKilled:
447           *prev = m->link;
448           if (was_interrupted) {
449             m->stat = Interrupted;
450           } else {
451             m->stat = Killed;
452           }
453           pthread_cond_broadcast(&m->wakeup);
454           break;
455         default:
456           break;
457         }
458       }
459     }
460
461 #else
462 # if defined(PAR)
463     /* in GUM do this only on the Main PE */
464     if (IAmMainThread)
465 # endif
466     /* If our main thread has finished or been killed, return.
467      */
468     {
469       StgMainThread *m = main_threads;
470       if (m->tso->what_next == ThreadComplete
471           || m->tso->what_next == ThreadKilled) {
472         main_threads = main_threads->link;
473         if (m->tso->what_next == ThreadComplete) {
474           /* we finished successfully, fill in the return value */
475           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
476           m->stat = Success;
477           return;
478         } else {
479           if (was_interrupted) {
480             m->stat = Interrupted;
481           } else {
482             m->stat = Killed;
483           }
484           return;
485         }
486       }
487     }
488 #endif
489
490     /* Top up the run queue from our spark pool.  We try to make the
491      * number of threads in the run queue equal to the number of
492      * free capabilities.
493      */
494 #if defined(SMP)
495     {
496       nat n = n_free_capabilities;
497       StgTSO *tso = run_queue_hd;
498
499       /* Count the run queue */
500       while (n > 0 && tso != END_TSO_QUEUE) {
501         tso = tso->link;
502         n--;
503       }
504
505       for (; n > 0; n--) {
506         StgClosure *spark;
507         spark = findSpark(rtsFalse);
508         if (spark == NULL) {
509           break; /* no more sparks in the pool */
510         } else {
511           /* I'd prefer this to be done in activateSpark -- HWL */
512           /* tricky - it needs to hold the scheduler lock and
513            * not try to re-acquire it -- SDM */
514           createSparkThread(spark);       
515           IF_DEBUG(scheduler,
516                    sched_belch("==^^ turning spark of closure %p into a thread",
517                                (StgClosure *)spark));
518         }
519       }
520       /* We need to wake up the other tasks if we just created some
521        * work for them.
522        */
523       if (n_free_capabilities - n > 1) {
524           pthread_cond_signal(&thread_ready_cond);
525       }
526     }
527 #endif /* SMP */
528
529     /* Check whether any waiting threads need to be woken up.  If the
530      * run queue is empty, and there are no other tasks running, we
531      * can wait indefinitely for something to happen.
532      * ToDo: what if another client comes along & requests another
533      * main thread?
534      */
535     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
536       awaitEvent(
537            (run_queue_hd == END_TSO_QUEUE)
538 #ifdef SMP
539         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
540 #endif
541         );
542     }
543     /* we can be interrupted while waiting for I/O... */
544     if (interrupted) continue;
545
546     /* check for signals each time around the scheduler */
547 #ifndef mingw32_TARGET_OS
548     if (signals_pending()) {
549       start_signal_handlers();
550     }
551 #endif
552
553     /* 
554      * Detect deadlock: when we have no threads to run, there are no
555      * threads waiting on I/O or sleeping, and all the other tasks are
556      * waiting for work, we must have a deadlock of some description.
557      *
558      * We first try to find threads blocked on themselves (ie. black
559      * holes), and generate NonTermination exceptions where necessary.
560      *
561      * If no threads are black holed, we have a deadlock situation, so
562      * inform all the main threads.
563      */
564 #ifndef PAR
565     if (blocked_queue_hd == END_TSO_QUEUE
566         && run_queue_hd == END_TSO_QUEUE
567         && sleeping_queue == END_TSO_QUEUE
568 #ifdef SMP
569         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
570 #endif
571         )
572     {
573         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
574         GarbageCollect(GetRoots,rtsTrue);
575         if (blocked_queue_hd == END_TSO_QUEUE
576             && run_queue_hd == END_TSO_QUEUE
577             && sleeping_queue == END_TSO_QUEUE) {
578             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
579             detectBlackHoles();
580             if (run_queue_hd == END_TSO_QUEUE) {
581                 StgMainThread *m = main_threads;
582 #ifdef SMP
583                 for (; m != NULL; m = m->link) {
584                     m->ret = NULL;
585                     m->stat = Deadlock;
586                     pthread_cond_broadcast(&m->wakeup);
587                 }
588                 main_threads = NULL;
589 #else
590                 m->ret = NULL;
591                 m->stat = Deadlock;
592                 main_threads = m->link;
593                 return;
594 #endif
595             }
596         }
597     }
598 #elif defined(PAR)
599     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
600 #endif
601
602 #ifdef SMP
603     /* If there's a GC pending, don't do anything until it has
604      * completed.
605      */
606     if (ready_to_gc) {
607       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
608       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
609     }
610     
611     /* block until we've got a thread on the run queue and a free
612      * capability.
613      */
614     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
615       IF_DEBUG(scheduler, sched_belch("waiting for work"));
616       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
617       IF_DEBUG(scheduler, sched_belch("work now available"));
618     }
619 #endif
620
621 #if defined(GRAN)
622
623     if (RtsFlags.GranFlags.Light)
624       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
625
626     /* adjust time based on time-stamp */
627     if (event->time > CurrentTime[CurrentProc] &&
628         event->evttype != ContinueThread)
629       CurrentTime[CurrentProc] = event->time;
630     
631     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
632     if (!RtsFlags.GranFlags.Light)
633       handleIdlePEs();
634
635     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
636
637     /* main event dispatcher in GranSim */
638     switch (event->evttype) {
639       /* Should just be continuing execution */
640     case ContinueThread:
641       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
642       /* ToDo: check assertion
643       ASSERT(run_queue_hd != (StgTSO*)NULL &&
644              run_queue_hd != END_TSO_QUEUE);
645       */
646       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
647       if (!RtsFlags.GranFlags.DoAsyncFetch &&
648           procStatus[CurrentProc]==Fetching) {
649         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
650               CurrentTSO->id, CurrentTSO, CurrentProc);
651         goto next_thread;
652       } 
653       /* Ignore ContinueThreads for completed threads */
654       if (CurrentTSO->what_next == ThreadComplete) {
655         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
656               CurrentTSO->id, CurrentTSO, CurrentProc);
657         goto next_thread;
658       } 
659       /* Ignore ContinueThreads for threads that are being migrated */
660       if (PROCS(CurrentTSO)==Nowhere) { 
661         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
662               CurrentTSO->id, CurrentTSO, CurrentProc);
663         goto next_thread;
664       }
665       /* The thread should be at the beginning of the run queue */
666       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
667         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
668               CurrentTSO->id, CurrentTSO, CurrentProc);
669         break; // run the thread anyway
670       }
671       /*
672       new_event(proc, proc, CurrentTime[proc],
673                 FindWork,
674                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
675       goto next_thread; 
676       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
677       break; // now actually run the thread; DaH Qu'vam yImuHbej 
678
679     case FetchNode:
680       do_the_fetchnode(event);
681       goto next_thread;             /* handle next event in event queue  */
682       
683     case GlobalBlock:
684       do_the_globalblock(event);
685       goto next_thread;             /* handle next event in event queue  */
686       
687     case FetchReply:
688       do_the_fetchreply(event);
689       goto next_thread;             /* handle next event in event queue  */
690       
691     case UnblockThread:   /* Move from the blocked queue to the tail of */
692       do_the_unblock(event);
693       goto next_thread;             /* handle next event in event queue  */
694       
695     case ResumeThread:  /* Move from the blocked queue to the tail of */
696       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
697       event->tso->gran.blocktime += 
698         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
699       do_the_startthread(event);
700       goto next_thread;             /* handle next event in event queue  */
701       
702     case StartThread:
703       do_the_startthread(event);
704       goto next_thread;             /* handle next event in event queue  */
705       
706     case MoveThread:
707       do_the_movethread(event);
708       goto next_thread;             /* handle next event in event queue  */
709       
710     case MoveSpark:
711       do_the_movespark(event);
712       goto next_thread;             /* handle next event in event queue  */
713       
714     case FindWork:
715       do_the_findwork(event);
716       goto next_thread;             /* handle next event in event queue  */
717       
718     default:
719       barf("Illegal event type %u\n", event->evttype);
720     }  /* switch */
721     
722     /* This point was scheduler_loop in the old RTS */
723
724     IF_DEBUG(gran, belch("GRAN: after main switch"));
725
726     TimeOfLastEvent = CurrentTime[CurrentProc];
727     TimeOfNextEvent = get_time_of_next_event();
728     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
729     // CurrentTSO = ThreadQueueHd;
730
731     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
732                          TimeOfNextEvent));
733
734     if (RtsFlags.GranFlags.Light) 
735       GranSimLight_leave_system(event, &ActiveTSO); 
736
737     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
738
739     IF_DEBUG(gran, 
740              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
741
742     /* in a GranSim setup the TSO stays on the run queue */
743     t = CurrentTSO;
744     /* Take a thread from the run queue. */
745     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
746
747     IF_DEBUG(gran, 
748              fprintf(stderr, "GRAN: About to run current thread, which is\n");
749              G_TSO(t,5));
750
751     context_switch = 0; // turned on via GranYield, checking events and time slice
752
753     IF_DEBUG(gran, 
754              DumpGranEvent(GR_SCHEDULE, t));
755
756     procStatus[CurrentProc] = Busy;
757
758 #elif defined(PAR)
759     if (PendingFetches != END_BF_QUEUE) {
760         processFetches();
761     }
762
763     /* ToDo: phps merge with spark activation above */
764     /* check whether we have local work and send requests if we have none */
765     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
766       /* :-[  no local threads => look out for local sparks */
767       /* the spark pool for the current PE */
768       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
769       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
770           pool->hd < pool->tl) {
771         /* 
772          * ToDo: add GC code check that we really have enough heap afterwards!!
773          * Old comment:
774          * If we're here (no runnable threads) and we have pending
775          * sparks, we must have a space problem.  Get enough space
776          * to turn one of those pending sparks into a
777          * thread... 
778          */
779
780         spark = findSpark(rtsFalse);                /* get a spark */
781         if (spark != (rtsSpark) NULL) {
782           tso = activateSpark(spark);       /* turn the spark into a thread */
783           IF_PAR_DEBUG(schedule,
784                        belch("==== schedule: Created TSO %d (%p); %d threads active",
785                              tso->id, tso, advisory_thread_count));
786
787           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
788             belch("==^^ failed to activate spark");
789             goto next_thread;
790           }               /* otherwise fall through & pick-up new tso */
791         } else {
792           IF_PAR_DEBUG(verbose,
793                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
794                              spark_queue_len(pool)));
795           goto next_thread;
796         }
797       }
798
799       /* If we still have no work we need to send a FISH to get a spark
800          from another PE 
801       */
802       if (EMPTY_RUN_QUEUE()) {
803       /* =8-[  no local sparks => look for work on other PEs */
804         /*
805          * We really have absolutely no work.  Send out a fish
806          * (there may be some out there already), and wait for
807          * something to arrive.  We clearly can't run any threads
808          * until a SCHEDULE or RESUME arrives, and so that's what
809          * we're hoping to see.  (Of course, we still have to
810          * respond to other types of messages.)
811          */
812         TIME now = msTime() /*CURRENT_TIME*/;
813         IF_PAR_DEBUG(verbose, 
814                      belch("--  now=%ld", now));
815         IF_PAR_DEBUG(verbose,
816                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
817                          (last_fish_arrived_at!=0 &&
818                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
819                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
820                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
821                              last_fish_arrived_at,
822                              RtsFlags.ParFlags.fishDelay, now);
823                      });
824         
825         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
826             (last_fish_arrived_at==0 ||
827              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
828           /* outstandingFishes is set in sendFish, processFish;
829              avoid flooding system with fishes via delay */
830           pe = choosePE();
831           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
832                    NEW_FISH_HUNGER);
833
834           // Global statistics: count no. of fishes
835           if (RtsFlags.ParFlags.ParStats.Global &&
836               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
837             globalParStats.tot_fish_mess++;
838           }
839         }
840       
841         receivedFinish = processMessages();
842         goto next_thread;
843       }
844     } else if (PacketsWaiting()) {  /* Look for incoming messages */
845       receivedFinish = processMessages();
846     }
847
848     /* Now we are sure that we have some work available */
849     ASSERT(run_queue_hd != END_TSO_QUEUE);
850
851     /* Take a thread from the run queue, if we have work */
852     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
853     IF_DEBUG(sanity,checkTSO(t));
854
855     /* ToDo: write something to the log-file
856     if (RTSflags.ParFlags.granSimStats && !sameThread)
857         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
858
859     CurrentTSO = t;
860     */
861     /* the spark pool for the current PE */
862     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
863
864     IF_DEBUG(scheduler, 
865              belch("--=^ %d threads, %d sparks on [%#x]", 
866                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
867
868 #if 1
869     if (0 && RtsFlags.ParFlags.ParStats.Full && 
870         t && LastTSO && t->id != LastTSO->id && 
871         LastTSO->why_blocked == NotBlocked && 
872         LastTSO->what_next != ThreadComplete) {
873       // if previously scheduled TSO not blocked we have to record the context switch
874       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
875                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
876     }
877
878     if (RtsFlags.ParFlags.ParStats.Full && 
879         (emitSchedule /* forced emit */ ||
880         (t && LastTSO && t->id != LastTSO->id))) {
881       /* 
882          we are running a different TSO, so write a schedule event to log file
883          NB: If we use fair scheduling we also have to write  a deschedule 
884              event for LastTSO; with unfair scheduling we know that the
885              previous tso has blocked whenever we switch to another tso, so
886              we don't need it in GUM for now
887       */
888       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
889                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
890       emitSchedule = rtsFalse;
891     }
892      
893 #endif
894 #else /* !GRAN && !PAR */
895   
896     /* grab a thread from the run queue
897      */
898     ASSERT(run_queue_hd != END_TSO_QUEUE);
899     t = POP_RUN_QUEUE();
900     IF_DEBUG(sanity,checkTSO(t));
901
902 #endif
903     
904     /* grab a capability
905      */
906 #ifdef SMP
907     cap = free_capabilities;
908     free_capabilities = cap->link;
909     n_free_capabilities--;
910 #else
911     cap = &MainRegTable;
912 #endif
913
914     cap->rCurrentTSO = t;
915     
916     /* context switches are now initiated by the timer signal, unless
917      * the user specified "context switch as often as possible", with
918      * +RTS -C0
919      */
920     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
921         && (run_queue_hd != END_TSO_QUEUE
922             || blocked_queue_hd != END_TSO_QUEUE
923             || sleeping_queue != END_TSO_QUEUE))
924         context_switch = 1;
925     else
926         context_switch = 0;
927
928     RELEASE_LOCK(&sched_mutex);
929
930     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
931                               t->id, t, whatNext_strs[t->what_next]));
932
933     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
934     /* Run the current thread 
935      */
936     switch (cap->rCurrentTSO->what_next) {
937     case ThreadKilled:
938     case ThreadComplete:
939         /* Thread already finished, return to scheduler. */
940         ret = ThreadFinished;
941         break;
942     case ThreadEnterGHC:
943         ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
944         break;
945     case ThreadRunGHC:
946         ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
947         break;
948     case ThreadEnterInterp:
949         ret = interpretBCO(cap);
950         break;
951     default:
952       barf("schedule: invalid what_next field");
953     }
954     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
955     
956     /* Costs for the scheduler are assigned to CCS_SYSTEM */
957 #ifdef PROFILING
958     CCCS = CCS_SYSTEM;
959 #endif
960     
961     ACQUIRE_LOCK(&sched_mutex);
962
963 #ifdef SMP
964     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
965 #elif !defined(GRAN) && !defined(PAR)
966     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
967 #endif
968     t = cap->rCurrentTSO;
969     
970 #if defined(PAR)
971     /* HACK 675: if the last thread didn't yield, make sure to print a 
972        SCHEDULE event to the log file when StgRunning the next thread, even
973        if it is the same one as before */
974     LastTSO = t; 
975     TimeOfLastYield = CURRENT_TIME;
976 #endif
977
978     switch (ret) {
979     case HeapOverflow:
980 #if defined(GRAN)
981       IF_DEBUG(gran, 
982                DumpGranEvent(GR_DESCHEDULE, t));
983       globalGranStats.tot_heapover++;
984 #elif defined(PAR)
985       // IF_DEBUG(par, 
986       //DumpGranEvent(GR_DESCHEDULE, t);
987       globalParStats.tot_heapover++;
988 #endif
989       /* make all the running tasks block on a condition variable,
990        * maybe set context_switch and wait till they all pile in,
991        * then have them wait on a GC condition variable.
992        */
993       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
994                                t->id, t, whatNext_strs[t->what_next]));
995       threadPaused(t);
996 #if defined(GRAN)
997       ASSERT(!is_on_queue(t,CurrentProc));
998 #elif defined(PAR)
999       /* Currently we emit a DESCHEDULE event before GC in GUM.
1000          ToDo: either add separate event to distinguish SYSTEM time from rest
1001                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1002       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1003         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1004                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1005         emitSchedule = rtsTrue;
1006       }
1007 #endif
1008       
1009       ready_to_gc = rtsTrue;
1010       context_switch = 1;               /* stop other threads ASAP */
1011       PUSH_ON_RUN_QUEUE(t);
1012       /* actual GC is done at the end of the while loop */
1013       break;
1014       
1015     case StackOverflow:
1016 #if defined(GRAN)
1017       IF_DEBUG(gran, 
1018                DumpGranEvent(GR_DESCHEDULE, t));
1019       globalGranStats.tot_stackover++;
1020 #elif defined(PAR)
1021       // IF_DEBUG(par, 
1022       // DumpGranEvent(GR_DESCHEDULE, t);
1023       globalParStats.tot_stackover++;
1024 #endif
1025       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1026                                t->id, t, whatNext_strs[t->what_next]));
1027       /* just adjust the stack for this thread, then pop it back
1028        * on the run queue.
1029        */
1030       threadPaused(t);
1031       { 
1032         StgMainThread *m;
1033         /* enlarge the stack */
1034         StgTSO *new_t = threadStackOverflow(t);
1035         
1036         /* This TSO has moved, so update any pointers to it from the
1037          * main thread stack.  It better not be on any other queues...
1038          * (it shouldn't be).
1039          */
1040         for (m = main_threads; m != NULL; m = m->link) {
1041           if (m->tso == t) {
1042             m->tso = new_t;
1043           }
1044         }
1045         threadPaused(new_t);
1046         PUSH_ON_RUN_QUEUE(new_t);
1047       }
1048       break;
1049
1050     case ThreadYielding:
1051 #if defined(GRAN)
1052       IF_DEBUG(gran, 
1053                DumpGranEvent(GR_DESCHEDULE, t));
1054       globalGranStats.tot_yields++;
1055 #elif defined(PAR)
1056       // IF_DEBUG(par, 
1057       // DumpGranEvent(GR_DESCHEDULE, t);
1058       globalParStats.tot_yields++;
1059 #endif
1060       /* put the thread back on the run queue.  Then, if we're ready to
1061        * GC, check whether this is the last task to stop.  If so, wake
1062        * up the GC thread.  getThread will block during a GC until the
1063        * GC is finished.
1064        */
1065       IF_DEBUG(scheduler,
1066                if (t->what_next == ThreadEnterInterp) {
1067                    /* ToDo: or maybe a timer expired when we were in Hugs?
1068                     * or maybe someone hit ctrl-C
1069                     */
1070                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1071                          t->id, t, whatNext_strs[t->what_next]);
1072                } else {
1073                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1074                          t->id, t, whatNext_strs[t->what_next]);
1075                }
1076                );
1077
1078       threadPaused(t);
1079
1080       IF_DEBUG(sanity,
1081                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1082                checkTSO(t));
1083       ASSERT(t->link == END_TSO_QUEUE);
1084 #if defined(GRAN)
1085       ASSERT(!is_on_queue(t,CurrentProc));
1086
1087       IF_DEBUG(sanity,
1088                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1089                checkThreadQsSanity(rtsTrue));
1090 #endif
1091 #if defined(PAR)
1092       if (RtsFlags.ParFlags.doFairScheduling) { 
1093         /* this does round-robin scheduling; good for concurrency */
1094         APPEND_TO_RUN_QUEUE(t);
1095       } else {
1096         /* this does unfair scheduling; good for parallelism */
1097         PUSH_ON_RUN_QUEUE(t);
1098       }
1099 #else
1100       /* this does round-robin scheduling; good for concurrency */
1101       APPEND_TO_RUN_QUEUE(t);
1102 #endif
1103 #if defined(GRAN)
1104       /* add a ContinueThread event to actually process the thread */
1105       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1106                 ContinueThread,
1107                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1108       IF_GRAN_DEBUG(bq, 
1109                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1110                G_EVENTQ(0);
1111                G_CURR_THREADQ(0));
1112 #endif /* GRAN */
1113       break;
1114       
1115     case ThreadBlocked:
1116 #if defined(GRAN)
1117       IF_DEBUG(scheduler,
1118                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1119                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1120                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1121
1122       // ??? needed; should emit block before
1123       IF_DEBUG(gran, 
1124                DumpGranEvent(GR_DESCHEDULE, t)); 
1125       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1126       /*
1127         ngoq Dogh!
1128       ASSERT(procStatus[CurrentProc]==Busy || 
1129               ((procStatus[CurrentProc]==Fetching) && 
1130               (t->block_info.closure!=(StgClosure*)NULL)));
1131       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1132           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1133             procStatus[CurrentProc]==Fetching)) 
1134         procStatus[CurrentProc] = Idle;
1135       */
1136 #elif defined(PAR)
1137       IF_DEBUG(scheduler,
1138                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1139                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1140       IF_PAR_DEBUG(bq,
1141
1142                    if (t->block_info.closure!=(StgClosure*)NULL) 
1143                      print_bq(t->block_info.closure));
1144
1145       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1146       blockThread(t);
1147
1148       /* whatever we schedule next, we must log that schedule */
1149       emitSchedule = rtsTrue;
1150
1151 #else /* !GRAN */
1152       /* don't need to do anything.  Either the thread is blocked on
1153        * I/O, in which case we'll have called addToBlockedQueue
1154        * previously, or it's blocked on an MVar or Blackhole, in which
1155        * case it'll be on the relevant queue already.
1156        */
1157       IF_DEBUG(scheduler,
1158                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1159                printThreadBlockage(t);
1160                fprintf(stderr, "\n"));
1161
1162       /* Only for dumping event to log file 
1163          ToDo: do I need this in GranSim, too?
1164       blockThread(t);
1165       */
1166 #endif
1167       threadPaused(t);
1168       break;
1169       
1170     case ThreadFinished:
1171       /* Need to check whether this was a main thread, and if so, signal
1172        * the task that started it with the return value.  If we have no
1173        * more main threads, we probably need to stop all the tasks until
1174        * we get a new one.
1175        */
1176       /* We also end up here if the thread kills itself with an
1177        * uncaught exception, see Exception.hc.
1178        */
1179       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1180 #if defined(GRAN)
1181       endThread(t, CurrentProc); // clean-up the thread
1182 #elif defined(PAR)
1183       /* For now all are advisory -- HWL */
1184       //if(t->priority==AdvisoryPriority) ??
1185       advisory_thread_count--;
1186       
1187 # ifdef DIST
1188       if(t->dist.priority==RevalPriority)
1189         FinishReval(t);
1190 # endif
1191       
1192       if (RtsFlags.ParFlags.ParStats.Full &&
1193           !RtsFlags.ParFlags.ParStats.Suppressed) 
1194         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1195 #endif
1196       break;
1197       
1198     default:
1199       barf("schedule: invalid thread return code %d", (int)ret);
1200     }
1201     
1202 #ifdef SMP
1203     cap->link = free_capabilities;
1204     free_capabilities = cap;
1205     n_free_capabilities++;
1206 #endif
1207
1208 #ifdef SMP
1209     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1210 #else
1211     if (ready_to_gc) 
1212 #endif
1213       {
1214       /* everybody back, start the GC.
1215        * Could do it in this thread, or signal a condition var
1216        * to do it in another thread.  Either way, we need to
1217        * broadcast on gc_pending_cond afterward.
1218        */
1219 #ifdef SMP
1220       IF_DEBUG(scheduler,sched_belch("doing GC"));
1221 #endif
1222       GarbageCollect(GetRoots,rtsFalse);
1223       ready_to_gc = rtsFalse;
1224 #ifdef SMP
1225       pthread_cond_broadcast(&gc_pending_cond);
1226 #endif
1227 #if defined(GRAN)
1228       /* add a ContinueThread event to continue execution of current thread */
1229       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1230                 ContinueThread,
1231                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1232       IF_GRAN_DEBUG(bq, 
1233                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1234                G_EVENTQ(0);
1235                G_CURR_THREADQ(0));
1236 #endif /* GRAN */
1237     }
1238 #if defined(GRAN)
1239   next_thread:
1240     IF_GRAN_DEBUG(unused,
1241                   print_eventq(EventHd));
1242
1243     event = get_next_event();
1244
1245 #elif defined(PAR)
1246   next_thread:
1247     /* ToDo: wait for next message to arrive rather than busy wait */
1248
1249 #else /* GRAN */
1250   /* not any more
1251   next_thread:
1252     t = take_off_run_queue(END_TSO_QUEUE);
1253   */
1254 #endif /* GRAN */
1255   } /* end of while(1) */
1256   IF_PAR_DEBUG(verbose,
1257                belch("== Leaving schedule() after having received Finish"));
1258 }
1259
1260 /* ---------------------------------------------------------------------------
1261  * deleteAllThreads():  kill all the live threads.
1262  *
1263  * This is used when we catch a user interrupt (^C), before performing
1264  * any necessary cleanups and running finalizers.
1265  * ------------------------------------------------------------------------- */
1266    
1267 void deleteAllThreads ( void )
1268 {
1269   StgTSO* t;
1270   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1271   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1272       deleteThread(t);
1273   }
1274   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1275       deleteThread(t);
1276   }
1277   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1278       deleteThread(t);
1279   }
1280   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1281   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1282   sleeping_queue = END_TSO_QUEUE;
1283 }
1284
1285 /* startThread and  insertThread are now in GranSim.c -- HWL */
1286
1287 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1288 //@subsection Suspend and Resume
1289
1290 /* ---------------------------------------------------------------------------
1291  * Suspending & resuming Haskell threads.
1292  * 
1293  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1294  * its capability before calling the C function.  This allows another
1295  * task to pick up the capability and carry on running Haskell
1296  * threads.  It also means that if the C call blocks, it won't lock
1297  * the whole system.
1298  *
1299  * The Haskell thread making the C call is put to sleep for the
1300  * duration of the call, on the susepended_ccalling_threads queue.  We
1301  * give out a token to the task, which it can use to resume the thread
1302  * on return from the C function.
1303  * ------------------------------------------------------------------------- */
1304    
1305 StgInt
1306 suspendThread( Capability *cap )
1307 {
1308   nat tok;
1309
1310   ACQUIRE_LOCK(&sched_mutex);
1311
1312   IF_DEBUG(scheduler,
1313            sched_belch("thread %d did a _ccall_gc", cap->rCurrentTSO->id));
1314
1315   threadPaused(cap->rCurrentTSO);
1316   cap->rCurrentTSO->link = suspended_ccalling_threads;
1317   suspended_ccalling_threads = cap->rCurrentTSO;
1318
1319   /* Use the thread ID as the token; it should be unique */
1320   tok = cap->rCurrentTSO->id;
1321
1322 #ifdef SMP
1323   cap->link = free_capabilities;
1324   free_capabilities = cap;
1325   n_free_capabilities++;
1326 #endif
1327
1328   RELEASE_LOCK(&sched_mutex);
1329   return tok; 
1330 }
1331
1332 Capability *
1333 resumeThread( StgInt tok )
1334 {
1335   StgTSO *tso, **prev;
1336   Capability *cap;
1337
1338   ACQUIRE_LOCK(&sched_mutex);
1339
1340   prev = &suspended_ccalling_threads;
1341   for (tso = suspended_ccalling_threads; 
1342        tso != END_TSO_QUEUE; 
1343        prev = &tso->link, tso = tso->link) {
1344     if (tso->id == (StgThreadID)tok) {
1345       *prev = tso->link;
1346       break;
1347     }
1348   }
1349   if (tso == END_TSO_QUEUE) {
1350     barf("resumeThread: thread not found");
1351   }
1352   tso->link = END_TSO_QUEUE;
1353
1354 #ifdef SMP
1355   while (free_capabilities == NULL) {
1356     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1357     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1358     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1359   }
1360   cap = free_capabilities;
1361   free_capabilities = cap->link;
1362   n_free_capabilities--;
1363 #else  
1364   cap = &MainRegTable;
1365 #endif
1366
1367   cap->rCurrentTSO = tso;
1368
1369   RELEASE_LOCK(&sched_mutex);
1370   return cap;
1371 }
1372
1373
1374 /* ---------------------------------------------------------------------------
1375  * Static functions
1376  * ------------------------------------------------------------------------ */
1377 static void unblockThread(StgTSO *tso);
1378
1379 /* ---------------------------------------------------------------------------
1380  * Comparing Thread ids.
1381  *
1382  * This is used from STG land in the implementation of the
1383  * instances of Eq/Ord for ThreadIds.
1384  * ------------------------------------------------------------------------ */
1385
1386 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1387
1388   StgThreadID id1 = tso1->id; 
1389   StgThreadID id2 = tso2->id;
1390  
1391   if (id1 < id2) return (-1);
1392   if (id1 > id2) return 1;
1393   return 0;
1394 }
1395
1396 /* ---------------------------------------------------------------------------
1397    Create a new thread.
1398
1399    The new thread starts with the given stack size.  Before the
1400    scheduler can run, however, this thread needs to have a closure
1401    (and possibly some arguments) pushed on its stack.  See
1402    pushClosure() in Schedule.h.
1403
1404    createGenThread() and createIOThread() (in SchedAPI.h) are
1405    convenient packaged versions of this function.
1406
1407    currently pri (priority) is only used in a GRAN setup -- HWL
1408    ------------------------------------------------------------------------ */
1409 //@cindex createThread
1410 #if defined(GRAN)
1411 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1412 StgTSO *
1413 createThread(nat stack_size, StgInt pri)
1414 {
1415   return createThread_(stack_size, rtsFalse, pri);
1416 }
1417
1418 static StgTSO *
1419 createThread_(nat size, rtsBool have_lock, StgInt pri)
1420 {
1421 #else
1422 StgTSO *
1423 createThread(nat stack_size)
1424 {
1425   return createThread_(stack_size, rtsFalse);
1426 }
1427
1428 static StgTSO *
1429 createThread_(nat size, rtsBool have_lock)
1430 {
1431 #endif
1432
1433     StgTSO *tso;
1434     nat stack_size;
1435
1436     /* First check whether we should create a thread at all */
1437 #if defined(PAR)
1438   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1439   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1440     threadsIgnored++;
1441     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1442           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1443     return END_TSO_QUEUE;
1444   }
1445   threadsCreated++;
1446 #endif
1447
1448 #if defined(GRAN)
1449   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1450 #endif
1451
1452   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1453
1454   /* catch ridiculously small stack sizes */
1455   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1456     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1457   }
1458
1459   stack_size = size - TSO_STRUCT_SIZEW;
1460
1461   tso = (StgTSO *)allocate(size);
1462   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1463
1464   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1465 #if defined(GRAN)
1466   SET_GRAN_HDR(tso, ThisPE);
1467 #endif
1468   tso->what_next     = ThreadEnterGHC;
1469
1470   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1471    * protect the increment operation on next_thread_id.
1472    * In future, we could use an atomic increment instead.
1473    */
1474   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1475   tso->id = next_thread_id++; 
1476   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1477
1478   tso->why_blocked  = NotBlocked;
1479   tso->blocked_exceptions = NULL;
1480
1481   tso->stack_size   = stack_size;
1482   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1483                               - TSO_STRUCT_SIZEW;
1484   tso->sp           = (P_)&(tso->stack) + stack_size;
1485
1486 #ifdef PROFILING
1487   tso->prof.CCCS = CCS_MAIN;
1488 #endif
1489
1490   /* put a stop frame on the stack */
1491   tso->sp -= sizeofW(StgStopFrame);
1492   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1493   tso->su = (StgUpdateFrame*)tso->sp;
1494
1495   // ToDo: check this
1496 #if defined(GRAN)
1497   tso->link = END_TSO_QUEUE;
1498   /* uses more flexible routine in GranSim */
1499   insertThread(tso, CurrentProc);
1500 #else
1501   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1502    * from its creation
1503    */
1504 #endif
1505
1506 #if defined(GRAN) 
1507   if (RtsFlags.GranFlags.GranSimStats.Full) 
1508     DumpGranEvent(GR_START,tso);
1509 #elif defined(PAR)
1510   if (RtsFlags.ParFlags.ParStats.Full) 
1511     DumpGranEvent(GR_STARTQ,tso);
1512   /* HACk to avoid SCHEDULE 
1513      LastTSO = tso; */
1514 #endif
1515
1516   /* Link the new thread on the global thread list.
1517    */
1518   tso->global_link = all_threads;
1519   all_threads = tso;
1520
1521 #if defined(DIST)
1522   tso->dist.priority = MandatoryPriority; //by default that is...
1523 #endif
1524
1525 #if defined(GRAN)
1526   tso->gran.pri = pri;
1527 # if defined(DEBUG)
1528   tso->gran.magic = TSO_MAGIC; // debugging only
1529 # endif
1530   tso->gran.sparkname   = 0;
1531   tso->gran.startedat   = CURRENT_TIME; 
1532   tso->gran.exported    = 0;
1533   tso->gran.basicblocks = 0;
1534   tso->gran.allocs      = 0;
1535   tso->gran.exectime    = 0;
1536   tso->gran.fetchtime   = 0;
1537   tso->gran.fetchcount  = 0;
1538   tso->gran.blocktime   = 0;
1539   tso->gran.blockcount  = 0;
1540   tso->gran.blockedat   = 0;
1541   tso->gran.globalsparks = 0;
1542   tso->gran.localsparks  = 0;
1543   if (RtsFlags.GranFlags.Light)
1544     tso->gran.clock  = Now; /* local clock */
1545   else
1546     tso->gran.clock  = 0;
1547
1548   IF_DEBUG(gran,printTSO(tso));
1549 #elif defined(PAR)
1550 # if defined(DEBUG)
1551   tso->par.magic = TSO_MAGIC; // debugging only
1552 # endif
1553   tso->par.sparkname   = 0;
1554   tso->par.startedat   = CURRENT_TIME; 
1555   tso->par.exported    = 0;
1556   tso->par.basicblocks = 0;
1557   tso->par.allocs      = 0;
1558   tso->par.exectime    = 0;
1559   tso->par.fetchtime   = 0;
1560   tso->par.fetchcount  = 0;
1561   tso->par.blocktime   = 0;
1562   tso->par.blockcount  = 0;
1563   tso->par.blockedat   = 0;
1564   tso->par.globalsparks = 0;
1565   tso->par.localsparks  = 0;
1566 #endif
1567
1568 #if defined(GRAN)
1569   globalGranStats.tot_threads_created++;
1570   globalGranStats.threads_created_on_PE[CurrentProc]++;
1571   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1572   globalGranStats.tot_sq_probes++;
1573 #elif defined(PAR)
1574   // collect parallel global statistics (currently done together with GC stats)
1575   if (RtsFlags.ParFlags.ParStats.Global &&
1576       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1577     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1578     globalParStats.tot_threads_created++;
1579   }
1580 #endif 
1581
1582 #if defined(GRAN)
1583   IF_GRAN_DEBUG(pri,
1584                 belch("==__ schedule: Created TSO %d (%p);",
1585                       CurrentProc, tso, tso->id));
1586 #elif defined(PAR)
1587     IF_PAR_DEBUG(verbose,
1588                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1589                        tso->id, tso, advisory_thread_count));
1590 #else
1591   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1592                                  tso->id, tso->stack_size));
1593 #endif    
1594   return tso;
1595 }
1596
1597 #if defined(PAR)
1598 /* RFP:
1599    all parallel thread creation calls should fall through the following routine.
1600 */
1601 StgTSO *
1602 createSparkThread(rtsSpark spark) 
1603 { StgTSO *tso;
1604   ASSERT(spark != (rtsSpark)NULL);
1605   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1606   { threadsIgnored++;
1607     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1608           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1609     return END_TSO_QUEUE;
1610   }
1611   else
1612   { threadsCreated++;
1613     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1614     if (tso==END_TSO_QUEUE)     
1615       barf("createSparkThread: Cannot create TSO");
1616 #if defined(DIST)
1617     tso->priority = AdvisoryPriority;
1618 #endif
1619     pushClosure(tso,spark);
1620     PUSH_ON_RUN_QUEUE(tso);
1621     advisory_thread_count++;    
1622   }
1623   return tso;
1624 }
1625 #endif
1626
1627 /*
1628   Turn a spark into a thread.
1629   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1630 */
1631 #if defined(PAR)
1632 //@cindex activateSpark
1633 StgTSO *
1634 activateSpark (rtsSpark spark) 
1635 {
1636   StgTSO *tso;
1637
1638   tso = createSparkThread(spark);
1639   if (RtsFlags.ParFlags.ParStats.Full) {   
1640     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1641     IF_PAR_DEBUG(verbose,
1642                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1643                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1644   }
1645   // ToDo: fwd info on local/global spark to thread -- HWL
1646   // tso->gran.exported =  spark->exported;
1647   // tso->gran.locked =   !spark->global;
1648   // tso->gran.sparkname = spark->name;
1649
1650   return tso;
1651 }
1652 #endif
1653
1654 /* ---------------------------------------------------------------------------
1655  * scheduleThread()
1656  *
1657  * scheduleThread puts a thread on the head of the runnable queue.
1658  * This will usually be done immediately after a thread is created.
1659  * The caller of scheduleThread must create the thread using e.g.
1660  * createThread and push an appropriate closure
1661  * on this thread's stack before the scheduler is invoked.
1662  * ------------------------------------------------------------------------ */
1663
1664 void
1665 scheduleThread(StgTSO *tso)
1666 {
1667   if (tso==END_TSO_QUEUE){    
1668     schedule();
1669     return;
1670   }
1671
1672   ACQUIRE_LOCK(&sched_mutex);
1673
1674   /* Put the new thread on the head of the runnable queue.  The caller
1675    * better push an appropriate closure on this thread's stack
1676    * beforehand.  In the SMP case, the thread may start running as
1677    * soon as we release the scheduler lock below.
1678    */
1679   PUSH_ON_RUN_QUEUE(tso);
1680   THREAD_RUNNABLE();
1681
1682 #if 0
1683   IF_DEBUG(scheduler,printTSO(tso));
1684 #endif
1685   RELEASE_LOCK(&sched_mutex);
1686 }
1687
1688 /* ---------------------------------------------------------------------------
1689  * startTasks()
1690  *
1691  * Start up Posix threads to run each of the scheduler tasks.
1692  * I believe the task ids are not needed in the system as defined.
1693  *  KH @ 25/10/99
1694  * ------------------------------------------------------------------------ */
1695
1696 #if defined(PAR) || defined(SMP)
1697 void
1698 taskStart(void) /*  ( void *arg STG_UNUSED)  */
1699 {
1700   scheduleThread(END_TSO_QUEUE);
1701 }
1702 #endif
1703
1704 /* ---------------------------------------------------------------------------
1705  * initScheduler()
1706  *
1707  * Initialise the scheduler.  This resets all the queues - if the
1708  * queues contained any threads, they'll be garbage collected at the
1709  * next pass.
1710  *
1711  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1712  * ------------------------------------------------------------------------ */
1713
1714 #ifdef SMP
1715 static void
1716 term_handler(int sig STG_UNUSED)
1717 {
1718   stat_workerStop();
1719   ACQUIRE_LOCK(&term_mutex);
1720   await_death--;
1721   RELEASE_LOCK(&term_mutex);
1722   pthread_exit(NULL);
1723 }
1724 #endif
1725
1726 //@cindex initScheduler
1727 void 
1728 initScheduler(void)
1729 {
1730 #if defined(GRAN)
1731   nat i;
1732
1733   for (i=0; i<=MAX_PROC; i++) {
1734     run_queue_hds[i]      = END_TSO_QUEUE;
1735     run_queue_tls[i]      = END_TSO_QUEUE;
1736     blocked_queue_hds[i]  = END_TSO_QUEUE;
1737     blocked_queue_tls[i]  = END_TSO_QUEUE;
1738     ccalling_threadss[i]  = END_TSO_QUEUE;
1739     sleeping_queue        = END_TSO_QUEUE;
1740   }
1741 #else
1742   run_queue_hd      = END_TSO_QUEUE;
1743   run_queue_tl      = END_TSO_QUEUE;
1744   blocked_queue_hd  = END_TSO_QUEUE;
1745   blocked_queue_tl  = END_TSO_QUEUE;
1746   sleeping_queue    = END_TSO_QUEUE;
1747 #endif 
1748
1749   suspended_ccalling_threads  = END_TSO_QUEUE;
1750
1751   main_threads = NULL;
1752   all_threads  = END_TSO_QUEUE;
1753
1754   context_switch = 0;
1755   interrupted    = 0;
1756
1757   RtsFlags.ConcFlags.ctxtSwitchTicks =
1758       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1759
1760   /* Install the SIGHUP handler */
1761 #ifdef SMP
1762   {
1763     struct sigaction action,oact;
1764
1765     action.sa_handler = term_handler;
1766     sigemptyset(&action.sa_mask);
1767     action.sa_flags = 0;
1768     if (sigaction(SIGTERM, &action, &oact) != 0) {
1769       barf("can't install TERM handler");
1770     }
1771   }
1772 #endif
1773
1774 #ifdef SMP
1775   /* Allocate N Capabilities */
1776   {
1777     nat i;
1778     Capability *cap, *prev;
1779     cap  = NULL;
1780     prev = NULL;
1781     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1782       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1783       cap->link = prev;
1784       prev = cap;
1785     }
1786     free_capabilities = cap;
1787     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1788   }
1789   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1790                              n_free_capabilities););
1791 #endif
1792
1793 #if defined(SMP) || defined(PAR)
1794   initSparkPools();
1795 #endif
1796 }
1797
1798 #ifdef SMP
1799 void
1800 startTasks( void )
1801 {
1802   nat i;
1803   int r;
1804   pthread_t tid;
1805   
1806   /* make some space for saving all the thread ids */
1807   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1808                             "initScheduler:task_ids");
1809   
1810   /* and create all the threads */
1811   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1812     r = pthread_create(&tid,NULL,taskStart,NULL);
1813     if (r != 0) {
1814       barf("startTasks: Can't create new Posix thread");
1815     }
1816     task_ids[i].id = tid;
1817     task_ids[i].mut_time = 0.0;
1818     task_ids[i].mut_etime = 0.0;
1819     task_ids[i].gc_time = 0.0;
1820     task_ids[i].gc_etime = 0.0;
1821     task_ids[i].elapsedtimestart = elapsedtime();
1822     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1823   }
1824 }
1825 #endif
1826
1827 void
1828 exitScheduler( void )
1829 {
1830 #ifdef SMP
1831   nat i;
1832
1833   /* Don't want to use pthread_cancel, since we'd have to install
1834    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1835    * all our locks.
1836    */
1837 #if 0
1838   /* Cancel all our tasks */
1839   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1840     pthread_cancel(task_ids[i].id);
1841   }
1842   
1843   /* Wait for all the tasks to terminate */
1844   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1845     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1846                                task_ids[i].id));
1847     pthread_join(task_ids[i].id, NULL);
1848   }
1849 #endif
1850
1851   /* Send 'em all a SIGHUP.  That should shut 'em up.
1852    */
1853   await_death = RtsFlags.ParFlags.nNodes;
1854   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1855     pthread_kill(task_ids[i].id,SIGTERM);
1856   }
1857   while (await_death > 0) {
1858     sched_yield();
1859   }
1860 #endif
1861 }
1862
1863 /* -----------------------------------------------------------------------------
1864    Managing the per-task allocation areas.
1865    
1866    Each capability comes with an allocation area.  These are
1867    fixed-length block lists into which allocation can be done.
1868
1869    ToDo: no support for two-space collection at the moment???
1870    -------------------------------------------------------------------------- */
1871
1872 /* -----------------------------------------------------------------------------
1873  * waitThread is the external interface for running a new computation
1874  * and waiting for the result.
1875  *
1876  * In the non-SMP case, we create a new main thread, push it on the 
1877  * main-thread stack, and invoke the scheduler to run it.  The
1878  * scheduler will return when the top main thread on the stack has
1879  * completed or died, and fill in the necessary fields of the
1880  * main_thread structure.
1881  *
1882  * In the SMP case, we create a main thread as before, but we then
1883  * create a new condition variable and sleep on it.  When our new
1884  * main thread has completed, we'll be woken up and the status/result
1885  * will be in the main_thread struct.
1886  * -------------------------------------------------------------------------- */
1887
1888 int 
1889 howManyThreadsAvail ( void )
1890 {
1891    int i = 0;
1892    StgTSO* q;
1893    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1894       i++;
1895    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1896       i++;
1897    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
1898       i++;
1899    return i;
1900 }
1901
1902 void
1903 finishAllThreads ( void )
1904 {
1905    do {
1906       while (run_queue_hd != END_TSO_QUEUE) {
1907          waitThread ( run_queue_hd, NULL );
1908       }
1909       while (blocked_queue_hd != END_TSO_QUEUE) {
1910          waitThread ( blocked_queue_hd, NULL );
1911       }
1912       while (sleeping_queue != END_TSO_QUEUE) {
1913          waitThread ( blocked_queue_hd, NULL );
1914       }
1915    } while 
1916       (blocked_queue_hd != END_TSO_QUEUE || 
1917        run_queue_hd     != END_TSO_QUEUE ||
1918        sleeping_queue   != END_TSO_QUEUE);
1919 }
1920
1921 SchedulerStatus
1922 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1923 {
1924   StgMainThread *m;
1925   SchedulerStatus stat;
1926
1927   ACQUIRE_LOCK(&sched_mutex);
1928   
1929   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1930
1931   m->tso = tso;
1932   m->ret = ret;
1933   m->stat = NoStatus;
1934 #ifdef SMP
1935   pthread_cond_init(&m->wakeup, NULL);
1936 #endif
1937
1938   m->link = main_threads;
1939   main_threads = m;
1940
1941   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
1942                               m->tso->id));
1943
1944 #ifdef SMP
1945   do {
1946     pthread_cond_wait(&m->wakeup, &sched_mutex);
1947   } while (m->stat == NoStatus);
1948 #elif defined(GRAN)
1949   /* GranSim specific init */
1950   CurrentTSO = m->tso;                // the TSO to run
1951   procStatus[MainProc] = Busy;        // status of main PE
1952   CurrentProc = MainProc;             // PE to run it on
1953
1954   schedule();
1955 #else
1956   schedule();
1957   ASSERT(m->stat != NoStatus);
1958 #endif
1959
1960   stat = m->stat;
1961
1962 #ifdef SMP
1963   pthread_cond_destroy(&m->wakeup);
1964 #endif
1965
1966   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
1967                               m->tso->id));
1968   free(m);
1969
1970   RELEASE_LOCK(&sched_mutex);
1971
1972   return stat;
1973 }
1974
1975 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1976 //@subsection Run queue code 
1977
1978 #if 0
1979 /* 
1980    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1981        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1982        implicit global variable that has to be correct when calling these
1983        fcts -- HWL 
1984 */
1985
1986 /* Put the new thread on the head of the runnable queue.
1987  * The caller of createThread better push an appropriate closure
1988  * on this thread's stack before the scheduler is invoked.
1989  */
1990 static /* inline */ void
1991 add_to_run_queue(tso)
1992 StgTSO* tso; 
1993 {
1994   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1995   tso->link = run_queue_hd;
1996   run_queue_hd = tso;
1997   if (run_queue_tl == END_TSO_QUEUE) {
1998     run_queue_tl = tso;
1999   }
2000 }
2001
2002 /* Put the new thread at the end of the runnable queue. */
2003 static /* inline */ void
2004 push_on_run_queue(tso)
2005 StgTSO* tso; 
2006 {
2007   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2008   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2009   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2010   if (run_queue_hd == END_TSO_QUEUE) {
2011     run_queue_hd = tso;
2012   } else {
2013     run_queue_tl->link = tso;
2014   }
2015   run_queue_tl = tso;
2016 }
2017
2018 /* 
2019    Should be inlined because it's used very often in schedule.  The tso
2020    argument is actually only needed in GranSim, where we want to have the
2021    possibility to schedule *any* TSO on the run queue, irrespective of the
2022    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2023    the run queue and dequeue the tso, adjusting the links in the queue. 
2024 */
2025 //@cindex take_off_run_queue
2026 static /* inline */ StgTSO*
2027 take_off_run_queue(StgTSO *tso) {
2028   StgTSO *t, *prev;
2029
2030   /* 
2031      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2032
2033      if tso is specified, unlink that tso from the run_queue (doesn't have
2034      to be at the beginning of the queue); GranSim only 
2035   */
2036   if (tso!=END_TSO_QUEUE) {
2037     /* find tso in queue */
2038     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2039          t!=END_TSO_QUEUE && t!=tso;
2040          prev=t, t=t->link) 
2041       /* nothing */ ;
2042     ASSERT(t==tso);
2043     /* now actually dequeue the tso */
2044     if (prev!=END_TSO_QUEUE) {
2045       ASSERT(run_queue_hd!=t);
2046       prev->link = t->link;
2047     } else {
2048       /* t is at beginning of thread queue */
2049       ASSERT(run_queue_hd==t);
2050       run_queue_hd = t->link;
2051     }
2052     /* t is at end of thread queue */
2053     if (t->link==END_TSO_QUEUE) {
2054       ASSERT(t==run_queue_tl);
2055       run_queue_tl = prev;
2056     } else {
2057       ASSERT(run_queue_tl!=t);
2058     }
2059     t->link = END_TSO_QUEUE;
2060   } else {
2061     /* take tso from the beginning of the queue; std concurrent code */
2062     t = run_queue_hd;
2063     if (t != END_TSO_QUEUE) {
2064       run_queue_hd = t->link;
2065       t->link = END_TSO_QUEUE;
2066       if (run_queue_hd == END_TSO_QUEUE) {
2067         run_queue_tl = END_TSO_QUEUE;
2068       }
2069     }
2070   }
2071   return t;
2072 }
2073
2074 #endif /* 0 */
2075
2076 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2077 //@subsection Garbage Collextion Routines
2078
2079 /* ---------------------------------------------------------------------------
2080    Where are the roots that we know about?
2081
2082         - all the threads on the runnable queue
2083         - all the threads on the blocked queue
2084         - all the threads on the sleeping queue
2085         - all the thread currently executing a _ccall_GC
2086         - all the "main threads"
2087      
2088    ------------------------------------------------------------------------ */
2089
2090 /* This has to be protected either by the scheduler monitor, or by the
2091         garbage collection monitor (probably the latter).
2092         KH @ 25/10/99
2093 */
2094
2095 static void GetRoots(evac_fn evac)
2096 {
2097   StgMainThread *m;
2098
2099 #if defined(GRAN)
2100   {
2101     nat i;
2102     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2103       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2104           evac((StgClosure **)&run_queue_hds[i]);
2105       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2106           evac((StgClosure **)&run_queue_tls[i]);
2107       
2108       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2109           evac((StgClosure **)&blocked_queue_hds[i]);
2110       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2111           evac((StgClosure **)&blocked_queue_tls[i]);
2112       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2113           evac((StgClosure **)&ccalling_threads[i]);
2114     }
2115   }
2116
2117   markEventQueue();
2118
2119 #else /* !GRAN */
2120   if (run_queue_hd != END_TSO_QUEUE) {
2121       ASSERT(run_queue_tl != END_TSO_QUEUE);
2122       evac((StgClosure **)&run_queue_hd);
2123       evac((StgClosure **)&run_queue_tl);
2124   }
2125   
2126   if (blocked_queue_hd != END_TSO_QUEUE) {
2127       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2128       evac((StgClosure **)&blocked_queue_hd);
2129       evac((StgClosure **)&blocked_queue_tl);
2130   }
2131   
2132   if (sleeping_queue != END_TSO_QUEUE) {
2133       evac((StgClosure **)&sleeping_queue);
2134   }
2135 #endif 
2136
2137   for (m = main_threads; m != NULL; m = m->link) {
2138       evac((StgClosure **)&m->tso);
2139   }
2140   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2141       evac((StgClosure **)&suspended_ccalling_threads);
2142   }
2143
2144 #if defined(SMP) || defined(PAR) || defined(GRAN)
2145   markSparkQueue(evac);
2146 #endif
2147 }
2148
2149 /* -----------------------------------------------------------------------------
2150    performGC
2151
2152    This is the interface to the garbage collector from Haskell land.
2153    We provide this so that external C code can allocate and garbage
2154    collect when called from Haskell via _ccall_GC.
2155
2156    It might be useful to provide an interface whereby the programmer
2157    can specify more roots (ToDo).
2158    
2159    This needs to be protected by the GC condition variable above.  KH.
2160    -------------------------------------------------------------------------- */
2161
2162 void (*extra_roots)(evac_fn);
2163
2164 void
2165 performGC(void)
2166 {
2167   GarbageCollect(GetRoots,rtsFalse);
2168 }
2169
2170 void
2171 performMajorGC(void)
2172 {
2173   GarbageCollect(GetRoots,rtsTrue);
2174 }
2175
2176 static void
2177 AllRoots(evac_fn evac)
2178 {
2179     GetRoots(evac);             // the scheduler's roots
2180     extra_roots(evac);          // the user's roots
2181 }
2182
2183 void
2184 performGCWithRoots(void (*get_roots)(evac_fn))
2185 {
2186   extra_roots = get_roots;
2187   GarbageCollect(AllRoots,rtsFalse);
2188 }
2189
2190 /* -----------------------------------------------------------------------------
2191    Stack overflow
2192
2193    If the thread has reached its maximum stack size, then raise the
2194    StackOverflow exception in the offending thread.  Otherwise
2195    relocate the TSO into a larger chunk of memory and adjust its stack
2196    size appropriately.
2197    -------------------------------------------------------------------------- */
2198
2199 static StgTSO *
2200 threadStackOverflow(StgTSO *tso)
2201 {
2202   nat new_stack_size, new_tso_size, diff, stack_words;
2203   StgPtr new_sp;
2204   StgTSO *dest;
2205
2206   IF_DEBUG(sanity,checkTSO(tso));
2207   if (tso->stack_size >= tso->max_stack_size) {
2208
2209     IF_DEBUG(gc,
2210              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2211                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2212              /* If we're debugging, just print out the top of the stack */
2213              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2214                                               tso->sp+64)));
2215
2216     /* Send this thread the StackOverflow exception */
2217     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2218     return tso;
2219   }
2220
2221   /* Try to double the current stack size.  If that takes us over the
2222    * maximum stack size for this thread, then use the maximum instead.
2223    * Finally round up so the TSO ends up as a whole number of blocks.
2224    */
2225   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2226   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2227                                        TSO_STRUCT_SIZE)/sizeof(W_);
2228   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2229   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2230
2231   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2232
2233   dest = (StgTSO *)allocate(new_tso_size);
2234   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2235
2236   /* copy the TSO block and the old stack into the new area */
2237   memcpy(dest,tso,TSO_STRUCT_SIZE);
2238   stack_words = tso->stack + tso->stack_size - tso->sp;
2239   new_sp = (P_)dest + new_tso_size - stack_words;
2240   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2241
2242   /* relocate the stack pointers... */
2243   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2244   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2245   dest->sp    = new_sp;
2246   dest->stack_size = new_stack_size;
2247         
2248   /* and relocate the update frame list */
2249   relocate_stack(dest, diff);
2250
2251   /* Mark the old TSO as relocated.  We have to check for relocated
2252    * TSOs in the garbage collector and any primops that deal with TSOs.
2253    *
2254    * It's important to set the sp and su values to just beyond the end
2255    * of the stack, so we don't attempt to scavenge any part of the
2256    * dead TSO's stack.
2257    */
2258   tso->what_next = ThreadRelocated;
2259   tso->link = dest;
2260   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2261   tso->su = (StgUpdateFrame *)tso->sp;
2262   tso->why_blocked = NotBlocked;
2263   dest->mut_link = NULL;
2264
2265   IF_PAR_DEBUG(verbose,
2266                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2267                      tso->id, tso, tso->stack_size);
2268                /* If we're debugging, just print out the top of the stack */
2269                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2270                                                 tso->sp+64)));
2271   
2272   IF_DEBUG(sanity,checkTSO(tso));
2273 #if 0
2274   IF_DEBUG(scheduler,printTSO(dest));
2275 #endif
2276
2277   return dest;
2278 }
2279
2280 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2281 //@subsection Blocking Queue Routines
2282
2283 /* ---------------------------------------------------------------------------
2284    Wake up a queue that was blocked on some resource.
2285    ------------------------------------------------------------------------ */
2286
2287 #if defined(GRAN)
2288 static inline void
2289 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2290 {
2291 }
2292 #elif defined(PAR)
2293 static inline void
2294 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2295 {
2296   /* write RESUME events to log file and
2297      update blocked and fetch time (depending on type of the orig closure) */
2298   if (RtsFlags.ParFlags.ParStats.Full) {
2299     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2300                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2301                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2302     if (EMPTY_RUN_QUEUE())
2303       emitSchedule = rtsTrue;
2304
2305     switch (get_itbl(node)->type) {
2306         case FETCH_ME_BQ:
2307           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2308           break;
2309         case RBH:
2310         case FETCH_ME:
2311         case BLACKHOLE_BQ:
2312           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2313           break;
2314 #ifdef DIST
2315         case MVAR:
2316           break;
2317 #endif    
2318         default:
2319           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2320         }
2321       }
2322 }
2323 #endif
2324
2325 #if defined(GRAN)
2326 static StgBlockingQueueElement *
2327 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2328 {
2329     StgTSO *tso;
2330     PEs node_loc, tso_loc;
2331
2332     node_loc = where_is(node); // should be lifted out of loop
2333     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2334     tso_loc = where_is((StgClosure *)tso);
2335     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2336       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2337       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2338       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2339       // insertThread(tso, node_loc);
2340       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2341                 ResumeThread,
2342                 tso, node, (rtsSpark*)NULL);
2343       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2344       // len_local++;
2345       // len++;
2346     } else { // TSO is remote (actually should be FMBQ)
2347       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2348                                   RtsFlags.GranFlags.Costs.gunblocktime +
2349                                   RtsFlags.GranFlags.Costs.latency;
2350       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2351                 UnblockThread,
2352                 tso, node, (rtsSpark*)NULL);
2353       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2354       // len++;
2355     }
2356     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2357     IF_GRAN_DEBUG(bq,
2358                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2359                           (node_loc==tso_loc ? "Local" : "Global"), 
2360                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2361     tso->block_info.closure = NULL;
2362     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2363                              tso->id, tso));
2364 }
2365 #elif defined(PAR)
2366 static StgBlockingQueueElement *
2367 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2368 {
2369     StgBlockingQueueElement *next;
2370
2371     switch (get_itbl(bqe)->type) {
2372     case TSO:
2373       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2374       /* if it's a TSO just push it onto the run_queue */
2375       next = bqe->link;
2376       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2377       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2378       THREAD_RUNNABLE();
2379       unblockCount(bqe, node);
2380       /* reset blocking status after dumping event */
2381       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2382       break;
2383
2384     case BLOCKED_FETCH:
2385       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2386       next = bqe->link;
2387       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2388       PendingFetches = (StgBlockedFetch *)bqe;
2389       break;
2390
2391 # if defined(DEBUG)
2392       /* can ignore this case in a non-debugging setup; 
2393          see comments on RBHSave closures above */
2394     case CONSTR:
2395       /* check that the closure is an RBHSave closure */
2396       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2397              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2398              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2399       break;
2400
2401     default:
2402       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2403            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2404            (StgClosure *)bqe);
2405 # endif
2406     }
2407   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2408   return next;
2409 }
2410
2411 #else /* !GRAN && !PAR */
2412 static StgTSO *
2413 unblockOneLocked(StgTSO *tso)
2414 {
2415   StgTSO *next;
2416
2417   ASSERT(get_itbl(tso)->type == TSO);
2418   ASSERT(tso->why_blocked != NotBlocked);
2419   tso->why_blocked = NotBlocked;
2420   next = tso->link;
2421   PUSH_ON_RUN_QUEUE(tso);
2422   THREAD_RUNNABLE();
2423   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2424   return next;
2425 }
2426 #endif
2427
2428 #if defined(GRAN) || defined(PAR)
2429 inline StgBlockingQueueElement *
2430 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2431 {
2432   ACQUIRE_LOCK(&sched_mutex);
2433   bqe = unblockOneLocked(bqe, node);
2434   RELEASE_LOCK(&sched_mutex);
2435   return bqe;
2436 }
2437 #else
2438 inline StgTSO *
2439 unblockOne(StgTSO *tso)
2440 {
2441   ACQUIRE_LOCK(&sched_mutex);
2442   tso = unblockOneLocked(tso);
2443   RELEASE_LOCK(&sched_mutex);
2444   return tso;
2445 }
2446 #endif
2447
2448 #if defined(GRAN)
2449 void 
2450 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2451 {
2452   StgBlockingQueueElement *bqe;
2453   PEs node_loc;
2454   nat len = 0; 
2455
2456   IF_GRAN_DEBUG(bq, 
2457                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2458                       node, CurrentProc, CurrentTime[CurrentProc], 
2459                       CurrentTSO->id, CurrentTSO));
2460
2461   node_loc = where_is(node);
2462
2463   ASSERT(q == END_BQ_QUEUE ||
2464          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2465          get_itbl(q)->type == CONSTR); // closure (type constructor)
2466   ASSERT(is_unique(node));
2467
2468   /* FAKE FETCH: magically copy the node to the tso's proc;
2469      no Fetch necessary because in reality the node should not have been 
2470      moved to the other PE in the first place
2471   */
2472   if (CurrentProc!=node_loc) {
2473     IF_GRAN_DEBUG(bq, 
2474                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2475                         node, node_loc, CurrentProc, CurrentTSO->id, 
2476                         // CurrentTSO, where_is(CurrentTSO),
2477                         node->header.gran.procs));
2478     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2479     IF_GRAN_DEBUG(bq, 
2480                   belch("## new bitmask of node %p is %#x",
2481                         node, node->header.gran.procs));
2482     if (RtsFlags.GranFlags.GranSimStats.Global) {
2483       globalGranStats.tot_fake_fetches++;
2484     }
2485   }
2486
2487   bqe = q;
2488   // ToDo: check: ASSERT(CurrentProc==node_loc);
2489   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2490     //next = bqe->link;
2491     /* 
2492        bqe points to the current element in the queue
2493        next points to the next element in the queue
2494     */
2495     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2496     //tso_loc = where_is(tso);
2497     len++;
2498     bqe = unblockOneLocked(bqe, node);
2499   }
2500
2501   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2502      the closure to make room for the anchor of the BQ */
2503   if (bqe!=END_BQ_QUEUE) {
2504     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2505     /*
2506     ASSERT((info_ptr==&RBH_Save_0_info) ||
2507            (info_ptr==&RBH_Save_1_info) ||
2508            (info_ptr==&RBH_Save_2_info));
2509     */
2510     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2511     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2512     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2513
2514     IF_GRAN_DEBUG(bq,
2515                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2516                         node, info_type(node)));
2517   }
2518
2519   /* statistics gathering */
2520   if (RtsFlags.GranFlags.GranSimStats.Global) {
2521     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2522     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2523     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2524     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2525   }
2526   IF_GRAN_DEBUG(bq,
2527                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2528                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2529 }
2530 #elif defined(PAR)
2531 void 
2532 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2533 {
2534   StgBlockingQueueElement *bqe;
2535
2536   ACQUIRE_LOCK(&sched_mutex);
2537
2538   IF_PAR_DEBUG(verbose, 
2539                belch("##-_ AwBQ for node %p on [%x]: ",
2540                      node, mytid));
2541 #ifdef DIST  
2542   //RFP
2543   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2544     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2545     return;
2546   }
2547 #endif
2548   
2549   ASSERT(q == END_BQ_QUEUE ||
2550          get_itbl(q)->type == TSO ||           
2551          get_itbl(q)->type == BLOCKED_FETCH || 
2552          get_itbl(q)->type == CONSTR); 
2553
2554   bqe = q;
2555   while (get_itbl(bqe)->type==TSO || 
2556          get_itbl(bqe)->type==BLOCKED_FETCH) {
2557     bqe = unblockOneLocked(bqe, node);
2558   }
2559   RELEASE_LOCK(&sched_mutex);
2560 }
2561
2562 #else   /* !GRAN && !PAR */
2563 void
2564 awakenBlockedQueue(StgTSO *tso)
2565 {
2566   ACQUIRE_LOCK(&sched_mutex);
2567   while (tso != END_TSO_QUEUE) {
2568     tso = unblockOneLocked(tso);
2569   }
2570   RELEASE_LOCK(&sched_mutex);
2571 }
2572 #endif
2573
2574 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2575 //@subsection Exception Handling Routines
2576
2577 /* ---------------------------------------------------------------------------
2578    Interrupt execution
2579    - usually called inside a signal handler so it mustn't do anything fancy.   
2580    ------------------------------------------------------------------------ */
2581
2582 void
2583 interruptStgRts(void)
2584 {
2585     interrupted    = 1;
2586     context_switch = 1;
2587 }
2588
2589 /* -----------------------------------------------------------------------------
2590    Unblock a thread
2591
2592    This is for use when we raise an exception in another thread, which
2593    may be blocked.
2594    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2595    -------------------------------------------------------------------------- */
2596
2597 #if defined(GRAN) || defined(PAR)
2598 /*
2599   NB: only the type of the blocking queue is different in GranSim and GUM
2600       the operations on the queue-elements are the same
2601       long live polymorphism!
2602 */
2603 static void
2604 unblockThread(StgTSO *tso)
2605 {
2606   StgBlockingQueueElement *t, **last;
2607
2608   ACQUIRE_LOCK(&sched_mutex);
2609   switch (tso->why_blocked) {
2610
2611   case NotBlocked:
2612     return;  /* not blocked */
2613
2614   case BlockedOnMVar:
2615     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2616     {
2617       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2618       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2619
2620       last = (StgBlockingQueueElement **)&mvar->head;
2621       for (t = (StgBlockingQueueElement *)mvar->head; 
2622            t != END_BQ_QUEUE; 
2623            last = &t->link, last_tso = t, t = t->link) {
2624         if (t == (StgBlockingQueueElement *)tso) {
2625           *last = (StgBlockingQueueElement *)tso->link;
2626           if (mvar->tail == tso) {
2627             mvar->tail = (StgTSO *)last_tso;
2628           }
2629           goto done;
2630         }
2631       }
2632       barf("unblockThread (MVAR): TSO not found");
2633     }
2634
2635   case BlockedOnBlackHole:
2636     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2637     {
2638       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2639
2640       last = &bq->blocking_queue;
2641       for (t = bq->blocking_queue; 
2642            t != END_BQ_QUEUE; 
2643            last = &t->link, t = t->link) {
2644         if (t == (StgBlockingQueueElement *)tso) {
2645           *last = (StgBlockingQueueElement *)tso->link;
2646           goto done;
2647         }
2648       }
2649       barf("unblockThread (BLACKHOLE): TSO not found");
2650     }
2651
2652   case BlockedOnException:
2653     {
2654       StgTSO *target  = tso->block_info.tso;
2655
2656       ASSERT(get_itbl(target)->type == TSO);
2657
2658       if (target->what_next == ThreadRelocated) {
2659           target = target->link;
2660           ASSERT(get_itbl(target)->type == TSO);
2661       }
2662
2663       ASSERT(target->blocked_exceptions != NULL);
2664
2665       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2666       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2667            t != END_BQ_QUEUE; 
2668            last = &t->link, t = t->link) {
2669         ASSERT(get_itbl(t)->type == TSO);
2670         if (t == (StgBlockingQueueElement *)tso) {
2671           *last = (StgBlockingQueueElement *)tso->link;
2672           goto done;
2673         }
2674       }
2675       barf("unblockThread (Exception): TSO not found");
2676     }
2677
2678   case BlockedOnRead:
2679   case BlockedOnWrite:
2680     {
2681       /* take TSO off blocked_queue */
2682       StgBlockingQueueElement *prev = NULL;
2683       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2684            prev = t, t = t->link) {
2685         if (t == (StgBlockingQueueElement *)tso) {
2686           if (prev == NULL) {
2687             blocked_queue_hd = (StgTSO *)t->link;
2688             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2689               blocked_queue_tl = END_TSO_QUEUE;
2690             }
2691           } else {
2692             prev->link = t->link;
2693             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2694               blocked_queue_tl = (StgTSO *)prev;
2695             }
2696           }
2697           goto done;
2698         }
2699       }
2700       barf("unblockThread (I/O): TSO not found");
2701     }
2702
2703   case BlockedOnDelay:
2704     {
2705       /* take TSO off sleeping_queue */
2706       StgBlockingQueueElement *prev = NULL;
2707       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2708            prev = t, t = t->link) {
2709         if (t == (StgBlockingQueueElement *)tso) {
2710           if (prev == NULL) {
2711             sleeping_queue = (StgTSO *)t->link;
2712           } else {
2713             prev->link = t->link;
2714           }
2715           goto done;
2716         }
2717       }
2718       barf("unblockThread (I/O): TSO not found");
2719     }
2720
2721   default:
2722     barf("unblockThread");
2723   }
2724
2725  done:
2726   tso->link = END_TSO_QUEUE;
2727   tso->why_blocked = NotBlocked;
2728   tso->block_info.closure = NULL;
2729   PUSH_ON_RUN_QUEUE(tso);
2730   RELEASE_LOCK(&sched_mutex);
2731 }
2732 #else
2733 static void
2734 unblockThread(StgTSO *tso)
2735 {
2736   StgTSO *t, **last;
2737
2738   ACQUIRE_LOCK(&sched_mutex);
2739   switch (tso->why_blocked) {
2740
2741   case NotBlocked:
2742     return;  /* not blocked */
2743
2744   case BlockedOnMVar:
2745     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2746     {
2747       StgTSO *last_tso = END_TSO_QUEUE;
2748       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2749
2750       last = &mvar->head;
2751       for (t = mvar->head; t != END_TSO_QUEUE; 
2752            last = &t->link, last_tso = t, t = t->link) {
2753         if (t == tso) {
2754           *last = tso->link;
2755           if (mvar->tail == tso) {
2756             mvar->tail = last_tso;
2757           }
2758           goto done;
2759         }
2760       }
2761       barf("unblockThread (MVAR): TSO not found");
2762     }
2763
2764   case BlockedOnBlackHole:
2765     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2766     {
2767       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2768
2769       last = &bq->blocking_queue;
2770       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2771            last = &t->link, t = t->link) {
2772         if (t == tso) {
2773           *last = tso->link;
2774           goto done;
2775         }
2776       }
2777       barf("unblockThread (BLACKHOLE): TSO not found");
2778     }
2779
2780   case BlockedOnException:
2781     {
2782       StgTSO *target  = tso->block_info.tso;
2783
2784       ASSERT(get_itbl(target)->type == TSO);
2785
2786       while (target->what_next == ThreadRelocated) {
2787           target = target->link;
2788           ASSERT(get_itbl(target)->type == TSO);
2789       }
2790       
2791       ASSERT(target->blocked_exceptions != NULL);
2792
2793       last = &target->blocked_exceptions;
2794       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2795            last = &t->link, t = t->link) {
2796         ASSERT(get_itbl(t)->type == TSO);
2797         if (t == tso) {
2798           *last = tso->link;
2799           goto done;
2800         }
2801       }
2802       barf("unblockThread (Exception): TSO not found");
2803     }
2804
2805   case BlockedOnRead:
2806   case BlockedOnWrite:
2807     {
2808       StgTSO *prev = NULL;
2809       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2810            prev = t, t = t->link) {
2811         if (t == tso) {
2812           if (prev == NULL) {
2813             blocked_queue_hd = t->link;
2814             if (blocked_queue_tl == t) {
2815               blocked_queue_tl = END_TSO_QUEUE;
2816             }
2817           } else {
2818             prev->link = t->link;
2819             if (blocked_queue_tl == t) {
2820               blocked_queue_tl = prev;
2821             }
2822           }
2823           goto done;
2824         }
2825       }
2826       barf("unblockThread (I/O): TSO not found");
2827     }
2828
2829   case BlockedOnDelay:
2830     {
2831       StgTSO *prev = NULL;
2832       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2833            prev = t, t = t->link) {
2834         if (t == tso) {
2835           if (prev == NULL) {
2836             sleeping_queue = t->link;
2837           } else {
2838             prev->link = t->link;
2839           }
2840           goto done;
2841         }
2842       }
2843       barf("unblockThread (I/O): TSO not found");
2844     }
2845
2846   default:
2847     barf("unblockThread");
2848   }
2849
2850  done:
2851   tso->link = END_TSO_QUEUE;
2852   tso->why_blocked = NotBlocked;
2853   tso->block_info.closure = NULL;
2854   PUSH_ON_RUN_QUEUE(tso);
2855   RELEASE_LOCK(&sched_mutex);
2856 }
2857 #endif
2858
2859 /* -----------------------------------------------------------------------------
2860  * raiseAsync()
2861  *
2862  * The following function implements the magic for raising an
2863  * asynchronous exception in an existing thread.
2864  *
2865  * We first remove the thread from any queue on which it might be
2866  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2867  *
2868  * We strip the stack down to the innermost CATCH_FRAME, building
2869  * thunks in the heap for all the active computations, so they can 
2870  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2871  * an application of the handler to the exception, and push it on
2872  * the top of the stack.
2873  * 
2874  * How exactly do we save all the active computations?  We create an
2875  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2876  * AP_UPDs pushes everything from the corresponding update frame
2877  * upwards onto the stack.  (Actually, it pushes everything up to the
2878  * next update frame plus a pointer to the next AP_UPD object.
2879  * Entering the next AP_UPD object pushes more onto the stack until we
2880  * reach the last AP_UPD object - at which point the stack should look
2881  * exactly as it did when we killed the TSO and we can continue
2882  * execution by entering the closure on top of the stack.
2883  *
2884  * We can also kill a thread entirely - this happens if either (a) the 
2885  * exception passed to raiseAsync is NULL, or (b) there's no
2886  * CATCH_FRAME on the stack.  In either case, we strip the entire
2887  * stack and replace the thread with a zombie.
2888  *
2889  * -------------------------------------------------------------------------- */
2890  
2891 void 
2892 deleteThread(StgTSO *tso)
2893 {
2894   raiseAsync(tso,NULL);
2895 }
2896
2897 void
2898 raiseAsync(StgTSO *tso, StgClosure *exception)
2899 {
2900   StgUpdateFrame* su = tso->su;
2901   StgPtr          sp = tso->sp;
2902   
2903   /* Thread already dead? */
2904   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2905     return;
2906   }
2907
2908   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2909
2910   /* Remove it from any blocking queues */
2911   unblockThread(tso);
2912
2913   /* The stack freezing code assumes there's a closure pointer on
2914    * the top of the stack.  This isn't always the case with compiled
2915    * code, so we have to push a dummy closure on the top which just
2916    * returns to the next return address on the stack.
2917    */
2918   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2919     *(--sp) = (W_)&stg_dummy_ret_closure;
2920   }
2921
2922   while (1) {
2923     nat words = ((P_)su - (P_)sp) - 1;
2924     nat i;
2925     StgAP_UPD * ap;
2926
2927     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2928      * then build PAP(handler,exception,realworld#), and leave it on
2929      * top of the stack ready to enter.
2930      */
2931     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2932       StgCatchFrame *cf = (StgCatchFrame *)su;
2933       /* we've got an exception to raise, so let's pass it to the
2934        * handler in this frame.
2935        */
2936       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2937       TICK_ALLOC_UPD_PAP(3,0);
2938       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
2939               
2940       ap->n_args = 2;
2941       ap->fun = cf->handler;    /* :: Exception -> IO a */
2942       ap->payload[0] = exception;
2943       ap->payload[1] = ARG_TAG(0); /* realworld token */
2944
2945       /* throw away the stack from Sp up to and including the
2946        * CATCH_FRAME.
2947        */
2948       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2949       tso->su = cf->link;
2950
2951       /* Restore the blocked/unblocked state for asynchronous exceptions
2952        * at the CATCH_FRAME.  
2953        *
2954        * If exceptions were unblocked at the catch, arrange that they
2955        * are unblocked again after executing the handler by pushing an
2956        * unblockAsyncExceptions_ret stack frame.
2957        */
2958       if (!cf->exceptions_blocked) {
2959         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
2960       }
2961       
2962       /* Ensure that async exceptions are blocked when running the handler.
2963        */
2964       if (tso->blocked_exceptions == NULL) {
2965         tso->blocked_exceptions = END_TSO_QUEUE;
2966       }
2967       
2968       /* Put the newly-built PAP on top of the stack, ready to execute
2969        * when the thread restarts.
2970        */
2971       sp[0] = (W_)ap;
2972       tso->sp = sp;
2973       tso->what_next = ThreadEnterGHC;
2974       IF_DEBUG(sanity, checkTSO(tso));
2975       return;
2976     }
2977
2978     /* First build an AP_UPD consisting of the stack chunk above the
2979      * current update frame, with the top word on the stack as the
2980      * fun field.
2981      */
2982     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2983     
2984     ASSERT(words >= 0);
2985     
2986     ap->n_args = words;
2987     ap->fun    = (StgClosure *)sp[0];
2988     sp++;
2989     for(i=0; i < (nat)words; ++i) {
2990       ap->payload[i] = (StgClosure *)*sp++;
2991     }
2992     
2993     switch (get_itbl(su)->type) {
2994       
2995     case UPDATE_FRAME:
2996       {
2997         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2998         TICK_ALLOC_UP_THK(words+1,0);
2999         
3000         IF_DEBUG(scheduler,
3001                  fprintf(stderr,  "scheduler: Updating ");
3002                  printPtr((P_)su->updatee); 
3003                  fprintf(stderr,  " with ");
3004                  printObj((StgClosure *)ap);
3005                  );
3006         
3007         /* Replace the updatee with an indirection - happily
3008          * this will also wake up any threads currently
3009          * waiting on the result.
3010          *
3011          * Warning: if we're in a loop, more than one update frame on
3012          * the stack may point to the same object.  Be careful not to
3013          * overwrite an IND_OLDGEN in this case, because we'll screw
3014          * up the mutable lists.  To be on the safe side, don't
3015          * overwrite any kind of indirection at all.  See also
3016          * threadSqueezeStack in GC.c, where we have to make a similar
3017          * check.
3018          */
3019         if (!closure_IND(su->updatee)) {
3020             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3021         }
3022         su = su->link;
3023         sp += sizeofW(StgUpdateFrame) -1;
3024         sp[0] = (W_)ap; /* push onto stack */
3025         break;
3026       }
3027
3028     case CATCH_FRAME:
3029       {
3030         StgCatchFrame *cf = (StgCatchFrame *)su;
3031         StgClosure* o;
3032         
3033         /* We want a PAP, not an AP_UPD.  Fortunately, the
3034          * layout's the same.
3035          */
3036         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3037         TICK_ALLOC_UPD_PAP(words+1,0);
3038         
3039         /* now build o = FUN(catch,ap,handler) */
3040         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3041         TICK_ALLOC_FUN(2,0);
3042         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3043         o->payload[0] = (StgClosure *)ap;
3044         o->payload[1] = cf->handler;
3045         
3046         IF_DEBUG(scheduler,
3047                  fprintf(stderr,  "scheduler: Built ");
3048                  printObj((StgClosure *)o);
3049                  );
3050         
3051         /* pop the old handler and put o on the stack */
3052         su = cf->link;
3053         sp += sizeofW(StgCatchFrame) - 1;
3054         sp[0] = (W_)o;
3055         break;
3056       }
3057       
3058     case SEQ_FRAME:
3059       {
3060         StgSeqFrame *sf = (StgSeqFrame *)su;
3061         StgClosure* o;
3062         
3063         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3064         TICK_ALLOC_UPD_PAP(words+1,0);
3065         
3066         /* now build o = FUN(seq,ap) */
3067         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3068         TICK_ALLOC_SE_THK(1,0);
3069         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3070         o->payload[0] = (StgClosure *)ap;
3071         
3072         IF_DEBUG(scheduler,
3073                  fprintf(stderr,  "scheduler: Built ");
3074                  printObj((StgClosure *)o);
3075                  );
3076         
3077         /* pop the old handler and put o on the stack */
3078         su = sf->link;
3079         sp += sizeofW(StgSeqFrame) - 1;
3080         sp[0] = (W_)o;
3081         break;
3082       }
3083       
3084     case STOP_FRAME:
3085       /* We've stripped the entire stack, the thread is now dead. */
3086       sp += sizeofW(StgStopFrame) - 1;
3087       sp[0] = (W_)exception;    /* save the exception */
3088       tso->what_next = ThreadKilled;
3089       tso->su = (StgUpdateFrame *)(sp+1);
3090       tso->sp = sp;
3091       return;
3092
3093     default:
3094       barf("raiseAsync");
3095     }
3096   }
3097   barf("raiseAsync");
3098 }
3099
3100 /* -----------------------------------------------------------------------------
3101    resurrectThreads is called after garbage collection on the list of
3102    threads found to be garbage.  Each of these threads will be woken
3103    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3104    on an MVar, or NonTermination if the thread was blocked on a Black
3105    Hole.
3106    -------------------------------------------------------------------------- */
3107
3108 void
3109 resurrectThreads( StgTSO *threads )
3110 {
3111   StgTSO *tso, *next;
3112
3113   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3114     next = tso->global_link;
3115     tso->global_link = all_threads;
3116     all_threads = tso;
3117     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3118
3119     switch (tso->why_blocked) {
3120     case BlockedOnMVar:
3121     case BlockedOnException:
3122       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3123       break;
3124     case BlockedOnBlackHole:
3125       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3126       break;
3127     case NotBlocked:
3128       /* This might happen if the thread was blocked on a black hole
3129        * belonging to a thread that we've just woken up (raiseAsync
3130        * can wake up threads, remember...).
3131        */
3132       continue;
3133     default:
3134       barf("resurrectThreads: thread blocked in a strange way");
3135     }
3136   }
3137 }
3138
3139 /* -----------------------------------------------------------------------------
3140  * Blackhole detection: if we reach a deadlock, test whether any
3141  * threads are blocked on themselves.  Any threads which are found to
3142  * be self-blocked get sent a NonTermination exception.
3143  *
3144  * This is only done in a deadlock situation in order to avoid
3145  * performance overhead in the normal case.
3146  * -------------------------------------------------------------------------- */
3147
3148 static void
3149 detectBlackHoles( void )
3150 {
3151     StgTSO *t = all_threads;
3152     StgUpdateFrame *frame;
3153     StgClosure *blocked_on;
3154
3155     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3156
3157         while (t->what_next == ThreadRelocated) {
3158             t = t->link;
3159             ASSERT(get_itbl(t)->type == TSO);
3160         }
3161       
3162         if (t->why_blocked != BlockedOnBlackHole) {
3163             continue;
3164         }
3165
3166         blocked_on = t->block_info.closure;
3167
3168         for (frame = t->su; ; frame = frame->link) {
3169             switch (get_itbl(frame)->type) {
3170
3171             case UPDATE_FRAME:
3172                 if (frame->updatee == blocked_on) {
3173                     /* We are blocking on one of our own computations, so
3174                      * send this thread the NonTermination exception.  
3175                      */
3176                     IF_DEBUG(scheduler, 
3177                              sched_belch("thread %d is blocked on itself", t->id));
3178                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3179                     goto done;
3180                 }
3181                 else {
3182                     continue;
3183                 }
3184
3185             case CATCH_FRAME:
3186             case SEQ_FRAME:
3187                 continue;
3188                 
3189             case STOP_FRAME:
3190                 break;
3191             }
3192             break;
3193         }
3194
3195     done: ;
3196     }   
3197 }
3198
3199 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3200 //@subsection Debugging Routines
3201
3202 /* -----------------------------------------------------------------------------
3203    Debugging: why is a thread blocked
3204    -------------------------------------------------------------------------- */
3205
3206 #ifdef DEBUG
3207
3208 void
3209 printThreadBlockage(StgTSO *tso)
3210 {
3211   switch (tso->why_blocked) {
3212   case BlockedOnRead:
3213     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3214     break;
3215   case BlockedOnWrite:
3216     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3217     break;
3218   case BlockedOnDelay:
3219     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3220     break;
3221   case BlockedOnMVar:
3222     fprintf(stderr,"is blocked on an MVar");
3223     break;
3224   case BlockedOnException:
3225     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3226             tso->block_info.tso->id);
3227     break;
3228   case BlockedOnBlackHole:
3229     fprintf(stderr,"is blocked on a black hole");
3230     break;
3231   case NotBlocked:
3232     fprintf(stderr,"is not blocked");
3233     break;
3234 #if defined(PAR)
3235   case BlockedOnGA:
3236     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3237             tso->block_info.closure, info_type(tso->block_info.closure));
3238     break;
3239   case BlockedOnGA_NoSend:
3240     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3241             tso->block_info.closure, info_type(tso->block_info.closure));
3242     break;
3243 #endif
3244   default:
3245     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3246          tso->why_blocked, tso->id, tso);
3247   }
3248 }
3249
3250 void
3251 printThreadStatus(StgTSO *tso)
3252 {
3253   switch (tso->what_next) {
3254   case ThreadKilled:
3255     fprintf(stderr,"has been killed");
3256     break;
3257   case ThreadComplete:
3258     fprintf(stderr,"has completed");
3259     break;
3260   default:
3261     printThreadBlockage(tso);
3262   }
3263 }
3264
3265 void
3266 printAllThreads(void)
3267 {
3268   StgTSO *t;
3269
3270 # if defined(GRAN)
3271   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3272   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3273                        time_string, rtsFalse/*no commas!*/);
3274
3275   sched_belch("all threads at [%s]:", time_string);
3276 # elif defined(PAR)
3277   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3278   ullong_format_string(CURRENT_TIME,
3279                        time_string, rtsFalse/*no commas!*/);
3280
3281   sched_belch("all threads at [%s]:", time_string);
3282 # else
3283   sched_belch("all threads:");
3284 # endif
3285
3286   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3287     fprintf(stderr, "\tthread %d ", t->id);
3288     printThreadStatus(t);
3289     fprintf(stderr,"\n");
3290   }
3291 }
3292     
3293 /* 
3294    Print a whole blocking queue attached to node (debugging only).
3295 */
3296 //@cindex print_bq
3297 # if defined(PAR)
3298 void 
3299 print_bq (StgClosure *node)
3300 {
3301   StgBlockingQueueElement *bqe;
3302   StgTSO *tso;
3303   rtsBool end;
3304
3305   fprintf(stderr,"## BQ of closure %p (%s): ",
3306           node, info_type(node));
3307
3308   /* should cover all closures that may have a blocking queue */
3309   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3310          get_itbl(node)->type == FETCH_ME_BQ ||
3311          get_itbl(node)->type == RBH ||
3312          get_itbl(node)->type == MVAR);
3313     
3314   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3315
3316   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3317 }
3318
3319 /* 
3320    Print a whole blocking queue starting with the element bqe.
3321 */
3322 void 
3323 print_bqe (StgBlockingQueueElement *bqe)
3324 {
3325   rtsBool end;
3326
3327   /* 
3328      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3329   */
3330   for (end = (bqe==END_BQ_QUEUE);
3331        !end; // iterate until bqe points to a CONSTR
3332        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3333        bqe = end ? END_BQ_QUEUE : bqe->link) {
3334     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3335     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3336     /* types of closures that may appear in a blocking queue */
3337     ASSERT(get_itbl(bqe)->type == TSO ||           
3338            get_itbl(bqe)->type == BLOCKED_FETCH || 
3339            get_itbl(bqe)->type == CONSTR); 
3340     /* only BQs of an RBH end with an RBH_Save closure */
3341     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3342
3343     switch (get_itbl(bqe)->type) {
3344     case TSO:
3345       fprintf(stderr," TSO %u (%x),",
3346               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3347       break;
3348     case BLOCKED_FETCH:
3349       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3350               ((StgBlockedFetch *)bqe)->node, 
3351               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3352               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3353               ((StgBlockedFetch *)bqe)->ga.weight);
3354       break;
3355     case CONSTR:
3356       fprintf(stderr," %s (IP %p),",
3357               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3358                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3359                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3360                "RBH_Save_?"), get_itbl(bqe));
3361       break;
3362     default:
3363       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3364            info_type((StgClosure *)bqe)); // , node, info_type(node));
3365       break;
3366     }
3367   } /* for */
3368   fputc('\n', stderr);
3369 }
3370 # elif defined(GRAN)
3371 void 
3372 print_bq (StgClosure *node)
3373 {
3374   StgBlockingQueueElement *bqe;
3375   PEs node_loc, tso_loc;
3376   rtsBool end;
3377
3378   /* should cover all closures that may have a blocking queue */
3379   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3380          get_itbl(node)->type == FETCH_ME_BQ ||
3381          get_itbl(node)->type == RBH);
3382     
3383   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3384   node_loc = where_is(node);
3385
3386   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3387           node, info_type(node), node_loc);
3388
3389   /* 
3390      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3391   */
3392   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3393        !end; // iterate until bqe points to a CONSTR
3394        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3395     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3396     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3397     /* types of closures that may appear in a blocking queue */
3398     ASSERT(get_itbl(bqe)->type == TSO ||           
3399            get_itbl(bqe)->type == CONSTR); 
3400     /* only BQs of an RBH end with an RBH_Save closure */
3401     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3402
3403     tso_loc = where_is((StgClosure *)bqe);
3404     switch (get_itbl(bqe)->type) {
3405     case TSO:
3406       fprintf(stderr," TSO %d (%p) on [PE %d],",
3407               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3408       break;
3409     case CONSTR:
3410       fprintf(stderr," %s (IP %p),",
3411               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3412                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3413                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3414                "RBH_Save_?"), get_itbl(bqe));
3415       break;
3416     default:
3417       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3418            info_type((StgClosure *)bqe), node, info_type(node));
3419       break;
3420     }
3421   } /* for */
3422   fputc('\n', stderr);
3423 }
3424 #else
3425 /* 
3426    Nice and easy: only TSOs on the blocking queue
3427 */
3428 void 
3429 print_bq (StgClosure *node)
3430 {
3431   StgTSO *tso;
3432
3433   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3434   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3435        tso != END_TSO_QUEUE; 
3436        tso=tso->link) {
3437     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3438     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3439     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3440   }
3441   fputc('\n', stderr);
3442 }
3443 # endif
3444
3445 #if defined(PAR)
3446 static nat
3447 run_queue_len(void)
3448 {
3449   nat i;
3450   StgTSO *tso;
3451
3452   for (i=0, tso=run_queue_hd; 
3453        tso != END_TSO_QUEUE;
3454        i++, tso=tso->link)
3455     /* nothing */
3456
3457   return i;
3458 }
3459 #endif
3460
3461 static void
3462 sched_belch(char *s, ...)
3463 {
3464   va_list ap;
3465   va_start(ap,s);
3466 #ifdef SMP
3467   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3468 #elif defined(PAR)
3469   fprintf(stderr, "== ");
3470 #else
3471   fprintf(stderr, "scheduler: ");
3472 #endif
3473   vfprintf(stderr, s, ap);
3474   fprintf(stderr, "\n");
3475 }
3476
3477 #endif /* DEBUG */
3478
3479
3480 //@node Index,  , Debugging Routines, Main scheduling code
3481 //@subsection Index
3482
3483 //@index
3484 //* MainRegTable::  @cindex\s-+MainRegTable
3485 //* StgMainThread::  @cindex\s-+StgMainThread
3486 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3487 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3488 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3489 //* context_switch::  @cindex\s-+context_switch
3490 //* createThread::  @cindex\s-+createThread
3491 //* free_capabilities::  @cindex\s-+free_capabilities
3492 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3493 //* initScheduler::  @cindex\s-+initScheduler
3494 //* interrupted::  @cindex\s-+interrupted
3495 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3496 //* next_thread_id::  @cindex\s-+next_thread_id
3497 //* print_bq::  @cindex\s-+print_bq
3498 //* run_queue_hd::  @cindex\s-+run_queue_hd
3499 //* run_queue_tl::  @cindex\s-+run_queue_tl
3500 //* sched_mutex::  @cindex\s-+sched_mutex
3501 //* schedule::  @cindex\s-+schedule
3502 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3503 //* task_ids::  @cindex\s-+task_ids
3504 //* term_mutex::  @cindex\s-+term_mutex
3505 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3506 //@end index