[project @ 2005-04-12 09:17:47 by simonmar]
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2004
4  *
5  * Scheduler
6  *
7  * Different GHC ways use this scheduler quite differently (see comments below)
8  * Here is the global picture:
9  *
10  * WAY  Name     CPP flag  What's it for
11  * --------------------------------------
12  * mp   GUM      PARALLEL_HASKELL          Parallel execution on a distrib. memory machine
13  * s    SMP      SMP          Parallel execution on a shared memory machine
14  * mg   GranSim  GRAN         Simulation of parallel execution
15  * md   GUM/GdH  DIST         Distributed execution (based on GUM)
16  *
17  * --------------------------------------------------------------------------*/
18
19 /* 
20  * Version with support for distributed memory parallelism aka GUM (WAY=mp):
21
22    The main scheduling loop in GUM iterates until a finish message is received.
23    In that case a global flag @receivedFinish@ is set and this instance of
24    the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages()
25    for the handling of incoming messages, such as PP_FINISH.
26    Note that in the parallel case we have a system manager that coordinates
27    different PEs, each of which are running one instance of the RTS.
28    See ghc/rts/parallel/SysMan.c for the main routine of the parallel program.
29    From this routine processes executing ghc/rts/Main.c are spawned. -- HWL
30
31  * Version with support for simulating parallel execution aka GranSim (WAY=mg):
32
33    The main scheduling code in GranSim is quite different from that in std
34    (concurrent) Haskell: while concurrent Haskell just iterates over the
35    threads in the runnable queue, GranSim is event driven, i.e. it iterates
36    over the events in the global event queue.  -- HWL
37 */
38
39 #include "PosixSource.h"
40 #include "Rts.h"
41 #include "SchedAPI.h"
42 #include "RtsUtils.h"
43 #include "RtsFlags.h"
44 #include "BlockAlloc.h"
45 #include "OSThreads.h"
46 #include "Storage.h"
47 #include "StgRun.h"
48 #include "Hooks.h"
49 #define COMPILING_SCHEDULER
50 #include "Schedule.h"
51 #include "StgMiscClosures.h"
52 #include "Interpreter.h"
53 #include "Exception.h"
54 #include "Printer.h"
55 #include "Signals.h"
56 #include "Sanity.h"
57 #include "Stats.h"
58 #include "STM.h"
59 #include "Timer.h"
60 #include "Prelude.h"
61 #include "ThreadLabels.h"
62 #include "LdvProfile.h"
63 #include "Updates.h"
64 #ifdef PROFILING
65 #include "Proftimer.h"
66 #include "ProfHeap.h"
67 #endif
68 #if defined(GRAN) || defined(PARALLEL_HASKELL)
69 # include "GranSimRts.h"
70 # include "GranSim.h"
71 # include "ParallelRts.h"
72 # include "Parallel.h"
73 # include "ParallelDebug.h"
74 # include "FetchMe.h"
75 # include "HLC.h"
76 #endif
77 #include "Sparks.h"
78 #include "Capability.h"
79 #include  "Task.h"
80
81 #ifdef HAVE_SYS_TYPES_H
82 #include <sys/types.h>
83 #endif
84 #ifdef HAVE_UNISTD_H
85 #include <unistd.h>
86 #endif
87
88 #include <string.h>
89 #include <stdlib.h>
90 #include <stdarg.h>
91
92 #ifdef HAVE_ERRNO_H
93 #include <errno.h>
94 #endif
95
96 // Turn off inlining when debugging - it obfuscates things
97 #ifdef DEBUG
98 # undef  STATIC_INLINE
99 # define STATIC_INLINE static
100 #endif
101
102 #ifdef THREADED_RTS
103 #define USED_IN_THREADED_RTS
104 #else
105 #define USED_IN_THREADED_RTS STG_UNUSED
106 #endif
107
108 #ifdef RTS_SUPPORTS_THREADS
109 #define USED_WHEN_RTS_SUPPORTS_THREADS
110 #else
111 #define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED
112 #endif
113
114 /* Main thread queue.
115  * Locks required: sched_mutex.
116  */
117 StgMainThread *main_threads = NULL;
118
119 #if defined(GRAN)
120
121 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
122 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
123
124 /* 
125    In GranSim we have a runnable and a blocked queue for each processor.
126    In order to minimise code changes new arrays run_queue_hds/tls
127    are created. run_queue_hd is then a short cut (macro) for
128    run_queue_hds[CurrentProc] (see GranSim.h).
129    -- HWL
130 */
131 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
132 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
133 StgTSO *ccalling_threadss[MAX_PROC];
134 /* We use the same global list of threads (all_threads) in GranSim as in
135    the std RTS (i.e. we are cheating). However, we don't use this list in
136    the GranSim specific code at the moment (so we are only potentially
137    cheating).  */
138
139 #else /* !GRAN */
140
141 /* Thread queues.
142  * Locks required: sched_mutex.
143  */
144 StgTSO *run_queue_hd = NULL;
145 StgTSO *run_queue_tl = NULL;
146 StgTSO *blocked_queue_hd = NULL;
147 StgTSO *blocked_queue_tl = NULL;
148 StgTSO *blackhole_queue = NULL;
149 StgTSO *sleeping_queue = NULL;    /* perhaps replace with a hash table? */
150
151 #endif
152
153 /* The blackhole_queue should be checked for threads to wake up.  See
154  * Schedule.h for more thorough comment.
155  */
156 rtsBool blackholes_need_checking = rtsFalse;
157
158 /* Linked list of all threads.
159  * Used for detecting garbage collected threads.
160  */
161 StgTSO *all_threads = NULL;
162
163 /* When a thread performs a safe C call (_ccall_GC, using old
164  * terminology), it gets put on the suspended_ccalling_threads
165  * list. Used by the garbage collector.
166  */
167 static StgTSO *suspended_ccalling_threads;
168
169 /* KH: The following two flags are shared memory locations.  There is no need
170        to lock them, since they are only unset at the end of a scheduler
171        operation.
172 */
173
174 /* flag set by signal handler to precipitate a context switch */
175 int context_switch = 0;
176
177 /* if this flag is set as well, give up execution */
178 rtsBool interrupted = rtsFalse;
179
180 /* Next thread ID to allocate.
181  * Locks required: thread_id_mutex
182  */
183 static StgThreadID next_thread_id = 1;
184
185 /*
186  * Pointers to the state of the current thread.
187  * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell
188  * thread.  If CurrentTSO == NULL, then we're at the scheduler level.
189  */
190  
191 /* The smallest stack size that makes any sense is:
192  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
193  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
194  *  + 1                       (the closure to enter)
195  *  + 1                       (stg_ap_v_ret)
196  *  + 1                       (spare slot req'd by stg_ap_v_ret)
197  *
198  * A thread with this stack will bomb immediately with a stack
199  * overflow, which will increase its stack size.  
200  */
201
202 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
203
204
205 #if defined(GRAN)
206 StgTSO *CurrentTSO;
207 #endif
208
209 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
210  *  exists - earlier gccs apparently didn't.
211  *  -= chak
212  */
213 StgTSO dummy_tso;
214
215 /*
216  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
217  * in an MT setting, needed to signal that a worker thread shouldn't hang around
218  * in the scheduler when it is out of work.
219  */
220 static rtsBool shutting_down_scheduler = rtsFalse;
221
222 #if defined(RTS_SUPPORTS_THREADS)
223 /* ToDo: carefully document the invariants that go together
224  *       with these synchronisation objects.
225  */
226 Mutex     sched_mutex       = INIT_MUTEX_VAR;
227 Mutex     term_mutex        = INIT_MUTEX_VAR;
228
229 #endif /* RTS_SUPPORTS_THREADS */
230
231 #if defined(PARALLEL_HASKELL)
232 StgTSO *LastTSO;
233 rtsTime TimeOfLastYield;
234 rtsBool emitSchedule = rtsTrue;
235 #endif
236
237 #if DEBUG
238 static char *whatNext_strs[] = {
239   "(unknown)",
240   "ThreadRunGHC",
241   "ThreadInterpret",
242   "ThreadKilled",
243   "ThreadRelocated",
244   "ThreadComplete"
245 };
246 #endif
247
248 /* -----------------------------------------------------------------------------
249  * static function prototypes
250  * -------------------------------------------------------------------------- */
251
252 #if defined(RTS_SUPPORTS_THREADS)
253 static void taskStart(void);
254 #endif
255
256 static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
257                       Capability *initialCapability );
258
259 //
260 // These function all encapsulate parts of the scheduler loop, and are
261 // abstracted only to make the structure and control flow of the
262 // scheduler clearer.
263 //
264 static void schedulePreLoop(void);
265 static void scheduleStartSignalHandlers(void);
266 static void scheduleCheckBlockedThreads(void);
267 static void scheduleCheckBlackHoles(void);
268 static void scheduleDetectDeadlock(void);
269 #if defined(GRAN)
270 static StgTSO *scheduleProcessEvent(rtsEvent *event);
271 #endif
272 #if defined(PARALLEL_HASKELL)
273 static StgTSO *scheduleSendPendingMessages(void);
274 static void scheduleActivateSpark(void);
275 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
276 #endif
277 #if defined(PAR) || defined(GRAN)
278 static void scheduleGranParReport(void);
279 #endif
280 static void schedulePostRunThread(void);
281 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
282 static void scheduleHandleStackOverflow( StgTSO *t);
283 static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next );
284 static void scheduleHandleThreadBlocked( StgTSO *t );
285 static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, 
286                                              Capability *cap, StgTSO *t );
287 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
288 static void scheduleDoGC(Capability *cap);
289
290 static void unblockThread(StgTSO *tso);
291 static rtsBool checkBlackHoles(void);
292 static SchedulerStatus waitThread_(/*out*/StgMainThread* m,
293                                    Capability *initialCapability
294                                    );
295 static void scheduleThread_ (StgTSO* tso);
296 static void AllRoots(evac_fn evac);
297
298 static StgTSO *threadStackOverflow(StgTSO *tso);
299
300 static void raiseAsync_(StgTSO *tso, StgClosure *exception, 
301                         rtsBool stop_at_atomically);
302
303 static void printThreadBlockage(StgTSO *tso);
304 static void printThreadStatus(StgTSO *tso);
305
306 #if defined(PARALLEL_HASKELL)
307 StgTSO * createSparkThread(rtsSpark spark);
308 StgTSO * activateSpark (rtsSpark spark);  
309 #endif
310
311 /* ----------------------------------------------------------------------------
312  * Starting Tasks
313  * ------------------------------------------------------------------------- */
314
315 #if defined(RTS_SUPPORTS_THREADS)
316 static rtsBool startingWorkerThread = rtsFalse;
317
318 static void
319 taskStart(void)
320 {
321   ACQUIRE_LOCK(&sched_mutex);
322   startingWorkerThread = rtsFalse;
323   schedule(NULL,NULL);
324   taskStop();
325   RELEASE_LOCK(&sched_mutex);
326 }
327
328 void
329 startSchedulerTaskIfNecessary(void)
330 {
331     if ( !EMPTY_RUN_QUEUE()
332          && !shutting_down_scheduler // not if we're shutting down
333          && !startingWorkerThread)
334     {
335         // we don't want to start another worker thread
336         // just because the last one hasn't yet reached the
337         // "waiting for capability" state
338         startingWorkerThread = rtsTrue;
339         if (!maybeStartNewWorker(taskStart)) {
340             startingWorkerThread = rtsFalse;
341         }
342     }
343 }
344 #endif
345
346 /* -----------------------------------------------------------------------------
347  * Putting a thread on the run queue: different scheduling policies
348  * -------------------------------------------------------------------------- */
349
350 STATIC_INLINE void
351 addToRunQueue( StgTSO *t )
352 {
353 #if defined(PARALLEL_HASKELL)
354     if (RtsFlags.ParFlags.doFairScheduling) { 
355         // this does round-robin scheduling; good for concurrency
356         APPEND_TO_RUN_QUEUE(t);
357     } else {
358         // this does unfair scheduling; good for parallelism
359         PUSH_ON_RUN_QUEUE(t);
360     }
361 #else
362     // this does round-robin scheduling; good for concurrency
363     APPEND_TO_RUN_QUEUE(t);
364 #endif
365 }
366     
367 /* ---------------------------------------------------------------------------
368    Main scheduling loop.
369
370    We use round-robin scheduling, each thread returning to the
371    scheduler loop when one of these conditions is detected:
372
373       * out of heap space
374       * timer expires (thread yields)
375       * thread blocks
376       * thread ends
377       * stack overflow
378
379    Locking notes:  we acquire the scheduler lock once at the beginning
380    of the scheduler loop, and release it when
381     
382       * running a thread, or
383       * waiting for work, or
384       * waiting for a GC to complete.
385
386    GRAN version:
387      In a GranSim setup this loop iterates over the global event queue.
388      This revolves around the global event queue, which determines what 
389      to do next. Therefore, it's more complicated than either the 
390      concurrent or the parallel (GUM) setup.
391
392    GUM version:
393      GUM iterates over incoming messages.
394      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
395      and sends out a fish whenever it has nothing to do; in-between
396      doing the actual reductions (shared code below) it processes the
397      incoming messages and deals with delayed operations 
398      (see PendingFetches).
399      This is not the ugliest code you could imagine, but it's bloody close.
400
401    ------------------------------------------------------------------------ */
402
403 static void
404 schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS,
405           Capability *initialCapability )
406 {
407   StgTSO *t;
408   Capability *cap;
409   StgThreadReturnCode ret;
410 #if defined(GRAN)
411   rtsEvent *event;
412 #elif defined(PARALLEL_HASKELL)
413   StgTSO *tso;
414   GlobalTaskId pe;
415   rtsBool receivedFinish = rtsFalse;
416 # if defined(DEBUG)
417   nat tp_size, sp_size; // stats only
418 # endif
419 #endif
420   nat prev_what_next;
421   rtsBool ready_to_gc;
422   
423   // Pre-condition: sched_mutex is held.
424   // We might have a capability, passed in as initialCapability.
425   cap = initialCapability;
426
427 #if !defined(RTS_SUPPORTS_THREADS)
428   // simply initialise it in the non-threaded case
429   grabCapability(&cap);
430 #endif
431
432   IF_DEBUG(scheduler,
433            sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)",
434                        mainThread, initialCapability);
435       );
436
437   schedulePreLoop();
438
439   // -----------------------------------------------------------
440   // Scheduler loop starts here:
441
442 #if defined(PARALLEL_HASKELL)
443 #define TERMINATION_CONDITION        (!receivedFinish)
444 #elif defined(GRAN)
445 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
446 #else
447 #define TERMINATION_CONDITION        rtsTrue
448 #endif
449
450   while (TERMINATION_CONDITION) {
451
452 #if defined(GRAN)
453       /* Choose the processor with the next event */
454       CurrentProc = event->proc;
455       CurrentTSO = event->tso;
456 #endif
457
458       IF_DEBUG(scheduler, printAllThreads());
459
460 #if defined(RTS_SUPPORTS_THREADS)
461       // Yield the capability to higher-priority tasks if necessary.
462       //
463       if (cap != NULL) {
464           yieldCapability(&cap);
465       }
466
467       // If we do not currently hold a capability, we wait for one
468       //
469       if (cap == NULL) {
470           waitForCapability(&sched_mutex, &cap,
471                             mainThread ? &mainThread->bound_thread_cond : NULL);
472       }
473
474       // We now have a capability...
475 #endif
476
477     // Check whether we have re-entered the RTS from Haskell without
478     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
479     // call).
480     if (cap->r.rInHaskell) {
481           errorBelch("schedule: re-entered unsafely.\n"
482                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
483           stg_exit(1);
484     }
485
486     //
487     // Test for interruption.  If interrupted==rtsTrue, then either
488     // we received a keyboard interrupt (^C), or the scheduler is
489     // trying to shut down all the tasks (shutting_down_scheduler) in
490     // the threaded RTS.
491     //
492     if (interrupted) {
493         if (shutting_down_scheduler) {
494             IF_DEBUG(scheduler, sched_belch("shutting down"));
495             releaseCapability(cap);
496             if (mainThread) {
497                 mainThread->stat = Interrupted;
498                 mainThread->ret  = NULL;
499             }
500             return;
501         } else {
502             IF_DEBUG(scheduler, sched_belch("interrupted"));
503             deleteAllThreads();
504         }
505     }
506
507 #if defined(not_yet) && defined(SMP)
508     //
509     // Top up the run queue from our spark pool.  We try to make the
510     // number of threads in the run queue equal to the number of
511     // free capabilities.
512     //
513     {
514         StgClosure *spark;
515         if (EMPTY_RUN_QUEUE()) {
516             spark = findSpark(rtsFalse);
517             if (spark == NULL) {
518                 break; /* no more sparks in the pool */
519             } else {
520                 createSparkThread(spark);         
521                 IF_DEBUG(scheduler,
522                          sched_belch("==^^ turning spark of closure %p into a thread",
523                                      (StgClosure *)spark));
524             }
525         }
526     }
527 #endif // SMP
528
529     scheduleStartSignalHandlers();
530
531     // Only check the black holes here if we've nothing else to do.
532     // During normal execution, the black hole list only gets checked
533     // at GC time, to avoid repeatedly traversing this possibly long
534     // list each time around the scheduler.
535     if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); }
536
537     scheduleCheckBlockedThreads();
538
539     scheduleDetectDeadlock();
540
541     // Normally, the only way we can get here with no threads to
542     // run is if a keyboard interrupt received during 
543     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
544     // Additionally, it is not fatal for the
545     // threaded RTS to reach here with no threads to run.
546     //
547     // win32: might be here due to awaitEvent() being abandoned
548     // as a result of a console event having been delivered.
549     if ( EMPTY_RUN_QUEUE() ) {
550 #if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS)
551         ASSERT(interrupted);
552 #endif
553         continue; // nothing to do
554     }
555
556 #if defined(PARALLEL_HASKELL)
557     scheduleSendPendingMessages();
558     if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) 
559         continue;
560
561 #if defined(SPARKS)
562     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
563 #endif
564
565     /* If we still have no work we need to send a FISH to get a spark
566        from another PE */
567     if (EMPTY_RUN_QUEUE()) {
568         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
569         ASSERT(rtsFalse); // should not happen at the moment
570     }
571     // from here: non-empty run queue.
572     //  TODO: merge above case with this, only one call processMessages() !
573     if (PacketsWaiting()) {  /* process incoming messages, if
574                                 any pending...  only in else
575                                 because getRemoteWork waits for
576                                 messages as well */
577         receivedFinish = processMessages();
578     }
579 #endif
580
581 #if defined(GRAN)
582     scheduleProcessEvent(event);
583 #endif
584
585     // 
586     // Get a thread to run
587     //
588     ASSERT(run_queue_hd != END_TSO_QUEUE);
589     POP_RUN_QUEUE(t);
590
591 #if defined(GRAN) || defined(PAR)
592     scheduleGranParReport(); // some kind of debuging output
593 #else
594     // Sanity check the thread we're about to run.  This can be
595     // expensive if there is lots of thread switching going on...
596     IF_DEBUG(sanity,checkTSO(t));
597 #endif
598
599 #if defined(RTS_SUPPORTS_THREADS)
600     // Check whether we can run this thread in the current task.
601     // If not, we have to pass our capability to the right task.
602     {
603       StgMainThread *m = t->main;
604       
605       if(m)
606       {
607         if(m == mainThread)
608         {
609           IF_DEBUG(scheduler,
610             sched_belch("### Running thread %d in bound thread", t->id));
611           // yes, the Haskell thread is bound to the current native thread
612         }
613         else
614         {
615           IF_DEBUG(scheduler,
616             sched_belch("### thread %d bound to another OS thread", t->id));
617           // no, bound to a different Haskell thread: pass to that thread
618           PUSH_ON_RUN_QUEUE(t);
619           passCapability(&m->bound_thread_cond);
620           continue;
621         }
622       }
623       else
624       {
625         if(mainThread != NULL)
626         // The thread we want to run is bound.
627         {
628           IF_DEBUG(scheduler,
629             sched_belch("### this OS thread cannot run thread %d", t->id));
630           // no, the current native thread is bound to a different
631           // Haskell thread, so pass it to any worker thread
632           PUSH_ON_RUN_QUEUE(t);
633           passCapabilityToWorker();
634           continue; 
635         }
636       }
637     }
638 #endif
639
640     cap->r.rCurrentTSO = t;
641     
642     /* context switches are now initiated by the timer signal, unless
643      * the user specified "context switch as often as possible", with
644      * +RTS -C0
645      */
646     if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0
647          && (run_queue_hd != END_TSO_QUEUE
648              || blocked_queue_hd != END_TSO_QUEUE
649              || sleeping_queue != END_TSO_QUEUE)))
650         context_switch = 1;
651
652 run_thread:
653
654     RELEASE_LOCK(&sched_mutex);
655
656     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
657                               (long)t->id, whatNext_strs[t->what_next]));
658
659 #if defined(PROFILING)
660     startHeapProfTimer();
661 #endif
662
663     // ----------------------------------------------------------------------
664     // Run the current thread 
665
666     prev_what_next = t->what_next;
667
668     errno = t->saved_errno;
669     cap->r.rInHaskell = rtsTrue;
670
671     switch (prev_what_next) {
672
673     case ThreadKilled:
674     case ThreadComplete:
675         /* Thread already finished, return to scheduler. */
676         ret = ThreadFinished;
677         break;
678
679     case ThreadRunGHC:
680         ret = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
681         break;
682
683     case ThreadInterpret:
684         ret = interpretBCO(cap);
685         break;
686
687     default:
688       barf("schedule: invalid what_next field");
689     }
690
691     // We have run some Haskell code: there might be blackhole-blocked
692     // threads to wake up now.
693     if ( blackhole_queue != END_TSO_QUEUE ) {
694         blackholes_need_checking = rtsTrue;
695     }
696
697     cap->r.rInHaskell = rtsFalse;
698
699     // The TSO might have moved, eg. if it re-entered the RTS and a GC
700     // happened.  So find the new location:
701     t = cap->r.rCurrentTSO;
702
703     // And save the current errno in this thread.
704     t->saved_errno = errno;
705
706     // ----------------------------------------------------------------------
707     
708     /* Costs for the scheduler are assigned to CCS_SYSTEM */
709 #if defined(PROFILING)
710     stopHeapProfTimer();
711     CCCS = CCS_SYSTEM;
712 #endif
713     
714     ACQUIRE_LOCK(&sched_mutex);
715     
716 #if defined(RTS_SUPPORTS_THREADS)
717     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId()););
718 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
719     IF_DEBUG(scheduler,debugBelch("sched: "););
720 #endif
721     
722     schedulePostRunThread();
723
724     ready_to_gc = rtsFalse;
725
726     switch (ret) {
727     case HeapOverflow:
728         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
729         break;
730
731     case StackOverflow:
732         scheduleHandleStackOverflow(t);
733         break;
734
735     case ThreadYielding:
736         if (scheduleHandleYield(t, prev_what_next)) {
737             // shortcut for switching between compiler/interpreter:
738             goto run_thread; 
739         }
740         break;
741
742     case ThreadBlocked:
743         scheduleHandleThreadBlocked(t);
744         threadPaused(t);
745         break;
746
747     case ThreadFinished:
748         if (scheduleHandleThreadFinished(mainThread, cap, t)) return;;
749         break;
750
751     default:
752       barf("schedule: invalid thread return code %d", (int)ret);
753     }
754
755     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
756     if (ready_to_gc) { scheduleDoGC(cap); }
757   } /* end of while() */
758
759   IF_PAR_DEBUG(verbose,
760                debugBelch("== Leaving schedule() after having received Finish\n"));
761 }
762
763 /* ----------------------------------------------------------------------------
764  * Setting up the scheduler loop
765  * ASSUMES: sched_mutex
766  * ------------------------------------------------------------------------- */
767
768 static void
769 schedulePreLoop(void)
770 {
771 #if defined(GRAN) 
772     /* set up first event to get things going */
773     /* ToDo: assign costs for system setup and init MainTSO ! */
774     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
775               ContinueThread, 
776               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
777     
778     IF_DEBUG(gran,
779              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
780                         CurrentTSO);
781              G_TSO(CurrentTSO, 5));
782     
783     if (RtsFlags.GranFlags.Light) {
784         /* Save current time; GranSim Light only */
785         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
786     }      
787 #endif
788 }
789
790 /* ----------------------------------------------------------------------------
791  * Start any pending signal handlers
792  * ASSUMES: sched_mutex
793  * ------------------------------------------------------------------------- */
794
795 static void
796 scheduleStartSignalHandlers(void)
797 {
798 #if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS)
799     if (signals_pending()) {
800       RELEASE_LOCK(&sched_mutex); /* ToDo: kill */
801       startSignalHandlers();
802       ACQUIRE_LOCK(&sched_mutex);
803     }
804 #endif
805 }
806
807 /* ----------------------------------------------------------------------------
808  * Check for blocked threads that can be woken up.
809  * ASSUMES: sched_mutex
810  * ------------------------------------------------------------------------- */
811
812 static void
813 scheduleCheckBlockedThreads(void)
814 {
815     //
816     // Check whether any waiting threads need to be woken up.  If the
817     // run queue is empty, and there are no other tasks running, we
818     // can wait indefinitely for something to happen.
819     //
820     if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) )
821     {
822 #if defined(RTS_SUPPORTS_THREADS)
823         // We shouldn't be here...
824         barf("schedule: awaitEvent() in threaded RTS");
825 #endif
826         awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking );
827     }
828 }
829
830
831 /* ----------------------------------------------------------------------------
832  * Check for threads blocked on BLACKHOLEs that can be woken up
833  * ASSUMES: sched_mutex
834  * ------------------------------------------------------------------------- */
835 static void
836 scheduleCheckBlackHoles( void )
837 {
838     if ( blackholes_need_checking )
839     {
840         checkBlackHoles();
841         blackholes_need_checking = rtsFalse;
842     }
843 }
844
845 /* ----------------------------------------------------------------------------
846  * Detect deadlock conditions and attempt to resolve them.
847  * ASSUMES: sched_mutex
848  * ------------------------------------------------------------------------- */
849
850 static void
851 scheduleDetectDeadlock(void)
852 {
853     /* 
854      * Detect deadlock: when we have no threads to run, there are no
855      * threads blocked, waiting for I/O, or sleeping, and all the
856      * other tasks are waiting for work, we must have a deadlock of
857      * some description.
858      */
859     if ( EMPTY_THREAD_QUEUES() )
860     {
861 #if !defined(PARALLEL_HASKELL) && !defined(RTS_SUPPORTS_THREADS)
862         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
863
864         // Garbage collection can release some new threads due to
865         // either (a) finalizers or (b) threads resurrected because
866         // they are unreachable and will therefore be sent an
867         // exception.  Any threads thus released will be immediately
868         // runnable.
869         GarbageCollect(GetRoots,rtsTrue);
870         if ( !EMPTY_RUN_QUEUE() ) return;
871
872 #if defined(RTS_USER_SIGNALS)
873         /* If we have user-installed signal handlers, then wait
874          * for signals to arrive rather then bombing out with a
875          * deadlock.
876          */
877         if ( anyUserHandlers() ) {
878             IF_DEBUG(scheduler, 
879                      sched_belch("still deadlocked, waiting for signals..."));
880
881             awaitUserSignals();
882
883             if (signals_pending()) {
884                 RELEASE_LOCK(&sched_mutex);
885                 startSignalHandlers();
886                 ACQUIRE_LOCK(&sched_mutex);
887             }
888
889             // either we have threads to run, or we were interrupted:
890             ASSERT(!EMPTY_RUN_QUEUE() || interrupted);
891         }
892 #endif
893
894         /* Probably a real deadlock.  Send the current main thread the
895          * Deadlock exception (or in the SMP build, send *all* main
896          * threads the deadlock exception, since none of them can make
897          * progress).
898          */
899         {
900             StgMainThread *m;
901             m = main_threads;
902             switch (m->tso->why_blocked) {
903             case BlockedOnBlackHole:
904             case BlockedOnException:
905             case BlockedOnMVar:
906                 raiseAsync(m->tso, (StgClosure *)NonTermination_closure);
907                 return;
908             default:
909                 barf("deadlock: main thread blocked in a strange way");
910             }
911         }
912
913 #elif defined(RTS_SUPPORTS_THREADS)
914     // ToDo: add deadlock detection in threaded RTS
915 #elif defined(PARALLEL_HASKELL)
916     // ToDo: add deadlock detection in GUM (similar to SMP) -- HWL
917 #endif
918     }
919 }
920
921 /* ----------------------------------------------------------------------------
922  * Process an event (GRAN only)
923  * ------------------------------------------------------------------------- */
924
925 #if defined(GRAN)
926 static StgTSO *
927 scheduleProcessEvent(rtsEvent *event)
928 {
929     StgTSO *t;
930
931     if (RtsFlags.GranFlags.Light)
932       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
933
934     /* adjust time based on time-stamp */
935     if (event->time > CurrentTime[CurrentProc] &&
936         event->evttype != ContinueThread)
937       CurrentTime[CurrentProc] = event->time;
938     
939     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
940     if (!RtsFlags.GranFlags.Light)
941       handleIdlePEs();
942
943     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
944
945     /* main event dispatcher in GranSim */
946     switch (event->evttype) {
947       /* Should just be continuing execution */
948     case ContinueThread:
949       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
950       /* ToDo: check assertion
951       ASSERT(run_queue_hd != (StgTSO*)NULL &&
952              run_queue_hd != END_TSO_QUEUE);
953       */
954       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
955       if (!RtsFlags.GranFlags.DoAsyncFetch &&
956           procStatus[CurrentProc]==Fetching) {
957         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
958               CurrentTSO->id, CurrentTSO, CurrentProc);
959         goto next_thread;
960       } 
961       /* Ignore ContinueThreads for completed threads */
962       if (CurrentTSO->what_next == ThreadComplete) {
963         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
964               CurrentTSO->id, CurrentTSO, CurrentProc);
965         goto next_thread;
966       } 
967       /* Ignore ContinueThreads for threads that are being migrated */
968       if (PROCS(CurrentTSO)==Nowhere) { 
969         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
970               CurrentTSO->id, CurrentTSO, CurrentProc);
971         goto next_thread;
972       }
973       /* The thread should be at the beginning of the run queue */
974       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
975         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
976               CurrentTSO->id, CurrentTSO, CurrentProc);
977         break; // run the thread anyway
978       }
979       /*
980       new_event(proc, proc, CurrentTime[proc],
981                 FindWork,
982                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
983       goto next_thread; 
984       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
985       break; // now actually run the thread; DaH Qu'vam yImuHbej 
986
987     case FetchNode:
988       do_the_fetchnode(event);
989       goto next_thread;             /* handle next event in event queue  */
990       
991     case GlobalBlock:
992       do_the_globalblock(event);
993       goto next_thread;             /* handle next event in event queue  */
994       
995     case FetchReply:
996       do_the_fetchreply(event);
997       goto next_thread;             /* handle next event in event queue  */
998       
999     case UnblockThread:   /* Move from the blocked queue to the tail of */
1000       do_the_unblock(event);
1001       goto next_thread;             /* handle next event in event queue  */
1002       
1003     case ResumeThread:  /* Move from the blocked queue to the tail of */
1004       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1005       event->tso->gran.blocktime += 
1006         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1007       do_the_startthread(event);
1008       goto next_thread;             /* handle next event in event queue  */
1009       
1010     case StartThread:
1011       do_the_startthread(event);
1012       goto next_thread;             /* handle next event in event queue  */
1013       
1014     case MoveThread:
1015       do_the_movethread(event);
1016       goto next_thread;             /* handle next event in event queue  */
1017       
1018     case MoveSpark:
1019       do_the_movespark(event);
1020       goto next_thread;             /* handle next event in event queue  */
1021       
1022     case FindWork:
1023       do_the_findwork(event);
1024       goto next_thread;             /* handle next event in event queue  */
1025       
1026     default:
1027       barf("Illegal event type %u\n", event->evttype);
1028     }  /* switch */
1029     
1030     /* This point was scheduler_loop in the old RTS */
1031
1032     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1033
1034     TimeOfLastEvent = CurrentTime[CurrentProc];
1035     TimeOfNextEvent = get_time_of_next_event();
1036     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1037     // CurrentTSO = ThreadQueueHd;
1038
1039     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1040                          TimeOfNextEvent));
1041
1042     if (RtsFlags.GranFlags.Light) 
1043       GranSimLight_leave_system(event, &ActiveTSO); 
1044
1045     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1046
1047     IF_DEBUG(gran, 
1048              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1049
1050     /* in a GranSim setup the TSO stays on the run queue */
1051     t = CurrentTSO;
1052     /* Take a thread from the run queue. */
1053     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1054
1055     IF_DEBUG(gran, 
1056              debugBelch("GRAN: About to run current thread, which is\n");
1057              G_TSO(t,5));
1058
1059     context_switch = 0; // turned on via GranYield, checking events and time slice
1060
1061     IF_DEBUG(gran, 
1062              DumpGranEvent(GR_SCHEDULE, t));
1063
1064     procStatus[CurrentProc] = Busy;
1065 }
1066 #endif // GRAN
1067
1068 /* ----------------------------------------------------------------------------
1069  * Send pending messages (PARALLEL_HASKELL only)
1070  * ------------------------------------------------------------------------- */
1071
1072 #if defined(PARALLEL_HASKELL)
1073 static StgTSO *
1074 scheduleSendPendingMessages(void)
1075 {
1076     StgSparkPool *pool;
1077     rtsSpark spark;
1078     StgTSO *t;
1079
1080 # if defined(PAR) // global Mem.Mgmt., omit for now
1081     if (PendingFetches != END_BF_QUEUE) {
1082         processFetches();
1083     }
1084 # endif
1085     
1086     if (RtsFlags.ParFlags.BufferTime) {
1087         // if we use message buffering, we must send away all message
1088         // packets which have become too old...
1089         sendOldBuffers(); 
1090     }
1091 }
1092 #endif
1093
1094 /* ----------------------------------------------------------------------------
1095  * Activate spark threads (PARALLEL_HASKELL only)
1096  * ------------------------------------------------------------------------- */
1097
1098 #if defined(PARALLEL_HASKELL)
1099 static void
1100 scheduleActivateSpark(void)
1101 {
1102 #if defined(SPARKS)
1103   ASSERT(EMPTY_RUN_QUEUE());
1104 /* We get here if the run queue is empty and want some work.
1105    We try to turn a spark into a thread, and add it to the run queue,
1106    from where it will be picked up in the next iteration of the scheduler
1107    loop.
1108 */
1109
1110       /* :-[  no local threads => look out for local sparks */
1111       /* the spark pool for the current PE */
1112       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1113       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1114           pool->hd < pool->tl) {
1115         /* 
1116          * ToDo: add GC code check that we really have enough heap afterwards!!
1117          * Old comment:
1118          * If we're here (no runnable threads) and we have pending
1119          * sparks, we must have a space problem.  Get enough space
1120          * to turn one of those pending sparks into a
1121          * thread... 
1122          */
1123
1124         spark = findSpark(rtsFalse);            /* get a spark */
1125         if (spark != (rtsSpark) NULL) {
1126           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1127           IF_PAR_DEBUG(fish, // schedule,
1128                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1129                              tso->id, tso, advisory_thread_count));
1130
1131           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1132             IF_PAR_DEBUG(fish, // schedule,
1133                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1134                             spark));
1135             return rtsFalse; /* failed to generate a thread */
1136           }                  /* otherwise fall through & pick-up new tso */
1137         } else {
1138           IF_PAR_DEBUG(fish, // schedule,
1139                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1140                              spark_queue_len(pool)));
1141           return rtsFalse;  /* failed to generate a thread */
1142         }
1143         return rtsTrue;  /* success in generating a thread */
1144   } else { /* no more threads permitted or pool empty */
1145     return rtsFalse;  /* failed to generateThread */
1146   }
1147 #else
1148   tso = NULL; // avoid compiler warning only
1149   return rtsFalse;  /* dummy in non-PAR setup */
1150 #endif // SPARKS
1151 }
1152 #endif // PARALLEL_HASKELL
1153
1154 /* ----------------------------------------------------------------------------
1155  * Get work from a remote node (PARALLEL_HASKELL only)
1156  * ------------------------------------------------------------------------- */
1157     
1158 #if defined(PARALLEL_HASKELL)
1159 static rtsBool
1160 scheduleGetRemoteWork(rtsBool *receivedFinish)
1161 {
1162   ASSERT(EMPTY_RUN_QUEUE());
1163
1164   if (RtsFlags.ParFlags.BufferTime) {
1165         IF_PAR_DEBUG(verbose, 
1166                 debugBelch("...send all pending data,"));
1167         {
1168           nat i;
1169           for (i=1; i<=nPEs; i++)
1170             sendImmediately(i); // send all messages away immediately
1171         }
1172   }
1173 # ifndef SPARKS
1174         //++EDEN++ idle() , i.e. send all buffers, wait for work
1175         // suppress fishing in EDEN... just look for incoming messages
1176         // (blocking receive)
1177   IF_PAR_DEBUG(verbose, 
1178                debugBelch("...wait for incoming messages...\n"));
1179   *receivedFinish = processMessages(); // blocking receive...
1180
1181         // and reenter scheduling loop after having received something
1182         // (return rtsFalse below)
1183
1184 # else /* activate SPARKS machinery */
1185 /* We get here, if we have no work, tried to activate a local spark, but still
1186    have no work. We try to get a remote spark, by sending a FISH message.
1187    Thread migration should be added here, and triggered when a sequence of 
1188    fishes returns without work. */
1189         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1190
1191       /* =8-[  no local sparks => look for work on other PEs */
1192         /*
1193          * We really have absolutely no work.  Send out a fish
1194          * (there may be some out there already), and wait for
1195          * something to arrive.  We clearly can't run any threads
1196          * until a SCHEDULE or RESUME arrives, and so that's what
1197          * we're hoping to see.  (Of course, we still have to
1198          * respond to other types of messages.)
1199          */
1200         rtsTime now = msTime() /*CURRENT_TIME*/;
1201         IF_PAR_DEBUG(verbose, 
1202                      debugBelch("--  now=%ld\n", now));
1203         IF_PAR_DEBUG(fish, // verbose,
1204              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1205                  (last_fish_arrived_at!=0 &&
1206                   last_fish_arrived_at+delay > now)) {
1207                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1208                      now, last_fish_arrived_at+delay, 
1209                      last_fish_arrived_at,
1210                      delay);
1211              });
1212   
1213         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1214             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1215           if (last_fish_arrived_at==0 ||
1216               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1217             /* outstandingFishes is set in sendFish, processFish;
1218                avoid flooding system with fishes via delay */
1219     next_fish_to_send_at = 0;  
1220   } else {
1221     /* ToDo: this should be done in the main scheduling loop to avoid the
1222              busy wait here; not so bad if fish delay is very small  */
1223     int iq = 0; // DEBUGGING -- HWL
1224     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1225     /* send a fish when ready, but process messages that arrive in the meantime */
1226     do {
1227       if (PacketsWaiting()) {
1228         iq++; // DEBUGGING
1229         *receivedFinish = processMessages();
1230       }
1231       now = msTime();
1232     } while (!*receivedFinish || now<next_fish_to_send_at);
1233     // JB: This means the fish could become obsolete, if we receive
1234     // work. Better check for work again? 
1235     // last line: while (!receivedFinish || !haveWork || now<...)
1236     // next line: if (receivedFinish || haveWork )
1237
1238     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1239       return rtsFalse;  // NB: this will leave scheduler loop
1240                         // immediately after return!
1241                           
1242     IF_PAR_DEBUG(fish, // verbose,
1243                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1244
1245   }
1246
1247     // JB: IMHO, this should all be hidden inside sendFish(...)
1248     /* pe = choosePE(); 
1249        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1250                 NEW_FISH_HUNGER);
1251
1252     // Global statistics: count no. of fishes
1253     if (RtsFlags.ParFlags.ParStats.Global &&
1254          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1255            globalParStats.tot_fish_mess++;
1256            }
1257     */ 
1258
1259   /* delayed fishes must have been sent by now! */
1260   next_fish_to_send_at = 0;  
1261   }
1262       
1263   *receivedFinish = processMessages();
1264 # endif /* SPARKS */
1265
1266  return rtsFalse;
1267  /* NB: this function always returns rtsFalse, meaning the scheduler
1268     loop continues with the next iteration; 
1269     rationale: 
1270       return code means success in finding work; we enter this function
1271       if there is no local work, thus have to send a fish which takes
1272       time until it arrives with work; in the meantime we should process
1273       messages in the main loop;
1274  */
1275 }
1276 #endif // PARALLEL_HASKELL
1277
1278 /* ----------------------------------------------------------------------------
1279  * PAR/GRAN: Report stats & debugging info(?)
1280  * ------------------------------------------------------------------------- */
1281
1282 #if defined(PAR) || defined(GRAN)
1283 static void
1284 scheduleGranParReport(void)
1285 {
1286   ASSERT(run_queue_hd != END_TSO_QUEUE);
1287
1288   /* Take a thread from the run queue, if we have work */
1289   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1290
1291     /* If this TSO has got its outport closed in the meantime, 
1292      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1293      * It has to be marked as TH_DEAD for this purpose.
1294      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1295
1296 JB: TODO: investigate wether state change field could be nuked
1297      entirely and replaced by the normal tso state (whatnext
1298      field). All we want to do is to kill tsos from outside.
1299      */
1300
1301     /* ToDo: write something to the log-file
1302     if (RTSflags.ParFlags.granSimStats && !sameThread)
1303         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1304
1305     CurrentTSO = t;
1306     */
1307     /* the spark pool for the current PE */
1308     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1309
1310     IF_DEBUG(scheduler, 
1311              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1312                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1313
1314     IF_PAR_DEBUG(fish,
1315              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1316                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1317
1318     if (RtsFlags.ParFlags.ParStats.Full && 
1319         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1320         (emitSchedule || // forced emit
1321          (t && LastTSO && t->id != LastTSO->id))) {
1322       /* 
1323          we are running a different TSO, so write a schedule event to log file
1324          NB: If we use fair scheduling we also have to write  a deschedule 
1325              event for LastTSO; with unfair scheduling we know that the
1326              previous tso has blocked whenever we switch to another tso, so
1327              we don't need it in GUM for now
1328       */
1329       IF_PAR_DEBUG(fish, // schedule,
1330                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1331
1332       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1333                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1334       emitSchedule = rtsFalse;
1335     }
1336 }     
1337 #endif
1338
1339 /* ----------------------------------------------------------------------------
1340  * After running a thread...
1341  * ASSUMES: sched_mutex
1342  * ------------------------------------------------------------------------- */
1343
1344 static void
1345 schedulePostRunThread(void)
1346 {
1347 #if defined(PAR)
1348     /* HACK 675: if the last thread didn't yield, make sure to print a 
1349        SCHEDULE event to the log file when StgRunning the next thread, even
1350        if it is the same one as before */
1351     LastTSO = t; 
1352     TimeOfLastYield = CURRENT_TIME;
1353 #endif
1354
1355   /* some statistics gathering in the parallel case */
1356
1357 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1358   switch (ret) {
1359     case HeapOverflow:
1360 # if defined(GRAN)
1361       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1362       globalGranStats.tot_heapover++;
1363 # elif defined(PAR)
1364       globalParStats.tot_heapover++;
1365 # endif
1366       break;
1367
1368      case StackOverflow:
1369 # if defined(GRAN)
1370       IF_DEBUG(gran, 
1371                DumpGranEvent(GR_DESCHEDULE, t));
1372       globalGranStats.tot_stackover++;
1373 # elif defined(PAR)
1374       // IF_DEBUG(par, 
1375       // DumpGranEvent(GR_DESCHEDULE, t);
1376       globalParStats.tot_stackover++;
1377 # endif
1378       break;
1379
1380     case ThreadYielding:
1381 # if defined(GRAN)
1382       IF_DEBUG(gran, 
1383                DumpGranEvent(GR_DESCHEDULE, t));
1384       globalGranStats.tot_yields++;
1385 # elif defined(PAR)
1386       // IF_DEBUG(par, 
1387       // DumpGranEvent(GR_DESCHEDULE, t);
1388       globalParStats.tot_yields++;
1389 # endif
1390       break; 
1391
1392     case ThreadBlocked:
1393 # if defined(GRAN)
1394       IF_DEBUG(scheduler,
1395                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1396                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1397                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1398                if (t->block_info.closure!=(StgClosure*)NULL)
1399                  print_bq(t->block_info.closure);
1400                debugBelch("\n"));
1401
1402       // ??? needed; should emit block before
1403       IF_DEBUG(gran, 
1404                DumpGranEvent(GR_DESCHEDULE, t)); 
1405       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1406       /*
1407         ngoq Dogh!
1408       ASSERT(procStatus[CurrentProc]==Busy || 
1409               ((procStatus[CurrentProc]==Fetching) && 
1410               (t->block_info.closure!=(StgClosure*)NULL)));
1411       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1412           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1413             procStatus[CurrentProc]==Fetching)) 
1414         procStatus[CurrentProc] = Idle;
1415       */
1416 # elif defined(PAR)
1417 //++PAR++  blockThread() writes the event (change?)
1418 # endif
1419     break;
1420
1421   case ThreadFinished:
1422     break;
1423
1424   default:
1425     barf("parGlobalStats: unknown return code");
1426     break;
1427     }
1428 #endif
1429 }
1430
1431 /* -----------------------------------------------------------------------------
1432  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1433  * ASSUMES: sched_mutex
1434  * -------------------------------------------------------------------------- */
1435
1436 static rtsBool
1437 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1438 {
1439     // did the task ask for a large block?
1440     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1441         // if so, get one and push it on the front of the nursery.
1442         bdescr *bd;
1443         lnat blocks;
1444         
1445         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1446         
1447         IF_DEBUG(scheduler,
1448                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1449                             (long)t->id, whatNext_strs[t->what_next], blocks));
1450         
1451         // don't do this if it would push us over the
1452         // alloc_blocks_lim limit; we'll GC first.
1453         if (alloc_blocks + blocks < alloc_blocks_lim) {
1454             
1455             alloc_blocks += blocks;
1456             bd = allocGroup( blocks );
1457             
1458             // link the new group into the list
1459             bd->link = cap->r.rCurrentNursery;
1460             bd->u.back = cap->r.rCurrentNursery->u.back;
1461             if (cap->r.rCurrentNursery->u.back != NULL) {
1462                 cap->r.rCurrentNursery->u.back->link = bd;
1463             } else {
1464 #if !defined(SMP)
1465                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1466                        g0s0->blocks == cap->r.rNursery);
1467                 g0s0->blocks = bd;
1468 #endif
1469                 cap->r.rNursery = bd;
1470             }             
1471             cap->r.rCurrentNursery->u.back = bd;
1472             
1473             // initialise it as a nursery block.  We initialise the
1474             // step, gen_no, and flags field of *every* sub-block in
1475             // this large block, because this is easier than making
1476             // sure that we always find the block head of a large
1477             // block whenever we call Bdescr() (eg. evacuate() and
1478             // isAlive() in the GC would both have to do this, at
1479             // least).
1480             { 
1481                 bdescr *x;
1482                 for (x = bd; x < bd + blocks; x++) {
1483                     x->step = g0s0;
1484                     x->gen_no = 0;
1485                     x->flags = 0;
1486                 }
1487             }
1488             
1489 #if !defined(SMP)
1490             // don't forget to update the block count in g0s0.
1491             g0s0->n_blocks += blocks;
1492
1493             // This assert can be a killer if the app is doing lots
1494             // of large block allocations.
1495             ASSERT(countBlocks(g0s0->blocks) == g0s0->n_blocks);
1496 #endif
1497             
1498             // now update the nursery to point to the new block
1499             cap->r.rCurrentNursery = bd;
1500             
1501             // we might be unlucky and have another thread get on the
1502             // run queue before us and steal the large block, but in that
1503             // case the thread will just end up requesting another large
1504             // block.
1505             PUSH_ON_RUN_QUEUE(t);
1506             return rtsFalse;  /* not actually GC'ing */
1507         }
1508     }
1509     
1510     /* make all the running tasks block on a condition variable,
1511      * maybe set context_switch and wait till they all pile in,
1512      * then have them wait on a GC condition variable.
1513      */
1514     IF_DEBUG(scheduler,
1515              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1516                         (long)t->id, whatNext_strs[t->what_next]));
1517     threadPaused(t);
1518 #if defined(GRAN)
1519     ASSERT(!is_on_queue(t,CurrentProc));
1520 #elif defined(PARALLEL_HASKELL)
1521     /* Currently we emit a DESCHEDULE event before GC in GUM.
1522        ToDo: either add separate event to distinguish SYSTEM time from rest
1523        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1524     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1525         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1526                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1527         emitSchedule = rtsTrue;
1528     }
1529 #endif
1530       
1531     PUSH_ON_RUN_QUEUE(t);
1532     return rtsTrue;
1533     /* actual GC is done at the end of the while loop in schedule() */
1534 }
1535
1536 /* -----------------------------------------------------------------------------
1537  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1538  * ASSUMES: sched_mutex
1539  * -------------------------------------------------------------------------- */
1540
1541 static void
1542 scheduleHandleStackOverflow( StgTSO *t)
1543 {
1544     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1545                                   (long)t->id, whatNext_strs[t->what_next]));
1546     /* just adjust the stack for this thread, then pop it back
1547      * on the run queue.
1548      */
1549     threadPaused(t);
1550     { 
1551         /* enlarge the stack */
1552         StgTSO *new_t = threadStackOverflow(t);
1553         
1554         /* This TSO has moved, so update any pointers to it from the
1555          * main thread stack.  It better not be on any other queues...
1556          * (it shouldn't be).
1557          */
1558         if (t->main != NULL) {
1559             t->main->tso = new_t;
1560         }
1561         PUSH_ON_RUN_QUEUE(new_t);
1562     }
1563 }
1564
1565 /* -----------------------------------------------------------------------------
1566  * Handle a thread that returned to the scheduler with ThreadYielding
1567  * ASSUMES: sched_mutex
1568  * -------------------------------------------------------------------------- */
1569
1570 static rtsBool
1571 scheduleHandleYield( StgTSO *t, nat prev_what_next )
1572 {
1573     // Reset the context switch flag.  We don't do this just before
1574     // running the thread, because that would mean we would lose ticks
1575     // during GC, which can lead to unfair scheduling (a thread hogs
1576     // the CPU because the tick always arrives during GC).  This way
1577     // penalises threads that do a lot of allocation, but that seems
1578     // better than the alternative.
1579     context_switch = 0;
1580     
1581     /* put the thread back on the run queue.  Then, if we're ready to
1582      * GC, check whether this is the last task to stop.  If so, wake
1583      * up the GC thread.  getThread will block during a GC until the
1584      * GC is finished.
1585      */
1586     IF_DEBUG(scheduler,
1587              if (t->what_next != prev_what_next) {
1588                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1589                             (long)t->id, whatNext_strs[t->what_next]);
1590              } else {
1591                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1592                             (long)t->id, whatNext_strs[t->what_next]);
1593              }
1594         );
1595     
1596     IF_DEBUG(sanity,
1597              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1598              checkTSO(t));
1599     ASSERT(t->link == END_TSO_QUEUE);
1600     
1601     // Shortcut if we're just switching evaluators: don't bother
1602     // doing stack squeezing (which can be expensive), just run the
1603     // thread.
1604     if (t->what_next != prev_what_next) {
1605         return rtsTrue;
1606     }
1607     
1608     threadPaused(t);
1609     
1610 #if defined(GRAN)
1611     ASSERT(!is_on_queue(t,CurrentProc));
1612       
1613     IF_DEBUG(sanity,
1614              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1615              checkThreadQsSanity(rtsTrue));
1616
1617 #endif
1618
1619     addToRunQueue(t);
1620
1621 #if defined(GRAN)
1622     /* add a ContinueThread event to actually process the thread */
1623     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1624               ContinueThread,
1625               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1626     IF_GRAN_DEBUG(bq, 
1627                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1628                   G_EVENTQ(0);
1629                   G_CURR_THREADQ(0));
1630 #endif
1631     return rtsFalse;
1632 }
1633
1634 /* -----------------------------------------------------------------------------
1635  * Handle a thread that returned to the scheduler with ThreadBlocked
1636  * ASSUMES: sched_mutex
1637  * -------------------------------------------------------------------------- */
1638
1639 static void
1640 scheduleHandleThreadBlocked( StgTSO *t
1641 #if !defined(GRAN) && !defined(DEBUG)
1642     STG_UNUSED
1643 #endif
1644     )
1645 {
1646 #if defined(GRAN)
1647     IF_DEBUG(scheduler,
1648              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1649                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1650              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1651     
1652     // ??? needed; should emit block before
1653     IF_DEBUG(gran, 
1654              DumpGranEvent(GR_DESCHEDULE, t)); 
1655     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1656     /*
1657       ngoq Dogh!
1658       ASSERT(procStatus[CurrentProc]==Busy || 
1659       ((procStatus[CurrentProc]==Fetching) && 
1660       (t->block_info.closure!=(StgClosure*)NULL)));
1661       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1662       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1663       procStatus[CurrentProc]==Fetching)) 
1664       procStatus[CurrentProc] = Idle;
1665     */
1666 #elif defined(PAR)
1667     IF_DEBUG(scheduler,
1668              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1669                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1670     IF_PAR_DEBUG(bq,
1671                  
1672                  if (t->block_info.closure!=(StgClosure*)NULL) 
1673                  print_bq(t->block_info.closure));
1674     
1675     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1676     blockThread(t);
1677     
1678     /* whatever we schedule next, we must log that schedule */
1679     emitSchedule = rtsTrue;
1680     
1681 #else /* !GRAN */
1682       /* don't need to do anything.  Either the thread is blocked on
1683        * I/O, in which case we'll have called addToBlockedQueue
1684        * previously, or it's blocked on an MVar or Blackhole, in which
1685        * case it'll be on the relevant queue already.
1686        */
1687     ASSERT(t->why_blocked != NotBlocked);
1688     IF_DEBUG(scheduler,
1689              debugBelch("--<< thread %d (%s) stopped: ", 
1690                         t->id, whatNext_strs[t->what_next]);
1691              printThreadBlockage(t);
1692              debugBelch("\n"));
1693     
1694     /* Only for dumping event to log file 
1695        ToDo: do I need this in GranSim, too?
1696        blockThread(t);
1697     */
1698 #endif
1699 }
1700
1701 /* -----------------------------------------------------------------------------
1702  * Handle a thread that returned to the scheduler with ThreadFinished
1703  * ASSUMES: sched_mutex
1704  * -------------------------------------------------------------------------- */
1705
1706 static rtsBool
1707 scheduleHandleThreadFinished( StgMainThread *mainThread
1708                               USED_WHEN_RTS_SUPPORTS_THREADS,
1709                               Capability *cap,
1710                               StgTSO *t )
1711 {
1712     /* Need to check whether this was a main thread, and if so,
1713      * return with the return value.
1714      *
1715      * We also end up here if the thread kills itself with an
1716      * uncaught exception, see Exception.cmm.
1717      */
1718     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1719                                   t->id, whatNext_strs[t->what_next]));
1720
1721 #if defined(GRAN)
1722       endThread(t, CurrentProc); // clean-up the thread
1723 #elif defined(PARALLEL_HASKELL)
1724       /* For now all are advisory -- HWL */
1725       //if(t->priority==AdvisoryPriority) ??
1726       advisory_thread_count--; // JB: Caution with this counter, buggy!
1727       
1728 # if defined(DIST)
1729       if(t->dist.priority==RevalPriority)
1730         FinishReval(t);
1731 # endif
1732     
1733 # if defined(EDENOLD)
1734       // the thread could still have an outport... (BUG)
1735       if (t->eden.outport != -1) {
1736       // delete the outport for the tso which has finished...
1737         IF_PAR_DEBUG(eden_ports,
1738                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1739                               t->eden.outport, t->id));
1740         deleteOPT(t);
1741       }
1742       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1743       if (t->eden.epid != -1) {
1744         IF_PAR_DEBUG(eden_ports,
1745                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1746                            t->id, t->eden.epid));
1747         removeTSOfromProcess(t);
1748       }
1749 # endif 
1750
1751 # if defined(PAR)
1752       if (RtsFlags.ParFlags.ParStats.Full &&
1753           !RtsFlags.ParFlags.ParStats.Suppressed) 
1754         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1755
1756       //  t->par only contains statistics: left out for now...
1757       IF_PAR_DEBUG(fish,
1758                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1759                               t->id,t,t->par.sparkname));
1760 # endif
1761 #endif // PARALLEL_HASKELL
1762
1763       //
1764       // Check whether the thread that just completed was a main
1765       // thread, and if so return with the result.  
1766       //
1767       // There is an assumption here that all thread completion goes
1768       // through this point; we need to make sure that if a thread
1769       // ends up in the ThreadKilled state, that it stays on the run
1770       // queue so it can be dealt with here.
1771       //
1772       if (
1773 #if defined(RTS_SUPPORTS_THREADS)
1774           mainThread != NULL
1775 #else
1776           mainThread->tso == t
1777 #endif
1778           )
1779       {
1780           // We are a bound thread: this must be our thread that just
1781           // completed.
1782           ASSERT(mainThread->tso == t);
1783
1784           if (t->what_next == ThreadComplete) {
1785               if (mainThread->ret) {
1786                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1787                   *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; 
1788               }
1789               mainThread->stat = Success;
1790           } else {
1791               if (mainThread->ret) {
1792                   *(mainThread->ret) = NULL;
1793               }
1794               if (interrupted) {
1795                   mainThread->stat = Interrupted;
1796               } else {
1797                   mainThread->stat = Killed;
1798               }
1799           }
1800 #ifdef DEBUG
1801           removeThreadLabel((StgWord)mainThread->tso->id);
1802 #endif
1803           if (mainThread->prev == NULL) {
1804               main_threads = mainThread->link;
1805           } else {
1806               mainThread->prev->link = mainThread->link;
1807           }
1808           if (mainThread->link != NULL) {
1809               mainThread->link->prev = NULL;
1810           }
1811           releaseCapability(cap);
1812           return rtsTrue; // tells schedule() to return
1813       }
1814
1815 #ifdef RTS_SUPPORTS_THREADS
1816       ASSERT(t->main == NULL);
1817 #else
1818       if (t->main != NULL) {
1819           // Must be a main thread that is not the topmost one.  Leave
1820           // it on the run queue until the stack has unwound to the
1821           // point where we can deal with this.  Leaving it on the run
1822           // queue also ensures that the garbage collector knows about
1823           // this thread and its return value (it gets dropped from the
1824           // all_threads list so there's no other way to find it).
1825           APPEND_TO_RUN_QUEUE(t);
1826       }
1827 #endif
1828       return rtsFalse;
1829 }
1830
1831 /* -----------------------------------------------------------------------------
1832  * Perform a heap census, if PROFILING
1833  * -------------------------------------------------------------------------- */
1834
1835 static rtsBool
1836 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1837 {
1838 #if defined(PROFILING)
1839     // When we have +RTS -i0 and we're heap profiling, do a census at
1840     // every GC.  This lets us get repeatable runs for debugging.
1841     if (performHeapProfile ||
1842         (RtsFlags.ProfFlags.profileInterval==0 &&
1843          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1844         GarbageCollect(GetRoots, rtsTrue);
1845         heapCensus();
1846         performHeapProfile = rtsFalse;
1847         return rtsTrue;  // true <=> we already GC'd
1848     }
1849 #endif
1850     return rtsFalse;
1851 }
1852
1853 /* -----------------------------------------------------------------------------
1854  * Perform a garbage collection if necessary
1855  * ASSUMES: sched_mutex
1856  * -------------------------------------------------------------------------- */
1857
1858 static void
1859 scheduleDoGC( Capability *cap STG_UNUSED )
1860 {
1861     StgTSO *t;
1862 #ifdef SMP
1863     int n_capabilities = RtsFlags.ParFlags.nNodes - 1; 
1864            // subtract one because we're already holding one.
1865     Capability *caps[n_capabilities];
1866 #endif
1867
1868 #ifdef SMP
1869     // In order to GC, there must be no threads running Haskell code.
1870     // Therefore, the GC thread needs to hold *all* the capabilities,
1871     // and release them after the GC has completed.  
1872     //
1873     // This seems to be the simplest way: previous attempts involved
1874     // making all the threads with capabilities give up their
1875     // capabilities and sleep except for the *last* one, which
1876     // actually did the GC.  But it's quite hard to arrange for all
1877     // the other tasks to sleep and stay asleep.
1878     //
1879         
1880     caps[n_capabilities] = cap;
1881     while (n_capabilities > 0) {
1882         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities));
1883         waitForReturnCapability(&sched_mutex, &cap);
1884         n_capabilities--;
1885         caps[n_capabilities] = cap;
1886     }
1887 #endif
1888
1889     /* Kick any transactions which are invalid back to their
1890      * atomically frames.  When next scheduled they will try to
1891      * commit, this commit will fail and they will retry.
1892      */
1893     for (t = all_threads; t != END_TSO_QUEUE; t = t -> link) {
1894         if (t -> what_next != ThreadRelocated && t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1895             if (!stmValidateTransaction (t -> trec)) {
1896                 IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
1897                 
1898                 // strip the stack back to the ATOMICALLY_FRAME, aborting
1899                 // the (nested) transaction, and saving the stack of any
1900                 // partially-evaluated thunks on the heap.
1901                 raiseAsync_(t, NULL, rtsTrue);
1902                 
1903 #ifdef REG_R1
1904                 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
1905 #endif
1906             }
1907         }
1908     }
1909     
1910     // so this happens periodically:
1911     scheduleCheckBlackHoles();
1912     
1913     /* everybody back, start the GC.
1914      * Could do it in this thread, or signal a condition var
1915      * to do it in another thread.  Either way, we need to
1916      * broadcast on gc_pending_cond afterward.
1917      */
1918 #if defined(RTS_SUPPORTS_THREADS)
1919     IF_DEBUG(scheduler,sched_belch("doing GC"));
1920 #endif
1921     GarbageCollect(GetRoots,rtsFalse);
1922     
1923 #if defined(SMP)
1924     {
1925         // release our stash of capabilities.
1926         nat i;
1927         for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) {
1928             releaseCapability(caps[i]);
1929         }
1930     }
1931 #endif
1932
1933 #if defined(GRAN)
1934     /* add a ContinueThread event to continue execution of current thread */
1935     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1936               ContinueThread,
1937               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1938     IF_GRAN_DEBUG(bq, 
1939                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
1940                   G_EVENTQ(0);
1941                   G_CURR_THREADQ(0));
1942 #endif /* GRAN */
1943 }
1944
1945 /* ---------------------------------------------------------------------------
1946  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
1947  * used by Control.Concurrent for error checking.
1948  * ------------------------------------------------------------------------- */
1949  
1950 StgBool
1951 rtsSupportsBoundThreads(void)
1952 {
1953 #ifdef THREADED_RTS
1954   return rtsTrue;
1955 #else
1956   return rtsFalse;
1957 #endif
1958 }
1959
1960 /* ---------------------------------------------------------------------------
1961  * isThreadBound(tso): check whether tso is bound to an OS thread.
1962  * ------------------------------------------------------------------------- */
1963  
1964 StgBool
1965 isThreadBound(StgTSO* tso USED_IN_THREADED_RTS)
1966 {
1967 #ifdef THREADED_RTS
1968   return (tso->main != NULL);
1969 #endif
1970   return rtsFalse;
1971 }
1972
1973 /* ---------------------------------------------------------------------------
1974  * Singleton fork(). Do not copy any running threads.
1975  * ------------------------------------------------------------------------- */
1976
1977 #ifndef mingw32_HOST_OS
1978 #define FORKPROCESS_PRIMOP_SUPPORTED
1979 #endif
1980
1981 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1982 static void 
1983 deleteThreadImmediately(StgTSO *tso);
1984 #endif
1985 StgInt
1986 forkProcess(HsStablePtr *entry
1987 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
1988             STG_UNUSED
1989 #endif
1990            )
1991 {
1992 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
1993   pid_t pid;
1994   StgTSO* t,*next;
1995   StgMainThread *m;
1996   SchedulerStatus rc;
1997
1998   IF_DEBUG(scheduler,sched_belch("forking!"));
1999   rts_lock(); // This not only acquires sched_mutex, it also
2000               // makes sure that no other threads are running
2001
2002   pid = fork();
2003
2004   if (pid) { /* parent */
2005
2006   /* just return the pid */
2007     rts_unlock();
2008     return pid;
2009     
2010   } else { /* child */
2011     
2012     
2013       // delete all threads
2014     run_queue_hd = run_queue_tl = END_TSO_QUEUE;
2015     
2016     for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2017       next = t->link;
2018
2019         // don't allow threads to catch the ThreadKilled exception
2020       deleteThreadImmediately(t);
2021     }
2022     
2023       // wipe the main thread list
2024     while((m = main_threads) != NULL) {
2025       main_threads = m->link;
2026 # ifdef THREADED_RTS
2027       closeCondition(&m->bound_thread_cond);
2028 # endif
2029       stgFree(m);
2030     }
2031     
2032     rc = rts_evalStableIO(entry, NULL);  // run the action
2033     rts_checkSchedStatus("forkProcess",rc);
2034     
2035     rts_unlock();
2036     
2037     hs_exit();                      // clean up and exit
2038     stg_exit(0);
2039   }
2040 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2041   barf("forkProcess#: primop not supported, sorry!\n");
2042   return -1;
2043 #endif
2044 }
2045
2046 /* ---------------------------------------------------------------------------
2047  * deleteAllThreads():  kill all the live threads.
2048  *
2049  * This is used when we catch a user interrupt (^C), before performing
2050  * any necessary cleanups and running finalizers.
2051  *
2052  * Locks: sched_mutex held.
2053  * ------------------------------------------------------------------------- */
2054    
2055 void
2056 deleteAllThreads ( void )
2057 {
2058   StgTSO* t, *next;
2059   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2060   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2061       next = t->global_link;
2062       deleteThread(t);
2063   }      
2064
2065   // The run queue now contains a bunch of ThreadKilled threads.  We
2066   // must not throw these away: the main thread(s) will be in there
2067   // somewhere, and the main scheduler loop has to deal with it.
2068   // Also, the run queue is the only thing keeping these threads from
2069   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2070
2071   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2072   ASSERT(blackhole_queue == END_TSO_QUEUE);
2073   ASSERT(sleeping_queue == END_TSO_QUEUE);
2074 }
2075
2076 /* startThread and  insertThread are now in GranSim.c -- HWL */
2077
2078
2079 /* ---------------------------------------------------------------------------
2080  * Suspending & resuming Haskell threads.
2081  * 
2082  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2083  * its capability before calling the C function.  This allows another
2084  * task to pick up the capability and carry on running Haskell
2085  * threads.  It also means that if the C call blocks, it won't lock
2086  * the whole system.
2087  *
2088  * The Haskell thread making the C call is put to sleep for the
2089  * duration of the call, on the susepended_ccalling_threads queue.  We
2090  * give out a token to the task, which it can use to resume the thread
2091  * on return from the C function.
2092  * ------------------------------------------------------------------------- */
2093    
2094 StgInt
2095 suspendThread( StgRegTable *reg )
2096 {
2097   nat tok;
2098   Capability *cap;
2099   int saved_errno = errno;
2100
2101   /* assume that *reg is a pointer to the StgRegTable part
2102    * of a Capability.
2103    */
2104   cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable)));
2105
2106   ACQUIRE_LOCK(&sched_mutex);
2107
2108   IF_DEBUG(scheduler,
2109            sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id));
2110
2111   // XXX this might not be necessary --SDM
2112   cap->r.rCurrentTSO->what_next = ThreadRunGHC;
2113
2114   threadPaused(cap->r.rCurrentTSO);
2115   cap->r.rCurrentTSO->link = suspended_ccalling_threads;
2116   suspended_ccalling_threads = cap->r.rCurrentTSO;
2117
2118   if(cap->r.rCurrentTSO->blocked_exceptions == NULL)  {
2119       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall;
2120       cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE;
2121   } else {
2122       cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc;
2123   }
2124
2125   /* Use the thread ID as the token; it should be unique */
2126   tok = cap->r.rCurrentTSO->id;
2127
2128   /* Hand back capability */
2129   cap->r.rInHaskell = rtsFalse;
2130   releaseCapability(cap);
2131   
2132 #if defined(RTS_SUPPORTS_THREADS)
2133   /* Preparing to leave the RTS, so ensure there's a native thread/task
2134      waiting to take over.
2135   */
2136   IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok));
2137 #endif
2138
2139   RELEASE_LOCK(&sched_mutex);
2140   
2141   errno = saved_errno;
2142   return tok; 
2143 }
2144
2145 StgRegTable *
2146 resumeThread( StgInt tok )
2147 {
2148   StgTSO *tso, **prev;
2149   Capability *cap;
2150   int saved_errno = errno;
2151
2152 #if defined(RTS_SUPPORTS_THREADS)
2153   /* Wait for permission to re-enter the RTS with the result. */
2154   ACQUIRE_LOCK(&sched_mutex);
2155   waitForReturnCapability(&sched_mutex, &cap);
2156
2157   IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok));
2158 #else
2159   grabCapability(&cap);
2160 #endif
2161
2162   /* Remove the thread off of the suspended list */
2163   prev = &suspended_ccalling_threads;
2164   for (tso = suspended_ccalling_threads; 
2165        tso != END_TSO_QUEUE; 
2166        prev = &tso->link, tso = tso->link) {
2167     if (tso->id == (StgThreadID)tok) {
2168       *prev = tso->link;
2169       break;
2170     }
2171   }
2172   if (tso == END_TSO_QUEUE) {
2173     barf("resumeThread: thread not found");
2174   }
2175   tso->link = END_TSO_QUEUE;
2176   
2177   if(tso->why_blocked == BlockedOnCCall) {
2178       awakenBlockedQueueNoLock(tso->blocked_exceptions);
2179       tso->blocked_exceptions = NULL;
2180   }
2181   
2182   /* Reset blocking status */
2183   tso->why_blocked  = NotBlocked;
2184
2185   cap->r.rCurrentTSO = tso;
2186   cap->r.rInHaskell = rtsTrue;
2187   RELEASE_LOCK(&sched_mutex);
2188   errno = saved_errno;
2189   return &cap->r;
2190 }
2191
2192 /* ---------------------------------------------------------------------------
2193  * Comparing Thread ids.
2194  *
2195  * This is used from STG land in the implementation of the
2196  * instances of Eq/Ord for ThreadIds.
2197  * ------------------------------------------------------------------------ */
2198
2199 int
2200 cmp_thread(StgPtr tso1, StgPtr tso2) 
2201
2202   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2203   StgThreadID id2 = ((StgTSO *)tso2)->id;
2204  
2205   if (id1 < id2) return (-1);
2206   if (id1 > id2) return 1;
2207   return 0;
2208 }
2209
2210 /* ---------------------------------------------------------------------------
2211  * Fetching the ThreadID from an StgTSO.
2212  *
2213  * This is used in the implementation of Show for ThreadIds.
2214  * ------------------------------------------------------------------------ */
2215 int
2216 rts_getThreadId(StgPtr tso) 
2217 {
2218   return ((StgTSO *)tso)->id;
2219 }
2220
2221 #ifdef DEBUG
2222 void
2223 labelThread(StgPtr tso, char *label)
2224 {
2225   int len;
2226   void *buf;
2227
2228   /* Caveat: Once set, you can only set the thread name to "" */
2229   len = strlen(label)+1;
2230   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2231   strncpy(buf,label,len);
2232   /* Update will free the old memory for us */
2233   updateThreadLabel(((StgTSO *)tso)->id,buf);
2234 }
2235 #endif /* DEBUG */
2236
2237 /* ---------------------------------------------------------------------------
2238    Create a new thread.
2239
2240    The new thread starts with the given stack size.  Before the
2241    scheduler can run, however, this thread needs to have a closure
2242    (and possibly some arguments) pushed on its stack.  See
2243    pushClosure() in Schedule.h.
2244
2245    createGenThread() and createIOThread() (in SchedAPI.h) are
2246    convenient packaged versions of this function.
2247
2248    currently pri (priority) is only used in a GRAN setup -- HWL
2249    ------------------------------------------------------------------------ */
2250 #if defined(GRAN)
2251 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2252 StgTSO *
2253 createThread(nat size, StgInt pri)
2254 #else
2255 StgTSO *
2256 createThread(nat size)
2257 #endif
2258 {
2259
2260     StgTSO *tso;
2261     nat stack_size;
2262
2263     /* First check whether we should create a thread at all */
2264 #if defined(PARALLEL_HASKELL)
2265   /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2266   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2267     threadsIgnored++;
2268     debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2269           RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2270     return END_TSO_QUEUE;
2271   }
2272   threadsCreated++;
2273 #endif
2274
2275 #if defined(GRAN)
2276   ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2277 #endif
2278
2279   // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2280
2281   /* catch ridiculously small stack sizes */
2282   if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2283     size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2284   }
2285
2286   stack_size = size - TSO_STRUCT_SIZEW;
2287
2288   tso = (StgTSO *)allocate(size);
2289   TICK_ALLOC_TSO(stack_size, 0);
2290
2291   SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2292 #if defined(GRAN)
2293   SET_GRAN_HDR(tso, ThisPE);
2294 #endif
2295
2296   // Always start with the compiled code evaluator
2297   tso->what_next = ThreadRunGHC;
2298
2299   tso->id = next_thread_id++; 
2300   tso->why_blocked  = NotBlocked;
2301   tso->blocked_exceptions = NULL;
2302
2303   tso->saved_errno = 0;
2304   tso->main = NULL;
2305   
2306   tso->stack_size   = stack_size;
2307   tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2308                               - TSO_STRUCT_SIZEW;
2309   tso->sp           = (P_)&(tso->stack) + stack_size;
2310
2311   tso->trec = NO_TREC;
2312
2313 #ifdef PROFILING
2314   tso->prof.CCCS = CCS_MAIN;
2315 #endif
2316
2317   /* put a stop frame on the stack */
2318   tso->sp -= sizeofW(StgStopFrame);
2319   SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2320   tso->link = END_TSO_QUEUE;
2321
2322   // ToDo: check this
2323 #if defined(GRAN)
2324   /* uses more flexible routine in GranSim */
2325   insertThread(tso, CurrentProc);
2326 #else
2327   /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2328    * from its creation
2329    */
2330 #endif
2331
2332 #if defined(GRAN) 
2333   if (RtsFlags.GranFlags.GranSimStats.Full) 
2334     DumpGranEvent(GR_START,tso);
2335 #elif defined(PARALLEL_HASKELL)
2336   if (RtsFlags.ParFlags.ParStats.Full) 
2337     DumpGranEvent(GR_STARTQ,tso);
2338   /* HACk to avoid SCHEDULE 
2339      LastTSO = tso; */
2340 #endif
2341
2342   /* Link the new thread on the global thread list.
2343    */
2344   tso->global_link = all_threads;
2345   all_threads = tso;
2346
2347 #if defined(DIST)
2348   tso->dist.priority = MandatoryPriority; //by default that is...
2349 #endif
2350
2351 #if defined(GRAN)
2352   tso->gran.pri = pri;
2353 # if defined(DEBUG)
2354   tso->gran.magic = TSO_MAGIC; // debugging only
2355 # endif
2356   tso->gran.sparkname   = 0;
2357   tso->gran.startedat   = CURRENT_TIME; 
2358   tso->gran.exported    = 0;
2359   tso->gran.basicblocks = 0;
2360   tso->gran.allocs      = 0;
2361   tso->gran.exectime    = 0;
2362   tso->gran.fetchtime   = 0;
2363   tso->gran.fetchcount  = 0;
2364   tso->gran.blocktime   = 0;
2365   tso->gran.blockcount  = 0;
2366   tso->gran.blockedat   = 0;
2367   tso->gran.globalsparks = 0;
2368   tso->gran.localsparks  = 0;
2369   if (RtsFlags.GranFlags.Light)
2370     tso->gran.clock  = Now; /* local clock */
2371   else
2372     tso->gran.clock  = 0;
2373
2374   IF_DEBUG(gran,printTSO(tso));
2375 #elif defined(PARALLEL_HASKELL)
2376 # if defined(DEBUG)
2377   tso->par.magic = TSO_MAGIC; // debugging only
2378 # endif
2379   tso->par.sparkname   = 0;
2380   tso->par.startedat   = CURRENT_TIME; 
2381   tso->par.exported    = 0;
2382   tso->par.basicblocks = 0;
2383   tso->par.allocs      = 0;
2384   tso->par.exectime    = 0;
2385   tso->par.fetchtime   = 0;
2386   tso->par.fetchcount  = 0;
2387   tso->par.blocktime   = 0;
2388   tso->par.blockcount  = 0;
2389   tso->par.blockedat   = 0;
2390   tso->par.globalsparks = 0;
2391   tso->par.localsparks  = 0;
2392 #endif
2393
2394 #if defined(GRAN)
2395   globalGranStats.tot_threads_created++;
2396   globalGranStats.threads_created_on_PE[CurrentProc]++;
2397   globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2398   globalGranStats.tot_sq_probes++;
2399 #elif defined(PARALLEL_HASKELL)
2400   // collect parallel global statistics (currently done together with GC stats)
2401   if (RtsFlags.ParFlags.ParStats.Global &&
2402       RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2403     //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2404     globalParStats.tot_threads_created++;
2405   }
2406 #endif 
2407
2408 #if defined(GRAN)
2409   IF_GRAN_DEBUG(pri,
2410                 sched_belch("==__ schedule: Created TSO %d (%p);",
2411                       CurrentProc, tso, tso->id));
2412 #elif defined(PARALLEL_HASKELL)
2413   IF_PAR_DEBUG(verbose,
2414                sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2415                            (long)tso->id, tso, advisory_thread_count));
2416 #else
2417   IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2418                                  (long)tso->id, (long)tso->stack_size));
2419 #endif    
2420   return tso;
2421 }
2422
2423 #if defined(PAR)
2424 /* RFP:
2425    all parallel thread creation calls should fall through the following routine.
2426 */
2427 StgTSO *
2428 createThreadFromSpark(rtsSpark spark) 
2429 { StgTSO *tso;
2430   ASSERT(spark != (rtsSpark)NULL);
2431 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2432   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2433   { threadsIgnored++;
2434     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2435           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2436     return END_TSO_QUEUE;
2437   }
2438   else
2439   { threadsCreated++;
2440     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2441     if (tso==END_TSO_QUEUE)     
2442       barf("createSparkThread: Cannot create TSO");
2443 #if defined(DIST)
2444     tso->priority = AdvisoryPriority;
2445 #endif
2446     pushClosure(tso,spark);
2447     addToRunQueue(tso);
2448     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2449   }
2450   return tso;
2451 }
2452 #endif
2453
2454 /*
2455   Turn a spark into a thread.
2456   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2457 */
2458 #if 0
2459 StgTSO *
2460 activateSpark (rtsSpark spark) 
2461 {
2462   StgTSO *tso;
2463
2464   tso = createSparkThread(spark);
2465   if (RtsFlags.ParFlags.ParStats.Full) {   
2466     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2467       IF_PAR_DEBUG(verbose,
2468                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2469                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2470   }
2471   // ToDo: fwd info on local/global spark to thread -- HWL
2472   // tso->gran.exported =  spark->exported;
2473   // tso->gran.locked =   !spark->global;
2474   // tso->gran.sparkname = spark->name;
2475
2476   return tso;
2477 }
2478 #endif
2479
2480 /* ---------------------------------------------------------------------------
2481  * scheduleThread()
2482  *
2483  * scheduleThread puts a thread on the head of the runnable queue.
2484  * This will usually be done immediately after a thread is created.
2485  * The caller of scheduleThread must create the thread using e.g.
2486  * createThread and push an appropriate closure
2487  * on this thread's stack before the scheduler is invoked.
2488  * ------------------------------------------------------------------------ */
2489
2490 static void
2491 scheduleThread_(StgTSO *tso)
2492 {
2493   // The thread goes at the *end* of the run-queue, to avoid possible
2494   // starvation of any threads already on the queue.
2495   APPEND_TO_RUN_QUEUE(tso);
2496   threadRunnable();
2497 }
2498
2499 void
2500 scheduleThread(StgTSO* tso)
2501 {
2502   ACQUIRE_LOCK(&sched_mutex);
2503   scheduleThread_(tso);
2504   RELEASE_LOCK(&sched_mutex);
2505 }
2506
2507 #if defined(RTS_SUPPORTS_THREADS)
2508 static Condition bound_cond_cache;
2509 static int bound_cond_cache_full = 0;
2510 #endif
2511
2512
2513 SchedulerStatus
2514 scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret,
2515                    Capability *initialCapability)
2516 {
2517     // Precondition: sched_mutex must be held
2518     StgMainThread *m;
2519
2520     m = stgMallocBytes(sizeof(StgMainThread), "waitThread");
2521     m->tso = tso;
2522     tso->main = m;
2523     m->ret = ret;
2524     m->stat = NoStatus;
2525     m->link = main_threads;
2526     m->prev = NULL;
2527     if (main_threads != NULL) {
2528         main_threads->prev = m;
2529     }
2530     main_threads = m;
2531
2532 #if defined(RTS_SUPPORTS_THREADS)
2533     // Allocating a new condition for each thread is expensive, so we
2534     // cache one.  This is a pretty feeble hack, but it helps speed up
2535     // consecutive call-ins quite a bit.
2536     if (bound_cond_cache_full) {
2537         m->bound_thread_cond = bound_cond_cache;
2538         bound_cond_cache_full = 0;
2539     } else {
2540         initCondition(&m->bound_thread_cond);
2541     }
2542 #endif
2543
2544     /* Put the thread on the main-threads list prior to scheduling the TSO.
2545        Failure to do so introduces a race condition in the MT case (as
2546        identified by Wolfgang Thaller), whereby the new task/OS thread 
2547        created by scheduleThread_() would complete prior to the thread
2548        that spawned it managed to put 'itself' on the main-threads list.
2549        The upshot of it all being that the worker thread wouldn't get to
2550        signal the completion of the its work item for the main thread to
2551        see (==> it got stuck waiting.)    -- sof 6/02.
2552     */
2553     IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id));
2554     
2555     APPEND_TO_RUN_QUEUE(tso);
2556     // NB. Don't call threadRunnable() here, because the thread is
2557     // bound and only runnable by *this* OS thread, so waking up other
2558     // workers will just slow things down.
2559
2560     return waitThread_(m, initialCapability);
2561 }
2562
2563 /* ---------------------------------------------------------------------------
2564  * initScheduler()
2565  *
2566  * Initialise the scheduler.  This resets all the queues - if the
2567  * queues contained any threads, they'll be garbage collected at the
2568  * next pass.
2569  *
2570  * ------------------------------------------------------------------------ */
2571
2572 void 
2573 initScheduler(void)
2574 {
2575 #if defined(GRAN)
2576   nat i;
2577
2578   for (i=0; i<=MAX_PROC; i++) {
2579     run_queue_hds[i]      = END_TSO_QUEUE;
2580     run_queue_tls[i]      = END_TSO_QUEUE;
2581     blocked_queue_hds[i]  = END_TSO_QUEUE;
2582     blocked_queue_tls[i]  = END_TSO_QUEUE;
2583     ccalling_threadss[i]  = END_TSO_QUEUE;
2584     blackhole_queue[i]    = END_TSO_QUEUE;
2585     sleeping_queue        = END_TSO_QUEUE;
2586   }
2587 #else
2588   run_queue_hd      = END_TSO_QUEUE;
2589   run_queue_tl      = END_TSO_QUEUE;
2590   blocked_queue_hd  = END_TSO_QUEUE;
2591   blocked_queue_tl  = END_TSO_QUEUE;
2592   blackhole_queue   = END_TSO_QUEUE;
2593   sleeping_queue    = END_TSO_QUEUE;
2594 #endif 
2595
2596   suspended_ccalling_threads  = END_TSO_QUEUE;
2597
2598   main_threads = NULL;
2599   all_threads  = END_TSO_QUEUE;
2600
2601   context_switch = 0;
2602   interrupted    = 0;
2603
2604   RtsFlags.ConcFlags.ctxtSwitchTicks =
2605       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2606       
2607 #if defined(RTS_SUPPORTS_THREADS)
2608   /* Initialise the mutex and condition variables used by
2609    * the scheduler. */
2610   initMutex(&sched_mutex);
2611   initMutex(&term_mutex);
2612 #endif
2613   
2614   ACQUIRE_LOCK(&sched_mutex);
2615
2616   /* A capability holds the state a native thread needs in
2617    * order to execute STG code. At least one capability is
2618    * floating around (only SMP builds have more than one).
2619    */
2620   initCapabilities();
2621   
2622 #if defined(RTS_SUPPORTS_THREADS)
2623   initTaskManager();
2624 #endif
2625
2626 #if defined(SMP)
2627   /* eagerly start some extra workers */
2628   startTasks(RtsFlags.ParFlags.nNodes, taskStart);
2629 #endif
2630
2631 #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
2632   initSparkPools();
2633 #endif
2634
2635   RELEASE_LOCK(&sched_mutex);
2636 }
2637
2638 void
2639 exitScheduler( void )
2640 {
2641     interrupted = rtsTrue;
2642     shutting_down_scheduler = rtsTrue;
2643 #if defined(RTS_SUPPORTS_THREADS)
2644     if (threadIsTask(osThreadId())) { taskStop(); }
2645     stopTaskManager();
2646 #endif
2647 }
2648
2649 /* ----------------------------------------------------------------------------
2650    Managing the per-task allocation areas.
2651    
2652    Each capability comes with an allocation area.  These are
2653    fixed-length block lists into which allocation can be done.
2654
2655    ToDo: no support for two-space collection at the moment???
2656    ------------------------------------------------------------------------- */
2657
2658 static SchedulerStatus
2659 waitThread_(StgMainThread* m, Capability *initialCapability)
2660 {
2661   SchedulerStatus stat;
2662
2663   // Precondition: sched_mutex must be held.
2664   IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id));
2665
2666 #if defined(GRAN)
2667   /* GranSim specific init */
2668   CurrentTSO = m->tso;                // the TSO to run
2669   procStatus[MainProc] = Busy;        // status of main PE
2670   CurrentProc = MainProc;             // PE to run it on
2671   schedule(m,initialCapability);
2672 #else
2673   schedule(m,initialCapability);
2674   ASSERT(m->stat != NoStatus);
2675 #endif
2676
2677   stat = m->stat;
2678
2679 #if defined(RTS_SUPPORTS_THREADS)
2680   // Free the condition variable, returning it to the cache if possible.
2681   if (!bound_cond_cache_full) {
2682       bound_cond_cache = m->bound_thread_cond;
2683       bound_cond_cache_full = 1;
2684   } else {
2685       closeCondition(&m->bound_thread_cond);
2686   }
2687 #endif
2688
2689   IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id));
2690   stgFree(m);
2691
2692   // Postcondition: sched_mutex still held
2693   return stat;
2694 }
2695
2696 /* ---------------------------------------------------------------------------
2697    Where are the roots that we know about?
2698
2699         - all the threads on the runnable queue
2700         - all the threads on the blocked queue
2701         - all the threads on the sleeping queue
2702         - all the thread currently executing a _ccall_GC
2703         - all the "main threads"
2704      
2705    ------------------------------------------------------------------------ */
2706
2707 /* This has to be protected either by the scheduler monitor, or by the
2708         garbage collection monitor (probably the latter).
2709         KH @ 25/10/99
2710 */
2711
2712 void
2713 GetRoots( evac_fn evac )
2714 {
2715 #if defined(GRAN)
2716   {
2717     nat i;
2718     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2719       if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2720           evac((StgClosure **)&run_queue_hds[i]);
2721       if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2722           evac((StgClosure **)&run_queue_tls[i]);
2723       
2724       if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2725           evac((StgClosure **)&blocked_queue_hds[i]);
2726       if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2727           evac((StgClosure **)&blocked_queue_tls[i]);
2728       if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2729           evac((StgClosure **)&ccalling_threads[i]);
2730     }
2731   }
2732
2733   markEventQueue();
2734
2735 #else /* !GRAN */
2736   if (run_queue_hd != END_TSO_QUEUE) {
2737       ASSERT(run_queue_tl != END_TSO_QUEUE);
2738       evac((StgClosure **)&run_queue_hd);
2739       evac((StgClosure **)&run_queue_tl);
2740   }
2741   
2742   if (blocked_queue_hd != END_TSO_QUEUE) {
2743       ASSERT(blocked_queue_tl != END_TSO_QUEUE);
2744       evac((StgClosure **)&blocked_queue_hd);
2745       evac((StgClosure **)&blocked_queue_tl);
2746   }
2747   
2748   if (sleeping_queue != END_TSO_QUEUE) {
2749       evac((StgClosure **)&sleeping_queue);
2750   }
2751 #endif 
2752
2753   if (blackhole_queue != END_TSO_QUEUE) {
2754       evac((StgClosure **)&blackhole_queue);
2755   }
2756
2757   if (suspended_ccalling_threads != END_TSO_QUEUE) {
2758       evac((StgClosure **)&suspended_ccalling_threads);
2759   }
2760
2761 #if defined(PARALLEL_HASKELL) || defined(GRAN)
2762   markSparkQueue(evac);
2763 #endif
2764
2765 #if defined(RTS_USER_SIGNALS)
2766   // mark the signal handlers (signals should be already blocked)
2767   markSignalHandlers(evac);
2768 #endif
2769 }
2770
2771 /* -----------------------------------------------------------------------------
2772    performGC
2773
2774    This is the interface to the garbage collector from Haskell land.
2775    We provide this so that external C code can allocate and garbage
2776    collect when called from Haskell via _ccall_GC.
2777
2778    It might be useful to provide an interface whereby the programmer
2779    can specify more roots (ToDo).
2780    
2781    This needs to be protected by the GC condition variable above.  KH.
2782    -------------------------------------------------------------------------- */
2783
2784 static void (*extra_roots)(evac_fn);
2785
2786 void
2787 performGC(void)
2788 {
2789   /* Obligated to hold this lock upon entry */
2790   ACQUIRE_LOCK(&sched_mutex);
2791   GarbageCollect(GetRoots,rtsFalse);
2792   RELEASE_LOCK(&sched_mutex);
2793 }
2794
2795 void
2796 performMajorGC(void)
2797 {
2798   ACQUIRE_LOCK(&sched_mutex);
2799   GarbageCollect(GetRoots,rtsTrue);
2800   RELEASE_LOCK(&sched_mutex);
2801 }
2802
2803 static void
2804 AllRoots(evac_fn evac)
2805 {
2806     GetRoots(evac);             // the scheduler's roots
2807     extra_roots(evac);          // the user's roots
2808 }
2809
2810 void
2811 performGCWithRoots(void (*get_roots)(evac_fn))
2812 {
2813   ACQUIRE_LOCK(&sched_mutex);
2814   extra_roots = get_roots;
2815   GarbageCollect(AllRoots,rtsFalse);
2816   RELEASE_LOCK(&sched_mutex);
2817 }
2818
2819 /* -----------------------------------------------------------------------------
2820    Stack overflow
2821
2822    If the thread has reached its maximum stack size, then raise the
2823    StackOverflow exception in the offending thread.  Otherwise
2824    relocate the TSO into a larger chunk of memory and adjust its stack
2825    size appropriately.
2826    -------------------------------------------------------------------------- */
2827
2828 static StgTSO *
2829 threadStackOverflow(StgTSO *tso)
2830 {
2831   nat new_stack_size, stack_words;
2832   lnat new_tso_size;
2833   StgPtr new_sp;
2834   StgTSO *dest;
2835
2836   IF_DEBUG(sanity,checkTSO(tso));
2837   if (tso->stack_size >= tso->max_stack_size) {
2838
2839     IF_DEBUG(gc,
2840              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
2841                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
2842              /* If we're debugging, just print out the top of the stack */
2843              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2844                                               tso->sp+64)));
2845
2846     /* Send this thread the StackOverflow exception */
2847     raiseAsync(tso, (StgClosure *)stackOverflow_closure);
2848     return tso;
2849   }
2850
2851   /* Try to double the current stack size.  If that takes us over the
2852    * maximum stack size for this thread, then use the maximum instead.
2853    * Finally round up so the TSO ends up as a whole number of blocks.
2854    */
2855   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
2856   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
2857                                        TSO_STRUCT_SIZE)/sizeof(W_);
2858   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
2859   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
2860
2861   IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
2862
2863   dest = (StgTSO *)allocate(new_tso_size);
2864   TICK_ALLOC_TSO(new_stack_size,0);
2865
2866   /* copy the TSO block and the old stack into the new area */
2867   memcpy(dest,tso,TSO_STRUCT_SIZE);
2868   stack_words = tso->stack + tso->stack_size - tso->sp;
2869   new_sp = (P_)dest + new_tso_size - stack_words;
2870   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
2871
2872   /* relocate the stack pointers... */
2873   dest->sp         = new_sp;
2874   dest->stack_size = new_stack_size;
2875         
2876   /* Mark the old TSO as relocated.  We have to check for relocated
2877    * TSOs in the garbage collector and any primops that deal with TSOs.
2878    *
2879    * It's important to set the sp value to just beyond the end
2880    * of the stack, so we don't attempt to scavenge any part of the
2881    * dead TSO's stack.
2882    */
2883   tso->what_next = ThreadRelocated;
2884   tso->link = dest;
2885   tso->sp = (P_)&(tso->stack[tso->stack_size]);
2886   tso->why_blocked = NotBlocked;
2887
2888   IF_PAR_DEBUG(verbose,
2889                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
2890                      tso->id, tso, tso->stack_size);
2891                /* If we're debugging, just print out the top of the stack */
2892                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
2893                                                 tso->sp+64)));
2894   
2895   IF_DEBUG(sanity,checkTSO(tso));
2896 #if 0
2897   IF_DEBUG(scheduler,printTSO(dest));
2898 #endif
2899
2900   return dest;
2901 }
2902
2903 /* ---------------------------------------------------------------------------
2904    Wake up a queue that was blocked on some resource.
2905    ------------------------------------------------------------------------ */
2906
2907 #if defined(GRAN)
2908 STATIC_INLINE void
2909 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2910 {
2911 }
2912 #elif defined(PARALLEL_HASKELL)
2913 STATIC_INLINE void
2914 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
2915 {
2916   /* write RESUME events to log file and
2917      update blocked and fetch time (depending on type of the orig closure) */
2918   if (RtsFlags.ParFlags.ParStats.Full) {
2919     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
2920                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
2921                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
2922     if (EMPTY_RUN_QUEUE())
2923       emitSchedule = rtsTrue;
2924
2925     switch (get_itbl(node)->type) {
2926         case FETCH_ME_BQ:
2927           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2928           break;
2929         case RBH:
2930         case FETCH_ME:
2931         case BLACKHOLE_BQ:
2932           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
2933           break;
2934 #ifdef DIST
2935         case MVAR:
2936           break;
2937 #endif    
2938         default:
2939           barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
2940         }
2941       }
2942 }
2943 #endif
2944
2945 #if defined(GRAN)
2946 static StgBlockingQueueElement *
2947 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2948 {
2949     StgTSO *tso;
2950     PEs node_loc, tso_loc;
2951
2952     node_loc = where_is(node); // should be lifted out of loop
2953     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
2954     tso_loc = where_is((StgClosure *)tso);
2955     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
2956       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
2957       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
2958       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
2959       // insertThread(tso, node_loc);
2960       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
2961                 ResumeThread,
2962                 tso, node, (rtsSpark*)NULL);
2963       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2964       // len_local++;
2965       // len++;
2966     } else { // TSO is remote (actually should be FMBQ)
2967       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
2968                                   RtsFlags.GranFlags.Costs.gunblocktime +
2969                                   RtsFlags.GranFlags.Costs.latency;
2970       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
2971                 UnblockThread,
2972                 tso, node, (rtsSpark*)NULL);
2973       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
2974       // len++;
2975     }
2976     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
2977     IF_GRAN_DEBUG(bq,
2978                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
2979                           (node_loc==tso_loc ? "Local" : "Global"), 
2980                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
2981     tso->block_info.closure = NULL;
2982     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
2983                              tso->id, tso));
2984 }
2985 #elif defined(PARALLEL_HASKELL)
2986 static StgBlockingQueueElement *
2987 unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
2988 {
2989     StgBlockingQueueElement *next;
2990
2991     switch (get_itbl(bqe)->type) {
2992     case TSO:
2993       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
2994       /* if it's a TSO just push it onto the run_queue */
2995       next = bqe->link;
2996       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
2997       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
2998       threadRunnable();
2999       unblockCount(bqe, node);
3000       /* reset blocking status after dumping event */
3001       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3002       break;
3003
3004     case BLOCKED_FETCH:
3005       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3006       next = bqe->link;
3007       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3008       PendingFetches = (StgBlockedFetch *)bqe;
3009       break;
3010
3011 # if defined(DEBUG)
3012       /* can ignore this case in a non-debugging setup; 
3013          see comments on RBHSave closures above */
3014     case CONSTR:
3015       /* check that the closure is an RBHSave closure */
3016       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3017              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3018              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3019       break;
3020
3021     default:
3022       barf("{unblockOneLocked}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3023            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3024            (StgClosure *)bqe);
3025 # endif
3026     }
3027   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3028   return next;
3029 }
3030
3031 #else /* !GRAN && !PARALLEL_HASKELL */
3032 static StgTSO *
3033 unblockOneLocked(StgTSO *tso)
3034 {
3035   StgTSO *next;
3036
3037   ASSERT(get_itbl(tso)->type == TSO);
3038   ASSERT(tso->why_blocked != NotBlocked);
3039   tso->why_blocked = NotBlocked;
3040   next = tso->link;
3041   tso->link = END_TSO_QUEUE;
3042   APPEND_TO_RUN_QUEUE(tso);
3043   threadRunnable();
3044   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3045   return next;
3046 }
3047 #endif
3048
3049 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3050 INLINE_ME StgBlockingQueueElement *
3051 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3052 {
3053   ACQUIRE_LOCK(&sched_mutex);
3054   bqe = unblockOneLocked(bqe, node);
3055   RELEASE_LOCK(&sched_mutex);
3056   return bqe;
3057 }
3058 #else
3059 INLINE_ME StgTSO *
3060 unblockOne(StgTSO *tso)
3061 {
3062   ACQUIRE_LOCK(&sched_mutex);
3063   tso = unblockOneLocked(tso);
3064   RELEASE_LOCK(&sched_mutex);
3065   return tso;
3066 }
3067 #endif
3068
3069 #if defined(GRAN)
3070 void 
3071 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3072 {
3073   StgBlockingQueueElement *bqe;
3074   PEs node_loc;
3075   nat len = 0; 
3076
3077   IF_GRAN_DEBUG(bq, 
3078                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3079                       node, CurrentProc, CurrentTime[CurrentProc], 
3080                       CurrentTSO->id, CurrentTSO));
3081
3082   node_loc = where_is(node);
3083
3084   ASSERT(q == END_BQ_QUEUE ||
3085          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3086          get_itbl(q)->type == CONSTR); // closure (type constructor)
3087   ASSERT(is_unique(node));
3088
3089   /* FAKE FETCH: magically copy the node to the tso's proc;
3090      no Fetch necessary because in reality the node should not have been 
3091      moved to the other PE in the first place
3092   */
3093   if (CurrentProc!=node_loc) {
3094     IF_GRAN_DEBUG(bq, 
3095                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3096                         node, node_loc, CurrentProc, CurrentTSO->id, 
3097                         // CurrentTSO, where_is(CurrentTSO),
3098                         node->header.gran.procs));
3099     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3100     IF_GRAN_DEBUG(bq, 
3101                   debugBelch("## new bitmask of node %p is %#x\n",
3102                         node, node->header.gran.procs));
3103     if (RtsFlags.GranFlags.GranSimStats.Global) {
3104       globalGranStats.tot_fake_fetches++;
3105     }
3106   }
3107
3108   bqe = q;
3109   // ToDo: check: ASSERT(CurrentProc==node_loc);
3110   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3111     //next = bqe->link;
3112     /* 
3113        bqe points to the current element in the queue
3114        next points to the next element in the queue
3115     */
3116     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3117     //tso_loc = where_is(tso);
3118     len++;
3119     bqe = unblockOneLocked(bqe, node);
3120   }
3121
3122   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3123      the closure to make room for the anchor of the BQ */
3124   if (bqe!=END_BQ_QUEUE) {
3125     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3126     /*
3127     ASSERT((info_ptr==&RBH_Save_0_info) ||
3128            (info_ptr==&RBH_Save_1_info) ||
3129            (info_ptr==&RBH_Save_2_info));
3130     */
3131     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3132     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3133     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3134
3135     IF_GRAN_DEBUG(bq,
3136                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3137                         node, info_type(node)));
3138   }
3139
3140   /* statistics gathering */
3141   if (RtsFlags.GranFlags.GranSimStats.Global) {
3142     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3143     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3144     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3145     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3146   }
3147   IF_GRAN_DEBUG(bq,
3148                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3149                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3150 }
3151 #elif defined(PARALLEL_HASKELL)
3152 void 
3153 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3154 {
3155   StgBlockingQueueElement *bqe;
3156
3157   ACQUIRE_LOCK(&sched_mutex);
3158
3159   IF_PAR_DEBUG(verbose, 
3160                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3161                      node, mytid));
3162 #ifdef DIST  
3163   //RFP
3164   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3165     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3166     return;
3167   }
3168 #endif
3169   
3170   ASSERT(q == END_BQ_QUEUE ||
3171          get_itbl(q)->type == TSO ||           
3172          get_itbl(q)->type == BLOCKED_FETCH || 
3173          get_itbl(q)->type == CONSTR); 
3174
3175   bqe = q;
3176   while (get_itbl(bqe)->type==TSO || 
3177          get_itbl(bqe)->type==BLOCKED_FETCH) {
3178     bqe = unblockOneLocked(bqe, node);
3179   }
3180   RELEASE_LOCK(&sched_mutex);
3181 }
3182
3183 #else   /* !GRAN && !PARALLEL_HASKELL */
3184
3185 void
3186 awakenBlockedQueueNoLock(StgTSO *tso)
3187 {
3188   while (tso != END_TSO_QUEUE) {
3189     tso = unblockOneLocked(tso);
3190   }
3191 }
3192
3193 void
3194 awakenBlockedQueue(StgTSO *tso)
3195 {
3196   ACQUIRE_LOCK(&sched_mutex);
3197   while (tso != END_TSO_QUEUE) {
3198     tso = unblockOneLocked(tso);
3199   }
3200   RELEASE_LOCK(&sched_mutex);
3201 }
3202 #endif
3203
3204 /* ---------------------------------------------------------------------------
3205    Interrupt execution
3206    - usually called inside a signal handler so it mustn't do anything fancy.   
3207    ------------------------------------------------------------------------ */
3208
3209 void
3210 interruptStgRts(void)
3211 {
3212     interrupted    = 1;
3213     context_switch = 1;
3214 }
3215
3216 /* -----------------------------------------------------------------------------
3217    Unblock a thread
3218
3219    This is for use when we raise an exception in another thread, which
3220    may be blocked.
3221    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3222    -------------------------------------------------------------------------- */
3223
3224 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3225 /*
3226   NB: only the type of the blocking queue is different in GranSim and GUM
3227       the operations on the queue-elements are the same
3228       long live polymorphism!
3229
3230   Locks: sched_mutex is held upon entry and exit.
3231
3232 */
3233 static void
3234 unblockThread(StgTSO *tso)
3235 {
3236   StgBlockingQueueElement *t, **last;
3237
3238   switch (tso->why_blocked) {
3239
3240   case NotBlocked:
3241     return;  /* not blocked */
3242
3243   case BlockedOnSTM:
3244     // Be careful: nothing to do here!  We tell the scheduler that the thread
3245     // is runnable and we leave it to the stack-walking code to abort the 
3246     // transaction while unwinding the stack.  We should perhaps have a debugging
3247     // test to make sure that this really happens and that the 'zombie' transaction
3248     // does not get committed.
3249     goto done;
3250
3251   case BlockedOnMVar:
3252     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3253     {
3254       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3255       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3256
3257       last = (StgBlockingQueueElement **)&mvar->head;
3258       for (t = (StgBlockingQueueElement *)mvar->head; 
3259            t != END_BQ_QUEUE; 
3260            last = &t->link, last_tso = t, t = t->link) {
3261         if (t == (StgBlockingQueueElement *)tso) {
3262           *last = (StgBlockingQueueElement *)tso->link;
3263           if (mvar->tail == tso) {
3264             mvar->tail = (StgTSO *)last_tso;
3265           }
3266           goto done;
3267         }
3268       }
3269       barf("unblockThread (MVAR): TSO not found");
3270     }
3271
3272   case BlockedOnBlackHole:
3273     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3274     {
3275       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3276
3277       last = &bq->blocking_queue;
3278       for (t = bq->blocking_queue; 
3279            t != END_BQ_QUEUE; 
3280            last = &t->link, t = t->link) {
3281         if (t == (StgBlockingQueueElement *)tso) {
3282           *last = (StgBlockingQueueElement *)tso->link;
3283           goto done;
3284         }
3285       }
3286       barf("unblockThread (BLACKHOLE): TSO not found");
3287     }
3288
3289   case BlockedOnException:
3290     {
3291       StgTSO *target  = tso->block_info.tso;
3292
3293       ASSERT(get_itbl(target)->type == TSO);
3294
3295       if (target->what_next == ThreadRelocated) {
3296           target = target->link;
3297           ASSERT(get_itbl(target)->type == TSO);
3298       }
3299
3300       ASSERT(target->blocked_exceptions != NULL);
3301
3302       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3303       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3304            t != END_BQ_QUEUE; 
3305            last = &t->link, t = t->link) {
3306         ASSERT(get_itbl(t)->type == TSO);
3307         if (t == (StgBlockingQueueElement *)tso) {
3308           *last = (StgBlockingQueueElement *)tso->link;
3309           goto done;
3310         }
3311       }
3312       barf("unblockThread (Exception): TSO not found");
3313     }
3314
3315   case BlockedOnRead:
3316   case BlockedOnWrite:
3317 #if defined(mingw32_HOST_OS)
3318   case BlockedOnDoProc:
3319 #endif
3320     {
3321       /* take TSO off blocked_queue */
3322       StgBlockingQueueElement *prev = NULL;
3323       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3324            prev = t, t = t->link) {
3325         if (t == (StgBlockingQueueElement *)tso) {
3326           if (prev == NULL) {
3327             blocked_queue_hd = (StgTSO *)t->link;
3328             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3329               blocked_queue_tl = END_TSO_QUEUE;
3330             }
3331           } else {
3332             prev->link = t->link;
3333             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3334               blocked_queue_tl = (StgTSO *)prev;
3335             }
3336           }
3337           goto done;
3338         }
3339       }
3340       barf("unblockThread (I/O): TSO not found");
3341     }
3342
3343   case BlockedOnDelay:
3344     {
3345       /* take TSO off sleeping_queue */
3346       StgBlockingQueueElement *prev = NULL;
3347       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3348            prev = t, t = t->link) {
3349         if (t == (StgBlockingQueueElement *)tso) {
3350           if (prev == NULL) {
3351             sleeping_queue = (StgTSO *)t->link;
3352           } else {
3353             prev->link = t->link;
3354           }
3355           goto done;
3356         }
3357       }
3358       barf("unblockThread (delay): TSO not found");
3359     }
3360
3361   default:
3362     barf("unblockThread");
3363   }
3364
3365  done:
3366   tso->link = END_TSO_QUEUE;
3367   tso->why_blocked = NotBlocked;
3368   tso->block_info.closure = NULL;
3369   PUSH_ON_RUN_QUEUE(tso);
3370 }
3371 #else
3372 static void
3373 unblockThread(StgTSO *tso)
3374 {
3375   StgTSO *t, **last;
3376   
3377   /* To avoid locking unnecessarily. */
3378   if (tso->why_blocked == NotBlocked) {
3379     return;
3380   }
3381
3382   switch (tso->why_blocked) {
3383
3384   case BlockedOnSTM:
3385     // Be careful: nothing to do here!  We tell the scheduler that the thread
3386     // is runnable and we leave it to the stack-walking code to abort the 
3387     // transaction while unwinding the stack.  We should perhaps have a debugging
3388     // test to make sure that this really happens and that the 'zombie' transaction
3389     // does not get committed.
3390     goto done;
3391
3392   case BlockedOnMVar:
3393     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3394     {
3395       StgTSO *last_tso = END_TSO_QUEUE;
3396       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3397
3398       last = &mvar->head;
3399       for (t = mvar->head; t != END_TSO_QUEUE; 
3400            last = &t->link, last_tso = t, t = t->link) {
3401         if (t == tso) {
3402           *last = tso->link;
3403           if (mvar->tail == tso) {
3404             mvar->tail = last_tso;
3405           }
3406           goto done;
3407         }
3408       }
3409       barf("unblockThread (MVAR): TSO not found");
3410     }
3411
3412   case BlockedOnBlackHole:
3413     {
3414       last = &blackhole_queue;
3415       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3416            last = &t->link, t = t->link) {
3417         if (t == tso) {
3418           *last = tso->link;
3419           goto done;
3420         }
3421       }
3422       barf("unblockThread (BLACKHOLE): TSO not found");
3423     }
3424
3425   case BlockedOnException:
3426     {
3427       StgTSO *target  = tso->block_info.tso;
3428
3429       ASSERT(get_itbl(target)->type == TSO);
3430
3431       while (target->what_next == ThreadRelocated) {
3432           target = target->link;
3433           ASSERT(get_itbl(target)->type == TSO);
3434       }
3435       
3436       ASSERT(target->blocked_exceptions != NULL);
3437
3438       last = &target->blocked_exceptions;
3439       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3440            last = &t->link, t = t->link) {
3441         ASSERT(get_itbl(t)->type == TSO);
3442         if (t == tso) {
3443           *last = tso->link;
3444           goto done;
3445         }
3446       }
3447       barf("unblockThread (Exception): TSO not found");
3448     }
3449
3450   case BlockedOnRead:
3451   case BlockedOnWrite:
3452 #if defined(mingw32_HOST_OS)
3453   case BlockedOnDoProc:
3454 #endif
3455     {
3456       StgTSO *prev = NULL;
3457       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3458            prev = t, t = t->link) {
3459         if (t == tso) {
3460           if (prev == NULL) {
3461             blocked_queue_hd = t->link;
3462             if (blocked_queue_tl == t) {
3463               blocked_queue_tl = END_TSO_QUEUE;
3464             }
3465           } else {
3466             prev->link = t->link;
3467             if (blocked_queue_tl == t) {
3468               blocked_queue_tl = prev;
3469             }
3470           }
3471           goto done;
3472         }
3473       }
3474       barf("unblockThread (I/O): TSO not found");
3475     }
3476
3477   case BlockedOnDelay:
3478     {
3479       StgTSO *prev = NULL;
3480       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3481            prev = t, t = t->link) {
3482         if (t == tso) {
3483           if (prev == NULL) {
3484             sleeping_queue = t->link;
3485           } else {
3486             prev->link = t->link;
3487           }
3488           goto done;
3489         }
3490       }
3491       barf("unblockThread (delay): TSO not found");
3492     }
3493
3494   default:
3495     barf("unblockThread");
3496   }
3497
3498  done:
3499   tso->link = END_TSO_QUEUE;
3500   tso->why_blocked = NotBlocked;
3501   tso->block_info.closure = NULL;
3502   APPEND_TO_RUN_QUEUE(tso);
3503 }
3504 #endif
3505
3506 /* -----------------------------------------------------------------------------
3507  * checkBlackHoles()
3508  *
3509  * Check the blackhole_queue for threads that can be woken up.  We do
3510  * this periodically: before every GC, and whenever the run queue is
3511  * empty.
3512  *
3513  * An elegant solution might be to just wake up all the blocked
3514  * threads with awakenBlockedQueue occasionally: they'll go back to
3515  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3516  * doesn't give us a way to tell whether we've actually managed to
3517  * wake up any threads, so we would be busy-waiting.
3518  *
3519  * -------------------------------------------------------------------------- */
3520
3521 static rtsBool
3522 checkBlackHoles( void )
3523 {
3524     StgTSO **prev, *t;
3525     rtsBool any_woke_up = rtsFalse;
3526     StgHalfWord type;
3527
3528     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3529
3530     // ASSUMES: sched_mutex
3531     prev = &blackhole_queue;
3532     t = blackhole_queue;
3533     while (t != END_TSO_QUEUE) {
3534         ASSERT(t->why_blocked == BlockedOnBlackHole);
3535         type = get_itbl(t->block_info.closure)->type;
3536         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3537             t = unblockOneLocked(t);
3538             *prev = t;
3539             any_woke_up = rtsTrue;
3540         } else {
3541             prev = &t->link;
3542             t = t->link;
3543         }
3544     }
3545
3546     return any_woke_up;
3547 }
3548
3549 /* -----------------------------------------------------------------------------
3550  * raiseAsync()
3551  *
3552  * The following function implements the magic for raising an
3553  * asynchronous exception in an existing thread.
3554  *
3555  * We first remove the thread from any queue on which it might be
3556  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3557  *
3558  * We strip the stack down to the innermost CATCH_FRAME, building
3559  * thunks in the heap for all the active computations, so they can 
3560  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3561  * an application of the handler to the exception, and push it on
3562  * the top of the stack.
3563  * 
3564  * How exactly do we save all the active computations?  We create an
3565  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3566  * AP_STACKs pushes everything from the corresponding update frame
3567  * upwards onto the stack.  (Actually, it pushes everything up to the
3568  * next update frame plus a pointer to the next AP_STACK object.
3569  * Entering the next AP_STACK object pushes more onto the stack until we
3570  * reach the last AP_STACK object - at which point the stack should look
3571  * exactly as it did when we killed the TSO and we can continue
3572  * execution by entering the closure on top of the stack.
3573  *
3574  * We can also kill a thread entirely - this happens if either (a) the 
3575  * exception passed to raiseAsync is NULL, or (b) there's no
3576  * CATCH_FRAME on the stack.  In either case, we strip the entire
3577  * stack and replace the thread with a zombie.
3578  *
3579  * Locks: sched_mutex held upon entry nor exit.
3580  *
3581  * -------------------------------------------------------------------------- */
3582  
3583 void 
3584 deleteThread(StgTSO *tso)
3585 {
3586   if (tso->why_blocked != BlockedOnCCall &&
3587       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3588       raiseAsync(tso,NULL);
3589   }
3590 }
3591
3592 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
3593 static void 
3594 deleteThreadImmediately(StgTSO *tso)
3595 { // for forkProcess only:
3596   // delete thread without giving it a chance to catch the KillThread exception
3597
3598   if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3599       return;
3600   }
3601
3602   if (tso->why_blocked != BlockedOnCCall &&
3603       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3604     unblockThread(tso);
3605   }
3606
3607   tso->what_next = ThreadKilled;
3608 }
3609 #endif
3610
3611 void
3612 raiseAsyncWithLock(StgTSO *tso, StgClosure *exception)
3613 {
3614   /* When raising async exs from contexts where sched_mutex isn't held;
3615      use raiseAsyncWithLock(). */
3616   ACQUIRE_LOCK(&sched_mutex);
3617   raiseAsync(tso,exception);
3618   RELEASE_LOCK(&sched_mutex);
3619 }
3620
3621 void
3622 raiseAsync(StgTSO *tso, StgClosure *exception)
3623 {
3624     raiseAsync_(tso, exception, rtsFalse);
3625 }
3626
3627 static void
3628 raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically)
3629 {
3630     StgRetInfoTable *info;
3631     StgPtr sp;
3632   
3633     // Thread already dead?
3634     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3635         return;
3636     }
3637
3638     IF_DEBUG(scheduler, 
3639              sched_belch("raising exception in thread %ld.", (long)tso->id));
3640     
3641     // Remove it from any blocking queues
3642     unblockThread(tso);
3643
3644     sp = tso->sp;
3645     
3646     // The stack freezing code assumes there's a closure pointer on
3647     // the top of the stack, so we have to arrange that this is the case...
3648     //
3649     if (sp[0] == (W_)&stg_enter_info) {
3650         sp++;
3651     } else {
3652         sp--;
3653         sp[0] = (W_)&stg_dummy_ret_closure;
3654     }
3655
3656     while (1) {
3657         nat i;
3658
3659         // 1. Let the top of the stack be the "current closure"
3660         //
3661         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3662         // CATCH_FRAME.
3663         //
3664         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3665         // current closure applied to the chunk of stack up to (but not
3666         // including) the update frame.  This closure becomes the "current
3667         // closure".  Go back to step 2.
3668         //
3669         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3670         // top of the stack applied to the exception.
3671         // 
3672         // 5. If it's a STOP_FRAME, then kill the thread.
3673         // 
3674         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3675         // transaction
3676        
3677         
3678         StgPtr frame;
3679         
3680         frame = sp + 1;
3681         info = get_ret_itbl((StgClosure *)frame);
3682         
3683         while (info->i.type != UPDATE_FRAME
3684                && (info->i.type != CATCH_FRAME || exception == NULL)
3685                && info->i.type != STOP_FRAME
3686                && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
3687         {
3688             if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
3689               // IF we find an ATOMICALLY_FRAME then we abort the
3690               // current transaction and propagate the exception.  In
3691               // this case (unlike ordinary exceptions) we do not care
3692               // whether the transaction is valid or not because its
3693               // possible validity cannot have caused the exception
3694               // and will not be visible after the abort.
3695               IF_DEBUG(stm,
3696                        debugBelch("Found atomically block delivering async exception\n"));
3697               stmAbortTransaction(tso -> trec);
3698               tso -> trec = stmGetEnclosingTRec(tso -> trec);
3699             }
3700             frame += stack_frame_sizeW((StgClosure *)frame);
3701             info = get_ret_itbl((StgClosure *)frame);
3702         }
3703         
3704         switch (info->i.type) {
3705             
3706         case ATOMICALLY_FRAME:
3707             ASSERT(stop_at_atomically);
3708             ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3709             stmCondemnTransaction(tso -> trec);
3710 #ifdef REG_R1
3711             tso->sp = frame;
3712 #else
3713             // R1 is not a register: the return convention for IO in
3714             // this case puts the return value on the stack, so we
3715             // need to set up the stack to return to the atomically
3716             // frame properly...
3717             tso->sp = frame - 2;
3718             tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3719             tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3720 #endif
3721             tso->what_next = ThreadRunGHC;
3722             return;
3723
3724         case CATCH_FRAME:
3725             // If we find a CATCH_FRAME, and we've got an exception to raise,
3726             // then build the THUNK raise(exception), and leave it on
3727             // top of the CATCH_FRAME ready to enter.
3728             //
3729         {
3730 #ifdef PROFILING
3731             StgCatchFrame *cf = (StgCatchFrame *)frame;
3732 #endif
3733             StgClosure *raise;
3734             
3735             // we've got an exception to raise, so let's pass it to the
3736             // handler in this frame.
3737             //
3738             raise = (StgClosure *)allocate(sizeofW(StgClosure)+1);
3739             TICK_ALLOC_SE_THK(1,0);
3740             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3741             raise->payload[0] = exception;
3742             
3743             // throw away the stack from Sp up to the CATCH_FRAME.
3744             //
3745             sp = frame - 1;
3746             
3747             /* Ensure that async excpetions are blocked now, so we don't get
3748              * a surprise exception before we get around to executing the
3749              * handler.
3750              */
3751             if (tso->blocked_exceptions == NULL) {
3752                 tso->blocked_exceptions = END_TSO_QUEUE;
3753             }
3754             
3755             /* Put the newly-built THUNK on top of the stack, ready to execute
3756              * when the thread restarts.
3757              */
3758             sp[0] = (W_)raise;
3759             sp[-1] = (W_)&stg_enter_info;
3760             tso->sp = sp-1;
3761             tso->what_next = ThreadRunGHC;
3762             IF_DEBUG(sanity, checkTSO(tso));
3763             return;
3764         }
3765         
3766         case UPDATE_FRAME:
3767         {
3768             StgAP_STACK * ap;
3769             nat words;
3770             
3771             // First build an AP_STACK consisting of the stack chunk above the
3772             // current update frame, with the top word on the stack as the
3773             // fun field.
3774             //
3775             words = frame - sp - 1;
3776             ap = (StgAP_STACK *)allocate(PAP_sizeW(words));
3777             
3778             ap->size = words;
3779             ap->fun  = (StgClosure *)sp[0];
3780             sp++;
3781             for(i=0; i < (nat)words; ++i) {
3782                 ap->payload[i] = (StgClosure *)*sp++;
3783             }
3784             
3785             SET_HDR(ap,&stg_AP_STACK_info,
3786                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3787             TICK_ALLOC_UP_THK(words+1,0);
3788             
3789             IF_DEBUG(scheduler,
3790                      debugBelch("sched: Updating ");
3791                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3792                      debugBelch(" with ");
3793                      printObj((StgClosure *)ap);
3794                 );
3795
3796             // Replace the updatee with an indirection - happily
3797             // this will also wake up any threads currently
3798             // waiting on the result.
3799             //
3800             // Warning: if we're in a loop, more than one update frame on
3801             // the stack may point to the same object.  Be careful not to
3802             // overwrite an IND_OLDGEN in this case, because we'll screw
3803             // up the mutable lists.  To be on the safe side, don't
3804             // overwrite any kind of indirection at all.  See also
3805             // threadSqueezeStack in GC.c, where we have to make a similar
3806             // check.
3807             //
3808             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3809                 // revert the black hole
3810                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3811                                (StgClosure *)ap);
3812             }
3813             sp += sizeofW(StgUpdateFrame) - 1;
3814             sp[0] = (W_)ap; // push onto stack
3815             break;
3816         }
3817         
3818         case STOP_FRAME:
3819             // We've stripped the entire stack, the thread is now dead.
3820             sp += sizeofW(StgStopFrame);
3821             tso->what_next = ThreadKilled;
3822             tso->sp = sp;
3823             return;
3824             
3825         default:
3826             barf("raiseAsync");
3827         }
3828     }
3829     barf("raiseAsync");
3830 }
3831
3832 /* -----------------------------------------------------------------------------
3833    raiseExceptionHelper
3834    
3835    This function is called by the raise# primitve, just so that we can
3836    move some of the tricky bits of raising an exception from C-- into
3837    C.  Who knows, it might be a useful re-useable thing here too.
3838    -------------------------------------------------------------------------- */
3839
3840 StgWord
3841 raiseExceptionHelper (StgTSO *tso, StgClosure *exception)
3842 {
3843     StgClosure *raise_closure = NULL;
3844     StgPtr p, next;
3845     StgRetInfoTable *info;
3846     //
3847     // This closure represents the expression 'raise# E' where E
3848     // is the exception raise.  It is used to overwrite all the
3849     // thunks which are currently under evaluataion.
3850     //
3851
3852     //    
3853     // LDV profiling: stg_raise_info has THUNK as its closure
3854     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
3855     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
3856     // 1 does not cause any problem unless profiling is performed.
3857     // However, when LDV profiling goes on, we need to linearly scan
3858     // small object pool, where raise_closure is stored, so we should
3859     // use MIN_UPD_SIZE.
3860     //
3861     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
3862     //                                 sizeofW(StgClosure)+1);
3863     //
3864
3865     //
3866     // Walk up the stack, looking for the catch frame.  On the way,
3867     // we update any closures pointed to from update frames with the
3868     // raise closure that we just built.
3869     //
3870     p = tso->sp;
3871     while(1) {
3872         info = get_ret_itbl((StgClosure *)p);
3873         next = p + stack_frame_sizeW((StgClosure *)p);
3874         switch (info->i.type) {
3875             
3876         case UPDATE_FRAME:
3877             // Only create raise_closure if we need to.
3878             if (raise_closure == NULL) {
3879                 raise_closure = 
3880                     (StgClosure *)allocate(sizeofW(StgClosure)+MIN_UPD_SIZE);
3881                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
3882                 raise_closure->payload[0] = exception;
3883             }
3884             UPD_IND(((StgUpdateFrame *)p)->updatee,raise_closure);
3885             p = next;
3886             continue;
3887
3888         case ATOMICALLY_FRAME:
3889             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
3890             tso->sp = p;
3891             return ATOMICALLY_FRAME;
3892             
3893         case CATCH_FRAME:
3894             tso->sp = p;
3895             return CATCH_FRAME;
3896
3897         case CATCH_STM_FRAME:
3898             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
3899             tso->sp = p;
3900             return CATCH_STM_FRAME;
3901             
3902         case STOP_FRAME:
3903             tso->sp = p;
3904             return STOP_FRAME;
3905
3906         case CATCH_RETRY_FRAME:
3907         default:
3908             p = next; 
3909             continue;
3910         }
3911     }
3912 }
3913
3914
3915 /* -----------------------------------------------------------------------------
3916    findRetryFrameHelper
3917
3918    This function is called by the retry# primitive.  It traverses the stack
3919    leaving tso->sp referring to the frame which should handle the retry.  
3920
3921    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
3922    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
3923
3924    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
3925    despite the similar implementation.
3926
3927    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
3928    not be created within memory transactions.
3929    -------------------------------------------------------------------------- */
3930
3931 StgWord
3932 findRetryFrameHelper (StgTSO *tso)
3933 {
3934   StgPtr           p, next;
3935   StgRetInfoTable *info;
3936
3937   p = tso -> sp;
3938   while (1) {
3939     info = get_ret_itbl((StgClosure *)p);
3940     next = p + stack_frame_sizeW((StgClosure *)p);
3941     switch (info->i.type) {
3942       
3943     case ATOMICALLY_FRAME:
3944       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
3945       tso->sp = p;
3946       return ATOMICALLY_FRAME;
3947       
3948     case CATCH_RETRY_FRAME:
3949       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
3950       tso->sp = p;
3951       return CATCH_RETRY_FRAME;
3952       
3953     case CATCH_STM_FRAME:
3954     default:
3955       ASSERT(info->i.type != CATCH_FRAME);
3956       ASSERT(info->i.type != STOP_FRAME);
3957       p = next; 
3958       continue;
3959     }
3960   }
3961 }
3962
3963 /* -----------------------------------------------------------------------------
3964    resurrectThreads is called after garbage collection on the list of
3965    threads found to be garbage.  Each of these threads will be woken
3966    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
3967    on an MVar, or NonTermination if the thread was blocked on a Black
3968    Hole.
3969
3970    Locks: sched_mutex isn't held upon entry nor exit.
3971    -------------------------------------------------------------------------- */
3972
3973 void
3974 resurrectThreads( StgTSO *threads )
3975 {
3976   StgTSO *tso, *next;
3977
3978   for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
3979     next = tso->global_link;
3980     tso->global_link = all_threads;
3981     all_threads = tso;
3982     IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
3983
3984     switch (tso->why_blocked) {
3985     case BlockedOnMVar:
3986     case BlockedOnException:
3987       /* Called by GC - sched_mutex lock is currently held. */
3988       raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure);
3989       break;
3990     case BlockedOnBlackHole:
3991       raiseAsync(tso,(StgClosure *)NonTermination_closure);
3992       break;
3993     case BlockedOnSTM:
3994       raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure);
3995       break;
3996     case NotBlocked:
3997       /* This might happen if the thread was blocked on a black hole
3998        * belonging to a thread that we've just woken up (raiseAsync
3999        * can wake up threads, remember...).
4000        */
4001       continue;
4002     default:
4003       barf("resurrectThreads: thread blocked in a strange way");
4004     }
4005   }
4006 }
4007
4008 /* ----------------------------------------------------------------------------
4009  * Debugging: why is a thread blocked
4010  * [Also provides useful information when debugging threaded programs
4011  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4012    ------------------------------------------------------------------------- */
4013
4014 static void
4015 printThreadBlockage(StgTSO *tso)
4016 {
4017   switch (tso->why_blocked) {
4018   case BlockedOnRead:
4019     debugBelch("is blocked on read from fd %ld", tso->block_info.fd);
4020     break;
4021   case BlockedOnWrite:
4022     debugBelch("is blocked on write to fd %ld", tso->block_info.fd);
4023     break;
4024 #if defined(mingw32_HOST_OS)
4025     case BlockedOnDoProc:
4026     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4027     break;
4028 #endif
4029   case BlockedOnDelay:
4030     debugBelch("is blocked until %ld", tso->block_info.target);
4031     break;
4032   case BlockedOnMVar:
4033     debugBelch("is blocked on an MVar");
4034     break;
4035   case BlockedOnException:
4036     debugBelch("is blocked on delivering an exception to thread %d",
4037             tso->block_info.tso->id);
4038     break;
4039   case BlockedOnBlackHole:
4040     debugBelch("is blocked on a black hole");
4041     break;
4042   case NotBlocked:
4043     debugBelch("is not blocked");
4044     break;
4045 #if defined(PARALLEL_HASKELL)
4046   case BlockedOnGA:
4047     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4048             tso->block_info.closure, info_type(tso->block_info.closure));
4049     break;
4050   case BlockedOnGA_NoSend:
4051     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4052             tso->block_info.closure, info_type(tso->block_info.closure));
4053     break;
4054 #endif
4055   case BlockedOnCCall:
4056     debugBelch("is blocked on an external call");
4057     break;
4058   case BlockedOnCCall_NoUnblockExc:
4059     debugBelch("is blocked on an external call (exceptions were already blocked)");
4060     break;
4061   case BlockedOnSTM:
4062     debugBelch("is blocked on an STM operation");
4063     break;
4064   default:
4065     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4066          tso->why_blocked, tso->id, tso);
4067   }
4068 }
4069
4070 static void
4071 printThreadStatus(StgTSO *tso)
4072 {
4073   switch (tso->what_next) {
4074   case ThreadKilled:
4075     debugBelch("has been killed");
4076     break;
4077   case ThreadComplete:
4078     debugBelch("has completed");
4079     break;
4080   default:
4081     printThreadBlockage(tso);
4082   }
4083 }
4084
4085 void
4086 printAllThreads(void)
4087 {
4088   StgTSO *t;
4089
4090 # if defined(GRAN)
4091   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4092   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4093                        time_string, rtsFalse/*no commas!*/);
4094
4095   debugBelch("all threads at [%s]:\n", time_string);
4096 # elif defined(PARALLEL_HASKELL)
4097   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4098   ullong_format_string(CURRENT_TIME,
4099                        time_string, rtsFalse/*no commas!*/);
4100
4101   debugBelch("all threads at [%s]:\n", time_string);
4102 # else
4103   debugBelch("all threads:\n");
4104 # endif
4105
4106   for (t = all_threads; t != END_TSO_QUEUE; t = t->global_link) {
4107     debugBelch("\tthread %d @ %p ", t->id, (void *)t);
4108 #if defined(DEBUG)
4109     {
4110       void *label = lookupThreadLabel(t->id);
4111       if (label) debugBelch("[\"%s\"] ",(char *)label);
4112     }
4113 #endif
4114     printThreadStatus(t);
4115     debugBelch("\n");
4116   }
4117 }
4118     
4119 #ifdef DEBUG
4120
4121 /* 
4122    Print a whole blocking queue attached to node (debugging only).
4123 */
4124 # if defined(PARALLEL_HASKELL)
4125 void 
4126 print_bq (StgClosure *node)
4127 {
4128   StgBlockingQueueElement *bqe;
4129   StgTSO *tso;
4130   rtsBool end;
4131
4132   debugBelch("## BQ of closure %p (%s): ",
4133           node, info_type(node));
4134
4135   /* should cover all closures that may have a blocking queue */
4136   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4137          get_itbl(node)->type == FETCH_ME_BQ ||
4138          get_itbl(node)->type == RBH ||
4139          get_itbl(node)->type == MVAR);
4140     
4141   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4142
4143   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4144 }
4145
4146 /* 
4147    Print a whole blocking queue starting with the element bqe.
4148 */
4149 void 
4150 print_bqe (StgBlockingQueueElement *bqe)
4151 {
4152   rtsBool end;
4153
4154   /* 
4155      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4156   */
4157   for (end = (bqe==END_BQ_QUEUE);
4158        !end; // iterate until bqe points to a CONSTR
4159        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4160        bqe = end ? END_BQ_QUEUE : bqe->link) {
4161     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4162     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4163     /* types of closures that may appear in a blocking queue */
4164     ASSERT(get_itbl(bqe)->type == TSO ||           
4165            get_itbl(bqe)->type == BLOCKED_FETCH || 
4166            get_itbl(bqe)->type == CONSTR); 
4167     /* only BQs of an RBH end with an RBH_Save closure */
4168     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4169
4170     switch (get_itbl(bqe)->type) {
4171     case TSO:
4172       debugBelch(" TSO %u (%x),",
4173               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4174       break;
4175     case BLOCKED_FETCH:
4176       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4177               ((StgBlockedFetch *)bqe)->node, 
4178               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4179               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4180               ((StgBlockedFetch *)bqe)->ga.weight);
4181       break;
4182     case CONSTR:
4183       debugBelch(" %s (IP %p),",
4184               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4185                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4186                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4187                "RBH_Save_?"), get_itbl(bqe));
4188       break;
4189     default:
4190       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4191            info_type((StgClosure *)bqe)); // , node, info_type(node));
4192       break;
4193     }
4194   } /* for */
4195   debugBelch("\n");
4196 }
4197 # elif defined(GRAN)
4198 void 
4199 print_bq (StgClosure *node)
4200 {
4201   StgBlockingQueueElement *bqe;
4202   PEs node_loc, tso_loc;
4203   rtsBool end;
4204
4205   /* should cover all closures that may have a blocking queue */
4206   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4207          get_itbl(node)->type == FETCH_ME_BQ ||
4208          get_itbl(node)->type == RBH);
4209     
4210   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4211   node_loc = where_is(node);
4212
4213   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4214           node, info_type(node), node_loc);
4215
4216   /* 
4217      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4218   */
4219   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4220        !end; // iterate until bqe points to a CONSTR
4221        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4222     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4223     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4224     /* types of closures that may appear in a blocking queue */
4225     ASSERT(get_itbl(bqe)->type == TSO ||           
4226            get_itbl(bqe)->type == CONSTR); 
4227     /* only BQs of an RBH end with an RBH_Save closure */
4228     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4229
4230     tso_loc = where_is((StgClosure *)bqe);
4231     switch (get_itbl(bqe)->type) {
4232     case TSO:
4233       debugBelch(" TSO %d (%p) on [PE %d],",
4234               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4235       break;
4236     case CONSTR:
4237       debugBelch(" %s (IP %p),",
4238               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4239                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4240                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4241                "RBH_Save_?"), get_itbl(bqe));
4242       break;
4243     default:
4244       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4245            info_type((StgClosure *)bqe), node, info_type(node));
4246       break;
4247     }
4248   } /* for */
4249   debugBelch("\n");
4250 }
4251 # endif
4252
4253 #if defined(PARALLEL_HASKELL)
4254 static nat
4255 run_queue_len(void)
4256 {
4257   nat i;
4258   StgTSO *tso;
4259
4260   for (i=0, tso=run_queue_hd; 
4261        tso != END_TSO_QUEUE;
4262        i++, tso=tso->link)
4263     /* nothing */
4264
4265   return i;
4266 }
4267 #endif
4268
4269 void
4270 sched_belch(char *s, ...)
4271 {
4272   va_list ap;
4273   va_start(ap,s);
4274 #ifdef RTS_SUPPORTS_THREADS
4275   debugBelch("sched (task %p): ", osThreadId());
4276 #elif defined(PARALLEL_HASKELL)
4277   debugBelch("== ");
4278 #else
4279   debugBelch("sched: ");
4280 #endif
4281   vdebugBelch(s, ap);
4282   debugBelch("\n");
4283   va_end(ap);
4284 }
4285
4286 #endif /* DEBUG */