ab008fd87f13dc9d39b315d5219741380b0f97bd
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.116 2002/02/04 20:56:53 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* Threads suspended in _ccall_GC.
189  */
190 static StgTSO *suspended_ccalling_threads;
191
192 static StgTSO *threadStackOverflow(StgTSO *tso);
193
194 /* KH: The following two flags are shared memory locations.  There is no need
195        to lock them, since they are only unset at the end of a scheduler
196        operation.
197 */
198
199 /* flag set by signal handler to precipitate a context switch */
200 //@cindex context_switch
201 nat context_switch;
202
203 /* if this flag is set as well, give up execution */
204 //@cindex interrupted
205 rtsBool interrupted;
206
207 /* Next thread ID to allocate.
208  * Locks required: sched_mutex
209  */
210 //@cindex next_thread_id
211 StgThreadID next_thread_id = 1;
212
213 /*
214  * Pointers to the state of the current thread.
215  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
216  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
217  */
218  
219 /* The smallest stack size that makes any sense is:
220  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
221  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
222  *  + 1                       (the realworld token for an IO thread)
223  *  + 1                       (the closure to enter)
224  *
225  * A thread with this stack will bomb immediately with a stack
226  * overflow, which will increase its stack size.  
227  */
228
229 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
230
231
232 #if defined(GRAN)
233 StgTSO *CurrentTSO;
234 #endif
235
236 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
237  *  exists - earlier gccs apparently didn't.
238  *  -= chak
239  */
240 StgTSO dummy_tso;
241
242 rtsBool ready_to_gc;
243
244 void            addToBlockedQueue ( StgTSO *tso );
245
246 static void     schedule          ( void );
247        void     interruptStgRts   ( void );
248 #if defined(GRAN)
249 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
250 #else
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
252 #endif
253
254 static void     detectBlackHoles  ( void );
255
256 #ifdef DEBUG
257 static void sched_belch(char *s, ...);
258 #endif
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262  *       with these synchronisation objects.
263  */
264 Mutex     sched_mutex       = INIT_MUTEX_VAR;
265 Mutex     term_mutex        = INIT_MUTEX_VAR;
266 #if defined(THREADED_RTS)
267 /*
268  * The rts_mutex is the 'big lock' that the active native
269  * thread within the RTS holds while executing code
270  * within the RTS. It is given up when the thread makes a
271  * transition out of the RTS (e.g., to perform an external
272  * C call), hopefully for another thread to enter the RTS.
273  *
274  */
275 Mutex     rts_mutex         = INIT_MUTEX_VAR;
276 /*
277  * When a native thread has completed executing an external
278  * call, it needs to communicate the result back to the
279  * (Haskell) thread that made the call. Do this as follows:
280  *
281  *  - in resumeThread(), the thread increments the counter
282  *    ext_threads_waiting, and then blocks on the
283  *    'big' RTS lock.
284  *  - upon entry to the scheduler, the thread that's currently
285  *    holding the RTS lock checks ext_threads_waiting. If there
286  *    are native threads waiting, it gives up its RTS lock
287  *    and tries to re-grab the RTS lock [perhaps after having
288  *    waited for a bit..?]
289  *  - care must be taken to deal with the case where more than
290  *    one external thread are waiting on the lock. [ToDo: more]
291  *    
292  */
293
294 static nat ext_threads_waiting = 0;
295 /*
296  * thread_ready_aux_mutex is used to handle the scenario where the
297  * the RTS executing thread runs out of work, but there are
298  * active external threads. The RTS executing thread gives up
299  * its RTS mutex, and blocks waiting for the thread_ready_cond.
300  * Unfortunately, a condition variable needs to be associated
301  * with a mutex in pthreads, so rts_thread_waiting_mutex is
302  * used for just this purpose.
303  * 
304  */
305 Mutex  thread_ready_aux_mutex = INIT_MUTEX_VAR;
306 #endif
307
308
309 /* thread_ready_cond: when signalled, a thread has
310  * become runnable. When used?
311  */
312 Condition thread_ready_cond = INIT_COND_VAR;
313 Condition gc_pending_cond   = INIT_COND_VAR;
314
315 nat await_death;
316 #endif
317
318 #if defined(PAR)
319 StgTSO *LastTSO;
320 rtsTime TimeOfLastYield;
321 rtsBool emitSchedule = rtsTrue;
322 #endif
323
324 #if DEBUG
325 char *whatNext_strs[] = {
326   "ThreadEnterGHC",
327   "ThreadRunGHC",
328   "ThreadEnterInterp",
329   "ThreadKilled",
330   "ThreadComplete"
331 };
332
333 char *threadReturnCode_strs[] = {
334   "HeapOverflow",                       /* might also be StackOverflow */
335   "StackOverflow",
336   "ThreadYielding",
337   "ThreadBlocked",
338   "ThreadFinished"
339 };
340 #endif
341
342 #if defined(PAR)
343 StgTSO * createSparkThread(rtsSpark spark);
344 StgTSO * activateSpark (rtsSpark spark);  
345 #endif
346
347 /*
348  * The thread state for the main thread.
349 // ToDo: check whether not needed any more
350 StgTSO   *MainTSO;
351  */
352
353 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
354 static void taskStart(void);
355 static void
356 taskStart(void)
357 {
358   /* threads start up using 'taskStart', so make them
359      them grab the RTS lock. */
360 #if defined(THREADED_RTS)
361   ACQUIRE_LOCK(&rts_mutex);
362 #endif
363   schedule();
364 }
365 #endif
366
367
368
369
370 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
371 //@subsection Main scheduling loop
372
373 /* ---------------------------------------------------------------------------
374    Main scheduling loop.
375
376    We use round-robin scheduling, each thread returning to the
377    scheduler loop when one of these conditions is detected:
378
379       * out of heap space
380       * timer expires (thread yields)
381       * thread blocks
382       * thread ends
383       * stack overflow
384
385    Locking notes:  we acquire the scheduler lock once at the beginning
386    of the scheduler loop, and release it when
387     
388       * running a thread, or
389       * waiting for work, or
390       * waiting for a GC to complete.
391
392    GRAN version:
393      In a GranSim setup this loop iterates over the global event queue.
394      This revolves around the global event queue, which determines what 
395      to do next. Therefore, it's more complicated than either the 
396      concurrent or the parallel (GUM) setup.
397
398    GUM version:
399      GUM iterates over incoming messages.
400      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
401      and sends out a fish whenever it has nothing to do; in-between
402      doing the actual reductions (shared code below) it processes the
403      incoming messages and deals with delayed operations 
404      (see PendingFetches).
405      This is not the ugliest code you could imagine, but it's bloody close.
406
407    ------------------------------------------------------------------------ */
408 //@cindex schedule
409 static void
410 schedule( void )
411 {
412   StgTSO *t;
413   Capability *cap;
414   StgThreadReturnCode ret;
415 #if defined(GRAN)
416   rtsEvent *event;
417 #elif defined(PAR)
418   StgSparkPool *pool;
419   rtsSpark spark;
420   StgTSO *tso;
421   GlobalTaskId pe;
422   rtsBool receivedFinish = rtsFalse;
423 # if defined(DEBUG)
424   nat tp_size, sp_size; // stats only
425 # endif
426 #endif
427   rtsBool was_interrupted = rtsFalse;
428   
429   ACQUIRE_LOCK(&sched_mutex);
430   
431 #if defined(THREADED_RTS)
432   /* ToDo: consider SMP support */
433   if (ext_threads_waiting > 0) {
434     /* (At least) one external thread is waiting to
435      * to deposit the result of an external call.
436      * Give way to one of them by giving up the RTS
437      * lock.
438      */
439     RELEASE_LOCK(&sched_mutex);
440     RELEASE_LOCK(&rts_mutex);
441     /* ToDo: come up with mechanism that guarantees that
442      * the main thread doesn't loop here.
443      */
444     yieldThread();
445     /* ToDo: longjmp() */
446     taskStart();
447   }
448 #endif
449
450 #if defined(GRAN)
451
452   /* set up first event to get things going */
453   /* ToDo: assign costs for system setup and init MainTSO ! */
454   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
455             ContinueThread, 
456             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
457
458   IF_DEBUG(gran,
459            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
460            G_TSO(CurrentTSO, 5));
461
462   if (RtsFlags.GranFlags.Light) {
463     /* Save current time; GranSim Light only */
464     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
465   }      
466
467   event = get_next_event();
468
469   while (event!=(rtsEvent*)NULL) {
470     /* Choose the processor with the next event */
471     CurrentProc = event->proc;
472     CurrentTSO = event->tso;
473
474 #elif defined(PAR)
475
476   while (!receivedFinish) {    /* set by processMessages */
477                                /* when receiving PP_FINISH message         */ 
478 #else
479
480   while (1) {
481
482 #endif
483
484     IF_DEBUG(scheduler, printAllThreads());
485
486     /* If we're interrupted (the user pressed ^C, or some other
487      * termination condition occurred), kill all the currently running
488      * threads.
489      */
490     if (interrupted) {
491       IF_DEBUG(scheduler, sched_belch("interrupted"));
492       deleteAllThreads();
493       interrupted = rtsFalse;
494       was_interrupted = rtsTrue;
495     }
496
497     /* Go through the list of main threads and wake up any
498      * clients whose computations have finished.  ToDo: this
499      * should be done more efficiently without a linear scan
500      * of the main threads list, somehow...
501      */
502 #if defined(RTS_SUPPORTS_THREADS)
503     { 
504       StgMainThread *m, **prev;
505       prev = &main_threads;
506       for (m = main_threads; m != NULL; m = m->link) {
507         switch (m->tso->what_next) {
508         case ThreadComplete:
509           if (m->ret) {
510             *(m->ret) = (StgClosure *)m->tso->sp[0];
511           }
512           *prev = m->link;
513           m->stat = Success;
514           broadcastCondition(&m->wakeup);
515           break;
516         case ThreadKilled:
517           if (m->ret) *(m->ret) = NULL;
518           *prev = m->link;
519           if (was_interrupted) {
520             m->stat = Interrupted;
521           } else {
522             m->stat = Killed;
523           }
524           broadcastCondition(&m->wakeup);
525           break;
526         default:
527           break;
528         }
529       }
530     }
531
532 #else /* not threaded */
533
534 # if defined(PAR)
535     /* in GUM do this only on the Main PE */
536     if (IAmMainThread)
537 # endif
538     /* If our main thread has finished or been killed, return.
539      */
540     {
541       StgMainThread *m = main_threads;
542       if (m->tso->what_next == ThreadComplete
543           || m->tso->what_next == ThreadKilled) {
544         main_threads = main_threads->link;
545         if (m->tso->what_next == ThreadComplete) {
546           /* we finished successfully, fill in the return value */
547           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
548           m->stat = Success;
549           return;
550         } else {
551           if (m->ret) { *(m->ret) = NULL; };
552           if (was_interrupted) {
553             m->stat = Interrupted;
554           } else {
555             m->stat = Killed;
556           }
557           return;
558         }
559       }
560     }
561 #endif
562
563     /* Top up the run queue from our spark pool.  We try to make the
564      * number of threads in the run queue equal to the number of
565      * free capabilities.
566      *
567      * Disable spark support in SMP for now, non-essential & requires
568      * a little bit of work to make it compile cleanly. -- sof 1/02.
569      */
570 #if 0 /* defined(SMP) */
571     {
572       nat n = getFreeCapabilities();
573       StgTSO *tso = run_queue_hd;
574
575       /* Count the run queue */
576       while (n > 0 && tso != END_TSO_QUEUE) {
577         tso = tso->link;
578         n--;
579       }
580
581       for (; n > 0; n--) {
582         StgClosure *spark;
583         spark = findSpark(rtsFalse);
584         if (spark == NULL) {
585           break; /* no more sparks in the pool */
586         } else {
587           /* I'd prefer this to be done in activateSpark -- HWL */
588           /* tricky - it needs to hold the scheduler lock and
589            * not try to re-acquire it -- SDM */
590           createSparkThread(spark);       
591           IF_DEBUG(scheduler,
592                    sched_belch("==^^ turning spark of closure %p into a thread",
593                                (StgClosure *)spark));
594         }
595       }
596       /* We need to wake up the other tasks if we just created some
597        * work for them.
598        */
599       if (getFreeCapabilities() - n > 1) {
600           signalCondition( &thread_ready_cond );
601       }
602     }
603 #endif // SMP
604
605     /* check for signals each time around the scheduler */
606 #ifndef mingw32_TARGET_OS
607     if (signals_pending()) {
608       startSignalHandlers();
609     }
610 #endif
611
612     /* Check whether any waiting threads need to be woken up.  If the
613      * run queue is empty, and there are no other tasks running, we
614      * can wait indefinitely for something to happen.
615      * ToDo: what if another client comes along & requests another
616      * main thread?
617      */
618     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
619       awaitEvent(
620            (run_queue_hd == END_TSO_QUEUE)
621 #if defined(SMP)
622         && allFreeCapabilities()
623 #endif
624         );
625     }
626     /* we can be interrupted while waiting for I/O... */
627     if (interrupted) continue;
628
629     /* 
630      * Detect deadlock: when we have no threads to run, there are no
631      * threads waiting on I/O or sleeping, and all the other tasks are
632      * waiting for work, we must have a deadlock of some description.
633      *
634      * We first try to find threads blocked on themselves (ie. black
635      * holes), and generate NonTermination exceptions where necessary.
636      *
637      * If no threads are black holed, we have a deadlock situation, so
638      * inform all the main threads.
639      */
640 #ifndef PAR
641     if (blocked_queue_hd == END_TSO_QUEUE
642         && run_queue_hd == END_TSO_QUEUE
643         && sleeping_queue == END_TSO_QUEUE
644 #if defined(SMP)
645         && allFreeCapabilities()
646 #elif defined(THREADED_RTS)
647         && suspended_ccalling_threads == END_TSO_QUEUE
648 #endif
649         )
650     {
651         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
652         RELEASE_LOCK(&sched_mutex);
653         GarbageCollect(GetRoots,rtsTrue);
654         ACQUIRE_LOCK(&sched_mutex);
655         IF_DEBUG(scheduler, sched_belch("GC done."));
656         if (blocked_queue_hd == END_TSO_QUEUE
657             && run_queue_hd == END_TSO_QUEUE
658             && sleeping_queue == END_TSO_QUEUE) {
659
660             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
661             detectBlackHoles();
662
663             /* No black holes, so probably a real deadlock.  Send the
664              * current main thread the Deadlock exception (or in the SMP
665              * build, send *all* main threads the deadlock exception,
666              * since none of them can make progress).
667              */
668             if (run_queue_hd == END_TSO_QUEUE) {
669                 StgMainThread *m;
670 #if defined(RTS_SUPPORTS_THREADS)
671                 for (m = main_threads; m != NULL; m = m->link) {
672                     switch (m->tso->why_blocked) {
673                     case BlockedOnBlackHole:
674                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
675                         break;
676                     case BlockedOnException:
677                     case BlockedOnMVar:
678                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
679                         break;
680                     default:
681                         barf("deadlock: main thread blocked in a strange way");
682                     }
683                 }
684 #else
685                 m = main_threads;
686                 switch (m->tso->why_blocked) {
687                 case BlockedOnBlackHole:
688                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
689                     break;
690                 case BlockedOnException:
691                 case BlockedOnMVar:
692                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
693                     break;
694                 default:
695                     barf("deadlock: main thread blocked in a strange way");
696                 }
697 #endif
698             }
699 #if defined(RTS_SUPPORTS_THREADS)
700             if ( run_queue_hd == END_TSO_QUEUE ) {
701               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
702               shutdownHaskellAndExit(0);
703             
704             }
705 #endif
706             ASSERT( run_queue_hd != END_TSO_QUEUE );
707         }
708     }
709 #elif defined(PAR)
710     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
711 #endif
712
713 #if defined(SMP)
714     /* If there's a GC pending, don't do anything until it has
715      * completed.
716      */
717     if (ready_to_gc) {
718       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
719       waitCondition( &gc_pending_cond, &sched_mutex );
720     }
721 #endif    
722
723 #if defined(SMP)
724     /* block until we've got a thread on the run queue and a free
725      * capability.
726      */
727     while ( run_queue_hd == END_TSO_QUEUE 
728             || noFreeCapabilities() 
729             ) {
730       IF_DEBUG(scheduler, sched_belch("waiting for work"));
731       waitCondition( &thread_ready_cond, &sched_mutex );
732       IF_DEBUG(scheduler, sched_belch("work now available"));
733     }
734 #elif defined(THREADED_RTS)
735    if ( run_queue_hd == END_TSO_QUEUE ) {
736      /* no work available, wait for external calls to complete. */
737      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
738      RELEASE_LOCK(&sched_mutex);
739      RELEASE_LOCK(&rts_mutex);
740      /* Sigh - need to have a mutex locked in order to wait on the
741         condition variable. */
742      ACQUIRE_LOCK(&thread_ready_aux_mutex);
743      waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
744      RELEASE_LOCK(&thread_ready_aux_mutex);
745      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
746      /* ToDo: longjmp() */
747      taskStart();
748    
749    }
750 #endif
751
752 #if defined(GRAN)
753
754     if (RtsFlags.GranFlags.Light)
755       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
756
757     /* adjust time based on time-stamp */
758     if (event->time > CurrentTime[CurrentProc] &&
759         event->evttype != ContinueThread)
760       CurrentTime[CurrentProc] = event->time;
761     
762     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
763     if (!RtsFlags.GranFlags.Light)
764       handleIdlePEs();
765
766     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
767
768     /* main event dispatcher in GranSim */
769     switch (event->evttype) {
770       /* Should just be continuing execution */
771     case ContinueThread:
772       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
773       /* ToDo: check assertion
774       ASSERT(run_queue_hd != (StgTSO*)NULL &&
775              run_queue_hd != END_TSO_QUEUE);
776       */
777       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
778       if (!RtsFlags.GranFlags.DoAsyncFetch &&
779           procStatus[CurrentProc]==Fetching) {
780         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
781               CurrentTSO->id, CurrentTSO, CurrentProc);
782         goto next_thread;
783       } 
784       /* Ignore ContinueThreads for completed threads */
785       if (CurrentTSO->what_next == ThreadComplete) {
786         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
787               CurrentTSO->id, CurrentTSO, CurrentProc);
788         goto next_thread;
789       } 
790       /* Ignore ContinueThreads for threads that are being migrated */
791       if (PROCS(CurrentTSO)==Nowhere) { 
792         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
793               CurrentTSO->id, CurrentTSO, CurrentProc);
794         goto next_thread;
795       }
796       /* The thread should be at the beginning of the run queue */
797       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
798         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
799               CurrentTSO->id, CurrentTSO, CurrentProc);
800         break; // run the thread anyway
801       }
802       /*
803       new_event(proc, proc, CurrentTime[proc],
804                 FindWork,
805                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
806       goto next_thread; 
807       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
808       break; // now actually run the thread; DaH Qu'vam yImuHbej 
809
810     case FetchNode:
811       do_the_fetchnode(event);
812       goto next_thread;             /* handle next event in event queue  */
813       
814     case GlobalBlock:
815       do_the_globalblock(event);
816       goto next_thread;             /* handle next event in event queue  */
817       
818     case FetchReply:
819       do_the_fetchreply(event);
820       goto next_thread;             /* handle next event in event queue  */
821       
822     case UnblockThread:   /* Move from the blocked queue to the tail of */
823       do_the_unblock(event);
824       goto next_thread;             /* handle next event in event queue  */
825       
826     case ResumeThread:  /* Move from the blocked queue to the tail of */
827       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
828       event->tso->gran.blocktime += 
829         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
830       do_the_startthread(event);
831       goto next_thread;             /* handle next event in event queue  */
832       
833     case StartThread:
834       do_the_startthread(event);
835       goto next_thread;             /* handle next event in event queue  */
836       
837     case MoveThread:
838       do_the_movethread(event);
839       goto next_thread;             /* handle next event in event queue  */
840       
841     case MoveSpark:
842       do_the_movespark(event);
843       goto next_thread;             /* handle next event in event queue  */
844       
845     case FindWork:
846       do_the_findwork(event);
847       goto next_thread;             /* handle next event in event queue  */
848       
849     default:
850       barf("Illegal event type %u\n", event->evttype);
851     }  /* switch */
852     
853     /* This point was scheduler_loop in the old RTS */
854
855     IF_DEBUG(gran, belch("GRAN: after main switch"));
856
857     TimeOfLastEvent = CurrentTime[CurrentProc];
858     TimeOfNextEvent = get_time_of_next_event();
859     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
860     // CurrentTSO = ThreadQueueHd;
861
862     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
863                          TimeOfNextEvent));
864
865     if (RtsFlags.GranFlags.Light) 
866       GranSimLight_leave_system(event, &ActiveTSO); 
867
868     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
869
870     IF_DEBUG(gran, 
871              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
872
873     /* in a GranSim setup the TSO stays on the run queue */
874     t = CurrentTSO;
875     /* Take a thread from the run queue. */
876     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
877
878     IF_DEBUG(gran, 
879              fprintf(stderr, "GRAN: About to run current thread, which is\n");
880              G_TSO(t,5));
881
882     context_switch = 0; // turned on via GranYield, checking events and time slice
883
884     IF_DEBUG(gran, 
885              DumpGranEvent(GR_SCHEDULE, t));
886
887     procStatus[CurrentProc] = Busy;
888
889 #elif defined(PAR)
890     if (PendingFetches != END_BF_QUEUE) {
891         processFetches();
892     }
893
894     /* ToDo: phps merge with spark activation above */
895     /* check whether we have local work and send requests if we have none */
896     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
897       /* :-[  no local threads => look out for local sparks */
898       /* the spark pool for the current PE */
899       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
900       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
901           pool->hd < pool->tl) {
902         /* 
903          * ToDo: add GC code check that we really have enough heap afterwards!!
904          * Old comment:
905          * If we're here (no runnable threads) and we have pending
906          * sparks, we must have a space problem.  Get enough space
907          * to turn one of those pending sparks into a
908          * thread... 
909          */
910
911         spark = findSpark(rtsFalse);                /* get a spark */
912         if (spark != (rtsSpark) NULL) {
913           tso = activateSpark(spark);       /* turn the spark into a thread */
914           IF_PAR_DEBUG(schedule,
915                        belch("==== schedule: Created TSO %d (%p); %d threads active",
916                              tso->id, tso, advisory_thread_count));
917
918           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
919             belch("==^^ failed to activate spark");
920             goto next_thread;
921           }               /* otherwise fall through & pick-up new tso */
922         } else {
923           IF_PAR_DEBUG(verbose,
924                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
925                              spark_queue_len(pool)));
926           goto next_thread;
927         }
928       }
929
930       /* If we still have no work we need to send a FISH to get a spark
931          from another PE 
932       */
933       if (EMPTY_RUN_QUEUE()) {
934       /* =8-[  no local sparks => look for work on other PEs */
935         /*
936          * We really have absolutely no work.  Send out a fish
937          * (there may be some out there already), and wait for
938          * something to arrive.  We clearly can't run any threads
939          * until a SCHEDULE or RESUME arrives, and so that's what
940          * we're hoping to see.  (Of course, we still have to
941          * respond to other types of messages.)
942          */
943         TIME now = msTime() /*CURRENT_TIME*/;
944         IF_PAR_DEBUG(verbose, 
945                      belch("--  now=%ld", now));
946         IF_PAR_DEBUG(verbose,
947                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
948                          (last_fish_arrived_at!=0 &&
949                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
950                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
951                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
952                              last_fish_arrived_at,
953                              RtsFlags.ParFlags.fishDelay, now);
954                      });
955         
956         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
957             (last_fish_arrived_at==0 ||
958              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
959           /* outstandingFishes is set in sendFish, processFish;
960              avoid flooding system with fishes via delay */
961           pe = choosePE();
962           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
963                    NEW_FISH_HUNGER);
964
965           // Global statistics: count no. of fishes
966           if (RtsFlags.ParFlags.ParStats.Global &&
967               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
968             globalParStats.tot_fish_mess++;
969           }
970         }
971       
972         receivedFinish = processMessages();
973         goto next_thread;
974       }
975     } else if (PacketsWaiting()) {  /* Look for incoming messages */
976       receivedFinish = processMessages();
977     }
978
979     /* Now we are sure that we have some work available */
980     ASSERT(run_queue_hd != END_TSO_QUEUE);
981
982     /* Take a thread from the run queue, if we have work */
983     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
984     IF_DEBUG(sanity,checkTSO(t));
985
986     /* ToDo: write something to the log-file
987     if (RTSflags.ParFlags.granSimStats && !sameThread)
988         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
989
990     CurrentTSO = t;
991     */
992     /* the spark pool for the current PE */
993     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
994
995     IF_DEBUG(scheduler, 
996              belch("--=^ %d threads, %d sparks on [%#x]", 
997                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
998
999 #if 1
1000     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1001         t && LastTSO && t->id != LastTSO->id && 
1002         LastTSO->why_blocked == NotBlocked && 
1003         LastTSO->what_next != ThreadComplete) {
1004       // if previously scheduled TSO not blocked we have to record the context switch
1005       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1006                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1007     }
1008
1009     if (RtsFlags.ParFlags.ParStats.Full && 
1010         (emitSchedule /* forced emit */ ||
1011         (t && LastTSO && t->id != LastTSO->id))) {
1012       /* 
1013          we are running a different TSO, so write a schedule event to log file
1014          NB: If we use fair scheduling we also have to write  a deschedule 
1015              event for LastTSO; with unfair scheduling we know that the
1016              previous tso has blocked whenever we switch to another tso, so
1017              we don't need it in GUM for now
1018       */
1019       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1020                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1021       emitSchedule = rtsFalse;
1022     }
1023      
1024 #endif
1025 #else /* !GRAN && !PAR */
1026   
1027     /* grab a thread from the run queue
1028      */
1029     ASSERT(run_queue_hd != END_TSO_QUEUE);
1030     t = POP_RUN_QUEUE();
1031     // Sanity check the thread we're about to run.  This can be
1032     // expensive if there is lots of thread switching going on...
1033     IF_DEBUG(sanity,checkTSO(t));
1034 #endif
1035     
1036     grabCapability(&cap);
1037     cap->r.rCurrentTSO = t;
1038     
1039     /* context switches are now initiated by the timer signal, unless
1040      * the user specified "context switch as often as possible", with
1041      * +RTS -C0
1042      */
1043     if (
1044 #ifdef PROFILING
1045         RtsFlags.ProfFlags.profileInterval == 0 ||
1046 #endif
1047         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1048          && (run_queue_hd != END_TSO_QUEUE
1049              || blocked_queue_hd != END_TSO_QUEUE
1050              || sleeping_queue != END_TSO_QUEUE)))
1051         context_switch = 1;
1052     else
1053         context_switch = 0;
1054
1055     RELEASE_LOCK(&sched_mutex);
1056
1057     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1058                               t->id, t, whatNext_strs[t->what_next]));
1059
1060 #ifdef PROFILING
1061     startHeapProfTimer();
1062 #endif
1063
1064     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1065     /* Run the current thread 
1066      */
1067     switch (cap->r.rCurrentTSO->what_next) {
1068     case ThreadKilled:
1069     case ThreadComplete:
1070         /* Thread already finished, return to scheduler. */
1071         ret = ThreadFinished;
1072         break;
1073     case ThreadEnterGHC:
1074         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1075         break;
1076     case ThreadRunGHC:
1077         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1078         break;
1079     case ThreadEnterInterp:
1080         ret = interpretBCO(cap);
1081         break;
1082     default:
1083       barf("schedule: invalid what_next field");
1084     }
1085     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1086     
1087     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1088 #ifdef PROFILING
1089     stopHeapProfTimer();
1090     CCCS = CCS_SYSTEM;
1091 #endif
1092     
1093     ACQUIRE_LOCK(&sched_mutex);
1094
1095 #ifdef SMP
1096     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1097 #elif !defined(GRAN) && !defined(PAR)
1098     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1099 #endif
1100     t = cap->r.rCurrentTSO;
1101     
1102 #if defined(PAR)
1103     /* HACK 675: if the last thread didn't yield, make sure to print a 
1104        SCHEDULE event to the log file when StgRunning the next thread, even
1105        if it is the same one as before */
1106     LastTSO = t; 
1107     TimeOfLastYield = CURRENT_TIME;
1108 #endif
1109
1110     switch (ret) {
1111     case HeapOverflow:
1112 #if defined(GRAN)
1113       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1114       globalGranStats.tot_heapover++;
1115 #elif defined(PAR)
1116       globalParStats.tot_heapover++;
1117 #endif
1118
1119       // did the task ask for a large block?
1120       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1121           // if so, get one and push it on the front of the nursery.
1122           bdescr *bd;
1123           nat blocks;
1124           
1125           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1126
1127           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1128                                    t->id, t,
1129                                    whatNext_strs[t->what_next], blocks));
1130
1131           // don't do this if it would push us over the
1132           // alloc_blocks_lim limit; we'll GC first.
1133           if (alloc_blocks + blocks < alloc_blocks_lim) {
1134
1135               alloc_blocks += blocks;
1136               bd = allocGroup( blocks );
1137
1138               // link the new group into the list
1139               bd->link = cap->r.rCurrentNursery;
1140               bd->u.back = cap->r.rCurrentNursery->u.back;
1141               if (cap->r.rCurrentNursery->u.back != NULL) {
1142                   cap->r.rCurrentNursery->u.back->link = bd;
1143               } else {
1144                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1145                          g0s0->blocks == cap->r.rNursery);
1146                   cap->r.rNursery = g0s0->blocks = bd;
1147               }           
1148               cap->r.rCurrentNursery->u.back = bd;
1149
1150               // initialise it as a nursery block
1151               bd->step = g0s0;
1152               bd->gen_no = 0;
1153               bd->flags = 0;
1154               bd->free = bd->start;
1155
1156               // don't forget to update the block count in g0s0.
1157               g0s0->n_blocks += blocks;
1158               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1159
1160               // now update the nursery to point to the new block
1161               cap->r.rCurrentNursery = bd;
1162
1163               // we might be unlucky and have another thread get on the
1164               // run queue before us and steal the large block, but in that
1165               // case the thread will just end up requesting another large
1166               // block.
1167               PUSH_ON_RUN_QUEUE(t);
1168               break;
1169           }
1170       }
1171
1172       /* make all the running tasks block on a condition variable,
1173        * maybe set context_switch and wait till they all pile in,
1174        * then have them wait on a GC condition variable.
1175        */
1176       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1177                                t->id, t, whatNext_strs[t->what_next]));
1178       threadPaused(t);
1179 #if defined(GRAN)
1180       ASSERT(!is_on_queue(t,CurrentProc));
1181 #elif defined(PAR)
1182       /* Currently we emit a DESCHEDULE event before GC in GUM.
1183          ToDo: either add separate event to distinguish SYSTEM time from rest
1184                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1185       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1186         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1187                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1188         emitSchedule = rtsTrue;
1189       }
1190 #endif
1191       
1192       ready_to_gc = rtsTrue;
1193       context_switch = 1;               /* stop other threads ASAP */
1194       PUSH_ON_RUN_QUEUE(t);
1195       /* actual GC is done at the end of the while loop */
1196       break;
1197       
1198     case StackOverflow:
1199 #if defined(GRAN)
1200       IF_DEBUG(gran, 
1201                DumpGranEvent(GR_DESCHEDULE, t));
1202       globalGranStats.tot_stackover++;
1203 #elif defined(PAR)
1204       // IF_DEBUG(par, 
1205       // DumpGranEvent(GR_DESCHEDULE, t);
1206       globalParStats.tot_stackover++;
1207 #endif
1208       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1209                                t->id, t, whatNext_strs[t->what_next]));
1210       /* just adjust the stack for this thread, then pop it back
1211        * on the run queue.
1212        */
1213       threadPaused(t);
1214       { 
1215         StgMainThread *m;
1216         /* enlarge the stack */
1217         StgTSO *new_t = threadStackOverflow(t);
1218         
1219         /* This TSO has moved, so update any pointers to it from the
1220          * main thread stack.  It better not be on any other queues...
1221          * (it shouldn't be).
1222          */
1223         for (m = main_threads; m != NULL; m = m->link) {
1224           if (m->tso == t) {
1225             m->tso = new_t;
1226           }
1227         }
1228         threadPaused(new_t);
1229         PUSH_ON_RUN_QUEUE(new_t);
1230       }
1231       break;
1232
1233     case ThreadYielding:
1234 #if defined(GRAN)
1235       IF_DEBUG(gran, 
1236                DumpGranEvent(GR_DESCHEDULE, t));
1237       globalGranStats.tot_yields++;
1238 #elif defined(PAR)
1239       // IF_DEBUG(par, 
1240       // DumpGranEvent(GR_DESCHEDULE, t);
1241       globalParStats.tot_yields++;
1242 #endif
1243       /* put the thread back on the run queue.  Then, if we're ready to
1244        * GC, check whether this is the last task to stop.  If so, wake
1245        * up the GC thread.  getThread will block during a GC until the
1246        * GC is finished.
1247        */
1248       IF_DEBUG(scheduler,
1249                if (t->what_next == ThreadEnterInterp) {
1250                    /* ToDo: or maybe a timer expired when we were in Hugs?
1251                     * or maybe someone hit ctrl-C
1252                     */
1253                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1254                          t->id, t, whatNext_strs[t->what_next]);
1255                } else {
1256                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1257                          t->id, t, whatNext_strs[t->what_next]);
1258                }
1259                );
1260
1261       threadPaused(t);
1262
1263       IF_DEBUG(sanity,
1264                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1265                checkTSO(t));
1266       ASSERT(t->link == END_TSO_QUEUE);
1267 #if defined(GRAN)
1268       ASSERT(!is_on_queue(t,CurrentProc));
1269
1270       IF_DEBUG(sanity,
1271                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1272                checkThreadQsSanity(rtsTrue));
1273 #endif
1274 #if defined(PAR)
1275       if (RtsFlags.ParFlags.doFairScheduling) { 
1276         /* this does round-robin scheduling; good for concurrency */
1277         APPEND_TO_RUN_QUEUE(t);
1278       } else {
1279         /* this does unfair scheduling; good for parallelism */
1280         PUSH_ON_RUN_QUEUE(t);
1281       }
1282 #else
1283       /* this does round-robin scheduling; good for concurrency */
1284       APPEND_TO_RUN_QUEUE(t);
1285 #endif
1286 #if defined(GRAN)
1287       /* add a ContinueThread event to actually process the thread */
1288       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1289                 ContinueThread,
1290                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1291       IF_GRAN_DEBUG(bq, 
1292                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1293                G_EVENTQ(0);
1294                G_CURR_THREADQ(0));
1295 #endif /* GRAN */
1296       break;
1297       
1298     case ThreadBlocked:
1299 #if defined(GRAN)
1300       IF_DEBUG(scheduler,
1301                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1302                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1303                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1304
1305       // ??? needed; should emit block before
1306       IF_DEBUG(gran, 
1307                DumpGranEvent(GR_DESCHEDULE, t)); 
1308       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1309       /*
1310         ngoq Dogh!
1311       ASSERT(procStatus[CurrentProc]==Busy || 
1312               ((procStatus[CurrentProc]==Fetching) && 
1313               (t->block_info.closure!=(StgClosure*)NULL)));
1314       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1315           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1316             procStatus[CurrentProc]==Fetching)) 
1317         procStatus[CurrentProc] = Idle;
1318       */
1319 #elif defined(PAR)
1320       IF_DEBUG(scheduler,
1321                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1322                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1323       IF_PAR_DEBUG(bq,
1324
1325                    if (t->block_info.closure!=(StgClosure*)NULL) 
1326                      print_bq(t->block_info.closure));
1327
1328       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1329       blockThread(t);
1330
1331       /* whatever we schedule next, we must log that schedule */
1332       emitSchedule = rtsTrue;
1333
1334 #else /* !GRAN */
1335       /* don't need to do anything.  Either the thread is blocked on
1336        * I/O, in which case we'll have called addToBlockedQueue
1337        * previously, or it's blocked on an MVar or Blackhole, in which
1338        * case it'll be on the relevant queue already.
1339        */
1340       IF_DEBUG(scheduler,
1341                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1342                printThreadBlockage(t);
1343                fprintf(stderr, "\n"));
1344
1345       /* Only for dumping event to log file 
1346          ToDo: do I need this in GranSim, too?
1347       blockThread(t);
1348       */
1349 #endif
1350       threadPaused(t);
1351       break;
1352       
1353     case ThreadFinished:
1354       /* Need to check whether this was a main thread, and if so, signal
1355        * the task that started it with the return value.  If we have no
1356        * more main threads, we probably need to stop all the tasks until
1357        * we get a new one.
1358        */
1359       /* We also end up here if the thread kills itself with an
1360        * uncaught exception, see Exception.hc.
1361        */
1362       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1363 #if defined(GRAN)
1364       endThread(t, CurrentProc); // clean-up the thread
1365 #elif defined(PAR)
1366       /* For now all are advisory -- HWL */
1367       //if(t->priority==AdvisoryPriority) ??
1368       advisory_thread_count--;
1369       
1370 # ifdef DIST
1371       if(t->dist.priority==RevalPriority)
1372         FinishReval(t);
1373 # endif
1374       
1375       if (RtsFlags.ParFlags.ParStats.Full &&
1376           !RtsFlags.ParFlags.ParStats.Suppressed) 
1377         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1378 #endif
1379       break;
1380       
1381     default:
1382       barf("schedule: invalid thread return code %d", (int)ret);
1383     }
1384     
1385 #ifdef SMP
1386     grabCapability(&cap);
1387 #endif
1388
1389 #ifdef PROFILING
1390     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1391         GarbageCollect(GetRoots, rtsTrue);
1392         heapCensus();
1393         performHeapProfile = rtsFalse;
1394         ready_to_gc = rtsFalse; // we already GC'd
1395     }
1396 #endif
1397
1398 #ifdef SMP
1399     if (ready_to_gc && allFreeCapabilities() )
1400 #else
1401     if (ready_to_gc) 
1402 #endif
1403       {
1404       /* everybody back, start the GC.
1405        * Could do it in this thread, or signal a condition var
1406        * to do it in another thread.  Either way, we need to
1407        * broadcast on gc_pending_cond afterward.
1408        */
1409 #if defined(RTS_SUPPORTS_THREADS)
1410       IF_DEBUG(scheduler,sched_belch("doing GC"));
1411 #endif
1412       GarbageCollect(GetRoots,rtsFalse);
1413       ready_to_gc = rtsFalse;
1414 #ifdef SMP
1415       broadcastCondition(&gc_pending_cond);
1416 #endif
1417 #if defined(GRAN)
1418       /* add a ContinueThread event to continue execution of current thread */
1419       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1420                 ContinueThread,
1421                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1422       IF_GRAN_DEBUG(bq, 
1423                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1424                G_EVENTQ(0);
1425                G_CURR_THREADQ(0));
1426 #endif /* GRAN */
1427     }
1428
1429 #if defined(GRAN)
1430   next_thread:
1431     IF_GRAN_DEBUG(unused,
1432                   print_eventq(EventHd));
1433
1434     event = get_next_event();
1435 #elif defined(PAR)
1436   next_thread:
1437     /* ToDo: wait for next message to arrive rather than busy wait */
1438 #endif /* GRAN */
1439
1440   } /* end of while(1) */
1441
1442   IF_PAR_DEBUG(verbose,
1443                belch("== Leaving schedule() after having received Finish"));
1444 }
1445
1446 /* ---------------------------------------------------------------------------
1447  * deleteAllThreads():  kill all the live threads.
1448  *
1449  * This is used when we catch a user interrupt (^C), before performing
1450  * any necessary cleanups and running finalizers.
1451  * ------------------------------------------------------------------------- */
1452    
1453 void deleteAllThreads ( void )
1454 {
1455   StgTSO* t;
1456   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1457   for (t = run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1458       deleteThread(t);
1459   }
1460   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = t->link) {
1461       deleteThread(t);
1462   }
1463   for (t = sleeping_queue; t != END_TSO_QUEUE; t = t->link) {
1464       deleteThread(t);
1465   }
1466   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1467   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1468   sleeping_queue = END_TSO_QUEUE;
1469 }
1470
1471 /* startThread and  insertThread are now in GranSim.c -- HWL */
1472
1473
1474 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1475 //@subsection Suspend and Resume
1476
1477 /* ---------------------------------------------------------------------------
1478  * Suspending & resuming Haskell threads.
1479  * 
1480  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1481  * its capability before calling the C function.  This allows another
1482  * task to pick up the capability and carry on running Haskell
1483  * threads.  It also means that if the C call blocks, it won't lock
1484  * the whole system.
1485  *
1486  * The Haskell thread making the C call is put to sleep for the
1487  * duration of the call, on the susepended_ccalling_threads queue.  We
1488  * give out a token to the task, which it can use to resume the thread
1489  * on return from the C function.
1490  * ------------------------------------------------------------------------- */
1491    
1492 StgInt
1493 suspendThread( StgRegTable *reg )
1494 {
1495   nat tok;
1496   Capability *cap;
1497
1498   /* assume that *reg is a pointer to the StgRegTable part
1499    * of a Capability.
1500    */
1501   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1502
1503   ACQUIRE_LOCK(&sched_mutex);
1504
1505   IF_DEBUG(scheduler,
1506            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1507
1508   threadPaused(cap->r.rCurrentTSO);
1509   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1510   suspended_ccalling_threads = cap->r.rCurrentTSO;
1511
1512   /* Use the thread ID as the token; it should be unique */
1513   tok = cap->r.rCurrentTSO->id;
1514
1515   /* Hand back capability */
1516   releaseCapability(&cap);
1517   
1518 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1519   IF_DEBUG(scheduler, sched_belch("thread %d leaving RTS\n", tok));
1520   startTask(taskStart);
1521 #endif
1522   
1523   RELEASE_LOCK(&sched_mutex);
1524   RELEASE_LOCK(&rts_mutex);
1525   return tok; 
1526 }
1527
1528 StgRegTable *
1529 resumeThread( StgInt tok )
1530 {
1531   StgTSO *tso, **prev;
1532   Capability *cap;
1533
1534 #if defined(THREADED_RTS)
1535   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1536   ACQUIRE_LOCK(&sched_mutex);
1537   ext_threads_waiting++;
1538   IF_DEBUG(scheduler, sched_belch("thread %d returning, ext_thread count: %d.\n", tok, ext_threads_waiting));
1539   RELEASE_LOCK(&sched_mutex);
1540   
1541   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1542   ACQUIRE_LOCK(&rts_mutex);
1543   ext_threads_waiting--;
1544   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1545 #endif
1546
1547 #if defined(THREADED_RTS)
1548   /* Free up any RTS-blocked threads. */
1549   broadcastCondition(&thread_ready_cond);
1550 #endif
1551
1552   /* Remove the thread off of the suspended list */
1553   prev = &suspended_ccalling_threads;
1554   for (tso = suspended_ccalling_threads; 
1555        tso != END_TSO_QUEUE; 
1556        prev = &tso->link, tso = tso->link) {
1557     if (tso->id == (StgThreadID)tok) {
1558       *prev = tso->link;
1559       break;
1560     }
1561   }
1562   if (tso == END_TSO_QUEUE) {
1563     barf("resumeThread: thread not found");
1564   }
1565   tso->link = END_TSO_QUEUE;
1566
1567 #if defined(SMP)
1568   while ( noFreeCapabilities() ) {
1569     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1570     waitCondition(&thread_ready_cond, &sched_mutex);
1571     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1572   }
1573 #endif
1574
1575   grabCapability(&cap);
1576
1577   cap->r.rCurrentTSO = tso;
1578
1579   return &cap->r;
1580 }
1581
1582
1583 /* ---------------------------------------------------------------------------
1584  * Static functions
1585  * ------------------------------------------------------------------------ */
1586 static void unblockThread(StgTSO *tso);
1587
1588 /* ---------------------------------------------------------------------------
1589  * Comparing Thread ids.
1590  *
1591  * This is used from STG land in the implementation of the
1592  * instances of Eq/Ord for ThreadIds.
1593  * ------------------------------------------------------------------------ */
1594
1595 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1596
1597   StgThreadID id1 = tso1->id; 
1598   StgThreadID id2 = tso2->id;
1599  
1600   if (id1 < id2) return (-1);
1601   if (id1 > id2) return 1;
1602   return 0;
1603 }
1604
1605 /* ---------------------------------------------------------------------------
1606  * Fetching the ThreadID from an StgTSO.
1607  *
1608  * This is used in the implementation of Show for ThreadIds.
1609  * ------------------------------------------------------------------------ */
1610 int rts_getThreadId(const StgTSO *tso) 
1611 {
1612   return tso->id;
1613 }
1614
1615 /* ---------------------------------------------------------------------------
1616    Create a new thread.
1617
1618    The new thread starts with the given stack size.  Before the
1619    scheduler can run, however, this thread needs to have a closure
1620    (and possibly some arguments) pushed on its stack.  See
1621    pushClosure() in Schedule.h.
1622
1623    createGenThread() and createIOThread() (in SchedAPI.h) are
1624    convenient packaged versions of this function.
1625
1626    currently pri (priority) is only used in a GRAN setup -- HWL
1627    ------------------------------------------------------------------------ */
1628 //@cindex createThread
1629 #if defined(GRAN)
1630 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1631 StgTSO *
1632 createThread(nat stack_size, StgInt pri)
1633 {
1634   return createThread_(stack_size, rtsFalse, pri);
1635 }
1636
1637 static StgTSO *
1638 createThread_(nat size, rtsBool have_lock, StgInt pri)
1639 {
1640 #else
1641 StgTSO *
1642 createThread(nat stack_size)
1643 {
1644   return createThread_(stack_size, rtsFalse);
1645 }
1646
1647 static StgTSO *
1648 createThread_(nat size, rtsBool have_lock)
1649 {
1650 #endif
1651
1652     StgTSO *tso;
1653     nat stack_size;
1654
1655     /* First check whether we should create a thread at all */
1656 #if defined(PAR)
1657   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1658   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1659     threadsIgnored++;
1660     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1661           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1662     return END_TSO_QUEUE;
1663   }
1664   threadsCreated++;
1665 #endif
1666
1667 #if defined(GRAN)
1668   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1669 #endif
1670
1671   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1672
1673   /* catch ridiculously small stack sizes */
1674   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1675     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1676   }
1677
1678   stack_size = size - TSO_STRUCT_SIZEW;
1679
1680   tso = (StgTSO *)allocate(size);
1681   TICK_ALLOC_TSO(stack_size, 0);
1682
1683   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1684 #if defined(GRAN)
1685   SET_GRAN_HDR(tso, ThisPE);
1686 #endif
1687   tso->what_next     = ThreadEnterGHC;
1688
1689   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1690    * protect the increment operation on next_thread_id.
1691    * In future, we could use an atomic increment instead.
1692    */
1693   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1694   tso->id = next_thread_id++; 
1695   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1696
1697   tso->why_blocked  = NotBlocked;
1698   tso->blocked_exceptions = NULL;
1699
1700   tso->stack_size   = stack_size;
1701   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1702                               - TSO_STRUCT_SIZEW;
1703   tso->sp           = (P_)&(tso->stack) + stack_size;
1704
1705 #ifdef PROFILING
1706   tso->prof.CCCS = CCS_MAIN;
1707 #endif
1708
1709   /* put a stop frame on the stack */
1710   tso->sp -= sizeofW(StgStopFrame);
1711   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1712   tso->su = (StgUpdateFrame*)tso->sp;
1713
1714   // ToDo: check this
1715 #if defined(GRAN)
1716   tso->link = END_TSO_QUEUE;
1717   /* uses more flexible routine in GranSim */
1718   insertThread(tso, CurrentProc);
1719 #else
1720   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1721    * from its creation
1722    */
1723 #endif
1724
1725 #if defined(GRAN) 
1726   if (RtsFlags.GranFlags.GranSimStats.Full) 
1727     DumpGranEvent(GR_START,tso);
1728 #elif defined(PAR)
1729   if (RtsFlags.ParFlags.ParStats.Full) 
1730     DumpGranEvent(GR_STARTQ,tso);
1731   /* HACk to avoid SCHEDULE 
1732      LastTSO = tso; */
1733 #endif
1734
1735   /* Link the new thread on the global thread list.
1736    */
1737   tso->global_link = all_threads;
1738   all_threads = tso;
1739
1740 #if defined(DIST)
1741   tso->dist.priority = MandatoryPriority; //by default that is...
1742 #endif
1743
1744 #if defined(GRAN)
1745   tso->gran.pri = pri;
1746 # if defined(DEBUG)
1747   tso->gran.magic = TSO_MAGIC; // debugging only
1748 # endif
1749   tso->gran.sparkname   = 0;
1750   tso->gran.startedat   = CURRENT_TIME; 
1751   tso->gran.exported    = 0;
1752   tso->gran.basicblocks = 0;
1753   tso->gran.allocs      = 0;
1754   tso->gran.exectime    = 0;
1755   tso->gran.fetchtime   = 0;
1756   tso->gran.fetchcount  = 0;
1757   tso->gran.blocktime   = 0;
1758   tso->gran.blockcount  = 0;
1759   tso->gran.blockedat   = 0;
1760   tso->gran.globalsparks = 0;
1761   tso->gran.localsparks  = 0;
1762   if (RtsFlags.GranFlags.Light)
1763     tso->gran.clock  = Now; /* local clock */
1764   else
1765     tso->gran.clock  = 0;
1766
1767   IF_DEBUG(gran,printTSO(tso));
1768 #elif defined(PAR)
1769 # if defined(DEBUG)
1770   tso->par.magic = TSO_MAGIC; // debugging only
1771 # endif
1772   tso->par.sparkname   = 0;
1773   tso->par.startedat   = CURRENT_TIME; 
1774   tso->par.exported    = 0;
1775   tso->par.basicblocks = 0;
1776   tso->par.allocs      = 0;
1777   tso->par.exectime    = 0;
1778   tso->par.fetchtime   = 0;
1779   tso->par.fetchcount  = 0;
1780   tso->par.blocktime   = 0;
1781   tso->par.blockcount  = 0;
1782   tso->par.blockedat   = 0;
1783   tso->par.globalsparks = 0;
1784   tso->par.localsparks  = 0;
1785 #endif
1786
1787 #if defined(GRAN)
1788   globalGranStats.tot_threads_created++;
1789   globalGranStats.threads_created_on_PE[CurrentProc]++;
1790   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1791   globalGranStats.tot_sq_probes++;
1792 #elif defined(PAR)
1793   // collect parallel global statistics (currently done together with GC stats)
1794   if (RtsFlags.ParFlags.ParStats.Global &&
1795       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1796     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1797     globalParStats.tot_threads_created++;
1798   }
1799 #endif 
1800
1801 #if defined(GRAN)
1802   IF_GRAN_DEBUG(pri,
1803                 belch("==__ schedule: Created TSO %d (%p);",
1804                       CurrentProc, tso, tso->id));
1805 #elif defined(PAR)
1806     IF_PAR_DEBUG(verbose,
1807                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1808                        tso->id, tso, advisory_thread_count));
1809 #else
1810   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1811                                  tso->id, tso->stack_size));
1812 #endif    
1813   return tso;
1814 }
1815
1816 #if defined(PAR)
1817 /* RFP:
1818    all parallel thread creation calls should fall through the following routine.
1819 */
1820 StgTSO *
1821 createSparkThread(rtsSpark spark) 
1822 { StgTSO *tso;
1823   ASSERT(spark != (rtsSpark)NULL);
1824   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1825   { threadsIgnored++;
1826     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1827           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1828     return END_TSO_QUEUE;
1829   }
1830   else
1831   { threadsCreated++;
1832     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1833     if (tso==END_TSO_QUEUE)     
1834       barf("createSparkThread: Cannot create TSO");
1835 #if defined(DIST)
1836     tso->priority = AdvisoryPriority;
1837 #endif
1838     pushClosure(tso,spark);
1839     PUSH_ON_RUN_QUEUE(tso);
1840     advisory_thread_count++;    
1841   }
1842   return tso;
1843 }
1844 #endif
1845
1846 /*
1847   Turn a spark into a thread.
1848   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1849 */
1850 #if defined(PAR)
1851 //@cindex activateSpark
1852 StgTSO *
1853 activateSpark (rtsSpark spark) 
1854 {
1855   StgTSO *tso;
1856
1857   tso = createSparkThread(spark);
1858   if (RtsFlags.ParFlags.ParStats.Full) {   
1859     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1860     IF_PAR_DEBUG(verbose,
1861                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1862                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1863   }
1864   // ToDo: fwd info on local/global spark to thread -- HWL
1865   // tso->gran.exported =  spark->exported;
1866   // tso->gran.locked =   !spark->global;
1867   // tso->gran.sparkname = spark->name;
1868
1869   return tso;
1870 }
1871 #endif
1872
1873 /* ---------------------------------------------------------------------------
1874  * scheduleThread()
1875  *
1876  * scheduleThread puts a thread on the head of the runnable queue.
1877  * This will usually be done immediately after a thread is created.
1878  * The caller of scheduleThread must create the thread using e.g.
1879  * createThread and push an appropriate closure
1880  * on this thread's stack before the scheduler is invoked.
1881  * ------------------------------------------------------------------------ */
1882
1883 void
1884 scheduleThread(StgTSO *tso)
1885 {
1886   ACQUIRE_LOCK(&sched_mutex);
1887
1888   /* Put the new thread on the head of the runnable queue.  The caller
1889    * better push an appropriate closure on this thread's stack
1890    * beforehand.  In the SMP case, the thread may start running as
1891    * soon as we release the scheduler lock below.
1892    */
1893   PUSH_ON_RUN_QUEUE(tso);
1894   THREAD_RUNNABLE();
1895
1896 #if 0
1897   IF_DEBUG(scheduler,printTSO(tso));
1898 #endif
1899   RELEASE_LOCK(&sched_mutex);
1900 }
1901
1902 /* ---------------------------------------------------------------------------
1903  * initScheduler()
1904  *
1905  * Initialise the scheduler.  This resets all the queues - if the
1906  * queues contained any threads, they'll be garbage collected at the
1907  * next pass.
1908  *
1909  * ------------------------------------------------------------------------ */
1910
1911 #ifdef SMP
1912 static void
1913 term_handler(int sig STG_UNUSED)
1914 {
1915   stat_workerStop();
1916   ACQUIRE_LOCK(&term_mutex);
1917   await_death--;
1918   RELEASE_LOCK(&term_mutex);
1919   shutdownThread();
1920 }
1921 #endif
1922
1923 void 
1924 initScheduler(void)
1925 {
1926 #if defined(GRAN)
1927   nat i;
1928
1929   for (i=0; i<=MAX_PROC; i++) {
1930     run_queue_hds[i]      = END_TSO_QUEUE;
1931     run_queue_tls[i]      = END_TSO_QUEUE;
1932     blocked_queue_hds[i]  = END_TSO_QUEUE;
1933     blocked_queue_tls[i]  = END_TSO_QUEUE;
1934     ccalling_threadss[i]  = END_TSO_QUEUE;
1935     sleeping_queue        = END_TSO_QUEUE;
1936   }
1937 #else
1938   run_queue_hd      = END_TSO_QUEUE;
1939   run_queue_tl      = END_TSO_QUEUE;
1940   blocked_queue_hd  = END_TSO_QUEUE;
1941   blocked_queue_tl  = END_TSO_QUEUE;
1942   sleeping_queue    = END_TSO_QUEUE;
1943 #endif 
1944
1945   suspended_ccalling_threads  = END_TSO_QUEUE;
1946
1947   main_threads = NULL;
1948   all_threads  = END_TSO_QUEUE;
1949
1950   context_switch = 0;
1951   interrupted    = 0;
1952
1953   RtsFlags.ConcFlags.ctxtSwitchTicks =
1954       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1955       
1956 #if defined(RTS_SUPPORTS_THREADS)
1957   /* Initialise the mutex and condition variables used by
1958    * the scheduler. */
1959   initMutex(&rts_mutex);
1960   initMutex(&sched_mutex);
1961   initMutex(&term_mutex);
1962 #if defined(THREADED_RTS)
1963   initMutex(&thread_ready_aux_mutex);
1964 #endif
1965   
1966   initCondition(&thread_ready_cond);
1967   initCondition(&gc_pending_cond);
1968 #endif
1969
1970 #if defined(THREADED_RTS)
1971   /* Grab big lock */
1972   ACQUIRE_LOCK(&rts_mutex);
1973   IF_DEBUG(scheduler, 
1974            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1975 #endif
1976
1977   /* Install the SIGHUP handler */
1978 #ifdef SMP
1979   {
1980     struct sigaction action,oact;
1981
1982     action.sa_handler = term_handler;
1983     sigemptyset(&action.sa_mask);
1984     action.sa_flags = 0;
1985     if (sigaction(SIGTERM, &action, &oact) != 0) {
1986       barf("can't install TERM handler");
1987     }
1988   }
1989 #endif
1990
1991   /* A capability holds the state a native thread needs in
1992    * order to execute STG code. At least one capability is
1993    * floating around (only SMP builds have more than one).
1994    */
1995   initCapabilities();
1996   
1997 #if defined(RTS_SUPPORTS_THREADS)
1998     /* start our haskell execution tasks */
1999 # if defined(SMP)
2000     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2001 # else
2002     startTaskManager(0,taskStart);
2003 # endif
2004 #endif
2005
2006 #if /* defined(SMP) ||*/ defined(PAR)
2007   initSparkPools();
2008 #endif
2009 }
2010
2011 void
2012 exitScheduler( void )
2013 {
2014 #if defined(RTS_SUPPORTS_THREADS)
2015   stopTaskManager();
2016 #endif
2017 }
2018
2019 /* -----------------------------------------------------------------------------
2020    Managing the per-task allocation areas.
2021    
2022    Each capability comes with an allocation area.  These are
2023    fixed-length block lists into which allocation can be done.
2024
2025    ToDo: no support for two-space collection at the moment???
2026    -------------------------------------------------------------------------- */
2027
2028 /* -----------------------------------------------------------------------------
2029  * waitThread is the external interface for running a new computation
2030  * and waiting for the result.
2031  *
2032  * In the non-SMP case, we create a new main thread, push it on the 
2033  * main-thread stack, and invoke the scheduler to run it.  The
2034  * scheduler will return when the top main thread on the stack has
2035  * completed or died, and fill in the necessary fields of the
2036  * main_thread structure.
2037  *
2038  * In the SMP case, we create a main thread as before, but we then
2039  * create a new condition variable and sleep on it.  When our new
2040  * main thread has completed, we'll be woken up and the status/result
2041  * will be in the main_thread struct.
2042  * -------------------------------------------------------------------------- */
2043
2044 int 
2045 howManyThreadsAvail ( void )
2046 {
2047    int i = 0;
2048    StgTSO* q;
2049    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2050       i++;
2051    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2052       i++;
2053    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2054       i++;
2055    return i;
2056 }
2057
2058 void
2059 finishAllThreads ( void )
2060 {
2061    do {
2062       while (run_queue_hd != END_TSO_QUEUE) {
2063          waitThread ( run_queue_hd, NULL );
2064       }
2065       while (blocked_queue_hd != END_TSO_QUEUE) {
2066          waitThread ( blocked_queue_hd, NULL );
2067       }
2068       while (sleeping_queue != END_TSO_QUEUE) {
2069          waitThread ( blocked_queue_hd, NULL );
2070       }
2071    } while 
2072       (blocked_queue_hd != END_TSO_QUEUE || 
2073        run_queue_hd     != END_TSO_QUEUE ||
2074        sleeping_queue   != END_TSO_QUEUE);
2075 }
2076
2077 SchedulerStatus
2078 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2079 {
2080   StgMainThread *m;
2081   SchedulerStatus stat;
2082
2083   ACQUIRE_LOCK(&sched_mutex);
2084   
2085   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2086
2087   m->tso = tso;
2088   m->ret = ret;
2089   m->stat = NoStatus;
2090 #if defined(RTS_SUPPORTS_THREADS)
2091   initCondition(&m->wakeup);
2092 #endif
2093
2094   m->link = main_threads;
2095   main_threads = m;
2096
2097   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2098                               m->tso->id));
2099
2100 #ifdef SMP
2101   do {
2102     waitCondition(&m->wakeup, &sched_mutex);
2103   } while (m->stat == NoStatus);
2104 #elif defined(GRAN)
2105   /* GranSim specific init */
2106   CurrentTSO = m->tso;                // the TSO to run
2107   procStatus[MainProc] = Busy;        // status of main PE
2108   CurrentProc = MainProc;             // PE to run it on
2109
2110   schedule();
2111 #else
2112   RELEASE_LOCK(&sched_mutex);
2113   schedule();
2114   ASSERT(m->stat != NoStatus);
2115 #endif
2116
2117   stat = m->stat;
2118
2119 #if defined(RTS_SUPPORTS_THREADS)
2120   closeCondition(&m->wakeup);
2121 #endif
2122
2123   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2124                               m->tso->id));
2125   free(m);
2126
2127   RELEASE_LOCK(&sched_mutex);
2128
2129   return stat;
2130 }
2131
2132 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2133 //@subsection Run queue code 
2134
2135 #if 0
2136 /* 
2137    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2138        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2139        implicit global variable that has to be correct when calling these
2140        fcts -- HWL 
2141 */
2142
2143 /* Put the new thread on the head of the runnable queue.
2144  * The caller of createThread better push an appropriate closure
2145  * on this thread's stack before the scheduler is invoked.
2146  */
2147 static /* inline */ void
2148 add_to_run_queue(tso)
2149 StgTSO* tso; 
2150 {
2151   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2152   tso->link = run_queue_hd;
2153   run_queue_hd = tso;
2154   if (run_queue_tl == END_TSO_QUEUE) {
2155     run_queue_tl = tso;
2156   }
2157 }
2158
2159 /* Put the new thread at the end of the runnable queue. */
2160 static /* inline */ void
2161 push_on_run_queue(tso)
2162 StgTSO* tso; 
2163 {
2164   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2165   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2166   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2167   if (run_queue_hd == END_TSO_QUEUE) {
2168     run_queue_hd = tso;
2169   } else {
2170     run_queue_tl->link = tso;
2171   }
2172   run_queue_tl = tso;
2173 }
2174
2175 /* 
2176    Should be inlined because it's used very often in schedule.  The tso
2177    argument is actually only needed in GranSim, where we want to have the
2178    possibility to schedule *any* TSO on the run queue, irrespective of the
2179    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2180    the run queue and dequeue the tso, adjusting the links in the queue. 
2181 */
2182 //@cindex take_off_run_queue
2183 static /* inline */ StgTSO*
2184 take_off_run_queue(StgTSO *tso) {
2185   StgTSO *t, *prev;
2186
2187   /* 
2188      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2189
2190      if tso is specified, unlink that tso from the run_queue (doesn't have
2191      to be at the beginning of the queue); GranSim only 
2192   */
2193   if (tso!=END_TSO_QUEUE) {
2194     /* find tso in queue */
2195     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2196          t!=END_TSO_QUEUE && t!=tso;
2197          prev=t, t=t->link) 
2198       /* nothing */ ;
2199     ASSERT(t==tso);
2200     /* now actually dequeue the tso */
2201     if (prev!=END_TSO_QUEUE) {
2202       ASSERT(run_queue_hd!=t);
2203       prev->link = t->link;
2204     } else {
2205       /* t is at beginning of thread queue */
2206       ASSERT(run_queue_hd==t);
2207       run_queue_hd = t->link;
2208     }
2209     /* t is at end of thread queue */
2210     if (t->link==END_TSO_QUEUE) {
2211       ASSERT(t==run_queue_tl);
2212       run_queue_tl = prev;
2213     } else {
2214       ASSERT(run_queue_tl!=t);
2215     }
2216     t->link = END_TSO_QUEUE;
2217   } else {
2218     /* take tso from the beginning of the queue; std concurrent code */
2219     t = run_queue_hd;
2220     if (t != END_TSO_QUEUE) {
2221       run_queue_hd = t->link;
2222       t->link = END_TSO_QUEUE;
2223       if (run_queue_hd == END_TSO_QUEUE) {
2224         run_queue_tl = END_TSO_QUEUE;
2225       }
2226     }
2227   }
2228   return t;
2229 }
2230
2231 #endif /* 0 */
2232
2233 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2234 //@subsection Garbage Collextion Routines
2235
2236 /* ---------------------------------------------------------------------------
2237    Where are the roots that we know about?
2238
2239         - all the threads on the runnable queue
2240         - all the threads on the blocked queue
2241         - all the threads on the sleeping queue
2242         - all the thread currently executing a _ccall_GC
2243         - all the "main threads"
2244      
2245    ------------------------------------------------------------------------ */
2246
2247 /* This has to be protected either by the scheduler monitor, or by the
2248         garbage collection monitor (probably the latter).
2249         KH @ 25/10/99
2250 */
2251
2252 void
2253 GetRoots(evac_fn evac)
2254 {
2255   StgMainThread *m;
2256
2257 #if defined(GRAN)
2258   {
2259     nat i;
2260     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2261       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2262           evac((StgClosure **)&run_queue_hds[i]);
2263       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2264           evac((StgClosure **)&run_queue_tls[i]);
2265       
2266       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2267           evac((StgClosure **)&blocked_queue_hds[i]);
2268       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2269           evac((StgClosure **)&blocked_queue_tls[i]);
2270       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2271           evac((StgClosure **)&ccalling_threads[i]);
2272     }
2273   }
2274
2275   markEventQueue();
2276
2277 #else /* !GRAN */
2278   if (run_queue_hd != END_TSO_QUEUE) {
2279       ASSERT(run_queue_tl != END_TSO_QUEUE);
2280       evac((StgClosure **)&run_queue_hd);
2281       evac((StgClosure **)&run_queue_tl);
2282   }
2283   
2284   if (blocked_queue_hd != END_TSO_QUEUE) {
2285       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2286       evac((StgClosure **)&blocked_queue_hd);
2287       evac((StgClosure **)&blocked_queue_tl);
2288   }
2289   
2290   if (sleeping_queue != END_TSO_QUEUE) {
2291       evac((StgClosure **)&sleeping_queue);
2292   }
2293 #endif 
2294
2295   for (m = main_threads; m != NULL; m = m->link) {
2296       evac((StgClosure **)&m->tso);
2297   }
2298   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2299       evac((StgClosure **)&suspended_ccalling_threads);
2300   }
2301
2302 #if defined(PAR) || defined(GRAN)
2303   markSparkQueue(evac);
2304 #endif
2305 }
2306
2307 /* -----------------------------------------------------------------------------
2308    performGC
2309
2310    This is the interface to the garbage collector from Haskell land.
2311    We provide this so that external C code can allocate and garbage
2312    collect when called from Haskell via _ccall_GC.
2313
2314    It might be useful to provide an interface whereby the programmer
2315    can specify more roots (ToDo).
2316    
2317    This needs to be protected by the GC condition variable above.  KH.
2318    -------------------------------------------------------------------------- */
2319
2320 void (*extra_roots)(evac_fn);
2321
2322 void
2323 performGC(void)
2324 {
2325   GarbageCollect(GetRoots,rtsFalse);
2326 }
2327
2328 void
2329 performMajorGC(void)
2330 {
2331   GarbageCollect(GetRoots,rtsTrue);
2332 }
2333
2334 static void
2335 AllRoots(evac_fn evac)
2336 {
2337     GetRoots(evac);             // the scheduler's roots
2338     extra_roots(evac);          // the user's roots
2339 }
2340
2341 void
2342 performGCWithRoots(void (*get_roots)(evac_fn))
2343 {
2344   extra_roots = get_roots;
2345   GarbageCollect(AllRoots,rtsFalse);
2346 }
2347
2348 /* -----------------------------------------------------------------------------
2349    Stack overflow
2350
2351    If the thread has reached its maximum stack size, then raise the
2352    StackOverflow exception in the offending thread.  Otherwise
2353    relocate the TSO into a larger chunk of memory and adjust its stack
2354    size appropriately.
2355    -------------------------------------------------------------------------- */
2356
2357 static StgTSO *
2358 threadStackOverflow(StgTSO *tso)
2359 {
2360   nat new_stack_size, new_tso_size, diff, stack_words;
2361   StgPtr new_sp;
2362   StgTSO *dest;
2363
2364   IF_DEBUG(sanity,checkTSO(tso));
2365   if (tso->stack_size >= tso->max_stack_size) {
2366
2367     IF_DEBUG(gc,
2368              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2369                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2370              /* If we're debugging, just print out the top of the stack */
2371              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2372                                               tso->sp+64)));
2373
2374     /* Send this thread the StackOverflow exception */
2375     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2376     return tso;
2377   }
2378
2379   /* Try to double the current stack size.  If that takes us over the
2380    * maximum stack size for this thread, then use the maximum instead.
2381    * Finally round up so the TSO ends up as a whole number of blocks.
2382    */
2383   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2384   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2385                                        TSO_STRUCT_SIZE)/sizeof(W_);
2386   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2387   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2388
2389   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2390
2391   dest = (StgTSO *)allocate(new_tso_size);
2392   TICK_ALLOC_TSO(new_stack_size,0);
2393
2394   /* copy the TSO block and the old stack into the new area */
2395   memcpy(dest,tso,TSO_STRUCT_SIZE);
2396   stack_words = tso->stack + tso->stack_size - tso->sp;
2397   new_sp = (P_)dest + new_tso_size - stack_words;
2398   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2399
2400   /* relocate the stack pointers... */
2401   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2402   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2403   dest->sp    = new_sp;
2404   dest->stack_size = new_stack_size;
2405         
2406   /* and relocate the update frame list */
2407   relocate_stack(dest, diff);
2408
2409   /* Mark the old TSO as relocated.  We have to check for relocated
2410    * TSOs in the garbage collector and any primops that deal with TSOs.
2411    *
2412    * It's important to set the sp and su values to just beyond the end
2413    * of the stack, so we don't attempt to scavenge any part of the
2414    * dead TSO's stack.
2415    */
2416   tso->what_next = ThreadRelocated;
2417   tso->link = dest;
2418   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2419   tso->su = (StgUpdateFrame *)tso->sp;
2420   tso->why_blocked = NotBlocked;
2421   dest->mut_link = NULL;
2422
2423   IF_PAR_DEBUG(verbose,
2424                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2425                      tso->id, tso, tso->stack_size);
2426                /* If we're debugging, just print out the top of the stack */
2427                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2428                                                 tso->sp+64)));
2429   
2430   IF_DEBUG(sanity,checkTSO(tso));
2431 #if 0
2432   IF_DEBUG(scheduler,printTSO(dest));
2433 #endif
2434
2435   return dest;
2436 }
2437
2438 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2439 //@subsection Blocking Queue Routines
2440
2441 /* ---------------------------------------------------------------------------
2442    Wake up a queue that was blocked on some resource.
2443    ------------------------------------------------------------------------ */
2444
2445 #if defined(GRAN)
2446 static inline void
2447 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2448 {
2449 }
2450 #elif defined(PAR)
2451 static inline void
2452 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2453 {
2454   /* write RESUME events to log file and
2455      update blocked and fetch time (depending on type of the orig closure) */
2456   if (RtsFlags.ParFlags.ParStats.Full) {
2457     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2458                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2459                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2460     if (EMPTY_RUN_QUEUE())
2461       emitSchedule = rtsTrue;
2462
2463     switch (get_itbl(node)->type) {
2464         case FETCH_ME_BQ:
2465           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2466           break;
2467         case RBH:
2468         case FETCH_ME:
2469         case BLACKHOLE_BQ:
2470           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2471           break;
2472 #ifdef DIST
2473         case MVAR:
2474           break;
2475 #endif    
2476         default:
2477           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2478         }
2479       }
2480 }
2481 #endif
2482
2483 #if defined(GRAN)
2484 static StgBlockingQueueElement *
2485 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2486 {
2487     StgTSO *tso;
2488     PEs node_loc, tso_loc;
2489
2490     node_loc = where_is(node); // should be lifted out of loop
2491     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2492     tso_loc = where_is((StgClosure *)tso);
2493     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2494       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2495       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2496       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2497       // insertThread(tso, node_loc);
2498       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2499                 ResumeThread,
2500                 tso, node, (rtsSpark*)NULL);
2501       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2502       // len_local++;
2503       // len++;
2504     } else { // TSO is remote (actually should be FMBQ)
2505       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2506                                   RtsFlags.GranFlags.Costs.gunblocktime +
2507                                   RtsFlags.GranFlags.Costs.latency;
2508       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2509                 UnblockThread,
2510                 tso, node, (rtsSpark*)NULL);
2511       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2512       // len++;
2513     }
2514     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2515     IF_GRAN_DEBUG(bq,
2516                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2517                           (node_loc==tso_loc ? "Local" : "Global"), 
2518                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2519     tso->block_info.closure = NULL;
2520     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2521                              tso->id, tso));
2522 }
2523 #elif defined(PAR)
2524 static StgBlockingQueueElement *
2525 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2526 {
2527     StgBlockingQueueElement *next;
2528
2529     switch (get_itbl(bqe)->type) {
2530     case TSO:
2531       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2532       /* if it's a TSO just push it onto the run_queue */
2533       next = bqe->link;
2534       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2535       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2536       THREAD_RUNNABLE();
2537       unblockCount(bqe, node);
2538       /* reset blocking status after dumping event */
2539       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2540       break;
2541
2542     case BLOCKED_FETCH:
2543       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2544       next = bqe->link;
2545       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2546       PendingFetches = (StgBlockedFetch *)bqe;
2547       break;
2548
2549 # if defined(DEBUG)
2550       /* can ignore this case in a non-debugging setup; 
2551          see comments on RBHSave closures above */
2552     case CONSTR:
2553       /* check that the closure is an RBHSave closure */
2554       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2555              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2556              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2557       break;
2558
2559     default:
2560       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2561            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2562            (StgClosure *)bqe);
2563 # endif
2564     }
2565   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2566   return next;
2567 }
2568
2569 #else /* !GRAN && !PAR */
2570 static StgTSO *
2571 unblockOneLocked(StgTSO *tso)
2572 {
2573   StgTSO *next;
2574
2575   ASSERT(get_itbl(tso)->type == TSO);
2576   ASSERT(tso->why_blocked != NotBlocked);
2577   tso->why_blocked = NotBlocked;
2578   next = tso->link;
2579   PUSH_ON_RUN_QUEUE(tso);
2580   THREAD_RUNNABLE();
2581   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2582   return next;
2583 }
2584 #endif
2585
2586 #if defined(GRAN) || defined(PAR)
2587 inline StgBlockingQueueElement *
2588 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2589 {
2590   ACQUIRE_LOCK(&sched_mutex);
2591   bqe = unblockOneLocked(bqe, node);
2592   RELEASE_LOCK(&sched_mutex);
2593   return bqe;
2594 }
2595 #else
2596 inline StgTSO *
2597 unblockOne(StgTSO *tso)
2598 {
2599   ACQUIRE_LOCK(&sched_mutex);
2600   tso = unblockOneLocked(tso);
2601   RELEASE_LOCK(&sched_mutex);
2602   return tso;
2603 }
2604 #endif
2605
2606 #if defined(GRAN)
2607 void 
2608 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2609 {
2610   StgBlockingQueueElement *bqe;
2611   PEs node_loc;
2612   nat len = 0; 
2613
2614   IF_GRAN_DEBUG(bq, 
2615                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2616                       node, CurrentProc, CurrentTime[CurrentProc], 
2617                       CurrentTSO->id, CurrentTSO));
2618
2619   node_loc = where_is(node);
2620
2621   ASSERT(q == END_BQ_QUEUE ||
2622          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2623          get_itbl(q)->type == CONSTR); // closure (type constructor)
2624   ASSERT(is_unique(node));
2625
2626   /* FAKE FETCH: magically copy the node to the tso's proc;
2627      no Fetch necessary because in reality the node should not have been 
2628      moved to the other PE in the first place
2629   */
2630   if (CurrentProc!=node_loc) {
2631     IF_GRAN_DEBUG(bq, 
2632                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2633                         node, node_loc, CurrentProc, CurrentTSO->id, 
2634                         // CurrentTSO, where_is(CurrentTSO),
2635                         node->header.gran.procs));
2636     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2637     IF_GRAN_DEBUG(bq, 
2638                   belch("## new bitmask of node %p is %#x",
2639                         node, node->header.gran.procs));
2640     if (RtsFlags.GranFlags.GranSimStats.Global) {
2641       globalGranStats.tot_fake_fetches++;
2642     }
2643   }
2644
2645   bqe = q;
2646   // ToDo: check: ASSERT(CurrentProc==node_loc);
2647   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2648     //next = bqe->link;
2649     /* 
2650        bqe points to the current element in the queue
2651        next points to the next element in the queue
2652     */
2653     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2654     //tso_loc = where_is(tso);
2655     len++;
2656     bqe = unblockOneLocked(bqe, node);
2657   }
2658
2659   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2660      the closure to make room for the anchor of the BQ */
2661   if (bqe!=END_BQ_QUEUE) {
2662     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2663     /*
2664     ASSERT((info_ptr==&RBH_Save_0_info) ||
2665            (info_ptr==&RBH_Save_1_info) ||
2666            (info_ptr==&RBH_Save_2_info));
2667     */
2668     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2669     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2670     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2671
2672     IF_GRAN_DEBUG(bq,
2673                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2674                         node, info_type(node)));
2675   }
2676
2677   /* statistics gathering */
2678   if (RtsFlags.GranFlags.GranSimStats.Global) {
2679     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2680     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2681     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2682     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2683   }
2684   IF_GRAN_DEBUG(bq,
2685                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2686                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2687 }
2688 #elif defined(PAR)
2689 void 
2690 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2691 {
2692   StgBlockingQueueElement *bqe;
2693
2694   ACQUIRE_LOCK(&sched_mutex);
2695
2696   IF_PAR_DEBUG(verbose, 
2697                belch("##-_ AwBQ for node %p on [%x]: ",
2698                      node, mytid));
2699 #ifdef DIST  
2700   //RFP
2701   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2702     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2703     return;
2704   }
2705 #endif
2706   
2707   ASSERT(q == END_BQ_QUEUE ||
2708          get_itbl(q)->type == TSO ||           
2709          get_itbl(q)->type == BLOCKED_FETCH || 
2710          get_itbl(q)->type == CONSTR); 
2711
2712   bqe = q;
2713   while (get_itbl(bqe)->type==TSO || 
2714          get_itbl(bqe)->type==BLOCKED_FETCH) {
2715     bqe = unblockOneLocked(bqe, node);
2716   }
2717   RELEASE_LOCK(&sched_mutex);
2718 }
2719
2720 #else   /* !GRAN && !PAR */
2721 void
2722 awakenBlockedQueue(StgTSO *tso)
2723 {
2724   ACQUIRE_LOCK(&sched_mutex);
2725   while (tso != END_TSO_QUEUE) {
2726     tso = unblockOneLocked(tso);
2727   }
2728   RELEASE_LOCK(&sched_mutex);
2729 }
2730 #endif
2731
2732 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2733 //@subsection Exception Handling Routines
2734
2735 /* ---------------------------------------------------------------------------
2736    Interrupt execution
2737    - usually called inside a signal handler so it mustn't do anything fancy.   
2738    ------------------------------------------------------------------------ */
2739
2740 void
2741 interruptStgRts(void)
2742 {
2743     interrupted    = 1;
2744     context_switch = 1;
2745 }
2746
2747 /* -----------------------------------------------------------------------------
2748    Unblock a thread
2749
2750    This is for use when we raise an exception in another thread, which
2751    may be blocked.
2752    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2753    -------------------------------------------------------------------------- */
2754
2755 #if defined(GRAN) || defined(PAR)
2756 /*
2757   NB: only the type of the blocking queue is different in GranSim and GUM
2758       the operations on the queue-elements are the same
2759       long live polymorphism!
2760 */
2761 static void
2762 unblockThread(StgTSO *tso)
2763 {
2764   StgBlockingQueueElement *t, **last;
2765
2766   ACQUIRE_LOCK(&sched_mutex);
2767   switch (tso->why_blocked) {
2768
2769   case NotBlocked:
2770     return;  /* not blocked */
2771
2772   case BlockedOnMVar:
2773     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2774     {
2775       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2776       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2777
2778       last = (StgBlockingQueueElement **)&mvar->head;
2779       for (t = (StgBlockingQueueElement *)mvar->head; 
2780            t != END_BQ_QUEUE; 
2781            last = &t->link, last_tso = t, t = t->link) {
2782         if (t == (StgBlockingQueueElement *)tso) {
2783           *last = (StgBlockingQueueElement *)tso->link;
2784           if (mvar->tail == tso) {
2785             mvar->tail = (StgTSO *)last_tso;
2786           }
2787           goto done;
2788         }
2789       }
2790       barf("unblockThread (MVAR): TSO not found");
2791     }
2792
2793   case BlockedOnBlackHole:
2794     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2795     {
2796       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2797
2798       last = &bq->blocking_queue;
2799       for (t = bq->blocking_queue; 
2800            t != END_BQ_QUEUE; 
2801            last = &t->link, t = t->link) {
2802         if (t == (StgBlockingQueueElement *)tso) {
2803           *last = (StgBlockingQueueElement *)tso->link;
2804           goto done;
2805         }
2806       }
2807       barf("unblockThread (BLACKHOLE): TSO not found");
2808     }
2809
2810   case BlockedOnException:
2811     {
2812       StgTSO *target  = tso->block_info.tso;
2813
2814       ASSERT(get_itbl(target)->type == TSO);
2815
2816       if (target->what_next == ThreadRelocated) {
2817           target = target->link;
2818           ASSERT(get_itbl(target)->type == TSO);
2819       }
2820
2821       ASSERT(target->blocked_exceptions != NULL);
2822
2823       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2824       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2825            t != END_BQ_QUEUE; 
2826            last = &t->link, t = t->link) {
2827         ASSERT(get_itbl(t)->type == TSO);
2828         if (t == (StgBlockingQueueElement *)tso) {
2829           *last = (StgBlockingQueueElement *)tso->link;
2830           goto done;
2831         }
2832       }
2833       barf("unblockThread (Exception): TSO not found");
2834     }
2835
2836   case BlockedOnRead:
2837   case BlockedOnWrite:
2838     {
2839       /* take TSO off blocked_queue */
2840       StgBlockingQueueElement *prev = NULL;
2841       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2842            prev = t, t = t->link) {
2843         if (t == (StgBlockingQueueElement *)tso) {
2844           if (prev == NULL) {
2845             blocked_queue_hd = (StgTSO *)t->link;
2846             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2847               blocked_queue_tl = END_TSO_QUEUE;
2848             }
2849           } else {
2850             prev->link = t->link;
2851             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2852               blocked_queue_tl = (StgTSO *)prev;
2853             }
2854           }
2855           goto done;
2856         }
2857       }
2858       barf("unblockThread (I/O): TSO not found");
2859     }
2860
2861   case BlockedOnDelay:
2862     {
2863       /* take TSO off sleeping_queue */
2864       StgBlockingQueueElement *prev = NULL;
2865       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2866            prev = t, t = t->link) {
2867         if (t == (StgBlockingQueueElement *)tso) {
2868           if (prev == NULL) {
2869             sleeping_queue = (StgTSO *)t->link;
2870           } else {
2871             prev->link = t->link;
2872           }
2873           goto done;
2874         }
2875       }
2876       barf("unblockThread (I/O): TSO not found");
2877     }
2878
2879   default:
2880     barf("unblockThread");
2881   }
2882
2883  done:
2884   tso->link = END_TSO_QUEUE;
2885   tso->why_blocked = NotBlocked;
2886   tso->block_info.closure = NULL;
2887   PUSH_ON_RUN_QUEUE(tso);
2888   RELEASE_LOCK(&sched_mutex);
2889 }
2890 #else
2891 static void
2892 unblockThread(StgTSO *tso)
2893 {
2894   StgTSO *t, **last;
2895
2896   ACQUIRE_LOCK(&sched_mutex);
2897   switch (tso->why_blocked) {
2898
2899   case NotBlocked:
2900     return;  /* not blocked */
2901
2902   case BlockedOnMVar:
2903     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2904     {
2905       StgTSO *last_tso = END_TSO_QUEUE;
2906       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2907
2908       last = &mvar->head;
2909       for (t = mvar->head; t != END_TSO_QUEUE; 
2910            last = &t->link, last_tso = t, t = t->link) {
2911         if (t == tso) {
2912           *last = tso->link;
2913           if (mvar->tail == tso) {
2914             mvar->tail = last_tso;
2915           }
2916           goto done;
2917         }
2918       }
2919       barf("unblockThread (MVAR): TSO not found");
2920     }
2921
2922   case BlockedOnBlackHole:
2923     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2924     {
2925       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2926
2927       last = &bq->blocking_queue;
2928       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2929            last = &t->link, t = t->link) {
2930         if (t == tso) {
2931           *last = tso->link;
2932           goto done;
2933         }
2934       }
2935       barf("unblockThread (BLACKHOLE): TSO not found");
2936     }
2937
2938   case BlockedOnException:
2939     {
2940       StgTSO *target  = tso->block_info.tso;
2941
2942       ASSERT(get_itbl(target)->type == TSO);
2943
2944       while (target->what_next == ThreadRelocated) {
2945           target = target->link;
2946           ASSERT(get_itbl(target)->type == TSO);
2947       }
2948       
2949       ASSERT(target->blocked_exceptions != NULL);
2950
2951       last = &target->blocked_exceptions;
2952       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2953            last = &t->link, t = t->link) {
2954         ASSERT(get_itbl(t)->type == TSO);
2955         if (t == tso) {
2956           *last = tso->link;
2957           goto done;
2958         }
2959       }
2960       barf("unblockThread (Exception): TSO not found");
2961     }
2962
2963   case BlockedOnRead:
2964   case BlockedOnWrite:
2965     {
2966       StgTSO *prev = NULL;
2967       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2968            prev = t, t = t->link) {
2969         if (t == tso) {
2970           if (prev == NULL) {
2971             blocked_queue_hd = t->link;
2972             if (blocked_queue_tl == t) {
2973               blocked_queue_tl = END_TSO_QUEUE;
2974             }
2975           } else {
2976             prev->link = t->link;
2977             if (blocked_queue_tl == t) {
2978               blocked_queue_tl = prev;
2979             }
2980           }
2981           goto done;
2982         }
2983       }
2984       barf("unblockThread (I/O): TSO not found");
2985     }
2986
2987   case BlockedOnDelay:
2988     {
2989       StgTSO *prev = NULL;
2990       for (t = sleeping_queue; t != END_TSO_QUEUE; 
2991            prev = t, t = t->link) {
2992         if (t == tso) {
2993           if (prev == NULL) {
2994             sleeping_queue = t->link;
2995           } else {
2996             prev->link = t->link;
2997           }
2998           goto done;
2999         }
3000       }
3001       barf("unblockThread (I/O): TSO not found");
3002     }
3003
3004   default:
3005     barf("unblockThread");
3006   }
3007
3008  done:
3009   tso->link = END_TSO_QUEUE;
3010   tso->why_blocked = NotBlocked;
3011   tso->block_info.closure = NULL;
3012   PUSH_ON_RUN_QUEUE(tso);
3013   RELEASE_LOCK(&sched_mutex);
3014 }
3015 #endif
3016
3017 /* -----------------------------------------------------------------------------
3018  * raiseAsync()
3019  *
3020  * The following function implements the magic for raising an
3021  * asynchronous exception in an existing thread.
3022  *
3023  * We first remove the thread from any queue on which it might be
3024  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3025  *
3026  * We strip the stack down to the innermost CATCH_FRAME, building
3027  * thunks in the heap for all the active computations, so they can 
3028  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3029  * an application of the handler to the exception, and push it on
3030  * the top of the stack.
3031  * 
3032  * How exactly do we save all the active computations?  We create an
3033  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3034  * AP_UPDs pushes everything from the corresponding update frame
3035  * upwards onto the stack.  (Actually, it pushes everything up to the
3036  * next update frame plus a pointer to the next AP_UPD object.
3037  * Entering the next AP_UPD object pushes more onto the stack until we
3038  * reach the last AP_UPD object - at which point the stack should look
3039  * exactly as it did when we killed the TSO and we can continue
3040  * execution by entering the closure on top of the stack.
3041  *
3042  * We can also kill a thread entirely - this happens if either (a) the 
3043  * exception passed to raiseAsync is NULL, or (b) there's no
3044  * CATCH_FRAME on the stack.  In either case, we strip the entire
3045  * stack and replace the thread with a zombie.
3046  *
3047  * -------------------------------------------------------------------------- */
3048  
3049 void 
3050 deleteThread(StgTSO *tso)
3051 {
3052   raiseAsync(tso,NULL);
3053 }
3054
3055 void
3056 raiseAsync(StgTSO *tso, StgClosure *exception)
3057 {
3058   StgUpdateFrame* su = tso->su;
3059   StgPtr          sp = tso->sp;
3060   
3061   /* Thread already dead? */
3062   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3063     return;
3064   }
3065
3066   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3067
3068   /* Remove it from any blocking queues */
3069   unblockThread(tso);
3070
3071   /* The stack freezing code assumes there's a closure pointer on
3072    * the top of the stack.  This isn't always the case with compiled
3073    * code, so we have to push a dummy closure on the top which just
3074    * returns to the next return address on the stack.
3075    */
3076   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3077     *(--sp) = (W_)&stg_dummy_ret_closure;
3078   }
3079
3080   while (1) {
3081     nat words = ((P_)su - (P_)sp) - 1;
3082     nat i;
3083     StgAP_UPD * ap;
3084
3085     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3086      * then build PAP(handler,exception,realworld#), and leave it on
3087      * top of the stack ready to enter.
3088      */
3089     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3090       StgCatchFrame *cf = (StgCatchFrame *)su;
3091       /* we've got an exception to raise, so let's pass it to the
3092        * handler in this frame.
3093        */
3094       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3095       TICK_ALLOC_UPD_PAP(3,0);
3096       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3097               
3098       ap->n_args = 2;
3099       ap->fun = cf->handler;    /* :: Exception -> IO a */
3100       ap->payload[0] = exception;
3101       ap->payload[1] = ARG_TAG(0); /* realworld token */
3102
3103       /* throw away the stack from Sp up to and including the
3104        * CATCH_FRAME.
3105        */
3106       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3107       tso->su = cf->link;
3108
3109       /* Restore the blocked/unblocked state for asynchronous exceptions
3110        * at the CATCH_FRAME.  
3111        *
3112        * If exceptions were unblocked at the catch, arrange that they
3113        * are unblocked again after executing the handler by pushing an
3114        * unblockAsyncExceptions_ret stack frame.
3115        */
3116       if (!cf->exceptions_blocked) {
3117         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3118       }
3119       
3120       /* Ensure that async exceptions are blocked when running the handler.
3121        */
3122       if (tso->blocked_exceptions == NULL) {
3123         tso->blocked_exceptions = END_TSO_QUEUE;
3124       }
3125       
3126       /* Put the newly-built PAP on top of the stack, ready to execute
3127        * when the thread restarts.
3128        */
3129       sp[0] = (W_)ap;
3130       tso->sp = sp;
3131       tso->what_next = ThreadEnterGHC;
3132       IF_DEBUG(sanity, checkTSO(tso));
3133       return;
3134     }
3135
3136     /* First build an AP_UPD consisting of the stack chunk above the
3137      * current update frame, with the top word on the stack as the
3138      * fun field.
3139      */
3140     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3141     
3142     ASSERT(words >= 0);
3143     
3144     ap->n_args = words;
3145     ap->fun    = (StgClosure *)sp[0];
3146     sp++;
3147     for(i=0; i < (nat)words; ++i) {
3148       ap->payload[i] = (StgClosure *)*sp++;
3149     }
3150     
3151     switch (get_itbl(su)->type) {
3152       
3153     case UPDATE_FRAME:
3154       {
3155         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3156         TICK_ALLOC_UP_THK(words+1,0);
3157         
3158         IF_DEBUG(scheduler,
3159                  fprintf(stderr,  "scheduler: Updating ");
3160                  printPtr((P_)su->updatee); 
3161                  fprintf(stderr,  " with ");
3162                  printObj((StgClosure *)ap);
3163                  );
3164         
3165         /* Replace the updatee with an indirection - happily
3166          * this will also wake up any threads currently
3167          * waiting on the result.
3168          *
3169          * Warning: if we're in a loop, more than one update frame on
3170          * the stack may point to the same object.  Be careful not to
3171          * overwrite an IND_OLDGEN in this case, because we'll screw
3172          * up the mutable lists.  To be on the safe side, don't
3173          * overwrite any kind of indirection at all.  See also
3174          * threadSqueezeStack in GC.c, where we have to make a similar
3175          * check.
3176          */
3177         if (!closure_IND(su->updatee)) {
3178             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3179         }
3180         su = su->link;
3181         sp += sizeofW(StgUpdateFrame) -1;
3182         sp[0] = (W_)ap; /* push onto stack */
3183         break;
3184       }
3185
3186     case CATCH_FRAME:
3187       {
3188         StgCatchFrame *cf = (StgCatchFrame *)su;
3189         StgClosure* o;
3190         
3191         /* We want a PAP, not an AP_UPD.  Fortunately, the
3192          * layout's the same.
3193          */
3194         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3195         TICK_ALLOC_UPD_PAP(words+1,0);
3196         
3197         /* now build o = FUN(catch,ap,handler) */
3198         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3199         TICK_ALLOC_FUN(2,0);
3200         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3201         o->payload[0] = (StgClosure *)ap;
3202         o->payload[1] = cf->handler;
3203         
3204         IF_DEBUG(scheduler,
3205                  fprintf(stderr,  "scheduler: Built ");
3206                  printObj((StgClosure *)o);
3207                  );
3208         
3209         /* pop the old handler and put o on the stack */
3210         su = cf->link;
3211         sp += sizeofW(StgCatchFrame) - 1;
3212         sp[0] = (W_)o;
3213         break;
3214       }
3215       
3216     case SEQ_FRAME:
3217       {
3218         StgSeqFrame *sf = (StgSeqFrame *)su;
3219         StgClosure* o;
3220         
3221         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3222         TICK_ALLOC_UPD_PAP(words+1,0);
3223         
3224         /* now build o = FUN(seq,ap) */
3225         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3226         TICK_ALLOC_SE_THK(1,0);
3227         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3228         o->payload[0] = (StgClosure *)ap;
3229         
3230         IF_DEBUG(scheduler,
3231                  fprintf(stderr,  "scheduler: Built ");
3232                  printObj((StgClosure *)o);
3233                  );
3234         
3235         /* pop the old handler and put o on the stack */
3236         su = sf->link;
3237         sp += sizeofW(StgSeqFrame) - 1;
3238         sp[0] = (W_)o;
3239         break;
3240       }
3241       
3242     case STOP_FRAME:
3243       /* We've stripped the entire stack, the thread is now dead. */
3244       sp += sizeofW(StgStopFrame) - 1;
3245       sp[0] = (W_)exception;    /* save the exception */
3246       tso->what_next = ThreadKilled;
3247       tso->su = (StgUpdateFrame *)(sp+1);
3248       tso->sp = sp;
3249       return;
3250
3251     default:
3252       barf("raiseAsync");
3253     }
3254   }
3255   barf("raiseAsync");
3256 }
3257
3258 /* -----------------------------------------------------------------------------
3259    resurrectThreads is called after garbage collection on the list of
3260    threads found to be garbage.  Each of these threads will be woken
3261    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3262    on an MVar, or NonTermination if the thread was blocked on a Black
3263    Hole.
3264    -------------------------------------------------------------------------- */
3265
3266 void
3267 resurrectThreads( StgTSO *threads )
3268 {
3269   StgTSO *tso, *next;
3270
3271   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3272     next = tso->global_link;
3273     tso->global_link = all_threads;
3274     all_threads = tso;
3275     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3276
3277     switch (tso->why_blocked) {
3278     case BlockedOnMVar:
3279     case BlockedOnException:
3280       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3281       break;
3282     case BlockedOnBlackHole:
3283       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3284       break;
3285     case NotBlocked:
3286       /* This might happen if the thread was blocked on a black hole
3287        * belonging to a thread that we've just woken up (raiseAsync
3288        * can wake up threads, remember...).
3289        */
3290       continue;
3291     default:
3292       barf("resurrectThreads: thread blocked in a strange way");
3293     }
3294   }
3295 }
3296
3297 /* -----------------------------------------------------------------------------
3298  * Blackhole detection: if we reach a deadlock, test whether any
3299  * threads are blocked on themselves.  Any threads which are found to
3300  * be self-blocked get sent a NonTermination exception.
3301  *
3302  * This is only done in a deadlock situation in order to avoid
3303  * performance overhead in the normal case.
3304  * -------------------------------------------------------------------------- */
3305
3306 static void
3307 detectBlackHoles( void )
3308 {
3309     StgTSO *t = all_threads;
3310     StgUpdateFrame *frame;
3311     StgClosure *blocked_on;
3312
3313     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3314
3315         while (t->what_next == ThreadRelocated) {
3316             t = t->link;
3317             ASSERT(get_itbl(t)->type == TSO);
3318         }
3319       
3320         if (t->why_blocked != BlockedOnBlackHole) {
3321             continue;
3322         }
3323
3324         blocked_on = t->block_info.closure;
3325
3326         for (frame = t->su; ; frame = frame->link) {
3327             switch (get_itbl(frame)->type) {
3328
3329             case UPDATE_FRAME:
3330                 if (frame->updatee == blocked_on) {
3331                     /* We are blocking on one of our own computations, so
3332                      * send this thread the NonTermination exception.  
3333                      */
3334                     IF_DEBUG(scheduler, 
3335                              sched_belch("thread %d is blocked on itself", t->id));
3336                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3337                     goto done;
3338                 }
3339                 else {
3340                     continue;
3341                 }
3342
3343             case CATCH_FRAME:
3344             case SEQ_FRAME:
3345                 continue;
3346                 
3347             case STOP_FRAME:
3348                 break;
3349             }
3350             break;
3351         }
3352
3353     done: ;
3354     }   
3355 }
3356
3357 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3358 //@subsection Debugging Routines
3359
3360 /* -----------------------------------------------------------------------------
3361    Debugging: why is a thread blocked
3362    -------------------------------------------------------------------------- */
3363
3364 #ifdef DEBUG
3365
3366 void
3367 printThreadBlockage(StgTSO *tso)
3368 {
3369   switch (tso->why_blocked) {
3370   case BlockedOnRead:
3371     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3372     break;
3373   case BlockedOnWrite:
3374     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3375     break;
3376   case BlockedOnDelay:
3377     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3378     break;
3379   case BlockedOnMVar:
3380     fprintf(stderr,"is blocked on an MVar");
3381     break;
3382   case BlockedOnException:
3383     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3384             tso->block_info.tso->id);
3385     break;
3386   case BlockedOnBlackHole:
3387     fprintf(stderr,"is blocked on a black hole");
3388     break;
3389   case NotBlocked:
3390     fprintf(stderr,"is not blocked");
3391     break;
3392 #if defined(PAR)
3393   case BlockedOnGA:
3394     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3395             tso->block_info.closure, info_type(tso->block_info.closure));
3396     break;
3397   case BlockedOnGA_NoSend:
3398     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3399             tso->block_info.closure, info_type(tso->block_info.closure));
3400     break;
3401 #endif
3402   default:
3403     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3404          tso->why_blocked, tso->id, tso);
3405   }
3406 }
3407
3408 void
3409 printThreadStatus(StgTSO *tso)
3410 {
3411   switch (tso->what_next) {
3412   case ThreadKilled:
3413     fprintf(stderr,"has been killed");
3414     break;
3415   case ThreadComplete:
3416     fprintf(stderr,"has completed");
3417     break;
3418   default:
3419     printThreadBlockage(tso);
3420   }
3421 }
3422
3423 void
3424 printAllThreads(void)
3425 {
3426   StgTSO *t;
3427
3428 # if defined(GRAN)
3429   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3430   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3431                        time_string, rtsFalse/*no commas!*/);
3432
3433   sched_belch("all threads at [%s]:", time_string);
3434 # elif defined(PAR)
3435   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3436   ullong_format_string(CURRENT_TIME,
3437                        time_string, rtsFalse/*no commas!*/);
3438
3439   sched_belch("all threads at [%s]:", time_string);
3440 # else
3441   sched_belch("all threads:");
3442 # endif
3443
3444   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3445     fprintf(stderr, "\tthread %d ", t->id);
3446     printThreadStatus(t);
3447     fprintf(stderr,"\n");
3448   }
3449 }
3450     
3451 /* 
3452    Print a whole blocking queue attached to node (debugging only).
3453 */
3454 //@cindex print_bq
3455 # if defined(PAR)
3456 void 
3457 print_bq (StgClosure *node)
3458 {
3459   StgBlockingQueueElement *bqe;
3460   StgTSO *tso;
3461   rtsBool end;
3462
3463   fprintf(stderr,"## BQ of closure %p (%s): ",
3464           node, info_type(node));
3465
3466   /* should cover all closures that may have a blocking queue */
3467   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3468          get_itbl(node)->type == FETCH_ME_BQ ||
3469          get_itbl(node)->type == RBH ||
3470          get_itbl(node)->type == MVAR);
3471     
3472   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3473
3474   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3475 }
3476
3477 /* 
3478    Print a whole blocking queue starting with the element bqe.
3479 */
3480 void 
3481 print_bqe (StgBlockingQueueElement *bqe)
3482 {
3483   rtsBool end;
3484
3485   /* 
3486      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3487   */
3488   for (end = (bqe==END_BQ_QUEUE);
3489        !end; // iterate until bqe points to a CONSTR
3490        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3491        bqe = end ? END_BQ_QUEUE : bqe->link) {
3492     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3493     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3494     /* types of closures that may appear in a blocking queue */
3495     ASSERT(get_itbl(bqe)->type == TSO ||           
3496            get_itbl(bqe)->type == BLOCKED_FETCH || 
3497            get_itbl(bqe)->type == CONSTR); 
3498     /* only BQs of an RBH end with an RBH_Save closure */
3499     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3500
3501     switch (get_itbl(bqe)->type) {
3502     case TSO:
3503       fprintf(stderr," TSO %u (%x),",
3504               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3505       break;
3506     case BLOCKED_FETCH:
3507       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3508               ((StgBlockedFetch *)bqe)->node, 
3509               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3510               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3511               ((StgBlockedFetch *)bqe)->ga.weight);
3512       break;
3513     case CONSTR:
3514       fprintf(stderr," %s (IP %p),",
3515               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3516                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3517                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3518                "RBH_Save_?"), get_itbl(bqe));
3519       break;
3520     default:
3521       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3522            info_type((StgClosure *)bqe)); // , node, info_type(node));
3523       break;
3524     }
3525   } /* for */
3526   fputc('\n', stderr);
3527 }
3528 # elif defined(GRAN)
3529 void 
3530 print_bq (StgClosure *node)
3531 {
3532   StgBlockingQueueElement *bqe;
3533   PEs node_loc, tso_loc;
3534   rtsBool end;
3535
3536   /* should cover all closures that may have a blocking queue */
3537   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3538          get_itbl(node)->type == FETCH_ME_BQ ||
3539          get_itbl(node)->type == RBH);
3540     
3541   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3542   node_loc = where_is(node);
3543
3544   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3545           node, info_type(node), node_loc);
3546
3547   /* 
3548      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3549   */
3550   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3551        !end; // iterate until bqe points to a CONSTR
3552        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3553     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3554     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3555     /* types of closures that may appear in a blocking queue */
3556     ASSERT(get_itbl(bqe)->type == TSO ||           
3557            get_itbl(bqe)->type == CONSTR); 
3558     /* only BQs of an RBH end with an RBH_Save closure */
3559     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3560
3561     tso_loc = where_is((StgClosure *)bqe);
3562     switch (get_itbl(bqe)->type) {
3563     case TSO:
3564       fprintf(stderr," TSO %d (%p) on [PE %d],",
3565               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3566       break;
3567     case CONSTR:
3568       fprintf(stderr," %s (IP %p),",
3569               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3570                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3571                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3572                "RBH_Save_?"), get_itbl(bqe));
3573       break;
3574     default:
3575       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3576            info_type((StgClosure *)bqe), node, info_type(node));
3577       break;
3578     }
3579   } /* for */
3580   fputc('\n', stderr);
3581 }
3582 #else
3583 /* 
3584    Nice and easy: only TSOs on the blocking queue
3585 */
3586 void 
3587 print_bq (StgClosure *node)
3588 {
3589   StgTSO *tso;
3590
3591   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3592   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3593        tso != END_TSO_QUEUE; 
3594        tso=tso->link) {
3595     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3596     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3597     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3598   }
3599   fputc('\n', stderr);
3600 }
3601 # endif
3602
3603 #if defined(PAR)
3604 static nat
3605 run_queue_len(void)
3606 {
3607   nat i;
3608   StgTSO *tso;
3609
3610   for (i=0, tso=run_queue_hd; 
3611        tso != END_TSO_QUEUE;
3612        i++, tso=tso->link)
3613     /* nothing */
3614
3615   return i;
3616 }
3617 #endif
3618
3619 static void
3620 sched_belch(char *s, ...)
3621 {
3622   va_list ap;
3623   va_start(ap,s);
3624 #ifdef SMP
3625   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3626 #elif defined(PAR)
3627   fprintf(stderr, "== ");
3628 #else
3629   fprintf(stderr, "scheduler: ");
3630 #endif
3631   vfprintf(stderr, s, ap);
3632   fprintf(stderr, "\n");
3633 }
3634
3635 #endif /* DEBUG */
3636
3637
3638 //@node Index,  , Debugging Routines, Main scheduling code
3639 //@subsection Index
3640
3641 //@index
3642 //* MainRegTable::  @cindex\s-+MainRegTable
3643 //* StgMainThread::  @cindex\s-+StgMainThread
3644 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3645 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3646 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3647 //* context_switch::  @cindex\s-+context_switch
3648 //* createThread::  @cindex\s-+createThread
3649 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3650 //* initScheduler::  @cindex\s-+initScheduler
3651 //* interrupted::  @cindex\s-+interrupted
3652 //* next_thread_id::  @cindex\s-+next_thread_id
3653 //* print_bq::  @cindex\s-+print_bq
3654 //* run_queue_hd::  @cindex\s-+run_queue_hd
3655 //* run_queue_tl::  @cindex\s-+run_queue_tl
3656 //* sched_mutex::  @cindex\s-+sched_mutex
3657 //* schedule::  @cindex\s-+schedule
3658 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3659 //* term_mutex::  @cindex\s-+term_mutex
3660 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3661 //@end index