[project @ 2000-02-29 14:38:19 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.47 2000/02/29 14:38:19 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-1999
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Prototypes::                
41 //* Main scheduling loop::      
42 //* Suspend and Resume::        
43 //* Run queue code::            
44 //* Garbage Collextion Routines::  
45 //* Blocking Queue Routines::   
46 //* Exception Handling Routines::  
47 //* Debugging Routines::        
48 //* Index::                     
49 //@end menu
50
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
53
54 #include "Rts.h"
55 #include "SchedAPI.h"
56 #include "RtsUtils.h"
57 #include "RtsFlags.h"
58 #include "Storage.h"
59 #include "StgRun.h"
60 #include "StgStartup.h"
61 #include "GC.h"
62 #include "Hooks.h"
63 #include "Schedule.h"
64 #include "StgMiscClosures.h"
65 #include "Storage.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
68 #include "Printer.h"
69 #include "Main.h"
70 #include "Signals.h"
71 #include "Profiling.h"
72 #include "Sanity.h"
73 #include "Stats.h"
74 #include "Sparks.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
77 # include "GranSim.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
81 # include "FetchMe.h"
82 # include "HLC.h"
83 #endif
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123
124 #if defined(GRAN)
125
126 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
127 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
128
129 /* 
130    In GranSim we have a runable and a blocked queue for each processor.
131    In order to minimise code changes new arrays run_queue_hds/tls
132    are created. run_queue_hd is then a short cut (macro) for
133    run_queue_hds[CurrentProc] (see GranSim.h).
134    -- HWL
135 */
136 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
137 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
138 StgTSO *ccalling_threadss[MAX_PROC];
139
140 #else /* !GRAN */
141
142 //@cindex run_queue_hd
143 //@cindex run_queue_tl
144 //@cindex blocked_queue_hd
145 //@cindex blocked_queue_tl
146 StgTSO *run_queue_hd, *run_queue_tl;
147 StgTSO *blocked_queue_hd, *blocked_queue_tl;
148
149 /* Threads suspended in _ccall_GC.
150  * Locks required: sched_mutex.
151  */
152 static StgTSO *suspended_ccalling_threads;
153
154 static void GetRoots(void);
155 static StgTSO *threadStackOverflow(StgTSO *tso);
156 #endif
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 //@cindex context_switch
165 nat context_switch;
166
167 /* if this flag is set as well, give up execution */
168 //@cindex interrupted
169 rtsBool interrupted;
170
171 /* Next thread ID to allocate.
172  * Locks required: sched_mutex
173  */
174 //@cindex next_thread_id
175 StgThreadID next_thread_id = 1;
176
177 /*
178  * Pointers to the state of the current thread.
179  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
180  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
181  */
182  
183 /* The smallest stack size that makes any sense is:
184  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
185  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
186  *  + 1                       (the realworld token for an IO thread)
187  *  + 1                       (the closure to enter)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
194
195 /* Free capability list.
196  * Locks required: sched_mutex.
197  */
198 #ifdef SMP
199 //@cindex free_capabilities
200 //@cindex n_free_capabilities
201 Capability *free_capabilities; /* Available capabilities for running threads */
202 nat n_free_capabilities;       /* total number of available capabilities */
203 #else
204 //@cindex MainRegTable
205 Capability MainRegTable;       /* for non-SMP, we have one global capability */
206 #endif
207
208 #if defined(GRAN)
209 StgTSO      *CurrentTSOs[MAX_PROC];
210 #else
211 StgTSO      *CurrentTSO;
212 #endif
213
214 rtsBool ready_to_gc;
215
216 /* All our current task ids, saved in case we need to kill them later.
217  */
218 #ifdef SMP
219 //@cindex task_ids
220 task_info *task_ids;
221 #endif
222
223 void            addToBlockedQueue ( StgTSO *tso );
224
225 static void     schedule          ( void );
226        void     interruptStgRts   ( void );
227 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
228
229 #ifdef DEBUG
230 static void sched_belch(char *s, ...);
231 #endif
232
233 #ifdef SMP
234 //@cindex sched_mutex
235 //@cindex term_mutex
236 //@cindex thread_ready_cond
237 //@cindex gc_pending_cond
238 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
239 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
240 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
241 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
242
243 nat await_death;
244 #endif
245
246 #if defined(PAR)
247 StgTSO *LastTSO;
248 rtsTime TimeOfLastYield;
249 #endif
250
251 /*
252  * The thread state for the main thread.
253 // ToDo: check whether not needed any more
254 StgTSO   *MainTSO;
255  */
256
257
258 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
259 //@subsection Prototypes
260
261 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
262 //@subsection Main scheduling loop
263
264 /* ---------------------------------------------------------------------------
265    Main scheduling loop.
266
267    We use round-robin scheduling, each thread returning to the
268    scheduler loop when one of these conditions is detected:
269
270       * out of heap space
271       * timer expires (thread yields)
272       * thread blocks
273       * thread ends
274       * stack overflow
275
276    Locking notes:  we acquire the scheduler lock once at the beginning
277    of the scheduler loop, and release it when
278     
279       * running a thread, or
280       * waiting for work, or
281       * waiting for a GC to complete.
282
283    ------------------------------------------------------------------------ */
284 //@cindex schedule
285 static void
286 schedule( void )
287 {
288   StgTSO *t;
289   Capability *cap;
290   StgThreadReturnCode ret;
291 #if defined(GRAN)
292   rtsEvent *event;
293 #elif defined(PAR)
294   rtsSpark spark;
295   StgTSO *tso;
296   GlobalTaskId pe;
297 #endif
298   
299   ACQUIRE_LOCK(&sched_mutex);
300
301 #if defined(GRAN)
302 # error ToDo: implement GranSim scheduler
303 #elif defined(PAR)
304   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
305
306     if (PendingFetches != END_BF_QUEUE) {
307         processFetches();
308     }
309 #else
310   while (1) {
311 #endif
312
313     /* If we're interrupted (the user pressed ^C, or some other
314      * termination condition occurred), kill all the currently running
315      * threads.
316      */
317     if (interrupted) {
318       IF_DEBUG(scheduler, sched_belch("interrupted"));
319       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
320         deleteThread(t);
321       }
322       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
323         deleteThread(t);
324       }
325       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
326       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
327     }
328
329     /* Go through the list of main threads and wake up any
330      * clients whose computations have finished.  ToDo: this
331      * should be done more efficiently without a linear scan
332      * of the main threads list, somehow...
333      */
334 #ifdef SMP
335     { 
336       StgMainThread *m, **prev;
337       prev = &main_threads;
338       for (m = main_threads; m != NULL; m = m->link) {
339         switch (m->tso->whatNext) {
340         case ThreadComplete:
341           if (m->ret) {
342             *(m->ret) = (StgClosure *)m->tso->sp[0];
343           }
344           *prev = m->link;
345           m->stat = Success;
346           pthread_cond_broadcast(&m->wakeup);
347           break;
348         case ThreadKilled:
349           *prev = m->link;
350           if (interrupted) {
351             m->stat = Interrupted;
352           } else {
353             m->stat = Killed;
354           }
355           pthread_cond_broadcast(&m->wakeup);
356           break;
357         default:
358           break;
359         }
360       }
361     }
362 #else
363     /* If our main thread has finished or been killed, return.
364      */
365     {
366       StgMainThread *m = main_threads;
367       if (m->tso->whatNext == ThreadComplete
368           || m->tso->whatNext == ThreadKilled) {
369         main_threads = main_threads->link;
370         if (m->tso->whatNext == ThreadComplete) {
371           /* we finished successfully, fill in the return value */
372           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
373           m->stat = Success;
374           return;
375         } else {
376           if (interrupted) {
377             m->stat = Interrupted;
378           } else {
379             m->stat = Killed;
380           }
381           return;
382         }
383       }
384     }
385 #endif
386
387     /* Top up the run queue from our spark pool.  We try to make the
388      * number of threads in the run queue equal to the number of
389      * free capabilities.
390      */
391 #if defined(SMP)
392     {
393       nat n = n_free_capabilities;
394       StgTSO *tso = run_queue_hd;
395
396       /* Count the run queue */
397       while (n > 0 && tso != END_TSO_QUEUE) {
398         tso = tso->link;
399         n--;
400       }
401
402       for (; n > 0; n--) {
403         StgClosure *spark;
404         spark = findSpark();
405         if (spark == NULL) {
406           break; /* no more sparks in the pool */
407         } else {
408           /* I'd prefer this to be done in activateSpark -- HWL */
409           /* tricky - it needs to hold the scheduler lock and
410            * not try to re-acquire it -- SDM */
411           StgTSO *tso;
412           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
413           pushClosure(tso,spark);
414           PUSH_ON_RUN_QUEUE(tso);
415 #ifdef PAR
416           advisory_thread_count++;
417 #endif
418           
419           IF_DEBUG(scheduler,
420                    sched_belch("turning spark of closure %p into a thread",
421                                (StgClosure *)spark));
422         }
423       }
424       /* We need to wake up the other tasks if we just created some
425        * work for them.
426        */
427       if (n_free_capabilities - n > 1) {
428           pthread_cond_signal(&thread_ready_cond);
429       }
430     }
431 #endif /* SMP */
432
433     /* Check whether any waiting threads need to be woken up.  If the
434      * run queue is empty, and there are no other tasks running, we
435      * can wait indefinitely for something to happen.
436      * ToDo: what if another client comes along & requests another
437      * main thread?
438      */
439     if (blocked_queue_hd != END_TSO_QUEUE) {
440       awaitEvent(
441            (run_queue_hd == END_TSO_QUEUE)
442 #ifdef SMP
443         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
444 #endif
445         );
446     }
447     
448     /* check for signals each time around the scheduler */
449 #ifndef __MINGW32__
450     if (signals_pending()) {
451       start_signal_handlers();
452     }
453 #endif
454
455     /* Detect deadlock: when we have no threads to run, there are
456      * no threads waiting on I/O or sleeping, and all the other
457      * tasks are waiting for work, we must have a deadlock.  Inform
458      * all the main threads.
459      */
460 #ifdef SMP
461     if (blocked_queue_hd == END_TSO_QUEUE
462         && run_queue_hd == END_TSO_QUEUE
463         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
464         ) {
465       StgMainThread *m;
466       for (m = main_threads; m != NULL; m = m->link) {
467           m->ret = NULL;
468           m->stat = Deadlock;
469           pthread_cond_broadcast(&m->wakeup);
470       }
471       main_threads = NULL;
472     }
473 #else /* ! SMP */
474     if (blocked_queue_hd == END_TSO_QUEUE
475         && run_queue_hd == END_TSO_QUEUE) {
476       StgMainThread *m = main_threads;
477       m->ret = NULL;
478       m->stat = Deadlock;
479       main_threads = m->link;
480       return;
481     }
482 #endif
483
484 #ifdef SMP
485     /* If there's a GC pending, don't do anything until it has
486      * completed.
487      */
488     if (ready_to_gc) {
489       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
490       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
491     }
492     
493     /* block until we've got a thread on the run queue and a free
494      * capability.
495      */
496     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
497       IF_DEBUG(scheduler, sched_belch("waiting for work"));
498       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
499       IF_DEBUG(scheduler, sched_belch("work now available"));
500     }
501 #endif
502
503 #if defined(GRAN)
504 # error ToDo: implement GranSim scheduler
505 #elif defined(PAR)
506     /* ToDo: phps merge with spark activation above */
507     /* check whether we have local work and send requests if we have none */
508     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
509       /* :-[  no local threads => look out for local sparks */
510       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
511           (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
512            pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
513         /* 
514          * ToDo: add GC code check that we really have enough heap afterwards!!
515          * Old comment:
516          * If we're here (no runnable threads) and we have pending
517          * sparks, we must have a space problem.  Get enough space
518          * to turn one of those pending sparks into a
519          * thread... 
520          */
521         
522         spark = findSpark();                /* get a spark */
523         if (spark != (rtsSpark) NULL) {
524           tso = activateSpark(spark);       /* turn the spark into a thread */
525           IF_PAR_DEBUG(verbose,
526                        belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
527                              mytid, tso, tso->id, advisory_thread_count));
528
529           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
530             belch("^^ failed to activate spark");
531             goto next_thread;
532           }               /* otherwise fall through & pick-up new tso */
533         } else {
534           IF_PAR_DEBUG(verbose,
535                        belch("^^ no local sparks (spark pool contains only NFs: %d)", 
536                              spark_queue_len(ADVISORY_POOL)));
537           goto next_thread;
538         }
539       } else  
540       /* =8-[  no local sparks => look for work on other PEs */
541       {
542         /*
543          * We really have absolutely no work.  Send out a fish
544          * (there may be some out there already), and wait for
545          * something to arrive.  We clearly can't run any threads
546          * until a SCHEDULE or RESUME arrives, and so that's what
547          * we're hoping to see.  (Of course, we still have to
548          * respond to other types of messages.)
549          */
550         if (//!fishing &&  
551             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
552           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
553           /* fishing set in sendFish, processFish;
554              avoid flooding system with fishes via delay */
555           pe = choosePE();
556           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
557                    NEW_FISH_HUNGER);
558         }
559         
560         processMessages();
561         goto next_thread;
562         // ReSchedule(0);
563       }
564     } else if (PacketsWaiting()) {  /* Look for incoming messages */
565       processMessages();
566     }
567
568     /* Now we are sure that we have some work available */
569     ASSERT(run_queue_hd != END_TSO_QUEUE);
570     /* Take a thread from the run queue, if we have work */
571     t = take_off_run_queue(END_TSO_QUEUE);
572
573     /* ToDo: write something to the log-file
574     if (RTSflags.ParFlags.granSimStats && !sameThread)
575         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
576     */
577
578     CurrentTSO = t;
579
580     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
581                               spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
582                               pending_sparks_hd[ADVISORY_POOL], 
583                               pending_sparks_tl[ADVISORY_POOL], 
584                               pending_sparks_lim[ADVISORY_POOL]));
585
586     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
587                               run_queue_len(), CURRENT_PROC,
588                               run_queue_hd, run_queue_tl));
589
590     if (t != LastTSO) {
591       /* 
592          we are running a different TSO, so write a schedule event to log file
593          NB: If we use fair scheduling we also have to write  a deschedule 
594              event for LastTSO; with unfair scheduling we know that the
595              previous tso has blocked whenever we switch to another tso, so
596              we don't need it in GUM for now
597       */
598       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
599                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
600       
601     }
602
603 #else /* !GRAN && !PAR */
604   
605     /* grab a thread from the run queue
606      */
607     t = POP_RUN_QUEUE();
608     IF_DEBUG(sanity,checkTSO(t));
609
610 #endif
611     
612     /* grab a capability
613      */
614 #ifdef SMP
615     cap = free_capabilities;
616     free_capabilities = cap->link;
617     n_free_capabilities--;
618 #else
619     cap = &MainRegTable;
620 #endif
621     
622     cap->rCurrentTSO = t;
623     
624     /* set the context_switch flag
625      */
626     if (run_queue_hd == END_TSO_QUEUE)
627       context_switch = 0;
628     else
629       context_switch = 1;
630
631     RELEASE_LOCK(&sched_mutex);
632     
633     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
634
635     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
636     /* Run the current thread 
637      */
638     switch (cap->rCurrentTSO->whatNext) {
639     case ThreadKilled:
640     case ThreadComplete:
641       /* Thread already finished, return to scheduler. */
642       ret = ThreadFinished;
643       break;
644     case ThreadEnterGHC:
645       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
646       break;
647     case ThreadRunGHC:
648       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
649       break;
650     case ThreadEnterHugs:
651 #ifdef INTERPRETER
652       {
653          StgClosure* c;
654          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
655          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
656          cap->rCurrentTSO->sp += 1;
657          ret = enter(cap,c);
658          break;
659       }
660 #else
661       barf("Panic: entered a BCO but no bytecode interpreter in this build");
662 #endif
663     default:
664       barf("schedule: invalid whatNext field");
665     }
666     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
667     
668     /* Costs for the scheduler are assigned to CCS_SYSTEM */
669 #ifdef PROFILING
670     CCCS = CCS_SYSTEM;
671 #endif
672     
673     ACQUIRE_LOCK(&sched_mutex);
674
675 #ifdef SMP
676     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
677 #else
678     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
679 #endif
680     t = cap->rCurrentTSO;
681     
682     switch (ret) {
683     case HeapOverflow:
684       /* make all the running tasks block on a condition variable,
685        * maybe set context_switch and wait till they all pile in,
686        * then have them wait on a GC condition variable.
687        */
688       IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
689       threadPaused(t);
690       
691       ready_to_gc = rtsTrue;
692       context_switch = 1;               /* stop other threads ASAP */
693       PUSH_ON_RUN_QUEUE(t);
694       break;
695       
696     case StackOverflow:
697       /* just adjust the stack for this thread, then pop it back
698        * on the run queue.
699        */
700       IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
701       threadPaused(t);
702       { 
703         StgMainThread *m;
704         /* enlarge the stack */
705         StgTSO *new_t = threadStackOverflow(t);
706         
707         /* This TSO has moved, so update any pointers to it from the
708          * main thread stack.  It better not be on any other queues...
709          * (it shouldn't be).
710          */
711         for (m = main_threads; m != NULL; m = m->link) {
712           if (m->tso == t) {
713             m->tso = new_t;
714           }
715         }
716         threadPaused(new_t);
717         ready_to_gc = rtsTrue;
718         context_switch = 1;
719         PUSH_ON_RUN_QUEUE(new_t);
720       }
721       break;
722
723     case ThreadYielding:
724 #if defined(GRAN)
725       IF_DEBUG(gran, 
726                DumpGranEvent(GR_DESCHEDULE, t));
727       globalGranStats.tot_yields++;
728 #elif defined(PAR)
729       IF_DEBUG(par, 
730                DumpGranEvent(GR_DESCHEDULE, t));
731 #endif
732       /* put the thread back on the run queue.  Then, if we're ready to
733        * GC, check whether this is the last task to stop.  If so, wake
734        * up the GC thread.  getThread will block during a GC until the
735        * GC is finished.
736        */
737       IF_DEBUG(scheduler,
738                if (t->whatNext == ThreadEnterHugs) {
739                  /* ToDo: or maybe a timer expired when we were in Hugs?
740                   * or maybe someone hit ctrl-C
741                   */
742                  belch("thread %ld stopped to switch to Hugs", t->id);
743                } else {
744                  belch("thread %ld stopped, yielding", t->id);
745                }
746                );
747       threadPaused(t);
748       APPEND_TO_RUN_QUEUE(t);
749       break;
750       
751     case ThreadBlocked:
752 #if defined(GRAN)
753 # error ToDo: implement GranSim scheduler
754 #elif defined(PAR)
755       IF_DEBUG(par, 
756                DumpGranEvent(GR_DESCHEDULE, t)); 
757 #else
758 #endif
759       /* don't need to do anything.  Either the thread is blocked on
760        * I/O, in which case we'll have called addToBlockedQueue
761        * previously, or it's blocked on an MVar or Blackhole, in which
762        * case it'll be on the relevant queue already.
763        */
764       IF_DEBUG(scheduler,
765                fprintf(stderr, "thread %d stopped, ", t->id);
766                printThreadBlockage(t);
767                fprintf(stderr, "\n"));
768       threadPaused(t);
769       break;
770       
771     case ThreadFinished:
772       /* Need to check whether this was a main thread, and if so, signal
773        * the task that started it with the return value.  If we have no
774        * more main threads, we probably need to stop all the tasks until
775        * we get a new one.
776        */
777       IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
778       t->whatNext = ThreadComplete;
779 #if defined(GRAN)
780       // ToDo: endThread(t, CurrentProc); // clean-up the thread
781 #elif defined(PAR)
782       advisory_thread_count--;
783       if (RtsFlags.ParFlags.ParStats.Full) 
784         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
785 #endif
786       break;
787       
788     default:
789       barf("doneThread: invalid thread return code");
790     }
791     
792 #ifdef SMP
793     cap->link = free_capabilities;
794     free_capabilities = cap;
795     n_free_capabilities++;
796 #endif
797
798 #ifdef SMP
799     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
800 #else
801     if (ready_to_gc) 
802 #endif
803       {
804       /* everybody back, start the GC.
805        * Could do it in this thread, or signal a condition var
806        * to do it in another thread.  Either way, we need to
807        * broadcast on gc_pending_cond afterward.
808        */
809 #ifdef SMP
810       IF_DEBUG(scheduler,sched_belch("doing GC"));
811 #endif
812       GarbageCollect(GetRoots);
813       ready_to_gc = rtsFalse;
814 #ifdef SMP
815       pthread_cond_broadcast(&gc_pending_cond);
816 #endif
817     }
818 #if defined(GRAN)
819   next_thread:
820     IF_GRAN_DEBUG(unused,
821                   print_eventq(EventHd));
822
823     event = get_next_event();
824
825 #elif defined(PAR)
826   next_thread:
827     /* ToDo: wait for next message to arrive rather than busy wait */
828
829 #else /* GRAN */
830   /* not any more
831   next_thread:
832     t = take_off_run_queue(END_TSO_QUEUE);
833   */
834 #endif /* GRAN */
835   } /* end of while(1) */
836 }
837
838 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
839 void deleteAllThreads ( void )
840 {
841   StgTSO* t;
842   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
843   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
844     deleteThread(t);
845   }
846   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
847     deleteThread(t);
848   }
849   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
850   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
851 }
852
853 /* startThread and  insertThread are now in GranSim.c -- HWL */
854
855 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
856 //@subsection Suspend and Resume
857
858 /* ---------------------------------------------------------------------------
859  * Suspending & resuming Haskell threads.
860  * 
861  * When making a "safe" call to C (aka _ccall_GC), the task gives back
862  * its capability before calling the C function.  This allows another
863  * task to pick up the capability and carry on running Haskell
864  * threads.  It also means that if the C call blocks, it won't lock
865  * the whole system.
866  *
867  * The Haskell thread making the C call is put to sleep for the
868  * duration of the call, on the susepended_ccalling_threads queue.  We
869  * give out a token to the task, which it can use to resume the thread
870  * on return from the C function.
871  * ------------------------------------------------------------------------- */
872    
873 StgInt
874 suspendThread( Capability *cap )
875 {
876   nat tok;
877
878   ACQUIRE_LOCK(&sched_mutex);
879
880   IF_DEBUG(scheduler,
881            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
882
883   threadPaused(cap->rCurrentTSO);
884   cap->rCurrentTSO->link = suspended_ccalling_threads;
885   suspended_ccalling_threads = cap->rCurrentTSO;
886
887   /* Use the thread ID as the token; it should be unique */
888   tok = cap->rCurrentTSO->id;
889
890 #ifdef SMP
891   cap->link = free_capabilities;
892   free_capabilities = cap;
893   n_free_capabilities++;
894 #endif
895
896   RELEASE_LOCK(&sched_mutex);
897   return tok; 
898 }
899
900 Capability *
901 resumeThread( StgInt tok )
902 {
903   StgTSO *tso, **prev;
904   Capability *cap;
905
906   ACQUIRE_LOCK(&sched_mutex);
907
908   prev = &suspended_ccalling_threads;
909   for (tso = suspended_ccalling_threads; 
910        tso != END_TSO_QUEUE; 
911        prev = &tso->link, tso = tso->link) {
912     if (tso->id == (StgThreadID)tok) {
913       *prev = tso->link;
914       break;
915     }
916   }
917   if (tso == END_TSO_QUEUE) {
918     barf("resumeThread: thread not found");
919   }
920
921 #ifdef SMP
922   while (free_capabilities == NULL) {
923     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
924     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
925     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
926   }
927   cap = free_capabilities;
928   free_capabilities = cap->link;
929   n_free_capabilities--;
930 #else  
931   cap = &MainRegTable;
932 #endif
933
934   cap->rCurrentTSO = tso;
935
936   RELEASE_LOCK(&sched_mutex);
937   return cap;
938 }
939
940
941 /* ---------------------------------------------------------------------------
942  * Static functions
943  * ------------------------------------------------------------------------ */
944 static void unblockThread(StgTSO *tso);
945
946 /* ---------------------------------------------------------------------------
947  * Comparing Thread ids.
948  *
949  * This is used from STG land in the implementation of the
950  * instances of Eq/Ord for ThreadIds.
951  * ------------------------------------------------------------------------ */
952
953 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
954
955   StgThreadID id1 = tso1->id; 
956   StgThreadID id2 = tso2->id;
957  
958   if (id1 < id2) return (-1);
959   if (id1 > id2) return 1;
960   return 0;
961 }
962
963 /* ---------------------------------------------------------------------------
964    Create a new thread.
965
966    The new thread starts with the given stack size.  Before the
967    scheduler can run, however, this thread needs to have a closure
968    (and possibly some arguments) pushed on its stack.  See
969    pushClosure() in Schedule.h.
970
971    createGenThread() and createIOThread() (in SchedAPI.h) are
972    convenient packaged versions of this function.
973    ------------------------------------------------------------------------ */
974 //@cindex createThread
975 #if defined(GRAN)
976 /* currently pri (priority) is only used in a GRAN setup -- HWL */
977 StgTSO *
978 createThread(nat stack_size, StgInt pri)
979 {
980   return createThread_(stack_size, rtsFalse, pri);
981 }
982
983 static StgTSO *
984 createThread_(nat size, rtsBool have_lock, StgInt pri)
985 {
986 #else
987 StgTSO *
988 createThread(nat stack_size)
989 {
990   return createThread_(stack_size, rtsFalse);
991 }
992
993 static StgTSO *
994 createThread_(nat size, rtsBool have_lock)
995 {
996 #endif
997     StgTSO *tso;
998     nat stack_size;
999
1000     /* First check whether we should create a thread at all */
1001 #if defined(PAR)
1002   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1003   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1004     threadsIgnored++;
1005     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1006           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1007     return END_TSO_QUEUE;
1008   }
1009   threadsCreated++;
1010 #endif
1011
1012 #if defined(GRAN)
1013   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1014 #endif
1015
1016   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1017
1018   /* catch ridiculously small stack sizes */
1019   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1020     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1021   }
1022
1023   tso = (StgTSO *)allocate(size);
1024   TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
1025   
1026   stack_size = size - TSO_STRUCT_SIZEW;
1027
1028   // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
1029   SET_HDR(tso, &TSO_info, CCS_MAIN);
1030 #if defined(GRAN)
1031   SET_GRAN_HDR(tso, ThisPE);
1032 #endif
1033   tso->whatNext     = ThreadEnterGHC;
1034
1035   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1036          protect the increment operation on next_thread_id.
1037          In future, we could use an atomic increment instead.
1038   */
1039   
1040   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1041   tso->id = next_thread_id++; 
1042   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1043
1044   tso->why_blocked  = NotBlocked;
1045   tso->blocked_exceptions = NULL;
1046
1047   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1048   tso->stack_size   = stack_size;
1049   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1050                               - TSO_STRUCT_SIZEW;
1051   tso->sp           = (P_)&(tso->stack) + stack_size;
1052
1053 #ifdef PROFILING
1054   tso->prof.CCCS = CCS_MAIN;
1055 #endif
1056
1057   /* put a stop frame on the stack */
1058   tso->sp -= sizeofW(StgStopFrame);
1059   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1060   tso->su = (StgUpdateFrame*)tso->sp;
1061
1062   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1063                            tso->id, tso, tso->stack_size));
1064
1065   // ToDo: check this
1066 #if defined(GRAN)
1067   tso->link = END_TSO_QUEUE;
1068   /* uses more flexible routine in GranSim */
1069   insertThread(tso, CurrentProc);
1070 #else
1071   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1072      from its creation
1073   */
1074 #endif
1075
1076 #if defined(GRAN)
1077   tso->gran.pri = pri;
1078   tso->gran.magic = TSO_MAGIC; // debugging only
1079   tso->gran.sparkname   = 0;
1080   tso->gran.startedat   = CURRENT_TIME; 
1081   tso->gran.exported    = 0;
1082   tso->gran.basicblocks = 0;
1083   tso->gran.allocs      = 0;
1084   tso->gran.exectime    = 0;
1085   tso->gran.fetchtime   = 0;
1086   tso->gran.fetchcount  = 0;
1087   tso->gran.blocktime   = 0;
1088   tso->gran.blockcount  = 0;
1089   tso->gran.blockedat   = 0;
1090   tso->gran.globalsparks = 0;
1091   tso->gran.localsparks  = 0;
1092   if (RtsFlags.GranFlags.Light)
1093     tso->gran.clock  = Now; /* local clock */
1094   else
1095     tso->gran.clock  = 0;
1096
1097   IF_DEBUG(gran,printTSO(tso));
1098 #elif defined(PAR)
1099   tso->par.sparkname   = 0;
1100   tso->par.startedat   = CURRENT_TIME; 
1101   tso->par.exported    = 0;
1102   tso->par.basicblocks = 0;
1103   tso->par.allocs      = 0;
1104   tso->par.exectime    = 0;
1105   tso->par.fetchtime   = 0;
1106   tso->par.fetchcount  = 0;
1107   tso->par.blocktime   = 0;
1108   tso->par.blockcount  = 0;
1109   tso->par.blockedat   = 0;
1110   tso->par.globalsparks = 0;
1111   tso->par.localsparks  = 0;
1112 #endif
1113
1114 #if defined(GRAN)
1115   globalGranStats.tot_threads_created++;
1116   globalGranStats.threads_created_on_PE[CurrentProc]++;
1117   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1118   globalGranStats.tot_sq_probes++;
1119 #endif 
1120
1121   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1122                                  tso->id, tso->stack_size));
1123   return tso;
1124 }
1125
1126 /* ---------------------------------------------------------------------------
1127  * scheduleThread()
1128  *
1129  * scheduleThread puts a thread on the head of the runnable queue.
1130  * This will usually be done immediately after a thread is created.
1131  * The caller of scheduleThread must create the thread using e.g.
1132  * createThread and push an appropriate closure
1133  * on this thread's stack before the scheduler is invoked.
1134  * ------------------------------------------------------------------------ */
1135
1136 void
1137 scheduleThread(StgTSO *tso)
1138 {
1139   ACQUIRE_LOCK(&sched_mutex);
1140
1141   /* Put the new thread on the head of the runnable queue.  The caller
1142    * better push an appropriate closure on this thread's stack
1143    * beforehand.  In the SMP case, the thread may start running as
1144    * soon as we release the scheduler lock below.
1145    */
1146   PUSH_ON_RUN_QUEUE(tso);
1147   THREAD_RUNNABLE();
1148
1149   IF_DEBUG(scheduler,printTSO(tso));
1150   RELEASE_LOCK(&sched_mutex);
1151 }
1152
1153 /* ---------------------------------------------------------------------------
1154  * startTasks()
1155  *
1156  * Start up Posix threads to run each of the scheduler tasks.
1157  * I believe the task ids are not needed in the system as defined.
1158  *  KH @ 25/10/99
1159  * ------------------------------------------------------------------------ */
1160
1161 #ifdef SMP
1162 static void *
1163 taskStart( void *arg STG_UNUSED )
1164 {
1165   schedule();
1166   return NULL;
1167 }
1168 #endif
1169
1170 /* ---------------------------------------------------------------------------
1171  * initScheduler()
1172  *
1173  * Initialise the scheduler.  This resets all the queues - if the
1174  * queues contained any threads, they'll be garbage collected at the
1175  * next pass.
1176  *
1177  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1178  * ------------------------------------------------------------------------ */
1179
1180 #ifdef SMP
1181 static void
1182 term_handler(int sig STG_UNUSED)
1183 {
1184   stat_workerStop();
1185   ACQUIRE_LOCK(&term_mutex);
1186   await_death--;
1187   RELEASE_LOCK(&term_mutex);
1188   pthread_exit(NULL);
1189 }
1190 #endif
1191
1192 //@cindex initScheduler
1193 void 
1194 initScheduler(void)
1195 {
1196 #if defined(GRAN)
1197   nat i;
1198
1199   for (i=0; i<=MAX_PROC; i++) {
1200     run_queue_hds[i]      = END_TSO_QUEUE;
1201     run_queue_tls[i]      = END_TSO_QUEUE;
1202     blocked_queue_hds[i]  = END_TSO_QUEUE;
1203     blocked_queue_tls[i]  = END_TSO_QUEUE;
1204     ccalling_threadss[i]  = END_TSO_QUEUE;
1205   }
1206 #else
1207   run_queue_hd      = END_TSO_QUEUE;
1208   run_queue_tl      = END_TSO_QUEUE;
1209   blocked_queue_hd  = END_TSO_QUEUE;
1210   blocked_queue_tl  = END_TSO_QUEUE;
1211 #endif 
1212
1213   suspended_ccalling_threads  = END_TSO_QUEUE;
1214
1215   main_threads = NULL;
1216
1217   context_switch = 0;
1218   interrupted    = 0;
1219
1220   enteredCAFs = END_CAF_LIST;
1221
1222   /* Install the SIGHUP handler */
1223 #ifdef SMP
1224   {
1225     struct sigaction action,oact;
1226
1227     action.sa_handler = term_handler;
1228     sigemptyset(&action.sa_mask);
1229     action.sa_flags = 0;
1230     if (sigaction(SIGTERM, &action, &oact) != 0) {
1231       barf("can't install TERM handler");
1232     }
1233   }
1234 #endif
1235
1236 #ifdef SMP
1237   /* Allocate N Capabilities */
1238   {
1239     nat i;
1240     Capability *cap, *prev;
1241     cap  = NULL;
1242     prev = NULL;
1243     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1244       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1245       cap->link = prev;
1246       prev = cap;
1247     }
1248     free_capabilities = cap;
1249     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1250   }
1251   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1252                              n_free_capabilities););
1253 #endif
1254
1255 #if defined(SMP) || defined(PAR)
1256   initSparkPools();
1257 #endif
1258 }
1259
1260 #ifdef SMP
1261 void
1262 startTasks( void )
1263 {
1264   nat i;
1265   int r;
1266   pthread_t tid;
1267   
1268   /* make some space for saving all the thread ids */
1269   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1270                             "initScheduler:task_ids");
1271   
1272   /* and create all the threads */
1273   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1274     r = pthread_create(&tid,NULL,taskStart,NULL);
1275     if (r != 0) {
1276       barf("startTasks: Can't create new Posix thread");
1277     }
1278     task_ids[i].id = tid;
1279     task_ids[i].mut_time = 0.0;
1280     task_ids[i].mut_etime = 0.0;
1281     task_ids[i].gc_time = 0.0;
1282     task_ids[i].gc_etime = 0.0;
1283     task_ids[i].elapsedtimestart = elapsedtime();
1284     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1285   }
1286 }
1287 #endif
1288
1289 void
1290 exitScheduler( void )
1291 {
1292 #ifdef SMP
1293   nat i;
1294
1295   /* Don't want to use pthread_cancel, since we'd have to install
1296    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1297    * all our locks.
1298    */
1299 #if 0
1300   /* Cancel all our tasks */
1301   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1302     pthread_cancel(task_ids[i].id);
1303   }
1304   
1305   /* Wait for all the tasks to terminate */
1306   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1307     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1308                                task_ids[i].id));
1309     pthread_join(task_ids[i].id, NULL);
1310   }
1311 #endif
1312
1313   /* Send 'em all a SIGHUP.  That should shut 'em up.
1314    */
1315   await_death = RtsFlags.ParFlags.nNodes;
1316   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1317     pthread_kill(task_ids[i].id,SIGTERM);
1318   }
1319   while (await_death > 0) {
1320     sched_yield();
1321   }
1322 #endif
1323 }
1324
1325 /* -----------------------------------------------------------------------------
1326    Managing the per-task allocation areas.
1327    
1328    Each capability comes with an allocation area.  These are
1329    fixed-length block lists into which allocation can be done.
1330
1331    ToDo: no support for two-space collection at the moment???
1332    -------------------------------------------------------------------------- */
1333
1334 /* -----------------------------------------------------------------------------
1335  * waitThread is the external interface for running a new computataion
1336  * and waiting for the result.
1337  *
1338  * In the non-SMP case, we create a new main thread, push it on the 
1339  * main-thread stack, and invoke the scheduler to run it.  The
1340  * scheduler will return when the top main thread on the stack has
1341  * completed or died, and fill in the necessary fields of the
1342  * main_thread structure.
1343  *
1344  * In the SMP case, we create a main thread as before, but we then
1345  * create a new condition variable and sleep on it.  When our new
1346  * main thread has completed, we'll be woken up and the status/result
1347  * will be in the main_thread struct.
1348  * -------------------------------------------------------------------------- */
1349
1350 SchedulerStatus
1351 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1352 {
1353   StgMainThread *m;
1354   SchedulerStatus stat;
1355
1356   ACQUIRE_LOCK(&sched_mutex);
1357   
1358   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1359
1360   m->tso = tso;
1361   m->ret = ret;
1362   m->stat = NoStatus;
1363 #ifdef SMP
1364   pthread_cond_init(&m->wakeup, NULL);
1365 #endif
1366
1367   m->link = main_threads;
1368   main_threads = m;
1369
1370   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1371                               m->tso->id));
1372
1373 #ifdef SMP
1374   do {
1375     pthread_cond_wait(&m->wakeup, &sched_mutex);
1376   } while (m->stat == NoStatus);
1377 #else
1378   schedule();
1379   ASSERT(m->stat != NoStatus);
1380 #endif
1381
1382   stat = m->stat;
1383
1384 #ifdef SMP
1385   pthread_cond_destroy(&m->wakeup);
1386 #endif
1387
1388   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1389                               m->tso->id));
1390   free(m);
1391
1392   RELEASE_LOCK(&sched_mutex);
1393
1394   return stat;
1395 }
1396
1397 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1398 //@subsection Run queue code 
1399
1400 #if 0
1401 /* 
1402    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1403        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1404        implicit global variable that has to be correct when calling these
1405        fcts -- HWL 
1406 */
1407
1408 /* Put the new thread on the head of the runnable queue.
1409  * The caller of createThread better push an appropriate closure
1410  * on this thread's stack before the scheduler is invoked.
1411  */
1412 static /* inline */ void
1413 add_to_run_queue(tso)
1414 StgTSO* tso; 
1415 {
1416   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1417   tso->link = run_queue_hd;
1418   run_queue_hd = tso;
1419   if (run_queue_tl == END_TSO_QUEUE) {
1420     run_queue_tl = tso;
1421   }
1422 }
1423
1424 /* Put the new thread at the end of the runnable queue. */
1425 static /* inline */ void
1426 push_on_run_queue(tso)
1427 StgTSO* tso; 
1428 {
1429   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1430   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1431   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1432   if (run_queue_hd == END_TSO_QUEUE) {
1433     run_queue_hd = tso;
1434   } else {
1435     run_queue_tl->link = tso;
1436   }
1437   run_queue_tl = tso;
1438 }
1439
1440 /* 
1441    Should be inlined because it's used very often in schedule.  The tso
1442    argument is actually only needed in GranSim, where we want to have the
1443    possibility to schedule *any* TSO on the run queue, irrespective of the
1444    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1445    the run queue and dequeue the tso, adjusting the links in the queue. 
1446 */
1447 //@cindex take_off_run_queue
1448 static /* inline */ StgTSO*
1449 take_off_run_queue(StgTSO *tso) {
1450   StgTSO *t, *prev;
1451
1452   /* 
1453      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1454
1455      if tso is specified, unlink that tso from the run_queue (doesn't have
1456      to be at the beginning of the queue); GranSim only 
1457   */
1458   if (tso!=END_TSO_QUEUE) {
1459     /* find tso in queue */
1460     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1461          t!=END_TSO_QUEUE && t!=tso;
1462          prev=t, t=t->link) 
1463       /* nothing */ ;
1464     ASSERT(t==tso);
1465     /* now actually dequeue the tso */
1466     if (prev!=END_TSO_QUEUE) {
1467       ASSERT(run_queue_hd!=t);
1468       prev->link = t->link;
1469     } else {
1470       /* t is at beginning of thread queue */
1471       ASSERT(run_queue_hd==t);
1472       run_queue_hd = t->link;
1473     }
1474     /* t is at end of thread queue */
1475     if (t->link==END_TSO_QUEUE) {
1476       ASSERT(t==run_queue_tl);
1477       run_queue_tl = prev;
1478     } else {
1479       ASSERT(run_queue_tl!=t);
1480     }
1481     t->link = END_TSO_QUEUE;
1482   } else {
1483     /* take tso from the beginning of the queue; std concurrent code */
1484     t = run_queue_hd;
1485     if (t != END_TSO_QUEUE) {
1486       run_queue_hd = t->link;
1487       t->link = END_TSO_QUEUE;
1488       if (run_queue_hd == END_TSO_QUEUE) {
1489         run_queue_tl = END_TSO_QUEUE;
1490       }
1491     }
1492   }
1493   return t;
1494 }
1495
1496 #endif /* 0 */
1497
1498 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1499 //@subsection Garbage Collextion Routines
1500
1501 /* ---------------------------------------------------------------------------
1502    Where are the roots that we know about?
1503
1504         - all the threads on the runnable queue
1505         - all the threads on the blocked queue
1506         - all the thread currently executing a _ccall_GC
1507         - all the "main threads"
1508      
1509    ------------------------------------------------------------------------ */
1510
1511 /* This has to be protected either by the scheduler monitor, or by the
1512         garbage collection monitor (probably the latter).
1513         KH @ 25/10/99
1514 */
1515
1516 static void GetRoots(void)
1517 {
1518   StgMainThread *m;
1519
1520 #if defined(GRAN)
1521   {
1522     nat i;
1523     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1524       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1525         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1526       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1527         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1528       
1529       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1530         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1531       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1532         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1533       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1534         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1535     }
1536   }
1537
1538   markEventQueue();
1539
1540 #else /* !GRAN */
1541   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1542   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1543
1544   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1545   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1546 #endif 
1547
1548   for (m = main_threads; m != NULL; m = m->link) {
1549     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1550   }
1551   suspended_ccalling_threads = 
1552     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1553
1554 #if defined(SMP) || defined(PAR) || defined(GRAN)
1555   markSparkQueue();
1556 #endif
1557 }
1558
1559 /* -----------------------------------------------------------------------------
1560    performGC
1561
1562    This is the interface to the garbage collector from Haskell land.
1563    We provide this so that external C code can allocate and garbage
1564    collect when called from Haskell via _ccall_GC.
1565
1566    It might be useful to provide an interface whereby the programmer
1567    can specify more roots (ToDo).
1568    
1569    This needs to be protected by the GC condition variable above.  KH.
1570    -------------------------------------------------------------------------- */
1571
1572 void (*extra_roots)(void);
1573
1574 void
1575 performGC(void)
1576 {
1577   GarbageCollect(GetRoots);
1578 }
1579
1580 static void
1581 AllRoots(void)
1582 {
1583   GetRoots();                   /* the scheduler's roots */
1584   extra_roots();                /* the user's roots */
1585 }
1586
1587 void
1588 performGCWithRoots(void (*get_roots)(void))
1589 {
1590   extra_roots = get_roots;
1591
1592   GarbageCollect(AllRoots);
1593 }
1594
1595 /* -----------------------------------------------------------------------------
1596    Stack overflow
1597
1598    If the thread has reached its maximum stack size, then raise the
1599    StackOverflow exception in the offending thread.  Otherwise
1600    relocate the TSO into a larger chunk of memory and adjust its stack
1601    size appropriately.
1602    -------------------------------------------------------------------------- */
1603
1604 static StgTSO *
1605 threadStackOverflow(StgTSO *tso)
1606 {
1607   nat new_stack_size, new_tso_size, diff, stack_words;
1608   StgPtr new_sp;
1609   StgTSO *dest;
1610
1611   IF_DEBUG(sanity,checkTSO(tso));
1612   if (tso->stack_size >= tso->max_stack_size) {
1613 #if 0
1614     /* If we're debugging, just print out the top of the stack */
1615     printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1616                                      tso->sp+64));
1617 #endif
1618 #ifdef INTERPRETER
1619     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1620     exit(1);
1621 #else
1622     /* Send this thread the StackOverflow exception */
1623     raiseAsync(tso, (StgClosure *)&stackOverflow_closure);
1624 #endif
1625     return tso;
1626   }
1627
1628   /* Try to double the current stack size.  If that takes us over the
1629    * maximum stack size for this thread, then use the maximum instead.
1630    * Finally round up so the TSO ends up as a whole number of blocks.
1631    */
1632   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1633   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
1634                                        TSO_STRUCT_SIZE)/sizeof(W_);
1635   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
1636   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1637
1638   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1639
1640   dest = (StgTSO *)allocate(new_tso_size);
1641   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1642
1643   /* copy the TSO block and the old stack into the new area */
1644   memcpy(dest,tso,TSO_STRUCT_SIZE);
1645   stack_words = tso->stack + tso->stack_size - tso->sp;
1646   new_sp = (P_)dest + new_tso_size - stack_words;
1647   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1648
1649   /* relocate the stack pointers... */
1650   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1651   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
1652   dest->sp    = new_sp;
1653   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1654   dest->stack_size = new_stack_size;
1655         
1656   /* and relocate the update frame list */
1657   relocate_TSO(tso, dest);
1658
1659   /* Mark the old TSO as relocated.  We have to check for relocated
1660    * TSOs in the garbage collector and any primops that deal with TSOs.
1661    *
1662    * It's important to set the sp and su values to just beyond the end
1663    * of the stack, so we don't attempt to scavenge any part of the
1664    * dead TSO's stack.
1665    */
1666   tso->whatNext = ThreadRelocated;
1667   tso->link = dest;
1668   tso->sp = (P_)&(tso->stack[tso->stack_size]);
1669   tso->su = (StgUpdateFrame *)tso->sp;
1670   tso->why_blocked = NotBlocked;
1671   dest->mut_link = NULL;
1672
1673   IF_DEBUG(sanity,checkTSO(tso));
1674 #if 0
1675   IF_DEBUG(scheduler,printTSO(dest));
1676 #endif
1677
1678   return dest;
1679 }
1680
1681 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1682 //@subsection Blocking Queue Routines
1683
1684 /* ---------------------------------------------------------------------------
1685    Wake up a queue that was blocked on some resource.
1686    ------------------------------------------------------------------------ */
1687
1688 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
1689
1690 #if defined(GRAN)
1691 static inline void
1692 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1693 {
1694 }
1695 #elif defined(PAR)
1696 static inline void
1697 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1698 {
1699   /* write RESUME events to log file and
1700      update blocked and fetch time (depending on type of the orig closure) */
1701   if (RtsFlags.ParFlags.ParStats.Full) {
1702     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1703                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1704                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1705
1706     switch (get_itbl(node)->type) {
1707         case FETCH_ME_BQ:
1708           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1709           break;
1710         case RBH:
1711         case FETCH_ME:
1712         case BLACKHOLE_BQ:
1713           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1714           break;
1715         default:
1716           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
1717         }
1718       }
1719 }
1720 #endif
1721
1722 #if defined(GRAN)
1723 static StgBlockingQueueElement *
1724 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1725 {
1726     StgBlockingQueueElement *next;
1727     PEs node_loc, tso_loc;
1728
1729     node_loc = where_is(node); // should be lifted out of loop
1730     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1731     tso_loc = where_is(tso);
1732     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1733       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1734       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
1735       bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1736       // insertThread(tso, node_loc);
1737       new_event(tso_loc, tso_loc,
1738                 CurrentTime[CurrentProc]+bq_processing_time,
1739                 ResumeThread,
1740                 tso, node, (rtsSpark*)NULL);
1741       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1742       // len_local++;
1743       // len++;
1744     } else { // TSO is remote (actually should be FMBQ)
1745       bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1746       bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1747       new_event(tso_loc, CurrentProc, 
1748                 CurrentTime[CurrentProc]+bq_processing_time+
1749                 RtsFlags.GranFlags.Costs.latency,
1750                 UnblockThread,
1751                 tso, node, (rtsSpark*)NULL);
1752       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1753       bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1754       // len++;
1755     }      
1756     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1757     IF_GRAN_DEBUG(bq,
1758                   fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1759                           (node_loc==tso_loc ? "Local" : "Global"), 
1760                           tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
1761     tso->blocked_on = NULL;
1762     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
1763                              tso->id, tso));
1764   }
1765
1766   /* if this is the BQ of an RBH, we have to put back the info ripped out of
1767      the closure to make room for the anchor of the BQ */
1768   if (next!=END_BQ_QUEUE) {
1769     ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1770     /*
1771     ASSERT((info_ptr==&RBH_Save_0_info) ||
1772            (info_ptr==&RBH_Save_1_info) ||
1773            (info_ptr==&RBH_Save_2_info));
1774     */
1775     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1776     ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1777     ((StgRBH *)node)->mut_link       = ((StgRBHSave *)next)->payload[1];
1778
1779     IF_GRAN_DEBUG(bq,
1780                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1781                         node, info_type(node)));
1782   }
1783 }
1784 #elif defined(PAR)
1785 static StgBlockingQueueElement *
1786 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1787 {
1788     StgBlockingQueueElement *next;
1789
1790     switch (get_itbl(bqe)->type) {
1791     case TSO:
1792       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1793       /* if it's a TSO just push it onto the run_queue */
1794       next = bqe->link;
1795       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1796       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
1797       THREAD_RUNNABLE();
1798       unblockCount(bqe, node);
1799       /* reset blocking status after dumping event */
1800       ((StgTSO *)bqe)->why_blocked = NotBlocked;
1801       break;
1802
1803     case BLOCKED_FETCH:
1804       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1805       next = bqe->link;
1806       bqe->link = PendingFetches;
1807       PendingFetches = bqe;
1808       break;
1809
1810 # if defined(DEBUG)
1811       /* can ignore this case in a non-debugging setup; 
1812          see comments on RBHSave closures above */
1813     case CONSTR:
1814       /* check that the closure is an RBHSave closure */
1815       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1816              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1817              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1818       break;
1819
1820     default:
1821       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1822            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
1823            (StgClosure *)bqe);
1824 # endif
1825     }
1826   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1827   return next;
1828 }
1829
1830 #else /* !GRAN && !PAR */
1831 static StgTSO *
1832 unblockOneLocked(StgTSO *tso)
1833 {
1834   StgTSO *next;
1835
1836   ASSERT(get_itbl(tso)->type == TSO);
1837   ASSERT(tso->why_blocked != NotBlocked);
1838   tso->why_blocked = NotBlocked;
1839   next = tso->link;
1840   PUSH_ON_RUN_QUEUE(tso);
1841   THREAD_RUNNABLE();
1842   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1843   return next;
1844 }
1845 #endif
1846
1847 #if defined(GRAN)
1848 inline StgTSO *
1849 unblockOne(StgTSO *tso, StgClosure *node)
1850 {
1851   ACQUIRE_LOCK(&sched_mutex);
1852   tso = unblockOneLocked(tso, node);
1853   RELEASE_LOCK(&sched_mutex);
1854   return tso;
1855 }
1856 #elif defined(PAR)
1857 inline StgTSO *
1858 unblockOne(StgTSO *tso, StgClosure *node)
1859 {
1860   ACQUIRE_LOCK(&sched_mutex);
1861   tso = unblockOneLocked(tso, node);
1862   RELEASE_LOCK(&sched_mutex);
1863   return tso;
1864 }
1865 #else
1866 inline StgTSO *
1867 unblockOne(StgTSO *tso)
1868 {
1869   ACQUIRE_LOCK(&sched_mutex);
1870   tso = unblockOneLocked(tso);
1871   RELEASE_LOCK(&sched_mutex);
1872   return tso;
1873 }
1874 #endif
1875
1876 #if defined(GRAN)
1877 void 
1878 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1879 {
1880   StgBlockingQueueElement *bqe, *next;
1881   StgTSO *tso;
1882   PEs node_loc, tso_loc;
1883   rtsTime bq_processing_time = 0;
1884   nat len = 0, len_local = 0;
1885
1886   IF_GRAN_DEBUG(bq, 
1887                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1888                       node, CurrentProc, CurrentTime[CurrentProc], 
1889                       CurrentTSO->id, CurrentTSO));
1890
1891   node_loc = where_is(node);
1892
1893   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
1894          get_itbl(q)->type == CONSTR); // closure (type constructor)
1895   ASSERT(is_unique(node));
1896
1897   /* FAKE FETCH: magically copy the node to the tso's proc;
1898      no Fetch necessary because in reality the node should not have been 
1899      moved to the other PE in the first place
1900   */
1901   if (CurrentProc!=node_loc) {
1902     IF_GRAN_DEBUG(bq, 
1903                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1904                         node, node_loc, CurrentProc, CurrentTSO->id, 
1905                         // CurrentTSO, where_is(CurrentTSO),
1906                         node->header.gran.procs));
1907     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1908     IF_GRAN_DEBUG(bq, 
1909                   belch("## new bitmask of node %p is %#x",
1910                         node, node->header.gran.procs));
1911     if (RtsFlags.GranFlags.GranSimStats.Global) {
1912       globalGranStats.tot_fake_fetches++;
1913     }
1914   }
1915
1916   bqe = q;
1917   // ToDo: check: ASSERT(CurrentProc==node_loc);
1918   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
1919     //next = bqe->link;
1920     /* 
1921        bqe points to the current element in the queue
1922        next points to the next element in the queue
1923     */
1924     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1925     //tso_loc = where_is(tso);
1926     bqe = unblockOneLocked(bqe, node);
1927   }
1928
1929   /* statistics gathering */
1930   /* ToDo: fix counters
1931   if (RtsFlags.GranFlags.GranSimStats.Global) {
1932     globalGranStats.tot_bq_processing_time += bq_processing_time;
1933     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
1934     globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
1935     globalGranStats.tot_awbq++;             // total no. of bqs awakened
1936   }
1937   IF_GRAN_DEBUG(bq,
1938                 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1939                         node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
1940   */
1941 }
1942 #elif defined(PAR)
1943 void 
1944 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1945 {
1946   StgBlockingQueueElement *bqe, *next;
1947
1948   ACQUIRE_LOCK(&sched_mutex);
1949
1950   IF_PAR_DEBUG(verbose, 
1951                belch("## AwBQ for node %p on [%x]: ",
1952                      node, mytid));
1953
1954   ASSERT(get_itbl(q)->type == TSO ||           
1955          get_itbl(q)->type == BLOCKED_FETCH || 
1956          get_itbl(q)->type == CONSTR); 
1957
1958   bqe = q;
1959   while (get_itbl(bqe)->type==TSO || 
1960          get_itbl(bqe)->type==BLOCKED_FETCH) {
1961     bqe = unblockOneLocked(bqe, node);
1962   }
1963   RELEASE_LOCK(&sched_mutex);
1964 }
1965
1966 #else   /* !GRAN && !PAR */
1967 void
1968 awakenBlockedQueue(StgTSO *tso)
1969 {
1970   ACQUIRE_LOCK(&sched_mutex);
1971   while (tso != END_TSO_QUEUE) {
1972     tso = unblockOneLocked(tso);
1973   }
1974   RELEASE_LOCK(&sched_mutex);
1975 }
1976 #endif
1977
1978 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
1979 //@subsection Exception Handling Routines
1980
1981 /* ---------------------------------------------------------------------------
1982    Interrupt execution
1983    - usually called inside a signal handler so it mustn't do anything fancy.   
1984    ------------------------------------------------------------------------ */
1985
1986 void
1987 interruptStgRts(void)
1988 {
1989     interrupted    = 1;
1990     context_switch = 1;
1991 }
1992
1993 /* -----------------------------------------------------------------------------
1994    Unblock a thread
1995
1996    This is for use when we raise an exception in another thread, which
1997    may be blocked.
1998    This has nothing to do with the UnblockThread event in GranSim. -- HWL
1999    -------------------------------------------------------------------------- */
2000
2001 static void
2002 unblockThread(StgTSO *tso)
2003 {
2004   StgTSO *t, **last;
2005
2006   ACQUIRE_LOCK(&sched_mutex);
2007   switch (tso->why_blocked) {
2008
2009   case NotBlocked:
2010     return;  /* not blocked */
2011
2012   case BlockedOnMVar:
2013     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2014     {
2015       StgTSO *last_tso = END_TSO_QUEUE;
2016       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2017
2018       last = &mvar->head;
2019       for (t = mvar->head; t != END_TSO_QUEUE; 
2020            last = &t->link, last_tso = t, t = t->link) {
2021         if (t == tso) {
2022           *last = tso->link;
2023           if (mvar->tail == tso) {
2024             mvar->tail = last_tso;
2025           }
2026           goto done;
2027         }
2028       }
2029       barf("unblockThread (MVAR): TSO not found");
2030     }
2031
2032   case BlockedOnBlackHole:
2033     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2034     {
2035       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2036
2037       last = &bq->blocking_queue;
2038       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2039            last = &t->link, t = t->link) {
2040         if (t == tso) {
2041           *last = tso->link;
2042           goto done;
2043         }
2044       }
2045       barf("unblockThread (BLACKHOLE): TSO not found");
2046     }
2047
2048   case BlockedOnException:
2049     {
2050       StgTSO *target  = tso->block_info.tso;
2051
2052       ASSERT(get_itbl(target)->type == TSO);
2053       ASSERT(target->blocked_exceptions != NULL);
2054
2055       last = &target->blocked_exceptions;
2056       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2057            last = &t->link, t = t->link) {
2058         ASSERT(get_itbl(t)->type == TSO);
2059         if (t == tso) {
2060           *last = tso->link;
2061           goto done;
2062         }
2063       }
2064       barf("unblockThread (Exception): TSO not found");
2065     }
2066
2067   case BlockedOnDelay:
2068   case BlockedOnRead:
2069   case BlockedOnWrite:
2070     {
2071       StgTSO *prev = NULL;
2072       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2073            prev = t, t = t->link) {
2074         if (t == tso) {
2075           if (prev == NULL) {
2076             blocked_queue_hd = t->link;
2077             if (blocked_queue_tl == t) {
2078               blocked_queue_tl = END_TSO_QUEUE;
2079             }
2080           } else {
2081             prev->link = t->link;
2082             if (blocked_queue_tl == t) {
2083               blocked_queue_tl = prev;
2084             }
2085           }
2086           goto done;
2087         }
2088       }
2089       barf("unblockThread (I/O): TSO not found");
2090     }
2091
2092   default:
2093     barf("unblockThread");
2094   }
2095
2096  done:
2097   tso->link = END_TSO_QUEUE;
2098   tso->why_blocked = NotBlocked;
2099   tso->block_info.closure = NULL;
2100   PUSH_ON_RUN_QUEUE(tso);
2101   RELEASE_LOCK(&sched_mutex);
2102 }
2103
2104 /* -----------------------------------------------------------------------------
2105  * raiseAsync()
2106  *
2107  * The following function implements the magic for raising an
2108  * asynchronous exception in an existing thread.
2109  *
2110  * We first remove the thread from any queue on which it might be
2111  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2112  *
2113  * We strip the stack down to the innermost CATCH_FRAME, building
2114  * thunks in the heap for all the active computations, so they can 
2115  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2116  * an application of the handler to the exception, and push it on
2117  * the top of the stack.
2118  * 
2119  * How exactly do we save all the active computations?  We create an
2120  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2121  * AP_UPDs pushes everything from the corresponding update frame
2122  * upwards onto the stack.  (Actually, it pushes everything up to the
2123  * next update frame plus a pointer to the next AP_UPD object.
2124  * Entering the next AP_UPD object pushes more onto the stack until we
2125  * reach the last AP_UPD object - at which point the stack should look
2126  * exactly as it did when we killed the TSO and we can continue
2127  * execution by entering the closure on top of the stack.
2128  *
2129  * We can also kill a thread entirely - this happens if either (a) the 
2130  * exception passed to raiseAsync is NULL, or (b) there's no
2131  * CATCH_FRAME on the stack.  In either case, we strip the entire
2132  * stack and replace the thread with a zombie.
2133  *
2134  * -------------------------------------------------------------------------- */
2135  
2136 void 
2137 deleteThread(StgTSO *tso)
2138 {
2139   raiseAsync(tso,NULL);
2140 }
2141
2142 void
2143 raiseAsync(StgTSO *tso, StgClosure *exception)
2144 {
2145   StgUpdateFrame* su = tso->su;
2146   StgPtr          sp = tso->sp;
2147   
2148   /* Thread already dead? */
2149   if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
2150     return;
2151   }
2152
2153   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2154
2155   /* Remove it from any blocking queues */
2156   unblockThread(tso);
2157
2158   /* The stack freezing code assumes there's a closure pointer on
2159    * the top of the stack.  This isn't always the case with compiled
2160    * code, so we have to push a dummy closure on the top which just
2161    * returns to the next return address on the stack.
2162    */
2163   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2164     *(--sp) = (W_)&dummy_ret_closure;
2165   }
2166
2167   while (1) {
2168     int words = ((P_)su - (P_)sp) - 1;
2169     nat i;
2170     StgAP_UPD * ap;
2171
2172     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2173      * then build PAP(handler,exception,realworld#), and leave it on
2174      * top of the stack ready to enter.
2175      */
2176     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2177       StgCatchFrame *cf = (StgCatchFrame *)su;
2178       /* we've got an exception to raise, so let's pass it to the
2179        * handler in this frame.
2180        */
2181       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
2182       TICK_ALLOC_UPD_PAP(3,0);
2183       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2184               
2185       ap->n_args = 2;
2186       ap->fun = cf->handler;    /* :: Exception -> IO a */
2187       ap->payload[0] = (P_)exception;
2188       ap->payload[1] = ARG_TAG(0); /* realworld token */
2189
2190       /* throw away the stack from Sp up to and including the
2191        * CATCH_FRAME.
2192        */
2193       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
2194       tso->su = cf->link;
2195
2196       /* Restore the blocked/unblocked state for asynchronous exceptions
2197        * at the CATCH_FRAME.  
2198        *
2199        * If exceptions were unblocked at the catch, arrange that they
2200        * are unblocked again after executing the handler by pushing an
2201        * unblockAsyncExceptions_ret stack frame.
2202        */
2203       if (!cf->exceptions_blocked) {
2204         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2205       }
2206       
2207       /* Ensure that async exceptions are blocked when running the handler.
2208        */
2209       if (tso->blocked_exceptions == NULL) {
2210         tso->blocked_exceptions = END_TSO_QUEUE;
2211       }
2212       
2213       /* Put the newly-built PAP on top of the stack, ready to execute
2214        * when the thread restarts.
2215        */
2216       sp[0] = (W_)ap;
2217       tso->sp = sp;
2218       tso->whatNext = ThreadEnterGHC;
2219       return;
2220     }
2221
2222     /* First build an AP_UPD consisting of the stack chunk above the
2223      * current update frame, with the top word on the stack as the
2224      * fun field.
2225      */
2226     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2227     
2228     ASSERT(words >= 0);
2229     
2230     ap->n_args = words;
2231     ap->fun    = (StgClosure *)sp[0];
2232     sp++;
2233     for(i=0; i < (nat)words; ++i) {
2234       ap->payload[i] = (P_)*sp++;
2235     }
2236     
2237     switch (get_itbl(su)->type) {
2238       
2239     case UPDATE_FRAME:
2240       {
2241         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2242         TICK_ALLOC_UP_THK(words+1,0);
2243         
2244         IF_DEBUG(scheduler,
2245                  fprintf(stderr,  "scheduler: Updating ");
2246                  printPtr((P_)su->updatee); 
2247                  fprintf(stderr,  " with ");
2248                  printObj((StgClosure *)ap);
2249                  );
2250         
2251         /* Replace the updatee with an indirection - happily
2252          * this will also wake up any threads currently
2253          * waiting on the result.
2254          */
2255         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2256         su = su->link;
2257         sp += sizeofW(StgUpdateFrame) -1;
2258         sp[0] = (W_)ap; /* push onto stack */
2259         break;
2260       }
2261       
2262     case CATCH_FRAME:
2263       {
2264         StgCatchFrame *cf = (StgCatchFrame *)su;
2265         StgClosure* o;
2266         
2267         /* We want a PAP, not an AP_UPD.  Fortunately, the
2268          * layout's the same.
2269          */
2270         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2271         TICK_ALLOC_UPD_PAP(words+1,0);
2272         
2273         /* now build o = FUN(catch,ap,handler) */
2274         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2275         TICK_ALLOC_FUN(2,0);
2276         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2277         o->payload[0] = (StgClosure *)ap;
2278         o->payload[1] = cf->handler;
2279         
2280         IF_DEBUG(scheduler,
2281                  fprintf(stderr,  "scheduler: Built ");
2282                  printObj((StgClosure *)o);
2283                  );
2284         
2285         /* pop the old handler and put o on the stack */
2286         su = cf->link;
2287         sp += sizeofW(StgCatchFrame) - 1;
2288         sp[0] = (W_)o;
2289         break;
2290       }
2291       
2292     case SEQ_FRAME:
2293       {
2294         StgSeqFrame *sf = (StgSeqFrame *)su;
2295         StgClosure* o;
2296         
2297         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2298         TICK_ALLOC_UPD_PAP(words+1,0);
2299         
2300         /* now build o = FUN(seq,ap) */
2301         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2302         TICK_ALLOC_SE_THK(1,0);
2303         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2304         payloadCPtr(o,0) = (StgClosure *)ap;
2305         
2306         IF_DEBUG(scheduler,
2307                  fprintf(stderr,  "scheduler: Built ");
2308                  printObj((StgClosure *)o);
2309                  );
2310         
2311         /* pop the old handler and put o on the stack */
2312         su = sf->link;
2313         sp += sizeofW(StgSeqFrame) - 1;
2314         sp[0] = (W_)o;
2315         break;
2316       }
2317       
2318     case STOP_FRAME:
2319       /* We've stripped the entire stack, the thread is now dead. */
2320       sp += sizeofW(StgStopFrame) - 1;
2321       sp[0] = (W_)exception;    /* save the exception */
2322       tso->whatNext = ThreadKilled;
2323       tso->su = (StgUpdateFrame *)(sp+1);
2324       tso->sp = sp;
2325       return;
2326       
2327     default:
2328       barf("raiseAsync");
2329     }
2330   }
2331   barf("raiseAsync");
2332 }
2333
2334 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2335 //@subsection Debugging Routines
2336
2337 /* -----------------------------------------------------------------------------
2338    Debugging: why is a thread blocked
2339    -------------------------------------------------------------------------- */
2340
2341 #ifdef DEBUG
2342
2343 void printThreadBlockage(StgTSO *tso)
2344 {
2345   switch (tso->why_blocked) {
2346   case BlockedOnRead:
2347     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2348     break;
2349   case BlockedOnWrite:
2350     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2351     break;
2352   case BlockedOnDelay:
2353     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2354     break;
2355   case BlockedOnMVar:
2356     fprintf(stderr,"blocked on an MVar");
2357     break;
2358   case BlockedOnException:
2359     fprintf(stderr,"blocked on delivering an exception to thread %d",
2360             tso->block_info.tso->id);
2361     break;
2362   case BlockedOnBlackHole:
2363     fprintf(stderr,"blocked on a black hole");
2364     break;
2365   case NotBlocked:
2366     fprintf(stderr,"not blocked");
2367     break;
2368 #if defined(PAR)
2369   case BlockedOnGA:
2370     fprintf(stderr,"blocked on global address");
2371     break;
2372 #endif
2373   }
2374 }
2375
2376 /* 
2377    Print a whole blocking queue attached to node (debugging only).
2378 */
2379 //@cindex print_bq
2380 # if defined(PAR)
2381 void 
2382 print_bq (StgClosure *node)
2383 {
2384   StgBlockingQueueElement *bqe;
2385   StgTSO *tso;
2386   rtsBool end;
2387
2388   fprintf(stderr,"## BQ of closure %p (%s): ",
2389           node, info_type(node));
2390
2391   /* should cover all closures that may have a blocking queue */
2392   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2393          get_itbl(node)->type == FETCH_ME_BQ ||
2394          get_itbl(node)->type == RBH);
2395     
2396   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2397   /* 
2398      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2399   */
2400   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2401        !end; // iterate until bqe points to a CONSTR
2402        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2403     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2404     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2405     /* types of closures that may appear in a blocking queue */
2406     ASSERT(get_itbl(bqe)->type == TSO ||           
2407            get_itbl(bqe)->type == BLOCKED_FETCH || 
2408            get_itbl(bqe)->type == CONSTR); 
2409     /* only BQs of an RBH end with an RBH_Save closure */
2410     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2411
2412     switch (get_itbl(bqe)->type) {
2413     case TSO:
2414       fprintf(stderr," TSO %d (%x),",
2415               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2416       break;
2417     case BLOCKED_FETCH:
2418       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2419               ((StgBlockedFetch *)bqe)->node, 
2420               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2421               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2422               ((StgBlockedFetch *)bqe)->ga.weight);
2423       break;
2424     case CONSTR:
2425       fprintf(stderr," %s (IP %p),",
2426               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2427                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2428                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2429                "RBH_Save_?"), get_itbl(bqe));
2430       break;
2431     default:
2432       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2433            info_type(bqe), node, info_type(node));
2434       break;
2435     }
2436   } /* for */
2437   fputc('\n', stderr);
2438 }
2439 # elif defined(GRAN)
2440 void 
2441 print_bq (StgClosure *node)
2442 {
2443   StgBlockingQueueElement *bqe;
2444   StgTSO *tso;
2445   PEs node_loc, tso_loc;
2446   rtsBool end;
2447
2448   /* should cover all closures that may have a blocking queue */
2449   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2450          get_itbl(node)->type == FETCH_ME_BQ ||
2451          get_itbl(node)->type == RBH);
2452     
2453   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2454   node_loc = where_is(node);
2455
2456   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2457           node, info_type(node), node_loc);
2458
2459   /* 
2460      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2461   */
2462   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2463        !end; // iterate until bqe points to a CONSTR
2464        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2465     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2466     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2467     /* types of closures that may appear in a blocking queue */
2468     ASSERT(get_itbl(bqe)->type == TSO ||           
2469            get_itbl(bqe)->type == CONSTR); 
2470     /* only BQs of an RBH end with an RBH_Save closure */
2471     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2472
2473     tso_loc = where_is((StgClosure *)bqe);
2474     switch (get_itbl(bqe)->type) {
2475     case TSO:
2476       fprintf(stderr," TSO %d (%x) on [PE %d],",
2477               ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2478       break;
2479     case CONSTR:
2480       fprintf(stderr," %s (IP %p),",
2481               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2482                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2483                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2484                "RBH_Save_?"), get_itbl(bqe));
2485       break;
2486     default:
2487       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2488            info_type(bqe), node, info_type(node));
2489       break;
2490     }
2491   } /* for */
2492   fputc('\n', stderr);
2493 }
2494 #else
2495 /* 
2496    Nice and easy: only TSOs on the blocking queue
2497 */
2498 void 
2499 print_bq (StgClosure *node)
2500 {
2501   StgTSO *tso;
2502
2503   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2504   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
2505        tso != END_TSO_QUEUE; 
2506        tso=tso->link) {
2507     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
2508     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
2509     fprintf(stderr," TSO %d (%p),", tso->id, tso);
2510   }
2511   fputc('\n', stderr);
2512 }
2513 # endif
2514
2515 #if defined(PAR)
2516 static nat
2517 run_queue_len(void)
2518 {
2519   nat i;
2520   StgTSO *tso;
2521
2522   for (i=0, tso=run_queue_hd; 
2523        tso != END_TSO_QUEUE;
2524        i++, tso=tso->link)
2525     /* nothing */
2526
2527   return i;
2528 }
2529 #endif
2530
2531 static void
2532 sched_belch(char *s, ...)
2533 {
2534   va_list ap;
2535   va_start(ap,s);
2536 #ifdef SMP
2537   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
2538 #else
2539   fprintf(stderr, "scheduler: ");
2540 #endif
2541   vfprintf(stderr, s, ap);
2542   fprintf(stderr, "\n");
2543 }
2544
2545 #endif /* DEBUG */
2546
2547 //@node Index,  , Debugging Routines, Main scheduling code
2548 //@subsection Index
2549
2550 //@index
2551 //* MainRegTable::  @cindex\s-+MainRegTable
2552 //* StgMainThread::  @cindex\s-+StgMainThread
2553 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
2554 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
2555 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
2556 //* context_switch::  @cindex\s-+context_switch
2557 //* createThread::  @cindex\s-+createThread
2558 //* free_capabilities::  @cindex\s-+free_capabilities
2559 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
2560 //* initScheduler::  @cindex\s-+initScheduler
2561 //* interrupted::  @cindex\s-+interrupted
2562 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
2563 //* next_thread_id::  @cindex\s-+next_thread_id
2564 //* print_bq::  @cindex\s-+print_bq
2565 //* run_queue_hd::  @cindex\s-+run_queue_hd
2566 //* run_queue_tl::  @cindex\s-+run_queue_tl
2567 //* sched_mutex::  @cindex\s-+sched_mutex
2568 //* schedule::  @cindex\s-+schedule
2569 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
2570 //* task_ids::  @cindex\s-+task_ids
2571 //* term_mutex::  @cindex\s-+term_mutex
2572 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
2573 //@end index