[project @ 2000-03-31 03:09:35 by hwloidl]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.60 2000/03/31 03:09:36 hwloidl Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Profiling.h"
71 #include "Sanity.h"
72 #include "Stats.h"
73 #include "Itimer.h"
74 #include "Prelude.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
77 # include "GranSim.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
81 # include "FetchMe.h"
82 # include "HLC.h"
83 #endif
84 #include "Sparks.h"
85
86 #include <stdarg.h>
87
88 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
89 //@subsection Variables and Data structures
90
91 /* Main threads:
92  *
93  * These are the threads which clients have requested that we run.  
94  *
95  * In an SMP build, we might have several concurrent clients all
96  * waiting for results, and each one will wait on a condition variable
97  * until the result is available.
98  *
99  * In non-SMP, clients are strictly nested: the first client calls
100  * into the RTS, which might call out again to C with a _ccall_GC, and
101  * eventually re-enter the RTS.
102  *
103  * Main threads information is kept in a linked list:
104  */
105 //@cindex StgMainThread
106 typedef struct StgMainThread_ {
107   StgTSO *         tso;
108   SchedulerStatus  stat;
109   StgClosure **    ret;
110 #ifdef SMP
111   pthread_cond_t wakeup;
112 #endif
113   struct StgMainThread_ *link;
114 } StgMainThread;
115
116 /* Main thread queue.
117  * Locks required: sched_mutex.
118  */
119 static StgMainThread *main_threads;
120
121 /* Thread queues.
122  * Locks required: sched_mutex.
123  */
124 #if defined(GRAN)
125
126 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
127 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
128
129 /* 
130    In GranSim we have a runable and a blocked queue for each processor.
131    In order to minimise code changes new arrays run_queue_hds/tls
132    are created. run_queue_hd is then a short cut (macro) for
133    run_queue_hds[CurrentProc] (see GranSim.h).
134    -- HWL
135 */
136 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
137 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
138 StgTSO *ccalling_threadss[MAX_PROC];
139 /* We use the same global list of threads (all_threads) in GranSim as in
140    the std RTS (i.e. we are cheating). However, we don't use this list in
141    the GranSim specific code at the moment (so we are only potentially
142    cheating).  */
143
144 #else /* !GRAN */
145
146 StgTSO *run_queue_hd, *run_queue_tl;
147 StgTSO *blocked_queue_hd, *blocked_queue_tl;
148
149 #endif
150
151 /* Linked list of all threads.
152  * Used for detecting garbage collected threads.
153  */
154 StgTSO *all_threads;
155
156 /* Threads suspended in _ccall_GC.
157  */
158 static StgTSO *suspended_ccalling_threads;
159
160 static void GetRoots(void);
161 static StgTSO *threadStackOverflow(StgTSO *tso);
162
163 /* KH: The following two flags are shared memory locations.  There is no need
164        to lock them, since they are only unset at the end of a scheduler
165        operation.
166 */
167
168 /* flag set by signal handler to precipitate a context switch */
169 //@cindex context_switch
170 nat context_switch;
171
172 /* if this flag is set as well, give up execution */
173 //@cindex interrupted
174 rtsBool interrupted;
175
176 /* Next thread ID to allocate.
177  * Locks required: sched_mutex
178  */
179 //@cindex next_thread_id
180 StgThreadID next_thread_id = 1;
181
182 /*
183  * Pointers to the state of the current thread.
184  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
185  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
186  */
187  
188 /* The smallest stack size that makes any sense is:
189  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
190  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
191  *  + 1                       (the realworld token for an IO thread)
192  *  + 1                       (the closure to enter)
193  *
194  * A thread with this stack will bomb immediately with a stack
195  * overflow, which will increase its stack size.  
196  */
197
198 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
199
200 /* Free capability list.
201  * Locks required: sched_mutex.
202  */
203 #ifdef SMP
204 //@cindex free_capabilities
205 //@cindex n_free_capabilities
206 Capability *free_capabilities; /* Available capabilities for running threads */
207 nat n_free_capabilities;       /* total number of available capabilities */
208 #else
209 //@cindex MainRegTable
210 Capability MainRegTable;       /* for non-SMP, we have one global capability */
211 #endif
212
213 #if defined(GRAN)
214 StgTSO *CurrentTSO;
215 #endif
216
217 rtsBool ready_to_gc;
218
219 /* All our current task ids, saved in case we need to kill them later.
220  */
221 #ifdef SMP
222 //@cindex task_ids
223 task_info *task_ids;
224 #endif
225
226 void            addToBlockedQueue ( StgTSO *tso );
227
228 static void     schedule          ( void );
229        void     interruptStgRts   ( void );
230 #if defined(GRAN)
231 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
232 #else
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
234 #endif
235
236 #ifdef DEBUG
237 static void sched_belch(char *s, ...);
238 #endif
239
240 #ifdef SMP
241 //@cindex sched_mutex
242 //@cindex term_mutex
243 //@cindex thread_ready_cond
244 //@cindex gc_pending_cond
245 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
246 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
247 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
248 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
249
250 nat await_death;
251 #endif
252
253 #if defined(PAR)
254 StgTSO *LastTSO;
255 rtsTime TimeOfLastYield;
256 #endif
257
258 #if DEBUG
259 char *whatNext_strs[] = {
260   "ThreadEnterGHC",
261   "ThreadRunGHC",
262   "ThreadEnterHugs",
263   "ThreadKilled",
264   "ThreadComplete"
265 };
266
267 char *threadReturnCode_strs[] = {
268   "HeapOverflow",                       /* might also be StackOverflow */
269   "StackOverflow",
270   "ThreadYielding",
271   "ThreadBlocked",
272   "ThreadFinished"
273 };
274 #endif
275
276 /*
277  * The thread state for the main thread.
278 // ToDo: check whether not needed any more
279 StgTSO   *MainTSO;
280  */
281
282 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
283 //@subsection Main scheduling loop
284
285 /* ---------------------------------------------------------------------------
286    Main scheduling loop.
287
288    We use round-robin scheduling, each thread returning to the
289    scheduler loop when one of these conditions is detected:
290
291       * out of heap space
292       * timer expires (thread yields)
293       * thread blocks
294       * thread ends
295       * stack overflow
296
297    Locking notes:  we acquire the scheduler lock once at the beginning
298    of the scheduler loop, and release it when
299     
300       * running a thread, or
301       * waiting for work, or
302       * waiting for a GC to complete.
303
304    GRAN version:
305      In a GranSim setup this loop iterates over the global event queue.
306      This revolves around the global event queue, which determines what 
307      to do next. Therefore, it's more complicated than either the 
308      concurrent or the parallel (GUM) setup.
309
310    GUM version:
311      GUM iterates over incoming messages.
312      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
313      and sends out a fish whenever it has nothing to do; in-between
314      doing the actual reductions (shared code below) it processes the
315      incoming messages and deals with delayed operations 
316      (see PendingFetches).
317      This is not the ugliest code you could imagine, but it's bloody close.
318
319    ------------------------------------------------------------------------ */
320 //@cindex schedule
321 static void
322 schedule( void )
323 {
324   StgTSO *t;
325   Capability *cap;
326   StgThreadReturnCode ret;
327 #if defined(GRAN)
328   rtsEvent *event;
329 #elif defined(PAR)
330   StgSparkPool *pool;
331   rtsSpark spark;
332   StgTSO *tso;
333   GlobalTaskId pe;
334 #endif
335   rtsBool was_interrupted = rtsFalse;
336   
337   ACQUIRE_LOCK(&sched_mutex);
338
339 #if defined(GRAN)
340
341   /* set up first event to get things going */
342   /* ToDo: assign costs for system setup and init MainTSO ! */
343   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
344             ContinueThread, 
345             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
346
347   IF_DEBUG(gran,
348            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
349            G_TSO(CurrentTSO, 5));
350
351   if (RtsFlags.GranFlags.Light) {
352     /* Save current time; GranSim Light only */
353     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
354   }      
355
356   event = get_next_event();
357
358   while (event!=(rtsEvent*)NULL) {
359     /* Choose the processor with the next event */
360     CurrentProc = event->proc;
361     CurrentTSO = event->tso;
362
363 #elif defined(PAR)
364
365   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
366
367 #else
368
369   while (1) {
370
371 #endif
372
373     IF_DEBUG(scheduler, printAllThreads());
374
375     /* If we're interrupted (the user pressed ^C, or some other
376      * termination condition occurred), kill all the currently running
377      * threads.
378      */
379     if (interrupted) {
380       IF_DEBUG(scheduler, sched_belch("interrupted"));
381       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
382         deleteThread(t);
383       }
384       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
385         deleteThread(t);
386       }
387       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
388       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
389       interrupted = rtsFalse;
390       was_interrupted = rtsTrue;
391     }
392
393     /* Go through the list of main threads and wake up any
394      * clients whose computations have finished.  ToDo: this
395      * should be done more efficiently without a linear scan
396      * of the main threads list, somehow...
397      */
398 #ifdef SMP
399     { 
400       StgMainThread *m, **prev;
401       prev = &main_threads;
402       for (m = main_threads; m != NULL; m = m->link) {
403         switch (m->tso->what_next) {
404         case ThreadComplete:
405           if (m->ret) {
406             *(m->ret) = (StgClosure *)m->tso->sp[0];
407           }
408           *prev = m->link;
409           m->stat = Success;
410           pthread_cond_broadcast(&m->wakeup);
411           break;
412         case ThreadKilled:
413           *prev = m->link;
414           if (was_interrupted) {
415             m->stat = Interrupted;
416           } else {
417             m->stat = Killed;
418           }
419           pthread_cond_broadcast(&m->wakeup);
420           break;
421         default:
422           break;
423         }
424       }
425     }
426
427 #else
428 # if defined(PAR)
429     /* in GUM do this only on the Main PE */
430     if (IAmMainThread)
431 # endif
432     /* If our main thread has finished or been killed, return.
433      */
434     {
435       StgMainThread *m = main_threads;
436       if (m->tso->what_next == ThreadComplete
437           || m->tso->what_next == ThreadKilled) {
438         main_threads = main_threads->link;
439         if (m->tso->what_next == ThreadComplete) {
440           /* we finished successfully, fill in the return value */
441           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
442           m->stat = Success;
443           return;
444         } else {
445           if (was_interrupted) {
446             m->stat = Interrupted;
447           } else {
448             m->stat = Killed;
449           }
450           return;
451         }
452       }
453     }
454 #endif
455
456     /* Top up the run queue from our spark pool.  We try to make the
457      * number of threads in the run queue equal to the number of
458      * free capabilities.
459      */
460 #if defined(SMP)
461     {
462       nat n = n_free_capabilities;
463       StgTSO *tso = run_queue_hd;
464
465       /* Count the run queue */
466       while (n > 0 && tso != END_TSO_QUEUE) {
467         tso = tso->link;
468         n--;
469       }
470
471       for (; n > 0; n--) {
472         StgClosure *spark;
473         spark = findSpark();
474         if (spark == NULL) {
475           break; /* no more sparks in the pool */
476         } else {
477           /* I'd prefer this to be done in activateSpark -- HWL */
478           /* tricky - it needs to hold the scheduler lock and
479            * not try to re-acquire it -- SDM */
480           StgTSO *tso;
481           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
482           pushClosure(tso,spark);
483           PUSH_ON_RUN_QUEUE(tso);
484 #ifdef PAR
485           advisory_thread_count++;
486 #endif
487           
488           IF_DEBUG(scheduler,
489                    sched_belch("turning spark of closure %p into a thread",
490                                (StgClosure *)spark));
491         }
492       }
493       /* We need to wake up the other tasks if we just created some
494        * work for them.
495        */
496       if (n_free_capabilities - n > 1) {
497           pthread_cond_signal(&thread_ready_cond);
498       }
499     }
500 #endif /* SMP */
501
502     /* Check whether any waiting threads need to be woken up.  If the
503      * run queue is empty, and there are no other tasks running, we
504      * can wait indefinitely for something to happen.
505      * ToDo: what if another client comes along & requests another
506      * main thread?
507      */
508     if (blocked_queue_hd != END_TSO_QUEUE) {
509       awaitEvent(
510            (run_queue_hd == END_TSO_QUEUE)
511 #ifdef SMP
512         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
513 #endif
514         );
515     }
516     
517     /* check for signals each time around the scheduler */
518 #ifndef __MINGW32__
519     if (signals_pending()) {
520       start_signal_handlers();
521     }
522 #endif
523
524     /* Detect deadlock: when we have no threads to run, there are
525      * no threads waiting on I/O or sleeping, and all the other
526      * tasks are waiting for work, we must have a deadlock.  Inform
527      * all the main threads.
528      */
529 #ifdef SMP
530     if (blocked_queue_hd == END_TSO_QUEUE
531         && run_queue_hd == END_TSO_QUEUE
532         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
533         ) {
534       StgMainThread *m;
535       for (m = main_threads; m != NULL; m = m->link) {
536           m->ret = NULL;
537           m->stat = Deadlock;
538           pthread_cond_broadcast(&m->wakeup);
539       }
540       main_threads = NULL;
541     }
542 #else /* ! SMP */
543     /* 
544        In GUM all non-main PEs come in here with no work;
545        we ignore multiple main threads for now 
546
547     if (blocked_queue_hd == END_TSO_QUEUE
548         && run_queue_hd == END_TSO_QUEUE) {
549       StgMainThread *m = main_threads;
550       m->ret = NULL;
551       m->stat = Deadlock;
552       main_threads = m->link;
553       return;
554     }
555     */
556 #endif
557
558 #ifdef SMP
559     /* If there's a GC pending, don't do anything until it has
560      * completed.
561      */
562     if (ready_to_gc) {
563       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
564       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
565     }
566     
567     /* block until we've got a thread on the run queue and a free
568      * capability.
569      */
570     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
571       IF_DEBUG(scheduler, sched_belch("waiting for work"));
572       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
573       IF_DEBUG(scheduler, sched_belch("work now available"));
574     }
575 #endif
576
577 #if defined(GRAN)
578
579     if (RtsFlags.GranFlags.Light)
580       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
581
582     /* adjust time based on time-stamp */
583     if (event->time > CurrentTime[CurrentProc] &&
584         event->evttype != ContinueThread)
585       CurrentTime[CurrentProc] = event->time;
586     
587     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
588     if (!RtsFlags.GranFlags.Light)
589       handleIdlePEs();
590
591     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
592
593     /* main event dispatcher in GranSim */
594     switch (event->evttype) {
595       /* Should just be continuing execution */
596     case ContinueThread:
597       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
598       /* ToDo: check assertion
599       ASSERT(run_queue_hd != (StgTSO*)NULL &&
600              run_queue_hd != END_TSO_QUEUE);
601       */
602       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
603       if (!RtsFlags.GranFlags.DoAsyncFetch &&
604           procStatus[CurrentProc]==Fetching) {
605         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
606               CurrentTSO->id, CurrentTSO, CurrentProc);
607         goto next_thread;
608       } 
609       /* Ignore ContinueThreads for completed threads */
610       if (CurrentTSO->what_next == ThreadComplete) {
611         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
612               CurrentTSO->id, CurrentTSO, CurrentProc);
613         goto next_thread;
614       } 
615       /* Ignore ContinueThreads for threads that are being migrated */
616       if (PROCS(CurrentTSO)==Nowhere) { 
617         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
618               CurrentTSO->id, CurrentTSO, CurrentProc);
619         goto next_thread;
620       }
621       /* The thread should be at the beginning of the run queue */
622       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
623         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
624               CurrentTSO->id, CurrentTSO, CurrentProc);
625         break; // run the thread anyway
626       }
627       /*
628       new_event(proc, proc, CurrentTime[proc],
629                 FindWork,
630                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
631       goto next_thread; 
632       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
633       break; // now actually run the thread; DaH Qu'vam yImuHbej 
634
635     case FetchNode:
636       do_the_fetchnode(event);
637       goto next_thread;             /* handle next event in event queue  */
638       
639     case GlobalBlock:
640       do_the_globalblock(event);
641       goto next_thread;             /* handle next event in event queue  */
642       
643     case FetchReply:
644       do_the_fetchreply(event);
645       goto next_thread;             /* handle next event in event queue  */
646       
647     case UnblockThread:   /* Move from the blocked queue to the tail of */
648       do_the_unblock(event);
649       goto next_thread;             /* handle next event in event queue  */
650       
651     case ResumeThread:  /* Move from the blocked queue to the tail of */
652       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
653       event->tso->gran.blocktime += 
654         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
655       do_the_startthread(event);
656       goto next_thread;             /* handle next event in event queue  */
657       
658     case StartThread:
659       do_the_startthread(event);
660       goto next_thread;             /* handle next event in event queue  */
661       
662     case MoveThread:
663       do_the_movethread(event);
664       goto next_thread;             /* handle next event in event queue  */
665       
666     case MoveSpark:
667       do_the_movespark(event);
668       goto next_thread;             /* handle next event in event queue  */
669       
670     case FindWork:
671       do_the_findwork(event);
672       goto next_thread;             /* handle next event in event queue  */
673       
674     default:
675       barf("Illegal event type %u\n", event->evttype);
676     }  /* switch */
677     
678     /* This point was scheduler_loop in the old RTS */
679
680     IF_DEBUG(gran, belch("GRAN: after main switch"));
681
682     TimeOfLastEvent = CurrentTime[CurrentProc];
683     TimeOfNextEvent = get_time_of_next_event();
684     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
685     // CurrentTSO = ThreadQueueHd;
686
687     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
688                          TimeOfNextEvent));
689
690     if (RtsFlags.GranFlags.Light) 
691       GranSimLight_leave_system(event, &ActiveTSO); 
692
693     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
694
695     IF_DEBUG(gran, 
696              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
697
698     /* in a GranSim setup the TSO stays on the run queue */
699     t = CurrentTSO;
700     /* Take a thread from the run queue. */
701     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
702
703     IF_DEBUG(gran, 
704              fprintf(stderr, "GRAN: About to run current thread, which is\n");
705              G_TSO(t,5))
706
707     context_switch = 0; // turned on via GranYield, checking events and time slice
708
709     IF_DEBUG(gran, 
710              DumpGranEvent(GR_SCHEDULE, t));
711
712     procStatus[CurrentProc] = Busy;
713
714 #elif defined(PAR)
715
716     if (PendingFetches != END_BF_QUEUE) {
717         processFetches();
718     }
719
720     /* ToDo: phps merge with spark activation above */
721     /* check whether we have local work and send requests if we have none */
722     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
723       /* :-[  no local threads => look out for local sparks */
724       /* the spark pool for the current PE */
725       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
726       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
727           pool->hd < pool->tl) {
728         /* 
729          * ToDo: add GC code check that we really have enough heap afterwards!!
730          * Old comment:
731          * If we're here (no runnable threads) and we have pending
732          * sparks, we must have a space problem.  Get enough space
733          * to turn one of those pending sparks into a
734          * thread... 
735          */
736         
737         spark = findSpark();                /* get a spark */
738         if (spark != (rtsSpark) NULL) {
739           tso = activateSpark(spark);       /* turn the spark into a thread */
740           IF_PAR_DEBUG(schedule,
741                        belch("==== schedule: Created TSO %d (%p); %d threads active",
742                              tso->id, tso, advisory_thread_count));
743
744           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
745             belch("==^^ failed to activate spark");
746             goto next_thread;
747           }               /* otherwise fall through & pick-up new tso */
748         } else {
749           IF_PAR_DEBUG(verbose,
750                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
751                              spark_queue_len(pool)));
752           goto next_thread;
753         }
754       } else  
755       /* =8-[  no local sparks => look for work on other PEs */
756       {
757         /*
758          * We really have absolutely no work.  Send out a fish
759          * (there may be some out there already), and wait for
760          * something to arrive.  We clearly can't run any threads
761          * until a SCHEDULE or RESUME arrives, and so that's what
762          * we're hoping to see.  (Of course, we still have to
763          * respond to other types of messages.)
764          */
765         if (//!fishing &&  
766             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
767           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
768           /* fishing set in sendFish, processFish;
769              avoid flooding system with fishes via delay */
770           pe = choosePE();
771           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
772                    NEW_FISH_HUNGER);
773         }
774         
775         processMessages();
776         goto next_thread;
777         // ReSchedule(0);
778       }
779     } else if (PacketsWaiting()) {  /* Look for incoming messages */
780       processMessages();
781     }
782
783     /* Now we are sure that we have some work available */
784     ASSERT(run_queue_hd != END_TSO_QUEUE);
785     /* Take a thread from the run queue, if we have work */
786     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
787
788     /* ToDo: write something to the log-file
789     if (RTSflags.ParFlags.granSimStats && !sameThread)
790         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
791
792     CurrentTSO = t;
793     */
794     /* the spark pool for the current PE */
795     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
796
797     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
798                               spark_queue_len(pool), 
799                               CURRENT_PROC,
800                               pool->hd, pool->tl, pool->base, pool->lim));
801
802     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
803                               run_queue_len(), CURRENT_PROC,
804                               run_queue_hd, run_queue_tl));
805
806 #if 0
807     if (t != LastTSO) {
808       /* 
809          we are running a different TSO, so write a schedule event to log file
810          NB: If we use fair scheduling we also have to write  a deschedule 
811              event for LastTSO; with unfair scheduling we know that the
812              previous tso has blocked whenever we switch to another tso, so
813              we don't need it in GUM for now
814       */
815       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
816                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
817       
818     }
819 #endif
820 #else /* !GRAN && !PAR */
821   
822     /* grab a thread from the run queue
823      */
824     t = POP_RUN_QUEUE();
825     IF_DEBUG(sanity,checkTSO(t));
826
827 #endif
828     
829     /* grab a capability
830      */
831 #ifdef SMP
832     cap = free_capabilities;
833     free_capabilities = cap->link;
834     n_free_capabilities--;
835 #else
836     cap = &MainRegTable;
837 #endif
838     
839     cap->rCurrentTSO = t;
840     
841     /* set the context_switch flag
842      */
843     if (run_queue_hd == END_TSO_QUEUE)
844       context_switch = 0;
845     else
846       context_switch = 1;
847
848     RELEASE_LOCK(&sched_mutex);
849
850 #if defined(GRAN) || defined(PAR)    
851     IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...", 
852                               t->id, t, whatNext_strs[t->what_next]));
853 #else
854     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
855 #endif
856
857     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
858     /* Run the current thread 
859      */
860     switch (cap->rCurrentTSO->what_next) {
861     case ThreadKilled:
862     case ThreadComplete:
863       /* Thread already finished, return to scheduler. */
864       ret = ThreadFinished;
865       break;
866     case ThreadEnterGHC:
867       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
868       break;
869     case ThreadRunGHC:
870       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
871       break;
872     case ThreadEnterHugs:
873 #ifdef INTERPRETER
874       {
875          StgClosure* c;
876          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
877          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
878          cap->rCurrentTSO->sp += 1;
879          ret = enter(cap,c);
880          break;
881       }
882 #else
883       barf("Panic: entered a BCO but no bytecode interpreter in this build");
884 #endif
885     default:
886       barf("schedule: invalid what_next field");
887     }
888     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
889     
890     /* Costs for the scheduler are assigned to CCS_SYSTEM */
891 #ifdef PROFILING
892     CCCS = CCS_SYSTEM;
893 #endif
894     
895     ACQUIRE_LOCK(&sched_mutex);
896
897 #ifdef SMP
898     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
899 #elif !defined(GRAN) && !defined(PAR)
900     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
901 #endif
902     t = cap->rCurrentTSO;
903     
904 #if defined(PAR)
905     /* HACK 675: if the last thread didn't yield, make sure to print a 
906        SCHEDULE event to the log file when StgRunning the next thread, even
907        if it is the same one as before */
908     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
909     TimeOfLastYield = CURRENT_TIME;
910 #endif
911
912     switch (ret) {
913     case HeapOverflow:
914       /* make all the running tasks block on a condition variable,
915        * maybe set context_switch and wait till they all pile in,
916        * then have them wait on a GC condition variable.
917        */
918 #if defined(GRAN) || defined(PAR)    
919       IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped: HeapOverflow", 
920                                t->id, t, whatNext_strs[t->what_next]));
921 #endif
922       threadPaused(t);
923 #if defined(GRAN)
924       ASSERT(!is_on_queue(t,CurrentProc));
925 #endif
926       
927       ready_to_gc = rtsTrue;
928       context_switch = 1;               /* stop other threads ASAP */
929       PUSH_ON_RUN_QUEUE(t);
930       /* actual GC is done at the end of the while loop */
931       break;
932       
933     case StackOverflow:
934 #if defined(GRAN) || defined(PAR)    
935       IF_DEBUG(scheduler,belch("--<< TSO %ld (%p; %s) stopped, StackOverflow", 
936                                t->id, t, whatNext_strs[t->what_next]));
937 #endif
938       /* just adjust the stack for this thread, then pop it back
939        * on the run queue.
940        */
941       threadPaused(t);
942       { 
943         StgMainThread *m;
944         /* enlarge the stack */
945         StgTSO *new_t = threadStackOverflow(t);
946         
947         /* This TSO has moved, so update any pointers to it from the
948          * main thread stack.  It better not be on any other queues...
949          * (it shouldn't be).
950          */
951         for (m = main_threads; m != NULL; m = m->link) {
952           if (m->tso == t) {
953             m->tso = new_t;
954           }
955         }
956         threadPaused(new_t);
957         PUSH_ON_RUN_QUEUE(new_t);
958       }
959       break;
960
961     case ThreadYielding:
962 #if defined(GRAN)
963       IF_DEBUG(gran, 
964                DumpGranEvent(GR_DESCHEDULE, t));
965       globalGranStats.tot_yields++;
966 #elif defined(PAR)
967       IF_DEBUG(par, 
968                DumpGranEvent(GR_DESCHEDULE, t));
969 #endif
970       /* put the thread back on the run queue.  Then, if we're ready to
971        * GC, check whether this is the last task to stop.  If so, wake
972        * up the GC thread.  getThread will block during a GC until the
973        * GC is finished.
974        */
975 #if defined(GRAN) || defined(PAR)    
976       IF_DEBUG(scheduler,
977                if (t->what_next == ThreadEnterHugs) {
978                    /* ToDo: or maybe a timer expired when we were in Hugs?
979                     * or maybe someone hit ctrl-C
980                     */
981                    belch("--<< TSO %ld (%p; %s) stopped to switch to Hugs", 
982                          t->id, t, whatNext_strs[t->what_next]);
983                } else {
984                    belch("--<< TSO %ld (%p; %s) stopped, yielding", 
985                          t->id, t, whatNext_strs[t->what_next]);
986                }
987                );
988 #else
989       IF_DEBUG(scheduler,
990                if (t->what_next == ThreadEnterHugs) {
991                  /* ToDo: or maybe a timer expired when we were in Hugs?
992                   * or maybe someone hit ctrl-C
993                   */
994                  belch("thread %ld stopped to switch to Hugs", t->id);
995                } else {
996                  belch("thread %ld stopped, yielding", t->id);
997                }
998                );
999 #endif
1000       threadPaused(t);
1001       IF_DEBUG(sanity,
1002                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1003                checkTSO(t));
1004       ASSERT(t->link == END_TSO_QUEUE);
1005 #if defined(GRAN)
1006       ASSERT(!is_on_queue(t,CurrentProc));
1007
1008       IF_DEBUG(sanity,
1009                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1010                checkThreadQsSanity(rtsTrue));
1011 #endif
1012       APPEND_TO_RUN_QUEUE(t);
1013 #if defined(GRAN)
1014       /* add a ContinueThread event to actually process the thread */
1015       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1016                 ContinueThread,
1017                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1018       IF_GRAN_DEBUG(bq, 
1019                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1020                G_EVENTQ(0);
1021                G_CURR_THREADQ(0))
1022 #endif /* GRAN */
1023       break;
1024       
1025     case ThreadBlocked:
1026 #if defined(GRAN)
1027       IF_DEBUG(scheduler,
1028                belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1029                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1030                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1031
1032       // ??? needed; should emit block before
1033       IF_DEBUG(gran, 
1034                DumpGranEvent(GR_DESCHEDULE, t)); 
1035       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1036       /*
1037         ngoq Dogh!
1038       ASSERT(procStatus[CurrentProc]==Busy || 
1039               ((procStatus[CurrentProc]==Fetching) && 
1040               (t->block_info.closure!=(StgClosure*)NULL)));
1041       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1042           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1043             procStatus[CurrentProc]==Fetching)) 
1044         procStatus[CurrentProc] = Idle;
1045       */
1046 #elif defined(PAR)
1047       IF_DEBUG(par, 
1048                DumpGranEvent(GR_DESCHEDULE, t)); 
1049
1050       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1051       blockThread(t);
1052
1053       IF_DEBUG(scheduler,
1054                belch("--<< TSO %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1055                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1056                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1057
1058 #else /* !GRAN */
1059       /* don't need to do anything.  Either the thread is blocked on
1060        * I/O, in which case we'll have called addToBlockedQueue
1061        * previously, or it's blocked on an MVar or Blackhole, in which
1062        * case it'll be on the relevant queue already.
1063        */
1064       IF_DEBUG(scheduler,
1065                fprintf(stderr, "--<< TSO %d (%p) stopped ", t->id, t);
1066                printThreadBlockage(t);
1067                fprintf(stderr, "\n"));
1068
1069       /* Only for dumping event to log file 
1070          ToDo: do I need this in GranSim, too?
1071       blockThread(t);
1072       */
1073 #endif
1074       threadPaused(t);
1075       break;
1076       
1077     case ThreadFinished:
1078       /* Need to check whether this was a main thread, and if so, signal
1079        * the task that started it with the return value.  If we have no
1080        * more main threads, we probably need to stop all the tasks until
1081        * we get a new one.
1082        */
1083       IF_DEBUG(scheduler,belch("--++ TSO %d (%p) finished", t->id, t));
1084       t->what_next = ThreadComplete;
1085 #if defined(GRAN)
1086       endThread(t, CurrentProc); // clean-up the thread
1087 #elif defined(PAR)
1088       advisory_thread_count--;
1089       if (RtsFlags.ParFlags.ParStats.Full) 
1090         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1091 #endif
1092       break;
1093       
1094     default:
1095       barf("doneThread: invalid thread return code");
1096     }
1097     
1098 #ifdef SMP
1099     cap->link = free_capabilities;
1100     free_capabilities = cap;
1101     n_free_capabilities++;
1102 #endif
1103
1104 #ifdef SMP
1105     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1106 #else
1107     if (ready_to_gc) 
1108 #endif
1109       {
1110       /* everybody back, start the GC.
1111        * Could do it in this thread, or signal a condition var
1112        * to do it in another thread.  Either way, we need to
1113        * broadcast on gc_pending_cond afterward.
1114        */
1115 #ifdef SMP
1116       IF_DEBUG(scheduler,sched_belch("doing GC"));
1117 #endif
1118       GarbageCollect(GetRoots);
1119       ready_to_gc = rtsFalse;
1120 #ifdef SMP
1121       pthread_cond_broadcast(&gc_pending_cond);
1122 #endif
1123 #if defined(GRAN)
1124       /* add a ContinueThread event to continue execution of current thread */
1125       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1126                 ContinueThread,
1127                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1128       IF_GRAN_DEBUG(bq, 
1129                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1130                G_EVENTQ(0);
1131                G_CURR_THREADQ(0))
1132 #endif /* GRAN */
1133     }
1134 #if defined(GRAN)
1135   next_thread:
1136     IF_GRAN_DEBUG(unused,
1137                   print_eventq(EventHd));
1138
1139     event = get_next_event();
1140
1141 #elif defined(PAR)
1142   next_thread:
1143     /* ToDo: wait for next message to arrive rather than busy wait */
1144
1145 #else /* GRAN */
1146   /* not any more
1147   next_thread:
1148     t = take_off_run_queue(END_TSO_QUEUE);
1149   */
1150 #endif /* GRAN */
1151   } /* end of while(1) */
1152 }
1153
1154 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1155 void deleteAllThreads ( void )
1156 {
1157   StgTSO* t;
1158   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1159   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1160     deleteThread(t);
1161   }
1162   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1163     deleteThread(t);
1164   }
1165   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1166   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1167 }
1168
1169 /* startThread and  insertThread are now in GranSim.c -- HWL */
1170
1171 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1172 //@subsection Suspend and Resume
1173
1174 /* ---------------------------------------------------------------------------
1175  * Suspending & resuming Haskell threads.
1176  * 
1177  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1178  * its capability before calling the C function.  This allows another
1179  * task to pick up the capability and carry on running Haskell
1180  * threads.  It also means that if the C call blocks, it won't lock
1181  * the whole system.
1182  *
1183  * The Haskell thread making the C call is put to sleep for the
1184  * duration of the call, on the susepended_ccalling_threads queue.  We
1185  * give out a token to the task, which it can use to resume the thread
1186  * on return from the C function.
1187  * ------------------------------------------------------------------------- */
1188    
1189 StgInt
1190 suspendThread( Capability *cap )
1191 {
1192   nat tok;
1193
1194   ACQUIRE_LOCK(&sched_mutex);
1195
1196   IF_DEBUG(scheduler,
1197            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1198
1199   threadPaused(cap->rCurrentTSO);
1200   cap->rCurrentTSO->link = suspended_ccalling_threads;
1201   suspended_ccalling_threads = cap->rCurrentTSO;
1202
1203   /* Use the thread ID as the token; it should be unique */
1204   tok = cap->rCurrentTSO->id;
1205
1206 #ifdef SMP
1207   cap->link = free_capabilities;
1208   free_capabilities = cap;
1209   n_free_capabilities++;
1210 #endif
1211
1212   RELEASE_LOCK(&sched_mutex);
1213   return tok; 
1214 }
1215
1216 Capability *
1217 resumeThread( StgInt tok )
1218 {
1219   StgTSO *tso, **prev;
1220   Capability *cap;
1221
1222   ACQUIRE_LOCK(&sched_mutex);
1223
1224   prev = &suspended_ccalling_threads;
1225   for (tso = suspended_ccalling_threads; 
1226        tso != END_TSO_QUEUE; 
1227        prev = &tso->link, tso = tso->link) {
1228     if (tso->id == (StgThreadID)tok) {
1229       *prev = tso->link;
1230       break;
1231     }
1232   }
1233   if (tso == END_TSO_QUEUE) {
1234     barf("resumeThread: thread not found");
1235   }
1236
1237 #ifdef SMP
1238   while (free_capabilities == NULL) {
1239     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1240     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1241     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1242   }
1243   cap = free_capabilities;
1244   free_capabilities = cap->link;
1245   n_free_capabilities--;
1246 #else  
1247   cap = &MainRegTable;
1248 #endif
1249
1250   cap->rCurrentTSO = tso;
1251
1252   RELEASE_LOCK(&sched_mutex);
1253   return cap;
1254 }
1255
1256
1257 /* ---------------------------------------------------------------------------
1258  * Static functions
1259  * ------------------------------------------------------------------------ */
1260 static void unblockThread(StgTSO *tso);
1261
1262 /* ---------------------------------------------------------------------------
1263  * Comparing Thread ids.
1264  *
1265  * This is used from STG land in the implementation of the
1266  * instances of Eq/Ord for ThreadIds.
1267  * ------------------------------------------------------------------------ */
1268
1269 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1270
1271   StgThreadID id1 = tso1->id; 
1272   StgThreadID id2 = tso2->id;
1273  
1274   if (id1 < id2) return (-1);
1275   if (id1 > id2) return 1;
1276   return 0;
1277 }
1278
1279 /* ---------------------------------------------------------------------------
1280    Create a new thread.
1281
1282    The new thread starts with the given stack size.  Before the
1283    scheduler can run, however, this thread needs to have a closure
1284    (and possibly some arguments) pushed on its stack.  See
1285    pushClosure() in Schedule.h.
1286
1287    createGenThread() and createIOThread() (in SchedAPI.h) are
1288    convenient packaged versions of this function.
1289
1290    currently pri (priority) is only used in a GRAN setup -- HWL
1291    ------------------------------------------------------------------------ */
1292 //@cindex createThread
1293 #if defined(GRAN)
1294 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1295 StgTSO *
1296 createThread(nat stack_size, StgInt pri)
1297 {
1298   return createThread_(stack_size, rtsFalse, pri);
1299 }
1300
1301 static StgTSO *
1302 createThread_(nat size, rtsBool have_lock, StgInt pri)
1303 {
1304 #else
1305 StgTSO *
1306 createThread(nat stack_size)
1307 {
1308   return createThread_(stack_size, rtsFalse);
1309 }
1310
1311 static StgTSO *
1312 createThread_(nat size, rtsBool have_lock)
1313 {
1314 #endif
1315
1316     StgTSO *tso;
1317     nat stack_size;
1318
1319     /* First check whether we should create a thread at all */
1320 #if defined(PAR)
1321   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1322   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1323     threadsIgnored++;
1324     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1325           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1326     return END_TSO_QUEUE;
1327   }
1328   threadsCreated++;
1329 #endif
1330
1331 #if defined(GRAN)
1332   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1333 #endif
1334
1335   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1336
1337   /* catch ridiculously small stack sizes */
1338   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1339     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1340   }
1341
1342   stack_size = size - TSO_STRUCT_SIZEW;
1343
1344   tso = (StgTSO *)allocate(size);
1345   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1346
1347   SET_HDR(tso, &TSO_info, CCS_MAIN);
1348 #if defined(GRAN)
1349   SET_GRAN_HDR(tso, ThisPE);
1350 #endif
1351   tso->what_next     = ThreadEnterGHC;
1352
1353   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1354    * protect the increment operation on next_thread_id.
1355    * In future, we could use an atomic increment instead.
1356    */
1357   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1358   tso->id = next_thread_id++; 
1359   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1360
1361   tso->why_blocked  = NotBlocked;
1362   tso->blocked_exceptions = NULL;
1363
1364   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1365   tso->stack_size   = stack_size;
1366   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1367                               - TSO_STRUCT_SIZEW;
1368   tso->sp           = (P_)&(tso->stack) + stack_size;
1369
1370 #ifdef PROFILING
1371   tso->prof.CCCS = CCS_MAIN;
1372 #endif
1373
1374   /* put a stop frame on the stack */
1375   tso->sp -= sizeofW(StgStopFrame);
1376   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1377   tso->su = (StgUpdateFrame*)tso->sp;
1378
1379   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1380                            tso->id, tso, tso->stack_size));
1381
1382   // ToDo: check this
1383 #if defined(GRAN)
1384   tso->link = END_TSO_QUEUE;
1385   /* uses more flexible routine in GranSim */
1386   insertThread(tso, CurrentProc);
1387 #else
1388   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1389    * from its creation
1390    */
1391 #endif
1392
1393 #if defined(GRAN) || defined(PAR)
1394   DumpGranEvent(GR_START,tso);
1395 #endif
1396
1397   /* Link the new thread on the global thread list.
1398    */
1399   tso->global_link = all_threads;
1400   all_threads = tso;
1401
1402 #if defined(GRAN)
1403   tso->gran.pri = pri;
1404 # if defined(DEBUG)
1405   tso->gran.magic = TSO_MAGIC; // debugging only
1406 # endif
1407   tso->gran.sparkname   = 0;
1408   tso->gran.startedat   = CURRENT_TIME; 
1409   tso->gran.exported    = 0;
1410   tso->gran.basicblocks = 0;
1411   tso->gran.allocs      = 0;
1412   tso->gran.exectime    = 0;
1413   tso->gran.fetchtime   = 0;
1414   tso->gran.fetchcount  = 0;
1415   tso->gran.blocktime   = 0;
1416   tso->gran.blockcount  = 0;
1417   tso->gran.blockedat   = 0;
1418   tso->gran.globalsparks = 0;
1419   tso->gran.localsparks  = 0;
1420   if (RtsFlags.GranFlags.Light)
1421     tso->gran.clock  = Now; /* local clock */
1422   else
1423     tso->gran.clock  = 0;
1424
1425   IF_DEBUG(gran,printTSO(tso));
1426 #elif defined(PAR)
1427 # if defined(DEBUG)
1428   tso->par.magic = TSO_MAGIC; // debugging only
1429 # endif
1430   tso->par.sparkname   = 0;
1431   tso->par.startedat   = CURRENT_TIME; 
1432   tso->par.exported    = 0;
1433   tso->par.basicblocks = 0;
1434   tso->par.allocs      = 0;
1435   tso->par.exectime    = 0;
1436   tso->par.fetchtime   = 0;
1437   tso->par.fetchcount  = 0;
1438   tso->par.blocktime   = 0;
1439   tso->par.blockcount  = 0;
1440   tso->par.blockedat   = 0;
1441   tso->par.globalsparks = 0;
1442   tso->par.localsparks  = 0;
1443 #endif
1444
1445 #if defined(GRAN)
1446   globalGranStats.tot_threads_created++;
1447   globalGranStats.threads_created_on_PE[CurrentProc]++;
1448   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1449   globalGranStats.tot_sq_probes++;
1450 #endif 
1451
1452 #if defined(GRAN)
1453   IF_GRAN_DEBUG(pri,
1454                 belch("==__ schedule: Created TSO %d (%p);",
1455                       CurrentProc, tso, tso->id));
1456 #elif defined(PAR)
1457     IF_PAR_DEBUG(verbose,
1458                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1459                        tso->id, tso, advisory_thread_count));
1460 #else
1461   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1462                                  tso->id, tso->stack_size));
1463 #endif    
1464   return tso;
1465 }
1466
1467 /*
1468   Turn a spark into a thread.
1469   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1470 */
1471 #if defined(PAR)
1472 //@cindex activateSpark
1473 StgTSO *
1474 activateSpark (rtsSpark spark) 
1475 {
1476   StgTSO *tso;
1477   
1478   ASSERT(spark != (rtsSpark)NULL);
1479   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1480   if (tso!=END_TSO_QUEUE) {
1481     pushClosure(tso,spark);
1482     PUSH_ON_RUN_QUEUE(tso);
1483     advisory_thread_count++;
1484
1485     if (RtsFlags.ParFlags.ParStats.Full) {
1486       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1487       IF_PAR_DEBUG(verbose,
1488                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1489                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1490     }
1491   } else {
1492     barf("activateSpark: Cannot create TSO");
1493   }
1494   // ToDo: fwd info on local/global spark to thread -- HWL
1495   // tso->gran.exported =  spark->exported;
1496   // tso->gran.locked =   !spark->global;
1497   // tso->gran.sparkname = spark->name;
1498
1499   return tso;
1500 }
1501 #endif
1502
1503 /* ---------------------------------------------------------------------------
1504  * scheduleThread()
1505  *
1506  * scheduleThread puts a thread on the head of the runnable queue.
1507  * This will usually be done immediately after a thread is created.
1508  * The caller of scheduleThread must create the thread using e.g.
1509  * createThread and push an appropriate closure
1510  * on this thread's stack before the scheduler is invoked.
1511  * ------------------------------------------------------------------------ */
1512
1513 void
1514 scheduleThread(StgTSO *tso)
1515 {
1516   if (tso==END_TSO_QUEUE){    
1517     schedule();
1518     return;
1519   }
1520
1521   ACQUIRE_LOCK(&sched_mutex);
1522
1523   /* Put the new thread on the head of the runnable queue.  The caller
1524    * better push an appropriate closure on this thread's stack
1525    * beforehand.  In the SMP case, the thread may start running as
1526    * soon as we release the scheduler lock below.
1527    */
1528   PUSH_ON_RUN_QUEUE(tso);
1529   THREAD_RUNNABLE();
1530
1531   IF_DEBUG(scheduler,printTSO(tso));
1532   RELEASE_LOCK(&sched_mutex);
1533 }
1534
1535 /* ---------------------------------------------------------------------------
1536  * startTasks()
1537  *
1538  * Start up Posix threads to run each of the scheduler tasks.
1539  * I believe the task ids are not needed in the system as defined.
1540  *  KH @ 25/10/99
1541  * ------------------------------------------------------------------------ */
1542
1543 #if defined(PAR) || defined(SMP)
1544 void *
1545 taskStart( void *arg STG_UNUSED )
1546 {
1547   rts_evalNothing(NULL);
1548 }
1549 #endif
1550
1551 /* ---------------------------------------------------------------------------
1552  * initScheduler()
1553  *
1554  * Initialise the scheduler.  This resets all the queues - if the
1555  * queues contained any threads, they'll be garbage collected at the
1556  * next pass.
1557  *
1558  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1559  * ------------------------------------------------------------------------ */
1560
1561 #ifdef SMP
1562 static void
1563 term_handler(int sig STG_UNUSED)
1564 {
1565   stat_workerStop();
1566   ACQUIRE_LOCK(&term_mutex);
1567   await_death--;
1568   RELEASE_LOCK(&term_mutex);
1569   pthread_exit(NULL);
1570 }
1571 #endif
1572
1573 //@cindex initScheduler
1574 void 
1575 initScheduler(void)
1576 {
1577 #if defined(GRAN)
1578   nat i;
1579
1580   for (i=0; i<=MAX_PROC; i++) {
1581     run_queue_hds[i]      = END_TSO_QUEUE;
1582     run_queue_tls[i]      = END_TSO_QUEUE;
1583     blocked_queue_hds[i]  = END_TSO_QUEUE;
1584     blocked_queue_tls[i]  = END_TSO_QUEUE;
1585     ccalling_threadss[i]  = END_TSO_QUEUE;
1586   }
1587 #else
1588   run_queue_hd      = END_TSO_QUEUE;
1589   run_queue_tl      = END_TSO_QUEUE;
1590   blocked_queue_hd  = END_TSO_QUEUE;
1591   blocked_queue_tl  = END_TSO_QUEUE;
1592 #endif 
1593
1594   suspended_ccalling_threads  = END_TSO_QUEUE;
1595
1596   main_threads = NULL;
1597   all_threads  = END_TSO_QUEUE;
1598
1599   context_switch = 0;
1600   interrupted    = 0;
1601
1602   enteredCAFs = END_CAF_LIST;
1603
1604   /* Install the SIGHUP handler */
1605 #ifdef SMP
1606   {
1607     struct sigaction action,oact;
1608
1609     action.sa_handler = term_handler;
1610     sigemptyset(&action.sa_mask);
1611     action.sa_flags = 0;
1612     if (sigaction(SIGTERM, &action, &oact) != 0) {
1613       barf("can't install TERM handler");
1614     }
1615   }
1616 #endif
1617
1618 #ifdef SMP
1619   /* Allocate N Capabilities */
1620   {
1621     nat i;
1622     Capability *cap, *prev;
1623     cap  = NULL;
1624     prev = NULL;
1625     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1626       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1627       cap->link = prev;
1628       prev = cap;
1629     }
1630     free_capabilities = cap;
1631     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1632   }
1633   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1634                              n_free_capabilities););
1635 #endif
1636
1637 #if defined(SMP) || defined(PAR)
1638   initSparkPools();
1639 #endif
1640 }
1641
1642 #ifdef SMP
1643 void
1644 startTasks( void )
1645 {
1646   nat i;
1647   int r;
1648   pthread_t tid;
1649   
1650   /* make some space for saving all the thread ids */
1651   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1652                             "initScheduler:task_ids");
1653   
1654   /* and create all the threads */
1655   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1656     r = pthread_create(&tid,NULL,taskStart,NULL);
1657     if (r != 0) {
1658       barf("startTasks: Can't create new Posix thread");
1659     }
1660     task_ids[i].id = tid;
1661     task_ids[i].mut_time = 0.0;
1662     task_ids[i].mut_etime = 0.0;
1663     task_ids[i].gc_time = 0.0;
1664     task_ids[i].gc_etime = 0.0;
1665     task_ids[i].elapsedtimestart = elapsedtime();
1666     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1667   }
1668 }
1669 #endif
1670
1671 void
1672 exitScheduler( void )
1673 {
1674 #ifdef SMP
1675   nat i;
1676
1677   /* Don't want to use pthread_cancel, since we'd have to install
1678    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1679    * all our locks.
1680    */
1681 #if 0
1682   /* Cancel all our tasks */
1683   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1684     pthread_cancel(task_ids[i].id);
1685   }
1686   
1687   /* Wait for all the tasks to terminate */
1688   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1689     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1690                                task_ids[i].id));
1691     pthread_join(task_ids[i].id, NULL);
1692   }
1693 #endif
1694
1695   /* Send 'em all a SIGHUP.  That should shut 'em up.
1696    */
1697   await_death = RtsFlags.ParFlags.nNodes;
1698   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1699     pthread_kill(task_ids[i].id,SIGTERM);
1700   }
1701   while (await_death > 0) {
1702     sched_yield();
1703   }
1704 #endif
1705 }
1706
1707 /* -----------------------------------------------------------------------------
1708    Managing the per-task allocation areas.
1709    
1710    Each capability comes with an allocation area.  These are
1711    fixed-length block lists into which allocation can be done.
1712
1713    ToDo: no support for two-space collection at the moment???
1714    -------------------------------------------------------------------------- */
1715
1716 /* -----------------------------------------------------------------------------
1717  * waitThread is the external interface for running a new computation
1718  * and waiting for the result.
1719  *
1720  * In the non-SMP case, we create a new main thread, push it on the 
1721  * main-thread stack, and invoke the scheduler to run it.  The
1722  * scheduler will return when the top main thread on the stack has
1723  * completed or died, and fill in the necessary fields of the
1724  * main_thread structure.
1725  *
1726  * In the SMP case, we create a main thread as before, but we then
1727  * create a new condition variable and sleep on it.  When our new
1728  * main thread has completed, we'll be woken up and the status/result
1729  * will be in the main_thread struct.
1730  * -------------------------------------------------------------------------- */
1731
1732 SchedulerStatus
1733 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1734 {
1735   StgMainThread *m;
1736   SchedulerStatus stat;
1737
1738   ACQUIRE_LOCK(&sched_mutex);
1739   
1740   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1741
1742   m->tso = tso;
1743   m->ret = ret;
1744   m->stat = NoStatus;
1745 #ifdef SMP
1746   pthread_cond_init(&m->wakeup, NULL);
1747 #endif
1748
1749   m->link = main_threads;
1750   main_threads = m;
1751
1752   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1753                               m->tso->id));
1754
1755 #ifdef SMP
1756   do {
1757     pthread_cond_wait(&m->wakeup, &sched_mutex);
1758   } while (m->stat == NoStatus);
1759 #elif defined(GRAN)
1760   /* GranSim specific init */
1761   CurrentTSO = m->tso;                // the TSO to run
1762   procStatus[MainProc] = Busy;        // status of main PE
1763   CurrentProc = MainProc;             // PE to run it on
1764
1765   schedule();
1766 #else
1767   schedule();
1768   ASSERT(m->stat != NoStatus);
1769 #endif
1770
1771   stat = m->stat;
1772
1773 #ifdef SMP
1774   pthread_cond_destroy(&m->wakeup);
1775 #endif
1776
1777   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1778                               m->tso->id));
1779   free(m);
1780
1781   RELEASE_LOCK(&sched_mutex);
1782
1783   return stat;
1784 }
1785
1786 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1787 //@subsection Run queue code 
1788
1789 #if 0
1790 /* 
1791    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1792        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1793        implicit global variable that has to be correct when calling these
1794        fcts -- HWL 
1795 */
1796
1797 /* Put the new thread on the head of the runnable queue.
1798  * The caller of createThread better push an appropriate closure
1799  * on this thread's stack before the scheduler is invoked.
1800  */
1801 static /* inline */ void
1802 add_to_run_queue(tso)
1803 StgTSO* tso; 
1804 {
1805   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1806   tso->link = run_queue_hd;
1807   run_queue_hd = tso;
1808   if (run_queue_tl == END_TSO_QUEUE) {
1809     run_queue_tl = tso;
1810   }
1811 }
1812
1813 /* Put the new thread at the end of the runnable queue. */
1814 static /* inline */ void
1815 push_on_run_queue(tso)
1816 StgTSO* tso; 
1817 {
1818   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1819   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1820   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1821   if (run_queue_hd == END_TSO_QUEUE) {
1822     run_queue_hd = tso;
1823   } else {
1824     run_queue_tl->link = tso;
1825   }
1826   run_queue_tl = tso;
1827 }
1828
1829 /* 
1830    Should be inlined because it's used very often in schedule.  The tso
1831    argument is actually only needed in GranSim, where we want to have the
1832    possibility to schedule *any* TSO on the run queue, irrespective of the
1833    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1834    the run queue and dequeue the tso, adjusting the links in the queue. 
1835 */
1836 //@cindex take_off_run_queue
1837 static /* inline */ StgTSO*
1838 take_off_run_queue(StgTSO *tso) {
1839   StgTSO *t, *prev;
1840
1841   /* 
1842      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1843
1844      if tso is specified, unlink that tso from the run_queue (doesn't have
1845      to be at the beginning of the queue); GranSim only 
1846   */
1847   if (tso!=END_TSO_QUEUE) {
1848     /* find tso in queue */
1849     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1850          t!=END_TSO_QUEUE && t!=tso;
1851          prev=t, t=t->link) 
1852       /* nothing */ ;
1853     ASSERT(t==tso);
1854     /* now actually dequeue the tso */
1855     if (prev!=END_TSO_QUEUE) {
1856       ASSERT(run_queue_hd!=t);
1857       prev->link = t->link;
1858     } else {
1859       /* t is at beginning of thread queue */
1860       ASSERT(run_queue_hd==t);
1861       run_queue_hd = t->link;
1862     }
1863     /* t is at end of thread queue */
1864     if (t->link==END_TSO_QUEUE) {
1865       ASSERT(t==run_queue_tl);
1866       run_queue_tl = prev;
1867     } else {
1868       ASSERT(run_queue_tl!=t);
1869     }
1870     t->link = END_TSO_QUEUE;
1871   } else {
1872     /* take tso from the beginning of the queue; std concurrent code */
1873     t = run_queue_hd;
1874     if (t != END_TSO_QUEUE) {
1875       run_queue_hd = t->link;
1876       t->link = END_TSO_QUEUE;
1877       if (run_queue_hd == END_TSO_QUEUE) {
1878         run_queue_tl = END_TSO_QUEUE;
1879       }
1880     }
1881   }
1882   return t;
1883 }
1884
1885 #endif /* 0 */
1886
1887 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1888 //@subsection Garbage Collextion Routines
1889
1890 /* ---------------------------------------------------------------------------
1891    Where are the roots that we know about?
1892
1893         - all the threads on the runnable queue
1894         - all the threads on the blocked queue
1895         - all the thread currently executing a _ccall_GC
1896         - all the "main threads"
1897      
1898    ------------------------------------------------------------------------ */
1899
1900 /* This has to be protected either by the scheduler monitor, or by the
1901         garbage collection monitor (probably the latter).
1902         KH @ 25/10/99
1903 */
1904
1905 static void GetRoots(void)
1906 {
1907   StgMainThread *m;
1908
1909 #if defined(GRAN)
1910   {
1911     nat i;
1912     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1913       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1914         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1915       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1916         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1917       
1918       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1919         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1920       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1921         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1922       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1923         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1924     }
1925   }
1926
1927   markEventQueue();
1928
1929 #else /* !GRAN */
1930   if (run_queue_hd != END_TSO_QUEUE) {
1931     ASSERT(run_queue_tl != END_TSO_QUEUE);
1932     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1933     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1934   }
1935
1936   if (blocked_queue_hd != END_TSO_QUEUE) {
1937     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1938     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1939     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1940   }
1941 #endif 
1942
1943   for (m = main_threads; m != NULL; m = m->link) {
1944     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1945   }
1946   if (suspended_ccalling_threads != END_TSO_QUEUE)
1947     suspended_ccalling_threads = 
1948       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1949
1950 #if defined(SMP) || defined(PAR) || defined(GRAN)
1951   markSparkQueue();
1952 #endif
1953 }
1954
1955 /* -----------------------------------------------------------------------------
1956    performGC
1957
1958    This is the interface to the garbage collector from Haskell land.
1959    We provide this so that external C code can allocate and garbage
1960    collect when called from Haskell via _ccall_GC.
1961
1962    It might be useful to provide an interface whereby the programmer
1963    can specify more roots (ToDo).
1964    
1965    This needs to be protected by the GC condition variable above.  KH.
1966    -------------------------------------------------------------------------- */
1967
1968 void (*extra_roots)(void);
1969
1970 void
1971 performGC(void)
1972 {
1973   GarbageCollect(GetRoots);
1974 }
1975
1976 static void
1977 AllRoots(void)
1978 {
1979   GetRoots();                   /* the scheduler's roots */
1980   extra_roots();                /* the user's roots */
1981 }
1982
1983 void
1984 performGCWithRoots(void (*get_roots)(void))
1985 {
1986   extra_roots = get_roots;
1987
1988   GarbageCollect(AllRoots);
1989 }
1990
1991 /* -----------------------------------------------------------------------------
1992    Stack overflow
1993
1994    If the thread has reached its maximum stack size, then raise the
1995    StackOverflow exception in the offending thread.  Otherwise
1996    relocate the TSO into a larger chunk of memory and adjust its stack
1997    size appropriately.
1998    -------------------------------------------------------------------------- */
1999
2000 static StgTSO *
2001 threadStackOverflow(StgTSO *tso)
2002 {
2003   nat new_stack_size, new_tso_size, diff, stack_words;
2004   StgPtr new_sp;
2005   StgTSO *dest;
2006
2007   IF_DEBUG(sanity,checkTSO(tso));
2008   if (tso->stack_size >= tso->max_stack_size) {
2009
2010     IF_DEBUG(gc,
2011              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2012                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2013              /* If we're debugging, just print out the top of the stack */
2014              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2015                                               tso->sp+64)));
2016
2017 #ifdef INTERPRETER
2018     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2019     exit(1);
2020 #else
2021     /* Send this thread the StackOverflow exception */
2022     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2023 #endif
2024     return tso;
2025   }
2026
2027   /* Try to double the current stack size.  If that takes us over the
2028    * maximum stack size for this thread, then use the maximum instead.
2029    * Finally round up so the TSO ends up as a whole number of blocks.
2030    */
2031   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2032   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2033                                        TSO_STRUCT_SIZE)/sizeof(W_);
2034   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2035   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2036
2037   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2038
2039   dest = (StgTSO *)allocate(new_tso_size);
2040   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2041
2042   /* copy the TSO block and the old stack into the new area */
2043   memcpy(dest,tso,TSO_STRUCT_SIZE);
2044   stack_words = tso->stack + tso->stack_size - tso->sp;
2045   new_sp = (P_)dest + new_tso_size - stack_words;
2046   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2047
2048   /* relocate the stack pointers... */
2049   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2050   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2051   dest->sp    = new_sp;
2052   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2053   dest->stack_size = new_stack_size;
2054         
2055   /* and relocate the update frame list */
2056   relocate_TSO(tso, dest);
2057
2058   /* Mark the old TSO as relocated.  We have to check for relocated
2059    * TSOs in the garbage collector and any primops that deal with TSOs.
2060    *
2061    * It's important to set the sp and su values to just beyond the end
2062    * of the stack, so we don't attempt to scavenge any part of the
2063    * dead TSO's stack.
2064    */
2065   tso->what_next = ThreadRelocated;
2066   tso->link = dest;
2067   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2068   tso->su = (StgUpdateFrame *)tso->sp;
2069   tso->why_blocked = NotBlocked;
2070   dest->mut_link = NULL;
2071
2072   IF_PAR_DEBUG(verbose,
2073                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2074                      tso->id, tso, tso->stack_size);
2075                /* If we're debugging, just print out the top of the stack */
2076                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2077                                                 tso->sp+64)));
2078   
2079   IF_DEBUG(sanity,checkTSO(tso));
2080 #if 0
2081   IF_DEBUG(scheduler,printTSO(dest));
2082 #endif
2083
2084   return dest;
2085 }
2086
2087 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2088 //@subsection Blocking Queue Routines
2089
2090 /* ---------------------------------------------------------------------------
2091    Wake up a queue that was blocked on some resource.
2092    ------------------------------------------------------------------------ */
2093
2094 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2095
2096 #if defined(GRAN)
2097 static inline void
2098 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2099 {
2100 }
2101 #elif defined(PAR)
2102 static inline void
2103 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2104 {
2105   /* write RESUME events to log file and
2106      update blocked and fetch time (depending on type of the orig closure) */
2107   if (RtsFlags.ParFlags.ParStats.Full) {
2108     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2109                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2110                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2111
2112     switch (get_itbl(node)->type) {
2113         case FETCH_ME_BQ:
2114           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2115           break;
2116         case RBH:
2117         case FETCH_ME:
2118         case BLACKHOLE_BQ:
2119           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2120           break;
2121         default:
2122           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2123         }
2124       }
2125 }
2126 #endif
2127
2128 #if defined(GRAN)
2129 static StgBlockingQueueElement *
2130 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2131 {
2132     StgTSO *tso;
2133     PEs node_loc, tso_loc;
2134
2135     node_loc = where_is(node); // should be lifted out of loop
2136     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2137     tso_loc = where_is((StgClosure *)tso);
2138     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2139       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2140       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2141       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2142       // insertThread(tso, node_loc);
2143       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2144                 ResumeThread,
2145                 tso, node, (rtsSpark*)NULL);
2146       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2147       // len_local++;
2148       // len++;
2149     } else { // TSO is remote (actually should be FMBQ)
2150       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2151                                   RtsFlags.GranFlags.Costs.gunblocktime +
2152                                   RtsFlags.GranFlags.Costs.latency;
2153       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2154                 UnblockThread,
2155                 tso, node, (rtsSpark*)NULL);
2156       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2157       // len++;
2158     }
2159     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2160     IF_GRAN_DEBUG(bq,
2161                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2162                           (node_loc==tso_loc ? "Local" : "Global"), 
2163                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2164     tso->block_info.closure = NULL;
2165     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2166                              tso->id, tso));
2167 }
2168 #elif defined(PAR)
2169 static StgBlockingQueueElement *
2170 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2171 {
2172     StgBlockingQueueElement *next;
2173
2174     switch (get_itbl(bqe)->type) {
2175     case TSO:
2176       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2177       /* if it's a TSO just push it onto the run_queue */
2178       next = bqe->link;
2179       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2180       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2181       THREAD_RUNNABLE();
2182       unblockCount(bqe, node);
2183       /* reset blocking status after dumping event */
2184       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2185       break;
2186
2187     case BLOCKED_FETCH:
2188       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2189       next = bqe->link;
2190       bqe->link = PendingFetches;
2191       PendingFetches = bqe;
2192       break;
2193
2194 # if defined(DEBUG)
2195       /* can ignore this case in a non-debugging setup; 
2196          see comments on RBHSave closures above */
2197     case CONSTR:
2198       /* check that the closure is an RBHSave closure */
2199       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2200              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2201              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2202       break;
2203
2204     default:
2205       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2206            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2207            (StgClosure *)bqe);
2208 # endif
2209     }
2210   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2211   return next;
2212 }
2213
2214 #else /* !GRAN && !PAR */
2215 static StgTSO *
2216 unblockOneLocked(StgTSO *tso)
2217 {
2218   StgTSO *next;
2219
2220   ASSERT(get_itbl(tso)->type == TSO);
2221   ASSERT(tso->why_blocked != NotBlocked);
2222   tso->why_blocked = NotBlocked;
2223   next = tso->link;
2224   PUSH_ON_RUN_QUEUE(tso);
2225   THREAD_RUNNABLE();
2226   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2227   return next;
2228 }
2229 #endif
2230
2231 #if defined(GRAN) || defined(PAR)
2232 inline StgBlockingQueueElement *
2233 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2234 {
2235   ACQUIRE_LOCK(&sched_mutex);
2236   bqe = unblockOneLocked(bqe, node);
2237   RELEASE_LOCK(&sched_mutex);
2238   return bqe;
2239 }
2240 #else
2241 inline StgTSO *
2242 unblockOne(StgTSO *tso)
2243 {
2244   ACQUIRE_LOCK(&sched_mutex);
2245   tso = unblockOneLocked(tso);
2246   RELEASE_LOCK(&sched_mutex);
2247   return tso;
2248 }
2249 #endif
2250
2251 #if defined(GRAN)
2252 void 
2253 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2254 {
2255   StgBlockingQueueElement *bqe;
2256   PEs node_loc;
2257   nat len = 0; 
2258
2259   IF_GRAN_DEBUG(bq, 
2260                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2261                       node, CurrentProc, CurrentTime[CurrentProc], 
2262                       CurrentTSO->id, CurrentTSO));
2263
2264   node_loc = where_is(node);
2265
2266   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2267          get_itbl(q)->type == CONSTR); // closure (type constructor)
2268   ASSERT(is_unique(node));
2269
2270   /* FAKE FETCH: magically copy the node to the tso's proc;
2271      no Fetch necessary because in reality the node should not have been 
2272      moved to the other PE in the first place
2273   */
2274   if (CurrentProc!=node_loc) {
2275     IF_GRAN_DEBUG(bq, 
2276                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2277                         node, node_loc, CurrentProc, CurrentTSO->id, 
2278                         // CurrentTSO, where_is(CurrentTSO),
2279                         node->header.gran.procs));
2280     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2281     IF_GRAN_DEBUG(bq, 
2282                   belch("## new bitmask of node %p is %#x",
2283                         node, node->header.gran.procs));
2284     if (RtsFlags.GranFlags.GranSimStats.Global) {
2285       globalGranStats.tot_fake_fetches++;
2286     }
2287   }
2288
2289   bqe = q;
2290   // ToDo: check: ASSERT(CurrentProc==node_loc);
2291   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2292     //next = bqe->link;
2293     /* 
2294        bqe points to the current element in the queue
2295        next points to the next element in the queue
2296     */
2297     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2298     //tso_loc = where_is(tso);
2299     len++;
2300     bqe = unblockOneLocked(bqe, node);
2301   }
2302
2303   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2304      the closure to make room for the anchor of the BQ */
2305   if (bqe!=END_BQ_QUEUE) {
2306     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2307     /*
2308     ASSERT((info_ptr==&RBH_Save_0_info) ||
2309            (info_ptr==&RBH_Save_1_info) ||
2310            (info_ptr==&RBH_Save_2_info));
2311     */
2312     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2313     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2314     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2315
2316     IF_GRAN_DEBUG(bq,
2317                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2318                         node, info_type(node)));
2319   }
2320
2321   /* statistics gathering */
2322   if (RtsFlags.GranFlags.GranSimStats.Global) {
2323     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2324     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2325     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2326     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2327   }
2328   IF_GRAN_DEBUG(bq,
2329                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2330                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2331 }
2332 #elif defined(PAR)
2333 void 
2334 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2335 {
2336   StgBlockingQueueElement *bqe, *next;
2337
2338   ACQUIRE_LOCK(&sched_mutex);
2339
2340   IF_PAR_DEBUG(verbose, 
2341                belch("## AwBQ for node %p on [%x]: ",
2342                      node, mytid));
2343
2344   ASSERT(get_itbl(q)->type == TSO ||           
2345          get_itbl(q)->type == BLOCKED_FETCH || 
2346          get_itbl(q)->type == CONSTR); 
2347
2348   bqe = q;
2349   while (get_itbl(bqe)->type==TSO || 
2350          get_itbl(bqe)->type==BLOCKED_FETCH) {
2351     bqe = unblockOneLocked(bqe, node);
2352   }
2353   RELEASE_LOCK(&sched_mutex);
2354 }
2355
2356 #else   /* !GRAN && !PAR */
2357 void
2358 awakenBlockedQueue(StgTSO *tso)
2359 {
2360   ACQUIRE_LOCK(&sched_mutex);
2361   while (tso != END_TSO_QUEUE) {
2362     tso = unblockOneLocked(tso);
2363   }
2364   RELEASE_LOCK(&sched_mutex);
2365 }
2366 #endif
2367
2368 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2369 //@subsection Exception Handling Routines
2370
2371 /* ---------------------------------------------------------------------------
2372    Interrupt execution
2373    - usually called inside a signal handler so it mustn't do anything fancy.   
2374    ------------------------------------------------------------------------ */
2375
2376 void
2377 interruptStgRts(void)
2378 {
2379     interrupted    = 1;
2380     context_switch = 1;
2381 }
2382
2383 /* -----------------------------------------------------------------------------
2384    Unblock a thread
2385
2386    This is for use when we raise an exception in another thread, which
2387    may be blocked.
2388    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2389    -------------------------------------------------------------------------- */
2390
2391 #if defined(GRAN) || defined(PAR)
2392 /*
2393   NB: only the type of the blocking queue is different in GranSim and GUM
2394       the operations on the queue-elements are the same
2395       long live polymorphism!
2396 */
2397 static void
2398 unblockThread(StgTSO *tso)
2399 {
2400   StgBlockingQueueElement *t, **last;
2401
2402   ACQUIRE_LOCK(&sched_mutex);
2403   switch (tso->why_blocked) {
2404
2405   case NotBlocked:
2406     return;  /* not blocked */
2407
2408   case BlockedOnMVar:
2409     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2410     {
2411       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2412       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2413
2414       last = (StgBlockingQueueElement **)&mvar->head;
2415       for (t = (StgBlockingQueueElement *)mvar->head; 
2416            t != END_BQ_QUEUE; 
2417            last = &t->link, last_tso = t, t = t->link) {
2418         if (t == (StgBlockingQueueElement *)tso) {
2419           *last = (StgBlockingQueueElement *)tso->link;
2420           if (mvar->tail == tso) {
2421             mvar->tail = (StgTSO *)last_tso;
2422           }
2423           goto done;
2424         }
2425       }
2426       barf("unblockThread (MVAR): TSO not found");
2427     }
2428
2429   case BlockedOnBlackHole:
2430     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2431     {
2432       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2433
2434       last = &bq->blocking_queue;
2435       for (t = bq->blocking_queue; 
2436            t != END_BQ_QUEUE; 
2437            last = &t->link, t = t->link) {
2438         if (t == (StgBlockingQueueElement *)tso) {
2439           *last = (StgBlockingQueueElement *)tso->link;
2440           goto done;
2441         }
2442       }
2443       barf("unblockThread (BLACKHOLE): TSO not found");
2444     }
2445
2446   case BlockedOnException:
2447     {
2448       StgTSO *target  = tso->block_info.tso;
2449
2450       ASSERT(get_itbl(target)->type == TSO);
2451       ASSERT(target->blocked_exceptions != NULL);
2452
2453       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2454       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2455            t != END_BQ_QUEUE; 
2456            last = &t->link, t = t->link) {
2457         ASSERT(get_itbl(t)->type == TSO);
2458         if (t == (StgBlockingQueueElement *)tso) {
2459           *last = (StgBlockingQueueElement *)tso->link;
2460           goto done;
2461         }
2462       }
2463       barf("unblockThread (Exception): TSO not found");
2464     }
2465
2466   case BlockedOnDelay:
2467   case BlockedOnRead:
2468   case BlockedOnWrite:
2469     {
2470       StgBlockingQueueElement *prev = NULL;
2471       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2472            prev = t, t = t->link) {
2473         if (t == (StgBlockingQueueElement *)tso) {
2474           if (prev == NULL) {
2475             blocked_queue_hd = (StgTSO *)t->link;
2476             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2477               blocked_queue_tl = END_TSO_QUEUE;
2478             }
2479           } else {
2480             prev->link = t->link;
2481             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2482               blocked_queue_tl = (StgTSO *)prev;
2483             }
2484           }
2485           goto done;
2486         }
2487       }
2488       barf("unblockThread (I/O): TSO not found");
2489     }
2490
2491   default:
2492     barf("unblockThread");
2493   }
2494
2495  done:
2496   tso->link = END_TSO_QUEUE;
2497   tso->why_blocked = NotBlocked;
2498   tso->block_info.closure = NULL;
2499   PUSH_ON_RUN_QUEUE(tso);
2500   RELEASE_LOCK(&sched_mutex);
2501 }
2502 #else
2503 static void
2504 unblockThread(StgTSO *tso)
2505 {
2506   StgTSO *t, **last;
2507
2508   ACQUIRE_LOCK(&sched_mutex);
2509   switch (tso->why_blocked) {
2510
2511   case NotBlocked:
2512     return;  /* not blocked */
2513
2514   case BlockedOnMVar:
2515     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2516     {
2517       StgTSO *last_tso = END_TSO_QUEUE;
2518       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2519
2520       last = &mvar->head;
2521       for (t = mvar->head; t != END_TSO_QUEUE; 
2522            last = &t->link, last_tso = t, t = t->link) {
2523         if (t == tso) {
2524           *last = tso->link;
2525           if (mvar->tail == tso) {
2526             mvar->tail = last_tso;
2527           }
2528           goto done;
2529         }
2530       }
2531       barf("unblockThread (MVAR): TSO not found");
2532     }
2533
2534   case BlockedOnBlackHole:
2535     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2536     {
2537       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2538
2539       last = &bq->blocking_queue;
2540       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2541            last = &t->link, t = t->link) {
2542         if (t == tso) {
2543           *last = tso->link;
2544           goto done;
2545         }
2546       }
2547       barf("unblockThread (BLACKHOLE): TSO not found");
2548     }
2549
2550   case BlockedOnException:
2551     {
2552       StgTSO *target  = tso->block_info.tso;
2553
2554       ASSERT(get_itbl(target)->type == TSO);
2555       ASSERT(target->blocked_exceptions != NULL);
2556
2557       last = &target->blocked_exceptions;
2558       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2559            last = &t->link, t = t->link) {
2560         ASSERT(get_itbl(t)->type == TSO);
2561         if (t == tso) {
2562           *last = tso->link;
2563           goto done;
2564         }
2565       }
2566       barf("unblockThread (Exception): TSO not found");
2567     }
2568
2569   case BlockedOnDelay:
2570   case BlockedOnRead:
2571   case BlockedOnWrite:
2572     {
2573       StgTSO *prev = NULL;
2574       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2575            prev = t, t = t->link) {
2576         if (t == tso) {
2577           if (prev == NULL) {
2578             blocked_queue_hd = t->link;
2579             if (blocked_queue_tl == t) {
2580               blocked_queue_tl = END_TSO_QUEUE;
2581             }
2582           } else {
2583             prev->link = t->link;
2584             if (blocked_queue_tl == t) {
2585               blocked_queue_tl = prev;
2586             }
2587           }
2588           goto done;
2589         }
2590       }
2591       barf("unblockThread (I/O): TSO not found");
2592     }
2593
2594   default:
2595     barf("unblockThread");
2596   }
2597
2598  done:
2599   tso->link = END_TSO_QUEUE;
2600   tso->why_blocked = NotBlocked;
2601   tso->block_info.closure = NULL;
2602   PUSH_ON_RUN_QUEUE(tso);
2603   RELEASE_LOCK(&sched_mutex);
2604 }
2605 #endif
2606
2607 /* -----------------------------------------------------------------------------
2608  * raiseAsync()
2609  *
2610  * The following function implements the magic for raising an
2611  * asynchronous exception in an existing thread.
2612  *
2613  * We first remove the thread from any queue on which it might be
2614  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2615  *
2616  * We strip the stack down to the innermost CATCH_FRAME, building
2617  * thunks in the heap for all the active computations, so they can 
2618  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2619  * an application of the handler to the exception, and push it on
2620  * the top of the stack.
2621  * 
2622  * How exactly do we save all the active computations?  We create an
2623  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2624  * AP_UPDs pushes everything from the corresponding update frame
2625  * upwards onto the stack.  (Actually, it pushes everything up to the
2626  * next update frame plus a pointer to the next AP_UPD object.
2627  * Entering the next AP_UPD object pushes more onto the stack until we
2628  * reach the last AP_UPD object - at which point the stack should look
2629  * exactly as it did when we killed the TSO and we can continue
2630  * execution by entering the closure on top of the stack.
2631  *
2632  * We can also kill a thread entirely - this happens if either (a) the 
2633  * exception passed to raiseAsync is NULL, or (b) there's no
2634  * CATCH_FRAME on the stack.  In either case, we strip the entire
2635  * stack and replace the thread with a zombie.
2636  *
2637  * -------------------------------------------------------------------------- */
2638  
2639 void 
2640 deleteThread(StgTSO *tso)
2641 {
2642   raiseAsync(tso,NULL);
2643 }
2644
2645 void
2646 raiseAsync(StgTSO *tso, StgClosure *exception)
2647 {
2648   StgUpdateFrame* su = tso->su;
2649   StgPtr          sp = tso->sp;
2650   
2651   /* Thread already dead? */
2652   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2653     return;
2654   }
2655
2656   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2657
2658   /* Remove it from any blocking queues */
2659   unblockThread(tso);
2660
2661   /* The stack freezing code assumes there's a closure pointer on
2662    * the top of the stack.  This isn't always the case with compiled
2663    * code, so we have to push a dummy closure on the top which just
2664    * returns to the next return address on the stack.
2665    */
2666   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2667     *(--sp) = (W_)&dummy_ret_closure;
2668   }
2669
2670   while (1) {
2671     int words = ((P_)su - (P_)sp) - 1;
2672     nat i;
2673     StgAP_UPD * ap;
2674
2675     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2676      * then build PAP(handler,exception,realworld#), and leave it on
2677      * top of the stack ready to enter.
2678      */
2679     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2680       StgCatchFrame *cf = (StgCatchFrame *)su;
2681       /* we've got an exception to raise, so let's pass it to the
2682        * handler in this frame.
2683        */
2684       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2685       TICK_ALLOC_UPD_PAP(3,0);
2686       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2687               
2688       ap->n_args = 2;
2689       ap->fun = cf->handler;    /* :: Exception -> IO a */
2690       ap->payload[0] = (P_)exception;
2691       ap->payload[1] = ARG_TAG(0); /* realworld token */
2692
2693       /* throw away the stack from Sp up to and including the
2694        * CATCH_FRAME.
2695        */
2696       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2697       tso->su = cf->link;
2698
2699       /* Restore the blocked/unblocked state for asynchronous exceptions
2700        * at the CATCH_FRAME.  
2701        *
2702        * If exceptions were unblocked at the catch, arrange that they
2703        * are unblocked again after executing the handler by pushing an
2704        * unblockAsyncExceptions_ret stack frame.
2705        */
2706       if (!cf->exceptions_blocked) {
2707         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2708       }
2709       
2710       /* Ensure that async exceptions are blocked when running the handler.
2711        */
2712       if (tso->blocked_exceptions == NULL) {
2713         tso->blocked_exceptions = END_TSO_QUEUE;
2714       }
2715       
2716       /* Put the newly-built PAP on top of the stack, ready to execute
2717        * when the thread restarts.
2718        */
2719       sp[0] = (W_)ap;
2720       tso->sp = sp;
2721       tso->what_next = ThreadEnterGHC;
2722       return;
2723     }
2724
2725     /* First build an AP_UPD consisting of the stack chunk above the
2726      * current update frame, with the top word on the stack as the
2727      * fun field.
2728      */
2729     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2730     
2731     ASSERT(words >= 0);
2732     
2733     ap->n_args = words;
2734     ap->fun    = (StgClosure *)sp[0];
2735     sp++;
2736     for(i=0; i < (nat)words; ++i) {
2737       ap->payload[i] = (P_)*sp++;
2738     }
2739     
2740     switch (get_itbl(su)->type) {
2741       
2742     case UPDATE_FRAME:
2743       {
2744         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2745         TICK_ALLOC_UP_THK(words+1,0);
2746         
2747         IF_DEBUG(scheduler,
2748                  fprintf(stderr,  "scheduler: Updating ");
2749                  printPtr((P_)su->updatee); 
2750                  fprintf(stderr,  " with ");
2751                  printObj((StgClosure *)ap);
2752                  );
2753         
2754         /* Replace the updatee with an indirection - happily
2755          * this will also wake up any threads currently
2756          * waiting on the result.
2757          */
2758         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2759         su = su->link;
2760         sp += sizeofW(StgUpdateFrame) -1;
2761         sp[0] = (W_)ap; /* push onto stack */
2762         break;
2763       }
2764       
2765     case CATCH_FRAME:
2766       {
2767         StgCatchFrame *cf = (StgCatchFrame *)su;
2768         StgClosure* o;
2769         
2770         /* We want a PAP, not an AP_UPD.  Fortunately, the
2771          * layout's the same.
2772          */
2773         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2774         TICK_ALLOC_UPD_PAP(words+1,0);
2775         
2776         /* now build o = FUN(catch,ap,handler) */
2777         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2778         TICK_ALLOC_FUN(2,0);
2779         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2780         o->payload[0] = (StgClosure *)ap;
2781         o->payload[1] = cf->handler;
2782         
2783         IF_DEBUG(scheduler,
2784                  fprintf(stderr,  "scheduler: Built ");
2785                  printObj((StgClosure *)o);
2786                  );
2787         
2788         /* pop the old handler and put o on the stack */
2789         su = cf->link;
2790         sp += sizeofW(StgCatchFrame) - 1;
2791         sp[0] = (W_)o;
2792         break;
2793       }
2794       
2795     case SEQ_FRAME:
2796       {
2797         StgSeqFrame *sf = (StgSeqFrame *)su;
2798         StgClosure* o;
2799         
2800         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2801         TICK_ALLOC_UPD_PAP(words+1,0);
2802         
2803         /* now build o = FUN(seq,ap) */
2804         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2805         TICK_ALLOC_SE_THK(1,0);
2806         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2807         o->payload[0] = (StgClosure *)ap;
2808         
2809         IF_DEBUG(scheduler,
2810                  fprintf(stderr,  "scheduler: Built ");
2811                  printObj((StgClosure *)o);
2812                  );
2813         
2814         /* pop the old handler and put o on the stack */
2815         su = sf->link;
2816         sp += sizeofW(StgSeqFrame) - 1;
2817         sp[0] = (W_)o;
2818         break;
2819       }
2820       
2821     case STOP_FRAME:
2822       /* We've stripped the entire stack, the thread is now dead. */
2823       sp += sizeofW(StgStopFrame) - 1;
2824       sp[0] = (W_)exception;    /* save the exception */
2825       tso->what_next = ThreadKilled;
2826       tso->su = (StgUpdateFrame *)(sp+1);
2827       tso->sp = sp;
2828       return;
2829       
2830     default:
2831       barf("raiseAsync");
2832     }
2833   }
2834   barf("raiseAsync");
2835 }
2836
2837 /* -----------------------------------------------------------------------------
2838    resurrectThreads is called after garbage collection on the list of
2839    threads found to be garbage.  Each of these threads will be woken
2840    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2841    on an MVar, or NonTermination if the thread was blocked on a Black
2842    Hole.
2843    -------------------------------------------------------------------------- */
2844
2845 void
2846 resurrectThreads( StgTSO *threads )
2847 {
2848   StgTSO *tso, *next;
2849
2850   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2851     next = tso->global_link;
2852     tso->global_link = all_threads;
2853     all_threads = tso;
2854     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2855
2856     switch (tso->why_blocked) {
2857     case BlockedOnMVar:
2858     case BlockedOnException:
2859       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2860       break;
2861     case BlockedOnBlackHole:
2862       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2863       break;
2864     case NotBlocked:
2865       /* This might happen if the thread was blocked on a black hole
2866        * belonging to a thread that we've just woken up (raiseAsync
2867        * can wake up threads, remember...).
2868        */
2869       continue;
2870     default:
2871       barf("resurrectThreads: thread blocked in a strange way");
2872     }
2873   }
2874 }
2875
2876 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2877 //@subsection Debugging Routines
2878
2879 /* -----------------------------------------------------------------------------
2880    Debugging: why is a thread blocked
2881    -------------------------------------------------------------------------- */
2882
2883 #ifdef DEBUG
2884
2885 void
2886 printThreadBlockage(StgTSO *tso)
2887 {
2888   switch (tso->why_blocked) {
2889   case BlockedOnRead:
2890     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2891     break;
2892   case BlockedOnWrite:
2893     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2894     break;
2895   case BlockedOnDelay:
2896 #if defined(HAVE_SETITIMER)
2897     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2898 #else
2899     fprintf(stderr,"blocked on delay of %d ms", 
2900             tso->block_info.target - getourtimeofday());
2901 #endif
2902     break;
2903   case BlockedOnMVar:
2904     fprintf(stderr,"blocked on an MVar");
2905     break;
2906   case BlockedOnException:
2907     fprintf(stderr,"blocked on delivering an exception to thread %d",
2908             tso->block_info.tso->id);
2909     break;
2910   case BlockedOnBlackHole:
2911     fprintf(stderr,"blocked on a black hole");
2912     break;
2913   case NotBlocked:
2914     fprintf(stderr,"not blocked");
2915     break;
2916 #if defined(PAR)
2917   case BlockedOnGA:
2918     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2919             tso->block_info.closure, info_type(tso->block_info.closure));
2920     break;
2921   case BlockedOnGA_NoSend:
2922     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2923             tso->block_info.closure, info_type(tso->block_info.closure));
2924     break;
2925 #endif
2926   default:
2927     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2928          tso->why_blocked, tso->id, tso);
2929   }
2930 }
2931
2932 void
2933 printThreadStatus(StgTSO *tso)
2934 {
2935   switch (tso->what_next) {
2936   case ThreadKilled:
2937     fprintf(stderr,"has been killed");
2938     break;
2939   case ThreadComplete:
2940     fprintf(stderr,"has completed");
2941     break;
2942   default:
2943     printThreadBlockage(tso);
2944   }
2945 }
2946
2947 void
2948 printAllThreads(void)
2949 {
2950   StgTSO *t;
2951
2952   sched_belch("all threads:");
2953   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2954     fprintf(stderr, "\tthread %d is ", t->id);
2955     printThreadStatus(t);
2956     fprintf(stderr,"\n");
2957   }
2958 }
2959     
2960 /* 
2961    Print a whole blocking queue attached to node (debugging only).
2962 */
2963 //@cindex print_bq
2964 # if defined(PAR)
2965 void 
2966 print_bq (StgClosure *node)
2967 {
2968   StgBlockingQueueElement *bqe;
2969   StgTSO *tso;
2970   rtsBool end;
2971
2972   fprintf(stderr,"## BQ of closure %p (%s): ",
2973           node, info_type(node));
2974
2975   /* should cover all closures that may have a blocking queue */
2976   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2977          get_itbl(node)->type == FETCH_ME_BQ ||
2978          get_itbl(node)->type == RBH);
2979     
2980   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2981   /* 
2982      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2983   */
2984   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2985        !end; // iterate until bqe points to a CONSTR
2986        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2987     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2988     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2989     /* types of closures that may appear in a blocking queue */
2990     ASSERT(get_itbl(bqe)->type == TSO ||           
2991            get_itbl(bqe)->type == BLOCKED_FETCH || 
2992            get_itbl(bqe)->type == CONSTR); 
2993     /* only BQs of an RBH end with an RBH_Save closure */
2994     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2995
2996     switch (get_itbl(bqe)->type) {
2997     case TSO:
2998       fprintf(stderr," TSO %d (%x),",
2999               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3000       break;
3001     case BLOCKED_FETCH:
3002       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3003               ((StgBlockedFetch *)bqe)->node, 
3004               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3005               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3006               ((StgBlockedFetch *)bqe)->ga.weight);
3007       break;
3008     case CONSTR:
3009       fprintf(stderr," %s (IP %p),",
3010               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3011                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3012                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3013                "RBH_Save_?"), get_itbl(bqe));
3014       break;
3015     default:
3016       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3017            info_type(bqe), node, info_type(node));
3018       break;
3019     }
3020   } /* for */
3021   fputc('\n', stderr);
3022 }
3023 # elif defined(GRAN)
3024 void 
3025 print_bq (StgClosure *node)
3026 {
3027   StgBlockingQueueElement *bqe;
3028   PEs node_loc, tso_loc;
3029   rtsBool end;
3030
3031   /* should cover all closures that may have a blocking queue */
3032   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3033          get_itbl(node)->type == FETCH_ME_BQ ||
3034          get_itbl(node)->type == RBH);
3035     
3036   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3037   node_loc = where_is(node);
3038
3039   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3040           node, info_type(node), node_loc);
3041
3042   /* 
3043      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3044   */
3045   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3046        !end; // iterate until bqe points to a CONSTR
3047        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3048     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3049     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3050     /* types of closures that may appear in a blocking queue */
3051     ASSERT(get_itbl(bqe)->type == TSO ||           
3052            get_itbl(bqe)->type == CONSTR); 
3053     /* only BQs of an RBH end with an RBH_Save closure */
3054     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3055
3056     tso_loc = where_is((StgClosure *)bqe);
3057     switch (get_itbl(bqe)->type) {
3058     case TSO:
3059       fprintf(stderr," TSO %d (%p) on [PE %d],",
3060               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3061       break;
3062     case CONSTR:
3063       fprintf(stderr," %s (IP %p),",
3064               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3065                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3066                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3067                "RBH_Save_?"), get_itbl(bqe));
3068       break;
3069     default:
3070       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3071            info_type((StgClosure *)bqe), node, info_type(node));
3072       break;
3073     }
3074   } /* for */
3075   fputc('\n', stderr);
3076 }
3077 #else
3078 /* 
3079    Nice and easy: only TSOs on the blocking queue
3080 */
3081 void 
3082 print_bq (StgClosure *node)
3083 {
3084   StgTSO *tso;
3085
3086   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3087   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3088        tso != END_TSO_QUEUE; 
3089        tso=tso->link) {
3090     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3091     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3092     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3093   }
3094   fputc('\n', stderr);
3095 }
3096 # endif
3097
3098 #if defined(PAR)
3099 static nat
3100 run_queue_len(void)
3101 {
3102   nat i;
3103   StgTSO *tso;
3104
3105   for (i=0, tso=run_queue_hd; 
3106        tso != END_TSO_QUEUE;
3107        i++, tso=tso->link)
3108     /* nothing */
3109
3110   return i;
3111 }
3112 #endif
3113
3114 static void
3115 sched_belch(char *s, ...)
3116 {
3117   va_list ap;
3118   va_start(ap,s);
3119 #ifdef SMP
3120   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3121 #else
3122   fprintf(stderr, "scheduler: ");
3123 #endif
3124   vfprintf(stderr, s, ap);
3125   fprintf(stderr, "\n");
3126 }
3127
3128 #endif /* DEBUG */
3129
3130
3131 //@node Index,  , Debugging Routines, Main scheduling code
3132 //@subsection Index
3133
3134 //@index
3135 //* MainRegTable::  @cindex\s-+MainRegTable
3136 //* StgMainThread::  @cindex\s-+StgMainThread
3137 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3138 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3139 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3140 //* context_switch::  @cindex\s-+context_switch
3141 //* createThread::  @cindex\s-+createThread
3142 //* free_capabilities::  @cindex\s-+free_capabilities
3143 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3144 //* initScheduler::  @cindex\s-+initScheduler
3145 //* interrupted::  @cindex\s-+interrupted
3146 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3147 //* next_thread_id::  @cindex\s-+next_thread_id
3148 //* print_bq::  @cindex\s-+print_bq
3149 //* run_queue_hd::  @cindex\s-+run_queue_hd
3150 //* run_queue_tl::  @cindex\s-+run_queue_tl
3151 //* sched_mutex::  @cindex\s-+sched_mutex
3152 //* schedule::  @cindex\s-+schedule
3153 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3154 //* task_ids::  @cindex\s-+task_ids
3155 //* term_mutex::  @cindex\s-+term_mutex
3156 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3157 //@end index