[project @ 2000-03-14 09:55:05 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.52 2000/03/14 09:55:05 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Prototypes::                
41 //* Main scheduling loop::      
42 //* Suspend and Resume::        
43 //* Run queue code::            
44 //* Garbage Collextion Routines::  
45 //* Blocking Queue Routines::   
46 //* Exception Handling Routines::  
47 //* Debugging Routines::        
48 //* Index::                     
49 //@end menu
50
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
53
54 #include "Rts.h"
55 #include "SchedAPI.h"
56 #include "RtsUtils.h"
57 #include "RtsFlags.h"
58 #include "Storage.h"
59 #include "StgRun.h"
60 #include "StgStartup.h"
61 #include "GC.h"
62 #include "Hooks.h"
63 #include "Schedule.h"
64 #include "StgMiscClosures.h"
65 #include "Storage.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
68 #include "Printer.h"
69 #include "Main.h"
70 #include "Signals.h"
71 #include "Profiling.h"
72 #include "Sanity.h"
73 #include "Stats.h"
74 #include "Sparks.h"
75 #include "Prelude.h"
76 #if defined(GRAN) || defined(PAR)
77 # include "GranSimRts.h"
78 # include "GranSim.h"
79 # include "ParallelRts.h"
80 # include "Parallel.h"
81 # include "ParallelDebug.h"
82 # include "FetchMe.h"
83 # include "HLC.h"
84 #endif
85
86 #include <stdarg.h>
87
88 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
89 //@subsection Variables and Data structures
90
91 /* Main threads:
92  *
93  * These are the threads which clients have requested that we run.  
94  *
95  * In an SMP build, we might have several concurrent clients all
96  * waiting for results, and each one will wait on a condition variable
97  * until the result is available.
98  *
99  * In non-SMP, clients are strictly nested: the first client calls
100  * into the RTS, which might call out again to C with a _ccall_GC, and
101  * eventually re-enter the RTS.
102  *
103  * Main threads information is kept in a linked list:
104  */
105 //@cindex StgMainThread
106 typedef struct StgMainThread_ {
107   StgTSO *         tso;
108   SchedulerStatus  stat;
109   StgClosure **    ret;
110 #ifdef SMP
111   pthread_cond_t wakeup;
112 #endif
113   struct StgMainThread_ *link;
114 } StgMainThread;
115
116 /* Main thread queue.
117  * Locks required: sched_mutex.
118  */
119 static StgMainThread *main_threads;
120
121 /* Thread queues.
122  * Locks required: sched_mutex.
123  */
124
125 #if defined(GRAN)
126
127 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
128 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
129
130 /* 
131    In GranSim we have a runable and a blocked queue for each processor.
132    In order to minimise code changes new arrays run_queue_hds/tls
133    are created. run_queue_hd is then a short cut (macro) for
134    run_queue_hds[CurrentProc] (see GranSim.h).
135    -- HWL
136 */
137 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
138 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
139 StgTSO *ccalling_threadss[MAX_PROC];
140
141 #else /* !GRAN */
142
143 //@cindex run_queue_hd
144 //@cindex run_queue_tl
145 //@cindex blocked_queue_hd
146 //@cindex blocked_queue_tl
147 StgTSO *run_queue_hd, *run_queue_tl;
148 StgTSO *blocked_queue_hd, *blocked_queue_tl;
149
150 /* Threads suspended in _ccall_GC.
151  * Locks required: sched_mutex.
152  */
153 static StgTSO *suspended_ccalling_threads;
154
155 static void GetRoots(void);
156 static StgTSO *threadStackOverflow(StgTSO *tso);
157 #endif
158
159 /* KH: The following two flags are shared memory locations.  There is no need
160        to lock them, since they are only unset at the end of a scheduler
161        operation.
162 */
163
164 /* flag set by signal handler to precipitate a context switch */
165 //@cindex context_switch
166 nat context_switch;
167
168 /* if this flag is set as well, give up execution */
169 //@cindex interrupted
170 rtsBool interrupted;
171
172 /* Next thread ID to allocate.
173  * Locks required: sched_mutex
174  */
175 //@cindex next_thread_id
176 StgThreadID next_thread_id = 1;
177
178 /*
179  * Pointers to the state of the current thread.
180  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
181  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
182  */
183  
184 /* The smallest stack size that makes any sense is:
185  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
186  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
187  *  + 1                       (the realworld token for an IO thread)
188  *  + 1                       (the closure to enter)
189  *
190  * A thread with this stack will bomb immediately with a stack
191  * overflow, which will increase its stack size.  
192  */
193
194 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
195
196 /* Free capability list.
197  * Locks required: sched_mutex.
198  */
199 #ifdef SMP
200 //@cindex free_capabilities
201 //@cindex n_free_capabilities
202 Capability *free_capabilities; /* Available capabilities for running threads */
203 nat n_free_capabilities;       /* total number of available capabilities */
204 #else
205 //@cindex MainRegTable
206 Capability MainRegTable;       /* for non-SMP, we have one global capability */
207 #endif
208
209 #if defined(GRAN)
210 StgTSO      *CurrentTSOs[MAX_PROC];
211 #else
212 StgTSO      *CurrentTSO;
213 #endif
214
215 rtsBool ready_to_gc;
216
217 /* All our current task ids, saved in case we need to kill them later.
218  */
219 #ifdef SMP
220 //@cindex task_ids
221 task_info *task_ids;
222 #endif
223
224 void            addToBlockedQueue ( StgTSO *tso );
225
226 static void     schedule          ( void );
227        void     interruptStgRts   ( void );
228 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
229
230 #ifdef DEBUG
231 static void sched_belch(char *s, ...);
232 #endif
233
234 #ifdef SMP
235 //@cindex sched_mutex
236 //@cindex term_mutex
237 //@cindex thread_ready_cond
238 //@cindex gc_pending_cond
239 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
240 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
241 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
242 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
243
244 nat await_death;
245 #endif
246
247 #if defined(PAR)
248 StgTSO *LastTSO;
249 rtsTime TimeOfLastYield;
250 #endif
251
252 /*
253  * The thread state for the main thread.
254 // ToDo: check whether not needed any more
255 StgTSO   *MainTSO;
256  */
257
258
259 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
260 //@subsection Prototypes
261
262 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
263 //@subsection Main scheduling loop
264
265 /* ---------------------------------------------------------------------------
266    Main scheduling loop.
267
268    We use round-robin scheduling, each thread returning to the
269    scheduler loop when one of these conditions is detected:
270
271       * out of heap space
272       * timer expires (thread yields)
273       * thread blocks
274       * thread ends
275       * stack overflow
276
277    Locking notes:  we acquire the scheduler lock once at the beginning
278    of the scheduler loop, and release it when
279     
280       * running a thread, or
281       * waiting for work, or
282       * waiting for a GC to complete.
283
284    ------------------------------------------------------------------------ */
285 //@cindex schedule
286 static void
287 schedule( void )
288 {
289   StgTSO *t;
290   Capability *cap;
291   StgThreadReturnCode ret;
292 #if defined(GRAN)
293   rtsEvent *event;
294 #elif defined(PAR)
295   rtsSpark spark;
296   StgTSO *tso;
297   GlobalTaskId pe;
298 #endif
299   rtsBool was_interrupted = rtsFalse;
300   
301   ACQUIRE_LOCK(&sched_mutex);
302
303 #if defined(GRAN)
304 # error ToDo: implement GranSim scheduler
305 #elif defined(PAR)
306   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
307
308     if (PendingFetches != END_BF_QUEUE) {
309         processFetches();
310     }
311 #else
312   while (1) {
313 #endif
314
315     /* If we're interrupted (the user pressed ^C, or some other
316      * termination condition occurred), kill all the currently running
317      * threads.
318      */
319     if (interrupted) {
320       IF_DEBUG(scheduler, sched_belch("interrupted"));
321       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
322         deleteThread(t);
323       }
324       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
325         deleteThread(t);
326       }
327       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
328       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
329       interrupted = rtsFalse;
330       was_interrupted = rtsTrue;
331     }
332
333     /* Go through the list of main threads and wake up any
334      * clients whose computations have finished.  ToDo: this
335      * should be done more efficiently without a linear scan
336      * of the main threads list, somehow...
337      */
338 #ifdef SMP
339     { 
340       StgMainThread *m, **prev;
341       prev = &main_threads;
342       for (m = main_threads; m != NULL; m = m->link) {
343         switch (m->tso->whatNext) {
344         case ThreadComplete:
345           if (m->ret) {
346             *(m->ret) = (StgClosure *)m->tso->sp[0];
347           }
348           *prev = m->link;
349           m->stat = Success;
350           pthread_cond_broadcast(&m->wakeup);
351           break;
352         case ThreadKilled:
353           *prev = m->link;
354           if (was_interrupted) {
355             m->stat = Interrupted;
356           } else {
357             m->stat = Killed;
358           }
359           pthread_cond_broadcast(&m->wakeup);
360           break;
361         default:
362           break;
363         }
364       }
365     }
366 #else
367     /* If our main thread has finished or been killed, return.
368      */
369     {
370       StgMainThread *m = main_threads;
371       if (m->tso->whatNext == ThreadComplete
372           || m->tso->whatNext == ThreadKilled) {
373         main_threads = main_threads->link;
374         if (m->tso->whatNext == ThreadComplete) {
375           /* we finished successfully, fill in the return value */
376           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
377           m->stat = Success;
378           return;
379         } else {
380           if (was_interrupted) {
381             m->stat = Interrupted;
382           } else {
383             m->stat = Killed;
384           }
385           return;
386         }
387       }
388     }
389 #endif
390
391     /* Top up the run queue from our spark pool.  We try to make the
392      * number of threads in the run queue equal to the number of
393      * free capabilities.
394      */
395 #if defined(SMP)
396     {
397       nat n = n_free_capabilities;
398       StgTSO *tso = run_queue_hd;
399
400       /* Count the run queue */
401       while (n > 0 && tso != END_TSO_QUEUE) {
402         tso = tso->link;
403         n--;
404       }
405
406       for (; n > 0; n--) {
407         StgClosure *spark;
408         spark = findSpark();
409         if (spark == NULL) {
410           break; /* no more sparks in the pool */
411         } else {
412           /* I'd prefer this to be done in activateSpark -- HWL */
413           /* tricky - it needs to hold the scheduler lock and
414            * not try to re-acquire it -- SDM */
415           StgTSO *tso;
416           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
417           pushClosure(tso,spark);
418           PUSH_ON_RUN_QUEUE(tso);
419 #ifdef PAR
420           advisory_thread_count++;
421 #endif
422           
423           IF_DEBUG(scheduler,
424                    sched_belch("turning spark of closure %p into a thread",
425                                (StgClosure *)spark));
426         }
427       }
428       /* We need to wake up the other tasks if we just created some
429        * work for them.
430        */
431       if (n_free_capabilities - n > 1) {
432           pthread_cond_signal(&thread_ready_cond);
433       }
434     }
435 #endif /* SMP */
436
437     /* Check whether any waiting threads need to be woken up.  If the
438      * run queue is empty, and there are no other tasks running, we
439      * can wait indefinitely for something to happen.
440      * ToDo: what if another client comes along & requests another
441      * main thread?
442      */
443     if (blocked_queue_hd != END_TSO_QUEUE) {
444       awaitEvent(
445            (run_queue_hd == END_TSO_QUEUE)
446 #ifdef SMP
447         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
448 #endif
449         );
450     }
451     
452     /* check for signals each time around the scheduler */
453 #ifndef __MINGW32__
454     if (signals_pending()) {
455       start_signal_handlers();
456     }
457 #endif
458
459     /* Detect deadlock: when we have no threads to run, there are
460      * no threads waiting on I/O or sleeping, and all the other
461      * tasks are waiting for work, we must have a deadlock.  Inform
462      * all the main threads.
463      */
464 #ifdef SMP
465     if (blocked_queue_hd == END_TSO_QUEUE
466         && run_queue_hd == END_TSO_QUEUE
467         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
468         ) {
469       StgMainThread *m;
470       for (m = main_threads; m != NULL; m = m->link) {
471           m->ret = NULL;
472           m->stat = Deadlock;
473           pthread_cond_broadcast(&m->wakeup);
474       }
475       main_threads = NULL;
476     }
477 #else /* ! SMP */
478     if (blocked_queue_hd == END_TSO_QUEUE
479         && run_queue_hd == END_TSO_QUEUE) {
480       StgMainThread *m = main_threads;
481       m->ret = NULL;
482       m->stat = Deadlock;
483       main_threads = m->link;
484       return;
485     }
486 #endif
487
488 #ifdef SMP
489     /* If there's a GC pending, don't do anything until it has
490      * completed.
491      */
492     if (ready_to_gc) {
493       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
494       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
495     }
496     
497     /* block until we've got a thread on the run queue and a free
498      * capability.
499      */
500     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
501       IF_DEBUG(scheduler, sched_belch("waiting for work"));
502       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
503       IF_DEBUG(scheduler, sched_belch("work now available"));
504     }
505 #endif
506
507 #if defined(GRAN)
508 # error ToDo: implement GranSim scheduler
509 #elif defined(PAR)
510     /* ToDo: phps merge with spark activation above */
511     /* check whether we have local work and send requests if we have none */
512     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
513       /* :-[  no local threads => look out for local sparks */
514       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
515           (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
516            pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
517         /* 
518          * ToDo: add GC code check that we really have enough heap afterwards!!
519          * Old comment:
520          * If we're here (no runnable threads) and we have pending
521          * sparks, we must have a space problem.  Get enough space
522          * to turn one of those pending sparks into a
523          * thread... 
524          */
525         
526         spark = findSpark();                /* get a spark */
527         if (spark != (rtsSpark) NULL) {
528           tso = activateSpark(spark);       /* turn the spark into a thread */
529           IF_PAR_DEBUG(verbose,
530                        belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
531                              mytid, tso, tso->id, advisory_thread_count));
532
533           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
534             belch("^^ failed to activate spark");
535             goto next_thread;
536           }               /* otherwise fall through & pick-up new tso */
537         } else {
538           IF_PAR_DEBUG(verbose,
539                        belch("^^ no local sparks (spark pool contains only NFs: %d)", 
540                              spark_queue_len(ADVISORY_POOL)));
541           goto next_thread;
542         }
543       } else  
544       /* =8-[  no local sparks => look for work on other PEs */
545       {
546         /*
547          * We really have absolutely no work.  Send out a fish
548          * (there may be some out there already), and wait for
549          * something to arrive.  We clearly can't run any threads
550          * until a SCHEDULE or RESUME arrives, and so that's what
551          * we're hoping to see.  (Of course, we still have to
552          * respond to other types of messages.)
553          */
554         if (//!fishing &&  
555             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
556           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
557           /* fishing set in sendFish, processFish;
558              avoid flooding system with fishes via delay */
559           pe = choosePE();
560           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
561                    NEW_FISH_HUNGER);
562         }
563         
564         processMessages();
565         goto next_thread;
566         // ReSchedule(0);
567       }
568     } else if (PacketsWaiting()) {  /* Look for incoming messages */
569       processMessages();
570     }
571
572     /* Now we are sure that we have some work available */
573     ASSERT(run_queue_hd != END_TSO_QUEUE);
574     /* Take a thread from the run queue, if we have work */
575     t = take_off_run_queue(END_TSO_QUEUE);
576
577     /* ToDo: write something to the log-file
578     if (RTSflags.ParFlags.granSimStats && !sameThread)
579         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
580     */
581
582     CurrentTSO = t;
583
584     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
585                               spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
586                               pending_sparks_hd[ADVISORY_POOL], 
587                               pending_sparks_tl[ADVISORY_POOL], 
588                               pending_sparks_lim[ADVISORY_POOL]));
589
590     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
591                               run_queue_len(), CURRENT_PROC,
592                               run_queue_hd, run_queue_tl));
593
594     if (t != LastTSO) {
595       /* 
596          we are running a different TSO, so write a schedule event to log file
597          NB: If we use fair scheduling we also have to write  a deschedule 
598              event for LastTSO; with unfair scheduling we know that the
599              previous tso has blocked whenever we switch to another tso, so
600              we don't need it in GUM for now
601       */
602       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
603                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
604       
605     }
606
607 #else /* !GRAN && !PAR */
608   
609     /* grab a thread from the run queue
610      */
611     t = POP_RUN_QUEUE();
612     IF_DEBUG(sanity,checkTSO(t));
613
614 #endif
615     
616     /* grab a capability
617      */
618 #ifdef SMP
619     cap = free_capabilities;
620     free_capabilities = cap->link;
621     n_free_capabilities--;
622 #else
623     cap = &MainRegTable;
624 #endif
625     
626     cap->rCurrentTSO = t;
627     
628     /* set the context_switch flag
629      */
630     if (run_queue_hd == END_TSO_QUEUE)
631       context_switch = 0;
632     else
633       context_switch = 1;
634
635     RELEASE_LOCK(&sched_mutex);
636     
637     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
638
639     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
640     /* Run the current thread 
641      */
642     switch (cap->rCurrentTSO->whatNext) {
643     case ThreadKilled:
644     case ThreadComplete:
645       /* Thread already finished, return to scheduler. */
646       ret = ThreadFinished;
647       break;
648     case ThreadEnterGHC:
649       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
650       break;
651     case ThreadRunGHC:
652       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
653       break;
654     case ThreadEnterHugs:
655 #ifdef INTERPRETER
656       {
657          StgClosure* c;
658          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
659          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
660          cap->rCurrentTSO->sp += 1;
661          ret = enter(cap,c);
662          break;
663       }
664 #else
665       barf("Panic: entered a BCO but no bytecode interpreter in this build");
666 #endif
667     default:
668       barf("schedule: invalid whatNext field");
669     }
670     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
671     
672     /* Costs for the scheduler are assigned to CCS_SYSTEM */
673 #ifdef PROFILING
674     CCCS = CCS_SYSTEM;
675 #endif
676     
677     ACQUIRE_LOCK(&sched_mutex);
678
679 #ifdef SMP
680     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
681 #else
682     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
683 #endif
684     t = cap->rCurrentTSO;
685     
686     switch (ret) {
687     case HeapOverflow:
688       /* make all the running tasks block on a condition variable,
689        * maybe set context_switch and wait till they all pile in,
690        * then have them wait on a GC condition variable.
691        */
692       IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
693       threadPaused(t);
694       
695       ready_to_gc = rtsTrue;
696       context_switch = 1;               /* stop other threads ASAP */
697       PUSH_ON_RUN_QUEUE(t);
698       break;
699       
700     case StackOverflow:
701       /* just adjust the stack for this thread, then pop it back
702        * on the run queue.
703        */
704       IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
705       threadPaused(t);
706       { 
707         StgMainThread *m;
708         /* enlarge the stack */
709         StgTSO *new_t = threadStackOverflow(t);
710         
711         /* This TSO has moved, so update any pointers to it from the
712          * main thread stack.  It better not be on any other queues...
713          * (it shouldn't be).
714          */
715         for (m = main_threads; m != NULL; m = m->link) {
716           if (m->tso == t) {
717             m->tso = new_t;
718           }
719         }
720         threadPaused(new_t);
721         PUSH_ON_RUN_QUEUE(new_t);
722       }
723       break;
724
725     case ThreadYielding:
726 #if defined(GRAN)
727       IF_DEBUG(gran, 
728                DumpGranEvent(GR_DESCHEDULE, t));
729       globalGranStats.tot_yields++;
730 #elif defined(PAR)
731       IF_DEBUG(par, 
732                DumpGranEvent(GR_DESCHEDULE, t));
733 #endif
734       /* put the thread back on the run queue.  Then, if we're ready to
735        * GC, check whether this is the last task to stop.  If so, wake
736        * up the GC thread.  getThread will block during a GC until the
737        * GC is finished.
738        */
739       IF_DEBUG(scheduler,
740                if (t->whatNext == ThreadEnterHugs) {
741                  /* ToDo: or maybe a timer expired when we were in Hugs?
742                   * or maybe someone hit ctrl-C
743                   */
744                  belch("thread %ld stopped to switch to Hugs", t->id);
745                } else {
746                  belch("thread %ld stopped, yielding", t->id);
747                }
748                );
749       threadPaused(t);
750       APPEND_TO_RUN_QUEUE(t);
751       break;
752       
753     case ThreadBlocked:
754 #if defined(GRAN)
755 # error ToDo: implement GranSim scheduler
756 #elif defined(PAR)
757       IF_DEBUG(par, 
758                DumpGranEvent(GR_DESCHEDULE, t)); 
759 #else
760 #endif
761       /* don't need to do anything.  Either the thread is blocked on
762        * I/O, in which case we'll have called addToBlockedQueue
763        * previously, or it's blocked on an MVar or Blackhole, in which
764        * case it'll be on the relevant queue already.
765        */
766       IF_DEBUG(scheduler,
767                fprintf(stderr, "thread %d stopped, ", t->id);
768                printThreadBlockage(t);
769                fprintf(stderr, "\n"));
770       threadPaused(t);
771       break;
772       
773     case ThreadFinished:
774       /* Need to check whether this was a main thread, and if so, signal
775        * the task that started it with the return value.  If we have no
776        * more main threads, we probably need to stop all the tasks until
777        * we get a new one.
778        */
779       IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
780       t->whatNext = ThreadComplete;
781 #if defined(GRAN)
782       // ToDo: endThread(t, CurrentProc); // clean-up the thread
783 #elif defined(PAR)
784       advisory_thread_count--;
785       if (RtsFlags.ParFlags.ParStats.Full) 
786         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
787 #endif
788       break;
789       
790     default:
791       barf("doneThread: invalid thread return code");
792     }
793     
794 #ifdef SMP
795     cap->link = free_capabilities;
796     free_capabilities = cap;
797     n_free_capabilities++;
798 #endif
799
800 #ifdef SMP
801     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
802 #else
803     if (ready_to_gc) 
804 #endif
805       {
806       /* everybody back, start the GC.
807        * Could do it in this thread, or signal a condition var
808        * to do it in another thread.  Either way, we need to
809        * broadcast on gc_pending_cond afterward.
810        */
811 #ifdef SMP
812       IF_DEBUG(scheduler,sched_belch("doing GC"));
813 #endif
814       GarbageCollect(GetRoots);
815       ready_to_gc = rtsFalse;
816 #ifdef SMP
817       pthread_cond_broadcast(&gc_pending_cond);
818 #endif
819     }
820 #if defined(GRAN)
821   next_thread:
822     IF_GRAN_DEBUG(unused,
823                   print_eventq(EventHd));
824
825     event = get_next_event();
826
827 #elif defined(PAR)
828   next_thread:
829     /* ToDo: wait for next message to arrive rather than busy wait */
830
831 #else /* GRAN */
832   /* not any more
833   next_thread:
834     t = take_off_run_queue(END_TSO_QUEUE);
835   */
836 #endif /* GRAN */
837   } /* end of while(1) */
838 }
839
840 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
841 void deleteAllThreads ( void )
842 {
843   StgTSO* t;
844   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
845   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
846     deleteThread(t);
847   }
848   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
849     deleteThread(t);
850   }
851   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
852   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
853 }
854
855 /* startThread and  insertThread are now in GranSim.c -- HWL */
856
857 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
858 //@subsection Suspend and Resume
859
860 /* ---------------------------------------------------------------------------
861  * Suspending & resuming Haskell threads.
862  * 
863  * When making a "safe" call to C (aka _ccall_GC), the task gives back
864  * its capability before calling the C function.  This allows another
865  * task to pick up the capability and carry on running Haskell
866  * threads.  It also means that if the C call blocks, it won't lock
867  * the whole system.
868  *
869  * The Haskell thread making the C call is put to sleep for the
870  * duration of the call, on the susepended_ccalling_threads queue.  We
871  * give out a token to the task, which it can use to resume the thread
872  * on return from the C function.
873  * ------------------------------------------------------------------------- */
874    
875 StgInt
876 suspendThread( Capability *cap )
877 {
878   nat tok;
879
880   ACQUIRE_LOCK(&sched_mutex);
881
882   IF_DEBUG(scheduler,
883            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
884
885   threadPaused(cap->rCurrentTSO);
886   cap->rCurrentTSO->link = suspended_ccalling_threads;
887   suspended_ccalling_threads = cap->rCurrentTSO;
888
889   /* Use the thread ID as the token; it should be unique */
890   tok = cap->rCurrentTSO->id;
891
892 #ifdef SMP
893   cap->link = free_capabilities;
894   free_capabilities = cap;
895   n_free_capabilities++;
896 #endif
897
898   RELEASE_LOCK(&sched_mutex);
899   return tok; 
900 }
901
902 Capability *
903 resumeThread( StgInt tok )
904 {
905   StgTSO *tso, **prev;
906   Capability *cap;
907
908   ACQUIRE_LOCK(&sched_mutex);
909
910   prev = &suspended_ccalling_threads;
911   for (tso = suspended_ccalling_threads; 
912        tso != END_TSO_QUEUE; 
913        prev = &tso->link, tso = tso->link) {
914     if (tso->id == (StgThreadID)tok) {
915       *prev = tso->link;
916       break;
917     }
918   }
919   if (tso == END_TSO_QUEUE) {
920     barf("resumeThread: thread not found");
921   }
922
923 #ifdef SMP
924   while (free_capabilities == NULL) {
925     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
926     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
927     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
928   }
929   cap = free_capabilities;
930   free_capabilities = cap->link;
931   n_free_capabilities--;
932 #else  
933   cap = &MainRegTable;
934 #endif
935
936   cap->rCurrentTSO = tso;
937
938   RELEASE_LOCK(&sched_mutex);
939   return cap;
940 }
941
942
943 /* ---------------------------------------------------------------------------
944  * Static functions
945  * ------------------------------------------------------------------------ */
946 static void unblockThread(StgTSO *tso);
947
948 /* ---------------------------------------------------------------------------
949  * Comparing Thread ids.
950  *
951  * This is used from STG land in the implementation of the
952  * instances of Eq/Ord for ThreadIds.
953  * ------------------------------------------------------------------------ */
954
955 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
956
957   StgThreadID id1 = tso1->id; 
958   StgThreadID id2 = tso2->id;
959  
960   if (id1 < id2) return (-1);
961   if (id1 > id2) return 1;
962   return 0;
963 }
964
965 /* ---------------------------------------------------------------------------
966    Create a new thread.
967
968    The new thread starts with the given stack size.  Before the
969    scheduler can run, however, this thread needs to have a closure
970    (and possibly some arguments) pushed on its stack.  See
971    pushClosure() in Schedule.h.
972
973    createGenThread() and createIOThread() (in SchedAPI.h) are
974    convenient packaged versions of this function.
975    ------------------------------------------------------------------------ */
976 //@cindex createThread
977 #if defined(GRAN)
978 /* currently pri (priority) is only used in a GRAN setup -- HWL */
979 StgTSO *
980 createThread(nat stack_size, StgInt pri)
981 {
982   return createThread_(stack_size, rtsFalse, pri);
983 }
984
985 static StgTSO *
986 createThread_(nat size, rtsBool have_lock, StgInt pri)
987 {
988 #else
989 StgTSO *
990 createThread(nat stack_size)
991 {
992   return createThread_(stack_size, rtsFalse);
993 }
994
995 static StgTSO *
996 createThread_(nat size, rtsBool have_lock)
997 {
998 #endif
999     StgTSO *tso;
1000     nat stack_size;
1001
1002     /* First check whether we should create a thread at all */
1003 #if defined(PAR)
1004   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1005   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1006     threadsIgnored++;
1007     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1008           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1009     return END_TSO_QUEUE;
1010   }
1011   threadsCreated++;
1012 #endif
1013
1014 #if defined(GRAN)
1015   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1016 #endif
1017
1018   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1019
1020   /* catch ridiculously small stack sizes */
1021   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1022     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1023   }
1024
1025   tso = (StgTSO *)allocate(size);
1026   TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
1027   
1028   stack_size = size - TSO_STRUCT_SIZEW;
1029
1030   // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
1031   SET_HDR(tso, &TSO_info, CCS_MAIN);
1032 #if defined(GRAN)
1033   SET_GRAN_HDR(tso, ThisPE);
1034 #endif
1035   tso->whatNext     = ThreadEnterGHC;
1036
1037   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1038          protect the increment operation on next_thread_id.
1039          In future, we could use an atomic increment instead.
1040   */
1041   
1042   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1043   tso->id = next_thread_id++; 
1044   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1045
1046   tso->why_blocked  = NotBlocked;
1047   tso->blocked_exceptions = NULL;
1048
1049   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1050   tso->stack_size   = stack_size;
1051   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1052                               - TSO_STRUCT_SIZEW;
1053   tso->sp           = (P_)&(tso->stack) + stack_size;
1054
1055 #ifdef PROFILING
1056   tso->prof.CCCS = CCS_MAIN;
1057 #endif
1058
1059   /* put a stop frame on the stack */
1060   tso->sp -= sizeofW(StgStopFrame);
1061   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1062   tso->su = (StgUpdateFrame*)tso->sp;
1063
1064   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1065                            tso->id, tso, tso->stack_size));
1066
1067   // ToDo: check this
1068 #if defined(GRAN)
1069   tso->link = END_TSO_QUEUE;
1070   /* uses more flexible routine in GranSim */
1071   insertThread(tso, CurrentProc);
1072 #else
1073   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1074      from its creation
1075   */
1076 #endif
1077
1078 #if defined(GRAN)
1079   tso->gran.pri = pri;
1080   tso->gran.magic = TSO_MAGIC; // debugging only
1081   tso->gran.sparkname   = 0;
1082   tso->gran.startedat   = CURRENT_TIME; 
1083   tso->gran.exported    = 0;
1084   tso->gran.basicblocks = 0;
1085   tso->gran.allocs      = 0;
1086   tso->gran.exectime    = 0;
1087   tso->gran.fetchtime   = 0;
1088   tso->gran.fetchcount  = 0;
1089   tso->gran.blocktime   = 0;
1090   tso->gran.blockcount  = 0;
1091   tso->gran.blockedat   = 0;
1092   tso->gran.globalsparks = 0;
1093   tso->gran.localsparks  = 0;
1094   if (RtsFlags.GranFlags.Light)
1095     tso->gran.clock  = Now; /* local clock */
1096   else
1097     tso->gran.clock  = 0;
1098
1099   IF_DEBUG(gran,printTSO(tso));
1100 #elif defined(PAR)
1101   tso->par.sparkname   = 0;
1102   tso->par.startedat   = CURRENT_TIME; 
1103   tso->par.exported    = 0;
1104   tso->par.basicblocks = 0;
1105   tso->par.allocs      = 0;
1106   tso->par.exectime    = 0;
1107   tso->par.fetchtime   = 0;
1108   tso->par.fetchcount  = 0;
1109   tso->par.blocktime   = 0;
1110   tso->par.blockcount  = 0;
1111   tso->par.blockedat   = 0;
1112   tso->par.globalsparks = 0;
1113   tso->par.localsparks  = 0;
1114 #endif
1115
1116 #if defined(GRAN)
1117   globalGranStats.tot_threads_created++;
1118   globalGranStats.threads_created_on_PE[CurrentProc]++;
1119   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1120   globalGranStats.tot_sq_probes++;
1121 #endif 
1122
1123   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1124                                  tso->id, tso->stack_size));
1125   return tso;
1126 }
1127
1128 /* ---------------------------------------------------------------------------
1129  * scheduleThread()
1130  *
1131  * scheduleThread puts a thread on the head of the runnable queue.
1132  * This will usually be done immediately after a thread is created.
1133  * The caller of scheduleThread must create the thread using e.g.
1134  * createThread and push an appropriate closure
1135  * on this thread's stack before the scheduler is invoked.
1136  * ------------------------------------------------------------------------ */
1137
1138 void
1139 scheduleThread(StgTSO *tso)
1140 {
1141   ACQUIRE_LOCK(&sched_mutex);
1142
1143   /* Put the new thread on the head of the runnable queue.  The caller
1144    * better push an appropriate closure on this thread's stack
1145    * beforehand.  In the SMP case, the thread may start running as
1146    * soon as we release the scheduler lock below.
1147    */
1148   PUSH_ON_RUN_QUEUE(tso);
1149   THREAD_RUNNABLE();
1150
1151   IF_DEBUG(scheduler,printTSO(tso));
1152   RELEASE_LOCK(&sched_mutex);
1153 }
1154
1155 /* ---------------------------------------------------------------------------
1156  * startTasks()
1157  *
1158  * Start up Posix threads to run each of the scheduler tasks.
1159  * I believe the task ids are not needed in the system as defined.
1160  *  KH @ 25/10/99
1161  * ------------------------------------------------------------------------ */
1162
1163 #ifdef SMP
1164 static void *
1165 taskStart( void *arg STG_UNUSED )
1166 {
1167   schedule();
1168   return NULL;
1169 }
1170 #endif
1171
1172 /* ---------------------------------------------------------------------------
1173  * initScheduler()
1174  *
1175  * Initialise the scheduler.  This resets all the queues - if the
1176  * queues contained any threads, they'll be garbage collected at the
1177  * next pass.
1178  *
1179  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1180  * ------------------------------------------------------------------------ */
1181
1182 #ifdef SMP
1183 static void
1184 term_handler(int sig STG_UNUSED)
1185 {
1186   stat_workerStop();
1187   ACQUIRE_LOCK(&term_mutex);
1188   await_death--;
1189   RELEASE_LOCK(&term_mutex);
1190   pthread_exit(NULL);
1191 }
1192 #endif
1193
1194 //@cindex initScheduler
1195 void 
1196 initScheduler(void)
1197 {
1198 #if defined(GRAN)
1199   nat i;
1200
1201   for (i=0; i<=MAX_PROC; i++) {
1202     run_queue_hds[i]      = END_TSO_QUEUE;
1203     run_queue_tls[i]      = END_TSO_QUEUE;
1204     blocked_queue_hds[i]  = END_TSO_QUEUE;
1205     blocked_queue_tls[i]  = END_TSO_QUEUE;
1206     ccalling_threadss[i]  = END_TSO_QUEUE;
1207   }
1208 #else
1209   run_queue_hd      = END_TSO_QUEUE;
1210   run_queue_tl      = END_TSO_QUEUE;
1211   blocked_queue_hd  = END_TSO_QUEUE;
1212   blocked_queue_tl  = END_TSO_QUEUE;
1213 #endif 
1214
1215   suspended_ccalling_threads  = END_TSO_QUEUE;
1216
1217   main_threads = NULL;
1218
1219   context_switch = 0;
1220   interrupted    = 0;
1221
1222   enteredCAFs = END_CAF_LIST;
1223
1224   /* Install the SIGHUP handler */
1225 #ifdef SMP
1226   {
1227     struct sigaction action,oact;
1228
1229     action.sa_handler = term_handler;
1230     sigemptyset(&action.sa_mask);
1231     action.sa_flags = 0;
1232     if (sigaction(SIGTERM, &action, &oact) != 0) {
1233       barf("can't install TERM handler");
1234     }
1235   }
1236 #endif
1237
1238 #ifdef SMP
1239   /* Allocate N Capabilities */
1240   {
1241     nat i;
1242     Capability *cap, *prev;
1243     cap  = NULL;
1244     prev = NULL;
1245     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1246       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1247       cap->link = prev;
1248       prev = cap;
1249     }
1250     free_capabilities = cap;
1251     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1252   }
1253   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1254                              n_free_capabilities););
1255 #endif
1256
1257 #if defined(SMP) || defined(PAR)
1258   initSparkPools();
1259 #endif
1260 }
1261
1262 #ifdef SMP
1263 void
1264 startTasks( void )
1265 {
1266   nat i;
1267   int r;
1268   pthread_t tid;
1269   
1270   /* make some space for saving all the thread ids */
1271   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1272                             "initScheduler:task_ids");
1273   
1274   /* and create all the threads */
1275   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1276     r = pthread_create(&tid,NULL,taskStart,NULL);
1277     if (r != 0) {
1278       barf("startTasks: Can't create new Posix thread");
1279     }
1280     task_ids[i].id = tid;
1281     task_ids[i].mut_time = 0.0;
1282     task_ids[i].mut_etime = 0.0;
1283     task_ids[i].gc_time = 0.0;
1284     task_ids[i].gc_etime = 0.0;
1285     task_ids[i].elapsedtimestart = elapsedtime();
1286     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1287   }
1288 }
1289 #endif
1290
1291 void
1292 exitScheduler( void )
1293 {
1294 #ifdef SMP
1295   nat i;
1296
1297   /* Don't want to use pthread_cancel, since we'd have to install
1298    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1299    * all our locks.
1300    */
1301 #if 0
1302   /* Cancel all our tasks */
1303   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1304     pthread_cancel(task_ids[i].id);
1305   }
1306   
1307   /* Wait for all the tasks to terminate */
1308   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1309     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1310                                task_ids[i].id));
1311     pthread_join(task_ids[i].id, NULL);
1312   }
1313 #endif
1314
1315   /* Send 'em all a SIGHUP.  That should shut 'em up.
1316    */
1317   await_death = RtsFlags.ParFlags.nNodes;
1318   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1319     pthread_kill(task_ids[i].id,SIGTERM);
1320   }
1321   while (await_death > 0) {
1322     sched_yield();
1323   }
1324 #endif
1325 }
1326
1327 /* -----------------------------------------------------------------------------
1328    Managing the per-task allocation areas.
1329    
1330    Each capability comes with an allocation area.  These are
1331    fixed-length block lists into which allocation can be done.
1332
1333    ToDo: no support for two-space collection at the moment???
1334    -------------------------------------------------------------------------- */
1335
1336 /* -----------------------------------------------------------------------------
1337  * waitThread is the external interface for running a new computataion
1338  * and waiting for the result.
1339  *
1340  * In the non-SMP case, we create a new main thread, push it on the 
1341  * main-thread stack, and invoke the scheduler to run it.  The
1342  * scheduler will return when the top main thread on the stack has
1343  * completed or died, and fill in the necessary fields of the
1344  * main_thread structure.
1345  *
1346  * In the SMP case, we create a main thread as before, but we then
1347  * create a new condition variable and sleep on it.  When our new
1348  * main thread has completed, we'll be woken up and the status/result
1349  * will be in the main_thread struct.
1350  * -------------------------------------------------------------------------- */
1351
1352 SchedulerStatus
1353 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1354 {
1355   StgMainThread *m;
1356   SchedulerStatus stat;
1357
1358   ACQUIRE_LOCK(&sched_mutex);
1359   
1360   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1361
1362   m->tso = tso;
1363   m->ret = ret;
1364   m->stat = NoStatus;
1365 #ifdef SMP
1366   pthread_cond_init(&m->wakeup, NULL);
1367 #endif
1368
1369   m->link = main_threads;
1370   main_threads = m;
1371
1372   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1373                               m->tso->id));
1374
1375 #ifdef SMP
1376   do {
1377     pthread_cond_wait(&m->wakeup, &sched_mutex);
1378   } while (m->stat == NoStatus);
1379 #else
1380   schedule();
1381   ASSERT(m->stat != NoStatus);
1382 #endif
1383
1384   stat = m->stat;
1385
1386 #ifdef SMP
1387   pthread_cond_destroy(&m->wakeup);
1388 #endif
1389
1390   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1391                               m->tso->id));
1392   free(m);
1393
1394   RELEASE_LOCK(&sched_mutex);
1395
1396   return stat;
1397 }
1398
1399 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1400 //@subsection Run queue code 
1401
1402 #if 0
1403 /* 
1404    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1405        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1406        implicit global variable that has to be correct when calling these
1407        fcts -- HWL 
1408 */
1409
1410 /* Put the new thread on the head of the runnable queue.
1411  * The caller of createThread better push an appropriate closure
1412  * on this thread's stack before the scheduler is invoked.
1413  */
1414 static /* inline */ void
1415 add_to_run_queue(tso)
1416 StgTSO* tso; 
1417 {
1418   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1419   tso->link = run_queue_hd;
1420   run_queue_hd = tso;
1421   if (run_queue_tl == END_TSO_QUEUE) {
1422     run_queue_tl = tso;
1423   }
1424 }
1425
1426 /* Put the new thread at the end of the runnable queue. */
1427 static /* inline */ void
1428 push_on_run_queue(tso)
1429 StgTSO* tso; 
1430 {
1431   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1432   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1433   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1434   if (run_queue_hd == END_TSO_QUEUE) {
1435     run_queue_hd = tso;
1436   } else {
1437     run_queue_tl->link = tso;
1438   }
1439   run_queue_tl = tso;
1440 }
1441
1442 /* 
1443    Should be inlined because it's used very often in schedule.  The tso
1444    argument is actually only needed in GranSim, where we want to have the
1445    possibility to schedule *any* TSO on the run queue, irrespective of the
1446    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1447    the run queue and dequeue the tso, adjusting the links in the queue. 
1448 */
1449 //@cindex take_off_run_queue
1450 static /* inline */ StgTSO*
1451 take_off_run_queue(StgTSO *tso) {
1452   StgTSO *t, *prev;
1453
1454   /* 
1455      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1456
1457      if tso is specified, unlink that tso from the run_queue (doesn't have
1458      to be at the beginning of the queue); GranSim only 
1459   */
1460   if (tso!=END_TSO_QUEUE) {
1461     /* find tso in queue */
1462     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1463          t!=END_TSO_QUEUE && t!=tso;
1464          prev=t, t=t->link) 
1465       /* nothing */ ;
1466     ASSERT(t==tso);
1467     /* now actually dequeue the tso */
1468     if (prev!=END_TSO_QUEUE) {
1469       ASSERT(run_queue_hd!=t);
1470       prev->link = t->link;
1471     } else {
1472       /* t is at beginning of thread queue */
1473       ASSERT(run_queue_hd==t);
1474       run_queue_hd = t->link;
1475     }
1476     /* t is at end of thread queue */
1477     if (t->link==END_TSO_QUEUE) {
1478       ASSERT(t==run_queue_tl);
1479       run_queue_tl = prev;
1480     } else {
1481       ASSERT(run_queue_tl!=t);
1482     }
1483     t->link = END_TSO_QUEUE;
1484   } else {
1485     /* take tso from the beginning of the queue; std concurrent code */
1486     t = run_queue_hd;
1487     if (t != END_TSO_QUEUE) {
1488       run_queue_hd = t->link;
1489       t->link = END_TSO_QUEUE;
1490       if (run_queue_hd == END_TSO_QUEUE) {
1491         run_queue_tl = END_TSO_QUEUE;
1492       }
1493     }
1494   }
1495   return t;
1496 }
1497
1498 #endif /* 0 */
1499
1500 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1501 //@subsection Garbage Collextion Routines
1502
1503 /* ---------------------------------------------------------------------------
1504    Where are the roots that we know about?
1505
1506         - all the threads on the runnable queue
1507         - all the threads on the blocked queue
1508         - all the thread currently executing a _ccall_GC
1509         - all the "main threads"
1510      
1511    ------------------------------------------------------------------------ */
1512
1513 /* This has to be protected either by the scheduler monitor, or by the
1514         garbage collection monitor (probably the latter).
1515         KH @ 25/10/99
1516 */
1517
1518 static void GetRoots(void)
1519 {
1520   StgMainThread *m;
1521
1522 #if defined(GRAN)
1523   {
1524     nat i;
1525     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1526       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1527         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1528       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1529         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1530       
1531       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1532         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1533       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1534         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1535       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1536         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1537     }
1538   }
1539
1540   markEventQueue();
1541
1542 #else /* !GRAN */
1543   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1544   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1545
1546   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1547   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1548 #endif 
1549
1550   for (m = main_threads; m != NULL; m = m->link) {
1551     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1552   }
1553   suspended_ccalling_threads = 
1554     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1555
1556 #if defined(SMP) || defined(PAR) || defined(GRAN)
1557   markSparkQueue();
1558 #endif
1559 }
1560
1561 /* -----------------------------------------------------------------------------
1562    performGC
1563
1564    This is the interface to the garbage collector from Haskell land.
1565    We provide this so that external C code can allocate and garbage
1566    collect when called from Haskell via _ccall_GC.
1567
1568    It might be useful to provide an interface whereby the programmer
1569    can specify more roots (ToDo).
1570    
1571    This needs to be protected by the GC condition variable above.  KH.
1572    -------------------------------------------------------------------------- */
1573
1574 void (*extra_roots)(void);
1575
1576 void
1577 performGC(void)
1578 {
1579   GarbageCollect(GetRoots);
1580 }
1581
1582 static void
1583 AllRoots(void)
1584 {
1585   GetRoots();                   /* the scheduler's roots */
1586   extra_roots();                /* the user's roots */
1587 }
1588
1589 void
1590 performGCWithRoots(void (*get_roots)(void))
1591 {
1592   extra_roots = get_roots;
1593
1594   GarbageCollect(AllRoots);
1595 }
1596
1597 /* -----------------------------------------------------------------------------
1598    Stack overflow
1599
1600    If the thread has reached its maximum stack size, then raise the
1601    StackOverflow exception in the offending thread.  Otherwise
1602    relocate the TSO into a larger chunk of memory and adjust its stack
1603    size appropriately.
1604    -------------------------------------------------------------------------- */
1605
1606 static StgTSO *
1607 threadStackOverflow(StgTSO *tso)
1608 {
1609   nat new_stack_size, new_tso_size, diff, stack_words;
1610   StgPtr new_sp;
1611   StgTSO *dest;
1612
1613   IF_DEBUG(sanity,checkTSO(tso));
1614   if (tso->stack_size >= tso->max_stack_size) {
1615 #if 0
1616     /* If we're debugging, just print out the top of the stack */
1617     printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1618                                      tso->sp+64));
1619 #endif
1620 #ifdef INTERPRETER
1621     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1622     exit(1);
1623 #else
1624     /* Send this thread the StackOverflow exception */
1625     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
1626 #endif
1627     return tso;
1628   }
1629
1630   /* Try to double the current stack size.  If that takes us over the
1631    * maximum stack size for this thread, then use the maximum instead.
1632    * Finally round up so the TSO ends up as a whole number of blocks.
1633    */
1634   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1635   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
1636                                        TSO_STRUCT_SIZE)/sizeof(W_);
1637   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
1638   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1639
1640   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1641
1642   dest = (StgTSO *)allocate(new_tso_size);
1643   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1644
1645   /* copy the TSO block and the old stack into the new area */
1646   memcpy(dest,tso,TSO_STRUCT_SIZE);
1647   stack_words = tso->stack + tso->stack_size - tso->sp;
1648   new_sp = (P_)dest + new_tso_size - stack_words;
1649   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1650
1651   /* relocate the stack pointers... */
1652   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1653   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
1654   dest->sp    = new_sp;
1655   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1656   dest->stack_size = new_stack_size;
1657         
1658   /* and relocate the update frame list */
1659   relocate_TSO(tso, dest);
1660
1661   /* Mark the old TSO as relocated.  We have to check for relocated
1662    * TSOs in the garbage collector and any primops that deal with TSOs.
1663    *
1664    * It's important to set the sp and su values to just beyond the end
1665    * of the stack, so we don't attempt to scavenge any part of the
1666    * dead TSO's stack.
1667    */
1668   tso->whatNext = ThreadRelocated;
1669   tso->link = dest;
1670   tso->sp = (P_)&(tso->stack[tso->stack_size]);
1671   tso->su = (StgUpdateFrame *)tso->sp;
1672   tso->why_blocked = NotBlocked;
1673   dest->mut_link = NULL;
1674
1675   IF_DEBUG(sanity,checkTSO(tso));
1676 #if 0
1677   IF_DEBUG(scheduler,printTSO(dest));
1678 #endif
1679
1680   return dest;
1681 }
1682
1683 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1684 //@subsection Blocking Queue Routines
1685
1686 /* ---------------------------------------------------------------------------
1687    Wake up a queue that was blocked on some resource.
1688    ------------------------------------------------------------------------ */
1689
1690 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
1691
1692 #if defined(GRAN)
1693 static inline void
1694 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1695 {
1696 }
1697 #elif defined(PAR)
1698 static inline void
1699 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1700 {
1701   /* write RESUME events to log file and
1702      update blocked and fetch time (depending on type of the orig closure) */
1703   if (RtsFlags.ParFlags.ParStats.Full) {
1704     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1705                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1706                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1707
1708     switch (get_itbl(node)->type) {
1709         case FETCH_ME_BQ:
1710           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1711           break;
1712         case RBH:
1713         case FETCH_ME:
1714         case BLACKHOLE_BQ:
1715           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1716           break;
1717         default:
1718           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
1719         }
1720       }
1721 }
1722 #endif
1723
1724 #if defined(GRAN)
1725 static StgBlockingQueueElement *
1726 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1727 {
1728     StgBlockingQueueElement *next;
1729     PEs node_loc, tso_loc;
1730
1731     node_loc = where_is(node); // should be lifted out of loop
1732     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1733     tso_loc = where_is(tso);
1734     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1735       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1736       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
1737       bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1738       // insertThread(tso, node_loc);
1739       new_event(tso_loc, tso_loc,
1740                 CurrentTime[CurrentProc]+bq_processing_time,
1741                 ResumeThread,
1742                 tso, node, (rtsSpark*)NULL);
1743       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1744       // len_local++;
1745       // len++;
1746     } else { // TSO is remote (actually should be FMBQ)
1747       bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1748       bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1749       new_event(tso_loc, CurrentProc, 
1750                 CurrentTime[CurrentProc]+bq_processing_time+
1751                 RtsFlags.GranFlags.Costs.latency,
1752                 UnblockThread,
1753                 tso, node, (rtsSpark*)NULL);
1754       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1755       bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1756       // len++;
1757     }      
1758     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1759     IF_GRAN_DEBUG(bq,
1760                   fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1761                           (node_loc==tso_loc ? "Local" : "Global"), 
1762                           tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
1763     tso->blocked_on = NULL;
1764     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
1765                              tso->id, tso));
1766   }
1767
1768   /* if this is the BQ of an RBH, we have to put back the info ripped out of
1769      the closure to make room for the anchor of the BQ */
1770   if (next!=END_BQ_QUEUE) {
1771     ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1772     /*
1773     ASSERT((info_ptr==&RBH_Save_0_info) ||
1774            (info_ptr==&RBH_Save_1_info) ||
1775            (info_ptr==&RBH_Save_2_info));
1776     */
1777     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1778     ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1779     ((StgRBH *)node)->mut_link       = ((StgRBHSave *)next)->payload[1];
1780
1781     IF_GRAN_DEBUG(bq,
1782                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1783                         node, info_type(node)));
1784   }
1785 }
1786 #elif defined(PAR)
1787 static StgBlockingQueueElement *
1788 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1789 {
1790     StgBlockingQueueElement *next;
1791
1792     switch (get_itbl(bqe)->type) {
1793     case TSO:
1794       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1795       /* if it's a TSO just push it onto the run_queue */
1796       next = bqe->link;
1797       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1798       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
1799       THREAD_RUNNABLE();
1800       unblockCount(bqe, node);
1801       /* reset blocking status after dumping event */
1802       ((StgTSO *)bqe)->why_blocked = NotBlocked;
1803       break;
1804
1805     case BLOCKED_FETCH:
1806       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1807       next = bqe->link;
1808       bqe->link = PendingFetches;
1809       PendingFetches = bqe;
1810       break;
1811
1812 # if defined(DEBUG)
1813       /* can ignore this case in a non-debugging setup; 
1814          see comments on RBHSave closures above */
1815     case CONSTR:
1816       /* check that the closure is an RBHSave closure */
1817       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1818              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1819              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1820       break;
1821
1822     default:
1823       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1824            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
1825            (StgClosure *)bqe);
1826 # endif
1827     }
1828   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1829   return next;
1830 }
1831
1832 #else /* !GRAN && !PAR */
1833 static StgTSO *
1834 unblockOneLocked(StgTSO *tso)
1835 {
1836   StgTSO *next;
1837
1838   ASSERT(get_itbl(tso)->type == TSO);
1839   ASSERT(tso->why_blocked != NotBlocked);
1840   tso->why_blocked = NotBlocked;
1841   next = tso->link;
1842   PUSH_ON_RUN_QUEUE(tso);
1843   THREAD_RUNNABLE();
1844   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1845   return next;
1846 }
1847 #endif
1848
1849 #if defined(PAR) || defined(GRAN)
1850 inline StgTSO *
1851 unblockOne(StgTSO *tso, StgClosure *node)
1852 {
1853   ACQUIRE_LOCK(&sched_mutex);
1854   tso = unblockOneLocked(tso, node);
1855   RELEASE_LOCK(&sched_mutex);
1856   return tso;
1857 }
1858 #else
1859 inline StgTSO *
1860 unblockOne(StgTSO *tso)
1861 {
1862   ACQUIRE_LOCK(&sched_mutex);
1863   tso = unblockOneLocked(tso);
1864   RELEASE_LOCK(&sched_mutex);
1865   return tso;
1866 }
1867 #endif
1868
1869 #if defined(GRAN)
1870 void 
1871 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1872 {
1873   StgBlockingQueueElement *bqe, *next;
1874   StgTSO *tso;
1875   PEs node_loc, tso_loc;
1876   rtsTime bq_processing_time = 0;
1877   nat len = 0, len_local = 0;
1878
1879   IF_GRAN_DEBUG(bq, 
1880                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1881                       node, CurrentProc, CurrentTime[CurrentProc], 
1882                       CurrentTSO->id, CurrentTSO));
1883
1884   node_loc = where_is(node);
1885
1886   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
1887          get_itbl(q)->type == CONSTR); // closure (type constructor)
1888   ASSERT(is_unique(node));
1889
1890   /* FAKE FETCH: magically copy the node to the tso's proc;
1891      no Fetch necessary because in reality the node should not have been 
1892      moved to the other PE in the first place
1893   */
1894   if (CurrentProc!=node_loc) {
1895     IF_GRAN_DEBUG(bq, 
1896                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1897                         node, node_loc, CurrentProc, CurrentTSO->id, 
1898                         // CurrentTSO, where_is(CurrentTSO),
1899                         node->header.gran.procs));
1900     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1901     IF_GRAN_DEBUG(bq, 
1902                   belch("## new bitmask of node %p is %#x",
1903                         node, node->header.gran.procs));
1904     if (RtsFlags.GranFlags.GranSimStats.Global) {
1905       globalGranStats.tot_fake_fetches++;
1906     }
1907   }
1908
1909   bqe = q;
1910   // ToDo: check: ASSERT(CurrentProc==node_loc);
1911   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
1912     //next = bqe->link;
1913     /* 
1914        bqe points to the current element in the queue
1915        next points to the next element in the queue
1916     */
1917     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1918     //tso_loc = where_is(tso);
1919     bqe = unblockOneLocked(bqe, node);
1920   }
1921
1922   /* statistics gathering */
1923   /* ToDo: fix counters
1924   if (RtsFlags.GranFlags.GranSimStats.Global) {
1925     globalGranStats.tot_bq_processing_time += bq_processing_time;
1926     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
1927     globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
1928     globalGranStats.tot_awbq++;             // total no. of bqs awakened
1929   }
1930   IF_GRAN_DEBUG(bq,
1931                 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1932                         node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
1933   */
1934 }
1935 #elif defined(PAR)
1936 void 
1937 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1938 {
1939   StgBlockingQueueElement *bqe, *next;
1940
1941   ACQUIRE_LOCK(&sched_mutex);
1942
1943   IF_PAR_DEBUG(verbose, 
1944                belch("## AwBQ for node %p on [%x]: ",
1945                      node, mytid));
1946
1947   ASSERT(get_itbl(q)->type == TSO ||           
1948          get_itbl(q)->type == BLOCKED_FETCH || 
1949          get_itbl(q)->type == CONSTR); 
1950
1951   bqe = q;
1952   while (get_itbl(bqe)->type==TSO || 
1953          get_itbl(bqe)->type==BLOCKED_FETCH) {
1954     bqe = unblockOneLocked(bqe, node);
1955   }
1956   RELEASE_LOCK(&sched_mutex);
1957 }
1958
1959 #else   /* !GRAN && !PAR */
1960 void
1961 awakenBlockedQueue(StgTSO *tso)
1962 {
1963   ACQUIRE_LOCK(&sched_mutex);
1964   while (tso != END_TSO_QUEUE) {
1965     tso = unblockOneLocked(tso);
1966   }
1967   RELEASE_LOCK(&sched_mutex);
1968 }
1969 #endif
1970
1971 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
1972 //@subsection Exception Handling Routines
1973
1974 /* ---------------------------------------------------------------------------
1975    Interrupt execution
1976    - usually called inside a signal handler so it mustn't do anything fancy.   
1977    ------------------------------------------------------------------------ */
1978
1979 void
1980 interruptStgRts(void)
1981 {
1982     interrupted    = 1;
1983     context_switch = 1;
1984 }
1985
1986 /* -----------------------------------------------------------------------------
1987    Unblock a thread
1988
1989    This is for use when we raise an exception in another thread, which
1990    may be blocked.
1991    This has nothing to do with the UnblockThread event in GranSim. -- HWL
1992    -------------------------------------------------------------------------- */
1993
1994 static void
1995 unblockThread(StgTSO *tso)
1996 {
1997   StgTSO *t, **last;
1998
1999   ACQUIRE_LOCK(&sched_mutex);
2000   switch (tso->why_blocked) {
2001
2002   case NotBlocked:
2003     return;  /* not blocked */
2004
2005   case BlockedOnMVar:
2006     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2007     {
2008       StgTSO *last_tso = END_TSO_QUEUE;
2009       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2010
2011       last = &mvar->head;
2012       for (t = mvar->head; t != END_TSO_QUEUE; 
2013            last = &t->link, last_tso = t, t = t->link) {
2014         if (t == tso) {
2015           *last = tso->link;
2016           if (mvar->tail == tso) {
2017             mvar->tail = last_tso;
2018           }
2019           goto done;
2020         }
2021       }
2022       barf("unblockThread (MVAR): TSO not found");
2023     }
2024
2025   case BlockedOnBlackHole:
2026     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2027     {
2028       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2029
2030       last = &bq->blocking_queue;
2031       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2032            last = &t->link, t = t->link) {
2033         if (t == tso) {
2034           *last = tso->link;
2035           goto done;
2036         }
2037       }
2038       barf("unblockThread (BLACKHOLE): TSO not found");
2039     }
2040
2041   case BlockedOnException:
2042     {
2043       StgTSO *target  = tso->block_info.tso;
2044
2045       ASSERT(get_itbl(target)->type == TSO);
2046       ASSERT(target->blocked_exceptions != NULL);
2047
2048       last = &target->blocked_exceptions;
2049       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2050            last = &t->link, t = t->link) {
2051         ASSERT(get_itbl(t)->type == TSO);
2052         if (t == tso) {
2053           *last = tso->link;
2054           goto done;
2055         }
2056       }
2057       barf("unblockThread (Exception): TSO not found");
2058     }
2059
2060   case BlockedOnDelay:
2061   case BlockedOnRead:
2062   case BlockedOnWrite:
2063     {
2064       StgTSO *prev = NULL;
2065       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2066            prev = t, t = t->link) {
2067         if (t == tso) {
2068           if (prev == NULL) {
2069             blocked_queue_hd = t->link;
2070             if (blocked_queue_tl == t) {
2071               blocked_queue_tl = END_TSO_QUEUE;
2072             }
2073           } else {
2074             prev->link = t->link;
2075             if (blocked_queue_tl == t) {
2076               blocked_queue_tl = prev;
2077             }
2078           }
2079           goto done;
2080         }
2081       }
2082       barf("unblockThread (I/O): TSO not found");
2083     }
2084
2085   default:
2086     barf("unblockThread");
2087   }
2088
2089  done:
2090   tso->link = END_TSO_QUEUE;
2091   tso->why_blocked = NotBlocked;
2092   tso->block_info.closure = NULL;
2093   PUSH_ON_RUN_QUEUE(tso);
2094   RELEASE_LOCK(&sched_mutex);
2095 }
2096
2097 /* -----------------------------------------------------------------------------
2098  * raiseAsync()
2099  *
2100  * The following function implements the magic for raising an
2101  * asynchronous exception in an existing thread.
2102  *
2103  * We first remove the thread from any queue on which it might be
2104  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2105  *
2106  * We strip the stack down to the innermost CATCH_FRAME, building
2107  * thunks in the heap for all the active computations, so they can 
2108  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2109  * an application of the handler to the exception, and push it on
2110  * the top of the stack.
2111  * 
2112  * How exactly do we save all the active computations?  We create an
2113  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2114  * AP_UPDs pushes everything from the corresponding update frame
2115  * upwards onto the stack.  (Actually, it pushes everything up to the
2116  * next update frame plus a pointer to the next AP_UPD object.
2117  * Entering the next AP_UPD object pushes more onto the stack until we
2118  * reach the last AP_UPD object - at which point the stack should look
2119  * exactly as it did when we killed the TSO and we can continue
2120  * execution by entering the closure on top of the stack.
2121  *
2122  * We can also kill a thread entirely - this happens if either (a) the 
2123  * exception passed to raiseAsync is NULL, or (b) there's no
2124  * CATCH_FRAME on the stack.  In either case, we strip the entire
2125  * stack and replace the thread with a zombie.
2126  *
2127  * -------------------------------------------------------------------------- */
2128  
2129 void 
2130 deleteThread(StgTSO *tso)
2131 {
2132   raiseAsync(tso,NULL);
2133 }
2134
2135 void
2136 raiseAsync(StgTSO *tso, StgClosure *exception)
2137 {
2138   StgUpdateFrame* su = tso->su;
2139   StgPtr          sp = tso->sp;
2140   
2141   /* Thread already dead? */
2142   if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
2143     return;
2144   }
2145
2146   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2147
2148   /* Remove it from any blocking queues */
2149   unblockThread(tso);
2150
2151   /* The stack freezing code assumes there's a closure pointer on
2152    * the top of the stack.  This isn't always the case with compiled
2153    * code, so we have to push a dummy closure on the top which just
2154    * returns to the next return address on the stack.
2155    */
2156   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2157     *(--sp) = (W_)&dummy_ret_closure;
2158   }
2159
2160   while (1) {
2161     int words = ((P_)su - (P_)sp) - 1;
2162     nat i;
2163     StgAP_UPD * ap;
2164
2165     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2166      * then build PAP(handler,exception,realworld#), and leave it on
2167      * top of the stack ready to enter.
2168      */
2169     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2170       StgCatchFrame *cf = (StgCatchFrame *)su;
2171       /* we've got an exception to raise, so let's pass it to the
2172        * handler in this frame.
2173        */
2174       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2175       TICK_ALLOC_UPD_PAP(3,0);
2176       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2177               
2178       ap->n_args = 2;
2179       ap->fun = cf->handler;    /* :: Exception -> IO a */
2180       ap->payload[0] = (P_)exception;
2181       ap->payload[1] = ARG_TAG(0); /* realworld token */
2182
2183       /* throw away the stack from Sp up to and including the
2184        * CATCH_FRAME.
2185        */
2186       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2187       tso->su = cf->link;
2188
2189       /* Restore the blocked/unblocked state for asynchronous exceptions
2190        * at the CATCH_FRAME.  
2191        *
2192        * If exceptions were unblocked at the catch, arrange that they
2193        * are unblocked again after executing the handler by pushing an
2194        * unblockAsyncExceptions_ret stack frame.
2195        */
2196       if (!cf->exceptions_blocked) {
2197         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2198       }
2199       
2200       /* Ensure that async exceptions are blocked when running the handler.
2201        */
2202       if (tso->blocked_exceptions == NULL) {
2203         tso->blocked_exceptions = END_TSO_QUEUE;
2204       }
2205       
2206       /* Put the newly-built PAP on top of the stack, ready to execute
2207        * when the thread restarts.
2208        */
2209       sp[0] = (W_)ap;
2210       tso->sp = sp;
2211       tso->whatNext = ThreadEnterGHC;
2212       return;
2213     }
2214
2215     /* First build an AP_UPD consisting of the stack chunk above the
2216      * current update frame, with the top word on the stack as the
2217      * fun field.
2218      */
2219     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2220     
2221     ASSERT(words >= 0);
2222     
2223     ap->n_args = words;
2224     ap->fun    = (StgClosure *)sp[0];
2225     sp++;
2226     for(i=0; i < (nat)words; ++i) {
2227       ap->payload[i] = (P_)*sp++;
2228     }
2229     
2230     switch (get_itbl(su)->type) {
2231       
2232     case UPDATE_FRAME:
2233       {
2234         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2235         TICK_ALLOC_UP_THK(words+1,0);
2236         
2237         IF_DEBUG(scheduler,
2238                  fprintf(stderr,  "scheduler: Updating ");
2239                  printPtr((P_)su->updatee); 
2240                  fprintf(stderr,  " with ");
2241                  printObj((StgClosure *)ap);
2242                  );
2243         
2244         /* Replace the updatee with an indirection - happily
2245          * this will also wake up any threads currently
2246          * waiting on the result.
2247          */
2248         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2249         su = su->link;
2250         sp += sizeofW(StgUpdateFrame) -1;
2251         sp[0] = (W_)ap; /* push onto stack */
2252         break;
2253       }
2254       
2255     case CATCH_FRAME:
2256       {
2257         StgCatchFrame *cf = (StgCatchFrame *)su;
2258         StgClosure* o;
2259         
2260         /* We want a PAP, not an AP_UPD.  Fortunately, the
2261          * layout's the same.
2262          */
2263         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2264         TICK_ALLOC_UPD_PAP(words+1,0);
2265         
2266         /* now build o = FUN(catch,ap,handler) */
2267         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2268         TICK_ALLOC_FUN(2,0);
2269         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2270         o->payload[0] = (StgClosure *)ap;
2271         o->payload[1] = cf->handler;
2272         
2273         IF_DEBUG(scheduler,
2274                  fprintf(stderr,  "scheduler: Built ");
2275                  printObj((StgClosure *)o);
2276                  );
2277         
2278         /* pop the old handler and put o on the stack */
2279         su = cf->link;
2280         sp += sizeofW(StgCatchFrame) - 1;
2281         sp[0] = (W_)o;
2282         break;
2283       }
2284       
2285     case SEQ_FRAME:
2286       {
2287         StgSeqFrame *sf = (StgSeqFrame *)su;
2288         StgClosure* o;
2289         
2290         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2291         TICK_ALLOC_UPD_PAP(words+1,0);
2292         
2293         /* now build o = FUN(seq,ap) */
2294         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2295         TICK_ALLOC_SE_THK(1,0);
2296         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2297         payloadCPtr(o,0) = (StgClosure *)ap;
2298         
2299         IF_DEBUG(scheduler,
2300                  fprintf(stderr,  "scheduler: Built ");
2301                  printObj((StgClosure *)o);
2302                  );
2303         
2304         /* pop the old handler and put o on the stack */
2305         su = sf->link;
2306         sp += sizeofW(StgSeqFrame) - 1;
2307         sp[0] = (W_)o;
2308         break;
2309       }
2310       
2311     case STOP_FRAME:
2312       /* We've stripped the entire stack, the thread is now dead. */
2313       sp += sizeofW(StgStopFrame) - 1;
2314       sp[0] = (W_)exception;    /* save the exception */
2315       tso->whatNext = ThreadKilled;
2316       tso->su = (StgUpdateFrame *)(sp+1);
2317       tso->sp = sp;
2318       return;
2319       
2320     default:
2321       barf("raiseAsync");
2322     }
2323   }
2324   barf("raiseAsync");
2325 }
2326
2327 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2328 //@subsection Debugging Routines
2329
2330 /* -----------------------------------------------------------------------------
2331    Debugging: why is a thread blocked
2332    -------------------------------------------------------------------------- */
2333
2334 #ifdef DEBUG
2335
2336 void printThreadBlockage(StgTSO *tso)
2337 {
2338   switch (tso->why_blocked) {
2339   case BlockedOnRead:
2340     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2341     break;
2342   case BlockedOnWrite:
2343     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2344     break;
2345   case BlockedOnDelay:
2346     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2347     break;
2348   case BlockedOnMVar:
2349     fprintf(stderr,"blocked on an MVar");
2350     break;
2351   case BlockedOnException:
2352     fprintf(stderr,"blocked on delivering an exception to thread %d",
2353             tso->block_info.tso->id);
2354     break;
2355   case BlockedOnBlackHole:
2356     fprintf(stderr,"blocked on a black hole");
2357     break;
2358   case NotBlocked:
2359     fprintf(stderr,"not blocked");
2360     break;
2361 #if defined(PAR)
2362   case BlockedOnGA:
2363     fprintf(stderr,"blocked on global address");
2364     break;
2365 #endif
2366   }
2367 }
2368
2369 /* 
2370    Print a whole blocking queue attached to node (debugging only).
2371 */
2372 //@cindex print_bq
2373 # if defined(PAR)
2374 void 
2375 print_bq (StgClosure *node)
2376 {
2377   StgBlockingQueueElement *bqe;
2378   StgTSO *tso;
2379   rtsBool end;
2380
2381   fprintf(stderr,"## BQ of closure %p (%s): ",
2382           node, info_type(node));
2383
2384   /* should cover all closures that may have a blocking queue */
2385   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2386          get_itbl(node)->type == FETCH_ME_BQ ||
2387          get_itbl(node)->type == RBH);
2388     
2389   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2390   /* 
2391      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2392   */
2393   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2394        !end; // iterate until bqe points to a CONSTR
2395        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2396     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2397     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2398     /* types of closures that may appear in a blocking queue */
2399     ASSERT(get_itbl(bqe)->type == TSO ||           
2400            get_itbl(bqe)->type == BLOCKED_FETCH || 
2401            get_itbl(bqe)->type == CONSTR); 
2402     /* only BQs of an RBH end with an RBH_Save closure */
2403     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2404
2405     switch (get_itbl(bqe)->type) {
2406     case TSO:
2407       fprintf(stderr," TSO %d (%x),",
2408               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2409       break;
2410     case BLOCKED_FETCH:
2411       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2412               ((StgBlockedFetch *)bqe)->node, 
2413               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2414               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2415               ((StgBlockedFetch *)bqe)->ga.weight);
2416       break;
2417     case CONSTR:
2418       fprintf(stderr," %s (IP %p),",
2419               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2420                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2421                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2422                "RBH_Save_?"), get_itbl(bqe));
2423       break;
2424     default:
2425       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2426            info_type(bqe), node, info_type(node));
2427       break;
2428     }
2429   } /* for */
2430   fputc('\n', stderr);
2431 }
2432 # elif defined(GRAN)
2433 void 
2434 print_bq (StgClosure *node)
2435 {
2436   StgBlockingQueueElement *bqe;
2437   StgTSO *tso;
2438   PEs node_loc, tso_loc;
2439   rtsBool end;
2440
2441   /* should cover all closures that may have a blocking queue */
2442   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2443          get_itbl(node)->type == FETCH_ME_BQ ||
2444          get_itbl(node)->type == RBH);
2445     
2446   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2447   node_loc = where_is(node);
2448
2449   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2450           node, info_type(node), node_loc);
2451
2452   /* 
2453      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2454   */
2455   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2456        !end; // iterate until bqe points to a CONSTR
2457        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2458     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2459     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2460     /* types of closures that may appear in a blocking queue */
2461     ASSERT(get_itbl(bqe)->type == TSO ||           
2462            get_itbl(bqe)->type == CONSTR); 
2463     /* only BQs of an RBH end with an RBH_Save closure */
2464     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2465
2466     tso_loc = where_is((StgClosure *)bqe);
2467     switch (get_itbl(bqe)->type) {
2468     case TSO:
2469       fprintf(stderr," TSO %d (%x) on [PE %d],",
2470               ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2471       break;
2472     case CONSTR:
2473       fprintf(stderr," %s (IP %p),",
2474               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2475                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2476                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2477                "RBH_Save_?"), get_itbl(bqe));
2478       break;
2479     default:
2480       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2481            info_type(bqe), node, info_type(node));
2482       break;
2483     }
2484   } /* for */
2485   fputc('\n', stderr);
2486 }
2487 #else
2488 /* 
2489    Nice and easy: only TSOs on the blocking queue
2490 */
2491 void 
2492 print_bq (StgClosure *node)
2493 {
2494   StgTSO *tso;
2495
2496   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2497   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
2498        tso != END_TSO_QUEUE; 
2499        tso=tso->link) {
2500     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
2501     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
2502     fprintf(stderr," TSO %d (%p),", tso->id, tso);
2503   }
2504   fputc('\n', stderr);
2505 }
2506 # endif
2507
2508 #if defined(PAR)
2509 static nat
2510 run_queue_len(void)
2511 {
2512   nat i;
2513   StgTSO *tso;
2514
2515   for (i=0, tso=run_queue_hd; 
2516        tso != END_TSO_QUEUE;
2517        i++, tso=tso->link)
2518     /* nothing */
2519
2520   return i;
2521 }
2522 #endif
2523
2524 static void
2525 sched_belch(char *s, ...)
2526 {
2527   va_list ap;
2528   va_start(ap,s);
2529 #ifdef SMP
2530   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
2531 #else
2532   fprintf(stderr, "scheduler: ");
2533 #endif
2534   vfprintf(stderr, s, ap);
2535   fprintf(stderr, "\n");
2536 }
2537
2538 #endif /* DEBUG */
2539
2540 //@node Index,  , Debugging Routines, Main scheduling code
2541 //@subsection Index
2542
2543 //@index
2544 //* MainRegTable::  @cindex\s-+MainRegTable
2545 //* StgMainThread::  @cindex\s-+StgMainThread
2546 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
2547 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
2548 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
2549 //* context_switch::  @cindex\s-+context_switch
2550 //* createThread::  @cindex\s-+createThread
2551 //* free_capabilities::  @cindex\s-+free_capabilities
2552 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
2553 //* initScheduler::  @cindex\s-+initScheduler
2554 //* interrupted::  @cindex\s-+interrupted
2555 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
2556 //* next_thread_id::  @cindex\s-+next_thread_id
2557 //* print_bq::  @cindex\s-+print_bq
2558 //* run_queue_hd::  @cindex\s-+run_queue_hd
2559 //* run_queue_tl::  @cindex\s-+run_queue_tl
2560 //* sched_mutex::  @cindex\s-+sched_mutex
2561 //* schedule::  @cindex\s-+schedule
2562 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
2563 //* task_ids::  @cindex\s-+task_ids
2564 //* term_mutex::  @cindex\s-+term_mutex
2565 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
2566 //@end index