[project @ 2000-01-22 18:00:03 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.45 2000/01/22 18:00:03 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-1999
5  *
6  * Scheduler
7  *
8  * The main scheduling code in GranSim is quite different from that in std
9  * (concurrent) Haskell: while concurrent Haskell just iterates over the
10  * threads in the runnable queue, GranSim is event driven, i.e. it iterates
11  * over the events in the global event queue.  -- HWL
12  * --------------------------------------------------------------------------*/
13
14 //@node Main scheduling code, , ,
15 //@section Main scheduling code
16
17 /* Version with scheduler monitor support for SMPs.
18
19    This design provides a high-level API to create and schedule threads etc.
20    as documented in the SMP design document.
21
22    It uses a monitor design controlled by a single mutex to exercise control
23    over accesses to shared data structures, and builds on the Posix threads
24    library.
25
26    The majority of state is shared.  In order to keep essential per-task state,
27    there is a Capability structure, which contains all the information
28    needed to run a thread: its STG registers, a pointer to its TSO, a
29    nursery etc.  During STG execution, a pointer to the capability is
30    kept in a register (BaseReg).
31
32    In a non-SMP build, there is one global capability, namely MainRegTable.
33
34    SDM & KH, 10/99
35 */
36
37 //@menu
38 //* Includes::                  
39 //* Variables and Data structures::  
40 //* Prototypes::                
41 //* Main scheduling loop::      
42 //* Suspend and Resume::        
43 //* Run queue code::            
44 //* Garbage Collextion Routines::  
45 //* Blocking Queue Routines::   
46 //* Exception Handling Routines::  
47 //* Debugging Routines::        
48 //* Index::                     
49 //@end menu
50
51 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
52 //@subsection Includes
53
54 #include "Rts.h"
55 #include "SchedAPI.h"
56 #include "RtsUtils.h"
57 #include "RtsFlags.h"
58 #include "Storage.h"
59 #include "StgRun.h"
60 #include "StgStartup.h"
61 #include "GC.h"
62 #include "Hooks.h"
63 #include "Schedule.h"
64 #include "StgMiscClosures.h"
65 #include "Storage.h"
66 #include "Evaluator.h"
67 #include "Exception.h"
68 #include "Printer.h"
69 #include "Main.h"
70 #include "Signals.h"
71 #include "Profiling.h"
72 #include "Sanity.h"
73 #include "Stats.h"
74 #include "Sparks.h"
75 #if defined(GRAN) || defined(PAR)
76 # include "GranSimRts.h"
77 # include "GranSim.h"
78 # include "ParallelRts.h"
79 # include "Parallel.h"
80 # include "ParallelDebug.h"
81 # include "FetchMe.h"
82 # include "HLC.h"
83 #endif
84
85 #include <stdarg.h>
86
87 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
88 //@subsection Variables and Data structures
89
90 /* Main threads:
91  *
92  * These are the threads which clients have requested that we run.  
93  *
94  * In an SMP build, we might have several concurrent clients all
95  * waiting for results, and each one will wait on a condition variable
96  * until the result is available.
97  *
98  * In non-SMP, clients are strictly nested: the first client calls
99  * into the RTS, which might call out again to C with a _ccall_GC, and
100  * eventually re-enter the RTS.
101  *
102  * Main threads information is kept in a linked list:
103  */
104 //@cindex StgMainThread
105 typedef struct StgMainThread_ {
106   StgTSO *         tso;
107   SchedulerStatus  stat;
108   StgClosure **    ret;
109 #ifdef SMP
110   pthread_cond_t wakeup;
111 #endif
112   struct StgMainThread_ *link;
113 } StgMainThread;
114
115 /* Main thread queue.
116  * Locks required: sched_mutex.
117  */
118 static StgMainThread *main_threads;
119
120 /* Thread queues.
121  * Locks required: sched_mutex.
122  */
123
124 #if defined(GRAN)
125
126 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
127 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
128
129 /* 
130    In GranSim we have a runable and a blocked queue for each processor.
131    In order to minimise code changes new arrays run_queue_hds/tls
132    are created. run_queue_hd is then a short cut (macro) for
133    run_queue_hds[CurrentProc] (see GranSim.h).
134    -- HWL
135 */
136 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
137 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
138 StgTSO *ccalling_threadss[MAX_PROC];
139
140 #else /* !GRAN */
141
142 //@cindex run_queue_hd
143 //@cindex run_queue_tl
144 //@cindex blocked_queue_hd
145 //@cindex blocked_queue_tl
146 StgTSO *run_queue_hd, *run_queue_tl;
147 StgTSO *blocked_queue_hd, *blocked_queue_tl;
148
149 /* Threads suspended in _ccall_GC.
150  * Locks required: sched_mutex.
151  */
152 static StgTSO *suspended_ccalling_threads;
153
154 static void GetRoots(void);
155 static StgTSO *threadStackOverflow(StgTSO *tso);
156 #endif
157
158 /* KH: The following two flags are shared memory locations.  There is no need
159        to lock them, since they are only unset at the end of a scheduler
160        operation.
161 */
162
163 /* flag set by signal handler to precipitate a context switch */
164 //@cindex context_switch
165 nat context_switch;
166
167 /* if this flag is set as well, give up execution */
168 //@cindex interrupted
169 rtsBool interrupted;
170
171 /* Next thread ID to allocate.
172  * Locks required: sched_mutex
173  */
174 //@cindex next_thread_id
175 StgThreadID next_thread_id = 1;
176
177 /*
178  * Pointers to the state of the current thread.
179  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
180  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
181  */
182  
183 /* The smallest stack size that makes any sense is:
184  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
185  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
186  *  + 1                       (the realworld token for an IO thread)
187  *  + 1                       (the closure to enter)
188  *
189  * A thread with this stack will bomb immediately with a stack
190  * overflow, which will increase its stack size.  
191  */
192
193 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
194
195 /* Free capability list.
196  * Locks required: sched_mutex.
197  */
198 #ifdef SMP
199 //@cindex free_capabilities
200 //@cindex n_free_capabilities
201 Capability *free_capabilities; /* Available capabilities for running threads */
202 nat n_free_capabilities;       /* total number of available capabilities */
203 #else
204 //@cindex MainRegTable
205 Capability MainRegTable;       /* for non-SMP, we have one global capability */
206 #endif
207
208 #if defined(GRAN)
209 StgTSO      *CurrentTSOs[MAX_PROC];
210 #else
211 StgTSO      *CurrentTSO;
212 #endif
213
214 rtsBool ready_to_gc;
215
216 /* All our current task ids, saved in case we need to kill them later.
217  */
218 #ifdef SMP
219 //@cindex task_ids
220 task_info *task_ids;
221 #endif
222
223 void            addToBlockedQueue ( StgTSO *tso );
224
225 static void     schedule          ( void );
226        void     interruptStgRts   ( void );
227 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
228
229 #ifdef DEBUG
230 static void sched_belch(char *s, ...);
231 #endif
232
233 #ifdef SMP
234 //@cindex sched_mutex
235 //@cindex term_mutex
236 //@cindex thread_ready_cond
237 //@cindex gc_pending_cond
238 pthread_mutex_t sched_mutex       = PTHREAD_MUTEX_INITIALIZER;
239 pthread_mutex_t term_mutex        = PTHREAD_MUTEX_INITIALIZER;
240 pthread_cond_t  thread_ready_cond = PTHREAD_COND_INITIALIZER;
241 pthread_cond_t  gc_pending_cond   = PTHREAD_COND_INITIALIZER;
242
243 nat await_death;
244 #endif
245
246 #if defined(PAR)
247 StgTSO *LastTSO;
248 rtsTime TimeOfLastYield;
249 #endif
250
251 /*
252  * The thread state for the main thread.
253 // ToDo: check whether not needed any more
254 StgTSO   *MainTSO;
255  */
256
257
258 //@node Prototypes, Main scheduling loop, Variables and Data structures, Main scheduling code
259 //@subsection Prototypes
260
261 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
262 //@subsection Main scheduling loop
263
264 /* ---------------------------------------------------------------------------
265    Main scheduling loop.
266
267    We use round-robin scheduling, each thread returning to the
268    scheduler loop when one of these conditions is detected:
269
270       * out of heap space
271       * timer expires (thread yields)
272       * thread blocks
273       * thread ends
274       * stack overflow
275
276    Locking notes:  we acquire the scheduler lock once at the beginning
277    of the scheduler loop, and release it when
278     
279       * running a thread, or
280       * waiting for work, or
281       * waiting for a GC to complete.
282
283    ------------------------------------------------------------------------ */
284 //@cindex schedule
285 static void
286 schedule( void )
287 {
288   StgTSO *t;
289   Capability *cap;
290   StgThreadReturnCode ret;
291 #if defined(GRAN)
292   rtsEvent *event;
293 #elif defined(PAR)
294   rtsSpark spark;
295   StgTSO *tso;
296   GlobalTaskId pe;
297 #endif
298   
299   ACQUIRE_LOCK(&sched_mutex);
300
301 #if defined(GRAN)
302 # error ToDo: implement GranSim scheduler
303 #elif defined(PAR)
304   while (!GlobalStopPending) {          /* GlobalStopPending set in par_exit */
305
306     if (PendingFetches != END_BF_QUEUE) {
307         processFetches();
308     }
309 #else
310   while (1) {
311 #endif
312
313     /* If we're interrupted (the user pressed ^C, or some other
314      * termination condition occurred), kill all the currently running
315      * threads.
316      */
317     if (interrupted) {
318       IF_DEBUG(scheduler, sched_belch("interrupted"));
319       for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
320         deleteThread(t);
321       }
322       for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
323         deleteThread(t);
324       }
325       run_queue_hd = run_queue_tl = END_TSO_QUEUE;
326       blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
327     }
328
329     /* Go through the list of main threads and wake up any
330      * clients whose computations have finished.  ToDo: this
331      * should be done more efficiently without a linear scan
332      * of the main threads list, somehow...
333      */
334 #ifdef SMP
335     { 
336       StgMainThread *m, **prev;
337       prev = &main_threads;
338       for (m = main_threads; m != NULL; m = m->link) {
339         switch (m->tso->whatNext) {
340         case ThreadComplete:
341           if (m->ret) {
342             *(m->ret) = (StgClosure *)m->tso->sp[0];
343           }
344           *prev = m->link;
345           m->stat = Success;
346           pthread_cond_broadcast(&m->wakeup);
347           break;
348         case ThreadKilled:
349           *prev = m->link;
350           m->stat = Killed;
351           pthread_cond_broadcast(&m->wakeup);
352           break;
353         default:
354           break;
355         }
356       }
357     }
358 #else
359     /* If our main thread has finished or been killed, return.
360      */
361     {
362       StgMainThread *m = main_threads;
363       if (m->tso->whatNext == ThreadComplete
364           || m->tso->whatNext == ThreadKilled) {
365         main_threads = main_threads->link;
366         if (m->tso->whatNext == ThreadComplete) {
367           /* we finished successfully, fill in the return value */
368           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
369           m->stat = Success;
370           return;
371         } else {
372           m->stat = Killed;
373           return;
374         }
375       }
376     }
377 #endif
378
379     /* Top up the run queue from our spark pool.  We try to make the
380      * number of threads in the run queue equal to the number of
381      * free capabilities.
382      */
383 #if defined(SMP)
384     {
385       nat n = n_free_capabilities;
386       StgTSO *tso = run_queue_hd;
387
388       /* Count the run queue */
389       while (n > 0 && tso != END_TSO_QUEUE) {
390         tso = tso->link;
391         n--;
392       }
393
394       for (; n > 0; n--) {
395         StgClosure *spark;
396         spark = findSpark();
397         if (spark == NULL) {
398           break; /* no more sparks in the pool */
399         } else {
400           /* I'd prefer this to be done in activateSpark -- HWL */
401           /* tricky - it needs to hold the scheduler lock and
402            * not try to re-acquire it -- SDM */
403           StgTSO *tso;
404           tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
405           pushClosure(tso,spark);
406           PUSH_ON_RUN_QUEUE(tso);
407 #ifdef PAR
408           advisory_thread_count++;
409 #endif
410           
411           IF_DEBUG(scheduler,
412                    sched_belch("turning spark of closure %p into a thread",
413                                (StgClosure *)spark));
414         }
415       }
416       /* We need to wake up the other tasks if we just created some
417        * work for them.
418        */
419       if (n_free_capabilities - n > 1) {
420           pthread_cond_signal(&thread_ready_cond);
421       }
422     }
423 #endif /* SMP */
424
425     /* Check whether any waiting threads need to be woken up.  If the
426      * run queue is empty, and there are no other tasks running, we
427      * can wait indefinitely for something to happen.
428      * ToDo: what if another client comes along & requests another
429      * main thread?
430      */
431     if (blocked_queue_hd != END_TSO_QUEUE) {
432       awaitEvent(
433            (run_queue_hd == END_TSO_QUEUE)
434 #ifdef SMP
435         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
436 #endif
437         );
438     }
439     
440     /* check for signals each time around the scheduler */
441 #ifndef __MINGW32__
442     if (signals_pending()) {
443       start_signal_handlers();
444     }
445 #endif
446
447     /* Detect deadlock: when we have no threads to run, there are
448      * no threads waiting on I/O or sleeping, and all the other
449      * tasks are waiting for work, we must have a deadlock.  Inform
450      * all the main threads.
451      */
452 #ifdef SMP
453     if (blocked_queue_hd == END_TSO_QUEUE
454         && run_queue_hd == END_TSO_QUEUE
455         && (n_free_capabilities == RtsFlags.ParFlags.nNodes)
456         ) {
457       StgMainThread *m;
458       for (m = main_threads; m != NULL; m = m->link) {
459           m->ret = NULL;
460           m->stat = Deadlock;
461           pthread_cond_broadcast(&m->wakeup);
462       }
463       main_threads = NULL;
464     }
465 #else /* ! SMP */
466     if (blocked_queue_hd == END_TSO_QUEUE
467         && run_queue_hd == END_TSO_QUEUE) {
468       StgMainThread *m = main_threads;
469       m->ret = NULL;
470       m->stat = Deadlock;
471       main_threads = m->link;
472       return;
473     }
474 #endif
475
476 #ifdef SMP
477     /* If there's a GC pending, don't do anything until it has
478      * completed.
479      */
480     if (ready_to_gc) {
481       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
482       pthread_cond_wait(&gc_pending_cond, &sched_mutex);
483     }
484     
485     /* block until we've got a thread on the run queue and a free
486      * capability.
487      */
488     while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) {
489       IF_DEBUG(scheduler, sched_belch("waiting for work"));
490       pthread_cond_wait(&thread_ready_cond, &sched_mutex);
491       IF_DEBUG(scheduler, sched_belch("work now available"));
492     }
493 #endif
494
495 #if defined(GRAN)
496 # error ToDo: implement GranSim scheduler
497 #elif defined(PAR)
498     /* ToDo: phps merge with spark activation above */
499     /* check whether we have local work and send requests if we have none */
500     if (run_queue_hd == END_TSO_QUEUE) {  /* no runnable threads */
501       /* :-[  no local threads => look out for local sparks */
502       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
503           (pending_sparks_hd[REQUIRED_POOL] < pending_sparks_tl[REQUIRED_POOL] ||
504            pending_sparks_hd[ADVISORY_POOL] < pending_sparks_tl[ADVISORY_POOL])) {
505         /* 
506          * ToDo: add GC code check that we really have enough heap afterwards!!
507          * Old comment:
508          * If we're here (no runnable threads) and we have pending
509          * sparks, we must have a space problem.  Get enough space
510          * to turn one of those pending sparks into a
511          * thread... 
512          */
513         
514         spark = findSpark();                /* get a spark */
515         if (spark != (rtsSpark) NULL) {
516           tso = activateSpark(spark);       /* turn the spark into a thread */
517           IF_PAR_DEBUG(verbose,
518                        belch("== [%x] schedule: Created TSO %p (%d); %d threads active",
519                              mytid, tso, tso->id, advisory_thread_count));
520
521           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
522             belch("^^ failed to activate spark");
523             goto next_thread;
524           }               /* otherwise fall through & pick-up new tso */
525         } else {
526           IF_PAR_DEBUG(verbose,
527                        belch("^^ no local sparks (spark pool contains only NFs: %d)", 
528                              spark_queue_len(ADVISORY_POOL)));
529           goto next_thread;
530         }
531       } else  
532       /* =8-[  no local sparks => look for work on other PEs */
533       {
534         /*
535          * We really have absolutely no work.  Send out a fish
536          * (there may be some out there already), and wait for
537          * something to arrive.  We clearly can't run any threads
538          * until a SCHEDULE or RESUME arrives, and so that's what
539          * we're hoping to see.  (Of course, we still have to
540          * respond to other types of messages.)
541          */
542         if (//!fishing &&  
543             outstandingFishes < RtsFlags.ParFlags.maxFishes ) { // &&
544           // (last_fish_arrived_at+FISH_DELAY < CURRENT_TIME)) {
545           /* fishing set in sendFish, processFish;
546              avoid flooding system with fishes via delay */
547           pe = choosePE();
548           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
549                    NEW_FISH_HUNGER);
550         }
551         
552         processMessages();
553         goto next_thread;
554         // ReSchedule(0);
555       }
556     } else if (PacketsWaiting()) {  /* Look for incoming messages */
557       processMessages();
558     }
559
560     /* Now we are sure that we have some work available */
561     ASSERT(run_queue_hd != END_TSO_QUEUE);
562     /* Take a thread from the run queue, if we have work */
563     t = take_off_run_queue(END_TSO_QUEUE);
564
565     /* ToDo: write something to the log-file
566     if (RTSflags.ParFlags.granSimStats && !sameThread)
567         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
568     */
569
570     CurrentTSO = t;
571
572     IF_DEBUG(scheduler, belch("--^^ %d sparks on [%#x] (hd=%x; tl=%x; lim=%x)", 
573                               spark_queue_len(ADVISORY_POOL), CURRENT_PROC,
574                               pending_sparks_hd[ADVISORY_POOL], 
575                               pending_sparks_tl[ADVISORY_POOL], 
576                               pending_sparks_lim[ADVISORY_POOL]));
577
578     IF_DEBUG(scheduler, belch("--== %d threads on [%#x] (hd=%x; tl=%x)", 
579                               run_queue_len(), CURRENT_PROC,
580                               run_queue_hd, run_queue_tl));
581
582     if (t != LastTSO) {
583       /* 
584          we are running a different TSO, so write a schedule event to log file
585          NB: If we use fair scheduling we also have to write  a deschedule 
586              event for LastTSO; with unfair scheduling we know that the
587              previous tso has blocked whenever we switch to another tso, so
588              we don't need it in GUM for now
589       */
590       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
591                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
592       
593     }
594
595 #else /* !GRAN && !PAR */
596   
597     /* grab a thread from the run queue
598      */
599     t = POP_RUN_QUEUE();
600
601 #endif
602     
603     /* grab a capability
604      */
605 #ifdef SMP
606     cap = free_capabilities;
607     free_capabilities = cap->link;
608     n_free_capabilities--;
609 #else
610     cap = &MainRegTable;
611 #endif
612     
613     cap->rCurrentTSO = t;
614     
615     /* set the context_switch flag
616      */
617     if (run_queue_hd == END_TSO_QUEUE)
618       context_switch = 0;
619     else
620       context_switch = 1;
621
622     RELEASE_LOCK(&sched_mutex);
623     
624     IF_DEBUG(scheduler,sched_belch("running thread %d", t->id));
625
626     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
627     /* Run the current thread 
628      */
629     switch (cap->rCurrentTSO->whatNext) {
630     case ThreadKilled:
631     case ThreadComplete:
632       /* Thread already finished, return to scheduler. */
633       ret = ThreadFinished;
634       break;
635     case ThreadEnterGHC:
636       ret = StgRun((StgFunPtr) stg_enterStackTop, cap);
637       break;
638     case ThreadRunGHC:
639       ret = StgRun((StgFunPtr) stg_returnToStackTop, cap);
640       break;
641     case ThreadEnterHugs:
642 #ifdef INTERPRETER
643       {
644          StgClosure* c;
645          IF_DEBUG(scheduler,sched_belch("entering Hugs"));
646          c = (StgClosure *)(cap->rCurrentTSO->sp[0]);
647          cap->rCurrentTSO->sp += 1;
648          ret = enter(cap,c);
649          break;
650       }
651 #else
652       barf("Panic: entered a BCO but no bytecode interpreter in this build");
653 #endif
654     default:
655       barf("schedule: invalid whatNext field");
656     }
657     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
658     
659     /* Costs for the scheduler are assigned to CCS_SYSTEM */
660 #ifdef PROFILING
661     CCCS = CCS_SYSTEM;
662 #endif
663     
664     ACQUIRE_LOCK(&sched_mutex);
665
666 #ifdef SMP
667     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", pthread_self()););
668 #else
669     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
670 #endif
671     t = cap->rCurrentTSO;
672     
673     switch (ret) {
674     case HeapOverflow:
675       /* make all the running tasks block on a condition variable,
676        * maybe set context_switch and wait till they all pile in,
677        * then have them wait on a GC condition variable.
678        */
679       IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id));
680       threadPaused(t);
681       
682       ready_to_gc = rtsTrue;
683       context_switch = 1;               /* stop other threads ASAP */
684       PUSH_ON_RUN_QUEUE(t);
685       break;
686       
687     case StackOverflow:
688       /* just adjust the stack for this thread, then pop it back
689        * on the run queue.
690        */
691       IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id));
692       threadPaused(t);
693       { 
694         StgMainThread *m;
695         /* enlarge the stack */
696         StgTSO *new_t = threadStackOverflow(t);
697         
698         /* This TSO has moved, so update any pointers to it from the
699          * main thread stack.  It better not be on any other queues...
700          * (it shouldn't be).
701          */
702         for (m = main_threads; m != NULL; m = m->link) {
703           if (m->tso == t) {
704             m->tso = new_t;
705           }
706         }
707         ready_to_gc = rtsTrue;
708         context_switch = 1;
709         PUSH_ON_RUN_QUEUE(new_t);
710       }
711       break;
712
713     case ThreadYielding:
714 #if defined(GRAN)
715       IF_DEBUG(gran, 
716                DumpGranEvent(GR_DESCHEDULE, t));
717       globalGranStats.tot_yields++;
718 #elif defined(PAR)
719       IF_DEBUG(par, 
720                DumpGranEvent(GR_DESCHEDULE, t));
721 #endif
722       /* put the thread back on the run queue.  Then, if we're ready to
723        * GC, check whether this is the last task to stop.  If so, wake
724        * up the GC thread.  getThread will block during a GC until the
725        * GC is finished.
726        */
727       IF_DEBUG(scheduler,
728                if (t->whatNext == ThreadEnterHugs) {
729                  /* ToDo: or maybe a timer expired when we were in Hugs?
730                   * or maybe someone hit ctrl-C
731                   */
732                  belch("thread %ld stopped to switch to Hugs", t->id);
733                } else {
734                  belch("thread %ld stopped, yielding", t->id);
735                }
736                );
737       threadPaused(t);
738       APPEND_TO_RUN_QUEUE(t);
739       break;
740       
741     case ThreadBlocked:
742 #if defined(GRAN)
743 # error ToDo: implement GranSim scheduler
744 #elif defined(PAR)
745       IF_DEBUG(par, 
746                DumpGranEvent(GR_DESCHEDULE, t)); 
747 #else
748 #endif
749       /* don't need to do anything.  Either the thread is blocked on
750        * I/O, in which case we'll have called addToBlockedQueue
751        * previously, or it's blocked on an MVar or Blackhole, in which
752        * case it'll be on the relevant queue already.
753        */
754       IF_DEBUG(scheduler,
755                fprintf(stderr, "thread %d stopped, ", t->id);
756                printThreadBlockage(t);
757                fprintf(stderr, "\n"));
758       threadPaused(t);
759       break;
760       
761     case ThreadFinished:
762       /* Need to check whether this was a main thread, and if so, signal
763        * the task that started it with the return value.  If we have no
764        * more main threads, we probably need to stop all the tasks until
765        * we get a new one.
766        */
767       IF_DEBUG(scheduler,belch("thread %ld finished", t->id));
768       t->whatNext = ThreadComplete;
769 #if defined(GRAN)
770       // ToDo: endThread(t, CurrentProc); // clean-up the thread
771 #elif defined(PAR)
772       advisory_thread_count--;
773       if (RtsFlags.ParFlags.ParStats.Full) 
774         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
775 #endif
776       break;
777       
778     default:
779       barf("doneThread: invalid thread return code");
780     }
781     
782 #ifdef SMP
783     cap->link = free_capabilities;
784     free_capabilities = cap;
785     n_free_capabilities++;
786 #endif
787
788 #ifdef SMP
789     if (ready_to_gc && n_free_capabilities == RtsFlags.ParFlags.nNodes) 
790 #else
791     if (ready_to_gc) 
792 #endif
793       {
794       /* everybody back, start the GC.
795        * Could do it in this thread, or signal a condition var
796        * to do it in another thread.  Either way, we need to
797        * broadcast on gc_pending_cond afterward.
798        */
799 #ifdef SMP
800       IF_DEBUG(scheduler,sched_belch("doing GC"));
801 #endif
802       GarbageCollect(GetRoots);
803       ready_to_gc = rtsFalse;
804 #ifdef SMP
805       pthread_cond_broadcast(&gc_pending_cond);
806 #endif
807     }
808 #if defined(GRAN)
809   next_thread:
810     IF_GRAN_DEBUG(unused,
811                   print_eventq(EventHd));
812
813     event = get_next_event();
814
815 #elif defined(PAR)
816   next_thread:
817     /* ToDo: wait for next message to arrive rather than busy wait */
818
819 #else /* GRAN */
820   /* not any more
821   next_thread:
822     t = take_off_run_queue(END_TSO_QUEUE);
823   */
824 #endif /* GRAN */
825   } /* end of while(1) */
826 }
827
828 /* A hack for Hugs concurrency support.  Needs sanitisation (?) */
829 void deleteAllThreads ( void )
830 {
831   StgTSO* t;
832   IF_DEBUG(scheduler,sched_belch("deleteAllThreads()"));
833   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
834     deleteThread(t);
835   }
836   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
837     deleteThread(t);
838   }
839   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
840   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
841 }
842
843 /* startThread and  insertThread are now in GranSim.c -- HWL */
844
845 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
846 //@subsection Suspend and Resume
847
848 /* ---------------------------------------------------------------------------
849  * Suspending & resuming Haskell threads.
850  * 
851  * When making a "safe" call to C (aka _ccall_GC), the task gives back
852  * its capability before calling the C function.  This allows another
853  * task to pick up the capability and carry on running Haskell
854  * threads.  It also means that if the C call blocks, it won't lock
855  * the whole system.
856  *
857  * The Haskell thread making the C call is put to sleep for the
858  * duration of the call, on the susepended_ccalling_threads queue.  We
859  * give out a token to the task, which it can use to resume the thread
860  * on return from the C function.
861  * ------------------------------------------------------------------------- */
862    
863 StgInt
864 suspendThread( Capability *cap )
865 {
866   nat tok;
867
868   ACQUIRE_LOCK(&sched_mutex);
869
870   IF_DEBUG(scheduler,
871            sched_belch("thread %d did a _ccall_gc\n", cap->rCurrentTSO->id));
872
873   threadPaused(cap->rCurrentTSO);
874   cap->rCurrentTSO->link = suspended_ccalling_threads;
875   suspended_ccalling_threads = cap->rCurrentTSO;
876
877   /* Use the thread ID as the token; it should be unique */
878   tok = cap->rCurrentTSO->id;
879
880 #ifdef SMP
881   cap->link = free_capabilities;
882   free_capabilities = cap;
883   n_free_capabilities++;
884 #endif
885
886   RELEASE_LOCK(&sched_mutex);
887   return tok; 
888 }
889
890 Capability *
891 resumeThread( StgInt tok )
892 {
893   StgTSO *tso, **prev;
894   Capability *cap;
895
896   ACQUIRE_LOCK(&sched_mutex);
897
898   prev = &suspended_ccalling_threads;
899   for (tso = suspended_ccalling_threads; 
900        tso != END_TSO_QUEUE; 
901        prev = &tso->link, tso = tso->link) {
902     if (tso->id == (StgThreadID)tok) {
903       *prev = tso->link;
904       break;
905     }
906   }
907   if (tso == END_TSO_QUEUE) {
908     barf("resumeThread: thread not found");
909   }
910
911 #ifdef SMP
912   while (free_capabilities == NULL) {
913     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
914     pthread_cond_wait(&thread_ready_cond, &sched_mutex);
915     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
916   }
917   cap = free_capabilities;
918   free_capabilities = cap->link;
919   n_free_capabilities--;
920 #else  
921   cap = &MainRegTable;
922 #endif
923
924   cap->rCurrentTSO = tso;
925
926   RELEASE_LOCK(&sched_mutex);
927   return cap;
928 }
929
930
931 /* ---------------------------------------------------------------------------
932  * Static functions
933  * ------------------------------------------------------------------------ */
934 static void unblockThread(StgTSO *tso);
935
936 /* ---------------------------------------------------------------------------
937  * Comparing Thread ids.
938  *
939  * This is used from STG land in the implementation of the
940  * instances of Eq/Ord for ThreadIds.
941  * ------------------------------------------------------------------------ */
942
943 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
944
945   StgThreadID id1 = tso1->id; 
946   StgThreadID id2 = tso2->id;
947  
948   if (id1 < id2) return (-1);
949   if (id1 > id2) return 1;
950   return 0;
951 }
952
953 /* ---------------------------------------------------------------------------
954    Create a new thread.
955
956    The new thread starts with the given stack size.  Before the
957    scheduler can run, however, this thread needs to have a closure
958    (and possibly some arguments) pushed on its stack.  See
959    pushClosure() in Schedule.h.
960
961    createGenThread() and createIOThread() (in SchedAPI.h) are
962    convenient packaged versions of this function.
963    ------------------------------------------------------------------------ */
964 //@cindex createThread
965 #if defined(GRAN)
966 /* currently pri (priority) is only used in a GRAN setup -- HWL */
967 StgTSO *
968 createThread(nat stack_size, StgInt pri)
969 {
970   return createThread_(stack_size, rtsFalse, pri);
971 }
972
973 static StgTSO *
974 createThread_(nat size, rtsBool have_lock, StgInt pri)
975 {
976 #else
977 StgTSO *
978 createThread(nat stack_size)
979 {
980   return createThread_(stack_size, rtsFalse);
981 }
982
983 static StgTSO *
984 createThread_(nat size, rtsBool have_lock)
985 {
986 #endif
987     StgTSO *tso;
988     nat stack_size;
989
990     /* First check whether we should create a thread at all */
991 #if defined(PAR)
992   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
993   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
994     threadsIgnored++;
995     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
996           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
997     return END_TSO_QUEUE;
998   }
999   threadsCreated++;
1000 #endif
1001
1002 #if defined(GRAN)
1003   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1004 #endif
1005
1006   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1007
1008   /* catch ridiculously small stack sizes */
1009   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1010     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1011   }
1012
1013   tso = (StgTSO *)allocate(size);
1014   TICK_ALLOC_TSO(size-sizeofW(StgTSO),0);
1015   
1016   stack_size = size - TSO_STRUCT_SIZEW;
1017
1018   // Hmm, this CCS_MAIN is not protected by a PROFILING cpp var;
1019   SET_HDR(tso, &TSO_info, CCS_MAIN);
1020 #if defined(GRAN)
1021   SET_GRAN_HDR(tso, ThisPE);
1022 #endif
1023   tso->whatNext     = ThreadEnterGHC;
1024
1025   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1026          protect the increment operation on next_thread_id.
1027          In future, we could use an atomic increment instead.
1028   */
1029   
1030   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1031   tso->id = next_thread_id++; 
1032   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1033
1034   tso->why_blocked  = NotBlocked;
1035   tso->blocked_exceptions = NULL;
1036
1037   tso->splim        = (P_)&(tso->stack) + RESERVED_STACK_WORDS;
1038   tso->stack_size   = stack_size;
1039   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1040                               - TSO_STRUCT_SIZEW;
1041   tso->sp           = (P_)&(tso->stack) + stack_size;
1042
1043 #ifdef PROFILING
1044   tso->prof.CCCS = CCS_MAIN;
1045 #endif
1046
1047   /* put a stop frame on the stack */
1048   tso->sp -= sizeofW(StgStopFrame);
1049   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN);
1050   tso->su = (StgUpdateFrame*)tso->sp;
1051
1052   IF_DEBUG(scheduler,belch("---- Initialised TSO %ld (%p), stack size = %lx words", 
1053                            tso->id, tso, tso->stack_size));
1054
1055   // ToDo: check this
1056 #if defined(GRAN)
1057   tso->link = END_TSO_QUEUE;
1058   /* uses more flexible routine in GranSim */
1059   insertThread(tso, CurrentProc);
1060 #else
1061   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1062      from its creation
1063   */
1064 #endif
1065
1066 #if defined(GRAN)
1067   tso->gran.pri = pri;
1068   tso->gran.magic = TSO_MAGIC; // debugging only
1069   tso->gran.sparkname   = 0;
1070   tso->gran.startedat   = CURRENT_TIME; 
1071   tso->gran.exported    = 0;
1072   tso->gran.basicblocks = 0;
1073   tso->gran.allocs      = 0;
1074   tso->gran.exectime    = 0;
1075   tso->gran.fetchtime   = 0;
1076   tso->gran.fetchcount  = 0;
1077   tso->gran.blocktime   = 0;
1078   tso->gran.blockcount  = 0;
1079   tso->gran.blockedat   = 0;
1080   tso->gran.globalsparks = 0;
1081   tso->gran.localsparks  = 0;
1082   if (RtsFlags.GranFlags.Light)
1083     tso->gran.clock  = Now; /* local clock */
1084   else
1085     tso->gran.clock  = 0;
1086
1087   IF_DEBUG(gran,printTSO(tso));
1088 #elif defined(PAR)
1089   tso->par.sparkname   = 0;
1090   tso->par.startedat   = CURRENT_TIME; 
1091   tso->par.exported    = 0;
1092   tso->par.basicblocks = 0;
1093   tso->par.allocs      = 0;
1094   tso->par.exectime    = 0;
1095   tso->par.fetchtime   = 0;
1096   tso->par.fetchcount  = 0;
1097   tso->par.blocktime   = 0;
1098   tso->par.blockcount  = 0;
1099   tso->par.blockedat   = 0;
1100   tso->par.globalsparks = 0;
1101   tso->par.localsparks  = 0;
1102 #endif
1103
1104 #if defined(GRAN)
1105   globalGranStats.tot_threads_created++;
1106   globalGranStats.threads_created_on_PE[CurrentProc]++;
1107   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1108   globalGranStats.tot_sq_probes++;
1109 #endif 
1110
1111   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1112                                  tso->id, tso->stack_size));
1113   return tso;
1114 }
1115
1116 /* ---------------------------------------------------------------------------
1117  * scheduleThread()
1118  *
1119  * scheduleThread puts a thread on the head of the runnable queue.
1120  * This will usually be done immediately after a thread is created.
1121  * The caller of scheduleThread must create the thread using e.g.
1122  * createThread and push an appropriate closure
1123  * on this thread's stack before the scheduler is invoked.
1124  * ------------------------------------------------------------------------ */
1125
1126 void
1127 scheduleThread(StgTSO *tso)
1128 {
1129   ACQUIRE_LOCK(&sched_mutex);
1130
1131   /* Put the new thread on the head of the runnable queue.  The caller
1132    * better push an appropriate closure on this thread's stack
1133    * beforehand.  In the SMP case, the thread may start running as
1134    * soon as we release the scheduler lock below.
1135    */
1136   PUSH_ON_RUN_QUEUE(tso);
1137   THREAD_RUNNABLE();
1138
1139   IF_DEBUG(scheduler,printTSO(tso));
1140   RELEASE_LOCK(&sched_mutex);
1141 }
1142
1143 /* ---------------------------------------------------------------------------
1144  * startTasks()
1145  *
1146  * Start up Posix threads to run each of the scheduler tasks.
1147  * I believe the task ids are not needed in the system as defined.
1148  *  KH @ 25/10/99
1149  * ------------------------------------------------------------------------ */
1150
1151 #ifdef SMP
1152 static void *
1153 taskStart( void *arg STG_UNUSED )
1154 {
1155   schedule();
1156   return NULL;
1157 }
1158 #endif
1159
1160 /* ---------------------------------------------------------------------------
1161  * initScheduler()
1162  *
1163  * Initialise the scheduler.  This resets all the queues - if the
1164  * queues contained any threads, they'll be garbage collected at the
1165  * next pass.
1166  *
1167  * This now calls startTasks(), so should only be called once!  KH @ 25/10/99
1168  * ------------------------------------------------------------------------ */
1169
1170 #ifdef SMP
1171 static void
1172 term_handler(int sig STG_UNUSED)
1173 {
1174   stat_workerStop();
1175   ACQUIRE_LOCK(&term_mutex);
1176   await_death--;
1177   RELEASE_LOCK(&term_mutex);
1178   pthread_exit(NULL);
1179 }
1180 #endif
1181
1182 //@cindex initScheduler
1183 void 
1184 initScheduler(void)
1185 {
1186 #if defined(GRAN)
1187   nat i;
1188
1189   for (i=0; i<=MAX_PROC; i++) {
1190     run_queue_hds[i]      = END_TSO_QUEUE;
1191     run_queue_tls[i]      = END_TSO_QUEUE;
1192     blocked_queue_hds[i]  = END_TSO_QUEUE;
1193     blocked_queue_tls[i]  = END_TSO_QUEUE;
1194     ccalling_threadss[i]  = END_TSO_QUEUE;
1195   }
1196 #else
1197   run_queue_hd      = END_TSO_QUEUE;
1198   run_queue_tl      = END_TSO_QUEUE;
1199   blocked_queue_hd  = END_TSO_QUEUE;
1200   blocked_queue_tl  = END_TSO_QUEUE;
1201 #endif 
1202
1203   suspended_ccalling_threads  = END_TSO_QUEUE;
1204
1205   main_threads = NULL;
1206
1207   context_switch = 0;
1208   interrupted    = 0;
1209
1210   enteredCAFs = END_CAF_LIST;
1211
1212   /* Install the SIGHUP handler */
1213 #ifdef SMP
1214   {
1215     struct sigaction action,oact;
1216
1217     action.sa_handler = term_handler;
1218     sigemptyset(&action.sa_mask);
1219     action.sa_flags = 0;
1220     if (sigaction(SIGTERM, &action, &oact) != 0) {
1221       barf("can't install TERM handler");
1222     }
1223   }
1224 #endif
1225
1226 #ifdef SMP
1227   /* Allocate N Capabilities */
1228   {
1229     nat i;
1230     Capability *cap, *prev;
1231     cap  = NULL;
1232     prev = NULL;
1233     for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1234       cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities");
1235       cap->link = prev;
1236       prev = cap;
1237     }
1238     free_capabilities = cap;
1239     n_free_capabilities = RtsFlags.ParFlags.nNodes;
1240   }
1241   IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Allocated %d capabilities\n",
1242                              n_free_capabilities););
1243 #endif
1244
1245 #if defined(SMP) || defined(PAR)
1246   initSparkPools();
1247 #endif
1248 }
1249
1250 #ifdef SMP
1251 void
1252 startTasks( void )
1253 {
1254   nat i;
1255   int r;
1256   pthread_t tid;
1257   
1258   /* make some space for saving all the thread ids */
1259   task_ids = stgMallocBytes(RtsFlags.ParFlags.nNodes * sizeof(task_info),
1260                             "initScheduler:task_ids");
1261   
1262   /* and create all the threads */
1263   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1264     r = pthread_create(&tid,NULL,taskStart,NULL);
1265     if (r != 0) {
1266       barf("startTasks: Can't create new Posix thread");
1267     }
1268     task_ids[i].id = tid;
1269     task_ids[i].mut_time = 0.0;
1270     task_ids[i].mut_etime = 0.0;
1271     task_ids[i].gc_time = 0.0;
1272     task_ids[i].gc_etime = 0.0;
1273     task_ids[i].elapsedtimestart = elapsedtime();
1274     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: Started task: %ld\n",tid););
1275   }
1276 }
1277 #endif
1278
1279 void
1280 exitScheduler( void )
1281 {
1282 #ifdef SMP
1283   nat i;
1284
1285   /* Don't want to use pthread_cancel, since we'd have to install
1286    * these silly exception handlers (pthread_cleanup_{push,pop}) around
1287    * all our locks.
1288    */
1289 #if 0
1290   /* Cancel all our tasks */
1291   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1292     pthread_cancel(task_ids[i].id);
1293   }
1294   
1295   /* Wait for all the tasks to terminate */
1296   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1297     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: waiting for task %ld\n", 
1298                                task_ids[i].id));
1299     pthread_join(task_ids[i].id, NULL);
1300   }
1301 #endif
1302
1303   /* Send 'em all a SIGHUP.  That should shut 'em up.
1304    */
1305   await_death = RtsFlags.ParFlags.nNodes;
1306   for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
1307     pthread_kill(task_ids[i].id,SIGTERM);
1308   }
1309   while (await_death > 0) {
1310     sched_yield();
1311   }
1312 #endif
1313 }
1314
1315 /* -----------------------------------------------------------------------------
1316    Managing the per-task allocation areas.
1317    
1318    Each capability comes with an allocation area.  These are
1319    fixed-length block lists into which allocation can be done.
1320
1321    ToDo: no support for two-space collection at the moment???
1322    -------------------------------------------------------------------------- */
1323
1324 /* -----------------------------------------------------------------------------
1325  * waitThread is the external interface for running a new computataion
1326  * and waiting for the result.
1327  *
1328  * In the non-SMP case, we create a new main thread, push it on the 
1329  * main-thread stack, and invoke the scheduler to run it.  The
1330  * scheduler will return when the top main thread on the stack has
1331  * completed or died, and fill in the necessary fields of the
1332  * main_thread structure.
1333  *
1334  * In the SMP case, we create a main thread as before, but we then
1335  * create a new condition variable and sleep on it.  When our new
1336  * main thread has completed, we'll be woken up and the status/result
1337  * will be in the main_thread struct.
1338  * -------------------------------------------------------------------------- */
1339
1340 SchedulerStatus
1341 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
1342 {
1343   StgMainThread *m;
1344   SchedulerStatus stat;
1345
1346   ACQUIRE_LOCK(&sched_mutex);
1347   
1348   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1349
1350   m->tso = tso;
1351   m->ret = ret;
1352   m->stat = NoStatus;
1353 #ifdef SMP
1354   pthread_cond_init(&m->wakeup, NULL);
1355 #endif
1356
1357   m->link = main_threads;
1358   main_threads = m;
1359
1360   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: new main thread (%d)\n", 
1361                               m->tso->id));
1362
1363 #ifdef SMP
1364   do {
1365     pthread_cond_wait(&m->wakeup, &sched_mutex);
1366   } while (m->stat == NoStatus);
1367 #else
1368   schedule();
1369   ASSERT(m->stat != NoStatus);
1370 #endif
1371
1372   stat = m->stat;
1373
1374 #ifdef SMP
1375   pthread_cond_destroy(&m->wakeup);
1376 #endif
1377
1378   IF_DEBUG(scheduler, fprintf(stderr, "scheduler: main thread (%d) finished\n", 
1379                               m->tso->id));
1380   free(m);
1381
1382   RELEASE_LOCK(&sched_mutex);
1383
1384   return stat;
1385 }
1386
1387 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
1388 //@subsection Run queue code 
1389
1390 #if 0
1391 /* 
1392    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
1393        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
1394        implicit global variable that has to be correct when calling these
1395        fcts -- HWL 
1396 */
1397
1398 /* Put the new thread on the head of the runnable queue.
1399  * The caller of createThread better push an appropriate closure
1400  * on this thread's stack before the scheduler is invoked.
1401  */
1402 static /* inline */ void
1403 add_to_run_queue(tso)
1404 StgTSO* tso; 
1405 {
1406   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1407   tso->link = run_queue_hd;
1408   run_queue_hd = tso;
1409   if (run_queue_tl == END_TSO_QUEUE) {
1410     run_queue_tl = tso;
1411   }
1412 }
1413
1414 /* Put the new thread at the end of the runnable queue. */
1415 static /* inline */ void
1416 push_on_run_queue(tso)
1417 StgTSO* tso; 
1418 {
1419   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
1420   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
1421   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
1422   if (run_queue_hd == END_TSO_QUEUE) {
1423     run_queue_hd = tso;
1424   } else {
1425     run_queue_tl->link = tso;
1426   }
1427   run_queue_tl = tso;
1428 }
1429
1430 /* 
1431    Should be inlined because it's used very often in schedule.  The tso
1432    argument is actually only needed in GranSim, where we want to have the
1433    possibility to schedule *any* TSO on the run queue, irrespective of the
1434    actual ordering. Therefore, if tso is not the nil TSO then we traverse
1435    the run queue and dequeue the tso, adjusting the links in the queue. 
1436 */
1437 //@cindex take_off_run_queue
1438 static /* inline */ StgTSO*
1439 take_off_run_queue(StgTSO *tso) {
1440   StgTSO *t, *prev;
1441
1442   /* 
1443      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
1444
1445      if tso is specified, unlink that tso from the run_queue (doesn't have
1446      to be at the beginning of the queue); GranSim only 
1447   */
1448   if (tso!=END_TSO_QUEUE) {
1449     /* find tso in queue */
1450     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
1451          t!=END_TSO_QUEUE && t!=tso;
1452          prev=t, t=t->link) 
1453       /* nothing */ ;
1454     ASSERT(t==tso);
1455     /* now actually dequeue the tso */
1456     if (prev!=END_TSO_QUEUE) {
1457       ASSERT(run_queue_hd!=t);
1458       prev->link = t->link;
1459     } else {
1460       /* t is at beginning of thread queue */
1461       ASSERT(run_queue_hd==t);
1462       run_queue_hd = t->link;
1463     }
1464     /* t is at end of thread queue */
1465     if (t->link==END_TSO_QUEUE) {
1466       ASSERT(t==run_queue_tl);
1467       run_queue_tl = prev;
1468     } else {
1469       ASSERT(run_queue_tl!=t);
1470     }
1471     t->link = END_TSO_QUEUE;
1472   } else {
1473     /* take tso from the beginning of the queue; std concurrent code */
1474     t = run_queue_hd;
1475     if (t != END_TSO_QUEUE) {
1476       run_queue_hd = t->link;
1477       t->link = END_TSO_QUEUE;
1478       if (run_queue_hd == END_TSO_QUEUE) {
1479         run_queue_tl = END_TSO_QUEUE;
1480       }
1481     }
1482   }
1483   return t;
1484 }
1485
1486 #endif /* 0 */
1487
1488 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
1489 //@subsection Garbage Collextion Routines
1490
1491 /* ---------------------------------------------------------------------------
1492    Where are the roots that we know about?
1493
1494         - all the threads on the runnable queue
1495         - all the threads on the blocked queue
1496         - all the thread currently executing a _ccall_GC
1497         - all the "main threads"
1498      
1499    ------------------------------------------------------------------------ */
1500
1501 /* This has to be protected either by the scheduler monitor, or by the
1502         garbage collection monitor (probably the latter).
1503         KH @ 25/10/99
1504 */
1505
1506 static void GetRoots(void)
1507 {
1508   StgMainThread *m;
1509
1510 #if defined(GRAN)
1511   {
1512     nat i;
1513     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
1514       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
1515         run_queue_hds[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_hds[i]);
1516       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
1517         run_queue_tls[i]    = (StgTSO *)MarkRoot((StgClosure *)run_queue_tls[i]);
1518       
1519       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
1520         blocked_queue_hds[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hds[i]);
1521       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
1522         blocked_queue_tls[i] = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tls[i]);
1523       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
1524         ccalling_threadss[i] = (StgTSO *)MarkRoot((StgClosure *)ccalling_threadss[i]);
1525     }
1526   }
1527
1528   markEventQueue();
1529
1530 #else /* !GRAN */
1531   run_queue_hd      = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd);
1532   run_queue_tl      = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl);
1533
1534   blocked_queue_hd  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd);
1535   blocked_queue_tl  = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl);
1536 #endif 
1537
1538   for (m = main_threads; m != NULL; m = m->link) {
1539     m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso);
1540   }
1541   suspended_ccalling_threads = 
1542     (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads);
1543
1544 #if defined(SMP) || defined(PAR) || defined(GRAN)
1545   markSparkQueue();
1546 #endif
1547 }
1548
1549 /* -----------------------------------------------------------------------------
1550    performGC
1551
1552    This is the interface to the garbage collector from Haskell land.
1553    We provide this so that external C code can allocate and garbage
1554    collect when called from Haskell via _ccall_GC.
1555
1556    It might be useful to provide an interface whereby the programmer
1557    can specify more roots (ToDo).
1558    
1559    This needs to be protected by the GC condition variable above.  KH.
1560    -------------------------------------------------------------------------- */
1561
1562 void (*extra_roots)(void);
1563
1564 void
1565 performGC(void)
1566 {
1567   GarbageCollect(GetRoots);
1568 }
1569
1570 static void
1571 AllRoots(void)
1572 {
1573   GetRoots();                   /* the scheduler's roots */
1574   extra_roots();                /* the user's roots */
1575 }
1576
1577 void
1578 performGCWithRoots(void (*get_roots)(void))
1579 {
1580   extra_roots = get_roots;
1581
1582   GarbageCollect(AllRoots);
1583 }
1584
1585 /* -----------------------------------------------------------------------------
1586    Stack overflow
1587
1588    If the thread has reached its maximum stack size, then raise the
1589    StackOverflow exception in the offending thread.  Otherwise
1590    relocate the TSO into a larger chunk of memory and adjust its stack
1591    size appropriately.
1592    -------------------------------------------------------------------------- */
1593
1594 static StgTSO *
1595 threadStackOverflow(StgTSO *tso)
1596 {
1597   nat new_stack_size, new_tso_size, diff, stack_words;
1598   StgPtr new_sp;
1599   StgTSO *dest;
1600
1601   if (tso->stack_size >= tso->max_stack_size) {
1602 #if 0
1603     /* If we're debugging, just print out the top of the stack */
1604     printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
1605                                      tso->sp+64));
1606 #endif
1607 #ifdef INTERPRETER
1608     fprintf(stderr, "fatal: stack overflow in Hugs; aborting\n" );
1609     exit(1);
1610 #else
1611     /* Send this thread the StackOverflow exception */
1612     raiseAsync(tso, (StgClosure *)&stackOverflow_closure);
1613 #endif
1614     return tso;
1615   }
1616
1617   /* Try to double the current stack size.  If that takes us over the
1618    * maximum stack size for this thread, then use the maximum instead.
1619    * Finally round up so the TSO ends up as a whole number of blocks.
1620    */
1621   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
1622   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
1623                                        TSO_STRUCT_SIZE)/sizeof(W_);
1624   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
1625   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
1626
1627   IF_DEBUG(scheduler, fprintf(stderr,"scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
1628
1629   dest = (StgTSO *)allocate(new_tso_size);
1630   TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0);
1631
1632   /* copy the TSO block and the old stack into the new area */
1633   memcpy(dest,tso,TSO_STRUCT_SIZE);
1634   stack_words = tso->stack + tso->stack_size - tso->sp;
1635   new_sp = (P_)dest + new_tso_size - stack_words;
1636   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
1637
1638   /* relocate the stack pointers... */
1639   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
1640   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
1641   dest->sp    = new_sp;
1642   dest->splim = (P_)dest->splim + (nat)((P_)dest - (P_)tso);
1643   dest->stack_size = new_stack_size;
1644         
1645   /* and relocate the update frame list */
1646   relocate_TSO(tso, dest);
1647
1648   /* Mark the old TSO as relocated.  We have to check for relocated
1649    * TSOs in the garbage collector and any primops that deal with TSOs.
1650    *
1651    * It's important to set the sp and su values to just beyond the end
1652    * of the stack, so we don't attempt to scavenge any part of the
1653    * dead TSO's stack.
1654    */
1655   tso->whatNext = ThreadRelocated;
1656   tso->link = dest;
1657   tso->sp = (P_)&(tso->stack[tso->stack_size]);
1658   tso->su = (StgUpdateFrame *)tso->sp;
1659   tso->why_blocked = NotBlocked;
1660   dest->mut_link = NULL;
1661
1662   IF_DEBUG(sanity,checkTSO(tso));
1663 #if 0
1664   IF_DEBUG(scheduler,printTSO(dest));
1665 #endif
1666
1667   return dest;
1668 }
1669
1670 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
1671 //@subsection Blocking Queue Routines
1672
1673 /* ---------------------------------------------------------------------------
1674    Wake up a queue that was blocked on some resource.
1675    ------------------------------------------------------------------------ */
1676
1677 /* ToDo: check push_on_run_queue vs. PUSH_ON_RUN_QUEUE */
1678
1679 #if defined(GRAN)
1680 static inline void
1681 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1682 {
1683 }
1684 #elif defined(PAR)
1685 static inline void
1686 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
1687 {
1688   /* write RESUME events to log file and
1689      update blocked and fetch time (depending on type of the orig closure) */
1690   if (RtsFlags.ParFlags.ParStats.Full) {
1691     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
1692                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
1693                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
1694
1695     switch (get_itbl(node)->type) {
1696         case FETCH_ME_BQ:
1697           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1698           break;
1699         case RBH:
1700         case FETCH_ME:
1701         case BLACKHOLE_BQ:
1702           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
1703           break;
1704         default:
1705           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
1706         }
1707       }
1708 }
1709 #endif
1710
1711 #if defined(GRAN)
1712 static StgBlockingQueueElement *
1713 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1714 {
1715     StgBlockingQueueElement *next;
1716     PEs node_loc, tso_loc;
1717
1718     node_loc = where_is(node); // should be lifted out of loop
1719     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1720     tso_loc = where_is(tso);
1721     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
1722       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
1723       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
1724       bq_processing_time += RtsFlags.GranFlags.Costs.lunblocktime;
1725       // insertThread(tso, node_loc);
1726       new_event(tso_loc, tso_loc,
1727                 CurrentTime[CurrentProc]+bq_processing_time,
1728                 ResumeThread,
1729                 tso, node, (rtsSpark*)NULL);
1730       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1731       // len_local++;
1732       // len++;
1733     } else { // TSO is remote (actually should be FMBQ)
1734       bq_processing_time += RtsFlags.GranFlags.Costs.mpacktime;
1735       bq_processing_time += RtsFlags.GranFlags.Costs.gunblocktime;
1736       new_event(tso_loc, CurrentProc, 
1737                 CurrentTime[CurrentProc]+bq_processing_time+
1738                 RtsFlags.GranFlags.Costs.latency,
1739                 UnblockThread,
1740                 tso, node, (rtsSpark*)NULL);
1741       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
1742       bq_processing_time += RtsFlags.GranFlags.Costs.mtidytime;
1743       // len++;
1744     }      
1745     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
1746     IF_GRAN_DEBUG(bq,
1747                   fprintf(stderr," %s TSO %d (%p) [PE %d] (blocked_on=%p) (next=%p) ,",
1748                           (node_loc==tso_loc ? "Local" : "Global"), 
1749                           tso->id, tso, CurrentProc, tso->blocked_on, tso->link))
1750     tso->blocked_on = NULL;
1751     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
1752                              tso->id, tso));
1753   }
1754
1755   /* if this is the BQ of an RBH, we have to put back the info ripped out of
1756      the closure to make room for the anchor of the BQ */
1757   if (next!=END_BQ_QUEUE) {
1758     ASSERT(get_itbl(node)->type == RBH && get_itbl(next)->type == CONSTR);
1759     /*
1760     ASSERT((info_ptr==&RBH_Save_0_info) ||
1761            (info_ptr==&RBH_Save_1_info) ||
1762            (info_ptr==&RBH_Save_2_info));
1763     */
1764     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
1765     ((StgRBH *)node)->blocking_queue = ((StgRBHSave *)next)->payload[0];
1766     ((StgRBH *)node)->mut_link       = ((StgRBHSave *)next)->payload[1];
1767
1768     IF_GRAN_DEBUG(bq,
1769                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
1770                         node, info_type(node)));
1771   }
1772 }
1773 #elif defined(PAR)
1774 static StgBlockingQueueElement *
1775 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
1776 {
1777     StgBlockingQueueElement *next;
1778
1779     switch (get_itbl(bqe)->type) {
1780     case TSO:
1781       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
1782       /* if it's a TSO just push it onto the run_queue */
1783       next = bqe->link;
1784       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
1785       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
1786       THREAD_RUNNABLE();
1787       unblockCount(bqe, node);
1788       /* reset blocking status after dumping event */
1789       ((StgTSO *)bqe)->why_blocked = NotBlocked;
1790       break;
1791
1792     case BLOCKED_FETCH:
1793       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
1794       next = bqe->link;
1795       bqe->link = PendingFetches;
1796       PendingFetches = bqe;
1797       break;
1798
1799 # if defined(DEBUG)
1800       /* can ignore this case in a non-debugging setup; 
1801          see comments on RBHSave closures above */
1802     case CONSTR:
1803       /* check that the closure is an RBHSave closure */
1804       ASSERT(get_itbl((StgClosure *)bqe) == &RBH_Save_0_info ||
1805              get_itbl((StgClosure *)bqe) == &RBH_Save_1_info ||
1806              get_itbl((StgClosure *)bqe) == &RBH_Save_2_info);
1807       break;
1808
1809     default:
1810       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
1811            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
1812            (StgClosure *)bqe);
1813 # endif
1814     }
1815   // IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1816   return next;
1817 }
1818
1819 #else /* !GRAN && !PAR */
1820 static StgTSO *
1821 unblockOneLocked(StgTSO *tso)
1822 {
1823   StgTSO *next;
1824
1825   ASSERT(get_itbl(tso)->type == TSO);
1826   ASSERT(tso->why_blocked != NotBlocked);
1827   tso->why_blocked = NotBlocked;
1828   next = tso->link;
1829   PUSH_ON_RUN_QUEUE(tso);
1830   THREAD_RUNNABLE();
1831   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
1832   return next;
1833 }
1834 #endif
1835
1836 #if defined(GRAN)
1837 inline StgTSO *
1838 unblockOne(StgTSO *tso, StgClosure *node)
1839 {
1840   ACQUIRE_LOCK(&sched_mutex);
1841   tso = unblockOneLocked(tso, node);
1842   RELEASE_LOCK(&sched_mutex);
1843   return tso;
1844 }
1845 #elif defined(PAR)
1846 inline StgTSO *
1847 unblockOne(StgTSO *tso, StgClosure *node)
1848 {
1849   ACQUIRE_LOCK(&sched_mutex);
1850   tso = unblockOneLocked(tso, node);
1851   RELEASE_LOCK(&sched_mutex);
1852   return tso;
1853 }
1854 #else
1855 inline StgTSO *
1856 unblockOne(StgTSO *tso)
1857 {
1858   ACQUIRE_LOCK(&sched_mutex);
1859   tso = unblockOneLocked(tso);
1860   RELEASE_LOCK(&sched_mutex);
1861   return tso;
1862 }
1863 #endif
1864
1865 #if defined(GRAN)
1866 void 
1867 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1868 {
1869   StgBlockingQueueElement *bqe, *next;
1870   StgTSO *tso;
1871   PEs node_loc, tso_loc;
1872   rtsTime bq_processing_time = 0;
1873   nat len = 0, len_local = 0;
1874
1875   IF_GRAN_DEBUG(bq, 
1876                 belch("## AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
1877                       node, CurrentProc, CurrentTime[CurrentProc], 
1878                       CurrentTSO->id, CurrentTSO));
1879
1880   node_loc = where_is(node);
1881
1882   ASSERT(get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
1883          get_itbl(q)->type == CONSTR); // closure (type constructor)
1884   ASSERT(is_unique(node));
1885
1886   /* FAKE FETCH: magically copy the node to the tso's proc;
1887      no Fetch necessary because in reality the node should not have been 
1888      moved to the other PE in the first place
1889   */
1890   if (CurrentProc!=node_loc) {
1891     IF_GRAN_DEBUG(bq, 
1892                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
1893                         node, node_loc, CurrentProc, CurrentTSO->id, 
1894                         // CurrentTSO, where_is(CurrentTSO),
1895                         node->header.gran.procs));
1896     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
1897     IF_GRAN_DEBUG(bq, 
1898                   belch("## new bitmask of node %p is %#x",
1899                         node, node->header.gran.procs));
1900     if (RtsFlags.GranFlags.GranSimStats.Global) {
1901       globalGranStats.tot_fake_fetches++;
1902     }
1903   }
1904
1905   bqe = q;
1906   // ToDo: check: ASSERT(CurrentProc==node_loc);
1907   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
1908     //next = bqe->link;
1909     /* 
1910        bqe points to the current element in the queue
1911        next points to the next element in the queue
1912     */
1913     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
1914     //tso_loc = where_is(tso);
1915     bqe = unblockOneLocked(bqe, node);
1916   }
1917
1918   /* statistics gathering */
1919   /* ToDo: fix counters
1920   if (RtsFlags.GranFlags.GranSimStats.Global) {
1921     globalGranStats.tot_bq_processing_time += bq_processing_time;
1922     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
1923     globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
1924     globalGranStats.tot_awbq++;             // total no. of bqs awakened
1925   }
1926   IF_GRAN_DEBUG(bq,
1927                 fprintf(stderr,"## BQ Stats of %p: [%d entries, %d local] %s\n",
1928                         node, len, len_local, (next!=END_TSO_QUEUE) ? "RBH" : ""));
1929   */
1930 }
1931 #elif defined(PAR)
1932 void 
1933 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
1934 {
1935   StgBlockingQueueElement *bqe, *next;
1936
1937   ACQUIRE_LOCK(&sched_mutex);
1938
1939   IF_PAR_DEBUG(verbose, 
1940                belch("## AwBQ for node %p on [%x]: ",
1941                      node, mytid));
1942
1943   ASSERT(get_itbl(q)->type == TSO ||           
1944          get_itbl(q)->type == BLOCKED_FETCH || 
1945          get_itbl(q)->type == CONSTR); 
1946
1947   bqe = q;
1948   while (get_itbl(bqe)->type==TSO || 
1949          get_itbl(bqe)->type==BLOCKED_FETCH) {
1950     bqe = unblockOneLocked(bqe, node);
1951   }
1952   RELEASE_LOCK(&sched_mutex);
1953 }
1954
1955 #else   /* !GRAN && !PAR */
1956 void
1957 awakenBlockedQueue(StgTSO *tso)
1958 {
1959   ACQUIRE_LOCK(&sched_mutex);
1960   while (tso != END_TSO_QUEUE) {
1961     tso = unblockOneLocked(tso);
1962   }
1963   RELEASE_LOCK(&sched_mutex);
1964 }
1965 #endif
1966
1967 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
1968 //@subsection Exception Handling Routines
1969
1970 /* ---------------------------------------------------------------------------
1971    Interrupt execution
1972    - usually called inside a signal handler so it mustn't do anything fancy.   
1973    ------------------------------------------------------------------------ */
1974
1975 void
1976 interruptStgRts(void)
1977 {
1978     interrupted    = 1;
1979     context_switch = 1;
1980 }
1981
1982 /* -----------------------------------------------------------------------------
1983    Unblock a thread
1984
1985    This is for use when we raise an exception in another thread, which
1986    may be blocked.
1987    This has nothing to do with the UnblockThread event in GranSim. -- HWL
1988    -------------------------------------------------------------------------- */
1989
1990 static void
1991 unblockThread(StgTSO *tso)
1992 {
1993   StgTSO *t, **last;
1994
1995   ACQUIRE_LOCK(&sched_mutex);
1996   switch (tso->why_blocked) {
1997
1998   case NotBlocked:
1999     return;  /* not blocked */
2000
2001   case BlockedOnMVar:
2002     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2003     {
2004       StgTSO *last_tso = END_TSO_QUEUE;
2005       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2006
2007       last = &mvar->head;
2008       for (t = mvar->head; t != END_TSO_QUEUE; 
2009            last = &t->link, last_tso = t, t = t->link) {
2010         if (t == tso) {
2011           *last = tso->link;
2012           if (mvar->tail == tso) {
2013             mvar->tail = last_tso;
2014           }
2015           goto done;
2016         }
2017       }
2018       barf("unblockThread (MVAR): TSO not found");
2019     }
2020
2021   case BlockedOnBlackHole:
2022     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2023     {
2024       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2025
2026       last = &bq->blocking_queue;
2027       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2028            last = &t->link, t = t->link) {
2029         if (t == tso) {
2030           *last = tso->link;
2031           goto done;
2032         }
2033       }
2034       barf("unblockThread (BLACKHOLE): TSO not found");
2035     }
2036
2037   case BlockedOnException:
2038     {
2039       StgTSO *target  = tso->block_info.tso;
2040
2041       ASSERT(get_itbl(target)->type == TSO);
2042       ASSERT(target->blocked_exceptions != NULL);
2043
2044       last = &target->blocked_exceptions;
2045       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2046            last = &t->link, t = t->link) {
2047         ASSERT(get_itbl(t)->type == TSO);
2048         if (t == tso) {
2049           *last = tso->link;
2050           goto done;
2051         }
2052       }
2053       barf("unblockThread (Exception): TSO not found");
2054     }
2055
2056   case BlockedOnDelay:
2057   case BlockedOnRead:
2058   case BlockedOnWrite:
2059     {
2060       StgTSO *prev = NULL;
2061       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2062            prev = t, t = t->link) {
2063         if (t == tso) {
2064           if (prev == NULL) {
2065             blocked_queue_hd = t->link;
2066             if (blocked_queue_tl == t) {
2067               blocked_queue_tl = END_TSO_QUEUE;
2068             }
2069           } else {
2070             prev->link = t->link;
2071             if (blocked_queue_tl == t) {
2072               blocked_queue_tl = prev;
2073             }
2074           }
2075           goto done;
2076         }
2077       }
2078       barf("unblockThread (I/O): TSO not found");
2079     }
2080
2081   default:
2082     barf("unblockThread");
2083   }
2084
2085  done:
2086   tso->link = END_TSO_QUEUE;
2087   tso->why_blocked = NotBlocked;
2088   tso->block_info.closure = NULL;
2089   PUSH_ON_RUN_QUEUE(tso);
2090   RELEASE_LOCK(&sched_mutex);
2091 }
2092
2093 /* -----------------------------------------------------------------------------
2094  * raiseAsync()
2095  *
2096  * The following function implements the magic for raising an
2097  * asynchronous exception in an existing thread.
2098  *
2099  * We first remove the thread from any queue on which it might be
2100  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
2101  *
2102  * We strip the stack down to the innermost CATCH_FRAME, building
2103  * thunks in the heap for all the active computations, so they can 
2104  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
2105  * an application of the handler to the exception, and push it on
2106  * the top of the stack.
2107  * 
2108  * How exactly do we save all the active computations?  We create an
2109  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
2110  * AP_UPDs pushes everything from the corresponding update frame
2111  * upwards onto the stack.  (Actually, it pushes everything up to the
2112  * next update frame plus a pointer to the next AP_UPD object.
2113  * Entering the next AP_UPD object pushes more onto the stack until we
2114  * reach the last AP_UPD object - at which point the stack should look
2115  * exactly as it did when we killed the TSO and we can continue
2116  * execution by entering the closure on top of the stack.
2117  *
2118  * We can also kill a thread entirely - this happens if either (a) the 
2119  * exception passed to raiseAsync is NULL, or (b) there's no
2120  * CATCH_FRAME on the stack.  In either case, we strip the entire
2121  * stack and replace the thread with a zombie.
2122  *
2123  * -------------------------------------------------------------------------- */
2124  
2125 void 
2126 deleteThread(StgTSO *tso)
2127 {
2128   raiseAsync(tso,NULL);
2129 }
2130
2131 void
2132 raiseAsync(StgTSO *tso, StgClosure *exception)
2133 {
2134   StgUpdateFrame* su = tso->su;
2135   StgPtr          sp = tso->sp;
2136   
2137   /* Thread already dead? */
2138   if (tso->whatNext == ThreadComplete || tso->whatNext == ThreadKilled) {
2139     return;
2140   }
2141
2142   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
2143
2144   /* Remove it from any blocking queues */
2145   unblockThread(tso);
2146
2147   /* The stack freezing code assumes there's a closure pointer on
2148    * the top of the stack.  This isn't always the case with compiled
2149    * code, so we have to push a dummy closure on the top which just
2150    * returns to the next return address on the stack.
2151    */
2152   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
2153     *(--sp) = (W_)&dummy_ret_closure;
2154   }
2155
2156   while (1) {
2157     int words = ((P_)su - (P_)sp) - 1;
2158     nat i;
2159     StgAP_UPD * ap;
2160
2161     /* If we find a CATCH_FRAME, and we've got an exception to raise,
2162      * then build PAP(handler,exception), and leave it on top of
2163      * the stack ready to enter.
2164      */
2165     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
2166       StgCatchFrame *cf = (StgCatchFrame *)su;
2167       /* we've got an exception to raise, so let's pass it to the
2168        * handler in this frame.
2169        */
2170       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 1);
2171       TICK_ALLOC_UPD_PAP(2,0);
2172       SET_HDR(ap,&PAP_info,cf->header.prof.ccs);
2173               
2174       ap->n_args = 1;
2175       ap->fun = cf->handler;
2176       ap->payload[0] = (P_)exception;
2177
2178       /* sp currently points to the word above the CATCH_FRAME on the stack.
2179        */
2180       sp += sizeofW(StgCatchFrame);
2181       tso->su = cf->link;
2182
2183       /* Restore the blocked/unblocked state for asynchronous exceptions
2184        * at the CATCH_FRAME.  
2185        *
2186        * If exceptions were unblocked at the catch, arrange that they
2187        * are unblocked again after executing the handler by pushing an
2188        * unblockAsyncExceptions_ret stack frame.
2189        */
2190       if (!cf->exceptions_blocked) {
2191         *(sp--) = (W_)&unblockAsyncExceptionszh_ret_info;
2192       }
2193       
2194       /* Ensure that async exceptions are blocked when running the handler.
2195        */
2196       if (tso->blocked_exceptions == NULL) {
2197         tso->blocked_exceptions = END_TSO_QUEUE;
2198       }
2199       
2200       /* Put the newly-built PAP on top of the stack, ready to execute
2201        * when the thread restarts.
2202        */
2203       sp[0] = (W_)ap;
2204       tso->sp = sp;
2205       tso->whatNext = ThreadEnterGHC;
2206       return;
2207     }
2208
2209     /* First build an AP_UPD consisting of the stack chunk above the
2210      * current update frame, with the top word on the stack as the
2211      * fun field.
2212      */
2213     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
2214     
2215     ASSERT(words >= 0);
2216     
2217     ap->n_args = words;
2218     ap->fun    = (StgClosure *)sp[0];
2219     sp++;
2220     for(i=0; i < (nat)words; ++i) {
2221       ap->payload[i] = (P_)*sp++;
2222     }
2223     
2224     switch (get_itbl(su)->type) {
2225       
2226     case UPDATE_FRAME:
2227       {
2228         SET_HDR(ap,&AP_UPD_info,su->header.prof.ccs /* ToDo */); 
2229         TICK_ALLOC_UP_THK(words+1,0);
2230         
2231         IF_DEBUG(scheduler,
2232                  fprintf(stderr,  "scheduler: Updating ");
2233                  printPtr((P_)su->updatee); 
2234                  fprintf(stderr,  " with ");
2235                  printObj((StgClosure *)ap);
2236                  );
2237         
2238         /* Replace the updatee with an indirection - happily
2239          * this will also wake up any threads currently
2240          * waiting on the result.
2241          */
2242         UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
2243         su = su->link;
2244         sp += sizeofW(StgUpdateFrame) -1;
2245         sp[0] = (W_)ap; /* push onto stack */
2246         break;
2247       }
2248       
2249     case CATCH_FRAME:
2250       {
2251         StgCatchFrame *cf = (StgCatchFrame *)su;
2252         StgClosure* o;
2253         
2254         /* We want a PAP, not an AP_UPD.  Fortunately, the
2255          * layout's the same.
2256          */
2257         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2258         TICK_ALLOC_UPD_PAP(words+1,0);
2259         
2260         /* now build o = FUN(catch,ap,handler) */
2261         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
2262         TICK_ALLOC_FUN(2,0);
2263         SET_HDR(o,&catch_info,su->header.prof.ccs /* ToDo */);
2264         o->payload[0] = (StgClosure *)ap;
2265         o->payload[1] = cf->handler;
2266         
2267         IF_DEBUG(scheduler,
2268                  fprintf(stderr,  "scheduler: Built ");
2269                  printObj((StgClosure *)o);
2270                  );
2271         
2272         /* pop the old handler and put o on the stack */
2273         su = cf->link;
2274         sp += sizeofW(StgCatchFrame) - 1;
2275         sp[0] = (W_)o;
2276         break;
2277       }
2278       
2279     case SEQ_FRAME:
2280       {
2281         StgSeqFrame *sf = (StgSeqFrame *)su;
2282         StgClosure* o;
2283         
2284         SET_HDR(ap,&PAP_info,su->header.prof.ccs /* ToDo */);
2285         TICK_ALLOC_UPD_PAP(words+1,0);
2286         
2287         /* now build o = FUN(seq,ap) */
2288         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
2289         TICK_ALLOC_SE_THK(1,0);
2290         SET_HDR(o,&seq_info,su->header.prof.ccs /* ToDo */);
2291         payloadCPtr(o,0) = (StgClosure *)ap;
2292         
2293         IF_DEBUG(scheduler,
2294                  fprintf(stderr,  "scheduler: Built ");
2295                  printObj((StgClosure *)o);
2296                  );
2297         
2298         /* pop the old handler and put o on the stack */
2299         su = sf->link;
2300         sp += sizeofW(StgSeqFrame) - 1;
2301         sp[0] = (W_)o;
2302         break;
2303       }
2304       
2305     case STOP_FRAME:
2306       /* We've stripped the entire stack, the thread is now dead. */
2307       sp += sizeofW(StgStopFrame) - 1;
2308       sp[0] = (W_)exception;    /* save the exception */
2309       tso->whatNext = ThreadKilled;
2310       tso->su = (StgUpdateFrame *)(sp+1);
2311       tso->sp = sp;
2312       return;
2313       
2314     default:
2315       barf("raiseAsync");
2316     }
2317   }
2318   barf("raiseAsync");
2319 }
2320
2321 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
2322 //@subsection Debugging Routines
2323
2324 /* -----------------------------------------------------------------------------
2325    Debugging: why is a thread blocked
2326    -------------------------------------------------------------------------- */
2327
2328 #ifdef DEBUG
2329
2330 void printThreadBlockage(StgTSO *tso)
2331 {
2332   switch (tso->why_blocked) {
2333   case BlockedOnRead:
2334     fprintf(stderr,"blocked on read from fd %d", tso->block_info.fd);
2335     break;
2336   case BlockedOnWrite:
2337     fprintf(stderr,"blocked on write to fd %d", tso->block_info.fd);
2338     break;
2339   case BlockedOnDelay:
2340     fprintf(stderr,"blocked on delay of %d ms", tso->block_info.delay);
2341     break;
2342   case BlockedOnMVar:
2343     fprintf(stderr,"blocked on an MVar");
2344     break;
2345   case BlockedOnException:
2346     fprintf(stderr,"blocked on delivering an exception to thread %d",
2347             tso->block_info.tso->id);
2348     break;
2349   case BlockedOnBlackHole:
2350     fprintf(stderr,"blocked on a black hole");
2351     break;
2352   case NotBlocked:
2353     fprintf(stderr,"not blocked");
2354     break;
2355 #if defined(PAR)
2356   case BlockedOnGA:
2357     fprintf(stderr,"blocked on global address");
2358     break;
2359 #endif
2360   }
2361 }
2362
2363 /* 
2364    Print a whole blocking queue attached to node (debugging only).
2365 */
2366 //@cindex print_bq
2367 # if defined(PAR)
2368 void 
2369 print_bq (StgClosure *node)
2370 {
2371   StgBlockingQueueElement *bqe;
2372   StgTSO *tso;
2373   rtsBool end;
2374
2375   fprintf(stderr,"## BQ of closure %p (%s): ",
2376           node, info_type(node));
2377
2378   /* should cover all closures that may have a blocking queue */
2379   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2380          get_itbl(node)->type == FETCH_ME_BQ ||
2381          get_itbl(node)->type == RBH);
2382     
2383   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2384   /* 
2385      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2386   */
2387   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2388        !end; // iterate until bqe points to a CONSTR
2389        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2390     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2391     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2392     /* types of closures that may appear in a blocking queue */
2393     ASSERT(get_itbl(bqe)->type == TSO ||           
2394            get_itbl(bqe)->type == BLOCKED_FETCH || 
2395            get_itbl(bqe)->type == CONSTR); 
2396     /* only BQs of an RBH end with an RBH_Save closure */
2397     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2398
2399     switch (get_itbl(bqe)->type) {
2400     case TSO:
2401       fprintf(stderr," TSO %d (%x),",
2402               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
2403       break;
2404     case BLOCKED_FETCH:
2405       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
2406               ((StgBlockedFetch *)bqe)->node, 
2407               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
2408               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
2409               ((StgBlockedFetch *)bqe)->ga.weight);
2410       break;
2411     case CONSTR:
2412       fprintf(stderr," %s (IP %p),",
2413               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2414                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2415                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2416                "RBH_Save_?"), get_itbl(bqe));
2417       break;
2418     default:
2419       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2420            info_type(bqe), node, info_type(node));
2421       break;
2422     }
2423   } /* for */
2424   fputc('\n', stderr);
2425 }
2426 # elif defined(GRAN)
2427 void 
2428 print_bq (StgClosure *node)
2429 {
2430   StgBlockingQueueElement *bqe;
2431   StgTSO *tso;
2432   PEs node_loc, tso_loc;
2433   rtsBool end;
2434
2435   /* should cover all closures that may have a blocking queue */
2436   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
2437          get_itbl(node)->type == FETCH_ME_BQ ||
2438          get_itbl(node)->type == RBH);
2439     
2440   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2441   node_loc = where_is(node);
2442
2443   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
2444           node, info_type(node), node_loc);
2445
2446   /* 
2447      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
2448   */
2449   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
2450        !end; // iterate until bqe points to a CONSTR
2451        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
2452     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
2453     ASSERT(bqe != (StgTSO*)NULL);            // sanity check
2454     /* types of closures that may appear in a blocking queue */
2455     ASSERT(get_itbl(bqe)->type == TSO ||           
2456            get_itbl(bqe)->type == CONSTR); 
2457     /* only BQs of an RBH end with an RBH_Save closure */
2458     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
2459
2460     tso_loc = where_is((StgClosure *)bqe);
2461     switch (get_itbl(bqe)->type) {
2462     case TSO:
2463       fprintf(stderr," TSO %d (%x) on [PE %d],",
2464               ((StgTSO *)bqe)->id, ((StgTSO *)bqe), tso_loc);
2465       break;
2466     case CONSTR:
2467       fprintf(stderr," %s (IP %p),",
2468               (get_itbl(bqe) == &RBH_Save_0_info ? "RBH_Save_0" :
2469                get_itbl(bqe) == &RBH_Save_1_info ? "RBH_Save_1" :
2470                get_itbl(bqe) == &RBH_Save_2_info ? "RBH_Save_2" :
2471                "RBH_Save_?"), get_itbl(bqe));
2472       break;
2473     default:
2474       barf("Unexpected closure type %s in blocking queue of %p (%s)",
2475            info_type(bqe), node, info_type(node));
2476       break;
2477     }
2478   } /* for */
2479   fputc('\n', stderr);
2480 }
2481 #else
2482 /* 
2483    Nice and easy: only TSOs on the blocking queue
2484 */
2485 void 
2486 print_bq (StgClosure *node)
2487 {
2488   StgTSO *tso;
2489
2490   ASSERT(node!=(StgClosure*)NULL);         // sanity check
2491   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
2492        tso != END_TSO_QUEUE; 
2493        tso=tso->link) {
2494     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
2495     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
2496     fprintf(stderr," TSO %d (%p),", tso->id, tso);
2497   }
2498   fputc('\n', stderr);
2499 }
2500 # endif
2501
2502 #if defined(PAR)
2503 static nat
2504 run_queue_len(void)
2505 {
2506   nat i;
2507   StgTSO *tso;
2508
2509   for (i=0, tso=run_queue_hd; 
2510        tso != END_TSO_QUEUE;
2511        i++, tso=tso->link)
2512     /* nothing */
2513
2514   return i;
2515 }
2516 #endif
2517
2518 static void
2519 sched_belch(char *s, ...)
2520 {
2521   va_list ap;
2522   va_start(ap,s);
2523 #ifdef SMP
2524   fprintf(stderr, "scheduler (task %ld): ", pthread_self());
2525 #else
2526   fprintf(stderr, "scheduler: ");
2527 #endif
2528   vfprintf(stderr, s, ap);
2529   fprintf(stderr, "\n");
2530 }
2531
2532 #endif /* DEBUG */
2533
2534 //@node Index,  , Debugging Routines, Main scheduling code
2535 //@subsection Index
2536
2537 //@index
2538 //* MainRegTable::  @cindex\s-+MainRegTable
2539 //* StgMainThread::  @cindex\s-+StgMainThread
2540 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
2541 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
2542 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
2543 //* context_switch::  @cindex\s-+context_switch
2544 //* createThread::  @cindex\s-+createThread
2545 //* free_capabilities::  @cindex\s-+free_capabilities
2546 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
2547 //* initScheduler::  @cindex\s-+initScheduler
2548 //* interrupted::  @cindex\s-+interrupted
2549 //* n_free_capabilities::  @cindex\s-+n_free_capabilities
2550 //* next_thread_id::  @cindex\s-+next_thread_id
2551 //* print_bq::  @cindex\s-+print_bq
2552 //* run_queue_hd::  @cindex\s-+run_queue_hd
2553 //* run_queue_tl::  @cindex\s-+run_queue_tl
2554 //* sched_mutex::  @cindex\s-+sched_mutex
2555 //* schedule::  @cindex\s-+schedule
2556 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
2557 //* task_ids::  @cindex\s-+task_ids
2558 //* term_mutex::  @cindex\s-+term_mutex
2559 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
2560 //@end index