8a4744309194dda4ef63d162ad3e1eb0f3ed47c2
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.135 2002/04/01 11:18:19 panne Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main thread queue.
123  * Locks required: sched_mutex.
124  */
125 StgMainThread *main_threads;
126
127 /* Thread queues.
128  * Locks required: sched_mutex.
129  */
130 #if defined(GRAN)
131
132 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
133 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
134
135 /* 
136    In GranSim we have a runnable and a blocked queue for each processor.
137    In order to minimise code changes new arrays run_queue_hds/tls
138    are created. run_queue_hd is then a short cut (macro) for
139    run_queue_hds[CurrentProc] (see GranSim.h).
140    -- HWL
141 */
142 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
143 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
144 StgTSO *ccalling_threadss[MAX_PROC];
145 /* We use the same global list of threads (all_threads) in GranSim as in
146    the std RTS (i.e. we are cheating). However, we don't use this list in
147    the GranSim specific code at the moment (so we are only potentially
148    cheating).  */
149
150 #else /* !GRAN */
151
152 StgTSO *run_queue_hd, *run_queue_tl;
153 StgTSO *blocked_queue_hd, *blocked_queue_tl;
154 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
155
156 #endif
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 static StgTSO *threadStackOverflow(StgTSO *tso);
170
171 /* KH: The following two flags are shared memory locations.  There is no need
172        to lock them, since they are only unset at the end of a scheduler
173        operation.
174 */
175
176 /* flag set by signal handler to precipitate a context switch */
177 //@cindex context_switch
178 nat context_switch;
179
180 /* if this flag is set as well, give up execution */
181 //@cindex interrupted
182 rtsBool interrupted;
183
184 /* Next thread ID to allocate.
185  * Locks required: sched_mutex
186  */
187 //@cindex next_thread_id
188 StgThreadID next_thread_id = 1;
189
190 /*
191  * Pointers to the state of the current thread.
192  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
193  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
194  */
195  
196 /* The smallest stack size that makes any sense is:
197  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
198  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
199  *  + 1                       (the realworld token for an IO thread)
200  *  + 1                       (the closure to enter)
201  *
202  * A thread with this stack will bomb immediately with a stack
203  * overflow, which will increase its stack size.  
204  */
205
206 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
207
208
209 #if defined(GRAN)
210 StgTSO *CurrentTSO;
211 #endif
212
213 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
214  *  exists - earlier gccs apparently didn't.
215  *  -= chak
216  */
217 StgTSO dummy_tso;
218
219 rtsBool ready_to_gc;
220
221 void            addToBlockedQueue ( StgTSO *tso );
222
223 static void     schedule          ( void );
224        void     interruptStgRts   ( void );
225 #if defined(GRAN)
226 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
227 #else
228 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
229 #endif
230
231 static void     detectBlackHoles  ( void );
232
233 #ifdef DEBUG
234 static void sched_belch(char *s, ...);
235 #endif
236
237 #if defined(RTS_SUPPORTS_THREADS)
238 /* ToDo: carefully document the invariants that go together
239  *       with these synchronisation objects.
240  */
241 Mutex     sched_mutex       = INIT_MUTEX_VAR;
242 Mutex     term_mutex        = INIT_MUTEX_VAR;
243
244 # if defined(SMP)
245 static Condition gc_pending_cond = INIT_COND_VAR;
246 nat await_death;
247 # endif
248
249 #endif /* RTS_SUPPORTS_THREADS */
250
251 #if defined(PAR)
252 StgTSO *LastTSO;
253 rtsTime TimeOfLastYield;
254 rtsBool emitSchedule = rtsTrue;
255 #endif
256
257 #if DEBUG
258 char *whatNext_strs[] = {
259   "ThreadEnterGHC",
260   "ThreadRunGHC",
261   "ThreadEnterInterp",
262   "ThreadKilled",
263   "ThreadComplete"
264 };
265
266 char *threadReturnCode_strs[] = {
267   "HeapOverflow",                       /* might also be StackOverflow */
268   "StackOverflow",
269   "ThreadYielding",
270   "ThreadBlocked",
271   "ThreadFinished"
272 };
273 #endif
274
275 #if defined(PAR)
276 StgTSO * createSparkThread(rtsSpark spark);
277 StgTSO * activateSpark (rtsSpark spark);  
278 #endif
279
280 /*
281  * The thread state for the main thread.
282 // ToDo: check whether not needed any more
283 StgTSO   *MainTSO;
284  */
285
286 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
287 static void taskStart(void);
288 static void
289 taskStart(void)
290 {
291   schedule();
292 }
293 #endif
294
295
296
297
298 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
299 //@subsection Main scheduling loop
300
301 /* ---------------------------------------------------------------------------
302    Main scheduling loop.
303
304    We use round-robin scheduling, each thread returning to the
305    scheduler loop when one of these conditions is detected:
306
307       * out of heap space
308       * timer expires (thread yields)
309       * thread blocks
310       * thread ends
311       * stack overflow
312
313    Locking notes:  we acquire the scheduler lock once at the beginning
314    of the scheduler loop, and release it when
315     
316       * running a thread, or
317       * waiting for work, or
318       * waiting for a GC to complete.
319
320    GRAN version:
321      In a GranSim setup this loop iterates over the global event queue.
322      This revolves around the global event queue, which determines what 
323      to do next. Therefore, it's more complicated than either the 
324      concurrent or the parallel (GUM) setup.
325
326    GUM version:
327      GUM iterates over incoming messages.
328      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
329      and sends out a fish whenever it has nothing to do; in-between
330      doing the actual reductions (shared code below) it processes the
331      incoming messages and deals with delayed operations 
332      (see PendingFetches).
333      This is not the ugliest code you could imagine, but it's bloody close.
334
335    ------------------------------------------------------------------------ */
336 //@cindex schedule
337 static void
338 schedule( void )
339 {
340   StgTSO *t;
341   Capability *cap;
342   StgThreadReturnCode ret;
343 #if defined(GRAN)
344   rtsEvent *event;
345 #elif defined(PAR)
346   StgSparkPool *pool;
347   rtsSpark spark;
348   StgTSO *tso;
349   GlobalTaskId pe;
350   rtsBool receivedFinish = rtsFalse;
351 # if defined(DEBUG)
352   nat tp_size, sp_size; // stats only
353 # endif
354 #endif
355   rtsBool was_interrupted = rtsFalse;
356   
357   ACQUIRE_LOCK(&sched_mutex);
358  
359 #if defined(RTS_SUPPORTS_THREADS)
360   /* Check to see whether there are any worker threads
361      waiting to deposit external call results. If so,
362      yield our capability */
363   yieldToReturningWorker(&sched_mutex, cap);
364
365   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
366 #endif
367
368 #if defined(GRAN)
369   /* set up first event to get things going */
370   /* ToDo: assign costs for system setup and init MainTSO ! */
371   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
372             ContinueThread, 
373             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
374
375   IF_DEBUG(gran,
376            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
377            G_TSO(CurrentTSO, 5));
378
379   if (RtsFlags.GranFlags.Light) {
380     /* Save current time; GranSim Light only */
381     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
382   }      
383
384   event = get_next_event();
385
386   while (event!=(rtsEvent*)NULL) {
387     /* Choose the processor with the next event */
388     CurrentProc = event->proc;
389     CurrentTSO = event->tso;
390
391 #elif defined(PAR)
392
393   while (!receivedFinish) {    /* set by processMessages */
394                                /* when receiving PP_FINISH message         */ 
395 #else
396
397   while (1) {
398
399 #endif
400
401     IF_DEBUG(scheduler, printAllThreads());
402
403     /* If we're interrupted (the user pressed ^C, or some other
404      * termination condition occurred), kill all the currently running
405      * threads.
406      */
407     if (interrupted) {
408       IF_DEBUG(scheduler, sched_belch("interrupted"));
409       deleteAllThreads();
410       interrupted = rtsFalse;
411       was_interrupted = rtsTrue;
412     }
413
414     /* Go through the list of main threads and wake up any
415      * clients whose computations have finished.  ToDo: this
416      * should be done more efficiently without a linear scan
417      * of the main threads list, somehow...
418      */
419 #if defined(RTS_SUPPORTS_THREADS)
420     { 
421       StgMainThread *m, **prev;
422       prev = &main_threads;
423       for (m = main_threads; m != NULL; m = m->link) {
424         switch (m->tso->what_next) {
425         case ThreadComplete:
426           if (m->ret) {
427             *(m->ret) = (StgClosure *)m->tso->sp[0];
428           }
429           *prev = m->link;
430           m->stat = Success;
431           broadcastCondition(&m->wakeup);
432           break;
433         case ThreadKilled:
434           if (m->ret) *(m->ret) = NULL;
435           *prev = m->link;
436           if (was_interrupted) {
437             m->stat = Interrupted;
438           } else {
439             m->stat = Killed;
440           }
441           broadcastCondition(&m->wakeup);
442           break;
443         default:
444           break;
445         }
446       }
447     }
448
449 #else /* not threaded */
450
451 # if defined(PAR)
452     /* in GUM do this only on the Main PE */
453     if (IAmMainThread)
454 # endif
455     /* If our main thread has finished or been killed, return.
456      */
457     {
458       StgMainThread *m = main_threads;
459       if (m->tso->what_next == ThreadComplete
460           || m->tso->what_next == ThreadKilled) {
461         main_threads = main_threads->link;
462         if (m->tso->what_next == ThreadComplete) {
463           /* we finished successfully, fill in the return value */
464           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
465           m->stat = Success;
466           return;
467         } else {
468           if (m->ret) { *(m->ret) = NULL; };
469           if (was_interrupted) {
470             m->stat = Interrupted;
471           } else {
472             m->stat = Killed;
473           }
474           return;
475         }
476       }
477     }
478 #endif
479
480     /* Top up the run queue from our spark pool.  We try to make the
481      * number of threads in the run queue equal to the number of
482      * free capabilities.
483      *
484      * Disable spark support in SMP for now, non-essential & requires
485      * a little bit of work to make it compile cleanly. -- sof 1/02.
486      */
487 #if 0 /* defined(SMP) */
488     {
489       nat n = getFreeCapabilities();
490       StgTSO *tso = run_queue_hd;
491
492       /* Count the run queue */
493       while (n > 0 && tso != END_TSO_QUEUE) {
494         tso = tso->link;
495         n--;
496       }
497
498       for (; n > 0; n--) {
499         StgClosure *spark;
500         spark = findSpark(rtsFalse);
501         if (spark == NULL) {
502           break; /* no more sparks in the pool */
503         } else {
504           /* I'd prefer this to be done in activateSpark -- HWL */
505           /* tricky - it needs to hold the scheduler lock and
506            * not try to re-acquire it -- SDM */
507           createSparkThread(spark);       
508           IF_DEBUG(scheduler,
509                    sched_belch("==^^ turning spark of closure %p into a thread",
510                                (StgClosure *)spark));
511         }
512       }
513       /* We need to wake up the other tasks if we just created some
514        * work for them.
515        */
516       if (getFreeCapabilities() - n > 1) {
517           signalCondition( &thread_ready_cond );
518       }
519     }
520 #endif // SMP
521
522     /* check for signals each time around the scheduler */
523 #ifndef mingw32_TARGET_OS
524     if (signals_pending()) {
525       startSignalHandlers();
526     }
527 #endif
528
529     /* Check whether any waiting threads need to be woken up.  If the
530      * run queue is empty, and there are no other tasks running, we
531      * can wait indefinitely for something to happen.
532      * ToDo: what if another client comes along & requests another
533      * main thread?
534      */
535     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
536       awaitEvent( EMPTY_RUN_QUEUE()
537 #if defined(SMP)
538         && allFreeCapabilities()
539 #endif
540         );
541     }
542     /* we can be interrupted while waiting for I/O... */
543     if (interrupted) continue;
544
545     /* 
546      * Detect deadlock: when we have no threads to run, there are no
547      * threads waiting on I/O or sleeping, and all the other tasks are
548      * waiting for work, we must have a deadlock of some description.
549      *
550      * We first try to find threads blocked on themselves (ie. black
551      * holes), and generate NonTermination exceptions where necessary.
552      *
553      * If no threads are black holed, we have a deadlock situation, so
554      * inform all the main threads.
555      */
556 #ifndef PAR
557     if (   EMPTY_THREAD_QUEUES()
558 #if defined(RTS_SUPPORTS_THREADS)
559         && EMPTY_QUEUE(suspended_ccalling_threads)
560 #endif
561 #ifdef SMP
562         && allFreeCapabilities()
563 #endif
564         )
565     {
566         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
567 #if defined(THREADED_RTS)
568         /* and SMP mode ..? */
569         releaseCapability(cap);
570 #endif
571         // Garbage collection can release some new threads due to
572         // either (a) finalizers or (b) threads resurrected because
573         // they are about to be send BlockedOnDeadMVar.  Any threads
574         // thus released will be immediately runnable.
575         GarbageCollect(GetRoots,rtsTrue);
576
577         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
578
579         IF_DEBUG(scheduler, 
580                  sched_belch("still deadlocked, checking for black holes..."));
581         detectBlackHoles();
582
583         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
584
585 #ifndef mingw32_TARGET_OS
586         /* If we have user-installed signal handlers, then wait
587          * for signals to arrive rather then bombing out with a
588          * deadlock.
589          */
590         if ( anyUserHandlers() ) {
591             IF_DEBUG(scheduler, 
592                      sched_belch("still deadlocked, waiting for signals..."));
593
594             awaitUserSignals();
595
596             // we might be interrupted...
597             if (interrupted) { continue; }
598
599             if (signals_pending()) {
600                 startSignalHandlers();
601             }
602             ASSERT(!EMPTY_RUN_QUEUE());
603             goto not_deadlocked;
604         }
605 #endif
606
607         /* Probably a real deadlock.  Send the current main thread the
608          * Deadlock exception (or in the SMP build, send *all* main
609          * threads the deadlock exception, since none of them can make
610          * progress).
611          */
612         {
613             StgMainThread *m;
614 #if defined(RTS_SUPPORTS_THREADS)
615             for (m = main_threads; m != NULL; m = m->link) {
616                 switch (m->tso->why_blocked) {
617                 case BlockedOnBlackHole:
618                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
619                     break;
620                 case BlockedOnException:
621                 case BlockedOnMVar:
622                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
623                     break;
624                 default:
625                     barf("deadlock: main thread blocked in a strange way");
626                 }
627             }
628 #else
629             m = main_threads;
630             switch (m->tso->why_blocked) {
631             case BlockedOnBlackHole:
632                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
633                 break;
634             case BlockedOnException:
635             case BlockedOnMVar:
636                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
637                 break;
638             default:
639                 barf("deadlock: main thread blocked in a strange way");
640             }
641 #endif
642         }
643
644 #if defined(RTS_SUPPORTS_THREADS)
645         /* ToDo: revisit conditions (and mechanism) for shutting
646            down a multi-threaded world  */
647         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
648         shutdownHaskellAndExit(0);
649 #endif
650     }
651   not_deadlocked:
652
653 #elif defined(PAR)
654     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
655 #endif
656
657 #if defined(SMP)
658     /* If there's a GC pending, don't do anything until it has
659      * completed.
660      */
661     if (ready_to_gc) {
662       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
663       waitCondition( &gc_pending_cond, &sched_mutex );
664     }
665 #endif    
666
667 #if defined(RTS_SUPPORTS_THREADS)
668     /* block until we've got a thread on the run queue and a free
669      * capability.
670      *
671      */
672     if ( EMPTY_RUN_QUEUE() ) {
673       /* Give up our capability */
674       releaseCapability(cap);
675       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
676       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
677       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
678 #if 0
679       while ( EMPTY_RUN_QUEUE() ) {
680         waitForWorkCapability(&sched_mutex, &cap);
681         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
682       }
683 #endif
684     }
685 #endif
686
687 #if defined(GRAN)
688     if (RtsFlags.GranFlags.Light)
689       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
690
691     /* adjust time based on time-stamp */
692     if (event->time > CurrentTime[CurrentProc] &&
693         event->evttype != ContinueThread)
694       CurrentTime[CurrentProc] = event->time;
695     
696     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
697     if (!RtsFlags.GranFlags.Light)
698       handleIdlePEs();
699
700     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
701
702     /* main event dispatcher in GranSim */
703     switch (event->evttype) {
704       /* Should just be continuing execution */
705     case ContinueThread:
706       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
707       /* ToDo: check assertion
708       ASSERT(run_queue_hd != (StgTSO*)NULL &&
709              run_queue_hd != END_TSO_QUEUE);
710       */
711       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
712       if (!RtsFlags.GranFlags.DoAsyncFetch &&
713           procStatus[CurrentProc]==Fetching) {
714         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
715               CurrentTSO->id, CurrentTSO, CurrentProc);
716         goto next_thread;
717       } 
718       /* Ignore ContinueThreads for completed threads */
719       if (CurrentTSO->what_next == ThreadComplete) {
720         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
721               CurrentTSO->id, CurrentTSO, CurrentProc);
722         goto next_thread;
723       } 
724       /* Ignore ContinueThreads for threads that are being migrated */
725       if (PROCS(CurrentTSO)==Nowhere) { 
726         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
727               CurrentTSO->id, CurrentTSO, CurrentProc);
728         goto next_thread;
729       }
730       /* The thread should be at the beginning of the run queue */
731       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
732         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
733               CurrentTSO->id, CurrentTSO, CurrentProc);
734         break; // run the thread anyway
735       }
736       /*
737       new_event(proc, proc, CurrentTime[proc],
738                 FindWork,
739                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
740       goto next_thread; 
741       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
742       break; // now actually run the thread; DaH Qu'vam yImuHbej 
743
744     case FetchNode:
745       do_the_fetchnode(event);
746       goto next_thread;             /* handle next event in event queue  */
747       
748     case GlobalBlock:
749       do_the_globalblock(event);
750       goto next_thread;             /* handle next event in event queue  */
751       
752     case FetchReply:
753       do_the_fetchreply(event);
754       goto next_thread;             /* handle next event in event queue  */
755       
756     case UnblockThread:   /* Move from the blocked queue to the tail of */
757       do_the_unblock(event);
758       goto next_thread;             /* handle next event in event queue  */
759       
760     case ResumeThread:  /* Move from the blocked queue to the tail of */
761       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
762       event->tso->gran.blocktime += 
763         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
764       do_the_startthread(event);
765       goto next_thread;             /* handle next event in event queue  */
766       
767     case StartThread:
768       do_the_startthread(event);
769       goto next_thread;             /* handle next event in event queue  */
770       
771     case MoveThread:
772       do_the_movethread(event);
773       goto next_thread;             /* handle next event in event queue  */
774       
775     case MoveSpark:
776       do_the_movespark(event);
777       goto next_thread;             /* handle next event in event queue  */
778       
779     case FindWork:
780       do_the_findwork(event);
781       goto next_thread;             /* handle next event in event queue  */
782       
783     default:
784       barf("Illegal event type %u\n", event->evttype);
785     }  /* switch */
786     
787     /* This point was scheduler_loop in the old RTS */
788
789     IF_DEBUG(gran, belch("GRAN: after main switch"));
790
791     TimeOfLastEvent = CurrentTime[CurrentProc];
792     TimeOfNextEvent = get_time_of_next_event();
793     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
794     // CurrentTSO = ThreadQueueHd;
795
796     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
797                          TimeOfNextEvent));
798
799     if (RtsFlags.GranFlags.Light) 
800       GranSimLight_leave_system(event, &ActiveTSO); 
801
802     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
803
804     IF_DEBUG(gran, 
805              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
806
807     /* in a GranSim setup the TSO stays on the run queue */
808     t = CurrentTSO;
809     /* Take a thread from the run queue. */
810     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
811
812     IF_DEBUG(gran, 
813              fprintf(stderr, "GRAN: About to run current thread, which is\n");
814              G_TSO(t,5));
815
816     context_switch = 0; // turned on via GranYield, checking events and time slice
817
818     IF_DEBUG(gran, 
819              DumpGranEvent(GR_SCHEDULE, t));
820
821     procStatus[CurrentProc] = Busy;
822
823 #elif defined(PAR)
824     if (PendingFetches != END_BF_QUEUE) {
825         processFetches();
826     }
827
828     /* ToDo: phps merge with spark activation above */
829     /* check whether we have local work and send requests if we have none */
830     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
831       /* :-[  no local threads => look out for local sparks */
832       /* the spark pool for the current PE */
833       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
834       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
835           pool->hd < pool->tl) {
836         /* 
837          * ToDo: add GC code check that we really have enough heap afterwards!!
838          * Old comment:
839          * If we're here (no runnable threads) and we have pending
840          * sparks, we must have a space problem.  Get enough space
841          * to turn one of those pending sparks into a
842          * thread... 
843          */
844
845         spark = findSpark(rtsFalse);                /* get a spark */
846         if (spark != (rtsSpark) NULL) {
847           tso = activateSpark(spark);       /* turn the spark into a thread */
848           IF_PAR_DEBUG(schedule,
849                        belch("==== schedule: Created TSO %d (%p); %d threads active",
850                              tso->id, tso, advisory_thread_count));
851
852           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
853             belch("==^^ failed to activate spark");
854             goto next_thread;
855           }               /* otherwise fall through & pick-up new tso */
856         } else {
857           IF_PAR_DEBUG(verbose,
858                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
859                              spark_queue_len(pool)));
860           goto next_thread;
861         }
862       }
863
864       /* If we still have no work we need to send a FISH to get a spark
865          from another PE 
866       */
867       if (EMPTY_RUN_QUEUE()) {
868       /* =8-[  no local sparks => look for work on other PEs */
869         /*
870          * We really have absolutely no work.  Send out a fish
871          * (there may be some out there already), and wait for
872          * something to arrive.  We clearly can't run any threads
873          * until a SCHEDULE or RESUME arrives, and so that's what
874          * we're hoping to see.  (Of course, we still have to
875          * respond to other types of messages.)
876          */
877         TIME now = msTime() /*CURRENT_TIME*/;
878         IF_PAR_DEBUG(verbose, 
879                      belch("--  now=%ld", now));
880         IF_PAR_DEBUG(verbose,
881                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
882                          (last_fish_arrived_at!=0 &&
883                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
884                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
885                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
886                              last_fish_arrived_at,
887                              RtsFlags.ParFlags.fishDelay, now);
888                      });
889         
890         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
891             (last_fish_arrived_at==0 ||
892              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
893           /* outstandingFishes is set in sendFish, processFish;
894              avoid flooding system with fishes via delay */
895           pe = choosePE();
896           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
897                    NEW_FISH_HUNGER);
898
899           // Global statistics: count no. of fishes
900           if (RtsFlags.ParFlags.ParStats.Global &&
901               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
902             globalParStats.tot_fish_mess++;
903           }
904         }
905       
906         receivedFinish = processMessages();
907         goto next_thread;
908       }
909     } else if (PacketsWaiting()) {  /* Look for incoming messages */
910       receivedFinish = processMessages();
911     }
912
913     /* Now we are sure that we have some work available */
914     ASSERT(run_queue_hd != END_TSO_QUEUE);
915
916     /* Take a thread from the run queue, if we have work */
917     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
918     IF_DEBUG(sanity,checkTSO(t));
919
920     /* ToDo: write something to the log-file
921     if (RTSflags.ParFlags.granSimStats && !sameThread)
922         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
923
924     CurrentTSO = t;
925     */
926     /* the spark pool for the current PE */
927     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
928
929     IF_DEBUG(scheduler, 
930              belch("--=^ %d threads, %d sparks on [%#x]", 
931                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
932
933 # if 1
934     if (0 && RtsFlags.ParFlags.ParStats.Full && 
935         t && LastTSO && t->id != LastTSO->id && 
936         LastTSO->why_blocked == NotBlocked && 
937         LastTSO->what_next != ThreadComplete) {
938       // if previously scheduled TSO not blocked we have to record the context switch
939       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
940                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
941     }
942
943     if (RtsFlags.ParFlags.ParStats.Full && 
944         (emitSchedule /* forced emit */ ||
945         (t && LastTSO && t->id != LastTSO->id))) {
946       /* 
947          we are running a different TSO, so write a schedule event to log file
948          NB: If we use fair scheduling we also have to write  a deschedule 
949              event for LastTSO; with unfair scheduling we know that the
950              previous tso has blocked whenever we switch to another tso, so
951              we don't need it in GUM for now
952       */
953       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
954                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
955       emitSchedule = rtsFalse;
956     }
957      
958 # endif
959 #else /* !GRAN && !PAR */
960   
961     /* grab a thread from the run queue */
962     ASSERT(run_queue_hd != END_TSO_QUEUE);
963     t = POP_RUN_QUEUE();
964     // Sanity check the thread we're about to run.  This can be
965     // expensive if there is lots of thread switching going on...
966     IF_DEBUG(sanity,checkTSO(t));
967 #endif
968     
969     grabCapability(&cap);
970     cap->r.rCurrentTSO = t;
971     
972     /* context switches are now initiated by the timer signal, unless
973      * the user specified "context switch as often as possible", with
974      * +RTS -C0
975      */
976     if (
977 #ifdef PROFILING
978         RtsFlags.ProfFlags.profileInterval == 0 ||
979 #endif
980         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
981          && (run_queue_hd != END_TSO_QUEUE
982              || blocked_queue_hd != END_TSO_QUEUE
983              || sleeping_queue != END_TSO_QUEUE)))
984         context_switch = 1;
985     else
986         context_switch = 0;
987
988     RELEASE_LOCK(&sched_mutex);
989
990     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
991                               t->id, t, whatNext_strs[t->what_next]));
992
993 #ifdef PROFILING
994     startHeapProfTimer();
995 #endif
996
997     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
998     /* Run the current thread 
999      */
1000     switch (cap->r.rCurrentTSO->what_next) {
1001     case ThreadKilled:
1002     case ThreadComplete:
1003         /* Thread already finished, return to scheduler. */
1004         ret = ThreadFinished;
1005         break;
1006     case ThreadEnterGHC:
1007         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1008         break;
1009     case ThreadRunGHC:
1010         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1011         break;
1012     case ThreadEnterInterp:
1013         ret = interpretBCO(cap);
1014         break;
1015     default:
1016       barf("schedule: invalid what_next field");
1017     }
1018     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1019     
1020     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1021 #ifdef PROFILING
1022     stopHeapProfTimer();
1023     CCCS = CCS_SYSTEM;
1024 #endif
1025     
1026     ACQUIRE_LOCK(&sched_mutex);
1027
1028 #ifdef SMP
1029     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1030 #elif !defined(GRAN) && !defined(PAR)
1031     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1032 #endif
1033     t = cap->r.rCurrentTSO;
1034     
1035 #if defined(PAR)
1036     /* HACK 675: if the last thread didn't yield, make sure to print a 
1037        SCHEDULE event to the log file when StgRunning the next thread, even
1038        if it is the same one as before */
1039     LastTSO = t; 
1040     TimeOfLastYield = CURRENT_TIME;
1041 #endif
1042
1043     switch (ret) {
1044     case HeapOverflow:
1045 #if defined(GRAN)
1046       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1047       globalGranStats.tot_heapover++;
1048 #elif defined(PAR)
1049       globalParStats.tot_heapover++;
1050 #endif
1051
1052       // did the task ask for a large block?
1053       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1054           // if so, get one and push it on the front of the nursery.
1055           bdescr *bd;
1056           nat blocks;
1057           
1058           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1059
1060           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1061                                    t->id, t,
1062                                    whatNext_strs[t->what_next], blocks));
1063
1064           // don't do this if it would push us over the
1065           // alloc_blocks_lim limit; we'll GC first.
1066           if (alloc_blocks + blocks < alloc_blocks_lim) {
1067
1068               alloc_blocks += blocks;
1069               bd = allocGroup( blocks );
1070
1071               // link the new group into the list
1072               bd->link = cap->r.rCurrentNursery;
1073               bd->u.back = cap->r.rCurrentNursery->u.back;
1074               if (cap->r.rCurrentNursery->u.back != NULL) {
1075                   cap->r.rCurrentNursery->u.back->link = bd;
1076               } else {
1077                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1078                          g0s0->blocks == cap->r.rNursery);
1079                   cap->r.rNursery = g0s0->blocks = bd;
1080               }           
1081               cap->r.rCurrentNursery->u.back = bd;
1082
1083               // initialise it as a nursery block
1084               bd->step = g0s0;
1085               bd->gen_no = 0;
1086               bd->flags = 0;
1087               bd->free = bd->start;
1088
1089               // don't forget to update the block count in g0s0.
1090               g0s0->n_blocks += blocks;
1091               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1092
1093               // now update the nursery to point to the new block
1094               cap->r.rCurrentNursery = bd;
1095
1096               // we might be unlucky and have another thread get on the
1097               // run queue before us and steal the large block, but in that
1098               // case the thread will just end up requesting another large
1099               // block.
1100               PUSH_ON_RUN_QUEUE(t);
1101               break;
1102           }
1103       }
1104
1105       /* make all the running tasks block on a condition variable,
1106        * maybe set context_switch and wait till they all pile in,
1107        * then have them wait on a GC condition variable.
1108        */
1109       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1110                                t->id, t, whatNext_strs[t->what_next]));
1111       threadPaused(t);
1112 #if defined(GRAN)
1113       ASSERT(!is_on_queue(t,CurrentProc));
1114 #elif defined(PAR)
1115       /* Currently we emit a DESCHEDULE event before GC in GUM.
1116          ToDo: either add separate event to distinguish SYSTEM time from rest
1117                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1118       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1119         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1120                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1121         emitSchedule = rtsTrue;
1122       }
1123 #endif
1124       
1125       ready_to_gc = rtsTrue;
1126       context_switch = 1;               /* stop other threads ASAP */
1127       PUSH_ON_RUN_QUEUE(t);
1128       /* actual GC is done at the end of the while loop */
1129       break;
1130       
1131     case StackOverflow:
1132 #if defined(GRAN)
1133       IF_DEBUG(gran, 
1134                DumpGranEvent(GR_DESCHEDULE, t));
1135       globalGranStats.tot_stackover++;
1136 #elif defined(PAR)
1137       // IF_DEBUG(par, 
1138       // DumpGranEvent(GR_DESCHEDULE, t);
1139       globalParStats.tot_stackover++;
1140 #endif
1141       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1142                                t->id, t, whatNext_strs[t->what_next]));
1143       /* just adjust the stack for this thread, then pop it back
1144        * on the run queue.
1145        */
1146       threadPaused(t);
1147       { 
1148         StgMainThread *m;
1149         /* enlarge the stack */
1150         StgTSO *new_t = threadStackOverflow(t);
1151         
1152         /* This TSO has moved, so update any pointers to it from the
1153          * main thread stack.  It better not be on any other queues...
1154          * (it shouldn't be).
1155          */
1156         for (m = main_threads; m != NULL; m = m->link) {
1157           if (m->tso == t) {
1158             m->tso = new_t;
1159           }
1160         }
1161         threadPaused(new_t);
1162         PUSH_ON_RUN_QUEUE(new_t);
1163       }
1164       break;
1165
1166     case ThreadYielding:
1167 #if defined(GRAN)
1168       IF_DEBUG(gran, 
1169                DumpGranEvent(GR_DESCHEDULE, t));
1170       globalGranStats.tot_yields++;
1171 #elif defined(PAR)
1172       // IF_DEBUG(par, 
1173       // DumpGranEvent(GR_DESCHEDULE, t);
1174       globalParStats.tot_yields++;
1175 #endif
1176       /* put the thread back on the run queue.  Then, if we're ready to
1177        * GC, check whether this is the last task to stop.  If so, wake
1178        * up the GC thread.  getThread will block during a GC until the
1179        * GC is finished.
1180        */
1181       IF_DEBUG(scheduler,
1182                if (t->what_next == ThreadEnterInterp) {
1183                    /* ToDo: or maybe a timer expired when we were in Hugs?
1184                     * or maybe someone hit ctrl-C
1185                     */
1186                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1187                          t->id, t, whatNext_strs[t->what_next]);
1188                } else {
1189                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1190                          t->id, t, whatNext_strs[t->what_next]);
1191                }
1192                );
1193
1194       threadPaused(t);
1195
1196       IF_DEBUG(sanity,
1197                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1198                checkTSO(t));
1199       ASSERT(t->link == END_TSO_QUEUE);
1200 #if defined(GRAN)
1201       ASSERT(!is_on_queue(t,CurrentProc));
1202
1203       IF_DEBUG(sanity,
1204                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1205                checkThreadQsSanity(rtsTrue));
1206 #endif
1207 #if defined(PAR)
1208       if (RtsFlags.ParFlags.doFairScheduling) { 
1209         /* this does round-robin scheduling; good for concurrency */
1210         APPEND_TO_RUN_QUEUE(t);
1211       } else {
1212         /* this does unfair scheduling; good for parallelism */
1213         PUSH_ON_RUN_QUEUE(t);
1214       }
1215 #else
1216       /* this does round-robin scheduling; good for concurrency */
1217       APPEND_TO_RUN_QUEUE(t);
1218 #endif
1219 #if defined(GRAN)
1220       /* add a ContinueThread event to actually process the thread */
1221       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1222                 ContinueThread,
1223                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1224       IF_GRAN_DEBUG(bq, 
1225                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1226                G_EVENTQ(0);
1227                G_CURR_THREADQ(0));
1228 #endif /* GRAN */
1229       break;
1230       
1231     case ThreadBlocked:
1232 #if defined(GRAN)
1233       IF_DEBUG(scheduler,
1234                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1235                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1236                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1237
1238       // ??? needed; should emit block before
1239       IF_DEBUG(gran, 
1240                DumpGranEvent(GR_DESCHEDULE, t)); 
1241       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1242       /*
1243         ngoq Dogh!
1244       ASSERT(procStatus[CurrentProc]==Busy || 
1245               ((procStatus[CurrentProc]==Fetching) && 
1246               (t->block_info.closure!=(StgClosure*)NULL)));
1247       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1248           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1249             procStatus[CurrentProc]==Fetching)) 
1250         procStatus[CurrentProc] = Idle;
1251       */
1252 #elif defined(PAR)
1253       IF_DEBUG(scheduler,
1254                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1255                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1256       IF_PAR_DEBUG(bq,
1257
1258                    if (t->block_info.closure!=(StgClosure*)NULL) 
1259                      print_bq(t->block_info.closure));
1260
1261       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1262       blockThread(t);
1263
1264       /* whatever we schedule next, we must log that schedule */
1265       emitSchedule = rtsTrue;
1266
1267 #else /* !GRAN */
1268       /* don't need to do anything.  Either the thread is blocked on
1269        * I/O, in which case we'll have called addToBlockedQueue
1270        * previously, or it's blocked on an MVar or Blackhole, in which
1271        * case it'll be on the relevant queue already.
1272        */
1273       IF_DEBUG(scheduler,
1274                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1275                printThreadBlockage(t);
1276                fprintf(stderr, "\n"));
1277
1278       /* Only for dumping event to log file 
1279          ToDo: do I need this in GranSim, too?
1280       blockThread(t);
1281       */
1282 #endif
1283       threadPaused(t);
1284       break;
1285       
1286     case ThreadFinished:
1287       /* Need to check whether this was a main thread, and if so, signal
1288        * the task that started it with the return value.  If we have no
1289        * more main threads, we probably need to stop all the tasks until
1290        * we get a new one.
1291        */
1292       /* We also end up here if the thread kills itself with an
1293        * uncaught exception, see Exception.hc.
1294        */
1295       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1296 #if defined(GRAN)
1297       endThread(t, CurrentProc); // clean-up the thread
1298 #elif defined(PAR)
1299       /* For now all are advisory -- HWL */
1300       //if(t->priority==AdvisoryPriority) ??
1301       advisory_thread_count--;
1302       
1303 # ifdef DIST
1304       if(t->dist.priority==RevalPriority)
1305         FinishReval(t);
1306 # endif
1307       
1308       if (RtsFlags.ParFlags.ParStats.Full &&
1309           !RtsFlags.ParFlags.ParStats.Suppressed) 
1310         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1311 #endif
1312       break;
1313       
1314     default:
1315       barf("schedule: invalid thread return code %d", (int)ret);
1316     }
1317     
1318 #if defined(RTS_SUPPORTS_THREADS)
1319     /* I don't understand what this re-grab is doing -- sof */
1320     grabCapability(&cap);
1321 #endif
1322
1323 #ifdef PROFILING
1324     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1325         GarbageCollect(GetRoots, rtsTrue);
1326         heapCensus();
1327         performHeapProfile = rtsFalse;
1328         ready_to_gc = rtsFalse; // we already GC'd
1329     }
1330 #endif
1331
1332     if (ready_to_gc 
1333 #ifdef SMP
1334         && allFreeCapabilities() 
1335 #endif
1336         ) {
1337       /* everybody back, start the GC.
1338        * Could do it in this thread, or signal a condition var
1339        * to do it in another thread.  Either way, we need to
1340        * broadcast on gc_pending_cond afterward.
1341        */
1342 #if defined(RTS_SUPPORTS_THREADS)
1343       IF_DEBUG(scheduler,sched_belch("doing GC"));
1344 #endif
1345       GarbageCollect(GetRoots,rtsFalse);
1346       ready_to_gc = rtsFalse;
1347 #ifdef SMP
1348       broadcastCondition(&gc_pending_cond);
1349 #endif
1350 #if defined(GRAN)
1351       /* add a ContinueThread event to continue execution of current thread */
1352       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1353                 ContinueThread,
1354                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1355       IF_GRAN_DEBUG(bq, 
1356                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1357                G_EVENTQ(0);
1358                G_CURR_THREADQ(0));
1359 #endif /* GRAN */
1360     }
1361
1362 #if defined(GRAN)
1363   next_thread:
1364     IF_GRAN_DEBUG(unused,
1365                   print_eventq(EventHd));
1366
1367     event = get_next_event();
1368 #elif defined(PAR)
1369   next_thread:
1370     /* ToDo: wait for next message to arrive rather than busy wait */
1371 #endif /* GRAN */
1372
1373   } /* end of while(1) */
1374
1375   IF_PAR_DEBUG(verbose,
1376                belch("== Leaving schedule() after having received Finish"));
1377 }
1378
1379 /* ---------------------------------------------------------------------------
1380  * deleteAllThreads():  kill all the live threads.
1381  *
1382  * This is used when we catch a user interrupt (^C), before performing
1383  * any necessary cleanups and running finalizers.
1384  * ------------------------------------------------------------------------- */
1385    
1386 void deleteAllThreads ( void )
1387 {
1388   StgTSO* t, *next;
1389   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1390   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1391       next = t->global_link;
1392       deleteThread(t);
1393   }      
1394   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1395   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1396   sleeping_queue = END_TSO_QUEUE;
1397 }
1398
1399 /* startThread and  insertThread are now in GranSim.c -- HWL */
1400
1401
1402 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1403 //@subsection Suspend and Resume
1404
1405 /* ---------------------------------------------------------------------------
1406  * Suspending & resuming Haskell threads.
1407  * 
1408  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1409  * its capability before calling the C function.  This allows another
1410  * task to pick up the capability and carry on running Haskell
1411  * threads.  It also means that if the C call blocks, it won't lock
1412  * the whole system.
1413  *
1414  * The Haskell thread making the C call is put to sleep for the
1415  * duration of the call, on the susepended_ccalling_threads queue.  We
1416  * give out a token to the task, which it can use to resume the thread
1417  * on return from the C function.
1418  * ------------------------------------------------------------------------- */
1419    
1420 StgInt
1421 suspendThread( StgRegTable *reg, 
1422                rtsBool concCall
1423 #if !defined(RTS_SUPPORTS_THREADS)
1424                STG_UNUSED
1425 #endif
1426                )
1427 {
1428   nat tok;
1429   Capability *cap;
1430
1431   /* assume that *reg is a pointer to the StgRegTable part
1432    * of a Capability.
1433    */
1434   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1435
1436   ACQUIRE_LOCK(&sched_mutex);
1437
1438   IF_DEBUG(scheduler,
1439            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1440
1441   threadPaused(cap->r.rCurrentTSO);
1442   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1443   suspended_ccalling_threads = cap->r.rCurrentTSO;
1444
1445 #if defined(RTS_SUPPORTS_THREADS)
1446   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1447 #endif
1448
1449   /* Use the thread ID as the token; it should be unique */
1450   tok = cap->r.rCurrentTSO->id;
1451
1452   /* Hand back capability */
1453   releaseCapability(cap);
1454   
1455 #if defined(RTS_SUPPORTS_THREADS)
1456   /* Preparing to leave the RTS, so ensure there's a native thread/task
1457      waiting to take over.
1458      
1459      ToDo: optimise this and only create a new task if there's a need
1460      for one (i.e., if there's only one Concurrent Haskell thread alive,
1461      there's no need to create a new task).
1462   */
1463   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1464   if (concCall) {
1465     startTask(taskStart);
1466   }
1467 #endif
1468
1469   /* Other threads _might_ be available for execution; signal this */
1470   THREAD_RUNNABLE();
1471   RELEASE_LOCK(&sched_mutex);
1472   return tok; 
1473 }
1474
1475 StgRegTable *
1476 resumeThread( StgInt tok,
1477               rtsBool concCall
1478 #if !defined(RTS_SUPPORTS_THREADS)
1479                STG_UNUSED
1480 #endif
1481               )
1482 {
1483   StgTSO *tso, **prev;
1484   Capability *cap;
1485
1486 #if defined(RTS_SUPPORTS_THREADS)
1487   /* Wait for permission to re-enter the RTS with the result. */
1488   if ( concCall ) {
1489     grabReturnCapability(&sched_mutex, &cap);
1490   } else {
1491     grabCapability(&cap);
1492   }
1493 #else
1494   grabCapability(&cap);
1495 #endif
1496
1497   /* Remove the thread off of the suspended list */
1498   prev = &suspended_ccalling_threads;
1499   for (tso = suspended_ccalling_threads; 
1500        tso != END_TSO_QUEUE; 
1501        prev = &tso->link, tso = tso->link) {
1502     if (tso->id == (StgThreadID)tok) {
1503       *prev = tso->link;
1504       break;
1505     }
1506   }
1507   if (tso == END_TSO_QUEUE) {
1508     barf("resumeThread: thread not found");
1509   }
1510   tso->link = END_TSO_QUEUE;
1511   /* Reset blocking status */
1512   tso->why_blocked  = NotBlocked;
1513
1514   RELEASE_LOCK(&sched_mutex);
1515
1516   cap->r.rCurrentTSO = tso;
1517   return &cap->r;
1518 }
1519
1520
1521 /* ---------------------------------------------------------------------------
1522  * Static functions
1523  * ------------------------------------------------------------------------ */
1524 static void unblockThread(StgTSO *tso);
1525
1526 /* ---------------------------------------------------------------------------
1527  * Comparing Thread ids.
1528  *
1529  * This is used from STG land in the implementation of the
1530  * instances of Eq/Ord for ThreadIds.
1531  * ------------------------------------------------------------------------ */
1532
1533 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1534
1535   StgThreadID id1 = tso1->id; 
1536   StgThreadID id2 = tso2->id;
1537  
1538   if (id1 < id2) return (-1);
1539   if (id1 > id2) return 1;
1540   return 0;
1541 }
1542
1543 /* ---------------------------------------------------------------------------
1544  * Fetching the ThreadID from an StgTSO.
1545  *
1546  * This is used in the implementation of Show for ThreadIds.
1547  * ------------------------------------------------------------------------ */
1548 int rts_getThreadId(const StgTSO *tso) 
1549 {
1550   return tso->id;
1551 }
1552
1553 /* ---------------------------------------------------------------------------
1554    Create a new thread.
1555
1556    The new thread starts with the given stack size.  Before the
1557    scheduler can run, however, this thread needs to have a closure
1558    (and possibly some arguments) pushed on its stack.  See
1559    pushClosure() in Schedule.h.
1560
1561    createGenThread() and createIOThread() (in SchedAPI.h) are
1562    convenient packaged versions of this function.
1563
1564    currently pri (priority) is only used in a GRAN setup -- HWL
1565    ------------------------------------------------------------------------ */
1566 //@cindex createThread
1567 #if defined(GRAN)
1568 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1569 StgTSO *
1570 createThread(nat stack_size, StgInt pri)
1571 {
1572   return createThread_(stack_size, rtsFalse, pri);
1573 }
1574
1575 static StgTSO *
1576 createThread_(nat size, rtsBool have_lock, StgInt pri)
1577 {
1578 #else
1579 StgTSO *
1580 createThread(nat stack_size)
1581 {
1582   return createThread_(stack_size, rtsFalse);
1583 }
1584
1585 static StgTSO *
1586 createThread_(nat size, rtsBool have_lock)
1587 {
1588 #endif
1589
1590     StgTSO *tso;
1591     nat stack_size;
1592
1593     /* First check whether we should create a thread at all */
1594 #if defined(PAR)
1595   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1596   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1597     threadsIgnored++;
1598     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1599           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1600     return END_TSO_QUEUE;
1601   }
1602   threadsCreated++;
1603 #endif
1604
1605 #if defined(GRAN)
1606   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1607 #endif
1608
1609   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1610
1611   /* catch ridiculously small stack sizes */
1612   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1613     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1614   }
1615
1616   stack_size = size - TSO_STRUCT_SIZEW;
1617
1618   tso = (StgTSO *)allocate(size);
1619   TICK_ALLOC_TSO(stack_size, 0);
1620
1621   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1622 #if defined(GRAN)
1623   SET_GRAN_HDR(tso, ThisPE);
1624 #endif
1625   tso->what_next     = ThreadEnterGHC;
1626
1627   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1628    * protect the increment operation on next_thread_id.
1629    * In future, we could use an atomic increment instead.
1630    */
1631   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1632   tso->id = next_thread_id++; 
1633   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1634
1635   tso->why_blocked  = NotBlocked;
1636   tso->blocked_exceptions = NULL;
1637
1638   tso->stack_size   = stack_size;
1639   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1640                               - TSO_STRUCT_SIZEW;
1641   tso->sp           = (P_)&(tso->stack) + stack_size;
1642
1643 #ifdef PROFILING
1644   tso->prof.CCCS = CCS_MAIN;
1645 #endif
1646
1647   /* put a stop frame on the stack */
1648   tso->sp -= sizeofW(StgStopFrame);
1649   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1650   tso->su = (StgUpdateFrame*)tso->sp;
1651
1652   // ToDo: check this
1653 #if defined(GRAN)
1654   tso->link = END_TSO_QUEUE;
1655   /* uses more flexible routine in GranSim */
1656   insertThread(tso, CurrentProc);
1657 #else
1658   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1659    * from its creation
1660    */
1661 #endif
1662
1663 #if defined(GRAN) 
1664   if (RtsFlags.GranFlags.GranSimStats.Full) 
1665     DumpGranEvent(GR_START,tso);
1666 #elif defined(PAR)
1667   if (RtsFlags.ParFlags.ParStats.Full) 
1668     DumpGranEvent(GR_STARTQ,tso);
1669   /* HACk to avoid SCHEDULE 
1670      LastTSO = tso; */
1671 #endif
1672
1673   /* Link the new thread on the global thread list.
1674    */
1675   tso->global_link = all_threads;
1676   all_threads = tso;
1677
1678 #if defined(DIST)
1679   tso->dist.priority = MandatoryPriority; //by default that is...
1680 #endif
1681
1682 #if defined(GRAN)
1683   tso->gran.pri = pri;
1684 # if defined(DEBUG)
1685   tso->gran.magic = TSO_MAGIC; // debugging only
1686 # endif
1687   tso->gran.sparkname   = 0;
1688   tso->gran.startedat   = CURRENT_TIME; 
1689   tso->gran.exported    = 0;
1690   tso->gran.basicblocks = 0;
1691   tso->gran.allocs      = 0;
1692   tso->gran.exectime    = 0;
1693   tso->gran.fetchtime   = 0;
1694   tso->gran.fetchcount  = 0;
1695   tso->gran.blocktime   = 0;
1696   tso->gran.blockcount  = 0;
1697   tso->gran.blockedat   = 0;
1698   tso->gran.globalsparks = 0;
1699   tso->gran.localsparks  = 0;
1700   if (RtsFlags.GranFlags.Light)
1701     tso->gran.clock  = Now; /* local clock */
1702   else
1703     tso->gran.clock  = 0;
1704
1705   IF_DEBUG(gran,printTSO(tso));
1706 #elif defined(PAR)
1707 # if defined(DEBUG)
1708   tso->par.magic = TSO_MAGIC; // debugging only
1709 # endif
1710   tso->par.sparkname   = 0;
1711   tso->par.startedat   = CURRENT_TIME; 
1712   tso->par.exported    = 0;
1713   tso->par.basicblocks = 0;
1714   tso->par.allocs      = 0;
1715   tso->par.exectime    = 0;
1716   tso->par.fetchtime   = 0;
1717   tso->par.fetchcount  = 0;
1718   tso->par.blocktime   = 0;
1719   tso->par.blockcount  = 0;
1720   tso->par.blockedat   = 0;
1721   tso->par.globalsparks = 0;
1722   tso->par.localsparks  = 0;
1723 #endif
1724
1725 #if defined(GRAN)
1726   globalGranStats.tot_threads_created++;
1727   globalGranStats.threads_created_on_PE[CurrentProc]++;
1728   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1729   globalGranStats.tot_sq_probes++;
1730 #elif defined(PAR)
1731   // collect parallel global statistics (currently done together with GC stats)
1732   if (RtsFlags.ParFlags.ParStats.Global &&
1733       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1734     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1735     globalParStats.tot_threads_created++;
1736   }
1737 #endif 
1738
1739 #if defined(GRAN)
1740   IF_GRAN_DEBUG(pri,
1741                 belch("==__ schedule: Created TSO %d (%p);",
1742                       CurrentProc, tso, tso->id));
1743 #elif defined(PAR)
1744     IF_PAR_DEBUG(verbose,
1745                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1746                        tso->id, tso, advisory_thread_count));
1747 #else
1748   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1749                                  tso->id, tso->stack_size));
1750 #endif    
1751   return tso;
1752 }
1753
1754 #if defined(PAR)
1755 /* RFP:
1756    all parallel thread creation calls should fall through the following routine.
1757 */
1758 StgTSO *
1759 createSparkThread(rtsSpark spark) 
1760 { StgTSO *tso;
1761   ASSERT(spark != (rtsSpark)NULL);
1762   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1763   { threadsIgnored++;
1764     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1765           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1766     return END_TSO_QUEUE;
1767   }
1768   else
1769   { threadsCreated++;
1770     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1771     if (tso==END_TSO_QUEUE)     
1772       barf("createSparkThread: Cannot create TSO");
1773 #if defined(DIST)
1774     tso->priority = AdvisoryPriority;
1775 #endif
1776     pushClosure(tso,spark);
1777     PUSH_ON_RUN_QUEUE(tso);
1778     advisory_thread_count++;    
1779   }
1780   return tso;
1781 }
1782 #endif
1783
1784 /*
1785   Turn a spark into a thread.
1786   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1787 */
1788 #if defined(PAR)
1789 //@cindex activateSpark
1790 StgTSO *
1791 activateSpark (rtsSpark spark) 
1792 {
1793   StgTSO *tso;
1794
1795   tso = createSparkThread(spark);
1796   if (RtsFlags.ParFlags.ParStats.Full) {   
1797     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1798     IF_PAR_DEBUG(verbose,
1799                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1800                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1801   }
1802   // ToDo: fwd info on local/global spark to thread -- HWL
1803   // tso->gran.exported =  spark->exported;
1804   // tso->gran.locked =   !spark->global;
1805   // tso->gran.sparkname = spark->name;
1806
1807   return tso;
1808 }
1809 #endif
1810
1811 /* ---------------------------------------------------------------------------
1812  * scheduleThread()
1813  *
1814  * scheduleThread puts a thread on the head of the runnable queue.
1815  * This will usually be done immediately after a thread is created.
1816  * The caller of scheduleThread must create the thread using e.g.
1817  * createThread and push an appropriate closure
1818  * on this thread's stack before the scheduler is invoked.
1819  * ------------------------------------------------------------------------ */
1820
1821 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1822
1823 void
1824 scheduleThread_(StgTSO *tso
1825                , rtsBool createTask
1826 #if !defined(THREADED_RTS)
1827                  STG_UNUSED
1828 #endif
1829               )
1830 {
1831   ACQUIRE_LOCK(&sched_mutex);
1832
1833   /* Put the new thread on the head of the runnable queue.  The caller
1834    * better push an appropriate closure on this thread's stack
1835    * beforehand.  In the SMP case, the thread may start running as
1836    * soon as we release the scheduler lock below.
1837    */
1838   PUSH_ON_RUN_QUEUE(tso);
1839 #if defined(THREADED_RTS)
1840   /* If main() is scheduling a thread, don't bother creating a 
1841    * new task.
1842    */
1843   if ( createTask ) {
1844     startTask(taskStart);
1845   }
1846 #endif
1847   THREAD_RUNNABLE();
1848
1849 #if 0
1850   IF_DEBUG(scheduler,printTSO(tso));
1851 #endif
1852   RELEASE_LOCK(&sched_mutex);
1853 }
1854
1855 void scheduleThread(StgTSO* tso)
1856 {
1857   return scheduleThread_(tso, rtsFalse);
1858 }
1859
1860 void scheduleExtThread(StgTSO* tso)
1861 {
1862   return scheduleThread_(tso, rtsTrue);
1863 }
1864
1865 /* ---------------------------------------------------------------------------
1866  * initScheduler()
1867  *
1868  * Initialise the scheduler.  This resets all the queues - if the
1869  * queues contained any threads, they'll be garbage collected at the
1870  * next pass.
1871  *
1872  * ------------------------------------------------------------------------ */
1873
1874 #ifdef SMP
1875 static void
1876 term_handler(int sig STG_UNUSED)
1877 {
1878   stat_workerStop();
1879   ACQUIRE_LOCK(&term_mutex);
1880   await_death--;
1881   RELEASE_LOCK(&term_mutex);
1882   shutdownThread();
1883 }
1884 #endif
1885
1886 void 
1887 initScheduler(void)
1888 {
1889 #if defined(GRAN)
1890   nat i;
1891
1892   for (i=0; i<=MAX_PROC; i++) {
1893     run_queue_hds[i]      = END_TSO_QUEUE;
1894     run_queue_tls[i]      = END_TSO_QUEUE;
1895     blocked_queue_hds[i]  = END_TSO_QUEUE;
1896     blocked_queue_tls[i]  = END_TSO_QUEUE;
1897     ccalling_threadss[i]  = END_TSO_QUEUE;
1898     sleeping_queue        = END_TSO_QUEUE;
1899   }
1900 #else
1901   run_queue_hd      = END_TSO_QUEUE;
1902   run_queue_tl      = END_TSO_QUEUE;
1903   blocked_queue_hd  = END_TSO_QUEUE;
1904   blocked_queue_tl  = END_TSO_QUEUE;
1905   sleeping_queue    = END_TSO_QUEUE;
1906 #endif 
1907
1908   suspended_ccalling_threads  = END_TSO_QUEUE;
1909
1910   main_threads = NULL;
1911   all_threads  = END_TSO_QUEUE;
1912
1913   context_switch = 0;
1914   interrupted    = 0;
1915
1916   RtsFlags.ConcFlags.ctxtSwitchTicks =
1917       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1918       
1919 #if defined(RTS_SUPPORTS_THREADS)
1920   /* Initialise the mutex and condition variables used by
1921    * the scheduler. */
1922   initMutex(&sched_mutex);
1923   initMutex(&term_mutex);
1924
1925   initCondition(&thread_ready_cond);
1926 #endif
1927   
1928 #if defined(SMP)
1929   initCondition(&gc_pending_cond);
1930 #endif
1931
1932 #if defined(RTS_SUPPORTS_THREADS)
1933   ACQUIRE_LOCK(&sched_mutex);
1934 #endif
1935
1936   /* Install the SIGHUP handler */
1937 #if defined(SMP)
1938   {
1939     struct sigaction action,oact;
1940
1941     action.sa_handler = term_handler;
1942     sigemptyset(&action.sa_mask);
1943     action.sa_flags = 0;
1944     if (sigaction(SIGTERM, &action, &oact) != 0) {
1945       barf("can't install TERM handler");
1946     }
1947   }
1948 #endif
1949
1950   /* A capability holds the state a native thread needs in
1951    * order to execute STG code. At least one capability is
1952    * floating around (only SMP builds have more than one).
1953    */
1954   initCapabilities();
1955   
1956 #if defined(RTS_SUPPORTS_THREADS)
1957     /* start our haskell execution tasks */
1958 # if defined(SMP)
1959     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
1960 # else
1961     startTaskManager(0,taskStart);
1962 # endif
1963 #endif
1964
1965 #if /* defined(SMP) ||*/ defined(PAR)
1966   initSparkPools();
1967 #endif
1968
1969 #if defined(RTS_SUPPORTS_THREADS)
1970   RELEASE_LOCK(&sched_mutex);
1971 #endif
1972
1973 }
1974
1975 void
1976 exitScheduler( void )
1977 {
1978 #if defined(RTS_SUPPORTS_THREADS)
1979   stopTaskManager();
1980 #endif
1981 }
1982
1983 /* -----------------------------------------------------------------------------
1984    Managing the per-task allocation areas.
1985    
1986    Each capability comes with an allocation area.  These are
1987    fixed-length block lists into which allocation can be done.
1988
1989    ToDo: no support for two-space collection at the moment???
1990    -------------------------------------------------------------------------- */
1991
1992 /* -----------------------------------------------------------------------------
1993  * waitThread is the external interface for running a new computation
1994  * and waiting for the result.
1995  *
1996  * In the non-SMP case, we create a new main thread, push it on the 
1997  * main-thread stack, and invoke the scheduler to run it.  The
1998  * scheduler will return when the top main thread on the stack has
1999  * completed or died, and fill in the necessary fields of the
2000  * main_thread structure.
2001  *
2002  * In the SMP case, we create a main thread as before, but we then
2003  * create a new condition variable and sleep on it.  When our new
2004  * main thread has completed, we'll be woken up and the status/result
2005  * will be in the main_thread struct.
2006  * -------------------------------------------------------------------------- */
2007
2008 int 
2009 howManyThreadsAvail ( void )
2010 {
2011    int i = 0;
2012    StgTSO* q;
2013    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2014       i++;
2015    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2016       i++;
2017    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2018       i++;
2019    return i;
2020 }
2021
2022 void
2023 finishAllThreads ( void )
2024 {
2025    do {
2026       while (run_queue_hd != END_TSO_QUEUE) {
2027          waitThread ( run_queue_hd, NULL);
2028       }
2029       while (blocked_queue_hd != END_TSO_QUEUE) {
2030          waitThread ( blocked_queue_hd, NULL);
2031       }
2032       while (sleeping_queue != END_TSO_QUEUE) {
2033          waitThread ( blocked_queue_hd, NULL);
2034       }
2035    } while 
2036       (blocked_queue_hd != END_TSO_QUEUE || 
2037        run_queue_hd     != END_TSO_QUEUE ||
2038        sleeping_queue   != END_TSO_QUEUE);
2039 }
2040
2041 SchedulerStatus
2042 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2043
2044 #if defined(THREADED_RTS)
2045   return waitThread_(tso,ret, rtsFalse);
2046 #else
2047   return waitThread_(tso,ret);
2048 #endif
2049 }
2050
2051 SchedulerStatus
2052 waitThread_(StgTSO *tso,
2053             /*out*/StgClosure **ret
2054 #if defined(THREADED_RTS)
2055             , rtsBool blockWaiting
2056 #endif
2057            )
2058 {
2059   StgMainThread *m;
2060   SchedulerStatus stat;
2061
2062   ACQUIRE_LOCK(&sched_mutex);
2063   
2064   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2065
2066   m->tso = tso;
2067   m->ret = ret;
2068   m->stat = NoStatus;
2069 #if defined(RTS_SUPPORTS_THREADS)
2070   initCondition(&m->wakeup);
2071 #endif
2072
2073   m->link = main_threads;
2074   main_threads = m;
2075
2076   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2077
2078 #if defined(RTS_SUPPORTS_THREADS)
2079
2080 # if defined(THREADED_RTS)
2081   if (!blockWaiting) {
2082     /* In the threaded case, the OS thread that called main()
2083      * gets to enter the RTS directly without going via another
2084      * task/thread.
2085      */
2086     RELEASE_LOCK(&sched_mutex);
2087     schedule();
2088     ASSERT(m->stat != NoStatus);
2089   } else 
2090 # endif
2091   {
2092     IF_DEBUG(scheduler, sched_belch("sfoo"));
2093     do {
2094       waitCondition(&m->wakeup, &sched_mutex);
2095     } while (m->stat == NoStatus);
2096   }
2097 #elif defined(GRAN)
2098   /* GranSim specific init */
2099   CurrentTSO = m->tso;                // the TSO to run
2100   procStatus[MainProc] = Busy;        // status of main PE
2101   CurrentProc = MainProc;             // PE to run it on
2102
2103   schedule();
2104 #else
2105   RELEASE_LOCK(&sched_mutex);
2106   schedule();
2107   ASSERT(m->stat != NoStatus);
2108 #endif
2109
2110   stat = m->stat;
2111
2112 #if defined(RTS_SUPPORTS_THREADS)
2113   closeCondition(&m->wakeup);
2114 #endif
2115
2116   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2117                               m->tso->id));
2118   free(m);
2119
2120 #if defined(THREADED_RTS)
2121   if (blockWaiting) 
2122 #endif
2123     RELEASE_LOCK(&sched_mutex);
2124
2125   return stat;
2126 }
2127
2128 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2129 //@subsection Run queue code 
2130
2131 #if 0
2132 /* 
2133    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2134        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2135        implicit global variable that has to be correct when calling these
2136        fcts -- HWL 
2137 */
2138
2139 /* Put the new thread on the head of the runnable queue.
2140  * The caller of createThread better push an appropriate closure
2141  * on this thread's stack before the scheduler is invoked.
2142  */
2143 static /* inline */ void
2144 add_to_run_queue(tso)
2145 StgTSO* tso; 
2146 {
2147   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2148   tso->link = run_queue_hd;
2149   run_queue_hd = tso;
2150   if (run_queue_tl == END_TSO_QUEUE) {
2151     run_queue_tl = tso;
2152   }
2153 }
2154
2155 /* Put the new thread at the end of the runnable queue. */
2156 static /* inline */ void
2157 push_on_run_queue(tso)
2158 StgTSO* tso; 
2159 {
2160   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2161   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2162   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2163   if (run_queue_hd == END_TSO_QUEUE) {
2164     run_queue_hd = tso;
2165   } else {
2166     run_queue_tl->link = tso;
2167   }
2168   run_queue_tl = tso;
2169 }
2170
2171 /* 
2172    Should be inlined because it's used very often in schedule.  The tso
2173    argument is actually only needed in GranSim, where we want to have the
2174    possibility to schedule *any* TSO on the run queue, irrespective of the
2175    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2176    the run queue and dequeue the tso, adjusting the links in the queue. 
2177 */
2178 //@cindex take_off_run_queue
2179 static /* inline */ StgTSO*
2180 take_off_run_queue(StgTSO *tso) {
2181   StgTSO *t, *prev;
2182
2183   /* 
2184      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2185
2186      if tso is specified, unlink that tso from the run_queue (doesn't have
2187      to be at the beginning of the queue); GranSim only 
2188   */
2189   if (tso!=END_TSO_QUEUE) {
2190     /* find tso in queue */
2191     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2192          t!=END_TSO_QUEUE && t!=tso;
2193          prev=t, t=t->link) 
2194       /* nothing */ ;
2195     ASSERT(t==tso);
2196     /* now actually dequeue the tso */
2197     if (prev!=END_TSO_QUEUE) {
2198       ASSERT(run_queue_hd!=t);
2199       prev->link = t->link;
2200     } else {
2201       /* t is at beginning of thread queue */
2202       ASSERT(run_queue_hd==t);
2203       run_queue_hd = t->link;
2204     }
2205     /* t is at end of thread queue */
2206     if (t->link==END_TSO_QUEUE) {
2207       ASSERT(t==run_queue_tl);
2208       run_queue_tl = prev;
2209     } else {
2210       ASSERT(run_queue_tl!=t);
2211     }
2212     t->link = END_TSO_QUEUE;
2213   } else {
2214     /* take tso from the beginning of the queue; std concurrent code */
2215     t = run_queue_hd;
2216     if (t != END_TSO_QUEUE) {
2217       run_queue_hd = t->link;
2218       t->link = END_TSO_QUEUE;
2219       if (run_queue_hd == END_TSO_QUEUE) {
2220         run_queue_tl = END_TSO_QUEUE;
2221       }
2222     }
2223   }
2224   return t;
2225 }
2226
2227 #endif /* 0 */
2228
2229 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2230 //@subsection Garbage Collextion Routines
2231
2232 /* ---------------------------------------------------------------------------
2233    Where are the roots that we know about?
2234
2235         - all the threads on the runnable queue
2236         - all the threads on the blocked queue
2237         - all the threads on the sleeping queue
2238         - all the thread currently executing a _ccall_GC
2239         - all the "main threads"
2240      
2241    ------------------------------------------------------------------------ */
2242
2243 /* This has to be protected either by the scheduler monitor, or by the
2244         garbage collection monitor (probably the latter).
2245         KH @ 25/10/99
2246 */
2247
2248 void
2249 GetRoots(evac_fn evac)
2250 {
2251 #if defined(GRAN)
2252   {
2253     nat i;
2254     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2255       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2256           evac((StgClosure **)&run_queue_hds[i]);
2257       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2258           evac((StgClosure **)&run_queue_tls[i]);
2259       
2260       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2261           evac((StgClosure **)&blocked_queue_hds[i]);
2262       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2263           evac((StgClosure **)&blocked_queue_tls[i]);
2264       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2265           evac((StgClosure **)&ccalling_threads[i]);
2266     }
2267   }
2268
2269   markEventQueue();
2270
2271 #else /* !GRAN */
2272   if (run_queue_hd != END_TSO_QUEUE) {
2273       ASSERT(run_queue_tl != END_TSO_QUEUE);
2274       evac((StgClosure **)&run_queue_hd);
2275       evac((StgClosure **)&run_queue_tl);
2276   }
2277   
2278   if (blocked_queue_hd != END_TSO_QUEUE) {
2279       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2280       evac((StgClosure **)&blocked_queue_hd);
2281       evac((StgClosure **)&blocked_queue_tl);
2282   }
2283   
2284   if (sleeping_queue != END_TSO_QUEUE) {
2285       evac((StgClosure **)&sleeping_queue);
2286   }
2287 #endif 
2288
2289   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2290       evac((StgClosure **)&suspended_ccalling_threads);
2291   }
2292
2293 #if defined(PAR) || defined(GRAN)
2294   markSparkQueue(evac);
2295 #endif
2296 }
2297
2298 /* -----------------------------------------------------------------------------
2299    performGC
2300
2301    This is the interface to the garbage collector from Haskell land.
2302    We provide this so that external C code can allocate and garbage
2303    collect when called from Haskell via _ccall_GC.
2304
2305    It might be useful to provide an interface whereby the programmer
2306    can specify more roots (ToDo).
2307    
2308    This needs to be protected by the GC condition variable above.  KH.
2309    -------------------------------------------------------------------------- */
2310
2311 void (*extra_roots)(evac_fn);
2312
2313 void
2314 performGC(void)
2315 {
2316   /* Obligated to hold this lock upon entry */
2317   ACQUIRE_LOCK(&sched_mutex);
2318   GarbageCollect(GetRoots,rtsFalse);
2319   RELEASE_LOCK(&sched_mutex);
2320 }
2321
2322 void
2323 performMajorGC(void)
2324 {
2325   ACQUIRE_LOCK(&sched_mutex);
2326   GarbageCollect(GetRoots,rtsTrue);
2327   RELEASE_LOCK(&sched_mutex);
2328 }
2329
2330 static void
2331 AllRoots(evac_fn evac)
2332 {
2333     GetRoots(evac);             // the scheduler's roots
2334     extra_roots(evac);          // the user's roots
2335 }
2336
2337 void
2338 performGCWithRoots(void (*get_roots)(evac_fn))
2339 {
2340   ACQUIRE_LOCK(&sched_mutex);
2341   extra_roots = get_roots;
2342   GarbageCollect(AllRoots,rtsFalse);
2343   RELEASE_LOCK(&sched_mutex);
2344 }
2345
2346 /* -----------------------------------------------------------------------------
2347    Stack overflow
2348
2349    If the thread has reached its maximum stack size, then raise the
2350    StackOverflow exception in the offending thread.  Otherwise
2351    relocate the TSO into a larger chunk of memory and adjust its stack
2352    size appropriately.
2353    -------------------------------------------------------------------------- */
2354
2355 static StgTSO *
2356 threadStackOverflow(StgTSO *tso)
2357 {
2358   nat new_stack_size, new_tso_size, diff, stack_words;
2359   StgPtr new_sp;
2360   StgTSO *dest;
2361
2362   IF_DEBUG(sanity,checkTSO(tso));
2363   if (tso->stack_size >= tso->max_stack_size) {
2364
2365     IF_DEBUG(gc,
2366              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2367                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2368              /* If we're debugging, just print out the top of the stack */
2369              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2370                                               tso->sp+64)));
2371
2372     /* Send this thread the StackOverflow exception */
2373     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2374     return tso;
2375   }
2376
2377   /* Try to double the current stack size.  If that takes us over the
2378    * maximum stack size for this thread, then use the maximum instead.
2379    * Finally round up so the TSO ends up as a whole number of blocks.
2380    */
2381   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2382   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2383                                        TSO_STRUCT_SIZE)/sizeof(W_);
2384   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2385   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2386
2387   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2388
2389   dest = (StgTSO *)allocate(new_tso_size);
2390   TICK_ALLOC_TSO(new_stack_size,0);
2391
2392   /* copy the TSO block and the old stack into the new area */
2393   memcpy(dest,tso,TSO_STRUCT_SIZE);
2394   stack_words = tso->stack + tso->stack_size - tso->sp;
2395   new_sp = (P_)dest + new_tso_size - stack_words;
2396   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2397
2398   /* relocate the stack pointers... */
2399   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2400   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2401   dest->sp    = new_sp;
2402   dest->stack_size = new_stack_size;
2403         
2404   /* and relocate the update frame list */
2405   relocate_stack(dest, diff);
2406
2407   /* Mark the old TSO as relocated.  We have to check for relocated
2408    * TSOs in the garbage collector and any primops that deal with TSOs.
2409    *
2410    * It's important to set the sp and su values to just beyond the end
2411    * of the stack, so we don't attempt to scavenge any part of the
2412    * dead TSO's stack.
2413    */
2414   tso->what_next = ThreadRelocated;
2415   tso->link = dest;
2416   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2417   tso->su = (StgUpdateFrame *)tso->sp;
2418   tso->why_blocked = NotBlocked;
2419   dest->mut_link = NULL;
2420
2421   IF_PAR_DEBUG(verbose,
2422                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2423                      tso->id, tso, tso->stack_size);
2424                /* If we're debugging, just print out the top of the stack */
2425                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2426                                                 tso->sp+64)));
2427   
2428   IF_DEBUG(sanity,checkTSO(tso));
2429 #if 0
2430   IF_DEBUG(scheduler,printTSO(dest));
2431 #endif
2432
2433   return dest;
2434 }
2435
2436 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2437 //@subsection Blocking Queue Routines
2438
2439 /* ---------------------------------------------------------------------------
2440    Wake up a queue that was blocked on some resource.
2441    ------------------------------------------------------------------------ */
2442
2443 #if defined(GRAN)
2444 static inline void
2445 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2446 {
2447 }
2448 #elif defined(PAR)
2449 static inline void
2450 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2451 {
2452   /* write RESUME events to log file and
2453      update blocked and fetch time (depending on type of the orig closure) */
2454   if (RtsFlags.ParFlags.ParStats.Full) {
2455     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2456                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2457                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2458     if (EMPTY_RUN_QUEUE())
2459       emitSchedule = rtsTrue;
2460
2461     switch (get_itbl(node)->type) {
2462         case FETCH_ME_BQ:
2463           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2464           break;
2465         case RBH:
2466         case FETCH_ME:
2467         case BLACKHOLE_BQ:
2468           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2469           break;
2470 #ifdef DIST
2471         case MVAR:
2472           break;
2473 #endif    
2474         default:
2475           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2476         }
2477       }
2478 }
2479 #endif
2480
2481 #if defined(GRAN)
2482 static StgBlockingQueueElement *
2483 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2484 {
2485     StgTSO *tso;
2486     PEs node_loc, tso_loc;
2487
2488     node_loc = where_is(node); // should be lifted out of loop
2489     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2490     tso_loc = where_is((StgClosure *)tso);
2491     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2492       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2493       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2494       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2495       // insertThread(tso, node_loc);
2496       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2497                 ResumeThread,
2498                 tso, node, (rtsSpark*)NULL);
2499       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2500       // len_local++;
2501       // len++;
2502     } else { // TSO is remote (actually should be FMBQ)
2503       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2504                                   RtsFlags.GranFlags.Costs.gunblocktime +
2505                                   RtsFlags.GranFlags.Costs.latency;
2506       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2507                 UnblockThread,
2508                 tso, node, (rtsSpark*)NULL);
2509       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2510       // len++;
2511     }
2512     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2513     IF_GRAN_DEBUG(bq,
2514                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2515                           (node_loc==tso_loc ? "Local" : "Global"), 
2516                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2517     tso->block_info.closure = NULL;
2518     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2519                              tso->id, tso));
2520 }
2521 #elif defined(PAR)
2522 static StgBlockingQueueElement *
2523 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2524 {
2525     StgBlockingQueueElement *next;
2526
2527     switch (get_itbl(bqe)->type) {
2528     case TSO:
2529       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2530       /* if it's a TSO just push it onto the run_queue */
2531       next = bqe->link;
2532       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2533       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2534       THREAD_RUNNABLE();
2535       unblockCount(bqe, node);
2536       /* reset blocking status after dumping event */
2537       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2538       break;
2539
2540     case BLOCKED_FETCH:
2541       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2542       next = bqe->link;
2543       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2544       PendingFetches = (StgBlockedFetch *)bqe;
2545       break;
2546
2547 # if defined(DEBUG)
2548       /* can ignore this case in a non-debugging setup; 
2549          see comments on RBHSave closures above */
2550     case CONSTR:
2551       /* check that the closure is an RBHSave closure */
2552       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2553              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2554              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2555       break;
2556
2557     default:
2558       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2559            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2560            (StgClosure *)bqe);
2561 # endif
2562     }
2563   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2564   return next;
2565 }
2566
2567 #else /* !GRAN && !PAR */
2568 static StgTSO *
2569 unblockOneLocked(StgTSO *tso)
2570 {
2571   StgTSO *next;
2572
2573   ASSERT(get_itbl(tso)->type == TSO);
2574   ASSERT(tso->why_blocked != NotBlocked);
2575   tso->why_blocked = NotBlocked;
2576   next = tso->link;
2577   PUSH_ON_RUN_QUEUE(tso);
2578   THREAD_RUNNABLE();
2579   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2580   return next;
2581 }
2582 #endif
2583
2584 #if defined(GRAN) || defined(PAR)
2585 inline StgBlockingQueueElement *
2586 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2587 {
2588   ACQUIRE_LOCK(&sched_mutex);
2589   bqe = unblockOneLocked(bqe, node);
2590   RELEASE_LOCK(&sched_mutex);
2591   return bqe;
2592 }
2593 #else
2594 inline StgTSO *
2595 unblockOne(StgTSO *tso)
2596 {
2597   ACQUIRE_LOCK(&sched_mutex);
2598   tso = unblockOneLocked(tso);
2599   RELEASE_LOCK(&sched_mutex);
2600   return tso;
2601 }
2602 #endif
2603
2604 #if defined(GRAN)
2605 void 
2606 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2607 {
2608   StgBlockingQueueElement *bqe;
2609   PEs node_loc;
2610   nat len = 0; 
2611
2612   IF_GRAN_DEBUG(bq, 
2613                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2614                       node, CurrentProc, CurrentTime[CurrentProc], 
2615                       CurrentTSO->id, CurrentTSO));
2616
2617   node_loc = where_is(node);
2618
2619   ASSERT(q == END_BQ_QUEUE ||
2620          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2621          get_itbl(q)->type == CONSTR); // closure (type constructor)
2622   ASSERT(is_unique(node));
2623
2624   /* FAKE FETCH: magically copy the node to the tso's proc;
2625      no Fetch necessary because in reality the node should not have been 
2626      moved to the other PE in the first place
2627   */
2628   if (CurrentProc!=node_loc) {
2629     IF_GRAN_DEBUG(bq, 
2630                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2631                         node, node_loc, CurrentProc, CurrentTSO->id, 
2632                         // CurrentTSO, where_is(CurrentTSO),
2633                         node->header.gran.procs));
2634     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2635     IF_GRAN_DEBUG(bq, 
2636                   belch("## new bitmask of node %p is %#x",
2637                         node, node->header.gran.procs));
2638     if (RtsFlags.GranFlags.GranSimStats.Global) {
2639       globalGranStats.tot_fake_fetches++;
2640     }
2641   }
2642
2643   bqe = q;
2644   // ToDo: check: ASSERT(CurrentProc==node_loc);
2645   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2646     //next = bqe->link;
2647     /* 
2648        bqe points to the current element in the queue
2649        next points to the next element in the queue
2650     */
2651     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2652     //tso_loc = where_is(tso);
2653     len++;
2654     bqe = unblockOneLocked(bqe, node);
2655   }
2656
2657   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2658      the closure to make room for the anchor of the BQ */
2659   if (bqe!=END_BQ_QUEUE) {
2660     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2661     /*
2662     ASSERT((info_ptr==&RBH_Save_0_info) ||
2663            (info_ptr==&RBH_Save_1_info) ||
2664            (info_ptr==&RBH_Save_2_info));
2665     */
2666     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2667     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2668     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2669
2670     IF_GRAN_DEBUG(bq,
2671                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2672                         node, info_type(node)));
2673   }
2674
2675   /* statistics gathering */
2676   if (RtsFlags.GranFlags.GranSimStats.Global) {
2677     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2678     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2679     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2680     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2681   }
2682   IF_GRAN_DEBUG(bq,
2683                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2684                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2685 }
2686 #elif defined(PAR)
2687 void 
2688 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2689 {
2690   StgBlockingQueueElement *bqe;
2691
2692   ACQUIRE_LOCK(&sched_mutex);
2693
2694   IF_PAR_DEBUG(verbose, 
2695                belch("##-_ AwBQ for node %p on [%x]: ",
2696                      node, mytid));
2697 #ifdef DIST  
2698   //RFP
2699   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2700     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2701     return;
2702   }
2703 #endif
2704   
2705   ASSERT(q == END_BQ_QUEUE ||
2706          get_itbl(q)->type == TSO ||           
2707          get_itbl(q)->type == BLOCKED_FETCH || 
2708          get_itbl(q)->type == CONSTR); 
2709
2710   bqe = q;
2711   while (get_itbl(bqe)->type==TSO || 
2712          get_itbl(bqe)->type==BLOCKED_FETCH) {
2713     bqe = unblockOneLocked(bqe, node);
2714   }
2715   RELEASE_LOCK(&sched_mutex);
2716 }
2717
2718 #else   /* !GRAN && !PAR */
2719 void
2720 awakenBlockedQueue(StgTSO *tso)
2721 {
2722   ACQUIRE_LOCK(&sched_mutex);
2723   while (tso != END_TSO_QUEUE) {
2724     tso = unblockOneLocked(tso);
2725   }
2726   RELEASE_LOCK(&sched_mutex);
2727 }
2728 #endif
2729
2730 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2731 //@subsection Exception Handling Routines
2732
2733 /* ---------------------------------------------------------------------------
2734    Interrupt execution
2735    - usually called inside a signal handler so it mustn't do anything fancy.   
2736    ------------------------------------------------------------------------ */
2737
2738 void
2739 interruptStgRts(void)
2740 {
2741     interrupted    = 1;
2742     context_switch = 1;
2743 }
2744
2745 /* -----------------------------------------------------------------------------
2746    Unblock a thread
2747
2748    This is for use when we raise an exception in another thread, which
2749    may be blocked.
2750    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2751    -------------------------------------------------------------------------- */
2752
2753 #if defined(GRAN) || defined(PAR)
2754 /*
2755   NB: only the type of the blocking queue is different in GranSim and GUM
2756       the operations on the queue-elements are the same
2757       long live polymorphism!
2758 */
2759 static void
2760 unblockThread(StgTSO *tso)
2761 {
2762   StgBlockingQueueElement *t, **last;
2763
2764   ACQUIRE_LOCK(&sched_mutex);
2765   switch (tso->why_blocked) {
2766
2767   case NotBlocked:
2768     return;  /* not blocked */
2769
2770   case BlockedOnMVar:
2771     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2772     {
2773       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2774       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2775
2776       last = (StgBlockingQueueElement **)&mvar->head;
2777       for (t = (StgBlockingQueueElement *)mvar->head; 
2778            t != END_BQ_QUEUE; 
2779            last = &t->link, last_tso = t, t = t->link) {
2780         if (t == (StgBlockingQueueElement *)tso) {
2781           *last = (StgBlockingQueueElement *)tso->link;
2782           if (mvar->tail == tso) {
2783             mvar->tail = (StgTSO *)last_tso;
2784           }
2785           goto done;
2786         }
2787       }
2788       barf("unblockThread (MVAR): TSO not found");
2789     }
2790
2791   case BlockedOnBlackHole:
2792     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2793     {
2794       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2795
2796       last = &bq->blocking_queue;
2797       for (t = bq->blocking_queue; 
2798            t != END_BQ_QUEUE; 
2799            last = &t->link, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           *last = (StgBlockingQueueElement *)tso->link;
2802           goto done;
2803         }
2804       }
2805       barf("unblockThread (BLACKHOLE): TSO not found");
2806     }
2807
2808   case BlockedOnException:
2809     {
2810       StgTSO *target  = tso->block_info.tso;
2811
2812       ASSERT(get_itbl(target)->type == TSO);
2813
2814       if (target->what_next == ThreadRelocated) {
2815           target = target->link;
2816           ASSERT(get_itbl(target)->type == TSO);
2817       }
2818
2819       ASSERT(target->blocked_exceptions != NULL);
2820
2821       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2822       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2823            t != END_BQ_QUEUE; 
2824            last = &t->link, t = t->link) {
2825         ASSERT(get_itbl(t)->type == TSO);
2826         if (t == (StgBlockingQueueElement *)tso) {
2827           *last = (StgBlockingQueueElement *)tso->link;
2828           goto done;
2829         }
2830       }
2831       barf("unblockThread (Exception): TSO not found");
2832     }
2833
2834   case BlockedOnRead:
2835   case BlockedOnWrite:
2836     {
2837       /* take TSO off blocked_queue */
2838       StgBlockingQueueElement *prev = NULL;
2839       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2840            prev = t, t = t->link) {
2841         if (t == (StgBlockingQueueElement *)tso) {
2842           if (prev == NULL) {
2843             blocked_queue_hd = (StgTSO *)t->link;
2844             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2845               blocked_queue_tl = END_TSO_QUEUE;
2846             }
2847           } else {
2848             prev->link = t->link;
2849             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2850               blocked_queue_tl = (StgTSO *)prev;
2851             }
2852           }
2853           goto done;
2854         }
2855       }
2856       barf("unblockThread (I/O): TSO not found");
2857     }
2858
2859   case BlockedOnDelay:
2860     {
2861       /* take TSO off sleeping_queue */
2862       StgBlockingQueueElement *prev = NULL;
2863       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2864            prev = t, t = t->link) {
2865         if (t == (StgBlockingQueueElement *)tso) {
2866           if (prev == NULL) {
2867             sleeping_queue = (StgTSO *)t->link;
2868           } else {
2869             prev->link = t->link;
2870           }
2871           goto done;
2872         }
2873       }
2874       barf("unblockThread (I/O): TSO not found");
2875     }
2876
2877   default:
2878     barf("unblockThread");
2879   }
2880
2881  done:
2882   tso->link = END_TSO_QUEUE;
2883   tso->why_blocked = NotBlocked;
2884   tso->block_info.closure = NULL;
2885   PUSH_ON_RUN_QUEUE(tso);
2886   RELEASE_LOCK(&sched_mutex);
2887 }
2888 #else
2889 static void
2890 unblockThread(StgTSO *tso)
2891 {
2892   StgTSO *t, **last;
2893
2894   ACQUIRE_LOCK(&sched_mutex);
2895   switch (tso->why_blocked) {
2896
2897   case NotBlocked:
2898     return;  /* not blocked */
2899
2900   case BlockedOnMVar:
2901     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2902     {
2903       StgTSO *last_tso = END_TSO_QUEUE;
2904       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2905
2906       last = &mvar->head;
2907       for (t = mvar->head; t != END_TSO_QUEUE; 
2908            last = &t->link, last_tso = t, t = t->link) {
2909         if (t == tso) {
2910           *last = tso->link;
2911           if (mvar->tail == tso) {
2912             mvar->tail = last_tso;
2913           }
2914           goto done;
2915         }
2916       }
2917       barf("unblockThread (MVAR): TSO not found");
2918     }
2919
2920   case BlockedOnBlackHole:
2921     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2922     {
2923       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2924
2925       last = &bq->blocking_queue;
2926       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2927            last = &t->link, t = t->link) {
2928         if (t == tso) {
2929           *last = tso->link;
2930           goto done;
2931         }
2932       }
2933       barf("unblockThread (BLACKHOLE): TSO not found");
2934     }
2935
2936   case BlockedOnException:
2937     {
2938       StgTSO *target  = tso->block_info.tso;
2939
2940       ASSERT(get_itbl(target)->type == TSO);
2941
2942       while (target->what_next == ThreadRelocated) {
2943           target = target->link;
2944           ASSERT(get_itbl(target)->type == TSO);
2945       }
2946       
2947       ASSERT(target->blocked_exceptions != NULL);
2948
2949       last = &target->blocked_exceptions;
2950       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2951            last = &t->link, t = t->link) {
2952         ASSERT(get_itbl(t)->type == TSO);
2953         if (t == tso) {
2954           *last = tso->link;
2955           goto done;
2956         }
2957       }
2958       barf("unblockThread (Exception): TSO not found");
2959     }
2960
2961   case BlockedOnRead:
2962   case BlockedOnWrite:
2963     {
2964       StgTSO *prev = NULL;
2965       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2966            prev = t, t = t->link) {
2967         if (t == tso) {
2968           if (prev == NULL) {
2969             blocked_queue_hd = t->link;
2970             if (blocked_queue_tl == t) {
2971               blocked_queue_tl = END_TSO_QUEUE;
2972             }
2973           } else {
2974             prev->link = t->link;
2975             if (blocked_queue_tl == t) {
2976               blocked_queue_tl = prev;
2977             }
2978           }
2979           goto done;
2980         }
2981       }
2982       barf("unblockThread (I/O): TSO not found");
2983     }
2984
2985   case BlockedOnDelay:
2986     {
2987       StgTSO *prev = NULL;
2988       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2989            prev = t, t = t->link) {
2990         if (t == tso) {
2991           if (prev == NULL) {
2992             sleeping_queue = t->link;
2993           } else {
2994             prev->link = t->link;
2995           }
2996           goto done;
2997         }
2998       }
2999       barf("unblockThread (I/O): TSO not found");
3000     }
3001
3002   default:
3003     barf("unblockThread");
3004   }
3005
3006  done:
3007   tso->link = END_TSO_QUEUE;
3008   tso->why_blocked = NotBlocked;
3009   tso->block_info.closure = NULL;
3010   PUSH_ON_RUN_QUEUE(tso);
3011   RELEASE_LOCK(&sched_mutex);
3012 }
3013 #endif
3014
3015 /* -----------------------------------------------------------------------------
3016  * raiseAsync()
3017  *
3018  * The following function implements the magic for raising an
3019  * asynchronous exception in an existing thread.
3020  *
3021  * We first remove the thread from any queue on which it might be
3022  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3023  *
3024  * We strip the stack down to the innermost CATCH_FRAME, building
3025  * thunks in the heap for all the active computations, so they can 
3026  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3027  * an application of the handler to the exception, and push it on
3028  * the top of the stack.
3029  * 
3030  * How exactly do we save all the active computations?  We create an
3031  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3032  * AP_UPDs pushes everything from the corresponding update frame
3033  * upwards onto the stack.  (Actually, it pushes everything up to the
3034  * next update frame plus a pointer to the next AP_UPD object.
3035  * Entering the next AP_UPD object pushes more onto the stack until we
3036  * reach the last AP_UPD object - at which point the stack should look
3037  * exactly as it did when we killed the TSO and we can continue
3038  * execution by entering the closure on top of the stack.
3039  *
3040  * We can also kill a thread entirely - this happens if either (a) the 
3041  * exception passed to raiseAsync is NULL, or (b) there's no
3042  * CATCH_FRAME on the stack.  In either case, we strip the entire
3043  * stack and replace the thread with a zombie.
3044  *
3045  * -------------------------------------------------------------------------- */
3046  
3047 void 
3048 deleteThread(StgTSO *tso)
3049 {
3050   raiseAsync(tso,NULL);
3051 }
3052
3053 void
3054 raiseAsync(StgTSO *tso, StgClosure *exception)
3055 {
3056   StgUpdateFrame* su = tso->su;
3057   StgPtr          sp = tso->sp;
3058   
3059   /* Thread already dead? */
3060   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3061     return;
3062   }
3063
3064   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3065
3066   /* Remove it from any blocking queues */
3067   unblockThread(tso);
3068
3069   /* The stack freezing code assumes there's a closure pointer on
3070    * the top of the stack.  This isn't always the case with compiled
3071    * code, so we have to push a dummy closure on the top which just
3072    * returns to the next return address on the stack.
3073    */
3074   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3075     *(--sp) = (W_)&stg_dummy_ret_closure;
3076   }
3077
3078   while (1) {
3079     nat words = ((P_)su - (P_)sp) - 1;
3080     nat i;
3081     StgAP_UPD * ap;
3082
3083     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3084      * then build the THUNK raise(exception), and leave it on
3085      * top of the CATCH_FRAME ready to enter.
3086      */
3087     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3088 #ifdef PROFILING
3089       StgCatchFrame *cf = (StgCatchFrame *)su;
3090 #endif
3091       StgClosure *raise;
3092
3093       /* we've got an exception to raise, so let's pass it to the
3094        * handler in this frame.
3095        */
3096       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3097       TICK_ALLOC_SE_THK(1,0);
3098       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3099       raise->payload[0] = exception;
3100
3101       /* throw away the stack from Sp up to the CATCH_FRAME.
3102        */
3103       sp = (P_)su - 1;
3104
3105       /* Ensure that async excpetions are blocked now, so we don't get
3106        * a surprise exception before we get around to executing the
3107        * handler.
3108        */
3109       if (tso->blocked_exceptions == NULL) {
3110           tso->blocked_exceptions = END_TSO_QUEUE;
3111       }
3112
3113       /* Put the newly-built THUNK on top of the stack, ready to execute
3114        * when the thread restarts.
3115        */
3116       sp[0] = (W_)raise;
3117       tso->sp = sp;
3118       tso->su = su;
3119       tso->what_next = ThreadEnterGHC;
3120       IF_DEBUG(sanity, checkTSO(tso));
3121       return;
3122     }
3123
3124     /* First build an AP_UPD consisting of the stack chunk above the
3125      * current update frame, with the top word on the stack as the
3126      * fun field.
3127      */
3128     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3129     
3130     ASSERT(words >= 0);
3131     
3132     ap->n_args = words;
3133     ap->fun    = (StgClosure *)sp[0];
3134     sp++;
3135     for(i=0; i < (nat)words; ++i) {
3136       ap->payload[i] = (StgClosure *)*sp++;
3137     }
3138     
3139     switch (get_itbl(su)->type) {
3140       
3141     case UPDATE_FRAME:
3142       {
3143         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3144         TICK_ALLOC_UP_THK(words+1,0);
3145         
3146         IF_DEBUG(scheduler,
3147                  fprintf(stderr,  "scheduler: Updating ");
3148                  printPtr((P_)su->updatee); 
3149                  fprintf(stderr,  " with ");
3150                  printObj((StgClosure *)ap);
3151                  );
3152         
3153         /* Replace the updatee with an indirection - happily
3154          * this will also wake up any threads currently
3155          * waiting on the result.
3156          *
3157          * Warning: if we're in a loop, more than one update frame on
3158          * the stack may point to the same object.  Be careful not to
3159          * overwrite an IND_OLDGEN in this case, because we'll screw
3160          * up the mutable lists.  To be on the safe side, don't
3161          * overwrite any kind of indirection at all.  See also
3162          * threadSqueezeStack in GC.c, where we have to make a similar
3163          * check.
3164          */
3165         if (!closure_IND(su->updatee)) {
3166             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3167         }
3168         su = su->link;
3169         sp += sizeofW(StgUpdateFrame) -1;
3170         sp[0] = (W_)ap; /* push onto stack */
3171         break;
3172       }
3173
3174     case CATCH_FRAME:
3175       {
3176         StgCatchFrame *cf = (StgCatchFrame *)su;
3177         StgClosure* o;
3178         
3179         /* We want a PAP, not an AP_UPD.  Fortunately, the
3180          * layout's the same.
3181          */
3182         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3183         TICK_ALLOC_UPD_PAP(words+1,0);
3184         
3185         /* now build o = FUN(catch,ap,handler) */
3186         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3187         TICK_ALLOC_FUN(2,0);
3188         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3189         o->payload[0] = (StgClosure *)ap;
3190         o->payload[1] = cf->handler;
3191         
3192         IF_DEBUG(scheduler,
3193                  fprintf(stderr,  "scheduler: Built ");
3194                  printObj((StgClosure *)o);
3195                  );
3196         
3197         /* pop the old handler and put o on the stack */
3198         su = cf->link;
3199         sp += sizeofW(StgCatchFrame) - 1;
3200         sp[0] = (W_)o;
3201         break;
3202       }
3203       
3204     case SEQ_FRAME:
3205       {
3206         StgSeqFrame *sf = (StgSeqFrame *)su;
3207         StgClosure* o;
3208         
3209         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3210         TICK_ALLOC_UPD_PAP(words+1,0);
3211         
3212         /* now build o = FUN(seq,ap) */
3213         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3214         TICK_ALLOC_SE_THK(1,0);
3215         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3216         o->payload[0] = (StgClosure *)ap;
3217         
3218         IF_DEBUG(scheduler,
3219                  fprintf(stderr,  "scheduler: Built ");
3220                  printObj((StgClosure *)o);
3221                  );
3222         
3223         /* pop the old handler and put o on the stack */
3224         su = sf->link;
3225         sp += sizeofW(StgSeqFrame) - 1;
3226         sp[0] = (W_)o;
3227         break;
3228       }
3229       
3230     case STOP_FRAME:
3231       /* We've stripped the entire stack, the thread is now dead. */
3232       sp += sizeofW(StgStopFrame) - 1;
3233       sp[0] = (W_)exception;    /* save the exception */
3234       tso->what_next = ThreadKilled;
3235       tso->su = (StgUpdateFrame *)(sp+1);
3236       tso->sp = sp;
3237       return;
3238
3239     default:
3240       barf("raiseAsync");
3241     }
3242   }
3243   barf("raiseAsync");
3244 }
3245
3246 /* -----------------------------------------------------------------------------
3247    resurrectThreads is called after garbage collection on the list of
3248    threads found to be garbage.  Each of these threads will be woken
3249    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3250    on an MVar, or NonTermination if the thread was blocked on a Black
3251    Hole.
3252    -------------------------------------------------------------------------- */
3253
3254 void
3255 resurrectThreads( StgTSO *threads )
3256 {
3257   StgTSO *tso, *next;
3258
3259   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3260     next = tso->global_link;
3261     tso->global_link = all_threads;
3262     all_threads = tso;
3263     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3264
3265     switch (tso->why_blocked) {
3266     case BlockedOnMVar:
3267     case BlockedOnException:
3268       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3269       break;
3270     case BlockedOnBlackHole:
3271       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3272       break;
3273     case NotBlocked:
3274       /* This might happen if the thread was blocked on a black hole
3275        * belonging to a thread that we've just woken up (raiseAsync
3276        * can wake up threads, remember...).
3277        */
3278       continue;
3279     default:
3280       barf("resurrectThreads: thread blocked in a strange way");
3281     }
3282   }
3283 }
3284
3285 /* -----------------------------------------------------------------------------
3286  * Blackhole detection: if we reach a deadlock, test whether any
3287  * threads are blocked on themselves.  Any threads which are found to
3288  * be self-blocked get sent a NonTermination exception.
3289  *
3290  * This is only done in a deadlock situation in order to avoid
3291  * performance overhead in the normal case.
3292  * -------------------------------------------------------------------------- */
3293
3294 static void
3295 detectBlackHoles( void )
3296 {
3297     StgTSO *t = all_threads;
3298     StgUpdateFrame *frame;
3299     StgClosure *blocked_on;
3300
3301     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3302
3303         while (t->what_next == ThreadRelocated) {
3304             t = t->link;
3305             ASSERT(get_itbl(t)->type == TSO);
3306         }
3307       
3308         if (t->why_blocked != BlockedOnBlackHole) {
3309             continue;
3310         }
3311
3312         blocked_on = t->block_info.closure;
3313
3314         for (frame = t->su; ; frame = frame->link) {
3315             switch (get_itbl(frame)->type) {
3316
3317             case UPDATE_FRAME:
3318                 if (frame->updatee == blocked_on) {
3319                     /* We are blocking on one of our own computations, so
3320                      * send this thread the NonTermination exception.  
3321                      */
3322                     IF_DEBUG(scheduler, 
3323                              sched_belch("thread %d is blocked on itself", t->id));
3324                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3325                     goto done;
3326                 }
3327                 else {
3328                     continue;
3329                 }
3330
3331             case CATCH_FRAME:
3332             case SEQ_FRAME:
3333                 continue;
3334                 
3335             case STOP_FRAME:
3336                 break;
3337             }
3338             break;
3339         }
3340
3341     done: ;
3342     }   
3343 }
3344
3345 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3346 //@subsection Debugging Routines
3347
3348 /* -----------------------------------------------------------------------------
3349    Debugging: why is a thread blocked
3350    -------------------------------------------------------------------------- */
3351
3352 #ifdef DEBUG
3353
3354 void
3355 printThreadBlockage(StgTSO *tso)
3356 {
3357   switch (tso->why_blocked) {
3358   case BlockedOnRead:
3359     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3360     break;
3361   case BlockedOnWrite:
3362     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3363     break;
3364   case BlockedOnDelay:
3365     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3366     break;
3367   case BlockedOnMVar:
3368     fprintf(stderr,"is blocked on an MVar");
3369     break;
3370   case BlockedOnException:
3371     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3372             tso->block_info.tso->id);
3373     break;
3374   case BlockedOnBlackHole:
3375     fprintf(stderr,"is blocked on a black hole");
3376     break;
3377   case NotBlocked:
3378     fprintf(stderr,"is not blocked");
3379     break;
3380 #if defined(PAR)
3381   case BlockedOnGA:
3382     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3383             tso->block_info.closure, info_type(tso->block_info.closure));
3384     break;
3385   case BlockedOnGA_NoSend:
3386     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3387             tso->block_info.closure, info_type(tso->block_info.closure));
3388     break;
3389 #endif
3390 #if defined(RTS_SUPPORTS_THREADS)
3391   case BlockedOnCCall:
3392     fprintf(stderr,"is blocked on an external call");
3393     break;
3394 #endif
3395   default:
3396     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3397          tso->why_blocked, tso->id, tso);
3398   }
3399 }
3400
3401 void
3402 printThreadStatus(StgTSO *tso)
3403 {
3404   switch (tso->what_next) {
3405   case ThreadKilled:
3406     fprintf(stderr,"has been killed");
3407     break;
3408   case ThreadComplete:
3409     fprintf(stderr,"has completed");
3410     break;
3411   default:
3412     printThreadBlockage(tso);
3413   }
3414 }
3415
3416 void
3417 printAllThreads(void)
3418 {
3419   StgTSO *t;
3420
3421 # if defined(GRAN)
3422   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3423   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3424                        time_string, rtsFalse/*no commas!*/);
3425
3426   sched_belch("all threads at [%s]:", time_string);
3427 # elif defined(PAR)
3428   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3429   ullong_format_string(CURRENT_TIME,
3430                        time_string, rtsFalse/*no commas!*/);
3431
3432   sched_belch("all threads at [%s]:", time_string);
3433 # else
3434   sched_belch("all threads:");
3435 # endif
3436
3437   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3438     fprintf(stderr, "\tthread %d ", t->id);
3439     printThreadStatus(t);
3440     fprintf(stderr,"\n");
3441   }
3442 }
3443     
3444 /* 
3445    Print a whole blocking queue attached to node (debugging only).
3446 */
3447 //@cindex print_bq
3448 # if defined(PAR)
3449 void 
3450 print_bq (StgClosure *node)
3451 {
3452   StgBlockingQueueElement *bqe;
3453   StgTSO *tso;
3454   rtsBool end;
3455
3456   fprintf(stderr,"## BQ of closure %p (%s): ",
3457           node, info_type(node));
3458
3459   /* should cover all closures that may have a blocking queue */
3460   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3461          get_itbl(node)->type == FETCH_ME_BQ ||
3462          get_itbl(node)->type == RBH ||
3463          get_itbl(node)->type == MVAR);
3464     
3465   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3466
3467   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3468 }
3469
3470 /* 
3471    Print a whole blocking queue starting with the element bqe.
3472 */
3473 void 
3474 print_bqe (StgBlockingQueueElement *bqe)
3475 {
3476   rtsBool end;
3477
3478   /* 
3479      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3480   */
3481   for (end = (bqe==END_BQ_QUEUE);
3482        !end; // iterate until bqe points to a CONSTR
3483        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3484        bqe = end ? END_BQ_QUEUE : bqe->link) {
3485     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3486     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3487     /* types of closures that may appear in a blocking queue */
3488     ASSERT(get_itbl(bqe)->type == TSO ||           
3489            get_itbl(bqe)->type == BLOCKED_FETCH || 
3490            get_itbl(bqe)->type == CONSTR); 
3491     /* only BQs of an RBH end with an RBH_Save closure */
3492     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3493
3494     switch (get_itbl(bqe)->type) {
3495     case TSO:
3496       fprintf(stderr," TSO %u (%x),",
3497               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3498       break;
3499     case BLOCKED_FETCH:
3500       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3501               ((StgBlockedFetch *)bqe)->node, 
3502               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3503               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3504               ((StgBlockedFetch *)bqe)->ga.weight);
3505       break;
3506     case CONSTR:
3507       fprintf(stderr," %s (IP %p),",
3508               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3509                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3510                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3511                "RBH_Save_?"), get_itbl(bqe));
3512       break;
3513     default:
3514       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3515            info_type((StgClosure *)bqe)); // , node, info_type(node));
3516       break;
3517     }
3518   } /* for */
3519   fputc('\n', stderr);
3520 }
3521 # elif defined(GRAN)
3522 void 
3523 print_bq (StgClosure *node)
3524 {
3525   StgBlockingQueueElement *bqe;
3526   PEs node_loc, tso_loc;
3527   rtsBool end;
3528
3529   /* should cover all closures that may have a blocking queue */
3530   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3531          get_itbl(node)->type == FETCH_ME_BQ ||
3532          get_itbl(node)->type == RBH);
3533     
3534   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3535   node_loc = where_is(node);
3536
3537   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3538           node, info_type(node), node_loc);
3539
3540   /* 
3541      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3542   */
3543   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3544        !end; // iterate until bqe points to a CONSTR
3545        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3546     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3547     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3548     /* types of closures that may appear in a blocking queue */
3549     ASSERT(get_itbl(bqe)->type == TSO ||           
3550            get_itbl(bqe)->type == CONSTR); 
3551     /* only BQs of an RBH end with an RBH_Save closure */
3552     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3553
3554     tso_loc = where_is((StgClosure *)bqe);
3555     switch (get_itbl(bqe)->type) {
3556     case TSO:
3557       fprintf(stderr," TSO %d (%p) on [PE %d],",
3558               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3559       break;
3560     case CONSTR:
3561       fprintf(stderr," %s (IP %p),",
3562               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3563                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3564                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3565                "RBH_Save_?"), get_itbl(bqe));
3566       break;
3567     default:
3568       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3569            info_type((StgClosure *)bqe), node, info_type(node));
3570       break;
3571     }
3572   } /* for */
3573   fputc('\n', stderr);
3574 }
3575 #else
3576 /* 
3577    Nice and easy: only TSOs on the blocking queue
3578 */
3579 void 
3580 print_bq (StgClosure *node)
3581 {
3582   StgTSO *tso;
3583
3584   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3585   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3586        tso != END_TSO_QUEUE; 
3587        tso=tso->link) {
3588     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3589     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3590     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3591   }
3592   fputc('\n', stderr);
3593 }
3594 # endif
3595
3596 #if defined(PAR)
3597 static nat
3598 run_queue_len(void)
3599 {
3600   nat i;
3601   StgTSO *tso;
3602
3603   for (i=0, tso=run_queue_hd; 
3604        tso != END_TSO_QUEUE;
3605        i++, tso=tso->link)
3606     /* nothing */
3607
3608   return i;
3609 }
3610 #endif
3611
3612 static void
3613 sched_belch(char *s, ...)
3614 {
3615   va_list ap;
3616   va_start(ap,s);
3617 #ifdef SMP
3618   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3619 #elif defined(PAR)
3620   fprintf(stderr, "== ");
3621 #else
3622   fprintf(stderr, "scheduler: ");
3623 #endif
3624   vfprintf(stderr, s, ap);
3625   fprintf(stderr, "\n");
3626 }
3627
3628 #endif /* DEBUG */
3629
3630
3631 //@node Index,  , Debugging Routines, Main scheduling code
3632 //@subsection Index
3633
3634 //@index
3635 //* StgMainThread::  @cindex\s-+StgMainThread
3636 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3637 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3638 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3639 //* context_switch::  @cindex\s-+context_switch
3640 //* createThread::  @cindex\s-+createThread
3641 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3642 //* initScheduler::  @cindex\s-+initScheduler
3643 //* interrupted::  @cindex\s-+interrupted
3644 //* next_thread_id::  @cindex\s-+next_thread_id
3645 //* print_bq::  @cindex\s-+print_bq
3646 //* run_queue_hd::  @cindex\s-+run_queue_hd
3647 //* run_queue_tl::  @cindex\s-+run_queue_tl
3648 //* sched_mutex::  @cindex\s-+sched_mutex
3649 //* schedule::  @cindex\s-+schedule
3650 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3651 //* term_mutex::  @cindex\s-+term_mutex
3652 //@end index