[project @ 2000-04-03 15:52:53 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.62 2000/04/03 15:52:53 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     /* 
543        In GUM all non-main PEs come in here with no work;
544        we ignore multiple main threads for now 
545
546     if (blocked_queue_hd == END_TSO_QUEUE
547         && run_queue_hd == END_TSO_QUEUE) {
548       StgMainThread *m = main_threads;
549       m->ret = NULL;
550       m->stat = Deadlock;
551       main_threads = m->link;
552       return;
553     }
554     */
555 #endif
556
557 #ifdef SMP
558     /* If there's a GC pending, don't do anything until it has
559      * completed.
560      */
561     if (ready_to_gc) {
562       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
563       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
564     }
565     
566     /* block until we've got a thread on the run queue and a free
567      * capability.
568      */
569     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
570       IF_DEBUG(scheduler, sched_belch("waiting for work"));
571       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
572       IF_DEBUG(scheduler, sched_belch("work now available"));
573     }
574 #endif
575
576 #if defined(GRAN)
577
578     if (RtsFlags.GranFlags.Light)
579       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
580
581     /* adjust time based on time-stamp */
582     if (event->time > CurrentTime[CurrentProc] &&
583         event->evttype != ContinueThread)
584       CurrentTime[CurrentProc] = event->time;
585     
586     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
587     if (!RtsFlags.GranFlags.Light)
588       handleIdlePEs();
589
590     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
591
592     /* main event dispatcher in GranSim */
593     switch (event->evttype) {
594       /* Should just be continuing execution */
595     case ContinueThread:
596       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
597       /* ToDo: check assertion
598       ASSERT(run_queue_hd != (StgTSO*)NULL &&
599              run_queue_hd != END_TSO_QUEUE);
600       */
601       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
602       if (!RtsFlags.GranFlags.DoAsyncFetch &&
603           procStatus[CurrentProc]==Fetching) {
604         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
605               CurrentTSO->id, CurrentTSO, CurrentProc);
606         goto next_thread;
607       } 
608       /* Ignore ContinueThreads for completed threads */
609       if (CurrentTSO->what_next == ThreadComplete) {
610         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
611               CurrentTSO->id, CurrentTSO, CurrentProc);
612         goto next_thread;
613       } 
614       /* Ignore ContinueThreads for threads that are being migrated */
615       if (PROCS(CurrentTSO)==Nowhere) { 
616         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
617               CurrentTSO->id, CurrentTSO, CurrentProc);
618         goto next_thread;
619       }
620       /* The thread should be at the beginning of the run queue */
621       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
622         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
623               CurrentTSO->id, CurrentTSO, CurrentProc);
624         break; // run the thread anyway
625       }
626       /*
627       new_event(proc, proc, CurrentTime[proc],
628                 FindWork,
629                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
630       goto next_thread; 
631       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
632       break; // now actually run the thread; DaH Qu'vam yImuHbej 
633
634     case FetchNode:
635       do_the_fetchnode(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case GlobalBlock:
639       do_the_globalblock(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     case FetchReply:
643       do_the_fetchreply(event);
644       goto next_thread;             /* handle next event in event queue  */
645       
646     case UnblockThread:   /* Move from the blocked queue to the tail of */
647       do_the_unblock(event);
648       goto next_thread;             /* handle next event in event queue  */
649       
650     case ResumeThread:  /* Move from the blocked queue to the tail of */
651       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
652       event->tso->gran.blocktime += 
653         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
654       do_the_startthread(event);
655       goto next_thread;             /* handle next event in event queue  */
656       
657     case StartThread:
658       do_the_startthread(event);
659       goto next_thread;             /* handle next event in event queue  */
660       
661     case MoveThread:
662       do_the_movethread(event);
663       goto next_thread;             /* handle next event in event queue  */
664       
665     case MoveSpark:
666       do_the_movespark(event);
667       goto next_thread;             /* handle next event in event queue  */
668       
669     case FindWork:
670       do_the_findwork(event);
671       goto next_thread;             /* handle next event in event queue  */
672       
673     default:
674       barf("Illegal event type %u\n", event->evttype);
675     }  /* switch */
676     
677     /* This point was scheduler_loop in the old RTS */
678
679     IF_DEBUG(gran, belch("GRAN: after main switch"));
680
681     TimeOfLastEvent = CurrentTime[CurrentProc];
682     TimeOfNextEvent = get_time_of_next_event();
683     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
684     // CurrentTSO = ThreadQueueHd;
685
686     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
687                          TimeOfNextEvent));
688
689     if (RtsFlags.GranFlags.Light) 
690       GranSimLight_leave_system(event, &ActiveTSO); 
691
692     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
693
694     IF_DEBUG(gran, 
695              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
696
697     /* in a GranSim setup the TSO stays on the run queue */
698     t = CurrentTSO;
699     /* Take a thread from the run queue. */
700     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
701
702     IF_DEBUG(gran, 
703              fprintf(stderr, "GRAN: About to run current thread, which is\n");
704              G_TSO(t,5))
705
706     context_switch = 0; // turned on via GranYield, checking events and time slice
707
708     IF_DEBUG(gran, 
709              DumpGranEvent(GR_SCHEDULE, t));
710
711     procStatus[CurrentProc] = Busy;
712
713 #elif defined(PAR)
714
715     if (PendingFetches != END_BF_QUEUE) {
716         processFetches();
717     }
718
719     /* ToDo: phps merge with spark activation above */
720     /* check whether we have local work and send requests if we have none */
721     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
722       /* :-[  no local threads => look out for local sparks */
723       /* the spark pool for the current PE */
724       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
725       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
726           pool->hd < pool->tl) {
727         /* 
728          * ToDo: add GC code check that we really have enough heap afterwards!!
729          * Old comment:
730          * If we're here (no runnable threads) and we have pending
731          * sparks, we must have a space problem.  Get enough space
732          * to turn one of those pending sparks into a
733          * thread... 
734          */
735         
736         spark = findSpark();                /* get a spark */
737         if (spark != (rtsSpark) NULL) {
738           tso = activateSpark(spark);       /* turn the spark into a thread */
739           IF_PAR_DEBUG(schedule,
740                        belch("==== schedule: Created TSO %d (%p); %d threads active",
741                              tso->id, tso, advisory_thread_count));
742
743           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
744             belch("==^^ failed to activate spark");
745             goto next_thread;
746           }               /* otherwise fall through & pick-up new tso */
747         } else {
748           IF_PAR_DEBUG(verbose,
749                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
750                              spark_queue_len(pool)));
751           goto next_thread;
752         }
753       } else  
754       /* =8-[  no local sparks => look for work on other PEs */
755       {
756         /*
757          * We really have absolutely no work.  Send out a fish
758          * (there may be some out there already), and wait for
759          * something to arrive.  We clearly can't run any threads
760          * until a SCHEDULE or RESUME arrives, and so that's what
761          * we're hoping to see.  (Of course, we still have to
762          * respond to other types of messages.)
763          */
764         if (//!fishing &&  
765             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
766           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
767           /* fishing set in sendFish, processFish;
768              avoid flooding system with fishes via delay */
769           pe = choosePE();
770           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
771                    NEW_FISH_HUNGER);
772         }
773         
774         processMessages();
775         goto next_thread;
776         // ReSchedule(0);
777       }
778     } else if (PacketsWaiting()) {  /* Look for incoming messages */
779       processMessages();
780     }
781
782     /* Now we are sure that we have some work available */
783     ASSERT(run_queue_hd != END_TSO_QUEUE);
784     /* Take a thread from the run queue, if we have work */
785     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
786
787     /* ToDo: write something to the log-file
788     if (RTSflags.ParFlags.granSimStats && !sameThread)
789         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
790
791     CurrentTSO = t;
792     */
793     /* the spark pool for the current PE */
794     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795
796     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
797                               spark_queue_len(pool), 
798                               CURRENT_PROC,
799                               pool->hd, pool->tl, pool->base, pool->lim));
800
801     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
802                               run_queue_len(), CURRENT_PROC,
803                               run_queue_hd, run_queue_tl));
804
805 #if 0
806     if (t != LastTSO) {
807       /* 
808          we are running a different TSO, so write a schedule event to log file
809          NB: If we use fair scheduling we also have to write  a deschedule 
810              event for LastTSO; with unfair scheduling we know that the
811              previous tso has blocked whenever we switch to another tso, so
812              we don't need it in GUM for now
813       */
814       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
816       
817     }
818 #endif
819 #else /* !GRAN && !PAR */
820   
821     /* grab a thread from the run queue
822      */
823     t = POP_RUN_QUEUE();
824     IF_DEBUG(sanity,checkTSO(t));
825
826 #endif
827     
828     /* grab a capability
829      */
830 #ifdef SMP
831     cap = free_capabilities;
832     free_capabilities = cap->link;
833     n_free_capabilities--;
834 #else
835     cap = &MainRegTable;
836 #endif
837     
838     cap->rCurrentTSO = t;
839     
840     /* set the context_switch flag
841      */
842     if (run_queue_hd == END_TSO_QUEUE)
843       context_switch = 0;
844     else
845       context_switch = 1;
846
847     RELEASE_LOCK(&sched_mutex);
848
849 #if defined(GRAN) || defined(PAR)    
850     IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...", 
851                               t->id, t, whatNext_strs[t->what_next]));
852 #else
853     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
854 #endif
855
856     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
857     /* Run the current thread 
858      */
859     switch (cap->rCurrentTSO->what_next) {
860     case ThreadKilled:
861     case ThreadComplete:
862       /* Thread already finished, return to scheduler. */
863       ret = ThreadFinished;
864       break;
865     case ThreadEnterGHC:
866       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
867       break;
868     case ThreadRunGHC:
869       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
870       break;
871     case ThreadEnterHugs:
872 #ifdef INTERPRETER
873       {
874          StgClosure* c;
875          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
876          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
877          cap->rCurrentTSO->sp += 1;
878          ret = enter(cap,c);
879          break;
880       }
881 #else
882       barf("Panic: entered a BCO but no bytecode interpreter in this build");
883 #endif
884     default:
885       barf("schedule: invalid what_next field");
886     }
887     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
888     
889     /* Costs for the scheduler are assigned to CCS_SYSTEM */
890 #ifdef PROFILING
891     CCCS = CCS_SYSTEM;
892 #endif
893     
894     ACQUIRE_LOCK(&sched_mutex);
895
896 #ifdef SMP
897     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
898 #elif !defined(GRAN) && !defined(PAR)
899     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
900 #endif
901     t = cap->rCurrentTSO;
902     
903 #if defined(PAR)
904     /* HACK 675: if the last thread didn't yield, make sure to print a 
905        SCHEDULE event to the log file when StgRunning the next thread, even
906        if it is the same one as before */
907     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
908     TimeOfLastYield = CURRENT_TIME;
909 #endif
910
911     switch (ret) {
912     case HeapOverflow:
913       /* make all the running tasks block on a condition variable,
914        * maybe set context_switch and wait till they all pile in,
915        * then have them wait on a GC condition variable.
916        */
917 #if defined(GRAN) || defined(PAR)    
918       IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped: HeapOverflow", 
919                                t->id, t, whatNext_strs[t->what_next]));
920 #endif
921       threadPaused(t);
922 #if defined(GRAN)
923       ASSERT(!is_on_queue(t,CurrentProc));
924 #endif
925       
926       ready_to_gc = rtsTrue;
927       context_switch = 1;               /* stop other threads ASAP */
928       PUSH_ON_RUN_QUEUE(t);
929       /* actual GC is done at the end of the while loop */
930       break;
931       
932     case StackOverflow:
933 #if defined(GRAN) || defined(PAR)    
934       IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped, StackOverflow", 
935                                t->id, t, whatNext_strs[t->what_next]));
936 #endif
937       /* just adjust the stack for this thread, then pop it back
938        * on the run queue.
939        */
940       threadPaused(t);
941       { 
942         StgMainThread *m;
943         /* enlarge the stack */
944         StgTSO *new_t = threadStackOverflow(t);
945         
946         /* This TSO has moved, so update any pointers to it from the
947          * main thread stack.  It better not be on any other queues...
948          * (it shouldn't be).
949          */
950         for (m = main_threads; m != NULL; m = m->link) {
951           if (m->tso == t) {
952             m->tso = new_t;
953           }
954         }
955         threadPaused(new_t);
956         PUSH_ON_RUN_QUEUE(new_t);
957       }
958       break;
959
960     case ThreadYielding:
961 #if defined(GRAN)
962       IF_DEBUG(gran, 
963                DumpGranEvent(GR_DESCHEDULE, t));
964       globalGranStats.tot_yields++;
965 #elif defined(PAR)
966       IF_DEBUG(par, 
967                DumpGranEvent(GR_DESCHEDULE, t));
968 #endif
969       /* put the thread back on the run queue.  Then, if we're ready to
970        * GC, check whether this is the last task to stop.  If so, wake
971        * up the GC thread.  getThread will block during a GC until the
972        * GC is finished.
973        */
974 #if defined(GRAN) || defined(PAR)    
975       IF_DEBUG(scheduler,
976                if (t->what_next == ThreadEnterHugs) {
977                    /* ToDo: or maybe a timer expired when we were in Hugs?
978                     * or maybe someone hit ctrl-C
979                     */
980                    belch("--<< TSO %ld (%p; %s) stopped to switch to Hugs", 
981                          t->id, t, whatNext_strs[t->what_next]);
982                } else {
983                    belch("--<< TSO %ld (%p; %s) stopped, yielding", 
984                          t->id, t, whatNext_strs[t->what_next]);
985                }
986                );
987 #else
988       IF_DEBUG(scheduler,
989                if (t->what_next == ThreadEnterHugs) {
990                  /* ToDo: or maybe a timer expired when we were in Hugs?
991                   * or maybe someone hit ctrl-C
992                   */
993                  belch("thread %ld stopped to switch to Hugs", t->id);
994                } else {
995                  belch("thread %ld stopped, yielding", t->id);
996                }
997                );
998 #endif
999       threadPaused(t);
1000       IF_DEBUG(sanity,
1001                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1002                checkTSO(t));
1003       ASSERT(t->link == END_TSO_QUEUE);
1004 #if defined(GRAN)
1005       ASSERT(!is_on_queue(t,CurrentProc));
1006
1007       IF_DEBUG(sanity,
1008                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1009                checkThreadQsSanity(rtsTrue));
1010 #endif
1011       APPEND_TO_RUN_QUEUE(t);
1012 #if defined(GRAN)
1013       /* add a ContinueThread event to actually process the thread */
1014       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1015                 ContinueThread,
1016                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1017       IF_GRAN_DEBUG(bq, 
1018                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1019                G_EVENTQ(0);
1020                G_CURR_THREADQ(0))
1021 #endif /* GRAN */
1022       break;
1023       
1024     case ThreadBlocked:
1025 #if defined(GRAN)
1026       IF_DEBUG(scheduler,
1027                belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1028                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1029                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1030
1031       // ??? needed; should emit block before
1032       IF_DEBUG(gran, 
1033                DumpGranEvent(GR_DESCHEDULE, t)); 
1034       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1035       /*
1036         ngoq Dogh!
1037       ASSERT(procStatus[CurrentProc]==Busy || 
1038               ((procStatus[CurrentProc]==Fetching) && 
1039               (t->block_info.closure!=(StgClosure*)NULL)));
1040       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1041           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1042             procStatus[CurrentProc]==Fetching)) 
1043         procStatus[CurrentProc] = Idle;
1044       */
1045 #elif defined(PAR)
1046       IF_DEBUG(par, 
1047                DumpGranEvent(GR_DESCHEDULE, t)); 
1048
1049       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1050       blockThread(t);
1051
1052       IF_DEBUG(scheduler,
1053                belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1054                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1055                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1056
1057 #else /* !GRAN */
1058       /* don't need to do anything.  Either the thread is blocked on
1059        * I/O, in which case we'll have called addToBlockedQueue
1060        * previously, or it's blocked on an MVar or Blackhole, in which
1061        * case it'll be on the relevant queue already.
1062        */
1063       IF_DEBUG(scheduler,
1064                fprintf(stderr, "--<< TSO %d (%p) stopped ", t->id, t);
1065                printThreadBlockage(t);
1066                fprintf(stderr, "\n"));
1067
1068       /* Only for dumping event to log file 
1069          ToDo: do I need this in GranSim, too?
1070       blockThread(t);
1071       */
1072 #endif
1073       threadPaused(t);
1074       break;
1075       
1076     case ThreadFinished:
1077       /* Need to check whether this was a main thread, and if so, signal
1078        * the task that started it with the return value.  If we have no
1079        * more main threads, we probably need to stop all the tasks until
1080        * we get a new one.
1081        */
1082       IF_DEBUG(scheduler,belch("--++ TSO %d (%p) finished", t->id, t));
1083       t->what_next = ThreadComplete;
1084 #if defined(GRAN)
1085       endThread(t, CurrentProc); // clean-up the thread
1086 #elif defined(PAR)
1087       advisory_thread_count--;
1088       if (RtsFlags.ParFlags.ParStats.Full) 
1089         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1090 #endif
1091       break;
1092       
1093     default:
1094       barf("doneThread: invalid thread return code");
1095     }
1096     
1097 #ifdef SMP
1098     cap->link = free_capabilities;
1099     free_capabilities = cap;
1100     n_free_capabilities++;
1101 #endif
1102
1103 #ifdef SMP
1104     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1105 #else
1106     if (ready_to_gc) 
1107 #endif
1108       {
1109       /* everybody back, start the GC.
1110        * Could do it in this thread, or signal a condition var
1111        * to do it in another thread.  Either way, we need to
1112        * broadcast on gc_pending_cond afterward.
1113        */
1114 #ifdef SMP
1115       IF_DEBUG(scheduler,sched_belch("doing GC"));
1116 #endif
1117       GarbageCollect(GetRoots);
1118       ready_to_gc = rtsFalse;
1119 #ifdef SMP
1120       pthread_cond_broadcast(&gc_pending_cond);
1121 #endif
1122 #if defined(GRAN)
1123       /* add a ContinueThread event to continue execution of current thread */
1124       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1125                 ContinueThread,
1126                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1127       IF_GRAN_DEBUG(bq, 
1128                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1129                G_EVENTQ(0);
1130                G_CURR_THREADQ(0))
1131 #endif /* GRAN */
1132     }
1133 #if defined(GRAN)
1134   next_thread:
1135     IF_GRAN_DEBUG(unused,
1136                   print_eventq(EventHd));
1137
1138     event = get_next_event();
1139
1140 #elif defined(PAR)
1141   next_thread:
1142     /* ToDo: wait for next message to arrive rather than busy wait */
1143
1144 #else /* GRAN */
1145   /* not any more
1146   next_thread:
1147     t = take_off_run_queue(END_TSO_QUEUE);
1148   */
1149 #endif /* GRAN */
1150   } /* end of while(1) */
1151 }
1152
1153 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1154 void deleteAllThreads ( void )
1155 {
1156   StgTSO* t;
1157   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1158   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1159     deleteThread(t);
1160   }
1161   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1162     deleteThread(t);
1163   }
1164   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1165   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1166 }
1167
1168 /* startThread and  insertThread are now in GranSim.c -- HWL */
1169
1170 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1171 //@subsection Suspend and Resume
1172
1173 /* ---------------------------------------------------------------------------
1174  * Suspending & resuming Haskell threads.
1175  * 
1176  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1177  * its capability before calling the C function.  This allows another
1178  * task to pick up the capability and carry on running Haskell
1179  * threads.  It also means that if the C call blocks, it won't lock
1180  * the whole system.
1181  *
1182  * The Haskell thread making the C call is put to sleep for the
1183  * duration of the call, on the susepended_ccalling_threads queue.  We
1184  * give out a token to the task, which it can use to resume the thread
1185  * on return from the C function.
1186  * ------------------------------------------------------------------------- */
1187    
1188 StgInt
1189 suspendThread( Capability *cap )
1190 {
1191   nat tok;
1192
1193   ACQUIRE_LOCK(&sched_mutex);
1194
1195   IF_DEBUG(scheduler,
1196            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1197
1198   threadPaused(cap->rCurrentTSO);
1199   cap->rCurrentTSO->link = suspended_ccalling_threads;
1200   suspended_ccalling_threads = cap->rCurrentTSO;
1201
1202   /* Use the thread ID as the token; it should be unique */
1203   tok = cap->rCurrentTSO->id;
1204
1205 #ifdef SMP
1206   cap->link = free_capabilities;
1207   free_capabilities = cap;
1208   n_free_capabilities++;
1209 #endif
1210
1211   RELEASE_LOCK(&sched_mutex);
1212   return tok; 
1213 }
1214
1215 Capability *
1216 resumeThread( StgInt tok )
1217 {
1218   StgTSO *tso, **prev;
1219   Capability *cap;
1220
1221   ACQUIRE_LOCK(&sched_mutex);
1222
1223   prev = &suspended_ccalling_threads;
1224   for (tso = suspended_ccalling_threads; 
1225        tso != END_TSO_QUEUE; 
1226        prev = &tso->link, tso = tso->link) {
1227     if (tso->id == (StgThreadID)tok) {
1228       *prev = tso->link;
1229       break;
1230     }
1231   }
1232   if (tso == END_TSO_QUEUE) {
1233     barf("resumeThread: thread not found");
1234   }
1235
1236 #ifdef SMP
1237   while (free_capabilities == NULL) {
1238     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1239     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1240     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1241   }
1242   cap = free_capabilities;
1243   free_capabilities = cap->link;
1244   n_free_capabilities--;
1245 #else  
1246   cap = &MainRegTable;
1247 #endif
1248
1249   cap->rCurrentTSO = tso;
1250
1251   RELEASE_LOCK(&sched_mutex);
1252   return cap;
1253 }
1254
1255
1256 /* ---------------------------------------------------------------------------
1257  * Static functions
1258  * ------------------------------------------------------------------------ */
1259 static void unblockThread(StgTSO *tso);
1260
1261 /* ---------------------------------------------------------------------------
1262  * Comparing Thread ids.
1263  *
1264  * This is used from STG land in the implementation of the
1265  * instances of Eq/Ord for ThreadIds.
1266  * ------------------------------------------------------------------------ */
1267
1268 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1269
1270   StgThreadID id1 = tso1->id; 
1271   StgThreadID id2 = tso2->id;
1272  
1273   if (id1 < id2) return (-1);
1274   if (id1 > id2) return 1;
1275   return 0;
1276 }
1277
1278 /* ---------------------------------------------------------------------------
1279    Create a new thread.
1280
1281    The new thread starts with the given stack size.  Before the
1282    scheduler can run, however, this thread needs to have a closure
1283    (and possibly some arguments) pushed on its stack.  See
1284    pushClosure() in Schedule.h.
1285
1286    createGenThread() and createIOThread() (in SchedAPI.h) are
1287    convenient packaged versions of this function.
1288
1289    currently pri (priority) is only used in a GRAN setup -- HWL
1290    ------------------------------------------------------------------------ */
1291 //@cindex createThread
1292 #if defined(GRAN)
1293 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1294 StgTSO *
1295 createThread(nat stack_size, StgInt pri)
1296 {
1297   return createThread_(stack_size, rtsFalse, pri);
1298 }
1299
1300 static StgTSO *
1301 createThread_(nat size, rtsBool have_lock, StgInt pri)
1302 {
1303 #else
1304 StgTSO *
1305 createThread(nat stack_size)
1306 {
1307   return createThread_(stack_size, rtsFalse);
1308 }
1309
1310 static StgTSO *
1311 createThread_(nat size, rtsBool have_lock)
1312 {
1313 #endif
1314
1315     StgTSO *tso;
1316     nat stack_size;
1317
1318     /* First check whether we should create a thread at all */
1319 #if defined(PAR)
1320   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1321   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1322     threadsIgnored++;
1323     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1324           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1325     return END_TSO_QUEUE;
1326   }
1327   threadsCreated++;
1328 #endif
1329
1330 #if defined(GRAN)
1331   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1332 #endif
1333
1334   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1335
1336   /* catch ridiculously small stack sizes */
1337   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1338     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1339   }
1340
1341   stack_size = size - TSO_STRUCT_SIZEW;
1342
1343   tso = (StgTSO *)allocate(size);
1344   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1345
1346   SET_HDR(tso, &TSO_info, CCS_MAIN);
1347 #if defined(GRAN)
1348   SET_GRAN_HDR(tso, ThisPE);
1349 #endif
1350   tso->what_next     = ThreadEnterGHC;
1351
1352   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1353    * protect the increment operation on next_thread_id.
1354    * In future, we could use an atomic increment instead.
1355    */
1356   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1357   tso->id = next_thread_id++; 
1358   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1359
1360   tso->why_blocked  = NotBlocked;
1361   tso->blocked_exceptions = NULL;
1362
1363   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1364   tso->stack_size   = stack_size;
1365   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1366                               - TSO_STRUCT_SIZEW;
1367   tso->sp           = (P_)&(tso->stack) + stack_size;
1368
1369 #ifdef PROFILING
1370   tso->prof.CCCS = CCS_MAIN;
1371 #endif
1372
1373   /* put a stop frame on the stack */
1374   tso->sp -= sizeofW(StgStopFrame);
1375   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1376   tso->su = (StgUpdateFrame*)tso->sp;
1377
1378   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1379                            tso->id, tso, tso->stack_size));
1380
1381   // ToDo: check this
1382 #if defined(GRAN)
1383   tso->link = END_TSO_QUEUE;
1384   /* uses more flexible routine in GranSim */
1385   insertThread(tso, CurrentProc);
1386 #else
1387   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1388    * from its creation
1389    */
1390 #endif
1391
1392 #if defined(GRAN) || defined(PAR)
1393   DumpGranEvent(GR_START,tso);
1394 #endif
1395
1396   /* Link the new thread on the global thread list.
1397    */
1398   tso->global_link = all_threads;
1399   all_threads = tso;
1400
1401 #if defined(GRAN)
1402   tso->gran.pri = pri;
1403 # if defined(DEBUG)
1404   tso->gran.magic = TSO_MAGIC; // debugging only
1405 # endif
1406   tso->gran.sparkname   = 0;
1407   tso->gran.startedat   = CURRENT_TIME; 
1408   tso->gran.exported    = 0;
1409   tso->gran.basicblocks = 0;
1410   tso->gran.allocs      = 0;
1411   tso->gran.exectime    = 0;
1412   tso->gran.fetchtime   = 0;
1413   tso->gran.fetchcount  = 0;
1414   tso->gran.blocktime   = 0;
1415   tso->gran.blockcount  = 0;
1416   tso->gran.blockedat   = 0;
1417   tso->gran.globalsparks = 0;
1418   tso->gran.localsparks  = 0;
1419   if (RtsFlags.GranFlags.Light)
1420     tso->gran.clock  = Now; /* local clock */
1421   else
1422     tso->gran.clock  = 0;
1423
1424   IF_DEBUG(gran,printTSO(tso));
1425 #elif defined(PAR)
1426 # if defined(DEBUG)
1427   tso->par.magic = TSO_MAGIC; // debugging only
1428 # endif
1429   tso->par.sparkname   = 0;
1430   tso->par.startedat   = CURRENT_TIME; 
1431   tso->par.exported    = 0;
1432   tso->par.basicblocks = 0;
1433   tso->par.allocs      = 0;
1434   tso->par.exectime    = 0;
1435   tso->par.fetchtime   = 0;
1436   tso->par.fetchcount  = 0;
1437   tso->par.blocktime   = 0;
1438   tso->par.blockcount  = 0;
1439   tso->par.blockedat   = 0;
1440   tso->par.globalsparks = 0;
1441   tso->par.localsparks  = 0;
1442 #endif
1443
1444 #if defined(GRAN)
1445   globalGranStats.tot_threads_created++;
1446   globalGranStats.threads_created_on_PE[CurrentProc]++;
1447   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1448   globalGranStats.tot_sq_probes++;
1449 #endif 
1450
1451 #if defined(GRAN)
1452   IF_GRAN_DEBUG(pri,
1453                 belch("==__ schedule: Created TSO %d (%p);",
1454                       CurrentProc, tso, tso->id));
1455 #elif defined(PAR)
1456     IF_PAR_DEBUG(verbose,
1457                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1458                        tso->id, tso, advisory_thread_count));
1459 #else
1460   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1461                                  tso->id, tso->stack_size));
1462 #endif    
1463   return tso;
1464 }
1465
1466 /*
1467   Turn a spark into a thread.
1468   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1469 */
1470 #if defined(PAR)
1471 //@cindex activateSpark
1472 StgTSO *
1473 activateSpark (rtsSpark spark) 
1474 {
1475   StgTSO *tso;
1476   
1477   ASSERT(spark != (rtsSpark)NULL);
1478   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1479   if (tso!=END_TSO_QUEUE) {
1480     pushClosure(tso,spark);
1481     PUSH_ON_RUN_QUEUE(tso);
1482     advisory_thread_count++;
1483
1484     if (RtsFlags.ParFlags.ParStats.Full) {
1485       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1486       IF_PAR_DEBUG(verbose,
1487                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1488                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1489     }
1490   } else {
1491     barf("activateSpark: Cannot create TSO");
1492   }
1493   // ToDo: fwd info on local/global spark to thread -- HWL
1494   // tso->gran.exported =  spark->exported;
1495   // tso->gran.locked =   !spark->global;
1496   // tso->gran.sparkname = spark->name;
1497
1498   return tso;
1499 }
1500 #endif
1501
1502 /* ---------------------------------------------------------------------------
1503  * scheduleThread()
1504  *
1505  * scheduleThread puts a thread on the head of the runnable queue.
1506  * This will usually be done immediately after a thread is created.
1507  * The caller of scheduleThread must create the thread using e.g.
1508  * createThread and push an appropriate closure
1509  * on this thread's stack before the scheduler is invoked.
1510  * ------------------------------------------------------------------------ */
1511
1512 void
1513 scheduleThread(StgTSO *tso)
1514 {
1515   if (tso==END_TSO_QUEUE){    
1516     schedule();
1517     return;
1518   }
1519
1520   ACQUIRE_LOCK(&sched_mutex);
1521
1522   /* Put the new thread on the head of the runnable queue.  The caller
1523    * better push an appropriate closure on this thread's stack
1524    * beforehand.  In the SMP case, the thread may start running as
1525    * soon as we release the scheduler lock below.
1526    */
1527   PUSH_ON_RUN_QUEUE(tso);
1528   THREAD_RUNNABLE();
1529
1530   IF_DEBUG(scheduler,printTSO(tso));
1531   RELEASE_LOCK(&sched_mutex);
1532 }
1533
1534 /* ---------------------------------------------------------------------------
1535  * startTasks()
1536  *
1537  * Start up Posix threads to run each of the scheduler tasks.
1538  * I believe the task ids are not needed in the system as defined.
1539  *  KH @ 25/10/99
1540  * ------------------------------------------------------------------------ */
1541
1542 #if defined(PAR) || defined(SMP)
1543 void *
1544 taskStart( void *arg STG_UNUSED )
1545 {
1546   rts_evalNothing(NULL);
1547 }
1548 #endif
1549
1550 /* ---------------------------------------------------------------------------
1551  * initScheduler()
1552  *
1553  * Initialise the scheduler.  This resets all the queues - if the
1554  * queues contained any threads, they'll be garbage collected at the
1555  * next pass.
1556  *
1557  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1558  * ------------------------------------------------------------------------ */
1559
1560 #ifdef SMP
1561 static void
1562 term_handler(int sig STG_UNUSED)
1563 {
1564   stat_workerStop();
1565   ACQUIRE_LOCK(&term_mutex);
1566   await_death--;
1567   RELEASE_LOCK(&term_mutex);
1568   pthread_exit(NULL);
1569 }
1570 #endif
1571
1572 //@cindex initScheduler
1573 void 
1574 initScheduler(void)
1575 {
1576 #if defined(GRAN)
1577   nat i;
1578
1579   for (i=0; i<=MAX_PROC; i++) {
1580     run_queue_hds[i]      = END_TSO_QUEUE;
1581     run_queue_tls[i]      = END_TSO_QUEUE;
1582     blocked_queue_hds[i]  = END_TSO_QUEUE;
1583     blocked_queue_tls[i]  = END_TSO_QUEUE;
1584     ccalling_threadss[i]  = END_TSO_QUEUE;
1585   }
1586 #else
1587   run_queue_hd      = END_TSO_QUEUE;
1588   run_queue_tl      = END_TSO_QUEUE;
1589   blocked_queue_hd  = END_TSO_QUEUE;
1590   blocked_queue_tl  = END_TSO_QUEUE;
1591 #endif 
1592
1593   suspended_ccalling_threads  = END_TSO_QUEUE;
1594
1595   main_threads = NULL;
1596   all_threads  = END_TSO_QUEUE;
1597
1598   context_switch = 0;
1599   interrupted    = 0;
1600
1601   enteredCAFs = END_CAF_LIST;
1602
1603   /* Install the SIGHUP handler */
1604 #ifdef SMP
1605   {
1606     struct sigaction action,oact;
1607
1608     action.sa_handler = term_handler;
1609     sigemptyset(&action.sa_mask);
1610     action.sa_flags = 0;
1611     if (sigaction(SIGTERM, &action, &oact) != 0) {
1612       barf("can't install TERM handler");
1613     }
1614   }
1615 #endif
1616
1617 #ifdef SMP
1618   /* Allocate N Capabilities */
1619   {
1620     nat i;
1621     Capability *cap, *prev;
1622     cap  = NULL;
1623     prev = NULL;
1624     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1625       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1626       cap->link = prev;
1627       prev = cap;
1628     }
1629     free_capabilities = cap;
1630     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1631   }
1632   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1633                              n_free_capabilities););
1634 #endif
1635
1636 #if defined(SMP) || defined(PAR)
1637   initSparkPools();
1638 #endif
1639 }
1640
1641 #ifdef SMP
1642 void
1643 startTasks( void )
1644 {
1645   nat i;
1646   int r;
1647   pthread_t tid;
1648   
1649   /* make some space for saving all the thread ids */
1650   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1651                             "initScheduler:task_ids");
1652   
1653   /* and create all the threads */
1654   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1655     r = pthread_create(&tid,NULL,taskStart,NULL);
1656     if (r != 0) {
1657       barf("startTasks: Can't create new Posix thread");
1658     }
1659     task_ids[i].id = tid;
1660     task_ids[i].mut_time = 0.0;
1661     task_ids[i].mut_etime = 0.0;
1662     task_ids[i].gc_time = 0.0;
1663     task_ids[i].gc_etime = 0.0;
1664     task_ids[i].elapsedtimestart = elapsedtime();
1665     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1666   }
1667 }
1668 #endif
1669
1670 void
1671 exitScheduler( void )
1672 {
1673 #ifdef SMP
1674   nat i;
1675
1676   /* Don't want to use pthread_cancel, since we'd have to install
1677    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1678    * all our locks.
1679    */
1680 #if 0
1681   /* Cancel all our tasks */
1682   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1683     pthread_cancel(task_ids[i].id);
1684   }
1685   
1686   /* Wait for all the tasks to terminate */
1687   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1688     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1689                                task_ids[i].id));
1690     pthread_join(task_ids[i].id, NULL);
1691   }
1692 #endif
1693
1694   /* Send 'em all a SIGHUP.  That should shut 'em up.
1695    */
1696   await_death = RtsFlags.ParFlags.nNodes;
1697   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1698     pthread_kill(task_ids[i].id,SIGTERM);
1699   }
1700   while (await_death > 0) {
1701     sched_yield();
1702   }
1703 #endif
1704 }
1705
1706 /* -----------------------------------------------------------------------------
1707    Managing the per-task allocation areas.
1708    
1709    Each capability comes with an allocation area.  These are
1710    fixed-length block lists into which allocation can be done.
1711
1712    ToDo: no support for two-space collection at the moment???
1713    -------------------------------------------------------------------------- */
1714
1715 /* -----------------------------------------------------------------------------
1716  * waitThread is the external interface for running a new computation
1717  * and waiting for the result.
1718  *
1719  * In the non-SMP case, we create a new main thread, push it on the 
1720  * main-thread stack, and invoke the scheduler to run it.  The
1721  * scheduler will return when the top main thread on the stack has
1722  * completed or died, and fill in the necessary fields of the
1723  * main_thread structure.
1724  *
1725  * In the SMP case, we create a main thread as before, but we then
1726  * create a new condition variable and sleep on it.  When our new
1727  * main thread has completed, we'll be woken up and the status/result
1728  * will be in the main_thread struct.
1729  * -------------------------------------------------------------------------- */
1730
1731 SchedulerStatus
1732 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1733 {
1734   StgMainThread *m;
1735   SchedulerStatus stat;
1736
1737   ACQUIRE_LOCK(&sched_mutex);
1738   
1739   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1740
1741   m->tso = tso;
1742   m->ret = ret;
1743   m->stat = NoStatus;
1744 #ifdef SMP
1745   pthread_cond_init(&m->wakeup, NULL);
1746 #endif
1747
1748   m->link = main_threads;
1749   main_threads = m;
1750
1751   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1752                               m->tso->id));
1753
1754 #ifdef SMP
1755   do {
1756     pthread_cond_wait(&m->wakeup, &sched_mutex);
1757   } while (m->stat == NoStatus);
1758 #elif defined(GRAN)
1759   /* GranSim specific init */
1760   CurrentTSO = m->tso;                // the TSO to run
1761   procStatus[MainProc] = Busy;        // status of main PE
1762   CurrentProc = MainProc;             // PE to run it on
1763
1764   schedule();
1765 #else
1766   schedule();
1767   ASSERT(m->stat != NoStatus);
1768 #endif
1769
1770   stat = m->stat;
1771
1772 #ifdef SMP
1773   pthread_cond_destroy(&m->wakeup);
1774 #endif
1775
1776   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1777                               m->tso->id));
1778   free(m);
1779
1780   RELEASE_LOCK(&sched_mutex);
1781
1782   return stat;
1783 }
1784
1785 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1786 //@subsection Run queue code 
1787
1788 #if 0
1789 /* 
1790    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1791        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1792        implicit global variable that has to be correct when calling these
1793        fcts -- HWL 
1794 */
1795
1796 /* Put the new thread on the head of the runnable queue.
1797  * The caller of createThread better push an appropriate closure
1798  * on this thread's stack before the scheduler is invoked.
1799  */
1800 static /* inline */ void
1801 add_to_run_queue(tso)
1802 StgTSO* tso; 
1803 {
1804   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1805   tso->link = run_queue_hd;
1806   run_queue_hd = tso;
1807   if (run_queue_tl == END_TSO_QUEUE) {
1808     run_queue_tl = tso;
1809   }
1810 }
1811
1812 /* Put the new thread at the end of the runnable queue. */
1813 static /* inline */ void
1814 push_on_run_queue(tso)
1815 StgTSO* tso; 
1816 {
1817   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1818   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1819   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1820   if (run_queue_hd == END_TSO_QUEUE) {
1821     run_queue_hd = tso;
1822   } else {
1823     run_queue_tl->link = tso;
1824   }
1825   run_queue_tl = tso;
1826 }
1827
1828 /* 
1829    Should be inlined because it's used very often in schedule.  The tso
1830    argument is actually only needed in GranSim, where we want to have the
1831    possibility to schedule *any* TSO on the run queue, irrespective of the
1832    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1833    the run queue and dequeue the tso, adjusting the links in the queue. 
1834 */
1835 //@cindex take_off_run_queue
1836 static /* inline */ StgTSO*
1837 take_off_run_queue(StgTSO *tso) {
1838   StgTSO *t, *prev;
1839
1840   /* 
1841      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1842
1843      if tso is specified, unlink that tso from the run_queue (doesn't have
1844      to be at the beginning of the queue); GranSim only 
1845   */
1846   if (tso!=END_TSO_QUEUE) {
1847     /* find tso in queue */
1848     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1849          t!=END_TSO_QUEUE && t!=tso;
1850          prev=t, t=t->link) 
1851       /* nothing */ ;
1852     ASSERT(t==tso);
1853     /* now actually dequeue the tso */
1854     if (prev!=END_TSO_QUEUE) {
1855       ASSERT(run_queue_hd!=t);
1856       prev->link = t->link;
1857     } else {
1858       /* t is at beginning of thread queue */
1859       ASSERT(run_queue_hd==t);
1860       run_queue_hd = t->link;
1861     }
1862     /* t is at end of thread queue */
1863     if (t->link==END_TSO_QUEUE) {
1864       ASSERT(t==run_queue_tl);
1865       run_queue_tl = prev;
1866     } else {
1867       ASSERT(run_queue_tl!=t);
1868     }
1869     t->link = END_TSO_QUEUE;
1870   } else {
1871     /* take tso from the beginning of the queue; std concurrent code */
1872     t = run_queue_hd;
1873     if (t != END_TSO_QUEUE) {
1874       run_queue_hd = t->link;
1875       t->link = END_TSO_QUEUE;
1876       if (run_queue_hd == END_TSO_QUEUE) {
1877         run_queue_tl = END_TSO_QUEUE;
1878       }
1879     }
1880   }
1881   return t;
1882 }
1883
1884 #endif /* 0 */
1885
1886 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1887 //@subsection Garbage Collextion Routines
1888
1889 /* ---------------------------------------------------------------------------
1890    Where are the roots that we know about?
1891
1892         - all the threads on the runnable queue
1893         - all the threads on the blocked queue
1894         - all the thread currently executing a _ccall_GC
1895         - all the "main threads"
1896      
1897    ------------------------------------------------------------------------ */
1898
1899 /* This has to be protected either by the scheduler monitor, or by the
1900         garbage collection monitor (probably the latter).
1901         KH @ 25/10/99
1902 */
1903
1904 static void GetRoots(void)
1905 {
1906   StgMainThread *m;
1907
1908 #if defined(GRAN)
1909   {
1910     nat i;
1911     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1912       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1913         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1914       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1915         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1916       
1917       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1918         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1919       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1920         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1921       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1922         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1923     }
1924   }
1925
1926   markEventQueue();
1927
1928 #else /* !GRAN */
1929   if (run_queue_hd != END_TSO_QUEUE) {
1930     ASSERT(run_queue_tl != END_TSO_QUEUE);
1931     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1932     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1933   }
1934
1935   if (blocked_queue_hd != END_TSO_QUEUE) {
1936     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1937     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1938     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1939   }
1940 #endif 
1941
1942   for (m = main_threads; m != NULL; m = m->link) {
1943     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1944   }
1945   if (suspended_ccalling_threads != END_TSO_QUEUE)
1946     suspended_ccalling_threads = 
1947       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1948
1949 #if defined(SMP) || defined(PAR) || defined(GRAN)
1950   markSparkQueue();
1951 #endif
1952 }
1953
1954 /* -----------------------------------------------------------------------------
1955    performGC
1956
1957    This is the interface to the garbage collector from Haskell land.
1958    We provide this so that external C code can allocate and garbage
1959    collect when called from Haskell via _ccall_GC.
1960
1961    It might be useful to provide an interface whereby the programmer
1962    can specify more roots (ToDo).
1963    
1964    This needs to be protected by the GC condition variable above.  KH.
1965    -------------------------------------------------------------------------- */
1966
1967 void (*extra_roots)(void);
1968
1969 void
1970 performGC(void)
1971 {
1972   GarbageCollect(GetRoots);
1973 }
1974
1975 static void
1976 AllRoots(void)
1977 {
1978   GetRoots();                   /* the scheduler's roots */
1979   extra_roots();                /* the user's roots */
1980 }
1981
1982 void
1983 performGCWithRoots(void (*get_roots)(void))
1984 {
1985   extra_roots = get_roots;
1986
1987   GarbageCollect(AllRoots);
1988 }
1989
1990 /* -----------------------------------------------------------------------------
1991    Stack overflow
1992
1993    If the thread has reached its maximum stack size, then raise the
1994    StackOverflow exception in the offending thread.  Otherwise
1995    relocate the TSO into a larger chunk of memory and adjust its stack
1996    size appropriately.
1997    -------------------------------------------------------------------------- */
1998
1999 static StgTSO *
2000 threadStackOverflow(StgTSO *tso)
2001 {
2002   nat new_stack_size, new_tso_size, diff, stack_words;
2003   StgPtr new_sp;
2004   StgTSO *dest;
2005
2006   IF_DEBUG(sanity,checkTSO(tso));
2007   if (tso->stack_size >= tso->max_stack_size) {
2008
2009     IF_DEBUG(gc,
2010              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2011                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2012              /* If we're debugging, just print out the top of the stack */
2013              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2014                                               tso->sp+64)));
2015
2016 #ifdef INTERPRETER
2017     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2018     exit(1);
2019 #else
2020     /* Send this thread the StackOverflow exception */
2021     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2022 #endif
2023     return tso;
2024   }
2025
2026   /* Try to double the current stack size.  If that takes us over the
2027    * maximum stack size for this thread, then use the maximum instead.
2028    * Finally round up so the TSO ends up as a whole number of blocks.
2029    */
2030   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2031   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2032                                        TSO_STRUCT_SIZE)/sizeof(W_);
2033   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2034   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2035
2036   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2037
2038   dest = (StgTSO *)allocate(new_tso_size);
2039   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2040
2041   /* copy the TSO block and the old stack into the new area */
2042   memcpy(dest,tso,TSO_STRUCT_SIZE);
2043   stack_words = tso->stack + tso->stack_size - tso->sp;
2044   new_sp = (P_)dest + new_tso_size - stack_words;
2045   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2046
2047   /* relocate the stack pointers... */
2048   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2049   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2050   dest->sp    = new_sp;
2051   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2052   dest->stack_size = new_stack_size;
2053         
2054   /* and relocate the update frame list */
2055   relocate_TSO(tso, dest);
2056
2057   /* Mark the old TSO as relocated.  We have to check for relocated
2058    * TSOs in the garbage collector and any primops that deal with TSOs.
2059    *
2060    * It's important to set the sp and su values to just beyond the end
2061    * of the stack, so we don't attempt to scavenge any part of the
2062    * dead TSO's stack.
2063    */
2064   tso->what_next = ThreadRelocated;
2065   tso->link = dest;
2066   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2067   tso->su = (StgUpdateFrame *)tso->sp;
2068   tso->why_blocked = NotBlocked;
2069   dest->mut_link = NULL;
2070
2071   IF_PAR_DEBUG(verbose,
2072                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2073                      tso->id, tso, tso->stack_size);
2074                /* If we're debugging, just print out the top of the stack */
2075                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2076                                                 tso->sp+64)));
2077   
2078   IF_DEBUG(sanity,checkTSO(tso));
2079 #if 0
2080   IF_DEBUG(scheduler,printTSO(dest));
2081 #endif
2082
2083   return dest;
2084 }
2085
2086 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2087 //@subsection Blocking Queue Routines
2088
2089 /* ---------------------------------------------------------------------------
2090    Wake up a queue that was blocked on some resource.
2091    ------------------------------------------------------------------------ */
2092
2093 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2094
2095 #if defined(GRAN)
2096 static inline void
2097 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2098 {
2099 }
2100 #elif defined(PAR)
2101 static inline void
2102 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2103 {
2104   /* write RESUME events to log file and
2105      update blocked and fetch time (depending on type of the orig closure) */
2106   if (RtsFlags.ParFlags.ParStats.Full) {
2107     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2108                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2109                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2110
2111     switch (get_itbl(node)->type) {
2112         case FETCH_ME_BQ:
2113           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2114           break;
2115         case RBH:
2116         case FETCH_ME:
2117         case BLACKHOLE_BQ:
2118           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2119           break;
2120         default:
2121           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2122         }
2123       }
2124 }
2125 #endif
2126
2127 #if defined(GRAN)
2128 static StgBlockingQueueElement *
2129 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2130 {
2131     StgTSO *tso;
2132     PEs node_loc, tso_loc;
2133
2134     node_loc = where_is(node); // should be lifted out of loop
2135     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2136     tso_loc = where_is((StgClosure *)tso);
2137     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2138       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2139       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2140       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2141       // insertThread(tso, node_loc);
2142       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2143                 ResumeThread,
2144                 tso, node, (rtsSpark*)NULL);
2145       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2146       // len_local++;
2147       // len++;
2148     } else { // TSO is remote (actually should be FMBQ)
2149       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2150                                   RtsFlags.GranFlags.Costs.gunblocktime +
2151                                   RtsFlags.GranFlags.Costs.latency;
2152       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2153                 UnblockThread,
2154                 tso, node, (rtsSpark*)NULL);
2155       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2156       // len++;
2157     }
2158     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2159     IF_GRAN_DEBUG(bq,
2160                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2161                           (node_loc==tso_loc ? "Local" : "Global"), 
2162                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2163     tso->block_info.closure = NULL;
2164     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2165                              tso->id, tso));
2166 }
2167 #elif defined(PAR)
2168 static StgBlockingQueueElement *
2169 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2170 {
2171     StgBlockingQueueElement *next;
2172
2173     switch (get_itbl(bqe)->type) {
2174     case TSO:
2175       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2176       /* if it's a TSO just push it onto the run_queue */
2177       next = bqe->link;
2178       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2179       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2180       THREAD_RUNNABLE();
2181       unblockCount(bqe, node);
2182       /* reset blocking status after dumping event */
2183       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2184       break;
2185
2186     case BLOCKED_FETCH:
2187       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2188       next = bqe->link;
2189       bqe->link = PendingFetches;
2190       PendingFetches = bqe;
2191       break;
2192
2193 # if defined(DEBUG)
2194       /* can ignore this case in a non-debugging setup; 
2195          see comments on RBHSave closures above */
2196     case CONSTR:
2197       /* check that the closure is an RBHSave closure */
2198       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2199              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2200              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2201       break;
2202
2203     default:
2204       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2205            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2206            (StgClosure *)bqe);
2207 # endif
2208     }
2209   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2210   return next;
2211 }
2212
2213 #else /* !GRAN && !PAR */
2214 static StgTSO *
2215 unblockOneLocked(StgTSO *tso)
2216 {
2217   StgTSO *next;
2218
2219   ASSERT(get_itbl(tso)->type == TSO);
2220   ASSERT(tso->why_blocked != NotBlocked);
2221   tso->why_blocked = NotBlocked;
2222   next = tso->link;
2223   PUSH_ON_RUN_QUEUE(tso);
2224   THREAD_RUNNABLE();
2225   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2226   return next;
2227 }
2228 #endif
2229
2230 #if defined(GRAN) || defined(PAR)
2231 inline StgBlockingQueueElement *
2232 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2233 {
2234   ACQUIRE_LOCK(&sched_mutex);
2235   bqe = unblockOneLocked(bqe, node);
2236   RELEASE_LOCK(&sched_mutex);
2237   return bqe;
2238 }
2239 #else
2240 inline StgTSO *
2241 unblockOne(StgTSO *tso)
2242 {
2243   ACQUIRE_LOCK(&sched_mutex);
2244   tso = unblockOneLocked(tso);
2245   RELEASE_LOCK(&sched_mutex);
2246   return tso;
2247 }
2248 #endif
2249
2250 #if defined(GRAN)
2251 void 
2252 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2253 {
2254   StgBlockingQueueElement *bqe;
2255   PEs node_loc;
2256   nat len = 0; 
2257
2258   IF_GRAN_DEBUG(bq, 
2259                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2260                       node, CurrentProc, CurrentTime[CurrentProc], 
2261                       CurrentTSO->id, CurrentTSO));
2262
2263   node_loc = where_is(node);
2264
2265   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2266          get_itbl(q)->type == CONSTR); // closure (type constructor)
2267   ASSERT(is_unique(node));
2268
2269   /* FAKE FETCH: magically copy the node to the tso's proc;
2270      no Fetch necessary because in reality the node should not have been 
2271      moved to the other PE in the first place
2272   */
2273   if (CurrentProc!=node_loc) {
2274     IF_GRAN_DEBUG(bq, 
2275                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2276                         node, node_loc, CurrentProc, CurrentTSO->id, 
2277                         // CurrentTSO, where_is(CurrentTSO),
2278                         node->header.gran.procs));
2279     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2280     IF_GRAN_DEBUG(bq, 
2281                   belch("## new bitmask of node %p is %#x",
2282                         node, node->header.gran.procs));
2283     if (RtsFlags.GranFlags.GranSimStats.Global) {
2284       globalGranStats.tot_fake_fetches++;
2285     }
2286   }
2287
2288   bqe = q;
2289   // ToDo: check: ASSERT(CurrentProc==node_loc);
2290   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2291     //next = bqe->link;
2292     /* 
2293        bqe points to the current element in the queue
2294        next points to the next element in the queue
2295     */
2296     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2297     //tso_loc = where_is(tso);
2298     len++;
2299     bqe = unblockOneLocked(bqe, node);
2300   }
2301
2302   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2303      the closure to make room for the anchor of the BQ */
2304   if (bqe!=END_BQ_QUEUE) {
2305     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2306     /*
2307     ASSERT((info_ptr==&RBH_Save_0_info) ||
2308            (info_ptr==&RBH_Save_1_info) ||
2309            (info_ptr==&RBH_Save_2_info));
2310     */
2311     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2312     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2313     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2314
2315     IF_GRAN_DEBUG(bq,
2316                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2317                         node, info_type(node)));
2318   }
2319
2320   /* statistics gathering */
2321   if (RtsFlags.GranFlags.GranSimStats.Global) {
2322     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2323     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2324     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2325     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2326   }
2327   IF_GRAN_DEBUG(bq,
2328                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2329                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2330 }
2331 #elif defined(PAR)
2332 void 
2333 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2334 {
2335   StgBlockingQueueElement *bqe, *next;
2336
2337   ACQUIRE_LOCK(&sched_mutex);
2338
2339   IF_PAR_DEBUG(verbose, 
2340                belch("## AwBQ for node %p on [%x]: ",
2341                      node, mytid));
2342
2343   ASSERT(get_itbl(q)->type == TSO ||           
2344          get_itbl(q)->type == BLOCKED_FETCH || 
2345          get_itbl(q)->type == CONSTR); 
2346
2347   bqe = q;
2348   while (get_itbl(bqe)->type==TSO || 
2349          get_itbl(bqe)->type==BLOCKED_FETCH) {
2350     bqe = unblockOneLocked(bqe, node);
2351   }
2352   RELEASE_LOCK(&sched_mutex);
2353 }
2354
2355 #else   /* !GRAN && !PAR */
2356 void
2357 awakenBlockedQueue(StgTSO *tso)
2358 {
2359   ACQUIRE_LOCK(&sched_mutex);
2360   while (tso != END_TSO_QUEUE) {
2361     tso = unblockOneLocked(tso);
2362   }
2363   RELEASE_LOCK(&sched_mutex);
2364 }
2365 #endif
2366
2367 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2368 //@subsection Exception Handling Routines
2369
2370 /* ---------------------------------------------------------------------------
2371    Interrupt execution
2372    - usually called inside a signal handler so it mustn't do anything fancy.   
2373    ------------------------------------------------------------------------ */
2374
2375 void
2376 interruptStgRts(void)
2377 {
2378     interrupted    = 1;
2379     context_switch = 1;
2380 }
2381
2382 /* -----------------------------------------------------------------------------
2383    Unblock a thread
2384
2385    This is for use when we raise an exception in another thread, which
2386    may be blocked.
2387    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2388    -------------------------------------------------------------------------- */
2389
2390 #if defined(GRAN) || defined(PAR)
2391 /*
2392   NB: only the type of the blocking queue is different in GranSim and GUM
2393       the operations on the queue-elements are the same
2394       long live polymorphism!
2395 */
2396 static void
2397 unblockThread(StgTSO *tso)
2398 {
2399   StgBlockingQueueElement *t, **last;
2400
2401   ACQUIRE_LOCK(&sched_mutex);
2402   switch (tso->why_blocked) {
2403
2404   case NotBlocked:
2405     return;  /* not blocked */
2406
2407   case BlockedOnMVar:
2408     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2409     {
2410       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2411       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2412
2413       last = (StgBlockingQueueElement **)&mvar->head;
2414       for (t = (StgBlockingQueueElement *)mvar->head; 
2415            t != END_BQ_QUEUE; 
2416            last = &t->link, last_tso = t, t = t->link) {
2417         if (t == (StgBlockingQueueElement *)tso) {
2418           *last = (StgBlockingQueueElement *)tso->link;
2419           if (mvar->tail == tso) {
2420             mvar->tail = (StgTSO *)last_tso;
2421           }
2422           goto done;
2423         }
2424       }
2425       barf("unblockThread (MVAR): TSO not found");
2426     }
2427
2428   case BlockedOnBlackHole:
2429     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2430     {
2431       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2432
2433       last = &bq->blocking_queue;
2434       for (t = bq->blocking_queue; 
2435            t != END_BQ_QUEUE; 
2436            last = &t->link, t = t->link) {
2437         if (t == (StgBlockingQueueElement *)tso) {
2438           *last = (StgBlockingQueueElement *)tso->link;
2439           goto done;
2440         }
2441       }
2442       barf("unblockThread (BLACKHOLE): TSO not found");
2443     }
2444
2445   case BlockedOnException:
2446     {
2447       StgTSO *target  = tso->block_info.tso;
2448
2449       ASSERT(get_itbl(target)->type == TSO);
2450       ASSERT(target->blocked_exceptions != NULL);
2451
2452       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2453       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2454            t != END_BQ_QUEUE; 
2455            last = &t->link, t = t->link) {
2456         ASSERT(get_itbl(t)->type == TSO);
2457         if (t == (StgBlockingQueueElement *)tso) {
2458           *last = (StgBlockingQueueElement *)tso->link;
2459           goto done;
2460         }
2461       }
2462       barf("unblockThread (Exception): TSO not found");
2463     }
2464
2465   case BlockedOnDelay:
2466   case BlockedOnRead:
2467   case BlockedOnWrite:
2468     {
2469       StgBlockingQueueElement *prev = NULL;
2470       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2471            prev = t, t = t->link) {
2472         if (t == (StgBlockingQueueElement *)tso) {
2473           if (prev == NULL) {
2474             blocked_queue_hd = (StgTSO *)t->link;
2475             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2476               blocked_queue_tl = END_TSO_QUEUE;
2477             }
2478           } else {
2479             prev->link = t->link;
2480             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2481               blocked_queue_tl = (StgTSO *)prev;
2482             }
2483           }
2484           goto done;
2485         }
2486       }
2487       barf("unblockThread (I/O): TSO not found");
2488     }
2489
2490   default:
2491     barf("unblockThread");
2492   }
2493
2494  done:
2495   tso->link = END_TSO_QUEUE;
2496   tso->why_blocked = NotBlocked;
2497   tso->block_info.closure = NULL;
2498   PUSH_ON_RUN_QUEUE(tso);
2499   RELEASE_LOCK(&sched_mutex);
2500 }
2501 #else
2502 static void
2503 unblockThread(StgTSO *tso)
2504 {
2505   StgTSO *t, **last;
2506
2507   ACQUIRE_LOCK(&sched_mutex);
2508   switch (tso->why_blocked) {
2509
2510   case NotBlocked:
2511     return;  /* not blocked */
2512
2513   case BlockedOnMVar:
2514     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2515     {
2516       StgTSO *last_tso = END_TSO_QUEUE;
2517       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2518
2519       last = &mvar->head;
2520       for (t = mvar->head; t != END_TSO_QUEUE; 
2521            last = &t->link, last_tso = t, t = t->link) {
2522         if (t == tso) {
2523           *last = tso->link;
2524           if (mvar->tail == tso) {
2525             mvar->tail = last_tso;
2526           }
2527           goto done;
2528         }
2529       }
2530       barf("unblockThread (MVAR): TSO not found");
2531     }
2532
2533   case BlockedOnBlackHole:
2534     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2535     {
2536       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2537
2538       last = &bq->blocking_queue;
2539       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2540            last = &t->link, t = t->link) {
2541         if (t == tso) {
2542           *last = tso->link;
2543           goto done;
2544         }
2545       }
2546       barf("unblockThread (BLACKHOLE): TSO not found");
2547     }
2548
2549   case BlockedOnException:
2550     {
2551       StgTSO *target  = tso->block_info.tso;
2552
2553       ASSERT(get_itbl(target)->type == TSO);
2554       ASSERT(target->blocked_exceptions != NULL);
2555
2556       last = &target->blocked_exceptions;
2557       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2558            last = &t->link, t = t->link) {
2559         ASSERT(get_itbl(t)->type == TSO);
2560         if (t == tso) {
2561           *last = tso->link;
2562           goto done;
2563         }
2564       }
2565       barf("unblockThread (Exception): TSO not found");
2566     }
2567
2568   case BlockedOnDelay:
2569   case BlockedOnRead:
2570   case BlockedOnWrite:
2571     {
2572       StgTSO *prev = NULL;
2573       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2574            prev = t, t = t->link) {
2575         if (t == tso) {
2576           if (prev == NULL) {
2577             blocked_queue_hd = t->link;
2578             if (blocked_queue_tl == t) {
2579               blocked_queue_tl = END_TSO_QUEUE;
2580             }
2581           } else {
2582             prev->link = t->link;
2583             if (blocked_queue_tl == t) {
2584               blocked_queue_tl = prev;
2585             }
2586           }
2587           goto done;
2588         }
2589       }
2590       barf("unblockThread (I/O): TSO not found");
2591     }
2592
2593   default:
2594     barf("unblockThread");
2595   }
2596
2597  done:
2598   tso->link = END_TSO_QUEUE;
2599   tso->why_blocked = NotBlocked;
2600   tso->block_info.closure = NULL;
2601   PUSH_ON_RUN_QUEUE(tso);
2602   RELEASE_LOCK(&sched_mutex);
2603 }
2604 #endif
2605
2606 /* -----------------------------------------------------------------------------
2607  * raiseAsync()
2608  *
2609  * The following function implements the magic for raising an
2610  * asynchronous exception in an existing thread.
2611  *
2612  * We first remove the thread from any queue on which it might be
2613  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2614  *
2615  * We strip the stack down to the innermost CATCH_FRAME, building
2616  * thunks in the heap for all the active computations, so they can 
2617  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2618  * an application of the handler to the exception, and push it on
2619  * the top of the stack.
2620  * 
2621  * How exactly do we save all the active computations?  We create an
2622  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2623  * AP_UPDs pushes everything from the corresponding update frame
2624  * upwards onto the stack.  (Actually, it pushes everything up to the
2625  * next update frame plus a pointer to the next AP_UPD object.
2626  * Entering the next AP_UPD object pushes more onto the stack until we
2627  * reach the last AP_UPD object - at which point the stack should look
2628  * exactly as it did when we killed the TSO and we can continue
2629  * execution by entering the closure on top of the stack.
2630  *
2631  * We can also kill a thread entirely - this happens if either (a) the 
2632  * exception passed to raiseAsync is NULL, or (b) there's no
2633  * CATCH_FRAME on the stack.  In either case, we strip the entire
2634  * stack and replace the thread with a zombie.
2635  *
2636  * -------------------------------------------------------------------------- */
2637  
2638 void 
2639 deleteThread(StgTSO *tso)
2640 {
2641   raiseAsync(tso,NULL);
2642 }
2643
2644 void
2645 raiseAsync(StgTSO *tso, StgClosure *exception)
2646 {
2647   StgUpdateFrame* su = tso->su;
2648   StgPtr          sp = tso->sp;
2649   
2650   /* Thread already dead? */
2651   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2652     return;
2653   }
2654
2655   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2656
2657   /* Remove it from any blocking queues */
2658   unblockThread(tso);
2659
2660   /* The stack freezing code assumes there's a closure pointer on
2661    * the top of the stack.  This isn't always the case with compiled
2662    * code, so we have to push a dummy closure on the top which just
2663    * returns to the next return address on the stack.
2664    */
2665   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2666     *(--sp) = (W_)&dummy_ret_closure;
2667   }
2668
2669   while (1) {
2670     int words = ((P_)su - (P_)sp) - 1;
2671     nat i;
2672     StgAP_UPD * ap;
2673
2674     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2675      * then build PAP(handler,exception,realworld#), and leave it on
2676      * top of the stack ready to enter.
2677      */
2678     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2679       StgCatchFrame *cf = (StgCatchFrame *)su;
2680       /* we've got an exception to raise, so let's pass it to the
2681        * handler in this frame.
2682        */
2683       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2684       TICK_ALLOC_UPD_PAP(3,0);
2685       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2686               
2687       ap->n_args = 2;
2688       ap->fun = cf->handler;    /* :: Exception -> IO a */
2689       ap->payload[0] = (P_)exception;
2690       ap->payload[1] = ARG_TAG(0); /* realworld token */
2691
2692       /* throw away the stack from Sp up to and including the
2693        * CATCH_FRAME.
2694        */
2695       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2696       tso->su = cf->link;
2697
2698       /* Restore the blocked/unblocked state for asynchronous exceptions
2699        * at the CATCH_FRAME.  
2700        *
2701        * If exceptions were unblocked at the catch, arrange that they
2702        * are unblocked again after executing the handler by pushing an
2703        * unblockAsyncExceptions_ret stack frame.
2704        */
2705       if (!cf->exceptions_blocked) {
2706         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2707       }
2708       
2709       /* Ensure that async exceptions are blocked when running the handler.
2710        */
2711       if (tso->blocked_exceptions == NULL) {
2712         tso->blocked_exceptions = END_TSO_QUEUE;
2713       }
2714       
2715       /* Put the newly-built PAP on top of the stack, ready to execute
2716        * when the thread restarts.
2717        */
2718       sp[0] = (W_)ap;
2719       tso->sp = sp;
2720       tso->what_next = ThreadEnterGHC;
2721       return;
2722     }
2723
2724     /* First build an AP_UPD consisting of the stack chunk above the
2725      * current update frame, with the top word on the stack as the
2726      * fun field.
2727      */
2728     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2729     
2730     ASSERT(words >= 0);
2731     
2732     ap->n_args = words;
2733     ap->fun    = (StgClosure *)sp[0];
2734     sp++;
2735     for(i=0; i < (nat)words; ++i) {
2736       ap->payload[i] = (P_)*sp++;
2737     }
2738     
2739     switch (get_itbl(su)->type) {
2740       
2741     case UPDATE_FRAME:
2742       {
2743         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2744         TICK_ALLOC_UP_THK(words+1,0);
2745         
2746         IF_DEBUG(scheduler,
2747                  fprintf(stderr,  "scheduler: Updating ");
2748                  printPtr((P_)su->updatee); 
2749                  fprintf(stderr,  " with ");
2750                  printObj((StgClosure *)ap);
2751                  );
2752         
2753         /* Replace the updatee with an indirection - happily
2754          * this will also wake up any threads currently
2755          * waiting on the result.
2756          */
2757         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2758         su = su->link;
2759         sp += sizeofW(StgUpdateFrame) -1;
2760         sp[0] = (W_)ap; /* push onto stack */
2761         break;
2762       }
2763       
2764     case CATCH_FRAME:
2765       {
2766         StgCatchFrame *cf = (StgCatchFrame *)su;
2767         StgClosure* o;
2768         
2769         /* We want a PAP, not an AP_UPD.  Fortunately, the
2770          * layout's the same.
2771          */
2772         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2773         TICK_ALLOC_UPD_PAP(words+1,0);
2774         
2775         /* now build o = FUN(catch,ap,handler) */
2776         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2777         TICK_ALLOC_FUN(2,0);
2778         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2779         o->payload[0] = (StgClosure *)ap;
2780         o->payload[1] = cf->handler;
2781         
2782         IF_DEBUG(scheduler,
2783                  fprintf(stderr,  "scheduler: Built ");
2784                  printObj((StgClosure *)o);
2785                  );
2786         
2787         /* pop the old handler and put o on the stack */
2788         su = cf->link;
2789         sp += sizeofW(StgCatchFrame) - 1;
2790         sp[0] = (W_)o;
2791         break;
2792       }
2793       
2794     case SEQ_FRAME:
2795       {
2796         StgSeqFrame *sf = (StgSeqFrame *)su;
2797         StgClosure* o;
2798         
2799         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2800         TICK_ALLOC_UPD_PAP(words+1,0);
2801         
2802         /* now build o = FUN(seq,ap) */
2803         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2804         TICK_ALLOC_SE_THK(1,0);
2805         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2806         o->payload[0] = (StgClosure *)ap;
2807         
2808         IF_DEBUG(scheduler,
2809                  fprintf(stderr,  "scheduler: Built ");
2810                  printObj((StgClosure *)o);
2811                  );
2812         
2813         /* pop the old handler and put o on the stack */
2814         su = sf->link;
2815         sp += sizeofW(StgSeqFrame) - 1;
2816         sp[0] = (W_)o;
2817         break;
2818       }
2819       
2820     case STOP_FRAME:
2821       /* We've stripped the entire stack, the thread is now dead. */
2822       sp += sizeofW(StgStopFrame) - 1;
2823       sp[0] = (W_)exception;    /* save the exception */
2824       tso->what_next = ThreadKilled;
2825       tso->su = (StgUpdateFrame *)(sp+1);
2826       tso->sp = sp;
2827       return;
2828       
2829     default:
2830       barf("raiseAsync");
2831     }
2832   }
2833   barf("raiseAsync");
2834 }
2835
2836 /* -----------------------------------------------------------------------------
2837    resurrectThreads is called after garbage collection on the list of
2838    threads found to be garbage.  Each of these threads will be woken
2839    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2840    on an MVar, or NonTermination if the thread was blocked on a Black
2841    Hole.
2842    -------------------------------------------------------------------------- */
2843
2844 void
2845 resurrectThreads( StgTSO *threads )
2846 {
2847   StgTSO *tso, *next;
2848
2849   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2850     next = tso->global_link;
2851     tso->global_link = all_threads;
2852     all_threads = tso;
2853     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2854
2855     switch (tso->why_blocked) {
2856     case BlockedOnMVar:
2857     case BlockedOnException:
2858       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2859       break;
2860     case BlockedOnBlackHole:
2861       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2862       break;
2863     case NotBlocked:
2864       /* This might happen if the thread was blocked on a black hole
2865        * belonging to a thread that we've just woken up (raiseAsync
2866        * can wake up threads, remember...).
2867        */
2868       continue;
2869     default:
2870       barf("resurrectThreads: thread blocked in a strange way");
2871     }
2872   }
2873 }
2874
2875 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2876 //@subsection Debugging Routines
2877
2878 /* -----------------------------------------------------------------------------
2879    Debugging: why is a thread blocked
2880    -------------------------------------------------------------------------- */
2881
2882 #ifdef DEBUG
2883
2884 void
2885 printThreadBlockage(StgTSO *tso)
2886 {
2887   switch (tso->why_blocked) {
2888   case BlockedOnRead:
2889     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2890     break;
2891   case BlockedOnWrite:
2892     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2893     break;
2894   case BlockedOnDelay:
2895 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2896     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2897 #else
2898     fprintf(stderr,"blocked on delay of %d ms", 
2899             tso->block_info.target - getourtimeofday());
2900 #endif
2901     break;
2902   case BlockedOnMVar:
2903     fprintf(stderr,"blocked on an MVar");
2904     break;
2905   case BlockedOnException:
2906     fprintf(stderr,"blocked on delivering an exception to thread %d",
2907             tso->block_info.tso->id);
2908     break;
2909   case BlockedOnBlackHole:
2910     fprintf(stderr,"blocked on a black hole");
2911     break;
2912   case NotBlocked:
2913     fprintf(stderr,"not blocked");
2914     break;
2915 #if defined(PAR)
2916   case BlockedOnGA:
2917     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2918             tso->block_info.closure, info_type(tso->block_info.closure));
2919     break;
2920   case BlockedOnGA_NoSend:
2921     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2922             tso->block_info.closure, info_type(tso->block_info.closure));
2923     break;
2924 #endif
2925   default:
2926     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2927          tso->why_blocked, tso->id, tso);
2928   }
2929 }
2930
2931 void
2932 printThreadStatus(StgTSO *tso)
2933 {
2934   switch (tso->what_next) {
2935   case ThreadKilled:
2936     fprintf(stderr,"has been killed");
2937     break;
2938   case ThreadComplete:
2939     fprintf(stderr,"has completed");
2940     break;
2941   default:
2942     printThreadBlockage(tso);
2943   }
2944 }
2945
2946 void
2947 printAllThreads(void)
2948 {
2949   StgTSO *t;
2950
2951   sched_belch("all threads:");
2952   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2953     fprintf(stderr, "\tthread %d is ", t->id);
2954     printThreadStatus(t);
2955     fprintf(stderr,"\n");
2956   }
2957 }
2958     
2959 /* 
2960    Print a whole blocking queue attached to node (debugging only).
2961 */
2962 //@cindex print_bq
2963 # if defined(PAR)
2964 void 
2965 print_bq (StgClosure *node)
2966 {
2967   StgBlockingQueueElement *bqe;
2968   StgTSO *tso;
2969   rtsBool end;
2970
2971   fprintf(stderr,"## BQ of closure %p (%s): ",
2972           node, info_type(node));
2973
2974   /* should cover all closures that may have a blocking queue */
2975   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2976          get_itbl(node)->type == FETCH_ME_BQ ||
2977          get_itbl(node)->type == RBH);
2978     
2979   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2980   /* 
2981      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2982   */
2983   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2984        !end; // iterate until bqe points to a CONSTR
2985        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2986     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2987     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2988     /* types of closures that may appear in a blocking queue */
2989     ASSERT(get_itbl(bqe)->type == TSO ||           
2990            get_itbl(bqe)->type == BLOCKED_FETCH || 
2991            get_itbl(bqe)->type == CONSTR); 
2992     /* only BQs of an RBH end with an RBH_Save closure */
2993     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2994
2995     switch (get_itbl(bqe)->type) {
2996     case TSO:
2997       fprintf(stderr," TSO %d (%x),",
2998               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2999       break;
3000     case BLOCKED_FETCH:
3001       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3002               ((StgBlockedFetch *)bqe)->node, 
3003               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3004               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3005               ((StgBlockedFetch *)bqe)->ga.weight);
3006       break;
3007     case CONSTR:
3008       fprintf(stderr," %s (IP %p),",
3009               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3010                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3011                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3012                "RBH_Save_?"), get_itbl(bqe));
3013       break;
3014     default:
3015       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3016            info_type(bqe), node, info_type(node));
3017       break;
3018     }
3019   } /* for */
3020   fputc('\n', stderr);
3021 }
3022 # elif defined(GRAN)
3023 void 
3024 print_bq (StgClosure *node)
3025 {
3026   StgBlockingQueueElement *bqe;
3027   PEs node_loc, tso_loc;
3028   rtsBool end;
3029
3030   /* should cover all closures that may have a blocking queue */
3031   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3032          get_itbl(node)->type == FETCH_ME_BQ ||
3033          get_itbl(node)->type == RBH);
3034     
3035   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3036   node_loc = where_is(node);
3037
3038   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3039           node, info_type(node), node_loc);
3040
3041   /* 
3042      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3043   */
3044   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3045        !end; // iterate until bqe points to a CONSTR
3046        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3047     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3048     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3049     /* types of closures that may appear in a blocking queue */
3050     ASSERT(get_itbl(bqe)->type == TSO ||           
3051            get_itbl(bqe)->type == CONSTR); 
3052     /* only BQs of an RBH end with an RBH_Save closure */
3053     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3054
3055     tso_loc = where_is((StgClosure *)bqe);
3056     switch (get_itbl(bqe)->type) {
3057     case TSO:
3058       fprintf(stderr," TSO %d (%p) on [PE %d],",
3059               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3060       break;
3061     case CONSTR:
3062       fprintf(stderr," %s (IP %p),",
3063               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3064                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3065                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3066                "RBH_Save_?"), get_itbl(bqe));
3067       break;
3068     default:
3069       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3070            info_type((StgClosure *)bqe), node, info_type(node));
3071       break;
3072     }
3073   } /* for */
3074   fputc('\n', stderr);
3075 }
3076 #else
3077 /* 
3078    Nice and easy: only TSOs on the blocking queue
3079 */
3080 void 
3081 print_bq (StgClosure *node)
3082 {
3083   StgTSO *tso;
3084
3085   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3086   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3087        tso != END_TSO_QUEUE; 
3088        tso=tso->link) {
3089     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3090     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3091     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3092   }
3093   fputc('\n', stderr);
3094 }
3095 # endif
3096
3097 #if defined(PAR)
3098 static nat
3099 run_queue_len(void)
3100 {
3101   nat i;
3102   StgTSO *tso;
3103
3104   for (i=0, tso=run_queue_hd; 
3105        tso != END_TSO_QUEUE;
3106        i++, tso=tso->link)
3107     /* nothing */
3108
3109   return i;
3110 }
3111 #endif
3112
3113 static void
3114 sched_belch(char *s, ...)
3115 {
3116   va_list ap;
3117   va_start(ap,s);
3118 #ifdef SMP
3119   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3120 #else
3121   fprintf(stderr, "scheduler: ");
3122 #endif
3123   vfprintf(stderr, s, ap);
3124   fprintf(stderr, "\n");
3125 }
3126
3127 #endif /* DEBUG */
3128
3129
3130 //@node Index,  , Debugging Routines, Main scheduling code
3131 //@subsection Index
3132
3133 //@index
3134 //* MainRegTable::  @cindex\s-+MainRegTable
3135 //* StgMainThread::  @cindex\s-+StgMainThread
3136 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3137 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3138 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3139 //* context_switch::  @cindex\s-+context_switch
3140 //* createThread::  @cindex\s-+createThread
3141 //* free_capabilities::  @cindex\s-+free_capabilities
3142 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3143 //* initScheduler::  @cindex\s-+initScheduler
3144 //* interrupted::  @cindex\s-+interrupted
3145 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3146 //* next_thread_id::  @cindex\s-+next_thread_id
3147 //* print_bq::  @cindex\s-+print_bq
3148 //* run_queue_hd::  @cindex\s-+run_queue_hd
3149 //* run_queue_tl::  @cindex\s-+run_queue_tl
3150 //* sched_mutex::  @cindex\s-+sched_mutex
3151 //* schedule::  @cindex\s-+schedule
3152 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3153 //* task_ids::  @cindex\s-+task_ids
3154 //* term_mutex::  @cindex\s-+term_mutex
3155 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3156 //@end index