6c5edef99dec999436339b272d039955c3cd670f
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.137 2002/04/13 05:33:02 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #ifdef HAVE_SYS_TYPES_H
118 #include <sys/types.h>
119 #endif
120 #ifdef HAVE_UNISTD_H
121 #include <unistd.h>
122 #endif
123
124 #include <stdarg.h>
125
126 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
127 //@subsection Variables and Data structures
128
129 /* Main thread queue.
130  * Locks required: sched_mutex.
131  */
132 StgMainThread *main_threads;
133
134 /* Thread queues.
135  * Locks required: sched_mutex.
136  */
137 #if defined(GRAN)
138
139 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
140 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
141
142 /* 
143    In GranSim we have a runnable and a blocked queue for each processor.
144    In order to minimise code changes new arrays run_queue_hds/tls
145    are created. run_queue_hd is then a short cut (macro) for
146    run_queue_hds[CurrentProc] (see GranSim.h).
147    -- HWL
148 */
149 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
150 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
151 StgTSO *ccalling_threadss[MAX_PROC];
152 /* We use the same global list of threads (all_threads) in GranSim as in
153    the std RTS (i.e. we are cheating). However, we don't use this list in
154    the GranSim specific code at the moment (so we are only potentially
155    cheating).  */
156
157 #else /* !GRAN */
158
159 StgTSO *run_queue_hd, *run_queue_tl;
160 StgTSO *blocked_queue_hd, *blocked_queue_tl;
161 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
162
163 #endif
164
165 /* Linked list of all threads.
166  * Used for detecting garbage collected threads.
167  */
168 StgTSO *all_threads;
169
170 /* When a thread performs a safe C call (_ccall_GC, using old
171  * terminology), it gets put on the suspended_ccalling_threads
172  * list. Used by the garbage collector.
173  */
174 static StgTSO *suspended_ccalling_threads;
175
176 static StgTSO *threadStackOverflow(StgTSO *tso);
177
178 /* KH: The following two flags are shared memory locations.  There is no need
179        to lock them, since they are only unset at the end of a scheduler
180        operation.
181 */
182
183 /* flag set by signal handler to precipitate a context switch */
184 //@cindex context_switch
185 nat context_switch;
186
187 /* if this flag is set as well, give up execution */
188 //@cindex interrupted
189 rtsBool interrupted;
190
191 /* Next thread ID to allocate.
192  * Locks required: sched_mutex
193  */
194 //@cindex next_thread_id
195 StgThreadID next_thread_id = 1;
196
197 /*
198  * Pointers to the state of the current thread.
199  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
200  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
201  */
202  
203 /* The smallest stack size that makes any sense is:
204  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
205  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
206  *  + 1                       (the realworld token for an IO thread)
207  *  + 1                       (the closure to enter)
208  *
209  * A thread with this stack will bomb immediately with a stack
210  * overflow, which will increase its stack size.  
211  */
212
213 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
214
215
216 #if defined(GRAN)
217 StgTSO *CurrentTSO;
218 #endif
219
220 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
221  *  exists - earlier gccs apparently didn't.
222  *  -= chak
223  */
224 StgTSO dummy_tso;
225
226 rtsBool ready_to_gc;
227
228 void            addToBlockedQueue ( StgTSO *tso );
229
230 static void     schedule          ( void );
231        void     interruptStgRts   ( void );
232 #if defined(GRAN)
233 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
234 #else
235 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
236 #endif
237
238 static void     detectBlackHoles  ( void );
239
240 #ifdef DEBUG
241 static void sched_belch(char *s, ...);
242 #endif
243
244 #if defined(RTS_SUPPORTS_THREADS)
245 /* ToDo: carefully document the invariants that go together
246  *       with these synchronisation objects.
247  */
248 Mutex     sched_mutex       = INIT_MUTEX_VAR;
249 Mutex     term_mutex        = INIT_MUTEX_VAR;
250
251 # if defined(SMP)
252 static Condition gc_pending_cond = INIT_COND_VAR;
253 nat await_death;
254 # endif
255
256 #endif /* RTS_SUPPORTS_THREADS */
257
258 #if defined(PAR)
259 StgTSO *LastTSO;
260 rtsTime TimeOfLastYield;
261 rtsBool emitSchedule = rtsTrue;
262 #endif
263
264 #if DEBUG
265 char *whatNext_strs[] = {
266   "ThreadEnterGHC",
267   "ThreadRunGHC",
268   "ThreadEnterInterp",
269   "ThreadKilled",
270   "ThreadComplete"
271 };
272
273 char *threadReturnCode_strs[] = {
274   "HeapOverflow",                       /* might also be StackOverflow */
275   "StackOverflow",
276   "ThreadYielding",
277   "ThreadBlocked",
278   "ThreadFinished"
279 };
280 #endif
281
282 #if defined(PAR)
283 StgTSO * createSparkThread(rtsSpark spark);
284 StgTSO * activateSpark (rtsSpark spark);  
285 #endif
286
287 /*
288  * The thread state for the main thread.
289 // ToDo: check whether not needed any more
290 StgTSO   *MainTSO;
291  */
292
293 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
294 static void taskStart(void);
295 static void
296 taskStart(void)
297 {
298   schedule();
299 }
300 #endif
301
302
303
304
305 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
306 //@subsection Main scheduling loop
307
308 /* ---------------------------------------------------------------------------
309    Main scheduling loop.
310
311    We use round-robin scheduling, each thread returning to the
312    scheduler loop when one of these conditions is detected:
313
314       * out of heap space
315       * timer expires (thread yields)
316       * thread blocks
317       * thread ends
318       * stack overflow
319
320    Locking notes:  we acquire the scheduler lock once at the beginning
321    of the scheduler loop, and release it when
322     
323       * running a thread, or
324       * waiting for work, or
325       * waiting for a GC to complete.
326
327    GRAN version:
328      In a GranSim setup this loop iterates over the global event queue.
329      This revolves around the global event queue, which determines what 
330      to do next. Therefore, it's more complicated than either the 
331      concurrent or the parallel (GUM) setup.
332
333    GUM version:
334      GUM iterates over incoming messages.
335      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
336      and sends out a fish whenever it has nothing to do; in-between
337      doing the actual reductions (shared code below) it processes the
338      incoming messages and deals with delayed operations 
339      (see PendingFetches).
340      This is not the ugliest code you could imagine, but it's bloody close.
341
342    ------------------------------------------------------------------------ */
343 //@cindex schedule
344 static void
345 schedule( void )
346 {
347   StgTSO *t;
348   Capability *cap;
349   StgThreadReturnCode ret;
350 #if defined(GRAN)
351   rtsEvent *event;
352 #elif defined(PAR)
353   StgSparkPool *pool;
354   rtsSpark spark;
355   StgTSO *tso;
356   GlobalTaskId pe;
357   rtsBool receivedFinish = rtsFalse;
358 # if defined(DEBUG)
359   nat tp_size, sp_size; // stats only
360 # endif
361 #endif
362   rtsBool was_interrupted = rtsFalse;
363   
364   ACQUIRE_LOCK(&sched_mutex);
365  
366 #if defined(RTS_SUPPORTS_THREADS)
367   waitForWorkCapability(&sched_mutex, &cap, rtsFalse);
368 #else
369   /* simply initialise it in the non-threaded case */
370   grabCapability(&cap);
371 #endif
372
373 #if defined(GRAN)
374   /* set up first event to get things going */
375   /* ToDo: assign costs for system setup and init MainTSO ! */
376   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
377             ContinueThread, 
378             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
379
380   IF_DEBUG(gran,
381            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
382            G_TSO(CurrentTSO, 5));
383
384   if (RtsFlags.GranFlags.Light) {
385     /* Save current time; GranSim Light only */
386     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
387   }      
388
389   event = get_next_event();
390
391   while (event!=(rtsEvent*)NULL) {
392     /* Choose the processor with the next event */
393     CurrentProc = event->proc;
394     CurrentTSO = event->tso;
395
396 #elif defined(PAR)
397
398   while (!receivedFinish) {    /* set by processMessages */
399                                /* when receiving PP_FINISH message         */ 
400 #else
401
402   while (1) {
403
404 #endif
405
406     IF_DEBUG(scheduler, printAllThreads());
407
408 #if defined(RTS_SUPPORTS_THREADS)
409     /* Check to see whether there are any worker threads
410        waiting to deposit external call results. If so,
411        yield our capability */
412     yieldToReturningWorker(&sched_mutex, cap);
413 #endif
414
415     /* If we're interrupted (the user pressed ^C, or some other
416      * termination condition occurred), kill all the currently running
417      * threads.
418      */
419     if (interrupted) {
420       IF_DEBUG(scheduler, sched_belch("interrupted"));
421       deleteAllThreads();
422       interrupted = rtsFalse;
423       was_interrupted = rtsTrue;
424     }
425
426     /* Go through the list of main threads and wake up any
427      * clients whose computations have finished.  ToDo: this
428      * should be done more efficiently without a linear scan
429      * of the main threads list, somehow...
430      */
431 #if defined(RTS_SUPPORTS_THREADS)
432     { 
433       StgMainThread *m, **prev;
434       prev = &main_threads;
435       for (m = main_threads; m != NULL; m = m->link) {
436         switch (m->tso->what_next) {
437         case ThreadComplete:
438           if (m->ret) {
439             *(m->ret) = (StgClosure *)m->tso->sp[0];
440           }
441           *prev = m->link;
442           m->stat = Success;
443           broadcastCondition(&m->wakeup);
444 #ifdef DEBUG
445           free(m->tso->label);
446 #endif
447           break;
448         case ThreadKilled:
449           if (m->ret) *(m->ret) = NULL;
450           *prev = m->link;
451           if (was_interrupted) {
452             m->stat = Interrupted;
453           } else {
454             m->stat = Killed;
455           }
456           broadcastCondition(&m->wakeup);
457 #ifdef DEBUG
458           free(m->tso->label);
459 #endif
460           break;
461         default:
462           break;
463         }
464       }
465     }
466
467 #else /* not threaded */
468
469 # if defined(PAR)
470     /* in GUM do this only on the Main PE */
471     if (IAmMainThread)
472 # endif
473     /* If our main thread has finished or been killed, return.
474      */
475     {
476       StgMainThread *m = main_threads;
477       if (m->tso->what_next == ThreadComplete
478           || m->tso->what_next == ThreadKilled) {
479 #ifdef DEBUG
480         free(m->tso->label);
481 #endif
482         main_threads = main_threads->link;
483         if (m->tso->what_next == ThreadComplete) {
484           /* we finished successfully, fill in the return value */
485           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
486           m->stat = Success;
487           return;
488         } else {
489           if (m->ret) { *(m->ret) = NULL; };
490           if (was_interrupted) {
491             m->stat = Interrupted;
492           } else {
493             m->stat = Killed;
494           }
495           return;
496         }
497       }
498     }
499 #endif
500
501     /* Top up the run queue from our spark pool.  We try to make the
502      * number of threads in the run queue equal to the number of
503      * free capabilities.
504      *
505      * Disable spark support in SMP for now, non-essential & requires
506      * a little bit of work to make it compile cleanly. -- sof 1/02.
507      */
508 #if 0 /* defined(SMP) */
509     {
510       nat n = getFreeCapabilities();
511       StgTSO *tso = run_queue_hd;
512
513       /* Count the run queue */
514       while (n > 0 && tso != END_TSO_QUEUE) {
515         tso = tso->link;
516         n--;
517       }
518
519       for (; n > 0; n--) {
520         StgClosure *spark;
521         spark = findSpark(rtsFalse);
522         if (spark == NULL) {
523           break; /* no more sparks in the pool */
524         } else {
525           /* I'd prefer this to be done in activateSpark -- HWL */
526           /* tricky - it needs to hold the scheduler lock and
527            * not try to re-acquire it -- SDM */
528           createSparkThread(spark);       
529           IF_DEBUG(scheduler,
530                    sched_belch("==^^ turning spark of closure %p into a thread",
531                                (StgClosure *)spark));
532         }
533       }
534       /* We need to wake up the other tasks if we just created some
535        * work for them.
536        */
537       if (getFreeCapabilities() - n > 1) {
538           signalCondition( &thread_ready_cond );
539       }
540     }
541 #endif // SMP
542
543     /* check for signals each time around the scheduler */
544 #ifndef mingw32_TARGET_OS
545     if (signals_pending()) {
546       startSignalHandlers();
547     }
548 #endif
549
550     /* Check whether any waiting threads need to be woken up.  If the
551      * run queue is empty, and there are no other tasks running, we
552      * can wait indefinitely for something to happen.
553      * ToDo: what if another client comes along & requests another
554      * main thread?
555      */
556     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) {
557       awaitEvent( EMPTY_RUN_QUEUE()
558 #if defined(SMP)
559         && allFreeCapabilities()
560 #endif
561         );
562     }
563     /* we can be interrupted while waiting for I/O... */
564     if (interrupted) continue;
565
566     /* 
567      * Detect deadlock: when we have no threads to run, there are no
568      * threads waiting on I/O or sleeping, and all the other tasks are
569      * waiting for work, we must have a deadlock of some description.
570      *
571      * We first try to find threads blocked on themselves (ie. black
572      * holes), and generate NonTermination exceptions where necessary.
573      *
574      * If no threads are black holed, we have a deadlock situation, so
575      * inform all the main threads.
576      */
577 #ifndef PAR
578     if (   EMPTY_THREAD_QUEUES()
579 #if defined(RTS_SUPPORTS_THREADS)
580         && EMPTY_QUEUE(suspended_ccalling_threads)
581 #endif
582 #ifdef SMP
583         && allFreeCapabilities()
584 #endif
585         )
586     {
587         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
588 #if defined(THREADED_RTS)
589         /* and SMP mode ..? */
590         releaseCapability(cap);
591 #endif
592         // Garbage collection can release some new threads due to
593         // either (a) finalizers or (b) threads resurrected because
594         // they are about to be send BlockedOnDeadMVar.  Any threads
595         // thus released will be immediately runnable.
596         GarbageCollect(GetRoots,rtsTrue);
597
598         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
599
600         IF_DEBUG(scheduler, 
601                  sched_belch("still deadlocked, checking for black holes..."));
602         detectBlackHoles();
603
604         if ( !EMPTY_RUN_QUEUE() ) { goto not_deadlocked; }
605
606 #ifndef mingw32_TARGET_OS
607         /* If we have user-installed signal handlers, then wait
608          * for signals to arrive rather then bombing out with a
609          * deadlock.
610          */
611         if ( anyUserHandlers() ) {
612             IF_DEBUG(scheduler, 
613                      sched_belch("still deadlocked, waiting for signals..."));
614
615             awaitUserSignals();
616
617             // we might be interrupted...
618             if (interrupted) { continue; }
619
620             if (signals_pending()) {
621                 startSignalHandlers();
622             }
623             ASSERT(!EMPTY_RUN_QUEUE());
624             goto not_deadlocked;
625         }
626 #endif
627
628         /* Probably a real deadlock.  Send the current main thread the
629          * Deadlock exception (or in the SMP build, send *all* main
630          * threads the deadlock exception, since none of them can make
631          * progress).
632          */
633         {
634             StgMainThread *m;
635 #if defined(RTS_SUPPORTS_THREADS)
636             for (m = main_threads; m != NULL; m = m->link) {
637                 switch (m->tso->why_blocked) {
638                 case BlockedOnBlackHole:
639                     raiseAsyncWithLock(m->tso, (StgClosure *)NonTermination_closure);
640                     break;
641                 case BlockedOnException:
642                 case BlockedOnMVar:
643                     raiseAsyncWithLock(m->tso, (StgClosure *)Deadlock_closure);
644                     break;
645                 default:
646                     barf("deadlock: main thread blocked in a strange way");
647                 }
648             }
649 #else
650             m = main_threads;
651             switch (m->tso->why_blocked) {
652             case BlockedOnBlackHole:
653                 raiseAsyncWithLock(m->tso, (StgClosure *)NonTermination_closure);
654                 break;
655             case BlockedOnException:
656             case BlockedOnMVar:
657                 raiseAsyncWithLock(m->tso, (StgClosure *)Deadlock_closure);
658                 break;
659             default:
660                 barf("deadlock: main thread blocked in a strange way");
661             }
662 #endif
663         }
664
665 #if defined(RTS_SUPPORTS_THREADS)
666         /* ToDo: revisit conditions (and mechanism) for shutting
667            down a multi-threaded world  */
668         IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down."));
669         shutdownHaskellAndExit(0);
670 #endif
671     }
672   not_deadlocked:
673
674 #elif defined(PAR)
675     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
676 #endif
677
678 #if defined(SMP)
679     /* If there's a GC pending, don't do anything until it has
680      * completed.
681      */
682     if (ready_to_gc) {
683       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
684       waitCondition( &gc_pending_cond, &sched_mutex );
685     }
686 #endif    
687
688 #if defined(RTS_SUPPORTS_THREADS)
689     /* block until we've got a thread on the run queue and a free
690      * capability.
691      *
692      */
693     if ( EMPTY_RUN_QUEUE() ) {
694       /* Give up our capability */
695       releaseCapability(cap);
696       IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId()));
697       waitForWorkCapability(&sched_mutex, &cap, rtsTrue);
698       IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
699 #if 0
700       while ( EMPTY_RUN_QUEUE() ) {
701         waitForWorkCapability(&sched_mutex, &cap);
702         IF_DEBUG(scheduler, sched_belch("thread %d: work now available", osThreadId()));
703       }
704 #endif
705     }
706 #endif
707
708 #if defined(GRAN)
709     if (RtsFlags.GranFlags.Light)
710       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
711
712     /* adjust time based on time-stamp */
713     if (event->time > CurrentTime[CurrentProc] &&
714         event->evttype != ContinueThread)
715       CurrentTime[CurrentProc] = event->time;
716     
717     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
718     if (!RtsFlags.GranFlags.Light)
719       handleIdlePEs();
720
721     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
722
723     /* main event dispatcher in GranSim */
724     switch (event->evttype) {
725       /* Should just be continuing execution */
726     case ContinueThread:
727       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
728       /* ToDo: check assertion
729       ASSERT(run_queue_hd != (StgTSO*)NULL &&
730              run_queue_hd != END_TSO_QUEUE);
731       */
732       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
733       if (!RtsFlags.GranFlags.DoAsyncFetch &&
734           procStatus[CurrentProc]==Fetching) {
735         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
736               CurrentTSO->id, CurrentTSO, CurrentProc);
737         goto next_thread;
738       } 
739       /* Ignore ContinueThreads for completed threads */
740       if (CurrentTSO->what_next == ThreadComplete) {
741         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
742               CurrentTSO->id, CurrentTSO, CurrentProc);
743         goto next_thread;
744       } 
745       /* Ignore ContinueThreads for threads that are being migrated */
746       if (PROCS(CurrentTSO)==Nowhere) { 
747         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
748               CurrentTSO->id, CurrentTSO, CurrentProc);
749         goto next_thread;
750       }
751       /* The thread should be at the beginning of the run queue */
752       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
753         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
754               CurrentTSO->id, CurrentTSO, CurrentProc);
755         break; // run the thread anyway
756       }
757       /*
758       new_event(proc, proc, CurrentTime[proc],
759                 FindWork,
760                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
761       goto next_thread; 
762       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
763       break; // now actually run the thread; DaH Qu'vam yImuHbej 
764
765     case FetchNode:
766       do_the_fetchnode(event);
767       goto next_thread;             /* handle next event in event queue  */
768       
769     case GlobalBlock:
770       do_the_globalblock(event);
771       goto next_thread;             /* handle next event in event queue  */
772       
773     case FetchReply:
774       do_the_fetchreply(event);
775       goto next_thread;             /* handle next event in event queue  */
776       
777     case UnblockThread:   /* Move from the blocked queue to the tail of */
778       do_the_unblock(event);
779       goto next_thread;             /* handle next event in event queue  */
780       
781     case ResumeThread:  /* Move from the blocked queue to the tail of */
782       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
783       event->tso->gran.blocktime += 
784         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
785       do_the_startthread(event);
786       goto next_thread;             /* handle next event in event queue  */
787       
788     case StartThread:
789       do_the_startthread(event);
790       goto next_thread;             /* handle next event in event queue  */
791       
792     case MoveThread:
793       do_the_movethread(event);
794       goto next_thread;             /* handle next event in event queue  */
795       
796     case MoveSpark:
797       do_the_movespark(event);
798       goto next_thread;             /* handle next event in event queue  */
799       
800     case FindWork:
801       do_the_findwork(event);
802       goto next_thread;             /* handle next event in event queue  */
803       
804     default:
805       barf("Illegal event type %u\n", event->evttype);
806     }  /* switch */
807     
808     /* This point was scheduler_loop in the old RTS */
809
810     IF_DEBUG(gran, belch("GRAN: after main switch"));
811
812     TimeOfLastEvent = CurrentTime[CurrentProc];
813     TimeOfNextEvent = get_time_of_next_event();
814     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
815     // CurrentTSO = ThreadQueueHd;
816
817     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
818                          TimeOfNextEvent));
819
820     if (RtsFlags.GranFlags.Light) 
821       GranSimLight_leave_system(event, &ActiveTSO); 
822
823     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
824
825     IF_DEBUG(gran, 
826              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
827
828     /* in a GranSim setup the TSO stays on the run queue */
829     t = CurrentTSO;
830     /* Take a thread from the run queue. */
831     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
832
833     IF_DEBUG(gran, 
834              fprintf(stderr, "GRAN: About to run current thread, which is\n");
835              G_TSO(t,5));
836
837     context_switch = 0; // turned on via GranYield, checking events and time slice
838
839     IF_DEBUG(gran, 
840              DumpGranEvent(GR_SCHEDULE, t));
841
842     procStatus[CurrentProc] = Busy;
843
844 #elif defined(PAR)
845     if (PendingFetches != END_BF_QUEUE) {
846         processFetches();
847     }
848
849     /* ToDo: phps merge with spark activation above */
850     /* check whether we have local work and send requests if we have none */
851     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
852       /* :-[  no local threads => look out for local sparks */
853       /* the spark pool for the current PE */
854       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
855       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
856           pool->hd < pool->tl) {
857         /* 
858          * ToDo: add GC code check that we really have enough heap afterwards!!
859          * Old comment:
860          * If we're here (no runnable threads) and we have pending
861          * sparks, we must have a space problem.  Get enough space
862          * to turn one of those pending sparks into a
863          * thread... 
864          */
865
866         spark = findSpark(rtsFalse);                /* get a spark */
867         if (spark != (rtsSpark) NULL) {
868           tso = activateSpark(spark);       /* turn the spark into a thread */
869           IF_PAR_DEBUG(schedule,
870                        belch("==== schedule: Created TSO %d (%p); %d threads active",
871                              tso->id, tso, advisory_thread_count));
872
873           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
874             belch("==^^ failed to activate spark");
875             goto next_thread;
876           }               /* otherwise fall through & pick-up new tso */
877         } else {
878           IF_PAR_DEBUG(verbose,
879                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
880                              spark_queue_len(pool)));
881           goto next_thread;
882         }
883       }
884
885       /* If we still have no work we need to send a FISH to get a spark
886          from another PE 
887       */
888       if (EMPTY_RUN_QUEUE()) {
889       /* =8-[  no local sparks => look for work on other PEs */
890         /*
891          * We really have absolutely no work.  Send out a fish
892          * (there may be some out there already), and wait for
893          * something to arrive.  We clearly can't run any threads
894          * until a SCHEDULE or RESUME arrives, and so that's what
895          * we're hoping to see.  (Of course, we still have to
896          * respond to other types of messages.)
897          */
898         TIME now = msTime() /*CURRENT_TIME*/;
899         IF_PAR_DEBUG(verbose, 
900                      belch("--  now=%ld", now));
901         IF_PAR_DEBUG(verbose,
902                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
903                          (last_fish_arrived_at!=0 &&
904                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
905                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
906                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
907                              last_fish_arrived_at,
908                              RtsFlags.ParFlags.fishDelay, now);
909                      });
910         
911         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
912             (last_fish_arrived_at==0 ||
913              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
914           /* outstandingFishes is set in sendFish, processFish;
915              avoid flooding system with fishes via delay */
916           pe = choosePE();
917           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
918                    NEW_FISH_HUNGER);
919
920           // Global statistics: count no. of fishes
921           if (RtsFlags.ParFlags.ParStats.Global &&
922               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
923             globalParStats.tot_fish_mess++;
924           }
925         }
926       
927         receivedFinish = processMessages();
928         goto next_thread;
929       }
930     } else if (PacketsWaiting()) {  /* Look for incoming messages */
931       receivedFinish = processMessages();
932     }
933
934     /* Now we are sure that we have some work available */
935     ASSERT(run_queue_hd != END_TSO_QUEUE);
936
937     /* Take a thread from the run queue, if we have work */
938     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
939     IF_DEBUG(sanity,checkTSO(t));
940
941     /* ToDo: write something to the log-file
942     if (RTSflags.ParFlags.granSimStats && !sameThread)
943         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
944
945     CurrentTSO = t;
946     */
947     /* the spark pool for the current PE */
948     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
949
950     IF_DEBUG(scheduler, 
951              belch("--=^ %d threads, %d sparks on [%#x]", 
952                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
953
954 # if 1
955     if (0 && RtsFlags.ParFlags.ParStats.Full && 
956         t && LastTSO && t->id != LastTSO->id && 
957         LastTSO->why_blocked == NotBlocked && 
958         LastTSO->what_next != ThreadComplete) {
959       // if previously scheduled TSO not blocked we have to record the context switch
960       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
961                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
962     }
963
964     if (RtsFlags.ParFlags.ParStats.Full && 
965         (emitSchedule /* forced emit */ ||
966         (t && LastTSO && t->id != LastTSO->id))) {
967       /* 
968          we are running a different TSO, so write a schedule event to log file
969          NB: If we use fair scheduling we also have to write  a deschedule 
970              event for LastTSO; with unfair scheduling we know that the
971              previous tso has blocked whenever we switch to another tso, so
972              we don't need it in GUM for now
973       */
974       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
975                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
976       emitSchedule = rtsFalse;
977     }
978      
979 # endif
980 #else /* !GRAN && !PAR */
981   
982     /* grab a thread from the run queue */
983     ASSERT(run_queue_hd != END_TSO_QUEUE);
984     t = POP_RUN_QUEUE();
985     // Sanity check the thread we're about to run.  This can be
986     // expensive if there is lots of thread switching going on...
987     IF_DEBUG(sanity,checkTSO(t));
988 #endif
989     
990     cap->r.rCurrentTSO = t;
991     
992     /* context switches are now initiated by the timer signal, unless
993      * the user specified "context switch as often as possible", with
994      * +RTS -C0
995      */
996     if (
997 #ifdef PROFILING
998         RtsFlags.ProfFlags.profileInterval == 0 ||
999 #endif
1000         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1001          && (run_queue_hd != END_TSO_QUEUE
1002              || blocked_queue_hd != END_TSO_QUEUE
1003              || sleeping_queue != END_TSO_QUEUE)))
1004         context_switch = 1;
1005     else
1006         context_switch = 0;
1007
1008     RELEASE_LOCK(&sched_mutex);
1009
1010     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1011                               t->id, t, whatNext_strs[t->what_next]));
1012
1013 #ifdef PROFILING
1014     startHeapProfTimer();
1015 #endif
1016
1017     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1018     /* Run the current thread 
1019      */
1020     switch (cap->r.rCurrentTSO->what_next) {
1021     case ThreadKilled:
1022     case ThreadComplete:
1023         /* Thread already finished, return to scheduler. */
1024         ret = ThreadFinished;
1025         break;
1026     case ThreadEnterGHC:
1027         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1028         break;
1029     case ThreadRunGHC:
1030         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1031         break;
1032     case ThreadEnterInterp:
1033         ret = interpretBCO(cap);
1034         break;
1035     default:
1036       barf("schedule: invalid what_next field");
1037     }
1038     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1039     
1040     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1041 #ifdef PROFILING
1042     stopHeapProfTimer();
1043     CCCS = CCS_SYSTEM;
1044 #endif
1045     
1046     ACQUIRE_LOCK(&sched_mutex);
1047
1048 #ifdef SMP
1049     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1050 #elif !defined(GRAN) && !defined(PAR)
1051     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1052 #endif
1053     t = cap->r.rCurrentTSO;
1054     
1055 #if defined(PAR)
1056     /* HACK 675: if the last thread didn't yield, make sure to print a 
1057        SCHEDULE event to the log file when StgRunning the next thread, even
1058        if it is the same one as before */
1059     LastTSO = t; 
1060     TimeOfLastYield = CURRENT_TIME;
1061 #endif
1062
1063     switch (ret) {
1064     case HeapOverflow:
1065 #if defined(GRAN)
1066       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1067       globalGranStats.tot_heapover++;
1068 #elif defined(PAR)
1069       globalParStats.tot_heapover++;
1070 #endif
1071
1072       // did the task ask for a large block?
1073       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1074           // if so, get one and push it on the front of the nursery.
1075           bdescr *bd;
1076           nat blocks;
1077           
1078           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1079
1080           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1081                                    t->id, t,
1082                                    whatNext_strs[t->what_next], blocks));
1083
1084           // don't do this if it would push us over the
1085           // alloc_blocks_lim limit; we'll GC first.
1086           if (alloc_blocks + blocks < alloc_blocks_lim) {
1087
1088               alloc_blocks += blocks;
1089               bd = allocGroup( blocks );
1090
1091               // link the new group into the list
1092               bd->link = cap->r.rCurrentNursery;
1093               bd->u.back = cap->r.rCurrentNursery->u.back;
1094               if (cap->r.rCurrentNursery->u.back != NULL) {
1095                   cap->r.rCurrentNursery->u.back->link = bd;
1096               } else {
1097                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1098                          g0s0->blocks == cap->r.rNursery);
1099                   cap->r.rNursery = g0s0->blocks = bd;
1100               }           
1101               cap->r.rCurrentNursery->u.back = bd;
1102
1103               // initialise it as a nursery block
1104               bd->step = g0s0;
1105               bd->gen_no = 0;
1106               bd->flags = 0;
1107               bd->free = bd->start;
1108
1109               // don't forget to update the block count in g0s0.
1110               g0s0->n_blocks += blocks;
1111               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1112
1113               // now update the nursery to point to the new block
1114               cap->r.rCurrentNursery = bd;
1115
1116               // we might be unlucky and have another thread get on the
1117               // run queue before us and steal the large block, but in that
1118               // case the thread will just end up requesting another large
1119               // block.
1120               PUSH_ON_RUN_QUEUE(t);
1121               break;
1122           }
1123       }
1124
1125       /* make all the running tasks block on a condition variable,
1126        * maybe set context_switch and wait till they all pile in,
1127        * then have them wait on a GC condition variable.
1128        */
1129       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1130                                t->id, t, whatNext_strs[t->what_next]));
1131       threadPaused(t);
1132 #if defined(GRAN)
1133       ASSERT(!is_on_queue(t,CurrentProc));
1134 #elif defined(PAR)
1135       /* Currently we emit a DESCHEDULE event before GC in GUM.
1136          ToDo: either add separate event to distinguish SYSTEM time from rest
1137                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1138       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1139         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1140                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1141         emitSchedule = rtsTrue;
1142       }
1143 #endif
1144       
1145       ready_to_gc = rtsTrue;
1146       context_switch = 1;               /* stop other threads ASAP */
1147       PUSH_ON_RUN_QUEUE(t);
1148       /* actual GC is done at the end of the while loop */
1149       break;
1150       
1151     case StackOverflow:
1152 #if defined(GRAN)
1153       IF_DEBUG(gran, 
1154                DumpGranEvent(GR_DESCHEDULE, t));
1155       globalGranStats.tot_stackover++;
1156 #elif defined(PAR)
1157       // IF_DEBUG(par, 
1158       // DumpGranEvent(GR_DESCHEDULE, t);
1159       globalParStats.tot_stackover++;
1160 #endif
1161       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1162                                t->id, t, whatNext_strs[t->what_next]));
1163       /* just adjust the stack for this thread, then pop it back
1164        * on the run queue.
1165        */
1166       threadPaused(t);
1167       { 
1168         StgMainThread *m;
1169         /* enlarge the stack */
1170         StgTSO *new_t = threadStackOverflow(t);
1171         
1172         /* This TSO has moved, so update any pointers to it from the
1173          * main thread stack.  It better not be on any other queues...
1174          * (it shouldn't be).
1175          */
1176         for (m = main_threads; m != NULL; m = m->link) {
1177           if (m->tso == t) {
1178             m->tso = new_t;
1179           }
1180         }
1181         threadPaused(new_t);
1182         PUSH_ON_RUN_QUEUE(new_t);
1183       }
1184       break;
1185
1186     case ThreadYielding:
1187 #if defined(GRAN)
1188       IF_DEBUG(gran, 
1189                DumpGranEvent(GR_DESCHEDULE, t));
1190       globalGranStats.tot_yields++;
1191 #elif defined(PAR)
1192       // IF_DEBUG(par, 
1193       // DumpGranEvent(GR_DESCHEDULE, t);
1194       globalParStats.tot_yields++;
1195 #endif
1196       /* put the thread back on the run queue.  Then, if we're ready to
1197        * GC, check whether this is the last task to stop.  If so, wake
1198        * up the GC thread.  getThread will block during a GC until the
1199        * GC is finished.
1200        */
1201       IF_DEBUG(scheduler,
1202                if (t->what_next == ThreadEnterInterp) {
1203                    /* ToDo: or maybe a timer expired when we were in Hugs?
1204                     * or maybe someone hit ctrl-C
1205                     */
1206                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1207                          t->id, t, whatNext_strs[t->what_next]);
1208                } else {
1209                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1210                          t->id, t, whatNext_strs[t->what_next]);
1211                }
1212                );
1213
1214       threadPaused(t);
1215
1216       IF_DEBUG(sanity,
1217                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1218                checkTSO(t));
1219       ASSERT(t->link == END_TSO_QUEUE);
1220 #if defined(GRAN)
1221       ASSERT(!is_on_queue(t,CurrentProc));
1222
1223       IF_DEBUG(sanity,
1224                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1225                checkThreadQsSanity(rtsTrue));
1226 #endif
1227 #if defined(PAR)
1228       if (RtsFlags.ParFlags.doFairScheduling) { 
1229         /* this does round-robin scheduling; good for concurrency */
1230         APPEND_TO_RUN_QUEUE(t);
1231       } else {
1232         /* this does unfair scheduling; good for parallelism */
1233         PUSH_ON_RUN_QUEUE(t);
1234       }
1235 #else
1236       /* this does round-robin scheduling; good for concurrency */
1237       APPEND_TO_RUN_QUEUE(t);
1238 #endif
1239 #if defined(GRAN)
1240       /* add a ContinueThread event to actually process the thread */
1241       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1242                 ContinueThread,
1243                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1244       IF_GRAN_DEBUG(bq, 
1245                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1246                G_EVENTQ(0);
1247                G_CURR_THREADQ(0));
1248 #endif /* GRAN */
1249       break;
1250       
1251     case ThreadBlocked:
1252 #if defined(GRAN)
1253       IF_DEBUG(scheduler,
1254                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1255                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1256                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1257
1258       // ??? needed; should emit block before
1259       IF_DEBUG(gran, 
1260                DumpGranEvent(GR_DESCHEDULE, t)); 
1261       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1262       /*
1263         ngoq Dogh!
1264       ASSERT(procStatus[CurrentProc]==Busy || 
1265               ((procStatus[CurrentProc]==Fetching) && 
1266               (t->block_info.closure!=(StgClosure*)NULL)));
1267       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1268           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1269             procStatus[CurrentProc]==Fetching)) 
1270         procStatus[CurrentProc] = Idle;
1271       */
1272 #elif defined(PAR)
1273       IF_DEBUG(scheduler,
1274                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1275                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1276       IF_PAR_DEBUG(bq,
1277
1278                    if (t->block_info.closure!=(StgClosure*)NULL) 
1279                      print_bq(t->block_info.closure));
1280
1281       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1282       blockThread(t);
1283
1284       /* whatever we schedule next, we must log that schedule */
1285       emitSchedule = rtsTrue;
1286
1287 #else /* !GRAN */
1288       /* don't need to do anything.  Either the thread is blocked on
1289        * I/O, in which case we'll have called addToBlockedQueue
1290        * previously, or it's blocked on an MVar or Blackhole, in which
1291        * case it'll be on the relevant queue already.
1292        */
1293       IF_DEBUG(scheduler,
1294                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1295                printThreadBlockage(t);
1296                fprintf(stderr, "\n"));
1297
1298       /* Only for dumping event to log file 
1299          ToDo: do I need this in GranSim, too?
1300       blockThread(t);
1301       */
1302 #endif
1303       threadPaused(t);
1304       break;
1305       
1306     case ThreadFinished:
1307       /* Need to check whether this was a main thread, and if so, signal
1308        * the task that started it with the return value.  If we have no
1309        * more main threads, we probably need to stop all the tasks until
1310        * we get a new one.
1311        */
1312       /* We also end up here if the thread kills itself with an
1313        * uncaught exception, see Exception.hc.
1314        */
1315       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1316 #if defined(GRAN)
1317       endThread(t, CurrentProc); // clean-up the thread
1318 #elif defined(PAR)
1319       /* For now all are advisory -- HWL */
1320       //if(t->priority==AdvisoryPriority) ??
1321       advisory_thread_count--;
1322       
1323 # ifdef DIST
1324       if(t->dist.priority==RevalPriority)
1325         FinishReval(t);
1326 # endif
1327       
1328       if (RtsFlags.ParFlags.ParStats.Full &&
1329           !RtsFlags.ParFlags.ParStats.Suppressed) 
1330         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1331 #endif
1332       break;
1333       
1334     default:
1335       barf("schedule: invalid thread return code %d", (int)ret);
1336     }
1337
1338 #ifdef PROFILING
1339     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1340         GarbageCollect(GetRoots, rtsTrue);
1341         heapCensus();
1342         performHeapProfile = rtsFalse;
1343         ready_to_gc = rtsFalse; // we already GC'd
1344     }
1345 #endif
1346
1347     if (ready_to_gc 
1348 #ifdef SMP
1349         && allFreeCapabilities() 
1350 #endif
1351         ) {
1352       /* everybody back, start the GC.
1353        * Could do it in this thread, or signal a condition var
1354        * to do it in another thread.  Either way, we need to
1355        * broadcast on gc_pending_cond afterward.
1356        */
1357 #if defined(RTS_SUPPORTS_THREADS)
1358       IF_DEBUG(scheduler,sched_belch("doing GC"));
1359 #endif
1360       GarbageCollect(GetRoots,rtsFalse);
1361       ready_to_gc = rtsFalse;
1362 #ifdef SMP
1363       broadcastCondition(&gc_pending_cond);
1364 #endif
1365 #if defined(GRAN)
1366       /* add a ContinueThread event to continue execution of current thread */
1367       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1368                 ContinueThread,
1369                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1370       IF_GRAN_DEBUG(bq, 
1371                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1372                G_EVENTQ(0);
1373                G_CURR_THREADQ(0));
1374 #endif /* GRAN */
1375     }
1376
1377 #if defined(GRAN)
1378   next_thread:
1379     IF_GRAN_DEBUG(unused,
1380                   print_eventq(EventHd));
1381
1382     event = get_next_event();
1383 #elif defined(PAR)
1384   next_thread:
1385     /* ToDo: wait for next message to arrive rather than busy wait */
1386 #endif /* GRAN */
1387
1388   } /* end of while(1) */
1389
1390   IF_PAR_DEBUG(verbose,
1391                belch("== Leaving schedule() after having received Finish"));
1392 }
1393
1394 /* ---------------------------------------------------------------------------
1395  * Singleton fork(). Do not copy any running threads.
1396  * ------------------------------------------------------------------------- */
1397
1398 StgInt forkProcess(StgTSO* tso) {
1399
1400 #ifndef mingw32_TARGET_OS
1401   pid_t pid;
1402   StgTSO* t,*next;
1403
1404   IF_DEBUG(scheduler,sched_belch("forking!"));
1405
1406   pid = fork();
1407   if (pid) { /* parent */
1408
1409   /* just return the pid */
1410     
1411   } else { /* child */
1412   /* wipe all other threads */
1413   run_queue_hd = tso;
1414   tso->link = END_TSO_QUEUE;
1415
1416   /* DO NOT TOUCH THE QUEUES directly because most of the code around
1417      us is picky about finding the threat still in its queue when
1418      handling the deleteThread() */
1419
1420   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1421     next = t->link;
1422     if (t->id != tso->id) {
1423       deleteThread(t);
1424     }
1425   }
1426   }
1427   return pid;
1428 #else /* mingw32 */
1429   barf("forkProcess#: primop not implemented for mingw32, sorry!");
1430   return -1;
1431 #endif /* mingw32 */
1432 }
1433
1434 /* ---------------------------------------------------------------------------
1435  * deleteAllThreads():  kill all the live threads.
1436  *
1437  * This is used when we catch a user interrupt (^C), before performing
1438  * any necessary cleanups and running finalizers.
1439  * ------------------------------------------------------------------------- */
1440    
1441 void deleteAllThreads ( void )
1442 {
1443   StgTSO* t, *next;
1444   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1445   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1446       next = t->global_link;
1447       deleteThread(t);
1448   }      
1449   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1450   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1451   sleeping_queue = END_TSO_QUEUE;
1452 }
1453
1454 /* startThread and  insertThread are now in GranSim.c -- HWL */
1455
1456
1457 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1458 //@subsection Suspend and Resume
1459
1460 /* ---------------------------------------------------------------------------
1461  * Suspending & resuming Haskell threads.
1462  * 
1463  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1464  * its capability before calling the C function.  This allows another
1465  * task to pick up the capability and carry on running Haskell
1466  * threads.  It also means that if the C call blocks, it won't lock
1467  * the whole system.
1468  *
1469  * The Haskell thread making the C call is put to sleep for the
1470  * duration of the call, on the susepended_ccalling_threads queue.  We
1471  * give out a token to the task, which it can use to resume the thread
1472  * on return from the C function.
1473  * ------------------------------------------------------------------------- */
1474    
1475 StgInt
1476 suspendThread( StgRegTable *reg, 
1477                rtsBool concCall
1478 #if !defined(RTS_SUPPORTS_THREADS) && !defined(DEBUG)
1479                STG_UNUSED
1480 #endif
1481                )
1482 {
1483   nat tok;
1484   Capability *cap;
1485
1486   /* assume that *reg is a pointer to the StgRegTable part
1487    * of a Capability.
1488    */
1489   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1490
1491   ACQUIRE_LOCK(&sched_mutex);
1492
1493   IF_DEBUG(scheduler,
1494            sched_belch("thread %d did a _ccall_gc (is_concurrent: %d)", cap->r.rCurrentTSO->id,concCall));
1495
1496   threadPaused(cap->r.rCurrentTSO);
1497   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1498   suspended_ccalling_threads = cap->r.rCurrentTSO;
1499
1500 #if defined(RTS_SUPPORTS_THREADS)
1501   cap->r.rCurrentTSO->why_blocked  = BlockedOnCCall;
1502 #endif
1503
1504   /* Use the thread ID as the token; it should be unique */
1505   tok = cap->r.rCurrentTSO->id;
1506
1507   /* Hand back capability */
1508   releaseCapability(cap);
1509   
1510 #if defined(RTS_SUPPORTS_THREADS)
1511   /* Preparing to leave the RTS, so ensure there's a native thread/task
1512      waiting to take over.
1513      
1514      ToDo: optimise this and only create a new task if there's a need
1515      for one (i.e., if there's only one Concurrent Haskell thread alive,
1516      there's no need to create a new task).
1517   */
1518   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS", tok));
1519   if (concCall) {
1520     startTask(taskStart);
1521   }
1522 #endif
1523
1524   /* Other threads _might_ be available for execution; signal this */
1525   THREAD_RUNNABLE();
1526   RELEASE_LOCK(&sched_mutex);
1527   return tok; 
1528 }
1529
1530 StgRegTable *
1531 resumeThread( StgInt tok,
1532               rtsBool concCall
1533 #if !defined(RTS_SUPPORTS_THREADS)
1534                STG_UNUSED
1535 #endif
1536               )
1537 {
1538   StgTSO *tso, **prev;
1539   Capability *cap;
1540
1541 #if defined(RTS_SUPPORTS_THREADS)
1542   /* Wait for permission to re-enter the RTS with the result. */
1543   if ( concCall ) {
1544     ACQUIRE_LOCK(&sched_mutex);
1545     grabReturnCapability(&sched_mutex, &cap);
1546   } else {
1547     grabCapability(&cap);
1548   }
1549 #else
1550   grabCapability(&cap);
1551 #endif
1552
1553   /* Remove the thread off of the suspended list */
1554   prev = &suspended_ccalling_threads;
1555   for (tso = suspended_ccalling_threads; 
1556        tso != END_TSO_QUEUE; 
1557        prev = &tso->link, tso = tso->link) {
1558     if (tso->id == (StgThreadID)tok) {
1559       *prev = tso->link;
1560       break;
1561     }
1562   }
1563   if (tso == END_TSO_QUEUE) {
1564     barf("resumeThread: thread not found");
1565   }
1566   tso->link = END_TSO_QUEUE;
1567   /* Reset blocking status */
1568   tso->why_blocked  = NotBlocked;
1569
1570   RELEASE_LOCK(&sched_mutex);
1571
1572   cap->r.rCurrentTSO = tso;
1573   return &cap->r;
1574 }
1575
1576
1577 /* ---------------------------------------------------------------------------
1578  * Static functions
1579  * ------------------------------------------------------------------------ */
1580 static void unblockThread(StgTSO *tso);
1581
1582 /* ---------------------------------------------------------------------------
1583  * Comparing Thread ids.
1584  *
1585  * This is used from STG land in the implementation of the
1586  * instances of Eq/Ord for ThreadIds.
1587  * ------------------------------------------------------------------------ */
1588
1589 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1590
1591   StgThreadID id1 = tso1->id; 
1592   StgThreadID id2 = tso2->id;
1593  
1594   if (id1 < id2) return (-1);
1595   if (id1 > id2) return 1;
1596   return 0;
1597 }
1598
1599 /* ---------------------------------------------------------------------------
1600  * Fetching the ThreadID from an StgTSO.
1601  *
1602  * This is used in the implementation of Show for ThreadIds.
1603  * ------------------------------------------------------------------------ */
1604 int rts_getThreadId(const StgTSO *tso) 
1605 {
1606   return tso->id;
1607 }
1608
1609 #ifdef DEBUG
1610 void labelThread(StgTSO *tso, char *label)
1611 {
1612   int len;
1613   void *buf;
1614
1615   /* Caveat: Once set, you can only set the thread name to "" */
1616   len = strlen(label)+1;
1617   buf = realloc(tso->label,len);
1618   if (buf == NULL) {
1619     fprintf(stderr,"insufficient memory for labelThread!\n");
1620     free(tso->label);
1621   } else
1622     strncpy(buf,label,len);
1623   tso->label = buf;
1624 }
1625 #endif /* DEBUG */
1626
1627 /* ---------------------------------------------------------------------------
1628    Create a new thread.
1629
1630    The new thread starts with the given stack size.  Before the
1631    scheduler can run, however, this thread needs to have a closure
1632    (and possibly some arguments) pushed on its stack.  See
1633    pushClosure() in Schedule.h.
1634
1635    createGenThread() and createIOThread() (in SchedAPI.h) are
1636    convenient packaged versions of this function.
1637
1638    currently pri (priority) is only used in a GRAN setup -- HWL
1639    ------------------------------------------------------------------------ */
1640 //@cindex createThread
1641 #if defined(GRAN)
1642 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1643 StgTSO *
1644 createThread(nat stack_size, StgInt pri)
1645 {
1646   return createThread_(stack_size, rtsFalse, pri);
1647 }
1648
1649 static StgTSO *
1650 createThread_(nat size, rtsBool have_lock, StgInt pri)
1651 {
1652 #else
1653 StgTSO *
1654 createThread(nat stack_size)
1655 {
1656   return createThread_(stack_size, rtsFalse);
1657 }
1658
1659 static StgTSO *
1660 createThread_(nat size, rtsBool have_lock)
1661 {
1662 #endif
1663
1664     StgTSO *tso;
1665     nat stack_size;
1666
1667     /* First check whether we should create a thread at all */
1668 #if defined(PAR)
1669   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1670   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1671     threadsIgnored++;
1672     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1673           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1674     return END_TSO_QUEUE;
1675   }
1676   threadsCreated++;
1677 #endif
1678
1679 #if defined(GRAN)
1680   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1681 #endif
1682
1683   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1684
1685   /* catch ridiculously small stack sizes */
1686   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1687     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1688   }
1689
1690   stack_size = size - TSO_STRUCT_SIZEW;
1691
1692   tso = (StgTSO *)allocate(size);
1693   TICK_ALLOC_TSO(stack_size, 0);
1694
1695   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1696 #if defined(GRAN)
1697   SET_GRAN_HDR(tso, ThisPE);
1698 #endif
1699   tso->what_next     = ThreadEnterGHC;
1700
1701 #ifdef DEBUG
1702   tso->label = NULL;
1703 #endif
1704
1705   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1706    * protect the increment operation on next_thread_id.
1707    * In future, we could use an atomic increment instead.
1708    */
1709   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1710   tso->id = next_thread_id++; 
1711   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1712
1713   tso->why_blocked  = NotBlocked;
1714   tso->blocked_exceptions = NULL;
1715
1716   tso->stack_size   = stack_size;
1717   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1718                               - TSO_STRUCT_SIZEW;
1719   tso->sp           = (P_)&(tso->stack) + stack_size;
1720
1721 #ifdef PROFILING
1722   tso->prof.CCCS = CCS_MAIN;
1723 #endif
1724
1725   /* put a stop frame on the stack */
1726   tso->sp -= sizeofW(StgStopFrame);
1727   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1728   tso->su = (StgUpdateFrame*)tso->sp;
1729
1730   // ToDo: check this
1731 #if defined(GRAN)
1732   tso->link = END_TSO_QUEUE;
1733   /* uses more flexible routine in GranSim */
1734   insertThread(tso, CurrentProc);
1735 #else
1736   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1737    * from its creation
1738    */
1739 #endif
1740
1741 #if defined(GRAN) 
1742   if (RtsFlags.GranFlags.GranSimStats.Full) 
1743     DumpGranEvent(GR_START,tso);
1744 #elif defined(PAR)
1745   if (RtsFlags.ParFlags.ParStats.Full) 
1746     DumpGranEvent(GR_STARTQ,tso);
1747   /* HACk to avoid SCHEDULE 
1748      LastTSO = tso; */
1749 #endif
1750
1751   /* Link the new thread on the global thread list.
1752    */
1753   tso->global_link = all_threads;
1754   all_threads = tso;
1755
1756 #if defined(DIST)
1757   tso->dist.priority = MandatoryPriority; //by default that is...
1758 #endif
1759
1760 #if defined(GRAN)
1761   tso->gran.pri = pri;
1762 # if defined(DEBUG)
1763   tso->gran.magic = TSO_MAGIC; // debugging only
1764 # endif
1765   tso->gran.sparkname   = 0;
1766   tso->gran.startedat   = CURRENT_TIME; 
1767   tso->gran.exported    = 0;
1768   tso->gran.basicblocks = 0;
1769   tso->gran.allocs      = 0;
1770   tso->gran.exectime    = 0;
1771   tso->gran.fetchtime   = 0;
1772   tso->gran.fetchcount  = 0;
1773   tso->gran.blocktime   = 0;
1774   tso->gran.blockcount  = 0;
1775   tso->gran.blockedat   = 0;
1776   tso->gran.globalsparks = 0;
1777   tso->gran.localsparks  = 0;
1778   if (RtsFlags.GranFlags.Light)
1779     tso->gran.clock  = Now; /* local clock */
1780   else
1781     tso->gran.clock  = 0;
1782
1783   IF_DEBUG(gran,printTSO(tso));
1784 #elif defined(PAR)
1785 # if defined(DEBUG)
1786   tso->par.magic = TSO_MAGIC; // debugging only
1787 # endif
1788   tso->par.sparkname   = 0;
1789   tso->par.startedat   = CURRENT_TIME; 
1790   tso->par.exported    = 0;
1791   tso->par.basicblocks = 0;
1792   tso->par.allocs      = 0;
1793   tso->par.exectime    = 0;
1794   tso->par.fetchtime   = 0;
1795   tso->par.fetchcount  = 0;
1796   tso->par.blocktime   = 0;
1797   tso->par.blockcount  = 0;
1798   tso->par.blockedat   = 0;
1799   tso->par.globalsparks = 0;
1800   tso->par.localsparks  = 0;
1801 #endif
1802
1803 #if defined(GRAN)
1804   globalGranStats.tot_threads_created++;
1805   globalGranStats.threads_created_on_PE[CurrentProc]++;
1806   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1807   globalGranStats.tot_sq_probes++;
1808 #elif defined(PAR)
1809   // collect parallel global statistics (currently done together with GC stats)
1810   if (RtsFlags.ParFlags.ParStats.Global &&
1811       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1812     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1813     globalParStats.tot_threads_created++;
1814   }
1815 #endif 
1816
1817 #if defined(GRAN)
1818   IF_GRAN_DEBUG(pri,
1819                 belch("==__ schedule: Created TSO %d (%p);",
1820                       CurrentProc, tso, tso->id));
1821 #elif defined(PAR)
1822     IF_PAR_DEBUG(verbose,
1823                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1824                        tso->id, tso, advisory_thread_count));
1825 #else
1826   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1827                                  tso->id, tso->stack_size));
1828 #endif    
1829   return tso;
1830 }
1831
1832 #if defined(PAR)
1833 /* RFP:
1834    all parallel thread creation calls should fall through the following routine.
1835 */
1836 StgTSO *
1837 createSparkThread(rtsSpark spark) 
1838 { StgTSO *tso;
1839   ASSERT(spark != (rtsSpark)NULL);
1840   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1841   { threadsIgnored++;
1842     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1843           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1844     return END_TSO_QUEUE;
1845   }
1846   else
1847   { threadsCreated++;
1848     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1849     if (tso==END_TSO_QUEUE)     
1850       barf("createSparkThread: Cannot create TSO");
1851 #if defined(DIST)
1852     tso->priority = AdvisoryPriority;
1853 #endif
1854     pushClosure(tso,spark);
1855     PUSH_ON_RUN_QUEUE(tso);
1856     advisory_thread_count++;    
1857   }
1858   return tso;
1859 }
1860 #endif
1861
1862 /*
1863   Turn a spark into a thread.
1864   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1865 */
1866 #if defined(PAR)
1867 //@cindex activateSpark
1868 StgTSO *
1869 activateSpark (rtsSpark spark) 
1870 {
1871   StgTSO *tso;
1872
1873   tso = createSparkThread(spark);
1874   if (RtsFlags.ParFlags.ParStats.Full) {   
1875     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1876     IF_PAR_DEBUG(verbose,
1877                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1878                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1879   }
1880   // ToDo: fwd info on local/global spark to thread -- HWL
1881   // tso->gran.exported =  spark->exported;
1882   // tso->gran.locked =   !spark->global;
1883   // tso->gran.sparkname = spark->name;
1884
1885   return tso;
1886 }
1887 #endif
1888
1889 /* ---------------------------------------------------------------------------
1890  * scheduleThread()
1891  *
1892  * scheduleThread puts a thread on the head of the runnable queue.
1893  * This will usually be done immediately after a thread is created.
1894  * The caller of scheduleThread must create the thread using e.g.
1895  * createThread and push an appropriate closure
1896  * on this thread's stack before the scheduler is invoked.
1897  * ------------------------------------------------------------------------ */
1898
1899 static void scheduleThread_ (StgTSO* tso, rtsBool createTask);
1900
1901 void
1902 scheduleThread_(StgTSO *tso
1903                , rtsBool createTask
1904 #if !defined(THREADED_RTS)
1905                  STG_UNUSED
1906 #endif
1907               )
1908 {
1909   ACQUIRE_LOCK(&sched_mutex);
1910
1911   /* Put the new thread on the head of the runnable queue.  The caller
1912    * better push an appropriate closure on this thread's stack
1913    * beforehand.  In the SMP case, the thread may start running as
1914    * soon as we release the scheduler lock below.
1915    */
1916   PUSH_ON_RUN_QUEUE(tso);
1917 #if defined(THREADED_RTS)
1918   /* If main() is scheduling a thread, don't bother creating a 
1919    * new task.
1920    */
1921   if ( createTask ) {
1922     startTask(taskStart);
1923   }
1924 #endif
1925   THREAD_RUNNABLE();
1926
1927 #if 0
1928   IF_DEBUG(scheduler,printTSO(tso));
1929 #endif
1930   RELEASE_LOCK(&sched_mutex);
1931 }
1932
1933 void scheduleThread(StgTSO* tso)
1934 {
1935   return scheduleThread_(tso, rtsFalse);
1936 }
1937
1938 void scheduleExtThread(StgTSO* tso)
1939 {
1940   return scheduleThread_(tso, rtsTrue);
1941 }
1942
1943 /* ---------------------------------------------------------------------------
1944  * initScheduler()
1945  *
1946  * Initialise the scheduler.  This resets all the queues - if the
1947  * queues contained any threads, they'll be garbage collected at the
1948  * next pass.
1949  *
1950  * ------------------------------------------------------------------------ */
1951
1952 #ifdef SMP
1953 static void
1954 term_handler(int sig STG_UNUSED)
1955 {
1956   stat_workerStop();
1957   ACQUIRE_LOCK(&term_mutex);
1958   await_death--;
1959   RELEASE_LOCK(&term_mutex);
1960   shutdownThread();
1961 }
1962 #endif
1963
1964 void 
1965 initScheduler(void)
1966 {
1967 #if defined(GRAN)
1968   nat i;
1969
1970   for (i=0; i<=MAX_PROC; i++) {
1971     run_queue_hds[i]      = END_TSO_QUEUE;
1972     run_queue_tls[i]      = END_TSO_QUEUE;
1973     blocked_queue_hds[i]  = END_TSO_QUEUE;
1974     blocked_queue_tls[i]  = END_TSO_QUEUE;
1975     ccalling_threadss[i]  = END_TSO_QUEUE;
1976     sleeping_queue        = END_TSO_QUEUE;
1977   }
1978 #else
1979   run_queue_hd      = END_TSO_QUEUE;
1980   run_queue_tl      = END_TSO_QUEUE;
1981   blocked_queue_hd  = END_TSO_QUEUE;
1982   blocked_queue_tl  = END_TSO_QUEUE;
1983   sleeping_queue    = END_TSO_QUEUE;
1984 #endif 
1985
1986   suspended_ccalling_threads  = END_TSO_QUEUE;
1987
1988   main_threads = NULL;
1989   all_threads  = END_TSO_QUEUE;
1990
1991   context_switch = 0;
1992   interrupted    = 0;
1993
1994   RtsFlags.ConcFlags.ctxtSwitchTicks =
1995       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1996       
1997 #if defined(RTS_SUPPORTS_THREADS)
1998   /* Initialise the mutex and condition variables used by
1999    * the scheduler. */
2000   initMutex(&sched_mutex);
2001   initMutex(&term_mutex);
2002
2003   initCondition(&thread_ready_cond);
2004 #endif
2005   
2006 #if defined(SMP)
2007   initCondition(&gc_pending_cond);
2008 #endif
2009
2010 #if defined(RTS_SUPPORTS_THREADS)
2011   ACQUIRE_LOCK(&sched_mutex);
2012 #endif
2013
2014   /* Install the SIGHUP handler */
2015 #if defined(SMP)
2016   {
2017     struct sigaction action,oact;
2018
2019     action.sa_handler = term_handler;
2020     sigemptyset(&action.sa_mask);
2021     action.sa_flags = 0;
2022     if (sigaction(SIGTERM, &action, &oact) != 0) {
2023       barf("can't install TERM handler");
2024     }
2025   }
2026 #endif
2027
2028   /* A capability holds the state a native thread needs in
2029    * order to execute STG code. At least one capability is
2030    * floating around (only SMP builds have more than one).
2031    */
2032   initCapabilities();
2033   
2034 #if defined(RTS_SUPPORTS_THREADS)
2035     /* start our haskell execution tasks */
2036 # if defined(SMP)
2037     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2038 # else
2039     startTaskManager(0,taskStart);
2040 # endif
2041 #endif
2042
2043 #if /* defined(SMP) ||*/ defined(PAR)
2044   initSparkPools();
2045 #endif
2046
2047 #if defined(RTS_SUPPORTS_THREADS)
2048   RELEASE_LOCK(&sched_mutex);
2049 #endif
2050
2051 }
2052
2053 void
2054 exitScheduler( void )
2055 {
2056 #if defined(RTS_SUPPORTS_THREADS)
2057   stopTaskManager();
2058 #endif
2059 }
2060
2061 /* -----------------------------------------------------------------------------
2062    Managing the per-task allocation areas.
2063    
2064    Each capability comes with an allocation area.  These are
2065    fixed-length block lists into which allocation can be done.
2066
2067    ToDo: no support for two-space collection at the moment???
2068    -------------------------------------------------------------------------- */
2069
2070 /* -----------------------------------------------------------------------------
2071  * waitThread is the external interface for running a new computation
2072  * and waiting for the result.
2073  *
2074  * In the non-SMP case, we create a new main thread, push it on the 
2075  * main-thread stack, and invoke the scheduler to run it.  The
2076  * scheduler will return when the top main thread on the stack has
2077  * completed or died, and fill in the necessary fields of the
2078  * main_thread structure.
2079  *
2080  * In the SMP case, we create a main thread as before, but we then
2081  * create a new condition variable and sleep on it.  When our new
2082  * main thread has completed, we'll be woken up and the status/result
2083  * will be in the main_thread struct.
2084  * -------------------------------------------------------------------------- */
2085
2086 int 
2087 howManyThreadsAvail ( void )
2088 {
2089    int i = 0;
2090    StgTSO* q;
2091    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2092       i++;
2093    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2094       i++;
2095    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2096       i++;
2097    return i;
2098 }
2099
2100 void
2101 finishAllThreads ( void )
2102 {
2103    do {
2104       while (run_queue_hd != END_TSO_QUEUE) {
2105          waitThread ( run_queue_hd, NULL);
2106       }
2107       while (blocked_queue_hd != END_TSO_QUEUE) {
2108          waitThread ( blocked_queue_hd, NULL);
2109       }
2110       while (sleeping_queue != END_TSO_QUEUE) {
2111          waitThread ( blocked_queue_hd, NULL);
2112       }
2113    } while 
2114       (blocked_queue_hd != END_TSO_QUEUE || 
2115        run_queue_hd     != END_TSO_QUEUE ||
2116        sleeping_queue   != END_TSO_QUEUE);
2117 }
2118
2119 SchedulerStatus
2120 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2121
2122   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2123 #if defined(THREADED_RTS)
2124   return waitThread_(tso,ret, rtsFalse);
2125 #else
2126   return waitThread_(tso,ret);
2127 #endif
2128 }
2129
2130 SchedulerStatus
2131 waitThread_(StgTSO *tso,
2132             /*out*/StgClosure **ret
2133 #if defined(THREADED_RTS)
2134             , rtsBool blockWaiting
2135 #endif
2136            )
2137 {
2138   StgMainThread *m;
2139   SchedulerStatus stat;
2140
2141   ACQUIRE_LOCK(&sched_mutex);
2142   IF_DEBUG(scheduler, sched_belch("== scheduler: waiting for thread (%d)\n", tso->id));
2143   
2144   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2145
2146   m->tso = tso;
2147   m->ret = ret;
2148   m->stat = NoStatus;
2149 #if defined(RTS_SUPPORTS_THREADS)
2150   initCondition(&m->wakeup);
2151 #endif
2152
2153   m->link = main_threads;
2154   main_threads = m;
2155
2156   IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id));
2157
2158 #if defined(RTS_SUPPORTS_THREADS)
2159
2160 # if defined(THREADED_RTS)
2161   if (!blockWaiting) {
2162     /* In the threaded case, the OS thread that called main()
2163      * gets to enter the RTS directly without going via another
2164      * task/thread.
2165      */
2166     RELEASE_LOCK(&sched_mutex);
2167     schedule();
2168     ASSERT(m->stat != NoStatus);
2169   } else 
2170 # endif
2171   {
2172     IF_DEBUG(scheduler, sched_belch("sfoo"));
2173     do {
2174       waitCondition(&m->wakeup, &sched_mutex);
2175     } while (m->stat == NoStatus);
2176   }
2177 #elif defined(GRAN)
2178   /* GranSim specific init */
2179   CurrentTSO = m->tso;                // the TSO to run
2180   procStatus[MainProc] = Busy;        // status of main PE
2181   CurrentProc = MainProc;             // PE to run it on
2182
2183   schedule();
2184 #else
2185   RELEASE_LOCK(&sched_mutex);
2186   schedule();
2187   ASSERT(m->stat != NoStatus);
2188 #endif
2189
2190   stat = m->stat;
2191
2192 #if defined(RTS_SUPPORTS_THREADS)
2193   closeCondition(&m->wakeup);
2194 #endif
2195
2196   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2197                               m->tso->id));
2198   free(m);
2199
2200 #if defined(THREADED_RTS)
2201   if (blockWaiting) 
2202 #endif
2203     RELEASE_LOCK(&sched_mutex);
2204
2205   return stat;
2206 }
2207
2208 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2209 //@subsection Run queue code 
2210
2211 #if 0
2212 /* 
2213    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2214        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2215        implicit global variable that has to be correct when calling these
2216        fcts -- HWL 
2217 */
2218
2219 /* Put the new thread on the head of the runnable queue.
2220  * The caller of createThread better push an appropriate closure
2221  * on this thread's stack before the scheduler is invoked.
2222  */
2223 static /* inline */ void
2224 add_to_run_queue(tso)
2225 StgTSO* tso; 
2226 {
2227   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2228   tso->link = run_queue_hd;
2229   run_queue_hd = tso;
2230   if (run_queue_tl == END_TSO_QUEUE) {
2231     run_queue_tl = tso;
2232   }
2233 }
2234
2235 /* Put the new thread at the end of the runnable queue. */
2236 static /* inline */ void
2237 push_on_run_queue(tso)
2238 StgTSO* tso; 
2239 {
2240   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2241   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2242   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2243   if (run_queue_hd == END_TSO_QUEUE) {
2244     run_queue_hd = tso;
2245   } else {
2246     run_queue_tl->link = tso;
2247   }
2248   run_queue_tl = tso;
2249 }
2250
2251 /* 
2252    Should be inlined because it's used very often in schedule.  The tso
2253    argument is actually only needed in GranSim, where we want to have the
2254    possibility to schedule *any* TSO on the run queue, irrespective of the
2255    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2256    the run queue and dequeue the tso, adjusting the links in the queue. 
2257 */
2258 //@cindex take_off_run_queue
2259 static /* inline */ StgTSO*
2260 take_off_run_queue(StgTSO *tso) {
2261   StgTSO *t, *prev;
2262
2263   /* 
2264      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2265
2266      if tso is specified, unlink that tso from the run_queue (doesn't have
2267      to be at the beginning of the queue); GranSim only 
2268   */
2269   if (tso!=END_TSO_QUEUE) {
2270     /* find tso in queue */
2271     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2272          t!=END_TSO_QUEUE && t!=tso;
2273          prev=t, t=t->link) 
2274       /* nothing */ ;
2275     ASSERT(t==tso);
2276     /* now actually dequeue the tso */
2277     if (prev!=END_TSO_QUEUE) {
2278       ASSERT(run_queue_hd!=t);
2279       prev->link = t->link;
2280     } else {
2281       /* t is at beginning of thread queue */
2282       ASSERT(run_queue_hd==t);
2283       run_queue_hd = t->link;
2284     }
2285     /* t is at end of thread queue */
2286     if (t->link==END_TSO_QUEUE) {
2287       ASSERT(t==run_queue_tl);
2288       run_queue_tl = prev;
2289     } else {
2290       ASSERT(run_queue_tl!=t);
2291     }
2292     t->link = END_TSO_QUEUE;
2293   } else {
2294     /* take tso from the beginning of the queue; std concurrent code */
2295     t = run_queue_hd;
2296     if (t != END_TSO_QUEUE) {
2297       run_queue_hd = t->link;
2298       t->link = END_TSO_QUEUE;
2299       if (run_queue_hd == END_TSO_QUEUE) {
2300         run_queue_tl = END_TSO_QUEUE;
2301       }
2302     }
2303   }
2304   return t;
2305 }
2306
2307 #endif /* 0 */
2308
2309 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2310 //@subsection Garbage Collextion Routines
2311
2312 /* ---------------------------------------------------------------------------
2313    Where are the roots that we know about?
2314
2315         - all the threads on the runnable queue
2316         - all the threads on the blocked queue
2317         - all the threads on the sleeping queue
2318         - all the thread currently executing a _ccall_GC
2319         - all the "main threads"
2320      
2321    ------------------------------------------------------------------------ */
2322
2323 /* This has to be protected either by the scheduler monitor, or by the
2324         garbage collection monitor (probably the latter).
2325         KH @ 25/10/99
2326 */
2327
2328 void
2329 GetRoots(evac_fn evac)
2330 {
2331 #if defined(GRAN)
2332   {
2333     nat i;
2334     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2335       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2336           evac((StgClosure **)&run_queue_hds[i]);
2337       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2338           evac((StgClosure **)&run_queue_tls[i]);
2339       
2340       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2341           evac((StgClosure **)&blocked_queue_hds[i]);
2342       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2343           evac((StgClosure **)&blocked_queue_tls[i]);
2344       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2345           evac((StgClosure **)&ccalling_threads[i]);
2346     }
2347   }
2348
2349   markEventQueue();
2350
2351 #else /* !GRAN */
2352   if (run_queue_hd != END_TSO_QUEUE) {
2353       ASSERT(run_queue_tl != END_TSO_QUEUE);
2354       evac((StgClosure **)&run_queue_hd);
2355       evac((StgClosure **)&run_queue_tl);
2356   }
2357   
2358   if (blocked_queue_hd != END_TSO_QUEUE) {
2359       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2360       evac((StgClosure **)&blocked_queue_hd);
2361       evac((StgClosure **)&blocked_queue_tl);
2362   }
2363   
2364   if (sleeping_queue != END_TSO_QUEUE) {
2365       evac((StgClosure **)&sleeping_queue);
2366   }
2367 #endif 
2368
2369   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2370       evac((StgClosure **)&suspended_ccalling_threads);
2371   }
2372
2373 #if defined(PAR) || defined(GRAN)
2374   markSparkQueue(evac);
2375 #endif
2376 }
2377
2378 /* -----------------------------------------------------------------------------
2379    performGC
2380
2381    This is the interface to the garbage collector from Haskell land.
2382    We provide this so that external C code can allocate and garbage
2383    collect when called from Haskell via _ccall_GC.
2384
2385    It might be useful to provide an interface whereby the programmer
2386    can specify more roots (ToDo).
2387    
2388    This needs to be protected by the GC condition variable above.  KH.
2389    -------------------------------------------------------------------------- */
2390
2391 void (*extra_roots)(evac_fn);
2392
2393 void
2394 performGC(void)
2395 {
2396   /* Obligated to hold this lock upon entry */
2397   ACQUIRE_LOCK(&sched_mutex);
2398   GarbageCollect(GetRoots,rtsFalse);
2399   RELEASE_LOCK(&sched_mutex);
2400 }
2401
2402 void
2403 performMajorGC(void)
2404 {
2405   ACQUIRE_LOCK(&sched_mutex);
2406   GarbageCollect(GetRoots,rtsTrue);
2407   RELEASE_LOCK(&sched_mutex);
2408 }
2409
2410 static void
2411 AllRoots(evac_fn evac)
2412 {
2413     GetRoots(evac);             // the scheduler's roots
2414     extra_roots(evac);          // the user's roots
2415 }
2416
2417 void
2418 performGCWithRoots(void (*get_roots)(evac_fn))
2419 {
2420   ACQUIRE_LOCK(&sched_mutex);
2421   extra_roots = get_roots;
2422   GarbageCollect(AllRoots,rtsFalse);
2423   RELEASE_LOCK(&sched_mutex);
2424 }
2425
2426 /* -----------------------------------------------------------------------------
2427    Stack overflow
2428
2429    If the thread has reached its maximum stack size, then raise the
2430    StackOverflow exception in the offending thread.  Otherwise
2431    relocate the TSO into a larger chunk of memory and adjust its stack
2432    size appropriately.
2433    -------------------------------------------------------------------------- */
2434
2435 static StgTSO *
2436 threadStackOverflow(StgTSO *tso)
2437 {
2438   nat new_stack_size, new_tso_size, diff, stack_words;
2439   StgPtr new_sp;
2440   StgTSO *dest;
2441
2442   IF_DEBUG(sanity,checkTSO(tso));
2443   if (tso->stack_size >= tso->max_stack_size) {
2444
2445     IF_DEBUG(gc,
2446              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2447                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2448              /* If we're debugging, just print out the top of the stack */
2449              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2450                                               tso->sp+64)));
2451
2452     /* Send this thread the StackOverflow exception */
2453     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2454     return tso;
2455   }
2456
2457   /* Try to double the current stack size.  If that takes us over the
2458    * maximum stack size for this thread, then use the maximum instead.
2459    * Finally round up so the TSO ends up as a whole number of blocks.
2460    */
2461   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2462   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2463                                        TSO_STRUCT_SIZE)/sizeof(W_);
2464   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2465   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2466
2467   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2468
2469   dest = (StgTSO *)allocate(new_tso_size);
2470   TICK_ALLOC_TSO(new_stack_size,0);
2471
2472   /* copy the TSO block and the old stack into the new area */
2473   memcpy(dest,tso,TSO_STRUCT_SIZE);
2474   stack_words = tso->stack + tso->stack_size - tso->sp;
2475   new_sp = (P_)dest + new_tso_size - stack_words;
2476   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2477
2478   /* relocate the stack pointers... */
2479   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2480   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2481   dest->sp    = new_sp;
2482   dest->stack_size = new_stack_size;
2483         
2484   /* and relocate the update frame list */
2485   relocate_stack(dest, diff);
2486
2487   /* Mark the old TSO as relocated.  We have to check for relocated
2488    * TSOs in the garbage collector and any primops that deal with TSOs.
2489    *
2490    * It's important to set the sp and su values to just beyond the end
2491    * of the stack, so we don't attempt to scavenge any part of the
2492    * dead TSO's stack.
2493    */
2494   tso->what_next = ThreadRelocated;
2495   tso->link = dest;
2496   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2497   tso->su = (StgUpdateFrame *)tso->sp;
2498   tso->why_blocked = NotBlocked;
2499   dest->mut_link = NULL;
2500
2501   IF_PAR_DEBUG(verbose,
2502                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2503                      tso->id, tso, tso->stack_size);
2504                /* If we're debugging, just print out the top of the stack */
2505                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2506                                                 tso->sp+64)));
2507   
2508   IF_DEBUG(sanity,checkTSO(tso));
2509 #if 0
2510   IF_DEBUG(scheduler,printTSO(dest));
2511 #endif
2512
2513   return dest;
2514 }
2515
2516 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2517 //@subsection Blocking Queue Routines
2518
2519 /* ---------------------------------------------------------------------------
2520    Wake up a queue that was blocked on some resource.
2521    ------------------------------------------------------------------------ */
2522
2523 #if defined(GRAN)
2524 static inline void
2525 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2526 {
2527 }
2528 #elif defined(PAR)
2529 static inline void
2530 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2531 {
2532   /* write RESUME events to log file and
2533      update blocked and fetch time (depending on type of the orig closure) */
2534   if (RtsFlags.ParFlags.ParStats.Full) {
2535     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2536                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2537                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2538     if (EMPTY_RUN_QUEUE())
2539       emitSchedule = rtsTrue;
2540
2541     switch (get_itbl(node)->type) {
2542         case FETCH_ME_BQ:
2543           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2544           break;
2545         case RBH:
2546         case FETCH_ME:
2547         case BLACKHOLE_BQ:
2548           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2549           break;
2550 #ifdef DIST
2551         case MVAR:
2552           break;
2553 #endif    
2554         default:
2555           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2556         }
2557       }
2558 }
2559 #endif
2560
2561 #if defined(GRAN)
2562 static StgBlockingQueueElement *
2563 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2564 {
2565     StgTSO *tso;
2566     PEs node_loc, tso_loc;
2567
2568     node_loc = where_is(node); // should be lifted out of loop
2569     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2570     tso_loc = where_is((StgClosure *)tso);
2571     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2572       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2573       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2574       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2575       // insertThread(tso, node_loc);
2576       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2577                 ResumeThread,
2578                 tso, node, (rtsSpark*)NULL);
2579       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2580       // len_local++;
2581       // len++;
2582     } else { // TSO is remote (actually should be FMBQ)
2583       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2584                                   RtsFlags.GranFlags.Costs.gunblocktime +
2585                                   RtsFlags.GranFlags.Costs.latency;
2586       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2587                 UnblockThread,
2588                 tso, node, (rtsSpark*)NULL);
2589       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2590       // len++;
2591     }
2592     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2593     IF_GRAN_DEBUG(bq,
2594                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2595                           (node_loc==tso_loc ? "Local" : "Global"), 
2596                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2597     tso->block_info.closure = NULL;
2598     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2599                              tso->id, tso));
2600 }
2601 #elif defined(PAR)
2602 static StgBlockingQueueElement *
2603 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2604 {
2605     StgBlockingQueueElement *next;
2606
2607     switch (get_itbl(bqe)->type) {
2608     case TSO:
2609       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2610       /* if it's a TSO just push it onto the run_queue */
2611       next = bqe->link;
2612       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2613       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2614       THREAD_RUNNABLE();
2615       unblockCount(bqe, node);
2616       /* reset blocking status after dumping event */
2617       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2618       break;
2619
2620     case BLOCKED_FETCH:
2621       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2622       next = bqe->link;
2623       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2624       PendingFetches = (StgBlockedFetch *)bqe;
2625       break;
2626
2627 # if defined(DEBUG)
2628       /* can ignore this case in a non-debugging setup; 
2629          see comments on RBHSave closures above */
2630     case CONSTR:
2631       /* check that the closure is an RBHSave closure */
2632       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2633              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2634              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2635       break;
2636
2637     default:
2638       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2639            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2640            (StgClosure *)bqe);
2641 # endif
2642     }
2643   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2644   return next;
2645 }
2646
2647 #else /* !GRAN && !PAR */
2648 static StgTSO *
2649 unblockOneLocked(StgTSO *tso)
2650 {
2651   StgTSO *next;
2652
2653   ASSERT(get_itbl(tso)->type == TSO);
2654   ASSERT(tso->why_blocked != NotBlocked);
2655   tso->why_blocked = NotBlocked;
2656   next = tso->link;
2657   PUSH_ON_RUN_QUEUE(tso);
2658   THREAD_RUNNABLE();
2659   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2660   return next;
2661 }
2662 #endif
2663
2664 #if defined(GRAN) || defined(PAR)
2665 inline StgBlockingQueueElement *
2666 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2667 {
2668   ACQUIRE_LOCK(&sched_mutex);
2669   bqe = unblockOneLocked(bqe, node);
2670   RELEASE_LOCK(&sched_mutex);
2671   return bqe;
2672 }
2673 #else
2674 inline StgTSO *
2675 unblockOne(StgTSO *tso)
2676 {
2677   ACQUIRE_LOCK(&sched_mutex);
2678   tso = unblockOneLocked(tso);
2679   RELEASE_LOCK(&sched_mutex);
2680   return tso;
2681 }
2682 #endif
2683
2684 #if defined(GRAN)
2685 void 
2686 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2687 {
2688   StgBlockingQueueElement *bqe;
2689   PEs node_loc;
2690   nat len = 0; 
2691
2692   IF_GRAN_DEBUG(bq, 
2693                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2694                       node, CurrentProc, CurrentTime[CurrentProc], 
2695                       CurrentTSO->id, CurrentTSO));
2696
2697   node_loc = where_is(node);
2698
2699   ASSERT(q == END_BQ_QUEUE ||
2700          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2701          get_itbl(q)->type == CONSTR); // closure (type constructor)
2702   ASSERT(is_unique(node));
2703
2704   /* FAKE FETCH: magically copy the node to the tso's proc;
2705      no Fetch necessary because in reality the node should not have been 
2706      moved to the other PE in the first place
2707   */
2708   if (CurrentProc!=node_loc) {
2709     IF_GRAN_DEBUG(bq, 
2710                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2711                         node, node_loc, CurrentProc, CurrentTSO->id, 
2712                         // CurrentTSO, where_is(CurrentTSO),
2713                         node->header.gran.procs));
2714     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2715     IF_GRAN_DEBUG(bq, 
2716                   belch("## new bitmask of node %p is %#x",
2717                         node, node->header.gran.procs));
2718     if (RtsFlags.GranFlags.GranSimStats.Global) {
2719       globalGranStats.tot_fake_fetches++;
2720     }
2721   }
2722
2723   bqe = q;
2724   // ToDo: check: ASSERT(CurrentProc==node_loc);
2725   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2726     //next = bqe->link;
2727     /* 
2728        bqe points to the current element in the queue
2729        next points to the next element in the queue
2730     */
2731     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2732     //tso_loc = where_is(tso);
2733     len++;
2734     bqe = unblockOneLocked(bqe, node);
2735   }
2736
2737   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2738      the closure to make room for the anchor of the BQ */
2739   if (bqe!=END_BQ_QUEUE) {
2740     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2741     /*
2742     ASSERT((info_ptr==&RBH_Save_0_info) ||
2743            (info_ptr==&RBH_Save_1_info) ||
2744            (info_ptr==&RBH_Save_2_info));
2745     */
2746     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2747     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2748     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2749
2750     IF_GRAN_DEBUG(bq,
2751                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2752                         node, info_type(node)));
2753   }
2754
2755   /* statistics gathering */
2756   if (RtsFlags.GranFlags.GranSimStats.Global) {
2757     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2758     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2759     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2760     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2761   }
2762   IF_GRAN_DEBUG(bq,
2763                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2764                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2765 }
2766 #elif defined(PAR)
2767 void 
2768 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2769 {
2770   StgBlockingQueueElement *bqe;
2771
2772   ACQUIRE_LOCK(&sched_mutex);
2773
2774   IF_PAR_DEBUG(verbose, 
2775                belch("##-_ AwBQ for node %p on [%x]: ",
2776                      node, mytid));
2777 #ifdef DIST  
2778   //RFP
2779   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2780     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2781     return;
2782   }
2783 #endif
2784   
2785   ASSERT(q == END_BQ_QUEUE ||
2786          get_itbl(q)->type == TSO ||           
2787          get_itbl(q)->type == BLOCKED_FETCH || 
2788          get_itbl(q)->type == CONSTR); 
2789
2790   bqe = q;
2791   while (get_itbl(bqe)->type==TSO || 
2792          get_itbl(bqe)->type==BLOCKED_FETCH) {
2793     bqe = unblockOneLocked(bqe, node);
2794   }
2795   RELEASE_LOCK(&sched_mutex);
2796 }
2797
2798 #else   /* !GRAN && !PAR */
2799 void
2800 awakenBlockedQueue(StgTSO *tso)
2801 {
2802   ACQUIRE_LOCK(&sched_mutex);
2803   while (tso != END_TSO_QUEUE) {
2804     tso = unblockOneLocked(tso);
2805   }
2806   RELEASE_LOCK(&sched_mutex);
2807 }
2808 #endif
2809
2810 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2811 //@subsection Exception Handling Routines
2812
2813 /* ---------------------------------------------------------------------------
2814    Interrupt execution
2815    - usually called inside a signal handler so it mustn't do anything fancy.   
2816    ------------------------------------------------------------------------ */
2817
2818 void
2819 interruptStgRts(void)
2820 {
2821     interrupted    = 1;
2822     context_switch = 1;
2823 }
2824
2825 /* -----------------------------------------------------------------------------
2826    Unblock a thread
2827
2828    This is for use when we raise an exception in another thread, which
2829    may be blocked.
2830    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2831    -------------------------------------------------------------------------- */
2832
2833 #if defined(GRAN) || defined(PAR)
2834 /*
2835   NB: only the type of the blocking queue is different in GranSim and GUM
2836       the operations on the queue-elements are the same
2837       long live polymorphism!
2838
2839   Locks: sched_mutex is held upon entry and exit.
2840
2841 */
2842 static void
2843 unblockThread(StgTSO *tso)
2844 {
2845   StgBlockingQueueElement *t, **last;
2846
2847   ACQUIRE_LOCK(&sched_mutex);
2848   switch (tso->why_blocked) {
2849
2850   case NotBlocked:
2851     return;  /* not blocked */
2852
2853   case BlockedOnMVar:
2854     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2855     {
2856       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2857       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2858
2859       last = (StgBlockingQueueElement **)&mvar->head;
2860       for (t = (StgBlockingQueueElement *)mvar->head; 
2861            t != END_BQ_QUEUE; 
2862            last = &t->link, last_tso = t, t = t->link) {
2863         if (t == (StgBlockingQueueElement *)tso) {
2864           *last = (StgBlockingQueueElement *)tso->link;
2865           if (mvar->tail == tso) {
2866             mvar->tail = (StgTSO *)last_tso;
2867           }
2868           goto done;
2869         }
2870       }
2871       barf("unblockThread (MVAR): TSO not found");
2872     }
2873
2874   case BlockedOnBlackHole:
2875     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2876     {
2877       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2878
2879       last = &bq->blocking_queue;
2880       for (t = bq->blocking_queue; 
2881            t != END_BQ_QUEUE; 
2882            last = &t->link, t = t->link) {
2883         if (t == (StgBlockingQueueElement *)tso) {
2884           *last = (StgBlockingQueueElement *)tso->link;
2885           goto done;
2886         }
2887       }
2888       barf("unblockThread (BLACKHOLE): TSO not found");
2889     }
2890
2891   case BlockedOnException:
2892     {
2893       StgTSO *target  = tso->block_info.tso;
2894
2895       ASSERT(get_itbl(target)->type == TSO);
2896
2897       if (target->what_next == ThreadRelocated) {
2898           target = target->link;
2899           ASSERT(get_itbl(target)->type == TSO);
2900       }
2901
2902       ASSERT(target->blocked_exceptions != NULL);
2903
2904       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2905       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2906            t != END_BQ_QUEUE; 
2907            last = &t->link, t = t->link) {
2908         ASSERT(get_itbl(t)->type == TSO);
2909         if (t == (StgBlockingQueueElement *)tso) {
2910           *last = (StgBlockingQueueElement *)tso->link;
2911           goto done;
2912         }
2913       }
2914       barf("unblockThread (Exception): TSO not found");
2915     }
2916
2917   case BlockedOnRead:
2918   case BlockedOnWrite:
2919     {
2920       /* take TSO off blocked_queue */
2921       StgBlockingQueueElement *prev = NULL;
2922       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2923            prev = t, t = t->link) {
2924         if (t == (StgBlockingQueueElement *)tso) {
2925           if (prev == NULL) {
2926             blocked_queue_hd = (StgTSO *)t->link;
2927             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2928               blocked_queue_tl = END_TSO_QUEUE;
2929             }
2930           } else {
2931             prev->link = t->link;
2932             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2933               blocked_queue_tl = (StgTSO *)prev;
2934             }
2935           }
2936           goto done;
2937         }
2938       }
2939       barf("unblockThread (I/O): TSO not found");
2940     }
2941
2942   case BlockedOnDelay:
2943     {
2944       /* take TSO off sleeping_queue */
2945       StgBlockingQueueElement *prev = NULL;
2946       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2947            prev = t, t = t->link) {
2948         if (t == (StgBlockingQueueElement *)tso) {
2949           if (prev == NULL) {
2950             sleeping_queue = (StgTSO *)t->link;
2951           } else {
2952             prev->link = t->link;
2953           }
2954           goto done;
2955         }
2956       }
2957       barf("unblockThread (I/O): TSO not found");
2958     }
2959
2960   default:
2961     barf("unblockThread");
2962   }
2963
2964  done:
2965   tso->link = END_TSO_QUEUE;
2966   tso->why_blocked = NotBlocked;
2967   tso->block_info.closure = NULL;
2968   PUSH_ON_RUN_QUEUE(tso);
2969   RELEASE_LOCK(&sched_mutex);
2970 }
2971 #else
2972 static void
2973 unblockThread(StgTSO *tso)
2974 {
2975   StgTSO *t, **last;
2976   
2977   /* To avoid locking unnecessarily. */
2978   if (tso->why_blocked == NotBlocked) {
2979     return;
2980   }
2981
2982   ACQUIRE_LOCK(&sched_mutex);
2983   switch (tso->why_blocked) {
2984
2985   case BlockedOnMVar:
2986     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2987     {
2988       StgTSO *last_tso = END_TSO_QUEUE;
2989       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2990
2991       last = &mvar->head;
2992       for (t = mvar->head; t != END_TSO_QUEUE; 
2993            last = &t->link, last_tso = t, t = t->link) {
2994         if (t == tso) {
2995           *last = tso->link;
2996           if (mvar->tail == tso) {
2997             mvar->tail = last_tso;
2998           }
2999           goto done;
3000         }
3001       }
3002       barf("unblockThread (MVAR): TSO not found");
3003     }
3004
3005   case BlockedOnBlackHole:
3006     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3007     {
3008       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3009
3010       last = &bq->blocking_queue;
3011       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
3012            last = &t->link, t = t->link) {
3013         if (t == tso) {
3014           *last = tso->link;
3015           goto done;
3016         }
3017       }
3018       barf("unblockThread (BLACKHOLE): TSO not found");
3019     }
3020
3021   case BlockedOnException:
3022     {
3023       StgTSO *target  = tso->block_info.tso;
3024
3025       ASSERT(get_itbl(target)->type == TSO);
3026
3027       while (target->what_next == ThreadRelocated) {
3028           target = target->link;
3029           ASSERT(get_itbl(target)->type == TSO);
3030       }
3031       
3032       ASSERT(target->blocked_exceptions != NULL);
3033
3034       last = &target->blocked_exceptions;
3035       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3036            last = &t->link, t = t->link) {
3037         ASSERT(get_itbl(t)->type == TSO);
3038         if (t == tso) {
3039           *last = tso->link;
3040           goto done;
3041         }
3042       }
3043       barf("unblockThread (Exception): TSO not found");
3044     }
3045
3046   case BlockedOnRead:
3047   case BlockedOnWrite:
3048     {
3049       StgTSO *prev = NULL;
3050       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3051            prev = t, t = t->link) {
3052         if (t == tso) {
3053           if (prev == NULL) {
3054             blocked_queue_hd = t->link;
3055             if (blocked_queue_tl == t) {
3056               blocked_queue_tl = END_TSO_QUEUE;
3057             }
3058           } else {
3059             prev->link = t->link;
3060             if (blocked_queue_tl == t) {
3061               blocked_queue_tl = prev;
3062             }
3063           }
3064           goto done;
3065         }
3066       }
3067       barf("unblockThread (I/O): TSO not found");
3068     }
3069
3070   case BlockedOnDelay:
3071     {
3072       StgTSO *prev = NULL;
3073       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3074            prev = t, t = t->link) {
3075         if (t == tso) {
3076           if (prev == NULL) {
3077             sleeping_queue = t->link;
3078           } else {
3079             prev->link = t->link;
3080           }
3081           goto done;
3082         }
3083       }
3084       barf("unblockThread (I/O): TSO not found");
3085     }
3086
3087   default:
3088     barf("unblockThread");
3089   }
3090
3091  done:
3092   tso->link = END_TSO_QUEUE;
3093   tso->why_blocked = NotBlocked;
3094   tso->block_info.closure = NULL;
3095   PUSH_ON_RUN_QUEUE(tso);
3096   RELEASE_LOCK(&sched_mutex);
3097 }
3098 #endif
3099
3100 /* -----------------------------------------------------------------------------
3101  * raiseAsync()
3102  *
3103  * The following function implements the magic for raising an
3104  * asynchronous exception in an existing thread.
3105  *
3106  * We first remove the thread from any queue on which it might be
3107  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3108  *
3109  * We strip the stack down to the innermost CATCH_FRAME, building
3110  * thunks in the heap for all the active computations, so they can 
3111  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3112  * an application of the handler to the exception, and push it on
3113  * the top of the stack.
3114  * 
3115  * How exactly do we save all the active computations?  We create an
3116  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3117  * AP_UPDs pushes everything from the corresponding update frame
3118  * upwards onto the stack.  (Actually, it pushes everything up to the
3119  * next update frame plus a pointer to the next AP_UPD object.
3120  * Entering the next AP_UPD object pushes more onto the stack until we
3121  * reach the last AP_UPD object - at which point the stack should look
3122  * exactly as it did when we killed the TSO and we can continue
3123  * execution by entering the closure on top of the stack.
3124  *
3125  * We can also kill a thread entirely - this happens if either (a) the 
3126  * exception passed to raiseAsync is NULL, or (b) there's no
3127  * CATCH_FRAME on the stack.  In either case, we strip the entire
3128  * stack and replace the thread with a zombie.
3129  *
3130  * Locks: sched_mutex not held upon entry nor exit.
3131  *
3132  * -------------------------------------------------------------------------- */
3133  
3134 void 
3135 deleteThread(StgTSO *tso)
3136 {
3137   raiseAsync(tso,NULL);
3138 }
3139
3140 void
3141 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3142 {
3143   /* When raising async exs from contexts where sched_mutex is held;
3144      use raiseAsyncWithLock(). */
3145   RELEASE_LOCK(&sched_mutex);
3146   raiseAsync(tso,exception);
3147   ACQUIRE_LOCK(&sched_mutex);
3148 }
3149
3150 void
3151 raiseAsync(StgTSO *tso, StgClosure *exception)
3152 {
3153   StgUpdateFrame* su = tso->su;
3154   StgPtr          sp = tso->sp;
3155   
3156   /* Thread already dead? */
3157   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3158     return;
3159   }
3160
3161   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3162
3163   /* Remove it from any blocking queues */
3164   unblockThread(tso);
3165
3166   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3167   /* The stack freezing code assumes there's a closure pointer on
3168    * the top of the stack.  This isn't always the case with compiled
3169    * code, so we have to push a dummy closure on the top which just
3170    * returns to the next return address on the stack.
3171    */
3172   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3173     *(--sp) = (W_)&stg_dummy_ret_closure;
3174   }
3175
3176   while (1) {
3177     nat words = ((P_)su - (P_)sp) - 1;
3178     nat i;
3179     StgAP_UPD * ap;
3180
3181     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3182      * then build the THUNK raise(exception), and leave it on
3183      * top of the CATCH_FRAME ready to enter.
3184      */
3185     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3186 #ifdef PROFILING
3187       StgCatchFrame *cf = (StgCatchFrame *)su;
3188 #endif
3189       StgClosure *raise;
3190
3191       /* we've got an exception to raise, so let's pass it to the
3192        * handler in this frame.
3193        */
3194       raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3195       TICK_ALLOC_SE_THK(1,0);
3196       SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3197       raise->payload[0] = exception;
3198
3199       /* throw away the stack from Sp up to the CATCH_FRAME.
3200        */
3201       sp = (P_)su - 1;
3202
3203       /* Ensure that async excpetions are blocked now, so we don't get
3204        * a surprise exception before we get around to executing the
3205        * handler.
3206        */
3207       if (tso->blocked_exceptions == NULL) {
3208           tso->blocked_exceptions = END_TSO_QUEUE;
3209       }
3210
3211       /* Put the newly-built THUNK on top of the stack, ready to execute
3212        * when the thread restarts.
3213        */
3214       sp[0] = (W_)raise;
3215       tso->sp = sp;
3216       tso->su = su;
3217       tso->what_next = ThreadEnterGHC;
3218       IF_DEBUG(sanity, checkTSO(tso));
3219       return;
3220     }
3221
3222     /* First build an AP_UPD consisting of the stack chunk above the
3223      * current update frame, with the top word on the stack as the
3224      * fun field.
3225      */
3226     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3227     
3228     ASSERT(words >= 0);
3229     
3230     ap->n_args = words;
3231     ap->fun    = (StgClosure *)sp[0];
3232     sp++;
3233     for(i=0; i < (nat)words; ++i) {
3234       ap->payload[i] = (StgClosure *)*sp++;
3235     }
3236     
3237     switch (get_itbl(su)->type) {
3238       
3239     case UPDATE_FRAME:
3240       {
3241         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3242         TICK_ALLOC_UP_THK(words+1,0);
3243         
3244         IF_DEBUG(scheduler,
3245                  fprintf(stderr,  "scheduler: Updating ");
3246                  printPtr((P_)su->updatee); 
3247                  fprintf(stderr,  " with ");
3248                  printObj((StgClosure *)ap);
3249                  );
3250         
3251         /* Replace the updatee with an indirection - happily
3252          * this will also wake up any threads currently
3253          * waiting on the result.
3254          *
3255          * Warning: if we're in a loop, more than one update frame on
3256          * the stack may point to the same object.  Be careful not to
3257          * overwrite an IND_OLDGEN in this case, because we'll screw
3258          * up the mutable lists.  To be on the safe side, don't
3259          * overwrite any kind of indirection at all.  See also
3260          * threadSqueezeStack in GC.c, where we have to make a similar
3261          * check.
3262          */
3263         if (!closure_IND(su->updatee)) {
3264             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3265         }
3266         su = su->link;
3267         sp += sizeofW(StgUpdateFrame) -1;
3268         sp[0] = (W_)ap; /* push onto stack */
3269         break;
3270       }
3271
3272     case CATCH_FRAME:
3273       {
3274         StgCatchFrame *cf = (StgCatchFrame *)su;
3275         StgClosure* o;
3276         
3277         /* We want a PAP, not an AP_UPD.  Fortunately, the
3278          * layout's the same.
3279          */
3280         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3281         TICK_ALLOC_UPD_PAP(words+1,0);
3282         
3283         /* now build o = FUN(catch,ap,handler) */
3284         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3285         TICK_ALLOC_FUN(2,0);
3286         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3287         o->payload[0] = (StgClosure *)ap;
3288         o->payload[1] = cf->handler;
3289         
3290         IF_DEBUG(scheduler,
3291                  fprintf(stderr,  "scheduler: Built ");
3292                  printObj((StgClosure *)o);
3293                  );
3294         
3295         /* pop the old handler and put o on the stack */
3296         su = cf->link;
3297         sp += sizeofW(StgCatchFrame) - 1;
3298         sp[0] = (W_)o;
3299         break;
3300       }
3301       
3302     case SEQ_FRAME:
3303       {
3304         StgSeqFrame *sf = (StgSeqFrame *)su;
3305         StgClosure* o;
3306         
3307         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3308         TICK_ALLOC_UPD_PAP(words+1,0);
3309         
3310         /* now build o = FUN(seq,ap) */
3311         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3312         TICK_ALLOC_SE_THK(1,0);
3313         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3314         o->payload[0] = (StgClosure *)ap;
3315         
3316         IF_DEBUG(scheduler,
3317                  fprintf(stderr,  "scheduler: Built ");
3318                  printObj((StgClosure *)o);
3319                  );
3320         
3321         /* pop the old handler and put o on the stack */
3322         su = sf->link;
3323         sp += sizeofW(StgSeqFrame) - 1;
3324         sp[0] = (W_)o;
3325         break;
3326       }
3327       
3328     case STOP_FRAME:
3329       /* We've stripped the entire stack, the thread is now dead. */
3330       sp += sizeofW(StgStopFrame) - 1;
3331       sp[0] = (W_)exception;    /* save the exception */
3332       tso->what_next = ThreadKilled;
3333       tso->su = (StgUpdateFrame *)(sp+1);
3334       tso->sp = sp;
3335       return;
3336
3337     default:
3338       barf("raiseAsync");
3339     }
3340   }
3341   barf("raiseAsync");
3342 }
3343
3344 /* -----------------------------------------------------------------------------
3345    resurrectThreads is called after garbage collection on the list of
3346    threads found to be garbage.  Each of these threads will be woken
3347    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3348    on an MVar, or NonTermination if the thread was blocked on a Black
3349    Hole.
3350
3351    Locks: sched_mutex isn't held upon entry nor exit.
3352    -------------------------------------------------------------------------- */
3353
3354 void
3355 resurrectThreads( StgTSO *threads )
3356 {
3357   StgTSO *tso, *next;
3358
3359   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3360     next = tso->global_link;
3361     tso->global_link = all_threads;
3362     all_threads = tso;
3363     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3364
3365     switch (tso->why_blocked) {
3366     case BlockedOnMVar:
3367     case BlockedOnException:
3368       /* Called by GC - sched_mutex lock is currently held. */
3369       raiseAsyncWithLock(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3370       break;
3371     case BlockedOnBlackHole:
3372       raiseAsyncWithLock(tso,(StgClosure *)NonTermination_closure);
3373       break;
3374     case NotBlocked:
3375       /* This might happen if the thread was blocked on a black hole
3376        * belonging to a thread that we've just woken up (raiseAsync
3377        * can wake up threads, remember...).
3378        */
3379       continue;
3380     default:
3381       barf("resurrectThreads: thread blocked in a strange way");
3382     }
3383   }
3384 }
3385
3386 /* -----------------------------------------------------------------------------
3387  * Blackhole detection: if we reach a deadlock, test whether any
3388  * threads are blocked on themselves.  Any threads which are found to
3389  * be self-blocked get sent a NonTermination exception.
3390  *
3391  * This is only done in a deadlock situation in order to avoid
3392  * performance overhead in the normal case.
3393  *
3394  * Locks: sched_mutex is held upon entry and exit.
3395  * -------------------------------------------------------------------------- */
3396
3397 static void
3398 detectBlackHoles( void )
3399 {
3400     StgTSO *t = all_threads;
3401     StgUpdateFrame *frame;
3402     StgClosure *blocked_on;
3403
3404     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3405
3406         while (t->what_next == ThreadRelocated) {
3407             t = t->link;
3408             ASSERT(get_itbl(t)->type == TSO);
3409         }
3410       
3411         if (t->why_blocked != BlockedOnBlackHole) {
3412             continue;
3413         }
3414
3415         blocked_on = t->block_info.closure;
3416
3417         for (frame = t->su; ; frame = frame->link) {
3418             switch (get_itbl(frame)->type) {
3419
3420             case UPDATE_FRAME:
3421                 if (frame->updatee == blocked_on) {
3422                     /* We are blocking on one of our own computations, so
3423                      * send this thread the NonTermination exception.  
3424                      */
3425                     IF_DEBUG(scheduler, 
3426                              sched_belch("thread %d is blocked on itself", t->id));
3427                     raiseAsyncWithLock(t, (StgClosure *)NonTermination_closure);
3428                     goto done;
3429                 }
3430                 else {
3431                     continue;
3432                 }
3433
3434             case CATCH_FRAME:
3435             case SEQ_FRAME:
3436                 continue;
3437                 
3438             case STOP_FRAME:
3439                 break;
3440             }
3441             break;
3442         }
3443
3444     done: ;
3445     }   
3446 }
3447
3448 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3449 //@subsection Debugging Routines
3450
3451 /* -----------------------------------------------------------------------------
3452    Debugging: why is a thread blocked
3453    -------------------------------------------------------------------------- */
3454
3455 #ifdef DEBUG
3456
3457 void
3458 printThreadBlockage(StgTSO *tso)
3459 {
3460   switch (tso->why_blocked) {
3461   case BlockedOnRead:
3462     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3463     break;
3464   case BlockedOnWrite:
3465     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3466     break;
3467   case BlockedOnDelay:
3468     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3469     break;
3470   case BlockedOnMVar:
3471     fprintf(stderr,"is blocked on an MVar");
3472     break;
3473   case BlockedOnException:
3474     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3475             tso->block_info.tso->id);
3476     break;
3477   case BlockedOnBlackHole:
3478     fprintf(stderr,"is blocked on a black hole");
3479     break;
3480   case NotBlocked:
3481     fprintf(stderr,"is not blocked");
3482     break;
3483 #if defined(PAR)
3484   case BlockedOnGA:
3485     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3486             tso->block_info.closure, info_type(tso->block_info.closure));
3487     break;
3488   case BlockedOnGA_NoSend:
3489     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3490             tso->block_info.closure, info_type(tso->block_info.closure));
3491     break;
3492 #endif
3493 #if defined(RTS_SUPPORTS_THREADS)
3494   case BlockedOnCCall:
3495     fprintf(stderr,"is blocked on an external call");
3496     break;
3497 #endif
3498   default:
3499     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3500          tso->why_blocked, tso->id, tso);
3501   }
3502 }
3503
3504 void
3505 printThreadStatus(StgTSO *tso)
3506 {
3507   switch (tso->what_next) {
3508   case ThreadKilled:
3509     fprintf(stderr,"has been killed");
3510     break;
3511   case ThreadComplete:
3512     fprintf(stderr,"has completed");
3513     break;
3514   default:
3515     printThreadBlockage(tso);
3516   }
3517 }
3518
3519 void
3520 printAllThreads(void)
3521 {
3522   StgTSO *t;
3523
3524 # if defined(GRAN)
3525   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3526   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3527                        time_string, rtsFalse/*no commas!*/);
3528
3529   sched_belch("all threads at [%s]:", time_string);
3530 # elif defined(PAR)
3531   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3532   ullong_format_string(CURRENT_TIME,
3533                        time_string, rtsFalse/*no commas!*/);
3534
3535   sched_belch("all threads at [%s]:", time_string);
3536 # else
3537   sched_belch("all threads:");
3538 # endif
3539
3540   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3541     fprintf(stderr, "\tthread %d ", t->id);
3542     if (t->label) fprintf(stderr,"[\"%s\"] ",t->label);
3543     printThreadStatus(t);
3544     fprintf(stderr,"\n");
3545   }
3546 }
3547     
3548 /* 
3549    Print a whole blocking queue attached to node (debugging only).
3550 */
3551 //@cindex print_bq
3552 # if defined(PAR)
3553 void 
3554 print_bq (StgClosure *node)
3555 {
3556   StgBlockingQueueElement *bqe;
3557   StgTSO *tso;
3558   rtsBool end;
3559
3560   fprintf(stderr,"## BQ of closure %p (%s): ",
3561           node, info_type(node));
3562
3563   /* should cover all closures that may have a blocking queue */
3564   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3565          get_itbl(node)->type == FETCH_ME_BQ ||
3566          get_itbl(node)->type == RBH ||
3567          get_itbl(node)->type == MVAR);
3568     
3569   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3570
3571   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3572 }
3573
3574 /* 
3575    Print a whole blocking queue starting with the element bqe.
3576 */
3577 void 
3578 print_bqe (StgBlockingQueueElement *bqe)
3579 {
3580   rtsBool end;
3581
3582   /* 
3583      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3584   */
3585   for (end = (bqe==END_BQ_QUEUE);
3586        !end; // iterate until bqe points to a CONSTR
3587        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3588        bqe = end ? END_BQ_QUEUE : bqe->link) {
3589     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3590     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3591     /* types of closures that may appear in a blocking queue */
3592     ASSERT(get_itbl(bqe)->type == TSO ||           
3593            get_itbl(bqe)->type == BLOCKED_FETCH || 
3594            get_itbl(bqe)->type == CONSTR); 
3595     /* only BQs of an RBH end with an RBH_Save closure */
3596     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3597
3598     switch (get_itbl(bqe)->type) {
3599     case TSO:
3600       fprintf(stderr," TSO %u (%x),",
3601               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3602       break;
3603     case BLOCKED_FETCH:
3604       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3605               ((StgBlockedFetch *)bqe)->node, 
3606               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3607               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3608               ((StgBlockedFetch *)bqe)->ga.weight);
3609       break;
3610     case CONSTR:
3611       fprintf(stderr," %s (IP %p),",
3612               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3613                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3614                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3615                "RBH_Save_?"), get_itbl(bqe));
3616       break;
3617     default:
3618       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3619            info_type((StgClosure *)bqe)); // , node, info_type(node));
3620       break;
3621     }
3622   } /* for */
3623   fputc('\n', stderr);
3624 }
3625 # elif defined(GRAN)
3626 void 
3627 print_bq (StgClosure *node)
3628 {
3629   StgBlockingQueueElement *bqe;
3630   PEs node_loc, tso_loc;
3631   rtsBool end;
3632
3633   /* should cover all closures that may have a blocking queue */
3634   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3635          get_itbl(node)->type == FETCH_ME_BQ ||
3636          get_itbl(node)->type == RBH);
3637     
3638   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3639   node_loc = where_is(node);
3640
3641   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3642           node, info_type(node), node_loc);
3643
3644   /* 
3645      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3646   */
3647   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3648        !end; // iterate until bqe points to a CONSTR
3649        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3650     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3651     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3652     /* types of closures that may appear in a blocking queue */
3653     ASSERT(get_itbl(bqe)->type == TSO ||           
3654            get_itbl(bqe)->type == CONSTR); 
3655     /* only BQs of an RBH end with an RBH_Save closure */
3656     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3657
3658     tso_loc = where_is((StgClosure *)bqe);
3659     switch (get_itbl(bqe)->type) {
3660     case TSO:
3661       fprintf(stderr," TSO %d (%p) on [PE %d],",
3662               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3663       break;
3664     case CONSTR:
3665       fprintf(stderr," %s (IP %p),",
3666               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3667                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3668                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3669                "RBH_Save_?"), get_itbl(bqe));
3670       break;
3671     default:
3672       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3673            info_type((StgClosure *)bqe), node, info_type(node));
3674       break;
3675     }
3676   } /* for */
3677   fputc('\n', stderr);
3678 }
3679 #else
3680 /* 
3681    Nice and easy: only TSOs on the blocking queue
3682 */
3683 void 
3684 print_bq (StgClosure *node)
3685 {
3686   StgTSO *tso;
3687
3688   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3689   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3690        tso != END_TSO_QUEUE; 
3691        tso=tso->link) {
3692     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3693     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3694     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3695   }
3696   fputc('\n', stderr);
3697 }
3698 # endif
3699
3700 #if defined(PAR)
3701 static nat
3702 run_queue_len(void)
3703 {
3704   nat i;
3705   StgTSO *tso;
3706
3707   for (i=0, tso=run_queue_hd; 
3708        tso != END_TSO_QUEUE;
3709        i++, tso=tso->link)
3710     /* nothing */
3711
3712   return i;
3713 }
3714 #endif
3715
3716 static void
3717 sched_belch(char *s, ...)
3718 {
3719   va_list ap;
3720   va_start(ap,s);
3721 #ifdef SMP
3722   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3723 #elif defined(PAR)
3724   fprintf(stderr, "== ");
3725 #else
3726   fprintf(stderr, "scheduler: ");
3727 #endif
3728   vfprintf(stderr, s, ap);
3729   fprintf(stderr, "\n");
3730 }
3731
3732 #endif /* DEBUG */
3733
3734
3735 //@node Index,  , Debugging Routines, Main scheduling code
3736 //@subsection Index
3737
3738 //@index
3739 //* StgMainThread::  @cindex\s-+StgMainThread
3740 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3741 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3742 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3743 //* context_switch::  @cindex\s-+context_switch
3744 //* createThread::  @cindex\s-+createThread
3745 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3746 //* initScheduler::  @cindex\s-+initScheduler
3747 //* interrupted::  @cindex\s-+interrupted
3748 //* next_thread_id::  @cindex\s-+next_thread_id
3749 //* print_bq::  @cindex\s-+print_bq
3750 //* run_queue_hd::  @cindex\s-+run_queue_hd
3751 //* run_queue_tl::  @cindex\s-+run_queue_tl
3752 //* sched_mutex::  @cindex\s-+sched_mutex
3753 //* schedule::  @cindex\s-+schedule
3754 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3755 //* term_mutex::  @cindex\s-+term_mutex
3756 //@end index