[project @ 2002-02-15 20:58:14 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.128 2002/02/15 20:58:14 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runnable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* When a thread performs a safe C call (_ccall_GC, using old
189  * terminology), it gets put on the suspended_ccalling_threads
190  * list. Used by the garbage collector.
191  */
192 static StgTSO *suspended_ccalling_threads;
193
194 static StgTSO *threadStackOverflow(StgTSO *tso);
195
196 /* KH: The following two flags are shared memory locations.  There is no need
197        to lock them, since they are only unset at the end of a scheduler
198        operation.
199 */
200
201 /* flag set by signal handler to precipitate a context switch */
202 //@cindex context_switch
203 nat context_switch;
204
205 /* if this flag is set as well, give up execution */
206 //@cindex interrupted
207 rtsBool interrupted;
208
209 /* Next thread ID to allocate.
210  * Locks required: sched_mutex
211  */
212 //@cindex next_thread_id
213 StgThreadID next_thread_id = 1;
214
215 /*
216  * Pointers to the state of the current thread.
217  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
218  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
219  */
220  
221 /* The smallest stack size that makes any sense is:
222  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
223  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
224  *  + 1                       (the realworld token for an IO thread)
225  *  + 1                       (the closure to enter)
226  *
227  * A thread with this stack will bomb immediately with a stack
228  * overflow, which will increase its stack size.  
229  */
230
231 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
232
233
234 #if defined(GRAN)
235 StgTSO *CurrentTSO;
236 #endif
237
238 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
239  *  exists - earlier gccs apparently didn't.
240  *  -= chak
241  */
242 StgTSO dummy_tso;
243
244 rtsBool ready_to_gc;
245
246 void            addToBlockedQueue ( StgTSO *tso );
247
248 static void     schedule          ( void );
249        void     interruptStgRts   ( void );
250 #if defined(GRAN)
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
252 #else
253 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
254 #endif
255
256 static void     detectBlackHoles  ( void );
257
258 #ifdef DEBUG
259 static void sched_belch(char *s, ...);
260 #endif
261
262 #if defined(RTS_SUPPORTS_THREADS)
263 /* ToDo: carefully document the invariants that go together
264  *       with these synchronisation objects.
265  */
266 Mutex     sched_mutex       = INIT_MUTEX_VAR;
267 Mutex     term_mutex        = INIT_MUTEX_VAR;
268
269 # if defined(SMP)
270 static Condition gc_pending_cond = INIT_COND_VAR;
271 nat await_death;
272 # endif
273
274 #endif /* RTS_SUPPORTS_THREADS */
275
276 #if defined(PAR)
277 StgTSO *LastTSO;
278 rtsTime TimeOfLastYield;
279 rtsBool emitSchedule = rtsTrue;
280 #endif
281
282 #if DEBUG
283 char *whatNext_strs[] = {
284   "ThreadEnterGHC",
285   "ThreadRunGHC",
286   "ThreadEnterInterp",
287   "ThreadKilled",
288   "ThreadComplete"
289 };
290
291 char *threadReturnCode_strs[] = {
292   "HeapOverflow",                       /* might also be StackOverflow */
293   "StackOverflow",
294   "ThreadYielding",
295   "ThreadBlocked",
296   "ThreadFinished"
297 };
298 #endif
299
300 #if defined(PAR)
301 StgTSO * createSparkThread(rtsSpark spark);
302 StgTSO * activateSpark (rtsSpark spark);  
303 #endif
304
305 /*
306  * The thread state for the main thread.
307 // ToDo: check whether not needed any more
308 StgTSO   *MainTSO;
309  */
310
311 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
312 static void taskStart(void);
313 static void
314 taskStart(void)
315 {
316   schedule();
317 }
318 #endif
319
320
321
322
323 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
324 //@subsection Main scheduling loop
325
326 /* ---------------------------------------------------------------------------
327    Main scheduling loop.
328
329    We use round-robin scheduling, each thread returning to the
330    scheduler loop when one of these conditions is detected:
331
332       * out of heap space
333       * timer expires (thread yields)
334       * thread blocks
335       * thread ends
336       * stack overflow
337
338    Locking notes:  we acquire the scheduler lock once at the beginning
339    of the scheduler loop, and release it when
340     
341       * running a thread, or
342       * waiting for work, or
343       * waiting for a GC to complete.
344
345    GRAN version:
346      In a GranSim setup this loop iterates over the global event queue.
347      This revolves around the global event queue, which determines what 
348      to do next. Therefore, it's more complicated than either the 
349      concurrent or the parallel (GUM) setup.
350
351    GUM version:
352      GUM iterates over incoming messages.
353      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
354      and sends out a fish whenever it has nothing to do; in-between
355      doing the actual reductions (shared code below) it processes the
356      incoming messages and deals with delayed operations 
357      (see PendingFetches).
358      This is not the ugliest code you could imagine, but it's bloody close.
359
360    ------------------------------------------------------------------------ */
361 //@cindex schedule
362 static void
363 schedule( void )
364 {
365   StgTSO *t;
366   Capability *cap;
367   StgThreadReturnCode ret;
368 #if defined(GRAN)
369   rtsEvent *event;
370 #elif defined(PAR)
371   StgSparkPool *pool;
372   rtsSpark spark;
373   StgTSO *tso;
374   GlobalTaskId pe;
375   rtsBool receivedFinish = rtsFalse;
376 # if defined(DEBUG)
377   nat tp_size, sp_size; // stats only
378 # endif
379 #endif
380   rtsBool was_interrupted = rtsFalse;
381   
382   ACQUIRE_LOCK(&sched_mutex);
383  
384 #if defined(RTS_SUPPORTS_THREADS)
385   /* Check to see whether there are any worker threads
386      waiting to deposit external call results. If so,
387      yield our capability */
388   yieldToReturningWorker(&sched_mutex, cap);
389
390   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
391 #endif
392
393 #if defined(GRAN)
394   /* set up first event to get things going */
395   /* ToDo: assign costs for system setup and init MainTSO ! */
396   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
397             ContinueThread, 
398             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
399
400   IF_DEBUG(gran,
401            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
402            G_TSO(CurrentTSO, 5));
403
404   if (RtsFlags.GranFlags.Light) {
405     /* Save current time; GranSim Light only */
406     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
407   }      
408
409   event = get_next_event();
410
411   while (event!=(rtsEvent*)NULL) {
412     /* Choose the processor with the next event */
413     CurrentProc = event->proc;
414     CurrentTSO = event->tso;
415
416 #elif defined(PAR)
417
418   while (!receivedFinish) {    /* set by processMessages */
419                                /* when receiving PP_FINISH message         */ 
420 #else
421
422   while (1) {
423
424 #endif
425
426     IF_DEBUG(scheduler, printAllThreads());
427
428     /* If we're interrupted (the user pressed ^C, or some other
429      * termination condition occurred), kill all the currently running
430      * threads.
431      */
432     if (interrupted) {
433       IF_DEBUG(scheduler, sched_belch("interrupted"));
434       deleteAllThreads();
435       interrupted = rtsFalse;
436       was_interrupted = rtsTrue;
437     }
438
439     /* Go through the list of main threads and wake up any
440      * clients whose computations have finished.  ToDo: this
441      * should be done more efficiently without a linear scan
442      * of the main threads list, somehow...
443      */
444 #if defined(RTS_SUPPORTS_THREADS)
445     { 
446       StgMainThread *m, **prev;
447       prev = &main_threads;
448       for (m = main_threads; m != NULL; m = m->link) {
449         switch (m->tso->what_next) {
450         case ThreadComplete:
451           if (m->ret) {
452             *(m->ret) = (StgClosure *)m->tso->sp[0];
453           }
454           *prev = m->link;
455           m->stat = Success;
456           broadcastCondition(&m->wakeup);
457           break;
458         case ThreadKilled:
459           if (m->ret) *(m->ret) = NULL;
460           *prev = m->link;
461           if (was_interrupted) {
462             m->stat = Interrupted;
463           } else {
464             m->stat = Killed;
465           }
466           broadcastCondition(&m->wakeup);
467           break;
468         default:
469           break;
470         }
471       }
472     }
473
474 #else /* not threaded */
475
476 # if defined(PAR)
477     /* in GUM do this only on the Main PE */
478     if (IAmMainThread)
479 # endif
480     /* If our main thread has finished or been killed, return.
481      */
482     {
483       StgMainThread *m = main_threads;
484       if (m->tso->what_next == ThreadComplete
485           || m->tso->what_next == ThreadKilled) {
486         main_threads = main_threads->link;
487         if (m->tso->what_next == ThreadComplete) {
488           /* we finished successfully, fill in the return value */
489           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
490           m->stat = Success;
491           return;
492         } else {
493           if (m->ret) { *(m->ret) = NULL; };
494           if (was_interrupted) {
495             m->stat = Interrupted;
496           } else {
497             m->stat = Killed;
498           }
499           return;
500         }
501       }
502     }
503 #endif
504
505     /* Top up the run queue from our spark pool.  We try to make the
506      * number of threads in the run queue equal to the number of
507      * free capabilities.
508      *
509      * Disable spark support in SMP for now, non-essential & requires
510      * a little bit of work to make it compile cleanly. -- sof 1/02.
511      */
512 #if 0 /* defined(SMP) */
513     {
514       nat n = getFreeCapabilities();
515       StgTSO *tso = run_queue_hd;
516
517       /* Count the run queue */
518       while (n > 0 && tso != END_TSO_QUEUE) {
519         tso = tso->link;
520         n--;
521       }
522
523       for (; n > 0; n--) {
524         StgClosure *spark;
525         spark = findSpark(rtsFalse);
526         if (spark == NULL) {
527           break; /* no more sparks in the pool */
528         } else {
529           /* I'd prefer this to be done in activateSpark -- HWL */
530           /* tricky - it needs to hold the scheduler lock and
531            * not try to re-acquire it -- SDM */
532           createSparkThread(spark);       
533           IF_DEBUG(scheduler,
534                    sched_belch("==^^ turning spark of closure %p into a thread",
535                                (StgClosure *)spark));
536         }
537       }
538       /* We need to wake up the other tasks if we just created some
539        * work for them.
540        */
541       if (getFreeCapabilities() - n > 1) {
542           signalCondition( &thread_ready_cond );
543       }
544     }
545 #endif // SMP
546
547     /* check for signals each time around the scheduler */
548 #ifndef mingw32_TARGET_OS
549     if (signals_pending()) {
550       startSignalHandlers();
551     }
552 #endif
553
554     /* Check whether any waiting threads need to be woken up.  If the
555      * run queue is empty, and there are no other tasks running, we
556      * can wait indefinitely for something to happen.
557      * ToDo: what if another client comes along & requests another
558      * main thread?
559      */
560     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
561       awaitEvent( EMPTY_RUN_QUEUE()
562 #if defined(SMP)
563         && allFreeCapabilities()
564 #endif
565         );
566     }
567     /* we can be interrupted while waiting for I/O... */
568     if (interrupted) continue;
569
570     /* 
571      * Detect deadlock: when we have no threads to run, there are no
572      * threads waiting on I/O or sleeping, and all the other tasks are
573      * waiting for work, we must have a deadlock of some description.
574      *
575      * We first try to find threads blocked on themselves (ie. black
576      * holes), and generate NonTermination exceptions where necessary.
577      *
578      * If no threads are black holed, we have a deadlock situation, so
579      * inform all the main threads.
580      */
581 #ifndef PAR
582     if (   EMPTY_RUN_QUEUE()
583         && EMPTY_QUEUE(blocked_queue_hd)
584         && EMPTY_QUEUE(sleeping_queue)
585 #if defined(RTS_SUPPORTS_THREADS)
586         && EMPTY_QUEUE(suspended_ccalling_threads)
587 #endif
588 #ifdef SMP
589         && allFreeCapabilities()
590 #endif
591         )
592     {
593         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
594 #if defined(THREADED_RTS)
595         /* and SMP mode ..? */
596         releaseCapability(cap);
597 #endif
598         RELEASE_LOCK(&sched_mutex);
599         GarbageCollect(GetRoots,rtsTrue);
600         ACQUIRE_LOCK(&sched_mutex);
601         if (   EMPTY_QUEUE(blocked_queue_hd)
602             && EMPTY_RUN_QUEUE()
603             && EMPTY_QUEUE(sleeping_queue) ) {
604
605             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
606             detectBlackHoles();
607
608             /* No black holes, so probably a real deadlock.  Send the
609              * current main thread the Deadlock exception (or in the SMP
610              * build, send *all* main threads the deadlock exception,
611              * since none of them can make progress).
612              */
613             if ( EMPTY_RUN_QUEUE() ) {
614                 StgMainThread *m;
615 #if defined(RTS_SUPPORTS_THREADS)
616                 for (m = main_threads; m != NULL; m = m->link) {
617                     switch (m->tso->why_blocked) {
618                     case BlockedOnBlackHole:
619                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
620                         break;
621                     case BlockedOnException:
622                     case BlockedOnMVar:
623                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
624                         break;
625                     default:
626                         barf("deadlock: main thread blocked in a strange way");
627                     }
628                 }
629 #else
630                 m = main_threads;
631                 switch (m->tso->why_blocked) {
632                 case BlockedOnBlackHole:
633                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
634                     break;
635                 case BlockedOnException:
636                 case BlockedOnMVar:
637                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
638                     break;
639                 default:
640                     barf("deadlock: main thread blocked in a strange way");
641                 }
642 #endif
643             }
644 #if defined(RTS_SUPPORTS_THREADS)
645             /* ToDo: revisit conditions (and mechanism) for shutting
646                down a multi-threaded world  */
647             if ( EMPTY_RUN_QUEUE() ) {
648               IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
649               shutdownHaskellAndExit(0);
650             }
651 #endif
652             ASSERT( !EMPTY_RUN_QUEUE() );
653         }
654     }
655 #elif defined(PAR)
656     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
657 #endif
658
659 #if defined(SMP)
660     /* If there's a GC pending, don't do anything until it has
661      * completed.
662      */
663     if (ready_to_gc) {
664       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
665       waitCondition( &gc_pending_cond, &sched_mutex );
666     }
667 #endif    
668
669 #if defined(RTS_SUPPORTS_THREADS)
670     /* block until we've got a thread on the run queue and a free
671      * capability.
672      *
673      */
674     if ( EMPTY_RUN_QUEUE() ) {
675       /* Give up our capability */
676       releaseCapability(cap);
677       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
678       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
679       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
680 #if 0
681       while ( EMPTY_RUN_QUEUE() ) {
682         waitForWorkCapability(&sched_mutex, &cap);
683         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
684       }
685 #endif
686     }
687 #endif
688
689 #if defined(GRAN)
690     if (RtsFlags.GranFlags.Light)
691       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
692
693     /* adjust time based on time-stamp */
694     if (event->time > CurrentTime[CurrentProc] &&
695         event->evttype != ContinueThread)
696       CurrentTime[CurrentProc] = event->time;
697     
698     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
699     if (!RtsFlags.GranFlags.Light)
700       handleIdlePEs();
701
702     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
703
704     /* main event dispatcher in GranSim */
705     switch (event->evttype) {
706       /* Should just be continuing execution */
707     case ContinueThread:
708       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
709       /* ToDo: check assertion
710       ASSERT(run_queue_hd != (StgTSO*)NULL &&
711              run_queue_hd != END_TSO_QUEUE);
712       */
713       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
714       if (!RtsFlags.GranFlags.DoAsyncFetch &&
715           procStatus[CurrentProc]==Fetching) {
716         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
717               CurrentTSO->id, CurrentTSO, CurrentProc);
718         goto next_thread;
719       } 
720       /* Ignore ContinueThreads for completed threads */
721       if (CurrentTSO->what_next == ThreadComplete) {
722         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
723               CurrentTSO->id, CurrentTSO, CurrentProc);
724         goto next_thread;
725       } 
726       /* Ignore ContinueThreads for threads that are being migrated */
727       if (PROCS(CurrentTSO)==Nowhere) { 
728         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
729               CurrentTSO->id, CurrentTSO, CurrentProc);
730         goto next_thread;
731       }
732       /* The thread should be at the beginning of the run queue */
733       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
734         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
735               CurrentTSO->id, CurrentTSO, CurrentProc);
736         break; // run the thread anyway
737       }
738       /*
739       new_event(proc, proc, CurrentTime[proc],
740                 FindWork,
741                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
742       goto next_thread; 
743       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
744       break; // now actually run the thread; DaH Qu'vam yImuHbej 
745
746     case FetchNode:
747       do_the_fetchnode(event);
748       goto next_thread;             /* handle next event in event queue  */
749       
750     case GlobalBlock:
751       do_the_globalblock(event);
752       goto next_thread;             /* handle next event in event queue  */
753       
754     case FetchReply:
755       do_the_fetchreply(event);
756       goto next_thread;             /* handle next event in event queue  */
757       
758     case UnblockThread:   /* Move from the blocked queue to the tail of */
759       do_the_unblock(event);
760       goto next_thread;             /* handle next event in event queue  */
761       
762     case ResumeThread:  /* Move from the blocked queue to the tail of */
763       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
764       event->tso->gran.blocktime += 
765         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
766       do_the_startthread(event);
767       goto next_thread;             /* handle next event in event queue  */
768       
769     case StartThread:
770       do_the_startthread(event);
771       goto next_thread;             /* handle next event in event queue  */
772       
773     case MoveThread:
774       do_the_movethread(event);
775       goto next_thread;             /* handle next event in event queue  */
776       
777     case MoveSpark:
778       do_the_movespark(event);
779       goto next_thread;             /* handle next event in event queue  */
780       
781     case FindWork:
782       do_the_findwork(event);
783       goto next_thread;             /* handle next event in event queue  */
784       
785     default:
786       barf("Illegal event type %u\n", event->evttype);
787     }  /* switch */
788     
789     /* This point was scheduler_loop in the old RTS */
790
791     IF_DEBUG(gran, belch("GRAN: after main switch"));
792
793     TimeOfLastEvent = CurrentTime[CurrentProc];
794     TimeOfNextEvent = get_time_of_next_event();
795     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
796     // CurrentTSO = ThreadQueueHd;
797
798     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
799                          TimeOfNextEvent));
800
801     if (RtsFlags.GranFlags.Light) 
802       GranSimLight_leave_system(event, &ActiveTSO); 
803
804     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
805
806     IF_DEBUG(gran, 
807              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
808
809     /* in a GranSim setup the TSO stays on the run queue */
810     t = CurrentTSO;
811     /* Take a thread from the run queue. */
812     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
813
814     IF_DEBUG(gran, 
815              fprintf(stderr, "GRAN: About to run current thread, which is\n");
816              G_TSO(t,5));
817
818     context_switch = 0; // turned on via GranYield, checking events and time slice
819
820     IF_DEBUG(gran, 
821              DumpGranEvent(GR_SCHEDULE, t));
822
823     procStatus[CurrentProc] = Busy;
824
825 #elif defined(PAR)
826     if (PendingFetches != END_BF_QUEUE) {
827         processFetches();
828     }
829
830     /* ToDo: phps merge with spark activation above */
831     /* check whether we have local work and send requests if we have none */
832     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
833       /* :-[  no local threads => look out for local sparks */
834       /* the spark pool for the current PE */
835       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
836       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
837           pool->hd < pool->tl) {
838         /* 
839          * ToDo: add GC code check that we really have enough heap afterwards!!
840          * Old comment:
841          * If we're here (no runnable threads) and we have pending
842          * sparks, we must have a space problem.  Get enough space
843          * to turn one of those pending sparks into a
844          * thread... 
845          */
846
847         spark = findSpark(rtsFalse);                /* get a spark */
848         if (spark != (rtsSpark) NULL) {
849           tso = activateSpark(spark);       /* turn the spark into a thread */
850           IF_PAR_DEBUG(schedule,
851                        belch("==== schedule: Created TSO %d (%p); %d threads active",
852                              tso->id, tso, advisory_thread_count));
853
854           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
855             belch("==^^ failed to activate spark");
856             goto next_thread;
857           }               /* otherwise fall through & pick-up new tso */
858         } else {
859           IF_PAR_DEBUG(verbose,
860                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
861                              spark_queue_len(pool)));
862           goto next_thread;
863         }
864       }
865
866       /* If we still have no work we need to send a FISH to get a spark
867          from another PE 
868       */
869       if (EMPTY_RUN_QUEUE()) {
870       /* =8-[  no local sparks => look for work on other PEs */
871         /*
872          * We really have absolutely no work.  Send out a fish
873          * (there may be some out there already), and wait for
874          * something to arrive.  We clearly can't run any threads
875          * until a SCHEDULE or RESUME arrives, and so that's what
876          * we're hoping to see.  (Of course, we still have to
877          * respond to other types of messages.)
878          */
879         TIME now = msTime() /*CURRENT_TIME*/;
880         IF_PAR_DEBUG(verbose, 
881                      belch("--  now=%ld", now));
882         IF_PAR_DEBUG(verbose,
883                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
884                          (last_fish_arrived_at!=0 &&
885                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
886                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
887                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
888                              last_fish_arrived_at,
889                              RtsFlags.ParFlags.fishDelay, now);
890                      });
891         
892         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
893             (last_fish_arrived_at==0 ||
894              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
895           /* outstandingFishes is set in sendFish, processFish;
896              avoid flooding system with fishes via delay */
897           pe = choosePE();
898           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
899                    NEW_FISH_HUNGER);
900
901           // Global statistics: count no. of fishes
902           if (RtsFlags.ParFlags.ParStats.Global &&
903               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
904             globalParStats.tot_fish_mess++;
905           }
906         }
907       
908         receivedFinish = processMessages();
909         goto next_thread;
910       }
911     } else if (PacketsWaiting()) {  /* Look for incoming messages */
912       receivedFinish = processMessages();
913     }
914
915     /* Now we are sure that we have some work available */
916     ASSERT(run_queue_hd != END_TSO_QUEUE);
917
918     /* Take a thread from the run queue, if we have work */
919     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
920     IF_DEBUG(sanity,checkTSO(t));
921
922     /* ToDo: write something to the log-file
923     if (RTSflags.ParFlags.granSimStats && !sameThread)
924         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
925
926     CurrentTSO = t;
927     */
928     /* the spark pool for the current PE */
929     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
930
931     IF_DEBUG(scheduler, 
932              belch("--=^ %d threads, %d sparks on [%#x]", 
933                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
934
935 # if 1
936     if (0 && RtsFlags.ParFlags.ParStats.Full && 
937         t && LastTSO && t->id != LastTSO->id && 
938         LastTSO->why_blocked == NotBlocked && 
939         LastTSO->what_next != ThreadComplete) {
940       // if previously scheduled TSO not blocked we have to record the context switch
941       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
942                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
943     }
944
945     if (RtsFlags.ParFlags.ParStats.Full && 
946         (emitSchedule /* forced emit */ ||
947         (t && LastTSO && t->id != LastTSO->id))) {
948       /* 
949          we are running a different TSO, so write a schedule event to log file
950          NB: If we use fair scheduling we also have to write  a deschedule 
951              event for LastTSO; with unfair scheduling we know that the
952              previous tso has blocked whenever we switch to another tso, so
953              we don't need it in GUM for now
954       */
955       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
956                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
957       emitSchedule = rtsFalse;
958     }
959      
960 # endif
961 #else /* !GRAN && !PAR */
962   
963     /* grab a thread from the run queue */
964     ASSERT(run_queue_hd != END_TSO_QUEUE);
965     t = POP_RUN_QUEUE();
966     // Sanity check the thread we're about to run.  This can be
967     // expensive if there is lots of thread switching going on...
968     IF_DEBUG(sanity,checkTSO(t));
969 #endif
970     
971     grabCapability(&cap);
972     cap->r.rCurrentTSO = t;
973     
974     /* context switches are now initiated by the timer signal, unless
975      * the user specified "context switch as often as possible", with
976      * +RTS -C0
977      */
978     if (
979 #ifdef PROFILING
980         RtsFlags.ProfFlags.profileInterval == 0 ||
981 #endif
982         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
983          && (run_queue_hd != END_TSO_QUEUE
984              || blocked_queue_hd != END_TSO_QUEUE
985              || sleeping_queue != END_TSO_QUEUE)))
986         context_switch = 1;
987     else
988         context_switch = 0;
989
990     RELEASE_LOCK(&sched_mutex);
991
992     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
993                               t->id, t, whatNext_strs[t->what_next]));
994
995 #ifdef PROFILING
996     startHeapProfTimer();
997 #endif
998
999     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1000     /* Run the current thread 
1001      */
1002     switch (cap->r.rCurrentTSO->what_next) {
1003     case ThreadKilled:
1004     case ThreadComplete:
1005         /* Thread already finished, return to scheduler. */
1006         ret = ThreadFinished;
1007         break;
1008     case ThreadEnterGHC:
1009         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1010         break;
1011     case ThreadRunGHC:
1012         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1013         break;
1014     case ThreadEnterInterp:
1015         ret = interpretBCO(cap);
1016         break;
1017     default:
1018       barf("schedule: invalid what_next field");
1019     }
1020     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1021     
1022     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1023 #ifdef PROFILING
1024     stopHeapProfTimer();
1025     CCCS = CCS_SYSTEM;
1026 #endif
1027     
1028     ACQUIRE_LOCK(&sched_mutex);
1029
1030 #ifdef SMP
1031     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1032 #elif !defined(GRAN) && !defined(PAR)
1033     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1034 #endif
1035     t = cap->r.rCurrentTSO;
1036     
1037 #if defined(PAR)
1038     /* HACK 675: if the last thread didn't yield, make sure to print a 
1039        SCHEDULE event to the log file when StgRunning the next thread, even
1040        if it is the same one as before */
1041     LastTSO = t; 
1042     TimeOfLastYield = CURRENT_TIME;
1043 #endif
1044
1045     switch (ret) {
1046     case HeapOverflow:
1047 #if defined(GRAN)
1048       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1049       globalGranStats.tot_heapover++;
1050 #elif defined(PAR)
1051       globalParStats.tot_heapover++;
1052 #endif
1053
1054       // did the task ask for a large block?
1055       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1056           // if so, get one and push it on the front of the nursery.
1057           bdescr *bd;
1058           nat blocks;
1059           
1060           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1061
1062           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1063                                    t->id, t,
1064                                    whatNext_strs[t->what_next], blocks));
1065
1066           // don't do this if it would push us over the
1067           // alloc_blocks_lim limit; we'll GC first.
1068           if (alloc_blocks + blocks < alloc_blocks_lim) {
1069
1070               alloc_blocks += blocks;
1071               bd = allocGroup( blocks );
1072
1073               // link the new group into the list
1074               bd->link = cap->r.rCurrentNursery;
1075               bd->u.back = cap->r.rCurrentNursery->u.back;
1076               if (cap->r.rCurrentNursery->u.back != NULL) {
1077                   cap->r.rCurrentNursery->u.back->link = bd;
1078               } else {
1079                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1080                          g0s0->blocks == cap->r.rNursery);
1081                   cap->r.rNursery = g0s0->blocks = bd;
1082               }           
1083               cap->r.rCurrentNursery->u.back = bd;
1084
1085               // initialise it as a nursery block
1086               bd->step = g0s0;
1087               bd->gen_no = 0;
1088               bd->flags = 0;
1089               bd->free = bd->start;
1090
1091               // don't forget to update the block count in g0s0.
1092               g0s0->n_blocks += blocks;
1093               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1094
1095               // now update the nursery to point to the new block
1096               cap->r.rCurrentNursery = bd;
1097
1098               // we might be unlucky and have another thread get on the
1099               // run queue before us and steal the large block, but in that
1100               // case the thread will just end up requesting another large
1101               // block.
1102               PUSH_ON_RUN_QUEUE(t);
1103               break;
1104           }
1105       }
1106
1107       /* make all the running tasks block on a condition variable,
1108        * maybe set context_switch and wait till they all pile in,
1109        * then have them wait on a GC condition variable.
1110        */
1111       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1112                                t->id, t, whatNext_strs[t->what_next]));
1113       threadPaused(t);
1114 #if defined(GRAN)
1115       ASSERT(!is_on_queue(t,CurrentProc));
1116 #elif defined(PAR)
1117       /* Currently we emit a DESCHEDULE event before GC in GUM.
1118          ToDo: either add separate event to distinguish SYSTEM time from rest
1119                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1120       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1121         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1122                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1123         emitSchedule = rtsTrue;
1124       }
1125 #endif
1126       
1127       ready_to_gc = rtsTrue;
1128       context_switch = 1;               /* stop other threads ASAP */
1129       PUSH_ON_RUN_QUEUE(t);
1130       /* actual GC is done at the end of the while loop */
1131       break;
1132       
1133     case StackOverflow:
1134 #if defined(GRAN)
1135       IF_DEBUG(gran, 
1136                DumpGranEvent(GR_DESCHEDULE, t));
1137       globalGranStats.tot_stackover++;
1138 #elif defined(PAR)
1139       // IF_DEBUG(par, 
1140       // DumpGranEvent(GR_DESCHEDULE, t);
1141       globalParStats.tot_stackover++;
1142 #endif
1143       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1144                                t->id, t, whatNext_strs[t->what_next]));
1145       /* just adjust the stack for this thread, then pop it back
1146        * on the run queue.
1147        */
1148       threadPaused(t);
1149       { 
1150         StgMainThread *m;
1151         /* enlarge the stack */
1152         StgTSO *new_t = threadStackOverflow(t);
1153         
1154         /* This TSO has moved, so update any pointers to it from the
1155          * main thread stack.  It better not be on any other queues...
1156          * (it shouldn't be).
1157          */
1158         for (m = main_threads; m != NULL; m = m->link) {
1159           if (m->tso == t) {
1160             m->tso = new_t;
1161           }
1162         }
1163         threadPaused(new_t);
1164         PUSH_ON_RUN_QUEUE(new_t);
1165       }
1166       break;
1167
1168     case ThreadYielding:
1169 #if defined(GRAN)
1170       IF_DEBUG(gran, 
1171                DumpGranEvent(GR_DESCHEDULE, t));
1172       globalGranStats.tot_yields++;
1173 #elif defined(PAR)
1174       // IF_DEBUG(par, 
1175       // DumpGranEvent(GR_DESCHEDULE, t);
1176       globalParStats.tot_yields++;
1177 #endif
1178       /* put the thread back on the run queue.  Then, if we're ready to
1179        * GC, check whether this is the last task to stop.  If so, wake
1180        * up the GC thread.  getThread will block during a GC until the
1181        * GC is finished.
1182        */
1183       IF_DEBUG(scheduler,
1184                if (t->what_next == ThreadEnterInterp) {
1185                    /* ToDo: or maybe a timer expired when we were in Hugs?
1186                     * or maybe someone hit ctrl-C
1187                     */
1188                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1189                          t->id, t, whatNext_strs[t->what_next]);
1190                } else {
1191                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1192                          t->id, t, whatNext_strs[t->what_next]);
1193                }
1194                );
1195
1196       threadPaused(t);
1197
1198       IF_DEBUG(sanity,
1199                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1200                checkTSO(t));
1201       ASSERT(t->link == END_TSO_QUEUE);
1202 #if defined(GRAN)
1203       ASSERT(!is_on_queue(t,CurrentProc));
1204
1205       IF_DEBUG(sanity,
1206                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1207                checkThreadQsSanity(rtsTrue));
1208 #endif
1209 #if defined(PAR)
1210       if (RtsFlags.ParFlags.doFairScheduling) { 
1211         /* this does round-robin scheduling; good for concurrency */
1212         APPEND_TO_RUN_QUEUE(t);
1213       } else {
1214         /* this does unfair scheduling; good for parallelism */
1215         PUSH_ON_RUN_QUEUE(t);
1216       }
1217 #else
1218       /* this does round-robin scheduling; good for concurrency */
1219       APPEND_TO_RUN_QUEUE(t);
1220 #endif
1221 #if defined(GRAN)
1222       /* add a ContinueThread event to actually process the thread */
1223       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1224                 ContinueThread,
1225                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1226       IF_GRAN_DEBUG(bq, 
1227                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1228                G_EVENTQ(0);
1229                G_CURR_THREADQ(0));
1230 #endif /* GRAN */
1231       break;
1232       
1233     case ThreadBlocked:
1234 #if defined(GRAN)
1235       IF_DEBUG(scheduler,
1236                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1237                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1238                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1239
1240       // ??? needed; should emit block before
1241       IF_DEBUG(gran, 
1242                DumpGranEvent(GR_DESCHEDULE, t)); 
1243       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1244       /*
1245         ngoq Dogh!
1246       ASSERT(procStatus[CurrentProc]==Busy || 
1247               ((procStatus[CurrentProc]==Fetching) && 
1248               (t->block_info.closure!=(StgClosure*)NULL)));
1249       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1250           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1251             procStatus[CurrentProc]==Fetching)) 
1252         procStatus[CurrentProc] = Idle;
1253       */
1254 #elif defined(PAR)
1255       IF_DEBUG(scheduler,
1256                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1257                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1258       IF_PAR_DEBUG(bq,
1259
1260                    if (t->block_info.closure!=(StgClosure*)NULL) 
1261                      print_bq(t->block_info.closure));
1262
1263       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1264       blockThread(t);
1265
1266       /* whatever we schedule next, we must log that schedule */
1267       emitSchedule = rtsTrue;
1268
1269 #else /* !GRAN */
1270       /* don't need to do anything.  Either the thread is blocked on
1271        * I/O, in which case we'll have called addToBlockedQueue
1272        * previously, or it's blocked on an MVar or Blackhole, in which
1273        * case it'll be on the relevant queue already.
1274        */
1275       IF_DEBUG(scheduler,
1276                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1277                printThreadBlockage(t);
1278                fprintf(stderr, "\n"));
1279
1280       /* Only for dumping event to log file 
1281          ToDo: do I need this in GranSim, too?
1282       blockThread(t);
1283       */
1284 #endif
1285       threadPaused(t);
1286       break;
1287       
1288     case ThreadFinished:
1289       /* Need to check whether this was a main thread, and if so, signal
1290        * the task that started it with the return value.  If we have no
1291        * more main threads, we probably need to stop all the tasks until
1292        * we get a new one.
1293        */
1294       /* We also end up here if the thread kills itself with an
1295        * uncaught exception, see Exception.hc.
1296        */
1297       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1298 #if defined(GRAN)
1299       endThread(t, CurrentProc); // clean-up the thread
1300 #elif defined(PAR)
1301       /* For now all are advisory -- HWL */
1302       //if(t->priority==AdvisoryPriority) ??
1303       advisory_thread_count--;
1304       
1305 # ifdef DIST
1306       if(t->dist.priority==RevalPriority)
1307         FinishReval(t);
1308 # endif
1309       
1310       if (RtsFlags.ParFlags.ParStats.Full &&
1311           !RtsFlags.ParFlags.ParStats.Suppressed) 
1312         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1313 #endif
1314       break;
1315       
1316     default:
1317       barf("schedule: invalid thread return code %d", (int)ret);
1318     }
1319     
1320 #if defined(RTS_SUPPORTS_THREADS)
1321     /* I don't understand what this re-grab is doing -- sof */
1322     grabCapability(&cap);
1323 #endif
1324
1325 #ifdef PROFILING
1326     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1327         GarbageCollect(GetRoots, rtsTrue);
1328         heapCensus();
1329         performHeapProfile = rtsFalse;
1330         ready_to_gc = rtsFalse; // we already GC'd
1331     }
1332 #endif
1333
1334     if (ready_to_gc 
1335 #ifdef SMP
1336         && allFreeCapabilities() 
1337 #endif
1338         ) {
1339       /* everybody back, start the GC.
1340        * Could do it in this thread, or signal a condition var
1341        * to do it in another thread.  Either way, we need to
1342        * broadcast on gc_pending_cond afterward.
1343        */
1344 #if defined(RTS_SUPPORTS_THREADS)
1345       IF_DEBUG(scheduler,sched_belch("doing GC"));
1346 #endif
1347       GarbageCollect(GetRoots,rtsFalse);
1348       ready_to_gc = rtsFalse;
1349 #ifdef SMP
1350       broadcastCondition(&gc_pending_cond);
1351 #endif
1352 #if defined(GRAN)
1353       /* add a ContinueThread event to continue execution of current thread */
1354       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1355                 ContinueThread,
1356                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1357       IF_GRAN_DEBUG(bq, 
1358                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1359                G_EVENTQ(0);
1360                G_CURR_THREADQ(0));
1361 #endif /* GRAN */
1362     }
1363
1364 #if defined(GRAN)
1365   next_thread:
1366     IF_GRAN_DEBUG(unused,
1367                   print_eventq(EventHd));
1368
1369     event = get_next_event();
1370 #elif defined(PAR)
1371   next_thread:
1372     /* ToDo: wait for next message to arrive rather than busy wait */
1373 #endif /* GRAN */
1374
1375   } /* end of while(1) */
1376
1377   IF_PAR_DEBUG(verbose,
1378                belch("== Leaving schedule() after having received Finish"));
1379 }
1380
1381 /* ---------------------------------------------------------------------------
1382  * deleteAllThreads():  kill all the live threads.
1383  *
1384  * This is used when we catch a user interrupt (^C), before performing
1385  * any necessary cleanups and running finalizers.
1386  * ------------------------------------------------------------------------- */
1387    
1388 void deleteAllThreads ( void )
1389 {
1390   StgTSO* t, *next;
1391   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1392   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1393       next = t->link;
1394       deleteThread(t);
1395   }
1396   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1397       next = t->link;
1398       deleteThread(t);
1399   }
1400   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1401       next = t->link;
1402       deleteThread(t);
1403   }
1404   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1405   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1406   sleeping_queue = END_TSO_QUEUE;
1407 }
1408
1409 /* startThread and  insertThread are now in GranSim.c -- HWL */
1410
1411
1412 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1413 //@subsection Suspend and Resume
1414
1415 /* ---------------------------------------------------------------------------
1416  * Suspending & resuming Haskell threads.
1417  * 
1418  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1419  * its capability before calling the C function.  This allows another
1420  * task to pick up the capability and carry on running Haskell
1421  * threads.  It also means that if the C call blocks, it won't lock
1422  * the whole system.
1423  *
1424  * The Haskell thread making the C call is put to sleep for the
1425  * duration of the call, on the susepended_ccalling_threads queue.  We
1426  * give out a token to the task, which it can use to resume the thread
1427  * on return from the C function.
1428  * ------------------------------------------------------------------------- */
1429    
1430 StgInt
1431 suspendThread( StgRegTable *reg )
1432 {
1433   nat tok;
1434   Capability *cap;
1435
1436   /* assume that *reg is a pointer to the StgRegTable part
1437    * of a Capability.
1438    */
1439   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1440
1441   ACQUIRE_LOCK(&sched_mutex);
1442
1443   IF_DEBUG(scheduler,
1444            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1445
1446   threadPaused(cap->r.rCurrentTSO);
1447   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1448   suspended_ccalling_threads = cap->r.rCurrentTSO;
1449
1450 #if defined(RTS_SUPPORTS_THREADS)
1451   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1452 #endif
1453
1454   /* Use the thread ID as the token; it should be unique */
1455   tok = cap->r.rCurrentTSO->id;
1456
1457   /* Hand back capability */
1458   releaseCapability(cap);
1459   
1460 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1461   /* Preparing to leave the RTS, so ensure there's a native thread/task
1462      waiting to take over.
1463      
1464      ToDo: optimise this and only create a new task if there's a need
1465      for one (i.e., if there's only one Concurrent Haskell thread alive,
1466      there's no need to create a new task).
1467   */
1468   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1469   startTask(taskStart);
1470 #endif
1471
1472   /* Other threads _might_ be available for execution; signal this */
1473   THREAD_RUNNABLE();
1474   RELEASE_LOCK(&sched_mutex);
1475   return tok; 
1476 }
1477
1478 StgRegTable *
1479 resumeThread( StgInt tok )
1480 {
1481   StgTSO *tso, **prev;
1482   Capability *cap;
1483
1484 #if defined(RTS_SUPPORTS_THREADS)
1485   /* Wait for permission to re-enter the RTS with the result. */
1486   grabReturnCapability(&sched_mutex, &cap);
1487 #else
1488   grabCapability(&cap);
1489 #endif
1490
1491   /* Remove the thread off of the suspended list */
1492   prev = &suspended_ccalling_threads;
1493   for (tso = suspended_ccalling_threads; 
1494        tso != END_TSO_QUEUE; 
1495        prev = &tso->link, tso = tso->link) {
1496     if (tso->id == (StgThreadID)tok) {
1497       *prev = tso->link;
1498       break;
1499     }
1500   }
1501   if (tso == END_TSO_QUEUE) {
1502     barf("resumeThread: thread not found");
1503   }
1504   tso->link = END_TSO_QUEUE;
1505   /* Reset blocking status */
1506   tso->why_blocked  = NotBlocked;
1507
1508   RELEASE_LOCK(&sched_mutex);
1509
1510   cap->r.rCurrentTSO = tso;
1511   return &cap->r;
1512 }
1513
1514
1515 /* ---------------------------------------------------------------------------
1516  * Static functions
1517  * ------------------------------------------------------------------------ */
1518 static void unblockThread(StgTSO *tso);
1519
1520 /* ---------------------------------------------------------------------------
1521  * Comparing Thread ids.
1522  *
1523  * This is used from STG land in the implementation of the
1524  * instances of Eq/Ord for ThreadIds.
1525  * ------------------------------------------------------------------------ */
1526
1527 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1528
1529   StgThreadID id1 = tso1->id; 
1530   StgThreadID id2 = tso2->id;
1531  
1532   if (id1 < id2) return (-1);
1533   if (id1 > id2) return 1;
1534   return 0;
1535 }
1536
1537 /* ---------------------------------------------------------------------------
1538  * Fetching the ThreadID from an StgTSO.
1539  *
1540  * This is used in the implementation of Show for ThreadIds.
1541  * ------------------------------------------------------------------------ */
1542 int rts_getThreadId(const StgTSO *tso) 
1543 {
1544   return tso->id;
1545 }
1546
1547 /* ---------------------------------------------------------------------------
1548    Create a new thread.
1549
1550    The new thread starts with the given stack size.  Before the
1551    scheduler can run, however, this thread needs to have a closure
1552    (and possibly some arguments) pushed on its stack.  See
1553    pushClosure() in Schedule.h.
1554
1555    createGenThread() and createIOThread() (in SchedAPI.h) are
1556    convenient packaged versions of this function.
1557
1558    currently pri (priority) is only used in a GRAN setup -- HWL
1559    ------------------------------------------------------------------------ */
1560 //@cindex createThread
1561 #if defined(GRAN)
1562 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1563 StgTSO *
1564 createThread(nat stack_size, StgInt pri)
1565 {
1566   return createThread_(stack_size, rtsFalse, pri);
1567 }
1568
1569 static StgTSO *
1570 createThread_(nat size, rtsBool have_lock, StgInt pri)
1571 {
1572 #else
1573 StgTSO *
1574 createThread(nat stack_size)
1575 {
1576   return createThread_(stack_size, rtsFalse);
1577 }
1578
1579 static StgTSO *
1580 createThread_(nat size, rtsBool have_lock)
1581 {
1582 #endif
1583
1584     StgTSO *tso;
1585     nat stack_size;
1586
1587     /* First check whether we should create a thread at all */
1588 #if defined(PAR)
1589   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1590   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1591     threadsIgnored++;
1592     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1593           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1594     return END_TSO_QUEUE;
1595   }
1596   threadsCreated++;
1597 #endif
1598
1599 #if defined(GRAN)
1600   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1601 #endif
1602
1603   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1604
1605   /* catch ridiculously small stack sizes */
1606   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1607     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1608   }
1609
1610   stack_size = size - TSO_STRUCT_SIZEW;
1611
1612   tso = (StgTSO *)allocate(size);
1613   TICK_ALLOC_TSO(stack_size, 0);
1614
1615   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1616 #if defined(GRAN)
1617   SET_GRAN_HDR(tso, ThisPE);
1618 #endif
1619   tso->what_next     = ThreadEnterGHC;
1620
1621   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1622    * protect the increment operation on next_thread_id.
1623    * In future, we could use an atomic increment instead.
1624    */
1625   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1626   tso->id = next_thread_id++; 
1627   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1628
1629   tso->why_blocked  = NotBlocked;
1630   tso->blocked_exceptions = NULL;
1631
1632   tso->stack_size   = stack_size;
1633   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1634                               - TSO_STRUCT_SIZEW;
1635   tso->sp           = (P_)&(tso->stack) + stack_size;
1636
1637 #ifdef PROFILING
1638   tso->prof.CCCS = CCS_MAIN;
1639 #endif
1640
1641   /* put a stop frame on the stack */
1642   tso->sp -= sizeofW(StgStopFrame);
1643   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1644   tso->su = (StgUpdateFrame*)tso->sp;
1645
1646   // ToDo: check this
1647 #if defined(GRAN)
1648   tso->link = END_TSO_QUEUE;
1649   /* uses more flexible routine in GranSim */
1650   insertThread(tso, CurrentProc);
1651 #else
1652   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1653    * from its creation
1654    */
1655 #endif
1656
1657 #if defined(GRAN) 
1658   if (RtsFlags.GranFlags.GranSimStats.Full) 
1659     DumpGranEvent(GR_START,tso);
1660 #elif defined(PAR)
1661   if (RtsFlags.ParFlags.ParStats.Full) 
1662     DumpGranEvent(GR_STARTQ,tso);
1663   /* HACk to avoid SCHEDULE 
1664      LastTSO = tso; */
1665 #endif
1666
1667   /* Link the new thread on the global thread list.
1668    */
1669   tso->global_link = all_threads;
1670   all_threads = tso;
1671
1672 #if defined(DIST)
1673   tso->dist.priority = MandatoryPriority; //by default that is...
1674 #endif
1675
1676 #if defined(GRAN)
1677   tso->gran.pri = pri;
1678 # if defined(DEBUG)
1679   tso->gran.magic = TSO_MAGIC; // debugging only
1680 # endif
1681   tso->gran.sparkname   = 0;
1682   tso->gran.startedat   = CURRENT_TIME; 
1683   tso->gran.exported    = 0;
1684   tso->gran.basicblocks = 0;
1685   tso->gran.allocs      = 0;
1686   tso->gran.exectime    = 0;
1687   tso->gran.fetchtime   = 0;
1688   tso->gran.fetchcount  = 0;
1689   tso->gran.blocktime   = 0;
1690   tso->gran.blockcount  = 0;
1691   tso->gran.blockedat   = 0;
1692   tso->gran.globalsparks = 0;
1693   tso->gran.localsparks  = 0;
1694   if (RtsFlags.GranFlags.Light)
1695     tso->gran.clock  = Now; /* local clock */
1696   else
1697     tso->gran.clock  = 0;
1698
1699   IF_DEBUG(gran,printTSO(tso));
1700 #elif defined(PAR)
1701 # if defined(DEBUG)
1702   tso->par.magic = TSO_MAGIC; // debugging only
1703 # endif
1704   tso->par.sparkname   = 0;
1705   tso->par.startedat   = CURRENT_TIME; 
1706   tso->par.exported    = 0;
1707   tso->par.basicblocks = 0;
1708   tso->par.allocs      = 0;
1709   tso->par.exectime    = 0;
1710   tso->par.fetchtime   = 0;
1711   tso->par.fetchcount  = 0;
1712   tso->par.blocktime   = 0;
1713   tso->par.blockcount  = 0;
1714   tso->par.blockedat   = 0;
1715   tso->par.globalsparks = 0;
1716   tso->par.localsparks  = 0;
1717 #endif
1718
1719 #if defined(GRAN)
1720   globalGranStats.tot_threads_created++;
1721   globalGranStats.threads_created_on_PE[CurrentProc]++;
1722   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1723   globalGranStats.tot_sq_probes++;
1724 #elif defined(PAR)
1725   // collect parallel global statistics (currently done together with GC stats)
1726   if (RtsFlags.ParFlags.ParStats.Global &&
1727       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1728     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1729     globalParStats.tot_threads_created++;
1730   }
1731 #endif 
1732
1733 #if defined(GRAN)
1734   IF_GRAN_DEBUG(pri,
1735                 belch("==__ schedule: Created TSO %d (%p);",
1736                       CurrentProc, tso, tso->id));
1737 #elif defined(PAR)
1738     IF_PAR_DEBUG(verbose,
1739                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1740                        tso->id, tso, advisory_thread_count));
1741 #else
1742   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1743                                  tso->id, tso->stack_size));
1744 #endif    
1745   return tso;
1746 }
1747
1748 #if defined(PAR)
1749 /* RFP:
1750    all parallel thread creation calls should fall through the following routine.
1751 */
1752 StgTSO *
1753 createSparkThread(rtsSpark spark) 
1754 { StgTSO *tso;
1755   ASSERT(spark != (rtsSpark)NULL);
1756   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1757   { threadsIgnored++;
1758     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1759           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1760     return END_TSO_QUEUE;
1761   }
1762   else
1763   { threadsCreated++;
1764     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1765     if (tso==END_TSO_QUEUE)     
1766       barf("createSparkThread: Cannot create TSO");
1767 #if defined(DIST)
1768     tso->priority = AdvisoryPriority;
1769 #endif
1770     pushClosure(tso,spark);
1771     PUSH_ON_RUN_QUEUE(tso);
1772     advisory_thread_count++;    
1773   }
1774   return tso;
1775 }
1776 #endif
1777
1778 /*
1779   Turn a spark into a thread.
1780   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1781 */
1782 #if defined(PAR)
1783 //@cindex activateSpark
1784 StgTSO *
1785 activateSpark (rtsSpark spark) 
1786 {
1787   StgTSO *tso;
1788
1789   tso = createSparkThread(spark);
1790   if (RtsFlags.ParFlags.ParStats.Full) {   
1791     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1792     IF_PAR_DEBUG(verbose,
1793                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1794                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1795   }
1796   // ToDo: fwd info on local/global spark to thread -- HWL
1797   // tso->gran.exported =  spark->exported;
1798   // tso->gran.locked =   !spark->global;
1799   // tso->gran.sparkname = spark->name;
1800
1801   return tso;
1802 }
1803 #endif
1804
1805 /* ---------------------------------------------------------------------------
1806  * scheduleThread()
1807  *
1808  * scheduleThread puts a thread on the head of the runnable queue.
1809  * This will usually be done immediately after a thread is created.
1810  * The caller of scheduleThread must create the thread using e.g.
1811  * createThread and push an appropriate closure
1812  * on this thread's stack before the scheduler is invoked.
1813  * ------------------------------------------------------------------------ */
1814
1815 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1816
1817 void
1818 scheduleThread_(StgTSO *tso
1819                , rtsBool createTask
1820 #if !defined(THREADED_RTS)
1821                  STG_UNUSED
1822 #endif
1823               )
1824 {
1825   ACQUIRE_LOCK(&sched_mutex);
1826
1827   /* Put the new thread on the head of the runnable queue.  The caller
1828    * better push an appropriate closure on this thread's stack
1829    * beforehand.  In the SMP case, the thread may start running as
1830    * soon as we release the scheduler lock below.
1831    */
1832   PUSH_ON_RUN_QUEUE(tso);
1833 #if defined(THREADED_RTS)
1834   /* If main() is scheduling a thread, don't bother creating a 
1835    * new task.
1836    */
1837   if ( createTask ) {
1838     startTask(taskStart);
1839   }
1840 #endif
1841   THREAD_RUNNABLE();
1842
1843 #if 0
1844   IF_DEBUG(scheduler,printTSO(tso));
1845 #endif
1846   RELEASE_LOCK(&sched_mutex);
1847 }
1848
1849 void scheduleThread(StgTSO* tso)
1850 {
1851   return scheduleThread_(tso, rtsFalse);
1852 }
1853
1854 void scheduleExtThread(StgTSO* tso)
1855 {
1856   return scheduleThread_(tso, rtsTrue);
1857 }
1858
1859 /* ---------------------------------------------------------------------------
1860  * initScheduler()
1861  *
1862  * Initialise the scheduler.  This resets all the queues - if the
1863  * queues contained any threads, they'll be garbage collected at the
1864  * next pass.
1865  *
1866  * ------------------------------------------------------------------------ */
1867
1868 #ifdef SMP
1869 static void
1870 term_handler(int sig STG_UNUSED)
1871 {
1872   stat_workerStop();
1873   ACQUIRE_LOCK(&term_mutex);
1874   await_death--;
1875   RELEASE_LOCK(&term_mutex);
1876   shutdownThread();
1877 }
1878 #endif
1879
1880 void 
1881 initScheduler(void)
1882 {
1883 #if defined(GRAN)
1884   nat i;
1885
1886   for (i=0; i<=MAX_PROC; i++) {
1887     run_queue_hds[i]      = END_TSO_QUEUE;
1888     run_queue_tls[i]      = END_TSO_QUEUE;
1889     blocked_queue_hds[i]  = END_TSO_QUEUE;
1890     blocked_queue_tls[i]  = END_TSO_QUEUE;
1891     ccalling_threadss[i]  = END_TSO_QUEUE;
1892     sleeping_queue        = END_TSO_QUEUE;
1893   }
1894 #else
1895   run_queue_hd      = END_TSO_QUEUE;
1896   run_queue_tl      = END_TSO_QUEUE;
1897   blocked_queue_hd  = END_TSO_QUEUE;
1898   blocked_queue_tl  = END_TSO_QUEUE;
1899   sleeping_queue    = END_TSO_QUEUE;
1900 #endif 
1901
1902   suspended_ccalling_threads  = END_TSO_QUEUE;
1903
1904   main_threads = NULL;
1905   all_threads  = END_TSO_QUEUE;
1906
1907   context_switch = 0;
1908   interrupted    = 0;
1909
1910   RtsFlags.ConcFlags.ctxtSwitchTicks =
1911       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1912       
1913 #if defined(RTS_SUPPORTS_THREADS)
1914   /* Initialise the mutex and condition variables used by
1915    * the scheduler. */
1916   initMutex(&sched_mutex);
1917   initMutex(&term_mutex);
1918
1919   initCondition(&thread_ready_cond);
1920 #endif
1921   
1922 #if defined(SMP)
1923   initCondition(&gc_pending_cond);
1924 #endif
1925
1926 #if defined(RTS_SUPPORTS_THREADS)
1927   ACQUIRE_LOCK(&sched_mutex);
1928 #endif
1929
1930   /* Install the SIGHUP handler */
1931 #if defined(SMP)
1932   {
1933     struct sigaction action,oact;
1934
1935     action.sa_handler = term_handler;
1936     sigemptyset(&action.sa_mask);
1937     action.sa_flags = 0;
1938     if (sigaction(SIGTERM, &action, &oact) != 0) {
1939       barf("can't install TERM handler");
1940     }
1941   }
1942 #endif
1943
1944   /* A capability holds the state a native thread needs in
1945    * order to execute STG code. At least one capability is
1946    * floating around (only SMP builds have more than one).
1947    */
1948   initCapabilities();
1949   
1950 #if defined(RTS_SUPPORTS_THREADS)
1951     /* start our haskell execution tasks */
1952 # if defined(SMP)
1953     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1954 # else
1955     startTaskManager(0,taskStart);
1956 # endif
1957 #endif
1958
1959 #if /* defined(SMP) ||*/ defined(PAR)
1960   initSparkPools();
1961 #endif
1962
1963 #if defined(RTS_SUPPORTS_THREADS)
1964   RELEASE_LOCK(&sched_mutex);
1965 #endif
1966
1967 }
1968
1969 void
1970 exitScheduler( void )
1971 {
1972 #if defined(RTS_SUPPORTS_THREADS)
1973   stopTaskManager();
1974 #endif
1975 }
1976
1977 /* -----------------------------------------------------------------------------
1978    Managing the per-task allocation areas.
1979    
1980    Each capability comes with an allocation area.  These are
1981    fixed-length block lists into which allocation can be done.
1982
1983    ToDo: no support for two-space collection at the moment???
1984    -------------------------------------------------------------------------- */
1985
1986 /* -----------------------------------------------------------------------------
1987  * waitThread is the external interface for running a new computation
1988  * and waiting for the result.
1989  *
1990  * In the non-SMP case, we create a new main thread, push it on the 
1991  * main-thread stack, and invoke the scheduler to run it.  The
1992  * scheduler will return when the top main thread on the stack has
1993  * completed or died, and fill in the necessary fields of the
1994  * main_thread structure.
1995  *
1996  * In the SMP case, we create a main thread as before, but we then
1997  * create a new condition variable and sleep on it.  When our new
1998  * main thread has completed, we'll be woken up and the status/result
1999  * will be in the main_thread struct.
2000  * -------------------------------------------------------------------------- */
2001
2002 int 
2003 howManyThreadsAvail ( void )
2004 {
2005    int i = 0;
2006    StgTSO* q;
2007    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2008       i++;
2009    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2010       i++;
2011    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2012       i++;
2013    return i;
2014 }
2015
2016 void
2017 finishAllThreads ( void )
2018 {
2019    do {
2020       while (run_queue_hd != END_TSO_QUEUE) {
2021          waitThread ( run_queue_hd, NULL);
2022       }
2023       while (blocked_queue_hd != END_TSO_QUEUE) {
2024          waitThread ( blocked_queue_hd, NULL);
2025       }
2026       while (sleeping_queue != END_TSO_QUEUE) {
2027          waitThread ( blocked_queue_hd, NULL);
2028       }
2029    } while 
2030       (blocked_queue_hd != END_TSO_QUEUE || 
2031        run_queue_hd     != END_TSO_QUEUE ||
2032        sleeping_queue   != END_TSO_QUEUE);
2033 }
2034
2035 SchedulerStatus
2036 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2037
2038 #if defined(THREADED_RTS)
2039   return waitThread_(tso,ret, rtsFalse);
2040 #else
2041   return waitThread_(tso,ret);
2042 #endif
2043 }
2044
2045 SchedulerStatus
2046 waitThread_(StgTSO *tso,
2047             /*out*/StgClosure **ret
2048 #if defined(THREADED_RTS)
2049             , rtsBool blockWaiting
2050 #endif
2051            )
2052 {
2053   StgMainThread *m;
2054   SchedulerStatus stat;
2055
2056   ACQUIRE_LOCK(&sched_mutex);
2057   
2058   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2059
2060   m->tso = tso;
2061   m->ret = ret;
2062   m->stat = NoStatus;
2063 #if defined(RTS_SUPPORTS_THREADS)
2064   initCondition(&m->wakeup);
2065 #endif
2066
2067   m->link = main_threads;
2068   main_threads = m;
2069
2070   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2071
2072 #if defined(RTS_SUPPORTS_THREADS)
2073
2074 # if defined(THREADED_RTS)
2075   if (!blockWaiting) {
2076     /* In the threaded case, the OS thread that called main()
2077      * gets to enter the RTS directly without going via another
2078      * task/thread.
2079      */
2080     RELEASE_LOCK(&sched_mutex);
2081     schedule();
2082     ASSERT(m->stat != NoStatus);
2083   } else 
2084 # endif
2085   {
2086     IF_DEBUG(scheduler, sched_belch("sfoo"));
2087     do {
2088       waitCondition(&m->wakeup, &sched_mutex);
2089     } while (m->stat == NoStatus);
2090   }
2091 #elif defined(GRAN)
2092   /* GranSim specific init */
2093   CurrentTSO = m->tso;                // the TSO to run
2094   procStatus[MainProc] = Busy;        // status of main PE
2095   CurrentProc = MainProc;             // PE to run it on
2096
2097   schedule();
2098 #else
2099   RELEASE_LOCK(&sched_mutex);
2100   schedule();
2101   ASSERT(m->stat != NoStatus);
2102 #endif
2103
2104   stat = m->stat;
2105
2106 #if defined(RTS_SUPPORTS_THREADS)
2107   closeCondition(&m->wakeup);
2108 #endif
2109
2110   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2111                               m->tso->id));
2112   free(m);
2113
2114 #if defined(THREADED_RTS)
2115   if (blockWaiting) 
2116 #endif
2117     RELEASE_LOCK(&sched_mutex);
2118
2119   return stat;
2120 }
2121
2122 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2123 //@subsection Run queue code 
2124
2125 #if 0
2126 /* 
2127    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2128        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2129        implicit global variable that has to be correct when calling these
2130        fcts -- HWL 
2131 */
2132
2133 /* Put the new thread on the head of the runnable queue.
2134  * The caller of createThread better push an appropriate closure
2135  * on this thread's stack before the scheduler is invoked.
2136  */
2137 static /* inline */ void
2138 add_to_run_queue(tso)
2139 StgTSO* tso; 
2140 {
2141   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2142   tso->link = run_queue_hd;
2143   run_queue_hd = tso;
2144   if (run_queue_tl == END_TSO_QUEUE) {
2145     run_queue_tl = tso;
2146   }
2147 }
2148
2149 /* Put the new thread at the end of the runnable queue. */
2150 static /* inline */ void
2151 push_on_run_queue(tso)
2152 StgTSO* tso; 
2153 {
2154   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2155   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2156   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2157   if (run_queue_hd == END_TSO_QUEUE) {
2158     run_queue_hd = tso;
2159   } else {
2160     run_queue_tl->link = tso;
2161   }
2162   run_queue_tl = tso;
2163 }
2164
2165 /* 
2166    Should be inlined because it's used very often in schedule.  The tso
2167    argument is actually only needed in GranSim, where we want to have the
2168    possibility to schedule *any* TSO on the run queue, irrespective of the
2169    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2170    the run queue and dequeue the tso, adjusting the links in the queue. 
2171 */
2172 //@cindex take_off_run_queue
2173 static /* inline */ StgTSO*
2174 take_off_run_queue(StgTSO *tso) {
2175   StgTSO *t, *prev;
2176
2177   /* 
2178      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2179
2180      if tso is specified, unlink that tso from the run_queue (doesn't have
2181      to be at the beginning of the queue); GranSim only 
2182   */
2183   if (tso!=END_TSO_QUEUE) {
2184     /* find tso in queue */
2185     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2186          t!=END_TSO_QUEUE && t!=tso;
2187          prev=t, t=t->link) 
2188       /* nothing */ ;
2189     ASSERT(t==tso);
2190     /* now actually dequeue the tso */
2191     if (prev!=END_TSO_QUEUE) {
2192       ASSERT(run_queue_hd!=t);
2193       prev->link = t->link;
2194     } else {
2195       /* t is at beginning of thread queue */
2196       ASSERT(run_queue_hd==t);
2197       run_queue_hd = t->link;
2198     }
2199     /* t is at end of thread queue */
2200     if (t->link==END_TSO_QUEUE) {
2201       ASSERT(t==run_queue_tl);
2202       run_queue_tl = prev;
2203     } else {
2204       ASSERT(run_queue_tl!=t);
2205     }
2206     t->link = END_TSO_QUEUE;
2207   } else {
2208     /* take tso from the beginning of the queue; std concurrent code */
2209     t = run_queue_hd;
2210     if (t != END_TSO_QUEUE) {
2211       run_queue_hd = t->link;
2212       t->link = END_TSO_QUEUE;
2213       if (run_queue_hd == END_TSO_QUEUE) {
2214         run_queue_tl = END_TSO_QUEUE;
2215       }
2216     }
2217   }
2218   return t;
2219 }
2220
2221 #endif /* 0 */
2222
2223 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2224 //@subsection Garbage Collextion Routines
2225
2226 /* ---------------------------------------------------------------------------
2227    Where are the roots that we know about?
2228
2229         - all the threads on the runnable queue
2230         - all the threads on the blocked queue
2231         - all the threads on the sleeping queue
2232         - all the thread currently executing a _ccall_GC
2233         - all the "main threads"
2234      
2235    ------------------------------------------------------------------------ */
2236
2237 /* This has to be protected either by the scheduler monitor, or by the
2238         garbage collection monitor (probably the latter).
2239         KH @ 25/10/99
2240 */
2241
2242 void
2243 GetRoots(evac_fn evac)
2244 {
2245   StgMainThread *m;
2246
2247 #if defined(GRAN)
2248   {
2249     nat i;
2250     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2251       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2252           evac((StgClosure **)&run_queue_hds[i]);
2253       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2254           evac((StgClosure **)&run_queue_tls[i]);
2255       
2256       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2257           evac((StgClosure **)&blocked_queue_hds[i]);
2258       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2259           evac((StgClosure **)&blocked_queue_tls[i]);
2260       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2261           evac((StgClosure **)&ccalling_threads[i]);
2262     }
2263   }
2264
2265   markEventQueue();
2266
2267 #else /* !GRAN */
2268   if (run_queue_hd != END_TSO_QUEUE) {
2269       ASSERT(run_queue_tl != END_TSO_QUEUE);
2270       evac((StgClosure **)&run_queue_hd);
2271       evac((StgClosure **)&run_queue_tl);
2272   }
2273   
2274   if (blocked_queue_hd != END_TSO_QUEUE) {
2275       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2276       evac((StgClosure **)&blocked_queue_hd);
2277       evac((StgClosure **)&blocked_queue_tl);
2278   }
2279   
2280   if (sleeping_queue != END_TSO_QUEUE) {
2281       evac((StgClosure **)&sleeping_queue);
2282   }
2283 #endif 
2284
2285   for (m = main_threads; m != NULL; m = m->link) {
2286       evac((StgClosure **)&m->tso);
2287   }
2288   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2289       evac((StgClosure **)&suspended_ccalling_threads);
2290   }
2291
2292 #if defined(PAR) || defined(GRAN)
2293   markSparkQueue(evac);
2294 #endif
2295 }
2296
2297 /* -----------------------------------------------------------------------------
2298    performGC
2299
2300    This is the interface to the garbage collector from Haskell land.
2301    We provide this so that external C code can allocate and garbage
2302    collect when called from Haskell via _ccall_GC.
2303
2304    It might be useful to provide an interface whereby the programmer
2305    can specify more roots (ToDo).
2306    
2307    This needs to be protected by the GC condition variable above.  KH.
2308    -------------------------------------------------------------------------- */
2309
2310 void (*extra_roots)(evac_fn);
2311
2312 void
2313 performGC(void)
2314 {
2315   GarbageCollect(GetRoots,rtsFalse);
2316 }
2317
2318 void
2319 performMajorGC(void)
2320 {
2321   GarbageCollect(GetRoots,rtsTrue);
2322 }
2323
2324 static void
2325 AllRoots(evac_fn evac)
2326 {
2327     GetRoots(evac);             // the scheduler's roots
2328     extra_roots(evac);          // the user's roots
2329 }
2330
2331 void
2332 performGCWithRoots(void (*get_roots)(evac_fn))
2333 {
2334   extra_roots = get_roots;
2335   GarbageCollect(AllRoots,rtsFalse);
2336 }
2337
2338 /* -----------------------------------------------------------------------------
2339    Stack overflow
2340
2341    If the thread has reached its maximum stack size, then raise the
2342    StackOverflow exception in the offending thread.  Otherwise
2343    relocate the TSO into a larger chunk of memory and adjust its stack
2344    size appropriately.
2345    -------------------------------------------------------------------------- */
2346
2347 static StgTSO *
2348 threadStackOverflow(StgTSO *tso)
2349 {
2350   nat new_stack_size, new_tso_size, diff, stack_words;
2351   StgPtr new_sp;
2352   StgTSO *dest;
2353
2354   IF_DEBUG(sanity,checkTSO(tso));
2355   if (tso->stack_size >= tso->max_stack_size) {
2356
2357     IF_DEBUG(gc,
2358              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2359                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2360              /* If we're debugging, just print out the top of the stack */
2361              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2362                                               tso->sp+64)));
2363
2364     /* Send this thread the StackOverflow exception */
2365     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2366     return tso;
2367   }
2368
2369   /* Try to double the current stack size.  If that takes us over the
2370    * maximum stack size for this thread, then use the maximum instead.
2371    * Finally round up so the TSO ends up as a whole number of blocks.
2372    */
2373   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2374   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2375                                        TSO_STRUCT_SIZE)/sizeof(W_);
2376   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2377   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2378
2379   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2380
2381   dest = (StgTSO *)allocate(new_tso_size);
2382   TICK_ALLOC_TSO(new_stack_size,0);
2383
2384   /* copy the TSO block and the old stack into the new area */
2385   memcpy(dest,tso,TSO_STRUCT_SIZE);
2386   stack_words = tso->stack + tso->stack_size - tso->sp;
2387   new_sp = (P_)dest + new_tso_size - stack_words;
2388   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2389
2390   /* relocate the stack pointers... */
2391   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2392   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2393   dest->sp    = new_sp;
2394   dest->stack_size = new_stack_size;
2395         
2396   /* and relocate the update frame list */
2397   relocate_stack(dest, diff);
2398
2399   /* Mark the old TSO as relocated.  We have to check for relocated
2400    * TSOs in the garbage collector and any primops that deal with TSOs.
2401    *
2402    * It's important to set the sp and su values to just beyond the end
2403    * of the stack, so we don't attempt to scavenge any part of the
2404    * dead TSO's stack.
2405    */
2406   tso->what_next = ThreadRelocated;
2407   tso->link = dest;
2408   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2409   tso->su = (StgUpdateFrame *)tso->sp;
2410   tso->why_blocked = NotBlocked;
2411   dest->mut_link = NULL;
2412
2413   IF_PAR_DEBUG(verbose,
2414                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2415                      tso->id, tso, tso->stack_size);
2416                /* If we're debugging, just print out the top of the stack */
2417                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2418                                                 tso->sp+64)));
2419   
2420   IF_DEBUG(sanity,checkTSO(tso));
2421 #if 0
2422   IF_DEBUG(scheduler,printTSO(dest));
2423 #endif
2424
2425   return dest;
2426 }
2427
2428 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2429 //@subsection Blocking Queue Routines
2430
2431 /* ---------------------------------------------------------------------------
2432    Wake up a queue that was blocked on some resource.
2433    ------------------------------------------------------------------------ */
2434
2435 #if defined(GRAN)
2436 static inline void
2437 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2438 {
2439 }
2440 #elif defined(PAR)
2441 static inline void
2442 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2443 {
2444   /* write RESUME events to log file and
2445      update blocked and fetch time (depending on type of the orig closure) */
2446   if (RtsFlags.ParFlags.ParStats.Full) {
2447     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2448                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2449                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2450     if (EMPTY_RUN_QUEUE())
2451       emitSchedule = rtsTrue;
2452
2453     switch (get_itbl(node)->type) {
2454         case FETCH_ME_BQ:
2455           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2456           break;
2457         case RBH:
2458         case FETCH_ME:
2459         case BLACKHOLE_BQ:
2460           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2461           break;
2462 #ifdef DIST
2463         case MVAR:
2464           break;
2465 #endif    
2466         default:
2467           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2468         }
2469       }
2470 }
2471 #endif
2472
2473 #if defined(GRAN)
2474 static StgBlockingQueueElement *
2475 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2476 {
2477     StgTSO *tso;
2478     PEs node_loc, tso_loc;
2479
2480     node_loc = where_is(node); // should be lifted out of loop
2481     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2482     tso_loc = where_is((StgClosure *)tso);
2483     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2484       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2485       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2486       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2487       // insertThread(tso, node_loc);
2488       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2489                 ResumeThread,
2490                 tso, node, (rtsSpark*)NULL);
2491       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2492       // len_local++;
2493       // len++;
2494     } else { // TSO is remote (actually should be FMBQ)
2495       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2496                                   RtsFlags.GranFlags.Costs.gunblocktime +
2497                                   RtsFlags.GranFlags.Costs.latency;
2498       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2499                 UnblockThread,
2500                 tso, node, (rtsSpark*)NULL);
2501       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2502       // len++;
2503     }
2504     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2505     IF_GRAN_DEBUG(bq,
2506                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2507                           (node_loc==tso_loc ? "Local" : "Global"), 
2508                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2509     tso->block_info.closure = NULL;
2510     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2511                              tso->id, tso));
2512 }
2513 #elif defined(PAR)
2514 static StgBlockingQueueElement *
2515 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2516 {
2517     StgBlockingQueueElement *next;
2518
2519     switch (get_itbl(bqe)->type) {
2520     case TSO:
2521       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2522       /* if it's a TSO just push it onto the run_queue */
2523       next = bqe->link;
2524       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2525       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2526       THREAD_RUNNABLE();
2527       unblockCount(bqe, node);
2528       /* reset blocking status after dumping event */
2529       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2530       break;
2531
2532     case BLOCKED_FETCH:
2533       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2534       next = bqe->link;
2535       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2536       PendingFetches = (StgBlockedFetch *)bqe;
2537       break;
2538
2539 # if defined(DEBUG)
2540       /* can ignore this case in a non-debugging setup; 
2541          see comments on RBHSave closures above */
2542     case CONSTR:
2543       /* check that the closure is an RBHSave closure */
2544       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2545              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2546              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2547       break;
2548
2549     default:
2550       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2551            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2552            (StgClosure *)bqe);
2553 # endif
2554     }
2555   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2556   return next;
2557 }
2558
2559 #else /* !GRAN && !PAR */
2560 static StgTSO *
2561 unblockOneLocked(StgTSO *tso)
2562 {
2563   StgTSO *next;
2564
2565   ASSERT(get_itbl(tso)->type == TSO);
2566   ASSERT(tso->why_blocked != NotBlocked);
2567   tso->why_blocked = NotBlocked;
2568   next = tso->link;
2569   PUSH_ON_RUN_QUEUE(tso);
2570   THREAD_RUNNABLE();
2571   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2572   return next;
2573 }
2574 #endif
2575
2576 #if defined(GRAN) || defined(PAR)
2577 inline StgBlockingQueueElement *
2578 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2579 {
2580   ACQUIRE_LOCK(&sched_mutex);
2581   bqe = unblockOneLocked(bqe, node);
2582   RELEASE_LOCK(&sched_mutex);
2583   return bqe;
2584 }
2585 #else
2586 inline StgTSO *
2587 unblockOne(StgTSO *tso)
2588 {
2589   ACQUIRE_LOCK(&sched_mutex);
2590   tso = unblockOneLocked(tso);
2591   RELEASE_LOCK(&sched_mutex);
2592   return tso;
2593 }
2594 #endif
2595
2596 #if defined(GRAN)
2597 void 
2598 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2599 {
2600   StgBlockingQueueElement *bqe;
2601   PEs node_loc;
2602   nat len = 0; 
2603
2604   IF_GRAN_DEBUG(bq, 
2605                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2606                       node, CurrentProc, CurrentTime[CurrentProc], 
2607                       CurrentTSO->id, CurrentTSO));
2608
2609   node_loc = where_is(node);
2610
2611   ASSERT(q == END_BQ_QUEUE ||
2612          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2613          get_itbl(q)->type == CONSTR); // closure (type constructor)
2614   ASSERT(is_unique(node));
2615
2616   /* FAKE FETCH: magically copy the node to the tso's proc;
2617      no Fetch necessary because in reality the node should not have been 
2618      moved to the other PE in the first place
2619   */
2620   if (CurrentProc!=node_loc) {
2621     IF_GRAN_DEBUG(bq, 
2622                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2623                         node, node_loc, CurrentProc, CurrentTSO->id, 
2624                         // CurrentTSO, where_is(CurrentTSO),
2625                         node->header.gran.procs));
2626     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2627     IF_GRAN_DEBUG(bq, 
2628                   belch("## new bitmask of node %p is %#x",
2629                         node, node->header.gran.procs));
2630     if (RtsFlags.GranFlags.GranSimStats.Global) {
2631       globalGranStats.tot_fake_fetches++;
2632     }
2633   }
2634
2635   bqe = q;
2636   // ToDo: check: ASSERT(CurrentProc==node_loc);
2637   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2638     //next = bqe->link;
2639     /* 
2640        bqe points to the current element in the queue
2641        next points to the next element in the queue
2642     */
2643     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2644     //tso_loc = where_is(tso);
2645     len++;
2646     bqe = unblockOneLocked(bqe, node);
2647   }
2648
2649   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2650      the closure to make room for the anchor of the BQ */
2651   if (bqe!=END_BQ_QUEUE) {
2652     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2653     /*
2654     ASSERT((info_ptr==&RBH_Save_0_info) ||
2655            (info_ptr==&RBH_Save_1_info) ||
2656            (info_ptr==&RBH_Save_2_info));
2657     */
2658     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2659     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2660     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2661
2662     IF_GRAN_DEBUG(bq,
2663                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2664                         node, info_type(node)));
2665   }
2666
2667   /* statistics gathering */
2668   if (RtsFlags.GranFlags.GranSimStats.Global) {
2669     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2670     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2671     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2672     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2673   }
2674   IF_GRAN_DEBUG(bq,
2675                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2676                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2677 }
2678 #elif defined(PAR)
2679 void 
2680 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2681 {
2682   StgBlockingQueueElement *bqe;
2683
2684   ACQUIRE_LOCK(&sched_mutex);
2685
2686   IF_PAR_DEBUG(verbose, 
2687                belch("##-_ AwBQ for node %p on [%x]: ",
2688                      node, mytid));
2689 #ifdef DIST  
2690   //RFP
2691   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2692     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2693     return;
2694   }
2695 #endif
2696   
2697   ASSERT(q == END_BQ_QUEUE ||
2698          get_itbl(q)->type == TSO ||           
2699          get_itbl(q)->type == BLOCKED_FETCH || 
2700          get_itbl(q)->type == CONSTR); 
2701
2702   bqe = q;
2703   while (get_itbl(bqe)->type==TSO || 
2704          get_itbl(bqe)->type==BLOCKED_FETCH) {
2705     bqe = unblockOneLocked(bqe, node);
2706   }
2707   RELEASE_LOCK(&sched_mutex);
2708 }
2709
2710 #else   /* !GRAN && !PAR */
2711 void
2712 awakenBlockedQueue(StgTSO *tso)
2713 {
2714   ACQUIRE_LOCK(&sched_mutex);
2715   while (tso != END_TSO_QUEUE) {
2716     tso = unblockOneLocked(tso);
2717   }
2718   RELEASE_LOCK(&sched_mutex);
2719 }
2720 #endif
2721
2722 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2723 //@subsection Exception Handling Routines
2724
2725 /* ---------------------------------------------------------------------------
2726    Interrupt execution
2727    - usually called inside a signal handler so it mustn't do anything fancy.   
2728    ------------------------------------------------------------------------ */
2729
2730 void
2731 interruptStgRts(void)
2732 {
2733     interrupted    = 1;
2734     context_switch = 1;
2735 }
2736
2737 /* -----------------------------------------------------------------------------
2738    Unblock a thread
2739
2740    This is for use when we raise an exception in another thread, which
2741    may be blocked.
2742    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2743    -------------------------------------------------------------------------- */
2744
2745 #if defined(GRAN) || defined(PAR)
2746 /*
2747   NB: only the type of the blocking queue is different in GranSim and GUM
2748       the operations on the queue-elements are the same
2749       long live polymorphism!
2750 */
2751 static void
2752 unblockThread(StgTSO *tso)
2753 {
2754   StgBlockingQueueElement *t, **last;
2755
2756   ACQUIRE_LOCK(&sched_mutex);
2757   switch (tso->why_blocked) {
2758
2759   case NotBlocked:
2760     return;  /* not blocked */
2761
2762   case BlockedOnMVar:
2763     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2764     {
2765       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2766       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2767
2768       last = (StgBlockingQueueElement **)&mvar->head;
2769       for (t = (StgBlockingQueueElement *)mvar->head; 
2770            t != END_BQ_QUEUE; 
2771            last = &t->link, last_tso = t, t = t->link) {
2772         if (t == (StgBlockingQueueElement *)tso) {
2773           *last = (StgBlockingQueueElement *)tso->link;
2774           if (mvar->tail == tso) {
2775             mvar->tail = (StgTSO *)last_tso;
2776           }
2777           goto done;
2778         }
2779       }
2780       barf("unblockThread (MVAR): TSO not found");
2781     }
2782
2783   case BlockedOnBlackHole:
2784     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2785     {
2786       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2787
2788       last = &bq->blocking_queue;
2789       for (t = bq->blocking_queue; 
2790            t != END_BQ_QUEUE; 
2791            last = &t->link, t = t->link) {
2792         if (t == (StgBlockingQueueElement *)tso) {
2793           *last = (StgBlockingQueueElement *)tso->link;
2794           goto done;
2795         }
2796       }
2797       barf("unblockThread (BLACKHOLE): TSO not found");
2798     }
2799
2800   case BlockedOnException:
2801     {
2802       StgTSO *target  = tso->block_info.tso;
2803
2804       ASSERT(get_itbl(target)->type == TSO);
2805
2806       if (target->what_next == ThreadRelocated) {
2807           target = target->link;
2808           ASSERT(get_itbl(target)->type == TSO);
2809       }
2810
2811       ASSERT(target->blocked_exceptions != NULL);
2812
2813       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2814       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2815            t != END_BQ_QUEUE; 
2816            last = &t->link, t = t->link) {
2817         ASSERT(get_itbl(t)->type == TSO);
2818         if (t == (StgBlockingQueueElement *)tso) {
2819           *last = (StgBlockingQueueElement *)tso->link;
2820           goto done;
2821         }
2822       }
2823       barf("unblockThread (Exception): TSO not found");
2824     }
2825
2826   case BlockedOnRead:
2827   case BlockedOnWrite:
2828     {
2829       /* take TSO off blocked_queue */
2830       StgBlockingQueueElement *prev = NULL;
2831       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2832            prev = t, t = t->link) {
2833         if (t == (StgBlockingQueueElement *)tso) {
2834           if (prev == NULL) {
2835             blocked_queue_hd = (StgTSO *)t->link;
2836             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2837               blocked_queue_tl = END_TSO_QUEUE;
2838             }
2839           } else {
2840             prev->link = t->link;
2841             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2842               blocked_queue_tl = (StgTSO *)prev;
2843             }
2844           }
2845           goto done;
2846         }
2847       }
2848       barf("unblockThread (I/O): TSO not found");
2849     }
2850
2851   case BlockedOnDelay:
2852     {
2853       /* take TSO off sleeping_queue */
2854       StgBlockingQueueElement *prev = NULL;
2855       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2856            prev = t, t = t->link) {
2857         if (t == (StgBlockingQueueElement *)tso) {
2858           if (prev == NULL) {
2859             sleeping_queue = (StgTSO *)t->link;
2860           } else {
2861             prev->link = t->link;
2862           }
2863           goto done;
2864         }
2865       }
2866       barf("unblockThread (I/O): TSO not found");
2867     }
2868
2869   default:
2870     barf("unblockThread");
2871   }
2872
2873  done:
2874   tso->link = END_TSO_QUEUE;
2875   tso->why_blocked = NotBlocked;
2876   tso->block_info.closure = NULL;
2877   PUSH_ON_RUN_QUEUE(tso);
2878   RELEASE_LOCK(&sched_mutex);
2879 }
2880 #else
2881 static void
2882 unblockThread(StgTSO *tso)
2883 {
2884   StgTSO *t, **last;
2885
2886   ACQUIRE_LOCK(&sched_mutex);
2887   switch (tso->why_blocked) {
2888
2889   case NotBlocked:
2890     return;  /* not blocked */
2891
2892   case BlockedOnMVar:
2893     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2894     {
2895       StgTSO *last_tso = END_TSO_QUEUE;
2896       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2897
2898       last = &mvar->head;
2899       for (t = mvar->head; t != END_TSO_QUEUE; 
2900            last = &t->link, last_tso = t, t = t->link) {
2901         if (t == tso) {
2902           *last = tso->link;
2903           if (mvar->tail == tso) {
2904             mvar->tail = last_tso;
2905           }
2906           goto done;
2907         }
2908       }
2909       barf("unblockThread (MVAR): TSO not found");
2910     }
2911
2912   case BlockedOnBlackHole:
2913     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2914     {
2915       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2916
2917       last = &bq->blocking_queue;
2918       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2919            last = &t->link, t = t->link) {
2920         if (t == tso) {
2921           *last = tso->link;
2922           goto done;
2923         }
2924       }
2925       barf("unblockThread (BLACKHOLE): TSO not found");
2926     }
2927
2928   case BlockedOnException:
2929     {
2930       StgTSO *target  = tso->block_info.tso;
2931
2932       ASSERT(get_itbl(target)->type == TSO);
2933
2934       while (target->what_next == ThreadRelocated) {
2935           target = target->link;
2936           ASSERT(get_itbl(target)->type == TSO);
2937       }
2938       
2939       ASSERT(target->blocked_exceptions != NULL);
2940
2941       last = &target->blocked_exceptions;
2942       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2943            last = &t->link, t = t->link) {
2944         ASSERT(get_itbl(t)->type == TSO);
2945         if (t == tso) {
2946           *last = tso->link;
2947           goto done;
2948         }
2949       }
2950       barf("unblockThread (Exception): TSO not found");
2951     }
2952
2953   case BlockedOnRead:
2954   case BlockedOnWrite:
2955     {
2956       StgTSO *prev = NULL;
2957       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2958            prev = t, t = t->link) {
2959         if (t == tso) {
2960           if (prev == NULL) {
2961             blocked_queue_hd = t->link;
2962             if (blocked_queue_tl == t) {
2963               blocked_queue_tl = END_TSO_QUEUE;
2964             }
2965           } else {
2966             prev->link = t->link;
2967             if (blocked_queue_tl == t) {
2968               blocked_queue_tl = prev;
2969             }
2970           }
2971           goto done;
2972         }
2973       }
2974       barf("unblockThread (I/O): TSO not found");
2975     }
2976
2977   case BlockedOnDelay:
2978     {
2979       StgTSO *prev = NULL;
2980       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2981            prev = t, t = t->link) {
2982         if (t == tso) {
2983           if (prev == NULL) {
2984             sleeping_queue = t->link;
2985           } else {
2986             prev->link = t->link;
2987           }
2988           goto done;
2989         }
2990       }
2991       barf("unblockThread (I/O): TSO not found");
2992     }
2993
2994   default:
2995     barf("unblockThread");
2996   }
2997
2998  done:
2999   tso->link = END_TSO_QUEUE;
3000   tso->why_blocked = NotBlocked;
3001   tso->block_info.closure = NULL;
3002   PUSH_ON_RUN_QUEUE(tso);
3003   RELEASE_LOCK(&sched_mutex);
3004 }
3005 #endif
3006
3007 /* -----------------------------------------------------------------------------
3008  * raiseAsync()
3009  *
3010  * The following function implements the magic for raising an
3011  * asynchronous exception in an existing thread.
3012  *
3013  * We first remove the thread from any queue on which it might be
3014  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3015  *
3016  * We strip the stack down to the innermost CATCH_FRAME, building
3017  * thunks in the heap for all the active computations, so they can 
3018  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3019  * an application of the handler to the exception, and push it on
3020  * the top of the stack.
3021  * 
3022  * How exactly do we save all the active computations?  We create an
3023  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3024  * AP_UPDs pushes everything from the corresponding update frame
3025  * upwards onto the stack.  (Actually, it pushes everything up to the
3026  * next update frame plus a pointer to the next AP_UPD object.
3027  * Entering the next AP_UPD object pushes more onto the stack until we
3028  * reach the last AP_UPD object - at which point the stack should look
3029  * exactly as it did when we killed the TSO and we can continue
3030  * execution by entering the closure on top of the stack.
3031  *
3032  * We can also kill a thread entirely - this happens if either (a) the 
3033  * exception passed to raiseAsync is NULL, or (b) there's no
3034  * CATCH_FRAME on the stack.  In either case, we strip the entire
3035  * stack and replace the thread with a zombie.
3036  *
3037  * -------------------------------------------------------------------------- */
3038  
3039 void 
3040 deleteThread(StgTSO *tso)
3041 {
3042   raiseAsync(tso,NULL);
3043 }
3044
3045 void
3046 raiseAsync(StgTSO *tso, StgClosure *exception)
3047 {
3048   StgUpdateFrame* su = tso->su;
3049   StgPtr          sp = tso->sp;
3050   
3051   /* Thread already dead? */
3052   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3053     return;
3054   }
3055
3056   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3057
3058   /* Remove it from any blocking queues */
3059   unblockThread(tso);
3060
3061   /* The stack freezing code assumes there's a closure pointer on
3062    * the top of the stack.  This isn't always the case with compiled
3063    * code, so we have to push a dummy closure on the top which just
3064    * returns to the next return address on the stack.
3065    */
3066   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3067     *(--sp) = (W_)&stg_dummy_ret_closure;
3068   }
3069
3070   while (1) {
3071     nat words = ((P_)su - (P_)sp) - 1;
3072     nat i;
3073     StgAP_UPD * ap;
3074
3075     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3076      * then build the THUNK raise(exception), and leave it on
3077      * top of the CATCH_FRAME ready to enter.
3078      */
3079     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3080       StgCatchFrame *cf = (StgCatchFrame *)su;
3081       StgClosure *raise;
3082
3083       /* we've got an exception to raise, so let's pass it to the
3084        * handler in this frame.
3085        */
3086       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3087       TICK_ALLOC_SE_THK(1,0);
3088       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3089       raise->payload[0] = exception;
3090
3091       /* throw away the stack from Sp up to the CATCH_FRAME.
3092        */
3093       sp = (P_)su - 1;
3094
3095       /* Ensure that async excpetions are blocked now, so we don't get
3096        * a surprise exception before we get around to executing the
3097        * handler.
3098        */
3099       if (tso->blocked_exceptions == NULL) {
3100           tso->blocked_exceptions = END_TSO_QUEUE;
3101       }
3102
3103       /* Put the newly-built THUNK on top of the stack, ready to execute
3104        * when the thread restarts.
3105        */
3106       sp[0] = (W_)raise;
3107       tso->sp = sp;
3108       tso->su = su;
3109       tso->what_next = ThreadEnterGHC;
3110       IF_DEBUG(sanity, checkTSO(tso));
3111       return;
3112     }
3113
3114     /* First build an AP_UPD consisting of the stack chunk above the
3115      * current update frame, with the top word on the stack as the
3116      * fun field.
3117      */
3118     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3119     
3120     ASSERT(words >= 0);
3121     
3122     ap->n_args = words;
3123     ap->fun    = (StgClosure *)sp[0];
3124     sp++;
3125     for(i=0; i < (nat)words; ++i) {
3126       ap->payload[i] = (StgClosure *)*sp++;
3127     }
3128     
3129     switch (get_itbl(su)->type) {
3130       
3131     case UPDATE_FRAME:
3132       {
3133         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3134         TICK_ALLOC_UP_THK(words+1,0);
3135         
3136         IF_DEBUG(scheduler,
3137                  fprintf(stderr,  "scheduler: Updating ");
3138                  printPtr((P_)su->updatee); 
3139                  fprintf(stderr,  " with ");
3140                  printObj((StgClosure *)ap);
3141                  );
3142         
3143         /* Replace the updatee with an indirection - happily
3144          * this will also wake up any threads currently
3145          * waiting on the result.
3146          *
3147          * Warning: if we're in a loop, more than one update frame on
3148          * the stack may point to the same object.  Be careful not to
3149          * overwrite an IND_OLDGEN in this case, because we'll screw
3150          * up the mutable lists.  To be on the safe side, don't
3151          * overwrite any kind of indirection at all.  See also
3152          * threadSqueezeStack in GC.c, where we have to make a similar
3153          * check.
3154          */
3155         if (!closure_IND(su->updatee)) {
3156             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3157         }
3158         su = su->link;
3159         sp += sizeofW(StgUpdateFrame) -1;
3160         sp[0] = (W_)ap; /* push onto stack */
3161         break;
3162       }
3163
3164     case CATCH_FRAME:
3165       {
3166         StgCatchFrame *cf = (StgCatchFrame *)su;
3167         StgClosure* o;
3168         
3169         /* We want a PAP, not an AP_UPD.  Fortunately, the
3170          * layout's the same.
3171          */
3172         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3173         TICK_ALLOC_UPD_PAP(words+1,0);
3174         
3175         /* now build o = FUN(catch,ap,handler) */
3176         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3177         TICK_ALLOC_FUN(2,0);
3178         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3179         o->payload[0] = (StgClosure *)ap;
3180         o->payload[1] = cf->handler;
3181         
3182         IF_DEBUG(scheduler,
3183                  fprintf(stderr,  "scheduler: Built ");
3184                  printObj((StgClosure *)o);
3185                  );
3186         
3187         /* pop the old handler and put o on the stack */
3188         su = cf->link;
3189         sp += sizeofW(StgCatchFrame) - 1;
3190         sp[0] = (W_)o;
3191         break;
3192       }
3193       
3194     case SEQ_FRAME:
3195       {
3196         StgSeqFrame *sf = (StgSeqFrame *)su;
3197         StgClosure* o;
3198         
3199         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3200         TICK_ALLOC_UPD_PAP(words+1,0);
3201         
3202         /* now build o = FUN(seq,ap) */
3203         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3204         TICK_ALLOC_SE_THK(1,0);
3205         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3206         o->payload[0] = (StgClosure *)ap;
3207         
3208         IF_DEBUG(scheduler,
3209                  fprintf(stderr,  "scheduler: Built ");
3210                  printObj((StgClosure *)o);
3211                  );
3212         
3213         /* pop the old handler and put o on the stack */
3214         su = sf->link;
3215         sp += sizeofW(StgSeqFrame) - 1;
3216         sp[0] = (W_)o;
3217         break;
3218       }
3219       
3220     case STOP_FRAME:
3221       /* We've stripped the entire stack, the thread is now dead. */
3222       sp += sizeofW(StgStopFrame) - 1;
3223       sp[0] = (W_)exception;    /* save the exception */
3224       tso->what_next = ThreadKilled;
3225       tso->su = (StgUpdateFrame *)(sp+1);
3226       tso->sp = sp;
3227       return;
3228
3229     default:
3230       barf("raiseAsync");
3231     }
3232   }
3233   barf("raiseAsync");
3234 }
3235
3236 /* -----------------------------------------------------------------------------
3237    resurrectThreads is called after garbage collection on the list of
3238    threads found to be garbage.  Each of these threads will be woken
3239    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3240    on an MVar, or NonTermination if the thread was blocked on a Black
3241    Hole.
3242    -------------------------------------------------------------------------- */
3243
3244 void
3245 resurrectThreads( StgTSO *threads )
3246 {
3247   StgTSO *tso, *next;
3248
3249   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3250     next = tso->global_link;
3251     tso->global_link = all_threads;
3252     all_threads = tso;
3253     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3254
3255     switch (tso->why_blocked) {
3256     case BlockedOnMVar:
3257     case BlockedOnException:
3258       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3259       break;
3260     case BlockedOnBlackHole:
3261       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3262       break;
3263     case NotBlocked:
3264       /* This might happen if the thread was blocked on a black hole
3265        * belonging to a thread that we've just woken up (raiseAsync
3266        * can wake up threads, remember...).
3267        */
3268       continue;
3269     default:
3270       barf("resurrectThreads: thread blocked in a strange way");
3271     }
3272   }
3273 }
3274
3275 /* -----------------------------------------------------------------------------
3276  * Blackhole detection: if we reach a deadlock, test whether any
3277  * threads are blocked on themselves.  Any threads which are found to
3278  * be self-blocked get sent a NonTermination exception.
3279  *
3280  * This is only done in a deadlock situation in order to avoid
3281  * performance overhead in the normal case.
3282  * -------------------------------------------------------------------------- */
3283
3284 static void
3285 detectBlackHoles( void )
3286 {
3287     StgTSO *t = all_threads;
3288     StgUpdateFrame *frame;
3289     StgClosure *blocked_on;
3290
3291     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3292
3293         while (t->what_next == ThreadRelocated) {
3294             t = t->link;
3295             ASSERT(get_itbl(t)->type == TSO);
3296         }
3297       
3298         if (t->why_blocked != BlockedOnBlackHole) {
3299             continue;
3300         }
3301
3302         blocked_on = t->block_info.closure;
3303
3304         for (frame = t->su; ; frame = frame->link) {
3305             switch (get_itbl(frame)->type) {
3306
3307             case UPDATE_FRAME:
3308                 if (frame->updatee == blocked_on) {
3309                     /* We are blocking on one of our own computations, so
3310                      * send this thread the NonTermination exception.  
3311                      */
3312                     IF_DEBUG(scheduler, 
3313                              sched_belch("thread %d is blocked on itself", t->id));
3314                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3315                     goto done;
3316                 }
3317                 else {
3318                     continue;
3319                 }
3320
3321             case CATCH_FRAME:
3322             case SEQ_FRAME:
3323                 continue;
3324                 
3325             case STOP_FRAME:
3326                 break;
3327             }
3328             break;
3329         }
3330
3331     done: ;
3332     }   
3333 }
3334
3335 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3336 //@subsection Debugging Routines
3337
3338 /* -----------------------------------------------------------------------------
3339    Debugging: why is a thread blocked
3340    -------------------------------------------------------------------------- */
3341
3342 #ifdef DEBUG
3343
3344 void
3345 printThreadBlockage(StgTSO *tso)
3346 {
3347   switch (tso->why_blocked) {
3348   case BlockedOnRead:
3349     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3350     break;
3351   case BlockedOnWrite:
3352     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3353     break;
3354   case BlockedOnDelay:
3355     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3356     break;
3357   case BlockedOnMVar:
3358     fprintf(stderr,"is blocked on an MVar");
3359     break;
3360   case BlockedOnException:
3361     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3362             tso->block_info.tso->id);
3363     break;
3364   case BlockedOnBlackHole:
3365     fprintf(stderr,"is blocked on a black hole");
3366     break;
3367   case NotBlocked:
3368     fprintf(stderr,"is not blocked");
3369     break;
3370 #if defined(PAR)
3371   case BlockedOnGA:
3372     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3373             tso->block_info.closure, info_type(tso->block_info.closure));
3374     break;
3375   case BlockedOnGA_NoSend:
3376     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3377             tso->block_info.closure, info_type(tso->block_info.closure));
3378     break;
3379 #endif
3380 #if defined(RTS_SUPPORTS_THREADS)
3381   case BlockedOnCCall:
3382     fprintf(stderr,"is blocked on an external call");
3383     break;
3384 #endif
3385   default:
3386     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3387          tso->why_blocked, tso->id, tso);
3388   }
3389 }
3390
3391 void
3392 printThreadStatus(StgTSO *tso)
3393 {
3394   switch (tso->what_next) {
3395   case ThreadKilled:
3396     fprintf(stderr,"has been killed");
3397     break;
3398   case ThreadComplete:
3399     fprintf(stderr,"has completed");
3400     break;
3401   default:
3402     printThreadBlockage(tso);
3403   }
3404 }
3405
3406 void
3407 printAllThreads(void)
3408 {
3409   StgTSO *t;
3410
3411 # if defined(GRAN)
3412   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3413   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3414                        time_string, rtsFalse/*no commas!*/);
3415
3416   sched_belch("all threads at [%s]:", time_string);
3417 # elif defined(PAR)
3418   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3419   ullong_format_string(CURRENT_TIME,
3420                        time_string, rtsFalse/*no commas!*/);
3421
3422   sched_belch("all threads at [%s]:", time_string);
3423 # else
3424   sched_belch("all threads:");
3425 # endif
3426
3427   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3428     fprintf(stderr, "\tthread %d ", t->id);
3429     printThreadStatus(t);
3430     fprintf(stderr,"\n");
3431   }
3432 }
3433     
3434 /* 
3435    Print a whole blocking queue attached to node (debugging only).
3436 */
3437 //@cindex print_bq
3438 # if defined(PAR)
3439 void 
3440 print_bq (StgClosure *node)
3441 {
3442   StgBlockingQueueElement *bqe;
3443   StgTSO *tso;
3444   rtsBool end;
3445
3446   fprintf(stderr,"## BQ of closure %p (%s): ",
3447           node, info_type(node));
3448
3449   /* should cover all closures that may have a blocking queue */
3450   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3451          get_itbl(node)->type == FETCH_ME_BQ ||
3452          get_itbl(node)->type == RBH ||
3453          get_itbl(node)->type == MVAR);
3454     
3455   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3456
3457   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3458 }
3459
3460 /* 
3461    Print a whole blocking queue starting with the element bqe.
3462 */
3463 void 
3464 print_bqe (StgBlockingQueueElement *bqe)
3465 {
3466   rtsBool end;
3467
3468   /* 
3469      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3470   */
3471   for (end = (bqe==END_BQ_QUEUE);
3472        !end; // iterate until bqe points to a CONSTR
3473        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3474        bqe = end ? END_BQ_QUEUE : bqe->link) {
3475     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3476     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3477     /* types of closures that may appear in a blocking queue */
3478     ASSERT(get_itbl(bqe)->type == TSO ||           
3479            get_itbl(bqe)->type == BLOCKED_FETCH || 
3480            get_itbl(bqe)->type == CONSTR); 
3481     /* only BQs of an RBH end with an RBH_Save closure */
3482     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3483
3484     switch (get_itbl(bqe)->type) {
3485     case TSO:
3486       fprintf(stderr," TSO %u (%x),",
3487               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3488       break;
3489     case BLOCKED_FETCH:
3490       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3491               ((StgBlockedFetch *)bqe)->node, 
3492               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3493               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3494               ((StgBlockedFetch *)bqe)->ga.weight);
3495       break;
3496     case CONSTR:
3497       fprintf(stderr," %s (IP %p),",
3498               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3499                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3500                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3501                "RBH_Save_?"), get_itbl(bqe));
3502       break;
3503     default:
3504       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3505            info_type((StgClosure *)bqe)); // , node, info_type(node));
3506       break;
3507     }
3508   } /* for */
3509   fputc('\n', stderr);
3510 }
3511 # elif defined(GRAN)
3512 void 
3513 print_bq (StgClosure *node)
3514 {
3515   StgBlockingQueueElement *bqe;
3516   PEs node_loc, tso_loc;
3517   rtsBool end;
3518
3519   /* should cover all closures that may have a blocking queue */
3520   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3521          get_itbl(node)->type == FETCH_ME_BQ ||
3522          get_itbl(node)->type == RBH);
3523     
3524   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3525   node_loc = where_is(node);
3526
3527   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3528           node, info_type(node), node_loc);
3529
3530   /* 
3531      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3532   */
3533   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3534        !end; // iterate until bqe points to a CONSTR
3535        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3536     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3537     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3538     /* types of closures that may appear in a blocking queue */
3539     ASSERT(get_itbl(bqe)->type == TSO ||           
3540            get_itbl(bqe)->type == CONSTR); 
3541     /* only BQs of an RBH end with an RBH_Save closure */
3542     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3543
3544     tso_loc = where_is((StgClosure *)bqe);
3545     switch (get_itbl(bqe)->type) {
3546     case TSO:
3547       fprintf(stderr," TSO %d (%p) on [PE %d],",
3548               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3549       break;
3550     case CONSTR:
3551       fprintf(stderr," %s (IP %p),",
3552               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3553                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3554                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3555                "RBH_Save_?"), get_itbl(bqe));
3556       break;
3557     default:
3558       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3559            info_type((StgClosure *)bqe), node, info_type(node));
3560       break;
3561     }
3562   } /* for */
3563   fputc('\n', stderr);
3564 }
3565 #else
3566 /* 
3567    Nice and easy: only TSOs on the blocking queue
3568 */
3569 void 
3570 print_bq (StgClosure *node)
3571 {
3572   StgTSO *tso;
3573
3574   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3575   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3576        tso != END_TSO_QUEUE; 
3577        tso=tso->link) {
3578     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3579     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3580     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3581   }
3582   fputc('\n', stderr);
3583 }
3584 # endif
3585
3586 #if defined(PAR)
3587 static nat
3588 run_queue_len(void)
3589 {
3590   nat i;
3591   StgTSO *tso;
3592
3593   for (i=0, tso=run_queue_hd; 
3594        tso != END_TSO_QUEUE;
3595        i++, tso=tso->link)
3596     /* nothing */
3597
3598   return i;
3599 }
3600 #endif
3601
3602 static void
3603 sched_belch(char *s, ...)
3604 {
3605   va_list ap;
3606   va_start(ap,s);
3607 #ifdef SMP
3608   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3609 #elif defined(PAR)
3610   fprintf(stderr, "== ");
3611 #else
3612   fprintf(stderr, "scheduler: ");
3613 #endif
3614   vfprintf(stderr, s, ap);
3615   fprintf(stderr, "\n");
3616 }
3617
3618 #endif /* DEBUG */
3619
3620
3621 //@node Index,  , Debugging Routines, Main scheduling code
3622 //@subsection Index
3623
3624 //@index
3625 //* StgMainThread::  @cindex\s-+StgMainThread
3626 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3627 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3628 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3629 //* context_switch::  @cindex\s-+context_switch
3630 //* createThread::  @cindex\s-+createThread
3631 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3632 //* initScheduler::  @cindex\s-+initScheduler
3633 //* interrupted::  @cindex\s-+interrupted
3634 //* next_thread_id::  @cindex\s-+next_thread_id
3635 //* print_bq::  @cindex\s-+print_bq
3636 //* run_queue_hd::  @cindex\s-+run_queue_hd
3637 //* run_queue_tl::  @cindex\s-+run_queue_tl
3638 //* sched_mutex::  @cindex\s-+sched_mutex
3639 //* schedule::  @cindex\s-+schedule
3640 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3641 //* term_mutex::  @cindex\s-+term_mutex
3642 //@end index