[project @ 2000-04-14 16:47:43 by panne]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.68 2000/04/14 16:47:43 panne Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     if (blocked_queue_hd == END_TSO_QUEUE
543         && run_queue_hd == END_TSO_QUEUE) {
544         StgMainThread *m = main_threads;
545         m->ret = NULL;
546         m->stat = Deadlock;
547         main_threads = m->link;
548         return;
549     }
550 #endif
551
552 #ifdef SMP
553     /* If there's a GC pending, don't do anything until it has
554      * completed.
555      */
556     if (ready_to_gc) {
557       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
559     }
560     
561     /* block until we've got a thread on the run queue and a free
562      * capability.
563      */
564     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565       IF_DEBUG(scheduler, sched_belch("waiting for work"));
566       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567       IF_DEBUG(scheduler, sched_belch("work now available"));
568     }
569 #endif
570
571 #if defined(GRAN)
572
573     if (RtsFlags.GranFlags.Light)
574       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
575
576     /* adjust time based on time-stamp */
577     if (event->time > CurrentTime[CurrentProc] &&
578         event->evttype != ContinueThread)
579       CurrentTime[CurrentProc] = event->time;
580     
581     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582     if (!RtsFlags.GranFlags.Light)
583       handleIdlePEs();
584
585     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
586
587     /* main event dispatcher in GranSim */
588     switch (event->evttype) {
589       /* Should just be continuing execution */
590     case ContinueThread:
591       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592       /* ToDo: check assertion
593       ASSERT(run_queue_hd != (StgTSO*)NULL &&
594              run_queue_hd != END_TSO_QUEUE);
595       */
596       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
597       if (!RtsFlags.GranFlags.DoAsyncFetch &&
598           procStatus[CurrentProc]==Fetching) {
599         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600               CurrentTSO->id, CurrentTSO, CurrentProc);
601         goto next_thread;
602       } 
603       /* Ignore ContinueThreads for completed threads */
604       if (CurrentTSO->what_next == ThreadComplete) {
605         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
606               CurrentTSO->id, CurrentTSO, CurrentProc);
607         goto next_thread;
608       } 
609       /* Ignore ContinueThreads for threads that are being migrated */
610       if (PROCS(CurrentTSO)==Nowhere) { 
611         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612               CurrentTSO->id, CurrentTSO, CurrentProc);
613         goto next_thread;
614       }
615       /* The thread should be at the beginning of the run queue */
616       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
617         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618               CurrentTSO->id, CurrentTSO, CurrentProc);
619         break; // run the thread anyway
620       }
621       /*
622       new_event(proc, proc, CurrentTime[proc],
623                 FindWork,
624                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
625       goto next_thread; 
626       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627       break; // now actually run the thread; DaH Qu'vam yImuHbej 
628
629     case FetchNode:
630       do_the_fetchnode(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case GlobalBlock:
634       do_the_globalblock(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case FetchReply:
638       do_the_fetchreply(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case UnblockThread:   /* Move from the blocked queue to the tail of */
642       do_the_unblock(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     case ResumeThread:  /* Move from the blocked queue to the tail of */
646       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
647       event->tso->gran.blocktime += 
648         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case StartThread:
653       do_the_startthread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveThread:
657       do_the_movethread(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case MoveSpark:
661       do_the_movespark(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     case FindWork:
665       do_the_findwork(event);
666       goto next_thread;             /* handle next event in event queue  */
667       
668     default:
669       barf("Illegal event type %u\n", event->evttype);
670     }  /* switch */
671     
672     /* This point was scheduler_loop in the old RTS */
673
674     IF_DEBUG(gran, belch("GRAN: after main switch"));
675
676     TimeOfLastEvent = CurrentTime[CurrentProc];
677     TimeOfNextEvent = get_time_of_next_event();
678     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679     // CurrentTSO = ThreadQueueHd;
680
681     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
682                          TimeOfNextEvent));
683
684     if (RtsFlags.GranFlags.Light) 
685       GranSimLight_leave_system(event, &ActiveTSO); 
686
687     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
688
689     IF_DEBUG(gran, 
690              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
691
692     /* in a GranSim setup the TSO stays on the run queue */
693     t = CurrentTSO;
694     /* Take a thread from the run queue. */
695     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
696
697     IF_DEBUG(gran, 
698              fprintf(stderr, "GRAN: About to run current thread, which is\n");
699              G_TSO(t,5))
700
701     context_switch = 0; // turned on via GranYield, checking events and time slice
702
703     IF_DEBUG(gran, 
704              DumpGranEvent(GR_SCHEDULE, t));
705
706     procStatus[CurrentProc] = Busy;
707
708 #elif defined(PAR)
709
710     if (PendingFetches != END_BF_QUEUE) {
711         processFetches();
712     }
713
714     /* ToDo: phps merge with spark activation above */
715     /* check whether we have local work and send requests if we have none */
716     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
717       /* :-[  no local threads => look out for local sparks */
718       /* the spark pool for the current PE */
719       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721           pool->hd < pool->tl) {
722         /* 
723          * ToDo: add GC code check that we really have enough heap afterwards!!
724          * Old comment:
725          * If we're here (no runnable threads) and we have pending
726          * sparks, we must have a space problem.  Get enough space
727          * to turn one of those pending sparks into a
728          * thread... 
729          */
730         
731         spark = findSpark();                /* get a spark */
732         if (spark != (rtsSpark) NULL) {
733           tso = activateSpark(spark);       /* turn the spark into a thread */
734           IF_PAR_DEBUG(schedule,
735                        belch("==== schedule: Created TSO %d (%p); %d threads active",
736                              tso->id, tso, advisory_thread_count));
737
738           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739             belch("==^^ failed to activate spark");
740             goto next_thread;
741           }               /* otherwise fall through & pick-up new tso */
742         } else {
743           IF_PAR_DEBUG(verbose,
744                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
745                              spark_queue_len(pool)));
746           goto next_thread;
747         }
748       } else  
749       /* =8-[  no local sparks => look for work on other PEs */
750       {
751         /*
752          * We really have absolutely no work.  Send out a fish
753          * (there may be some out there already), and wait for
754          * something to arrive.  We clearly can't run any threads
755          * until a SCHEDULE or RESUME arrives, and so that's what
756          * we're hoping to see.  (Of course, we still have to
757          * respond to other types of messages.)
758          */
759         if (//!fishing &&  
760             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762           /* fishing set in sendFish, processFish;
763              avoid flooding system with fishes via delay */
764           pe = choosePE();
765           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
766                    NEW_FISH_HUNGER);
767         }
768         
769         processMessages();
770         goto next_thread;
771         // ReSchedule(0);
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779     /* Take a thread from the run queue, if we have work */
780     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
792                               spark_queue_len(pool), 
793                               CURRENT_PROC,
794                               pool->hd, pool->tl, pool->base, pool->lim));
795
796     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
797                               run_queue_len(), CURRENT_PROC,
798                               run_queue_hd, run_queue_tl));
799
800 #if 0
801     if (t != LastTSO) {
802       /* 
803          we are running a different TSO, so write a schedule event to log file
804          NB: If we use fair scheduling we also have to write  a deschedule 
805              event for LastTSO; with unfair scheduling we know that the
806              previous tso has blocked whenever we switch to another tso, so
807              we don't need it in GUM for now
808       */
809       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
811       
812     }
813 #endif
814 #else /* !GRAN && !PAR */
815   
816     /* grab a thread from the run queue
817      */
818     ASSERT(run_queue_hd != END_TSO_QUEUE);
819     t = POP_RUN_QUEUE();
820     IF_DEBUG(sanity,checkTSO(t));
821
822 #endif
823     
824     /* grab a capability
825      */
826 #ifdef SMP
827     cap = free_capabilities;
828     free_capabilities = cap->link;
829     n_free_capabilities--;
830 #else
831     cap = &MainRegTable;
832 #endif
833     
834     cap->rCurrentTSO = t;
835     
836     /* set the context_switch flag
837      */
838     if (run_queue_hd == END_TSO_QUEUE)
839       context_switch = 0;
840     else
841       context_switch = 1;
842
843     RELEASE_LOCK(&sched_mutex);
844
845     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
846                               t->id, t, whatNext_strs[t->what_next]));
847
848     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849     /* Run the current thread 
850      */
851     switch (cap->rCurrentTSO->what_next) {
852     case ThreadKilled:
853     case ThreadComplete:
854       /* Thread already finished, return to scheduler. */
855       ret = ThreadFinished;
856       break;
857     case ThreadEnterGHC:
858       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
859       break;
860     case ThreadRunGHC:
861       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
862       break;
863     case ThreadEnterHugs:
864 #ifdef INTERPRETER
865       {
866          StgClosure* c;
867          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869          cap->rCurrentTSO->sp += 1;
870          ret = enter(cap,c);
871          break;
872       }
873 #else
874       barf("Panic: entered a BCO but no bytecode interpreter in this build");
875 #endif
876     default:
877       barf("schedule: invalid what_next field");
878     }
879     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
880     
881     /* Costs for the scheduler are assigned to CCS_SYSTEM */
882 #ifdef PROFILING
883     CCCS = CCS_SYSTEM;
884 #endif
885     
886     ACQUIRE_LOCK(&sched_mutex);
887
888 #ifdef SMP
889     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
892 #endif
893     t = cap->rCurrentTSO;
894     
895 #if defined(PAR)
896     /* HACK 675: if the last thread didn't yield, make sure to print a 
897        SCHEDULE event to the log file when StgRunning the next thread, even
898        if it is the same one as before */
899     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
900     TimeOfLastYield = CURRENT_TIME;
901 #endif
902
903     switch (ret) {
904     case HeapOverflow:
905       /* make all the running tasks block on a condition variable,
906        * maybe set context_switch and wait till they all pile in,
907        * then have them wait on a GC condition variable.
908        */
909       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
910                                t->id, t, whatNext_strs[t->what_next]));
911       threadPaused(t);
912 #if defined(GRAN)
913       ASSERT(!is_on_queue(t,CurrentProc));
914 #endif
915       
916       ready_to_gc = rtsTrue;
917       context_switch = 1;               /* stop other threads ASAP */
918       PUSH_ON_RUN_QUEUE(t);
919       /* actual GC is done at the end of the while loop */
920       break;
921       
922     case StackOverflow:
923       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
924                                t->id, t, whatNext_strs[t->what_next]));
925       /* just adjust the stack for this thread, then pop it back
926        * on the run queue.
927        */
928       threadPaused(t);
929       { 
930         StgMainThread *m;
931         /* enlarge the stack */
932         StgTSO *new_t = threadStackOverflow(t);
933         
934         /* This TSO has moved, so update any pointers to it from the
935          * main thread stack.  It better not be on any other queues...
936          * (it shouldn't be).
937          */
938         for (m = main_threads; m != NULL; m = m->link) {
939           if (m->tso == t) {
940             m->tso = new_t;
941           }
942         }
943         threadPaused(new_t);
944         PUSH_ON_RUN_QUEUE(new_t);
945       }
946       break;
947
948     case ThreadYielding:
949 #if defined(GRAN)
950       IF_DEBUG(gran, 
951                DumpGranEvent(GR_DESCHEDULE, t));
952       globalGranStats.tot_yields++;
953 #elif defined(PAR)
954       IF_DEBUG(par, 
955                DumpGranEvent(GR_DESCHEDULE, t));
956 #endif
957       /* put the thread back on the run queue.  Then, if we're ready to
958        * GC, check whether this is the last task to stop.  If so, wake
959        * up the GC thread.  getThread will block during a GC until the
960        * GC is finished.
961        */
962       IF_DEBUG(scheduler,
963                if (t->what_next == ThreadEnterHugs) {
964                    /* ToDo: or maybe a timer expired when we were in Hugs?
965                     * or maybe someone hit ctrl-C
966                     */
967                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
968                          t->id, t, whatNext_strs[t->what_next]);
969                } else {
970                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
971                          t->id, t, whatNext_strs[t->what_next]);
972                }
973                );
974       threadPaused(t);
975       IF_DEBUG(sanity,
976                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
977                checkTSO(t));
978       ASSERT(t->link == END_TSO_QUEUE);
979 #if defined(GRAN)
980       ASSERT(!is_on_queue(t,CurrentProc));
981
982       IF_DEBUG(sanity,
983                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
984                checkThreadQsSanity(rtsTrue));
985 #endif
986       APPEND_TO_RUN_QUEUE(t);
987 #if defined(GRAN)
988       /* add a ContinueThread event to actually process the thread */
989       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
990                 ContinueThread,
991                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
992       IF_GRAN_DEBUG(bq, 
993                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
994                G_EVENTQ(0);
995                G_CURR_THREADQ(0))
996 #endif /* GRAN */
997       break;
998       
999     case ThreadBlocked:
1000 #if defined(GRAN)
1001       IF_DEBUG(scheduler,
1002                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1003                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1004                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1005
1006       // ??? needed; should emit block before
1007       IF_DEBUG(gran, 
1008                DumpGranEvent(GR_DESCHEDULE, t)); 
1009       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1010       /*
1011         ngoq Dogh!
1012       ASSERT(procStatus[CurrentProc]==Busy || 
1013               ((procStatus[CurrentProc]==Fetching) && 
1014               (t->block_info.closure!=(StgClosure*)NULL)));
1015       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1016           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1017             procStatus[CurrentProc]==Fetching)) 
1018         procStatus[CurrentProc] = Idle;
1019       */
1020 #elif defined(PAR)
1021       IF_DEBUG(par, 
1022                DumpGranEvent(GR_DESCHEDULE, t)); 
1023
1024       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1025       blockThread(t);
1026
1027       IF_DEBUG(scheduler,
1028                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1029                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1030                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1031
1032 #else /* !GRAN */
1033       /* don't need to do anything.  Either the thread is blocked on
1034        * I/O, in which case we'll have called addToBlockedQueue
1035        * previously, or it's blocked on an MVar or Blackhole, in which
1036        * case it'll be on the relevant queue already.
1037        */
1038       IF_DEBUG(scheduler,
1039                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1040                printThreadBlockage(t);
1041                fprintf(stderr, "\n"));
1042
1043       /* Only for dumping event to log file 
1044          ToDo: do I need this in GranSim, too?
1045       blockThread(t);
1046       */
1047 #endif
1048       threadPaused(t);
1049       break;
1050       
1051     case ThreadFinished:
1052       /* Need to check whether this was a main thread, and if so, signal
1053        * the task that started it with the return value.  If we have no
1054        * more main threads, we probably need to stop all the tasks until
1055        * we get a new one.
1056        */
1057       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1058       t->what_next = ThreadComplete;
1059 #if defined(GRAN)
1060       endThread(t, CurrentProc); // clean-up the thread
1061 #elif defined(PAR)
1062       advisory_thread_count--;
1063       if (RtsFlags.ParFlags.ParStats.Full) 
1064         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1065 #endif
1066       break;
1067       
1068     default:
1069       barf("schedule: invalid thread return code %d", (int)ret);
1070     }
1071     
1072 #ifdef SMP
1073     cap->link = free_capabilities;
1074     free_capabilities = cap;
1075     n_free_capabilities++;
1076 #endif
1077
1078 #ifdef SMP
1079     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1080 #else
1081     if (ready_to_gc) 
1082 #endif
1083       {
1084       /* everybody back, start the GC.
1085        * Could do it in this thread, or signal a condition var
1086        * to do it in another thread.  Either way, we need to
1087        * broadcast on gc_pending_cond afterward.
1088        */
1089 #ifdef SMP
1090       IF_DEBUG(scheduler,sched_belch("doing GC"));
1091 #endif
1092       GarbageCollect(GetRoots,rtsFalse);
1093       ready_to_gc = rtsFalse;
1094 #ifdef SMP
1095       pthread_cond_broadcast(&gc_pending_cond);
1096 #endif
1097 #if defined(GRAN)
1098       /* add a ContinueThread event to continue execution of current thread */
1099       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1100                 ContinueThread,
1101                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1102       IF_GRAN_DEBUG(bq, 
1103                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1104                G_EVENTQ(0);
1105                G_CURR_THREADQ(0))
1106 #endif /* GRAN */
1107     }
1108 #if defined(GRAN)
1109   next_thread:
1110     IF_GRAN_DEBUG(unused,
1111                   print_eventq(EventHd));
1112
1113     event = get_next_event();
1114
1115 #elif defined(PAR)
1116   next_thread:
1117     /* ToDo: wait for next message to arrive rather than busy wait */
1118
1119 #else /* GRAN */
1120   /* not any more
1121   next_thread:
1122     t = take_off_run_queue(END_TSO_QUEUE);
1123   */
1124 #endif /* GRAN */
1125   } /* end of while(1) */
1126 }
1127
1128 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1129 void deleteAllThreads ( void )
1130 {
1131   StgTSO* t;
1132   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1133   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1134     deleteThread(t);
1135   }
1136   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1137     deleteThread(t);
1138   }
1139   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1140   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1141 }
1142
1143 /* startThread and  insertThread are now in GranSim.c -- HWL */
1144
1145 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1146 //@subsection Suspend and Resume
1147
1148 /* ---------------------------------------------------------------------------
1149  * Suspending & resuming Haskell threads.
1150  * 
1151  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1152  * its capability before calling the C function.  This allows another
1153  * task to pick up the capability and carry on running Haskell
1154  * threads.  It also means that if the C call blocks, it won't lock
1155  * the whole system.
1156  *
1157  * The Haskell thread making the C call is put to sleep for the
1158  * duration of the call, on the susepended_ccalling_threads queue.  We
1159  * give out a token to the task, which it can use to resume the thread
1160  * on return from the C function.
1161  * ------------------------------------------------------------------------- */
1162    
1163 StgInt
1164 suspendThread( Capability *cap )
1165 {
1166   nat tok;
1167
1168   ACQUIRE_LOCK(&sched_mutex);
1169
1170   IF_DEBUG(scheduler,
1171            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1172
1173   threadPaused(cap->rCurrentTSO);
1174   cap->rCurrentTSO->link = suspended_ccalling_threads;
1175   suspended_ccalling_threads = cap->rCurrentTSO;
1176
1177   /* Use the thread ID as the token; it should be unique */
1178   tok = cap->rCurrentTSO->id;
1179
1180 #ifdef SMP
1181   cap->link = free_capabilities;
1182   free_capabilities = cap;
1183   n_free_capabilities++;
1184 #endif
1185
1186   RELEASE_LOCK(&sched_mutex);
1187   return tok; 
1188 }
1189
1190 Capability *
1191 resumeThread( StgInt tok )
1192 {
1193   StgTSO *tso, **prev;
1194   Capability *cap;
1195
1196   ACQUIRE_LOCK(&sched_mutex);
1197
1198   prev = &suspended_ccalling_threads;
1199   for (tso = suspended_ccalling_threads; 
1200        tso != END_TSO_QUEUE; 
1201        prev = &tso->link, tso = tso->link) {
1202     if (tso->id == (StgThreadID)tok) {
1203       *prev = tso->link;
1204       break;
1205     }
1206   }
1207   if (tso == END_TSO_QUEUE) {
1208     barf("resumeThread: thread not found");
1209   }
1210
1211 #ifdef SMP
1212   while (free_capabilities == NULL) {
1213     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1214     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1215     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1216   }
1217   cap = free_capabilities;
1218   free_capabilities = cap->link;
1219   n_free_capabilities--;
1220 #else  
1221   cap = &MainRegTable;
1222 #endif
1223
1224   cap->rCurrentTSO = tso;
1225
1226   RELEASE_LOCK(&sched_mutex);
1227   return cap;
1228 }
1229
1230
1231 /* ---------------------------------------------------------------------------
1232  * Static functions
1233  * ------------------------------------------------------------------------ */
1234 static void unblockThread(StgTSO *tso);
1235
1236 /* ---------------------------------------------------------------------------
1237  * Comparing Thread ids.
1238  *
1239  * This is used from STG land in the implementation of the
1240  * instances of Eq/Ord for ThreadIds.
1241  * ------------------------------------------------------------------------ */
1242
1243 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1244
1245   StgThreadID id1 = tso1->id; 
1246   StgThreadID id2 = tso2->id;
1247  
1248   if (id1 < id2) return (-1);
1249   if (id1 > id2) return 1;
1250   return 0;
1251 }
1252
1253 /* ---------------------------------------------------------------------------
1254    Create a new thread.
1255
1256    The new thread starts with the given stack size.  Before the
1257    scheduler can run, however, this thread needs to have a closure
1258    (and possibly some arguments) pushed on its stack.  See
1259    pushClosure() in Schedule.h.
1260
1261    createGenThread() and createIOThread() (in SchedAPI.h) are
1262    convenient packaged versions of this function.
1263
1264    currently pri (priority) is only used in a GRAN setup -- HWL
1265    ------------------------------------------------------------------------ */
1266 //@cindex createThread
1267 #if defined(GRAN)
1268 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1269 StgTSO *
1270 createThread(nat stack_size, StgInt pri)
1271 {
1272   return createThread_(stack_size, rtsFalse, pri);
1273 }
1274
1275 static StgTSO *
1276 createThread_(nat size, rtsBool have_lock, StgInt pri)
1277 {
1278 #else
1279 StgTSO *
1280 createThread(nat stack_size)
1281 {
1282   return createThread_(stack_size, rtsFalse);
1283 }
1284
1285 static StgTSO *
1286 createThread_(nat size, rtsBool have_lock)
1287 {
1288 #endif
1289
1290     StgTSO *tso;
1291     nat stack_size;
1292
1293     /* First check whether we should create a thread at all */
1294 #if defined(PAR)
1295   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1296   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1297     threadsIgnored++;
1298     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1299           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1300     return END_TSO_QUEUE;
1301   }
1302   threadsCreated++;
1303 #endif
1304
1305 #if defined(GRAN)
1306   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1307 #endif
1308
1309   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1310
1311   /* catch ridiculously small stack sizes */
1312   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1313     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1314   }
1315
1316   stack_size = size - TSO_STRUCT_SIZEW;
1317
1318   tso = (StgTSO *)allocate(size);
1319   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1320
1321   SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1322 #if defined(GRAN)
1323   SET_GRAN_HDR(tso, ThisPE);
1324 #endif
1325   tso->what_next     = ThreadEnterGHC;
1326
1327   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1328    * protect the increment operation on next_thread_id.
1329    * In future, we could use an atomic increment instead.
1330    */
1331   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1332   tso->id = next_thread_id++; 
1333   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1334
1335   tso->why_blocked  = NotBlocked;
1336   tso->blocked_exceptions = NULL;
1337
1338   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1339   tso->stack_size   = stack_size;
1340   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1341                               - TSO_STRUCT_SIZEW;
1342   tso->sp           = (P_)&(tso->stack) + stack_size;
1343
1344 #ifdef PROFILING
1345   tso->prof.CCCS = CCS_MAIN;
1346 #endif
1347
1348   /* put a stop frame on the stack */
1349   tso->sp -= sizeofW(StgStopFrame);
1350   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1351   tso->su = (StgUpdateFrame*)tso->sp;
1352
1353   // ToDo: check this
1354 #if defined(GRAN)
1355   tso->link = END_TSO_QUEUE;
1356   /* uses more flexible routine in GranSim */
1357   insertThread(tso, CurrentProc);
1358 #else
1359   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1360    * from its creation
1361    */
1362 #endif
1363
1364 #if defined(GRAN) || defined(PAR)
1365   DumpGranEvent(GR_START,tso);
1366 #endif
1367
1368   /* Link the new thread on the global thread list.
1369    */
1370   tso->global_link = all_threads;
1371   all_threads = tso;
1372
1373 #if defined(GRAN)
1374   tso->gran.pri = pri;
1375 # if defined(DEBUG)
1376   tso->gran.magic = TSO_MAGIC; // debugging only
1377 # endif
1378   tso->gran.sparkname   = 0;
1379   tso->gran.startedat   = CURRENT_TIME; 
1380   tso->gran.exported    = 0;
1381   tso->gran.basicblocks = 0;
1382   tso->gran.allocs      = 0;
1383   tso->gran.exectime    = 0;
1384   tso->gran.fetchtime   = 0;
1385   tso->gran.fetchcount  = 0;
1386   tso->gran.blocktime   = 0;
1387   tso->gran.blockcount  = 0;
1388   tso->gran.blockedat   = 0;
1389   tso->gran.globalsparks = 0;
1390   tso->gran.localsparks  = 0;
1391   if (RtsFlags.GranFlags.Light)
1392     tso->gran.clock  = Now; /* local clock */
1393   else
1394     tso->gran.clock  = 0;
1395
1396   IF_DEBUG(gran,printTSO(tso));
1397 #elif defined(PAR)
1398 # if defined(DEBUG)
1399   tso->par.magic = TSO_MAGIC; // debugging only
1400 # endif
1401   tso->par.sparkname   = 0;
1402   tso->par.startedat   = CURRENT_TIME; 
1403   tso->par.exported    = 0;
1404   tso->par.basicblocks = 0;
1405   tso->par.allocs      = 0;
1406   tso->par.exectime    = 0;
1407   tso->par.fetchtime   = 0;
1408   tso->par.fetchcount  = 0;
1409   tso->par.blocktime   = 0;
1410   tso->par.blockcount  = 0;
1411   tso->par.blockedat   = 0;
1412   tso->par.globalsparks = 0;
1413   tso->par.localsparks  = 0;
1414 #endif
1415
1416 #if defined(GRAN)
1417   globalGranStats.tot_threads_created++;
1418   globalGranStats.threads_created_on_PE[CurrentProc]++;
1419   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1420   globalGranStats.tot_sq_probes++;
1421 #endif 
1422
1423 #if defined(GRAN)
1424   IF_GRAN_DEBUG(pri,
1425                 belch("==__ schedule: Created TSO %d (%p);",
1426                       CurrentProc, tso, tso->id));
1427 #elif defined(PAR)
1428     IF_PAR_DEBUG(verbose,
1429                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1430                        tso->id, tso, advisory_thread_count));
1431 #else
1432   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1433                                  tso->id, tso->stack_size));
1434 #endif    
1435   return tso;
1436 }
1437
1438 /*
1439   Turn a spark into a thread.
1440   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1441 */
1442 #if defined(PAR)
1443 //@cindex activateSpark
1444 StgTSO *
1445 activateSpark (rtsSpark spark) 
1446 {
1447   StgTSO *tso;
1448   
1449   ASSERT(spark != (rtsSpark)NULL);
1450   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1451   if (tso!=END_TSO_QUEUE) {
1452     pushClosure(tso,spark);
1453     PUSH_ON_RUN_QUEUE(tso);
1454     advisory_thread_count++;
1455
1456     if (RtsFlags.ParFlags.ParStats.Full) {
1457       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1458       IF_PAR_DEBUG(verbose,
1459                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1460                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1461     }
1462   } else {
1463     barf("activateSpark: Cannot create TSO");
1464   }
1465   // ToDo: fwd info on local/global spark to thread -- HWL
1466   // tso->gran.exported =  spark->exported;
1467   // tso->gran.locked =   !spark->global;
1468   // tso->gran.sparkname = spark->name;
1469
1470   return tso;
1471 }
1472 #endif
1473
1474 /* ---------------------------------------------------------------------------
1475  * scheduleThread()
1476  *
1477  * scheduleThread puts a thread on the head of the runnable queue.
1478  * This will usually be done immediately after a thread is created.
1479  * The caller of scheduleThread must create the thread using e.g.
1480  * createThread and push an appropriate closure
1481  * on this thread's stack before the scheduler is invoked.
1482  * ------------------------------------------------------------------------ */
1483
1484 void
1485 scheduleThread(StgTSO *tso)
1486 {
1487   if (tso==END_TSO_QUEUE){    
1488     schedule();
1489     return;
1490   }
1491
1492   ACQUIRE_LOCK(&sched_mutex);
1493
1494   /* Put the new thread on the head of the runnable queue.  The caller
1495    * better push an appropriate closure on this thread's stack
1496    * beforehand.  In the SMP case, the thread may start running as
1497    * soon as we release the scheduler lock below.
1498    */
1499   PUSH_ON_RUN_QUEUE(tso);
1500   THREAD_RUNNABLE();
1501
1502 #if 0
1503   IF_DEBUG(scheduler,printTSO(tso));
1504 #endif
1505   RELEASE_LOCK(&sched_mutex);
1506 }
1507
1508 /* ---------------------------------------------------------------------------
1509  * startTasks()
1510  *
1511  * Start up Posix threads to run each of the scheduler tasks.
1512  * I believe the task ids are not needed in the system as defined.
1513  *  KH @ 25/10/99
1514  * ------------------------------------------------------------------------ */
1515
1516 #if defined(PAR) || defined(SMP)
1517 void *
1518 taskStart( void *arg STG_UNUSED )
1519 {
1520   rts_evalNothing(NULL);
1521 }
1522 #endif
1523
1524 /* ---------------------------------------------------------------------------
1525  * initScheduler()
1526  *
1527  * Initialise the scheduler.  This resets all the queues - if the
1528  * queues contained any threads, they'll be garbage collected at the
1529  * next pass.
1530  *
1531  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1532  * ------------------------------------------------------------------------ */
1533
1534 #ifdef SMP
1535 static void
1536 term_handler(int sig STG_UNUSED)
1537 {
1538   stat_workerStop();
1539   ACQUIRE_LOCK(&term_mutex);
1540   await_death--;
1541   RELEASE_LOCK(&term_mutex);
1542   pthread_exit(NULL);
1543 }
1544 #endif
1545
1546 //@cindex initScheduler
1547 void 
1548 initScheduler(void)
1549 {
1550 #if defined(GRAN)
1551   nat i;
1552
1553   for (i=0; i<=MAX_PROC; i++) {
1554     run_queue_hds[i]      = END_TSO_QUEUE;
1555     run_queue_tls[i]      = END_TSO_QUEUE;
1556     blocked_queue_hds[i]  = END_TSO_QUEUE;
1557     blocked_queue_tls[i]  = END_TSO_QUEUE;
1558     ccalling_threadss[i]  = END_TSO_QUEUE;
1559   }
1560 #else
1561   run_queue_hd      = END_TSO_QUEUE;
1562   run_queue_tl      = END_TSO_QUEUE;
1563   blocked_queue_hd  = END_TSO_QUEUE;
1564   blocked_queue_tl  = END_TSO_QUEUE;
1565 #endif 
1566
1567   suspended_ccalling_threads  = END_TSO_QUEUE;
1568
1569   main_threads = NULL;
1570   all_threads  = END_TSO_QUEUE;
1571
1572   context_switch = 0;
1573   interrupted    = 0;
1574
1575 #ifdef INTERPRETER
1576   ecafList = END_ECAF_LIST;
1577   clearECafTable();
1578 #endif
1579
1580   /* Install the SIGHUP handler */
1581 #ifdef SMP
1582   {
1583     struct sigaction action,oact;
1584
1585     action.sa_handler = term_handler;
1586     sigemptyset(&action.sa_mask);
1587     action.sa_flags = 0;
1588     if (sigaction(SIGTERM, &action, &oact) != 0) {
1589       barf("can't install TERM handler");
1590     }
1591   }
1592 #endif
1593
1594 #ifdef SMP
1595   /* Allocate N Capabilities */
1596   {
1597     nat i;
1598     Capability *cap, *prev;
1599     cap  = NULL;
1600     prev = NULL;
1601     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1602       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1603       cap->link = prev;
1604       prev = cap;
1605     }
1606     free_capabilities = cap;
1607     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1608   }
1609   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1610                              n_free_capabilities););
1611 #endif
1612
1613 #if defined(SMP) || defined(PAR)
1614   initSparkPools();
1615 #endif
1616 }
1617
1618 #ifdef SMP
1619 void
1620 startTasks( void )
1621 {
1622   nat i;
1623   int r;
1624   pthread_t tid;
1625   
1626   /* make some space for saving all the thread ids */
1627   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1628                             "initScheduler:task_ids");
1629   
1630   /* and create all the threads */
1631   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1632     r = pthread_create(&tid,NULL,taskStart,NULL);
1633     if (r != 0) {
1634       barf("startTasks: Can't create new Posix thread");
1635     }
1636     task_ids[i].id = tid;
1637     task_ids[i].mut_time = 0.0;
1638     task_ids[i].mut_etime = 0.0;
1639     task_ids[i].gc_time = 0.0;
1640     task_ids[i].gc_etime = 0.0;
1641     task_ids[i].elapsedtimestart = elapsedtime();
1642     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1643   }
1644 }
1645 #endif
1646
1647 void
1648 exitScheduler( void )
1649 {
1650 #ifdef SMP
1651   nat i;
1652
1653   /* Don't want to use pthread_cancel, since we'd have to install
1654    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1655    * all our locks.
1656    */
1657 #if 0
1658   /* Cancel all our tasks */
1659   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1660     pthread_cancel(task_ids[i].id);
1661   }
1662   
1663   /* Wait for all the tasks to terminate */
1664   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1665     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1666                                task_ids[i].id));
1667     pthread_join(task_ids[i].id, NULL);
1668   }
1669 #endif
1670
1671   /* Send 'em all a SIGHUP.  That should shut 'em up.
1672    */
1673   await_death = RtsFlags.ParFlags.nNodes;
1674   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1675     pthread_kill(task_ids[i].id,SIGTERM);
1676   }
1677   while (await_death > 0) {
1678     sched_yield();
1679   }
1680 #endif
1681 }
1682
1683 /* -----------------------------------------------------------------------------
1684    Managing the per-task allocation areas.
1685    
1686    Each capability comes with an allocation area.  These are
1687    fixed-length block lists into which allocation can be done.
1688
1689    ToDo: no support for two-space collection at the moment???
1690    -------------------------------------------------------------------------- */
1691
1692 /* -----------------------------------------------------------------------------
1693  * waitThread is the external interface for running a new computation
1694  * and waiting for the result.
1695  *
1696  * In the non-SMP case, we create a new main thread, push it on the 
1697  * main-thread stack, and invoke the scheduler to run it.  The
1698  * scheduler will return when the top main thread on the stack has
1699  * completed or died, and fill in the necessary fields of the
1700  * main_thread structure.
1701  *
1702  * In the SMP case, we create a main thread as before, but we then
1703  * create a new condition variable and sleep on it.  When our new
1704  * main thread has completed, we'll be woken up and the status/result
1705  * will be in the main_thread struct.
1706  * -------------------------------------------------------------------------- */
1707
1708 int 
1709 howManyThreadsAvail ( void )
1710 {
1711    int i = 0;
1712    StgTSO* q;
1713    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1714       i++;
1715    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1716       i++;
1717    return i;
1718 }
1719
1720 void
1721 finishAllThreads ( void )
1722 {
1723    do {
1724       while (run_queue_hd != END_TSO_QUEUE) {
1725          waitThread ( run_queue_hd, NULL );
1726       }
1727       while (blocked_queue_hd != END_TSO_QUEUE) {
1728          waitThread ( blocked_queue_hd, NULL );
1729       }
1730    } while 
1731       (blocked_queue_hd != END_TSO_QUEUE || 
1732         run_queue_hd != END_TSO_QUEUE);
1733 }
1734
1735 SchedulerStatus
1736 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1737 {
1738   StgMainThread *m;
1739   SchedulerStatus stat;
1740
1741   ACQUIRE_LOCK(&sched_mutex);
1742   
1743   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1744
1745   m->tso = tso;
1746   m->ret = ret;
1747   m->stat = NoStatus;
1748 #ifdef SMP
1749   pthread_cond_init(&m->wakeup, NULL);
1750 #endif
1751
1752   m->link = main_threads;
1753   main_threads = m;
1754
1755   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1756                               m->tso->id));
1757
1758 #ifdef SMP
1759   do {
1760     pthread_cond_wait(&m->wakeup, &sched_mutex);
1761   } while (m->stat == NoStatus);
1762 #elif defined(GRAN)
1763   /* GranSim specific init */
1764   CurrentTSO = m->tso;                // the TSO to run
1765   procStatus[MainProc] = Busy;        // status of main PE
1766   CurrentProc = MainProc;             // PE to run it on
1767
1768   schedule();
1769 #else
1770   schedule();
1771   ASSERT(m->stat != NoStatus);
1772 #endif
1773
1774   stat = m->stat;
1775
1776 #ifdef SMP
1777   pthread_cond_destroy(&m->wakeup);
1778 #endif
1779
1780   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1781                               m->tso->id));
1782   free(m);
1783
1784   RELEASE_LOCK(&sched_mutex);
1785
1786   return stat;
1787 }
1788
1789 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1790 //@subsection Run queue code 
1791
1792 #if 0
1793 /* 
1794    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1795        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1796        implicit global variable that has to be correct when calling these
1797        fcts -- HWL 
1798 */
1799
1800 /* Put the new thread on the head of the runnable queue.
1801  * The caller of createThread better push an appropriate closure
1802  * on this thread's stack before the scheduler is invoked.
1803  */
1804 static /* inline */ void
1805 add_to_run_queue(tso)
1806 StgTSO* tso; 
1807 {
1808   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1809   tso->link = run_queue_hd;
1810   run_queue_hd = tso;
1811   if (run_queue_tl == END_TSO_QUEUE) {
1812     run_queue_tl = tso;
1813   }
1814 }
1815
1816 /* Put the new thread at the end of the runnable queue. */
1817 static /* inline */ void
1818 push_on_run_queue(tso)
1819 StgTSO* tso; 
1820 {
1821   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1822   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1823   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1824   if (run_queue_hd == END_TSO_QUEUE) {
1825     run_queue_hd = tso;
1826   } else {
1827     run_queue_tl->link = tso;
1828   }
1829   run_queue_tl = tso;
1830 }
1831
1832 /* 
1833    Should be inlined because it's used very often in schedule.  The tso
1834    argument is actually only needed in GranSim, where we want to have the
1835    possibility to schedule *any* TSO on the run queue, irrespective of the
1836    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1837    the run queue and dequeue the tso, adjusting the links in the queue. 
1838 */
1839 //@cindex take_off_run_queue
1840 static /* inline */ StgTSO*
1841 take_off_run_queue(StgTSO *tso) {
1842   StgTSO *t, *prev;
1843
1844   /* 
1845      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1846
1847      if tso is specified, unlink that tso from the run_queue (doesn't have
1848      to be at the beginning of the queue); GranSim only 
1849   */
1850   if (tso!=END_TSO_QUEUE) {
1851     /* find tso in queue */
1852     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1853          t!=END_TSO_QUEUE && t!=tso;
1854          prev=t, t=t->link) 
1855       /* nothing */ ;
1856     ASSERT(t==tso);
1857     /* now actually dequeue the tso */
1858     if (prev!=END_TSO_QUEUE) {
1859       ASSERT(run_queue_hd!=t);
1860       prev->link = t->link;
1861     } else {
1862       /* t is at beginning of thread queue */
1863       ASSERT(run_queue_hd==t);
1864       run_queue_hd = t->link;
1865     }
1866     /* t is at end of thread queue */
1867     if (t->link==END_TSO_QUEUE) {
1868       ASSERT(t==run_queue_tl);
1869       run_queue_tl = prev;
1870     } else {
1871       ASSERT(run_queue_tl!=t);
1872     }
1873     t->link = END_TSO_QUEUE;
1874   } else {
1875     /* take tso from the beginning of the queue; std concurrent code */
1876     t = run_queue_hd;
1877     if (t != END_TSO_QUEUE) {
1878       run_queue_hd = t->link;
1879       t->link = END_TSO_QUEUE;
1880       if (run_queue_hd == END_TSO_QUEUE) {
1881         run_queue_tl = END_TSO_QUEUE;
1882       }
1883     }
1884   }
1885   return t;
1886 }
1887
1888 #endif /* 0 */
1889
1890 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1891 //@subsection Garbage Collextion Routines
1892
1893 /* ---------------------------------------------------------------------------
1894    Where are the roots that we know about?
1895
1896         - all the threads on the runnable queue
1897         - all the threads on the blocked queue
1898         - all the thread currently executing a _ccall_GC
1899         - all the "main threads"
1900      
1901    ------------------------------------------------------------------------ */
1902
1903 /* This has to be protected either by the scheduler monitor, or by the
1904         garbage collection monitor (probably the latter).
1905         KH @ 25/10/99
1906 */
1907
1908 static void GetRoots(void)
1909 {
1910   StgMainThread *m;
1911
1912 #if defined(GRAN)
1913   {
1914     nat i;
1915     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1916       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1917         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1918       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1919         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1920       
1921       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1922         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1923       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1924         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1925       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1926         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1927     }
1928   }
1929
1930   markEventQueue();
1931
1932 #else /* !GRAN */
1933   if (run_queue_hd != END_TSO_QUEUE) {
1934     ASSERT(run_queue_tl != END_TSO_QUEUE);
1935     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1936     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1937   }
1938
1939   if (blocked_queue_hd != END_TSO_QUEUE) {
1940     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1941     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1942     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1943   }
1944 #endif 
1945
1946   for (m = main_threads; m != NULL; m = m->link) {
1947     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1948   }
1949   if (suspended_ccalling_threads != END_TSO_QUEUE)
1950     suspended_ccalling_threads = 
1951       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1952
1953 #if defined(SMP) || defined(PAR) || defined(GRAN)
1954   markSparkQueue();
1955 #endif
1956 }
1957
1958 /* -----------------------------------------------------------------------------
1959    performGC
1960
1961    This is the interface to the garbage collector from Haskell land.
1962    We provide this so that external C code can allocate and garbage
1963    collect when called from Haskell via _ccall_GC.
1964
1965    It might be useful to provide an interface whereby the programmer
1966    can specify more roots (ToDo).
1967    
1968    This needs to be protected by the GC condition variable above.  KH.
1969    -------------------------------------------------------------------------- */
1970
1971 void (*extra_roots)(void);
1972
1973 void
1974 performGC(void)
1975 {
1976   GarbageCollect(GetRoots,rtsFalse);
1977 }
1978
1979 void
1980 performMajorGC(void)
1981 {
1982   GarbageCollect(GetRoots,rtsTrue);
1983 }
1984
1985 static void
1986 AllRoots(void)
1987 {
1988   GetRoots();                   /* the scheduler's roots */
1989   extra_roots();                /* the user's roots */
1990 }
1991
1992 void
1993 performGCWithRoots(void (*get_roots)(void))
1994 {
1995   extra_roots = get_roots;
1996
1997   GarbageCollect(AllRoots,rtsFalse);
1998 }
1999
2000 /* -----------------------------------------------------------------------------
2001    Stack overflow
2002
2003    If the thread has reached its maximum stack size, then raise the
2004    StackOverflow exception in the offending thread.  Otherwise
2005    relocate the TSO into a larger chunk of memory and adjust its stack
2006    size appropriately.
2007    -------------------------------------------------------------------------- */
2008
2009 static StgTSO *
2010 threadStackOverflow(StgTSO *tso)
2011 {
2012   nat new_stack_size, new_tso_size, diff, stack_words;
2013   StgPtr new_sp;
2014   StgTSO *dest;
2015
2016   IF_DEBUG(sanity,checkTSO(tso));
2017   if (tso->stack_size >= tso->max_stack_size) {
2018
2019     IF_DEBUG(gc,
2020              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2021                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2022              /* If we're debugging, just print out the top of the stack */
2023              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2024                                               tso->sp+64)));
2025
2026 #ifdef INTERPRETER
2027     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2028     exit(1);
2029 #else
2030     /* Send this thread the StackOverflow exception */
2031     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2032 #endif
2033     return tso;
2034   }
2035
2036   /* Try to double the current stack size.  If that takes us over the
2037    * maximum stack size for this thread, then use the maximum instead.
2038    * Finally round up so the TSO ends up as a whole number of blocks.
2039    */
2040   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2041   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2042                                        TSO_STRUCT_SIZE)/sizeof(W_);
2043   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2044   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2045
2046   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2047
2048   dest = (StgTSO *)allocate(new_tso_size);
2049   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2050
2051   /* copy the TSO block and the old stack into the new area */
2052   memcpy(dest,tso,TSO_STRUCT_SIZE);
2053   stack_words = tso->stack + tso->stack_size - tso->sp;
2054   new_sp = (P_)dest + new_tso_size - stack_words;
2055   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2056
2057   /* relocate the stack pointers... */
2058   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2059   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2060   dest->sp    = new_sp;
2061   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2062   dest->stack_size = new_stack_size;
2063         
2064   /* and relocate the update frame list */
2065   relocate_TSO(tso, dest);
2066
2067   /* Mark the old TSO as relocated.  We have to check for relocated
2068    * TSOs in the garbage collector and any primops that deal with TSOs.
2069    *
2070    * It's important to set the sp and su values to just beyond the end
2071    * of the stack, so we don't attempt to scavenge any part of the
2072    * dead TSO's stack.
2073    */
2074   tso->what_next = ThreadRelocated;
2075   tso->link = dest;
2076   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2077   tso->su = (StgUpdateFrame *)tso->sp;
2078   tso->why_blocked = NotBlocked;
2079   dest->mut_link = NULL;
2080
2081   IF_PAR_DEBUG(verbose,
2082                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2083                      tso->id, tso, tso->stack_size);
2084                /* If we're debugging, just print out the top of the stack */
2085                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2086                                                 tso->sp+64)));
2087   
2088   IF_DEBUG(sanity,checkTSO(tso));
2089 #if 0
2090   IF_DEBUG(scheduler,printTSO(dest));
2091 #endif
2092
2093   return dest;
2094 }
2095
2096 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2097 //@subsection Blocking Queue Routines
2098
2099 /* ---------------------------------------------------------------------------
2100    Wake up a queue that was blocked on some resource.
2101    ------------------------------------------------------------------------ */
2102
2103 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2104
2105 #if defined(GRAN)
2106 static inline void
2107 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2108 {
2109 }
2110 #elif defined(PAR)
2111 static inline void
2112 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2113 {
2114   /* write RESUME events to log file and
2115      update blocked and fetch time (depending on type of the orig closure) */
2116   if (RtsFlags.ParFlags.ParStats.Full) {
2117     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2118                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2119                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2120
2121     switch (get_itbl(node)->type) {
2122         case FETCH_ME_BQ:
2123           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2124           break;
2125         case RBH:
2126         case FETCH_ME:
2127         case BLACKHOLE_BQ:
2128           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2129           break;
2130         default:
2131           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2132         }
2133       }
2134 }
2135 #endif
2136
2137 #if defined(GRAN)
2138 static StgBlockingQueueElement *
2139 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2140 {
2141     StgTSO *tso;
2142     PEs node_loc, tso_loc;
2143
2144     node_loc = where_is(node); // should be lifted out of loop
2145     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2146     tso_loc = where_is((StgClosure *)tso);
2147     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2148       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2149       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2150       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2151       // insertThread(tso, node_loc);
2152       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2153                 ResumeThread,
2154                 tso, node, (rtsSpark*)NULL);
2155       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2156       // len_local++;
2157       // len++;
2158     } else { // TSO is remote (actually should be FMBQ)
2159       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2160                                   RtsFlags.GranFlags.Costs.gunblocktime +
2161                                   RtsFlags.GranFlags.Costs.latency;
2162       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2163                 UnblockThread,
2164                 tso, node, (rtsSpark*)NULL);
2165       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2166       // len++;
2167     }
2168     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2169     IF_GRAN_DEBUG(bq,
2170                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2171                           (node_loc==tso_loc ? "Local" : "Global"), 
2172                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2173     tso->block_info.closure = NULL;
2174     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2175                              tso->id, tso));
2176 }
2177 #elif defined(PAR)
2178 static StgBlockingQueueElement *
2179 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2180 {
2181     StgBlockingQueueElement *next;
2182
2183     switch (get_itbl(bqe)->type) {
2184     case TSO:
2185       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2186       /* if it's a TSO just push it onto the run_queue */
2187       next = bqe->link;
2188       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2189       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2190       THREAD_RUNNABLE();
2191       unblockCount(bqe, node);
2192       /* reset blocking status after dumping event */
2193       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2194       break;
2195
2196     case BLOCKED_FETCH:
2197       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2198       next = bqe->link;
2199       bqe->link = PendingFetches;
2200       PendingFetches = bqe;
2201       break;
2202
2203 # if defined(DEBUG)
2204       /* can ignore this case in a non-debugging setup; 
2205          see comments on RBHSave closures above */
2206     case CONSTR:
2207       /* check that the closure is an RBHSave closure */
2208       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2209              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2210              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2211       break;
2212
2213     default:
2214       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2215            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2216            (StgClosure *)bqe);
2217 # endif
2218     }
2219   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2220   return next;
2221 }
2222
2223 #else /* !GRAN && !PAR */
2224 static StgTSO *
2225 unblockOneLocked(StgTSO *tso)
2226 {
2227   StgTSO *next;
2228
2229   ASSERT(get_itbl(tso)->type == TSO);
2230   ASSERT(tso->why_blocked != NotBlocked);
2231   tso->why_blocked = NotBlocked;
2232   next = tso->link;
2233   PUSH_ON_RUN_QUEUE(tso);
2234   THREAD_RUNNABLE();
2235   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2236   return next;
2237 }
2238 #endif
2239
2240 #if defined(GRAN) || defined(PAR)
2241 inline StgBlockingQueueElement *
2242 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2243 {
2244   ACQUIRE_LOCK(&sched_mutex);
2245   bqe = unblockOneLocked(bqe, node);
2246   RELEASE_LOCK(&sched_mutex);
2247   return bqe;
2248 }
2249 #else
2250 inline StgTSO *
2251 unblockOne(StgTSO *tso)
2252 {
2253   ACQUIRE_LOCK(&sched_mutex);
2254   tso = unblockOneLocked(tso);
2255   RELEASE_LOCK(&sched_mutex);
2256   return tso;
2257 }
2258 #endif
2259
2260 #if defined(GRAN)
2261 void 
2262 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2263 {
2264   StgBlockingQueueElement *bqe;
2265   PEs node_loc;
2266   nat len = 0; 
2267
2268   IF_GRAN_DEBUG(bq, 
2269                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2270                       node, CurrentProc, CurrentTime[CurrentProc], 
2271                       CurrentTSO->id, CurrentTSO));
2272
2273   node_loc = where_is(node);
2274
2275   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2276          get_itbl(q)->type == CONSTR); // closure (type constructor)
2277   ASSERT(is_unique(node));
2278
2279   /* FAKE FETCH: magically copy the node to the tso's proc;
2280      no Fetch necessary because in reality the node should not have been 
2281      moved to the other PE in the first place
2282   */
2283   if (CurrentProc!=node_loc) {
2284     IF_GRAN_DEBUG(bq, 
2285                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2286                         node, node_loc, CurrentProc, CurrentTSO->id, 
2287                         // CurrentTSO, where_is(CurrentTSO),
2288                         node->header.gran.procs));
2289     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2290     IF_GRAN_DEBUG(bq, 
2291                   belch("## new bitmask of node %p is %#x",
2292                         node, node->header.gran.procs));
2293     if (RtsFlags.GranFlags.GranSimStats.Global) {
2294       globalGranStats.tot_fake_fetches++;
2295     }
2296   }
2297
2298   bqe = q;
2299   // ToDo: check: ASSERT(CurrentProc==node_loc);
2300   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2301     //next = bqe->link;
2302     /* 
2303        bqe points to the current element in the queue
2304        next points to the next element in the queue
2305     */
2306     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2307     //tso_loc = where_is(tso);
2308     len++;
2309     bqe = unblockOneLocked(bqe, node);
2310   }
2311
2312   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2313      the closure to make room for the anchor of the BQ */
2314   if (bqe!=END_BQ_QUEUE) {
2315     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2316     /*
2317     ASSERT((info_ptr==&RBH_Save_0_info) ||
2318            (info_ptr==&RBH_Save_1_info) ||
2319            (info_ptr==&RBH_Save_2_info));
2320     */
2321     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2322     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2323     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2324
2325     IF_GRAN_DEBUG(bq,
2326                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2327                         node, info_type(node)));
2328   }
2329
2330   /* statistics gathering */
2331   if (RtsFlags.GranFlags.GranSimStats.Global) {
2332     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2333     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2334     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2335     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2336   }
2337   IF_GRAN_DEBUG(bq,
2338                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2339                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2340 }
2341 #elif defined(PAR)
2342 void 
2343 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2344 {
2345   StgBlockingQueueElement *bqe, *next;
2346
2347   ACQUIRE_LOCK(&sched_mutex);
2348
2349   IF_PAR_DEBUG(verbose, 
2350                belch("## AwBQ for node %p on [%x]: ",
2351                      node, mytid));
2352
2353   ASSERT(get_itbl(q)->type == TSO ||           
2354          get_itbl(q)->type == BLOCKED_FETCH || 
2355          get_itbl(q)->type == CONSTR); 
2356
2357   bqe = q;
2358   while (get_itbl(bqe)->type==TSO || 
2359          get_itbl(bqe)->type==BLOCKED_FETCH) {
2360     bqe = unblockOneLocked(bqe, node);
2361   }
2362   RELEASE_LOCK(&sched_mutex);
2363 }
2364
2365 #else   /* !GRAN && !PAR */
2366 void
2367 awakenBlockedQueue(StgTSO *tso)
2368 {
2369   ACQUIRE_LOCK(&sched_mutex);
2370   while (tso != END_TSO_QUEUE) {
2371     tso = unblockOneLocked(tso);
2372   }
2373   RELEASE_LOCK(&sched_mutex);
2374 }
2375 #endif
2376
2377 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2378 //@subsection Exception Handling Routines
2379
2380 /* ---------------------------------------------------------------------------
2381    Interrupt execution
2382    - usually called inside a signal handler so it mustn't do anything fancy.   
2383    ------------------------------------------------------------------------ */
2384
2385 void
2386 interruptStgRts(void)
2387 {
2388     interrupted    = 1;
2389     context_switch = 1;
2390 }
2391
2392 /* -----------------------------------------------------------------------------
2393    Unblock a thread
2394
2395    This is for use when we raise an exception in another thread, which
2396    may be blocked.
2397    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2398    -------------------------------------------------------------------------- */
2399
2400 #if defined(GRAN) || defined(PAR)
2401 /*
2402   NB: only the type of the blocking queue is different in GranSim and GUM
2403       the operations on the queue-elements are the same
2404       long live polymorphism!
2405 */
2406 static void
2407 unblockThread(StgTSO *tso)
2408 {
2409   StgBlockingQueueElement *t, **last;
2410
2411   ACQUIRE_LOCK(&sched_mutex);
2412   switch (tso->why_blocked) {
2413
2414   case NotBlocked:
2415     return;  /* not blocked */
2416
2417   case BlockedOnMVar:
2418     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2419     {
2420       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2421       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2422
2423       last = (StgBlockingQueueElement **)&mvar->head;
2424       for (t = (StgBlockingQueueElement *)mvar->head; 
2425            t != END_BQ_QUEUE; 
2426            last = &t->link, last_tso = t, t = t->link) {
2427         if (t == (StgBlockingQueueElement *)tso) {
2428           *last = (StgBlockingQueueElement *)tso->link;
2429           if (mvar->tail == tso) {
2430             mvar->tail = (StgTSO *)last_tso;
2431           }
2432           goto done;
2433         }
2434       }
2435       barf("unblockThread (MVAR): TSO not found");
2436     }
2437
2438   case BlockedOnBlackHole:
2439     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2440     {
2441       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2442
2443       last = &bq->blocking_queue;
2444       for (t = bq->blocking_queue; 
2445            t != END_BQ_QUEUE; 
2446            last = &t->link, t = t->link) {
2447         if (t == (StgBlockingQueueElement *)tso) {
2448           *last = (StgBlockingQueueElement *)tso->link;
2449           goto done;
2450         }
2451       }
2452       barf("unblockThread (BLACKHOLE): TSO not found");
2453     }
2454
2455   case BlockedOnException:
2456     {
2457       StgTSO *target  = tso->block_info.tso;
2458
2459       ASSERT(get_itbl(target)->type == TSO);
2460       ASSERT(target->blocked_exceptions != NULL);
2461
2462       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2463       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2464            t != END_BQ_QUEUE; 
2465            last = &t->link, t = t->link) {
2466         ASSERT(get_itbl(t)->type == TSO);
2467         if (t == (StgBlockingQueueElement *)tso) {
2468           *last = (StgBlockingQueueElement *)tso->link;
2469           goto done;
2470         }
2471       }
2472       barf("unblockThread (Exception): TSO not found");
2473     }
2474
2475   case BlockedOnDelay:
2476   case BlockedOnRead:
2477   case BlockedOnWrite:
2478     {
2479       StgBlockingQueueElement *prev = NULL;
2480       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2481            prev = t, t = t->link) {
2482         if (t == (StgBlockingQueueElement *)tso) {
2483           if (prev == NULL) {
2484             blocked_queue_hd = (StgTSO *)t->link;
2485             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2486               blocked_queue_tl = END_TSO_QUEUE;
2487             }
2488           } else {
2489             prev->link = t->link;
2490             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2491               blocked_queue_tl = (StgTSO *)prev;
2492             }
2493           }
2494           goto done;
2495         }
2496       }
2497       barf("unblockThread (I/O): TSO not found");
2498     }
2499
2500   default:
2501     barf("unblockThread");
2502   }
2503
2504  done:
2505   tso->link = END_TSO_QUEUE;
2506   tso->why_blocked = NotBlocked;
2507   tso->block_info.closure = NULL;
2508   PUSH_ON_RUN_QUEUE(tso);
2509   RELEASE_LOCK(&sched_mutex);
2510 }
2511 #else
2512 static void
2513 unblockThread(StgTSO *tso)
2514 {
2515   StgTSO *t, **last;
2516
2517   ACQUIRE_LOCK(&sched_mutex);
2518   switch (tso->why_blocked) {
2519
2520   case NotBlocked:
2521     return;  /* not blocked */
2522
2523   case BlockedOnMVar:
2524     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2525     {
2526       StgTSO *last_tso = END_TSO_QUEUE;
2527       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2528
2529       last = &mvar->head;
2530       for (t = mvar->head; t != END_TSO_QUEUE; 
2531            last = &t->link, last_tso = t, t = t->link) {
2532         if (t == tso) {
2533           *last = tso->link;
2534           if (mvar->tail == tso) {
2535             mvar->tail = last_tso;
2536           }
2537           goto done;
2538         }
2539       }
2540       barf("unblockThread (MVAR): TSO not found");
2541     }
2542
2543   case BlockedOnBlackHole:
2544     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2545     {
2546       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2547
2548       last = &bq->blocking_queue;
2549       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2550            last = &t->link, t = t->link) {
2551         if (t == tso) {
2552           *last = tso->link;
2553           goto done;
2554         }
2555       }
2556       barf("unblockThread (BLACKHOLE): TSO not found");
2557     }
2558
2559   case BlockedOnException:
2560     {
2561       StgTSO *target  = tso->block_info.tso;
2562
2563       ASSERT(get_itbl(target)->type == TSO);
2564       ASSERT(target->blocked_exceptions != NULL);
2565
2566       last = &target->blocked_exceptions;
2567       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2568            last = &t->link, t = t->link) {
2569         ASSERT(get_itbl(t)->type == TSO);
2570         if (t == tso) {
2571           *last = tso->link;
2572           goto done;
2573         }
2574       }
2575       barf("unblockThread (Exception): TSO not found");
2576     }
2577
2578   case BlockedOnDelay:
2579   case BlockedOnRead:
2580   case BlockedOnWrite:
2581     {
2582       StgTSO *prev = NULL;
2583       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2584            prev = t, t = t->link) {
2585         if (t == tso) {
2586           if (prev == NULL) {
2587             blocked_queue_hd = t->link;
2588             if (blocked_queue_tl == t) {
2589               blocked_queue_tl = END_TSO_QUEUE;
2590             }
2591           } else {
2592             prev->link = t->link;
2593             if (blocked_queue_tl == t) {
2594               blocked_queue_tl = prev;
2595             }
2596           }
2597           goto done;
2598         }
2599       }
2600       barf("unblockThread (I/O): TSO not found");
2601     }
2602
2603   default:
2604     barf("unblockThread");
2605   }
2606
2607  done:
2608   tso->link = END_TSO_QUEUE;
2609   tso->why_blocked = NotBlocked;
2610   tso->block_info.closure = NULL;
2611   PUSH_ON_RUN_QUEUE(tso);
2612   RELEASE_LOCK(&sched_mutex);
2613 }
2614 #endif
2615
2616 /* -----------------------------------------------------------------------------
2617  * raiseAsync()
2618  *
2619  * The following function implements the magic for raising an
2620  * asynchronous exception in an existing thread.
2621  *
2622  * We first remove the thread from any queue on which it might be
2623  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2624  *
2625  * We strip the stack down to the innermost CATCH_FRAME, building
2626  * thunks in the heap for all the active computations, so they can 
2627  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2628  * an application of the handler to the exception, and push it on
2629  * the top of the stack.
2630  * 
2631  * How exactly do we save all the active computations?  We create an
2632  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2633  * AP_UPDs pushes everything from the corresponding update frame
2634  * upwards onto the stack.  (Actually, it pushes everything up to the
2635  * next update frame plus a pointer to the next AP_UPD object.
2636  * Entering the next AP_UPD object pushes more onto the stack until we
2637  * reach the last AP_UPD object - at which point the stack should look
2638  * exactly as it did when we killed the TSO and we can continue
2639  * execution by entering the closure on top of the stack.
2640  *
2641  * We can also kill a thread entirely - this happens if either (a) the 
2642  * exception passed to raiseAsync is NULL, or (b) there's no
2643  * CATCH_FRAME on the stack.  In either case, we strip the entire
2644  * stack and replace the thread with a zombie.
2645  *
2646  * -------------------------------------------------------------------------- */
2647  
2648 void 
2649 deleteThread(StgTSO *tso)
2650 {
2651   raiseAsync(tso,NULL);
2652 }
2653
2654 void
2655 raiseAsync(StgTSO *tso, StgClosure *exception)
2656 {
2657   StgUpdateFrame* su = tso->su;
2658   StgPtr          sp = tso->sp;
2659   
2660   /* Thread already dead? */
2661   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2662     return;
2663   }
2664
2665   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2666
2667   /* Remove it from any blocking queues */
2668   unblockThread(tso);
2669
2670   /* The stack freezing code assumes there's a closure pointer on
2671    * the top of the stack.  This isn't always the case with compiled
2672    * code, so we have to push a dummy closure on the top which just
2673    * returns to the next return address on the stack.
2674    */
2675   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2676     *(--sp) = (W_)&dummy_ret_closure;
2677   }
2678
2679   while (1) {
2680     int words = ((P_)su - (P_)sp) - 1;
2681     nat i;
2682     StgAP_UPD * ap;
2683
2684     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2685      * then build PAP(handler,exception,realworld#), and leave it on
2686      * top of the stack ready to enter.
2687      */
2688     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2689       StgCatchFrame *cf = (StgCatchFrame *)su;
2690       /* we've got an exception to raise, so let's pass it to the
2691        * handler in this frame.
2692        */
2693       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2694       TICK_ALLOC_UPD_PAP(3,0);
2695       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2696               
2697       ap->n_args = 2;
2698       ap->fun = cf->handler;    /* :: Exception -> IO a */
2699       ap->payload[0] = (P_)exception;
2700       ap->payload[1] = ARG_TAG(0); /* realworld token */
2701
2702       /* throw away the stack from Sp up to and including the
2703        * CATCH_FRAME.
2704        */
2705       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2706       tso->su = cf->link;
2707
2708       /* Restore the blocked/unblocked state for asynchronous exceptions
2709        * at the CATCH_FRAME.  
2710        *
2711        * If exceptions were unblocked at the catch, arrange that they
2712        * are unblocked again after executing the handler by pushing an
2713        * unblockAsyncExceptions_ret stack frame.
2714        */
2715       if (!cf->exceptions_blocked) {
2716         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2717       }
2718       
2719       /* Ensure that async exceptions are blocked when running the handler.
2720        */
2721       if (tso->blocked_exceptions == NULL) {
2722         tso->blocked_exceptions = END_TSO_QUEUE;
2723       }
2724       
2725       /* Put the newly-built PAP on top of the stack, ready to execute
2726        * when the thread restarts.
2727        */
2728       sp[0] = (W_)ap;
2729       tso->sp = sp;
2730       tso->what_next = ThreadEnterGHC;
2731       return;
2732     }
2733
2734     /* First build an AP_UPD consisting of the stack chunk above the
2735      * current update frame, with the top word on the stack as the
2736      * fun field.
2737      */
2738     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2739     
2740     ASSERT(words >= 0);
2741     
2742     ap->n_args = words;
2743     ap->fun    = (StgClosure *)sp[0];
2744     sp++;
2745     for(i=0; i < (nat)words; ++i) {
2746       ap->payload[i] = (P_)*sp++;
2747     }
2748     
2749     switch (get_itbl(su)->type) {
2750       
2751     case UPDATE_FRAME:
2752       {
2753         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2754         TICK_ALLOC_UP_THK(words+1,0);
2755         
2756         IF_DEBUG(scheduler,
2757                  fprintf(stderr,  "scheduler: Updating ");
2758                  printPtr((P_)su->updatee); 
2759                  fprintf(stderr,  " with ");
2760                  printObj((StgClosure *)ap);
2761                  );
2762         
2763         /* Replace the updatee with an indirection - happily
2764          * this will also wake up any threads currently
2765          * waiting on the result.
2766          */
2767         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2768         su = su->link;
2769         sp += sizeofW(StgUpdateFrame) -1;
2770         sp[0] = (W_)ap; /* push onto stack */
2771         break;
2772       }
2773       
2774     case CATCH_FRAME:
2775       {
2776         StgCatchFrame *cf = (StgCatchFrame *)su;
2777         StgClosure* o;
2778         
2779         /* We want a PAP, not an AP_UPD.  Fortunately, the
2780          * layout's the same.
2781          */
2782         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2783         TICK_ALLOC_UPD_PAP(words+1,0);
2784         
2785         /* now build o = FUN(catch,ap,handler) */
2786         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2787         TICK_ALLOC_FUN(2,0);
2788         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2789         o->payload[0] = (StgClosure *)ap;
2790         o->payload[1] = cf->handler;
2791         
2792         IF_DEBUG(scheduler,
2793                  fprintf(stderr,  "scheduler: Built ");
2794                  printObj((StgClosure *)o);
2795                  );
2796         
2797         /* pop the old handler and put o on the stack */
2798         su = cf->link;
2799         sp += sizeofW(StgCatchFrame) - 1;
2800         sp[0] = (W_)o;
2801         break;
2802       }
2803       
2804     case SEQ_FRAME:
2805       {
2806         StgSeqFrame *sf = (StgSeqFrame *)su;
2807         StgClosure* o;
2808         
2809         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2810         TICK_ALLOC_UPD_PAP(words+1,0);
2811         
2812         /* now build o = FUN(seq,ap) */
2813         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2814         TICK_ALLOC_SE_THK(1,0);
2815         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2816         o->payload[0] = (StgClosure *)ap;
2817         
2818         IF_DEBUG(scheduler,
2819                  fprintf(stderr,  "scheduler: Built ");
2820                  printObj((StgClosure *)o);
2821                  );
2822         
2823         /* pop the old handler and put o on the stack */
2824         su = sf->link;
2825         sp += sizeofW(StgSeqFrame) - 1;
2826         sp[0] = (W_)o;
2827         break;
2828       }
2829       
2830     case STOP_FRAME:
2831       /* We've stripped the entire stack, the thread is now dead. */
2832       sp += sizeofW(StgStopFrame) - 1;
2833       sp[0] = (W_)exception;    /* save the exception */
2834       tso->what_next = ThreadKilled;
2835       tso->su = (StgUpdateFrame *)(sp+1);
2836       tso->sp = sp;
2837       return;
2838       
2839     default:
2840       barf("raiseAsync");
2841     }
2842   }
2843   barf("raiseAsync");
2844 }
2845
2846 /* -----------------------------------------------------------------------------
2847    resurrectThreads is called after garbage collection on the list of
2848    threads found to be garbage.  Each of these threads will be woken
2849    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2850    on an MVar, or NonTermination if the thread was blocked on a Black
2851    Hole.
2852    -------------------------------------------------------------------------- */
2853
2854 void
2855 resurrectThreads( StgTSO *threads )
2856 {
2857   StgTSO *tso, *next;
2858
2859   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2860     next = tso->global_link;
2861     tso->global_link = all_threads;
2862     all_threads = tso;
2863     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2864
2865     switch (tso->why_blocked) {
2866     case BlockedOnMVar:
2867     case BlockedOnException:
2868       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2869       break;
2870     case BlockedOnBlackHole:
2871       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2872       break;
2873     case NotBlocked:
2874       /* This might happen if the thread was blocked on a black hole
2875        * belonging to a thread that we've just woken up (raiseAsync
2876        * can wake up threads, remember...).
2877        */
2878       continue;
2879     default:
2880       barf("resurrectThreads: thread blocked in a strange way");
2881     }
2882   }
2883 }
2884
2885 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2886 //@subsection Debugging Routines
2887
2888 /* -----------------------------------------------------------------------------
2889    Debugging: why is a thread blocked
2890    -------------------------------------------------------------------------- */
2891
2892 #ifdef DEBUG
2893
2894 void
2895 printThreadBlockage(StgTSO *tso)
2896 {
2897   switch (tso->why_blocked) {
2898   case BlockedOnRead:
2899     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2900     break;
2901   case BlockedOnWrite:
2902     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2903     break;
2904   case BlockedOnDelay:
2905 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2906     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2907 #else
2908     fprintf(stderr,"blocked on delay of %d ms", 
2909             tso->block_info.target - getourtimeofday());
2910 #endif
2911     break;
2912   case BlockedOnMVar:
2913     fprintf(stderr,"blocked on an MVar");
2914     break;
2915   case BlockedOnException:
2916     fprintf(stderr,"blocked on delivering an exception to thread %d",
2917             tso->block_info.tso->id);
2918     break;
2919   case BlockedOnBlackHole:
2920     fprintf(stderr,"blocked on a black hole");
2921     break;
2922   case NotBlocked:
2923     fprintf(stderr,"not blocked");
2924     break;
2925 #if defined(PAR)
2926   case BlockedOnGA:
2927     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2928             tso->block_info.closure, info_type(tso->block_info.closure));
2929     break;
2930   case BlockedOnGA_NoSend:
2931     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2932             tso->block_info.closure, info_type(tso->block_info.closure));
2933     break;
2934 #endif
2935   default:
2936     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2937          tso->why_blocked, tso->id, tso);
2938   }
2939 }
2940
2941 void
2942 printThreadStatus(StgTSO *tso)
2943 {
2944   switch (tso->what_next) {
2945   case ThreadKilled:
2946     fprintf(stderr,"has been killed");
2947     break;
2948   case ThreadComplete:
2949     fprintf(stderr,"has completed");
2950     break;
2951   default:
2952     printThreadBlockage(tso);
2953   }
2954 }
2955
2956 void
2957 printAllThreads(void)
2958 {
2959   StgTSO *t;
2960
2961   sched_belch("all threads:");
2962   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2963     fprintf(stderr, "\tthread %d is ", t->id);
2964     printThreadStatus(t);
2965     fprintf(stderr,"\n");
2966   }
2967 }
2968     
2969 /* 
2970    Print a whole blocking queue attached to node (debugging only).
2971 */
2972 //@cindex print_bq
2973 # if defined(PAR)
2974 void 
2975 print_bq (StgClosure *node)
2976 {
2977   StgBlockingQueueElement *bqe;
2978   StgTSO *tso;
2979   rtsBool end;
2980
2981   fprintf(stderr,"## BQ of closure %p (%s): ",
2982           node, info_type(node));
2983
2984   /* should cover all closures that may have a blocking queue */
2985   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2986          get_itbl(node)->type == FETCH_ME_BQ ||
2987          get_itbl(node)->type == RBH);
2988     
2989   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2990   /* 
2991      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2992   */
2993   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2994        !end; // iterate until bqe points to a CONSTR
2995        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2996     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2997     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2998     /* types of closures that may appear in a blocking queue */
2999     ASSERT(get_itbl(bqe)->type == TSO ||           
3000            get_itbl(bqe)->type == BLOCKED_FETCH || 
3001            get_itbl(bqe)->type == CONSTR); 
3002     /* only BQs of an RBH end with an RBH_Save closure */
3003     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3004
3005     switch (get_itbl(bqe)->type) {
3006     case TSO:
3007       fprintf(stderr," TSO %d (%x),",
3008               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3009       break;
3010     case BLOCKED_FETCH:
3011       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3012               ((StgBlockedFetch *)bqe)->node, 
3013               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3014               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3015               ((StgBlockedFetch *)bqe)->ga.weight);
3016       break;
3017     case CONSTR:
3018       fprintf(stderr," %s (IP %p),",
3019               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3020                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3021                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3022                "RBH_Save_?"), get_itbl(bqe));
3023       break;
3024     default:
3025       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3026            info_type(bqe), node, info_type(node));
3027       break;
3028     }
3029   } /* for */
3030   fputc('\n', stderr);
3031 }
3032 # elif defined(GRAN)
3033 void 
3034 print_bq (StgClosure *node)
3035 {
3036   StgBlockingQueueElement *bqe;
3037   PEs node_loc, tso_loc;
3038   rtsBool end;
3039
3040   /* should cover all closures that may have a blocking queue */
3041   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3042          get_itbl(node)->type == FETCH_ME_BQ ||
3043          get_itbl(node)->type == RBH);
3044     
3045   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3046   node_loc = where_is(node);
3047
3048   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3049           node, info_type(node), node_loc);
3050
3051   /* 
3052      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3053   */
3054   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3055        !end; // iterate until bqe points to a CONSTR
3056        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3057     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3058     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3059     /* types of closures that may appear in a blocking queue */
3060     ASSERT(get_itbl(bqe)->type == TSO ||           
3061            get_itbl(bqe)->type == CONSTR); 
3062     /* only BQs of an RBH end with an RBH_Save closure */
3063     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3064
3065     tso_loc = where_is((StgClosure *)bqe);
3066     switch (get_itbl(bqe)->type) {
3067     case TSO:
3068       fprintf(stderr," TSO %d (%p) on [PE %d],",
3069               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3070       break;
3071     case CONSTR:
3072       fprintf(stderr," %s (IP %p),",
3073               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3074                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3075                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3076                "RBH_Save_?"), get_itbl(bqe));
3077       break;
3078     default:
3079       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3080            info_type((StgClosure *)bqe), node, info_type(node));
3081       break;
3082     }
3083   } /* for */
3084   fputc('\n', stderr);
3085 }
3086 #else
3087 /* 
3088    Nice and easy: only TSOs on the blocking queue
3089 */
3090 void 
3091 print_bq (StgClosure *node)
3092 {
3093   StgTSO *tso;
3094
3095   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3096   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3097        tso != END_TSO_QUEUE; 
3098        tso=tso->link) {
3099     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3100     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3101     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3102   }
3103   fputc('\n', stderr);
3104 }
3105 # endif
3106
3107 #if defined(PAR)
3108 static nat
3109 run_queue_len(void)
3110 {
3111   nat i;
3112   StgTSO *tso;
3113
3114   for (i=0, tso=run_queue_hd; 
3115        tso != END_TSO_QUEUE;
3116        i++, tso=tso->link)
3117     /* nothing */
3118
3119   return i;
3120 }
3121 #endif
3122
3123 static void
3124 sched_belch(char *s, ...)
3125 {
3126   va_list ap;
3127   va_start(ap,s);
3128 #ifdef SMP
3129   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3130 #else
3131   fprintf(stderr, "scheduler: ");
3132 #endif
3133   vfprintf(stderr, s, ap);
3134   fprintf(stderr, "\n");
3135 }
3136
3137 #endif /* DEBUG */
3138
3139
3140 //@node Index,  , Debugging Routines, Main scheduling code
3141 //@subsection Index
3142
3143 //@index
3144 //* MainRegTable::  @cindex\s-+MainRegTable
3145 //* StgMainThread::  @cindex\s-+StgMainThread
3146 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3147 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3148 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3149 //* context_switch::  @cindex\s-+context_switch
3150 //* createThread::  @cindex\s-+createThread
3151 //* free_capabilities::  @cindex\s-+free_capabilities
3152 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3153 //* initScheduler::  @cindex\s-+initScheduler
3154 //* interrupted::  @cindex\s-+interrupted
3155 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3156 //* next_thread_id::  @cindex\s-+next_thread_id
3157 //* print_bq::  @cindex\s-+print_bq
3158 //* run_queue_hd::  @cindex\s-+run_queue_hd
3159 //* run_queue_tl::  @cindex\s-+run_queue_tl
3160 //* sched_mutex::  @cindex\s-+sched_mutex
3161 //* schedule::  @cindex\s-+schedule
3162 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3163 //* task_ids::  @cindex\s-+task_ids
3164 //* term_mutex::  @cindex\s-+term_mutex
3165 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3166 //@end index