[project @ 2000-04-04 15:02:02 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.63 2000/04/04 15:02:02 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Main scheduling loop::      
41 //* Suspend and Resume::        
42 //* Run queue code::            
43 //* Garbage Collextion Routines::  
44 //* Blocking Queue Routines::   
45 //* Exception Handling Routines::  
46 //* Debugging Routines::        
47 //* Index::                     
48 //@end menu
49
50 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
51 //@subsection Includes
52
53 #include "Rts.h"
54 #include "SchedAPI.h"
55 #include "RtsUtils.h"
56 #include "RtsFlags.h"
57 #include "Storage.h"
58 #include "StgRun.h"
59 #include "StgStartup.h"
60 #include "GC.h"
61 #include "Hooks.h"
62 #include "Schedule.h"
63 #include "StgMiscClosures.h"
64 #include "Storage.h"
65 #include "Evaluator.h"
66 #include "Exception.h"
67 #include "Printer.h"
68 #include "Main.h"
69 #include "Signals.h"
70 #include "Sanity.h"
71 #include "Stats.h"
72 #include "Itimer.h"
73 #include "Prelude.h"
74 #if defined(GRAN) || defined(PAR)
75 # include "GranSimRts.h"
76 # include "GranSim.h"
77 # include "ParallelRts.h"
78 # include "Parallel.h"
79 # include "ParallelDebug.h"
80 # include "FetchMe.h"
81 # include "HLC.h"
82 #endif
83 #include "Sparks.h"
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123 #if defined(GRAN)
124
125 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
126 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
127
128 /* 
129    In GranSim we have a runable and a blocked queue for each processor.
130    In order to minimise code changes new arrays run_queue_hds/tls
131    are created. run_queue_hd is then a short cut (macro) for
132    run_queue_hds[CurrentProc] (see GranSim.h).
133    -- HWL
134 */
135 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
136 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
137 StgTSO *ccalling_threadss[MAX_PROC];
138 /* We use the same global list of threads (all_threads) in GranSim as in
139    the std RTS (i.e. we are cheating). However, we don't use this list in
140    the GranSim specific code at the moment (so we are only potentially
141    cheating).  */
142
143 #else /* !GRAN */
144
145 StgTSO *run_queue_hd, *run_queue_tl;
146 StgTSO *blocked_queue_hd, *blocked_queue_tl;
147
148 #endif
149
150 /* Linked list of all threads.
151  * Used for detecting garbage collected threads.
152  */
153 StgTSO *all_threads;
154
155 /* Threads suspended in _ccall_GC.
156  */
157 static StgTSO *suspended_ccalling_threads;
158
159 static void GetRoots(void);
160 static StgTSO *threadStackOverflow(StgTSO *tso);
161
162 /* KH: The following two flags are shared memory locations.  There is no need
163        to lock them, since they are only unset at the end of a scheduler
164        operation.
165 */
166
167 /* flag set by signal handler to precipitate a context switch */
168 //@cindex context_switch
169 nat context_switch;
170
171 /* if this flag is set as well, give up execution */
172 //@cindex interrupted
173 rtsBool interrupted;
174
175 /* Next thread ID to allocate.
176  * Locks required: sched_mutex
177  */
178 //@cindex next_thread_id
179 StgThreadID next_thread_id = 1;
180
181 /*
182  * Pointers to the state of the current thread.
183  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
184  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
185  */
186  
187 /* The smallest stack size that makes any sense is:
188  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
189  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
190  *  + 1                       (the realworld token for an IO thread)
191  *  + 1                       (the closure to enter)
192  *
193  * A thread with this stack will bomb immediately with a stack
194  * overflow, which will increase its stack size.  
195  */
196
197 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
198
199 /* Free capability list.
200  * Locks required: sched_mutex.
201  */
202 #ifdef SMP
203 //@cindex free_capabilities
204 //@cindex n_free_capabilities
205 Capability *free_capabilities; /* Available capabilities for running threads */
206 nat n_free_capabilities;       /* total number of available capabilities */
207 #else
208 //@cindex MainRegTable
209 Capability MainRegTable;       /* for non-SMP, we have one global capability */
210 #endif
211
212 #if defined(GRAN)
213 StgTSO *CurrentTSO;
214 #endif
215
216 rtsBool ready_to_gc;
217
218 /* All our current task ids, saved in case we need to kill them later.
219  */
220 #ifdef SMP
221 //@cindex task_ids
222 task_info *task_ids;
223 #endif
224
225 void            addToBlockedQueue ( StgTSO *tso );
226
227 static void     schedule          ( void );
228        void     interruptStgRts   ( void );
229 #if defined(GRAN)
230 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
231 #else
232 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
233 #endif
234
235 #ifdef DEBUG
236 static void sched_belch(char *s, ...);
237 #endif
238
239 #ifdef SMP
240 //@cindex sched_mutex
241 //@cindex term_mutex
242 //@cindex thread_ready_cond
243 //@cindex gc_pending_cond
244 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
245 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
246 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
247 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
248
249 nat await_death;
250 #endif
251
252 #if defined(PAR)
253 StgTSO *LastTSO;
254 rtsTime TimeOfLastYield;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterHugs",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 /*
276  * The thread state for the main thread.
277 // ToDo: check whether not needed any more
278 StgTSO   *MainTSO;
279  */
280
281 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
282 //@subsection Main scheduling loop
283
284 /* ---------------------------------------------------------------------------
285    Main scheduling loop.
286
287    We use round-robin scheduling, each thread returning to the
288    scheduler loop when one of these conditions is detected:
289
290       * out of heap space
291       * timer expires (thread yields)
292       * thread blocks
293       * thread ends
294       * stack overflow
295
296    Locking notes:  we acquire the scheduler lock once at the beginning
297    of the scheduler loop, and release it when
298     
299       * running a thread, or
300       * waiting for work, or
301       * waiting for a GC to complete.
302
303    GRAN version:
304      In a GranSim setup this loop iterates over the global event queue.
305      This revolves around the global event queue, which determines what 
306      to do next. Therefore, it's more complicated than either the 
307      concurrent or the parallel (GUM) setup.
308
309    GUM version:
310      GUM iterates over incoming messages.
311      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
312      and sends out a fish whenever it has nothing to do; in-between
313      doing the actual reductions (shared code below) it processes the
314      incoming messages and deals with delayed operations 
315      (see PendingFetches).
316      This is not the ugliest code you could imagine, but it's bloody close.
317
318    ------------------------------------------------------------------------ */
319 //@cindex schedule
320 static void
321 schedule( void )
322 {
323   StgTSO *t;
324   Capability *cap;
325   StgThreadReturnCode ret;
326 #if defined(GRAN)
327   rtsEvent *event;
328 #elif defined(PAR)
329   StgSparkPool *pool;
330   rtsSpark spark;
331   StgTSO *tso;
332   GlobalTaskId pe;
333 #endif
334   rtsBool was_interrupted = rtsFalse;
335   
336   ACQUIRE_LOCK(&sched_mutex);
337
338 #if defined(GRAN)
339
340   /* set up first event to get things going */
341   /* ToDo: assign costs for system setup and init MainTSO ! */
342   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
343             ContinueThread, 
344             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
345
346   IF_DEBUG(gran,
347            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
348            G_TSO(CurrentTSO, 5));
349
350   if (RtsFlags.GranFlags.Light) {
351     /* Save current time; GranSim Light only */
352     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
353   }      
354
355   event = get_next_event();
356
357   while (event!=(rtsEvent*)NULL) {
358     /* Choose the processor with the next event */
359     CurrentProc = event->proc;
360     CurrentTSO = event->tso;
361
362 #elif defined(PAR)
363
364   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
365
366 #else
367
368   while (1) {
369
370 #endif
371
372     IF_DEBUG(scheduler, printAllThreads());
373
374     /* If we're interrupted (the user pressed ^C, or some other
375      * termination condition occurred), kill all the currently running
376      * threads.
377      */
378     if (interrupted) {
379       IF_DEBUG(scheduler, sched_belch("interrupted"));
380       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
381         deleteThread(t);
382       }
383       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
384         deleteThread(t);
385       }
386       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
387       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
388       interrupted = rtsFalse;
389       was_interrupted = rtsTrue;
390     }
391
392     /* Go through the list of main threads and wake up any
393      * clients whose computations have finished.  ToDo: this
394      * should be done more efficiently without a linear scan
395      * of the main threads list, somehow...
396      */
397 #ifdef SMP
398     { 
399       StgMainThread *m, **prev;
400       prev = &main_threads;
401       for (m = main_threads; m != NULL; m = m->link) {
402         switch (m->tso->what_next) {
403         case ThreadComplete:
404           if (m->ret) {
405             *(m->ret) = (StgClosure *)m->tso->sp[0];
406           }
407           *prev = m->link;
408           m->stat = Success;
409           pthread_cond_broadcast(&m->wakeup);
410           break;
411         case ThreadKilled:
412           *prev = m->link;
413           if (was_interrupted) {
414             m->stat = Interrupted;
415           } else {
416             m->stat = Killed;
417           }
418           pthread_cond_broadcast(&m->wakeup);
419           break;
420         default:
421           break;
422         }
423       }
424     }
425
426 #else
427 # if defined(PAR)
428     /* in GUM do this only on the Main PE */
429     if (IAmMainThread)
430 # endif
431     /* If our main thread has finished or been killed, return.
432      */
433     {
434       StgMainThread *m = main_threads;
435       if (m->tso->what_next == ThreadComplete
436           || m->tso->what_next == ThreadKilled) {
437         main_threads = main_threads->link;
438         if (m->tso->what_next == ThreadComplete) {
439           /* we finished successfully, fill in the return value */
440           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
441           m->stat = Success;
442           return;
443         } else {
444           if (was_interrupted) {
445             m->stat = Interrupted;
446           } else {
447             m->stat = Killed;
448           }
449           return;
450         }
451       }
452     }
453 #endif
454
455     /* Top up the run queue from our spark pool.  We try to make the
456      * number of threads in the run queue equal to the number of
457      * free capabilities.
458      */
459 #if defined(SMP)
460     {
461       nat n = n_free_capabilities;
462       StgTSO *tso = run_queue_hd;
463
464       /* Count the run queue */
465       while (n > 0 && tso != END_TSO_QUEUE) {
466         tso = tso->link;
467         n--;
468       }
469
470       for (; n > 0; n--) {
471         StgClosure *spark;
472         spark = findSpark();
473         if (spark == NULL) {
474           break; /* no more sparks in the pool */
475         } else {
476           /* I'd prefer this to be done in activateSpark -- HWL */
477           /* tricky - it needs to hold the scheduler lock and
478            * not try to re-acquire it -- SDM */
479           StgTSO *tso;
480           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
481           pushClosure(tso,spark);
482           PUSH_ON_RUN_QUEUE(tso);
483 #ifdef PAR
484           advisory_thread_count++;
485 #endif
486           
487           IF_DEBUG(scheduler,
488                    sched_belch("turning spark of closure %p into a thread",
489                                (StgClosure *)spark));
490         }
491       }
492       /* We need to wake up the other tasks if we just created some
493        * work for them.
494        */
495       if (n_free_capabilities - n > 1) {
496           pthread_cond_signal(&thread_ready_cond);
497       }
498     }
499 #endif /* SMP */
500
501     /* Check whether any waiting threads need to be woken up.  If the
502      * run queue is empty, and there are no other tasks running, we
503      * can wait indefinitely for something to happen.
504      * ToDo: what if another client comes along & requests another
505      * main thread?
506      */
507     if (blocked_queue_hd != END_TSO_QUEUE) {
508       awaitEvent(
509            (run_queue_hd == END_TSO_QUEUE)
510 #ifdef SMP
511         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
512 #endif
513         );
514     }
515     
516     /* check for signals each time around the scheduler */
517 #ifndef __MINGW32__
518     if (signals_pending()) {
519       start_signal_handlers();
520     }
521 #endif
522
523     /* Detect deadlock: when we have no threads to run, there are
524      * no threads waiting on I/O or sleeping, and all the other
525      * tasks are waiting for work, we must have a deadlock.  Inform
526      * all the main threads.
527      */
528 #ifdef SMP
529     if (blocked_queue_hd == END_TSO_QUEUE
530         && run_queue_hd == END_TSO_QUEUE
531         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
532         ) {
533       StgMainThread *m;
534       for (m = main_threads; m != NULL; m = m->link) {
535           m->ret = NULL;
536           m->stat = Deadlock;
537           pthread_cond_broadcast(&m->wakeup);
538       }
539       main_threads = NULL;
540     }
541 #else /* ! SMP */
542     /* 
543        In GUM all non-main PEs come in here with no work;
544        we ignore multiple main threads for now 
545
546     if (blocked_queue_hd == END_TSO_QUEUE
547         && run_queue_hd == END_TSO_QUEUE) {
548       StgMainThread *m = main_threads;
549       m->ret = NULL;
550       m->stat = Deadlock;
551       main_threads = m->link;
552       return;
553     }
554     */
555 #endif
556
557 #ifdef SMP
558     /* If there's a GC pending, don't do anything until it has
559      * completed.
560      */
561     if (ready_to_gc) {
562       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
563       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
564     }
565     
566     /* block until we've got a thread on the run queue and a free
567      * capability.
568      */
569     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
570       IF_DEBUG(scheduler, sched_belch("waiting for work"));
571       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
572       IF_DEBUG(scheduler, sched_belch("work now available"));
573     }
574 #endif
575
576 #if defined(GRAN)
577
578     if (RtsFlags.GranFlags.Light)
579       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
580
581     /* adjust time based on time-stamp */
582     if (event->time > CurrentTime[CurrentProc] &&
583         event->evttype != ContinueThread)
584       CurrentTime[CurrentProc] = event->time;
585     
586     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
587     if (!RtsFlags.GranFlags.Light)
588       handleIdlePEs();
589
590     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"))
591
592     /* main event dispatcher in GranSim */
593     switch (event->evttype) {
594       /* Should just be continuing execution */
595     case ContinueThread:
596       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
597       /* ToDo: check assertion
598       ASSERT(run_queue_hd != (StgTSO*)NULL &&
599              run_queue_hd != END_TSO_QUEUE);
600       */
601       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
602       if (!RtsFlags.GranFlags.DoAsyncFetch &&
603           procStatus[CurrentProc]==Fetching) {
604         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
605               CurrentTSO->id, CurrentTSO, CurrentProc);
606         goto next_thread;
607       } 
608       /* Ignore ContinueThreads for completed threads */
609       if (CurrentTSO->what_next == ThreadComplete) {
610         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
611               CurrentTSO->id, CurrentTSO, CurrentProc);
612         goto next_thread;
613       } 
614       /* Ignore ContinueThreads for threads that are being migrated */
615       if (PROCS(CurrentTSO)==Nowhere) { 
616         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
617               CurrentTSO->id, CurrentTSO, CurrentProc);
618         goto next_thread;
619       }
620       /* The thread should be at the beginning of the run queue */
621       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
622         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
623               CurrentTSO->id, CurrentTSO, CurrentProc);
624         break; // run the thread anyway
625       }
626       /*
627       new_event(proc, proc, CurrentTime[proc],
628                 FindWork,
629                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
630       goto next_thread; 
631       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
632       break; // now actually run the thread; DaH Qu'vam yImuHbej 
633
634     case FetchNode:
635       do_the_fetchnode(event);
636       goto next_thread;             /* handle next event in event queue  */
637       
638     case GlobalBlock:
639       do_the_globalblock(event);
640       goto next_thread;             /* handle next event in event queue  */
641       
642     case FetchReply:
643       do_the_fetchreply(event);
644       goto next_thread;             /* handle next event in event queue  */
645       
646     case UnblockThread:   /* Move from the blocked queue to the tail of */
647       do_the_unblock(event);
648       goto next_thread;             /* handle next event in event queue  */
649       
650     case ResumeThread:  /* Move from the blocked queue to the tail of */
651       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
652       event->tso->gran.blocktime += 
653         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
654       do_the_startthread(event);
655       goto next_thread;             /* handle next event in event queue  */
656       
657     case StartThread:
658       do_the_startthread(event);
659       goto next_thread;             /* handle next event in event queue  */
660       
661     case MoveThread:
662       do_the_movethread(event);
663       goto next_thread;             /* handle next event in event queue  */
664       
665     case MoveSpark:
666       do_the_movespark(event);
667       goto next_thread;             /* handle next event in event queue  */
668       
669     case FindWork:
670       do_the_findwork(event);
671       goto next_thread;             /* handle next event in event queue  */
672       
673     default:
674       barf("Illegal event type %u\n", event->evttype);
675     }  /* switch */
676     
677     /* This point was scheduler_loop in the old RTS */
678
679     IF_DEBUG(gran, belch("GRAN: after main switch"));
680
681     TimeOfLastEvent = CurrentTime[CurrentProc];
682     TimeOfNextEvent = get_time_of_next_event();
683     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
684     // CurrentTSO = ThreadQueueHd;
685
686     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
687                          TimeOfNextEvent));
688
689     if (RtsFlags.GranFlags.Light) 
690       GranSimLight_leave_system(event, &ActiveTSO); 
691
692     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
693
694     IF_DEBUG(gran, 
695              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
696
697     /* in a GranSim setup the TSO stays on the run queue */
698     t = CurrentTSO;
699     /* Take a thread from the run queue. */
700     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
701
702     IF_DEBUG(gran, 
703              fprintf(stderr, "GRAN: About to run current thread, which is\n");
704              G_TSO(t,5))
705
706     context_switch = 0; // turned on via GranYield, checking events and time slice
707
708     IF_DEBUG(gran, 
709              DumpGranEvent(GR_SCHEDULE, t));
710
711     procStatus[CurrentProc] = Busy;
712
713 #elif defined(PAR)
714
715     if (PendingFetches != END_BF_QUEUE) {
716         processFetches();
717     }
718
719     /* ToDo: phps merge with spark activation above */
720     /* check whether we have local work and send requests if we have none */
721     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
722       /* :-[  no local threads => look out for local sparks */
723       /* the spark pool for the current PE */
724       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
725       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
726           pool->hd < pool->tl) {
727         /* 
728          * ToDo: add GC code check that we really have enough heap afterwards!!
729          * Old comment:
730          * If we're here (no runnable threads) and we have pending
731          * sparks, we must have a space problem.  Get enough space
732          * to turn one of those pending sparks into a
733          * thread... 
734          */
735         
736         spark = findSpark();                /* get a spark */
737         if (spark != (rtsSpark) NULL) {
738           tso = activateSpark(spark);       /* turn the spark into a thread */
739           IF_PAR_DEBUG(schedule,
740                        belch("==== schedule: Created TSO %d (%p); %d threads active",
741                              tso->id, tso, advisory_thread_count));
742
743           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
744             belch("==^^ failed to activate spark");
745             goto next_thread;
746           }               /* otherwise fall through & pick-up new tso */
747         } else {
748           IF_PAR_DEBUG(verbose,
749                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
750                              spark_queue_len(pool)));
751           goto next_thread;
752         }
753       } else  
754       /* =8-[  no local sparks => look for work on other PEs */
755       {
756         /*
757          * We really have absolutely no work.  Send out a fish
758          * (there may be some out there already), and wait for
759          * something to arrive.  We clearly can't run any threads
760          * until a SCHEDULE or RESUME arrives, and so that's what
761          * we're hoping to see.  (Of course, we still have to
762          * respond to other types of messages.)
763          */
764         if (//!fishing &&  
765             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
766           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
767           /* fishing set in sendFish, processFish;
768              avoid flooding system with fishes via delay */
769           pe = choosePE();
770           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
771                    NEW_FISH_HUNGER);
772         }
773         
774         processMessages();
775         goto next_thread;
776         // ReSchedule(0);
777       }
778     } else if (PacketsWaiting()) {  /* Look for incoming messages */
779       processMessages();
780     }
781
782     /* Now we are sure that we have some work available */
783     ASSERT(run_queue_hd != END_TSO_QUEUE);
784     /* Take a thread from the run queue, if we have work */
785     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
786
787     /* ToDo: write something to the log-file
788     if (RTSflags.ParFlags.granSimStats && !sameThread)
789         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
790
791     CurrentTSO = t;
792     */
793     /* the spark pool for the current PE */
794     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
795
796     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; base=%x, lim=%x)", 
797                               spark_queue_len(pool), 
798                               CURRENT_PROC,
799                               pool->hd, pool->tl, pool->base, pool->lim));
800
801     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
802                               run_queue_len(), CURRENT_PROC,
803                               run_queue_hd, run_queue_tl));
804
805 #if 0
806     if (t != LastTSO) {
807       /* 
808          we are running a different TSO, so write a schedule event to log file
809          NB: If we use fair scheduling we also have to write  a deschedule 
810              event for LastTSO; with unfair scheduling we know that the
811              previous tso has blocked whenever we switch to another tso, so
812              we don't need it in GUM for now
813       */
814       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
815                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
816       
817     }
818 #endif
819 #else /* !GRAN && !PAR */
820   
821     /* grab a thread from the run queue
822      */
823     t = POP_RUN_QUEUE();
824     IF_DEBUG(sanity,checkTSO(t));
825
826 #endif
827     
828     /* grab a capability
829      */
830 #ifdef SMP
831     cap = free_capabilities;
832     free_capabilities = cap->link;
833     n_free_capabilities--;
834 #else
835     cap = &MainRegTable;
836 #endif
837     
838     cap->rCurrentTSO = t;
839     
840     /* set the context_switch flag
841      */
842     if (run_queue_hd == END_TSO_QUEUE)
843       context_switch = 0;
844     else
845       context_switch = 1;
846
847     RELEASE_LOCK(&sched_mutex);
848
849 #if defined(GRAN) || defined(PAR)    
850     IF_DEBUG(scheduler, belch("-->> Running TSO %ld (%p) %s ...", 
851                               t->id, t, whatNext_strs[t->what_next]));
852 #else
853     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
854 #endif
855
856     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
857     /* Run the current thread 
858      */
859     switch (cap->rCurrentTSO->what_next) {
860     case ThreadKilled:
861     case ThreadComplete:
862       /* Thread already finished, return to scheduler. */
863       ret = ThreadFinished;
864       break;
865     case ThreadEnterGHC:
866       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
867       break;
868     case ThreadRunGHC:
869       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
870       break;
871     case ThreadEnterHugs:
872 #ifdef INTERPRETER
873       {
874          StgClosure* c;
875          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
876          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
877          cap->rCurrentTSO->sp += 1;
878          ret = enter(cap,c);
879          break;
880       }
881 #else
882       barf("Panic: entered a BCO but no bytecode interpreter in this build");
883 #endif
884     default:
885       barf("schedule: invalid what_next field");
886     }
887     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
888     
889     /* Costs for the scheduler are assigned to CCS_SYSTEM */
890 #ifdef PROFILING
891     CCCS = CCS_SYSTEM;
892 #endif
893     
894     ACQUIRE_LOCK(&sched_mutex);
895
896 #ifdef SMP
897     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
898 #elif !defined(GRAN) && !defined(PAR)
899     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
900 #endif
901     t = cap->rCurrentTSO;
902     
903 #if defined(PAR)
904     /* HACK 675: if the last thread didn't yield, make sure to print a 
905        SCHEDULE event to the log file when StgRunning the next thread, even
906        if it is the same one as before */
907     LastTSO = t; //(ret == ThreadBlocked) ? END_TSO_QUEUE : t; 
908     TimeOfLastYield = CURRENT_TIME;
909 #endif
910
911     switch (ret) {
912     case HeapOverflow:
913       /* make all the running tasks block on a condition variable,
914        * maybe set context_switch and wait till they all pile in,
915        * then have them wait on a GC condition variable.
916        */
917       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
918                                t->id, t, whatNext_strs[t->what_next]));
919       threadPaused(t);
920 #if defined(GRAN)
921       ASSERT(!is_on_queue(t,CurrentProc));
922 #endif
923       
924       ready_to_gc = rtsTrue;
925       context_switch = 1;               /* stop other threads ASAP */
926       PUSH_ON_RUN_QUEUE(t);
927       /* actual GC is done at the end of the while loop */
928       break;
929       
930     case StackOverflow:
931       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
932                                t->id, t, whatNext_strs[t->what_next]));
933       /* just adjust the stack for this thread, then pop it back
934        * on the run queue.
935        */
936       threadPaused(t);
937       { 
938         StgMainThread *m;
939         /* enlarge the stack */
940         StgTSO *new_t = threadStackOverflow(t);
941         
942         /* This TSO has moved, so update any pointers to it from the
943          * main thread stack.  It better not be on any other queues...
944          * (it shouldn't be).
945          */
946         for (m = main_threads; m != NULL; m = m->link) {
947           if (m->tso == t) {
948             m->tso = new_t;
949           }
950         }
951         threadPaused(new_t);
952         PUSH_ON_RUN_QUEUE(new_t);
953       }
954       break;
955
956     case ThreadYielding:
957 #if defined(GRAN)
958       IF_DEBUG(gran, 
959                DumpGranEvent(GR_DESCHEDULE, t));
960       globalGranStats.tot_yields++;
961 #elif defined(PAR)
962       IF_DEBUG(par, 
963                DumpGranEvent(GR_DESCHEDULE, t));
964 #endif
965       /* put the thread back on the run queue.  Then, if we're ready to
966        * GC, check whether this is the last task to stop.  If so, wake
967        * up the GC thread.  getThread will block during a GC until the
968        * GC is finished.
969        */
970       IF_DEBUG(scheduler,
971                if (t->what_next == ThreadEnterHugs) {
972                    /* ToDo: or maybe a timer expired when we were in Hugs?
973                     * or maybe someone hit ctrl-C
974                     */
975                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
976                          t->id, t, whatNext_strs[t->what_next]);
977                } else {
978                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
979                          t->id, t, whatNext_strs[t->what_next]);
980                }
981                );
982       threadPaused(t);
983       IF_DEBUG(sanity,
984                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
985                checkTSO(t));
986       ASSERT(t->link == END_TSO_QUEUE);
987 #if defined(GRAN)
988       ASSERT(!is_on_queue(t,CurrentProc));
989
990       IF_DEBUG(sanity,
991                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
992                checkThreadQsSanity(rtsTrue));
993 #endif
994       APPEND_TO_RUN_QUEUE(t);
995 #if defined(GRAN)
996       /* add a ContinueThread event to actually process the thread */
997       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
998                 ContinueThread,
999                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1000       IF_GRAN_DEBUG(bq, 
1001                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1002                G_EVENTQ(0);
1003                G_CURR_THREADQ(0))
1004 #endif /* GRAN */
1005       break;
1006       
1007     case ThreadBlocked:
1008 #if defined(GRAN)
1009       IF_DEBUG(scheduler,
1010                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1011                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1012                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1013
1014       // ??? needed; should emit block before
1015       IF_DEBUG(gran, 
1016                DumpGranEvent(GR_DESCHEDULE, t)); 
1017       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1018       /*
1019         ngoq Dogh!
1020       ASSERT(procStatus[CurrentProc]==Busy || 
1021               ((procStatus[CurrentProc]==Fetching) && 
1022               (t->block_info.closure!=(StgClosure*)NULL)));
1023       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1024           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1025             procStatus[CurrentProc]==Fetching)) 
1026         procStatus[CurrentProc] = Idle;
1027       */
1028 #elif defined(PAR)
1029       IF_DEBUG(par, 
1030                DumpGranEvent(GR_DESCHEDULE, t)); 
1031
1032       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1033       blockThread(t);
1034
1035       IF_DEBUG(scheduler,
1036                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1037                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure);
1038                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1039
1040 #else /* !GRAN */
1041       /* don't need to do anything.  Either the thread is blocked on
1042        * I/O, in which case we'll have called addToBlockedQueue
1043        * previously, or it's blocked on an MVar or Blackhole, in which
1044        * case it'll be on the relevant queue already.
1045        */
1046       IF_DEBUG(scheduler,
1047                fprintf(stderr, "--<< thread %d (%p) stopped ", t->id, t);
1048                printThreadBlockage(t);
1049                fprintf(stderr, "\n"));
1050
1051       /* Only for dumping event to log file 
1052          ToDo: do I need this in GranSim, too?
1053       blockThread(t);
1054       */
1055 #endif
1056       threadPaused(t);
1057       break;
1058       
1059     case ThreadFinished:
1060       /* Need to check whether this was a main thread, and if so, signal
1061        * the task that started it with the return value.  If we have no
1062        * more main threads, we probably need to stop all the tasks until
1063        * we get a new one.
1064        */
1065       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1066       t->what_next = ThreadComplete;
1067 #if defined(GRAN)
1068       endThread(t, CurrentProc); // clean-up the thread
1069 #elif defined(PAR)
1070       advisory_thread_count--;
1071       if (RtsFlags.ParFlags.ParStats.Full) 
1072         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1073 #endif
1074       break;
1075       
1076     default:
1077       barf("doneThread: invalid thread return code");
1078     }
1079     
1080 #ifdef SMP
1081     cap->link = free_capabilities;
1082     free_capabilities = cap;
1083     n_free_capabilities++;
1084 #endif
1085
1086 #ifdef SMP
1087     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
1088 #else
1089     if (ready_to_gc) 
1090 #endif
1091       {
1092       /* everybody back, start the GC.
1093        * Could do it in this thread, or signal a condition var
1094        * to do it in another thread.  Either way, we need to
1095        * broadcast on gc_pending_cond afterward.
1096        */
1097 #ifdef SMP
1098       IF_DEBUG(scheduler,sched_belch("doing GC"));
1099 #endif
1100       GarbageCollect(GetRoots);
1101       ready_to_gc = rtsFalse;
1102 #ifdef SMP
1103       pthread_cond_broadcast(&gc_pending_cond);
1104 #endif
1105 #if defined(GRAN)
1106       /* add a ContinueThread event to continue execution of current thread */
1107       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1108                 ContinueThread,
1109                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1110       IF_GRAN_DEBUG(bq, 
1111                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1112                G_EVENTQ(0);
1113                G_CURR_THREADQ(0))
1114 #endif /* GRAN */
1115     }
1116 #if defined(GRAN)
1117   next_thread:
1118     IF_GRAN_DEBUG(unused,
1119                   print_eventq(EventHd));
1120
1121     event = get_next_event();
1122
1123 #elif defined(PAR)
1124   next_thread:
1125     /* ToDo: wait for next message to arrive rather than busy wait */
1126
1127 #else /* GRAN */
1128   /* not any more
1129   next_thread:
1130     t = take_off_run_queue(END_TSO_QUEUE);
1131   */
1132 #endif /* GRAN */
1133   } /* end of while(1) */
1134 }
1135
1136 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
1137 void deleteAllThreads ( void )
1138 {
1139   StgTSO* t;
1140   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
1141   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1142     deleteThread(t);
1143   }
1144   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1145     deleteThread(t);
1146   }
1147   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1148   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1149 }
1150
1151 /* startThread and  insertThread are now in GranSim.c -- HWL */
1152
1153 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1154 //@subsection Suspend and Resume
1155
1156 /* ---------------------------------------------------------------------------
1157  * Suspending & resuming Haskell threads.
1158  * 
1159  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1160  * its capability before calling the C function.  This allows another
1161  * task to pick up the capability and carry on running Haskell
1162  * threads.  It also means that if the C call blocks, it won't lock
1163  * the whole system.
1164  *
1165  * The Haskell thread making the C call is put to sleep for the
1166  * duration of the call, on the susepended_ccalling_threads queue.  We
1167  * give out a token to the task, which it can use to resume the thread
1168  * on return from the C function.
1169  * ------------------------------------------------------------------------- */
1170    
1171 StgInt
1172 suspendThread( Capability *cap )
1173 {
1174   nat tok;
1175
1176   ACQUIRE_LOCK(&sched_mutex);
1177
1178   IF_DEBUG(scheduler,
1179            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
1180
1181   threadPaused(cap->rCurrentTSO);
1182   cap->rCurrentTSO->link = suspended_ccalling_threads;
1183   suspended_ccalling_threads = cap->rCurrentTSO;
1184
1185   /* Use the thread ID as the token; it should be unique */
1186   tok = cap->rCurrentTSO->id;
1187
1188 #ifdef SMP
1189   cap->link = free_capabilities;
1190   free_capabilities = cap;
1191   n_free_capabilities++;
1192 #endif
1193
1194   RELEASE_LOCK(&sched_mutex);
1195   return tok; 
1196 }
1197
1198 Capability *
1199 resumeThread( StgInt tok )
1200 {
1201   StgTSO *tso, **prev;
1202   Capability *cap;
1203
1204   ACQUIRE_LOCK(&sched_mutex);
1205
1206   prev = &suspended_ccalling_threads;
1207   for (tso = suspended_ccalling_threads; 
1208        tso != END_TSO_QUEUE; 
1209        prev = &tso->link, tso = tso->link) {
1210     if (tso->id == (StgThreadID)tok) {
1211       *prev = tso->link;
1212       break;
1213     }
1214   }
1215   if (tso == END_TSO_QUEUE) {
1216     barf("resumeThread: thread not found");
1217   }
1218
1219 #ifdef SMP
1220   while (free_capabilities == NULL) {
1221     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1222     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
1223     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1224   }
1225   cap = free_capabilities;
1226   free_capabilities = cap->link;
1227   n_free_capabilities--;
1228 #else  
1229   cap = &MainRegTable;
1230 #endif
1231
1232   cap->rCurrentTSO = tso;
1233
1234   RELEASE_LOCK(&sched_mutex);
1235   return cap;
1236 }
1237
1238
1239 /* ---------------------------------------------------------------------------
1240  * Static functions
1241  * ------------------------------------------------------------------------ */
1242 static void unblockThread(StgTSO *tso);
1243
1244 /* ---------------------------------------------------------------------------
1245  * Comparing Thread ids.
1246  *
1247  * This is used from STG land in the implementation of the
1248  * instances of Eq/Ord for ThreadIds.
1249  * ------------------------------------------------------------------------ */
1250
1251 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1252
1253   StgThreadID id1 = tso1->id; 
1254   StgThreadID id2 = tso2->id;
1255  
1256   if (id1 < id2) return (-1);
1257   if (id1 > id2) return 1;
1258   return 0;
1259 }
1260
1261 /* ---------------------------------------------------------------------------
1262    Create a new thread.
1263
1264    The new thread starts with the given stack size.  Before the
1265    scheduler can run, however, this thread needs to have a closure
1266    (and possibly some arguments) pushed on its stack.  See
1267    pushClosure() in Schedule.h.
1268
1269    createGenThread() and createIOThread() (in SchedAPI.h) are
1270    convenient packaged versions of this function.
1271
1272    currently pri (priority) is only used in a GRAN setup -- HWL
1273    ------------------------------------------------------------------------ */
1274 //@cindex createThread
1275 #if defined(GRAN)
1276 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1277 StgTSO *
1278 createThread(nat stack_size, StgInt pri)
1279 {
1280   return createThread_(stack_size, rtsFalse, pri);
1281 }
1282
1283 static StgTSO *
1284 createThread_(nat size, rtsBool have_lock, StgInt pri)
1285 {
1286 #else
1287 StgTSO *
1288 createThread(nat stack_size)
1289 {
1290   return createThread_(stack_size, rtsFalse);
1291 }
1292
1293 static StgTSO *
1294 createThread_(nat size, rtsBool have_lock)
1295 {
1296 #endif
1297
1298     StgTSO *tso;
1299     nat stack_size;
1300
1301     /* First check whether we should create a thread at all */
1302 #if defined(PAR)
1303   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1304   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1305     threadsIgnored++;
1306     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1307           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1308     return END_TSO_QUEUE;
1309   }
1310   threadsCreated++;
1311 #endif
1312
1313 #if defined(GRAN)
1314   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1315 #endif
1316
1317   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1318
1319   /* catch ridiculously small stack sizes */
1320   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1321     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1322   }
1323
1324   stack_size = size - TSO_STRUCT_SIZEW;
1325
1326   tso = (StgTSO *)allocate(size);
1327   TICK_ALLOC_TSO(size-TSO_STRUCT_SIZEW, 0);
1328
1329   SET_HDR(tso, &TSO_info, CCS_MAIN);
1330 #if defined(GRAN)
1331   SET_GRAN_HDR(tso, ThisPE);
1332 #endif
1333   tso->what_next     = ThreadEnterGHC;
1334
1335   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1336    * protect the increment operation on next_thread_id.
1337    * In future, we could use an atomic increment instead.
1338    */
1339   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1340   tso->id = next_thread_id++; 
1341   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1342
1343   tso->why_blocked  = NotBlocked;
1344   tso->blocked_exceptions = NULL;
1345
1346   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1347   tso->stack_size   = stack_size;
1348   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1349                               - TSO_STRUCT_SIZEW;
1350   tso->sp           = (P_)&(tso->stack) + stack_size;
1351
1352 #ifdef PROFILING
1353   tso->prof.CCCS = CCS_MAIN;
1354 #endif
1355
1356   /* put a stop frame on the stack */
1357   tso->sp -= sizeofW(StgStopFrame);
1358   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1359   tso->su = (StgUpdateFrame*)tso->sp;
1360
1361   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1362                            tso->id, tso, tso->stack_size));
1363
1364   // ToDo: check this
1365 #if defined(GRAN)
1366   tso->link = END_TSO_QUEUE;
1367   /* uses more flexible routine in GranSim */
1368   insertThread(tso, CurrentProc);
1369 #else
1370   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1371    * from its creation
1372    */
1373 #endif
1374
1375 #if defined(GRAN) || defined(PAR)
1376   DumpGranEvent(GR_START,tso);
1377 #endif
1378
1379   /* Link the new thread on the global thread list.
1380    */
1381   tso->global_link = all_threads;
1382   all_threads = tso;
1383
1384 #if defined(GRAN)
1385   tso->gran.pri = pri;
1386 # if defined(DEBUG)
1387   tso->gran.magic = TSO_MAGIC; // debugging only
1388 # endif
1389   tso->gran.sparkname   = 0;
1390   tso->gran.startedat   = CURRENT_TIME; 
1391   tso->gran.exported    = 0;
1392   tso->gran.basicblocks = 0;
1393   tso->gran.allocs      = 0;
1394   tso->gran.exectime    = 0;
1395   tso->gran.fetchtime   = 0;
1396   tso->gran.fetchcount  = 0;
1397   tso->gran.blocktime   = 0;
1398   tso->gran.blockcount  = 0;
1399   tso->gran.blockedat   = 0;
1400   tso->gran.globalsparks = 0;
1401   tso->gran.localsparks  = 0;
1402   if (RtsFlags.GranFlags.Light)
1403     tso->gran.clock  = Now; /* local clock */
1404   else
1405     tso->gran.clock  = 0;
1406
1407   IF_DEBUG(gran,printTSO(tso));
1408 #elif defined(PAR)
1409 # if defined(DEBUG)
1410   tso->par.magic = TSO_MAGIC; // debugging only
1411 # endif
1412   tso->par.sparkname   = 0;
1413   tso->par.startedat   = CURRENT_TIME; 
1414   tso->par.exported    = 0;
1415   tso->par.basicblocks = 0;
1416   tso->par.allocs      = 0;
1417   tso->par.exectime    = 0;
1418   tso->par.fetchtime   = 0;
1419   tso->par.fetchcount  = 0;
1420   tso->par.blocktime   = 0;
1421   tso->par.blockcount  = 0;
1422   tso->par.blockedat   = 0;
1423   tso->par.globalsparks = 0;
1424   tso->par.localsparks  = 0;
1425 #endif
1426
1427 #if defined(GRAN)
1428   globalGranStats.tot_threads_created++;
1429   globalGranStats.threads_created_on_PE[CurrentProc]++;
1430   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1431   globalGranStats.tot_sq_probes++;
1432 #endif 
1433
1434 #if defined(GRAN)
1435   IF_GRAN_DEBUG(pri,
1436                 belch("==__ schedule: Created TSO %d (%p);",
1437                       CurrentProc, tso, tso->id));
1438 #elif defined(PAR)
1439     IF_PAR_DEBUG(verbose,
1440                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1441                        tso->id, tso, advisory_thread_count));
1442 #else
1443   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1444                                  tso->id, tso->stack_size));
1445 #endif    
1446   return tso;
1447 }
1448
1449 /*
1450   Turn a spark into a thread.
1451   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1452 */
1453 #if defined(PAR)
1454 //@cindex activateSpark
1455 StgTSO *
1456 activateSpark (rtsSpark spark) 
1457 {
1458   StgTSO *tso;
1459   
1460   ASSERT(spark != (rtsSpark)NULL);
1461   tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1462   if (tso!=END_TSO_QUEUE) {
1463     pushClosure(tso,spark);
1464     PUSH_ON_RUN_QUEUE(tso);
1465     advisory_thread_count++;
1466
1467     if (RtsFlags.ParFlags.ParStats.Full) {
1468       //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1469       IF_PAR_DEBUG(verbose,
1470                    belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1471                          (StgClosure *)spark, info_type((StgClosure *)spark)));
1472     }
1473   } else {
1474     barf("activateSpark: Cannot create TSO");
1475   }
1476   // ToDo: fwd info on local/global spark to thread -- HWL
1477   // tso->gran.exported =  spark->exported;
1478   // tso->gran.locked =   !spark->global;
1479   // tso->gran.sparkname = spark->name;
1480
1481   return tso;
1482 }
1483 #endif
1484
1485 /* ---------------------------------------------------------------------------
1486  * scheduleThread()
1487  *
1488  * scheduleThread puts a thread on the head of the runnable queue.
1489  * This will usually be done immediately after a thread is created.
1490  * The caller of scheduleThread must create the thread using e.g.
1491  * createThread and push an appropriate closure
1492  * on this thread's stack before the scheduler is invoked.
1493  * ------------------------------------------------------------------------ */
1494
1495 void
1496 scheduleThread(StgTSO *tso)
1497 {
1498   if (tso==END_TSO_QUEUE){    
1499     schedule();
1500     return;
1501   }
1502
1503   ACQUIRE_LOCK(&sched_mutex);
1504
1505   /* Put the new thread on the head of the runnable queue.  The caller
1506    * better push an appropriate closure on this thread's stack
1507    * beforehand.  In the SMP case, the thread may start running as
1508    * soon as we release the scheduler lock below.
1509    */
1510   PUSH_ON_RUN_QUEUE(tso);
1511   THREAD_RUNNABLE();
1512
1513   IF_DEBUG(scheduler,printTSO(tso));
1514   RELEASE_LOCK(&sched_mutex);
1515 }
1516
1517 /* ---------------------------------------------------------------------------
1518  * startTasks()
1519  *
1520  * Start up Posix threads to run each of the scheduler tasks.
1521  * I believe the task ids are not needed in the system as defined.
1522  *  KH @ 25/10/99
1523  * ------------------------------------------------------------------------ */
1524
1525 #if defined(PAR) || defined(SMP)
1526 void *
1527 taskStart( void *arg STG_UNUSED )
1528 {
1529   rts_evalNothing(NULL);
1530 }
1531 #endif
1532
1533 /* ---------------------------------------------------------------------------
1534  * initScheduler()
1535  *
1536  * Initialise the scheduler.  This resets all the queues - if the
1537  * queues contained any threads, they'll be garbage collected at the
1538  * next pass.
1539  *
1540  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1541  * ------------------------------------------------------------------------ */
1542
1543 #ifdef SMP
1544 static void
1545 term_handler(int sig STG_UNUSED)
1546 {
1547   stat_workerStop();
1548   ACQUIRE_LOCK(&term_mutex);
1549   await_death--;
1550   RELEASE_LOCK(&term_mutex);
1551   pthread_exit(NULL);
1552 }
1553 #endif
1554
1555 //@cindex initScheduler
1556 void 
1557 initScheduler(void)
1558 {
1559 #if defined(GRAN)
1560   nat i;
1561
1562   for (i=0; i<=MAX_PROC; i++) {
1563     run_queue_hds[i]      = END_TSO_QUEUE;
1564     run_queue_tls[i]      = END_TSO_QUEUE;
1565     blocked_queue_hds[i]  = END_TSO_QUEUE;
1566     blocked_queue_tls[i]  = END_TSO_QUEUE;
1567     ccalling_threadss[i]  = END_TSO_QUEUE;
1568   }
1569 #else
1570   run_queue_hd      = END_TSO_QUEUE;
1571   run_queue_tl      = END_TSO_QUEUE;
1572   blocked_queue_hd  = END_TSO_QUEUE;
1573   blocked_queue_tl  = END_TSO_QUEUE;
1574 #endif 
1575
1576   suspended_ccalling_threads  = END_TSO_QUEUE;
1577
1578   main_threads = NULL;
1579   all_threads  = END_TSO_QUEUE;
1580
1581   context_switch = 0;
1582   interrupted    = 0;
1583
1584   enteredCAFs = END_CAF_LIST;
1585
1586   /* Install the SIGHUP handler */
1587 #ifdef SMP
1588   {
1589     struct sigaction action,oact;
1590
1591     action.sa_handler = term_handler;
1592     sigemptyset(&action.sa_mask);
1593     action.sa_flags = 0;
1594     if (sigaction(SIGTERM, &action, &oact) != 0) {
1595       barf("can't install TERM handler");
1596     }
1597   }
1598 #endif
1599
1600 #ifdef SMP
1601   /* Allocate N Capabilities */
1602   {
1603     nat i;
1604     Capability *cap, *prev;
1605     cap  = NULL;
1606     prev = NULL;
1607     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1608       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1609       cap->link = prev;
1610       prev = cap;
1611     }
1612     free_capabilities = cap;
1613     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1614   }
1615   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1616                              n_free_capabilities););
1617 #endif
1618
1619 #if defined(SMP) || defined(PAR)
1620   initSparkPools();
1621 #endif
1622 }
1623
1624 #ifdef SMP
1625 void
1626 startTasks( void )
1627 {
1628   nat i;
1629   int r;
1630   pthread_t tid;
1631   
1632   /* make some space for saving all the thread ids */
1633   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1634                             "initScheduler:task_ids");
1635   
1636   /* and create all the threads */
1637   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1638     r = pthread_create(&tid,NULL,taskStart,NULL);
1639     if (r != 0) {
1640       barf("startTasks: Can't create new Posix thread");
1641     }
1642     task_ids[i].id = tid;
1643     task_ids[i].mut_time = 0.0;
1644     task_ids[i].mut_etime = 0.0;
1645     task_ids[i].gc_time = 0.0;
1646     task_ids[i].gc_etime = 0.0;
1647     task_ids[i].elapsedtimestart = elapsedtime();
1648     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1649   }
1650 }
1651 #endif
1652
1653 void
1654 exitScheduler( void )
1655 {
1656 #ifdef SMP
1657   nat i;
1658
1659   /* Don't want to use pthread_cancel, since we'd have to install
1660    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1661    * all our locks.
1662    */
1663 #if 0
1664   /* Cancel all our tasks */
1665   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1666     pthread_cancel(task_ids[i].id);
1667   }
1668   
1669   /* Wait for all the tasks to terminate */
1670   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1671     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1672                                task_ids[i].id));
1673     pthread_join(task_ids[i].id, NULL);
1674   }
1675 #endif
1676
1677   /* Send 'em all a SIGHUP.  That should shut 'em up.
1678    */
1679   await_death = RtsFlags.ParFlags.nNodes;
1680   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1681     pthread_kill(task_ids[i].id,SIGTERM);
1682   }
1683   while (await_death > 0) {
1684     sched_yield();
1685   }
1686 #endif
1687 }
1688
1689 /* -----------------------------------------------------------------------------
1690    Managing the per-task allocation areas.
1691    
1692    Each capability comes with an allocation area.  These are
1693    fixed-length block lists into which allocation can be done.
1694
1695    ToDo: no support for two-space collection at the moment???
1696    -------------------------------------------------------------------------- */
1697
1698 /* -----------------------------------------------------------------------------
1699  * waitThread is the external interface for running a new computation
1700  * and waiting for the result.
1701  *
1702  * In the non-SMP case, we create a new main thread, push it on the 
1703  * main-thread stack, and invoke the scheduler to run it.  The
1704  * scheduler will return when the top main thread on the stack has
1705  * completed or died, and fill in the necessary fields of the
1706  * main_thread structure.
1707  *
1708  * In the SMP case, we create a main thread as before, but we then
1709  * create a new condition variable and sleep on it.  When our new
1710  * main thread has completed, we'll be woken up and the status/result
1711  * will be in the main_thread struct.
1712  * -------------------------------------------------------------------------- */
1713
1714 SchedulerStatus
1715 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1716 {
1717   StgMainThread *m;
1718   SchedulerStatus stat;
1719
1720   ACQUIRE_LOCK(&sched_mutex);
1721   
1722   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1723
1724   m->tso = tso;
1725   m->ret = ret;
1726   m->stat = NoStatus;
1727 #ifdef SMP
1728   pthread_cond_init(&m->wakeup, NULL);
1729 #endif
1730
1731   m->link = main_threads;
1732   main_threads = m;
1733
1734   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1735                               m->tso->id));
1736
1737 #ifdef SMP
1738   do {
1739     pthread_cond_wait(&m->wakeup, &sched_mutex);
1740   } while (m->stat == NoStatus);
1741 #elif defined(GRAN)
1742   /* GranSim specific init */
1743   CurrentTSO = m->tso;                // the TSO to run
1744   procStatus[MainProc] = Busy;        // status of main PE
1745   CurrentProc = MainProc;             // PE to run it on
1746
1747   schedule();
1748 #else
1749   schedule();
1750   ASSERT(m->stat != NoStatus);
1751 #endif
1752
1753   stat = m->stat;
1754
1755 #ifdef SMP
1756   pthread_cond_destroy(&m->wakeup);
1757 #endif
1758
1759   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1760                               m->tso->id));
1761   free(m);
1762
1763   RELEASE_LOCK(&sched_mutex);
1764
1765   return stat;
1766 }
1767
1768 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1769 //@subsection Run queue code 
1770
1771 #if 0
1772 /* 
1773    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1774        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1775        implicit global variable that has to be correct when calling these
1776        fcts -- HWL 
1777 */
1778
1779 /* Put the new thread on the head of the runnable queue.
1780  * The caller of createThread better push an appropriate closure
1781  * on this thread's stack before the scheduler is invoked.
1782  */
1783 static /* inline */ void
1784 add_to_run_queue(tso)
1785 StgTSO* tso; 
1786 {
1787   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1788   tso->link = run_queue_hd;
1789   run_queue_hd = tso;
1790   if (run_queue_tl == END_TSO_QUEUE) {
1791     run_queue_tl = tso;
1792   }
1793 }
1794
1795 /* Put the new thread at the end of the runnable queue. */
1796 static /* inline */ void
1797 push_on_run_queue(tso)
1798 StgTSO* tso; 
1799 {
1800   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1801   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1802   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1803   if (run_queue_hd == END_TSO_QUEUE) {
1804     run_queue_hd = tso;
1805   } else {
1806     run_queue_tl->link = tso;
1807   }
1808   run_queue_tl = tso;
1809 }
1810
1811 /* 
1812    Should be inlined because it's used very often in schedule.  The tso
1813    argument is actually only needed in GranSim, where we want to have the
1814    possibility to schedule *any* TSO on the run queue, irrespective of the
1815    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1816    the run queue and dequeue the tso, adjusting the links in the queue. 
1817 */
1818 //@cindex take_off_run_queue
1819 static /* inline */ StgTSO*
1820 take_off_run_queue(StgTSO *tso) {
1821   StgTSO *t, *prev;
1822
1823   /* 
1824      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1825
1826      if tso is specified, unlink that tso from the run_queue (doesn't have
1827      to be at the beginning of the queue); GranSim only 
1828   */
1829   if (tso!=END_TSO_QUEUE) {
1830     /* find tso in queue */
1831     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1832          t!=END_TSO_QUEUE && t!=tso;
1833          prev=t, t=t->link) 
1834       /* nothing */ ;
1835     ASSERT(t==tso);
1836     /* now actually dequeue the tso */
1837     if (prev!=END_TSO_QUEUE) {
1838       ASSERT(run_queue_hd!=t);
1839       prev->link = t->link;
1840     } else {
1841       /* t is at beginning of thread queue */
1842       ASSERT(run_queue_hd==t);
1843       run_queue_hd = t->link;
1844     }
1845     /* t is at end of thread queue */
1846     if (t->link==END_TSO_QUEUE) {
1847       ASSERT(t==run_queue_tl);
1848       run_queue_tl = prev;
1849     } else {
1850       ASSERT(run_queue_tl!=t);
1851     }
1852     t->link = END_TSO_QUEUE;
1853   } else {
1854     /* take tso from the beginning of the queue; std concurrent code */
1855     t = run_queue_hd;
1856     if (t != END_TSO_QUEUE) {
1857       run_queue_hd = t->link;
1858       t->link = END_TSO_QUEUE;
1859       if (run_queue_hd == END_TSO_QUEUE) {
1860         run_queue_tl = END_TSO_QUEUE;
1861       }
1862     }
1863   }
1864   return t;
1865 }
1866
1867 #endif /* 0 */
1868
1869 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1870 //@subsection Garbage Collextion Routines
1871
1872 /* ---------------------------------------------------------------------------
1873    Where are the roots that we know about?
1874
1875         - all the threads on the runnable queue
1876         - all the threads on the blocked queue
1877         - all the thread currently executing a _ccall_GC
1878         - all the "main threads"
1879      
1880    ------------------------------------------------------------------------ */
1881
1882 /* This has to be protected either by the scheduler monitor, or by the
1883         garbage collection monitor (probably the latter).
1884         KH @ 25/10/99
1885 */
1886
1887 static void GetRoots(void)
1888 {
1889   StgMainThread *m;
1890
1891 #if defined(GRAN)
1892   {
1893     nat i;
1894     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1895       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1896         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1897       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1898         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1899       
1900       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1901         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1902       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1903         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1904       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1905         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1906     }
1907   }
1908
1909   markEventQueue();
1910
1911 #else /* !GRAN */
1912   if (run_queue_hd != END_TSO_QUEUE) {
1913     ASSERT(run_queue_tl != END_TSO_QUEUE);
1914     run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1915     run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1916   }
1917
1918   if (blocked_queue_hd != END_TSO_QUEUE) {
1919     ASSERT(blocked_queue_tl != END_TSO_QUEUE);
1920     blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1921     blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1922   }
1923 #endif 
1924
1925   for (m = main_threads; m != NULL; m = m->link) {
1926     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1927   }
1928   if (suspended_ccalling_threads != END_TSO_QUEUE)
1929     suspended_ccalling_threads = 
1930       (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1931
1932 #if defined(SMP) || defined(PAR) || defined(GRAN)
1933   markSparkQueue();
1934 #endif
1935 }
1936
1937 /* -----------------------------------------------------------------------------
1938    performGC
1939
1940    This is the interface to the garbage collector from Haskell land.
1941    We provide this so that external C code can allocate and garbage
1942    collect when called from Haskell via _ccall_GC.
1943
1944    It might be useful to provide an interface whereby the programmer
1945    can specify more roots (ToDo).
1946    
1947    This needs to be protected by the GC condition variable above.  KH.
1948    -------------------------------------------------------------------------- */
1949
1950 void (*extra_roots)(void);
1951
1952 void
1953 performGC(void)
1954 {
1955   GarbageCollect(GetRoots);
1956 }
1957
1958 static void
1959 AllRoots(void)
1960 {
1961   GetRoots();                   /* the scheduler's roots */
1962   extra_roots();                /* the user's roots */
1963 }
1964
1965 void
1966 performGCWithRoots(void (*get_roots)(void))
1967 {
1968   extra_roots = get_roots;
1969
1970   GarbageCollect(AllRoots);
1971 }
1972
1973 /* -----------------------------------------------------------------------------
1974    Stack overflow
1975
1976    If the thread has reached its maximum stack size, then raise the
1977    StackOverflow exception in the offending thread.  Otherwise
1978    relocate the TSO into a larger chunk of memory and adjust its stack
1979    size appropriately.
1980    -------------------------------------------------------------------------- */
1981
1982 static StgTSO *
1983 threadStackOverflow(StgTSO *tso)
1984 {
1985   nat new_stack_size, new_tso_size, diff, stack_words;
1986   StgPtr new_sp;
1987   StgTSO *dest;
1988
1989   IF_DEBUG(sanity,checkTSO(tso));
1990   if (tso->stack_size >= tso->max_stack_size) {
1991
1992     IF_DEBUG(gc,
1993              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
1994                    tso->id, tso, tso->stack_size, tso->max_stack_size);
1995              /* If we're debugging, just print out the top of the stack */
1996              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1997                                               tso->sp+64)));
1998
1999 #ifdef INTERPRETER
2000     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
2001     exit(1);
2002 #else
2003     /* Send this thread the StackOverflow exception */
2004     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2005 #endif
2006     return tso;
2007   }
2008
2009   /* Try to double the current stack size.  If that takes us over the
2010    * maximum stack size for this thread, then use the maximum instead.
2011    * Finally round up so the TSO ends up as a whole number of blocks.
2012    */
2013   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2014   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2015                                        TSO_STRUCT_SIZE)/sizeof(W_);
2016   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2017   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2018
2019   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2020
2021   dest = (StgTSO *)allocate(new_tso_size);
2022   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
2023
2024   /* copy the TSO block and the old stack into the new area */
2025   memcpy(dest,tso,TSO_STRUCT_SIZE);
2026   stack_words = tso->stack + tso->stack_size - tso->sp;
2027   new_sp = (P_)dest + new_tso_size - stack_words;
2028   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2029
2030   /* relocate the stack pointers... */
2031   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2032   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2033   dest->sp    = new_sp;
2034   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
2035   dest->stack_size = new_stack_size;
2036         
2037   /* and relocate the update frame list */
2038   relocate_TSO(tso, dest);
2039
2040   /* Mark the old TSO as relocated.  We have to check for relocated
2041    * TSOs in the garbage collector and any primops that deal with TSOs.
2042    *
2043    * It's important to set the sp and su values to just beyond the end
2044    * of the stack, so we don't attempt to scavenge any part of the
2045    * dead TSO's stack.
2046    */
2047   tso->what_next = ThreadRelocated;
2048   tso->link = dest;
2049   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2050   tso->su = (StgUpdateFrame *)tso->sp;
2051   tso->why_blocked = NotBlocked;
2052   dest->mut_link = NULL;
2053
2054   IF_PAR_DEBUG(verbose,
2055                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2056                      tso->id, tso, tso->stack_size);
2057                /* If we're debugging, just print out the top of the stack */
2058                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2059                                                 tso->sp+64)));
2060   
2061   IF_DEBUG(sanity,checkTSO(tso));
2062 #if 0
2063   IF_DEBUG(scheduler,printTSO(dest));
2064 #endif
2065
2066   return dest;
2067 }
2068
2069 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2070 //@subsection Blocking Queue Routines
2071
2072 /* ---------------------------------------------------------------------------
2073    Wake up a queue that was blocked on some resource.
2074    ------------------------------------------------------------------------ */
2075
2076 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
2077
2078 #if defined(GRAN)
2079 static inline void
2080 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2081 {
2082 }
2083 #elif defined(PAR)
2084 static inline void
2085 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2086 {
2087   /* write RESUME events to log file and
2088      update blocked and fetch time (depending on type of the orig closure) */
2089   if (RtsFlags.ParFlags.ParStats.Full) {
2090     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2091                      GR_RESUME, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2092                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2093
2094     switch (get_itbl(node)->type) {
2095         case FETCH_ME_BQ:
2096           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2097           break;
2098         case RBH:
2099         case FETCH_ME:
2100         case BLACKHOLE_BQ:
2101           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2102           break;
2103         default:
2104           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2105         }
2106       }
2107 }
2108 #endif
2109
2110 #if defined(GRAN)
2111 static StgBlockingQueueElement *
2112 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2113 {
2114     StgTSO *tso;
2115     PEs node_loc, tso_loc;
2116
2117     node_loc = where_is(node); // should be lifted out of loop
2118     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2119     tso_loc = where_is((StgClosure *)tso);
2120     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2121       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2122       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2123       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2124       // insertThread(tso, node_loc);
2125       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2126                 ResumeThread,
2127                 tso, node, (rtsSpark*)NULL);
2128       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2129       // len_local++;
2130       // len++;
2131     } else { // TSO is remote (actually should be FMBQ)
2132       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2133                                   RtsFlags.GranFlags.Costs.gunblocktime +
2134                                   RtsFlags.GranFlags.Costs.latency;
2135       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2136                 UnblockThread,
2137                 tso, node, (rtsSpark*)NULL);
2138       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2139       // len++;
2140     }
2141     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2142     IF_GRAN_DEBUG(bq,
2143                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2144                           (node_loc==tso_loc ? "Local" : "Global"), 
2145                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2146     tso->block_info.closure = NULL;
2147     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2148                              tso->id, tso));
2149 }
2150 #elif defined(PAR)
2151 static StgBlockingQueueElement *
2152 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2153 {
2154     StgBlockingQueueElement *next;
2155
2156     switch (get_itbl(bqe)->type) {
2157     case TSO:
2158       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2159       /* if it's a TSO just push it onto the run_queue */
2160       next = bqe->link;
2161       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2162       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2163       THREAD_RUNNABLE();
2164       unblockCount(bqe, node);
2165       /* reset blocking status after dumping event */
2166       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2167       break;
2168
2169     case BLOCKED_FETCH:
2170       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2171       next = bqe->link;
2172       bqe->link = PendingFetches;
2173       PendingFetches = bqe;
2174       break;
2175
2176 # if defined(DEBUG)
2177       /* can ignore this case in a non-debugging setup; 
2178          see comments on RBHSave closures above */
2179     case CONSTR:
2180       /* check that the closure is an RBHSave closure */
2181       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
2182              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
2183              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
2184       break;
2185
2186     default:
2187       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2188            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2189            (StgClosure *)bqe);
2190 # endif
2191     }
2192   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2193   return next;
2194 }
2195
2196 #else /* !GRAN && !PAR */
2197 static StgTSO *
2198 unblockOneLocked(StgTSO *tso)
2199 {
2200   StgTSO *next;
2201
2202   ASSERT(get_itbl(tso)->type == TSO);
2203   ASSERT(tso->why_blocked != NotBlocked);
2204   tso->why_blocked = NotBlocked;
2205   next = tso->link;
2206   PUSH_ON_RUN_QUEUE(tso);
2207   THREAD_RUNNABLE();
2208   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2209   return next;
2210 }
2211 #endif
2212
2213 #if defined(GRAN) || defined(PAR)
2214 inline StgBlockingQueueElement *
2215 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2216 {
2217   ACQUIRE_LOCK(&sched_mutex);
2218   bqe = unblockOneLocked(bqe, node);
2219   RELEASE_LOCK(&sched_mutex);
2220   return bqe;
2221 }
2222 #else
2223 inline StgTSO *
2224 unblockOne(StgTSO *tso)
2225 {
2226   ACQUIRE_LOCK(&sched_mutex);
2227   tso = unblockOneLocked(tso);
2228   RELEASE_LOCK(&sched_mutex);
2229   return tso;
2230 }
2231 #endif
2232
2233 #if defined(GRAN)
2234 void 
2235 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2236 {
2237   StgBlockingQueueElement *bqe;
2238   PEs node_loc;
2239   nat len = 0; 
2240
2241   IF_GRAN_DEBUG(bq, 
2242                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2243                       node, CurrentProc, CurrentTime[CurrentProc], 
2244                       CurrentTSO->id, CurrentTSO));
2245
2246   node_loc = where_is(node);
2247
2248   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2249          get_itbl(q)->type == CONSTR); // closure (type constructor)
2250   ASSERT(is_unique(node));
2251
2252   /* FAKE FETCH: magically copy the node to the tso's proc;
2253      no Fetch necessary because in reality the node should not have been 
2254      moved to the other PE in the first place
2255   */
2256   if (CurrentProc!=node_loc) {
2257     IF_GRAN_DEBUG(bq, 
2258                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2259                         node, node_loc, CurrentProc, CurrentTSO->id, 
2260                         // CurrentTSO, where_is(CurrentTSO),
2261                         node->header.gran.procs));
2262     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2263     IF_GRAN_DEBUG(bq, 
2264                   belch("## new bitmask of node %p is %#x",
2265                         node, node->header.gran.procs));
2266     if (RtsFlags.GranFlags.GranSimStats.Global) {
2267       globalGranStats.tot_fake_fetches++;
2268     }
2269   }
2270
2271   bqe = q;
2272   // ToDo: check: ASSERT(CurrentProc==node_loc);
2273   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2274     //next = bqe->link;
2275     /* 
2276        bqe points to the current element in the queue
2277        next points to the next element in the queue
2278     */
2279     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2280     //tso_loc = where_is(tso);
2281     len++;
2282     bqe = unblockOneLocked(bqe, node);
2283   }
2284
2285   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2286      the closure to make room for the anchor of the BQ */
2287   if (bqe!=END_BQ_QUEUE) {
2288     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2289     /*
2290     ASSERT((info_ptr==&RBH_Save_0_info) ||
2291            (info_ptr==&RBH_Save_1_info) ||
2292            (info_ptr==&RBH_Save_2_info));
2293     */
2294     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2295     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2296     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2297
2298     IF_GRAN_DEBUG(bq,
2299                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2300                         node, info_type(node)));
2301   }
2302
2303   /* statistics gathering */
2304   if (RtsFlags.GranFlags.GranSimStats.Global) {
2305     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2306     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2307     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2308     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2309   }
2310   IF_GRAN_DEBUG(bq,
2311                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2312                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2313 }
2314 #elif defined(PAR)
2315 void 
2316 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2317 {
2318   StgBlockingQueueElement *bqe, *next;
2319
2320   ACQUIRE_LOCK(&sched_mutex);
2321
2322   IF_PAR_DEBUG(verbose, 
2323                belch("## AwBQ for node %p on [%x]: ",
2324                      node, mytid));
2325
2326   ASSERT(get_itbl(q)->type == TSO ||           
2327          get_itbl(q)->type == BLOCKED_FETCH || 
2328          get_itbl(q)->type == CONSTR); 
2329
2330   bqe = q;
2331   while (get_itbl(bqe)->type==TSO || 
2332          get_itbl(bqe)->type==BLOCKED_FETCH) {
2333     bqe = unblockOneLocked(bqe, node);
2334   }
2335   RELEASE_LOCK(&sched_mutex);
2336 }
2337
2338 #else   /* !GRAN && !PAR */
2339 void
2340 awakenBlockedQueue(StgTSO *tso)
2341 {
2342   ACQUIRE_LOCK(&sched_mutex);
2343   while (tso != END_TSO_QUEUE) {
2344     tso = unblockOneLocked(tso);
2345   }
2346   RELEASE_LOCK(&sched_mutex);
2347 }
2348 #endif
2349
2350 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2351 //@subsection Exception Handling Routines
2352
2353 /* ---------------------------------------------------------------------------
2354    Interrupt execution
2355    - usually called inside a signal handler so it mustn't do anything fancy.   
2356    ------------------------------------------------------------------------ */
2357
2358 void
2359 interruptStgRts(void)
2360 {
2361     interrupted    = 1;
2362     context_switch = 1;
2363 }
2364
2365 /* -----------------------------------------------------------------------------
2366    Unblock a thread
2367
2368    This is for use when we raise an exception in another thread, which
2369    may be blocked.
2370    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2371    -------------------------------------------------------------------------- */
2372
2373 #if defined(GRAN) || defined(PAR)
2374 /*
2375   NB: only the type of the blocking queue is different in GranSim and GUM
2376       the operations on the queue-elements are the same
2377       long live polymorphism!
2378 */
2379 static void
2380 unblockThread(StgTSO *tso)
2381 {
2382   StgBlockingQueueElement *t, **last;
2383
2384   ACQUIRE_LOCK(&sched_mutex);
2385   switch (tso->why_blocked) {
2386
2387   case NotBlocked:
2388     return;  /* not blocked */
2389
2390   case BlockedOnMVar:
2391     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2392     {
2393       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2394       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2395
2396       last = (StgBlockingQueueElement **)&mvar->head;
2397       for (t = (StgBlockingQueueElement *)mvar->head; 
2398            t != END_BQ_QUEUE; 
2399            last = &t->link, last_tso = t, t = t->link) {
2400         if (t == (StgBlockingQueueElement *)tso) {
2401           *last = (StgBlockingQueueElement *)tso->link;
2402           if (mvar->tail == tso) {
2403             mvar->tail = (StgTSO *)last_tso;
2404           }
2405           goto done;
2406         }
2407       }
2408       barf("unblockThread (MVAR): TSO not found");
2409     }
2410
2411   case BlockedOnBlackHole:
2412     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2413     {
2414       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2415
2416       last = &bq->blocking_queue;
2417       for (t = bq->blocking_queue; 
2418            t != END_BQ_QUEUE; 
2419            last = &t->link, t = t->link) {
2420         if (t == (StgBlockingQueueElement *)tso) {
2421           *last = (StgBlockingQueueElement *)tso->link;
2422           goto done;
2423         }
2424       }
2425       barf("unblockThread (BLACKHOLE): TSO not found");
2426     }
2427
2428   case BlockedOnException:
2429     {
2430       StgTSO *target  = tso->block_info.tso;
2431
2432       ASSERT(get_itbl(target)->type == TSO);
2433       ASSERT(target->blocked_exceptions != NULL);
2434
2435       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2436       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2437            t != END_BQ_QUEUE; 
2438            last = &t->link, t = t->link) {
2439         ASSERT(get_itbl(t)->type == TSO);
2440         if (t == (StgBlockingQueueElement *)tso) {
2441           *last = (StgBlockingQueueElement *)tso->link;
2442           goto done;
2443         }
2444       }
2445       barf("unblockThread (Exception): TSO not found");
2446     }
2447
2448   case BlockedOnDelay:
2449   case BlockedOnRead:
2450   case BlockedOnWrite:
2451     {
2452       StgBlockingQueueElement *prev = NULL;
2453       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2454            prev = t, t = t->link) {
2455         if (t == (StgBlockingQueueElement *)tso) {
2456           if (prev == NULL) {
2457             blocked_queue_hd = (StgTSO *)t->link;
2458             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2459               blocked_queue_tl = END_TSO_QUEUE;
2460             }
2461           } else {
2462             prev->link = t->link;
2463             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2464               blocked_queue_tl = (StgTSO *)prev;
2465             }
2466           }
2467           goto done;
2468         }
2469       }
2470       barf("unblockThread (I/O): TSO not found");
2471     }
2472
2473   default:
2474     barf("unblockThread");
2475   }
2476
2477  done:
2478   tso->link = END_TSO_QUEUE;
2479   tso->why_blocked = NotBlocked;
2480   tso->block_info.closure = NULL;
2481   PUSH_ON_RUN_QUEUE(tso);
2482   RELEASE_LOCK(&sched_mutex);
2483 }
2484 #else
2485 static void
2486 unblockThread(StgTSO *tso)
2487 {
2488   StgTSO *t, **last;
2489
2490   ACQUIRE_LOCK(&sched_mutex);
2491   switch (tso->why_blocked) {
2492
2493   case NotBlocked:
2494     return;  /* not blocked */
2495
2496   case BlockedOnMVar:
2497     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2498     {
2499       StgTSO *last_tso = END_TSO_QUEUE;
2500       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2501
2502       last = &mvar->head;
2503       for (t = mvar->head; t != END_TSO_QUEUE; 
2504            last = &t->link, last_tso = t, t = t->link) {
2505         if (t == tso) {
2506           *last = tso->link;
2507           if (mvar->tail == tso) {
2508             mvar->tail = last_tso;
2509           }
2510           goto done;
2511         }
2512       }
2513       barf("unblockThread (MVAR): TSO not found");
2514     }
2515
2516   case BlockedOnBlackHole:
2517     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2518     {
2519       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2520
2521       last = &bq->blocking_queue;
2522       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2523            last = &t->link, t = t->link) {
2524         if (t == tso) {
2525           *last = tso->link;
2526           goto done;
2527         }
2528       }
2529       barf("unblockThread (BLACKHOLE): TSO not found");
2530     }
2531
2532   case BlockedOnException:
2533     {
2534       StgTSO *target  = tso->block_info.tso;
2535
2536       ASSERT(get_itbl(target)->type == TSO);
2537       ASSERT(target->blocked_exceptions != NULL);
2538
2539       last = &target->blocked_exceptions;
2540       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2541            last = &t->link, t = t->link) {
2542         ASSERT(get_itbl(t)->type == TSO);
2543         if (t == tso) {
2544           *last = tso->link;
2545           goto done;
2546         }
2547       }
2548       barf("unblockThread (Exception): TSO not found");
2549     }
2550
2551   case BlockedOnDelay:
2552   case BlockedOnRead:
2553   case BlockedOnWrite:
2554     {
2555       StgTSO *prev = NULL;
2556       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2557            prev = t, t = t->link) {
2558         if (t == tso) {
2559           if (prev == NULL) {
2560             blocked_queue_hd = t->link;
2561             if (blocked_queue_tl == t) {
2562               blocked_queue_tl = END_TSO_QUEUE;
2563             }
2564           } else {
2565             prev->link = t->link;
2566             if (blocked_queue_tl == t) {
2567               blocked_queue_tl = prev;
2568             }
2569           }
2570           goto done;
2571         }
2572       }
2573       barf("unblockThread (I/O): TSO not found");
2574     }
2575
2576   default:
2577     barf("unblockThread");
2578   }
2579
2580  done:
2581   tso->link = END_TSO_QUEUE;
2582   tso->why_blocked = NotBlocked;
2583   tso->block_info.closure = NULL;
2584   PUSH_ON_RUN_QUEUE(tso);
2585   RELEASE_LOCK(&sched_mutex);
2586 }
2587 #endif
2588
2589 /* -----------------------------------------------------------------------------
2590  * raiseAsync()
2591  *
2592  * The following function implements the magic for raising an
2593  * asynchronous exception in an existing thread.
2594  *
2595  * We first remove the thread from any queue on which it might be
2596  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2597  *
2598  * We strip the stack down to the innermost CATCH_FRAME, building
2599  * thunks in the heap for all the active computations, so they can 
2600  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2601  * an application of the handler to the exception, and push it on
2602  * the top of the stack.
2603  * 
2604  * How exactly do we save all the active computations?  We create an
2605  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2606  * AP_UPDs pushes everything from the corresponding update frame
2607  * upwards onto the stack.  (Actually, it pushes everything up to the
2608  * next update frame plus a pointer to the next AP_UPD object.
2609  * Entering the next AP_UPD object pushes more onto the stack until we
2610  * reach the last AP_UPD object - at which point the stack should look
2611  * exactly as it did when we killed the TSO and we can continue
2612  * execution by entering the closure on top of the stack.
2613  *
2614  * We can also kill a thread entirely - this happens if either (a) the 
2615  * exception passed to raiseAsync is NULL, or (b) there's no
2616  * CATCH_FRAME on the stack.  In either case, we strip the entire
2617  * stack and replace the thread with a zombie.
2618  *
2619  * -------------------------------------------------------------------------- */
2620  
2621 void 
2622 deleteThread(StgTSO *tso)
2623 {
2624   raiseAsync(tso,NULL);
2625 }
2626
2627 void
2628 raiseAsync(StgTSO *tso, StgClosure *exception)
2629 {
2630   StgUpdateFrame* su = tso->su;
2631   StgPtr          sp = tso->sp;
2632   
2633   /* Thread already dead? */
2634   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
2635     return;
2636   }
2637
2638   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2639
2640   /* Remove it from any blocking queues */
2641   unblockThread(tso);
2642
2643   /* The stack freezing code assumes there's a closure pointer on
2644    * the top of the stack.  This isn't always the case with compiled
2645    * code, so we have to push a dummy closure on the top which just
2646    * returns to the next return address on the stack.
2647    */
2648   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2649     *(--sp) = (W_)&dummy_ret_closure;
2650   }
2651
2652   while (1) {
2653     int words = ((P_)su - (P_)sp) - 1;
2654     nat i;
2655     StgAP_UPD * ap;
2656
2657     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2658      * then build PAP(handler,exception,realworld#), and leave it on
2659      * top of the stack ready to enter.
2660      */
2661     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2662       StgCatchFrame *cf = (StgCatchFrame *)su;
2663       /* we've got an exception to raise, so let's pass it to the
2664        * handler in this frame.
2665        */
2666       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2667       TICK_ALLOC_UPD_PAP(3,0);
2668       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2669               
2670       ap->n_args = 2;
2671       ap->fun = cf->handler;    /* :: Exception -> IO a */
2672       ap->payload[0] = (P_)exception;
2673       ap->payload[1] = ARG_TAG(0); /* realworld token */
2674
2675       /* throw away the stack from Sp up to and including the
2676        * CATCH_FRAME.
2677        */
2678       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2679       tso->su = cf->link;
2680
2681       /* Restore the blocked/unblocked state for asynchronous exceptions
2682        * at the CATCH_FRAME.  
2683        *
2684        * If exceptions were unblocked at the catch, arrange that they
2685        * are unblocked again after executing the handler by pushing an
2686        * unblockAsyncExceptions_ret stack frame.
2687        */
2688       if (!cf->exceptions_blocked) {
2689         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2690       }
2691       
2692       /* Ensure that async exceptions are blocked when running the handler.
2693        */
2694       if (tso->blocked_exceptions == NULL) {
2695         tso->blocked_exceptions = END_TSO_QUEUE;
2696       }
2697       
2698       /* Put the newly-built PAP on top of the stack, ready to execute
2699        * when the thread restarts.
2700        */
2701       sp[0] = (W_)ap;
2702       tso->sp = sp;
2703       tso->what_next = ThreadEnterGHC;
2704       return;
2705     }
2706
2707     /* First build an AP_UPD consisting of the stack chunk above the
2708      * current update frame, with the top word on the stack as the
2709      * fun field.
2710      */
2711     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2712     
2713     ASSERT(words >= 0);
2714     
2715     ap->n_args = words;
2716     ap->fun    = (StgClosure *)sp[0];
2717     sp++;
2718     for(i=0; i < (nat)words; ++i) {
2719       ap->payload[i] = (P_)*sp++;
2720     }
2721     
2722     switch (get_itbl(su)->type) {
2723       
2724     case UPDATE_FRAME:
2725       {
2726         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2727         TICK_ALLOC_UP_THK(words+1,0);
2728         
2729         IF_DEBUG(scheduler,
2730                  fprintf(stderr,  "scheduler: Updating ");
2731                  printPtr((P_)su->updatee); 
2732                  fprintf(stderr,  " with ");
2733                  printObj((StgClosure *)ap);
2734                  );
2735         
2736         /* Replace the updatee with an indirection - happily
2737          * this will also wake up any threads currently
2738          * waiting on the result.
2739          */
2740         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2741         su = su->link;
2742         sp += sizeofW(StgUpdateFrame) -1;
2743         sp[0] = (W_)ap; /* push onto stack */
2744         break;
2745       }
2746       
2747     case CATCH_FRAME:
2748       {
2749         StgCatchFrame *cf = (StgCatchFrame *)su;
2750         StgClosure* o;
2751         
2752         /* We want a PAP, not an AP_UPD.  Fortunately, the
2753          * layout's the same.
2754          */
2755         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2756         TICK_ALLOC_UPD_PAP(words+1,0);
2757         
2758         /* now build o = FUN(catch,ap,handler) */
2759         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2760         TICK_ALLOC_FUN(2,0);
2761         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2762         o->payload[0] = (StgClosure *)ap;
2763         o->payload[1] = cf->handler;
2764         
2765         IF_DEBUG(scheduler,
2766                  fprintf(stderr,  "scheduler: Built ");
2767                  printObj((StgClosure *)o);
2768                  );
2769         
2770         /* pop the old handler and put o on the stack */
2771         su = cf->link;
2772         sp += sizeofW(StgCatchFrame) - 1;
2773         sp[0] = (W_)o;
2774         break;
2775       }
2776       
2777     case SEQ_FRAME:
2778       {
2779         StgSeqFrame *sf = (StgSeqFrame *)su;
2780         StgClosure* o;
2781         
2782         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2783         TICK_ALLOC_UPD_PAP(words+1,0);
2784         
2785         /* now build o = FUN(seq,ap) */
2786         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2787         TICK_ALLOC_SE_THK(1,0);
2788         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2789         o->payload[0] = (StgClosure *)ap;
2790         
2791         IF_DEBUG(scheduler,
2792                  fprintf(stderr,  "scheduler: Built ");
2793                  printObj((StgClosure *)o);
2794                  );
2795         
2796         /* pop the old handler and put o on the stack */
2797         su = sf->link;
2798         sp += sizeofW(StgSeqFrame) - 1;
2799         sp[0] = (W_)o;
2800         break;
2801       }
2802       
2803     case STOP_FRAME:
2804       /* We've stripped the entire stack, the thread is now dead. */
2805       sp += sizeofW(StgStopFrame) - 1;
2806       sp[0] = (W_)exception;    /* save the exception */
2807       tso->what_next = ThreadKilled;
2808       tso->su = (StgUpdateFrame *)(sp+1);
2809       tso->sp = sp;
2810       return;
2811       
2812     default:
2813       barf("raiseAsync");
2814     }
2815   }
2816   barf("raiseAsync");
2817 }
2818
2819 /* -----------------------------------------------------------------------------
2820    resurrectThreads is called after garbage collection on the list of
2821    threads found to be garbage.  Each of these threads will be woken
2822    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
2823    on an MVar, or NonTermination if the thread was blocked on a Black
2824    Hole.
2825    -------------------------------------------------------------------------- */
2826
2827 void
2828 resurrectThreads( StgTSO *threads )
2829 {
2830   StgTSO *tso, *next;
2831
2832   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
2833     next = tso->global_link;
2834     tso->global_link = all_threads;
2835     all_threads = tso;
2836     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
2837
2838     switch (tso->why_blocked) {
2839     case BlockedOnMVar:
2840     case BlockedOnException:
2841       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
2842       break;
2843     case BlockedOnBlackHole:
2844       raiseAsync(tso,(StgClosure *)NonTermination_closure);
2845       break;
2846     case NotBlocked:
2847       /* This might happen if the thread was blocked on a black hole
2848        * belonging to a thread that we've just woken up (raiseAsync
2849        * can wake up threads, remember...).
2850        */
2851       continue;
2852     default:
2853       barf("resurrectThreads: thread blocked in a strange way");
2854     }
2855   }
2856 }
2857
2858 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2859 //@subsection Debugging Routines
2860
2861 /* -----------------------------------------------------------------------------
2862    Debugging: why is a thread blocked
2863    -------------------------------------------------------------------------- */
2864
2865 #ifdef DEBUG
2866
2867 void
2868 printThreadBlockage(StgTSO *tso)
2869 {
2870   switch (tso->why_blocked) {
2871   case BlockedOnRead:
2872     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2873     break;
2874   case BlockedOnWrite:
2875     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2876     break;
2877   case BlockedOnDelay:
2878 #if defined(HAVE_SETITIMER) || defined(mingw32_TARGET_OS)
2879     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2880 #else
2881     fprintf(stderr,"blocked on delay of %d ms", 
2882             tso->block_info.target - getourtimeofday());
2883 #endif
2884     break;
2885   case BlockedOnMVar:
2886     fprintf(stderr,"blocked on an MVar");
2887     break;
2888   case BlockedOnException:
2889     fprintf(stderr,"blocked on delivering an exception to thread %d",
2890             tso->block_info.tso->id);
2891     break;
2892   case BlockedOnBlackHole:
2893     fprintf(stderr,"blocked on a black hole");
2894     break;
2895   case NotBlocked:
2896     fprintf(stderr,"not blocked");
2897     break;
2898 #if defined(PAR)
2899   case BlockedOnGA:
2900     fprintf(stderr,"blocked on global address; local FM_BQ is %p (%s)",
2901             tso->block_info.closure, info_type(tso->block_info.closure));
2902     break;
2903   case BlockedOnGA_NoSend:
2904     fprintf(stderr,"blocked on global address (no send); local FM_BQ is %p (%s)",
2905             tso->block_info.closure, info_type(tso->block_info.closure));
2906     break;
2907 #endif
2908   default:
2909     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
2910          tso->why_blocked, tso->id, tso);
2911   }
2912 }
2913
2914 void
2915 printThreadStatus(StgTSO *tso)
2916 {
2917   switch (tso->what_next) {
2918   case ThreadKilled:
2919     fprintf(stderr,"has been killed");
2920     break;
2921   case ThreadComplete:
2922     fprintf(stderr,"has completed");
2923     break;
2924   default:
2925     printThreadBlockage(tso);
2926   }
2927 }
2928
2929 void
2930 printAllThreads(void)
2931 {
2932   StgTSO *t;
2933
2934   sched_belch("all threads:");
2935   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
2936     fprintf(stderr, "\tthread %d is ", t->id);
2937     printThreadStatus(t);
2938     fprintf(stderr,"\n");
2939   }
2940 }
2941     
2942 /* 
2943    Print a whole blocking queue attached to node (debugging only).
2944 */
2945 //@cindex print_bq
2946 # if defined(PAR)
2947 void 
2948 print_bq (StgClosure *node)
2949 {
2950   StgBlockingQueueElement *bqe;
2951   StgTSO *tso;
2952   rtsBool end;
2953
2954   fprintf(stderr,"## BQ of closure %p (%s): ",
2955           node, info_type(node));
2956
2957   /* should cover all closures that may have a blocking queue */
2958   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2959          get_itbl(node)->type == FETCH_ME_BQ ||
2960          get_itbl(node)->type == RBH);
2961     
2962   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2963   /* 
2964      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2965   */
2966   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2967        !end; // iterate until bqe points to a CONSTR
2968        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2969     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2970     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2971     /* types of closures that may appear in a blocking queue */
2972     ASSERT(get_itbl(bqe)->type == TSO ||           
2973            get_itbl(bqe)->type == BLOCKED_FETCH || 
2974            get_itbl(bqe)->type == CONSTR); 
2975     /* only BQs of an RBH end with an RBH_Save closure */
2976     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2977
2978     switch (get_itbl(bqe)->type) {
2979     case TSO:
2980       fprintf(stderr," TSO %d (%x),",
2981               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2982       break;
2983     case BLOCKED_FETCH:
2984       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2985               ((StgBlockedFetch *)bqe)->node, 
2986               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2987               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2988               ((StgBlockedFetch *)bqe)->ga.weight);
2989       break;
2990     case CONSTR:
2991       fprintf(stderr," %s (IP %p),",
2992               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2993                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2994                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2995                "RBH_Save_?"), get_itbl(bqe));
2996       break;
2997     default:
2998       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2999            info_type(bqe), node, info_type(node));
3000       break;
3001     }
3002   } /* for */
3003   fputc('\n', stderr);
3004 }
3005 # elif defined(GRAN)
3006 void 
3007 print_bq (StgClosure *node)
3008 {
3009   StgBlockingQueueElement *bqe;
3010   PEs node_loc, tso_loc;
3011   rtsBool end;
3012
3013   /* should cover all closures that may have a blocking queue */
3014   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3015          get_itbl(node)->type == FETCH_ME_BQ ||
3016          get_itbl(node)->type == RBH);
3017     
3018   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3019   node_loc = where_is(node);
3020
3021   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3022           node, info_type(node), node_loc);
3023
3024   /* 
3025      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3026   */
3027   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3028        !end; // iterate until bqe points to a CONSTR
3029        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3030     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3031     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3032     /* types of closures that may appear in a blocking queue */
3033     ASSERT(get_itbl(bqe)->type == TSO ||           
3034            get_itbl(bqe)->type == CONSTR); 
3035     /* only BQs of an RBH end with an RBH_Save closure */
3036     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3037
3038     tso_loc = where_is((StgClosure *)bqe);
3039     switch (get_itbl(bqe)->type) {
3040     case TSO:
3041       fprintf(stderr," TSO %d (%p) on [PE %d],",
3042               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3043       break;
3044     case CONSTR:
3045       fprintf(stderr," %s (IP %p),",
3046               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
3047                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
3048                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
3049                "RBH_Save_?"), get_itbl(bqe));
3050       break;
3051     default:
3052       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3053            info_type((StgClosure *)bqe), node, info_type(node));
3054       break;
3055     }
3056   } /* for */
3057   fputc('\n', stderr);
3058 }
3059 #else
3060 /* 
3061    Nice and easy: only TSOs on the blocking queue
3062 */
3063 void 
3064 print_bq (StgClosure *node)
3065 {
3066   StgTSO *tso;
3067
3068   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3069   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3070        tso != END_TSO_QUEUE; 
3071        tso=tso->link) {
3072     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3073     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3074     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3075   }
3076   fputc('\n', stderr);
3077 }
3078 # endif
3079
3080 #if defined(PAR)
3081 static nat
3082 run_queue_len(void)
3083 {
3084   nat i;
3085   StgTSO *tso;
3086
3087   for (i=0, tso=run_queue_hd; 
3088        tso != END_TSO_QUEUE;
3089        i++, tso=tso->link)
3090     /* nothing */
3091
3092   return i;
3093 }
3094 #endif
3095
3096 static void
3097 sched_belch(char *s, ...)
3098 {
3099   va_list ap;
3100   va_start(ap,s);
3101 #ifdef SMP
3102   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
3103 #else
3104   fprintf(stderr, "scheduler: ");
3105 #endif
3106   vfprintf(stderr, s, ap);
3107   fprintf(stderr, "\n");
3108 }
3109
3110 #endif /* DEBUG */
3111
3112
3113 //@node Index,  , Debugging Routines, Main scheduling code
3114 //@subsection Index
3115
3116 //@index
3117 //* MainRegTable::  @cindex\s-+MainRegTable
3118 //* StgMainThread::  @cindex\s-+StgMainThread
3119 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3120 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3121 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3122 //* context_switch::  @cindex\s-+context_switch
3123 //* createThread::  @cindex\s-+createThread
3124 //* free_capabilities::  @cindex\s-+free_capabilities
3125 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3126 //* initScheduler::  @cindex\s-+initScheduler
3127 //* interrupted::  @cindex\s-+interrupted
3128 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
3129 //* next_thread_id::  @cindex\s-+next_thread_id
3130 //* print_bq::  @cindex\s-+print_bq
3131 //* run_queue_hd::  @cindex\s-+run_queue_hd
3132 //* run_queue_tl::  @cindex\s-+run_queue_tl
3133 //* sched_mutex::  @cindex\s-+sched_mutex
3134 //* schedule::  @cindex\s-+schedule
3135 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3136 //* task_ids::  @cindex\s-+task_ids
3137 //* term_mutex::  @cindex\s-+term_mutex
3138 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3139 //@end index