37eeda944af856173a1fb0bbd921adf982c76b58
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.44 2000/01/14 13:39:59 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-1999
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Prototypes::                
41 //* Main scheduling loop::      
42 //* Suspend and Resume::        
43 //* Run queue code::            
44 //* Garbage Collextion Routines::  
45 //* Blocking Queue Routines::   
46 //* Exception Handling Routines::  
47 //* Debugging Routines::        
48 //* Index::                     
49 //@end menu
50
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
53
54 #include "Rts.h"
55 #include "SchedAPI.h"
56 #include "RtsUtils.h"
57 #include "RtsFlags.h"
58 #include "Storage.h"
59 #include "StgRun.h"
60 #include "StgStartup.h"
61 #include "GC.h"
62 #include "Hooks.h"
63 #include "Schedule.h"
64 #include "StgMiscClosures.h"
65 #include "Storage.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
68 #include "Printer.h"
69 #include "Main.h"
70 #include "Signals.h"
71 #include "Profiling.h"
72 #include "Sanity.h"
73 #include "Stats.h"
74 #include "Sparks.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
77 # include "GranSim.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
81 # include "FetchMe.h"
82 # include "HLC.h"
83 #endif
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123
124 #if defined(GRAN)
125
126 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
127 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
128
129 /* 
130    In GranSim we have a runable and a blocked queue for each processor.
131    In order to minimise code changes new arrays run_queue_hds/tls
132    are created. run_queue_hd is then a short cut (macro) for
133    run_queue_hds[CurrentProc] (see GranSim.h).
134    -- HWL
135 */
136 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
137 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
138 StgTSO *ccalling_threadss[MAX_PROC];
139
140 #else /* !GRAN */
141
142 //@cindex run_queue_hd
143 //@cindex run_queue_tl
144 //@cindex blocked_queue_hd
145 //@cindex blocked_queue_tl
146 StgTSO *run_queue_hd, *run_queue_tl;
147 StgTSO *blocked_queue_hd, *blocked_queue_tl;
148
149 /* Threads suspended in _ccall_GC.
150  * Locks required: sched_mutex.
151  */
152 static StgTSO *suspended_ccalling_threads;
153
154 static void GetRoots(void);
155 static StgTSO *threadStackOverflow(StgTSO *tso);
156 #endif
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 //@cindex context_switch
165 nat context_switch;
166
167 /* if this flag is set as well, give up execution */
168 //@cindex interrupted
169 rtsBool interrupted;
170
171 /* Next thread ID to allocate.
172  * Locks required: sched_mutex
173  */
174 //@cindex next_thread_id
175 StgThreadID next_thread_id = 1;
176
177 /*
178  * Pointers to the state of the current thread.
179  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
180  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
181  */
182  
183 /* The smallest stack size that makes any sense is:
184  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
185  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
186  *  + 1                       (the realworld token for an IO thread)
187  *  + 1                       (the closure to enter)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
194
195 /* Free capability list.
196  * Locks required: sched_mutex.
197  */
198 #ifdef SMP
199 //@cindex free_capabilities
200 //@cindex n_free_capabilities
201 Capability *free_capabilities; /* Available capabilities for running threads */
202 nat n_free_capabilities;       /* total number of available capabilities */
203 #else
204 //@cindex MainRegTable
205 Capability MainRegTable;       /* for non-SMP, we have one global capability */
206 #endif
207
208 #if defined(GRAN)
209 StgTSO      *CurrentTSOs[MAX_PROC];
210 #else
211 StgTSO      *CurrentTSO;
212 #endif
213
214 rtsBool ready_to_gc;
215
216 /* All our current task ids, saved in case we need to kill them later.
217  */
218 #ifdef SMP
219 //@cindex task_ids
220 task_info *task_ids;
221 #endif
222
223 void            addToBlockedQueue ( StgTSO *tso );
224
225 static void     schedule          ( void );
226        void     interruptStgRts   ( void );
227 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
228
229 #ifdef DEBUG
230 static void sched_belch(char *s, ...);
231 #endif
232
233 #ifdef SMP
234 //@cindex sched_mutex
235 //@cindex term_mutex
236 //@cindex thread_ready_cond
237 //@cindex gc_pending_cond
238 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
239 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
240 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
241 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
242
243 nat await_death;
244 #endif
245
246 #if defined(PAR)
247 StgTSO *LastTSO;
248 rtsTime TimeOfLastYield;
249 #endif
250
251 /*
252  * The thread state for the main thread.
253 // ToDo: check whether not needed any more
254 StgTSO   *MainTSO;
255  */
256
257
258 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
259 //@subsection Prototypes
260
261 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
262 //@subsection Main scheduling loop
263
264 /* ---------------------------------------------------------------------------
265    Main scheduling loop.
266
267    We use round-robin scheduling, each thread returning to the
268    scheduler loop when one of these conditions is detected:
269
270       * out of heap space
271       * timer expires (thread yields)
272       * thread blocks
273       * thread ends
274       * stack overflow
275
276    Locking notes:  we acquire the scheduler lock once at the beginning
277    of the scheduler loop, and release it when
278     
279       * running a thread, or
280       * waiting for work, or
281       * waiting for a GC to complete.
282
283    ------------------------------------------------------------------------ */
284 //@cindex schedule
285 static void
286 schedule( void )
287 {
288   StgTSO *t;
289   Capability *cap;
290   StgThreadReturnCode ret;
291 #if defined(GRAN)
292   rtsEvent *event;
293 #elif defined(PAR)
294   rtsSpark spark;
295   StgTSO *tso;
296   GlobalTaskId pe;
297 #endif
298   
299   ACQUIRE_LOCK(&sched_mutex);
300
301 #if defined(GRAN)
302 # error ToDo: implement GranSim scheduler
303 #elif defined(PAR)
304   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
305
306     if (PendingFetches != END_BF_QUEUE) {
307         processFetches();
308     }
309 #else
310   while (1) {
311 #endif
312
313     /* If we're interrupted (the user pressed ^C, or some other
314      * termination condition occurred), kill all the currently running
315      * threads.
316      */
317     if (interrupted) {
318       IF_DEBUG(scheduler, sched_belch("interrupted"));
319       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
320         deleteThread(t);
321       }
322       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
323         deleteThread(t);
324       }
325       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
326       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
327     }
328
329     /* Go through the list of main threads and wake up any
330      * clients whose computations have finished.  ToDo: this
331      * should be done more efficiently without a linear scan
332      * of the main threads list, somehow...
333      */
334 #ifdef SMP
335     { 
336       StgMainThread *m, **prev;
337       prev = &main_threads;
338       for (m = main_threads; m != NULL; m = m->link) {
339         switch (m->tso->whatNext) {
340         case ThreadComplete:
341           if (m->ret) {
342             *(m->ret) = (StgClosure *)m->tso->sp[0];
343           }
344           *prev = m->link;
345           m->stat = Success;
346           pthread_cond_broadcast(&m->wakeup);
347           break;
348         case ThreadKilled:
349           *prev = m->link;
350           m->stat = Killed;
351           pthread_cond_broadcast(&m->wakeup);
352           break;
353         default:
354           break;
355         }
356       }
357     }
358 #else
359     /* If our main thread has finished or been killed, return.
360      */
361     {
362       StgMainThread *m = main_threads;
363       if (m->tso->whatNext == ThreadComplete
364           || m->tso->whatNext == ThreadKilled) {
365         main_threads = main_threads->link;
366         if (m->tso->whatNext == ThreadComplete) {
367           /* we finished successfully, fill in the return value */
368           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
369           m->stat = Success;
370           return;
371         } else {
372           m->stat = Killed;
373           return;
374         }
375       }
376     }
377 #endif
378
379     /* Top up the run queue from our spark pool.  We try to make the
380      * number of threads in the run queue equal to the number of
381      * free capabilities.
382      */
383 #if defined(SMP)
384     {
385       nat n = n_free_capabilities;
386       StgTSO *tso = run_queue_hd;
387
388       /* Count the run queue */
389       while (n > 0 && tso != END_TSO_QUEUE) {
390         tso = tso->link;
391         n--;
392       }
393
394       for (; n > 0; n--) {
395         StgClosure *spark;
396         spark = findSpark();
397         if (spark == NULL) {
398           break; /* no more sparks in the pool */
399         } else {
400           /* I'd prefer this to be done in activateSpark -- HWL */
401           /* tricky - it needs to hold the scheduler lock and
402            * not try to re-acquire it -- SDM */
403           StgTSO *tso;
404           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
405           pushClosure(tso,spark);
406           PUSH_ON_RUN_QUEUE(tso);
407 #ifdef PAR
408           advisory_thread_count++;
409 #endif
410           
411           IF_DEBUG(scheduler,
412                    sched_belch("turning spark of closure %p into a thread",
413                                (StgClosure *)spark));
414         }
415       }
416       /* We need to wake up the other tasks if we just created some
417        * work for them.
418        */
419       if (n_free_capabilities - n > 1) {
420           pthread_cond_signal(&thread_ready_cond);
421       }
422     }
423 #endif /* SMP */
424
425     /* Check whether any waiting threads need to be woken up.  If the
426      * run queue is empty, and there are no other tasks running, we
427      * can wait indefinitely for something to happen.
428      * ToDo: what if another client comes along & requests another
429      * main thread?
430      */
431     if (blocked_queue_hd != END_TSO_QUEUE) {
432       awaitEvent(
433            (run_queue_hd == END_TSO_QUEUE)
434 #ifdef SMP
435         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
436 #endif
437         );
438     }
439     
440     /* check for signals each time around the scheduler */
441 #ifndef __MINGW32__
442     if (signals_pending()) {
443       start_signal_handlers();
444     }
445 #endif
446
447     /* Detect deadlock: when we have no threads to run, there are
448      * no threads waiting on I/O or sleeping, and all the other
449      * tasks are waiting for work, we must have a deadlock.  Inform
450      * all the main threads.
451      */
452 #ifdef SMP
453     if (blocked_queue_hd == END_TSO_QUEUE
454         && run_queue_hd == END_TSO_QUEUE
455         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
456         ) {
457       StgMainThread *m;
458       for (m = main_threads; m != NULL; m = m->link) {
459           m->ret = NULL;
460           m->stat = Deadlock;
461           pthread_cond_broadcast(&m->wakeup);
462       }
463       main_threads = NULL;
464     }
465 #else /* ! SMP */
466     if (blocked_queue_hd == END_TSO_QUEUE
467         && run_queue_hd == END_TSO_QUEUE) {
468       StgMainThread *m = main_threads;
469       m->ret = NULL;
470       m->stat = Deadlock;
471       main_threads = m->link;
472       return;
473     }
474 #endif
475
476 #ifdef SMP
477     /* If there's a GC pending, don't do anything until it has
478      * completed.
479      */
480     if (ready_to_gc) {
481       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
482       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
483     }
484     
485     /* block until we've got a thread on the run queue and a free
486      * capability.
487      */
488     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
489       IF_DEBUG(scheduler, sched_belch("waiting for work"));
490       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
491       IF_DEBUG(scheduler, sched_belch("work now available"));
492     }
493 #endif
494
495 #if defined(GRAN)
496 # error ToDo: implement GranSim scheduler
497 #elif defined(PAR)
498     /* ToDo: phps merge with spark activation above */
499     /* check whether we have local work and send requests if we have none */
500     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
501       /* :-[  no local threads => look out for local sparks */
502       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
503           (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
504            pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
505         /* 
506          * ToDo: add GC code check that we really have enough heap afterwards!!
507          * Old comment:
508          * If we're here (no runnable threads) and we have pending
509          * sparks, we must have a space problem.  Get enough space
510          * to turn one of those pending sparks into a
511          * thread... 
512          */
513         
514         spark = findSpark();                /* get a spark */
515         if (spark != (rtsSpark) NULL) {
516           tso = activateSpark(spark);       /* turn the spark into a thread */
517           IF_PAR_DEBUG(verbose,
518                        belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
519                              mytid, tso, tso->id, advisory_thread_count));
520
521           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
522             belch("^^ failed to activate spark");
523             goto next_thread;
524           }               /* otherwise fall through & pick-up new tso */
525         } else {
526           IF_PAR_DEBUG(verbose,
527                        belch("^^ no local sparks (spark pool contains only NFs: %d)", 
528                              spark_queue_len(ADVISORY_POOL)));
529           goto next_thread;
530         }
531       } else  
532       /* =8-[  no local sparks => look for work on other PEs */
533       {
534         /*
535          * We really have absolutely no work.  Send out a fish
536          * (there may be some out there already), and wait for
537          * something to arrive.  We clearly can't run any threads
538          * until a SCHEDULE or RESUME arrives, and so that's what
539          * we're hoping to see.  (Of course, we still have to
540          * respond to other types of messages.)
541          */
542         if (//!fishing &&  
543             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
544           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
545           /* fishing set in sendFish, processFish;
546              avoid flooding system with fishes via delay */
547           pe = choosePE();
548           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
549                    NEW_FISH_HUNGER);
550         }
551         
552         processMessages();
553         goto next_thread;
554         // ReSchedule(0);
555       }
556     } else if (PacketsWaiting()) {  /* Look for incoming messages */
557       processMessages();
558     }
559
560     /* Now we are sure that we have some work available */
561     ASSERT(run_queue_hd != END_TSO_QUEUE);
562     /* Take a thread from the run queue, if we have work */
563     t = take_off_run_queue(END_TSO_QUEUE);
564
565     /* ToDo: write something to the log-file
566     if (RTSflags.ParFlags.granSimStats && !sameThread)
567         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
568     */
569
570     CurrentTSO = t;
571
572     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
573                               spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
574                               pending_sparks_hd[ADVISORY_POOL], 
575                               pending_sparks_tl[ADVISORY_POOL], 
576                               pending_sparks_lim[ADVISORY_POOL]));
577
578     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
579                               run_queue_len(), CURRENT_PROC,
580                               run_queue_hd, run_queue_tl));
581
582     if (t != LastTSO) {
583       /* 
584          we are running a different TSO, so write a schedule event to log file
585          NB: If we use fair scheduling we also have to write  a deschedule 
586              event for LastTSO; with unfair scheduling we know that the
587              previous tso has blocked whenever we switch to another tso, so
588              we don't need it in GUM for now
589       */
590       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
591                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
592       
593     }
594
595 #else /* !GRAN && !PAR */
596   
597     /* grab a thread from the run queue
598      */
599     t = POP_RUN_QUEUE();
600
601 #endif
602     
603     /* grab a capability
604      */
605 #ifdef SMP
606     cap = free_capabilities;
607     free_capabilities = cap->link;
608     n_free_capabilities--;
609 #else
610     cap = &MainRegTable;
611 #endif
612     
613     cap->rCurrentTSO = t;
614     
615     /* set the context_switch flag
616      */
617     if (run_queue_hd == END_TSO_QUEUE)
618       context_switch = 0;
619     else
620       context_switch = 1;
621
622     RELEASE_LOCK(&sched_mutex);
623     
624     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
625
626     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
627     /* Run the current thread 
628      */
629     switch (cap->rCurrentTSO->whatNext) {
630     case ThreadKilled:
631     case ThreadComplete:
632       /* Thread already finished, return to scheduler. */
633       ret = ThreadFinished;
634       break;
635     case ThreadEnterGHC:
636       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
637       break;
638     case ThreadRunGHC:
639       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
640       break;
641     case ThreadEnterHugs:
642 #ifdef INTERPRETER
643       {
644          StgClosure* c;
645          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
646          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
647          cap->rCurrentTSO->sp += 1;
648          ret = enter(cap,c);
649          break;
650       }
651 #else
652       barf("Panic: entered a BCO but no bytecode interpreter in this build");
653 #endif
654     default:
655       barf("schedule: invalid whatNext field");
656     }
657     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
658     
659     /* Costs for the scheduler are assigned to CCS_SYSTEM */
660 #ifdef PROFILING
661     CCCS = CCS_SYSTEM;
662 #endif
663     
664     ACQUIRE_LOCK(&sched_mutex);
665
666 #ifdef SMP
667     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
668 #else
669     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
670 #endif
671     t = cap->rCurrentTSO;
672     
673     switch (ret) {
674     case HeapOverflow:
675       /* make all the running tasks block on a condition variable,
676        * maybe set context_switch and wait till they all pile in,
677        * then have them wait on a GC condition variable.
678        */
679       IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
680       threadPaused(t);
681       
682       ready_to_gc = rtsTrue;
683       context_switch = 1;               /* stop other threads ASAP */
684       PUSH_ON_RUN_QUEUE(t);
685       break;
686       
687     case StackOverflow:
688       /* just adjust the stack for this thread, then pop it back
689        * on the run queue.
690        */
691       IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
692       threadPaused(t);
693       { 
694         StgMainThread *m;
695         /* enlarge the stack */
696         StgTSO *new_t = threadStackOverflow(t);
697         
698         /* This TSO has moved, so update any pointers to it from the
699          * main thread stack.  It better not be on any other queues...
700          * (it shouldn't be)
701          */
702         for (m = main_threads; m != NULL; m = m->link) {
703           if (m->tso == t) {
704             m->tso = new_t;
705           }
706         }
707         PUSH_ON_RUN_QUEUE(new_t);
708       }
709       break;
710
711     case ThreadYielding:
712 #if defined(GRAN)
713       IF_DEBUG(gran, 
714                DumpGranEvent(GR_DESCHEDULE, t));
715       globalGranStats.tot_yields++;
716 #elif defined(PAR)
717       IF_DEBUG(par, 
718                DumpGranEvent(GR_DESCHEDULE, t));
719 #endif
720       /* put the thread back on the run queue.  Then, if we're ready to
721        * GC, check whether this is the last task to stop.  If so, wake
722        * up the GC thread.  getThread will block during a GC until the
723        * GC is finished.
724        */
725       IF_DEBUG(scheduler,
726                if (t->whatNext == ThreadEnterHugs) {
727                  /* ToDo: or maybe a timer expired when we were in Hugs?
728                   * or maybe someone hit ctrl-C
729                   */
730                  belch("thread %ld stopped to switch to Hugs", t->id);
731                } else {
732                  belch("thread %ld stopped, yielding", t->id);
733                }
734                );
735       threadPaused(t);
736       APPEND_TO_RUN_QUEUE(t);
737       break;
738       
739     case ThreadBlocked:
740 #if defined(GRAN)
741 # error ToDo: implement GranSim scheduler
742 #elif defined(PAR)
743       IF_DEBUG(par, 
744                DumpGranEvent(GR_DESCHEDULE, t)); 
745 #else
746 #endif
747       /* don't need to do anything.  Either the thread is blocked on
748        * I/O, in which case we'll have called addToBlockedQueue
749        * previously, or it's blocked on an MVar or Blackhole, in which
750        * case it'll be on the relevant queue already.
751        */
752       IF_DEBUG(scheduler,
753                fprintf(stderr, "thread %d stopped, ", t->id);
754                printThreadBlockage(t);
755                fprintf(stderr, "\n"));
756       threadPaused(t);
757       break;
758       
759     case ThreadFinished:
760       /* Need to check whether this was a main thread, and if so, signal
761        * the task that started it with the return value.  If we have no
762        * more main threads, we probably need to stop all the tasks until
763        * we get a new one.
764        */
765       IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
766       t->whatNext = ThreadComplete;
767 #if defined(GRAN)
768       // ToDo: endThread(t, CurrentProc); // clean-up the thread
769 #elif defined(PAR)
770       advisory_thread_count--;
771       if (RtsFlags.ParFlags.ParStats.Full) 
772         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
773 #endif
774       break;
775       
776     default:
777       barf("doneThread: invalid thread return code");
778     }
779     
780 #ifdef SMP
781     cap->link = free_capabilities;
782     free_capabilities = cap;
783     n_free_capabilities++;
784 #endif
785
786 #ifdef SMP
787     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
788 #else
789     if (ready_to_gc) 
790 #endif
791       {
792       /* everybody back, start the GC.
793        * Could do it in this thread, or signal a condition var
794        * to do it in another thread.  Either way, we need to
795        * broadcast on gc_pending_cond afterward.
796        */
797 #ifdef SMP
798       IF_DEBUG(scheduler,sched_belch("doing GC"));
799 #endif
800       GarbageCollect(GetRoots);
801       ready_to_gc = rtsFalse;
802 #ifdef SMP
803       pthread_cond_broadcast(&gc_pending_cond);
804 #endif
805     }
806 #if defined(GRAN)
807   next_thread:
808     IF_GRAN_DEBUG(unused,
809                   print_eventq(EventHd));
810
811     event = get_next_event();
812
813 #elif defined(PAR)
814   next_thread:
815     /* ToDo: wait for next message to arrive rather than busy wait */
816
817 #else /* GRAN */
818   /* not any more
819   next_thread:
820     t = take_off_run_queue(END_TSO_QUEUE);
821   */
822 #endif /* GRAN */
823   } /* end of while(1) */
824 }
825
826 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
827 void deleteAllThreads ( void )
828 {
829   StgTSO* t;
830   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
831   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
832     deleteThread(t);
833   }
834   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
835     deleteThread(t);
836   }
837   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
838   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
839 }
840
841 /* startThread and  insertThread are now in GranSim.c -- HWL */
842
843 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
844 //@subsection Suspend and Resume
845
846 /* ---------------------------------------------------------------------------
847  * Suspending & resuming Haskell threads.
848  * 
849  * When making a "safe" call to C (aka _ccall_GC), the task gives back
850  * its capability before calling the C function.  This allows another
851  * task to pick up the capability and carry on running Haskell
852  * threads.  It also means that if the C call blocks, it won't lock
853  * the whole system.
854  *
855  * The Haskell thread making the C call is put to sleep for the
856  * duration of the call, on the susepended_ccalling_threads queue.  We
857  * give out a token to the task, which it can use to resume the thread
858  * on return from the C function.
859  * ------------------------------------------------------------------------- */
860    
861 StgInt
862 suspendThread( Capability *cap )
863 {
864   nat tok;
865
866   ACQUIRE_LOCK(&sched_mutex);
867
868   IF_DEBUG(scheduler,
869            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
870
871   threadPaused(cap->rCurrentTSO);
872   cap->rCurrentTSO->link = suspended_ccalling_threads;
873   suspended_ccalling_threads = cap->rCurrentTSO;
874
875   /* Use the thread ID as the token; it should be unique */
876   tok = cap->rCurrentTSO->id;
877
878 #ifdef SMP
879   cap->link = free_capabilities;
880   free_capabilities = cap;
881   n_free_capabilities++;
882 #endif
883
884   RELEASE_LOCK(&sched_mutex);
885   return tok; 
886 }
887
888 Capability *
889 resumeThread( StgInt tok )
890 {
891   StgTSO *tso, **prev;
892   Capability *cap;
893
894   ACQUIRE_LOCK(&sched_mutex);
895
896   prev = &suspended_ccalling_threads;
897   for (tso = suspended_ccalling_threads; 
898        tso != END_TSO_QUEUE; 
899        prev = &tso->link, tso = tso->link) {
900     if (tso->id == (StgThreadID)tok) {
901       *prev = tso->link;
902       break;
903     }
904   }
905   if (tso == END_TSO_QUEUE) {
906     barf("resumeThread: thread not found");
907   }
908
909 #ifdef SMP
910   while (free_capabilities == NULL) {
911     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
912     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
913     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
914   }
915   cap = free_capabilities;
916   free_capabilities = cap->link;
917   n_free_capabilities--;
918 #else  
919   cap = &MainRegTable;
920 #endif
921
922   cap->rCurrentTSO = tso;
923
924   RELEASE_LOCK(&sched_mutex);
925   return cap;
926 }
927
928
929 /* ---------------------------------------------------------------------------
930  * Static functions
931  * ------------------------------------------------------------------------ */
932 static void unblockThread(StgTSO *tso);
933
934 /* ---------------------------------------------------------------------------
935  * Comparing Thread ids.
936  *
937  * This is used from STG land in the implementation of the
938  * instances of Eq/Ord for ThreadIds.
939  * ------------------------------------------------------------------------ */
940
941 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
942
943   StgThreadID id1 = tso1->id; 
944   StgThreadID id2 = tso2->id;
945  
946   if (id1 < id2) return (-1);
947   if (id1 > id2) return 1;
948   return 0;
949 }
950
951 /* ---------------------------------------------------------------------------
952    Create a new thread.
953
954    The new thread starts with the given stack size.  Before the
955    scheduler can run, however, this thread needs to have a closure
956    (and possibly some arguments) pushed on its stack.  See
957    pushClosure() in Schedule.h.
958
959    createGenThread() and createIOThread() (in SchedAPI.h) are
960    convenient packaged versions of this function.
961    ------------------------------------------------------------------------ */
962 //@cindex createThread
963 #if defined(GRAN)
964 /* currently pri (priority) is only used in a GRAN setup -- HWL */
965 StgTSO *
966 createThread(nat stack_size, StgInt pri)
967 {
968   return createThread_(stack_size, rtsFalse, pri);
969 }
970
971 static StgTSO *
972 createThread_(nat size, rtsBool have_lock, StgInt pri)
973 {
974 #else
975 StgTSO *
976 createThread(nat stack_size)
977 {
978   return createThread_(stack_size, rtsFalse);
979 }
980
981 static StgTSO *
982 createThread_(nat size, rtsBool have_lock)
983 {
984 #endif
985     StgTSO *tso;
986     nat stack_size;
987
988     /* First check whether we should create a thread at all */
989 #if defined(PAR)
990   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
991   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
992     threadsIgnored++;
993     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
994           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
995     return END_TSO_QUEUE;
996   }
997   threadsCreated++;
998 #endif
999
1000 #if defined(GRAN)
1001   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1002 #endif
1003
1004   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1005
1006   /* catch ridiculously small stack sizes */
1007   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1008     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1009   }
1010
1011   tso = (StgTSO *)allocate(size);
1012   TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
1013   
1014   stack_size = size - TSO_STRUCT_SIZEW;
1015
1016   // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
1017   SET_HDR(tso, &TSO_info, CCS_MAIN);
1018 #if defined(GRAN)
1019   SET_GRAN_HDR(tso, ThisPE);
1020 #endif
1021   tso->whatNext     = ThreadEnterGHC;
1022
1023   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1024          protect the increment operation on next_thread_id.
1025          In future, we could use an atomic increment instead.
1026   */
1027   
1028   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1029   tso->id = next_thread_id++; 
1030   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1031
1032   tso->why_blocked  = NotBlocked;
1033   tso->blocked_exceptions = NULL;
1034
1035   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1036   tso->stack_size   = stack_size;
1037   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1038                               - TSO_STRUCT_SIZEW;
1039   tso->sp           = (P_)&(tso->stack) + stack_size;
1040
1041 #ifdef PROFILING
1042   tso->prof.CCCS = CCS_MAIN;
1043 #endif
1044
1045   /* put a stop frame on the stack */
1046   tso->sp -= sizeofW(StgStopFrame);
1047   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1048   tso->su = (StgUpdateFrame*)tso->sp;
1049
1050   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1051                            tso->id, tso, tso->stack_size));
1052
1053   // ToDo: check this
1054 #if defined(GRAN)
1055   tso->link = END_TSO_QUEUE;
1056   /* uses more flexible routine in GranSim */
1057   insertThread(tso, CurrentProc);
1058 #else
1059   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1060      from its creation
1061   */
1062 #endif
1063
1064 #if defined(GRAN)
1065   tso->gran.pri = pri;
1066   tso->gran.magic = TSO_MAGIC; // debugging only
1067   tso->gran.sparkname   = 0;
1068   tso->gran.startedat   = CURRENT_TIME; 
1069   tso->gran.exported    = 0;
1070   tso->gran.basicblocks = 0;
1071   tso->gran.allocs      = 0;
1072   tso->gran.exectime    = 0;
1073   tso->gran.fetchtime   = 0;
1074   tso->gran.fetchcount  = 0;
1075   tso->gran.blocktime   = 0;
1076   tso->gran.blockcount  = 0;
1077   tso->gran.blockedat   = 0;
1078   tso->gran.globalsparks = 0;
1079   tso->gran.localsparks  = 0;
1080   if (RtsFlags.GranFlags.Light)
1081     tso->gran.clock  = Now; /* local clock */
1082   else
1083     tso->gran.clock  = 0;
1084
1085   IF_DEBUG(gran,printTSO(tso));
1086 #elif defined(PAR)
1087   tso->par.sparkname   = 0;
1088   tso->par.startedat   = CURRENT_TIME; 
1089   tso->par.exported    = 0;
1090   tso->par.basicblocks = 0;
1091   tso->par.allocs      = 0;
1092   tso->par.exectime    = 0;
1093   tso->par.fetchtime   = 0;
1094   tso->par.fetchcount  = 0;
1095   tso->par.blocktime   = 0;
1096   tso->par.blockcount  = 0;
1097   tso->par.blockedat   = 0;
1098   tso->par.globalsparks = 0;
1099   tso->par.localsparks  = 0;
1100 #endif
1101
1102 #if defined(GRAN)
1103   globalGranStats.tot_threads_created++;
1104   globalGranStats.threads_created_on_PE[CurrentProc]++;
1105   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1106   globalGranStats.tot_sq_probes++;
1107 #endif 
1108
1109   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1110                                  tso->id, tso->stack_size));
1111   return tso;
1112 }
1113
1114 /* ---------------------------------------------------------------------------
1115  * scheduleThread()
1116  *
1117  * scheduleThread puts a thread on the head of the runnable queue.
1118  * This will usually be done immediately after a thread is created.
1119  * The caller of scheduleThread must create the thread using e.g.
1120  * createThread and push an appropriate closure
1121  * on this thread's stack before the scheduler is invoked.
1122  * ------------------------------------------------------------------------ */
1123
1124 void
1125 scheduleThread(StgTSO *tso)
1126 {
1127   ACQUIRE_LOCK(&sched_mutex);
1128
1129   /* Put the new thread on the head of the runnable queue.  The caller
1130    * better push an appropriate closure on this thread's stack
1131    * beforehand.  In the SMP case, the thread may start running as
1132    * soon as we release the scheduler lock below.
1133    */
1134   PUSH_ON_RUN_QUEUE(tso);
1135   THREAD_RUNNABLE();
1136
1137   IF_DEBUG(scheduler,printTSO(tso));
1138   RELEASE_LOCK(&sched_mutex);
1139 }
1140
1141 /* ---------------------------------------------------------------------------
1142  * startTasks()
1143  *
1144  * Start up Posix threads to run each of the scheduler tasks.
1145  * I believe the task ids are not needed in the system as defined.
1146  *  KH @ 25/10/99
1147  * ------------------------------------------------------------------------ */
1148
1149 #ifdef SMP
1150 static void *
1151 taskStart( void *arg STG_UNUSED )
1152 {
1153   schedule();
1154   return NULL;
1155 }
1156 #endif
1157
1158 /* ---------------------------------------------------------------------------
1159  * initScheduler()
1160  *
1161  * Initialise the scheduler.  This resets all the queues - if the
1162  * queues contained any threads, they'll be garbage collected at the
1163  * next pass.
1164  *
1165  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1166  * ------------------------------------------------------------------------ */
1167
1168 #ifdef SMP
1169 static void
1170 term_handler(int sig STG_UNUSED)
1171 {
1172   stat_workerStop();
1173   ACQUIRE_LOCK(&term_mutex);
1174   await_death--;
1175   RELEASE_LOCK(&term_mutex);
1176   pthread_exit(NULL);
1177 }
1178 #endif
1179
1180 //@cindex initScheduler
1181 void 
1182 initScheduler(void)
1183 {
1184 #if defined(GRAN)
1185   nat i;
1186
1187   for (i=0; i<=MAX_PROC; i++) {
1188     run_queue_hds[i]      = END_TSO_QUEUE;
1189     run_queue_tls[i]      = END_TSO_QUEUE;
1190     blocked_queue_hds[i]  = END_TSO_QUEUE;
1191     blocked_queue_tls[i]  = END_TSO_QUEUE;
1192     ccalling_threadss[i]  = END_TSO_QUEUE;
1193   }
1194 #else
1195   run_queue_hd      = END_TSO_QUEUE;
1196   run_queue_tl      = END_TSO_QUEUE;
1197   blocked_queue_hd  = END_TSO_QUEUE;
1198   blocked_queue_tl  = END_TSO_QUEUE;
1199 #endif 
1200
1201   suspended_ccalling_threads  = END_TSO_QUEUE;
1202
1203   main_threads = NULL;
1204
1205   context_switch = 0;
1206   interrupted    = 0;
1207
1208   enteredCAFs = END_CAF_LIST;
1209
1210   /* Install the SIGHUP handler */
1211 #ifdef SMP
1212   {
1213     struct sigaction action,oact;
1214
1215     action.sa_handler = term_handler;
1216     sigemptyset(&action.sa_mask);
1217     action.sa_flags = 0;
1218     if (sigaction(SIGTERM, &action, &oact) != 0) {
1219       barf("can't install TERM handler");
1220     }
1221   }
1222 #endif
1223
1224 #ifdef SMP
1225   /* Allocate N Capabilities */
1226   {
1227     nat i;
1228     Capability *cap, *prev;
1229     cap  = NULL;
1230     prev = NULL;
1231     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1232       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1233       cap->link = prev;
1234       prev = cap;
1235     }
1236     free_capabilities = cap;
1237     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1238   }
1239   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1240                              n_free_capabilities););
1241 #endif
1242
1243 #if defined(SMP) || defined(PAR)
1244   initSparkPools();
1245 #endif
1246 }
1247
1248 #ifdef SMP
1249 void
1250 startTasks( void )
1251 {
1252   nat i;
1253   int r;
1254   pthread_t tid;
1255   
1256   /* make some space for saving all the thread ids */
1257   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1258                             "initScheduler:task_ids");
1259   
1260   /* and create all the threads */
1261   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1262     r = pthread_create(&tid,NULL,taskStart,NULL);
1263     if (r != 0) {
1264       barf("startTasks: Can't create new Posix thread");
1265     }
1266     task_ids[i].id = tid;
1267     task_ids[i].mut_time = 0.0;
1268     task_ids[i].mut_etime = 0.0;
1269     task_ids[i].gc_time = 0.0;
1270     task_ids[i].gc_etime = 0.0;
1271     task_ids[i].elapsedtimestart = elapsedtime();
1272     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1273   }
1274 }
1275 #endif
1276
1277 void
1278 exitScheduler( void )
1279 {
1280 #ifdef SMP
1281   nat i;
1282
1283   /* Don't want to use pthread_cancel, since we'd have to install
1284    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1285    * all our locks.
1286    */
1287 #if 0
1288   /* Cancel all our tasks */
1289   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1290     pthread_cancel(task_ids[i].id);
1291   }
1292   
1293   /* Wait for all the tasks to terminate */
1294   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1295     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1296                                task_ids[i].id));
1297     pthread_join(task_ids[i].id, NULL);
1298   }
1299 #endif
1300
1301   /* Send 'em all a SIGHUP.  That should shut 'em up.
1302    */
1303   await_death = RtsFlags.ParFlags.nNodes;
1304   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1305     pthread_kill(task_ids[i].id,SIGTERM);
1306   }
1307   while (await_death > 0) {
1308     sched_yield();
1309   }
1310 #endif
1311 }
1312
1313 /* -----------------------------------------------------------------------------
1314    Managing the per-task allocation areas.
1315    
1316    Each capability comes with an allocation area.  These are
1317    fixed-length block lists into which allocation can be done.
1318
1319    ToDo: no support for two-space collection at the moment???
1320    -------------------------------------------------------------------------- */
1321
1322 /* -----------------------------------------------------------------------------
1323  * waitThread is the external interface for running a new computataion
1324  * and waiting for the result.
1325  *
1326  * In the non-SMP case, we create a new main thread, push it on the 
1327  * main-thread stack, and invoke the scheduler to run it.  The
1328  * scheduler will return when the top main thread on the stack has
1329  * completed or died, and fill in the necessary fields of the
1330  * main_thread structure.
1331  *
1332  * In the SMP case, we create a main thread as before, but we then
1333  * create a new condition variable and sleep on it.  When our new
1334  * main thread has completed, we'll be woken up and the status/result
1335  * will be in the main_thread struct.
1336  * -------------------------------------------------------------------------- */
1337
1338 SchedulerStatus
1339 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1340 {
1341   StgMainThread *m;
1342   SchedulerStatus stat;
1343
1344   ACQUIRE_LOCK(&sched_mutex);
1345   
1346   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1347
1348   m->tso = tso;
1349   m->ret = ret;
1350   m->stat = NoStatus;
1351 #ifdef SMP
1352   pthread_cond_init(&m->wakeup, NULL);
1353 #endif
1354
1355   m->link = main_threads;
1356   main_threads = m;
1357
1358   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1359                               m->tso->id));
1360
1361 #ifdef SMP
1362   do {
1363     pthread_cond_wait(&m->wakeup, &sched_mutex);
1364   } while (m->stat == NoStatus);
1365 #else
1366   schedule();
1367   ASSERT(m->stat != NoStatus);
1368 #endif
1369
1370   stat = m->stat;
1371
1372 #ifdef SMP
1373   pthread_cond_destroy(&m->wakeup);
1374 #endif
1375
1376   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1377                               m->tso->id));
1378   free(m);
1379
1380   RELEASE_LOCK(&sched_mutex);
1381
1382   return stat;
1383 }
1384
1385 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1386 //@subsection Run queue code 
1387
1388 #if 0
1389 /* 
1390    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1391        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1392        implicit global variable that has to be correct when calling these
1393        fcts -- HWL 
1394 */
1395
1396 /* Put the new thread on the head of the runnable queue.
1397  * The caller of createThread better push an appropriate closure
1398  * on this thread's stack before the scheduler is invoked.
1399  */
1400 static /* inline */ void
1401 add_to_run_queue(tso)
1402 StgTSO* tso; 
1403 {
1404   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1405   tso->link = run_queue_hd;
1406   run_queue_hd = tso;
1407   if (run_queue_tl == END_TSO_QUEUE) {
1408     run_queue_tl = tso;
1409   }
1410 }
1411
1412 /* Put the new thread at the end of the runnable queue. */
1413 static /* inline */ void
1414 push_on_run_queue(tso)
1415 StgTSO* tso; 
1416 {
1417   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1418   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1419   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1420   if (run_queue_hd == END_TSO_QUEUE) {
1421     run_queue_hd = tso;
1422   } else {
1423     run_queue_tl->link = tso;
1424   }
1425   run_queue_tl = tso;
1426 }
1427
1428 /* 
1429    Should be inlined because it's used very often in schedule.  The tso
1430    argument is actually only needed in GranSim, where we want to have the
1431    possibility to schedule *any* TSO on the run queue, irrespective of the
1432    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1433    the run queue and dequeue the tso, adjusting the links in the queue. 
1434 */
1435 //@cindex take_off_run_queue
1436 static /* inline */ StgTSO*
1437 take_off_run_queue(StgTSO *tso) {
1438   StgTSO *t, *prev;
1439
1440   /* 
1441      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1442
1443      if tso is specified, unlink that tso from the run_queue (doesn't have
1444      to be at the beginning of the queue); GranSim only 
1445   */
1446   if (tso!=END_TSO_QUEUE) {
1447     /* find tso in queue */
1448     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1449          t!=END_TSO_QUEUE && t!=tso;
1450          prev=t, t=t->link) 
1451       /* nothing */ ;
1452     ASSERT(t==tso);
1453     /* now actually dequeue the tso */
1454     if (prev!=END_TSO_QUEUE) {
1455       ASSERT(run_queue_hd!=t);
1456       prev->link = t->link;
1457     } else {
1458       /* t is at beginning of thread queue */
1459       ASSERT(run_queue_hd==t);
1460       run_queue_hd = t->link;
1461     }
1462     /* t is at end of thread queue */
1463     if (t->link==END_TSO_QUEUE) {
1464       ASSERT(t==run_queue_tl);
1465       run_queue_tl = prev;
1466     } else {
1467       ASSERT(run_queue_tl!=t);
1468     }
1469     t->link = END_TSO_QUEUE;
1470   } else {
1471     /* take tso from the beginning of the queue; std concurrent code */
1472     t = run_queue_hd;
1473     if (t != END_TSO_QUEUE) {
1474       run_queue_hd = t->link;
1475       t->link = END_TSO_QUEUE;
1476       if (run_queue_hd == END_TSO_QUEUE) {
1477         run_queue_tl = END_TSO_QUEUE;
1478       }
1479     }
1480   }
1481   return t;
1482 }
1483
1484 #endif /* 0 */
1485
1486 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1487 //@subsection Garbage Collextion Routines
1488
1489 /* ---------------------------------------------------------------------------
1490    Where are the roots that we know about?
1491
1492         - all the threads on the runnable queue
1493         - all the threads on the blocked queue
1494         - all the thread currently executing a _ccall_GC
1495         - all the "main threads"
1496      
1497    ------------------------------------------------------------------------ */
1498
1499 /* This has to be protected either by the scheduler monitor, or by the
1500         garbage collection monitor (probably the latter).
1501         KH @ 25/10/99
1502 */
1503
1504 static void GetRoots(void)
1505 {
1506   StgMainThread *m;
1507
1508 #if defined(GRAN)
1509   {
1510     nat i;
1511     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1512       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1513         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1514       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1515         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1516       
1517       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1518         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1519       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1520         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1521       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1522         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1523     }
1524   }
1525
1526   markEventQueue();
1527
1528 #else /* !GRAN */
1529   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1530   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1531
1532   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1533   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1534 #endif 
1535
1536   for (m = main_threads; m != NULL; m = m->link) {
1537     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1538   }
1539   suspended_ccalling_threads = 
1540     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1541
1542 #if defined(SMP) || defined(PAR) || defined(GRAN)
1543   markSparkQueue();
1544 #endif
1545 }
1546
1547 /* -----------------------------------------------------------------------------
1548    performGC
1549
1550    This is the interface to the garbage collector from Haskell land.
1551    We provide this so that external C code can allocate and garbage
1552    collect when called from Haskell via _ccall_GC.
1553
1554    It might be useful to provide an interface whereby the programmer
1555    can specify more roots (ToDo).
1556    
1557    This needs to be protected by the GC condition variable above.  KH.
1558    -------------------------------------------------------------------------- */
1559
1560 void (*extra_roots)(void);
1561
1562 void
1563 performGC(void)
1564 {
1565   GarbageCollect(GetRoots);
1566 }
1567
1568 static void
1569 AllRoots(void)
1570 {
1571   GetRoots();                   /* the scheduler's roots */
1572   extra_roots();                /* the user's roots */
1573 }
1574
1575 void
1576 performGCWithRoots(void (*get_roots)(void))
1577 {
1578   extra_roots = get_roots;
1579
1580   GarbageCollect(AllRoots);
1581 }
1582
1583 /* -----------------------------------------------------------------------------
1584    Stack overflow
1585
1586    If the thread has reached its maximum stack size,
1587    then bomb out.  Otherwise relocate the TSO into a larger chunk of
1588    memory and adjust its stack size appropriately.
1589    -------------------------------------------------------------------------- */
1590
1591 static StgTSO *
1592 threadStackOverflow(StgTSO *tso)
1593 {
1594   nat new_stack_size, new_tso_size, diff, stack_words;
1595   StgPtr new_sp;
1596   StgTSO *dest;
1597
1598   if (tso->stack_size >= tso->max_stack_size) {
1599 #if 0
1600     /* If we're debugging, just print out the top of the stack */
1601     printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1602                                      tso->sp+64));
1603 #endif
1604 #ifdef INTERPRETER
1605     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1606     exit(1);
1607 #else
1608     /* Send this thread the StackOverflow exception */
1609     raiseAsync(tso, (StgClosure *)&stackOverflow_closure);
1610 #endif
1611     return tso;
1612   }
1613
1614   /* Try to double the current stack size.  If that takes us over the
1615    * maximum stack size for this thread, then use the maximum instead.
1616    * Finally round up so the TSO ends up as a whole number of blocks.
1617    */
1618   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1619   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
1620                                        TSO_STRUCT_SIZE)/sizeof(W_);
1621   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
1622   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1623
1624   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1625
1626   dest = (StgTSO *)allocate(new_tso_size);
1627   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1628
1629   /* copy the TSO block and the old stack into the new area */
1630   memcpy(dest,tso,TSO_STRUCT_SIZE);
1631   stack_words = tso->stack + tso->stack_size - tso->sp;
1632   new_sp = (P_)dest + new_tso_size - stack_words;
1633   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1634
1635   /* relocate the stack pointers... */
1636   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1637   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
1638   dest->sp    = new_sp;
1639   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1640   dest->stack_size = new_stack_size;
1641         
1642   /* and relocate the update frame list */
1643   relocate_TSO(tso, dest);
1644
1645   /* Mark the old one as dead so we don't try to scavenge it during
1646    * garbage collection (the TSO will likely be on a mutables list in
1647    * some generation, but it'll get collected soon enough).  It's
1648    * important to set the sp and su values to just beyond the end of
1649    * the stack, so we don't attempt to scavenge any part of the dead
1650    * TSO's stack.
1651    */
1652   tso->whatNext = ThreadKilled;
1653   tso->sp = (P_)&(tso->stack[tso->stack_size]);
1654   tso->su = (StgUpdateFrame *)tso->sp;
1655   tso->why_blocked = NotBlocked;
1656   dest->mut_link = NULL;
1657
1658   IF_DEBUG(sanity,checkTSO(tso));
1659 #if 0
1660   IF_DEBUG(scheduler,printTSO(dest));
1661 #endif
1662
1663 #if 0
1664   /* This will no longer work: KH */
1665   if (tso == MainTSO) { /* hack */
1666       MainTSO = dest;
1667   }
1668 #endif
1669   return dest;
1670 }
1671
1672 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1673 //@subsection Blocking Queue Routines
1674
1675 /* ---------------------------------------------------------------------------
1676    Wake up a queue that was blocked on some resource.
1677    ------------------------------------------------------------------------ */
1678
1679 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
1680
1681 #if defined(GRAN)
1682 static inline void
1683 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1684 {
1685 }
1686 #elif defined(PAR)
1687 static inline void
1688 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1689 {
1690   /* write RESUME events to log file and
1691      update blocked and fetch time (depending on type of the orig closure) */
1692   if (RtsFlags.ParFlags.ParStats.Full) {
1693     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1694                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1695                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1696
1697     switch (get_itbl(node)->type) {
1698         case FETCH_ME_BQ:
1699           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1700           break;
1701         case RBH:
1702         case FETCH_ME:
1703         case BLACKHOLE_BQ:
1704           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1705           break;
1706         default:
1707           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
1708         }
1709       }
1710 }
1711 #endif
1712
1713 #if defined(GRAN)
1714 static StgBlockingQueueElement *
1715 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1716 {
1717     StgBlockingQueueElement *next;
1718     PEs node_loc, tso_loc;
1719
1720     node_loc = where_is(node); // should be lifted out of loop
1721     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1722     tso_loc = where_is(tso);
1723     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1724       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1725       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
1726       bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1727       // insertThread(tso, node_loc);
1728       new_event(tso_loc, tso_loc,
1729                 CurrentTime[CurrentProc]+bq_processing_time,
1730                 ResumeThread,
1731                 tso, node, (rtsSpark*)NULL);
1732       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1733       // len_local++;
1734       // len++;
1735     } else { // TSO is remote (actually should be FMBQ)
1736       bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1737       bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1738       new_event(tso_loc, CurrentProc, 
1739                 CurrentTime[CurrentProc]+bq_processing_time+
1740                 RtsFlags.GranFlags.Costs.latency,
1741                 UnblockThread,
1742                 tso, node, (rtsSpark*)NULL);
1743       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1744       bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1745       // len++;
1746     }      
1747     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1748     IF_GRAN_DEBUG(bq,
1749                   fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1750                           (node_loc==tso_loc ? "Local" : "Global"), 
1751                           tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
1752     tso->blocked_on = NULL;
1753     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
1754                              tso->id, tso));
1755   }
1756
1757   /* if this is the BQ of an RBH, we have to put back the info ripped out of
1758      the closure to make room for the anchor of the BQ */
1759   if (next!=END_BQ_QUEUE) {
1760     ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1761     /*
1762     ASSERT((info_ptr==&RBH_Save_0_info) ||
1763            (info_ptr==&RBH_Save_1_info) ||
1764            (info_ptr==&RBH_Save_2_info));
1765     */
1766     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1767     ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1768     ((StgRBH *)node)->mut_link       = ((StgRBHSave *)next)->payload[1];
1769
1770     IF_GRAN_DEBUG(bq,
1771                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1772                         node, info_type(node)));
1773   }
1774 }
1775 #elif defined(PAR)
1776 static StgBlockingQueueElement *
1777 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1778 {
1779     StgBlockingQueueElement *next;
1780
1781     switch (get_itbl(bqe)->type) {
1782     case TSO:
1783       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1784       /* if it's a TSO just push it onto the run_queue */
1785       next = bqe->link;
1786       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1787       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
1788       THREAD_RUNNABLE();
1789       unblockCount(bqe, node);
1790       /* reset blocking status after dumping event */
1791       ((StgTSO *)bqe)->why_blocked = NotBlocked;
1792       break;
1793
1794     case BLOCKED_FETCH:
1795       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1796       next = bqe->link;
1797       bqe->link = PendingFetches;
1798       PendingFetches = bqe;
1799       break;
1800
1801 # if defined(DEBUG)
1802       /* can ignore this case in a non-debugging setup; 
1803          see comments on RBHSave closures above */
1804     case CONSTR:
1805       /* check that the closure is an RBHSave closure */
1806       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1807              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1808              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1809       break;
1810
1811     default:
1812       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1813            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
1814            (StgClosure *)bqe);
1815 # endif
1816     }
1817   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1818   return next;
1819 }
1820
1821 #else /* !GRAN && !PAR */
1822 static StgTSO *
1823 unblockOneLocked(StgTSO *tso)
1824 {
1825   StgTSO *next;
1826
1827   ASSERT(get_itbl(tso)->type == TSO);
1828   ASSERT(tso->why_blocked != NotBlocked);
1829   tso->why_blocked = NotBlocked;
1830   next = tso->link;
1831   PUSH_ON_RUN_QUEUE(tso);
1832   THREAD_RUNNABLE();
1833   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1834   return next;
1835 }
1836 #endif
1837
1838 #if defined(GRAN)
1839 inline StgTSO *
1840 unblockOne(StgTSO *tso, StgClosure *node)
1841 {
1842   ACQUIRE_LOCK(&sched_mutex);
1843   tso = unblockOneLocked(tso, node);
1844   RELEASE_LOCK(&sched_mutex);
1845   return tso;
1846 }
1847 #elif defined(PAR)
1848 inline StgTSO *
1849 unblockOne(StgTSO *tso, StgClosure *node)
1850 {
1851   ACQUIRE_LOCK(&sched_mutex);
1852   tso = unblockOneLocked(tso, node);
1853   RELEASE_LOCK(&sched_mutex);
1854   return tso;
1855 }
1856 #else
1857 inline StgTSO *
1858 unblockOne(StgTSO *tso)
1859 {
1860   ACQUIRE_LOCK(&sched_mutex);
1861   tso = unblockOneLocked(tso);
1862   RELEASE_LOCK(&sched_mutex);
1863   return tso;
1864 }
1865 #endif
1866
1867 #if defined(GRAN)
1868 void 
1869 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1870 {
1871   StgBlockingQueueElement *bqe, *next;
1872   StgTSO *tso;
1873   PEs node_loc, tso_loc;
1874   rtsTime bq_processing_time = 0;
1875   nat len = 0, len_local = 0;
1876
1877   IF_GRAN_DEBUG(bq, 
1878                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1879                       node, CurrentProc, CurrentTime[CurrentProc], 
1880                       CurrentTSO->id, CurrentTSO));
1881
1882   node_loc = where_is(node);
1883
1884   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
1885          get_itbl(q)->type == CONSTR); // closure (type constructor)
1886   ASSERT(is_unique(node));
1887
1888   /* FAKE FETCH: magically copy the node to the tso's proc;
1889      no Fetch necessary because in reality the node should not have been 
1890      moved to the other PE in the first place
1891   */
1892   if (CurrentProc!=node_loc) {
1893     IF_GRAN_DEBUG(bq, 
1894                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1895                         node, node_loc, CurrentProc, CurrentTSO->id, 
1896                         // CurrentTSO, where_is(CurrentTSO),
1897                         node->header.gran.procs));
1898     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1899     IF_GRAN_DEBUG(bq, 
1900                   belch("## new bitmask of node %p is %#x",
1901                         node, node->header.gran.procs));
1902     if (RtsFlags.GranFlags.GranSimStats.Global) {
1903       globalGranStats.tot_fake_fetches++;
1904     }
1905   }
1906
1907   bqe = q;
1908   // ToDo: check: ASSERT(CurrentProc==node_loc);
1909   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
1910     //next = bqe->link;
1911     /* 
1912        bqe points to the current element in the queue
1913        next points to the next element in the queue
1914     */
1915     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1916     //tso_loc = where_is(tso);
1917     bqe = unblockOneLocked(bqe, node);
1918   }
1919
1920   /* statistics gathering */
1921   /* ToDo: fix counters
1922   if (RtsFlags.GranFlags.GranSimStats.Global) {
1923     globalGranStats.tot_bq_processing_time += bq_processing_time;
1924     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
1925     globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
1926     globalGranStats.tot_awbq++;             // total no. of bqs awakened
1927   }
1928   IF_GRAN_DEBUG(bq,
1929                 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1930                         node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
1931   */
1932 }
1933 #elif defined(PAR)
1934 void 
1935 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1936 {
1937   StgBlockingQueueElement *bqe, *next;
1938
1939   ACQUIRE_LOCK(&sched_mutex);
1940
1941   IF_PAR_DEBUG(verbose, 
1942                belch("## AwBQ for node %p on [%x]: ",
1943                      node, mytid));
1944
1945   ASSERT(get_itbl(q)->type == TSO ||           
1946          get_itbl(q)->type == BLOCKED_FETCH || 
1947          get_itbl(q)->type == CONSTR); 
1948
1949   bqe = q;
1950   while (get_itbl(bqe)->type==TSO || 
1951          get_itbl(bqe)->type==BLOCKED_FETCH) {
1952     bqe = unblockOneLocked(bqe, node);
1953   }
1954   RELEASE_LOCK(&sched_mutex);
1955 }
1956
1957 #else   /* !GRAN && !PAR */
1958 void
1959 awakenBlockedQueue(StgTSO *tso)
1960 {
1961   ACQUIRE_LOCK(&sched_mutex);
1962   while (tso != END_TSO_QUEUE) {
1963     tso = unblockOneLocked(tso);
1964   }
1965   RELEASE_LOCK(&sched_mutex);
1966 }
1967 #endif
1968
1969 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
1970 //@subsection Exception Handling Routines
1971
1972 /* ---------------------------------------------------------------------------
1973    Interrupt execution
1974    - usually called inside a signal handler so it mustn't do anything fancy.   
1975    ------------------------------------------------------------------------ */
1976
1977 void
1978 interruptStgRts(void)
1979 {
1980     interrupted    = 1;
1981     context_switch = 1;
1982 }
1983
1984 /* -----------------------------------------------------------------------------
1985    Unblock a thread
1986
1987    This is for use when we raise an exception in another thread, which
1988    may be blocked.
1989    This has nothing to do with the UnblockThread event in GranSim. -- HWL
1990    -------------------------------------------------------------------------- */
1991
1992 static void
1993 unblockThread(StgTSO *tso)
1994 {
1995   StgTSO *t, **last;
1996
1997   ACQUIRE_LOCK(&sched_mutex);
1998   switch (tso->why_blocked) {
1999
2000   case NotBlocked:
2001     return;  /* not blocked */
2002
2003   case BlockedOnMVar:
2004     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2005     {
2006       StgTSO *last_tso = END_TSO_QUEUE;
2007       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2008
2009       last = &mvar->head;
2010       for (t = mvar->head; t != END_TSO_QUEUE; 
2011            last = &t->link, last_tso = t, t = t->link) {
2012         if (t == tso) {
2013           *last = tso->link;
2014           if (mvar->tail == tso) {
2015             mvar->tail = last_tso;
2016           }
2017           goto done;
2018         }
2019       }
2020       barf("unblockThread (MVAR): TSO not found");
2021     }
2022
2023   case BlockedOnBlackHole:
2024     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2025     {
2026       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2027
2028       last = &bq->blocking_queue;
2029       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2030            last = &t->link, t = t->link) {
2031         if (t == tso) {
2032           *last = tso->link;
2033           goto done;
2034         }
2035       }
2036       barf("unblockThread (BLACKHOLE): TSO not found");
2037     }
2038
2039   case BlockedOnException:
2040     {
2041       StgTSO *target  = tso->block_info.tso;
2042
2043       ASSERT(get_itbl(target)->type == TSO);
2044       ASSERT(target->blocked_exceptions != NULL);
2045
2046       last = &target->blocked_exceptions;
2047       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2048            last = &t->link, t = t->link) {
2049         ASSERT(get_itbl(t)->type == TSO);
2050         if (t == tso) {
2051           *last = tso->link;
2052           goto done;
2053         }
2054       }
2055       barf("unblockThread (Exception): TSO not found");
2056     }
2057
2058   case BlockedOnDelay:
2059   case BlockedOnRead:
2060   case BlockedOnWrite:
2061     {
2062       StgTSO *prev = NULL;
2063       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2064            prev = t, t = t->link) {
2065         if (t == tso) {
2066           if (prev == NULL) {
2067             blocked_queue_hd = t->link;
2068             if (blocked_queue_tl == t) {
2069               blocked_queue_tl = END_TSO_QUEUE;
2070             }
2071           } else {
2072             prev->link = t->link;
2073             if (blocked_queue_tl == t) {
2074               blocked_queue_tl = prev;
2075             }
2076           }
2077           goto done;
2078         }
2079       }
2080       barf("unblockThread (I/O): TSO not found");
2081     }
2082
2083   default:
2084     barf("unblockThread");
2085   }
2086
2087  done:
2088   tso->link = END_TSO_QUEUE;
2089   tso->why_blocked = NotBlocked;
2090   tso->block_info.closure = NULL;
2091   PUSH_ON_RUN_QUEUE(tso);
2092   RELEASE_LOCK(&sched_mutex);
2093 }
2094
2095 /* -----------------------------------------------------------------------------
2096  * raiseAsync()
2097  *
2098  * The following function implements the magic for raising an
2099  * asynchronous exception in an existing thread.
2100  *
2101  * We first remove the thread from any queue on which it might be
2102  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2103  *
2104  * We strip the stack down to the innermost CATCH_FRAME, building
2105  * thunks in the heap for all the active computations, so they can 
2106  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2107  * an application of the handler to the exception, and push it on
2108  * the top of the stack.
2109  * 
2110  * How exactly do we save all the active computations?  We create an
2111  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2112  * AP_UPDs pushes everything from the corresponding update frame
2113  * upwards onto the stack.  (Actually, it pushes everything up to the
2114  * next update frame plus a pointer to the next AP_UPD object.
2115  * Entering the next AP_UPD object pushes more onto the stack until we
2116  * reach the last AP_UPD object - at which point the stack should look
2117  * exactly as it did when we killed the TSO and we can continue
2118  * execution by entering the closure on top of the stack.
2119  *
2120  * We can also kill a thread entirely - this happens if either (a) the 
2121  * exception passed to raiseAsync is NULL, or (b) there's no
2122  * CATCH_FRAME on the stack.  In either case, we strip the entire
2123  * stack and replace the thread with a zombie.
2124  *
2125  * -------------------------------------------------------------------------- */
2126  
2127 void 
2128 deleteThread(StgTSO *tso)
2129 {
2130   raiseAsync(tso,NULL);
2131 }
2132
2133 void
2134 raiseAsync(StgTSO *tso, StgClosure *exception)
2135 {
2136   StgUpdateFrame* su = tso->su;
2137   StgPtr          sp = tso->sp;
2138   
2139   /* Thread already dead? */
2140   if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
2141     return;
2142   }
2143
2144   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2145
2146   /* Remove it from any blocking queues */
2147   unblockThread(tso);
2148
2149   /* The stack freezing code assumes there's a closure pointer on
2150    * the top of the stack.  This isn't always the case with compiled
2151    * code, so we have to push a dummy closure on the top which just
2152    * returns to the next return address on the stack.
2153    */
2154   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2155     *(--sp) = (W_)&dummy_ret_closure;
2156   }
2157
2158   while (1) {
2159     int words = ((P_)su - (P_)sp) - 1;
2160     nat i;
2161     StgAP_UPD * ap;
2162
2163     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2164      * then build PAP(handler,exception), and leave it on top of
2165      * the stack ready to enter.
2166      */
2167     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2168       StgCatchFrame *cf = (StgCatchFrame *)su;
2169       /* we've got an exception to raise, so let's pass it to the
2170        * handler in this frame.
2171        */
2172       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 1);
2173       TICK_ALLOC_UPD_PAP(2,0);
2174       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2175               
2176       ap->n_args = 1;
2177       ap->fun = cf->handler;
2178       ap->payload[0] = (P_)exception;
2179
2180       /* sp currently points to the word above the CATCH_FRAME on the stack.
2181        */
2182       sp += sizeofW(StgCatchFrame);
2183       tso->su = cf->link;
2184
2185       /* Restore the blocked/unblocked state for asynchronous exceptions
2186        * at the CATCH_FRAME.  
2187        *
2188        * If exceptions were unblocked at the catch, arrange that they
2189        * are unblocked again after executing the handler by pushing an
2190        * unblockAsyncExceptions_ret stack frame.
2191        */
2192       if (!cf->exceptions_blocked) {
2193         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2194       }
2195       
2196       /* Ensure that async exceptions are blocked when running the handler.
2197        */
2198       if (tso->blocked_exceptions == NULL) {
2199         tso->blocked_exceptions = END_TSO_QUEUE;
2200       }
2201       
2202       /* Put the newly-built PAP on top of the stack, ready to execute
2203        * when the thread restarts.
2204        */
2205       sp[0] = (W_)ap;
2206       tso->sp = sp;
2207       tso->whatNext = ThreadEnterGHC;
2208       return;
2209     }
2210
2211     /* First build an AP_UPD consisting of the stack chunk above the
2212      * current update frame, with the top word on the stack as the
2213      * fun field.
2214      */
2215     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2216     
2217     ASSERT(words >= 0);
2218     
2219     ap->n_args = words;
2220     ap->fun    = (StgClosure *)sp[0];
2221     sp++;
2222     for(i=0; i < (nat)words; ++i) {
2223       ap->payload[i] = (P_)*sp++;
2224     }
2225     
2226     switch (get_itbl(su)->type) {
2227       
2228     case UPDATE_FRAME:
2229       {
2230         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2231         TICK_ALLOC_UP_THK(words+1,0);
2232         
2233         IF_DEBUG(scheduler,
2234                  fprintf(stderr,  "scheduler: Updating ");
2235                  printPtr((P_)su->updatee); 
2236                  fprintf(stderr,  " with ");
2237                  printObj((StgClosure *)ap);
2238                  );
2239         
2240         /* Replace the updatee with an indirection - happily
2241          * this will also wake up any threads currently
2242          * waiting on the result.
2243          */
2244         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2245         su = su->link;
2246         sp += sizeofW(StgUpdateFrame) -1;
2247         sp[0] = (W_)ap; /* push onto stack */
2248         break;
2249       }
2250       
2251     case CATCH_FRAME:
2252       {
2253         StgCatchFrame *cf = (StgCatchFrame *)su;
2254         StgClosure* o;
2255         
2256         /* We want a PAP, not an AP_UPD.  Fortunately, the
2257          * layout's the same.
2258          */
2259         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2260         TICK_ALLOC_UPD_PAP(words+1,0);
2261         
2262         /* now build o = FUN(catch,ap,handler) */
2263         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2264         TICK_ALLOC_FUN(2,0);
2265         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2266         o->payload[0] = (StgClosure *)ap;
2267         o->payload[1] = cf->handler;
2268         
2269         IF_DEBUG(scheduler,
2270                  fprintf(stderr,  "scheduler: Built ");
2271                  printObj((StgClosure *)o);
2272                  );
2273         
2274         /* pop the old handler and put o on the stack */
2275         su = cf->link;
2276         sp += sizeofW(StgCatchFrame) - 1;
2277         sp[0] = (W_)o;
2278         break;
2279       }
2280       
2281     case SEQ_FRAME:
2282       {
2283         StgSeqFrame *sf = (StgSeqFrame *)su;
2284         StgClosure* o;
2285         
2286         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2287         TICK_ALLOC_UPD_PAP(words+1,0);
2288         
2289         /* now build o = FUN(seq,ap) */
2290         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2291         TICK_ALLOC_SE_THK(1,0);
2292         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2293         payloadCPtr(o,0) = (StgClosure *)ap;
2294         
2295         IF_DEBUG(scheduler,
2296                  fprintf(stderr,  "scheduler: Built ");
2297                  printObj((StgClosure *)o);
2298                  );
2299         
2300         /* pop the old handler and put o on the stack */
2301         su = sf->link;
2302         sp += sizeofW(StgSeqFrame) - 1;
2303         sp[0] = (W_)o;
2304         break;
2305       }
2306       
2307     case STOP_FRAME:
2308       /* We've stripped the entire stack, the thread is now dead. */
2309       sp += sizeofW(StgStopFrame) - 1;
2310       sp[0] = (W_)exception;    /* save the exception */
2311       tso->whatNext = ThreadKilled;
2312       tso->su = (StgUpdateFrame *)(sp+1);
2313       tso->sp = sp;
2314       return;
2315       
2316     default:
2317       barf("raiseAsync");
2318     }
2319   }
2320   barf("raiseAsync");
2321 }
2322
2323 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2324 //@subsection Debugging Routines
2325
2326 /* -----------------------------------------------------------------------------
2327    Debugging: why is a thread blocked
2328    -------------------------------------------------------------------------- */
2329
2330 #ifdef DEBUG
2331
2332 void printThreadBlockage(StgTSO *tso)
2333 {
2334   switch (tso->why_blocked) {
2335   case BlockedOnRead:
2336     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2337     break;
2338   case BlockedOnWrite:
2339     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2340     break;
2341   case BlockedOnDelay:
2342     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2343     break;
2344   case BlockedOnMVar:
2345     fprintf(stderr,"blocked on an MVar");
2346     break;
2347   case BlockedOnException:
2348     fprintf(stderr,"blocked on delivering an exception to thread %d",
2349             tso->block_info.tso->id);
2350     break;
2351   case BlockedOnBlackHole:
2352     fprintf(stderr,"blocked on a black hole");
2353     break;
2354   case NotBlocked:
2355     fprintf(stderr,"not blocked");
2356     break;
2357 #if defined(PAR)
2358   case BlockedOnGA:
2359     fprintf(stderr,"blocked on global address");
2360     break;
2361 #endif
2362   }
2363 }
2364
2365 /* 
2366    Print a whole blocking queue attached to node (debugging only).
2367 */
2368 //@cindex print_bq
2369 # if defined(PAR)
2370 void 
2371 print_bq (StgClosure *node)
2372 {
2373   StgBlockingQueueElement *bqe;
2374   StgTSO *tso;
2375   rtsBool end;
2376
2377   fprintf(stderr,"## BQ of closure %p (%s): ",
2378           node, info_type(node));
2379
2380   /* should cover all closures that may have a blocking queue */
2381   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2382          get_itbl(node)->type == FETCH_ME_BQ ||
2383          get_itbl(node)->type == RBH);
2384     
2385   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2386   /* 
2387      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2388   */
2389   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2390        !end; // iterate until bqe points to a CONSTR
2391        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2392     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2393     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2394     /* types of closures that may appear in a blocking queue */
2395     ASSERT(get_itbl(bqe)->type == TSO ||           
2396            get_itbl(bqe)->type == BLOCKED_FETCH || 
2397            get_itbl(bqe)->type == CONSTR); 
2398     /* only BQs of an RBH end with an RBH_Save closure */
2399     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2400
2401     switch (get_itbl(bqe)->type) {
2402     case TSO:
2403       fprintf(stderr," TSO %d (%x),",
2404               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2405       break;
2406     case BLOCKED_FETCH:
2407       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2408               ((StgBlockedFetch *)bqe)->node, 
2409               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2410               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2411               ((StgBlockedFetch *)bqe)->ga.weight);
2412       break;
2413     case CONSTR:
2414       fprintf(stderr," %s (IP %p),",
2415               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2416                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2417                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2418                "RBH_Save_?"), get_itbl(bqe));
2419       break;
2420     default:
2421       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2422            info_type(bqe), node, info_type(node));
2423       break;
2424     }
2425   } /* for */
2426   fputc('\n', stderr);
2427 }
2428 # elif defined(GRAN)
2429 void 
2430 print_bq (StgClosure *node)
2431 {
2432   StgBlockingQueueElement *bqe;
2433   StgTSO *tso;
2434   PEs node_loc, tso_loc;
2435   rtsBool end;
2436
2437   /* should cover all closures that may have a blocking queue */
2438   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2439          get_itbl(node)->type == FETCH_ME_BQ ||
2440          get_itbl(node)->type == RBH);
2441     
2442   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2443   node_loc = where_is(node);
2444
2445   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2446           node, info_type(node), node_loc);
2447
2448   /* 
2449      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2450   */
2451   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2452        !end; // iterate until bqe points to a CONSTR
2453        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2454     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2455     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2456     /* types of closures that may appear in a blocking queue */
2457     ASSERT(get_itbl(bqe)->type == TSO ||           
2458            get_itbl(bqe)->type == CONSTR); 
2459     /* only BQs of an RBH end with an RBH_Save closure */
2460     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2461
2462     tso_loc = where_is((StgClosure *)bqe);
2463     switch (get_itbl(bqe)->type) {
2464     case TSO:
2465       fprintf(stderr," TSO %d (%x) on [PE %d],",
2466               ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2467       break;
2468     case CONSTR:
2469       fprintf(stderr," %s (IP %p),",
2470               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2471                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2472                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2473                "RBH_Save_?"), get_itbl(bqe));
2474       break;
2475     default:
2476       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2477            info_type(bqe), node, info_type(node));
2478       break;
2479     }
2480   } /* for */
2481   fputc('\n', stderr);
2482 }
2483 #else
2484 /* 
2485    Nice and easy: only TSOs on the blocking queue
2486 */
2487 void 
2488 print_bq (StgClosure *node)
2489 {
2490   StgTSO *tso;
2491
2492   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2493   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
2494        tso != END_TSO_QUEUE; 
2495        tso=tso->link) {
2496     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
2497     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
2498     fprintf(stderr," TSO %d (%p),", tso->id, tso);
2499   }
2500   fputc('\n', stderr);
2501 }
2502 # endif
2503
2504 #if defined(PAR)
2505 static nat
2506 run_queue_len(void)
2507 {
2508   nat i;
2509   StgTSO *tso;
2510
2511   for (i=0, tso=run_queue_hd; 
2512        tso != END_TSO_QUEUE;
2513        i++, tso=tso->link)
2514     /* nothing */
2515
2516   return i;
2517 }
2518 #endif
2519
2520 static void
2521 sched_belch(char *s, ...)
2522 {
2523   va_list ap;
2524   va_start(ap,s);
2525 #ifdef SMP
2526   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
2527 #else
2528   fprintf(stderr, "scheduler: ");
2529 #endif
2530   vfprintf(stderr, s, ap);
2531   fprintf(stderr, "\n");
2532 }
2533
2534 #endif /* DEBUG */
2535
2536 //@node Index,  , Debugging Routines, Main scheduling code
2537 //@subsection Index
2538
2539 //@index
2540 //* MainRegTable::  @cindex\s-+MainRegTable
2541 //* StgMainThread::  @cindex\s-+StgMainThread
2542 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
2543 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
2544 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
2545 //* context_switch::  @cindex\s-+context_switch
2546 //* createThread::  @cindex\s-+createThread
2547 //* free_capabilities::  @cindex\s-+free_capabilities
2548 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
2549 //* initScheduler::  @cindex\s-+initScheduler
2550 //* interrupted::  @cindex\s-+interrupted
2551 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
2552 //* next_thread_id::  @cindex\s-+next_thread_id
2553 //* print_bq::  @cindex\s-+print_bq
2554 //* run_queue_hd::  @cindex\s-+run_queue_hd
2555 //* run_queue_tl::  @cindex\s-+run_queue_tl
2556 //* sched_mutex::  @cindex\s-+sched_mutex
2557 //* schedule::  @cindex\s-+schedule
2558 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
2559 //* task_ids::  @cindex\s-+task_ids
2560 //* term_mutex::  @cindex\s-+term_mutex
2561 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
2562 //@end index