[project @ 2000-04-07 09:47:38 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.65 2000/04/07 09:47:38 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     if (blocked_queue_hd == END_TSO_QUEUE
543         && run_queue_hd == END_TSO_QUEUE) {
544         StgMainThread *m = main_threads;
545         m->ret = NULL;
546         m->stat = Deadlock;
547         main_threads = m->link;
548         return;
549     }
550 #endif
551
552 #ifdef SMP
553     /* If there's a GC pending, don't do anything until it has
554      * completed.
555      */
556     if (ready_to_gc) {
557       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
559     }
560     
561     /* block until we've got a thread on the run queue and a free
562      * capability.
563      */
564     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565       IF_DEBUG(scheduler, sched_belch("waiting for work"));
566       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567       IF_DEBUG(scheduler, sched_belch("work now available"));
568     }
569 #endif
570
571 #if defined(GRAN)
572
573     if (RtsFlags.GranFlags.Light)
574       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
575
576     /* adjust time based on time-stamp */
577     if (event->time > CurrentTime[CurrentProc] &&
578         event->evttype != ContinueThread)
579       CurrentTime[CurrentProc] = event->time;
580     
581     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582     if (!RtsFlags.GranFlags.Light)
583       handleIdlePEs();
584
585     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
586
587     /* main event dispatcher in GranSim */
588     switch (event->evttype) {
589       /* Should just be continuing execution */
590     case ContinueThread:
591       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592       /* ToDo: check assertion
593       ASSERT(run_queue_hd != (StgTSO*)NULL &&
594              run_queue_hd != END_TSO_QUEUE);
595       */
596       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
597       if (!RtsFlags.GranFlags.DoAsyncFetch &&
598           procStatus[CurrentProc]==Fetching) {
599         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600               CurrentTSO->id, CurrentTSO, CurrentProc);
601         goto next_thread;
602       } 
603       /* Ignore ContinueThreads for completed threads */
604       if (CurrentTSO->what_next == ThreadComplete) {
605         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
606               CurrentTSO->id, CurrentTSO, CurrentProc);
607         goto next_thread;
608       } 
609       /* Ignore ContinueThreads for threads that are being migrated */
610       if (PROCS(CurrentTSO)==Nowhere) { 
611         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612               CurrentTSO->id, CurrentTSO, CurrentProc);
613         goto next_thread;
614       }
615       /* The thread should be at the beginning of the run queue */
616       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
617         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618               CurrentTSO->id, CurrentTSO, CurrentProc);
619         break; // run the thread anyway
620       }
621       /*
622       new_event(proc, proc, CurrentTime[proc],
623                 FindWork,
624                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
625       goto next_thread; 
626       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627       break; // now actually run the thread; DaH Qu'vam yImuHbej 
628
629     case FetchNode:
630       do_the_fetchnode(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case GlobalBlock:
634       do_the_globalblock(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case FetchReply:
638       do_the_fetchreply(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case UnblockThread:   /* Move from the blocked queue to the tail of */
642       do_the_unblock(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     case ResumeThread:  /* Move from the blocked queue to the tail of */
646       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
647       event->tso->gran.blocktime += 
648         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case StartThread:
653       do_the_startthread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveThread:
657       do_the_movethread(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case MoveSpark:
661       do_the_movespark(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     case FindWork:
665       do_the_findwork(event);
666       goto next_thread;             /* handle next event in event queue  */
667       
668     default:
669       barf("Illegal event type %u\n", event->evttype);
670     }  /* switch */
671     
672     /* This point was scheduler_loop in the old RTS */
673
674     IF_DEBUG(gran, belch("GRAN: after main switch"));
675
676     TimeOfLastEvent = CurrentTime[CurrentProc];
677     TimeOfNextEvent = get_time_of_next_event();
678     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679     // CurrentTSO = ThreadQueueHd;
680
681     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
682                          TimeOfNextEvent));
683
684     if (RtsFlags.GranFlags.Light) 
685       GranSimLight_leave_system(event, &ActiveTSO); 
686
687     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
688
689     IF_DEBUG(gran, 
690              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
691
692     /* in a GranSim setup the TSO stays on the run queue */
693     t = CurrentTSO;
694     /* Take a thread from the run queue. */
695     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
696
697     IF_DEBUG(gran, 
698              fprintf(stderr, "GRAN: About to run current thread, which is\n");
699              G_TSO(t,5))
700
701     context_switch = 0; // turned on via GranYield, checking events and time slice
702
703     IF_DEBUG(gran, 
704              DumpGranEvent(GR_SCHEDULE, t));
705
706     procStatus[CurrentProc] = Busy;
707
708 #elif defined(PAR)
709
710     if (PendingFetches != END_BF_QUEUE) {
711         processFetches();
712     }
713
714     /* ToDo: phps merge with spark activation above */
715     /* check whether we have local work and send requests if we have none */
716     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
717       /* :-[  no local threads => look out for local sparks */
718       /* the spark pool for the current PE */
719       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721           pool->hd < pool->tl) {
722         /* 
723          * ToDo: add GC code check that we really have enough heap afterwards!!
724          * Old comment:
725          * If we're here (no runnable threads) and we have pending
726          * sparks, we must have a space problem.  Get enough space
727          * to turn one of those pending sparks into a
728          * thread... 
729          */
730         
731         spark = findSpark();                /* get a spark */
732         if (spark != (rtsSpark) NULL) {
733           tso = activateSpark(spark);       /* turn the spark into a thread */
734           IF_PAR_DEBUG(schedule,
735                        belch("==== schedule: Created TSO %d (%p); %d threads active",
736                              tso->id, tso, advisory_thread_count));
737
738           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739             belch("==^^ failed to activate spark");
740             goto next_thread;
741           }               /* otherwise fall through & pick-up new tso */
742         } else {
743           IF_PAR_DEBUG(verbose,
744                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
745                              spark_queue_len(pool)));
746           goto next_thread;
747         }
748       } else  
749       /* =8-[  no local sparks => look for work on other PEs */
750       {
751         /*
752          * We really have absolutely no work.  Send out a fish
753          * (there may be some out there already), and wait for
754          * something to arrive.  We clearly can't run any threads
755          * until a SCHEDULE or RESUME arrives, and so that's what
756          * we're hoping to see.  (Of course, we still have to
757          * respond to other types of messages.)
758          */
759         if (//!fishing &&  
760             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762           /* fishing set in sendFish, processFish;
763              avoid flooding system with fishes via delay */
764           pe = choosePE();
765           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
766                    NEW_FISH_HUNGER);
767         }
768         
769         processMessages();
770         goto next_thread;
771         // ReSchedule(0);
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779     /* Take a thread from the run queue, if we have work */
780     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
792                               spark_queue_len(pool), 
793                               CURRENT_PROC,
794                               pool->hd, pool->tl, pool->base, pool->lim));
795
796     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
797                               run_queue_len(), CURRENT_PROC,
798                               run_queue_hd, run_queue_tl));
799
800 #if 0
801     if (t != LastTSO) {
802       /* 
803          we are running a different TSO, so write a schedule event to log file
804          NB: If we use fair scheduling we also have to write  a deschedule 
805              event for LastTSO; with unfair scheduling we know that the
806              previous tso has blocked whenever we switch to another tso, so
807              we don't need it in GUM for now
808       */
809       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
811       
812     }
813 #endif
814 #else /* !GRAN && !PAR */
815   
816     /* grab a thread from the run queue
817      */
818     ASSERT(run_queue_hd != END_TSO_QUEUE);
819     t = POP_RUN_QUEUE();
820     IF_DEBUG(sanity,checkTSO(t));
821
822 #endif
823     
824     /* grab a capability
825      */
826 #ifdef SMP
827     cap = free_capabilities;
828     free_capabilities = cap->link;
829     n_free_capabilities--;
830 #else
831     cap = &MainRegTable;
832 #endif
833     
834     cap->rCurrentTSO = t;
835     
836     /* set the context_switch flag
837      */
838     if (run_queue_hd == END_TSO_QUEUE)
839       context_switch = 0;
840     else
841       context_switch = 1;
842
843     RELEASE_LOCK(&sched_mutex);
844
845     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
846                               t->id, t, whatNext_strs[t->what_next]));
847
848     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849     /* Run the current thread 
850      */
851     switch (cap->rCurrentTSO->what_next) {
852     case ThreadKilled:
853     case ThreadComplete:
854       /* Thread already finished, return to scheduler. */
855       ret = ThreadFinished;
856       break;
857     case ThreadEnterGHC:
858       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
859       break;
860     case ThreadRunGHC:
861       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
862       break;
863     case ThreadEnterHugs:
864 #ifdef INTERPRETER
865       {
866          StgClosure* c;
867          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869          cap->rCurrentTSO->sp += 1;
870          ret = enter(cap,c);
871          break;
872       }
873 #else
874       barf("Panic: entered a BCO but no bytecode interpreter in this build");
875 #endif
876     default:
877       barf("schedule: invalid what_next field");
878     }
879     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
880     
881     /* Costs for the scheduler are assigned to CCS_SYSTEM */
882 #ifdef PROFILING
883     CCCS = CCS_SYSTEM;
884 #endif
885     
886     ACQUIRE_LOCK(&sched_mutex);
887
888 #ifdef SMP
889     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
892 #endif
893     t = cap->rCurrentTSO;
894     
895 #if defined(PAR)
896     /* HACK 675: if the last thread didn't yield, make sure to print a 
897        SCHEDULE event to the log file when StgRunning the next thread, even
898        if it is the same one as before */
899     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
900     TimeOfLastYield = CURRENT_TIME;
901 #endif
902
903     switch (ret) {
904     case HeapOverflow:
905       /* make all the running tasks block on a condition variable,
906        * maybe set context_switch and wait till they all pile in,
907        * then have them wait on a GC condition variable.
908        */
909       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
910                                t->id, t, whatNext_strs[t->what_next]));
911       threadPaused(t);
912 #if defined(GRAN)
913       ASSERT(!is_on_queue(t,CurrentProc));
914 #endif
915       
916       ready_to_gc = rtsTrue;
917       context_switch = 1;               /* stop other threads ASAP */
918       PUSH_ON_RUN_QUEUE(t);
919       /* actual GC is done at the end of the while loop */
920       break;
921       
922     case StackOverflow:
923       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
924                                t->id, t, whatNext_strs[t->what_next]));
925       /* just adjust the stack for this thread, then pop it back
926        * on the run queue.
927        */
928       threadPaused(t);
929       { 
930         StgMainThread *m;
931         /* enlarge the stack */
932         StgTSO *new_t = threadStackOverflow(t);
933         
934         /* This TSO has moved, so update any pointers to it from the
935          * main thread stack.  It better not be on any other queues...
936          * (it shouldn't be).
937          */
938         for (m = main_threads; m != NULL; m = m->link) {
939           if (m->tso == t) {
940             m->tso = new_t;
941           }
942         }
943         threadPaused(new_t);
944         PUSH_ON_RUN_QUEUE(new_t);
945       }
946       break;
947
948     case ThreadYielding:
949 #if defined(GRAN)
950       IF_DEBUG(gran, 
951                DumpGranEvent(GR_DESCHEDULE, t));
952       globalGranStats.tot_yields++;
953 #elif defined(PAR)
954       IF_DEBUG(par, 
955                DumpGranEvent(GR_DESCHEDULE, t));
956 #endif
957       /* put the thread back on the run queue.  Then, if we're ready to
958        * GC, check whether this is the last task to stop.  If so, wake
959        * up the GC thread.  getThread will block during a GC until the
960        * GC is finished.
961        */
962       IF_DEBUG(scheduler,
963                if (t->what_next == ThreadEnterHugs) {
964                    /* ToDo: or maybe a timer expired when we were in Hugs?
965                     * or maybe someone hit ctrl-C
966                     */
967                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
968                          t->id, t, whatNext_strs[t->what_next]);
969                } else {
970                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
971                          t->id, t, whatNext_strs[t->what_next]);
972                }
973                );
974       threadPaused(t);
975       IF_DEBUG(sanity,
976                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
977                checkTSO(t));
978       ASSERT(t->link == END_TSO_QUEUE);
979 #if defined(GRAN)
980       ASSERT(!is_on_queue(t,CurrentProc));
981
982       IF_DEBUG(sanity,
983                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
984                checkThreadQsSanity(rtsTrue));
985 #endif
986       APPEND_TO_RUN_QUEUE(t);
987 #if defined(GRAN)
988       /* add a ContinueThread event to actually process the thread */
989       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
990                 ContinueThread,
991                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
992       IF_GRAN_DEBUG(bq, 
993                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
994                G_EVENTQ(0);
995                G_CURR_THREADQ(0))
996 #endif /* GRAN */
997       break;
998       
999     case ThreadBlocked:
1000 #if defined(GRAN)
1001       IF_DEBUG(scheduler,
1002                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1003                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1004                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1005
1006       // ??? needed; should emit block before
1007       IF_DEBUG(gran, 
1008                DumpGranEvent(GR_DESCHEDULE, t)); 
1009       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1010       /*
1011         ngoq Dogh!
1012       ASSERT(procStatus[CurrentProc]==Busy || 
1013               ((procStatus[CurrentProc]==Fetching) && 
1014               (t->block_info.closure!=(StgClosure*)NULL)));
1015       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1016           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1017             procStatus[CurrentProc]==Fetching)) 
1018         procStatus[CurrentProc] = Idle;
1019       */
1020 #elif defined(PAR)
1021       IF_DEBUG(par, 
1022                DumpGranEvent(GR_DESCHEDULE, t)); 
1023
1024       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1025       blockThread(t);
1026
1027       IF_DEBUG(scheduler,
1028                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1029                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1030                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1031
1032 #else /* !GRAN */
1033       /* don't need to do anything.  Either the thread is blocked on
1034        * I/O, in which case we'll have called addToBlockedQueue
1035        * previously, or it's blocked on an MVar or Blackhole, in which
1036        * case it'll be on the relevant queue already.
1037        */
1038       IF_DEBUG(scheduler,
1039                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1040                printThreadBlockage(t);
1041                fprintf(stderr, "\n"));
1042
1043       /* Only for dumping event to log file 
1044          ToDo: do I need this in GranSim, too?
1045       blockThread(t);
1046       */
1047 #endif
1048       threadPaused(t);
1049       break;
1050       
1051     case ThreadFinished:
1052       /* Need to check whether this was a main thread, and if so, signal
1053        * the task that started it with the return value.  If we have no
1054        * more main threads, we probably need to stop all the tasks until
1055        * we get a new one.
1056        */
1057       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1058       t->what_next = ThreadComplete;
1059 #if defined(GRAN)
1060       endThread(t, CurrentProc); // clean-up the thread
1061 #elif defined(PAR)
1062       advisory_thread_count--;
1063       if (RtsFlags.ParFlags.ParStats.Full) 
1064         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1065 #endif
1066       break;
1067       
1068     default:
1069       barf("doneThread: invalid thread return code");
1070     }
1071     
1072 #ifdef SMP
1073     cap->link = free_capabilities;
1074     free_capabilities = cap;
1075     n_free_capabilities++;
1076 #endif
1077
1078 #ifdef SMP
1079     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1080 #else
1081     if (ready_to_gc) 
1082 #endif
1083       {
1084       /* everybody back, start the GC.
1085        * Could do it in this thread, or signal a condition var
1086        * to do it in another thread.  Either way, we need to
1087        * broadcast on gc_pending_cond afterward.
1088        */
1089 #ifdef SMP
1090       IF_DEBUG(scheduler,sched_belch("doing GC"));
1091 #endif
1092       GarbageCollect(GetRoots);
1093       ready_to_gc = rtsFalse;
1094 #ifdef SMP
1095       pthread_cond_broadcast(&gc_pending_cond);
1096 #endif
1097 #if defined(GRAN)
1098       /* add a ContinueThread event to continue execution of current thread */
1099       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1100                 ContinueThread,
1101                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1102       IF_GRAN_DEBUG(bq, 
1103                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1104                G_EVENTQ(0);
1105                G_CURR_THREADQ(0))
1106 #endif /* GRAN */
1107     }
1108 #if defined(GRAN)
1109   next_thread:
1110     IF_GRAN_DEBUG(unused,
1111                   print_eventq(EventHd));
1112
1113     event = get_next_event();
1114
1115 #elif defined(PAR)
1116   next_thread:
1117     /* ToDo: wait for next message to arrive rather than busy wait */
1118
1119 #else /* GRAN */
1120   /* not any more
1121   next_thread:
1122     t = take_off_run_queue(END_TSO_QUEUE);
1123   */
1124 #endif /* GRAN */
1125   } /* end of while(1) */
1126 }
1127
1128 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1129 void deleteAllThreads ( void )
1130 {
1131   StgTSO* t;
1132   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1133   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1134     deleteThread(t);
1135   }
1136   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1137     deleteThread(t);
1138   }
1139   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1140   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1141 }
1142
1143 /* startThread and  insertThread are now in GranSim.c -- HWL */
1144
1145 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1146 //@subsection Suspend and Resume
1147
1148 /* ---------------------------------------------------------------------------
1149  * Suspending & resuming Haskell threads.
1150  * 
1151  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1152  * its capability before calling the C function.  This allows another
1153  * task to pick up the capability and carry on running Haskell
1154  * threads.  It also means that if the C call blocks, it won't lock
1155  * the whole system.
1156  *
1157  * The Haskell thread making the C call is put to sleep for the
1158  * duration of the call, on the susepended_ccalling_threads queue.  We
1159  * give out a token to the task, which it can use to resume the thread
1160  * on return from the C function.
1161  * ------------------------------------------------------------------------- */
1162    
1163 StgInt
1164 suspendThread( Capability *cap )
1165 {
1166   nat tok;
1167
1168   ACQUIRE_LOCK(&sched_mutex);
1169
1170   IF_DEBUG(scheduler,
1171            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1172
1173   threadPaused(cap->rCurrentTSO);
1174   cap->rCurrentTSO->link = suspended_ccalling_threads;
1175   suspended_ccalling_threads = cap->rCurrentTSO;
1176
1177   /* Use the thread ID as the token; it should be unique */
1178   tok = cap->rCurrentTSO->id;
1179
1180 #ifdef SMP
1181   cap->link = free_capabilities;
1182   free_capabilities = cap;
1183   n_free_capabilities++;
1184 #endif
1185
1186   RELEASE_LOCK(&sched_mutex);
1187   return tok; 
1188 }
1189
1190 Capability *
1191 resumeThread( StgInt tok )
1192 {
1193   StgTSO *tso, **prev;
1194   Capability *cap;
1195
1196   ACQUIRE_LOCK(&sched_mutex);
1197
1198   prev = &suspended_ccalling_threads;
1199   for (tso = suspended_ccalling_threads; 
1200        tso != END_TSO_QUEUE; 
1201        prev = &tso->link, tso = tso->link) {
1202     if (tso->id == (StgThreadID)tok) {
1203       *prev = tso->link;
1204       break;
1205     }
1206   }
1207   if (tso == END_TSO_QUEUE) {
1208     barf("resumeThread: thread not found");
1209   }
1210
1211 #ifdef SMP
1212   while (free_capabilities == NULL) {
1213     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1214     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1215     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1216   }
1217   cap = free_capabilities;
1218   free_capabilities = cap->link;
1219   n_free_capabilities--;
1220 #else  
1221   cap = &MainRegTable;
1222 #endif
1223
1224   cap->rCurrentTSO = tso;
1225
1226   RELEASE_LOCK(&sched_mutex);
1227   return cap;
1228 }
1229
1230
1231 /* ---------------------------------------------------------------------------
1232  * Static functions
1233  * ------------------------------------------------------------------------ */
1234 static void unblockThread(StgTSO *tso);
1235
1236 /* ---------------------------------------------------------------------------
1237  * Comparing Thread ids.
1238  *
1239  * This is used from STG land in the implementation of the
1240  * instances of Eq/Ord for ThreadIds.
1241  * ------------------------------------------------------------------------ */
1242
1243 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1244
1245   StgThreadID id1 = tso1->id; 
1246   StgThreadID id2 = tso2->id;
1247  
1248   if (id1 < id2) return (-1);
1249   if (id1 > id2) return 1;
1250   return 0;
1251 }
1252
1253 /* ---------------------------------------------------------------------------
1254    Create a new thread.
1255
1256    The new thread starts with the given stack size.  Before the
1257    scheduler can run, however, this thread needs to have a closure
1258    (and possibly some arguments) pushed on its stack.  See
1259    pushClosure() in Schedule.h.
1260
1261    createGenThread() and createIOThread() (in SchedAPI.h) are
1262    convenient packaged versions of this function.
1263
1264    currently pri (priority) is only used in a GRAN setup -- HWL
1265    ------------------------------------------------------------------------ */
1266 //@cindex createThread
1267 #if defined(GRAN)
1268 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1269 StgTSO *
1270 createThread(nat stack_size, StgInt pri)
1271 {
1272   return createThread_(stack_size, rtsFalse, pri);
1273 }
1274
1275 static StgTSO *
1276 createThread_(nat size, rtsBool have_lock, StgInt pri)
1277 {
1278 #else
1279 StgTSO *
1280 createThread(nat stack_size)
1281 {
1282   return createThread_(stack_size, rtsFalse);
1283 }
1284
1285 static StgTSO *
1286 createThread_(nat size, rtsBool have_lock)
1287 {
1288 #endif
1289
1290     StgTSO *tso;
1291     nat stack_size;
1292
1293     /* First check whether we should create a thread at all */
1294 #if defined(PAR)
1295   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1296   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1297     threadsIgnored++;
1298     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1299           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1300     return END_TSO_QUEUE;
1301   }
1302   threadsCreated++;
1303 #endif
1304
1305 #if defined(GRAN)
1306   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1307 #endif
1308
1309   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1310
1311   /* catch ridiculously small stack sizes */
1312   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1313     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1314   }
1315
1316   stack_size = size - TSO_STRUCT_SIZEW;
1317
1318   tso = (StgTSO *)allocate(size);
1319   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1320
1321   SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1322 #if defined(GRAN)
1323   SET_GRAN_HDR(tso, ThisPE);
1324 #endif
1325   tso->what_next     = ThreadEnterGHC;
1326
1327   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1328    * protect the increment operation on next_thread_id.
1329    * In future, we could use an atomic increment instead.
1330    */
1331   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1332   tso->id = next_thread_id++; 
1333   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1334
1335   tso->why_blocked  = NotBlocked;
1336   tso->blocked_exceptions = NULL;
1337
1338   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1339   tso->stack_size   = stack_size;
1340   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1341                               - TSO_STRUCT_SIZEW;
1342   tso->sp           = (P_)&(tso->stack) + stack_size;
1343
1344 #ifdef PROFILING
1345   tso->prof.CCCS = CCS_MAIN;
1346 #endif
1347
1348   /* put a stop frame on the stack */
1349   tso->sp -= sizeofW(StgStopFrame);
1350   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1351   tso->su = (StgUpdateFrame*)tso->sp;
1352
1353   // ToDo: check this
1354 #if defined(GRAN)
1355   tso->link = END_TSO_QUEUE;
1356   /* uses more flexible routine in GranSim */
1357   insertThread(tso, CurrentProc);
1358 #else
1359   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1360    * from its creation
1361    */
1362 #endif
1363
1364 #if defined(GRAN) || defined(PAR)
1365   DumpGranEvent(GR_START,tso);
1366 #endif
1367
1368   /* Link the new thread on the global thread list.
1369    */
1370   tso->global_link = all_threads;
1371   all_threads = tso;
1372
1373 #if defined(GRAN)
1374   tso->gran.pri = pri;
1375 # if defined(DEBUG)
1376   tso->gran.magic = TSO_MAGIC; // debugging only
1377 # endif
1378   tso->gran.sparkname   = 0;
1379   tso->gran.startedat   = CURRENT_TIME; 
1380   tso->gran.exported    = 0;
1381   tso->gran.basicblocks = 0;
1382   tso->gran.allocs      = 0;
1383   tso->gran.exectime    = 0;
1384   tso->gran.fetchtime   = 0;
1385   tso->gran.fetchcount  = 0;
1386   tso->gran.blocktime   = 0;
1387   tso->gran.blockcount  = 0;
1388   tso->gran.blockedat   = 0;
1389   tso->gran.globalsparks = 0;
1390   tso->gran.localsparks  = 0;
1391   if (RtsFlags.GranFlags.Light)
1392     tso->gran.clock  = Now; /* local clock */
1393   else
1394     tso->gran.clock  = 0;
1395
1396   IF_DEBUG(gran,printTSO(tso));
1397 #elif defined(PAR)
1398 # if defined(DEBUG)
1399   tso->par.magic = TSO_MAGIC; // debugging only
1400 # endif
1401   tso->par.sparkname   = 0;
1402   tso->par.startedat   = CURRENT_TIME; 
1403   tso->par.exported    = 0;
1404   tso->par.basicblocks = 0;
1405   tso->par.allocs      = 0;
1406   tso->par.exectime    = 0;
1407   tso->par.fetchtime   = 0;
1408   tso->par.fetchcount  = 0;
1409   tso->par.blocktime   = 0;
1410   tso->par.blockcount  = 0;
1411   tso->par.blockedat   = 0;
1412   tso->par.globalsparks = 0;
1413   tso->par.localsparks  = 0;
1414 #endif
1415
1416 #if defined(GRAN)
1417   globalGranStats.tot_threads_created++;
1418   globalGranStats.threads_created_on_PE[CurrentProc]++;
1419   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1420   globalGranStats.tot_sq_probes++;
1421 #endif 
1422
1423 #if defined(GRAN)
1424   IF_GRAN_DEBUG(pri,
1425                 belch("==__ schedule: Created TSO %d (%p);",
1426                       CurrentProc, tso, tso->id));
1427 #elif defined(PAR)
1428     IF_PAR_DEBUG(verbose,
1429                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1430                        tso->id, tso, advisory_thread_count));
1431 #else
1432   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1433                                  tso->id, tso->stack_size));
1434 #endif    
1435   return tso;
1436 }
1437
1438 /*
1439   Turn a spark into a thread.
1440   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1441 */
1442 #if defined(PAR)
1443 //@cindex activateSpark
1444 StgTSO *
1445 activateSpark (rtsSpark spark) 
1446 {
1447   StgTSO *tso;
1448   
1449   ASSERT(spark != (rtsSpark)NULL);
1450   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1451   if (tso!=END_TSO_QUEUE) {
1452     pushClosure(tso,spark);
1453     PUSH_ON_RUN_QUEUE(tso);
1454     advisory_thread_count++;
1455
1456     if (RtsFlags.ParFlags.ParStats.Full) {
1457       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1458       IF_PAR_DEBUG(verbose,
1459                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1460                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1461     }
1462   } else {
1463     barf("activateSpark: Cannot create TSO");
1464   }
1465   // ToDo: fwd info on local/global spark to thread -- HWL
1466   // tso->gran.exported =  spark->exported;
1467   // tso->gran.locked =   !spark->global;
1468   // tso->gran.sparkname = spark->name;
1469
1470   return tso;
1471 }
1472 #endif
1473
1474 /* ---------------------------------------------------------------------------
1475  * scheduleThread()
1476  *
1477  * scheduleThread puts a thread on the head of the runnable queue.
1478  * This will usually be done immediately after a thread is created.
1479  * The caller of scheduleThread must create the thread using e.g.
1480  * createThread and push an appropriate closure
1481  * on this thread's stack before the scheduler is invoked.
1482  * ------------------------------------------------------------------------ */
1483
1484 void
1485 scheduleThread(StgTSO *tso)
1486 {
1487   if (tso==END_TSO_QUEUE){    
1488     schedule();
1489     return;
1490   }
1491
1492   ACQUIRE_LOCK(&sched_mutex);
1493
1494   /* Put the new thread on the head of the runnable queue.  The caller
1495    * better push an appropriate closure on this thread's stack
1496    * beforehand.  In the SMP case, the thread may start running as
1497    * soon as we release the scheduler lock below.
1498    */
1499   PUSH_ON_RUN_QUEUE(tso);
1500   THREAD_RUNNABLE();
1501
1502 #if 0
1503   IF_DEBUG(scheduler,printTSO(tso));
1504 #endif
1505   RELEASE_LOCK(&sched_mutex);
1506 }
1507
1508 /* ---------------------------------------------------------------------------
1509  * startTasks()
1510  *
1511  * Start up Posix threads to run each of the scheduler tasks.
1512  * I believe the task ids are not needed in the system as defined.
1513  *  KH @ 25/10/99
1514  * ------------------------------------------------------------------------ */
1515
1516 #if defined(PAR) || defined(SMP)
1517 void *
1518 taskStart( void *arg STG_UNUSED )
1519 {
1520   rts_evalNothing(NULL);
1521 }
1522 #endif
1523
1524 /* ---------------------------------------------------------------------------
1525  * initScheduler()
1526  *
1527  * Initialise the scheduler.  This resets all the queues - if the
1528  * queues contained any threads, they'll be garbage collected at the
1529  * next pass.
1530  *
1531  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1532  * ------------------------------------------------------------------------ */
1533
1534 #ifdef SMP
1535 static void
1536 term_handler(int sig STG_UNUSED)
1537 {
1538   stat_workerStop();
1539   ACQUIRE_LOCK(&term_mutex);
1540   await_death--;
1541   RELEASE_LOCK(&term_mutex);
1542   pthread_exit(NULL);
1543 }
1544 #endif
1545
1546 //@cindex initScheduler
1547 void 
1548 initScheduler(void)
1549 {
1550 #if defined(GRAN)
1551   nat i;
1552
1553   for (i=0; i<=MAX_PROC; i++) {
1554     run_queue_hds[i]      = END_TSO_QUEUE;
1555     run_queue_tls[i]      = END_TSO_QUEUE;
1556     blocked_queue_hds[i]  = END_TSO_QUEUE;
1557     blocked_queue_tls[i]  = END_TSO_QUEUE;
1558     ccalling_threadss[i]  = END_TSO_QUEUE;
1559   }
1560 #else
1561   run_queue_hd      = END_TSO_QUEUE;
1562   run_queue_tl      = END_TSO_QUEUE;
1563   blocked_queue_hd  = END_TSO_QUEUE;
1564   blocked_queue_tl  = END_TSO_QUEUE;
1565 #endif 
1566
1567   suspended_ccalling_threads  = END_TSO_QUEUE;
1568
1569   main_threads = NULL;
1570   all_threads  = END_TSO_QUEUE;
1571
1572   context_switch = 0;
1573   interrupted    = 0;
1574
1575   enteredCAFs = END_CAF_LIST;
1576
1577   /* Install the SIGHUP handler */
1578 #ifdef SMP
1579   {
1580     struct sigaction action,oact;
1581
1582     action.sa_handler = term_handler;
1583     sigemptyset(&action.sa_mask);
1584     action.sa_flags = 0;
1585     if (sigaction(SIGTERM, &action, &oact) != 0) {
1586       barf("can't install TERM handler");
1587     }
1588   }
1589 #endif
1590
1591 #ifdef SMP
1592   /* Allocate N Capabilities */
1593   {
1594     nat i;
1595     Capability *cap, *prev;
1596     cap  = NULL;
1597     prev = NULL;
1598     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1599       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1600       cap->link = prev;
1601       prev = cap;
1602     }
1603     free_capabilities = cap;
1604     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1605   }
1606   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1607                              n_free_capabilities););
1608 #endif
1609
1610 #if defined(SMP) || defined(PAR)
1611   initSparkPools();
1612 #endif
1613 }
1614
1615 #ifdef SMP
1616 void
1617 startTasks( void )
1618 {
1619   nat i;
1620   int r;
1621   pthread_t tid;
1622   
1623   /* make some space for saving all the thread ids */
1624   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1625                             "initScheduler:task_ids");
1626   
1627   /* and create all the threads */
1628   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1629     r = pthread_create(&tid,NULL,taskStart,NULL);
1630     if (r != 0) {
1631       barf("startTasks: Can't create new Posix thread");
1632     }
1633     task_ids[i].id = tid;
1634     task_ids[i].mut_time = 0.0;
1635     task_ids[i].mut_etime = 0.0;
1636     task_ids[i].gc_time = 0.0;
1637     task_ids[i].gc_etime = 0.0;
1638     task_ids[i].elapsedtimestart = elapsedtime();
1639     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1640   }
1641 }
1642 #endif
1643
1644 void
1645 exitScheduler( void )
1646 {
1647 #ifdef SMP
1648   nat i;
1649
1650   /* Don't want to use pthread_cancel, since we'd have to install
1651    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1652    * all our locks.
1653    */
1654 #if 0
1655   /* Cancel all our tasks */
1656   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1657     pthread_cancel(task_ids[i].id);
1658   }
1659   
1660   /* Wait for all the tasks to terminate */
1661   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1662     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1663                                task_ids[i].id));
1664     pthread_join(task_ids[i].id, NULL);
1665   }
1666 #endif
1667
1668   /* Send 'em all a SIGHUP.  That should shut 'em up.
1669    */
1670   await_death = RtsFlags.ParFlags.nNodes;
1671   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1672     pthread_kill(task_ids[i].id,SIGTERM);
1673   }
1674   while (await_death > 0) {
1675     sched_yield();
1676   }
1677 #endif
1678 }
1679
1680 /* -----------------------------------------------------------------------------
1681    Managing the per-task allocation areas.
1682    
1683    Each capability comes with an allocation area.  These are
1684    fixed-length block lists into which allocation can be done.
1685
1686    ToDo: no support for two-space collection at the moment???
1687    -------------------------------------------------------------------------- */
1688
1689 /* -----------------------------------------------------------------------------
1690  * waitThread is the external interface for running a new computation
1691  * and waiting for the result.
1692  *
1693  * In the non-SMP case, we create a new main thread, push it on the 
1694  * main-thread stack, and invoke the scheduler to run it.  The
1695  * scheduler will return when the top main thread on the stack has
1696  * completed or died, and fill in the necessary fields of the
1697  * main_thread structure.
1698  *
1699  * In the SMP case, we create a main thread as before, but we then
1700  * create a new condition variable and sleep on it.  When our new
1701  * main thread has completed, we'll be woken up and the status/result
1702  * will be in the main_thread struct.
1703  * -------------------------------------------------------------------------- */
1704
1705 SchedulerStatus
1706 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1707 {
1708   StgMainThread *m;
1709   SchedulerStatus stat;
1710
1711   ACQUIRE_LOCK(&sched_mutex);
1712   
1713   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1714
1715   m->tso = tso;
1716   m->ret = ret;
1717   m->stat = NoStatus;
1718 #ifdef SMP
1719   pthread_cond_init(&m->wakeup, NULL);
1720 #endif
1721
1722   m->link = main_threads;
1723   main_threads = m;
1724
1725   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1726                               m->tso->id));
1727
1728 #ifdef SMP
1729   do {
1730     pthread_cond_wait(&m->wakeup, &sched_mutex);
1731   } while (m->stat == NoStatus);
1732 #elif defined(GRAN)
1733   /* GranSim specific init */
1734   CurrentTSO = m->tso;                // the TSO to run
1735   procStatus[MainProc] = Busy;        // status of main PE
1736   CurrentProc = MainProc;             // PE to run it on
1737
1738   schedule();
1739 #else
1740   schedule();
1741   ASSERT(m->stat != NoStatus);
1742 #endif
1743
1744   stat = m->stat;
1745
1746 #ifdef SMP
1747   pthread_cond_destroy(&m->wakeup);
1748 #endif
1749
1750   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1751                               m->tso->id));
1752   free(m);
1753
1754   RELEASE_LOCK(&sched_mutex);
1755
1756   return stat;
1757 }
1758
1759 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1760 //@subsection Run queue code 
1761
1762 #if 0
1763 /* 
1764    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1765        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1766        implicit global variable that has to be correct when calling these
1767        fcts -- HWL 
1768 */
1769
1770 /* Put the new thread on the head of the runnable queue.
1771  * The caller of createThread better push an appropriate closure
1772  * on this thread's stack before the scheduler is invoked.
1773  */
1774 static /* inline */ void
1775 add_to_run_queue(tso)
1776 StgTSO* tso; 
1777 {
1778   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1779   tso->link = run_queue_hd;
1780   run_queue_hd = tso;
1781   if (run_queue_tl == END_TSO_QUEUE) {
1782     run_queue_tl = tso;
1783   }
1784 }
1785
1786 /* Put the new thread at the end of the runnable queue. */
1787 static /* inline */ void
1788 push_on_run_queue(tso)
1789 StgTSO* tso; 
1790 {
1791   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1792   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1793   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1794   if (run_queue_hd == END_TSO_QUEUE) {
1795     run_queue_hd = tso;
1796   } else {
1797     run_queue_tl->link = tso;
1798   }
1799   run_queue_tl = tso;
1800 }
1801
1802 /* 
1803    Should be inlined because it's used very often in schedule.  The tso
1804    argument is actually only needed in GranSim, where we want to have the
1805    possibility to schedule *any* TSO on the run queue, irrespective of the
1806    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1807    the run queue and dequeue the tso, adjusting the links in the queue. 
1808 */
1809 //@cindex take_off_run_queue
1810 static /* inline */ StgTSO*
1811 take_off_run_queue(StgTSO *tso) {
1812   StgTSO *t, *prev;
1813
1814   /* 
1815      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1816
1817      if tso is specified, unlink that tso from the run_queue (doesn't have
1818      to be at the beginning of the queue); GranSim only 
1819   */
1820   if (tso!=END_TSO_QUEUE) {
1821     /* find tso in queue */
1822     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1823          t!=END_TSO_QUEUE && t!=tso;
1824          prev=t, t=t->link) 
1825       /* nothing */ ;
1826     ASSERT(t==tso);
1827     /* now actually dequeue the tso */
1828     if (prev!=END_TSO_QUEUE) {
1829       ASSERT(run_queue_hd!=t);
1830       prev->link = t->link;
1831     } else {
1832       /* t is at beginning of thread queue */
1833       ASSERT(run_queue_hd==t);
1834       run_queue_hd = t->link;
1835     }
1836     /* t is at end of thread queue */
1837     if (t->link==END_TSO_QUEUE) {
1838       ASSERT(t==run_queue_tl);
1839       run_queue_tl = prev;
1840     } else {
1841       ASSERT(run_queue_tl!=t);
1842     }
1843     t->link = END_TSO_QUEUE;
1844   } else {
1845     /* take tso from the beginning of the queue; std concurrent code */
1846     t = run_queue_hd;
1847     if (t != END_TSO_QUEUE) {
1848       run_queue_hd = t->link;
1849       t->link = END_TSO_QUEUE;
1850       if (run_queue_hd == END_TSO_QUEUE) {
1851         run_queue_tl = END_TSO_QUEUE;
1852       }
1853     }
1854   }
1855   return t;
1856 }
1857
1858 #endif /* 0 */
1859
1860 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1861 //@subsection Garbage Collextion Routines
1862
1863 /* ---------------------------------------------------------------------------
1864    Where are the roots that we know about?
1865
1866         - all the threads on the runnable queue
1867         - all the threads on the blocked queue
1868         - all the thread currently executing a _ccall_GC
1869         - all the "main threads"
1870      
1871    ------------------------------------------------------------------------ */
1872
1873 /* This has to be protected either by the scheduler monitor, or by the
1874         garbage collection monitor (probably the latter).
1875         KH @ 25/10/99
1876 */
1877
1878 static void GetRoots(void)
1879 {
1880   StgMainThread *m;
1881
1882 #if defined(GRAN)
1883   {
1884     nat i;
1885     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1886       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1887         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1888       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1889         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1890       
1891       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1892         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1893       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1894         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1895       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1896         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1897     }
1898   }
1899
1900   markEventQueue();
1901
1902 #else /* !GRAN */
1903   if (run_queue_hd != END_TSO_QUEUE) {
1904     ASSERT(run_queue_tl != END_TSO_QUEUE);
1905     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1906     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1907   }
1908
1909   if (blocked_queue_hd != END_TSO_QUEUE) {
1910     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1911     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1912     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1913   }
1914 #endif 
1915
1916   for (m = main_threads; m != NULL; m = m->link) {
1917     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1918   }
1919   if (suspended_ccalling_threads != END_TSO_QUEUE)
1920     suspended_ccalling_threads = 
1921       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1922
1923 #if defined(SMP) || defined(PAR) || defined(GRAN)
1924   markSparkQueue();
1925 #endif
1926 }
1927
1928 /* -----------------------------------------------------------------------------
1929    performGC
1930
1931    This is the interface to the garbage collector from Haskell land.
1932    We provide this so that external C code can allocate and garbage
1933    collect when called from Haskell via _ccall_GC.
1934
1935    It might be useful to provide an interface whereby the programmer
1936    can specify more roots (ToDo).
1937    
1938    This needs to be protected by the GC condition variable above.  KH.
1939    -------------------------------------------------------------------------- */
1940
1941 void (*extra_roots)(void);
1942
1943 void
1944 performGC(void)
1945 {
1946   GarbageCollect(GetRoots);
1947 }
1948
1949 static void
1950 AllRoots(void)
1951 {
1952   GetRoots();                   /* the scheduler's roots */
1953   extra_roots();                /* the user's roots */
1954 }
1955
1956 void
1957 performGCWithRoots(void (*get_roots)(void))
1958 {
1959   extra_roots = get_roots;
1960
1961   GarbageCollect(AllRoots);
1962 }
1963
1964 /* -----------------------------------------------------------------------------
1965    Stack overflow
1966
1967    If the thread has reached its maximum stack size, then raise the
1968    StackOverflow exception in the offending thread.  Otherwise
1969    relocate the TSO into a larger chunk of memory and adjust its stack
1970    size appropriately.
1971    -------------------------------------------------------------------------- */
1972
1973 static StgTSO *
1974 threadStackOverflow(StgTSO *tso)
1975 {
1976   nat new_stack_size, new_tso_size, diff, stack_words;
1977   StgPtr new_sp;
1978   StgTSO *dest;
1979
1980   IF_DEBUG(sanity,checkTSO(tso));
1981   if (tso->stack_size >= tso->max_stack_size) {
1982
1983     IF_DEBUG(gc,
1984              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
1985                    tso->id, tso, tso->stack_size, tso->max_stack_size);
1986              /* If we're debugging, just print out the top of the stack */
1987              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1988                                               tso->sp+64)));
1989
1990 #ifdef INTERPRETER
1991     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1992     exit(1);
1993 #else
1994     /* Send this thread the StackOverflow exception */
1995     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
1996 #endif
1997     return tso;
1998   }
1999
2000   /* Try to double the current stack size.  If that takes us over the
2001    * maximum stack size for this thread, then use the maximum instead.
2002    * Finally round up so the TSO ends up as a whole number of blocks.
2003    */
2004   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2005   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2006                                        TSO_STRUCT_SIZE)/sizeof(W_);
2007   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2008   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2009
2010   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2011
2012   dest = (StgTSO *)allocate(new_tso_size);
2013   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2014
2015   /* copy the TSO block and the old stack into the new area */
2016   memcpy(dest,tso,TSO_STRUCT_SIZE);
2017   stack_words = tso->stack + tso->stack_size - tso->sp;
2018   new_sp = (P_)dest + new_tso_size - stack_words;
2019   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2020
2021   /* relocate the stack pointers... */
2022   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2023   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2024   dest->sp    = new_sp;
2025   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2026   dest->stack_size = new_stack_size;
2027         
2028   /* and relocate the update frame list */
2029   relocate_TSO(tso, dest);
2030
2031   /* Mark the old TSO as relocated.  We have to check for relocated
2032    * TSOs in the garbage collector and any primops that deal with TSOs.
2033    *
2034    * It's important to set the sp and su values to just beyond the end
2035    * of the stack, so we don't attempt to scavenge any part of the
2036    * dead TSO's stack.
2037    */
2038   tso->what_next = ThreadRelocated;
2039   tso->link = dest;
2040   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2041   tso->su = (StgUpdateFrame *)tso->sp;
2042   tso->why_blocked = NotBlocked;
2043   dest->mut_link = NULL;
2044
2045   IF_PAR_DEBUG(verbose,
2046                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2047                      tso->id, tso, tso->stack_size);
2048                /* If we're debugging, just print out the top of the stack */
2049                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2050                                                 tso->sp+64)));
2051   
2052   IF_DEBUG(sanity,checkTSO(tso));
2053 #if 0
2054   IF_DEBUG(scheduler,printTSO(dest));
2055 #endif
2056
2057   return dest;
2058 }
2059
2060 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2061 //@subsection Blocking Queue Routines
2062
2063 /* ---------------------------------------------------------------------------
2064    Wake up a queue that was blocked on some resource.
2065    ------------------------------------------------------------------------ */
2066
2067 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2068
2069 #if defined(GRAN)
2070 static inline void
2071 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2072 {
2073 }
2074 #elif defined(PAR)
2075 static inline void
2076 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2077 {
2078   /* write RESUME events to log file and
2079      update blocked and fetch time (depending on type of the orig closure) */
2080   if (RtsFlags.ParFlags.ParStats.Full) {
2081     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2082                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2083                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2084
2085     switch (get_itbl(node)->type) {
2086         case FETCH_ME_BQ:
2087           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2088           break;
2089         case RBH:
2090         case FETCH_ME:
2091         case BLACKHOLE_BQ:
2092           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2093           break;
2094         default:
2095           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2096         }
2097       }
2098 }
2099 #endif
2100
2101 #if defined(GRAN)
2102 static StgBlockingQueueElement *
2103 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2104 {
2105     StgTSO *tso;
2106     PEs node_loc, tso_loc;
2107
2108     node_loc = where_is(node); // should be lifted out of loop
2109     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2110     tso_loc = where_is((StgClosure *)tso);
2111     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2112       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2113       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2114       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2115       // insertThread(tso, node_loc);
2116       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2117                 ResumeThread,
2118                 tso, node, (rtsSpark*)NULL);
2119       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2120       // len_local++;
2121       // len++;
2122     } else { // TSO is remote (actually should be FMBQ)
2123       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2124                                   RtsFlags.GranFlags.Costs.gunblocktime +
2125                                   RtsFlags.GranFlags.Costs.latency;
2126       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2127                 UnblockThread,
2128                 tso, node, (rtsSpark*)NULL);
2129       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2130       // len++;
2131     }
2132     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2133     IF_GRAN_DEBUG(bq,
2134                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2135                           (node_loc==tso_loc ? "Local" : "Global"), 
2136                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2137     tso->block_info.closure = NULL;
2138     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2139                              tso->id, tso));
2140 }
2141 #elif defined(PAR)
2142 static StgBlockingQueueElement *
2143 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2144 {
2145     StgBlockingQueueElement *next;
2146
2147     switch (get_itbl(bqe)->type) {
2148     case TSO:
2149       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2150       /* if it's a TSO just push it onto the run_queue */
2151       next = bqe->link;
2152       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2153       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2154       THREAD_RUNNABLE();
2155       unblockCount(bqe, node);
2156       /* reset blocking status after dumping event */
2157       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2158       break;
2159
2160     case BLOCKED_FETCH:
2161       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2162       next = bqe->link;
2163       bqe->link = PendingFetches;
2164       PendingFetches = bqe;
2165       break;
2166
2167 # if defined(DEBUG)
2168       /* can ignore this case in a non-debugging setup; 
2169          see comments on RBHSave closures above */
2170     case CONSTR:
2171       /* check that the closure is an RBHSave closure */
2172       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2173              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2174              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2175       break;
2176
2177     default:
2178       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2179            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2180            (StgClosure *)bqe);
2181 # endif
2182     }
2183   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2184   return next;
2185 }
2186
2187 #else /* !GRAN && !PAR */
2188 static StgTSO *
2189 unblockOneLocked(StgTSO *tso)
2190 {
2191   StgTSO *next;
2192
2193   ASSERT(get_itbl(tso)->type == TSO);
2194   ASSERT(tso->why_blocked != NotBlocked);
2195   tso->why_blocked = NotBlocked;
2196   next = tso->link;
2197   PUSH_ON_RUN_QUEUE(tso);
2198   THREAD_RUNNABLE();
2199   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2200   return next;
2201 }
2202 #endif
2203
2204 #if defined(GRAN) || defined(PAR)
2205 inline StgBlockingQueueElement *
2206 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2207 {
2208   ACQUIRE_LOCK(&sched_mutex);
2209   bqe = unblockOneLocked(bqe, node);
2210   RELEASE_LOCK(&sched_mutex);
2211   return bqe;
2212 }
2213 #else
2214 inline StgTSO *
2215 unblockOne(StgTSO *tso)
2216 {
2217   ACQUIRE_LOCK(&sched_mutex);
2218   tso = unblockOneLocked(tso);
2219   RELEASE_LOCK(&sched_mutex);
2220   return tso;
2221 }
2222 #endif
2223
2224 #if defined(GRAN)
2225 void 
2226 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2227 {
2228   StgBlockingQueueElement *bqe;
2229   PEs node_loc;
2230   nat len = 0; 
2231
2232   IF_GRAN_DEBUG(bq, 
2233                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2234                       node, CurrentProc, CurrentTime[CurrentProc], 
2235                       CurrentTSO->id, CurrentTSO));
2236
2237   node_loc = where_is(node);
2238
2239   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2240          get_itbl(q)->type == CONSTR); // closure (type constructor)
2241   ASSERT(is_unique(node));
2242
2243   /* FAKE FETCH: magically copy the node to the tso's proc;
2244      no Fetch necessary because in reality the node should not have been 
2245      moved to the other PE in the first place
2246   */
2247   if (CurrentProc!=node_loc) {
2248     IF_GRAN_DEBUG(bq, 
2249                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2250                         node, node_loc, CurrentProc, CurrentTSO->id, 
2251                         // CurrentTSO, where_is(CurrentTSO),
2252                         node->header.gran.procs));
2253     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2254     IF_GRAN_DEBUG(bq, 
2255                   belch("## new bitmask of node %p is %#x",
2256                         node, node->header.gran.procs));
2257     if (RtsFlags.GranFlags.GranSimStats.Global) {
2258       globalGranStats.tot_fake_fetches++;
2259     }
2260   }
2261
2262   bqe = q;
2263   // ToDo: check: ASSERT(CurrentProc==node_loc);
2264   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2265     //next = bqe->link;
2266     /* 
2267        bqe points to the current element in the queue
2268        next points to the next element in the queue
2269     */
2270     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2271     //tso_loc = where_is(tso);
2272     len++;
2273     bqe = unblockOneLocked(bqe, node);
2274   }
2275
2276   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2277      the closure to make room for the anchor of the BQ */
2278   if (bqe!=END_BQ_QUEUE) {
2279     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2280     /*
2281     ASSERT((info_ptr==&RBH_Save_0_info) ||
2282            (info_ptr==&RBH_Save_1_info) ||
2283            (info_ptr==&RBH_Save_2_info));
2284     */
2285     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2286     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2287     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2288
2289     IF_GRAN_DEBUG(bq,
2290                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2291                         node, info_type(node)));
2292   }
2293
2294   /* statistics gathering */
2295   if (RtsFlags.GranFlags.GranSimStats.Global) {
2296     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2297     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2298     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2299     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2300   }
2301   IF_GRAN_DEBUG(bq,
2302                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2303                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2304 }
2305 #elif defined(PAR)
2306 void 
2307 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2308 {
2309   StgBlockingQueueElement *bqe, *next;
2310
2311   ACQUIRE_LOCK(&sched_mutex);
2312
2313   IF_PAR_DEBUG(verbose, 
2314                belch("## AwBQ for node %p on [%x]: ",
2315                      node, mytid));
2316
2317   ASSERT(get_itbl(q)->type == TSO ||           
2318          get_itbl(q)->type == BLOCKED_FETCH || 
2319          get_itbl(q)->type == CONSTR); 
2320
2321   bqe = q;
2322   while (get_itbl(bqe)->type==TSO || 
2323          get_itbl(bqe)->type==BLOCKED_FETCH) {
2324     bqe = unblockOneLocked(bqe, node);
2325   }
2326   RELEASE_LOCK(&sched_mutex);
2327 }
2328
2329 #else   /* !GRAN && !PAR */
2330 void
2331 awakenBlockedQueue(StgTSO *tso)
2332 {
2333   ACQUIRE_LOCK(&sched_mutex);
2334   while (tso != END_TSO_QUEUE) {
2335     tso = unblockOneLocked(tso);
2336   }
2337   RELEASE_LOCK(&sched_mutex);
2338 }
2339 #endif
2340
2341 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2342 //@subsection Exception Handling Routines
2343
2344 /* ---------------------------------------------------------------------------
2345    Interrupt execution
2346    - usually called inside a signal handler so it mustn't do anything fancy.   
2347    ------------------------------------------------------------------------ */
2348
2349 void
2350 interruptStgRts(void)
2351 {
2352     interrupted    = 1;
2353     context_switch = 1;
2354 }
2355
2356 /* -----------------------------------------------------------------------------
2357    Unblock a thread
2358
2359    This is for use when we raise an exception in another thread, which
2360    may be blocked.
2361    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2362    -------------------------------------------------------------------------- */
2363
2364 #if defined(GRAN) || defined(PAR)
2365 /*
2366   NB: only the type of the blocking queue is different in GranSim and GUM
2367       the operations on the queue-elements are the same
2368       long live polymorphism!
2369 */
2370 static void
2371 unblockThread(StgTSO *tso)
2372 {
2373   StgBlockingQueueElement *t, **last;
2374
2375   ACQUIRE_LOCK(&sched_mutex);
2376   switch (tso->why_blocked) {
2377
2378   case NotBlocked:
2379     return;  /* not blocked */
2380
2381   case BlockedOnMVar:
2382     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2383     {
2384       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2385       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2386
2387       last = (StgBlockingQueueElement **)&mvar->head;
2388       for (t = (StgBlockingQueueElement *)mvar->head; 
2389            t != END_BQ_QUEUE; 
2390            last = &t->link, last_tso = t, t = t->link) {
2391         if (t == (StgBlockingQueueElement *)tso) {
2392           *last = (StgBlockingQueueElement *)tso->link;
2393           if (mvar->tail == tso) {
2394             mvar->tail = (StgTSO *)last_tso;
2395           }
2396           goto done;
2397         }
2398       }
2399       barf("unblockThread (MVAR): TSO not found");
2400     }
2401
2402   case BlockedOnBlackHole:
2403     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2404     {
2405       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2406
2407       last = &bq->blocking_queue;
2408       for (t = bq->blocking_queue; 
2409            t != END_BQ_QUEUE; 
2410            last = &t->link, t = t->link) {
2411         if (t == (StgBlockingQueueElement *)tso) {
2412           *last = (StgBlockingQueueElement *)tso->link;
2413           goto done;
2414         }
2415       }
2416       barf("unblockThread (BLACKHOLE): TSO not found");
2417     }
2418
2419   case BlockedOnException:
2420     {
2421       StgTSO *target  = tso->block_info.tso;
2422
2423       ASSERT(get_itbl(target)->type == TSO);
2424       ASSERT(target->blocked_exceptions != NULL);
2425
2426       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2427       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2428            t != END_BQ_QUEUE; 
2429            last = &t->link, t = t->link) {
2430         ASSERT(get_itbl(t)->type == TSO);
2431         if (t == (StgBlockingQueueElement *)tso) {
2432           *last = (StgBlockingQueueElement *)tso->link;
2433           goto done;
2434         }
2435       }
2436       barf("unblockThread (Exception): TSO not found");
2437     }
2438
2439   case BlockedOnDelay:
2440   case BlockedOnRead:
2441   case BlockedOnWrite:
2442     {
2443       StgBlockingQueueElement *prev = NULL;
2444       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2445            prev = t, t = t->link) {
2446         if (t == (StgBlockingQueueElement *)tso) {
2447           if (prev == NULL) {
2448             blocked_queue_hd = (StgTSO *)t->link;
2449             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2450               blocked_queue_tl = END_TSO_QUEUE;
2451             }
2452           } else {
2453             prev->link = t->link;
2454             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2455               blocked_queue_tl = (StgTSO *)prev;
2456             }
2457           }
2458           goto done;
2459         }
2460       }
2461       barf("unblockThread (I/O): TSO not found");
2462     }
2463
2464   default:
2465     barf("unblockThread");
2466   }
2467
2468  done:
2469   tso->link = END_TSO_QUEUE;
2470   tso->why_blocked = NotBlocked;
2471   tso->block_info.closure = NULL;
2472   PUSH_ON_RUN_QUEUE(tso);
2473   RELEASE_LOCK(&sched_mutex);
2474 }
2475 #else
2476 static void
2477 unblockThread(StgTSO *tso)
2478 {
2479   StgTSO *t, **last;
2480
2481   ACQUIRE_LOCK(&sched_mutex);
2482   switch (tso->why_blocked) {
2483
2484   case NotBlocked:
2485     return;  /* not blocked */
2486
2487   case BlockedOnMVar:
2488     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2489     {
2490       StgTSO *last_tso = END_TSO_QUEUE;
2491       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2492
2493       last = &mvar->head;
2494       for (t = mvar->head; t != END_TSO_QUEUE; 
2495            last = &t->link, last_tso = t, t = t->link) {
2496         if (t == tso) {
2497           *last = tso->link;
2498           if (mvar->tail == tso) {
2499             mvar->tail = last_tso;
2500           }
2501           goto done;
2502         }
2503       }
2504       barf("unblockThread (MVAR): TSO not found");
2505     }
2506
2507   case BlockedOnBlackHole:
2508     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2509     {
2510       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2511
2512       last = &bq->blocking_queue;
2513       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2514            last = &t->link, t = t->link) {
2515         if (t == tso) {
2516           *last = tso->link;
2517           goto done;
2518         }
2519       }
2520       barf("unblockThread (BLACKHOLE): TSO not found");
2521     }
2522
2523   case BlockedOnException:
2524     {
2525       StgTSO *target  = tso->block_info.tso;
2526
2527       ASSERT(get_itbl(target)->type == TSO);
2528       ASSERT(target->blocked_exceptions != NULL);
2529
2530       last = &target->blocked_exceptions;
2531       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2532            last = &t->link, t = t->link) {
2533         ASSERT(get_itbl(t)->type == TSO);
2534         if (t == tso) {
2535           *last = tso->link;
2536           goto done;
2537         }
2538       }
2539       barf("unblockThread (Exception): TSO not found");
2540     }
2541
2542   case BlockedOnDelay:
2543   case BlockedOnRead:
2544   case BlockedOnWrite:
2545     {
2546       StgTSO *prev = NULL;
2547       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2548            prev = t, t = t->link) {
2549         if (t == tso) {
2550           if (prev == NULL) {
2551             blocked_queue_hd = t->link;
2552             if (blocked_queue_tl == t) {
2553               blocked_queue_tl = END_TSO_QUEUE;
2554             }
2555           } else {
2556             prev->link = t->link;
2557             if (blocked_queue_tl == t) {
2558               blocked_queue_tl = prev;
2559             }
2560           }
2561           goto done;
2562         }
2563       }
2564       barf("unblockThread (I/O): TSO not found");
2565     }
2566
2567   default:
2568     barf("unblockThread");
2569   }
2570
2571  done:
2572   tso->link = END_TSO_QUEUE;
2573   tso->why_blocked = NotBlocked;
2574   tso->block_info.closure = NULL;
2575   PUSH_ON_RUN_QUEUE(tso);
2576   RELEASE_LOCK(&sched_mutex);
2577 }
2578 #endif
2579
2580 /* -----------------------------------------------------------------------------
2581  * raiseAsync()
2582  *
2583  * The following function implements the magic for raising an
2584  * asynchronous exception in an existing thread.
2585  *
2586  * We first remove the thread from any queue on which it might be
2587  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2588  *
2589  * We strip the stack down to the innermost CATCH_FRAME, building
2590  * thunks in the heap for all the active computations, so they can 
2591  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2592  * an application of the handler to the exception, and push it on
2593  * the top of the stack.
2594  * 
2595  * How exactly do we save all the active computations?  We create an
2596  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2597  * AP_UPDs pushes everything from the corresponding update frame
2598  * upwards onto the stack.  (Actually, it pushes everything up to the
2599  * next update frame plus a pointer to the next AP_UPD object.
2600  * Entering the next AP_UPD object pushes more onto the stack until we
2601  * reach the last AP_UPD object - at which point the stack should look
2602  * exactly as it did when we killed the TSO and we can continue
2603  * execution by entering the closure on top of the stack.
2604  *
2605  * We can also kill a thread entirely - this happens if either (a) the 
2606  * exception passed to raiseAsync is NULL, or (b) there's no
2607  * CATCH_FRAME on the stack.  In either case, we strip the entire
2608  * stack and replace the thread with a zombie.
2609  *
2610  * -------------------------------------------------------------------------- */
2611  
2612 void 
2613 deleteThread(StgTSO *tso)
2614 {
2615   raiseAsync(tso,NULL);
2616 }
2617
2618 void
2619 raiseAsync(StgTSO *tso, StgClosure *exception)
2620 {
2621   StgUpdateFrame* su = tso->su;
2622   StgPtr          sp = tso->sp;
2623   
2624   /* Thread already dead? */
2625   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2626     return;
2627   }
2628
2629   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2630
2631   /* Remove it from any blocking queues */
2632   unblockThread(tso);
2633
2634   /* The stack freezing code assumes there's a closure pointer on
2635    * the top of the stack.  This isn't always the case with compiled
2636    * code, so we have to push a dummy closure on the top which just
2637    * returns to the next return address on the stack.
2638    */
2639   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2640     *(--sp) = (W_)&dummy_ret_closure;
2641   }
2642
2643   while (1) {
2644     int words = ((P_)su - (P_)sp) - 1;
2645     nat i;
2646     StgAP_UPD * ap;
2647
2648     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2649      * then build PAP(handler,exception,realworld#), and leave it on
2650      * top of the stack ready to enter.
2651      */
2652     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2653       StgCatchFrame *cf = (StgCatchFrame *)su;
2654       /* we've got an exception to raise, so let's pass it to the
2655        * handler in this frame.
2656        */
2657       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2658       TICK_ALLOC_UPD_PAP(3,0);
2659       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2660               
2661       ap->n_args = 2;
2662       ap->fun = cf->handler;    /* :: Exception -> IO a */
2663       ap->payload[0] = (P_)exception;
2664       ap->payload[1] = ARG_TAG(0); /* realworld token */
2665
2666       /* throw away the stack from Sp up to and including the
2667        * CATCH_FRAME.
2668        */
2669       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2670       tso->su = cf->link;
2671
2672       /* Restore the blocked/unblocked state for asynchronous exceptions
2673        * at the CATCH_FRAME.  
2674        *
2675        * If exceptions were unblocked at the catch, arrange that they
2676        * are unblocked again after executing the handler by pushing an
2677        * unblockAsyncExceptions_ret stack frame.
2678        */
2679       if (!cf->exceptions_blocked) {
2680         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2681       }
2682       
2683       /* Ensure that async exceptions are blocked when running the handler.
2684        */
2685       if (tso->blocked_exceptions == NULL) {
2686         tso->blocked_exceptions = END_TSO_QUEUE;
2687       }
2688       
2689       /* Put the newly-built PAP on top of the stack, ready to execute
2690        * when the thread restarts.
2691        */
2692       sp[0] = (W_)ap;
2693       tso->sp = sp;
2694       tso->what_next = ThreadEnterGHC;
2695       return;
2696     }
2697
2698     /* First build an AP_UPD consisting of the stack chunk above the
2699      * current update frame, with the top word on the stack as the
2700      * fun field.
2701      */
2702     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2703     
2704     ASSERT(words >= 0);
2705     
2706     ap->n_args = words;
2707     ap->fun    = (StgClosure *)sp[0];
2708     sp++;
2709     for(i=0; i < (nat)words; ++i) {
2710       ap->payload[i] = (P_)*sp++;
2711     }
2712     
2713     switch (get_itbl(su)->type) {
2714       
2715     case UPDATE_FRAME:
2716       {
2717         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2718         TICK_ALLOC_UP_THK(words+1,0);
2719         
2720         IF_DEBUG(scheduler,
2721                  fprintf(stderr,  "scheduler: Updating ");
2722                  printPtr((P_)su->updatee); 
2723                  fprintf(stderr,  " with ");
2724                  printObj((StgClosure *)ap);
2725                  );
2726         
2727         /* Replace the updatee with an indirection - happily
2728          * this will also wake up any threads currently
2729          * waiting on the result.
2730          */
2731         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2732         su = su->link;
2733         sp += sizeofW(StgUpdateFrame) -1;
2734         sp[0] = (W_)ap; /* push onto stack */
2735         break;
2736       }
2737       
2738     case CATCH_FRAME:
2739       {
2740         StgCatchFrame *cf = (StgCatchFrame *)su;
2741         StgClosure* o;
2742         
2743         /* We want a PAP, not an AP_UPD.  Fortunately, the
2744          * layout's the same.
2745          */
2746         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2747         TICK_ALLOC_UPD_PAP(words+1,0);
2748         
2749         /* now build o = FUN(catch,ap,handler) */
2750         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2751         TICK_ALLOC_FUN(2,0);
2752         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2753         o->payload[0] = (StgClosure *)ap;
2754         o->payload[1] = cf->handler;
2755         
2756         IF_DEBUG(scheduler,
2757                  fprintf(stderr,  "scheduler: Built ");
2758                  printObj((StgClosure *)o);
2759                  );
2760         
2761         /* pop the old handler and put o on the stack */
2762         su = cf->link;
2763         sp += sizeofW(StgCatchFrame) - 1;
2764         sp[0] = (W_)o;
2765         break;
2766       }
2767       
2768     case SEQ_FRAME:
2769       {
2770         StgSeqFrame *sf = (StgSeqFrame *)su;
2771         StgClosure* o;
2772         
2773         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2774         TICK_ALLOC_UPD_PAP(words+1,0);
2775         
2776         /* now build o = FUN(seq,ap) */
2777         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2778         TICK_ALLOC_SE_THK(1,0);
2779         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2780         o->payload[0] = (StgClosure *)ap;
2781         
2782         IF_DEBUG(scheduler,
2783                  fprintf(stderr,  "scheduler: Built ");
2784                  printObj((StgClosure *)o);
2785                  );
2786         
2787         /* pop the old handler and put o on the stack */
2788         su = sf->link;
2789         sp += sizeofW(StgSeqFrame) - 1;
2790         sp[0] = (W_)o;
2791         break;
2792       }
2793       
2794     case STOP_FRAME:
2795       /* We've stripped the entire stack, the thread is now dead. */
2796       sp += sizeofW(StgStopFrame) - 1;
2797       sp[0] = (W_)exception;    /* save the exception */
2798       tso->what_next = ThreadKilled;
2799       tso->su = (StgUpdateFrame *)(sp+1);
2800       tso->sp = sp;
2801       return;
2802       
2803     default:
2804       barf("raiseAsync");
2805     }
2806   }
2807   barf("raiseAsync");
2808 }
2809
2810 /* -----------------------------------------------------------------------------
2811    resurrectThreads is called after garbage collection on the list of
2812    threads found to be garbage.  Each of these threads will be woken
2813    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2814    on an MVar, or NonTermination if the thread was blocked on a Black
2815    Hole.
2816    -------------------------------------------------------------------------- */
2817
2818 void
2819 resurrectThreads( StgTSO *threads )
2820 {
2821   StgTSO *tso, *next;
2822
2823   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2824     next = tso->global_link;
2825     tso->global_link = all_threads;
2826     all_threads = tso;
2827     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2828
2829     switch (tso->why_blocked) {
2830     case BlockedOnMVar:
2831     case BlockedOnException:
2832       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2833       break;
2834     case BlockedOnBlackHole:
2835       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2836       break;
2837     case NotBlocked:
2838       /* This might happen if the thread was blocked on a black hole
2839        * belonging to a thread that we've just woken up (raiseAsync
2840        * can wake up threads, remember...).
2841        */
2842       continue;
2843     default:
2844       barf("resurrectThreads: thread blocked in a strange way");
2845     }
2846   }
2847 }
2848
2849 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2850 //@subsection Debugging Routines
2851
2852 /* -----------------------------------------------------------------------------
2853    Debugging: why is a thread blocked
2854    -------------------------------------------------------------------------- */
2855
2856 #ifdef DEBUG
2857
2858 void
2859 printThreadBlockage(StgTSO *tso)
2860 {
2861   switch (tso->why_blocked) {
2862   case BlockedOnRead:
2863     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2864     break;
2865   case BlockedOnWrite:
2866     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2867     break;
2868   case BlockedOnDelay:
2869 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2870     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2871 #else
2872     fprintf(stderr,"blocked on delay of %d ms", 
2873             tso->block_info.target - getourtimeofday());
2874 #endif
2875     break;
2876   case BlockedOnMVar:
2877     fprintf(stderr,"blocked on an MVar");
2878     break;
2879   case BlockedOnException:
2880     fprintf(stderr,"blocked on delivering an exception to thread %d",
2881             tso->block_info.tso->id);
2882     break;
2883   case BlockedOnBlackHole:
2884     fprintf(stderr,"blocked on a black hole");
2885     break;
2886   case NotBlocked:
2887     fprintf(stderr,"not blocked");
2888     break;
2889 #if defined(PAR)
2890   case BlockedOnGA:
2891     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2892             tso->block_info.closure, info_type(tso->block_info.closure));
2893     break;
2894   case BlockedOnGA_NoSend:
2895     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2896             tso->block_info.closure, info_type(tso->block_info.closure));
2897     break;
2898 #endif
2899   default:
2900     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2901          tso->why_blocked, tso->id, tso);
2902   }
2903 }
2904
2905 void
2906 printThreadStatus(StgTSO *tso)
2907 {
2908   switch (tso->what_next) {
2909   case ThreadKilled:
2910     fprintf(stderr,"has been killed");
2911     break;
2912   case ThreadComplete:
2913     fprintf(stderr,"has completed");
2914     break;
2915   default:
2916     printThreadBlockage(tso);
2917   }
2918 }
2919
2920 void
2921 printAllThreads(void)
2922 {
2923   StgTSO *t;
2924
2925   sched_belch("all threads:");
2926   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2927     fprintf(stderr, "\tthread %d is ", t->id);
2928     printThreadStatus(t);
2929     fprintf(stderr,"\n");
2930   }
2931 }
2932     
2933 /* 
2934    Print a whole blocking queue attached to node (debugging only).
2935 */
2936 //@cindex print_bq
2937 # if defined(PAR)
2938 void 
2939 print_bq (StgClosure *node)
2940 {
2941   StgBlockingQueueElement *bqe;
2942   StgTSO *tso;
2943   rtsBool end;
2944
2945   fprintf(stderr,"## BQ of closure %p (%s): ",
2946           node, info_type(node));
2947
2948   /* should cover all closures that may have a blocking queue */
2949   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2950          get_itbl(node)->type == FETCH_ME_BQ ||
2951          get_itbl(node)->type == RBH);
2952     
2953   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2954   /* 
2955      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2956   */
2957   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2958        !end; // iterate until bqe points to a CONSTR
2959        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2960     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2961     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2962     /* types of closures that may appear in a blocking queue */
2963     ASSERT(get_itbl(bqe)->type == TSO ||           
2964            get_itbl(bqe)->type == BLOCKED_FETCH || 
2965            get_itbl(bqe)->type == CONSTR); 
2966     /* only BQs of an RBH end with an RBH_Save closure */
2967     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2968
2969     switch (get_itbl(bqe)->type) {
2970     case TSO:
2971       fprintf(stderr," TSO %d (%x),",
2972               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2973       break;
2974     case BLOCKED_FETCH:
2975       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2976               ((StgBlockedFetch *)bqe)->node, 
2977               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2978               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2979               ((StgBlockedFetch *)bqe)->ga.weight);
2980       break;
2981     case CONSTR:
2982       fprintf(stderr," %s (IP %p),",
2983               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2984                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2985                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2986                "RBH_Save_?"), get_itbl(bqe));
2987       break;
2988     default:
2989       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2990            info_type(bqe), node, info_type(node));
2991       break;
2992     }
2993   } /* for */
2994   fputc('\n', stderr);
2995 }
2996 # elif defined(GRAN)
2997 void 
2998 print_bq (StgClosure *node)
2999 {
3000   StgBlockingQueueElement *bqe;
3001   PEs node_loc, tso_loc;
3002   rtsBool end;
3003
3004   /* should cover all closures that may have a blocking queue */
3005   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3006          get_itbl(node)->type == FETCH_ME_BQ ||
3007          get_itbl(node)->type == RBH);
3008     
3009   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3010   node_loc = where_is(node);
3011
3012   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3013           node, info_type(node), node_loc);
3014
3015   /* 
3016      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3017   */
3018   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3019        !end; // iterate until bqe points to a CONSTR
3020        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3021     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3022     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3023     /* types of closures that may appear in a blocking queue */
3024     ASSERT(get_itbl(bqe)->type == TSO ||           
3025            get_itbl(bqe)->type == CONSTR); 
3026     /* only BQs of an RBH end with an RBH_Save closure */
3027     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3028
3029     tso_loc = where_is((StgClosure *)bqe);
3030     switch (get_itbl(bqe)->type) {
3031     case TSO:
3032       fprintf(stderr," TSO %d (%p) on [PE %d],",
3033               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3034       break;
3035     case CONSTR:
3036       fprintf(stderr," %s (IP %p),",
3037               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3038                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3039                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3040                "RBH_Save_?"), get_itbl(bqe));
3041       break;
3042     default:
3043       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3044            info_type((StgClosure *)bqe), node, info_type(node));
3045       break;
3046     }
3047   } /* for */
3048   fputc('\n', stderr);
3049 }
3050 #else
3051 /* 
3052    Nice and easy: only TSOs on the blocking queue
3053 */
3054 void 
3055 print_bq (StgClosure *node)
3056 {
3057   StgTSO *tso;
3058
3059   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3060   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3061        tso != END_TSO_QUEUE; 
3062        tso=tso->link) {
3063     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3064     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3065     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3066   }
3067   fputc('\n', stderr);
3068 }
3069 # endif
3070
3071 #if defined(PAR)
3072 static nat
3073 run_queue_len(void)
3074 {
3075   nat i;
3076   StgTSO *tso;
3077
3078   for (i=0, tso=run_queue_hd; 
3079        tso != END_TSO_QUEUE;
3080        i++, tso=tso->link)
3081     /* nothing */
3082
3083   return i;
3084 }
3085 #endif
3086
3087 static void
3088 sched_belch(char *s, ...)
3089 {
3090   va_list ap;
3091   va_start(ap,s);
3092 #ifdef SMP
3093   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3094 #else
3095   fprintf(stderr, "scheduler: ");
3096 #endif
3097   vfprintf(stderr, s, ap);
3098   fprintf(stderr, "\n");
3099 }
3100
3101 #endif /* DEBUG */
3102
3103
3104 //@node Index,  , Debugging Routines, Main scheduling code
3105 //@subsection Index
3106
3107 //@index
3108 //* MainRegTable::  @cindex\s-+MainRegTable
3109 //* StgMainThread::  @cindex\s-+StgMainThread
3110 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3111 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3112 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3113 //* context_switch::  @cindex\s-+context_switch
3114 //* createThread::  @cindex\s-+createThread
3115 //* free_capabilities::  @cindex\s-+free_capabilities
3116 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3117 //* initScheduler::  @cindex\s-+initScheduler
3118 //* interrupted::  @cindex\s-+interrupted
3119 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3120 //* next_thread_id::  @cindex\s-+next_thread_id
3121 //* print_bq::  @cindex\s-+print_bq
3122 //* run_queue_hd::  @cindex\s-+run_queue_hd
3123 //* run_queue_tl::  @cindex\s-+run_queue_tl
3124 //* sched_mutex::  @cindex\s-+sched_mutex
3125 //* schedule::  @cindex\s-+schedule
3126 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3127 //* task_ids::  @cindex\s-+task_ids
3128 //* term_mutex::  @cindex\s-+term_mutex
3129 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3130 //@end index