50009f272120814e9783ff2bbcc34d9f54c6a1c3
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.66 2000/04/11 16:36:53 sewardj Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     if (blocked_queue_hd == END_TSO_QUEUE
543         && run_queue_hd == END_TSO_QUEUE) {
544         StgMainThread *m = main_threads;
545         m->ret = NULL;
546         m->stat = Deadlock;
547         main_threads = m->link;
548         return;
549     }
550 #endif
551
552 #ifdef SMP
553     /* If there's a GC pending, don't do anything until it has
554      * completed.
555      */
556     if (ready_to_gc) {
557       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
558       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
559     }
560     
561     /* block until we've got a thread on the run queue and a free
562      * capability.
563      */
564     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
565       IF_DEBUG(scheduler, sched_belch("waiting for work"));
566       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
567       IF_DEBUG(scheduler, sched_belch("work now available"));
568     }
569 #endif
570
571 #if defined(GRAN)
572
573     if (RtsFlags.GranFlags.Light)
574       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
575
576     /* adjust time based on time-stamp */
577     if (event->time > CurrentTime[CurrentProc] &&
578         event->evttype != ContinueThread)
579       CurrentTime[CurrentProc] = event->time;
580     
581     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
582     if (!RtsFlags.GranFlags.Light)
583       handleIdlePEs();
584
585     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
586
587     /* main event dispatcher in GranSim */
588     switch (event->evttype) {
589       /* Should just be continuing execution */
590     case ContinueThread:
591       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
592       /* ToDo: check assertion
593       ASSERT(run_queue_hd != (StgTSO*)NULL &&
594              run_queue_hd != END_TSO_QUEUE);
595       */
596       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
597       if (!RtsFlags.GranFlags.DoAsyncFetch &&
598           procStatus[CurrentProc]==Fetching) {
599         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
600               CurrentTSO->id, CurrentTSO, CurrentProc);
601         goto next_thread;
602       } 
603       /* Ignore ContinueThreads for completed threads */
604       if (CurrentTSO->what_next == ThreadComplete) {
605         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
606               CurrentTSO->id, CurrentTSO, CurrentProc);
607         goto next_thread;
608       } 
609       /* Ignore ContinueThreads for threads that are being migrated */
610       if (PROCS(CurrentTSO)==Nowhere) { 
611         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
612               CurrentTSO->id, CurrentTSO, CurrentProc);
613         goto next_thread;
614       }
615       /* The thread should be at the beginning of the run queue */
616       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
617         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
618               CurrentTSO->id, CurrentTSO, CurrentProc);
619         break; // run the thread anyway
620       }
621       /*
622       new_event(proc, proc, CurrentTime[proc],
623                 FindWork,
624                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
625       goto next_thread; 
626       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
627       break; // now actually run the thread; DaH Qu'vam yImuHbej 
628
629     case FetchNode:
630       do_the_fetchnode(event);
631       goto next_thread;             /* handle next event in event queue  */
632       
633     case GlobalBlock:
634       do_the_globalblock(event);
635       goto next_thread;             /* handle next event in event queue  */
636       
637     case FetchReply:
638       do_the_fetchreply(event);
639       goto next_thread;             /* handle next event in event queue  */
640       
641     case UnblockThread:   /* Move from the blocked queue to the tail of */
642       do_the_unblock(event);
643       goto next_thread;             /* handle next event in event queue  */
644       
645     case ResumeThread:  /* Move from the blocked queue to the tail of */
646       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
647       event->tso->gran.blocktime += 
648         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
649       do_the_startthread(event);
650       goto next_thread;             /* handle next event in event queue  */
651       
652     case StartThread:
653       do_the_startthread(event);
654       goto next_thread;             /* handle next event in event queue  */
655       
656     case MoveThread:
657       do_the_movethread(event);
658       goto next_thread;             /* handle next event in event queue  */
659       
660     case MoveSpark:
661       do_the_movespark(event);
662       goto next_thread;             /* handle next event in event queue  */
663       
664     case FindWork:
665       do_the_findwork(event);
666       goto next_thread;             /* handle next event in event queue  */
667       
668     default:
669       barf("Illegal event type %u\n", event->evttype);
670     }  /* switch */
671     
672     /* This point was scheduler_loop in the old RTS */
673
674     IF_DEBUG(gran, belch("GRAN: after main switch"));
675
676     TimeOfLastEvent = CurrentTime[CurrentProc];
677     TimeOfNextEvent = get_time_of_next_event();
678     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
679     // CurrentTSO = ThreadQueueHd;
680
681     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
682                          TimeOfNextEvent));
683
684     if (RtsFlags.GranFlags.Light) 
685       GranSimLight_leave_system(event, &ActiveTSO); 
686
687     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
688
689     IF_DEBUG(gran, 
690              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
691
692     /* in a GranSim setup the TSO stays on the run queue */
693     t = CurrentTSO;
694     /* Take a thread from the run queue. */
695     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
696
697     IF_DEBUG(gran, 
698              fprintf(stderr, "GRAN: About to run current thread, which is\n");
699              G_TSO(t,5))
700
701     context_switch = 0; // turned on via GranYield, checking events and time slice
702
703     IF_DEBUG(gran, 
704              DumpGranEvent(GR_SCHEDULE, t));
705
706     procStatus[CurrentProc] = Busy;
707
708 #elif defined(PAR)
709
710     if (PendingFetches != END_BF_QUEUE) {
711         processFetches();
712     }
713
714     /* ToDo: phps merge with spark activation above */
715     /* check whether we have local work and send requests if we have none */
716     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
717       /* :-[  no local threads => look out for local sparks */
718       /* the spark pool for the current PE */
719       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
720       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
721           pool->hd < pool->tl) {
722         /* 
723          * ToDo: add GC code check that we really have enough heap afterwards!!
724          * Old comment:
725          * If we're here (no runnable threads) and we have pending
726          * sparks, we must have a space problem.  Get enough space
727          * to turn one of those pending sparks into a
728          * thread... 
729          */
730         
731         spark = findSpark();                /* get a spark */
732         if (spark != (rtsSpark) NULL) {
733           tso = activateSpark(spark);       /* turn the spark into a thread */
734           IF_PAR_DEBUG(schedule,
735                        belch("==== schedule: Created TSO %d (%p); %d threads active",
736                              tso->id, tso, advisory_thread_count));
737
738           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
739             belch("==^^ failed to activate spark");
740             goto next_thread;
741           }               /* otherwise fall through & pick-up new tso */
742         } else {
743           IF_PAR_DEBUG(verbose,
744                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
745                              spark_queue_len(pool)));
746           goto next_thread;
747         }
748       } else  
749       /* =8-[  no local sparks => look for work on other PEs */
750       {
751         /*
752          * We really have absolutely no work.  Send out a fish
753          * (there may be some out there already), and wait for
754          * something to arrive.  We clearly can't run any threads
755          * until a SCHEDULE or RESUME arrives, and so that's what
756          * we're hoping to see.  (Of course, we still have to
757          * respond to other types of messages.)
758          */
759         if (//!fishing &&  
760             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
761           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
762           /* fishing set in sendFish, processFish;
763              avoid flooding system with fishes via delay */
764           pe = choosePE();
765           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
766                    NEW_FISH_HUNGER);
767         }
768         
769         processMessages();
770         goto next_thread;
771         // ReSchedule(0);
772       }
773     } else if (PacketsWaiting()) {  /* Look for incoming messages */
774       processMessages();
775     }
776
777     /* Now we are sure that we have some work available */
778     ASSERT(run_queue_hd != END_TSO_QUEUE);
779     /* Take a thread from the run queue, if we have work */
780     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
781
782     /* ToDo: write something to the log-file
783     if (RTSflags.ParFlags.granSimStats && !sameThread)
784         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
785
786     CurrentTSO = t;
787     */
788     /* the spark pool for the current PE */
789     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
790
791     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
792                               spark_queue_len(pool), 
793                               CURRENT_PROC,
794                               pool->hd, pool->tl, pool->base, pool->lim));
795
796     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
797                               run_queue_len(), CURRENT_PROC,
798                               run_queue_hd, run_queue_tl));
799
800 #if 0
801     if (t != LastTSO) {
802       /* 
803          we are running a different TSO, so write a schedule event to log file
804          NB: If we use fair scheduling we also have to write  a deschedule 
805              event for LastTSO; with unfair scheduling we know that the
806              previous tso has blocked whenever we switch to another tso, so
807              we don't need it in GUM for now
808       */
809       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
810                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
811       
812     }
813 #endif
814 #else /* !GRAN && !PAR */
815   
816     /* grab a thread from the run queue
817      */
818     ASSERT(run_queue_hd != END_TSO_QUEUE);
819     t = POP_RUN_QUEUE();
820     IF_DEBUG(sanity,checkTSO(t));
821
822 #endif
823     
824     /* grab a capability
825      */
826 #ifdef SMP
827     cap = free_capabilities;
828     free_capabilities = cap->link;
829     n_free_capabilities--;
830 #else
831     cap = &MainRegTable;
832 #endif
833     
834     cap->rCurrentTSO = t;
835     
836     /* set the context_switch flag
837      */
838     if (run_queue_hd == END_TSO_QUEUE)
839       context_switch = 0;
840     else
841       context_switch = 1;
842
843     RELEASE_LOCK(&sched_mutex);
844
845     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
846                               t->id, t, whatNext_strs[t->what_next]));
847
848     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
849     /* Run the current thread 
850      */
851     switch (cap->rCurrentTSO->what_next) {
852     case ThreadKilled:
853     case ThreadComplete:
854       /* Thread already finished, return to scheduler. */
855       ret = ThreadFinished;
856       break;
857     case ThreadEnterGHC:
858       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
859       break;
860     case ThreadRunGHC:
861       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
862       break;
863     case ThreadEnterHugs:
864 #ifdef INTERPRETER
865       {
866          StgClosure* c;
867          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
868          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
869          cap->rCurrentTSO->sp += 1;
870          ret = enter(cap,c);
871          break;
872       }
873 #else
874       barf("Panic: entered a BCO but no bytecode interpreter in this build");
875 #endif
876     default:
877       barf("schedule: invalid what_next field");
878     }
879     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
880     
881     /* Costs for the scheduler are assigned to CCS_SYSTEM */
882 #ifdef PROFILING
883     CCCS = CCS_SYSTEM;
884 #endif
885     
886     ACQUIRE_LOCK(&sched_mutex);
887
888 #ifdef SMP
889     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
890 #elif !defined(GRAN) && !defined(PAR)
891     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
892 #endif
893     t = cap->rCurrentTSO;
894     
895 #if defined(PAR)
896     /* HACK 675: if the last thread didn't yield, make sure to print a 
897        SCHEDULE event to the log file when StgRunning the next thread, even
898        if it is the same one as before */
899     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
900     TimeOfLastYield = CURRENT_TIME;
901 #endif
902
903     switch (ret) {
904     case HeapOverflow:
905       /* make all the running tasks block on a condition variable,
906        * maybe set context_switch and wait till they all pile in,
907        * then have them wait on a GC condition variable.
908        */
909       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
910                                t->id, t, whatNext_strs[t->what_next]));
911       threadPaused(t);
912 #if defined(GRAN)
913       ASSERT(!is_on_queue(t,CurrentProc));
914 #endif
915       
916       ready_to_gc = rtsTrue;
917       context_switch = 1;               /* stop other threads ASAP */
918       PUSH_ON_RUN_QUEUE(t);
919       /* actual GC is done at the end of the while loop */
920       break;
921       
922     case StackOverflow:
923       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
924                                t->id, t, whatNext_strs[t->what_next]));
925       /* just adjust the stack for this thread, then pop it back
926        * on the run queue.
927        */
928       threadPaused(t);
929       { 
930         StgMainThread *m;
931         /* enlarge the stack */
932         StgTSO *new_t = threadStackOverflow(t);
933         
934         /* This TSO has moved, so update any pointers to it from the
935          * main thread stack.  It better not be on any other queues...
936          * (it shouldn't be).
937          */
938         for (m = main_threads; m != NULL; m = m->link) {
939           if (m->tso == t) {
940             m->tso = new_t;
941           }
942         }
943         threadPaused(new_t);
944         PUSH_ON_RUN_QUEUE(new_t);
945       }
946       break;
947
948     case ThreadYielding:
949 #if defined(GRAN)
950       IF_DEBUG(gran, 
951                DumpGranEvent(GR_DESCHEDULE, t));
952       globalGranStats.tot_yields++;
953 #elif defined(PAR)
954       IF_DEBUG(par, 
955                DumpGranEvent(GR_DESCHEDULE, t));
956 #endif
957       /* put the thread back on the run queue.  Then, if we're ready to
958        * GC, check whether this is the last task to stop.  If so, wake
959        * up the GC thread.  getThread will block during a GC until the
960        * GC is finished.
961        */
962       IF_DEBUG(scheduler,
963                if (t->what_next == ThreadEnterHugs) {
964                    /* ToDo: or maybe a timer expired when we were in Hugs?
965                     * or maybe someone hit ctrl-C
966                     */
967                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
968                          t->id, t, whatNext_strs[t->what_next]);
969                } else {
970                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
971                          t->id, t, whatNext_strs[t->what_next]);
972                }
973                );
974       threadPaused(t);
975       IF_DEBUG(sanity,
976                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
977                checkTSO(t));
978       ASSERT(t->link == END_TSO_QUEUE);
979 #if defined(GRAN)
980       ASSERT(!is_on_queue(t,CurrentProc));
981
982       IF_DEBUG(sanity,
983                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
984                checkThreadQsSanity(rtsTrue));
985 #endif
986       APPEND_TO_RUN_QUEUE(t);
987 #if defined(GRAN)
988       /* add a ContinueThread event to actually process the thread */
989       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
990                 ContinueThread,
991                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
992       IF_GRAN_DEBUG(bq, 
993                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
994                G_EVENTQ(0);
995                G_CURR_THREADQ(0))
996 #endif /* GRAN */
997       break;
998       
999     case ThreadBlocked:
1000 #if defined(GRAN)
1001       IF_DEBUG(scheduler,
1002                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1003                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1004                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1005
1006       // ??? needed; should emit block before
1007       IF_DEBUG(gran, 
1008                DumpGranEvent(GR_DESCHEDULE, t)); 
1009       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1010       /*
1011         ngoq Dogh!
1012       ASSERT(procStatus[CurrentProc]==Busy || 
1013               ((procStatus[CurrentProc]==Fetching) && 
1014               (t->block_info.closure!=(StgClosure*)NULL)));
1015       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1016           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1017             procStatus[CurrentProc]==Fetching)) 
1018         procStatus[CurrentProc] = Idle;
1019       */
1020 #elif defined(PAR)
1021       IF_DEBUG(par, 
1022                DumpGranEvent(GR_DESCHEDULE, t)); 
1023
1024       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1025       blockThread(t);
1026
1027       IF_DEBUG(scheduler,
1028                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1029                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1030                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1031
1032 #else /* !GRAN */
1033       /* don't need to do anything.  Either the thread is blocked on
1034        * I/O, in which case we'll have called addToBlockedQueue
1035        * previously, or it's blocked on an MVar or Blackhole, in which
1036        * case it'll be on the relevant queue already.
1037        */
1038       IF_DEBUG(scheduler,
1039                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1040                printThreadBlockage(t);
1041                fprintf(stderr, "\n"));
1042
1043       /* Only for dumping event to log file 
1044          ToDo: do I need this in GranSim, too?
1045       blockThread(t);
1046       */
1047 #endif
1048       threadPaused(t);
1049       break;
1050       
1051     case ThreadFinished:
1052       /* Need to check whether this was a main thread, and if so, signal
1053        * the task that started it with the return value.  If we have no
1054        * more main threads, we probably need to stop all the tasks until
1055        * we get a new one.
1056        */
1057       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1058       t->what_next = ThreadComplete;
1059 #if defined(GRAN)
1060       endThread(t, CurrentProc); // clean-up the thread
1061 #elif defined(PAR)
1062       advisory_thread_count--;
1063       if (RtsFlags.ParFlags.ParStats.Full) 
1064         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1065 #endif
1066       break;
1067       
1068     default:
1069       barf("doneThread: invalid thread return code");
1070     }
1071     
1072 #ifdef SMP
1073     cap->link = free_capabilities;
1074     free_capabilities = cap;
1075     n_free_capabilities++;
1076 #endif
1077
1078 #ifdef SMP
1079     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1080 #else
1081     if (ready_to_gc) 
1082 #endif
1083       {
1084       /* everybody back, start the GC.
1085        * Could do it in this thread, or signal a condition var
1086        * to do it in another thread.  Either way, we need to
1087        * broadcast on gc_pending_cond afterward.
1088        */
1089 #ifdef SMP
1090       IF_DEBUG(scheduler,sched_belch("doing GC"));
1091 #endif
1092       GarbageCollect(GetRoots,rtsFalse);
1093       ready_to_gc = rtsFalse;
1094 #ifdef SMP
1095       pthread_cond_broadcast(&gc_pending_cond);
1096 #endif
1097 #if defined(GRAN)
1098       /* add a ContinueThread event to continue execution of current thread */
1099       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1100                 ContinueThread,
1101                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1102       IF_GRAN_DEBUG(bq, 
1103                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1104                G_EVENTQ(0);
1105                G_CURR_THREADQ(0))
1106 #endif /* GRAN */
1107     }
1108 #if defined(GRAN)
1109   next_thread:
1110     IF_GRAN_DEBUG(unused,
1111                   print_eventq(EventHd));
1112
1113     event = get_next_event();
1114
1115 #elif defined(PAR)
1116   next_thread:
1117     /* ToDo: wait for next message to arrive rather than busy wait */
1118
1119 #else /* GRAN */
1120   /* not any more
1121   next_thread:
1122     t = take_off_run_queue(END_TSO_QUEUE);
1123   */
1124 #endif /* GRAN */
1125   } /* end of while(1) */
1126 }
1127
1128 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1129 void deleteAllThreads ( void )
1130 {
1131   StgTSO* t;
1132   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1133   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1134     deleteThread(t);
1135   }
1136   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1137     deleteThread(t);
1138   }
1139   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1140   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1141 }
1142
1143 /* startThread and  insertThread are now in GranSim.c -- HWL */
1144
1145 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1146 //@subsection Suspend and Resume
1147
1148 /* ---------------------------------------------------------------------------
1149  * Suspending & resuming Haskell threads.
1150  * 
1151  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1152  * its capability before calling the C function.  This allows another
1153  * task to pick up the capability and carry on running Haskell
1154  * threads.  It also means that if the C call blocks, it won't lock
1155  * the whole system.
1156  *
1157  * The Haskell thread making the C call is put to sleep for the
1158  * duration of the call, on the susepended_ccalling_threads queue.  We
1159  * give out a token to the task, which it can use to resume the thread
1160  * on return from the C function.
1161  * ------------------------------------------------------------------------- */
1162    
1163 StgInt
1164 suspendThread( Capability *cap )
1165 {
1166   nat tok;
1167
1168   ACQUIRE_LOCK(&sched_mutex);
1169
1170   IF_DEBUG(scheduler,
1171            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1172
1173   threadPaused(cap->rCurrentTSO);
1174   cap->rCurrentTSO->link = suspended_ccalling_threads;
1175   suspended_ccalling_threads = cap->rCurrentTSO;
1176
1177   /* Use the thread ID as the token; it should be unique */
1178   tok = cap->rCurrentTSO->id;
1179
1180 #ifdef SMP
1181   cap->link = free_capabilities;
1182   free_capabilities = cap;
1183   n_free_capabilities++;
1184 #endif
1185
1186   RELEASE_LOCK(&sched_mutex);
1187   return tok; 
1188 }
1189
1190 Capability *
1191 resumeThread( StgInt tok )
1192 {
1193   StgTSO *tso, **prev;
1194   Capability *cap;
1195
1196   ACQUIRE_LOCK(&sched_mutex);
1197
1198   prev = &suspended_ccalling_threads;
1199   for (tso = suspended_ccalling_threads; 
1200        tso != END_TSO_QUEUE; 
1201        prev = &tso->link, tso = tso->link) {
1202     if (tso->id == (StgThreadID)tok) {
1203       *prev = tso->link;
1204       break;
1205     }
1206   }
1207   if (tso == END_TSO_QUEUE) {
1208     barf("resumeThread: thread not found");
1209   }
1210
1211 #ifdef SMP
1212   while (free_capabilities == NULL) {
1213     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1214     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1215     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1216   }
1217   cap = free_capabilities;
1218   free_capabilities = cap->link;
1219   n_free_capabilities--;
1220 #else  
1221   cap = &MainRegTable;
1222 #endif
1223
1224   cap->rCurrentTSO = tso;
1225
1226   RELEASE_LOCK(&sched_mutex);
1227   return cap;
1228 }
1229
1230
1231 /* ---------------------------------------------------------------------------
1232  * Static functions
1233  * ------------------------------------------------------------------------ */
1234 static void unblockThread(StgTSO *tso);
1235
1236 /* ---------------------------------------------------------------------------
1237  * Comparing Thread ids.
1238  *
1239  * This is used from STG land in the implementation of the
1240  * instances of Eq/Ord for ThreadIds.
1241  * ------------------------------------------------------------------------ */
1242
1243 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1244
1245   StgThreadID id1 = tso1->id; 
1246   StgThreadID id2 = tso2->id;
1247  
1248   if (id1 < id2) return (-1);
1249   if (id1 > id2) return 1;
1250   return 0;
1251 }
1252
1253 /* ---------------------------------------------------------------------------
1254    Create a new thread.
1255
1256    The new thread starts with the given stack size.  Before the
1257    scheduler can run, however, this thread needs to have a closure
1258    (and possibly some arguments) pushed on its stack.  See
1259    pushClosure() in Schedule.h.
1260
1261    createGenThread() and createIOThread() (in SchedAPI.h) are
1262    convenient packaged versions of this function.
1263
1264    currently pri (priority) is only used in a GRAN setup -- HWL
1265    ------------------------------------------------------------------------ */
1266 //@cindex createThread
1267 #if defined(GRAN)
1268 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1269 StgTSO *
1270 createThread(nat stack_size, StgInt pri)
1271 {
1272   return createThread_(stack_size, rtsFalse, pri);
1273 }
1274
1275 static StgTSO *
1276 createThread_(nat size, rtsBool have_lock, StgInt pri)
1277 {
1278 #else
1279 StgTSO *
1280 createThread(nat stack_size)
1281 {
1282   return createThread_(stack_size, rtsFalse);
1283 }
1284
1285 static StgTSO *
1286 createThread_(nat size, rtsBool have_lock)
1287 {
1288 #endif
1289
1290     StgTSO *tso;
1291     nat stack_size;
1292
1293     /* First check whether we should create a thread at all */
1294 #if defined(PAR)
1295   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1296   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1297     threadsIgnored++;
1298     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1299           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1300     return END_TSO_QUEUE;
1301   }
1302   threadsCreated++;
1303 #endif
1304
1305 #if defined(GRAN)
1306   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1307 #endif
1308
1309   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1310
1311   /* catch ridiculously small stack sizes */
1312   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1313     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1314   }
1315
1316   stack_size = size - TSO_STRUCT_SIZEW;
1317
1318   tso = (StgTSO *)allocate(size);
1319   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1320
1321   SET_HDR(tso, &TSO_info, CCS_SYSTEM);
1322 #if defined(GRAN)
1323   SET_GRAN_HDR(tso, ThisPE);
1324 #endif
1325   tso->what_next     = ThreadEnterGHC;
1326
1327   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1328    * protect the increment operation on next_thread_id.
1329    * In future, we could use an atomic increment instead.
1330    */
1331   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1332   tso->id = next_thread_id++; 
1333   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1334
1335   tso->why_blocked  = NotBlocked;
1336   tso->blocked_exceptions = NULL;
1337
1338   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1339   tso->stack_size   = stack_size;
1340   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1341                               - TSO_STRUCT_SIZEW;
1342   tso->sp           = (P_)&(tso->stack) + stack_size;
1343
1344 #ifdef PROFILING
1345   tso->prof.CCCS = CCS_MAIN;
1346 #endif
1347
1348   /* put a stop frame on the stack */
1349   tso->sp -= sizeofW(StgStopFrame);
1350   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1351   tso->su = (StgUpdateFrame*)tso->sp;
1352
1353   // ToDo: check this
1354 #if defined(GRAN)
1355   tso->link = END_TSO_QUEUE;
1356   /* uses more flexible routine in GranSim */
1357   insertThread(tso, CurrentProc);
1358 #else
1359   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1360    * from its creation
1361    */
1362 #endif
1363
1364 #if defined(GRAN) || defined(PAR)
1365   DumpGranEvent(GR_START,tso);
1366 #endif
1367
1368   /* Link the new thread on the global thread list.
1369    */
1370   tso->global_link = all_threads;
1371   all_threads = tso;
1372
1373 #if defined(GRAN)
1374   tso->gran.pri = pri;
1375 # if defined(DEBUG)
1376   tso->gran.magic = TSO_MAGIC; // debugging only
1377 # endif
1378   tso->gran.sparkname   = 0;
1379   tso->gran.startedat   = CURRENT_TIME; 
1380   tso->gran.exported    = 0;
1381   tso->gran.basicblocks = 0;
1382   tso->gran.allocs      = 0;
1383   tso->gran.exectime    = 0;
1384   tso->gran.fetchtime   = 0;
1385   tso->gran.fetchcount  = 0;
1386   tso->gran.blocktime   = 0;
1387   tso->gran.blockcount  = 0;
1388   tso->gran.blockedat   = 0;
1389   tso->gran.globalsparks = 0;
1390   tso->gran.localsparks  = 0;
1391   if (RtsFlags.GranFlags.Light)
1392     tso->gran.clock  = Now; /* local clock */
1393   else
1394     tso->gran.clock  = 0;
1395
1396   IF_DEBUG(gran,printTSO(tso));
1397 #elif defined(PAR)
1398 # if defined(DEBUG)
1399   tso->par.magic = TSO_MAGIC; // debugging only
1400 # endif
1401   tso->par.sparkname   = 0;
1402   tso->par.startedat   = CURRENT_TIME; 
1403   tso->par.exported    = 0;
1404   tso->par.basicblocks = 0;
1405   tso->par.allocs      = 0;
1406   tso->par.exectime    = 0;
1407   tso->par.fetchtime   = 0;
1408   tso->par.fetchcount  = 0;
1409   tso->par.blocktime   = 0;
1410   tso->par.blockcount  = 0;
1411   tso->par.blockedat   = 0;
1412   tso->par.globalsparks = 0;
1413   tso->par.localsparks  = 0;
1414 #endif
1415
1416 #if defined(GRAN)
1417   globalGranStats.tot_threads_created++;
1418   globalGranStats.threads_created_on_PE[CurrentProc]++;
1419   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1420   globalGranStats.tot_sq_probes++;
1421 #endif 
1422
1423 #if defined(GRAN)
1424   IF_GRAN_DEBUG(pri,
1425                 belch("==__ schedule: Created TSO %d (%p);",
1426                       CurrentProc, tso, tso->id));
1427 #elif defined(PAR)
1428     IF_PAR_DEBUG(verbose,
1429                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1430                        tso->id, tso, advisory_thread_count));
1431 #else
1432   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1433                                  tso->id, tso->stack_size));
1434 #endif    
1435   return tso;
1436 }
1437
1438 /*
1439   Turn a spark into a thread.
1440   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1441 */
1442 #if defined(PAR)
1443 //@cindex activateSpark
1444 StgTSO *
1445 activateSpark (rtsSpark spark) 
1446 {
1447   StgTSO *tso;
1448   
1449   ASSERT(spark != (rtsSpark)NULL);
1450   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1451   if (tso!=END_TSO_QUEUE) {
1452     pushClosure(tso,spark);
1453     PUSH_ON_RUN_QUEUE(tso);
1454     advisory_thread_count++;
1455
1456     if (RtsFlags.ParFlags.ParStats.Full) {
1457       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1458       IF_PAR_DEBUG(verbose,
1459                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1460                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1461     }
1462   } else {
1463     barf("activateSpark: Cannot create TSO");
1464   }
1465   // ToDo: fwd info on local/global spark to thread -- HWL
1466   // tso->gran.exported =  spark->exported;
1467   // tso->gran.locked =   !spark->global;
1468   // tso->gran.sparkname = spark->name;
1469
1470   return tso;
1471 }
1472 #endif
1473
1474 /* ---------------------------------------------------------------------------
1475  * scheduleThread()
1476  *
1477  * scheduleThread puts a thread on the head of the runnable queue.
1478  * This will usually be done immediately after a thread is created.
1479  * The caller of scheduleThread must create the thread using e.g.
1480  * createThread and push an appropriate closure
1481  * on this thread's stack before the scheduler is invoked.
1482  * ------------------------------------------------------------------------ */
1483
1484 void
1485 scheduleThread(StgTSO *tso)
1486 {
1487   if (tso==END_TSO_QUEUE){    
1488     schedule();
1489     return;
1490   }
1491
1492   ACQUIRE_LOCK(&sched_mutex);
1493
1494   /* Put the new thread on the head of the runnable queue.  The caller
1495    * better push an appropriate closure on this thread's stack
1496    * beforehand.  In the SMP case, the thread may start running as
1497    * soon as we release the scheduler lock below.
1498    */
1499   PUSH_ON_RUN_QUEUE(tso);
1500   THREAD_RUNNABLE();
1501
1502 #if 0
1503   IF_DEBUG(scheduler,printTSO(tso));
1504 #endif
1505   RELEASE_LOCK(&sched_mutex);
1506 }
1507
1508 /* ---------------------------------------------------------------------------
1509  * startTasks()
1510  *
1511  * Start up Posix threads to run each of the scheduler tasks.
1512  * I believe the task ids are not needed in the system as defined.
1513  *  KH @ 25/10/99
1514  * ------------------------------------------------------------------------ */
1515
1516 #if defined(PAR) || defined(SMP)
1517 void *
1518 taskStart( void *arg STG_UNUSED )
1519 {
1520   rts_evalNothing(NULL);
1521 }
1522 #endif
1523
1524 /* ---------------------------------------------------------------------------
1525  * initScheduler()
1526  *
1527  * Initialise the scheduler.  This resets all the queues - if the
1528  * queues contained any threads, they'll be garbage collected at the
1529  * next pass.
1530  *
1531  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1532  * ------------------------------------------------------------------------ */
1533
1534 #ifdef SMP
1535 static void
1536 term_handler(int sig STG_UNUSED)
1537 {
1538   stat_workerStop();
1539   ACQUIRE_LOCK(&term_mutex);
1540   await_death--;
1541   RELEASE_LOCK(&term_mutex);
1542   pthread_exit(NULL);
1543 }
1544 #endif
1545
1546 //@cindex initScheduler
1547 void 
1548 initScheduler(void)
1549 {
1550 #if defined(GRAN)
1551   nat i;
1552
1553   for (i=0; i<=MAX_PROC; i++) {
1554     run_queue_hds[i]      = END_TSO_QUEUE;
1555     run_queue_tls[i]      = END_TSO_QUEUE;
1556     blocked_queue_hds[i]  = END_TSO_QUEUE;
1557     blocked_queue_tls[i]  = END_TSO_QUEUE;
1558     ccalling_threadss[i]  = END_TSO_QUEUE;
1559   }
1560 #else
1561   run_queue_hd      = END_TSO_QUEUE;
1562   run_queue_tl      = END_TSO_QUEUE;
1563   blocked_queue_hd  = END_TSO_QUEUE;
1564   blocked_queue_tl  = END_TSO_QUEUE;
1565 #endif 
1566
1567   suspended_ccalling_threads  = END_TSO_QUEUE;
1568
1569   main_threads = NULL;
1570   all_threads  = END_TSO_QUEUE;
1571
1572   context_switch = 0;
1573   interrupted    = 0;
1574
1575   enteredCAFs = END_CAF_LIST;
1576
1577   /* Install the SIGHUP handler */
1578 #ifdef SMP
1579   {
1580     struct sigaction action,oact;
1581
1582     action.sa_handler = term_handler;
1583     sigemptyset(&action.sa_mask);
1584     action.sa_flags = 0;
1585     if (sigaction(SIGTERM, &action, &oact) != 0) {
1586       barf("can't install TERM handler");
1587     }
1588   }
1589 #endif
1590
1591 #ifdef SMP
1592   /* Allocate N Capabilities */
1593   {
1594     nat i;
1595     Capability *cap, *prev;
1596     cap  = NULL;
1597     prev = NULL;
1598     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1599       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1600       cap->link = prev;
1601       prev = cap;
1602     }
1603     free_capabilities = cap;
1604     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1605   }
1606   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1607                              n_free_capabilities););
1608 #endif
1609
1610 #if defined(SMP) || defined(PAR)
1611   initSparkPools();
1612 #endif
1613 }
1614
1615 #ifdef SMP
1616 void
1617 startTasks( void )
1618 {
1619   nat i;
1620   int r;
1621   pthread_t tid;
1622   
1623   /* make some space for saving all the thread ids */
1624   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1625                             "initScheduler:task_ids");
1626   
1627   /* and create all the threads */
1628   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1629     r = pthread_create(&tid,NULL,taskStart,NULL);
1630     if (r != 0) {
1631       barf("startTasks: Can't create new Posix thread");
1632     }
1633     task_ids[i].id = tid;
1634     task_ids[i].mut_time = 0.0;
1635     task_ids[i].mut_etime = 0.0;
1636     task_ids[i].gc_time = 0.0;
1637     task_ids[i].gc_etime = 0.0;
1638     task_ids[i].elapsedtimestart = elapsedtime();
1639     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1640   }
1641 }
1642 #endif
1643
1644 void
1645 exitScheduler( void )
1646 {
1647 #ifdef SMP
1648   nat i;
1649
1650   /* Don't want to use pthread_cancel, since we'd have to install
1651    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1652    * all our locks.
1653    */
1654 #if 0
1655   /* Cancel all our tasks */
1656   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1657     pthread_cancel(task_ids[i].id);
1658   }
1659   
1660   /* Wait for all the tasks to terminate */
1661   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1662     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1663                                task_ids[i].id));
1664     pthread_join(task_ids[i].id, NULL);
1665   }
1666 #endif
1667
1668   /* Send 'em all a SIGHUP.  That should shut 'em up.
1669    */
1670   await_death = RtsFlags.ParFlags.nNodes;
1671   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1672     pthread_kill(task_ids[i].id,SIGTERM);
1673   }
1674   while (await_death > 0) {
1675     sched_yield();
1676   }
1677 #endif
1678 }
1679
1680 /* -----------------------------------------------------------------------------
1681    Managing the per-task allocation areas.
1682    
1683    Each capability comes with an allocation area.  These are
1684    fixed-length block lists into which allocation can be done.
1685
1686    ToDo: no support for two-space collection at the moment???
1687    -------------------------------------------------------------------------- */
1688
1689 /* -----------------------------------------------------------------------------
1690  * waitThread is the external interface for running a new computation
1691  * and waiting for the result.
1692  *
1693  * In the non-SMP case, we create a new main thread, push it on the 
1694  * main-thread stack, and invoke the scheduler to run it.  The
1695  * scheduler will return when the top main thread on the stack has
1696  * completed or died, and fill in the necessary fields of the
1697  * main_thread structure.
1698  *
1699  * In the SMP case, we create a main thread as before, but we then
1700  * create a new condition variable and sleep on it.  When our new
1701  * main thread has completed, we'll be woken up and the status/result
1702  * will be in the main_thread struct.
1703  * -------------------------------------------------------------------------- */
1704
1705 SchedulerStatus
1706 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1707 {
1708   StgMainThread *m;
1709   SchedulerStatus stat;
1710
1711   ACQUIRE_LOCK(&sched_mutex);
1712   
1713   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1714
1715   m->tso = tso;
1716   m->ret = ret;
1717   m->stat = NoStatus;
1718 #ifdef SMP
1719   pthread_cond_init(&m->wakeup, NULL);
1720 #endif
1721
1722   m->link = main_threads;
1723   main_threads = m;
1724
1725   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1726                               m->tso->id));
1727
1728 #ifdef SMP
1729   do {
1730     pthread_cond_wait(&m->wakeup, &sched_mutex);
1731   } while (m->stat == NoStatus);
1732 #elif defined(GRAN)
1733   /* GranSim specific init */
1734   CurrentTSO = m->tso;                // the TSO to run
1735   procStatus[MainProc] = Busy;        // status of main PE
1736   CurrentProc = MainProc;             // PE to run it on
1737
1738   schedule();
1739 #else
1740   schedule();
1741   ASSERT(m->stat != NoStatus);
1742 #endif
1743
1744   stat = m->stat;
1745
1746 #ifdef SMP
1747   pthread_cond_destroy(&m->wakeup);
1748 #endif
1749
1750   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1751                               m->tso->id));
1752   free(m);
1753
1754   RELEASE_LOCK(&sched_mutex);
1755
1756   return stat;
1757 }
1758
1759 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1760 //@subsection Run queue code 
1761
1762 #if 0
1763 /* 
1764    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1765        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1766        implicit global variable that has to be correct when calling these
1767        fcts -- HWL 
1768 */
1769
1770 /* Put the new thread on the head of the runnable queue.
1771  * The caller of createThread better push an appropriate closure
1772  * on this thread's stack before the scheduler is invoked.
1773  */
1774 static /* inline */ void
1775 add_to_run_queue(tso)
1776 StgTSO* tso; 
1777 {
1778   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1779   tso->link = run_queue_hd;
1780   run_queue_hd = tso;
1781   if (run_queue_tl == END_TSO_QUEUE) {
1782     run_queue_tl = tso;
1783   }
1784 }
1785
1786 /* Put the new thread at the end of the runnable queue. */
1787 static /* inline */ void
1788 push_on_run_queue(tso)
1789 StgTSO* tso; 
1790 {
1791   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1792   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1793   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1794   if (run_queue_hd == END_TSO_QUEUE) {
1795     run_queue_hd = tso;
1796   } else {
1797     run_queue_tl->link = tso;
1798   }
1799   run_queue_tl = tso;
1800 }
1801
1802 /* 
1803    Should be inlined because it's used very often in schedule.  The tso
1804    argument is actually only needed in GranSim, where we want to have the
1805    possibility to schedule *any* TSO on the run queue, irrespective of the
1806    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1807    the run queue and dequeue the tso, adjusting the links in the queue. 
1808 */
1809 //@cindex take_off_run_queue
1810 static /* inline */ StgTSO*
1811 take_off_run_queue(StgTSO *tso) {
1812   StgTSO *t, *prev;
1813
1814   /* 
1815      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1816
1817      if tso is specified, unlink that tso from the run_queue (doesn't have
1818      to be at the beginning of the queue); GranSim only 
1819   */
1820   if (tso!=END_TSO_QUEUE) {
1821     /* find tso in queue */
1822     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1823          t!=END_TSO_QUEUE && t!=tso;
1824          prev=t, t=t->link) 
1825       /* nothing */ ;
1826     ASSERT(t==tso);
1827     /* now actually dequeue the tso */
1828     if (prev!=END_TSO_QUEUE) {
1829       ASSERT(run_queue_hd!=t);
1830       prev->link = t->link;
1831     } else {
1832       /* t is at beginning of thread queue */
1833       ASSERT(run_queue_hd==t);
1834       run_queue_hd = t->link;
1835     }
1836     /* t is at end of thread queue */
1837     if (t->link==END_TSO_QUEUE) {
1838       ASSERT(t==run_queue_tl);
1839       run_queue_tl = prev;
1840     } else {
1841       ASSERT(run_queue_tl!=t);
1842     }
1843     t->link = END_TSO_QUEUE;
1844   } else {
1845     /* take tso from the beginning of the queue; std concurrent code */
1846     t = run_queue_hd;
1847     if (t != END_TSO_QUEUE) {
1848       run_queue_hd = t->link;
1849       t->link = END_TSO_QUEUE;
1850       if (run_queue_hd == END_TSO_QUEUE) {
1851         run_queue_tl = END_TSO_QUEUE;
1852       }
1853     }
1854   }
1855   return t;
1856 }
1857
1858 #endif /* 0 */
1859
1860 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1861 //@subsection Garbage Collextion Routines
1862
1863 /* ---------------------------------------------------------------------------
1864    Where are the roots that we know about?
1865
1866         - all the threads on the runnable queue
1867         - all the threads on the blocked queue
1868         - all the thread currently executing a _ccall_GC
1869         - all the "main threads"
1870      
1871    ------------------------------------------------------------------------ */
1872
1873 /* This has to be protected either by the scheduler monitor, or by the
1874         garbage collection monitor (probably the latter).
1875         KH @ 25/10/99
1876 */
1877
1878 static void GetRoots(void)
1879 {
1880   StgMainThread *m;
1881
1882 #if defined(GRAN)
1883   {
1884     nat i;
1885     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1886       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1887         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1888       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1889         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1890       
1891       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1892         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1893       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1894         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1895       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1896         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1897     }
1898   }
1899
1900   markEventQueue();
1901
1902 #else /* !GRAN */
1903   if (run_queue_hd != END_TSO_QUEUE) {
1904     ASSERT(run_queue_tl != END_TSO_QUEUE);
1905     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1906     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1907   }
1908
1909   if (blocked_queue_hd != END_TSO_QUEUE) {
1910     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1911     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1912     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1913   }
1914 #endif 
1915
1916   for (m = main_threads; m != NULL; m = m->link) {
1917     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1918   }
1919   if (suspended_ccalling_threads != END_TSO_QUEUE)
1920     suspended_ccalling_threads = 
1921       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1922
1923 #if defined(SMP) || defined(PAR) || defined(GRAN)
1924   markSparkQueue();
1925 #endif
1926 }
1927
1928 /* -----------------------------------------------------------------------------
1929    performGC
1930
1931    This is the interface to the garbage collector from Haskell land.
1932    We provide this so that external C code can allocate and garbage
1933    collect when called from Haskell via _ccall_GC.
1934
1935    It might be useful to provide an interface whereby the programmer
1936    can specify more roots (ToDo).
1937    
1938    This needs to be protected by the GC condition variable above.  KH.
1939    -------------------------------------------------------------------------- */
1940
1941 void (*extra_roots)(void);
1942
1943 void
1944 performGC(void)
1945 {
1946   GarbageCollect(GetRoots,rtsFalse);
1947 }
1948
1949 void
1950 performMajorGC(void)
1951 {
1952   GarbageCollect(GetRoots,rtsTrue);
1953 }
1954
1955 static void
1956 AllRoots(void)
1957 {
1958   GetRoots();                   /* the scheduler's roots */
1959   extra_roots();                /* the user's roots */
1960 }
1961
1962 void
1963 performGCWithRoots(void (*get_roots)(void))
1964 {
1965   extra_roots = get_roots;
1966
1967   GarbageCollect(AllRoots,rtsFalse);
1968 }
1969
1970 /* -----------------------------------------------------------------------------
1971    Stack overflow
1972
1973    If the thread has reached its maximum stack size, then raise the
1974    StackOverflow exception in the offending thread.  Otherwise
1975    relocate the TSO into a larger chunk of memory and adjust its stack
1976    size appropriately.
1977    -------------------------------------------------------------------------- */
1978
1979 static StgTSO *
1980 threadStackOverflow(StgTSO *tso)
1981 {
1982   nat new_stack_size, new_tso_size, diff, stack_words;
1983   StgPtr new_sp;
1984   StgTSO *dest;
1985
1986   IF_DEBUG(sanity,checkTSO(tso));
1987   if (tso->stack_size >= tso->max_stack_size) {
1988
1989     IF_DEBUG(gc,
1990              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
1991                    tso->id, tso, tso->stack_size, tso->max_stack_size);
1992              /* If we're debugging, just print out the top of the stack */
1993              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1994                                               tso->sp+64)));
1995
1996 #ifdef INTERPRETER
1997     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1998     exit(1);
1999 #else
2000     /* Send this thread the StackOverflow exception */
2001     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2002 #endif
2003     return tso;
2004   }
2005
2006   /* Try to double the current stack size.  If that takes us over the
2007    * maximum stack size for this thread, then use the maximum instead.
2008    * Finally round up so the TSO ends up as a whole number of blocks.
2009    */
2010   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2011   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2012                                        TSO_STRUCT_SIZE)/sizeof(W_);
2013   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2014   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2015
2016   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2017
2018   dest = (StgTSO *)allocate(new_tso_size);
2019   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2020
2021   /* copy the TSO block and the old stack into the new area */
2022   memcpy(dest,tso,TSO_STRUCT_SIZE);
2023   stack_words = tso->stack + tso->stack_size - tso->sp;
2024   new_sp = (P_)dest + new_tso_size - stack_words;
2025   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2026
2027   /* relocate the stack pointers... */
2028   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2029   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2030   dest->sp    = new_sp;
2031   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2032   dest->stack_size = new_stack_size;
2033         
2034   /* and relocate the update frame list */
2035   relocate_TSO(tso, dest);
2036
2037   /* Mark the old TSO as relocated.  We have to check for relocated
2038    * TSOs in the garbage collector and any primops that deal with TSOs.
2039    *
2040    * It's important to set the sp and su values to just beyond the end
2041    * of the stack, so we don't attempt to scavenge any part of the
2042    * dead TSO's stack.
2043    */
2044   tso->what_next = ThreadRelocated;
2045   tso->link = dest;
2046   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2047   tso->su = (StgUpdateFrame *)tso->sp;
2048   tso->why_blocked = NotBlocked;
2049   dest->mut_link = NULL;
2050
2051   IF_PAR_DEBUG(verbose,
2052                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2053                      tso->id, tso, tso->stack_size);
2054                /* If we're debugging, just print out the top of the stack */
2055                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2056                                                 tso->sp+64)));
2057   
2058   IF_DEBUG(sanity,checkTSO(tso));
2059 #if 0
2060   IF_DEBUG(scheduler,printTSO(dest));
2061 #endif
2062
2063   return dest;
2064 }
2065
2066 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2067 //@subsection Blocking Queue Routines
2068
2069 /* ---------------------------------------------------------------------------
2070    Wake up a queue that was blocked on some resource.
2071    ------------------------------------------------------------------------ */
2072
2073 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2074
2075 #if defined(GRAN)
2076 static inline void
2077 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2078 {
2079 }
2080 #elif defined(PAR)
2081 static inline void
2082 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2083 {
2084   /* write RESUME events to log file and
2085      update blocked and fetch time (depending on type of the orig closure) */
2086   if (RtsFlags.ParFlags.ParStats.Full) {
2087     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2088                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2089                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2090
2091     switch (get_itbl(node)->type) {
2092         case FETCH_ME_BQ:
2093           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2094           break;
2095         case RBH:
2096         case FETCH_ME:
2097         case BLACKHOLE_BQ:
2098           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2099           break;
2100         default:
2101           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2102         }
2103       }
2104 }
2105 #endif
2106
2107 #if defined(GRAN)
2108 static StgBlockingQueueElement *
2109 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2110 {
2111     StgTSO *tso;
2112     PEs node_loc, tso_loc;
2113
2114     node_loc = where_is(node); // should be lifted out of loop
2115     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2116     tso_loc = where_is((StgClosure *)tso);
2117     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2118       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2119       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2120       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2121       // insertThread(tso, node_loc);
2122       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2123                 ResumeThread,
2124                 tso, node, (rtsSpark*)NULL);
2125       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2126       // len_local++;
2127       // len++;
2128     } else { // TSO is remote (actually should be FMBQ)
2129       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2130                                   RtsFlags.GranFlags.Costs.gunblocktime +
2131                                   RtsFlags.GranFlags.Costs.latency;
2132       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2133                 UnblockThread,
2134                 tso, node, (rtsSpark*)NULL);
2135       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2136       // len++;
2137     }
2138     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2139     IF_GRAN_DEBUG(bq,
2140                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2141                           (node_loc==tso_loc ? "Local" : "Global"), 
2142                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2143     tso->block_info.closure = NULL;
2144     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2145                              tso->id, tso));
2146 }
2147 #elif defined(PAR)
2148 static StgBlockingQueueElement *
2149 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2150 {
2151     StgBlockingQueueElement *next;
2152
2153     switch (get_itbl(bqe)->type) {
2154     case TSO:
2155       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2156       /* if it's a TSO just push it onto the run_queue */
2157       next = bqe->link;
2158       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2159       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2160       THREAD_RUNNABLE();
2161       unblockCount(bqe, node);
2162       /* reset blocking status after dumping event */
2163       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2164       break;
2165
2166     case BLOCKED_FETCH:
2167       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2168       next = bqe->link;
2169       bqe->link = PendingFetches;
2170       PendingFetches = bqe;
2171       break;
2172
2173 # if defined(DEBUG)
2174       /* can ignore this case in a non-debugging setup; 
2175          see comments on RBHSave closures above */
2176     case CONSTR:
2177       /* check that the closure is an RBHSave closure */
2178       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2179              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2180              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2181       break;
2182
2183     default:
2184       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2185            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2186            (StgClosure *)bqe);
2187 # endif
2188     }
2189   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2190   return next;
2191 }
2192
2193 #else /* !GRAN && !PAR */
2194 static StgTSO *
2195 unblockOneLocked(StgTSO *tso)
2196 {
2197   StgTSO *next;
2198
2199   ASSERT(get_itbl(tso)->type == TSO);
2200   ASSERT(tso->why_blocked != NotBlocked);
2201   tso->why_blocked = NotBlocked;
2202   next = tso->link;
2203   PUSH_ON_RUN_QUEUE(tso);
2204   THREAD_RUNNABLE();
2205   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2206   return next;
2207 }
2208 #endif
2209
2210 #if defined(GRAN) || defined(PAR)
2211 inline StgBlockingQueueElement *
2212 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2213 {
2214   ACQUIRE_LOCK(&sched_mutex);
2215   bqe = unblockOneLocked(bqe, node);
2216   RELEASE_LOCK(&sched_mutex);
2217   return bqe;
2218 }
2219 #else
2220 inline StgTSO *
2221 unblockOne(StgTSO *tso)
2222 {
2223   ACQUIRE_LOCK(&sched_mutex);
2224   tso = unblockOneLocked(tso);
2225   RELEASE_LOCK(&sched_mutex);
2226   return tso;
2227 }
2228 #endif
2229
2230 #if defined(GRAN)
2231 void 
2232 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2233 {
2234   StgBlockingQueueElement *bqe;
2235   PEs node_loc;
2236   nat len = 0; 
2237
2238   IF_GRAN_DEBUG(bq, 
2239                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2240                       node, CurrentProc, CurrentTime[CurrentProc], 
2241                       CurrentTSO->id, CurrentTSO));
2242
2243   node_loc = where_is(node);
2244
2245   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2246          get_itbl(q)->type == CONSTR); // closure (type constructor)
2247   ASSERT(is_unique(node));
2248
2249   /* FAKE FETCH: magically copy the node to the tso's proc;
2250      no Fetch necessary because in reality the node should not have been 
2251      moved to the other PE in the first place
2252   */
2253   if (CurrentProc!=node_loc) {
2254     IF_GRAN_DEBUG(bq, 
2255                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2256                         node, node_loc, CurrentProc, CurrentTSO->id, 
2257                         // CurrentTSO, where_is(CurrentTSO),
2258                         node->header.gran.procs));
2259     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2260     IF_GRAN_DEBUG(bq, 
2261                   belch("## new bitmask of node %p is %#x",
2262                         node, node->header.gran.procs));
2263     if (RtsFlags.GranFlags.GranSimStats.Global) {
2264       globalGranStats.tot_fake_fetches++;
2265     }
2266   }
2267
2268   bqe = q;
2269   // ToDo: check: ASSERT(CurrentProc==node_loc);
2270   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2271     //next = bqe->link;
2272     /* 
2273        bqe points to the current element in the queue
2274        next points to the next element in the queue
2275     */
2276     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2277     //tso_loc = where_is(tso);
2278     len++;
2279     bqe = unblockOneLocked(bqe, node);
2280   }
2281
2282   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2283      the closure to make room for the anchor of the BQ */
2284   if (bqe!=END_BQ_QUEUE) {
2285     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2286     /*
2287     ASSERT((info_ptr==&RBH_Save_0_info) ||
2288            (info_ptr==&RBH_Save_1_info) ||
2289            (info_ptr==&RBH_Save_2_info));
2290     */
2291     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2292     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2293     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2294
2295     IF_GRAN_DEBUG(bq,
2296                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2297                         node, info_type(node)));
2298   }
2299
2300   /* statistics gathering */
2301   if (RtsFlags.GranFlags.GranSimStats.Global) {
2302     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2303     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2304     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2305     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2306   }
2307   IF_GRAN_DEBUG(bq,
2308                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2309                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2310 }
2311 #elif defined(PAR)
2312 void 
2313 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2314 {
2315   StgBlockingQueueElement *bqe, *next;
2316
2317   ACQUIRE_LOCK(&sched_mutex);
2318
2319   IF_PAR_DEBUG(verbose, 
2320                belch("## AwBQ for node %p on [%x]: ",
2321                      node, mytid));
2322
2323   ASSERT(get_itbl(q)->type == TSO ||           
2324          get_itbl(q)->type == BLOCKED_FETCH || 
2325          get_itbl(q)->type == CONSTR); 
2326
2327   bqe = q;
2328   while (get_itbl(bqe)->type==TSO || 
2329          get_itbl(bqe)->type==BLOCKED_FETCH) {
2330     bqe = unblockOneLocked(bqe, node);
2331   }
2332   RELEASE_LOCK(&sched_mutex);
2333 }
2334
2335 #else   /* !GRAN && !PAR */
2336 void
2337 awakenBlockedQueue(StgTSO *tso)
2338 {
2339   ACQUIRE_LOCK(&sched_mutex);
2340   while (tso != END_TSO_QUEUE) {
2341     tso = unblockOneLocked(tso);
2342   }
2343   RELEASE_LOCK(&sched_mutex);
2344 }
2345 #endif
2346
2347 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2348 //@subsection Exception Handling Routines
2349
2350 /* ---------------------------------------------------------------------------
2351    Interrupt execution
2352    - usually called inside a signal handler so it mustn't do anything fancy.   
2353    ------------------------------------------------------------------------ */
2354
2355 void
2356 interruptStgRts(void)
2357 {
2358     interrupted    = 1;
2359     context_switch = 1;
2360 }
2361
2362 /* -----------------------------------------------------------------------------
2363    Unblock a thread
2364
2365    This is for use when we raise an exception in another thread, which
2366    may be blocked.
2367    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2368    -------------------------------------------------------------------------- */
2369
2370 #if defined(GRAN) || defined(PAR)
2371 /*
2372   NB: only the type of the blocking queue is different in GranSim and GUM
2373       the operations on the queue-elements are the same
2374       long live polymorphism!
2375 */
2376 static void
2377 unblockThread(StgTSO *tso)
2378 {
2379   StgBlockingQueueElement *t, **last;
2380
2381   ACQUIRE_LOCK(&sched_mutex);
2382   switch (tso->why_blocked) {
2383
2384   case NotBlocked:
2385     return;  /* not blocked */
2386
2387   case BlockedOnMVar:
2388     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2389     {
2390       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2391       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2392
2393       last = (StgBlockingQueueElement **)&mvar->head;
2394       for (t = (StgBlockingQueueElement *)mvar->head; 
2395            t != END_BQ_QUEUE; 
2396            last = &t->link, last_tso = t, t = t->link) {
2397         if (t == (StgBlockingQueueElement *)tso) {
2398           *last = (StgBlockingQueueElement *)tso->link;
2399           if (mvar->tail == tso) {
2400             mvar->tail = (StgTSO *)last_tso;
2401           }
2402           goto done;
2403         }
2404       }
2405       barf("unblockThread (MVAR): TSO not found");
2406     }
2407
2408   case BlockedOnBlackHole:
2409     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2410     {
2411       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2412
2413       last = &bq->blocking_queue;
2414       for (t = bq->blocking_queue; 
2415            t != END_BQ_QUEUE; 
2416            last = &t->link, t = t->link) {
2417         if (t == (StgBlockingQueueElement *)tso) {
2418           *last = (StgBlockingQueueElement *)tso->link;
2419           goto done;
2420         }
2421       }
2422       barf("unblockThread (BLACKHOLE): TSO not found");
2423     }
2424
2425   case BlockedOnException:
2426     {
2427       StgTSO *target  = tso->block_info.tso;
2428
2429       ASSERT(get_itbl(target)->type == TSO);
2430       ASSERT(target->blocked_exceptions != NULL);
2431
2432       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2433       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2434            t != END_BQ_QUEUE; 
2435            last = &t->link, t = t->link) {
2436         ASSERT(get_itbl(t)->type == TSO);
2437         if (t == (StgBlockingQueueElement *)tso) {
2438           *last = (StgBlockingQueueElement *)tso->link;
2439           goto done;
2440         }
2441       }
2442       barf("unblockThread (Exception): TSO not found");
2443     }
2444
2445   case BlockedOnDelay:
2446   case BlockedOnRead:
2447   case BlockedOnWrite:
2448     {
2449       StgBlockingQueueElement *prev = NULL;
2450       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2451            prev = t, t = t->link) {
2452         if (t == (StgBlockingQueueElement *)tso) {
2453           if (prev == NULL) {
2454             blocked_queue_hd = (StgTSO *)t->link;
2455             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2456               blocked_queue_tl = END_TSO_QUEUE;
2457             }
2458           } else {
2459             prev->link = t->link;
2460             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2461               blocked_queue_tl = (StgTSO *)prev;
2462             }
2463           }
2464           goto done;
2465         }
2466       }
2467       barf("unblockThread (I/O): TSO not found");
2468     }
2469
2470   default:
2471     barf("unblockThread");
2472   }
2473
2474  done:
2475   tso->link = END_TSO_QUEUE;
2476   tso->why_blocked = NotBlocked;
2477   tso->block_info.closure = NULL;
2478   PUSH_ON_RUN_QUEUE(tso);
2479   RELEASE_LOCK(&sched_mutex);
2480 }
2481 #else
2482 static void
2483 unblockThread(StgTSO *tso)
2484 {
2485   StgTSO *t, **last;
2486
2487   ACQUIRE_LOCK(&sched_mutex);
2488   switch (tso->why_blocked) {
2489
2490   case NotBlocked:
2491     return;  /* not blocked */
2492
2493   case BlockedOnMVar:
2494     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2495     {
2496       StgTSO *last_tso = END_TSO_QUEUE;
2497       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2498
2499       last = &mvar->head;
2500       for (t = mvar->head; t != END_TSO_QUEUE; 
2501            last = &t->link, last_tso = t, t = t->link) {
2502         if (t == tso) {
2503           *last = tso->link;
2504           if (mvar->tail == tso) {
2505             mvar->tail = last_tso;
2506           }
2507           goto done;
2508         }
2509       }
2510       barf("unblockThread (MVAR): TSO not found");
2511     }
2512
2513   case BlockedOnBlackHole:
2514     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2515     {
2516       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2517
2518       last = &bq->blocking_queue;
2519       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2520            last = &t->link, t = t->link) {
2521         if (t == tso) {
2522           *last = tso->link;
2523           goto done;
2524         }
2525       }
2526       barf("unblockThread (BLACKHOLE): TSO not found");
2527     }
2528
2529   case BlockedOnException:
2530     {
2531       StgTSO *target  = tso->block_info.tso;
2532
2533       ASSERT(get_itbl(target)->type == TSO);
2534       ASSERT(target->blocked_exceptions != NULL);
2535
2536       last = &target->blocked_exceptions;
2537       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2538            last = &t->link, t = t->link) {
2539         ASSERT(get_itbl(t)->type == TSO);
2540         if (t == tso) {
2541           *last = tso->link;
2542           goto done;
2543         }
2544       }
2545       barf("unblockThread (Exception): TSO not found");
2546     }
2547
2548   case BlockedOnDelay:
2549   case BlockedOnRead:
2550   case BlockedOnWrite:
2551     {
2552       StgTSO *prev = NULL;
2553       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2554            prev = t, t = t->link) {
2555         if (t == tso) {
2556           if (prev == NULL) {
2557             blocked_queue_hd = t->link;
2558             if (blocked_queue_tl == t) {
2559               blocked_queue_tl = END_TSO_QUEUE;
2560             }
2561           } else {
2562             prev->link = t->link;
2563             if (blocked_queue_tl == t) {
2564               blocked_queue_tl = prev;
2565             }
2566           }
2567           goto done;
2568         }
2569       }
2570       barf("unblockThread (I/O): TSO not found");
2571     }
2572
2573   default:
2574     barf("unblockThread");
2575   }
2576
2577  done:
2578   tso->link = END_TSO_QUEUE;
2579   tso->why_blocked = NotBlocked;
2580   tso->block_info.closure = NULL;
2581   PUSH_ON_RUN_QUEUE(tso);
2582   RELEASE_LOCK(&sched_mutex);
2583 }
2584 #endif
2585
2586 /* -----------------------------------------------------------------------------
2587  * raiseAsync()
2588  *
2589  * The following function implements the magic for raising an
2590  * asynchronous exception in an existing thread.
2591  *
2592  * We first remove the thread from any queue on which it might be
2593  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2594  *
2595  * We strip the stack down to the innermost CATCH_FRAME, building
2596  * thunks in the heap for all the active computations, so they can 
2597  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2598  * an application of the handler to the exception, and push it on
2599  * the top of the stack.
2600  * 
2601  * How exactly do we save all the active computations?  We create an
2602  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2603  * AP_UPDs pushes everything from the corresponding update frame
2604  * upwards onto the stack.  (Actually, it pushes everything up to the
2605  * next update frame plus a pointer to the next AP_UPD object.
2606  * Entering the next AP_UPD object pushes more onto the stack until we
2607  * reach the last AP_UPD object - at which point the stack should look
2608  * exactly as it did when we killed the TSO and we can continue
2609  * execution by entering the closure on top of the stack.
2610  *
2611  * We can also kill a thread entirely - this happens if either (a) the 
2612  * exception passed to raiseAsync is NULL, or (b) there's no
2613  * CATCH_FRAME on the stack.  In either case, we strip the entire
2614  * stack and replace the thread with a zombie.
2615  *
2616  * -------------------------------------------------------------------------- */
2617  
2618 void 
2619 deleteThread(StgTSO *tso)
2620 {
2621   raiseAsync(tso,NULL);
2622 }
2623
2624 void
2625 raiseAsync(StgTSO *tso, StgClosure *exception)
2626 {
2627   StgUpdateFrame* su = tso->su;
2628   StgPtr          sp = tso->sp;
2629   
2630   /* Thread already dead? */
2631   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2632     return;
2633   }
2634
2635   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2636
2637   /* Remove it from any blocking queues */
2638   unblockThread(tso);
2639
2640   /* The stack freezing code assumes there's a closure pointer on
2641    * the top of the stack.  This isn't always the case with compiled
2642    * code, so we have to push a dummy closure on the top which just
2643    * returns to the next return address on the stack.
2644    */
2645   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2646     *(--sp) = (W_)&dummy_ret_closure;
2647   }
2648
2649   while (1) {
2650     int words = ((P_)su - (P_)sp) - 1;
2651     nat i;
2652     StgAP_UPD * ap;
2653
2654     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2655      * then build PAP(handler,exception,realworld#), and leave it on
2656      * top of the stack ready to enter.
2657      */
2658     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2659       StgCatchFrame *cf = (StgCatchFrame *)su;
2660       /* we've got an exception to raise, so let's pass it to the
2661        * handler in this frame.
2662        */
2663       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2664       TICK_ALLOC_UPD_PAP(3,0);
2665       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2666               
2667       ap->n_args = 2;
2668       ap->fun = cf->handler;    /* :: Exception -> IO a */
2669       ap->payload[0] = (P_)exception;
2670       ap->payload[1] = ARG_TAG(0); /* realworld token */
2671
2672       /* throw away the stack from Sp up to and including the
2673        * CATCH_FRAME.
2674        */
2675       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2676       tso->su = cf->link;
2677
2678       /* Restore the blocked/unblocked state for asynchronous exceptions
2679        * at the CATCH_FRAME.  
2680        *
2681        * If exceptions were unblocked at the catch, arrange that they
2682        * are unblocked again after executing the handler by pushing an
2683        * unblockAsyncExceptions_ret stack frame.
2684        */
2685       if (!cf->exceptions_blocked) {
2686         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2687       }
2688       
2689       /* Ensure that async exceptions are blocked when running the handler.
2690        */
2691       if (tso->blocked_exceptions == NULL) {
2692         tso->blocked_exceptions = END_TSO_QUEUE;
2693       }
2694       
2695       /* Put the newly-built PAP on top of the stack, ready to execute
2696        * when the thread restarts.
2697        */
2698       sp[0] = (W_)ap;
2699       tso->sp = sp;
2700       tso->what_next = ThreadEnterGHC;
2701       return;
2702     }
2703
2704     /* First build an AP_UPD consisting of the stack chunk above the
2705      * current update frame, with the top word on the stack as the
2706      * fun field.
2707      */
2708     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2709     
2710     ASSERT(words >= 0);
2711     
2712     ap->n_args = words;
2713     ap->fun    = (StgClosure *)sp[0];
2714     sp++;
2715     for(i=0; i < (nat)words; ++i) {
2716       ap->payload[i] = (P_)*sp++;
2717     }
2718     
2719     switch (get_itbl(su)->type) {
2720       
2721     case UPDATE_FRAME:
2722       {
2723         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2724         TICK_ALLOC_UP_THK(words+1,0);
2725         
2726         IF_DEBUG(scheduler,
2727                  fprintf(stderr,  "scheduler: Updating ");
2728                  printPtr((P_)su->updatee); 
2729                  fprintf(stderr,  " with ");
2730                  printObj((StgClosure *)ap);
2731                  );
2732         
2733         /* Replace the updatee with an indirection - happily
2734          * this will also wake up any threads currently
2735          * waiting on the result.
2736          */
2737         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2738         su = su->link;
2739         sp += sizeofW(StgUpdateFrame) -1;
2740         sp[0] = (W_)ap; /* push onto stack */
2741         break;
2742       }
2743       
2744     case CATCH_FRAME:
2745       {
2746         StgCatchFrame *cf = (StgCatchFrame *)su;
2747         StgClosure* o;
2748         
2749         /* We want a PAP, not an AP_UPD.  Fortunately, the
2750          * layout's the same.
2751          */
2752         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2753         TICK_ALLOC_UPD_PAP(words+1,0);
2754         
2755         /* now build o = FUN(catch,ap,handler) */
2756         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2757         TICK_ALLOC_FUN(2,0);
2758         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2759         o->payload[0] = (StgClosure *)ap;
2760         o->payload[1] = cf->handler;
2761         
2762         IF_DEBUG(scheduler,
2763                  fprintf(stderr,  "scheduler: Built ");
2764                  printObj((StgClosure *)o);
2765                  );
2766         
2767         /* pop the old handler and put o on the stack */
2768         su = cf->link;
2769         sp += sizeofW(StgCatchFrame) - 1;
2770         sp[0] = (W_)o;
2771         break;
2772       }
2773       
2774     case SEQ_FRAME:
2775       {
2776         StgSeqFrame *sf = (StgSeqFrame *)su;
2777         StgClosure* o;
2778         
2779         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2780         TICK_ALLOC_UPD_PAP(words+1,0);
2781         
2782         /* now build o = FUN(seq,ap) */
2783         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2784         TICK_ALLOC_SE_THK(1,0);
2785         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2786         o->payload[0] = (StgClosure *)ap;
2787         
2788         IF_DEBUG(scheduler,
2789                  fprintf(stderr,  "scheduler: Built ");
2790                  printObj((StgClosure *)o);
2791                  );
2792         
2793         /* pop the old handler and put o on the stack */
2794         su = sf->link;
2795         sp += sizeofW(StgSeqFrame) - 1;
2796         sp[0] = (W_)o;
2797         break;
2798       }
2799       
2800     case STOP_FRAME:
2801       /* We've stripped the entire stack, the thread is now dead. */
2802       sp += sizeofW(StgStopFrame) - 1;
2803       sp[0] = (W_)exception;    /* save the exception */
2804       tso->what_next = ThreadKilled;
2805       tso->su = (StgUpdateFrame *)(sp+1);
2806       tso->sp = sp;
2807       return;
2808       
2809     default:
2810       barf("raiseAsync");
2811     }
2812   }
2813   barf("raiseAsync");
2814 }
2815
2816 /* -----------------------------------------------------------------------------
2817    resurrectThreads is called after garbage collection on the list of
2818    threads found to be garbage.  Each of these threads will be woken
2819    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2820    on an MVar, or NonTermination if the thread was blocked on a Black
2821    Hole.
2822    -------------------------------------------------------------------------- */
2823
2824 void
2825 resurrectThreads( StgTSO *threads )
2826 {
2827   StgTSO *tso, *next;
2828
2829   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2830     next = tso->global_link;
2831     tso->global_link = all_threads;
2832     all_threads = tso;
2833     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2834
2835     switch (tso->why_blocked) {
2836     case BlockedOnMVar:
2837     case BlockedOnException:
2838       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2839       break;
2840     case BlockedOnBlackHole:
2841       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2842       break;
2843     case NotBlocked:
2844       /* This might happen if the thread was blocked on a black hole
2845        * belonging to a thread that we've just woken up (raiseAsync
2846        * can wake up threads, remember...).
2847        */
2848       continue;
2849     default:
2850       barf("resurrectThreads: thread blocked in a strange way");
2851     }
2852   }
2853 }
2854
2855 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2856 //@subsection Debugging Routines
2857
2858 /* -----------------------------------------------------------------------------
2859    Debugging: why is a thread blocked
2860    -------------------------------------------------------------------------- */
2861
2862 #ifdef DEBUG
2863
2864 void
2865 printThreadBlockage(StgTSO *tso)
2866 {
2867   switch (tso->why_blocked) {
2868   case BlockedOnRead:
2869     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2870     break;
2871   case BlockedOnWrite:
2872     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2873     break;
2874   case BlockedOnDelay:
2875 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2876     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2877 #else
2878     fprintf(stderr,"blocked on delay of %d ms", 
2879             tso->block_info.target - getourtimeofday());
2880 #endif
2881     break;
2882   case BlockedOnMVar:
2883     fprintf(stderr,"blocked on an MVar");
2884     break;
2885   case BlockedOnException:
2886     fprintf(stderr,"blocked on delivering an exception to thread %d",
2887             tso->block_info.tso->id);
2888     break;
2889   case BlockedOnBlackHole:
2890     fprintf(stderr,"blocked on a black hole");
2891     break;
2892   case NotBlocked:
2893     fprintf(stderr,"not blocked");
2894     break;
2895 #if defined(PAR)
2896   case BlockedOnGA:
2897     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2898             tso->block_info.closure, info_type(tso->block_info.closure));
2899     break;
2900   case BlockedOnGA_NoSend:
2901     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2902             tso->block_info.closure, info_type(tso->block_info.closure));
2903     break;
2904 #endif
2905   default:
2906     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2907          tso->why_blocked, tso->id, tso);
2908   }
2909 }
2910
2911 void
2912 printThreadStatus(StgTSO *tso)
2913 {
2914   switch (tso->what_next) {
2915   case ThreadKilled:
2916     fprintf(stderr,"has been killed");
2917     break;
2918   case ThreadComplete:
2919     fprintf(stderr,"has completed");
2920     break;
2921   default:
2922     printThreadBlockage(tso);
2923   }
2924 }
2925
2926 void
2927 printAllThreads(void)
2928 {
2929   StgTSO *t;
2930
2931   sched_belch("all threads:");
2932   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2933     fprintf(stderr, "\tthread %d is ", t->id);
2934     printThreadStatus(t);
2935     fprintf(stderr,"\n");
2936   }
2937 }
2938     
2939 /* 
2940    Print a whole blocking queue attached to node (debugging only).
2941 */
2942 //@cindex print_bq
2943 # if defined(PAR)
2944 void 
2945 print_bq (StgClosure *node)
2946 {
2947   StgBlockingQueueElement *bqe;
2948   StgTSO *tso;
2949   rtsBool end;
2950
2951   fprintf(stderr,"## BQ of closure %p (%s): ",
2952           node, info_type(node));
2953
2954   /* should cover all closures that may have a blocking queue */
2955   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2956          get_itbl(node)->type == FETCH_ME_BQ ||
2957          get_itbl(node)->type == RBH);
2958     
2959   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2960   /* 
2961      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2962   */
2963   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2964        !end; // iterate until bqe points to a CONSTR
2965        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2966     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2967     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2968     /* types of closures that may appear in a blocking queue */
2969     ASSERT(get_itbl(bqe)->type == TSO ||           
2970            get_itbl(bqe)->type == BLOCKED_FETCH || 
2971            get_itbl(bqe)->type == CONSTR); 
2972     /* only BQs of an RBH end with an RBH_Save closure */
2973     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2974
2975     switch (get_itbl(bqe)->type) {
2976     case TSO:
2977       fprintf(stderr," TSO %d (%x),",
2978               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2979       break;
2980     case BLOCKED_FETCH:
2981       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2982               ((StgBlockedFetch *)bqe)->node, 
2983               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2984               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2985               ((StgBlockedFetch *)bqe)->ga.weight);
2986       break;
2987     case CONSTR:
2988       fprintf(stderr," %s (IP %p),",
2989               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2990                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2991                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2992                "RBH_Save_?"), get_itbl(bqe));
2993       break;
2994     default:
2995       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2996            info_type(bqe), node, info_type(node));
2997       break;
2998     }
2999   } /* for */
3000   fputc('\n', stderr);
3001 }
3002 # elif defined(GRAN)
3003 void 
3004 print_bq (StgClosure *node)
3005 {
3006   StgBlockingQueueElement *bqe;
3007   PEs node_loc, tso_loc;
3008   rtsBool end;
3009
3010   /* should cover all closures that may have a blocking queue */
3011   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3012          get_itbl(node)->type == FETCH_ME_BQ ||
3013          get_itbl(node)->type == RBH);
3014     
3015   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3016   node_loc = where_is(node);
3017
3018   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3019           node, info_type(node), node_loc);
3020
3021   /* 
3022      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3023   */
3024   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3025        !end; // iterate until bqe points to a CONSTR
3026        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3027     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3028     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3029     /* types of closures that may appear in a blocking queue */
3030     ASSERT(get_itbl(bqe)->type == TSO ||           
3031            get_itbl(bqe)->type == CONSTR); 
3032     /* only BQs of an RBH end with an RBH_Save closure */
3033     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3034
3035     tso_loc = where_is((StgClosure *)bqe);
3036     switch (get_itbl(bqe)->type) {
3037     case TSO:
3038       fprintf(stderr," TSO %d (%p) on [PE %d],",
3039               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3040       break;
3041     case CONSTR:
3042       fprintf(stderr," %s (IP %p),",
3043               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3044                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3045                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3046                "RBH_Save_?"), get_itbl(bqe));
3047       break;
3048     default:
3049       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3050            info_type((StgClosure *)bqe), node, info_type(node));
3051       break;
3052     }
3053   } /* for */
3054   fputc('\n', stderr);
3055 }
3056 #else
3057 /* 
3058    Nice and easy: only TSOs on the blocking queue
3059 */
3060 void 
3061 print_bq (StgClosure *node)
3062 {
3063   StgTSO *tso;
3064
3065   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3066   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3067        tso != END_TSO_QUEUE; 
3068        tso=tso->link) {
3069     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3070     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3071     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3072   }
3073   fputc('\n', stderr);
3074 }
3075 # endif
3076
3077 #if defined(PAR)
3078 static nat
3079 run_queue_len(void)
3080 {
3081   nat i;
3082   StgTSO *tso;
3083
3084   for (i=0, tso=run_queue_hd; 
3085        tso != END_TSO_QUEUE;
3086        i++, tso=tso->link)
3087     /* nothing */
3088
3089   return i;
3090 }
3091 #endif
3092
3093 static void
3094 sched_belch(char *s, ...)
3095 {
3096   va_list ap;
3097   va_start(ap,s);
3098 #ifdef SMP
3099   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3100 #else
3101   fprintf(stderr, "scheduler: ");
3102 #endif
3103   vfprintf(stderr, s, ap);
3104   fprintf(stderr, "\n");
3105 }
3106
3107 #endif /* DEBUG */
3108
3109
3110 //@node Index,  , Debugging Routines, Main scheduling code
3111 //@subsection Index
3112
3113 //@index
3114 //* MainRegTable::  @cindex\s-+MainRegTable
3115 //* StgMainThread::  @cindex\s-+StgMainThread
3116 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3117 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3118 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3119 //* context_switch::  @cindex\s-+context_switch
3120 //* createThread::  @cindex\s-+createThread
3121 //* free_capabilities::  @cindex\s-+free_capabilities
3122 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3123 //* initScheduler::  @cindex\s-+initScheduler
3124 //* interrupted::  @cindex\s-+interrupted
3125 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3126 //* next_thread_id::  @cindex\s-+next_thread_id
3127 //* print_bq::  @cindex\s-+print_bq
3128 //* run_queue_hd::  @cindex\s-+run_queue_hd
3129 //* run_queue_tl::  @cindex\s-+run_queue_tl
3130 //* sched_mutex::  @cindex\s-+sched_mutex
3131 //* schedule::  @cindex\s-+schedule
3132 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3133 //* task_ids::  @cindex\s-+task_ids
3134 //* term_mutex::  @cindex\s-+term_mutex
3135 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3136 //@end index