[project @ 2002-02-06 01:29:27 by sof]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  * $Id: Schedule.c,v 1.118 2002/02/06 01:29:27 sof Exp $
3  *
4  * (c) The GHC Team, 1998-2000
5  *
6  * Scheduler
7  *
8  * Different GHC ways use this scheduler quite differently (see comments below)
9  * Here is the global picture:
10  *
11  * WAY  Name     CPP flag  What's it for
12  * --------------------------------------
13  * mp   GUM      PAR          Parallel execution on a distributed memory machine
14  * s    SMP      SMP          Parallel execution on a shared memory machine
15  * mg   GranSim  GRAN         Simulation of parallel execution
16  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
17  *
18  * --------------------------------------------------------------------------*/
19
20 //@node Main scheduling code, , ,
21 //@section Main scheduling code
22
23 /* 
24  * Version with scheduler monitor support for SMPs (WAY=s):
25
26    This design provides a high-level API to create and schedule threads etc.
27    as documented in the SMP design document.
28
29    It uses a monitor design controlled by a single mutex to exercise control
30    over accesses to shared data structures, and builds on the Posix threads
31    library.
32
33    The majority of state is shared.  In order to keep essential per-task state,
34    there is a Capability structure, which contains all the information
35    needed to run a thread: its STG registers, a pointer to its TSO, a
36    nursery etc.  During STG execution, a pointer to the capability is
37    kept in a register (BaseReg).
38
39    In a non-SMP build, there is one global capability, namely MainRegTable.
40
41    SDM & KH, 10/99
42
43  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
44
45    The main scheduling loop in GUM iterates until a finish message is received.
46    In that case a global flag @receivedFinish@ is set and this instance of
47    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
48    for the handling of incoming messages, such as PP_FINISH.
49    Note that in the parallel case we have a system manager that coordinates
50    different PEs, each of which are running one instance of the RTS.
51    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
52    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
53
54  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
55
56    The main scheduling code in GranSim is quite different from that in std
57    (concurrent) Haskell: while concurrent Haskell just iterates over the
58    threads in the runnable queue, GranSim is event driven, i.e. it iterates
59    over the events in the global event queue.  -- HWL
60 */
61
62 //@menu
63 //* Includes::                  
64 //* Variables and Data structures::  
65 //* Main scheduling loop::      
66 //* Suspend and Resume::        
67 //* Run queue code::            
68 //* Garbage Collextion Routines::  
69 //* Blocking Queue Routines::   
70 //* Exception Handling Routines::  
71 //* Debugging Routines::        
72 //* Index::                     
73 //@end menu
74
75 //@node Includes, Variables and Data structures, Main scheduling code, Main scheduling code
76 //@subsection Includes
77
78 #include "PosixSource.h"
79 #include "Rts.h"
80 #include "SchedAPI.h"
81 #include "RtsUtils.h"
82 #include "RtsFlags.h"
83 #include "Storage.h"
84 #include "StgRun.h"
85 #include "StgStartup.h"
86 #include "Hooks.h"
87 #include "Schedule.h"
88 #include "StgMiscClosures.h"
89 #include "Storage.h"
90 #include "Interpreter.h"
91 #include "Exception.h"
92 #include "Printer.h"
93 #include "Main.h"
94 #include "Signals.h"
95 #include "Sanity.h"
96 #include "Stats.h"
97 #include "Itimer.h"
98 #include "Prelude.h"
99 #ifdef PROFILING
100 #include "Proftimer.h"
101 #include "ProfHeap.h"
102 #endif
103 #if defined(GRAN) || defined(PAR)
104 # include "GranSimRts.h"
105 # include "GranSim.h"
106 # include "ParallelRts.h"
107 # include "Parallel.h"
108 # include "ParallelDebug.h"
109 # include "FetchMe.h"
110 # include "HLC.h"
111 #endif
112 #include "Sparks.h"
113 #include "Capability.h"
114 #include "OSThreads.h"
115 #include  "Task.h"
116
117 #include <stdarg.h>
118
119 //@node Variables and Data structures, Prototypes, Includes, Main scheduling code
120 //@subsection Variables and Data structures
121
122 /* Main threads:
123  *
124  * These are the threads which clients have requested that we run.  
125  *
126  * In a 'threaded' build, we might have several concurrent clients all
127  * waiting for results, and each one will wait on a condition variable
128  * until the result is available.
129  *
130  * In non-SMP, clients are strictly nested: the first client calls
131  * into the RTS, which might call out again to C with a _ccall_GC, and
132  * eventually re-enter the RTS.
133  *
134  * Main threads information is kept in a linked list:
135  */
136 //@cindex StgMainThread
137 typedef struct StgMainThread_ {
138   StgTSO *         tso;
139   SchedulerStatus  stat;
140   StgClosure **    ret;
141 #if defined(RTS_SUPPORTS_THREADS)
142   Condition        wakeup;
143 #endif
144   struct StgMainThread_ *link;
145 } StgMainThread;
146
147 /* Main thread queue.
148  * Locks required: sched_mutex.
149  */
150 static StgMainThread *main_threads;
151
152 /* Thread queues.
153  * Locks required: sched_mutex.
154  */
155 #if defined(GRAN)
156
157 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
158 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
159
160 /* 
161    In GranSim we have a runable and a blocked queue for each processor.
162    In order to minimise code changes new arrays run_queue_hds/tls
163    are created. run_queue_hd is then a short cut (macro) for
164    run_queue_hds[CurrentProc] (see GranSim.h).
165    -- HWL
166 */
167 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
168 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
169 StgTSO *ccalling_threadss[MAX_PROC];
170 /* We use the same global list of threads (all_threads) in GranSim as in
171    the std RTS (i.e. we are cheating). However, we don't use this list in
172    the GranSim specific code at the moment (so we are only potentially
173    cheating).  */
174
175 #else /* !GRAN */
176
177 StgTSO *run_queue_hd, *run_queue_tl;
178 StgTSO *blocked_queue_hd, *blocked_queue_tl;
179 StgTSO *sleeping_queue;         /* perhaps replace with a hash table? */
180
181 #endif
182
183 /* Linked list of all threads.
184  * Used for detecting garbage collected threads.
185  */
186 StgTSO *all_threads;
187
188 /* Threads suspended in _ccall_GC.
189  */
190 static StgTSO *suspended_ccalling_threads;
191
192 static StgTSO *threadStackOverflow(StgTSO *tso);
193
194 /* KH: The following two flags are shared memory locations.  There is no need
195        to lock them, since they are only unset at the end of a scheduler
196        operation.
197 */
198
199 /* flag set by signal handler to precipitate a context switch */
200 //@cindex context_switch
201 nat context_switch;
202
203 /* if this flag is set as well, give up execution */
204 //@cindex interrupted
205 rtsBool interrupted;
206
207 /* Next thread ID to allocate.
208  * Locks required: sched_mutex
209  */
210 //@cindex next_thread_id
211 StgThreadID next_thread_id = 1;
212
213 /*
214  * Pointers to the state of the current thread.
215  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
216  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
217  */
218  
219 /* The smallest stack size that makes any sense is:
220  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
221  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
222  *  + 1                       (the realworld token for an IO thread)
223  *  + 1                       (the closure to enter)
224  *
225  * A thread with this stack will bomb immediately with a stack
226  * overflow, which will increase its stack size.  
227  */
228
229 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2)
230
231
232 #if defined(GRAN)
233 StgTSO *CurrentTSO;
234 #endif
235
236 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
237  *  exists - earlier gccs apparently didn't.
238  *  -= chak
239  */
240 StgTSO dummy_tso;
241
242 rtsBool ready_to_gc;
243
244 void            addToBlockedQueue ( StgTSO *tso );
245
246 static void     schedule          ( void );
247        void     interruptStgRts   ( void );
248 #if defined(GRAN)
249 static StgTSO * createThread_     ( nat size, rtsBool have_lock, StgInt pri );
250 #else
251 static StgTSO * createThread_     ( nat size, rtsBool have_lock );
252 #endif
253
254 static void     detectBlackHoles  ( void );
255
256 #ifdef DEBUG
257 static void sched_belch(char *s, ...);
258 #endif
259
260 #if defined(RTS_SUPPORTS_THREADS)
261 /* ToDo: carefully document the invariants that go together
262  *       with these synchronisation objects.
263  */
264 Mutex     sched_mutex       = INIT_MUTEX_VAR;
265 Mutex     term_mutex        = INIT_MUTEX_VAR;
266 #if defined(THREADED_RTS)
267 /*
268  * The rts_mutex is the 'big lock' that the active native
269  * thread within the RTS holds while executing code.
270  * It is given up when the thread makes a transition out of
271  * the RTS (e.g., to perform an external C call), hopefully
272  * for another thread to take over its chores and enter
273  * the RTS.
274  *
275  */
276 Mutex     rts_mutex         = INIT_MUTEX_VAR;
277 /*
278  * When a native thread has completed executing an external
279  * call, it needs to communicate the result back to the
280  * (Haskell) thread that made the call. Do this as follows:
281  *
282  *  - in resumeThread(), the thread increments the counter
283  *    threads_waiting, and then blocks on the 'big' RTS lock.
284  *  - upon entry to the scheduler, the thread that's currently
285  *    holding the RTS lock checks threads_waiting. If there
286  *    are native threads waiting, it gives up its RTS lock
287  *    and tries to re-grab the RTS lock [perhaps after having
288  *    waited for a bit..?]
289  *  - care must be taken to deal with the case where more than
290  *    one external thread are waiting on the lock. [ToDo: more]
291  *    
292  */
293
294 static nat threads_waiting = 0;
295 /*
296  * thread_ready_aux_mutex is used to handle the scenario where the
297  * the RTS executing thread runs out of work, but there are
298  * active external threads. The RTS executing thread gives up
299  * its RTS mutex, and blocks waiting for the thread_ready_cond.
300  * Unfortunately, a condition variable needs to be associated
301  * with a mutex in pthreads, so rts_thread_waiting_mutex is
302  * used for just this purpose.
303  * 
304  */
305 Mutex  thread_ready_aux_mutex = INIT_MUTEX_VAR;
306 #endif
307
308
309 /* thread_ready_cond: when signalled, a thread has
310  * become runnable. When used?
311  */
312 Condition thread_ready_cond = INIT_COND_VAR;
313 Condition gc_pending_cond   = INIT_COND_VAR;
314
315 nat await_death;
316 #endif
317
318 #if defined(PAR)
319 StgTSO *LastTSO;
320 rtsTime TimeOfLastYield;
321 rtsBool emitSchedule = rtsTrue;
322 #endif
323
324 #if DEBUG
325 char *whatNext_strs[] = {
326   "ThreadEnterGHC",
327   "ThreadRunGHC",
328   "ThreadEnterInterp",
329   "ThreadKilled",
330   "ThreadComplete"
331 };
332
333 char *threadReturnCode_strs[] = {
334   "HeapOverflow",                       /* might also be StackOverflow */
335   "StackOverflow",
336   "ThreadYielding",
337   "ThreadBlocked",
338   "ThreadFinished"
339 };
340 #endif
341
342 #if defined(PAR)
343 StgTSO * createSparkThread(rtsSpark spark);
344 StgTSO * activateSpark (rtsSpark spark);  
345 #endif
346
347 /*
348  * The thread state for the main thread.
349 // ToDo: check whether not needed any more
350 StgTSO   *MainTSO;
351  */
352
353 #if defined(PAR) || defined(RTS_SUPPORTS_THREADS)
354 static void taskStart(void);
355 static void
356 taskStart(void)
357 {
358   /* threads start up using 'taskStart', so make them
359      them grab the RTS lock. */
360 #if defined(THREADED_RTS)
361   ACQUIRE_LOCK(&rts_mutex);
362   taskNotAvailable();
363 #endif
364   schedule();
365 }
366 #endif
367
368
369
370
371 //@node Main scheduling loop, Suspend and Resume, Prototypes, Main scheduling code
372 //@subsection Main scheduling loop
373
374 /* ---------------------------------------------------------------------------
375    Main scheduling loop.
376
377    We use round-robin scheduling, each thread returning to the
378    scheduler loop when one of these conditions is detected:
379
380       * out of heap space
381       * timer expires (thread yields)
382       * thread blocks
383       * thread ends
384       * stack overflow
385
386    Locking notes:  we acquire the scheduler lock once at the beginning
387    of the scheduler loop, and release it when
388     
389       * running a thread, or
390       * waiting for work, or
391       * waiting for a GC to complete.
392
393    GRAN version:
394      In a GranSim setup this loop iterates over the global event queue.
395      This revolves around the global event queue, which determines what 
396      to do next. Therefore, it's more complicated than either the 
397      concurrent or the parallel (GUM) setup.
398
399    GUM version:
400      GUM iterates over incoming messages.
401      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
402      and sends out a fish whenever it has nothing to do; in-between
403      doing the actual reductions (shared code below) it processes the
404      incoming messages and deals with delayed operations 
405      (see PendingFetches).
406      This is not the ugliest code you could imagine, but it's bloody close.
407
408    ------------------------------------------------------------------------ */
409 //@cindex schedule
410 static void
411 schedule( void )
412 {
413   StgTSO *t;
414   Capability *cap;
415   StgThreadReturnCode ret;
416 #if defined(GRAN)
417   rtsEvent *event;
418 #elif defined(PAR)
419   StgSparkPool *pool;
420   rtsSpark spark;
421   StgTSO *tso;
422   GlobalTaskId pe;
423   rtsBool receivedFinish = rtsFalse;
424 # if defined(DEBUG)
425   nat tp_size, sp_size; // stats only
426 # endif
427 #endif
428   rtsBool was_interrupted = rtsFalse;
429   
430   ACQUIRE_LOCK(&sched_mutex);
431   
432 #if defined(THREADED_RTS)
433   /* ToDo: consider SMP support */
434   if (threads_waiting > 0) {
435     /* (At least) one native thread is waiting to
436      * deposit the result of an external call. So,
437      * give up our RTS executing privileges and let
438      * one of them continue.
439      * 
440      */
441     taskAvailable();
442     RELEASE_LOCK(&sched_mutex);
443     IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting));
444     RELEASE_LOCK(&rts_mutex);
445     /* ToDo: come up with mechanism that guarantees that
446      * the main thread doesn't loop here.
447      */
448     yieldThread();
449     /* ToDo: longjmp() */
450     taskStart();
451   }
452 #endif
453
454 #if defined(GRAN)
455
456   /* set up first event to get things going */
457   /* ToDo: assign costs for system setup and init MainTSO ! */
458   new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
459             ContinueThread, 
460             CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
461
462   IF_DEBUG(gran,
463            fprintf(stderr, "GRAN: Init CurrentTSO (in schedule) = %p\n", CurrentTSO);
464            G_TSO(CurrentTSO, 5));
465
466   if (RtsFlags.GranFlags.Light) {
467     /* Save current time; GranSim Light only */
468     CurrentTSO->gran.clock = CurrentTime[CurrentProc];
469   }      
470
471   event = get_next_event();
472
473   while (event!=(rtsEvent*)NULL) {
474     /* Choose the processor with the next event */
475     CurrentProc = event->proc;
476     CurrentTSO = event->tso;
477
478 #elif defined(PAR)
479
480   while (!receivedFinish) {    /* set by processMessages */
481                                /* when receiving PP_FINISH message         */ 
482 #else
483
484   while (1) {
485
486 #endif
487
488     IF_DEBUG(scheduler, printAllThreads());
489
490     /* If we're interrupted (the user pressed ^C, or some other
491      * termination condition occurred), kill all the currently running
492      * threads.
493      */
494     if (interrupted) {
495       IF_DEBUG(scheduler, sched_belch("interrupted"));
496       deleteAllThreads();
497       interrupted = rtsFalse;
498       was_interrupted = rtsTrue;
499     }
500
501     /* Go through the list of main threads and wake up any
502      * clients whose computations have finished.  ToDo: this
503      * should be done more efficiently without a linear scan
504      * of the main threads list, somehow...
505      */
506 #if defined(RTS_SUPPORTS_THREADS)
507     { 
508       StgMainThread *m, **prev;
509       prev = &main_threads;
510       for (m = main_threads; m != NULL; m = m->link) {
511         switch (m->tso->what_next) {
512         case ThreadComplete:
513           if (m->ret) {
514             *(m->ret) = (StgClosure *)m->tso->sp[0];
515           }
516           *prev = m->link;
517           m->stat = Success;
518           broadcastCondition(&m->wakeup);
519           break;
520         case ThreadKilled:
521           if (m->ret) *(m->ret) = NULL;
522           *prev = m->link;
523           if (was_interrupted) {
524             m->stat = Interrupted;
525           } else {
526             m->stat = Killed;
527           }
528           broadcastCondition(&m->wakeup);
529           break;
530         default:
531           break;
532         }
533       }
534     }
535
536 #else /* not threaded */
537
538 # if defined(PAR)
539     /* in GUM do this only on the Main PE */
540     if (IAmMainThread)
541 # endif
542     /* If our main thread has finished or been killed, return.
543      */
544     {
545       StgMainThread *m = main_threads;
546       if (m->tso->what_next == ThreadComplete
547           || m->tso->what_next == ThreadKilled) {
548         main_threads = main_threads->link;
549         if (m->tso->what_next == ThreadComplete) {
550           /* we finished successfully, fill in the return value */
551           if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; };
552           m->stat = Success;
553           return;
554         } else {
555           if (m->ret) { *(m->ret) = NULL; };
556           if (was_interrupted) {
557             m->stat = Interrupted;
558           } else {
559             m->stat = Killed;
560           }
561           return;
562         }
563       }
564     }
565 #endif
566
567     /* Top up the run queue from our spark pool.  We try to make the
568      * number of threads in the run queue equal to the number of
569      * free capabilities.
570      *
571      * Disable spark support in SMP for now, non-essential & requires
572      * a little bit of work to make it compile cleanly. -- sof 1/02.
573      */
574 #if 0 /* defined(SMP) */
575     {
576       nat n = getFreeCapabilities();
577       StgTSO *tso = run_queue_hd;
578
579       /* Count the run queue */
580       while (n > 0 && tso != END_TSO_QUEUE) {
581         tso = tso->link;
582         n--;
583       }
584
585       for (; n > 0; n--) {
586         StgClosure *spark;
587         spark = findSpark(rtsFalse);
588         if (spark == NULL) {
589           break; /* no more sparks in the pool */
590         } else {
591           /* I'd prefer this to be done in activateSpark -- HWL */
592           /* tricky - it needs to hold the scheduler lock and
593            * not try to re-acquire it -- SDM */
594           createSparkThread(spark);       
595           IF_DEBUG(scheduler,
596                    sched_belch("==^^ turning spark of closure %p into a thread",
597                                (StgClosure *)spark));
598         }
599       }
600       /* We need to wake up the other tasks if we just created some
601        * work for them.
602        */
603       if (getFreeCapabilities() - n > 1) {
604           signalCondition( &thread_ready_cond );
605       }
606     }
607 #endif // SMP
608
609     /* check for signals each time around the scheduler */
610 #ifndef mingw32_TARGET_OS
611     if (signals_pending()) {
612       startSignalHandlers();
613     }
614 #endif
615
616     /* Check whether any waiting threads need to be woken up.  If the
617      * run queue is empty, and there are no other tasks running, we
618      * can wait indefinitely for something to happen.
619      * ToDo: what if another client comes along & requests another
620      * main thread?
621      */
622     if (blocked_queue_hd != END_TSO_QUEUE || sleeping_queue != END_TSO_QUEUE) {
623       awaitEvent(
624            (run_queue_hd == END_TSO_QUEUE)
625 #if defined(SMP)
626         && allFreeCapabilities()
627 #endif
628         );
629     }
630     /* we can be interrupted while waiting for I/O... */
631     if (interrupted) continue;
632
633     /* 
634      * Detect deadlock: when we have no threads to run, there are no
635      * threads waiting on I/O or sleeping, and all the other tasks are
636      * waiting for work, we must have a deadlock of some description.
637      *
638      * We first try to find threads blocked on themselves (ie. black
639      * holes), and generate NonTermination exceptions where necessary.
640      *
641      * If no threads are black holed, we have a deadlock situation, so
642      * inform all the main threads.
643      */
644 #ifndef PAR
645     if (blocked_queue_hd == END_TSO_QUEUE
646         && run_queue_hd == END_TSO_QUEUE
647         && sleeping_queue == END_TSO_QUEUE
648 #if defined(SMP)
649         && allFreeCapabilities()
650 #elif defined(THREADED_RTS)
651         && suspended_ccalling_threads == END_TSO_QUEUE
652 #endif
653         )
654     {
655         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
656         RELEASE_LOCK(&sched_mutex);
657         GarbageCollect(GetRoots,rtsTrue);
658         ACQUIRE_LOCK(&sched_mutex);
659         IF_DEBUG(scheduler, sched_belch("GC done."));
660         if (blocked_queue_hd == END_TSO_QUEUE
661             && run_queue_hd == END_TSO_QUEUE
662             && sleeping_queue == END_TSO_QUEUE) {
663
664             IF_DEBUG(scheduler, sched_belch("still deadlocked, checking for black holes..."));
665             detectBlackHoles();
666
667             /* No black holes, so probably a real deadlock.  Send the
668              * current main thread the Deadlock exception (or in the SMP
669              * build, send *all* main threads the deadlock exception,
670              * since none of them can make progress).
671              */
672             if (run_queue_hd == END_TSO_QUEUE) {
673                 StgMainThread *m;
674 #if defined(RTS_SUPPORTS_THREADS)
675                 for (m = main_threads; m != NULL; m = m->link) {
676                     switch (m->tso->why_blocked) {
677                     case BlockedOnBlackHole:
678                         raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
679                         break;
680                     case BlockedOnException:
681                     case BlockedOnMVar:
682                         raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
683                         break;
684                     default:
685                         barf("deadlock: main thread blocked in a strange way");
686                     }
687                 }
688 #else
689                 m = main_threads;
690                 switch (m->tso->why_blocked) {
691                 case BlockedOnBlackHole:
692                     raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
693                     break;
694                 case BlockedOnException:
695                 case BlockedOnMVar:
696                     raiseAsync(m->tso, (StgClosure *)Deadlock_closure);
697                     break;
698                 default:
699                     barf("deadlock: main thread blocked in a strange way");
700                 }
701 #endif
702             }
703 #if defined(RTS_SUPPORTS_THREADS)
704             if ( run_queue_hd == END_TSO_QUEUE ) {
705               IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down."));
706               shutdownHaskellAndExit(0);
707             
708             }
709 #endif
710             ASSERT( run_queue_hd != END_TSO_QUEUE );
711         }
712     }
713 #elif defined(PAR)
714     /* ToDo: add deadlock detection in GUM (similar to SMP) -- HWL */
715 #endif
716
717 #if defined(SMP)
718     /* If there's a GC pending, don't do anything until it has
719      * completed.
720      */
721     if (ready_to_gc) {
722       IF_DEBUG(scheduler,sched_belch("waiting for GC"));
723       waitCondition( &gc_pending_cond, &sched_mutex );
724     }
725 #endif    
726
727 #if defined(SMP)
728     /* block until we've got a thread on the run queue and a free
729      * capability.
730      */
731     while ( run_queue_hd == END_TSO_QUEUE 
732             || noFreeCapabilities() 
733             ) {
734       IF_DEBUG(scheduler, sched_belch("waiting for work"));
735       waitCondition( &thread_ready_cond, &sched_mutex );
736       IF_DEBUG(scheduler, sched_belch("work now available"));
737     }
738 #elif defined(THREADED_RTS)
739    if ( run_queue_hd == END_TSO_QUEUE ) {
740      /* no work available, wait for external calls to complete. */
741      IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId()));
742      taskAvailable();
743      RELEASE_LOCK(&sched_mutex);
744      RELEASE_LOCK(&rts_mutex);
745
746      /* Sigh - need to have a mutex locked in order to wait on the
747         condition variable. */
748      ACQUIRE_LOCK(&thread_ready_aux_mutex);
749      waitCondition(&thread_ready_cond, &thread_ready_aux_mutex);
750      RELEASE_LOCK(&thread_ready_aux_mutex);
751
752      IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId()));
753      /* ToDo: longjmp() */
754      taskStart();
755    }
756 #endif
757
758 #if defined(GRAN)
759
760     if (RtsFlags.GranFlags.Light)
761       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
762
763     /* adjust time based on time-stamp */
764     if (event->time > CurrentTime[CurrentProc] &&
765         event->evttype != ContinueThread)
766       CurrentTime[CurrentProc] = event->time;
767     
768     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
769     if (!RtsFlags.GranFlags.Light)
770       handleIdlePEs();
771
772     IF_DEBUG(gran, fprintf(stderr, "GRAN: switch by event-type\n"));
773
774     /* main event dispatcher in GranSim */
775     switch (event->evttype) {
776       /* Should just be continuing execution */
777     case ContinueThread:
778       IF_DEBUG(gran, fprintf(stderr, "GRAN: doing ContinueThread\n"));
779       /* ToDo: check assertion
780       ASSERT(run_queue_hd != (StgTSO*)NULL &&
781              run_queue_hd != END_TSO_QUEUE);
782       */
783       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
784       if (!RtsFlags.GranFlags.DoAsyncFetch &&
785           procStatus[CurrentProc]==Fetching) {
786         belch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]",
787               CurrentTSO->id, CurrentTSO, CurrentProc);
788         goto next_thread;
789       } 
790       /* Ignore ContinueThreads for completed threads */
791       if (CurrentTSO->what_next == ThreadComplete) {
792         belch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)", 
793               CurrentTSO->id, CurrentTSO, CurrentProc);
794         goto next_thread;
795       } 
796       /* Ignore ContinueThreads for threads that are being migrated */
797       if (PROCS(CurrentTSO)==Nowhere) { 
798         belch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)",
799               CurrentTSO->id, CurrentTSO, CurrentProc);
800         goto next_thread;
801       }
802       /* The thread should be at the beginning of the run queue */
803       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
804         belch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread",
805               CurrentTSO->id, CurrentTSO, CurrentProc);
806         break; // run the thread anyway
807       }
808       /*
809       new_event(proc, proc, CurrentTime[proc],
810                 FindWork,
811                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
812       goto next_thread; 
813       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
814       break; // now actually run the thread; DaH Qu'vam yImuHbej 
815
816     case FetchNode:
817       do_the_fetchnode(event);
818       goto next_thread;             /* handle next event in event queue  */
819       
820     case GlobalBlock:
821       do_the_globalblock(event);
822       goto next_thread;             /* handle next event in event queue  */
823       
824     case FetchReply:
825       do_the_fetchreply(event);
826       goto next_thread;             /* handle next event in event queue  */
827       
828     case UnblockThread:   /* Move from the blocked queue to the tail of */
829       do_the_unblock(event);
830       goto next_thread;             /* handle next event in event queue  */
831       
832     case ResumeThread:  /* Move from the blocked queue to the tail of */
833       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
834       event->tso->gran.blocktime += 
835         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
836       do_the_startthread(event);
837       goto next_thread;             /* handle next event in event queue  */
838       
839     case StartThread:
840       do_the_startthread(event);
841       goto next_thread;             /* handle next event in event queue  */
842       
843     case MoveThread:
844       do_the_movethread(event);
845       goto next_thread;             /* handle next event in event queue  */
846       
847     case MoveSpark:
848       do_the_movespark(event);
849       goto next_thread;             /* handle next event in event queue  */
850       
851     case FindWork:
852       do_the_findwork(event);
853       goto next_thread;             /* handle next event in event queue  */
854       
855     default:
856       barf("Illegal event type %u\n", event->evttype);
857     }  /* switch */
858     
859     /* This point was scheduler_loop in the old RTS */
860
861     IF_DEBUG(gran, belch("GRAN: after main switch"));
862
863     TimeOfLastEvent = CurrentTime[CurrentProc];
864     TimeOfNextEvent = get_time_of_next_event();
865     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
866     // CurrentTSO = ThreadQueueHd;
867
868     IF_DEBUG(gran, belch("GRAN: time of next event is: %ld", 
869                          TimeOfNextEvent));
870
871     if (RtsFlags.GranFlags.Light) 
872       GranSimLight_leave_system(event, &ActiveTSO); 
873
874     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
875
876     IF_DEBUG(gran, 
877              belch("GRAN: end of time-slice is %#lx", EndOfTimeSlice));
878
879     /* in a GranSim setup the TSO stays on the run queue */
880     t = CurrentTSO;
881     /* Take a thread from the run queue. */
882     t = POP_RUN_QUEUE(); // take_off_run_queue(t);
883
884     IF_DEBUG(gran, 
885              fprintf(stderr, "GRAN: About to run current thread, which is\n");
886              G_TSO(t,5));
887
888     context_switch = 0; // turned on via GranYield, checking events and time slice
889
890     IF_DEBUG(gran, 
891              DumpGranEvent(GR_SCHEDULE, t));
892
893     procStatus[CurrentProc] = Busy;
894
895 #elif defined(PAR)
896     if (PendingFetches != END_BF_QUEUE) {
897         processFetches();
898     }
899
900     /* ToDo: phps merge with spark activation above */
901     /* check whether we have local work and send requests if we have none */
902     if (EMPTY_RUN_QUEUE()) {  /* no runnable threads */
903       /* :-[  no local threads => look out for local sparks */
904       /* the spark pool for the current PE */
905       pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
906       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
907           pool->hd < pool->tl) {
908         /* 
909          * ToDo: add GC code check that we really have enough heap afterwards!!
910          * Old comment:
911          * If we're here (no runnable threads) and we have pending
912          * sparks, we must have a space problem.  Get enough space
913          * to turn one of those pending sparks into a
914          * thread... 
915          */
916
917         spark = findSpark(rtsFalse);                /* get a spark */
918         if (spark != (rtsSpark) NULL) {
919           tso = activateSpark(spark);       /* turn the spark into a thread */
920           IF_PAR_DEBUG(schedule,
921                        belch("==== schedule: Created TSO %d (%p); %d threads active",
922                              tso->id, tso, advisory_thread_count));
923
924           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
925             belch("==^^ failed to activate spark");
926             goto next_thread;
927           }               /* otherwise fall through & pick-up new tso */
928         } else {
929           IF_PAR_DEBUG(verbose,
930                        belch("==^^ no local sparks (spark pool contains only NFs: %d)", 
931                              spark_queue_len(pool)));
932           goto next_thread;
933         }
934       }
935
936       /* If we still have no work we need to send a FISH to get a spark
937          from another PE 
938       */
939       if (EMPTY_RUN_QUEUE()) {
940       /* =8-[  no local sparks => look for work on other PEs */
941         /*
942          * We really have absolutely no work.  Send out a fish
943          * (there may be some out there already), and wait for
944          * something to arrive.  We clearly can't run any threads
945          * until a SCHEDULE or RESUME arrives, and so that's what
946          * we're hoping to see.  (Of course, we still have to
947          * respond to other types of messages.)
948          */
949         TIME now = msTime() /*CURRENT_TIME*/;
950         IF_PAR_DEBUG(verbose, 
951                      belch("--  now=%ld", now));
952         IF_PAR_DEBUG(verbose,
953                      if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
954                          (last_fish_arrived_at!=0 &&
955                           last_fish_arrived_at+RtsFlags.ParFlags.fishDelay > now)) {
956                        belch("--$$ delaying FISH until %ld (last fish %ld, delay %ld, now %ld)",
957                              last_fish_arrived_at+RtsFlags.ParFlags.fishDelay,
958                              last_fish_arrived_at,
959                              RtsFlags.ParFlags.fishDelay, now);
960                      });
961         
962         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
963             (last_fish_arrived_at==0 ||
964              (last_fish_arrived_at+RtsFlags.ParFlags.fishDelay <= now))) {
965           /* outstandingFishes is set in sendFish, processFish;
966              avoid flooding system with fishes via delay */
967           pe = choosePE();
968           sendFish(pe, mytid, NEW_FISH_AGE, NEW_FISH_HISTORY, 
969                    NEW_FISH_HUNGER);
970
971           // Global statistics: count no. of fishes
972           if (RtsFlags.ParFlags.ParStats.Global &&
973               RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
974             globalParStats.tot_fish_mess++;
975           }
976         }
977       
978         receivedFinish = processMessages();
979         goto next_thread;
980       }
981     } else if (PacketsWaiting()) {  /* Look for incoming messages */
982       receivedFinish = processMessages();
983     }
984
985     /* Now we are sure that we have some work available */
986     ASSERT(run_queue_hd != END_TSO_QUEUE);
987
988     /* Take a thread from the run queue, if we have work */
989     t = POP_RUN_QUEUE();  // take_off_run_queue(END_TSO_QUEUE);
990     IF_DEBUG(sanity,checkTSO(t));
991
992     /* ToDo: write something to the log-file
993     if (RTSflags.ParFlags.granSimStats && !sameThread)
994         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
995
996     CurrentTSO = t;
997     */
998     /* the spark pool for the current PE */
999     pool = &(MainRegTable.rSparks); // generalise to cap = &MainRegTable
1000
1001     IF_DEBUG(scheduler, 
1002              belch("--=^ %d threads, %d sparks on [%#x]", 
1003                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1004
1005 #if 1
1006     if (0 && RtsFlags.ParFlags.ParStats.Full && 
1007         t && LastTSO && t->id != LastTSO->id && 
1008         LastTSO->why_blocked == NotBlocked && 
1009         LastTSO->what_next != ThreadComplete) {
1010       // if previously scheduled TSO not blocked we have to record the context switch
1011       DumpVeryRawGranEvent(TimeOfLastYield, CURRENT_PROC, CURRENT_PROC,
1012                            GR_DESCHEDULE, LastTSO, (StgClosure *)NULL, 0, 0);
1013     }
1014
1015     if (RtsFlags.ParFlags.ParStats.Full && 
1016         (emitSchedule /* forced emit */ ||
1017         (t && LastTSO && t->id != LastTSO->id))) {
1018       /* 
1019          we are running a different TSO, so write a schedule event to log file
1020          NB: If we use fair scheduling we also have to write  a deschedule 
1021              event for LastTSO; with unfair scheduling we know that the
1022              previous tso has blocked whenever we switch to another tso, so
1023              we don't need it in GUM for now
1024       */
1025       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1026                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1027       emitSchedule = rtsFalse;
1028     }
1029      
1030 #endif
1031 #else /* !GRAN && !PAR */
1032   
1033     /* grab a thread from the run queue
1034      */
1035     ASSERT(run_queue_hd != END_TSO_QUEUE);
1036     t = POP_RUN_QUEUE();
1037     // Sanity check the thread we're about to run.  This can be
1038     // expensive if there is lots of thread switching going on...
1039     IF_DEBUG(sanity,checkTSO(t));
1040 #endif
1041     
1042     grabCapability(&cap);
1043     cap->r.rCurrentTSO = t;
1044     
1045     /* context switches are now initiated by the timer signal, unless
1046      * the user specified "context switch as often as possible", with
1047      * +RTS -C0
1048      */
1049     if (
1050 #ifdef PROFILING
1051         RtsFlags.ProfFlags.profileInterval == 0 ||
1052 #endif
1053         (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
1054          && (run_queue_hd != END_TSO_QUEUE
1055              || blocked_queue_hd != END_TSO_QUEUE
1056              || sleeping_queue != END_TSO_QUEUE)))
1057         context_switch = 1;
1058     else
1059         context_switch = 0;
1060
1061     RELEASE_LOCK(&sched_mutex);
1062
1063     IF_DEBUG(scheduler, sched_belch("-->> Running TSO %ld (%p) %s ...", 
1064                               t->id, t, whatNext_strs[t->what_next]));
1065
1066 #ifdef PROFILING
1067     startHeapProfTimer();
1068 #endif
1069
1070     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1071     /* Run the current thread 
1072      */
1073     switch (cap->r.rCurrentTSO->what_next) {
1074     case ThreadKilled:
1075     case ThreadComplete:
1076         /* Thread already finished, return to scheduler. */
1077         ret = ThreadFinished;
1078         break;
1079     case ThreadEnterGHC:
1080         ret = StgRun((StgFunPtr) stg_enterStackTop, &cap->r);
1081         break;
1082     case ThreadRunGHC:
1083         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
1084         break;
1085     case ThreadEnterInterp:
1086         ret = interpretBCO(cap);
1087         break;
1088     default:
1089       barf("schedule: invalid what_next field");
1090     }
1091     /* +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ */
1092     
1093     /* Costs for the scheduler are assigned to CCS_SYSTEM */
1094 #ifdef PROFILING
1095     stopHeapProfTimer();
1096     CCCS = CCS_SYSTEM;
1097 #endif
1098     
1099     ACQUIRE_LOCK(&sched_mutex);
1100
1101 #ifdef SMP
1102     IF_DEBUG(scheduler,fprintf(stderr,"scheduler (task %ld): ", osThreadId()););
1103 #elif !defined(GRAN) && !defined(PAR)
1104     IF_DEBUG(scheduler,fprintf(stderr,"scheduler: "););
1105 #endif
1106     t = cap->r.rCurrentTSO;
1107     
1108 #if defined(PAR)
1109     /* HACK 675: if the last thread didn't yield, make sure to print a 
1110        SCHEDULE event to the log file when StgRunning the next thread, even
1111        if it is the same one as before */
1112     LastTSO = t; 
1113     TimeOfLastYield = CURRENT_TIME;
1114 #endif
1115
1116     switch (ret) {
1117     case HeapOverflow:
1118 #if defined(GRAN)
1119       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1120       globalGranStats.tot_heapover++;
1121 #elif defined(PAR)
1122       globalParStats.tot_heapover++;
1123 #endif
1124
1125       // did the task ask for a large block?
1126       if (cap->r.rHpAlloc > BLOCK_SIZE_W) {
1127           // if so, get one and push it on the front of the nursery.
1128           bdescr *bd;
1129           nat blocks;
1130           
1131           blocks = (nat)BLOCK_ROUND_UP(cap->r.rHpAlloc * sizeof(W_)) / BLOCK_SIZE;
1132
1133           IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: requesting a large block (size %d)", 
1134                                    t->id, t,
1135                                    whatNext_strs[t->what_next], blocks));
1136
1137           // don't do this if it would push us over the
1138           // alloc_blocks_lim limit; we'll GC first.
1139           if (alloc_blocks + blocks < alloc_blocks_lim) {
1140
1141               alloc_blocks += blocks;
1142               bd = allocGroup( blocks );
1143
1144               // link the new group into the list
1145               bd->link = cap->r.rCurrentNursery;
1146               bd->u.back = cap->r.rCurrentNursery->u.back;
1147               if (cap->r.rCurrentNursery->u.back != NULL) {
1148                   cap->r.rCurrentNursery->u.back->link = bd;
1149               } else {
1150                   ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1151                          g0s0->blocks == cap->r.rNursery);
1152                   cap->r.rNursery = g0s0->blocks = bd;
1153               }           
1154               cap->r.rCurrentNursery->u.back = bd;
1155
1156               // initialise it as a nursery block
1157               bd->step = g0s0;
1158               bd->gen_no = 0;
1159               bd->flags = 0;
1160               bd->free = bd->start;
1161
1162               // don't forget to update the block count in g0s0.
1163               g0s0->n_blocks += blocks;
1164               ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1165
1166               // now update the nursery to point to the new block
1167               cap->r.rCurrentNursery = bd;
1168
1169               // we might be unlucky and have another thread get on the
1170               // run queue before us and steal the large block, but in that
1171               // case the thread will just end up requesting another large
1172               // block.
1173               PUSH_ON_RUN_QUEUE(t);
1174               break;
1175           }
1176       }
1177
1178       /* make all the running tasks block on a condition variable,
1179        * maybe set context_switch and wait till they all pile in,
1180        * then have them wait on a GC condition variable.
1181        */
1182       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped: HeapOverflow", 
1183                                t->id, t, whatNext_strs[t->what_next]));
1184       threadPaused(t);
1185 #if defined(GRAN)
1186       ASSERT(!is_on_queue(t,CurrentProc));
1187 #elif defined(PAR)
1188       /* Currently we emit a DESCHEDULE event before GC in GUM.
1189          ToDo: either add separate event to distinguish SYSTEM time from rest
1190                or just nuke this DESCHEDULE (and the following SCHEDULE) */
1191       if (0 && RtsFlags.ParFlags.ParStats.Full) {
1192         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1193                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1194         emitSchedule = rtsTrue;
1195       }
1196 #endif
1197       
1198       ready_to_gc = rtsTrue;
1199       context_switch = 1;               /* stop other threads ASAP */
1200       PUSH_ON_RUN_QUEUE(t);
1201       /* actual GC is done at the end of the while loop */
1202       break;
1203       
1204     case StackOverflow:
1205 #if defined(GRAN)
1206       IF_DEBUG(gran, 
1207                DumpGranEvent(GR_DESCHEDULE, t));
1208       globalGranStats.tot_stackover++;
1209 #elif defined(PAR)
1210       // IF_DEBUG(par, 
1211       // DumpGranEvent(GR_DESCHEDULE, t);
1212       globalParStats.tot_stackover++;
1213 #endif
1214       IF_DEBUG(scheduler,belch("--<< thread %ld (%p; %s) stopped, StackOverflow", 
1215                                t->id, t, whatNext_strs[t->what_next]));
1216       /* just adjust the stack for this thread, then pop it back
1217        * on the run queue.
1218        */
1219       threadPaused(t);
1220       { 
1221         StgMainThread *m;
1222         /* enlarge the stack */
1223         StgTSO *new_t = threadStackOverflow(t);
1224         
1225         /* This TSO has moved, so update any pointers to it from the
1226          * main thread stack.  It better not be on any other queues...
1227          * (it shouldn't be).
1228          */
1229         for (m = main_threads; m != NULL; m = m->link) {
1230           if (m->tso == t) {
1231             m->tso = new_t;
1232           }
1233         }
1234         threadPaused(new_t);
1235         PUSH_ON_RUN_QUEUE(new_t);
1236       }
1237       break;
1238
1239     case ThreadYielding:
1240 #if defined(GRAN)
1241       IF_DEBUG(gran, 
1242                DumpGranEvent(GR_DESCHEDULE, t));
1243       globalGranStats.tot_yields++;
1244 #elif defined(PAR)
1245       // IF_DEBUG(par, 
1246       // DumpGranEvent(GR_DESCHEDULE, t);
1247       globalParStats.tot_yields++;
1248 #endif
1249       /* put the thread back on the run queue.  Then, if we're ready to
1250        * GC, check whether this is the last task to stop.  If so, wake
1251        * up the GC thread.  getThread will block during a GC until the
1252        * GC is finished.
1253        */
1254       IF_DEBUG(scheduler,
1255                if (t->what_next == ThreadEnterInterp) {
1256                    /* ToDo: or maybe a timer expired when we were in Hugs?
1257                     * or maybe someone hit ctrl-C
1258                     */
1259                    belch("--<< thread %ld (%p; %s) stopped to switch to Hugs", 
1260                          t->id, t, whatNext_strs[t->what_next]);
1261                } else {
1262                    belch("--<< thread %ld (%p; %s) stopped, yielding", 
1263                          t->id, t, whatNext_strs[t->what_next]);
1264                }
1265                );
1266
1267       threadPaused(t);
1268
1269       IF_DEBUG(sanity,
1270                //belch("&& Doing sanity check on yielding TSO %ld.", t->id);
1271                checkTSO(t));
1272       ASSERT(t->link == END_TSO_QUEUE);
1273 #if defined(GRAN)
1274       ASSERT(!is_on_queue(t,CurrentProc));
1275
1276       IF_DEBUG(sanity,
1277                //belch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1278                checkThreadQsSanity(rtsTrue));
1279 #endif
1280 #if defined(PAR)
1281       if (RtsFlags.ParFlags.doFairScheduling) { 
1282         /* this does round-robin scheduling; good for concurrency */
1283         APPEND_TO_RUN_QUEUE(t);
1284       } else {
1285         /* this does unfair scheduling; good for parallelism */
1286         PUSH_ON_RUN_QUEUE(t);
1287       }
1288 #else
1289       /* this does round-robin scheduling; good for concurrency */
1290       APPEND_TO_RUN_QUEUE(t);
1291 #endif
1292 #if defined(GRAN)
1293       /* add a ContinueThread event to actually process the thread */
1294       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1295                 ContinueThread,
1296                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1297       IF_GRAN_DEBUG(bq, 
1298                belch("GRAN: eventq and runnableq after adding yielded thread to queue again:");
1299                G_EVENTQ(0);
1300                G_CURR_THREADQ(0));
1301 #endif /* GRAN */
1302       break;
1303       
1304     case ThreadBlocked:
1305 #if defined(GRAN)
1306       IF_DEBUG(scheduler,
1307                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1308                                t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1309                if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1310
1311       // ??? needed; should emit block before
1312       IF_DEBUG(gran, 
1313                DumpGranEvent(GR_DESCHEDULE, t)); 
1314       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1315       /*
1316         ngoq Dogh!
1317       ASSERT(procStatus[CurrentProc]==Busy || 
1318               ((procStatus[CurrentProc]==Fetching) && 
1319               (t->block_info.closure!=(StgClosure*)NULL)));
1320       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1321           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1322             procStatus[CurrentProc]==Fetching)) 
1323         procStatus[CurrentProc] = Idle;
1324       */
1325 #elif defined(PAR)
1326       IF_DEBUG(scheduler,
1327                belch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: ", 
1328                      t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1329       IF_PAR_DEBUG(bq,
1330
1331                    if (t->block_info.closure!=(StgClosure*)NULL) 
1332                      print_bq(t->block_info.closure));
1333
1334       /* Send a fetch (if BlockedOnGA) and dump event to log file */
1335       blockThread(t);
1336
1337       /* whatever we schedule next, we must log that schedule */
1338       emitSchedule = rtsTrue;
1339
1340 #else /* !GRAN */
1341       /* don't need to do anything.  Either the thread is blocked on
1342        * I/O, in which case we'll have called addToBlockedQueue
1343        * previously, or it's blocked on an MVar or Blackhole, in which
1344        * case it'll be on the relevant queue already.
1345        */
1346       IF_DEBUG(scheduler,
1347                fprintf(stderr, "--<< thread %d (%p) stopped: ", t->id, t);
1348                printThreadBlockage(t);
1349                fprintf(stderr, "\n"));
1350
1351       /* Only for dumping event to log file 
1352          ToDo: do I need this in GranSim, too?
1353       blockThread(t);
1354       */
1355 #endif
1356       threadPaused(t);
1357       break;
1358       
1359     case ThreadFinished:
1360       /* Need to check whether this was a main thread, and if so, signal
1361        * the task that started it with the return value.  If we have no
1362        * more main threads, we probably need to stop all the tasks until
1363        * we get a new one.
1364        */
1365       /* We also end up here if the thread kills itself with an
1366        * uncaught exception, see Exception.hc.
1367        */
1368       IF_DEBUG(scheduler,belch("--++ thread %d (%p) finished", t->id, t));
1369 #if defined(GRAN)
1370       endThread(t, CurrentProc); // clean-up the thread
1371 #elif defined(PAR)
1372       /* For now all are advisory -- HWL */
1373       //if(t->priority==AdvisoryPriority) ??
1374       advisory_thread_count--;
1375       
1376 # ifdef DIST
1377       if(t->dist.priority==RevalPriority)
1378         FinishReval(t);
1379 # endif
1380       
1381       if (RtsFlags.ParFlags.ParStats.Full &&
1382           !RtsFlags.ParFlags.ParStats.Suppressed) 
1383         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1384 #endif
1385       break;
1386       
1387     default:
1388       barf("schedule: invalid thread return code %d", (int)ret);
1389     }
1390     
1391 #ifdef SMP
1392     grabCapability(&cap);
1393 #endif
1394
1395 #ifdef PROFILING
1396     if (RtsFlags.ProfFlags.profileInterval==0 || performHeapProfile) {
1397         GarbageCollect(GetRoots, rtsTrue);
1398         heapCensus();
1399         performHeapProfile = rtsFalse;
1400         ready_to_gc = rtsFalse; // we already GC'd
1401     }
1402 #endif
1403
1404 #ifdef SMP
1405     if (ready_to_gc && allFreeCapabilities() )
1406 #else
1407     if (ready_to_gc) 
1408 #endif
1409       {
1410       /* everybody back, start the GC.
1411        * Could do it in this thread, or signal a condition var
1412        * to do it in another thread.  Either way, we need to
1413        * broadcast on gc_pending_cond afterward.
1414        */
1415 #if defined(RTS_SUPPORTS_THREADS)
1416       IF_DEBUG(scheduler,sched_belch("doing GC"));
1417 #endif
1418       GarbageCollect(GetRoots,rtsFalse);
1419       ready_to_gc = rtsFalse;
1420 #ifdef SMP
1421       broadcastCondition(&gc_pending_cond);
1422 #endif
1423 #if defined(GRAN)
1424       /* add a ContinueThread event to continue execution of current thread */
1425       new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1426                 ContinueThread,
1427                 t, (StgClosure*)NULL, (rtsSpark*)NULL);
1428       IF_GRAN_DEBUG(bq, 
1429                fprintf(stderr, "GRAN: eventq and runnableq after Garbage collection:\n");
1430                G_EVENTQ(0);
1431                G_CURR_THREADQ(0));
1432 #endif /* GRAN */
1433     }
1434
1435 #if defined(GRAN)
1436   next_thread:
1437     IF_GRAN_DEBUG(unused,
1438                   print_eventq(EventHd));
1439
1440     event = get_next_event();
1441 #elif defined(PAR)
1442   next_thread:
1443     /* ToDo: wait for next message to arrive rather than busy wait */
1444 #endif /* GRAN */
1445
1446   } /* end of while(1) */
1447
1448   IF_PAR_DEBUG(verbose,
1449                belch("== Leaving schedule() after having received Finish"));
1450 }
1451
1452 /* ---------------------------------------------------------------------------
1453  * deleteAllThreads():  kill all the live threads.
1454  *
1455  * This is used when we catch a user interrupt (^C), before performing
1456  * any necessary cleanups and running finalizers.
1457  * ------------------------------------------------------------------------- */
1458    
1459 void deleteAllThreads ( void )
1460 {
1461   StgTSO* t, *next;
1462   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
1463   for (t = run_queue_hd; t != END_TSO_QUEUE; t = next) {
1464       next = t->link;
1465       deleteThread(t);
1466   }
1467   for (t = blocked_queue_hd; t != END_TSO_QUEUE; t = next) {
1468       next = t->link;
1469       deleteThread(t);
1470   }
1471   for (t = sleeping_queue; t != END_TSO_QUEUE; t = next) {
1472       next = t->link;
1473       deleteThread(t);
1474   }
1475   run_queue_hd = run_queue_tl = END_TSO_QUEUE;
1476   blocked_queue_hd = blocked_queue_tl = END_TSO_QUEUE;
1477   sleeping_queue = END_TSO_QUEUE;
1478 }
1479
1480 /* startThread and  insertThread are now in GranSim.c -- HWL */
1481
1482
1483 //@node Suspend and Resume, Run queue code, Main scheduling loop, Main scheduling code
1484 //@subsection Suspend and Resume
1485
1486 /* ---------------------------------------------------------------------------
1487  * Suspending & resuming Haskell threads.
1488  * 
1489  * When making a "safe" call to C (aka _ccall_GC), the task gives back
1490  * its capability before calling the C function.  This allows another
1491  * task to pick up the capability and carry on running Haskell
1492  * threads.  It also means that if the C call blocks, it won't lock
1493  * the whole system.
1494  *
1495  * The Haskell thread making the C call is put to sleep for the
1496  * duration of the call, on the susepended_ccalling_threads queue.  We
1497  * give out a token to the task, which it can use to resume the thread
1498  * on return from the C function.
1499  * ------------------------------------------------------------------------- */
1500    
1501 StgInt
1502 suspendThread( StgRegTable *reg )
1503 {
1504   nat tok;
1505   Capability *cap;
1506
1507   /* assume that *reg is a pointer to the StgRegTable part
1508    * of a Capability.
1509    */
1510   cap = (Capability *)((void *)reg - sizeof(StgFunTable));
1511
1512   ACQUIRE_LOCK(&sched_mutex);
1513
1514   IF_DEBUG(scheduler,
1515            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
1516
1517   threadPaused(cap->r.rCurrentTSO);
1518   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
1519   suspended_ccalling_threads = cap->r.rCurrentTSO;
1520
1521   /* Use the thread ID as the token; it should be unique */
1522   tok = cap->r.rCurrentTSO->id;
1523
1524   /* Hand back capability */
1525   releaseCapability(&cap);
1526   
1527 #if defined(RTS_SUPPORTS_THREADS) && !defined(SMP)
1528   /* Preparing to leave the RTS, so ensure there's a native thread/task
1529      waiting to take over.
1530      
1531      ToDo: optimise this and only create a new task if there's a need
1532      for one (i.e., if there's only one Concurrent Haskell thread alive,
1533      there's no need to create a new task).
1534   */
1535   IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok));
1536   startTask(taskStart);
1537
1538 #endif
1539   
1540   RELEASE_LOCK(&sched_mutex);
1541   RELEASE_LOCK(&rts_mutex);
1542   return tok; 
1543 }
1544
1545 StgRegTable *
1546 resumeThread( StgInt tok )
1547 {
1548   StgTSO *tso, **prev;
1549   Capability *cap;
1550
1551 #if defined(THREADED_RTS)
1552   IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok));
1553   ACQUIRE_LOCK(&sched_mutex);
1554   threads_waiting++;
1555   IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting));
1556   RELEASE_LOCK(&sched_mutex);
1557   
1558   IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok));
1559   ACQUIRE_LOCK(&rts_mutex);
1560   threads_waiting--;
1561   taskNotAvailable();
1562   IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok));
1563 #endif
1564
1565 #if defined(THREADED_RTS)
1566   /* Free up any RTS-blocked threads. */
1567   broadcastCondition(&thread_ready_cond);
1568 #endif
1569
1570   /* Remove the thread off of the suspended list */
1571   prev = &suspended_ccalling_threads;
1572   for (tso = suspended_ccalling_threads; 
1573        tso != END_TSO_QUEUE; 
1574        prev = &tso->link, tso = tso->link) {
1575     if (tso->id == (StgThreadID)tok) {
1576       *prev = tso->link;
1577       break;
1578     }
1579   }
1580   if (tso == END_TSO_QUEUE) {
1581     barf("resumeThread: thread not found");
1582   }
1583   tso->link = END_TSO_QUEUE;
1584
1585 #if defined(SMP)
1586   while ( noFreeCapabilities() ) {
1587     IF_DEBUG(scheduler, sched_belch("waiting to resume"));
1588     waitCondition(&thread_ready_cond, &sched_mutex);
1589     IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id));
1590   }
1591 #endif
1592
1593   grabCapability(&cap);
1594
1595   cap->r.rCurrentTSO = tso;
1596
1597   return &cap->r;
1598 }
1599
1600
1601 /* ---------------------------------------------------------------------------
1602  * Static functions
1603  * ------------------------------------------------------------------------ */
1604 static void unblockThread(StgTSO *tso);
1605
1606 /* ---------------------------------------------------------------------------
1607  * Comparing Thread ids.
1608  *
1609  * This is used from STG land in the implementation of the
1610  * instances of Eq/Ord for ThreadIds.
1611  * ------------------------------------------------------------------------ */
1612
1613 int cmp_thread(const StgTSO *tso1, const StgTSO *tso2) 
1614
1615   StgThreadID id1 = tso1->id; 
1616   StgThreadID id2 = tso2->id;
1617  
1618   if (id1 < id2) return (-1);
1619   if (id1 > id2) return 1;
1620   return 0;
1621 }
1622
1623 /* ---------------------------------------------------------------------------
1624  * Fetching the ThreadID from an StgTSO.
1625  *
1626  * This is used in the implementation of Show for ThreadIds.
1627  * ------------------------------------------------------------------------ */
1628 int rts_getThreadId(const StgTSO *tso) 
1629 {
1630   return tso->id;
1631 }
1632
1633 /* ---------------------------------------------------------------------------
1634    Create a new thread.
1635
1636    The new thread starts with the given stack size.  Before the
1637    scheduler can run, however, this thread needs to have a closure
1638    (and possibly some arguments) pushed on its stack.  See
1639    pushClosure() in Schedule.h.
1640
1641    createGenThread() and createIOThread() (in SchedAPI.h) are
1642    convenient packaged versions of this function.
1643
1644    currently pri (priority) is only used in a GRAN setup -- HWL
1645    ------------------------------------------------------------------------ */
1646 //@cindex createThread
1647 #if defined(GRAN)
1648 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
1649 StgTSO *
1650 createThread(nat stack_size, StgInt pri)
1651 {
1652   return createThread_(stack_size, rtsFalse, pri);
1653 }
1654
1655 static StgTSO *
1656 createThread_(nat size, rtsBool have_lock, StgInt pri)
1657 {
1658 #else
1659 StgTSO *
1660 createThread(nat stack_size)
1661 {
1662   return createThread_(stack_size, rtsFalse);
1663 }
1664
1665 static StgTSO *
1666 createThread_(nat size, rtsBool have_lock)
1667 {
1668 #endif
1669
1670     StgTSO *tso;
1671     nat stack_size;
1672
1673     /* First check whether we should create a thread at all */
1674 #if defined(PAR)
1675   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
1676   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
1677     threadsIgnored++;
1678     belch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1679           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
1680     return END_TSO_QUEUE;
1681   }
1682   threadsCreated++;
1683 #endif
1684
1685 #if defined(GRAN)
1686   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
1687 #endif
1688
1689   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
1690
1691   /* catch ridiculously small stack sizes */
1692   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
1693     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
1694   }
1695
1696   stack_size = size - TSO_STRUCT_SIZEW;
1697
1698   tso = (StgTSO *)allocate(size);
1699   TICK_ALLOC_TSO(stack_size, 0);
1700
1701   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
1702 #if defined(GRAN)
1703   SET_GRAN_HDR(tso, ThisPE);
1704 #endif
1705   tso->what_next     = ThreadEnterGHC;
1706
1707   /* tso->id needs to be unique.  For now we use a heavyweight mutex to
1708    * protect the increment operation on next_thread_id.
1709    * In future, we could use an atomic increment instead.
1710    */
1711   if (!have_lock) { ACQUIRE_LOCK(&sched_mutex); }
1712   tso->id = next_thread_id++; 
1713   if (!have_lock) { RELEASE_LOCK(&sched_mutex); }
1714
1715   tso->why_blocked  = NotBlocked;
1716   tso->blocked_exceptions = NULL;
1717
1718   tso->stack_size   = stack_size;
1719   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
1720                               - TSO_STRUCT_SIZEW;
1721   tso->sp           = (P_)&(tso->stack) + stack_size;
1722
1723 #ifdef PROFILING
1724   tso->prof.CCCS = CCS_MAIN;
1725 #endif
1726
1727   /* put a stop frame on the stack */
1728   tso->sp -= sizeofW(StgStopFrame);
1729   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
1730   tso->su = (StgUpdateFrame*)tso->sp;
1731
1732   // ToDo: check this
1733 #if defined(GRAN)
1734   tso->link = END_TSO_QUEUE;
1735   /* uses more flexible routine in GranSim */
1736   insertThread(tso, CurrentProc);
1737 #else
1738   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
1739    * from its creation
1740    */
1741 #endif
1742
1743 #if defined(GRAN) 
1744   if (RtsFlags.GranFlags.GranSimStats.Full) 
1745     DumpGranEvent(GR_START,tso);
1746 #elif defined(PAR)
1747   if (RtsFlags.ParFlags.ParStats.Full) 
1748     DumpGranEvent(GR_STARTQ,tso);
1749   /* HACk to avoid SCHEDULE 
1750      LastTSO = tso; */
1751 #endif
1752
1753   /* Link the new thread on the global thread list.
1754    */
1755   tso->global_link = all_threads;
1756   all_threads = tso;
1757
1758 #if defined(DIST)
1759   tso->dist.priority = MandatoryPriority; //by default that is...
1760 #endif
1761
1762 #if defined(GRAN)
1763   tso->gran.pri = pri;
1764 # if defined(DEBUG)
1765   tso->gran.magic = TSO_MAGIC; // debugging only
1766 # endif
1767   tso->gran.sparkname   = 0;
1768   tso->gran.startedat   = CURRENT_TIME; 
1769   tso->gran.exported    = 0;
1770   tso->gran.basicblocks = 0;
1771   tso->gran.allocs      = 0;
1772   tso->gran.exectime    = 0;
1773   tso->gran.fetchtime   = 0;
1774   tso->gran.fetchcount  = 0;
1775   tso->gran.blocktime   = 0;
1776   tso->gran.blockcount  = 0;
1777   tso->gran.blockedat   = 0;
1778   tso->gran.globalsparks = 0;
1779   tso->gran.localsparks  = 0;
1780   if (RtsFlags.GranFlags.Light)
1781     tso->gran.clock  = Now; /* local clock */
1782   else
1783     tso->gran.clock  = 0;
1784
1785   IF_DEBUG(gran,printTSO(tso));
1786 #elif defined(PAR)
1787 # if defined(DEBUG)
1788   tso->par.magic = TSO_MAGIC; // debugging only
1789 # endif
1790   tso->par.sparkname   = 0;
1791   tso->par.startedat   = CURRENT_TIME; 
1792   tso->par.exported    = 0;
1793   tso->par.basicblocks = 0;
1794   tso->par.allocs      = 0;
1795   tso->par.exectime    = 0;
1796   tso->par.fetchtime   = 0;
1797   tso->par.fetchcount  = 0;
1798   tso->par.blocktime   = 0;
1799   tso->par.blockcount  = 0;
1800   tso->par.blockedat   = 0;
1801   tso->par.globalsparks = 0;
1802   tso->par.localsparks  = 0;
1803 #endif
1804
1805 #if defined(GRAN)
1806   globalGranStats.tot_threads_created++;
1807   globalGranStats.threads_created_on_PE[CurrentProc]++;
1808   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
1809   globalGranStats.tot_sq_probes++;
1810 #elif defined(PAR)
1811   // collect parallel global statistics (currently done together with GC stats)
1812   if (RtsFlags.ParFlags.ParStats.Global &&
1813       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1814     //fprintf(stderr, "Creating thread %d @ %11.2f\n", tso->id, usertime()); 
1815     globalParStats.tot_threads_created++;
1816   }
1817 #endif 
1818
1819 #if defined(GRAN)
1820   IF_GRAN_DEBUG(pri,
1821                 belch("==__ schedule: Created TSO %d (%p);",
1822                       CurrentProc, tso, tso->id));
1823 #elif defined(PAR)
1824     IF_PAR_DEBUG(verbose,
1825                  belch("==__ schedule: Created TSO %d (%p); %d threads active",
1826                        tso->id, tso, advisory_thread_count));
1827 #else
1828   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
1829                                  tso->id, tso->stack_size));
1830 #endif    
1831   return tso;
1832 }
1833
1834 #if defined(PAR)
1835 /* RFP:
1836    all parallel thread creation calls should fall through the following routine.
1837 */
1838 StgTSO *
1839 createSparkThread(rtsSpark spark) 
1840 { StgTSO *tso;
1841   ASSERT(spark != (rtsSpark)NULL);
1842   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
1843   { threadsIgnored++;
1844     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
1845           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
1846     return END_TSO_QUEUE;
1847   }
1848   else
1849   { threadsCreated++;
1850     tso = createThread_(RtsFlags.GcFlags.initialStkSize, rtsTrue);
1851     if (tso==END_TSO_QUEUE)     
1852       barf("createSparkThread: Cannot create TSO");
1853 #if defined(DIST)
1854     tso->priority = AdvisoryPriority;
1855 #endif
1856     pushClosure(tso,spark);
1857     PUSH_ON_RUN_QUEUE(tso);
1858     advisory_thread_count++;    
1859   }
1860   return tso;
1861 }
1862 #endif
1863
1864 /*
1865   Turn a spark into a thread.
1866   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
1867 */
1868 #if defined(PAR)
1869 //@cindex activateSpark
1870 StgTSO *
1871 activateSpark (rtsSpark spark) 
1872 {
1873   StgTSO *tso;
1874
1875   tso = createSparkThread(spark);
1876   if (RtsFlags.ParFlags.ParStats.Full) {   
1877     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
1878     IF_PAR_DEBUG(verbose,
1879                  belch("==^^ activateSpark: turning spark of closure %p (%s) into a thread",
1880                        (StgClosure *)spark, info_type((StgClosure *)spark)));
1881   }
1882   // ToDo: fwd info on local/global spark to thread -- HWL
1883   // tso->gran.exported =  spark->exported;
1884   // tso->gran.locked =   !spark->global;
1885   // tso->gran.sparkname = spark->name;
1886
1887   return tso;
1888 }
1889 #endif
1890
1891 /* ---------------------------------------------------------------------------
1892  * scheduleThread()
1893  *
1894  * scheduleThread puts a thread on the head of the runnable queue.
1895  * This will usually be done immediately after a thread is created.
1896  * The caller of scheduleThread must create the thread using e.g.
1897  * createThread and push an appropriate closure
1898  * on this thread's stack before the scheduler is invoked.
1899  * ------------------------------------------------------------------------ */
1900
1901 void
1902 scheduleThread(StgTSO *tso)
1903 {
1904   ACQUIRE_LOCK(&sched_mutex);
1905
1906   /* Put the new thread on the head of the runnable queue.  The caller
1907    * better push an appropriate closure on this thread's stack
1908    * beforehand.  In the SMP case, the thread may start running as
1909    * soon as we release the scheduler lock below.
1910    */
1911   PUSH_ON_RUN_QUEUE(tso);
1912   THREAD_RUNNABLE();
1913
1914 #if 0
1915   IF_DEBUG(scheduler,printTSO(tso));
1916 #endif
1917   RELEASE_LOCK(&sched_mutex);
1918 }
1919
1920 /* ---------------------------------------------------------------------------
1921  * initScheduler()
1922  *
1923  * Initialise the scheduler.  This resets all the queues - if the
1924  * queues contained any threads, they'll be garbage collected at the
1925  * next pass.
1926  *
1927  * ------------------------------------------------------------------------ */
1928
1929 #ifdef SMP
1930 static void
1931 term_handler(int sig STG_UNUSED)
1932 {
1933   stat_workerStop();
1934   ACQUIRE_LOCK(&term_mutex);
1935   await_death--;
1936   RELEASE_LOCK(&term_mutex);
1937   shutdownThread();
1938 }
1939 #endif
1940
1941 void 
1942 initScheduler(void)
1943 {
1944 #if defined(GRAN)
1945   nat i;
1946
1947   for (i=0; i<=MAX_PROC; i++) {
1948     run_queue_hds[i]      = END_TSO_QUEUE;
1949     run_queue_tls[i]      = END_TSO_QUEUE;
1950     blocked_queue_hds[i]  = END_TSO_QUEUE;
1951     blocked_queue_tls[i]  = END_TSO_QUEUE;
1952     ccalling_threadss[i]  = END_TSO_QUEUE;
1953     sleeping_queue        = END_TSO_QUEUE;
1954   }
1955 #else
1956   run_queue_hd      = END_TSO_QUEUE;
1957   run_queue_tl      = END_TSO_QUEUE;
1958   blocked_queue_hd  = END_TSO_QUEUE;
1959   blocked_queue_tl  = END_TSO_QUEUE;
1960   sleeping_queue    = END_TSO_QUEUE;
1961 #endif 
1962
1963   suspended_ccalling_threads  = END_TSO_QUEUE;
1964
1965   main_threads = NULL;
1966   all_threads  = END_TSO_QUEUE;
1967
1968   context_switch = 0;
1969   interrupted    = 0;
1970
1971   RtsFlags.ConcFlags.ctxtSwitchTicks =
1972       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
1973       
1974 #if defined(RTS_SUPPORTS_THREADS)
1975   /* Initialise the mutex and condition variables used by
1976    * the scheduler. */
1977   initMutex(&rts_mutex);
1978   initMutex(&sched_mutex);
1979   initMutex(&term_mutex);
1980 #if defined(THREADED_RTS)
1981   initMutex(&thread_ready_aux_mutex);
1982 #endif
1983   
1984   initCondition(&thread_ready_cond);
1985   initCondition(&gc_pending_cond);
1986 #endif
1987
1988 #if defined(THREADED_RTS)
1989   /* Grab big lock */
1990   ACQUIRE_LOCK(&rts_mutex);
1991   IF_DEBUG(scheduler, 
1992            sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId()));
1993 #endif
1994
1995   /* Install the SIGHUP handler */
1996 #ifdef SMP
1997   {
1998     struct sigaction action,oact;
1999
2000     action.sa_handler = term_handler;
2001     sigemptyset(&action.sa_mask);
2002     action.sa_flags = 0;
2003     if (sigaction(SIGTERM, &action, &oact) != 0) {
2004       barf("can't install TERM handler");
2005     }
2006   }
2007 #endif
2008
2009   /* A capability holds the state a native thread needs in
2010    * order to execute STG code. At least one capability is
2011    * floating around (only SMP builds have more than one).
2012    */
2013   initCapabilities();
2014   
2015 #if defined(RTS_SUPPORTS_THREADS)
2016     /* start our haskell execution tasks */
2017 # if defined(SMP)
2018     startTaskManager(RtsFlags.ParFlags.nNodes, taskStart);
2019 # else
2020     startTaskManager(0,taskStart);
2021 # endif
2022 #endif
2023
2024 #if /* defined(SMP) ||*/ defined(PAR)
2025   initSparkPools();
2026 #endif
2027 }
2028
2029 void
2030 exitScheduler( void )
2031 {
2032 #if defined(RTS_SUPPORTS_THREADS)
2033   stopTaskManager();
2034 #endif
2035 }
2036
2037 /* -----------------------------------------------------------------------------
2038    Managing the per-task allocation areas.
2039    
2040    Each capability comes with an allocation area.  These are
2041    fixed-length block lists into which allocation can be done.
2042
2043    ToDo: no support for two-space collection at the moment???
2044    -------------------------------------------------------------------------- */
2045
2046 /* -----------------------------------------------------------------------------
2047  * waitThread is the external interface for running a new computation
2048  * and waiting for the result.
2049  *
2050  * In the non-SMP case, we create a new main thread, push it on the 
2051  * main-thread stack, and invoke the scheduler to run it.  The
2052  * scheduler will return when the top main thread on the stack has
2053  * completed or died, and fill in the necessary fields of the
2054  * main_thread structure.
2055  *
2056  * In the SMP case, we create a main thread as before, but we then
2057  * create a new condition variable and sleep on it.  When our new
2058  * main thread has completed, we'll be woken up and the status/result
2059  * will be in the main_thread struct.
2060  * -------------------------------------------------------------------------- */
2061
2062 int 
2063 howManyThreadsAvail ( void )
2064 {
2065    int i = 0;
2066    StgTSO* q;
2067    for (q = run_queue_hd; q != END_TSO_QUEUE; q = q->link)
2068       i++;
2069    for (q = blocked_queue_hd; q != END_TSO_QUEUE; q = q->link)
2070       i++;
2071    for (q = sleeping_queue; q != END_TSO_QUEUE; q = q->link)
2072       i++;
2073    return i;
2074 }
2075
2076 void
2077 finishAllThreads ( void )
2078 {
2079    do {
2080       while (run_queue_hd != END_TSO_QUEUE) {
2081          waitThread ( run_queue_hd, NULL );
2082       }
2083       while (blocked_queue_hd != END_TSO_QUEUE) {
2084          waitThread ( blocked_queue_hd, NULL );
2085       }
2086       while (sleeping_queue != END_TSO_QUEUE) {
2087          waitThread ( blocked_queue_hd, NULL );
2088       }
2089    } while 
2090       (blocked_queue_hd != END_TSO_QUEUE || 
2091        run_queue_hd     != END_TSO_QUEUE ||
2092        sleeping_queue   != END_TSO_QUEUE);
2093 }
2094
2095 SchedulerStatus
2096 waitThread(StgTSO *tso, /*out*/StgClosure **ret)
2097 {
2098   StgMainThread *m;
2099   SchedulerStatus stat;
2100
2101   ACQUIRE_LOCK(&sched_mutex);
2102   
2103   m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2104
2105   m->tso = tso;
2106   m->ret = ret;
2107   m->stat = NoStatus;
2108 #if defined(RTS_SUPPORTS_THREADS)
2109   initCondition(&m->wakeup);
2110 #endif
2111
2112   m->link = main_threads;
2113   main_threads = m;
2114
2115   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", 
2116                               m->tso->id));
2117
2118 #ifdef SMP
2119   do {
2120     waitCondition(&m->wakeup, &sched_mutex);
2121   } while (m->stat == NoStatus);
2122 #elif defined(GRAN)
2123   /* GranSim specific init */
2124   CurrentTSO = m->tso;                // the TSO to run
2125   procStatus[MainProc] = Busy;        // status of main PE
2126   CurrentProc = MainProc;             // PE to run it on
2127
2128   schedule();
2129 #else
2130   RELEASE_LOCK(&sched_mutex);
2131   schedule();
2132   ASSERT(m->stat != NoStatus);
2133 #endif
2134
2135   stat = m->stat;
2136
2137 #if defined(RTS_SUPPORTS_THREADS)
2138   closeCondition(&m->wakeup);
2139 #endif
2140
2141   IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: main thread (%d) finished\n", 
2142                               m->tso->id));
2143   free(m);
2144
2145   RELEASE_LOCK(&sched_mutex);
2146
2147   return stat;
2148 }
2149
2150 //@node Run queue code, Garbage Collextion Routines, Suspend and Resume, Main scheduling code
2151 //@subsection Run queue code 
2152
2153 #if 0
2154 /* 
2155    NB: In GranSim we have many run queues; run_queue_hd is actually a macro
2156        unfolding to run_queue_hds[CurrentProc], thus CurrentProc is an
2157        implicit global variable that has to be correct when calling these
2158        fcts -- HWL 
2159 */
2160
2161 /* Put the new thread on the head of the runnable queue.
2162  * The caller of createThread better push an appropriate closure
2163  * on this thread's stack before the scheduler is invoked.
2164  */
2165 static /* inline */ void
2166 add_to_run_queue(tso)
2167 StgTSO* tso; 
2168 {
2169   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2170   tso->link = run_queue_hd;
2171   run_queue_hd = tso;
2172   if (run_queue_tl == END_TSO_QUEUE) {
2173     run_queue_tl = tso;
2174   }
2175 }
2176
2177 /* Put the new thread at the end of the runnable queue. */
2178 static /* inline */ void
2179 push_on_run_queue(tso)
2180 StgTSO* tso; 
2181 {
2182   ASSERT(get_itbl((StgClosure *)tso)->type == TSO);
2183   ASSERT(run_queue_hd!=NULL && run_queue_tl!=NULL);
2184   ASSERT(tso!=run_queue_hd && tso!=run_queue_tl);
2185   if (run_queue_hd == END_TSO_QUEUE) {
2186     run_queue_hd = tso;
2187   } else {
2188     run_queue_tl->link = tso;
2189   }
2190   run_queue_tl = tso;
2191 }
2192
2193 /* 
2194    Should be inlined because it's used very often in schedule.  The tso
2195    argument is actually only needed in GranSim, where we want to have the
2196    possibility to schedule *any* TSO on the run queue, irrespective of the
2197    actual ordering. Therefore, if tso is not the nil TSO then we traverse
2198    the run queue and dequeue the tso, adjusting the links in the queue. 
2199 */
2200 //@cindex take_off_run_queue
2201 static /* inline */ StgTSO*
2202 take_off_run_queue(StgTSO *tso) {
2203   StgTSO *t, *prev;
2204
2205   /* 
2206      qetlaHbogh Qu' ngaSbogh ghomDaQ {tso} yIteq!
2207
2208      if tso is specified, unlink that tso from the run_queue (doesn't have
2209      to be at the beginning of the queue); GranSim only 
2210   */
2211   if (tso!=END_TSO_QUEUE) {
2212     /* find tso in queue */
2213     for (t=run_queue_hd, prev=END_TSO_QUEUE; 
2214          t!=END_TSO_QUEUE && t!=tso;
2215          prev=t, t=t->link) 
2216       /* nothing */ ;
2217     ASSERT(t==tso);
2218     /* now actually dequeue the tso */
2219     if (prev!=END_TSO_QUEUE) {
2220       ASSERT(run_queue_hd!=t);
2221       prev->link = t->link;
2222     } else {
2223       /* t is at beginning of thread queue */
2224       ASSERT(run_queue_hd==t);
2225       run_queue_hd = t->link;
2226     }
2227     /* t is at end of thread queue */
2228     if (t->link==END_TSO_QUEUE) {
2229       ASSERT(t==run_queue_tl);
2230       run_queue_tl = prev;
2231     } else {
2232       ASSERT(run_queue_tl!=t);
2233     }
2234     t->link = END_TSO_QUEUE;
2235   } else {
2236     /* take tso from the beginning of the queue; std concurrent code */
2237     t = run_queue_hd;
2238     if (t != END_TSO_QUEUE) {
2239       run_queue_hd = t->link;
2240       t->link = END_TSO_QUEUE;
2241       if (run_queue_hd == END_TSO_QUEUE) {
2242         run_queue_tl = END_TSO_QUEUE;
2243       }
2244     }
2245   }
2246   return t;
2247 }
2248
2249 #endif /* 0 */
2250
2251 //@node Garbage Collextion Routines, Blocking Queue Routines, Run queue code, Main scheduling code
2252 //@subsection Garbage Collextion Routines
2253
2254 /* ---------------------------------------------------------------------------
2255    Where are the roots that we know about?
2256
2257         - all the threads on the runnable queue
2258         - all the threads on the blocked queue
2259         - all the threads on the sleeping queue
2260         - all the thread currently executing a _ccall_GC
2261         - all the "main threads"
2262      
2263    ------------------------------------------------------------------------ */
2264
2265 /* This has to be protected either by the scheduler monitor, or by the
2266         garbage collection monitor (probably the latter).
2267         KH @ 25/10/99
2268 */
2269
2270 void
2271 GetRoots(evac_fn evac)
2272 {
2273   StgMainThread *m;
2274
2275 #if defined(GRAN)
2276   {
2277     nat i;
2278     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2279       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2280           evac((StgClosure **)&run_queue_hds[i]);
2281       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2282           evac((StgClosure **)&run_queue_tls[i]);
2283       
2284       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2285           evac((StgClosure **)&blocked_queue_hds[i]);
2286       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2287           evac((StgClosure **)&blocked_queue_tls[i]);
2288       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2289           evac((StgClosure **)&ccalling_threads[i]);
2290     }
2291   }
2292
2293   markEventQueue();
2294
2295 #else /* !GRAN */
2296   if (run_queue_hd != END_TSO_QUEUE) {
2297       ASSERT(run_queue_tl != END_TSO_QUEUE);
2298       evac((StgClosure **)&run_queue_hd);
2299       evac((StgClosure **)&run_queue_tl);
2300   }
2301   
2302   if (blocked_queue_hd != END_TSO_QUEUE) {
2303       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2304       evac((StgClosure **)&blocked_queue_hd);
2305       evac((StgClosure **)&blocked_queue_tl);
2306   }
2307   
2308   if (sleeping_queue != END_TSO_QUEUE) {
2309       evac((StgClosure **)&sleeping_queue);
2310   }
2311 #endif 
2312
2313   for (m = main_threads; m != NULL; m = m->link) {
2314       evac((StgClosure **)&m->tso);
2315   }
2316   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2317       evac((StgClosure **)&suspended_ccalling_threads);
2318   }
2319
2320 #if defined(PAR) || defined(GRAN)
2321   markSparkQueue(evac);
2322 #endif
2323 }
2324
2325 /* -----------------------------------------------------------------------------
2326    performGC
2327
2328    This is the interface to the garbage collector from Haskell land.
2329    We provide this so that external C code can allocate and garbage
2330    collect when called from Haskell via _ccall_GC.
2331
2332    It might be useful to provide an interface whereby the programmer
2333    can specify more roots (ToDo).
2334    
2335    This needs to be protected by the GC condition variable above.  KH.
2336    -------------------------------------------------------------------------- */
2337
2338 void (*extra_roots)(evac_fn);
2339
2340 void
2341 performGC(void)
2342 {
2343   GarbageCollect(GetRoots,rtsFalse);
2344 }
2345
2346 void
2347 performMajorGC(void)
2348 {
2349   GarbageCollect(GetRoots,rtsTrue);
2350 }
2351
2352 static void
2353 AllRoots(evac_fn evac)
2354 {
2355     GetRoots(evac);             // the scheduler's roots
2356     extra_roots(evac);          // the user's roots
2357 }
2358
2359 void
2360 performGCWithRoots(void (*get_roots)(evac_fn))
2361 {
2362   extra_roots = get_roots;
2363   GarbageCollect(AllRoots,rtsFalse);
2364 }
2365
2366 /* -----------------------------------------------------------------------------
2367    Stack overflow
2368
2369    If the thread has reached its maximum stack size, then raise the
2370    StackOverflow exception in the offending thread.  Otherwise
2371    relocate the TSO into a larger chunk of memory and adjust its stack
2372    size appropriately.
2373    -------------------------------------------------------------------------- */
2374
2375 static StgTSO *
2376 threadStackOverflow(StgTSO *tso)
2377 {
2378   nat new_stack_size, new_tso_size, diff, stack_words;
2379   StgPtr new_sp;
2380   StgTSO *dest;
2381
2382   IF_DEBUG(sanity,checkTSO(tso));
2383   if (tso->stack_size >= tso->max_stack_size) {
2384
2385     IF_DEBUG(gc,
2386              belch("@@ threadStackOverflow of TSO %d (%p): stack too large (now %ld; max is %ld",
2387                    tso->id, tso, tso->stack_size, tso->max_stack_size);
2388              /* If we're debugging, just print out the top of the stack */
2389              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2390                                               tso->sp+64)));
2391
2392     /* Send this thread the StackOverflow exception */
2393     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2394     return tso;
2395   }
2396
2397   /* Try to double the current stack size.  If that takes us over the
2398    * maximum stack size for this thread, then use the maximum instead.
2399    * Finally round up so the TSO ends up as a whole number of blocks.
2400    */
2401   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2402   new_tso_size   = (nat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2403                                        TSO_STRUCT_SIZE)/sizeof(W_);
2404   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2405   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2406
2407   IF_DEBUG(scheduler, fprintf(stderr,"== scheduler: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2408
2409   dest = (StgTSO *)allocate(new_tso_size);
2410   TICK_ALLOC_TSO(new_stack_size,0);
2411
2412   /* copy the TSO block and the old stack into the new area */
2413   memcpy(dest,tso,TSO_STRUCT_SIZE);
2414   stack_words = tso->stack + tso->stack_size - tso->sp;
2415   new_sp = (P_)dest + new_tso_size - stack_words;
2416   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2417
2418   /* relocate the stack pointers... */
2419   diff = (P_)new_sp - (P_)tso->sp; /* In *words* */
2420   dest->su    = (StgUpdateFrame *) ((P_)dest->su + diff);
2421   dest->sp    = new_sp;
2422   dest->stack_size = new_stack_size;
2423         
2424   /* and relocate the update frame list */
2425   relocate_stack(dest, diff);
2426
2427   /* Mark the old TSO as relocated.  We have to check for relocated
2428    * TSOs in the garbage collector and any primops that deal with TSOs.
2429    *
2430    * It's important to set the sp and su values to just beyond the end
2431    * of the stack, so we don't attempt to scavenge any part of the
2432    * dead TSO's stack.
2433    */
2434   tso->what_next = ThreadRelocated;
2435   tso->link = dest;
2436   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2437   tso->su = (StgUpdateFrame *)tso->sp;
2438   tso->why_blocked = NotBlocked;
2439   dest->mut_link = NULL;
2440
2441   IF_PAR_DEBUG(verbose,
2442                belch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld",
2443                      tso->id, tso, tso->stack_size);
2444                /* If we're debugging, just print out the top of the stack */
2445                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2446                                                 tso->sp+64)));
2447   
2448   IF_DEBUG(sanity,checkTSO(tso));
2449 #if 0
2450   IF_DEBUG(scheduler,printTSO(dest));
2451 #endif
2452
2453   return dest;
2454 }
2455
2456 //@node Blocking Queue Routines, Exception Handling Routines, Garbage Collextion Routines, Main scheduling code
2457 //@subsection Blocking Queue Routines
2458
2459 /* ---------------------------------------------------------------------------
2460    Wake up a queue that was blocked on some resource.
2461    ------------------------------------------------------------------------ */
2462
2463 #if defined(GRAN)
2464 static inline void
2465 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2466 {
2467 }
2468 #elif defined(PAR)
2469 static inline void
2470 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2471 {
2472   /* write RESUME events to log file and
2473      update blocked and fetch time (depending on type of the orig closure) */
2474   if (RtsFlags.ParFlags.ParStats.Full) {
2475     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2476                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2477                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2478     if (EMPTY_RUN_QUEUE())
2479       emitSchedule = rtsTrue;
2480
2481     switch (get_itbl(node)->type) {
2482         case FETCH_ME_BQ:
2483           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2484           break;
2485         case RBH:
2486         case FETCH_ME:
2487         case BLACKHOLE_BQ:
2488           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2489           break;
2490 #ifdef DIST
2491         case MVAR:
2492           break;
2493 #endif    
2494         default:
2495           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2496         }
2497       }
2498 }
2499 #endif
2500
2501 #if defined(GRAN)
2502 static StgBlockingQueueElement *
2503 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2504 {
2505     StgTSO *tso;
2506     PEs node_loc, tso_loc;
2507
2508     node_loc = where_is(node); // should be lifted out of loop
2509     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2510     tso_loc = where_is((StgClosure *)tso);
2511     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2512       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2513       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2514       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2515       // insertThread(tso, node_loc);
2516       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2517                 ResumeThread,
2518                 tso, node, (rtsSpark*)NULL);
2519       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2520       // len_local++;
2521       // len++;
2522     } else { // TSO is remote (actually should be FMBQ)
2523       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2524                                   RtsFlags.GranFlags.Costs.gunblocktime +
2525                                   RtsFlags.GranFlags.Costs.latency;
2526       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2527                 UnblockThread,
2528                 tso, node, (rtsSpark*)NULL);
2529       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2530       // len++;
2531     }
2532     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2533     IF_GRAN_DEBUG(bq,
2534                   fprintf(stderr," %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2535                           (node_loc==tso_loc ? "Local" : "Global"), 
2536                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2537     tso->block_info.closure = NULL;
2538     IF_DEBUG(scheduler,belch("-- Waking up thread %ld (%p)", 
2539                              tso->id, tso));
2540 }
2541 #elif defined(PAR)
2542 static StgBlockingQueueElement *
2543 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2544 {
2545     StgBlockingQueueElement *next;
2546
2547     switch (get_itbl(bqe)->type) {
2548     case TSO:
2549       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2550       /* if it's a TSO just push it onto the run_queue */
2551       next = bqe->link;
2552       // ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2553       PUSH_ON_RUN_QUEUE((StgTSO *)bqe); 
2554       THREAD_RUNNABLE();
2555       unblockCount(bqe, node);
2556       /* reset blocking status after dumping event */
2557       ((StgTSO *)bqe)->why_blocked = NotBlocked;
2558       break;
2559
2560     case BLOCKED_FETCH:
2561       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
2562       next = bqe->link;
2563       bqe->link = (StgBlockingQueueElement *)PendingFetches;
2564       PendingFetches = (StgBlockedFetch *)bqe;
2565       break;
2566
2567 # if defined(DEBUG)
2568       /* can ignore this case in a non-debugging setup; 
2569          see comments on RBHSave closures above */
2570     case CONSTR:
2571       /* check that the closure is an RBHSave closure */
2572       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
2573              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
2574              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
2575       break;
2576
2577     default:
2578       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
2579            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
2580            (StgClosure *)bqe);
2581 # endif
2582     }
2583   IF_PAR_DEBUG(bq, fprintf(stderr, ", %p (%s)", bqe, info_type((StgClosure*)bqe)));
2584   return next;
2585 }
2586
2587 #else /* !GRAN && !PAR */
2588 static StgTSO *
2589 unblockOneLocked(StgTSO *tso)
2590 {
2591   StgTSO *next;
2592
2593   ASSERT(get_itbl(tso)->type == TSO);
2594   ASSERT(tso->why_blocked != NotBlocked);
2595   tso->why_blocked = NotBlocked;
2596   next = tso->link;
2597   PUSH_ON_RUN_QUEUE(tso);
2598   THREAD_RUNNABLE();
2599   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", tso->id));
2600   return next;
2601 }
2602 #endif
2603
2604 #if defined(GRAN) || defined(PAR)
2605 inline StgBlockingQueueElement *
2606 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
2607 {
2608   ACQUIRE_LOCK(&sched_mutex);
2609   bqe = unblockOneLocked(bqe, node);
2610   RELEASE_LOCK(&sched_mutex);
2611   return bqe;
2612 }
2613 #else
2614 inline StgTSO *
2615 unblockOne(StgTSO *tso)
2616 {
2617   ACQUIRE_LOCK(&sched_mutex);
2618   tso = unblockOneLocked(tso);
2619   RELEASE_LOCK(&sched_mutex);
2620   return tso;
2621 }
2622 #endif
2623
2624 #if defined(GRAN)
2625 void 
2626 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2627 {
2628   StgBlockingQueueElement *bqe;
2629   PEs node_loc;
2630   nat len = 0; 
2631
2632   IF_GRAN_DEBUG(bq, 
2633                 belch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): ", \
2634                       node, CurrentProc, CurrentTime[CurrentProc], 
2635                       CurrentTSO->id, CurrentTSO));
2636
2637   node_loc = where_is(node);
2638
2639   ASSERT(q == END_BQ_QUEUE ||
2640          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
2641          get_itbl(q)->type == CONSTR); // closure (type constructor)
2642   ASSERT(is_unique(node));
2643
2644   /* FAKE FETCH: magically copy the node to the tso's proc;
2645      no Fetch necessary because in reality the node should not have been 
2646      moved to the other PE in the first place
2647   */
2648   if (CurrentProc!=node_loc) {
2649     IF_GRAN_DEBUG(bq, 
2650                   belch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)",
2651                         node, node_loc, CurrentProc, CurrentTSO->id, 
2652                         // CurrentTSO, where_is(CurrentTSO),
2653                         node->header.gran.procs));
2654     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
2655     IF_GRAN_DEBUG(bq, 
2656                   belch("## new bitmask of node %p is %#x",
2657                         node, node->header.gran.procs));
2658     if (RtsFlags.GranFlags.GranSimStats.Global) {
2659       globalGranStats.tot_fake_fetches++;
2660     }
2661   }
2662
2663   bqe = q;
2664   // ToDo: check: ASSERT(CurrentProc==node_loc);
2665   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
2666     //next = bqe->link;
2667     /* 
2668        bqe points to the current element in the queue
2669        next points to the next element in the queue
2670     */
2671     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2672     //tso_loc = where_is(tso);
2673     len++;
2674     bqe = unblockOneLocked(bqe, node);
2675   }
2676
2677   /* if this is the BQ of an RBH, we have to put back the info ripped out of
2678      the closure to make room for the anchor of the BQ */
2679   if (bqe!=END_BQ_QUEUE) {
2680     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
2681     /*
2682     ASSERT((info_ptr==&RBH_Save_0_info) ||
2683            (info_ptr==&RBH_Save_1_info) ||
2684            (info_ptr==&RBH_Save_2_info));
2685     */
2686     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
2687     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
2688     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
2689
2690     IF_GRAN_DEBUG(bq,
2691                   belch("## Filled in RBH_Save for %p (%s) at end of AwBQ",
2692                         node, info_type(node)));
2693   }
2694
2695   /* statistics gathering */
2696   if (RtsFlags.GranFlags.GranSimStats.Global) {
2697     // globalGranStats.tot_bq_processing_time += bq_processing_time;
2698     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
2699     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
2700     globalGranStats.tot_awbq++;             // total no. of bqs awakened
2701   }
2702   IF_GRAN_DEBUG(bq,
2703                 fprintf(stderr,"## BQ Stats of %p: [%d entries] %s\n",
2704                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
2705 }
2706 #elif defined(PAR)
2707 void 
2708 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
2709 {
2710   StgBlockingQueueElement *bqe;
2711
2712   ACQUIRE_LOCK(&sched_mutex);
2713
2714   IF_PAR_DEBUG(verbose, 
2715                belch("##-_ AwBQ for node %p on [%x]: ",
2716                      node, mytid));
2717 #ifdef DIST  
2718   //RFP
2719   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
2720     IF_PAR_DEBUG(verbose, belch("## ... nothing to unblock so lets just return. RFP (BUG?)"));
2721     return;
2722   }
2723 #endif
2724   
2725   ASSERT(q == END_BQ_QUEUE ||
2726          get_itbl(q)->type == TSO ||           
2727          get_itbl(q)->type == BLOCKED_FETCH || 
2728          get_itbl(q)->type == CONSTR); 
2729
2730   bqe = q;
2731   while (get_itbl(bqe)->type==TSO || 
2732          get_itbl(bqe)->type==BLOCKED_FETCH) {
2733     bqe = unblockOneLocked(bqe, node);
2734   }
2735   RELEASE_LOCK(&sched_mutex);
2736 }
2737
2738 #else   /* !GRAN && !PAR */
2739 void
2740 awakenBlockedQueue(StgTSO *tso)
2741 {
2742   ACQUIRE_LOCK(&sched_mutex);
2743   while (tso != END_TSO_QUEUE) {
2744     tso = unblockOneLocked(tso);
2745   }
2746   RELEASE_LOCK(&sched_mutex);
2747 }
2748 #endif
2749
2750 //@node Exception Handling Routines, Debugging Routines, Blocking Queue Routines, Main scheduling code
2751 //@subsection Exception Handling Routines
2752
2753 /* ---------------------------------------------------------------------------
2754    Interrupt execution
2755    - usually called inside a signal handler so it mustn't do anything fancy.   
2756    ------------------------------------------------------------------------ */
2757
2758 void
2759 interruptStgRts(void)
2760 {
2761     interrupted    = 1;
2762     context_switch = 1;
2763 }
2764
2765 /* -----------------------------------------------------------------------------
2766    Unblock a thread
2767
2768    This is for use when we raise an exception in another thread, which
2769    may be blocked.
2770    This has nothing to do with the UnblockThread event in GranSim. -- HWL
2771    -------------------------------------------------------------------------- */
2772
2773 #if defined(GRAN) || defined(PAR)
2774 /*
2775   NB: only the type of the blocking queue is different in GranSim and GUM
2776       the operations on the queue-elements are the same
2777       long live polymorphism!
2778 */
2779 static void
2780 unblockThread(StgTSO *tso)
2781 {
2782   StgBlockingQueueElement *t, **last;
2783
2784   ACQUIRE_LOCK(&sched_mutex);
2785   switch (tso->why_blocked) {
2786
2787   case NotBlocked:
2788     return;  /* not blocked */
2789
2790   case BlockedOnMVar:
2791     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2792     {
2793       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
2794       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2795
2796       last = (StgBlockingQueueElement **)&mvar->head;
2797       for (t = (StgBlockingQueueElement *)mvar->head; 
2798            t != END_BQ_QUEUE; 
2799            last = &t->link, last_tso = t, t = t->link) {
2800         if (t == (StgBlockingQueueElement *)tso) {
2801           *last = (StgBlockingQueueElement *)tso->link;
2802           if (mvar->tail == tso) {
2803             mvar->tail = (StgTSO *)last_tso;
2804           }
2805           goto done;
2806         }
2807       }
2808       barf("unblockThread (MVAR): TSO not found");
2809     }
2810
2811   case BlockedOnBlackHole:
2812     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2813     {
2814       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2815
2816       last = &bq->blocking_queue;
2817       for (t = bq->blocking_queue; 
2818            t != END_BQ_QUEUE; 
2819            last = &t->link, t = t->link) {
2820         if (t == (StgBlockingQueueElement *)tso) {
2821           *last = (StgBlockingQueueElement *)tso->link;
2822           goto done;
2823         }
2824       }
2825       barf("unblockThread (BLACKHOLE): TSO not found");
2826     }
2827
2828   case BlockedOnException:
2829     {
2830       StgTSO *target  = tso->block_info.tso;
2831
2832       ASSERT(get_itbl(target)->type == TSO);
2833
2834       if (target->what_next == ThreadRelocated) {
2835           target = target->link;
2836           ASSERT(get_itbl(target)->type == TSO);
2837       }
2838
2839       ASSERT(target->blocked_exceptions != NULL);
2840
2841       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
2842       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
2843            t != END_BQ_QUEUE; 
2844            last = &t->link, t = t->link) {
2845         ASSERT(get_itbl(t)->type == TSO);
2846         if (t == (StgBlockingQueueElement *)tso) {
2847           *last = (StgBlockingQueueElement *)tso->link;
2848           goto done;
2849         }
2850       }
2851       barf("unblockThread (Exception): TSO not found");
2852     }
2853
2854   case BlockedOnRead:
2855   case BlockedOnWrite:
2856     {
2857       /* take TSO off blocked_queue */
2858       StgBlockingQueueElement *prev = NULL;
2859       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
2860            prev = t, t = t->link) {
2861         if (t == (StgBlockingQueueElement *)tso) {
2862           if (prev == NULL) {
2863             blocked_queue_hd = (StgTSO *)t->link;
2864             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2865               blocked_queue_tl = END_TSO_QUEUE;
2866             }
2867           } else {
2868             prev->link = t->link;
2869             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
2870               blocked_queue_tl = (StgTSO *)prev;
2871             }
2872           }
2873           goto done;
2874         }
2875       }
2876       barf("unblockThread (I/O): TSO not found");
2877     }
2878
2879   case BlockedOnDelay:
2880     {
2881       /* take TSO off sleeping_queue */
2882       StgBlockingQueueElement *prev = NULL;
2883       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
2884            prev = t, t = t->link) {
2885         if (t == (StgBlockingQueueElement *)tso) {
2886           if (prev == NULL) {
2887             sleeping_queue = (StgTSO *)t->link;
2888           } else {
2889             prev->link = t->link;
2890           }
2891           goto done;
2892         }
2893       }
2894       barf("unblockThread (I/O): TSO not found");
2895     }
2896
2897   default:
2898     barf("unblockThread");
2899   }
2900
2901  done:
2902   tso->link = END_TSO_QUEUE;
2903   tso->why_blocked = NotBlocked;
2904   tso->block_info.closure = NULL;
2905   PUSH_ON_RUN_QUEUE(tso);
2906   RELEASE_LOCK(&sched_mutex);
2907 }
2908 #else
2909 static void
2910 unblockThread(StgTSO *tso)
2911 {
2912   StgTSO *t, **last;
2913
2914   ACQUIRE_LOCK(&sched_mutex);
2915   switch (tso->why_blocked) {
2916
2917   case NotBlocked:
2918     return;  /* not blocked */
2919
2920   case BlockedOnMVar:
2921     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
2922     {
2923       StgTSO *last_tso = END_TSO_QUEUE;
2924       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
2925
2926       last = &mvar->head;
2927       for (t = mvar->head; t != END_TSO_QUEUE; 
2928            last = &t->link, last_tso = t, t = t->link) {
2929         if (t == tso) {
2930           *last = tso->link;
2931           if (mvar->tail == tso) {
2932             mvar->tail = last_tso;
2933           }
2934           goto done;
2935         }
2936       }
2937       barf("unblockThread (MVAR): TSO not found");
2938     }
2939
2940   case BlockedOnBlackHole:
2941     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
2942     {
2943       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
2944
2945       last = &bq->blocking_queue;
2946       for (t = bq->blocking_queue; t != END_TSO_QUEUE; 
2947            last = &t->link, t = t->link) {
2948         if (t == tso) {
2949           *last = tso->link;
2950           goto done;
2951         }
2952       }
2953       barf("unblockThread (BLACKHOLE): TSO not found");
2954     }
2955
2956   case BlockedOnException:
2957     {
2958       StgTSO *target  = tso->block_info.tso;
2959
2960       ASSERT(get_itbl(target)->type == TSO);
2961
2962       while (target->what_next == ThreadRelocated) {
2963           target = target->link;
2964           ASSERT(get_itbl(target)->type == TSO);
2965       }
2966       
2967       ASSERT(target->blocked_exceptions != NULL);
2968
2969       last = &target->blocked_exceptions;
2970       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
2971            last = &t->link, t = t->link) {
2972         ASSERT(get_itbl(t)->type == TSO);
2973         if (t == tso) {
2974           *last = tso->link;
2975           goto done;
2976         }
2977       }
2978       barf("unblockThread (Exception): TSO not found");
2979     }
2980
2981   case BlockedOnRead:
2982   case BlockedOnWrite:
2983     {
2984       StgTSO *prev = NULL;
2985       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
2986            prev = t, t = t->link) {
2987         if (t == tso) {
2988           if (prev == NULL) {
2989             blocked_queue_hd = t->link;
2990             if (blocked_queue_tl == t) {
2991               blocked_queue_tl = END_TSO_QUEUE;
2992             }
2993           } else {
2994             prev->link = t->link;
2995             if (blocked_queue_tl == t) {
2996               blocked_queue_tl = prev;
2997             }
2998           }
2999           goto done;
3000         }
3001       }
3002       barf("unblockThread (I/O): TSO not found");
3003     }
3004
3005   case BlockedOnDelay:
3006     {
3007       StgTSO *prev = NULL;
3008       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3009            prev = t, t = t->link) {
3010         if (t == tso) {
3011           if (prev == NULL) {
3012             sleeping_queue = t->link;
3013           } else {
3014             prev->link = t->link;
3015           }
3016           goto done;
3017         }
3018       }
3019       barf("unblockThread (I/O): TSO not found");
3020     }
3021
3022   default:
3023     barf("unblockThread");
3024   }
3025
3026  done:
3027   tso->link = END_TSO_QUEUE;
3028   tso->why_blocked = NotBlocked;
3029   tso->block_info.closure = NULL;
3030   PUSH_ON_RUN_QUEUE(tso);
3031   RELEASE_LOCK(&sched_mutex);
3032 }
3033 #endif
3034
3035 /* -----------------------------------------------------------------------------
3036  * raiseAsync()
3037  *
3038  * The following function implements the magic for raising an
3039  * asynchronous exception in an existing thread.
3040  *
3041  * We first remove the thread from any queue on which it might be
3042  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3043  *
3044  * We strip the stack down to the innermost CATCH_FRAME, building
3045  * thunks in the heap for all the active computations, so they can 
3046  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3047  * an application of the handler to the exception, and push it on
3048  * the top of the stack.
3049  * 
3050  * How exactly do we save all the active computations?  We create an
3051  * AP_UPD for every UpdateFrame on the stack.  Entering one of these
3052  * AP_UPDs pushes everything from the corresponding update frame
3053  * upwards onto the stack.  (Actually, it pushes everything up to the
3054  * next update frame plus a pointer to the next AP_UPD object.
3055  * Entering the next AP_UPD object pushes more onto the stack until we
3056  * reach the last AP_UPD object - at which point the stack should look
3057  * exactly as it did when we killed the TSO and we can continue
3058  * execution by entering the closure on top of the stack.
3059  *
3060  * We can also kill a thread entirely - this happens if either (a) the 
3061  * exception passed to raiseAsync is NULL, or (b) there's no
3062  * CATCH_FRAME on the stack.  In either case, we strip the entire
3063  * stack and replace the thread with a zombie.
3064  *
3065  * -------------------------------------------------------------------------- */
3066  
3067 void 
3068 deleteThread(StgTSO *tso)
3069 {
3070   raiseAsync(tso,NULL);
3071 }
3072
3073 void
3074 raiseAsync(StgTSO *tso, StgClosure *exception)
3075 {
3076   StgUpdateFrame* su = tso->su;
3077   StgPtr          sp = tso->sp;
3078   
3079   /* Thread already dead? */
3080   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3081     return;
3082   }
3083
3084   IF_DEBUG(scheduler, sched_belch("raising exception in thread %ld.", tso->id));
3085
3086   /* Remove it from any blocking queues */
3087   unblockThread(tso);
3088
3089   /* The stack freezing code assumes there's a closure pointer on
3090    * the top of the stack.  This isn't always the case with compiled
3091    * code, so we have to push a dummy closure on the top which just
3092    * returns to the next return address on the stack.
3093    */
3094   if ( LOOKS_LIKE_GHC_INFO((void*)*sp) ) {
3095     *(--sp) = (W_)&stg_dummy_ret_closure;
3096   }
3097
3098   while (1) {
3099     nat words = ((P_)su - (P_)sp) - 1;
3100     nat i;
3101     StgAP_UPD * ap;
3102
3103     /* If we find a CATCH_FRAME, and we've got an exception to raise,
3104      * then build PAP(handler,exception,realworld#), and leave it on
3105      * top of the stack ready to enter.
3106      */
3107     if (get_itbl(su)->type == CATCH_FRAME && exception != NULL) {
3108       StgCatchFrame *cf = (StgCatchFrame *)su;
3109       /* we've got an exception to raise, so let's pass it to the
3110        * handler in this frame.
3111        */
3112       ap = (StgAP_UPD *)allocate(sizeofW(StgPAP) + 2);
3113       TICK_ALLOC_UPD_PAP(3,0);
3114       SET_HDR(ap,&stg_PAP_info,cf->header.prof.ccs);
3115               
3116       ap->n_args = 2;
3117       ap->fun = cf->handler;    /* :: Exception -> IO a */
3118       ap->payload[0] = exception;
3119       ap->payload[1] = ARG_TAG(0); /* realworld token */
3120
3121       /* throw away the stack from Sp up to and including the
3122        * CATCH_FRAME.
3123        */
3124       sp = (P_)su + sizeofW(StgCatchFrame) - 1; 
3125       tso->su = cf->link;
3126
3127       /* Restore the blocked/unblocked state for asynchronous exceptions
3128        * at the CATCH_FRAME.  
3129        *
3130        * If exceptions were unblocked at the catch, arrange that they
3131        * are unblocked again after executing the handler by pushing an
3132        * unblockAsyncExceptions_ret stack frame.
3133        */
3134       if (!cf->exceptions_blocked) {
3135         *(sp--) = (W_)&stg_unblockAsyncExceptionszh_ret_info;
3136       }
3137       
3138       /* Ensure that async exceptions are blocked when running the handler.
3139        */
3140       if (tso->blocked_exceptions == NULL) {
3141         tso->blocked_exceptions = END_TSO_QUEUE;
3142       }
3143       
3144       /* Put the newly-built PAP on top of the stack, ready to execute
3145        * when the thread restarts.
3146        */
3147       sp[0] = (W_)ap;
3148       tso->sp = sp;
3149       tso->what_next = ThreadEnterGHC;
3150       IF_DEBUG(sanity, checkTSO(tso));
3151       return;
3152     }
3153
3154     /* First build an AP_UPD consisting of the stack chunk above the
3155      * current update frame, with the top word on the stack as the
3156      * fun field.
3157      */
3158     ap = (StgAP_UPD *)allocate(AP_sizeW(words));
3159     
3160     ASSERT(words >= 0);
3161     
3162     ap->n_args = words;
3163     ap->fun    = (StgClosure *)sp[0];
3164     sp++;
3165     for(i=0; i < (nat)words; ++i) {
3166       ap->payload[i] = (StgClosure *)*sp++;
3167     }
3168     
3169     switch (get_itbl(su)->type) {
3170       
3171     case UPDATE_FRAME:
3172       {
3173         SET_HDR(ap,&stg_AP_UPD_info,su->header.prof.ccs /* ToDo */); 
3174         TICK_ALLOC_UP_THK(words+1,0);
3175         
3176         IF_DEBUG(scheduler,
3177                  fprintf(stderr,  "scheduler: Updating ");
3178                  printPtr((P_)su->updatee); 
3179                  fprintf(stderr,  " with ");
3180                  printObj((StgClosure *)ap);
3181                  );
3182         
3183         /* Replace the updatee with an indirection - happily
3184          * this will also wake up any threads currently
3185          * waiting on the result.
3186          *
3187          * Warning: if we're in a loop, more than one update frame on
3188          * the stack may point to the same object.  Be careful not to
3189          * overwrite an IND_OLDGEN in this case, because we'll screw
3190          * up the mutable lists.  To be on the safe side, don't
3191          * overwrite any kind of indirection at all.  See also
3192          * threadSqueezeStack in GC.c, where we have to make a similar
3193          * check.
3194          */
3195         if (!closure_IND(su->updatee)) {
3196             UPD_IND_NOLOCK(su->updatee,ap);  /* revert the black hole */
3197         }
3198         su = su->link;
3199         sp += sizeofW(StgUpdateFrame) -1;
3200         sp[0] = (W_)ap; /* push onto stack */
3201         break;
3202       }
3203
3204     case CATCH_FRAME:
3205       {
3206         StgCatchFrame *cf = (StgCatchFrame *)su;
3207         StgClosure* o;
3208         
3209         /* We want a PAP, not an AP_UPD.  Fortunately, the
3210          * layout's the same.
3211          */
3212         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3213         TICK_ALLOC_UPD_PAP(words+1,0);
3214         
3215         /* now build o = FUN(catch,ap,handler) */
3216         o = (StgClosure *)allocate(sizeofW(StgClosure)+2);
3217         TICK_ALLOC_FUN(2,0);
3218         SET_HDR(o,&stg_catch_info,su->header.prof.ccs /* ToDo */);
3219         o->payload[0] = (StgClosure *)ap;
3220         o->payload[1] = cf->handler;
3221         
3222         IF_DEBUG(scheduler,
3223                  fprintf(stderr,  "scheduler: Built ");
3224                  printObj((StgClosure *)o);
3225                  );
3226         
3227         /* pop the old handler and put o on the stack */
3228         su = cf->link;
3229         sp += sizeofW(StgCatchFrame) - 1;
3230         sp[0] = (W_)o;
3231         break;
3232       }
3233       
3234     case SEQ_FRAME:
3235       {
3236         StgSeqFrame *sf = (StgSeqFrame *)su;
3237         StgClosure* o;
3238         
3239         SET_HDR(ap,&stg_PAP_info,su->header.prof.ccs /* ToDo */);
3240         TICK_ALLOC_UPD_PAP(words+1,0);
3241         
3242         /* now build o = FUN(seq,ap) */
3243         o = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3244         TICK_ALLOC_SE_THK(1,0);
3245         SET_HDR(o,&stg_seq_info,su->header.prof.ccs /* ToDo */);
3246         o->payload[0] = (StgClosure *)ap;
3247         
3248         IF_DEBUG(scheduler,
3249                  fprintf(stderr,  "scheduler: Built ");
3250                  printObj((StgClosure *)o);
3251                  );
3252         
3253         /* pop the old handler and put o on the stack */
3254         su = sf->link;
3255         sp += sizeofW(StgSeqFrame) - 1;
3256         sp[0] = (W_)o;
3257         break;
3258       }
3259       
3260     case STOP_FRAME:
3261       /* We've stripped the entire stack, the thread is now dead. */
3262       sp += sizeofW(StgStopFrame) - 1;
3263       sp[0] = (W_)exception;    /* save the exception */
3264       tso->what_next = ThreadKilled;
3265       tso->su = (StgUpdateFrame *)(sp+1);
3266       tso->sp = sp;
3267       return;
3268
3269     default:
3270       barf("raiseAsync");
3271     }
3272   }
3273   barf("raiseAsync");
3274 }
3275
3276 /* -----------------------------------------------------------------------------
3277    resurrectThreads is called after garbage collection on the list of
3278    threads found to be garbage.  Each of these threads will be woken
3279    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3280    on an MVar, or NonTermination if the thread was blocked on a Black
3281    Hole.
3282    -------------------------------------------------------------------------- */
3283
3284 void
3285 resurrectThreads( StgTSO *threads )
3286 {
3287   StgTSO *tso, *next;
3288
3289   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3290     next = tso->global_link;
3291     tso->global_link = all_threads;
3292     all_threads = tso;
3293     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3294
3295     switch (tso->why_blocked) {
3296     case BlockedOnMVar:
3297     case BlockedOnException:
3298       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3299       break;
3300     case BlockedOnBlackHole:
3301       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3302       break;
3303     case NotBlocked:
3304       /* This might happen if the thread was blocked on a black hole
3305        * belonging to a thread that we've just woken up (raiseAsync
3306        * can wake up threads, remember...).
3307        */
3308       continue;
3309     default:
3310       barf("resurrectThreads: thread blocked in a strange way");
3311     }
3312   }
3313 }
3314
3315 /* -----------------------------------------------------------------------------
3316  * Blackhole detection: if we reach a deadlock, test whether any
3317  * threads are blocked on themselves.  Any threads which are found to
3318  * be self-blocked get sent a NonTermination exception.
3319  *
3320  * This is only done in a deadlock situation in order to avoid
3321  * performance overhead in the normal case.
3322  * -------------------------------------------------------------------------- */
3323
3324 static void
3325 detectBlackHoles( void )
3326 {
3327     StgTSO *t = all_threads;
3328     StgUpdateFrame *frame;
3329     StgClosure *blocked_on;
3330
3331     for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3332
3333         while (t->what_next == ThreadRelocated) {
3334             t = t->link;
3335             ASSERT(get_itbl(t)->type == TSO);
3336         }
3337       
3338         if (t->why_blocked != BlockedOnBlackHole) {
3339             continue;
3340         }
3341
3342         blocked_on = t->block_info.closure;
3343
3344         for (frame = t->su; ; frame = frame->link) {
3345             switch (get_itbl(frame)->type) {
3346
3347             case UPDATE_FRAME:
3348                 if (frame->updatee == blocked_on) {
3349                     /* We are blocking on one of our own computations, so
3350                      * send this thread the NonTermination exception.  
3351                      */
3352                     IF_DEBUG(scheduler, 
3353                              sched_belch("thread %d is blocked on itself", t->id));
3354                     raiseAsync(t, (StgClosure *)NonTermination_closure);
3355                     goto done;
3356                 }
3357                 else {
3358                     continue;
3359                 }
3360
3361             case CATCH_FRAME:
3362             case SEQ_FRAME:
3363                 continue;
3364                 
3365             case STOP_FRAME:
3366                 break;
3367             }
3368             break;
3369         }
3370
3371     done: ;
3372     }   
3373 }
3374
3375 //@node Debugging Routines, Index, Exception Handling Routines, Main scheduling code
3376 //@subsection Debugging Routines
3377
3378 /* -----------------------------------------------------------------------------
3379    Debugging: why is a thread blocked
3380    -------------------------------------------------------------------------- */
3381
3382 #ifdef DEBUG
3383
3384 void
3385 printThreadBlockage(StgTSO *tso)
3386 {
3387   switch (tso->why_blocked) {
3388   case BlockedOnRead:
3389     fprintf(stderr,"is blocked on read from fd %d", tso->block_info.fd);
3390     break;
3391   case BlockedOnWrite:
3392     fprintf(stderr,"is blocked on write to fd %d", tso->block_info.fd);
3393     break;
3394   case BlockedOnDelay:
3395     fprintf(stderr,"is blocked until %d", tso->block_info.target);
3396     break;
3397   case BlockedOnMVar:
3398     fprintf(stderr,"is blocked on an MVar");
3399     break;
3400   case BlockedOnException:
3401     fprintf(stderr,"is blocked on delivering an exception to thread %d",
3402             tso->block_info.tso->id);
3403     break;
3404   case BlockedOnBlackHole:
3405     fprintf(stderr,"is blocked on a black hole");
3406     break;
3407   case NotBlocked:
3408     fprintf(stderr,"is not blocked");
3409     break;
3410 #if defined(PAR)
3411   case BlockedOnGA:
3412     fprintf(stderr,"is blocked on global address; local FM_BQ is %p (%s)",
3413             tso->block_info.closure, info_type(tso->block_info.closure));
3414     break;
3415   case BlockedOnGA_NoSend:
3416     fprintf(stderr,"is blocked on global address (no send); local FM_BQ is %p (%s)",
3417             tso->block_info.closure, info_type(tso->block_info.closure));
3418     break;
3419 #endif
3420   default:
3421     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
3422          tso->why_blocked, tso->id, tso);
3423   }
3424 }
3425
3426 void
3427 printThreadStatus(StgTSO *tso)
3428 {
3429   switch (tso->what_next) {
3430   case ThreadKilled:
3431     fprintf(stderr,"has been killed");
3432     break;
3433   case ThreadComplete:
3434     fprintf(stderr,"has completed");
3435     break;
3436   default:
3437     printThreadBlockage(tso);
3438   }
3439 }
3440
3441 void
3442 printAllThreads(void)
3443 {
3444   StgTSO *t;
3445
3446 # if defined(GRAN)
3447   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3448   ullong_format_string(TIME_ON_PROC(CurrentProc), 
3449                        time_string, rtsFalse/*no commas!*/);
3450
3451   sched_belch("all threads at [%s]:", time_string);
3452 # elif defined(PAR)
3453   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
3454   ullong_format_string(CURRENT_TIME,
3455                        time_string, rtsFalse/*no commas!*/);
3456
3457   sched_belch("all threads at [%s]:", time_string);
3458 # else
3459   sched_belch("all threads:");
3460 # endif
3461
3462   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
3463     fprintf(stderr, "\tthread %d ", t->id);
3464     printThreadStatus(t);
3465     fprintf(stderr,"\n");
3466   }
3467 }
3468     
3469 /* 
3470    Print a whole blocking queue attached to node (debugging only).
3471 */
3472 //@cindex print_bq
3473 # if defined(PAR)
3474 void 
3475 print_bq (StgClosure *node)
3476 {
3477   StgBlockingQueueElement *bqe;
3478   StgTSO *tso;
3479   rtsBool end;
3480
3481   fprintf(stderr,"## BQ of closure %p (%s): ",
3482           node, info_type(node));
3483
3484   /* should cover all closures that may have a blocking queue */
3485   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3486          get_itbl(node)->type == FETCH_ME_BQ ||
3487          get_itbl(node)->type == RBH ||
3488          get_itbl(node)->type == MVAR);
3489     
3490   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3491
3492   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
3493 }
3494
3495 /* 
3496    Print a whole blocking queue starting with the element bqe.
3497 */
3498 void 
3499 print_bqe (StgBlockingQueueElement *bqe)
3500 {
3501   rtsBool end;
3502
3503   /* 
3504      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3505   */
3506   for (end = (bqe==END_BQ_QUEUE);
3507        !end; // iterate until bqe points to a CONSTR
3508        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
3509        bqe = end ? END_BQ_QUEUE : bqe->link) {
3510     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
3511     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
3512     /* types of closures that may appear in a blocking queue */
3513     ASSERT(get_itbl(bqe)->type == TSO ||           
3514            get_itbl(bqe)->type == BLOCKED_FETCH || 
3515            get_itbl(bqe)->type == CONSTR); 
3516     /* only BQs of an RBH end with an RBH_Save closure */
3517     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3518
3519     switch (get_itbl(bqe)->type) {
3520     case TSO:
3521       fprintf(stderr," TSO %u (%x),",
3522               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
3523       break;
3524     case BLOCKED_FETCH:
3525       fprintf(stderr," BF (node=%p, ga=((%x, %d, %x)),",
3526               ((StgBlockedFetch *)bqe)->node, 
3527               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
3528               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
3529               ((StgBlockedFetch *)bqe)->ga.weight);
3530       break;
3531     case CONSTR:
3532       fprintf(stderr," %s (IP %p),",
3533               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3534                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3535                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3536                "RBH_Save_?"), get_itbl(bqe));
3537       break;
3538     default:
3539       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
3540            info_type((StgClosure *)bqe)); // , node, info_type(node));
3541       break;
3542     }
3543   } /* for */
3544   fputc('\n', stderr);
3545 }
3546 # elif defined(GRAN)
3547 void 
3548 print_bq (StgClosure *node)
3549 {
3550   StgBlockingQueueElement *bqe;
3551   PEs node_loc, tso_loc;
3552   rtsBool end;
3553
3554   /* should cover all closures that may have a blocking queue */
3555   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
3556          get_itbl(node)->type == FETCH_ME_BQ ||
3557          get_itbl(node)->type == RBH);
3558     
3559   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3560   node_loc = where_is(node);
3561
3562   fprintf(stderr,"## BQ of closure %p (%s) on [PE %d]: ",
3563           node, info_type(node), node_loc);
3564
3565   /* 
3566      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
3567   */
3568   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
3569        !end; // iterate until bqe points to a CONSTR
3570        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
3571     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
3572     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
3573     /* types of closures that may appear in a blocking queue */
3574     ASSERT(get_itbl(bqe)->type == TSO ||           
3575            get_itbl(bqe)->type == CONSTR); 
3576     /* only BQs of an RBH end with an RBH_Save closure */
3577     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
3578
3579     tso_loc = where_is((StgClosure *)bqe);
3580     switch (get_itbl(bqe)->type) {
3581     case TSO:
3582       fprintf(stderr," TSO %d (%p) on [PE %d],",
3583               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
3584       break;
3585     case CONSTR:
3586       fprintf(stderr," %s (IP %p),",
3587               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
3588                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
3589                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
3590                "RBH_Save_?"), get_itbl(bqe));
3591       break;
3592     default:
3593       barf("Unexpected closure type %s in blocking queue of %p (%s)",
3594            info_type((StgClosure *)bqe), node, info_type(node));
3595       break;
3596     }
3597   } /* for */
3598   fputc('\n', stderr);
3599 }
3600 #else
3601 /* 
3602    Nice and easy: only TSOs on the blocking queue
3603 */
3604 void 
3605 print_bq (StgClosure *node)
3606 {
3607   StgTSO *tso;
3608
3609   ASSERT(node!=(StgClosure*)NULL);         // sanity check
3610   for (tso = ((StgBlockingQueue*)node)->blocking_queue;
3611        tso != END_TSO_QUEUE; 
3612        tso=tso->link) {
3613     ASSERT(tso!=NULL && tso!=END_TSO_QUEUE);   // sanity check
3614     ASSERT(get_itbl(tso)->type == TSO);  // guess what, sanity check
3615     fprintf(stderr," TSO %d (%p),", tso->id, tso);
3616   }
3617   fputc('\n', stderr);
3618 }
3619 # endif
3620
3621 #if defined(PAR)
3622 static nat
3623 run_queue_len(void)
3624 {
3625   nat i;
3626   StgTSO *tso;
3627
3628   for (i=0, tso=run_queue_hd; 
3629        tso != END_TSO_QUEUE;
3630        i++, tso=tso->link)
3631     /* nothing */
3632
3633   return i;
3634 }
3635 #endif
3636
3637 static void
3638 sched_belch(char *s, ...)
3639 {
3640   va_list ap;
3641   va_start(ap,s);
3642 #ifdef SMP
3643   fprintf(stderr, "scheduler (task %ld): ", osThreadId());
3644 #elif defined(PAR)
3645   fprintf(stderr, "== ");
3646 #else
3647   fprintf(stderr, "scheduler: ");
3648 #endif
3649   vfprintf(stderr, s, ap);
3650   fprintf(stderr, "\n");
3651 }
3652
3653 #endif /* DEBUG */
3654
3655
3656 //@node Index,  , Debugging Routines, Main scheduling code
3657 //@subsection Index
3658
3659 //@index
3660 //* MainRegTable::  @cindex\s-+MainRegTable
3661 //* StgMainThread::  @cindex\s-+StgMainThread
3662 //* awaken_blocked_queue::  @cindex\s-+awaken_blocked_queue
3663 //* blocked_queue_hd::  @cindex\s-+blocked_queue_hd
3664 //* blocked_queue_tl::  @cindex\s-+blocked_queue_tl
3665 //* context_switch::  @cindex\s-+context_switch
3666 //* createThread::  @cindex\s-+createThread
3667 //* gc_pending_cond::  @cindex\s-+gc_pending_cond
3668 //* initScheduler::  @cindex\s-+initScheduler
3669 //* interrupted::  @cindex\s-+interrupted
3670 //* next_thread_id::  @cindex\s-+next_thread_id
3671 //* print_bq::  @cindex\s-+print_bq
3672 //* run_queue_hd::  @cindex\s-+run_queue_hd
3673 //* run_queue_tl::  @cindex\s-+run_queue_tl
3674 //* sched_mutex::  @cindex\s-+sched_mutex
3675 //* schedule::  @cindex\s-+schedule
3676 //* take_off_run_queue::  @cindex\s-+take_off_run_queue
3677 //* term_mutex::  @cindex\s-+term_mutex
3678 //* thread_ready_cond::  @cindex\s-+thread_ready_cond
3679 //@end index