[project @ 2000-04-26 09:44:18 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.69 2000/04/26 09:44:28 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     if (blocked_queue_hd == END_TSO_QUEUE
543         && run_queue_hd == END_TSO_QUEUE) {
544         StgMainThread *m = main_threads;
545         m->ret = NULL;
546         m->stat = Deadlock;
547         main_threads = m->link;
548         return;
549     }
550 #endif
551
552 #ifdef SMP
553     /* If there's a GC pending, don't do anything until it has
554      * completed.
555      */
556     if (ready_to_gc) {
557       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
559     }
560     
561     /* block until we've got a thread on the run queue and a free
562      * capability.
563      */
564     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565       IF_DEBUG(scheduler, sched_belch("waiting for work"));
566       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567       IF_DEBUG(scheduler, sched_belch("work now available"));
568     }
569 #endif
570
571 #if defined(GRAN)
572
573     if (RtsFlags.GranFlags.Light)
574       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
575
576     /* adjust time based on time-stamp */
577     if (event->time > CurrentTime[CurrentProc] &&
578         event->evttype != ContinueThread)
579       CurrentTime[CurrentProc] = event->time;
580     
581     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582     if (!RtsFlags.GranFlags.Light)
583       handleIdlePEs();
584
585     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
586
587     /* main event dispatcher in GranSim */
588     switch (event->evttype) {
589       /* Should just be continuing execution */
590     case ContinueThread:
591       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592       /* ToDo: check assertion
593       ASSERT(run_queue_hd != (StgTSO*)NULL &&
594              run_queue_hd != END_TSO_QUEUE);
595       */
596       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
597       if (!RtsFlags.GranFlags.DoAsyncFetch &&
598           procStatus[CurrentProc]==Fetching) {
599         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600               CurrentTSO->id, CurrentTSO, CurrentProc);
601         goto next_thread;
602       } 
603       /* Ignore ContinueThreads for completed threads */
604       if (CurrentTSO->what_next == ThreadComplete) {
605         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
606               CurrentTSO->id, CurrentTSO, CurrentProc);
607         goto next_thread;
608       } 
609       /* Ignore ContinueThreads for threads that are being migrated */
610       if (PROCS(CurrentTSO)==Nowhere) { 
611         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612               CurrentTSO->id, CurrentTSO, CurrentProc);
613         goto next_thread;
614       }
615       /* The thread should be at the beginning of the run queue */
616       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
617         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618               CurrentTSO->id, CurrentTSO, CurrentProc);
619         break; // run the thread anyway
620       }
621       /*
622       new_event(proc, proc, CurrentTime[proc],
623                 FindWork,
624                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
625       goto next_thread; 
626       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627       break; // now actually run the thread; DaH Qu'vam yImuHbej 
628
629     case FetchNode:
630       do_the_fetchnode(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case GlobalBlock:
634       do_the_globalblock(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case FetchReply:
638       do_the_fetchreply(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case UnblockThread:   /* Move from the blocked queue to the tail of */
642       do_the_unblock(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     case ResumeThread:  /* Move from the blocked queue to the tail of */
646       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
647       event->tso->gran.blocktime += 
648         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case StartThread:
653       do_the_startthread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveThread:
657       do_the_movethread(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case MoveSpark:
661       do_the_movespark(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     case FindWork:
665       do_the_findwork(event);
666       goto next_thread;             /* handle next event in event queue  */
667       
668     default:
669       barf("Illegal event type %u\n", event->evttype);
670     }  /* switch */
671     
672     /* This point was scheduler_loop in the old RTS */
673
674     IF_DEBUG(gran, belch("GRAN: after main switch"));
675
676     TimeOfLastEvent = CurrentTime[CurrentProc];
677     TimeOfNextEvent = get_time_of_next_event();
678     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679     // CurrentTSO = ThreadQueueHd;
680
681     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
682                          TimeOfNextEvent));
683
684     if (RtsFlags.GranFlags.Light) 
685       GranSimLight_leave_system(event, &ActiveTSO); 
686
687     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
688
689     IF_DEBUG(gran, 
690              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
691
692     /* in a GranSim setup the TSO stays on the run queue */
693     t = CurrentTSO;
694     /* Take a thread from the run queue. */
695     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
696
697     IF_DEBUG(gran, 
698              fprintf(stderr, "GRAN: About to run current thread, which is\n");
699              G_TSO(t,5))
700
701     context_switch = 0; // turned on via GranYield, checking events and time slice
702
703     IF_DEBUG(gran, 
704              DumpGranEvent(GR_SCHEDULE, t));
705
706     procStatus[CurrentProc] = Busy;
707
708 #elif defined(PAR)
709
710     if (PendingFetches != END_BF_QUEUE) {
711         processFetches();
712     }
713
714     /* ToDo: phps merge with spark activation above */
715     /* check whether we have local work and send requests if we have none */
716     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
717       /* :-[  no local threads => look out for local sparks */
718       /* the spark pool for the current PE */
719       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721           pool->hd < pool->tl) {
722         /* 
723          * ToDo: add GC code check that we really have enough heap afterwards!!
724          * Old comment:
725          * If we're here (no runnable threads) and we have pending
726          * sparks, we must have a space problem.  Get enough space
727          * to turn one of those pending sparks into a
728          * thread... 
729          */
730         
731         spark = findSpark();                /* get a spark */
732         if (spark != (rtsSpark) NULL) {
733           tso = activateSpark(spark);       /* turn the spark into a thread */
734           IF_PAR_DEBUG(schedule,
735                        belch("==== schedule: Created TSO %d (%p); %d threads active",
736                              tso->id, tso, advisory_thread_count));
737
738           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739             belch("==^^ failed to activate spark");
740             goto next_thread;
741           }               /* otherwise fall through & pick-up new tso */
742         } else {
743           IF_PAR_DEBUG(verbose,
744                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
745                              spark_queue_len(pool)));
746           goto next_thread;
747         }
748       } else  
749       /* =8-[  no local sparks => look for work on other PEs */
750       {
751         /*
752          * We really have absolutely no work.  Send out a fish
753          * (there may be some out there already), and wait for
754          * something to arrive.  We clearly can't run any threads
755          * until a SCHEDULE or RESUME arrives, and so that's what
756          * we're hoping to see.  (Of course, we still have to
757          * respond to other types of messages.)
758          */
759         if (//!fishing &&  
760             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762           /* fishing set in sendFish, processFish;
763              avoid flooding system with fishes via delay */
764           pe = choosePE();
765           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
766                    NEW_FISH_HUNGER);
767         }
768         
769         processMessages();
770         goto next_thread;
771         // ReSchedule(0);
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779     /* Take a thread from the run queue, if we have work */
780     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
792                               spark_queue_len(pool), 
793                               CURRENT_PROC,
794                               pool->hd, pool->tl, pool->base, pool->lim));
795
796     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
797                               run_queue_len(), CURRENT_PROC,
798                               run_queue_hd, run_queue_tl));
799
800 #if 0
801     if (t != LastTSO) {
802       /* 
803          we are running a different TSO, so write a schedule event to log file
804          NB: If we use fair scheduling we also have to write  a deschedule 
805              event for LastTSO; with unfair scheduling we know that the
806              previous tso has blocked whenever we switch to another tso, so
807              we don't need it in GUM for now
808       */
809       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
811       
812     }
813 #endif
814 #else /* !GRAN && !PAR */
815   
816     /* grab a thread from the run queue
817      */
818     ASSERT(run_queue_hd != END_TSO_QUEUE);
819     t = POP_RUN_QUEUE();
820     IF_DEBUG(sanity,checkTSO(t));
821
822 #endif
823     
824     /* grab a capability
825      */
826 #ifdef SMP
827     cap = free_capabilities;
828     free_capabilities = cap->link;
829     n_free_capabilities--;
830 #else
831     cap = &MainRegTable;
832 #endif
833     
834     cap->rCurrentTSO = t;
835     
836     /* set the context_switch flag
837      */
838     if (run_queue_hd == END_TSO_QUEUE)
839       context_switch = 0;
840     else
841       context_switch = 1;
842
843     RELEASE_LOCK(&sched_mutex);
844
845     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
846                               t->id, t, whatNext_strs[t->what_next]));
847
848     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849     /* Run the current thread 
850      */
851     switch (cap->rCurrentTSO->what_next) {
852     case ThreadKilled:
853     case ThreadComplete:
854       /* Thread already finished, return to scheduler. */
855       ret = ThreadFinished;
856       break;
857     case ThreadEnterGHC:
858       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
859       break;
860     case ThreadRunGHC:
861       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
862       break;
863     case ThreadEnterHugs:
864 #ifdef INTERPRETER
865       {
866          StgClosure* c;
867          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869          cap->rCurrentTSO->sp += 1;
870          ret = enter(cap,c);
871          break;
872       }
873 #else
874       barf("Panic: entered a BCO but no bytecode interpreter in this build");
875 #endif
876     default:
877       barf("schedule: invalid what_next field");
878     }
879     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
880     
881     /* Costs for the scheduler are assigned to CCS_SYSTEM */
882 #ifdef PROFILING
883     CCCS = CCS_SYSTEM;
884 #endif
885     
886     ACQUIRE_LOCK(&sched_mutex);
887
888 #ifdef SMP
889     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
892 #endif
893     t = cap->rCurrentTSO;
894     
895 #if defined(PAR)
896     /* HACK 675: if the last thread didn't yield, make sure to print a 
897        SCHEDULE event to the log file when StgRunning the next thread, even
898        if it is the same one as before */
899     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
900     TimeOfLastYield = CURRENT_TIME;
901 #endif
902
903     switch (ret) {
904     case HeapOverflow:
905       /* make all the running tasks block on a condition variable,
906        * maybe set context_switch and wait till they all pile in,
907        * then have them wait on a GC condition variable.
908        */
909       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
910                                t->id, t, whatNext_strs[t->what_next]));
911       threadPaused(t);
912 #if defined(GRAN)
913       ASSERT(!is_on_queue(t,CurrentProc));
914 #endif
915       
916       ready_to_gc = rtsTrue;
917       context_switch = 1;               /* stop other threads ASAP */
918       PUSH_ON_RUN_QUEUE(t);
919       /* actual GC is done at the end of the while loop */
920       break;
921       
922     case StackOverflow:
923       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
924                                t->id, t, whatNext_strs[t->what_next]));
925       /* just adjust the stack for this thread, then pop it back
926        * on the run queue.
927        */
928       threadPaused(t);
929       { 
930         StgMainThread *m;
931         /* enlarge the stack */
932         StgTSO *new_t = threadStackOverflow(t);
933         
934         /* This TSO has moved, so update any pointers to it from the
935          * main thread stack.  It better not be on any other queues...
936          * (it shouldn't be).
937          */
938         for (m = main_threads; m != NULL; m = m->link) {
939           if (m->tso == t) {
940             m->tso = new_t;
941           }
942         }
943         threadPaused(new_t);
944         PUSH_ON_RUN_QUEUE(new_t);
945       }
946       break;
947
948     case ThreadYielding:
949 #if defined(GRAN)
950       IF_DEBUG(gran, 
951                DumpGranEvent(GR_DESCHEDULE, t));
952       globalGranStats.tot_yields++;
953 #elif defined(PAR)
954       IF_DEBUG(par, 
955                DumpGranEvent(GR_DESCHEDULE, t));
956 #endif
957       /* put the thread back on the run queue.  Then, if we're ready to
958        * GC, check whether this is the last task to stop.  If so, wake
959        * up the GC thread.  getThread will block during a GC until the
960        * GC is finished.
961        */
962       IF_DEBUG(scheduler,
963                if (t->what_next == ThreadEnterHugs) {
964                    /* ToDo: or maybe a timer expired when we were in Hugs?
965                     * or maybe someone hit ctrl-C
966                     */
967                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
968                          t->id, t, whatNext_strs[t->what_next]);
969                } else {
970                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
971                          t->id, t, whatNext_strs[t->what_next]);
972                }
973                );
974
975       threadPaused(t);
976
977       IF_DEBUG(sanity,
978                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
979                checkTSO(t));
980       ASSERT(t->link == END_TSO_QUEUE);
981 #if defined(GRAN)
982       ASSERT(!is_on_queue(t,CurrentProc));
983
984       IF_DEBUG(sanity,
985                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
986                checkThreadQsSanity(rtsTrue));
987 #endif
988       APPEND_TO_RUN_QUEUE(t);
989 #if defined(GRAN)
990       /* add a ContinueThread event to actually process the thread */
991       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
992                 ContinueThread,
993                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
994       IF_GRAN_DEBUG(bq, 
995                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
996                G_EVENTQ(0);
997                G_CURR_THREADQ(0))
998 #endif /* GRAN */
999       break;
1000       
1001     case ThreadBlocked:
1002 #if defined(GRAN)
1003       IF_DEBUG(scheduler,
1004                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1005                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1006                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1007
1008       // ??? needed; should emit block before
1009       IF_DEBUG(gran, 
1010                DumpGranEvent(GR_DESCHEDULE, t)); 
1011       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1012       /*
1013         ngoq Dogh!
1014       ASSERT(procStatus[CurrentProc]==Busy || 
1015               ((procStatus[CurrentProc]==Fetching) && 
1016               (t->block_info.closure!=(StgClosure*)NULL)));
1017       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1018           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1019             procStatus[CurrentProc]==Fetching)) 
1020         procStatus[CurrentProc] = Idle;
1021       */
1022 #elif defined(PAR)
1023       IF_DEBUG(par, 
1024                DumpGranEvent(GR_DESCHEDULE, t)); 
1025
1026       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1027       blockThread(t);
1028
1029       IF_DEBUG(scheduler,
1030                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1031                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1032                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1033
1034 #else /* !GRAN */
1035       /* don't need to do anything.  Either the thread is blocked on
1036        * I/O, in which case we'll have called addToBlockedQueue
1037        * previously, or it's blocked on an MVar or Blackhole, in which
1038        * case it'll be on the relevant queue already.
1039        */
1040       IF_DEBUG(scheduler,
1041                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1042                printThreadBlockage(t);
1043                fprintf(stderr, "\n"));
1044
1045       /* Only for dumping event to log file 
1046          ToDo: do I need this in GranSim, too?
1047       blockThread(t);
1048       */
1049 #endif
1050       threadPaused(t);
1051       break;
1052       
1053     case ThreadFinished:
1054       /* Need to check whether this was a main thread, and if so, signal
1055        * the task that started it with the return value.  If we have no
1056        * more main threads, we probably need to stop all the tasks until
1057        * we get a new one.
1058        */
1059       /* We also end up here if the thread kills itself with an
1060        * uncaught exception, see Exception.hc.
1061        */
1062       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1063 #if defined(GRAN)
1064       endThread(t, CurrentProc); // clean-up the thread
1065 #elif defined(PAR)
1066       advisory_thread_count--;
1067       if (RtsFlags.ParFlags.ParStats.Full) 
1068         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1069 #endif
1070       break;
1071       
1072     default:
1073       barf("schedule: invalid thread return code %d", (int)ret);
1074     }
1075     
1076 #ifdef SMP
1077     cap->link = free_capabilities;
1078     free_capabilities = cap;
1079     n_free_capabilities++;
1080 #endif
1081
1082 #ifdef SMP
1083     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1084 #else
1085     if (ready_to_gc) 
1086 #endif
1087       {
1088       /* everybody back, start the GC.
1089        * Could do it in this thread, or signal a condition var
1090        * to do it in another thread.  Either way, we need to
1091        * broadcast on gc_pending_cond afterward.
1092        */
1093 #ifdef SMP
1094       IF_DEBUG(scheduler,sched_belch("doing GC"));
1095 #endif
1096       GarbageCollect(GetRoots,rtsFalse);
1097       ready_to_gc = rtsFalse;
1098 #ifdef SMP
1099       pthread_cond_broadcast(&gc_pending_cond);
1100 #endif
1101 #if defined(GRAN)
1102       /* add a ContinueThread event to continue execution of current thread */
1103       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1104                 ContinueThread,
1105                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1106       IF_GRAN_DEBUG(bq, 
1107                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1108                G_EVENTQ(0);
1109                G_CURR_THREADQ(0))
1110 #endif /* GRAN */
1111     }
1112 #if defined(GRAN)
1113   next_thread:
1114     IF_GRAN_DEBUG(unused,
1115                   print_eventq(EventHd));
1116
1117     event = get_next_event();
1118
1119 #elif defined(PAR)
1120   next_thread:
1121     /* ToDo: wait for next message to arrive rather than busy wait */
1122
1123 #else /* GRAN */
1124   /* not any more
1125   next_thread:
1126     t = take_off_run_queue(END_TSO_QUEUE);
1127   */
1128 #endif /* GRAN */
1129   } /* end of while(1) */
1130 }
1131
1132 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1133 void deleteAllThreads ( void )
1134 {
1135   StgTSO* t;
1136   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1137   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1138     deleteThread(t);
1139   }
1140   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1141     deleteThread(t);
1142   }
1143   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1144   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1145 }
1146
1147 /* startThread and  insertThread are now in GranSim.c -- HWL */
1148
1149 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1150 //@subsection Suspend and Resume
1151
1152 /* ---------------------------------------------------------------------------
1153  * Suspending & resuming Haskell threads.
1154  * 
1155  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1156  * its capability before calling the C function.  This allows another
1157  * task to pick up the capability and carry on running Haskell
1158  * threads.  It also means that if the C call blocks, it won't lock
1159  * the whole system.
1160  *
1161  * The Haskell thread making the C call is put to sleep for the
1162  * duration of the call, on the susepended_ccalling_threads queue.  We
1163  * give out a token to the task, which it can use to resume the thread
1164  * on return from the C function.
1165  * ------------------------------------------------------------------------- */
1166    
1167 StgInt
1168 suspendThread( Capability *cap )
1169 {
1170   nat tok;
1171
1172   ACQUIRE_LOCK(&sched_mutex);
1173
1174   IF_DEBUG(scheduler,
1175            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1176
1177   threadPaused(cap->rCurrentTSO);
1178   cap->rCurrentTSO->link = suspended_ccalling_threads;
1179   suspended_ccalling_threads = cap->rCurrentTSO;
1180
1181   /* Use the thread ID as the token; it should be unique */
1182   tok = cap->rCurrentTSO->id;
1183
1184 #ifdef SMP
1185   cap->link = free_capabilities;
1186   free_capabilities = cap;
1187   n_free_capabilities++;
1188 #endif
1189
1190   RELEASE_LOCK(&sched_mutex);
1191   return tok; 
1192 }
1193
1194 Capability *
1195 resumeThread( StgInt tok )
1196 {
1197   StgTSO *tso, **prev;
1198   Capability *cap;
1199
1200   ACQUIRE_LOCK(&sched_mutex);
1201
1202   prev = &suspended_ccalling_threads;
1203   for (tso = suspended_ccalling_threads; 
1204        tso != END_TSO_QUEUE; 
1205        prev = &tso->link, tso = tso->link) {
1206     if (tso->id == (StgThreadID)tok) {
1207       *prev = tso->link;
1208       break;
1209     }
1210   }
1211   if (tso == END_TSO_QUEUE) {
1212     barf("resumeThread: thread not found");
1213   }
1214
1215 #ifdef SMP
1216   while (free_capabilities == NULL) {
1217     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1218     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1219     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1220   }
1221   cap = free_capabilities;
1222   free_capabilities = cap->link;
1223   n_free_capabilities--;
1224 #else  
1225   cap = &MainRegTable;
1226 #endif
1227
1228   cap->rCurrentTSO = tso;
1229
1230   RELEASE_LOCK(&sched_mutex);
1231   return cap;
1232 }
1233
1234
1235 /* ---------------------------------------------------------------------------
1236  * Static functions
1237  * ------------------------------------------------------------------------ */
1238 static void unblockThread(StgTSO *tso);
1239
1240 /* ---------------------------------------------------------------------------
1241  * Comparing Thread ids.
1242  *
1243  * This is used from STG land in the implementation of the
1244  * instances of Eq/Ord for ThreadIds.
1245  * ------------------------------------------------------------------------ */
1246
1247 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1248
1249   StgThreadID id1 = tso1->id; 
1250   StgThreadID id2 = tso2->id;
1251  
1252   if (id1 < id2) return (-1);
1253   if (id1 > id2) return 1;
1254   return 0;
1255 }
1256
1257 /* ---------------------------------------------------------------------------
1258    Create a new thread.
1259
1260    The new thread starts with the given stack size.  Before the
1261    scheduler can run, however, this thread needs to have a closure
1262    (and possibly some arguments) pushed on its stack.  See
1263    pushClosure() in Schedule.h.
1264
1265    createGenThread() and createIOThread() (in SchedAPI.h) are
1266    convenient packaged versions of this function.
1267
1268    currently pri (priority) is only used in a GRAN setup -- HWL
1269    ------------------------------------------------------------------------ */
1270 //@cindex createThread
1271 #if defined(GRAN)
1272 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1273 StgTSO *
1274 createThread(nat stack_size, StgInt pri)
1275 {
1276   return createThread_(stack_size, rtsFalse, pri);
1277 }
1278
1279 static StgTSO *
1280 createThread_(nat size, rtsBool have_lock, StgInt pri)
1281 {
1282 #else
1283 StgTSO *
1284 createThread(nat stack_size)
1285 {
1286   return createThread_(stack_size, rtsFalse);
1287 }
1288
1289 static StgTSO *
1290 createThread_(nat size, rtsBool have_lock)
1291 {
1292 #endif
1293
1294     StgTSO *tso;
1295     nat stack_size;
1296
1297     /* First check whether we should create a thread at all */
1298 #if defined(PAR)
1299   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1300   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1301     threadsIgnored++;
1302     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1303           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1304     return END_TSO_QUEUE;
1305   }
1306   threadsCreated++;
1307 #endif
1308
1309 #if defined(GRAN)
1310   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1311 #endif
1312
1313   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1314
1315   /* catch ridiculously small stack sizes */
1316   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1317     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1318   }
1319
1320   stack_size = size - TSO_STRUCT_SIZEW;
1321
1322   tso = (StgTSO *)allocate(size);
1323   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1324
1325   SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1326 #if defined(GRAN)
1327   SET_GRAN_HDR(tso, ThisPE);
1328 #endif
1329   tso->what_next     = ThreadEnterGHC;
1330
1331   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1332    * protect the increment operation on next_thread_id.
1333    * In future, we could use an atomic increment instead.
1334    */
1335   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1336   tso->id = next_thread_id++; 
1337   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1338
1339   tso->why_blocked  = NotBlocked;
1340   tso->blocked_exceptions = NULL;
1341
1342   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1343   tso->stack_size   = stack_size;
1344   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1345                               - TSO_STRUCT_SIZEW;
1346   tso->sp           = (P_)&(tso->stack) + stack_size;
1347
1348 #ifdef PROFILING
1349   tso->prof.CCCS = CCS_MAIN;
1350 #endif
1351
1352   /* put a stop frame on the stack */
1353   tso->sp -= sizeofW(StgStopFrame);
1354   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1355   tso->su = (StgUpdateFrame*)tso->sp;
1356
1357   // ToDo: check this
1358 #if defined(GRAN)
1359   tso->link = END_TSO_QUEUE;
1360   /* uses more flexible routine in GranSim */
1361   insertThread(tso, CurrentProc);
1362 #else
1363   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1364    * from its creation
1365    */
1366 #endif
1367
1368 #if defined(GRAN) || defined(PAR)
1369   DumpGranEvent(GR_START,tso);
1370 #endif
1371
1372   /* Link the new thread on the global thread list.
1373    */
1374   tso->global_link = all_threads;
1375   all_threads = tso;
1376
1377 #if defined(GRAN)
1378   tso->gran.pri = pri;
1379 # if defined(DEBUG)
1380   tso->gran.magic = TSO_MAGIC; // debugging only
1381 # endif
1382   tso->gran.sparkname   = 0;
1383   tso->gran.startedat   = CURRENT_TIME; 
1384   tso->gran.exported    = 0;
1385   tso->gran.basicblocks = 0;
1386   tso->gran.allocs      = 0;
1387   tso->gran.exectime    = 0;
1388   tso->gran.fetchtime   = 0;
1389   tso->gran.fetchcount  = 0;
1390   tso->gran.blocktime   = 0;
1391   tso->gran.blockcount  = 0;
1392   tso->gran.blockedat   = 0;
1393   tso->gran.globalsparks = 0;
1394   tso->gran.localsparks  = 0;
1395   if (RtsFlags.GranFlags.Light)
1396     tso->gran.clock  = Now; /* local clock */
1397   else
1398     tso->gran.clock  = 0;
1399
1400   IF_DEBUG(gran,printTSO(tso));
1401 #elif defined(PAR)
1402 # if defined(DEBUG)
1403   tso->par.magic = TSO_MAGIC; // debugging only
1404 # endif
1405   tso->par.sparkname   = 0;
1406   tso->par.startedat   = CURRENT_TIME; 
1407   tso->par.exported    = 0;
1408   tso->par.basicblocks = 0;
1409   tso->par.allocs      = 0;
1410   tso->par.exectime    = 0;
1411   tso->par.fetchtime   = 0;
1412   tso->par.fetchcount  = 0;
1413   tso->par.blocktime   = 0;
1414   tso->par.blockcount  = 0;
1415   tso->par.blockedat   = 0;
1416   tso->par.globalsparks = 0;
1417   tso->par.localsparks  = 0;
1418 #endif
1419
1420 #if defined(GRAN)
1421   globalGranStats.tot_threads_created++;
1422   globalGranStats.threads_created_on_PE[CurrentProc]++;
1423   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1424   globalGranStats.tot_sq_probes++;
1425 #endif 
1426
1427 #if defined(GRAN)
1428   IF_GRAN_DEBUG(pri,
1429                 belch("==__ schedule: Created TSO %d (%p);",
1430                       CurrentProc, tso, tso->id));
1431 #elif defined(PAR)
1432     IF_PAR_DEBUG(verbose,
1433                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1434                        tso->id, tso, advisory_thread_count));
1435 #else
1436   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1437                                  tso->id, tso->stack_size));
1438 #endif    
1439   return tso;
1440 }
1441
1442 /*
1443   Turn a spark into a thread.
1444   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1445 */
1446 #if defined(PAR)
1447 //@cindex activateSpark
1448 StgTSO *
1449 activateSpark (rtsSpark spark) 
1450 {
1451   StgTSO *tso;
1452   
1453   ASSERT(spark != (rtsSpark)NULL);
1454   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1455   if (tso!=END_TSO_QUEUE) {
1456     pushClosure(tso,spark);
1457     PUSH_ON_RUN_QUEUE(tso);
1458     advisory_thread_count++;
1459
1460     if (RtsFlags.ParFlags.ParStats.Full) {
1461       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1462       IF_PAR_DEBUG(verbose,
1463                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1464                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1465     }
1466   } else {
1467     barf("activateSpark: Cannot create TSO");
1468   }
1469   // ToDo: fwd info on local/global spark to thread -- HWL
1470   // tso->gran.exported =  spark->exported;
1471   // tso->gran.locked =   !spark->global;
1472   // tso->gran.sparkname = spark->name;
1473
1474   return tso;
1475 }
1476 #endif
1477
1478 /* ---------------------------------------------------------------------------
1479  * scheduleThread()
1480  *
1481  * scheduleThread puts a thread on the head of the runnable queue.
1482  * This will usually be done immediately after a thread is created.
1483  * The caller of scheduleThread must create the thread using e.g.
1484  * createThread and push an appropriate closure
1485  * on this thread's stack before the scheduler is invoked.
1486  * ------------------------------------------------------------------------ */
1487
1488 void
1489 scheduleThread(StgTSO *tso)
1490 {
1491   if (tso==END_TSO_QUEUE){    
1492     schedule();
1493     return;
1494   }
1495
1496   ACQUIRE_LOCK(&sched_mutex);
1497
1498   /* Put the new thread on the head of the runnable queue.  The caller
1499    * better push an appropriate closure on this thread's stack
1500    * beforehand.  In the SMP case, the thread may start running as
1501    * soon as we release the scheduler lock below.
1502    */
1503   PUSH_ON_RUN_QUEUE(tso);
1504   THREAD_RUNNABLE();
1505
1506 #if 0
1507   IF_DEBUG(scheduler,printTSO(tso));
1508 #endif
1509   RELEASE_LOCK(&sched_mutex);
1510 }
1511
1512 /* ---------------------------------------------------------------------------
1513  * startTasks()
1514  *
1515  * Start up Posix threads to run each of the scheduler tasks.
1516  * I believe the task ids are not needed in the system as defined.
1517  *  KH @ 25/10/99
1518  * ------------------------------------------------------------------------ */
1519
1520 #if defined(PAR) || defined(SMP)
1521 void *
1522 taskStart( void *arg STG_UNUSED )
1523 {
1524   rts_evalNothing(NULL);
1525 }
1526 #endif
1527
1528 /* ---------------------------------------------------------------------------
1529  * initScheduler()
1530  *
1531  * Initialise the scheduler.  This resets all the queues - if the
1532  * queues contained any threads, they'll be garbage collected at the
1533  * next pass.
1534  *
1535  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1536  * ------------------------------------------------------------------------ */
1537
1538 #ifdef SMP
1539 static void
1540 term_handler(int sig STG_UNUSED)
1541 {
1542   stat_workerStop();
1543   ACQUIRE_LOCK(&term_mutex);
1544   await_death--;
1545   RELEASE_LOCK(&term_mutex);
1546   pthread_exit(NULL);
1547 }
1548 #endif
1549
1550 //@cindex initScheduler
1551 void 
1552 initScheduler(void)
1553 {
1554 #if defined(GRAN)
1555   nat i;
1556
1557   for (i=0; i<=MAX_PROC; i++) {
1558     run_queue_hds[i]      = END_TSO_QUEUE;
1559     run_queue_tls[i]      = END_TSO_QUEUE;
1560     blocked_queue_hds[i]  = END_TSO_QUEUE;
1561     blocked_queue_tls[i]  = END_TSO_QUEUE;
1562     ccalling_threadss[i]  = END_TSO_QUEUE;
1563   }
1564 #else
1565   run_queue_hd      = END_TSO_QUEUE;
1566   run_queue_tl      = END_TSO_QUEUE;
1567   blocked_queue_hd  = END_TSO_QUEUE;
1568   blocked_queue_tl  = END_TSO_QUEUE;
1569 #endif 
1570
1571   suspended_ccalling_threads  = END_TSO_QUEUE;
1572
1573   main_threads = NULL;
1574   all_threads  = END_TSO_QUEUE;
1575
1576   context_switch = 0;
1577   interrupted    = 0;
1578
1579 #ifdef INTERPRETER
1580   ecafList = END_ECAF_LIST;
1581   clearECafTable();
1582 #endif
1583
1584   /* Install the SIGHUP handler */
1585 #ifdef SMP
1586   {
1587     struct sigaction action,oact;
1588
1589     action.sa_handler = term_handler;
1590     sigemptyset(&action.sa_mask);
1591     action.sa_flags = 0;
1592     if (sigaction(SIGTERM, &action, &oact) != 0) {
1593       barf("can't install TERM handler");
1594     }
1595   }
1596 #endif
1597
1598 #ifdef SMP
1599   /* Allocate N Capabilities */
1600   {
1601     nat i;
1602     Capability *cap, *prev;
1603     cap  = NULL;
1604     prev = NULL;
1605     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1606       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1607       cap->link = prev;
1608       prev = cap;
1609     }
1610     free_capabilities = cap;
1611     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1612   }
1613   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1614                              n_free_capabilities););
1615 #endif
1616
1617 #if defined(SMP) || defined(PAR)
1618   initSparkPools();
1619 #endif
1620 }
1621
1622 #ifdef SMP
1623 void
1624 startTasks( void )
1625 {
1626   nat i;
1627   int r;
1628   pthread_t tid;
1629   
1630   /* make some space for saving all the thread ids */
1631   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1632                             "initScheduler:task_ids");
1633   
1634   /* and create all the threads */
1635   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1636     r = pthread_create(&tid,NULL,taskStart,NULL);
1637     if (r != 0) {
1638       barf("startTasks: Can't create new Posix thread");
1639     }
1640     task_ids[i].id = tid;
1641     task_ids[i].mut_time = 0.0;
1642     task_ids[i].mut_etime = 0.0;
1643     task_ids[i].gc_time = 0.0;
1644     task_ids[i].gc_etime = 0.0;
1645     task_ids[i].elapsedtimestart = elapsedtime();
1646     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1647   }
1648 }
1649 #endif
1650
1651 void
1652 exitScheduler( void )
1653 {
1654 #ifdef SMP
1655   nat i;
1656
1657   /* Don't want to use pthread_cancel, since we'd have to install
1658    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1659    * all our locks.
1660    */
1661 #if 0
1662   /* Cancel all our tasks */
1663   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1664     pthread_cancel(task_ids[i].id);
1665   }
1666   
1667   /* Wait for all the tasks to terminate */
1668   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1669     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1670                                task_ids[i].id));
1671     pthread_join(task_ids[i].id, NULL);
1672   }
1673 #endif
1674
1675   /* Send 'em all a SIGHUP.  That should shut 'em up.
1676    */
1677   await_death = RtsFlags.ParFlags.nNodes;
1678   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1679     pthread_kill(task_ids[i].id,SIGTERM);
1680   }
1681   while (await_death > 0) {
1682     sched_yield();
1683   }
1684 #endif
1685 }
1686
1687 /* -----------------------------------------------------------------------------
1688    Managing the per-task allocation areas.
1689    
1690    Each capability comes with an allocation area.  These are
1691    fixed-length block lists into which allocation can be done.
1692
1693    ToDo: no support for two-space collection at the moment???
1694    -------------------------------------------------------------------------- */
1695
1696 /* -----------------------------------------------------------------------------
1697  * waitThread is the external interface for running a new computation
1698  * and waiting for the result.
1699  *
1700  * In the non-SMP case, we create a new main thread, push it on the 
1701  * main-thread stack, and invoke the scheduler to run it.  The
1702  * scheduler will return when the top main thread on the stack has
1703  * completed or died, and fill in the necessary fields of the
1704  * main_thread structure.
1705  *
1706  * In the SMP case, we create a main thread as before, but we then
1707  * create a new condition variable and sleep on it.  When our new
1708  * main thread has completed, we'll be woken up and the status/result
1709  * will be in the main_thread struct.
1710  * -------------------------------------------------------------------------- */
1711
1712 int 
1713 howManyThreadsAvail ( void )
1714 {
1715    int i = 0;
1716    StgTSO* q;
1717    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
1718       i++;
1719    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
1720       i++;
1721    return i;
1722 }
1723
1724 void
1725 finishAllThreads ( void )
1726 {
1727    do {
1728       while (run_queue_hd != END_TSO_QUEUE) {
1729          waitThread ( run_queue_hd, NULL );
1730       }
1731       while (blocked_queue_hd != END_TSO_QUEUE) {
1732          waitThread ( blocked_queue_hd, NULL );
1733       }
1734    } while 
1735       (blocked_queue_hd != END_TSO_QUEUE || 
1736         run_queue_hd != END_TSO_QUEUE);
1737 }
1738
1739 SchedulerStatus
1740 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1741 {
1742   StgMainThread *m;
1743   SchedulerStatus stat;
1744
1745   ACQUIRE_LOCK(&sched_mutex);
1746   
1747   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1748
1749   m->tso = tso;
1750   m->ret = ret;
1751   m->stat = NoStatus;
1752 #ifdef SMP
1753   pthread_cond_init(&m->wakeup, NULL);
1754 #endif
1755
1756   m->link = main_threads;
1757   main_threads = m;
1758
1759   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1760                               m->tso->id));
1761
1762 #ifdef SMP
1763   do {
1764     pthread_cond_wait(&m->wakeup, &sched_mutex);
1765   } while (m->stat == NoStatus);
1766 #elif defined(GRAN)
1767   /* GranSim specific init */
1768   CurrentTSO = m->tso;                // the TSO to run
1769   procStatus[MainProc] = Busy;        // status of main PE
1770   CurrentProc = MainProc;             // PE to run it on
1771
1772   schedule();
1773 #else
1774   schedule();
1775   ASSERT(m->stat != NoStatus);
1776 #endif
1777
1778   stat = m->stat;
1779
1780 #ifdef SMP
1781   pthread_cond_destroy(&m->wakeup);
1782 #endif
1783
1784   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1785                               m->tso->id));
1786   free(m);
1787
1788   RELEASE_LOCK(&sched_mutex);
1789
1790   return stat;
1791 }
1792
1793 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1794 //@subsection Run queue code 
1795
1796 #if 0
1797 /* 
1798    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1799        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1800        implicit global variable that has to be correct when calling these
1801        fcts -- HWL 
1802 */
1803
1804 /* Put the new thread on the head of the runnable queue.
1805  * The caller of createThread better push an appropriate closure
1806  * on this thread's stack before the scheduler is invoked.
1807  */
1808 static /* inline */ void
1809 add_to_run_queue(tso)
1810 StgTSO* tso; 
1811 {
1812   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1813   tso->link = run_queue_hd;
1814   run_queue_hd = tso;
1815   if (run_queue_tl == END_TSO_QUEUE) {
1816     run_queue_tl = tso;
1817   }
1818 }
1819
1820 /* Put the new thread at the end of the runnable queue. */
1821 static /* inline */ void
1822 push_on_run_queue(tso)
1823 StgTSO* tso; 
1824 {
1825   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1826   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1827   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1828   if (run_queue_hd == END_TSO_QUEUE) {
1829     run_queue_hd = tso;
1830   } else {
1831     run_queue_tl->link = tso;
1832   }
1833   run_queue_tl = tso;
1834 }
1835
1836 /* 
1837    Should be inlined because it's used very often in schedule.  The tso
1838    argument is actually only needed in GranSim, where we want to have the
1839    possibility to schedule *any* TSO on the run queue, irrespective of the
1840    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1841    the run queue and dequeue the tso, adjusting the links in the queue. 
1842 */
1843 //@cindex take_off_run_queue
1844 static /* inline */ StgTSO*
1845 take_off_run_queue(StgTSO *tso) {
1846   StgTSO *t, *prev;
1847
1848   /* 
1849      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1850
1851      if tso is specified, unlink that tso from the run_queue (doesn't have
1852      to be at the beginning of the queue); GranSim only 
1853   */
1854   if (tso!=END_TSO_QUEUE) {
1855     /* find tso in queue */
1856     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1857          t!=END_TSO_QUEUE && t!=tso;
1858          prev=t, t=t->link) 
1859       /* nothing */ ;
1860     ASSERT(t==tso);
1861     /* now actually dequeue the tso */
1862     if (prev!=END_TSO_QUEUE) {
1863       ASSERT(run_queue_hd!=t);
1864       prev->link = t->link;
1865     } else {
1866       /* t is at beginning of thread queue */
1867       ASSERT(run_queue_hd==t);
1868       run_queue_hd = t->link;
1869     }
1870     /* t is at end of thread queue */
1871     if (t->link==END_TSO_QUEUE) {
1872       ASSERT(t==run_queue_tl);
1873       run_queue_tl = prev;
1874     } else {
1875       ASSERT(run_queue_tl!=t);
1876     }
1877     t->link = END_TSO_QUEUE;
1878   } else {
1879     /* take tso from the beginning of the queue; std concurrent code */
1880     t = run_queue_hd;
1881     if (t != END_TSO_QUEUE) {
1882       run_queue_hd = t->link;
1883       t->link = END_TSO_QUEUE;
1884       if (run_queue_hd == END_TSO_QUEUE) {
1885         run_queue_tl = END_TSO_QUEUE;
1886       }
1887     }
1888   }
1889   return t;
1890 }
1891
1892 #endif /* 0 */
1893
1894 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1895 //@subsection Garbage Collextion Routines
1896
1897 /* ---------------------------------------------------------------------------
1898    Where are the roots that we know about?
1899
1900         - all the threads on the runnable queue
1901         - all the threads on the blocked queue
1902         - all the thread currently executing a _ccall_GC
1903         - all the "main threads"
1904      
1905    ------------------------------------------------------------------------ */
1906
1907 /* This has to be protected either by the scheduler monitor, or by the
1908         garbage collection monitor (probably the latter).
1909         KH @ 25/10/99
1910 */
1911
1912 static void GetRoots(void)
1913 {
1914   StgMainThread *m;
1915
1916 #if defined(GRAN)
1917   {
1918     nat i;
1919     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1920       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1921         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1922       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1923         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1924       
1925       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1926         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1927       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1928         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1929       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1930         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1931     }
1932   }
1933
1934   markEventQueue();
1935
1936 #else /* !GRAN */
1937   if (run_queue_hd != END_TSO_QUEUE) {
1938     ASSERT(run_queue_tl != END_TSO_QUEUE);
1939     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1940     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1941   }
1942
1943   if (blocked_queue_hd != END_TSO_QUEUE) {
1944     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1945     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1946     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1947   }
1948 #endif 
1949
1950   for (m = main_threads; m != NULL; m = m->link) {
1951     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1952   }
1953   if (suspended_ccalling_threads != END_TSO_QUEUE)
1954     suspended_ccalling_threads = 
1955       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1956
1957 #if defined(SMP) || defined(PAR) || defined(GRAN)
1958   markSparkQueue();
1959 #endif
1960 }
1961
1962 /* -----------------------------------------------------------------------------
1963    performGC
1964
1965    This is the interface to the garbage collector from Haskell land.
1966    We provide this so that external C code can allocate and garbage
1967    collect when called from Haskell via _ccall_GC.
1968
1969    It might be useful to provide an interface whereby the programmer
1970    can specify more roots (ToDo).
1971    
1972    This needs to be protected by the GC condition variable above.  KH.
1973    -------------------------------------------------------------------------- */
1974
1975 void (*extra_roots)(void);
1976
1977 void
1978 performGC(void)
1979 {
1980   GarbageCollect(GetRoots,rtsFalse);
1981 }
1982
1983 void
1984 performMajorGC(void)
1985 {
1986   GarbageCollect(GetRoots,rtsTrue);
1987 }
1988
1989 static void
1990 AllRoots(void)
1991 {
1992   GetRoots();                   /* the scheduler's roots */
1993   extra_roots();                /* the user's roots */
1994 }
1995
1996 void
1997 performGCWithRoots(void (*get_roots)(void))
1998 {
1999   extra_roots = get_roots;
2000
2001   GarbageCollect(AllRoots,rtsFalse);
2002 }
2003
2004 /* -----------------------------------------------------------------------------
2005    Stack overflow
2006
2007    If the thread has reached its maximum stack size, then raise the
2008    StackOverflow exception in the offending thread.  Otherwise
2009    relocate the TSO into a larger chunk of memory and adjust its stack
2010    size appropriately.
2011    -------------------------------------------------------------------------- */
2012
2013 static StgTSO *
2014 threadStackOverflow(StgTSO *tso)
2015 {
2016   nat new_stack_size, new_tso_size, diff, stack_words;
2017   StgPtr new_sp;
2018   StgTSO *dest;
2019
2020   IF_DEBUG(sanity,checkTSO(tso));
2021   if (tso->stack_size >= tso->max_stack_size) {
2022
2023     IF_DEBUG(gc,
2024              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2025                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2026              /* If we're debugging, just print out the top of the stack */
2027              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2028                                               tso->sp+64)));
2029
2030 #ifdef INTERPRETER
2031     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2032     exit(1);
2033 #else
2034     /* Send this thread the StackOverflow exception */
2035     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2036 #endif
2037     return tso;
2038   }
2039
2040   /* Try to double the current stack size.  If that takes us over the
2041    * maximum stack size for this thread, then use the maximum instead.
2042    * Finally round up so the TSO ends up as a whole number of blocks.
2043    */
2044   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2045   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2046                                        TSO_STRUCT_SIZE)/sizeof(W_);
2047   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2048   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2049
2050   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2051
2052   dest = (StgTSO *)allocate(new_tso_size);
2053   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2054
2055   /* copy the TSO block and the old stack into the new area */
2056   memcpy(dest,tso,TSO_STRUCT_SIZE);
2057   stack_words = tso->stack + tso->stack_size - tso->sp;
2058   new_sp = (P_)dest + new_tso_size - stack_words;
2059   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2060
2061   /* relocate the stack pointers... */
2062   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2063   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2064   dest->sp    = new_sp;
2065   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2066   dest->stack_size = new_stack_size;
2067         
2068   /* and relocate the update frame list */
2069   relocate_TSO(tso, dest);
2070
2071   /* Mark the old TSO as relocated.  We have to check for relocated
2072    * TSOs in the garbage collector and any primops that deal with TSOs.
2073    *
2074    * It's important to set the sp and su values to just beyond the end
2075    * of the stack, so we don't attempt to scavenge any part of the
2076    * dead TSO's stack.
2077    */
2078   tso->what_next = ThreadRelocated;
2079   tso->link = dest;
2080   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2081   tso->su = (StgUpdateFrame *)tso->sp;
2082   tso->why_blocked = NotBlocked;
2083   dest->mut_link = NULL;
2084
2085   IF_PAR_DEBUG(verbose,
2086                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2087                      tso->id, tso, tso->stack_size);
2088                /* If we're debugging, just print out the top of the stack */
2089                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2090                                                 tso->sp+64)));
2091   
2092   IF_DEBUG(sanity,checkTSO(tso));
2093 #if 0
2094   IF_DEBUG(scheduler,printTSO(dest));
2095 #endif
2096
2097   return dest;
2098 }
2099
2100 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2101 //@subsection Blocking Queue Routines
2102
2103 /* ---------------------------------------------------------------------------
2104    Wake up a queue that was blocked on some resource.
2105    ------------------------------------------------------------------------ */
2106
2107 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2108
2109 #if defined(GRAN)
2110 static inline void
2111 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2112 {
2113 }
2114 #elif defined(PAR)
2115 static inline void
2116 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2117 {
2118   /* write RESUME events to log file and
2119      update blocked and fetch time (depending on type of the orig closure) */
2120   if (RtsFlags.ParFlags.ParStats.Full) {
2121     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2122                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2123                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2124
2125     switch (get_itbl(node)->type) {
2126         case FETCH_ME_BQ:
2127           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2128           break;
2129         case RBH:
2130         case FETCH_ME:
2131         case BLACKHOLE_BQ:
2132           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2133           break;
2134         default:
2135           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2136         }
2137       }
2138 }
2139 #endif
2140
2141 #if defined(GRAN)
2142 static StgBlockingQueueElement *
2143 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2144 {
2145     StgTSO *tso;
2146     PEs node_loc, tso_loc;
2147
2148     node_loc = where_is(node); // should be lifted out of loop
2149     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2150     tso_loc = where_is((StgClosure *)tso);
2151     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2152       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2153       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2154       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2155       // insertThread(tso, node_loc);
2156       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2157                 ResumeThread,
2158                 tso, node, (rtsSpark*)NULL);
2159       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2160       // len_local++;
2161       // len++;
2162     } else { // TSO is remote (actually should be FMBQ)
2163       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2164                                   RtsFlags.GranFlags.Costs.gunblocktime +
2165                                   RtsFlags.GranFlags.Costs.latency;
2166       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2167                 UnblockThread,
2168                 tso, node, (rtsSpark*)NULL);
2169       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2170       // len++;
2171     }
2172     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2173     IF_GRAN_DEBUG(bq,
2174                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2175                           (node_loc==tso_loc ? "Local" : "Global"), 
2176                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2177     tso->block_info.closure = NULL;
2178     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2179                              tso->id, tso));
2180 }
2181 #elif defined(PAR)
2182 static StgBlockingQueueElement *
2183 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2184 {
2185     StgBlockingQueueElement *next;
2186
2187     switch (get_itbl(bqe)->type) {
2188     case TSO:
2189       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2190       /* if it's a TSO just push it onto the run_queue */
2191       next = bqe->link;
2192       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2193       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2194       THREAD_RUNNABLE();
2195       unblockCount(bqe, node);
2196       /* reset blocking status after dumping event */
2197       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2198       break;
2199
2200     case BLOCKED_FETCH:
2201       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2202       next = bqe->link;
2203       bqe->link = PendingFetches;
2204       PendingFetches = bqe;
2205       break;
2206
2207 # if defined(DEBUG)
2208       /* can ignore this case in a non-debugging setup; 
2209          see comments on RBHSave closures above */
2210     case CONSTR:
2211       /* check that the closure is an RBHSave closure */
2212       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2213              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2214              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2215       break;
2216
2217     default:
2218       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2219            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2220            (StgClosure *)bqe);
2221 # endif
2222     }
2223   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2224   return next;
2225 }
2226
2227 #else /* !GRAN && !PAR */
2228 static StgTSO *
2229 unblockOneLocked(StgTSO *tso)
2230 {
2231   StgTSO *next;
2232
2233   ASSERT(get_itbl(tso)->type == TSO);
2234   ASSERT(tso->why_blocked != NotBlocked);
2235   tso->why_blocked = NotBlocked;
2236   next = tso->link;
2237   PUSH_ON_RUN_QUEUE(tso);
2238   THREAD_RUNNABLE();
2239   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2240   return next;
2241 }
2242 #endif
2243
2244 #if defined(GRAN) || defined(PAR)
2245 inline StgBlockingQueueElement *
2246 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2247 {
2248   ACQUIRE_LOCK(&sched_mutex);
2249   bqe = unblockOneLocked(bqe, node);
2250   RELEASE_LOCK(&sched_mutex);
2251   return bqe;
2252 }
2253 #else
2254 inline StgTSO *
2255 unblockOne(StgTSO *tso)
2256 {
2257   ACQUIRE_LOCK(&sched_mutex);
2258   tso = unblockOneLocked(tso);
2259   RELEASE_LOCK(&sched_mutex);
2260   return tso;
2261 }
2262 #endif
2263
2264 #if defined(GRAN)
2265 void 
2266 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2267 {
2268   StgBlockingQueueElement *bqe;
2269   PEs node_loc;
2270   nat len = 0; 
2271
2272   IF_GRAN_DEBUG(bq, 
2273                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2274                       node, CurrentProc, CurrentTime[CurrentProc], 
2275                       CurrentTSO->id, CurrentTSO));
2276
2277   node_loc = where_is(node);
2278
2279   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2280          get_itbl(q)->type == CONSTR); // closure (type constructor)
2281   ASSERT(is_unique(node));
2282
2283   /* FAKE FETCH: magically copy the node to the tso's proc;
2284      no Fetch necessary because in reality the node should not have been 
2285      moved to the other PE in the first place
2286   */
2287   if (CurrentProc!=node_loc) {
2288     IF_GRAN_DEBUG(bq, 
2289                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2290                         node, node_loc, CurrentProc, CurrentTSO->id, 
2291                         // CurrentTSO, where_is(CurrentTSO),
2292                         node->header.gran.procs));
2293     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2294     IF_GRAN_DEBUG(bq, 
2295                   belch("## new bitmask of node %p is %#x",
2296                         node, node->header.gran.procs));
2297     if (RtsFlags.GranFlags.GranSimStats.Global) {
2298       globalGranStats.tot_fake_fetches++;
2299     }
2300   }
2301
2302   bqe = q;
2303   // ToDo: check: ASSERT(CurrentProc==node_loc);
2304   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2305     //next = bqe->link;
2306     /* 
2307        bqe points to the current element in the queue
2308        next points to the next element in the queue
2309     */
2310     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2311     //tso_loc = where_is(tso);
2312     len++;
2313     bqe = unblockOneLocked(bqe, node);
2314   }
2315
2316   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2317      the closure to make room for the anchor of the BQ */
2318   if (bqe!=END_BQ_QUEUE) {
2319     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2320     /*
2321     ASSERT((info_ptr==&RBH_Save_0_info) ||
2322            (info_ptr==&RBH_Save_1_info) ||
2323            (info_ptr==&RBH_Save_2_info));
2324     */
2325     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2326     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2327     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2328
2329     IF_GRAN_DEBUG(bq,
2330                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2331                         node, info_type(node)));
2332   }
2333
2334   /* statistics gathering */
2335   if (RtsFlags.GranFlags.GranSimStats.Global) {
2336     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2337     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2338     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2339     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2340   }
2341   IF_GRAN_DEBUG(bq,
2342                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2343                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2344 }
2345 #elif defined(PAR)
2346 void 
2347 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2348 {
2349   StgBlockingQueueElement *bqe, *next;
2350
2351   ACQUIRE_LOCK(&sched_mutex);
2352
2353   IF_PAR_DEBUG(verbose, 
2354                belch("## AwBQ for node %p on [%x]: ",
2355                      node, mytid));
2356
2357   ASSERT(get_itbl(q)->type == TSO ||           
2358          get_itbl(q)->type == BLOCKED_FETCH || 
2359          get_itbl(q)->type == CONSTR); 
2360
2361   bqe = q;
2362   while (get_itbl(bqe)->type==TSO || 
2363          get_itbl(bqe)->type==BLOCKED_FETCH) {
2364     bqe = unblockOneLocked(bqe, node);
2365   }
2366   RELEASE_LOCK(&sched_mutex);
2367 }
2368
2369 #else   /* !GRAN && !PAR */
2370 void
2371 awakenBlockedQueue(StgTSO *tso)
2372 {
2373   ACQUIRE_LOCK(&sched_mutex);
2374   while (tso != END_TSO_QUEUE) {
2375     tso = unblockOneLocked(tso);
2376   }
2377   RELEASE_LOCK(&sched_mutex);
2378 }
2379 #endif
2380
2381 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2382 //@subsection Exception Handling Routines
2383
2384 /* ---------------------------------------------------------------------------
2385    Interrupt execution
2386    - usually called inside a signal handler so it mustn't do anything fancy.   
2387    ------------------------------------------------------------------------ */
2388
2389 void
2390 interruptStgRts(void)
2391 {
2392     interrupted    = 1;
2393     context_switch = 1;
2394 }
2395
2396 /* -----------------------------------------------------------------------------
2397    Unblock a thread
2398
2399    This is for use when we raise an exception in another thread, which
2400    may be blocked.
2401    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2402    -------------------------------------------------------------------------- */
2403
2404 #if defined(GRAN) || defined(PAR)
2405 /*
2406   NB: only the type of the blocking queue is different in GranSim and GUM
2407       the operations on the queue-elements are the same
2408       long live polymorphism!
2409 */
2410 static void
2411 unblockThread(StgTSO *tso)
2412 {
2413   StgBlockingQueueElement *t, **last;
2414
2415   ACQUIRE_LOCK(&sched_mutex);
2416   switch (tso->why_blocked) {
2417
2418   case NotBlocked:
2419     return;  /* not blocked */
2420
2421   case BlockedOnMVar:
2422     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2423     {
2424       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2425       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2426
2427       last = (StgBlockingQueueElement **)&mvar->head;
2428       for (t = (StgBlockingQueueElement *)mvar->head; 
2429            t != END_BQ_QUEUE; 
2430            last = &t->link, last_tso = t, t = t->link) {
2431         if (t == (StgBlockingQueueElement *)tso) {
2432           *last = (StgBlockingQueueElement *)tso->link;
2433           if (mvar->tail == tso) {
2434             mvar->tail = (StgTSO *)last_tso;
2435           }
2436           goto done;
2437         }
2438       }
2439       barf("unblockThread (MVAR): TSO not found");
2440     }
2441
2442   case BlockedOnBlackHole:
2443     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2444     {
2445       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2446
2447       last = &bq->blocking_queue;
2448       for (t = bq->blocking_queue; 
2449            t != END_BQ_QUEUE; 
2450            last = &t->link, t = t->link) {
2451         if (t == (StgBlockingQueueElement *)tso) {
2452           *last = (StgBlockingQueueElement *)tso->link;
2453           goto done;
2454         }
2455       }
2456       barf("unblockThread (BLACKHOLE): TSO not found");
2457     }
2458
2459   case BlockedOnException:
2460     {
2461       StgTSO *target  = tso->block_info.tso;
2462
2463       ASSERT(get_itbl(target)->type == TSO);
2464       ASSERT(target->blocked_exceptions != NULL);
2465
2466       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2467       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2468            t != END_BQ_QUEUE; 
2469            last = &t->link, t = t->link) {
2470         ASSERT(get_itbl(t)->type == TSO);
2471         if (t == (StgBlockingQueueElement *)tso) {
2472           *last = (StgBlockingQueueElement *)tso->link;
2473           goto done;
2474         }
2475       }
2476       barf("unblockThread (Exception): TSO not found");
2477     }
2478
2479   case BlockedOnDelay:
2480   case BlockedOnRead:
2481   case BlockedOnWrite:
2482     {
2483       StgBlockingQueueElement *prev = NULL;
2484       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2485            prev = t, t = t->link) {
2486         if (t == (StgBlockingQueueElement *)tso) {
2487           if (prev == NULL) {
2488             blocked_queue_hd = (StgTSO *)t->link;
2489             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2490               blocked_queue_tl = END_TSO_QUEUE;
2491             }
2492           } else {
2493             prev->link = t->link;
2494             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2495               blocked_queue_tl = (StgTSO *)prev;
2496             }
2497           }
2498           goto done;
2499         }
2500       }
2501       barf("unblockThread (I/O): TSO not found");
2502     }
2503
2504   default:
2505     barf("unblockThread");
2506   }
2507
2508  done:
2509   tso->link = END_TSO_QUEUE;
2510   tso->why_blocked = NotBlocked;
2511   tso->block_info.closure = NULL;
2512   PUSH_ON_RUN_QUEUE(tso);
2513   RELEASE_LOCK(&sched_mutex);
2514 }
2515 #else
2516 static void
2517 unblockThread(StgTSO *tso)
2518 {
2519   StgTSO *t, **last;
2520
2521   ACQUIRE_LOCK(&sched_mutex);
2522   switch (tso->why_blocked) {
2523
2524   case NotBlocked:
2525     return;  /* not blocked */
2526
2527   case BlockedOnMVar:
2528     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2529     {
2530       StgTSO *last_tso = END_TSO_QUEUE;
2531       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2532
2533       last = &mvar->head;
2534       for (t = mvar->head; t != END_TSO_QUEUE; 
2535            last = &t->link, last_tso = t, t = t->link) {
2536         if (t == tso) {
2537           *last = tso->link;
2538           if (mvar->tail == tso) {
2539             mvar->tail = last_tso;
2540           }
2541           goto done;
2542         }
2543       }
2544       barf("unblockThread (MVAR): TSO not found");
2545     }
2546
2547   case BlockedOnBlackHole:
2548     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2549     {
2550       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2551
2552       last = &bq->blocking_queue;
2553       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2554            last = &t->link, t = t->link) {
2555         if (t == tso) {
2556           *last = tso->link;
2557           goto done;
2558         }
2559       }
2560       barf("unblockThread (BLACKHOLE): TSO not found");
2561     }
2562
2563   case BlockedOnException:
2564     {
2565       StgTSO *target  = tso->block_info.tso;
2566
2567       ASSERT(get_itbl(target)->type == TSO);
2568       ASSERT(target->blocked_exceptions != NULL);
2569
2570       last = &target->blocked_exceptions;
2571       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2572            last = &t->link, t = t->link) {
2573         ASSERT(get_itbl(t)->type == TSO);
2574         if (t == tso) {
2575           *last = tso->link;
2576           goto done;
2577         }
2578       }
2579       barf("unblockThread (Exception): TSO not found");
2580     }
2581
2582   case BlockedOnDelay:
2583   case BlockedOnRead:
2584   case BlockedOnWrite:
2585     {
2586       StgTSO *prev = NULL;
2587       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2588            prev = t, t = t->link) {
2589         if (t == tso) {
2590           if (prev == NULL) {
2591             blocked_queue_hd = t->link;
2592             if (blocked_queue_tl == t) {
2593               blocked_queue_tl = END_TSO_QUEUE;
2594             }
2595           } else {
2596             prev->link = t->link;
2597             if (blocked_queue_tl == t) {
2598               blocked_queue_tl = prev;
2599             }
2600           }
2601           goto done;
2602         }
2603       }
2604       barf("unblockThread (I/O): TSO not found");
2605     }
2606
2607   default:
2608     barf("unblockThread");
2609   }
2610
2611  done:
2612   tso->link = END_TSO_QUEUE;
2613   tso->why_blocked = NotBlocked;
2614   tso->block_info.closure = NULL;
2615   PUSH_ON_RUN_QUEUE(tso);
2616   RELEASE_LOCK(&sched_mutex);
2617 }
2618 #endif
2619
2620 /* -----------------------------------------------------------------------------
2621  * raiseAsync()
2622  *
2623  * The following function implements the magic for raising an
2624  * asynchronous exception in an existing thread.
2625  *
2626  * We first remove the thread from any queue on which it might be
2627  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2628  *
2629  * We strip the stack down to the innermost CATCH_FRAME, building
2630  * thunks in the heap for all the active computations, so they can 
2631  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2632  * an application of the handler to the exception, and push it on
2633  * the top of the stack.
2634  * 
2635  * How exactly do we save all the active computations?  We create an
2636  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2637  * AP_UPDs pushes everything from the corresponding update frame
2638  * upwards onto the stack.  (Actually, it pushes everything up to the
2639  * next update frame plus a pointer to the next AP_UPD object.
2640  * Entering the next AP_UPD object pushes more onto the stack until we
2641  * reach the last AP_UPD object - at which point the stack should look
2642  * exactly as it did when we killed the TSO and we can continue
2643  * execution by entering the closure on top of the stack.
2644  *
2645  * We can also kill a thread entirely - this happens if either (a) the 
2646  * exception passed to raiseAsync is NULL, or (b) there's no
2647  * CATCH_FRAME on the stack.  In either case, we strip the entire
2648  * stack and replace the thread with a zombie.
2649  *
2650  * -------------------------------------------------------------------------- */
2651  
2652 void 
2653 deleteThread(StgTSO *tso)
2654 {
2655   raiseAsync(tso,NULL);
2656 }
2657
2658 void
2659 raiseAsync(StgTSO *tso, StgClosure *exception)
2660 {
2661   StgUpdateFrame* su = tso->su;
2662   StgPtr          sp = tso->sp;
2663   
2664   /* Thread already dead? */
2665   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2666     return;
2667   }
2668
2669   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2670
2671   /* Remove it from any blocking queues */
2672   unblockThread(tso);
2673
2674   /* The stack freezing code assumes there's a closure pointer on
2675    * the top of the stack.  This isn't always the case with compiled
2676    * code, so we have to push a dummy closure on the top which just
2677    * returns to the next return address on the stack.
2678    */
2679   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2680     *(--sp) = (W_)&dummy_ret_closure;
2681   }
2682
2683   while (1) {
2684     int words = ((P_)su - (P_)sp) - 1;
2685     nat i;
2686     StgAP_UPD * ap;
2687
2688     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2689      * then build PAP(handler,exception,realworld#), and leave it on
2690      * top of the stack ready to enter.
2691      */
2692     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2693       StgCatchFrame *cf = (StgCatchFrame *)su;
2694       /* we've got an exception to raise, so let's pass it to the
2695        * handler in this frame.
2696        */
2697       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2698       TICK_ALLOC_UPD_PAP(3,0);
2699       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2700               
2701       ap->n_args = 2;
2702       ap->fun = cf->handler;    /* :: Exception -> IO a */
2703       ap->payload[0] = (P_)exception;
2704       ap->payload[1] = ARG_TAG(0); /* realworld token */
2705
2706       /* throw away the stack from Sp up to and including the
2707        * CATCH_FRAME.
2708        */
2709       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2710       tso->su = cf->link;
2711
2712       /* Restore the blocked/unblocked state for asynchronous exceptions
2713        * at the CATCH_FRAME.  
2714        *
2715        * If exceptions were unblocked at the catch, arrange that they
2716        * are unblocked again after executing the handler by pushing an
2717        * unblockAsyncExceptions_ret stack frame.
2718        */
2719       if (!cf->exceptions_blocked) {
2720         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2721       }
2722       
2723       /* Ensure that async exceptions are blocked when running the handler.
2724        */
2725       if (tso->blocked_exceptions == NULL) {
2726         tso->blocked_exceptions = END_TSO_QUEUE;
2727       }
2728       
2729       /* Put the newly-built PAP on top of the stack, ready to execute
2730        * when the thread restarts.
2731        */
2732       sp[0] = (W_)ap;
2733       tso->sp = sp;
2734       tso->what_next = ThreadEnterGHC;
2735       return;
2736     }
2737
2738     /* First build an AP_UPD consisting of the stack chunk above the
2739      * current update frame, with the top word on the stack as the
2740      * fun field.
2741      */
2742     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2743     
2744     ASSERT(words >= 0);
2745     
2746     ap->n_args = words;
2747     ap->fun    = (StgClosure *)sp[0];
2748     sp++;
2749     for(i=0; i < (nat)words; ++i) {
2750       ap->payload[i] = (P_)*sp++;
2751     }
2752     
2753     switch (get_itbl(su)->type) {
2754       
2755     case UPDATE_FRAME:
2756       {
2757         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2758         TICK_ALLOC_UP_THK(words+1,0);
2759         
2760         IF_DEBUG(scheduler,
2761                  fprintf(stderr,  "scheduler: Updating ");
2762                  printPtr((P_)su->updatee); 
2763                  fprintf(stderr,  " with ");
2764                  printObj((StgClosure *)ap);
2765                  );
2766         
2767         /* Replace the updatee with an indirection - happily
2768          * this will also wake up any threads currently
2769          * waiting on the result.
2770          */
2771         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2772         su = su->link;
2773         sp += sizeofW(StgUpdateFrame) -1;
2774         sp[0] = (W_)ap; /* push onto stack */
2775         break;
2776       }
2777       
2778     case CATCH_FRAME:
2779       {
2780         StgCatchFrame *cf = (StgCatchFrame *)su;
2781         StgClosure* o;
2782         
2783         /* We want a PAP, not an AP_UPD.  Fortunately, the
2784          * layout's the same.
2785          */
2786         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2787         TICK_ALLOC_UPD_PAP(words+1,0);
2788         
2789         /* now build o = FUN(catch,ap,handler) */
2790         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2791         TICK_ALLOC_FUN(2,0);
2792         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2793         o->payload[0] = (StgClosure *)ap;
2794         o->payload[1] = cf->handler;
2795         
2796         IF_DEBUG(scheduler,
2797                  fprintf(stderr,  "scheduler: Built ");
2798                  printObj((StgClosure *)o);
2799                  );
2800         
2801         /* pop the old handler and put o on the stack */
2802         su = cf->link;
2803         sp += sizeofW(StgCatchFrame) - 1;
2804         sp[0] = (W_)o;
2805         break;
2806       }
2807       
2808     case SEQ_FRAME:
2809       {
2810         StgSeqFrame *sf = (StgSeqFrame *)su;
2811         StgClosure* o;
2812         
2813         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2814         TICK_ALLOC_UPD_PAP(words+1,0);
2815         
2816         /* now build o = FUN(seq,ap) */
2817         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2818         TICK_ALLOC_SE_THK(1,0);
2819         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2820         o->payload[0] = (StgClosure *)ap;
2821         
2822         IF_DEBUG(scheduler,
2823                  fprintf(stderr,  "scheduler: Built ");
2824                  printObj((StgClosure *)o);
2825                  );
2826         
2827         /* pop the old handler and put o on the stack */
2828         su = sf->link;
2829         sp += sizeofW(StgSeqFrame) - 1;
2830         sp[0] = (W_)o;
2831         break;
2832       }
2833       
2834     case STOP_FRAME:
2835       /* We've stripped the entire stack, the thread is now dead. */
2836       sp += sizeofW(StgStopFrame) - 1;
2837       sp[0] = (W_)exception;    /* save the exception */
2838       tso->what_next = ThreadKilled;
2839       tso->su = (StgUpdateFrame *)(sp+1);
2840       tso->sp = sp;
2841       return;
2842       
2843     default:
2844       barf("raiseAsync");
2845     }
2846   }
2847   barf("raiseAsync");
2848 }
2849
2850 /* -----------------------------------------------------------------------------
2851    resurrectThreads is called after garbage collection on the list of
2852    threads found to be garbage.  Each of these threads will be woken
2853    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2854    on an MVar, or NonTermination if the thread was blocked on a Black
2855    Hole.
2856    -------------------------------------------------------------------------- */
2857
2858 void
2859 resurrectThreads( StgTSO *threads )
2860 {
2861   StgTSO *tso, *next;
2862
2863   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2864     next = tso->global_link;
2865     tso->global_link = all_threads;
2866     all_threads = tso;
2867     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2868
2869     switch (tso->why_blocked) {
2870     case BlockedOnMVar:
2871     case BlockedOnException:
2872       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2873       break;
2874     case BlockedOnBlackHole:
2875       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2876       break;
2877     case NotBlocked:
2878       /* This might happen if the thread was blocked on a black hole
2879        * belonging to a thread that we've just woken up (raiseAsync
2880        * can wake up threads, remember...).
2881        */
2882       continue;
2883     default:
2884       barf("resurrectThreads: thread blocked in a strange way");
2885     }
2886   }
2887 }
2888
2889 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2890 //@subsection Debugging Routines
2891
2892 /* -----------------------------------------------------------------------------
2893    Debugging: why is a thread blocked
2894    -------------------------------------------------------------------------- */
2895
2896 #ifdef DEBUG
2897
2898 void
2899 printThreadBlockage(StgTSO *tso)
2900 {
2901   switch (tso->why_blocked) {
2902   case BlockedOnRead:
2903     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2904     break;
2905   case BlockedOnWrite:
2906     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2907     break;
2908   case BlockedOnDelay:
2909 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2910     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2911 #else
2912     fprintf(stderr,"blocked on delay of %d ms", 
2913             tso->block_info.target - getourtimeofday());
2914 #endif
2915     break;
2916   case BlockedOnMVar:
2917     fprintf(stderr,"blocked on an MVar");
2918     break;
2919   case BlockedOnException:
2920     fprintf(stderr,"blocked on delivering an exception to thread %d",
2921             tso->block_info.tso->id);
2922     break;
2923   case BlockedOnBlackHole:
2924     fprintf(stderr,"blocked on a black hole");
2925     break;
2926   case NotBlocked:
2927     fprintf(stderr,"not blocked");
2928     break;
2929 #if defined(PAR)
2930   case BlockedOnGA:
2931     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2932             tso->block_info.closure, info_type(tso->block_info.closure));
2933     break;
2934   case BlockedOnGA_NoSend:
2935     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2936             tso->block_info.closure, info_type(tso->block_info.closure));
2937     break;
2938 #endif
2939   default:
2940     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2941          tso->why_blocked, tso->id, tso);
2942   }
2943 }
2944
2945 void
2946 printThreadStatus(StgTSO *tso)
2947 {
2948   switch (tso->what_next) {
2949   case ThreadKilled:
2950     fprintf(stderr,"has been killed");
2951     break;
2952   case ThreadComplete:
2953     fprintf(stderr,"has completed");
2954     break;
2955   default:
2956     printThreadBlockage(tso);
2957   }
2958 }
2959
2960 void
2961 printAllThreads(void)
2962 {
2963   StgTSO *t;
2964
2965   sched_belch("all threads:");
2966   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2967     fprintf(stderr, "\tthread %d is ", t->id);
2968     printThreadStatus(t);
2969     fprintf(stderr,"\n");
2970   }
2971 }
2972     
2973 /* 
2974    Print a whole blocking queue attached to node (debugging only).
2975 */
2976 //@cindex print_bq
2977 # if defined(PAR)
2978 void 
2979 print_bq (StgClosure *node)
2980 {
2981   StgBlockingQueueElement *bqe;
2982   StgTSO *tso;
2983   rtsBool end;
2984
2985   fprintf(stderr,"## BQ of closure %p (%s): ",
2986           node, info_type(node));
2987
2988   /* should cover all closures that may have a blocking queue */
2989   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2990          get_itbl(node)->type == FETCH_ME_BQ ||
2991          get_itbl(node)->type == RBH);
2992     
2993   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2994   /* 
2995      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2996   */
2997   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2998        !end; // iterate until bqe points to a CONSTR
2999        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3000     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3001     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
3002     /* types of closures that may appear in a blocking queue */
3003     ASSERT(get_itbl(bqe)->type == TSO ||           
3004            get_itbl(bqe)->type == BLOCKED_FETCH || 
3005            get_itbl(bqe)->type == CONSTR); 
3006     /* only BQs of an RBH end with an RBH_Save closure */
3007     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3008
3009     switch (get_itbl(bqe)->type) {
3010     case TSO:
3011       fprintf(stderr," TSO %d (%x),",
3012               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3013       break;
3014     case BLOCKED_FETCH:
3015       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3016               ((StgBlockedFetch *)bqe)->node, 
3017               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3018               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3019               ((StgBlockedFetch *)bqe)->ga.weight);
3020       break;
3021     case CONSTR:
3022       fprintf(stderr," %s (IP %p),",
3023               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3024                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3025                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3026                "RBH_Save_?"), get_itbl(bqe));
3027       break;
3028     default:
3029       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3030            info_type(bqe), node, info_type(node));
3031       break;
3032     }
3033   } /* for */
3034   fputc('\n', stderr);
3035 }
3036 # elif defined(GRAN)
3037 void 
3038 print_bq (StgClosure *node)
3039 {
3040   StgBlockingQueueElement *bqe;
3041   PEs node_loc, tso_loc;
3042   rtsBool end;
3043
3044   /* should cover all closures that may have a blocking queue */
3045   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3046          get_itbl(node)->type == FETCH_ME_BQ ||
3047          get_itbl(node)->type == RBH);
3048     
3049   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3050   node_loc = where_is(node);
3051
3052   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3053           node, info_type(node), node_loc);
3054
3055   /* 
3056      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3057   */
3058   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3059        !end; // iterate until bqe points to a CONSTR
3060        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3061     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3062     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3063     /* types of closures that may appear in a blocking queue */
3064     ASSERT(get_itbl(bqe)->type == TSO ||           
3065            get_itbl(bqe)->type == CONSTR); 
3066     /* only BQs of an RBH end with an RBH_Save closure */
3067     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3068
3069     tso_loc = where_is((StgClosure *)bqe);
3070     switch (get_itbl(bqe)->type) {
3071     case TSO:
3072       fprintf(stderr," TSO %d (%p) on [PE %d],",
3073               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3074       break;
3075     case CONSTR:
3076       fprintf(stderr," %s (IP %p),",
3077               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3078                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3079                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3080                "RBH_Save_?"), get_itbl(bqe));
3081       break;
3082     default:
3083       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3084            info_type((StgClosure *)bqe), node, info_type(node));
3085       break;
3086     }
3087   } /* for */
3088   fputc('\n', stderr);
3089 }
3090 #else
3091 /* 
3092    Nice and easy: only TSOs on the blocking queue
3093 */
3094 void 
3095 print_bq (StgClosure *node)
3096 {
3097   StgTSO *tso;
3098
3099   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3100   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3101        tso != END_TSO_QUEUE; 
3102        tso=tso->link) {
3103     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3104     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3105     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3106   }
3107   fputc('\n', stderr);
3108 }
3109 # endif
3110
3111 #if defined(PAR)
3112 static nat
3113 run_queue_len(void)
3114 {
3115   nat i;
3116   StgTSO *tso;
3117
3118   for (i=0, tso=run_queue_hd; 
3119        tso != END_TSO_QUEUE;
3120        i++, tso=tso->link)
3121     /* nothing */
3122
3123   return i;
3124 }
3125 #endif
3126
3127 static void
3128 sched_belch(char *s, ...)
3129 {
3130   va_list ap;
3131   va_start(ap,s);
3132 #ifdef SMP
3133   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3134 #else
3135   fprintf(stderr, "scheduler: ");
3136 #endif
3137   vfprintf(stderr, s, ap);
3138   fprintf(stderr, "\n");
3139 }
3140
3141 #endif /* DEBUG */
3142
3143
3144 //@node Index,  , Debugging Routines, Main scheduling code
3145 //@subsection Index
3146
3147 //@index
3148 //* MainRegTable::  @cindex\s-+MainRegTable
3149 //* StgMainThread::  @cindex\s-+StgMainThread
3150 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3151 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3152 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3153 //* context_switch::  @cindex\s-+context_switch
3154 //* createThread::  @cindex\s-+createThread
3155 //* free_capabilities::  @cindex\s-+free_capabilities
3156 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3157 //* initScheduler::  @cindex\s-+initScheduler
3158 //* interrupted::  @cindex\s-+interrupted
3159 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3160 //* next_thread_id::  @cindex\s-+next_thread_id
3161 //* print_bq::  @cindex\s-+print_bq
3162 //* run_queue_hd::  @cindex\s-+run_queue_hd
3163 //* run_queue_tl::  @cindex\s-+run_queue_tl
3164 //* sched_mutex::  @cindex\s-+sched_mutex
3165 //* schedule::  @cindex\s-+schedule
3166 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3167 //* task_ids::  @cindex\s-+task_ids
3168 //* term_mutex::  @cindex\s-+term_mutex
3169 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3170 //@end index