[project @ 2002-07-17 09:21:48 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.148 2002/07/17 09:21:50 simonmar Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #include "ThreadLabels.h"
100 #ifdef PROFILING
101 #include "Proftimer.h"
102 #include "ProfHeap.h"
103 #endif
104 #if defined(GRAN) || defined(PAR)
105 # include "GranSimRts.h"
106 # include "GranSim.h"
107 # include "ParallelRts.h"
108 # include "Parallel.h"
109 # include "ParallelDebug.h"
110 # include "FetchMe.h"
111 # include "HLC.h"
112 #endif
113 #include "Sparks.h"
114 #include "Capability.h"
115 #include "OSThreads.h"
116 #include  "Task.h"
117
118 #ifdef HAVE_SYS_TYPES_H
119 #include <sys/types.h>
120 #endif
121 #ifdef HAVE_UNISTD_H
122 #include <unistd.h>
123 #endif
124
125 #include <string.h>
126 #include <stdlib.h>
127 #include <stdarg.h>
128
129 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
130 //@subsection Variables and Data structures
131
132 /* Main thread queue.
133  * Locks required: sched_mutex.
134  */
135 StgMainThread *main_threads;
136
137 /* Thread queues.
138  * Locks required: sched_mutex.
139  */
140 #if defined(GRAN)
141
142 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
143 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
144
145 /* 
146    In GranSim we have a runnable and a blocked queue for each processor.
147    In order to minimise code changes new arrays run_queue_hds/tls
148    are created. run_queue_hd is then a short cut (macro) for
149    run_queue_hds[CurrentProc] (see GranSim.h).
150    -- HWL
151 */
152 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
153 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
154 StgTSO *ccalling_threadss[MAX_PROC];
155 /* We use the same global list of threads (all_threads) in GranSim as in
156    the std RTS (i.e. we are cheating). However, we don't use this list in
157    the GranSim specific code at the moment (so we are only potentially
158    cheating).  */
159
160 #else /* !GRAN */
161
162 StgTSO *run_queue_hd, *run_queue_tl;
163 StgTSO *blocked_queue_hd, *blocked_queue_tl;
164 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
165
166 #endif
167
168 /* Linked list of all threads.
169  * Used for detecting garbage collected threads.
170  */
171 StgTSO *all_threads;
172
173 /* When a thread performs a safe C call (_ccall_GC, using old
174  * terminology), it gets put on the suspended_ccalling_threads
175  * list. Used by the garbage collector.
176  */
177 static StgTSO *suspended_ccalling_threads;
178
179 static StgTSO *threadStackOverflow(StgTSO *tso);
180
181 /* KH: The following two flags are shared memory locations.  There is no need
182        to lock them, since they are only unset at the end of a scheduler
183        operation.
184 */
185
186 /* flag set by signal handler to precipitate a context switch */
187 //@cindex context_switch
188 nat context_switch;
189
190 /* if this flag is set as well, give up execution */
191 //@cindex interrupted
192 rtsBool interrupted;
193
194 /* Next thread ID to allocate.
195  * Locks required: thread_id_mutex
196  */
197 //@cindex next_thread_id
198 StgThreadID next_thread_id = 1;
199
200 /*
201  * Pointers to the state of the current thread.
202  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
203  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
204  */
205  
206 /* The smallest stack size that makes any sense is:
207  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
208  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
209  *  + 1                       (the realworld token for an IO thread)
210  *  + 1                       (the closure to enter)
211  *
212  * A thread with this stack will bomb immediately with a stack
213  * overflow, which will increase its stack size.  
214  */
215
216 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
217
218
219 #if defined(GRAN)
220 StgTSO *CurrentTSO;
221 #endif
222
223 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
224  *  exists - earlier gccs apparently didn't.
225  *  -= chak
226  */
227 StgTSO dummy_tso;
228
229 rtsBool ready_to_gc;
230
231 /*
232  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
233  * in an MT setting, needed to signal that a worker thread shouldn't hang around
234  * in the scheduler when it is out of work.
235  */
236 static rtsBool shutting_down_scheduler = rtsFalse;
237
238 void            addToBlockedQueue ( StgTSO *tso );
239
240 static void     schedule          ( void );
241        void     interruptStgRts   ( void );
242
243 static void     detectBlackHoles  ( void );
244
245 #ifdef DEBUG
246 static void sched_belch(char *s, ...);
247 #endif
248
249 #if defined(RTS_SUPPORTS_THREADS)
250 /* ToDo: carefully document the invariants that go together
251  *       with these synchronisation objects.
252  */
253 Mutex     sched_mutex       = INIT_MUTEX_VAR;
254 Mutex     term_mutex        = INIT_MUTEX_VAR;
255
256 /*
257  * A heavyweight solution to the problem of protecting
258  * the thread_id from concurrent update.
259  */
260 Mutex     thread_id_mutex   = INIT_MUTEX_VAR;
261
262
263 # if defined(SMP)
264 static Condition gc_pending_cond = INIT_COND_VAR;
265 nat await_death;
266 # endif
267
268 #endif /* RTS_SUPPORTS_THREADS */
269
270 #if defined(PAR)
271 StgTSO *LastTSO;
272 rtsTime TimeOfLastYield;
273 rtsBool emitSchedule = rtsTrue;
274 #endif
275
276 #if DEBUG
277 char *whatNext_strs[] = {
278   "ThreadEnterGHC",
279   "ThreadRunGHC",
280   "ThreadEnterInterp",
281   "ThreadKilled",
282   "ThreadComplete"
283 };
284
285 char *threadReturnCode_strs[] = {
286   "HeapOverflow",                       /* might also be StackOverflow */
287   "StackOverflow",
288   "ThreadYielding",
289   "ThreadBlocked",
290   "ThreadFinished"
291 };
292 #endif
293
294 #if defined(PAR)
295 StgTSO * createSparkThread(rtsSpark spark);
296 StgTSO * activateSpark (rtsSpark spark);  
297 #endif
298
299 /*
300  * The thread state for the main thread.
301 // ToDo: check whether not needed any more
302 StgTSO   *MainTSO;
303  */
304
305 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
306 static void taskStart(void);
307 static void
308 taskStart(void)
309 {
310   schedule();
311 }
312 #endif
313
314
315
316
317 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
318 //@subsection Main scheduling loop
319
320 /* ---------------------------------------------------------------------------
321    Main scheduling loop.
322
323    We use round-robin scheduling, each thread returning to the
324    scheduler loop when one of these conditions is detected:
325
326       * out of heap space
327       * timer expires (thread yields)
328       * thread blocks
329       * thread ends
330       * stack overflow
331
332    Locking notes:  we acquire the scheduler lock once at the beginning
333    of the scheduler loop, and release it when
334     
335       * running a thread, or
336       * waiting for work, or
337       * waiting for a GC to complete.
338
339    GRAN version:
340      In a GranSim setup this loop iterates over the global event queue.
341      This revolves around the global event queue, which determines what 
342      to do next. Therefore, it's more complicated than either the 
343      concurrent or the parallel (GUM) setup.
344
345    GUM version:
346      GUM iterates over incoming messages.
347      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
348      and sends out a fish whenever it has nothing to do; in-between
349      doing the actual reductions (shared code below) it processes the
350      incoming messages and deals with delayed operations 
351      (see PendingFetches).
352      This is not the ugliest code you could imagine, but it's bloody close.
353
354    ------------------------------------------------------------------------ */
355 //@cindex schedule
356 static void
357 schedule( void )
358 {
359   StgTSO *t;
360   Capability *cap;
361   StgThreadReturnCode ret;
362 #if defined(GRAN)
363   rtsEvent *event;
364 #elif defined(PAR)
365   StgSparkPool *pool;
366   rtsSpark spark;
367   StgTSO *tso;
368   GlobalTaskId pe;
369   rtsBool receivedFinish = rtsFalse;
370 # if defined(DEBUG)
371   nat tp_size, sp_size; // stats only
372 # endif
373 #endif
374   rtsBool was_interrupted = rtsFalse;
375   
376   ACQUIRE_LOCK(&sched_mutex);
377  
378 #if defined(RTS_SUPPORTS_THREADS)
379   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
380 #else
381   /* simply initialise it in the non-threaded case */
382   grabCapability(&cap);
383 #endif
384
385 #if defined(GRAN)
386   /* set up first event to get things going */
387   /* ToDo: assign costs for system setup and init MainTSO ! */
388   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
389             ContinueThread, 
390             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
391
392   IF_DEBUG(gran,
393            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
394            G_TSO(CurrentTSO, 5));
395
396   if (RtsFlags.GranFlags.Light) {
397     /* Save current time; GranSim Light only */
398     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
399   }      
400
401   event = get_next_event();
402
403   while (event!=(rtsEvent*)NULL) {
404     /* Choose the processor with the next event */
405     CurrentProc = event->proc;
406     CurrentTSO = event->tso;
407
408 #elif defined(PAR)
409
410   while (!receivedFinish) {    /* set by processMessages */
411                                /* when receiving PP_FINISH message         */ 
412 #else
413
414   while (1) {
415
416 #endif
417
418     IF_DEBUG(scheduler, printAllThreads());
419
420 #if defined(RTS_SUPPORTS_THREADS)
421     /* Check to see whether there are any worker threads
422        waiting to deposit external call results. If so,
423        yield our capability */
424     yieldToReturningWorker(&sched_mutex, &cap);
425 #endif
426
427     /* If we're interrupted (the user pressed ^C, or some other
428      * termination condition occurred), kill all the currently running
429      * threads.
430      */
431     if (interrupted) {
432       IF_DEBUG(scheduler, sched_belch("interrupted"));
433       deleteAllThreads();
434       interrupted = rtsFalse;
435       was_interrupted = rtsTrue;
436     }
437
438     /* Go through the list of main threads and wake up any
439      * clients whose computations have finished.  ToDo: this
440      * should be done more efficiently without a linear scan
441      * of the main threads list, somehow...
442      */
443 #if defined(RTS_SUPPORTS_THREADS)
444     { 
445       StgMainThread *m, **prev;
446       prev = &main_threads;
447       for (m = main_threads; m != NULL; m = m->link) {
448         switch (m->tso->what_next) {
449         case ThreadComplete:
450           if (m->ret) {
451             *(m->ret) = (StgClosure *)m->tso->sp[0];
452           }
453           *prev = m->link;
454           m->stat = Success;
455           broadcastCondition(&m->wakeup);
456 #ifdef DEBUG
457           removeThreadLabel(m->tso);
458 #endif
459           break;
460         case ThreadKilled:
461           if (m->ret) *(m->ret) = NULL;
462           *prev = m->link;
463           if (was_interrupted) {
464             m->stat = Interrupted;
465           } else {
466             m->stat = Killed;
467           }
468           broadcastCondition(&m->wakeup);
469 #ifdef DEBUG
470           removeThreadLabel(m->tso);
471 #endif
472           break;
473         default:
474           break;
475         }
476       }
477     }
478
479 #else /* not threaded */
480
481 # if defined(PAR)
482     /* in GUM do this only on the Main PE */
483     if (IAmMainThread)
484 # endif
485     /* If our main thread has finished or been killed, return.
486      */
487     {
488       StgMainThread *m = main_threads;
489       if (m->tso->what_next == ThreadComplete
490           || m->tso->what_next == ThreadKilled) {
491 #ifdef DEBUG
492         removeThreadLabel((StgWord)m->tso);
493 #endif
494         main_threads = main_threads->link;
495         if (m->tso->what_next == ThreadComplete) {
496           /* we finished successfully, fill in the return value */
497           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
498           m->stat = Success;
499           return;
500         } else {
501           if (m->ret) { *(m->ret) = NULL; };
502           if (was_interrupted) {
503             m->stat = Interrupted;
504           } else {
505             m->stat = Killed;
506           }
507           return;
508         }
509       }
510     }
511 #endif
512
513     /* Top up the run queue from our spark pool.  We try to make the
514      * number of threads in the run queue equal to the number of
515      * free capabilities.
516      *
517      * Disable spark support in SMP for now, non-essential & requires
518      * a little bit of work to make it compile cleanly. -- sof 1/02.
519      */
520 #if 0 /* defined(SMP) */
521     {
522       nat n = getFreeCapabilities();
523       StgTSO *tso = run_queue_hd;
524
525       /* Count the run queue */
526       while (n > 0 && tso != END_TSO_QUEUE) {
527         tso = tso->link;
528         n--;
529       }
530
531       for (; n > 0; n--) {
532         StgClosure *spark;
533         spark = findSpark(rtsFalse);
534         if (spark == NULL) {
535           break; /* no more sparks in the pool */
536         } else {
537           /* I'd prefer this to be done in activateSpark -- HWL */
538           /* tricky - it needs to hold the scheduler lock and
539            * not try to re-acquire it -- SDM */
540           createSparkThread(spark);       
541           IF_DEBUG(scheduler,
542                    sched_belch("==^^ turning spark of closure %p into a thread",
543                                (StgClosure *)spark));
544         }
545       }
546       /* We need to wake up the other tasks if we just created some
547        * work for them.
548        */
549       if (getFreeCapabilities() - n > 1) {
550           signalCondition( &thread_ready_cond );
551       }
552     }
553 #endif // SMP
554
555     /* check for signals each time around the scheduler */
556 #ifndef mingw32_TARGET_OS
557     if (signals_pending()) {
558       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
559       startSignalHandlers();
560       ACQUIRE_LOCK(&sched_mutex);
561     }
562 #endif
563
564     /* Check whether any waiting threads need to be woken up.  If the
565      * run queue is empty, and there are no other tasks running, we
566      * can wait indefinitely for something to happen.
567      * ToDo: what if another client comes along & requests another
568      * main thread?
569      */
570     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
571       awaitEvent( EMPTY_RUN_QUEUE()
572 #if defined(SMP)
573         && allFreeCapabilities()
574 #endif
575         );
576     }
577     /* we can be interrupted while waiting for I/O... */
578     if (interrupted) continue;
579
580     /* 
581      * Detect deadlock: when we have no threads to run, there are no
582      * threads waiting on I/O or sleeping, and all the other tasks are
583      * waiting for work, we must have a deadlock of some description.
584      *
585      * We first try to find threads blocked on themselves (ie. black
586      * holes), and generate NonTermination exceptions where necessary.
587      *
588      * If no threads are black holed, we have a deadlock situation, so
589      * inform all the main threads.
590      */
591 #ifndef PAR
592     if (   EMPTY_THREAD_QUEUES()
593 #if defined(RTS_SUPPORTS_THREADS)
594         && EMPTY_QUEUE(suspended_ccalling_threads)
595 #endif
596 #ifdef SMP
597         && allFreeCapabilities()
598 #endif
599         )
600     {
601         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
602 #if defined(THREADED_RTS)
603         /* and SMP mode ..? */
604         releaseCapability(cap);
605 #endif
606         // Garbage collection can release some new threads due to
607         // either (a) finalizers or (b) threads resurrected because
608         // they are about to be send BlockedOnDeadMVar.  Any threads
609         // thus released will be immediately runnable.
610         GarbageCollect(GetRoots,rtsTrue);
611
612         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
613
614         IF_DEBUG(scheduler, 
615                  sched_belch("still deadlocked, checking for black holes..."));
616         detectBlackHoles();
617
618         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
619
620 #ifndef mingw32_TARGET_OS
621         /* If we have user-installed signal handlers, then wait
622          * for signals to arrive rather then bombing out with a
623          * deadlock.
624          */
625 #if defined(RTS_SUPPORTS_THREADS)
626         if ( 0 ) { /* hmm..what to do? Simply stop waiting for
627                       a signal with no runnable threads (or I/O
628                       suspended ones) leads nowhere quick.
629                       For now, simply shut down when we reach this
630                       condition.
631                       
632                       ToDo: define precisely under what conditions
633                       the Scheduler should shut down in an MT setting.
634                    */
635 #else
636         if ( anyUserHandlers() ) {
637 #endif
638             IF_DEBUG(scheduler, 
639                      sched_belch("still deadlocked, waiting for signals..."));
640
641             awaitUserSignals();
642
643             // we might be interrupted...
644             if (interrupted) { continue; }
645
646             if (signals_pending()) {
647                 RELEASE_LOCK(&sched_mutex);
648                 startSignalHandlers();
649                 ACQUIRE_LOCK(&sched_mutex);
650             }
651             ASSERT(!EMPTY_RUN_QUEUE());
652             goto not_deadlocked;
653         }
654 #endif
655
656         /* Probably a real deadlock.  Send the current main thread the
657          * Deadlock exception (or in the SMP build, send *all* main
658          * threads the deadlock exception, since none of them can make
659          * progress).
660          */
661         {
662             StgMainThread *m;
663 #if defined(RTS_SUPPORTS_THREADS)
664             for (m = main_threads; m != NULL; m = m->link) {
665                 switch (m->tso->why_blocked) {
666                 case BlockedOnBlackHole:
667                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
668                     break;
669                 case BlockedOnException:
670                 case BlockedOnMVar:
671                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
672                     break;
673                 default:
674                     barf("deadlock: main thread blocked in a strange way");
675                 }
676             }
677 #else
678             m = main_threads;
679             switch (m->tso->why_blocked) {
680             case BlockedOnBlackHole:
681                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
682                 break;
683             case BlockedOnException:
684             case BlockedOnMVar:
685                 raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
686                 break;
687             default:
688                 barf("deadlock: main thread blocked in a strange way");
689             }
690 #endif
691         }
692
693 #if defined(RTS_SUPPORTS_THREADS)
694         /* ToDo: revisit conditions (and mechanism) for shutting
695            down a multi-threaded world  */
696         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
697         RELEASE_LOCK(&sched_mutex);
698         shutdownHaskell();
699         return;
700 #endif
701     }
702   not_deadlocked:
703
704 #elif defined(PAR)
705     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
706 #endif
707
708 #if defined(SMP)
709     /* If there's a GC pending, don't do anything until it has
710      * completed.
711      */
712     if (ready_to_gc) {
713       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
714       waitCondition( &gc_pending_cond, &sched_mutex );
715     }
716 #endif    
717
718 #if defined(RTS_SUPPORTS_THREADS)
719     /* block until we've got a thread on the run queue and a free
720      * capability.
721      *
722      */
723     if ( EMPTY_RUN_QUEUE() ) {
724       /* Give up our capability */
725       releaseCapability(cap);
726
727       /* If we're in the process of shutting down (& running the
728        * a batch of finalisers), don't wait around.
729        */
730       if ( shutting_down_scheduler ) {
731         RELEASE_LOCK(&sched_mutex);
732         return;
733       }
734       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
735       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
736       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
737     }
738 #endif
739
740 #if defined(GRAN)
741     if (RtsFlags.GranFlags.Light)
742       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
743
744     /* adjust time based on time-stamp */
745     if (event->time > CurrentTime[CurrentProc] &&
746         event->evttype != ContinueThread)
747       CurrentTime[CurrentProc] = event->time;
748     
749     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
750     if (!RtsFlags.GranFlags.Light)
751       handleIdlePEs();
752
753     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
754
755     /* main event dispatcher in GranSim */
756     switch (event->evttype) {
757       /* Should just be continuing execution */
758     case ContinueThread:
759       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
760       /* ToDo: check assertion
761       ASSERT(run_queue_hd != (StgTSO*)NULL &&
762              run_queue_hd != END_TSO_QUEUE);
763       */
764       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
765       if (!RtsFlags.GranFlags.DoAsyncFetch &&
766           procStatus[CurrentProc]==Fetching) {
767         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
768               CurrentTSO->id, CurrentTSO, CurrentProc);
769         goto next_thread;
770       } 
771       /* Ignore ContinueThreads for completed threads */
772       if (CurrentTSO->what_next == ThreadComplete) {
773         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
774               CurrentTSO->id, CurrentTSO, CurrentProc);
775         goto next_thread;
776       } 
777       /* Ignore ContinueThreads for threads that are being migrated */
778       if (PROCS(CurrentTSO)==Nowhere) { 
779         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
780               CurrentTSO->id, CurrentTSO, CurrentProc);
781         goto next_thread;
782       }
783       /* The thread should be at the beginning of the run queue */
784       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
785         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
786               CurrentTSO->id, CurrentTSO, CurrentProc);
787         break; // run the thread anyway
788       }
789       /*
790       new_event(proc, proc, CurrentTime[proc],
791                 FindWork,
792                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
793       goto next_thread; 
794       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
795       break; // now actually run the thread; DaH Qu'vam yImuHbej 
796
797     case FetchNode:
798       do_the_fetchnode(event);
799       goto next_thread;             /* handle next event in event queue  */
800       
801     case GlobalBlock:
802       do_the_globalblock(event);
803       goto next_thread;             /* handle next event in event queue  */
804       
805     case FetchReply:
806       do_the_fetchreply(event);
807       goto next_thread;             /* handle next event in event queue  */
808       
809     case UnblockThread:   /* Move from the blocked queue to the tail of */
810       do_the_unblock(event);
811       goto next_thread;             /* handle next event in event queue  */
812       
813     case ResumeThread:  /* Move from the blocked queue to the tail of */
814       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
815       event->tso->gran.blocktime += 
816         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
817       do_the_startthread(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case StartThread:
821       do_the_startthread(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case MoveThread:
825       do_the_movethread(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     case MoveSpark:
829       do_the_movespark(event);
830       goto next_thread;             /* handle next event in event queue  */
831       
832     case FindWork:
833       do_the_findwork(event);
834       goto next_thread;             /* handle next event in event queue  */
835       
836     default:
837       barf("Illegal event type %u\n", event->evttype);
838     }  /* switch */
839     
840     /* This point was scheduler_loop in the old RTS */
841
842     IF_DEBUG(gran, belch("GRAN: after main switch"));
843
844     TimeOfLastEvent = CurrentTime[CurrentProc];
845     TimeOfNextEvent = get_time_of_next_event();
846     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
847     // CurrentTSO = ThreadQueueHd;
848
849     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
850                          TimeOfNextEvent));
851
852     if (RtsFlags.GranFlags.Light) 
853       GranSimLight_leave_system(event, &ActiveTSO); 
854
855     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
856
857     IF_DEBUG(gran, 
858              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
859
860     /* in a GranSim setup the TSO stays on the run queue */
861     t = CurrentTSO;
862     /* Take a thread from the run queue. */
863     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
864
865     IF_DEBUG(gran, 
866              fprintf(stderr, "GRAN: About to run current thread, which is\n");
867              G_TSO(t,5));
868
869     context_switch = 0; // turned on via GranYield, checking events and time slice
870
871     IF_DEBUG(gran, 
872              DumpGranEvent(GR_SCHEDULE, t));
873
874     procStatus[CurrentProc] = Busy;
875
876 #elif defined(PAR)
877     if (PendingFetches != END_BF_QUEUE) {
878         processFetches();
879     }
880
881     /* ToDo: phps merge with spark activation above */
882     /* check whether we have local work and send requests if we have none */
883     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
884       /* :-[  no local threads => look out for local sparks */
885       /* the spark pool for the current PE */
886       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
887       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
888           pool->hd < pool->tl) {
889         /* 
890          * ToDo: add GC code check that we really have enough heap afterwards!!
891          * Old comment:
892          * If we're here (no runnable threads) and we have pending
893          * sparks, we must have a space problem.  Get enough space
894          * to turn one of those pending sparks into a
895          * thread... 
896          */
897
898         spark = findSpark(rtsFalse);                /* get a spark */
899         if (spark != (rtsSpark) NULL) {
900           tso = activateSpark(spark);       /* turn the spark into a thread */
901           IF_PAR_DEBUG(schedule,
902                        belch("==== schedule: Created TSO %d (%p); %d threads active",
903                              tso->id, tso, advisory_thread_count));
904
905           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
906             belch("==^^ failed to activate spark");
907             goto next_thread;
908           }               /* otherwise fall through & pick-up new tso */
909         } else {
910           IF_PAR_DEBUG(verbose,
911                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
912                              spark_queue_len(pool)));
913           goto next_thread;
914         }
915       }
916
917       /* If we still have no work we need to send a FISH to get a spark
918          from another PE 
919       */
920       if (EMPTY_RUN_QUEUE()) {
921       /* =8-[  no local sparks => look for work on other PEs */
922         /*
923          * We really have absolutely no work.  Send out a fish
924          * (there may be some out there already), and wait for
925          * something to arrive.  We clearly can't run any threads
926          * until a SCHEDULE or RESUME arrives, and so that's what
927          * we're hoping to see.  (Of course, we still have to
928          * respond to other types of messages.)
929          */
930         TIME now = msTime() /*CURRENT_TIME*/;
931         IF_PAR_DEBUG(verbose, 
932                      belch("--  now=%ld", now));
933         IF_PAR_DEBUG(verbose,
934                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
935                          (last_fish_arrived_at!=0 &&
936                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
937                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
938                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
939                              last_fish_arrived_at,
940                              RtsFlags.ParFlags.fishDelay, now);
941                      });
942         
943         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
944             (last_fish_arrived_at==0 ||
945              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
946           /* outstandingFishes is set in sendFish, processFish;
947              avoid flooding system with fishes via delay */
948           pe = choosePE();
949           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
950                    NEW_FISH_HUNGER);
951
952           // Global statistics: count no. of fishes
953           if (RtsFlags.ParFlags.ParStats.Global &&
954               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
955             globalParStats.tot_fish_mess++;
956           }
957         }
958       
959         receivedFinish = processMessages();
960         goto next_thread;
961       }
962     } else if (PacketsWaiting()) {  /* Look for incoming messages */
963       receivedFinish = processMessages();
964     }
965
966     /* Now we are sure that we have some work available */
967     ASSERT(run_queue_hd != END_TSO_QUEUE);
968
969     /* Take a thread from the run queue, if we have work */
970     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
971     IF_DEBUG(sanity,checkTSO(t));
972
973     /* ToDo: write something to the log-file
974     if (RTSflags.ParFlags.granSimStats && !sameThread)
975         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
976
977     CurrentTSO = t;
978     */
979     /* the spark pool for the current PE */
980     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
981
982     IF_DEBUG(scheduler, 
983              belch("--=^ %d threads, %d sparks on [%#x]", 
984                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
985
986 # if 1
987     if (0 && RtsFlags.ParFlags.ParStats.Full && 
988         t && LastTSO && t->id != LastTSO->id && 
989         LastTSO->why_blocked == NotBlocked && 
990         LastTSO->what_next != ThreadComplete) {
991       // if previously scheduled TSO not blocked we have to record the context switch
992       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
993                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
994     }
995
996     if (RtsFlags.ParFlags.ParStats.Full && 
997         (emitSchedule /* forced emit */ ||
998         (t && LastTSO && t->id != LastTSO->id))) {
999       /* 
1000          we are running a different TSO, so write a schedule event to log file
1001          NB: If we use fair scheduling we also have to write  a deschedule 
1002              event for LastTSO; with unfair scheduling we know that the
1003              previous tso has blocked whenever we switch to another tso, so
1004              we don't need it in GUM for now
1005       */
1006       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1007                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1008       emitSchedule = rtsFalse;
1009     }
1010      
1011 # endif
1012 #else /* !GRAN && !PAR */
1013   
1014     /* grab a thread from the run queue */
1015     ASSERT(run_queue_hd != END_TSO_QUEUE);
1016     t = POP_RUN_QUEUE();
1017     // Sanity check the thread we're about to run.  This can be
1018     // expensive if there is lots of thread switching going on...
1019     IF_DEBUG(sanity,checkTSO(t));
1020 #endif
1021     
1022     cap->r.rCurrentTSO = t;
1023     
1024     /* context switches are now initiated by the timer signal, unless
1025      * the user specified "context switch as often as possible", with
1026      * +RTS -C0
1027      */
1028     if (
1029 #ifdef PROFILING
1030         RtsFlags.ProfFlags.profileInterval == 0 ||
1031 #endif
1032         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1033          && (run_queue_hd != END_TSO_QUEUE
1034              || blocked_queue_hd != END_TSO_QUEUE
1035              || sleeping_queue != END_TSO_QUEUE)))
1036         context_switch = 1;
1037     else
1038         context_switch = 0;
1039
1040     RELEASE_LOCK(&sched_mutex);
1041
1042     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1043                               t->id, t, whatNext_strs[t->what_next]));
1044
1045 #ifdef PROFILING
1046     startHeapProfTimer();
1047 #endif
1048
1049     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1050     /* Run the current thread 
1051      */
1052     switch (cap->r.rCurrentTSO->what_next) {
1053     case ThreadKilled:
1054     case ThreadComplete:
1055         /* Thread already finished, return to scheduler. */
1056         ret = ThreadFinished;
1057         break;
1058     case ThreadEnterGHC:
1059         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1060         break;
1061     case ThreadRunGHC:
1062         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1063         break;
1064     case ThreadEnterInterp:
1065         ret = interpretBCO(cap);
1066         break;
1067     default:
1068       barf("schedule: invalid what_next field");
1069     }
1070     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071     
1072     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1073 #ifdef PROFILING
1074     stopHeapProfTimer();
1075     CCCS = CCS_SYSTEM;
1076 #endif
1077     
1078     ACQUIRE_LOCK(&sched_mutex);
1079
1080 #ifdef SMP
1081     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1082 #elif !defined(GRAN) && !defined(PAR)
1083     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1084 #endif
1085     t = cap->r.rCurrentTSO;
1086     
1087 #if defined(PAR)
1088     /* HACK 675: if the last thread didn't yield, make sure to print a 
1089        SCHEDULE event to the log file when StgRunning the next thread, even
1090        if it is the same one as before */
1091     LastTSO = t; 
1092     TimeOfLastYield = CURRENT_TIME;
1093 #endif
1094
1095     switch (ret) {
1096     case HeapOverflow:
1097 #if defined(GRAN)
1098       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1099       globalGranStats.tot_heapover++;
1100 #elif defined(PAR)
1101       globalParStats.tot_heapover++;
1102 #endif
1103
1104       // did the task ask for a large block?
1105       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1106           // if so, get one and push it on the front of the nursery.
1107           bdescr *bd;
1108           nat blocks;
1109           
1110           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1111
1112           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1113                                    t->id, t,
1114                                    whatNext_strs[t->what_next], blocks));
1115
1116           // don't do this if it would push us over the
1117           // alloc_blocks_lim limit; we'll GC first.
1118           if (alloc_blocks + blocks < alloc_blocks_lim) {
1119
1120               alloc_blocks += blocks;
1121               bd = allocGroup( blocks );
1122
1123               // link the new group into the list
1124               bd->link = cap->r.rCurrentNursery;
1125               bd->u.back = cap->r.rCurrentNursery->u.back;
1126               if (cap->r.rCurrentNursery->u.back != NULL) {
1127                   cap->r.rCurrentNursery->u.back->link = bd;
1128               } else {
1129                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1130                          g0s0->blocks == cap->r.rNursery);
1131                   cap->r.rNursery = g0s0->blocks = bd;
1132               }           
1133               cap->r.rCurrentNursery->u.back = bd;
1134
1135               // initialise it as a nursery block.  We initialise the
1136               // step, gen_no, and flags field of *every* sub-block in
1137               // this large block, because this is easier than making
1138               // sure that we always find the block head of a large
1139               // block whenever we call Bdescr() (eg. evacuate() and
1140               // isAlive() in the GC would both have to do this, at
1141               // least).
1142               { 
1143                   bdescr *x;
1144                   for (x = bd; x < bd + blocks; x++) {
1145                       x->step = g0s0;
1146                       x->gen_no = 0;
1147                       x->flags = 0;
1148                       x->free = x->start;
1149                   }
1150               }
1151
1152               // don't forget to update the block count in g0s0.
1153               g0s0->n_blocks += blocks;
1154               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1155
1156               // now update the nursery to point to the new block
1157               cap->r.rCurrentNursery = bd;
1158
1159               // we might be unlucky and have another thread get on the
1160               // run queue before us and steal the large block, but in that
1161               // case the thread will just end up requesting another large
1162               // block.
1163               PUSH_ON_RUN_QUEUE(t);
1164               break;
1165           }
1166       }
1167
1168       /* make all the running tasks block on a condition variable,
1169        * maybe set context_switch and wait till they all pile in,
1170        * then have them wait on a GC condition variable.
1171        */
1172       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1173                                t->id, t, whatNext_strs[t->what_next]));
1174       threadPaused(t);
1175 #if defined(GRAN)
1176       ASSERT(!is_on_queue(t,CurrentProc));
1177 #elif defined(PAR)
1178       /* Currently we emit a DESCHEDULE event before GC in GUM.
1179          ToDo: either add separate event to distinguish SYSTEM time from rest
1180                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1181       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1182         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1183                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1184         emitSchedule = rtsTrue;
1185       }
1186 #endif
1187       
1188       ready_to_gc = rtsTrue;
1189       context_switch = 1;               /* stop other threads ASAP */
1190       PUSH_ON_RUN_QUEUE(t);
1191       /* actual GC is done at the end of the while loop */
1192       break;
1193       
1194     case StackOverflow:
1195 #if defined(GRAN)
1196       IF_DEBUG(gran, 
1197                DumpGranEvent(GR_DESCHEDULE, t));
1198       globalGranStats.tot_stackover++;
1199 #elif defined(PAR)
1200       // IF_DEBUG(par, 
1201       // DumpGranEvent(GR_DESCHEDULE, t);
1202       globalParStats.tot_stackover++;
1203 #endif
1204       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1205                                t->id, t, whatNext_strs[t->what_next]));
1206       /* just adjust the stack for this thread, then pop it back
1207        * on the run queue.
1208        */
1209       threadPaused(t);
1210       { 
1211         StgMainThread *m;
1212         /* enlarge the stack */
1213         StgTSO *new_t = threadStackOverflow(t);
1214         
1215         /* This TSO has moved, so update any pointers to it from the
1216          * main thread stack.  It better not be on any other queues...
1217          * (it shouldn't be).
1218          */
1219         for (m = main_threads; m != NULL; m = m->link) {
1220           if (m->tso == t) {
1221             m->tso = new_t;
1222           }
1223         }
1224         threadPaused(new_t);
1225         PUSH_ON_RUN_QUEUE(new_t);
1226       }
1227       break;
1228
1229     case ThreadYielding:
1230 #if defined(GRAN)
1231       IF_DEBUG(gran, 
1232                DumpGranEvent(GR_DESCHEDULE, t));
1233       globalGranStats.tot_yields++;
1234 #elif defined(PAR)
1235       // IF_DEBUG(par, 
1236       // DumpGranEvent(GR_DESCHEDULE, t);
1237       globalParStats.tot_yields++;
1238 #endif
1239       /* put the thread back on the run queue.  Then, if we're ready to
1240        * GC, check whether this is the last task to stop.  If so, wake
1241        * up the GC thread.  getThread will block during a GC until the
1242        * GC is finished.
1243        */
1244       IF_DEBUG(scheduler,
1245                if (t->what_next == ThreadEnterInterp) {
1246                    /* ToDo: or maybe a timer expired when we were in Hugs?
1247                     * or maybe someone hit ctrl-C
1248                     */
1249                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1250                          t->id, t, whatNext_strs[t->what_next]);
1251                } else {
1252                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1253                          t->id, t, whatNext_strs[t->what_next]);
1254                }
1255                );
1256
1257       threadPaused(t);
1258
1259       IF_DEBUG(sanity,
1260                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1261                checkTSO(t));
1262       ASSERT(t->link == END_TSO_QUEUE);
1263 #if defined(GRAN)
1264       ASSERT(!is_on_queue(t,CurrentProc));
1265
1266       IF_DEBUG(sanity,
1267                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1268                checkThreadQsSanity(rtsTrue));
1269 #endif
1270 #if defined(PAR)
1271       if (RtsFlags.ParFlags.doFairScheduling) { 
1272         /* this does round-robin scheduling; good for concurrency */
1273         APPEND_TO_RUN_QUEUE(t);
1274       } else {
1275         /* this does unfair scheduling; good for parallelism */
1276         PUSH_ON_RUN_QUEUE(t);
1277       }
1278 #else
1279       /* this does round-robin scheduling; good for concurrency */
1280       APPEND_TO_RUN_QUEUE(t);
1281 #endif
1282 #if defined(GRAN)
1283       /* add a ContinueThread event to actually process the thread */
1284       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1285                 ContinueThread,
1286                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1287       IF_GRAN_DEBUG(bq, 
1288                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1289                G_EVENTQ(0);
1290                G_CURR_THREADQ(0));
1291 #endif /* GRAN */
1292       break;
1293       
1294     case ThreadBlocked:
1295 #if defined(GRAN)
1296       IF_DEBUG(scheduler,
1297                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1298                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1299                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1300
1301       // ??? needed; should emit block before
1302       IF_DEBUG(gran, 
1303                DumpGranEvent(GR_DESCHEDULE, t)); 
1304       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1305       /*
1306         ngoq Dogh!
1307       ASSERT(procStatus[CurrentProc]==Busy || 
1308               ((procStatus[CurrentProc]==Fetching) && 
1309               (t->block_info.closure!=(StgClosure*)NULL)));
1310       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1311           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1312             procStatus[CurrentProc]==Fetching)) 
1313         procStatus[CurrentProc] = Idle;
1314       */
1315 #elif defined(PAR)
1316       IF_DEBUG(scheduler,
1317                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1318                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1319       IF_PAR_DEBUG(bq,
1320
1321                    if (t->block_info.closure!=(StgClosure*)NULL) 
1322                      print_bq(t->block_info.closure));
1323
1324       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1325       blockThread(t);
1326
1327       /* whatever we schedule next, we must log that schedule */
1328       emitSchedule = rtsTrue;
1329
1330 #else /* !GRAN */
1331       /* don't need to do anything.  Either the thread is blocked on
1332        * I/O, in which case we'll have called addToBlockedQueue
1333        * previously, or it's blocked on an MVar or Blackhole, in which
1334        * case it'll be on the relevant queue already.
1335        */
1336       IF_DEBUG(scheduler,
1337                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1338                printThreadBlockage(t);
1339                fprintf(stderr, "\n"));
1340
1341       /* Only for dumping event to log file 
1342          ToDo: do I need this in GranSim, too?
1343       blockThread(t);
1344       */
1345 #endif
1346       threadPaused(t);
1347       break;
1348       
1349     case ThreadFinished:
1350       /* Need to check whether this was a main thread, and if so, signal
1351        * the task that started it with the return value.  If we have no
1352        * more main threads, we probably need to stop all the tasks until
1353        * we get a new one.
1354        */
1355       /* We also end up here if the thread kills itself with an
1356        * uncaught exception, see Exception.hc.
1357        */
1358       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1359 #if defined(GRAN)
1360       endThread(t, CurrentProc); // clean-up the thread
1361 #elif defined(PAR)
1362       /* For now all are advisory -- HWL */
1363       //if(t->priority==AdvisoryPriority) ??
1364       advisory_thread_count--;
1365       
1366 # ifdef DIST
1367       if(t->dist.priority==RevalPriority)
1368         FinishReval(t);
1369 # endif
1370       
1371       if (RtsFlags.ParFlags.ParStats.Full &&
1372           !RtsFlags.ParFlags.ParStats.Suppressed) 
1373         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1374 #endif
1375       break;
1376       
1377     default:
1378       barf("schedule: invalid thread return code %d", (int)ret);
1379     }
1380
1381 #ifdef PROFILING
1382     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1383         GarbageCollect(GetRoots, rtsTrue);
1384         heapCensus();
1385         performHeapProfile = rtsFalse;
1386         ready_to_gc = rtsFalse; // we already GC'd
1387     }
1388 #endif
1389
1390     if (ready_to_gc 
1391 #ifdef SMP
1392         && allFreeCapabilities() 
1393 #endif
1394         ) {
1395       /* everybody back, start the GC.
1396        * Could do it in this thread, or signal a condition var
1397        * to do it in another thread.  Either way, we need to
1398        * broadcast on gc_pending_cond afterward.
1399        */
1400 #if defined(RTS_SUPPORTS_THREADS)
1401       IF_DEBUG(scheduler,sched_belch("doing GC"));
1402 #endif
1403       GarbageCollect(GetRoots,rtsFalse);
1404       ready_to_gc = rtsFalse;
1405 #ifdef SMP
1406       broadcastCondition(&gc_pending_cond);
1407 #endif
1408 #if defined(GRAN)
1409       /* add a ContinueThread event to continue execution of current thread */
1410       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1411                 ContinueThread,
1412                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1413       IF_GRAN_DEBUG(bq, 
1414                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1415                G_EVENTQ(0);
1416                G_CURR_THREADQ(0));
1417 #endif /* GRAN */
1418     }
1419
1420 #if defined(GRAN)
1421   next_thread:
1422     IF_GRAN_DEBUG(unused,
1423                   print_eventq(EventHd));
1424
1425     event = get_next_event();
1426 #elif defined(PAR)
1427   next_thread:
1428     /* ToDo: wait for next message to arrive rather than busy wait */
1429 #endif /* GRAN */
1430
1431   } /* end of while(1) */
1432
1433   IF_PAR_DEBUG(verbose,
1434                belch("== Leaving schedule() after having received Finish"));
1435 }
1436
1437 /* ---------------------------------------------------------------------------
1438  * Singleton fork(). Do not copy any running threads.
1439  * ------------------------------------------------------------------------- */
1440
1441 StgInt forkProcess(StgTSO* tso) {
1442
1443 #ifndef mingw32_TARGET_OS
1444   pid_t pid;
1445   StgTSO* t,*next;
1446
1447   IF_DEBUG(scheduler,sched_belch("forking!"));
1448
1449   pid = fork();
1450   if (pid) { /* parent */
1451
1452   /* just return the pid */
1453     
1454   } else { /* child */
1455   /* wipe all other threads */
1456   run_queue_hd = tso;
1457   tso->link = END_TSO_QUEUE;
1458
1459   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1460      us is picky about finding the threat still in its queue when
1461      handling the deleteThread() */
1462
1463   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1464     next = t->link;
1465     if (t->id != tso->id) {
1466       deleteThread(t);
1467     }
1468   }
1469   }
1470   return pid;
1471 #else /* mingw32 */
1472   barf("forkProcess#: primop not implemented for mingw32, sorry! (%u)\n", tso->id);
1473   /* pointlessly printing out the TSOs 'id' to avoid CC unused warning. */
1474   return -1;
1475 #endif /* mingw32 */
1476 }
1477
1478 /* ---------------------------------------------------------------------------
1479  * deleteAllThreads():  kill all the live threads.
1480  *
1481  * This is used when we catch a user interrupt (^C), before performing
1482  * any necessary cleanups and running finalizers.
1483  *
1484  * Locks: sched_mutex held.
1485  * ------------------------------------------------------------------------- */
1486    
1487 void deleteAllThreads ( void )
1488 {
1489   StgTSO* t, *next;
1490   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1491   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1492       next = t->global_link;
1493       deleteThread(t);
1494   }      
1495   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1496   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1497   sleeping_queue = END_TSO_QUEUE;
1498 }
1499
1500 /* startThread and  insertThread are now in GranSim.c -- HWL */
1501
1502
1503 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1504 //@subsection Suspend and Resume
1505
1506 /* ---------------------------------------------------------------------------
1507  * Suspending & resuming Haskell threads.
1508  * 
1509  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1510  * its capability before calling the C function.  This allows another
1511  * task to pick up the capability and carry on running Haskell
1512  * threads.  It also means that if the C call blocks, it won't lock
1513  * the whole system.
1514  *
1515  * The Haskell thread making the C call is put to sleep for the
1516  * duration of the call, on the susepended_ccalling_threads queue.  We
1517  * give out a token to the task, which it can use to resume the thread
1518  * on return from the C function.
1519  * ------------------------------------------------------------------------- */
1520    
1521 StgInt
1522 suspendThread( StgRegTable *reg, 
1523                rtsBool concCall
1524 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1525                STG_UNUSED
1526 #endif
1527                )
1528 {
1529   nat tok;
1530   Capability *cap;
1531
1532   /* assume that *reg is a pointer to the StgRegTable part
1533    * of a Capability.
1534    */
1535   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1536
1537   ACQUIRE_LOCK(&sched_mutex);
1538
1539   IF_DEBUG(scheduler,
1540            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1541
1542   threadPaused(cap->r.rCurrentTSO);
1543   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1544   suspended_ccalling_threads = cap->r.rCurrentTSO;
1545
1546 #if defined(RTS_SUPPORTS_THREADS)
1547   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1548 #endif
1549
1550   /* Use the thread ID as the token; it should be unique */
1551   tok = cap->r.rCurrentTSO->id;
1552
1553   /* Hand back capability */
1554   releaseCapability(cap);
1555   
1556 #if defined(RTS_SUPPORTS_THREADS)
1557   /* Preparing to leave the RTS, so ensure there's a native thread/task
1558      waiting to take over.
1559      
1560      ToDo: optimise this and only create a new task if there's a need
1561      for one (i.e., if there's only one Concurrent Haskell thread alive,
1562      there's no need to create a new task).
1563   */
1564   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1565   if (concCall) {
1566     startTask(taskStart);
1567   }
1568 #endif
1569
1570   /* Other threads _might_ be available for execution; signal this */
1571   THREAD_RUNNABLE();
1572   RELEASE_LOCK(&sched_mutex);
1573   return tok; 
1574 }
1575
1576 StgRegTable *
1577 resumeThread( StgInt tok,
1578               rtsBool concCall
1579 #if !defined(RTS_SUPPORTS_THREADS)
1580                STG_UNUSED
1581 #endif
1582               )
1583 {
1584   StgTSO *tso, **prev;
1585   Capability *cap;
1586
1587 #if defined(RTS_SUPPORTS_THREADS)
1588   /* Wait for permission to re-enter the RTS with the result. */
1589   if ( concCall ) {
1590     ACQUIRE_LOCK(&sched_mutex);
1591     grabReturnCapability(&sched_mutex, &cap);
1592   } else {
1593     grabCapability(&cap);
1594   }
1595 #else
1596   grabCapability(&cap);
1597 #endif
1598
1599   /* Remove the thread off of the suspended list */
1600   prev = &suspended_ccalling_threads;
1601   for (tso = suspended_ccalling_threads; 
1602        tso != END_TSO_QUEUE; 
1603        prev = &tso->link, tso = tso->link) {
1604     if (tso->id == (StgThreadID)tok) {
1605       *prev = tso->link;
1606       break;
1607     }
1608   }
1609   if (tso == END_TSO_QUEUE) {
1610     barf("resumeThread: thread not found");
1611   }
1612   tso->link = END_TSO_QUEUE;
1613   /* Reset blocking status */
1614   tso->why_blocked  = NotBlocked;
1615
1616   cap->r.rCurrentTSO = tso;
1617   RELEASE_LOCK(&sched_mutex);
1618   return &cap->r;
1619 }
1620
1621
1622 /* ---------------------------------------------------------------------------
1623  * Static functions
1624  * ------------------------------------------------------------------------ */
1625 static void unblockThread(StgTSO *tso);
1626
1627 /* ---------------------------------------------------------------------------
1628  * Comparing Thread ids.
1629  *
1630  * This is used from STG land in the implementation of the
1631  * instances of Eq/Ord for ThreadIds.
1632  * ------------------------------------------------------------------------ */
1633
1634 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1635
1636   StgThreadID id1 = tso1->id; 
1637   StgThreadID id2 = tso2->id;
1638  
1639   if (id1 < id2) return (-1);
1640   if (id1 > id2) return 1;
1641   return 0;
1642 }
1643
1644 /* ---------------------------------------------------------------------------
1645  * Fetching the ThreadID from an StgTSO.
1646  *
1647  * This is used in the implementation of Show for ThreadIds.
1648  * ------------------------------------------------------------------------ */
1649 int rts_getThreadId(const StgTSO *tso) 
1650 {
1651   return tso->id;
1652 }
1653
1654 #ifdef DEBUG
1655 void labelThread(StgTSO *tso, char *label)
1656 {
1657   int len;
1658   void *buf;
1659
1660   /* Caveat: Once set, you can only set the thread name to "" */
1661   len = strlen(label)+1;
1662   buf = malloc(len);
1663   if (buf == NULL) {
1664     fprintf(stderr,"insufficient memory for labelThread!\n");
1665   } else
1666     strncpy(buf,label,len);
1667   /* Update will free the old memory for us */
1668   updateThreadLabel((StgWord)tso,buf);
1669 }
1670 #endif /* DEBUG */
1671
1672 /* ---------------------------------------------------------------------------
1673    Create a new thread.
1674
1675    The new thread starts with the given stack size.  Before the
1676    scheduler can run, however, this thread needs to have a closure
1677    (and possibly some arguments) pushed on its stack.  See
1678    pushClosure() in Schedule.h.
1679
1680    createGenThread() and createIOThread() (in SchedAPI.h) are
1681    convenient packaged versions of this function.
1682
1683    currently pri (priority) is only used in a GRAN setup -- HWL
1684    ------------------------------------------------------------------------ */
1685 //@cindex createThread
1686 #if defined(GRAN)
1687 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1688 StgTSO *
1689 createThread(nat size, StgInt pri)
1690 #else
1691 StgTSO *
1692 createThread(nat size)
1693 #endif
1694 {
1695
1696     StgTSO *tso;
1697     nat stack_size;
1698
1699     /* First check whether we should create a thread at all */
1700 #if defined(PAR)
1701   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1702   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1703     threadsIgnored++;
1704     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1705           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1706     return END_TSO_QUEUE;
1707   }
1708   threadsCreated++;
1709 #endif
1710
1711 #if defined(GRAN)
1712   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1713 #endif
1714
1715   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1716
1717   /* catch ridiculously small stack sizes */
1718   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1719     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1720   }
1721
1722   stack_size = size - TSO_STRUCT_SIZEW;
1723
1724   tso = (StgTSO *)allocate(size);
1725   TICK_ALLOC_TSO(stack_size, 0);
1726
1727   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1728 #if defined(GRAN)
1729   SET_GRAN_HDR(tso, ThisPE);
1730 #endif
1731   tso->what_next     = ThreadEnterGHC;
1732
1733   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1734    * protect the increment operation on next_thread_id.
1735    * In future, we could use an atomic increment instead.
1736    */
1737   ACQUIRE_LOCK(&thread_id_mutex);
1738   tso->id = next_thread_id++; 
1739   RELEASE_LOCK(&thread_id_mutex);
1740
1741   tso->why_blocked  = NotBlocked;
1742   tso->blocked_exceptions = NULL;
1743
1744   tso->stack_size   = stack_size;
1745   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1746                               - TSO_STRUCT_SIZEW;
1747   tso->sp           = (P_)&(tso->stack) + stack_size;
1748
1749 #ifdef PROFILING
1750   tso->prof.CCCS = CCS_MAIN;
1751 #endif
1752
1753   /* put a stop frame on the stack */
1754   tso->sp -= sizeofW(StgStopFrame);
1755   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1756   tso->su = (StgUpdateFrame*)tso->sp;
1757
1758   // ToDo: check this
1759 #if defined(GRAN)
1760   tso->link = END_TSO_QUEUE;
1761   /* uses more flexible routine in GranSim */
1762   insertThread(tso, CurrentProc);
1763 #else
1764   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1765    * from its creation
1766    */
1767 #endif
1768
1769 #if defined(GRAN) 
1770   if (RtsFlags.GranFlags.GranSimStats.Full) 
1771     DumpGranEvent(GR_START,tso);
1772 #elif defined(PAR)
1773   if (RtsFlags.ParFlags.ParStats.Full) 
1774     DumpGranEvent(GR_STARTQ,tso);
1775   /* HACk to avoid SCHEDULE 
1776      LastTSO = tso; */
1777 #endif
1778
1779   /* Link the new thread on the global thread list.
1780    */
1781   tso->global_link = all_threads;
1782   all_threads = tso;
1783
1784 #if defined(DIST)
1785   tso->dist.priority = MandatoryPriority; //by default that is...
1786 #endif
1787
1788 #if defined(GRAN)
1789   tso->gran.pri = pri;
1790 # if defined(DEBUG)
1791   tso->gran.magic = TSO_MAGIC; // debugging only
1792 # endif
1793   tso->gran.sparkname   = 0;
1794   tso->gran.startedat   = CURRENT_TIME; 
1795   tso->gran.exported    = 0;
1796   tso->gran.basicblocks = 0;
1797   tso->gran.allocs      = 0;
1798   tso->gran.exectime    = 0;
1799   tso->gran.fetchtime   = 0;
1800   tso->gran.fetchcount  = 0;
1801   tso->gran.blocktime   = 0;
1802   tso->gran.blockcount  = 0;
1803   tso->gran.blockedat   = 0;
1804   tso->gran.globalsparks = 0;
1805   tso->gran.localsparks  = 0;
1806   if (RtsFlags.GranFlags.Light)
1807     tso->gran.clock  = Now; /* local clock */
1808   else
1809     tso->gran.clock  = 0;
1810
1811   IF_DEBUG(gran,printTSO(tso));
1812 #elif defined(PAR)
1813 # if defined(DEBUG)
1814   tso->par.magic = TSO_MAGIC; // debugging only
1815 # endif
1816   tso->par.sparkname   = 0;
1817   tso->par.startedat   = CURRENT_TIME; 
1818   tso->par.exported    = 0;
1819   tso->par.basicblocks = 0;
1820   tso->par.allocs      = 0;
1821   tso->par.exectime    = 0;
1822   tso->par.fetchtime   = 0;
1823   tso->par.fetchcount  = 0;
1824   tso->par.blocktime   = 0;
1825   tso->par.blockcount  = 0;
1826   tso->par.blockedat   = 0;
1827   tso->par.globalsparks = 0;
1828   tso->par.localsparks  = 0;
1829 #endif
1830
1831 #if defined(GRAN)
1832   globalGranStats.tot_threads_created++;
1833   globalGranStats.threads_created_on_PE[CurrentProc]++;
1834   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1835   globalGranStats.tot_sq_probes++;
1836 #elif defined(PAR)
1837   // collect parallel global statistics (currently done together with GC stats)
1838   if (RtsFlags.ParFlags.ParStats.Global &&
1839       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1840     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1841     globalParStats.tot_threads_created++;
1842   }
1843 #endif 
1844
1845 #if defined(GRAN)
1846   IF_GRAN_DEBUG(pri,
1847                 belch("==__ schedule: Created TSO %d (%p);",
1848                       CurrentProc, tso, tso->id));
1849 #elif defined(PAR)
1850     IF_PAR_DEBUG(verbose,
1851                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1852                        tso->id, tso, advisory_thread_count));
1853 #else
1854   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1855                                  tso->id, tso->stack_size));
1856 #endif    
1857   return tso;
1858 }
1859
1860 #if defined(PAR)
1861 /* RFP:
1862    all parallel thread creation calls should fall through the following routine.
1863 */
1864 StgTSO *
1865 createSparkThread(rtsSpark spark) 
1866 { StgTSO *tso;
1867   ASSERT(spark != (rtsSpark)NULL);
1868   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1869   { threadsIgnored++;
1870     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1871           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1872     return END_TSO_QUEUE;
1873   }
1874   else
1875   { threadsCreated++;
1876     tso = createThread(RtsFlags.GcFlags.initialStkSize);
1877     if (tso==END_TSO_QUEUE)     
1878       barf("createSparkThread: Cannot create TSO");
1879 #if defined(DIST)
1880     tso->priority = AdvisoryPriority;
1881 #endif
1882     pushClosure(tso,spark);
1883     PUSH_ON_RUN_QUEUE(tso);
1884     advisory_thread_count++;    
1885   }
1886   return tso;
1887 }
1888 #endif
1889
1890 /*
1891   Turn a spark into a thread.
1892   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1893 */
1894 #if defined(PAR)
1895 //@cindex activateSpark
1896 StgTSO *
1897 activateSpark (rtsSpark spark) 
1898 {
1899   StgTSO *tso;
1900
1901   tso = createSparkThread(spark);
1902   if (RtsFlags.ParFlags.ParStats.Full) {   
1903     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1904     IF_PAR_DEBUG(verbose,
1905                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1906                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1907   }
1908   // ToDo: fwd info on local/global spark to thread -- HWL
1909   // tso->gran.exported =  spark->exported;
1910   // tso->gran.locked =   !spark->global;
1911   // tso->gran.sparkname = spark->name;
1912
1913   return tso;
1914 }
1915 #endif
1916
1917 static SchedulerStatus waitThread_(/*out*/StgMainThread* m
1918 #if defined(THREADED_RTS)
1919                                    , rtsBool blockWaiting
1920 #endif
1921                                    );
1922
1923
1924 /* ---------------------------------------------------------------------------
1925  * scheduleThread()
1926  *
1927  * scheduleThread puts a thread on the head of the runnable queue.
1928  * This will usually be done immediately after a thread is created.
1929  * The caller of scheduleThread must create the thread using e.g.
1930  * createThread and push an appropriate closure
1931  * on this thread's stack before the scheduler is invoked.
1932  * ------------------------------------------------------------------------ */
1933
1934 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1935
1936 void
1937 scheduleThread_(StgTSO *tso
1938                , rtsBool createTask
1939 #if !defined(THREADED_RTS)
1940                  STG_UNUSED
1941 #endif
1942               )
1943 {
1944   ACQUIRE_LOCK(&sched_mutex);
1945
1946   /* Put the new thread on the head of the runnable queue.  The caller
1947    * better push an appropriate closure on this thread's stack
1948    * beforehand.  In the SMP case, the thread may start running as
1949    * soon as we release the scheduler lock below.
1950    */
1951   PUSH_ON_RUN_QUEUE(tso);
1952 #if defined(THREADED_RTS)
1953   /* If main() is scheduling a thread, don't bother creating a 
1954    * new task.
1955    */
1956   if ( createTask ) {
1957     startTask(taskStart);
1958   }
1959 #endif
1960   THREAD_RUNNABLE();
1961
1962 #if 0
1963   IF_DEBUG(scheduler,printTSO(tso));
1964 #endif
1965   RELEASE_LOCK(&sched_mutex);
1966 }
1967
1968 void scheduleThread(StgTSO* tso)
1969 {
1970   scheduleThread_(tso, rtsFalse);
1971 }
1972
1973 SchedulerStatus
1974 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret)
1975 {
1976   StgMainThread *m;
1977
1978   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
1979   m->tso = tso;
1980   m->ret = ret;
1981   m->stat = NoStatus;
1982 #if defined(RTS_SUPPORTS_THREADS)
1983   initCondition(&m->wakeup);
1984 #endif
1985
1986   /* Put the thread on the main-threads list prior to scheduling the TSO.
1987      Failure to do so introduces a race condition in the MT case (as
1988      identified by Wolfgang Thaller), whereby the new task/OS thread 
1989      created by scheduleThread_() would complete prior to the thread
1990      that spawned it managed to put 'itself' on the main-threads list.
1991      The upshot of it all being that the worker thread wouldn't get to
1992      signal the completion of the its work item for the main thread to
1993      see (==> it got stuck waiting.)    -- sof 6/02.
1994   */
1995   ACQUIRE_LOCK(&sched_mutex);
1996   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
1997   
1998   m->link = main_threads;
1999   main_threads = m;
2000
2001   /* Inefficient (scheduleThread_() acquires it again right away),
2002    * but obviously correct.
2003    */
2004   RELEASE_LOCK(&sched_mutex);
2005
2006   scheduleThread_(tso, rtsTrue);
2007 #if defined(THREADED_RTS)
2008   return waitThread_(m, rtsTrue);
2009 #else
2010   return waitThread_(m);
2011 #endif
2012 }
2013
2014 /* ---------------------------------------------------------------------------
2015  * initScheduler()
2016  *
2017  * Initialise the scheduler.  This resets all the queues - if the
2018  * queues contained any threads, they'll be garbage collected at the
2019  * next pass.
2020  *
2021  * ------------------------------------------------------------------------ */
2022
2023 #ifdef SMP
2024 static void
2025 term_handler(int sig STG_UNUSED)
2026 {
2027   stat_workerStop();
2028   ACQUIRE_LOCK(&term_mutex);
2029   await_death--;
2030   RELEASE_LOCK(&term_mutex);
2031   shutdownThread();
2032 }
2033 #endif
2034
2035 void 
2036 initScheduler(void)
2037 {
2038 #if defined(GRAN)
2039   nat i;
2040
2041   for (i=0; i<=MAX_PROC; i++) {
2042     run_queue_hds[i]      = END_TSO_QUEUE;
2043     run_queue_tls[i]      = END_TSO_QUEUE;
2044     blocked_queue_hds[i]  = END_TSO_QUEUE;
2045     blocked_queue_tls[i]  = END_TSO_QUEUE;
2046     ccalling_threadss[i]  = END_TSO_QUEUE;
2047     sleeping_queue        = END_TSO_QUEUE;
2048   }
2049 #else
2050   run_queue_hd      = END_TSO_QUEUE;
2051   run_queue_tl      = END_TSO_QUEUE;
2052   blocked_queue_hd  = END_TSO_QUEUE;
2053   blocked_queue_tl  = END_TSO_QUEUE;
2054   sleeping_queue    = END_TSO_QUEUE;
2055 #endif 
2056
2057   suspended_ccalling_threads  = END_TSO_QUEUE;
2058
2059   main_threads = NULL;
2060   all_threads  = END_TSO_QUEUE;
2061
2062   context_switch = 0;
2063   interrupted    = 0;
2064
2065   RtsFlags.ConcFlags.ctxtSwitchTicks =
2066       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2067       
2068 #if defined(RTS_SUPPORTS_THREADS)
2069   /* Initialise the mutex and condition variables used by
2070    * the scheduler. */
2071   initMutex(&sched_mutex);
2072   initMutex(&term_mutex);
2073   initMutex(&thread_id_mutex);
2074
2075   initCondition(&thread_ready_cond);
2076 #endif
2077   
2078 #if defined(SMP)
2079   initCondition(&gc_pending_cond);
2080 #endif
2081
2082 #if defined(RTS_SUPPORTS_THREADS)
2083   ACQUIRE_LOCK(&sched_mutex);
2084 #endif
2085
2086   /* Install the SIGHUP handler */
2087 #if defined(SMP)
2088   {
2089     struct sigaction action,oact;
2090
2091     action.sa_handler = term_handler;
2092     sigemptyset(&action.sa_mask);
2093     action.sa_flags = 0;
2094     if (sigaction(SIGTERM, &action, &oact) != 0) {
2095       barf("can't install TERM handler");
2096     }
2097   }
2098 #endif
2099
2100   /* A capability holds the state a native thread needs in
2101    * order to execute STG code. At least one capability is
2102    * floating around (only SMP builds have more than one).
2103    */
2104   initCapabilities();
2105   
2106 #if defined(RTS_SUPPORTS_THREADS)
2107     /* start our haskell execution tasks */
2108 # if defined(SMP)
2109     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2110 # else
2111     startTaskManager(0,taskStart);
2112 # endif
2113 #endif
2114
2115 #if /* defined(SMP) ||*/ defined(PAR)
2116   initSparkPools();
2117 #endif
2118
2119 #if defined(RTS_SUPPORTS_THREADS)
2120   RELEASE_LOCK(&sched_mutex);
2121 #endif
2122
2123 }
2124
2125 void
2126 exitScheduler( void )
2127 {
2128 #if defined(RTS_SUPPORTS_THREADS)
2129   stopTaskManager();
2130 #endif
2131   shutting_down_scheduler = rtsTrue;
2132 }
2133
2134 /* -----------------------------------------------------------------------------
2135    Managing the per-task allocation areas.
2136    
2137    Each capability comes with an allocation area.  These are
2138    fixed-length block lists into which allocation can be done.
2139
2140    ToDo: no support for two-space collection at the moment???
2141    -------------------------------------------------------------------------- */
2142
2143 /* -----------------------------------------------------------------------------
2144  * waitThread is the external interface for running a new computation
2145  * and waiting for the result.
2146  *
2147  * In the non-SMP case, we create a new main thread, push it on the 
2148  * main-thread stack, and invoke the scheduler to run it.  The
2149  * scheduler will return when the top main thread on the stack has
2150  * completed or died, and fill in the necessary fields of the
2151  * main_thread structure.
2152  *
2153  * In the SMP case, we create a main thread as before, but we then
2154  * create a new condition variable and sleep on it.  When our new
2155  * main thread has completed, we'll be woken up and the status/result
2156  * will be in the main_thread struct.
2157  * -------------------------------------------------------------------------- */
2158
2159 int 
2160 howManyThreadsAvail ( void )
2161 {
2162    int i = 0;
2163    StgTSO* q;
2164    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2165       i++;
2166    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2167       i++;
2168    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2169       i++;
2170    return i;
2171 }
2172
2173 void
2174 finishAllThreads ( void )
2175 {
2176    do {
2177       while (run_queue_hd != END_TSO_QUEUE) {
2178          waitThread ( run_queue_hd, NULL);
2179       }
2180       while (blocked_queue_hd != END_TSO_QUEUE) {
2181          waitThread ( blocked_queue_hd, NULL);
2182       }
2183       while (sleeping_queue != END_TSO_QUEUE) {
2184          waitThread ( blocked_queue_hd, NULL);
2185       }
2186    } while 
2187       (blocked_queue_hd != END_TSO_QUEUE || 
2188        run_queue_hd     != END_TSO_QUEUE ||
2189        sleeping_queue   != END_TSO_QUEUE);
2190 }
2191
2192 SchedulerStatus
2193 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2194
2195   StgMainThread *m;
2196
2197   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2198   m->tso = tso;
2199   m->ret = ret;
2200   m->stat = NoStatus;
2201 #if defined(RTS_SUPPORTS_THREADS)
2202   initCondition(&m->wakeup);
2203 #endif
2204
2205   /* see scheduleWaitThread() comment */
2206   ACQUIRE_LOCK(&sched_mutex);
2207   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2208   m->link = main_threads;
2209   main_threads = m;
2210   RELEASE_LOCK(&sched_mutex);
2211
2212   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2213 #if defined(THREADED_RTS)
2214   return waitThread_(m, rtsFalse);
2215 #else
2216   return waitThread_(m);
2217 #endif
2218 }
2219
2220 static
2221 SchedulerStatus
2222 waitThread_(StgMainThread* m
2223 #if defined(THREADED_RTS)
2224             , rtsBool blockWaiting
2225 #endif
2226            )
2227 {
2228   SchedulerStatus stat;
2229
2230   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2231
2232 #if defined(RTS_SUPPORTS_THREADS)
2233
2234 # if defined(THREADED_RTS)
2235   if (!blockWaiting) {
2236     /* In the threaded case, the OS thread that called main()
2237      * gets to enter the RTS directly without going via another
2238      * task/thread.
2239      */
2240     schedule();
2241     ASSERT(m->stat != NoStatus);
2242   } else 
2243 # endif
2244   {
2245     ACQUIRE_LOCK(&sched_mutex);
2246     do {
2247       waitCondition(&m->wakeup, &sched_mutex);
2248     } while (m->stat == NoStatus);
2249   }
2250 #elif defined(GRAN)
2251   /* GranSim specific init */
2252   CurrentTSO = m->tso;                // the TSO to run
2253   procStatus[MainProc] = Busy;        // status of main PE
2254   CurrentProc = MainProc;             // PE to run it on
2255
2256   schedule();
2257 #else
2258   RELEASE_LOCK(&sched_mutex);
2259   schedule();
2260   ASSERT(m->stat != NoStatus);
2261 #endif
2262
2263   stat = m->stat;
2264
2265 #if defined(RTS_SUPPORTS_THREADS)
2266   closeCondition(&m->wakeup);
2267 #endif
2268
2269   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2270                               m->tso->id));
2271   free(m);
2272
2273 #if defined(THREADED_RTS)
2274   if (blockWaiting) 
2275 #endif
2276     RELEASE_LOCK(&sched_mutex);
2277
2278   return stat;
2279 }
2280
2281 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2282 //@subsection Run queue code 
2283
2284 #if 0
2285 /* 
2286    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2287        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2288        implicit global variable that has to be correct when calling these
2289        fcts -- HWL 
2290 */
2291
2292 /* Put the new thread on the head of the runnable queue.
2293  * The caller of createThread better push an appropriate closure
2294  * on this thread's stack before the scheduler is invoked.
2295  */
2296 static /* inline */ void
2297 add_to_run_queue(tso)
2298 StgTSO* tso; 
2299 {
2300   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2301   tso->link = run_queue_hd;
2302   run_queue_hd = tso;
2303   if (run_queue_tl == END_TSO_QUEUE) {
2304     run_queue_tl = tso;
2305   }
2306 }
2307
2308 /* Put the new thread at the end of the runnable queue. */
2309 static /* inline */ void
2310 push_on_run_queue(tso)
2311 StgTSO* tso; 
2312 {
2313   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2314   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2315   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2316   if (run_queue_hd == END_TSO_QUEUE) {
2317     run_queue_hd = tso;
2318   } else {
2319     run_queue_tl->link = tso;
2320   }
2321   run_queue_tl = tso;
2322 }
2323
2324 /* 
2325    Should be inlined because it's used very often in schedule.  The tso
2326    argument is actually only needed in GranSim, where we want to have the
2327    possibility to schedule *any* TSO on the run queue, irrespective of the
2328    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2329    the run queue and dequeue the tso, adjusting the links in the queue. 
2330 */
2331 //@cindex take_off_run_queue
2332 static /* inline */ StgTSO*
2333 take_off_run_queue(StgTSO *tso) {
2334   StgTSO *t, *prev;
2335
2336   /* 
2337      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2338
2339      if tso is specified, unlink that tso from the run_queue (doesn't have
2340      to be at the beginning of the queue); GranSim only 
2341   */
2342   if (tso!=END_TSO_QUEUE) {
2343     /* find tso in queue */
2344     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2345          t!=END_TSO_QUEUE && t!=tso;
2346          prev=t, t=t->link) 
2347       /* nothing */ ;
2348     ASSERT(t==tso);
2349     /* now actually dequeue the tso */
2350     if (prev!=END_TSO_QUEUE) {
2351       ASSERT(run_queue_hd!=t);
2352       prev->link = t->link;
2353     } else {
2354       /* t is at beginning of thread queue */
2355       ASSERT(run_queue_hd==t);
2356       run_queue_hd = t->link;
2357     }
2358     /* t is at end of thread queue */
2359     if (t->link==END_TSO_QUEUE) {
2360       ASSERT(t==run_queue_tl);
2361       run_queue_tl = prev;
2362     } else {
2363       ASSERT(run_queue_tl!=t);
2364     }
2365     t->link = END_TSO_QUEUE;
2366   } else {
2367     /* take tso from the beginning of the queue; std concurrent code */
2368     t = run_queue_hd;
2369     if (t != END_TSO_QUEUE) {
2370       run_queue_hd = t->link;
2371       t->link = END_TSO_QUEUE;
2372       if (run_queue_hd == END_TSO_QUEUE) {
2373         run_queue_tl = END_TSO_QUEUE;
2374       }
2375     }
2376   }
2377   return t;
2378 }
2379
2380 #endif /* 0 */
2381
2382 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2383 //@subsection Garbage Collextion Routines
2384
2385 /* ---------------------------------------------------------------------------
2386    Where are the roots that we know about?
2387
2388         - all the threads on the runnable queue
2389         - all the threads on the blocked queue
2390         - all the threads on the sleeping queue
2391         - all the thread currently executing a _ccall_GC
2392         - all the "main threads"
2393      
2394    ------------------------------------------------------------------------ */
2395
2396 /* This has to be protected either by the scheduler monitor, or by the
2397         garbage collection monitor (probably the latter).
2398         KH @ 25/10/99
2399 */
2400
2401 void
2402 GetRoots(evac_fn evac)
2403 {
2404 #if defined(GRAN)
2405   {
2406     nat i;
2407     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2408       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2409           evac((StgClosure **)&run_queue_hds[i]);
2410       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2411           evac((StgClosure **)&run_queue_tls[i]);
2412       
2413       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2414           evac((StgClosure **)&blocked_queue_hds[i]);
2415       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2416           evac((StgClosure **)&blocked_queue_tls[i]);
2417       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2418           evac((StgClosure **)&ccalling_threads[i]);
2419     }
2420   }
2421
2422   markEventQueue();
2423
2424 #else /* !GRAN */
2425   if (run_queue_hd != END_TSO_QUEUE) {
2426       ASSERT(run_queue_tl != END_TSO_QUEUE);
2427       evac((StgClosure **)&run_queue_hd);
2428       evac((StgClosure **)&run_queue_tl);
2429   }
2430   
2431   if (blocked_queue_hd != END_TSO_QUEUE) {
2432       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2433       evac((StgClosure **)&blocked_queue_hd);
2434       evac((StgClosure **)&blocked_queue_tl);
2435   }
2436   
2437   if (sleeping_queue != END_TSO_QUEUE) {
2438       evac((StgClosure **)&sleeping_queue);
2439   }
2440 #endif 
2441
2442   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2443       evac((StgClosure **)&suspended_ccalling_threads);
2444   }
2445
2446 #if defined(PAR) || defined(GRAN)
2447   markSparkQueue(evac);
2448 #endif
2449 }
2450
2451 /* -----------------------------------------------------------------------------
2452    performGC
2453
2454    This is the interface to the garbage collector from Haskell land.
2455    We provide this so that external C code can allocate and garbage
2456    collect when called from Haskell via _ccall_GC.
2457
2458    It might be useful to provide an interface whereby the programmer
2459    can specify more roots (ToDo).
2460    
2461    This needs to be protected by the GC condition variable above.  KH.
2462    -------------------------------------------------------------------------- */
2463
2464 void (*extra_roots)(evac_fn);
2465
2466 void
2467 performGC(void)
2468 {
2469   /* Obligated to hold this lock upon entry */
2470   ACQUIRE_LOCK(&sched_mutex);
2471   GarbageCollect(GetRoots,rtsFalse);
2472   RELEASE_LOCK(&sched_mutex);
2473 }
2474
2475 void
2476 performMajorGC(void)
2477 {
2478   ACQUIRE_LOCK(&sched_mutex);
2479   GarbageCollect(GetRoots,rtsTrue);
2480   RELEASE_LOCK(&sched_mutex);
2481 }
2482
2483 static void
2484 AllRoots(evac_fn evac)
2485 {
2486     GetRoots(evac);             // the scheduler's roots
2487     extra_roots(evac);          // the user's roots
2488 }
2489
2490 void
2491 performGCWithRoots(void (*get_roots)(evac_fn))
2492 {
2493   ACQUIRE_LOCK(&sched_mutex);
2494   extra_roots = get_roots;
2495   GarbageCollect(AllRoots,rtsFalse);
2496   RELEASE_LOCK(&sched_mutex);
2497 }
2498
2499 /* -----------------------------------------------------------------------------
2500    Stack overflow
2501
2502    If the thread has reached its maximum stack size, then raise the
2503    StackOverflow exception in the offending thread.  Otherwise
2504    relocate the TSO into a larger chunk of memory and adjust its stack
2505    size appropriately.
2506    -------------------------------------------------------------------------- */
2507
2508 static StgTSO *
2509 threadStackOverflow(StgTSO *tso)
2510 {
2511   nat new_stack_size, new_tso_size, diff, stack_words;
2512   StgPtr new_sp;
2513   StgTSO *dest;
2514
2515   IF_DEBUG(sanity,checkTSO(tso));
2516   if (tso->stack_size >= tso->max_stack_size) {
2517
2518     IF_DEBUG(gc,
2519              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2520                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2521              /* If we're debugging, just print out the top of the stack */
2522              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2523                                               tso->sp+64)));
2524
2525     /* Send this thread the StackOverflow exception */
2526     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2527     return tso;
2528   }
2529
2530   /* Try to double the current stack size.  If that takes us over the
2531    * maximum stack size for this thread, then use the maximum instead.
2532    * Finally round up so the TSO ends up as a whole number of blocks.
2533    */
2534   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2535   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2536                                        TSO_STRUCT_SIZE)/sizeof(W_);
2537   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2538   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2539
2540   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2541
2542   dest = (StgTSO *)allocate(new_tso_size);
2543   TICK_ALLOC_TSO(new_stack_size,0);
2544
2545   /* copy the TSO block and the old stack into the new area */
2546   memcpy(dest,tso,TSO_STRUCT_SIZE);
2547   stack_words = tso->stack + tso->stack_size - tso->sp;
2548   new_sp = (P_)dest + new_tso_size - stack_words;
2549   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2550
2551   /* relocate the stack pointers... */
2552   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2553   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2554   dest->sp    = new_sp;
2555   dest->stack_size = new_stack_size;
2556         
2557   /* and relocate the update frame list */
2558   relocate_stack(dest, diff);
2559
2560   /* Mark the old TSO as relocated.  We have to check for relocated
2561    * TSOs in the garbage collector and any primops that deal with TSOs.
2562    *
2563    * It's important to set the sp and su values to just beyond the end
2564    * of the stack, so we don't attempt to scavenge any part of the
2565    * dead TSO's stack.
2566    */
2567   tso->what_next = ThreadRelocated;
2568   tso->link = dest;
2569   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2570   tso->su = (StgUpdateFrame *)tso->sp;
2571   tso->why_blocked = NotBlocked;
2572   dest->mut_link = NULL;
2573
2574   IF_PAR_DEBUG(verbose,
2575                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2576                      tso->id, tso, tso->stack_size);
2577                /* If we're debugging, just print out the top of the stack */
2578                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2579                                                 tso->sp+64)));
2580   
2581   IF_DEBUG(sanity,checkTSO(tso));
2582 #if 0
2583   IF_DEBUG(scheduler,printTSO(dest));
2584 #endif
2585
2586   return dest;
2587 }
2588
2589 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2590 //@subsection Blocking Queue Routines
2591
2592 /* ---------------------------------------------------------------------------
2593    Wake up a queue that was blocked on some resource.
2594    ------------------------------------------------------------------------ */
2595
2596 #if defined(GRAN)
2597 static inline void
2598 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2599 {
2600 }
2601 #elif defined(PAR)
2602 static inline void
2603 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2604 {
2605   /* write RESUME events to log file and
2606      update blocked and fetch time (depending on type of the orig closure) */
2607   if (RtsFlags.ParFlags.ParStats.Full) {
2608     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2609                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2610                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2611     if (EMPTY_RUN_QUEUE())
2612       emitSchedule = rtsTrue;
2613
2614     switch (get_itbl(node)->type) {
2615         case FETCH_ME_BQ:
2616           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2617           break;
2618         case RBH:
2619         case FETCH_ME:
2620         case BLACKHOLE_BQ:
2621           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2622           break;
2623 #ifdef DIST
2624         case MVAR:
2625           break;
2626 #endif    
2627         default:
2628           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2629         }
2630       }
2631 }
2632 #endif
2633
2634 #if defined(GRAN)
2635 static StgBlockingQueueElement *
2636 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2637 {
2638     StgTSO *tso;
2639     PEs node_loc, tso_loc;
2640
2641     node_loc = where_is(node); // should be lifted out of loop
2642     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2643     tso_loc = where_is((StgClosure *)tso);
2644     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2645       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2646       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2647       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2648       // insertThread(tso, node_loc);
2649       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2650                 ResumeThread,
2651                 tso, node, (rtsSpark*)NULL);
2652       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2653       // len_local++;
2654       // len++;
2655     } else { // TSO is remote (actually should be FMBQ)
2656       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2657                                   RtsFlags.GranFlags.Costs.gunblocktime +
2658                                   RtsFlags.GranFlags.Costs.latency;
2659       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2660                 UnblockThread,
2661                 tso, node, (rtsSpark*)NULL);
2662       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2663       // len++;
2664     }
2665     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2666     IF_GRAN_DEBUG(bq,
2667                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2668                           (node_loc==tso_loc ? "Local" : "Global"), 
2669                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2670     tso->block_info.closure = NULL;
2671     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2672                              tso->id, tso));
2673 }
2674 #elif defined(PAR)
2675 static StgBlockingQueueElement *
2676 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2677 {
2678     StgBlockingQueueElement *next;
2679
2680     switch (get_itbl(bqe)->type) {
2681     case TSO:
2682       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2683       /* if it's a TSO just push it onto the run_queue */
2684       next = bqe->link;
2685       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2686       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2687       THREAD_RUNNABLE();
2688       unblockCount(bqe, node);
2689       /* reset blocking status after dumping event */
2690       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2691       break;
2692
2693     case BLOCKED_FETCH:
2694       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2695       next = bqe->link;
2696       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2697       PendingFetches = (StgBlockedFetch *)bqe;
2698       break;
2699
2700 # if defined(DEBUG)
2701       /* can ignore this case in a non-debugging setup; 
2702          see comments on RBHSave closures above */
2703     case CONSTR:
2704       /* check that the closure is an RBHSave closure */
2705       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2706              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2707              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2708       break;
2709
2710     default:
2711       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2712            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2713            (StgClosure *)bqe);
2714 # endif
2715     }
2716   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2717   return next;
2718 }
2719
2720 #else /* !GRAN && !PAR */
2721 static StgTSO *
2722 unblockOneLocked(StgTSO *tso)
2723 {
2724   StgTSO *next;
2725
2726   ASSERT(get_itbl(tso)->type == TSO);
2727   ASSERT(tso->why_blocked != NotBlocked);
2728   tso->why_blocked = NotBlocked;
2729   next = tso->link;
2730   PUSH_ON_RUN_QUEUE(tso);
2731   THREAD_RUNNABLE();
2732   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2733   return next;
2734 }
2735 #endif
2736
2737 #if defined(GRAN) || defined(PAR)
2738 inline StgBlockingQueueElement *
2739 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2740 {
2741   ACQUIRE_LOCK(&sched_mutex);
2742   bqe = unblockOneLocked(bqe, node);
2743   RELEASE_LOCK(&sched_mutex);
2744   return bqe;
2745 }
2746 #else
2747 inline StgTSO *
2748 unblockOne(StgTSO *tso)
2749 {
2750   ACQUIRE_LOCK(&sched_mutex);
2751   tso = unblockOneLocked(tso);
2752   RELEASE_LOCK(&sched_mutex);
2753   return tso;
2754 }
2755 #endif
2756
2757 #if defined(GRAN)
2758 void 
2759 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2760 {
2761   StgBlockingQueueElement *bqe;
2762   PEs node_loc;
2763   nat len = 0; 
2764
2765   IF_GRAN_DEBUG(bq, 
2766                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2767                       node, CurrentProc, CurrentTime[CurrentProc], 
2768                       CurrentTSO->id, CurrentTSO));
2769
2770   node_loc = where_is(node);
2771
2772   ASSERT(q == END_BQ_QUEUE ||
2773          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2774          get_itbl(q)->type == CONSTR); // closure (type constructor)
2775   ASSERT(is_unique(node));
2776
2777   /* FAKE FETCH: magically copy the node to the tso's proc;
2778      no Fetch necessary because in reality the node should not have been 
2779      moved to the other PE in the first place
2780   */
2781   if (CurrentProc!=node_loc) {
2782     IF_GRAN_DEBUG(bq, 
2783                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2784                         node, node_loc, CurrentProc, CurrentTSO->id, 
2785                         // CurrentTSO, where_is(CurrentTSO),
2786                         node->header.gran.procs));
2787     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2788     IF_GRAN_DEBUG(bq, 
2789                   belch("## new bitmask of node %p is %#x",
2790                         node, node->header.gran.procs));
2791     if (RtsFlags.GranFlags.GranSimStats.Global) {
2792       globalGranStats.tot_fake_fetches++;
2793     }
2794   }
2795
2796   bqe = q;
2797   // ToDo: check: ASSERT(CurrentProc==node_loc);
2798   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2799     //next = bqe->link;
2800     /* 
2801        bqe points to the current element in the queue
2802        next points to the next element in the queue
2803     */
2804     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2805     //tso_loc = where_is(tso);
2806     len++;
2807     bqe = unblockOneLocked(bqe, node);
2808   }
2809
2810   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2811      the closure to make room for the anchor of the BQ */
2812   if (bqe!=END_BQ_QUEUE) {
2813     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2814     /*
2815     ASSERT((info_ptr==&RBH_Save_0_info) ||
2816            (info_ptr==&RBH_Save_1_info) ||
2817            (info_ptr==&RBH_Save_2_info));
2818     */
2819     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2820     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2821     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2822
2823     IF_GRAN_DEBUG(bq,
2824                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2825                         node, info_type(node)));
2826   }
2827
2828   /* statistics gathering */
2829   if (RtsFlags.GranFlags.GranSimStats.Global) {
2830     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2831     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2832     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2833     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2834   }
2835   IF_GRAN_DEBUG(bq,
2836                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2837                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2838 }
2839 #elif defined(PAR)
2840 void 
2841 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2842 {
2843   StgBlockingQueueElement *bqe;
2844
2845   ACQUIRE_LOCK(&sched_mutex);
2846
2847   IF_PAR_DEBUG(verbose, 
2848                belch("##-_ AwBQ for node %p on [%x]: ",
2849                      node, mytid));
2850 #ifdef DIST  
2851   //RFP
2852   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2853     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2854     return;
2855   }
2856 #endif
2857   
2858   ASSERT(q == END_BQ_QUEUE ||
2859          get_itbl(q)->type == TSO ||           
2860          get_itbl(q)->type == BLOCKED_FETCH || 
2861          get_itbl(q)->type == CONSTR); 
2862
2863   bqe = q;
2864   while (get_itbl(bqe)->type==TSO || 
2865          get_itbl(bqe)->type==BLOCKED_FETCH) {
2866     bqe = unblockOneLocked(bqe, node);
2867   }
2868   RELEASE_LOCK(&sched_mutex);
2869 }
2870
2871 #else   /* !GRAN && !PAR */
2872 void
2873 awakenBlockedQueue(StgTSO *tso)
2874 {
2875   ACQUIRE_LOCK(&sched_mutex);
2876   while (tso != END_TSO_QUEUE) {
2877     tso = unblockOneLocked(tso);
2878   }
2879   RELEASE_LOCK(&sched_mutex);
2880 }
2881 #endif
2882
2883 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2884 //@subsection Exception Handling Routines
2885
2886 /* ---------------------------------------------------------------------------
2887    Interrupt execution
2888    - usually called inside a signal handler so it mustn't do anything fancy.   
2889    ------------------------------------------------------------------------ */
2890
2891 void
2892 interruptStgRts(void)
2893 {
2894     interrupted    = 1;
2895     context_switch = 1;
2896 }
2897
2898 /* -----------------------------------------------------------------------------
2899    Unblock a thread
2900
2901    This is for use when we raise an exception in another thread, which
2902    may be blocked.
2903    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2904    -------------------------------------------------------------------------- */
2905
2906 #if defined(GRAN) || defined(PAR)
2907 /*
2908   NB: only the type of the blocking queue is different in GranSim and GUM
2909       the operations on the queue-elements are the same
2910       long live polymorphism!
2911
2912   Locks: sched_mutex is held upon entry and exit.
2913
2914 */
2915 static void
2916 unblockThread(StgTSO *tso)
2917 {
2918   StgBlockingQueueElement *t, **last;
2919
2920   switch (tso->why_blocked) {
2921
2922   case NotBlocked:
2923     return;  /* not blocked */
2924
2925   case BlockedOnMVar:
2926     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2927     {
2928       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2929       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2930
2931       last = (StgBlockingQueueElement **)&mvar->head;
2932       for (t = (StgBlockingQueueElement *)mvar->head; 
2933            t != END_BQ_QUEUE; 
2934            last = &t->link, last_tso = t, t = t->link) {
2935         if (t == (StgBlockingQueueElement *)tso) {
2936           *last = (StgBlockingQueueElement *)tso->link;
2937           if (mvar->tail == tso) {
2938             mvar->tail = (StgTSO *)last_tso;
2939           }
2940           goto done;
2941         }
2942       }
2943       barf("unblockThread (MVAR): TSO not found");
2944     }
2945
2946   case BlockedOnBlackHole:
2947     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2948     {
2949       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2950
2951       last = &bq->blocking_queue;
2952       for (t = bq->blocking_queue; 
2953            t != END_BQ_QUEUE; 
2954            last = &t->link, t = t->link) {
2955         if (t == (StgBlockingQueueElement *)tso) {
2956           *last = (StgBlockingQueueElement *)tso->link;
2957           goto done;
2958         }
2959       }
2960       barf("unblockThread (BLACKHOLE): TSO not found");
2961     }
2962
2963   case BlockedOnException:
2964     {
2965       StgTSO *target  = tso->block_info.tso;
2966
2967       ASSERT(get_itbl(target)->type == TSO);
2968
2969       if (target->what_next == ThreadRelocated) {
2970           target = target->link;
2971           ASSERT(get_itbl(target)->type == TSO);
2972       }
2973
2974       ASSERT(target->blocked_exceptions != NULL);
2975
2976       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2977       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2978            t != END_BQ_QUEUE; 
2979            last = &t->link, t = t->link) {
2980         ASSERT(get_itbl(t)->type == TSO);
2981         if (t == (StgBlockingQueueElement *)tso) {
2982           *last = (StgBlockingQueueElement *)tso->link;
2983           goto done;
2984         }
2985       }
2986       barf("unblockThread (Exception): TSO not found");
2987     }
2988
2989   case BlockedOnRead:
2990   case BlockedOnWrite:
2991     {
2992       /* take TSO off blocked_queue */
2993       StgBlockingQueueElement *prev = NULL;
2994       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2995            prev = t, t = t->link) {
2996         if (t == (StgBlockingQueueElement *)tso) {
2997           if (prev == NULL) {
2998             blocked_queue_hd = (StgTSO *)t->link;
2999             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3000               blocked_queue_tl = END_TSO_QUEUE;
3001             }
3002           } else {
3003             prev->link = t->link;
3004             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3005               blocked_queue_tl = (StgTSO *)prev;
3006             }
3007           }
3008           goto done;
3009         }
3010       }
3011       barf("unblockThread (I/O): TSO not found");
3012     }
3013
3014   case BlockedOnDelay:
3015     {
3016       /* take TSO off sleeping_queue */
3017       StgBlockingQueueElement *prev = NULL;
3018       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3019            prev = t, t = t->link) {
3020         if (t == (StgBlockingQueueElement *)tso) {
3021           if (prev == NULL) {
3022             sleeping_queue = (StgTSO *)t->link;
3023           } else {
3024             prev->link = t->link;
3025           }
3026           goto done;
3027         }
3028       }
3029       barf("unblockThread (I/O): TSO not found");
3030     }
3031
3032   default:
3033     barf("unblockThread");
3034   }
3035
3036  done:
3037   tso->link = END_TSO_QUEUE;
3038   tso->why_blocked = NotBlocked;
3039   tso->block_info.closure = NULL;
3040   PUSH_ON_RUN_QUEUE(tso);
3041 }
3042 #else
3043 static void
3044 unblockThread(StgTSO *tso)
3045 {
3046   StgTSO *t, **last;
3047   
3048   /* To avoid locking unnecessarily. */
3049   if (tso->why_blocked == NotBlocked) {
3050     return;
3051   }
3052
3053   switch (tso->why_blocked) {
3054
3055   case BlockedOnMVar:
3056     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3057     {
3058       StgTSO *last_tso = END_TSO_QUEUE;
3059       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3060
3061       last = &mvar->head;
3062       for (t = mvar->head; t != END_TSO_QUEUE; 
3063            last = &t->link, last_tso = t, t = t->link) {
3064         if (t == tso) {
3065           *last = tso->link;
3066           if (mvar->tail == tso) {
3067             mvar->tail = last_tso;
3068           }
3069           goto done;
3070         }
3071       }
3072       barf("unblockThread (MVAR): TSO not found");
3073     }
3074
3075   case BlockedOnBlackHole:
3076     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3077     {
3078       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3079
3080       last = &bq->blocking_queue;
3081       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3082            last = &t->link, t = t->link) {
3083         if (t == tso) {
3084           *last = tso->link;
3085           goto done;
3086         }
3087       }
3088       barf("unblockThread (BLACKHOLE): TSO not found");
3089     }
3090
3091   case BlockedOnException:
3092     {
3093       StgTSO *target  = tso->block_info.tso;
3094
3095       ASSERT(get_itbl(target)->type == TSO);
3096
3097       while (target->what_next == ThreadRelocated) {
3098           target = target->link;
3099           ASSERT(get_itbl(target)->type == TSO);
3100       }
3101       
3102       ASSERT(target->blocked_exceptions != NULL);
3103
3104       last = &target->blocked_exceptions;
3105       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3106            last = &t->link, t = t->link) {
3107         ASSERT(get_itbl(t)->type == TSO);
3108         if (t == tso) {
3109           *last = tso->link;
3110           goto done;
3111         }
3112       }
3113       barf("unblockThread (Exception): TSO not found");
3114     }
3115
3116   case BlockedOnRead:
3117   case BlockedOnWrite:
3118     {
3119       StgTSO *prev = NULL;
3120       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3121            prev = t, t = t->link) {
3122         if (t == tso) {
3123           if (prev == NULL) {
3124             blocked_queue_hd = t->link;
3125             if (blocked_queue_tl == t) {
3126               blocked_queue_tl = END_TSO_QUEUE;
3127             }
3128           } else {
3129             prev->link = t->link;
3130             if (blocked_queue_tl == t) {
3131               blocked_queue_tl = prev;
3132             }
3133           }
3134           goto done;
3135         }
3136       }
3137       barf("unblockThread (I/O): TSO not found");
3138     }
3139
3140   case BlockedOnDelay:
3141     {
3142       StgTSO *prev = NULL;
3143       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3144            prev = t, t = t->link) {
3145         if (t == tso) {
3146           if (prev == NULL) {
3147             sleeping_queue = t->link;
3148           } else {
3149             prev->link = t->link;
3150           }
3151           goto done;
3152         }
3153       }
3154       barf("unblockThread (I/O): TSO not found");
3155     }
3156
3157   default:
3158     barf("unblockThread");
3159   }
3160
3161  done:
3162   tso->link = END_TSO_QUEUE;
3163   tso->why_blocked = NotBlocked;
3164   tso->block_info.closure = NULL;
3165   PUSH_ON_RUN_QUEUE(tso);
3166 }
3167 #endif
3168
3169 /* -----------------------------------------------------------------------------
3170  * raiseAsync()
3171  *
3172  * The following function implements the magic for raising an
3173  * asynchronous exception in an existing thread.
3174  *
3175  * We first remove the thread from any queue on which it might be
3176  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3177  *
3178  * We strip the stack down to the innermost CATCH_FRAME, building
3179  * thunks in the heap for all the active computations, so they can 
3180  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3181  * an application of the handler to the exception, and push it on
3182  * the top of the stack.
3183  * 
3184  * How exactly do we save all the active computations?  We create an
3185  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3186  * AP_UPDs pushes everything from the corresponding update frame
3187  * upwards onto the stack.  (Actually, it pushes everything up to the
3188  * next update frame plus a pointer to the next AP_UPD object.
3189  * Entering the next AP_UPD object pushes more onto the stack until we
3190  * reach the last AP_UPD object - at which point the stack should look
3191  * exactly as it did when we killed the TSO and we can continue
3192  * execution by entering the closure on top of the stack.
3193  *
3194  * We can also kill a thread entirely - this happens if either (a) the 
3195  * exception passed to raiseAsync is NULL, or (b) there's no
3196  * CATCH_FRAME on the stack.  In either case, we strip the entire
3197  * stack and replace the thread with a zombie.
3198  *
3199  * Locks: sched_mutex held upon entry nor exit.
3200  *
3201  * -------------------------------------------------------------------------- */
3202  
3203 void 
3204 deleteThread(StgTSO *tso)
3205 {
3206   raiseAsync(tso,NULL);
3207 }
3208
3209 void
3210 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3211 {
3212   /* When raising async exs from contexts where sched_mutex isn't held;
3213      use raiseAsyncWithLock(). */
3214   ACQUIRE_LOCK(&sched_mutex);
3215   raiseAsync(tso,exception);
3216   RELEASE_LOCK(&sched_mutex);
3217 }
3218
3219 void
3220 raiseAsync(StgTSO *tso, StgClosure *exception)
3221 {
3222   StgUpdateFrame* su = tso->su;
3223   StgPtr          sp = tso->sp;
3224   
3225   /* Thread already dead? */
3226   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3227     return;
3228   }
3229
3230   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3231
3232   /* Remove it from any blocking queues */
3233   unblockThread(tso);
3234
3235   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3236   /* The stack freezing code assumes there's a closure pointer on
3237    * the top of the stack.  This isn't always the case with compiled
3238    * code, so we have to push a dummy closure on the top which just
3239    * returns to the next return address on the stack.
3240    */
3241   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3242     *(--sp) = (W_)&stg_dummy_ret_closure;
3243   }
3244
3245   while (1) {
3246     nat words = ((P_)su - (P_)sp) - 1;
3247     nat i;
3248     StgAP_UPD * ap;
3249
3250     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3251      * then build the THUNK raise(exception), and leave it on
3252      * top of the CATCH_FRAME ready to enter.
3253      */
3254     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3255 #ifdef PROFILING
3256       StgCatchFrame *cf = (StgCatchFrame *)su;
3257 #endif
3258       StgClosure *raise;
3259
3260       /* we've got an exception to raise, so let's pass it to the
3261        * handler in this frame.
3262        */
3263       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3264       TICK_ALLOC_SE_THK(1,0);
3265       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3266       raise->payload[0] = exception;
3267
3268       /* throw away the stack from Sp up to the CATCH_FRAME.
3269        */
3270       sp = (P_)su - 1;
3271
3272       /* Ensure that async excpetions are blocked now, so we don't get
3273        * a surprise exception before we get around to executing the
3274        * handler.
3275        */
3276       if (tso->blocked_exceptions == NULL) {
3277           tso->blocked_exceptions = END_TSO_QUEUE;
3278       }
3279
3280       /* Put the newly-built THUNK on top of the stack, ready to execute
3281        * when the thread restarts.
3282        */
3283       sp[0] = (W_)raise;
3284       tso->sp = sp;
3285       tso->su = su;
3286       tso->what_next = ThreadEnterGHC;
3287       IF_DEBUG(sanity, checkTSO(tso));
3288       return;
3289     }
3290
3291     /* First build an AP_UPD consisting of the stack chunk above the
3292      * current update frame, with the top word on the stack as the
3293      * fun field.
3294      */
3295     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3296     
3297     ASSERT(words >= 0);
3298     
3299     ap->n_args = words;
3300     ap->fun    = (StgClosure *)sp[0];
3301     sp++;
3302     for(i=0; i < (nat)words; ++i) {
3303       ap->payload[i] = (StgClosure *)*sp++;
3304     }
3305     
3306     switch (get_itbl(su)->type) {
3307       
3308     case UPDATE_FRAME:
3309       {
3310         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3311         TICK_ALLOC_UP_THK(words+1,0);
3312         
3313         IF_DEBUG(scheduler,
3314                  fprintf(stderr,  "scheduler: Updating ");
3315                  printPtr((P_)su->updatee); 
3316                  fprintf(stderr,  " with ");
3317                  printObj((StgClosure *)ap);
3318                  );
3319         
3320         /* Replace the updatee with an indirection - happily
3321          * this will also wake up any threads currently
3322          * waiting on the result.
3323          *
3324          * Warning: if we're in a loop, more than one update frame on
3325          * the stack may point to the same object.  Be careful not to
3326          * overwrite an IND_OLDGEN in this case, because we'll screw
3327          * up the mutable lists.  To be on the safe side, don't
3328          * overwrite any kind of indirection at all.  See also
3329          * threadSqueezeStack in GC.c, where we have to make a similar
3330          * check.
3331          */
3332         if (!closure_IND(su->updatee)) {
3333             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3334         }
3335         su = su->link;
3336         sp += sizeofW(StgUpdateFrame) -1;
3337         sp[0] = (W_)ap; /* push onto stack */
3338         break;
3339       }
3340
3341     case CATCH_FRAME:
3342       {
3343         StgCatchFrame *cf = (StgCatchFrame *)su;
3344         StgClosure* o;
3345         
3346         /* We want a PAP, not an AP_UPD.  Fortunately, the
3347          * layout's the same.
3348          */
3349         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3350         TICK_ALLOC_UPD_PAP(words+1,0);
3351         
3352         /* now build o = FUN(catch,ap,handler) */
3353         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3354         TICK_ALLOC_FUN(2,0);
3355         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3356         o->payload[0] = (StgClosure *)ap;
3357         o->payload[1] = cf->handler;
3358         
3359         IF_DEBUG(scheduler,
3360                  fprintf(stderr,  "scheduler: Built ");
3361                  printObj((StgClosure *)o);
3362                  );
3363         
3364         /* pop the old handler and put o on the stack */
3365         su = cf->link;
3366         sp += sizeofW(StgCatchFrame) - 1;
3367         sp[0] = (W_)o;
3368         break;
3369       }
3370       
3371     case SEQ_FRAME:
3372       {
3373         StgSeqFrame *sf = (StgSeqFrame *)su;
3374         StgClosure* o;
3375         
3376         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3377         TICK_ALLOC_UPD_PAP(words+1,0);
3378         
3379         /* now build o = FUN(seq,ap) */
3380         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3381         TICK_ALLOC_SE_THK(1,0);
3382         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3383         o->payload[0] = (StgClosure *)ap;
3384         
3385         IF_DEBUG(scheduler,
3386                  fprintf(stderr,  "scheduler: Built ");
3387                  printObj((StgClosure *)o);
3388                  );
3389         
3390         /* pop the old handler and put o on the stack */
3391         su = sf->link;
3392         sp += sizeofW(StgSeqFrame) - 1;
3393         sp[0] = (W_)o;
3394         break;
3395       }
3396       
3397     case STOP_FRAME:
3398       /* We've stripped the entire stack, the thread is now dead. */
3399       sp += sizeofW(StgStopFrame) - 1;
3400       sp[0] = (W_)exception;    /* save the exception */
3401       tso->what_next = ThreadKilled;
3402       tso->su = (StgUpdateFrame *)(sp+1);
3403       tso->sp = sp;
3404       return;
3405
3406     default:
3407       barf("raiseAsync");
3408     }
3409   }
3410   barf("raiseAsync");
3411 }
3412
3413 /* -----------------------------------------------------------------------------
3414    resurrectThreads is called after garbage collection on the list of
3415    threads found to be garbage.  Each of these threads will be woken
3416    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3417    on an MVar, or NonTermination if the thread was blocked on a Black
3418    Hole.
3419
3420    Locks: sched_mutex isn't held upon entry nor exit.
3421    -------------------------------------------------------------------------- */
3422
3423 void
3424 resurrectThreads( StgTSO *threads )
3425 {
3426   StgTSO *tso, *next;
3427
3428   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3429     next = tso->global_link;
3430     tso->global_link = all_threads;
3431     all_threads = tso;
3432     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3433
3434     switch (tso->why_blocked) {
3435     case BlockedOnMVar:
3436     case BlockedOnException:
3437       /* Called by GC - sched_mutex lock is currently held. */
3438       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3439       break;
3440     case BlockedOnBlackHole:
3441       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3442       break;
3443     case NotBlocked:
3444       /* This might happen if the thread was blocked on a black hole
3445        * belonging to a thread that we've just woken up (raiseAsync
3446        * can wake up threads, remember...).
3447        */
3448       continue;
3449     default:
3450       barf("resurrectThreads: thread blocked in a strange way");
3451     }
3452   }
3453 }
3454
3455 /* -----------------------------------------------------------------------------
3456  * Blackhole detection: if we reach a deadlock, test whether any
3457  * threads are blocked on themselves.  Any threads which are found to
3458  * be self-blocked get sent a NonTermination exception.
3459  *
3460  * This is only done in a deadlock situation in order to avoid
3461  * performance overhead in the normal case.
3462  *
3463  * Locks: sched_mutex is held upon entry and exit.
3464  * -------------------------------------------------------------------------- */
3465
3466 static void
3467 detectBlackHoles( void )
3468 {
3469     StgTSO *t = all_threads;
3470     StgUpdateFrame *frame;
3471     StgClosure *blocked_on;
3472
3473     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3474
3475         while (t->what_next == ThreadRelocated) {
3476             t = t->link;
3477             ASSERT(get_itbl(t)->type == TSO);
3478         }
3479       
3480         if (t->why_blocked != BlockedOnBlackHole) {
3481             continue;
3482         }
3483
3484         blocked_on = t->block_info.closure;
3485
3486         for (frame = t->su; ; frame = frame->link) {
3487             switch (get_itbl(frame)->type) {
3488
3489             case UPDATE_FRAME:
3490                 if (frame->updatee == blocked_on) {
3491                     /* We are blocking on one of our own computations, so
3492                      * send this thread the NonTermination exception.  
3493                      */
3494                     IF_DEBUG(scheduler, 
3495                              sched_belch("thread %d is blocked on itself", t->id));
3496                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3497                     goto done;
3498                 }
3499                 else {
3500                     continue;
3501                 }
3502
3503             case CATCH_FRAME:
3504             case SEQ_FRAME:
3505                 continue;
3506                 
3507             case STOP_FRAME:
3508                 break;
3509             }
3510             break;
3511         }
3512
3513     done: ;
3514     }   
3515 }
3516
3517 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3518 //@subsection Debugging Routines
3519
3520 /* -----------------------------------------------------------------------------
3521    Debugging: why is a thread blocked
3522    -------------------------------------------------------------------------- */
3523
3524 #ifdef DEBUG
3525
3526 void
3527 printThreadBlockage(StgTSO *tso)
3528 {
3529   switch (tso->why_blocked) {
3530   case BlockedOnRead:
3531     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3532     break;
3533   case BlockedOnWrite:
3534     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3535     break;
3536   case BlockedOnDelay:
3537     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3538     break;
3539   case BlockedOnMVar:
3540     fprintf(stderr,"is blocked on an MVar");
3541     break;
3542   case BlockedOnException:
3543     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3544             tso->block_info.tso->id);
3545     break;
3546   case BlockedOnBlackHole:
3547     fprintf(stderr,"is blocked on a black hole");
3548     break;
3549   case NotBlocked:
3550     fprintf(stderr,"is not blocked");
3551     break;
3552 #if defined(PAR)
3553   case BlockedOnGA:
3554     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3555             tso->block_info.closure, info_type(tso->block_info.closure));
3556     break;
3557   case BlockedOnGA_NoSend:
3558     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3559             tso->block_info.closure, info_type(tso->block_info.closure));
3560     break;
3561 #endif
3562 #if defined(RTS_SUPPORTS_THREADS)
3563   case BlockedOnCCall:
3564     fprintf(stderr,"is blocked on an external call");
3565     break;
3566 #endif
3567   default:
3568     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3569          tso->why_blocked, tso->id, tso);
3570   }
3571 }
3572
3573 void
3574 printThreadStatus(StgTSO *tso)
3575 {
3576   switch (tso->what_next) {
3577   case ThreadKilled:
3578     fprintf(stderr,"has been killed");
3579     break;
3580   case ThreadComplete:
3581     fprintf(stderr,"has completed");
3582     break;
3583   default:
3584     printThreadBlockage(tso);
3585   }
3586 }
3587
3588 void
3589 printAllThreads(void)
3590 {
3591   StgTSO *t;
3592   void *label;
3593
3594 # if defined(GRAN)
3595   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3596   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3597                        time_string, rtsFalse/*no commas!*/);
3598
3599   sched_belch("all threads at [%s]:", time_string);
3600 # elif defined(PAR)
3601   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3602   ullong_format_string(CURRENT_TIME,
3603                        time_string, rtsFalse/*no commas!*/);
3604
3605   sched_belch("all threads at [%s]:", time_string);
3606 # else
3607   sched_belch("all threads:");
3608 # endif
3609
3610   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3611     fprintf(stderr, "\tthread %d @ %p ", t->id, (void *)t);
3612     label = lookupThreadLabel((StgWord)t);
3613     if (label) fprintf(stderr,"[\"%s\"] ",(char *)label);
3614     printThreadStatus(t);
3615     fprintf(stderr,"\n");
3616   }
3617 }
3618     
3619 /* 
3620    Print a whole blocking queue attached to node (debugging only).
3621 */
3622 //@cindex print_bq
3623 # if defined(PAR)
3624 void 
3625 print_bq (StgClosure *node)
3626 {
3627   StgBlockingQueueElement *bqe;
3628   StgTSO *tso;
3629   rtsBool end;
3630
3631   fprintf(stderr,"## BQ of closure %p (%s): ",
3632           node, info_type(node));
3633
3634   /* should cover all closures that may have a blocking queue */
3635   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3636          get_itbl(node)->type == FETCH_ME_BQ ||
3637          get_itbl(node)->type == RBH ||
3638          get_itbl(node)->type == MVAR);
3639     
3640   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3641
3642   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3643 }
3644
3645 /* 
3646    Print a whole blocking queue starting with the element bqe.
3647 */
3648 void 
3649 print_bqe (StgBlockingQueueElement *bqe)
3650 {
3651   rtsBool end;
3652
3653   /* 
3654      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3655   */
3656   for (end = (bqe==END_BQ_QUEUE);
3657        !end; // iterate until bqe points to a CONSTR
3658        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3659        bqe = end ? END_BQ_QUEUE : bqe->link) {
3660     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3661     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3662     /* types of closures that may appear in a blocking queue */
3663     ASSERT(get_itbl(bqe)->type == TSO ||           
3664            get_itbl(bqe)->type == BLOCKED_FETCH || 
3665            get_itbl(bqe)->type == CONSTR); 
3666     /* only BQs of an RBH end with an RBH_Save closure */
3667     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3668
3669     switch (get_itbl(bqe)->type) {
3670     case TSO:
3671       fprintf(stderr," TSO %u (%x),",
3672               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3673       break;
3674     case BLOCKED_FETCH:
3675       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3676               ((StgBlockedFetch *)bqe)->node, 
3677               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3678               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3679               ((StgBlockedFetch *)bqe)->ga.weight);
3680       break;
3681     case CONSTR:
3682       fprintf(stderr," %s (IP %p),",
3683               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3684                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3685                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3686                "RBH_Save_?"), get_itbl(bqe));
3687       break;
3688     default:
3689       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3690            info_type((StgClosure *)bqe)); // , node, info_type(node));
3691       break;
3692     }
3693   } /* for */
3694   fputc('\n', stderr);
3695 }
3696 # elif defined(GRAN)
3697 void 
3698 print_bq (StgClosure *node)
3699 {
3700   StgBlockingQueueElement *bqe;
3701   PEs node_loc, tso_loc;
3702   rtsBool end;
3703
3704   /* should cover all closures that may have a blocking queue */
3705   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3706          get_itbl(node)->type == FETCH_ME_BQ ||
3707          get_itbl(node)->type == RBH);
3708     
3709   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3710   node_loc = where_is(node);
3711
3712   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3713           node, info_type(node), node_loc);
3714
3715   /* 
3716      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3717   */
3718   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3719        !end; // iterate until bqe points to a CONSTR
3720        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3721     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3722     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3723     /* types of closures that may appear in a blocking queue */
3724     ASSERT(get_itbl(bqe)->type == TSO ||           
3725            get_itbl(bqe)->type == CONSTR); 
3726     /* only BQs of an RBH end with an RBH_Save closure */
3727     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3728
3729     tso_loc = where_is((StgClosure *)bqe);
3730     switch (get_itbl(bqe)->type) {
3731     case TSO:
3732       fprintf(stderr," TSO %d (%p) on [PE %d],",
3733               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3734       break;
3735     case CONSTR:
3736       fprintf(stderr," %s (IP %p),",
3737               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3738                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3739                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3740                "RBH_Save_?"), get_itbl(bqe));
3741       break;
3742     default:
3743       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3744            info_type((StgClosure *)bqe), node, info_type(node));
3745       break;
3746     }
3747   } /* for */
3748   fputc('\n', stderr);
3749 }
3750 #else
3751 /* 
3752    Nice and easy: only TSOs on the blocking queue
3753 */
3754 void 
3755 print_bq (StgClosure *node)
3756 {
3757   StgTSO *tso;
3758
3759   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3760   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3761        tso != END_TSO_QUEUE; 
3762        tso=tso->link) {
3763     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3764     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3765     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3766   }
3767   fputc('\n', stderr);
3768 }
3769 # endif
3770
3771 #if defined(PAR)
3772 static nat
3773 run_queue_len(void)
3774 {
3775   nat i;
3776   StgTSO *tso;
3777
3778   for (i=0, tso=run_queue_hd; 
3779        tso != END_TSO_QUEUE;
3780        i++, tso=tso->link)
3781     /* nothing */
3782
3783   return i;
3784 }
3785 #endif
3786
3787 static void
3788 sched_belch(char *s, ...)
3789 {
3790   va_list ap;
3791   va_start(ap,s);
3792 #ifdef SMP
3793   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3794 #elif defined(PAR)
3795   fprintf(stderr, "== ");
3796 #else
3797   fprintf(stderr, "scheduler: ");
3798 #endif
3799   vfprintf(stderr, s, ap);
3800   fprintf(stderr, "\n");
3801   va_end(ap);
3802 }
3803
3804 #endif /* DEBUG */
3805
3806
3807 //@node Index,  , Debugging Routines, Main scheduling code
3808 //@subsection Index
3809
3810 //@index
3811 //* StgMainThread::  @cindex\s-+StgMainThread
3812 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3813 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3814 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3815 //* context_switch::  @cindex\s-+context_switch
3816 //* createThread::  @cindex\s-+createThread
3817 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3818 //* initScheduler::  @cindex\s-+initScheduler
3819 //* interrupted::  @cindex\s-+interrupted
3820 //* next_thread_id::  @cindex\s-+next_thread_id
3821 //* print_bq::  @cindex\s-+print_bq
3822 //* run_queue_hd::  @cindex\s-+run_queue_hd
3823 //* run_queue_tl::  @cindex\s-+run_queue_tl
3824 //* sched_mutex::  @cindex\s-+sched_mutex
3825 //* schedule::  @cindex\s-+schedule
3826 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3827 //* term_mutex::  @cindex\s-+term_mutex
3828 //@end index