0ff9bbe1a0b356ca9d7412985e3a9f4accba009f
[ghc-hetmet.git] / ghc / rts / Schedule.c
1 /* ---------------------------------------------------------------------------
2  *
3  * (c) The GHC Team, 1998-2005
4  *
5  * The scheduler and thread-related functionality
6  *
7  * --------------------------------------------------------------------------*/
8
9 #include "PosixSource.h"
10 #include "Rts.h"
11 #include "SchedAPI.h"
12 #include "RtsUtils.h"
13 #include "RtsFlags.h"
14 #include "BlockAlloc.h"
15 #include "OSThreads.h"
16 #include "Storage.h"
17 #include "StgRun.h"
18 #include "Hooks.h"
19 #include "Schedule.h"
20 #include "StgMiscClosures.h"
21 #include "Interpreter.h"
22 #include "Exception.h"
23 #include "Printer.h"
24 #include "RtsSignals.h"
25 #include "Sanity.h"
26 #include "Stats.h"
27 #include "STM.h"
28 #include "Timer.h"
29 #include "Prelude.h"
30 #include "ThreadLabels.h"
31 #include "LdvProfile.h"
32 #include "Updates.h"
33 #ifdef PROFILING
34 #include "Proftimer.h"
35 #include "ProfHeap.h"
36 #endif
37 #if defined(GRAN) || defined(PARALLEL_HASKELL)
38 # include "GranSimRts.h"
39 # include "GranSim.h"
40 # include "ParallelRts.h"
41 # include "Parallel.h"
42 # include "ParallelDebug.h"
43 # include "FetchMe.h"
44 # include "HLC.h"
45 #endif
46 #include "Sparks.h"
47 #include "Capability.h"
48 #include "Task.h"
49 #include "AwaitEvent.h"
50 #if defined(mingw32_HOST_OS)
51 #include "win32/IOManager.h"
52 #endif
53
54 #ifdef HAVE_SYS_TYPES_H
55 #include <sys/types.h>
56 #endif
57 #ifdef HAVE_UNISTD_H
58 #include <unistd.h>
59 #endif
60
61 #include <string.h>
62 #include <stdlib.h>
63 #include <stdarg.h>
64
65 #ifdef HAVE_ERRNO_H
66 #include <errno.h>
67 #endif
68
69 // Turn off inlining when debugging - it obfuscates things
70 #ifdef DEBUG
71 # undef  STATIC_INLINE
72 # define STATIC_INLINE static
73 #endif
74
75 /* -----------------------------------------------------------------------------
76  * Global variables
77  * -------------------------------------------------------------------------- */
78
79 #if defined(GRAN)
80
81 StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */
82 /* rtsTime TimeOfNextEvent, EndOfTimeSlice;            now in GranSim.c */
83
84 /* 
85    In GranSim we have a runnable and a blocked queue for each processor.
86    In order to minimise code changes new arrays run_queue_hds/tls
87    are created. run_queue_hd is then a short cut (macro) for
88    run_queue_hds[CurrentProc] (see GranSim.h).
89    -- HWL
90 */
91 StgTSO *run_queue_hds[MAX_PROC], *run_queue_tls[MAX_PROC];
92 StgTSO *blocked_queue_hds[MAX_PROC], *blocked_queue_tls[MAX_PROC];
93 StgTSO *ccalling_threadss[MAX_PROC];
94 /* We use the same global list of threads (all_threads) in GranSim as in
95    the std RTS (i.e. we are cheating). However, we don't use this list in
96    the GranSim specific code at the moment (so we are only potentially
97    cheating).  */
98
99 #else /* !GRAN */
100
101 #if !defined(THREADED_RTS)
102 // Blocked/sleeping thrads
103 StgTSO *blocked_queue_hd = NULL;
104 StgTSO *blocked_queue_tl = NULL;
105 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
106 #endif
107
108 /* Threads blocked on blackholes.
109  * LOCK: sched_mutex+capability, or all capabilities
110  */
111 StgTSO *blackhole_queue = NULL;
112 #endif
113
114 /* The blackhole_queue should be checked for threads to wake up.  See
115  * Schedule.h for more thorough comment.
116  * LOCK: none (doesn't matter if we miss an update)
117  */
118 rtsBool blackholes_need_checking = rtsFalse;
119
120 /* Linked list of all threads.
121  * Used for detecting garbage collected threads.
122  * LOCK: sched_mutex+capability, or all capabilities
123  */
124 StgTSO *all_threads = NULL;
125
126 /* flag set by signal handler to precipitate a context switch
127  * LOCK: none (just an advisory flag)
128  */
129 int context_switch = 0;
130
131 /* flag that tracks whether we have done any execution in this time slice.
132  * LOCK: currently none, perhaps we should lock (but needs to be
133  * updated in the fast path of the scheduler).
134  */
135 nat recent_activity = ACTIVITY_YES;
136
137 /* if this flag is set as well, give up execution
138  * LOCK: none (changes once, from false->true)
139  */
140 rtsBool sched_state = SCHED_RUNNING;
141
142 /* Next thread ID to allocate.
143  * LOCK: sched_mutex
144  */
145 static StgThreadID next_thread_id = 1;
146
147 /* The smallest stack size that makes any sense is:
148  *    RESERVED_STACK_WORDS    (so we can get back from the stack overflow)
149  *  + sizeofW(StgStopFrame)   (the stg_stop_thread_info frame)
150  *  + 1                       (the closure to enter)
151  *  + 1                       (stg_ap_v_ret)
152  *  + 1                       (spare slot req'd by stg_ap_v_ret)
153  *
154  * A thread with this stack will bomb immediately with a stack
155  * overflow, which will increase its stack size.  
156  */
157 #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3)
158
159 #if defined(GRAN)
160 StgTSO *CurrentTSO;
161 #endif
162
163 /*  This is used in `TSO.h' and gcc 2.96 insists that this variable actually 
164  *  exists - earlier gccs apparently didn't.
165  *  -= chak
166  */
167 StgTSO dummy_tso;
168
169 /*
170  * Set to TRUE when entering a shutdown state (via shutdownHaskellAndExit()) --
171  * in an MT setting, needed to signal that a worker thread shouldn't hang around
172  * in the scheduler when it is out of work.
173  */
174 rtsBool shutting_down_scheduler = rtsFalse;
175
176 /*
177  * This mutex protects most of the global scheduler data in
178  * the THREADED_RTS runtime.
179  */
180 #if defined(THREADED_RTS)
181 Mutex sched_mutex;
182 #endif
183
184 #if defined(PARALLEL_HASKELL)
185 StgTSO *LastTSO;
186 rtsTime TimeOfLastYield;
187 rtsBool emitSchedule = rtsTrue;
188 #endif
189
190 /* -----------------------------------------------------------------------------
191  * static function prototypes
192  * -------------------------------------------------------------------------- */
193
194 static Capability *schedule (Capability *initialCapability, Task *task);
195
196 //
197 // These function all encapsulate parts of the scheduler loop, and are
198 // abstracted only to make the structure and control flow of the
199 // scheduler clearer.
200 //
201 static void schedulePreLoop (void);
202 #if defined(THREADED_RTS)
203 static void schedulePushWork(Capability *cap, Task *task);
204 #endif
205 static void scheduleStartSignalHandlers (Capability *cap);
206 static void scheduleCheckBlockedThreads (Capability *cap);
207 static void scheduleCheckBlackHoles (Capability *cap);
208 static void scheduleDetectDeadlock (Capability *cap, Task *task);
209 #if defined(GRAN)
210 static StgTSO *scheduleProcessEvent(rtsEvent *event);
211 #endif
212 #if defined(PARALLEL_HASKELL)
213 static StgTSO *scheduleSendPendingMessages(void);
214 static void scheduleActivateSpark(void);
215 static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish);
216 #endif
217 #if defined(PAR) || defined(GRAN)
218 static void scheduleGranParReport(void);
219 #endif
220 static void schedulePostRunThread(void);
221 static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t );
222 static void scheduleHandleStackOverflow( Capability *cap, Task *task, 
223                                          StgTSO *t);
224 static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, 
225                                     nat prev_what_next );
226 static void scheduleHandleThreadBlocked( StgTSO *t );
227 static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
228                                              StgTSO *t );
229 static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc);
230 static Capability *scheduleDoGC(Capability *cap, Task *task,
231                                 rtsBool force_major, 
232                                 void (*get_roots)(evac_fn));
233
234 static void unblockThread(Capability *cap, StgTSO *tso);
235 static rtsBool checkBlackHoles(Capability *cap);
236 static void AllRoots(evac_fn evac);
237
238 static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
239
240 static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
241                         rtsBool stop_at_atomically, StgPtr stop_here);
242
243 static void deleteThread (Capability *cap, StgTSO *tso);
244 static void deleteAllThreads (Capability *cap);
245
246 #ifdef DEBUG
247 static void printThreadBlockage(StgTSO *tso);
248 static void printThreadStatus(StgTSO *tso);
249 void printThreadQueue(StgTSO *tso);
250 #endif
251
252 #if defined(PARALLEL_HASKELL)
253 StgTSO * createSparkThread(rtsSpark spark);
254 StgTSO * activateSpark (rtsSpark spark);  
255 #endif
256
257 #ifdef DEBUG
258 static char *whatNext_strs[] = {
259   "(unknown)",
260   "ThreadRunGHC",
261   "ThreadInterpret",
262   "ThreadKilled",
263   "ThreadRelocated",
264   "ThreadComplete"
265 };
266 #endif
267
268 /* -----------------------------------------------------------------------------
269  * Putting a thread on the run queue: different scheduling policies
270  * -------------------------------------------------------------------------- */
271
272 STATIC_INLINE void
273 addToRunQueue( Capability *cap, StgTSO *t )
274 {
275 #if defined(PARALLEL_HASKELL)
276     if (RtsFlags.ParFlags.doFairScheduling) { 
277         // this does round-robin scheduling; good for concurrency
278         appendToRunQueue(cap,t);
279     } else {
280         // this does unfair scheduling; good for parallelism
281         pushOnRunQueue(cap,t);
282     }
283 #else
284     // this does round-robin scheduling; good for concurrency
285     appendToRunQueue(cap,t);
286 #endif
287 }
288
289 /* ---------------------------------------------------------------------------
290    Main scheduling loop.
291
292    We use round-robin scheduling, each thread returning to the
293    scheduler loop when one of these conditions is detected:
294
295       * out of heap space
296       * timer expires (thread yields)
297       * thread blocks
298       * thread ends
299       * stack overflow
300
301    GRAN version:
302      In a GranSim setup this loop iterates over the global event queue.
303      This revolves around the global event queue, which determines what 
304      to do next. Therefore, it's more complicated than either the 
305      concurrent or the parallel (GUM) setup.
306
307    GUM version:
308      GUM iterates over incoming messages.
309      It starts with nothing to do (thus CurrentTSO == END_TSO_QUEUE),
310      and sends out a fish whenever it has nothing to do; in-between
311      doing the actual reductions (shared code below) it processes the
312      incoming messages and deals with delayed operations 
313      (see PendingFetches).
314      This is not the ugliest code you could imagine, but it's bloody close.
315
316    ------------------------------------------------------------------------ */
317
318 static Capability *
319 schedule (Capability *initialCapability, Task *task)
320 {
321   StgTSO *t;
322   Capability *cap;
323   StgThreadReturnCode ret;
324 #if defined(GRAN)
325   rtsEvent *event;
326 #elif defined(PARALLEL_HASKELL)
327   StgTSO *tso;
328   GlobalTaskId pe;
329   rtsBool receivedFinish = rtsFalse;
330 # if defined(DEBUG)
331   nat tp_size, sp_size; // stats only
332 # endif
333 #endif
334   nat prev_what_next;
335   rtsBool ready_to_gc;
336 #if defined(THREADED_RTS)
337   rtsBool first = rtsTrue;
338 #endif
339   
340   cap = initialCapability;
341
342   // Pre-condition: this task owns initialCapability.
343   // The sched_mutex is *NOT* held
344   // NB. on return, we still hold a capability.
345
346   IF_DEBUG(scheduler,
347            sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)",
348                        task, initialCapability);
349       );
350
351   schedulePreLoop();
352
353   // -----------------------------------------------------------
354   // Scheduler loop starts here:
355
356 #if defined(PARALLEL_HASKELL)
357 #define TERMINATION_CONDITION        (!receivedFinish)
358 #elif defined(GRAN)
359 #define TERMINATION_CONDITION        ((event = get_next_event()) != (rtsEvent*)NULL) 
360 #else
361 #define TERMINATION_CONDITION        rtsTrue
362 #endif
363
364   while (TERMINATION_CONDITION) {
365
366 #if defined(GRAN)
367       /* Choose the processor with the next event */
368       CurrentProc = event->proc;
369       CurrentTSO = event->tso;
370 #endif
371
372 #if defined(THREADED_RTS)
373       if (first) {
374           // don't yield the first time, we want a chance to run this
375           // thread for a bit, even if there are others banging at the
376           // door.
377           first = rtsFalse;
378           ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
379       } else {
380           // Yield the capability to higher-priority tasks if necessary.
381           yieldCapability(&cap, task);
382       }
383 #endif
384       
385 #if defined(THREADED_RTS)
386       schedulePushWork(cap,task);
387 #endif
388
389     // Check whether we have re-entered the RTS from Haskell without
390     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
391     // call).
392     if (cap->in_haskell) {
393           errorBelch("schedule: re-entered unsafely.\n"
394                      "   Perhaps a 'foreign import unsafe' should be 'safe'?");
395           stg_exit(EXIT_FAILURE);
396     }
397
398     // The interruption / shutdown sequence.
399     // 
400     // In order to cleanly shut down the runtime, we want to:
401     //   * make sure that all main threads return to their callers
402     //     with the state 'Interrupted'.
403     //   * clean up all OS threads assocated with the runtime
404     //   * free all memory etc.
405     //
406     // So the sequence for ^C goes like this:
407     //
408     //   * ^C handler sets sched_state := SCHED_INTERRUPTING and
409     //     arranges for some Capability to wake up
410     //
411     //   * all threads in the system are halted, and the zombies are
412     //     placed on the run queue for cleaning up.  We acquire all
413     //     the capabilities in order to delete the threads, this is
414     //     done by scheduleDoGC() for convenience (because GC already
415     //     needs to acquire all the capabilities).  We can't kill
416     //     threads involved in foreign calls.
417     // 
418     //   * sched_state := SCHED_INTERRUPTED
419     //
420     //   * somebody calls shutdownHaskell(), which calls exitScheduler()
421     //
422     //   * sched_state := SCHED_SHUTTING_DOWN
423     //
424     //   * all workers exit when the run queue on their capability
425     //     drains.  All main threads will also exit when their TSO
426     //     reaches the head of the run queue and they can return.
427     //
428     //   * eventually all Capabilities will shut down, and the RTS can
429     //     exit.
430     //
431     //   * We might be left with threads blocked in foreign calls, 
432     //     we should really attempt to kill these somehow (TODO);
433     
434     switch (sched_state) {
435     case SCHED_RUNNING:
436         break;
437     case SCHED_INTERRUPTING:
438         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTING"));
439 #if defined(THREADED_RTS)
440         discardSparksCap(cap);
441 #endif
442         /* scheduleDoGC() deletes all the threads */
443         cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
444         break;
445     case SCHED_INTERRUPTED:
446         IF_DEBUG(scheduler, sched_belch("SCHED_INTERRUPTED"));
447         break;
448     case SCHED_SHUTTING_DOWN:
449         IF_DEBUG(scheduler, sched_belch("SCHED_SHUTTING_DOWN"));
450         // If we are a worker, just exit.  If we're a bound thread
451         // then we will exit below when we've removed our TSO from
452         // the run queue.
453         if (task->tso == NULL && emptyRunQueue(cap)) {
454             return cap;
455         }
456         break;
457     default:
458         barf("sched_state: %d", sched_state);
459     }
460
461 #if defined(THREADED_RTS)
462     // If the run queue is empty, take a spark and turn it into a thread.
463     {
464         if (emptyRunQueue(cap)) {
465             StgClosure *spark;
466             spark = findSpark(cap);
467             if (spark != NULL) {
468                 IF_DEBUG(scheduler,
469                          sched_belch("turning spark of closure %p into a thread",
470                                      (StgClosure *)spark));
471                 createSparkThread(cap,spark);     
472             }
473         }
474     }
475 #endif // THREADED_RTS
476
477     scheduleStartSignalHandlers(cap);
478
479     // Only check the black holes here if we've nothing else to do.
480     // During normal execution, the black hole list only gets checked
481     // at GC time, to avoid repeatedly traversing this possibly long
482     // list each time around the scheduler.
483     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
484
485     scheduleCheckBlockedThreads(cap);
486
487     scheduleDetectDeadlock(cap,task);
488 #if defined(THREADED_RTS)
489     cap = task->cap;    // reload cap, it might have changed
490 #endif
491
492     // Normally, the only way we can get here with no threads to
493     // run is if a keyboard interrupt received during 
494     // scheduleCheckBlockedThreads() or scheduleDetectDeadlock().
495     // Additionally, it is not fatal for the
496     // threaded RTS to reach here with no threads to run.
497     //
498     // win32: might be here due to awaitEvent() being abandoned
499     // as a result of a console event having been delivered.
500     if ( emptyRunQueue(cap) ) {
501 #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
502         ASSERT(sched_state >= SCHED_INTERRUPTING);
503 #endif
504         continue; // nothing to do
505     }
506
507 #if defined(PARALLEL_HASKELL)
508     scheduleSendPendingMessages();
509     if (emptyRunQueue(cap) && scheduleActivateSpark()) 
510         continue;
511
512 #if defined(SPARKS)
513     ASSERT(next_fish_to_send_at==0);  // i.e. no delayed fishes left!
514 #endif
515
516     /* If we still have no work we need to send a FISH to get a spark
517        from another PE */
518     if (emptyRunQueue(cap)) {
519         if (!scheduleGetRemoteWork(&receivedFinish)) continue;
520         ASSERT(rtsFalse); // should not happen at the moment
521     }
522     // from here: non-empty run queue.
523     //  TODO: merge above case with this, only one call processMessages() !
524     if (PacketsWaiting()) {  /* process incoming messages, if
525                                 any pending...  only in else
526                                 because getRemoteWork waits for
527                                 messages as well */
528         receivedFinish = processMessages();
529     }
530 #endif
531
532 #if defined(GRAN)
533     scheduleProcessEvent(event);
534 #endif
535
536     // 
537     // Get a thread to run
538     //
539     t = popRunQueue(cap);
540
541 #if defined(GRAN) || defined(PAR)
542     scheduleGranParReport(); // some kind of debuging output
543 #else
544     // Sanity check the thread we're about to run.  This can be
545     // expensive if there is lots of thread switching going on...
546     IF_DEBUG(sanity,checkTSO(t));
547 #endif
548
549 #if defined(THREADED_RTS)
550     // Check whether we can run this thread in the current task.
551     // If not, we have to pass our capability to the right task.
552     {
553         Task *bound = t->bound;
554       
555         if (bound) {
556             if (bound == task) {
557                 IF_DEBUG(scheduler,
558                          sched_belch("### Running thread %d in bound thread",
559                                      t->id));
560                 // yes, the Haskell thread is bound to the current native thread
561             } else {
562                 IF_DEBUG(scheduler,
563                          sched_belch("### thread %d bound to another OS thread",
564                                      t->id));
565                 // no, bound to a different Haskell thread: pass to that thread
566                 pushOnRunQueue(cap,t);
567                 continue;
568             }
569         } else {
570             // The thread we want to run is unbound.
571             if (task->tso) { 
572                 IF_DEBUG(scheduler,
573                          sched_belch("### this OS thread cannot run thread %d", t->id));
574                 // no, the current native thread is bound to a different
575                 // Haskell thread, so pass it to any worker thread
576                 pushOnRunQueue(cap,t);
577                 continue; 
578             }
579         }
580     }
581 #endif
582
583     cap->r.rCurrentTSO = t;
584     
585     /* context switches are initiated by the timer signal, unless
586      * the user specified "context switch as often as possible", with
587      * +RTS -C0
588      */
589     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
590         && !emptyThreadQueues(cap)) {
591         context_switch = 1;
592     }
593          
594 run_thread:
595
596     IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", 
597                               (long)t->id, whatNext_strs[t->what_next]));
598
599 #if defined(PROFILING)
600     startHeapProfTimer();
601 #endif
602
603     // ----------------------------------------------------------------------
604     // Run the current thread 
605
606     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
607
608     prev_what_next = t->what_next;
609
610     errno = t->saved_errno;
611     cap->in_haskell = rtsTrue;
612
613     dirtyTSO(t);
614
615     recent_activity = ACTIVITY_YES;
616
617     switch (prev_what_next) {
618         
619     case ThreadKilled:
620     case ThreadComplete:
621         /* Thread already finished, return to scheduler. */
622         ret = ThreadFinished;
623         break;
624         
625     case ThreadRunGHC:
626     {
627         StgRegTable *r;
628         r = StgRun((StgFunPtr) stg_returnToStackTop, &cap->r);
629         cap = regTableToCapability(r);
630         ret = r->rRet;
631         break;
632     }
633     
634     case ThreadInterpret:
635         cap = interpretBCO(cap);
636         ret = cap->r.rRet;
637         break;
638         
639     default:
640         barf("schedule: invalid what_next field");
641     }
642
643     cap->in_haskell = rtsFalse;
644
645     // The TSO might have moved, eg. if it re-entered the RTS and a GC
646     // happened.  So find the new location:
647     t = cap->r.rCurrentTSO;
648
649     // We have run some Haskell code: there might be blackhole-blocked
650     // threads to wake up now.
651     // Lock-free test here should be ok, we're just setting a flag.
652     if ( blackhole_queue != END_TSO_QUEUE ) {
653         blackholes_need_checking = rtsTrue;
654     }
655     
656     // And save the current errno in this thread.
657     // XXX: possibly bogus for SMP because this thread might already
658     // be running again, see code below.
659     t->saved_errno = errno;
660
661 #if defined(THREADED_RTS)
662     // If ret is ThreadBlocked, and this Task is bound to the TSO that
663     // blocked, we are in limbo - the TSO is now owned by whatever it
664     // is blocked on, and may in fact already have been woken up,
665     // perhaps even on a different Capability.  It may be the case
666     // that task->cap != cap.  We better yield this Capability
667     // immediately and return to normaility.
668     if (ret == ThreadBlocked) {
669         IF_DEBUG(scheduler,
670                  sched_belch("--<< thread %d (%s) stopped: blocked\n",
671                              t->id, whatNext_strs[t->what_next]));
672         continue;
673     }
674 #endif
675
676     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
677
678     // ----------------------------------------------------------------------
679     
680     // Costs for the scheduler are assigned to CCS_SYSTEM
681 #if defined(PROFILING)
682     stopHeapProfTimer();
683     CCCS = CCS_SYSTEM;
684 #endif
685     
686 #if defined(THREADED_RTS)
687     IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()););
688 #elif !defined(GRAN) && !defined(PARALLEL_HASKELL)
689     IF_DEBUG(scheduler,debugBelch("sched: "););
690 #endif
691     
692     schedulePostRunThread();
693
694     ready_to_gc = rtsFalse;
695
696     switch (ret) {
697     case HeapOverflow:
698         ready_to_gc = scheduleHandleHeapOverflow(cap,t);
699         break;
700
701     case StackOverflow:
702         scheduleHandleStackOverflow(cap,task,t);
703         break;
704
705     case ThreadYielding:
706         if (scheduleHandleYield(cap, t, prev_what_next)) {
707             // shortcut for switching between compiler/interpreter:
708             goto run_thread; 
709         }
710         break;
711
712     case ThreadBlocked:
713         scheduleHandleThreadBlocked(t);
714         break;
715
716     case ThreadFinished:
717         if (scheduleHandleThreadFinished(cap, task, t)) return cap;
718         ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
719         break;
720
721     default:
722       barf("schedule: invalid thread return code %d", (int)ret);
723     }
724
725     if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; }
726     if (ready_to_gc) {
727       cap = scheduleDoGC(cap,task,rtsFalse,GetRoots);
728     }
729   } /* end of while() */
730
731   IF_PAR_DEBUG(verbose,
732                debugBelch("== Leaving schedule() after having received Finish\n"));
733 }
734
735 /* ----------------------------------------------------------------------------
736  * Setting up the scheduler loop
737  * ------------------------------------------------------------------------- */
738
739 static void
740 schedulePreLoop(void)
741 {
742 #if defined(GRAN) 
743     /* set up first event to get things going */
744     /* ToDo: assign costs for system setup and init MainTSO ! */
745     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
746               ContinueThread, 
747               CurrentTSO, (StgClosure*)NULL, (rtsSpark*)NULL);
748     
749     IF_DEBUG(gran,
750              debugBelch("GRAN: Init CurrentTSO (in schedule) = %p\n", 
751                         CurrentTSO);
752              G_TSO(CurrentTSO, 5));
753     
754     if (RtsFlags.GranFlags.Light) {
755         /* Save current time; GranSim Light only */
756         CurrentTSO->gran.clock = CurrentTime[CurrentProc];
757     }      
758 #endif
759 }
760
761 /* -----------------------------------------------------------------------------
762  * schedulePushWork()
763  *
764  * Push work to other Capabilities if we have some.
765  * -------------------------------------------------------------------------- */
766
767 #if defined(THREADED_RTS)
768 static void
769 schedulePushWork(Capability *cap USED_IF_THREADS, 
770                  Task *task      USED_IF_THREADS)
771 {
772     Capability *free_caps[n_capabilities], *cap0;
773     nat i, n_free_caps;
774
775     // Check whether we have more threads on our run queue, or sparks
776     // in our pool, that we could hand to another Capability.
777     if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
778         && sparkPoolSizeCap(cap) < 2) {
779         return;
780     }
781
782     // First grab as many free Capabilities as we can.
783     for (i=0, n_free_caps=0; i < n_capabilities; i++) {
784         cap0 = &capabilities[i];
785         if (cap != cap0 && tryGrabCapability(cap0,task)) {
786             if (!emptyRunQueue(cap0) || cap->returning_tasks_hd != NULL) {
787                 // it already has some work, we just grabbed it at 
788                 // the wrong moment.  Or maybe it's deadlocked!
789                 releaseCapability(cap0);
790             } else {
791                 free_caps[n_free_caps++] = cap0;
792             }
793         }
794     }
795
796     // we now have n_free_caps free capabilities stashed in
797     // free_caps[].  Share our run queue equally with them.  This is
798     // probably the simplest thing we could do; improvements we might
799     // want to do include:
800     //
801     //   - giving high priority to moving relatively new threads, on 
802     //     the gournds that they haven't had time to build up a
803     //     working set in the cache on this CPU/Capability.
804     //
805     //   - giving low priority to moving long-lived threads
806
807     if (n_free_caps > 0) {
808         StgTSO *prev, *t, *next;
809         rtsBool pushed_to_all;
810
811         IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
812
813         i = 0;
814         pushed_to_all = rtsFalse;
815
816         if (cap->run_queue_hd != END_TSO_QUEUE) {
817             prev = cap->run_queue_hd;
818             t = prev->link;
819             prev->link = END_TSO_QUEUE;
820             for (; t != END_TSO_QUEUE; t = next) {
821                 next = t->link;
822                 t->link = END_TSO_QUEUE;
823                 if (t->what_next == ThreadRelocated
824                     || t->bound == task) { // don't move my bound thread
825                     prev->link = t;
826                     prev = t;
827                 } else if (i == n_free_caps) {
828                     pushed_to_all = rtsTrue;
829                     i = 0;
830                     // keep one for us
831                     prev->link = t;
832                     prev = t;
833                 } else {
834                     IF_DEBUG(scheduler, sched_belch("pushing thread %d to capability %d", t->id, free_caps[i]->no));
835                     appendToRunQueue(free_caps[i],t);
836                     if (t->bound) { t->bound->cap = free_caps[i]; }
837                     i++;
838                 }
839             }
840             cap->run_queue_tl = prev;
841         }
842
843         // If there are some free capabilities that we didn't push any
844         // threads to, then try to push a spark to each one.
845         if (!pushed_to_all) {
846             StgClosure *spark;
847             // i is the next free capability to push to
848             for (; i < n_free_caps; i++) {
849                 if (emptySparkPoolCap(free_caps[i])) {
850                     spark = findSpark(cap);
851                     if (spark != NULL) {
852                         IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
853                         newSpark(&(free_caps[i]->r), spark);
854                     }
855                 }
856             }
857         }
858
859         // release the capabilities
860         for (i = 0; i < n_free_caps; i++) {
861             task->cap = free_caps[i];
862             releaseCapability(free_caps[i]);
863         }
864     }
865     task->cap = cap; // reset to point to our Capability.
866 }
867 #endif
868
869 /* ----------------------------------------------------------------------------
870  * Start any pending signal handlers
871  * ------------------------------------------------------------------------- */
872
873 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
874 static void
875 scheduleStartSignalHandlers(Capability *cap)
876 {
877     if (signals_pending()) { // safe outside the lock
878         startSignalHandlers(cap);
879     }
880 }
881 #else
882 static void
883 scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
884 {
885 }
886 #endif
887
888 /* ----------------------------------------------------------------------------
889  * Check for blocked threads that can be woken up.
890  * ------------------------------------------------------------------------- */
891
892 static void
893 scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
894 {
895 #if !defined(THREADED_RTS)
896     //
897     // Check whether any waiting threads need to be woken up.  If the
898     // run queue is empty, and there are no other tasks running, we
899     // can wait indefinitely for something to happen.
900     //
901     if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) )
902     {
903         awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking );
904     }
905 #endif
906 }
907
908
909 /* ----------------------------------------------------------------------------
910  * Check for threads blocked on BLACKHOLEs that can be woken up
911  * ------------------------------------------------------------------------- */
912 static void
913 scheduleCheckBlackHoles (Capability *cap)
914 {
915     if ( blackholes_need_checking ) // check without the lock first
916     {
917         ACQUIRE_LOCK(&sched_mutex);
918         if ( blackholes_need_checking ) {
919             checkBlackHoles(cap);
920             blackholes_need_checking = rtsFalse;
921         }
922         RELEASE_LOCK(&sched_mutex);
923     }
924 }
925
926 /* ----------------------------------------------------------------------------
927  * Detect deadlock conditions and attempt to resolve them.
928  * ------------------------------------------------------------------------- */
929
930 static void
931 scheduleDetectDeadlock (Capability *cap, Task *task)
932 {
933
934 #if defined(PARALLEL_HASKELL)
935     // ToDo: add deadlock detection in GUM (similar to THREADED_RTS) -- HWL
936     return;
937 #endif
938
939     /* 
940      * Detect deadlock: when we have no threads to run, there are no
941      * threads blocked, waiting for I/O, or sleeping, and all the
942      * other tasks are waiting for work, we must have a deadlock of
943      * some description.
944      */
945     if ( emptyThreadQueues(cap) )
946     {
947 #if defined(THREADED_RTS)
948         /* 
949          * In the threaded RTS, we only check for deadlock if there
950          * has been no activity in a complete timeslice.  This means
951          * we won't eagerly start a full GC just because we don't have
952          * any threads to run currently.
953          */
954         if (recent_activity != ACTIVITY_INACTIVE) return;
955 #endif
956
957         IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC..."));
958
959         // Garbage collection can release some new threads due to
960         // either (a) finalizers or (b) threads resurrected because
961         // they are unreachable and will therefore be sent an
962         // exception.  Any threads thus released will be immediately
963         // runnable.
964         cap = scheduleDoGC (cap, task, rtsTrue/*force  major GC*/, GetRoots);
965
966         recent_activity = ACTIVITY_DONE_GC;
967         
968         if ( !emptyRunQueue(cap) ) return;
969
970 #if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
971         /* If we have user-installed signal handlers, then wait
972          * for signals to arrive rather then bombing out with a
973          * deadlock.
974          */
975         if ( anyUserHandlers() ) {
976             IF_DEBUG(scheduler, 
977                      sched_belch("still deadlocked, waiting for signals..."));
978
979             awaitUserSignals();
980
981             if (signals_pending()) {
982                 startSignalHandlers(cap);
983             }
984
985             // either we have threads to run, or we were interrupted:
986             ASSERT(!emptyRunQueue(cap) || sched_state >= SCHED_INTERRUPTING);
987         }
988 #endif
989
990 #if !defined(THREADED_RTS)
991         /* Probably a real deadlock.  Send the current main thread the
992          * Deadlock exception.
993          */
994         if (task->tso) {
995             switch (task->tso->why_blocked) {
996             case BlockedOnSTM:
997             case BlockedOnBlackHole:
998             case BlockedOnException:
999             case BlockedOnMVar:
1000                 raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure);
1001                 return;
1002             default:
1003                 barf("deadlock: main thread blocked in a strange way");
1004             }
1005         }
1006         return;
1007 #endif
1008     }
1009 }
1010
1011 /* ----------------------------------------------------------------------------
1012  * Process an event (GRAN only)
1013  * ------------------------------------------------------------------------- */
1014
1015 #if defined(GRAN)
1016 static StgTSO *
1017 scheduleProcessEvent(rtsEvent *event)
1018 {
1019     StgTSO *t;
1020
1021     if (RtsFlags.GranFlags.Light)
1022       GranSimLight_enter_system(event, &ActiveTSO); // adjust ActiveTSO etc
1023
1024     /* adjust time based on time-stamp */
1025     if (event->time > CurrentTime[CurrentProc] &&
1026         event->evttype != ContinueThread)
1027       CurrentTime[CurrentProc] = event->time;
1028     
1029     /* Deal with the idle PEs (may issue FindWork or MoveSpark events) */
1030     if (!RtsFlags.GranFlags.Light)
1031       handleIdlePEs();
1032
1033     IF_DEBUG(gran, debugBelch("GRAN: switch by event-type\n"));
1034
1035     /* main event dispatcher in GranSim */
1036     switch (event->evttype) {
1037       /* Should just be continuing execution */
1038     case ContinueThread:
1039       IF_DEBUG(gran, debugBelch("GRAN: doing ContinueThread\n"));
1040       /* ToDo: check assertion
1041       ASSERT(run_queue_hd != (StgTSO*)NULL &&
1042              run_queue_hd != END_TSO_QUEUE);
1043       */
1044       /* Ignore ContinueThreads for fetching threads (if synchr comm) */
1045       if (!RtsFlags.GranFlags.DoAsyncFetch &&
1046           procStatus[CurrentProc]==Fetching) {
1047         debugBelch("ghuH: Spurious ContinueThread while Fetching ignored; TSO %d (%p) [PE %d]\n",
1048               CurrentTSO->id, CurrentTSO, CurrentProc);
1049         goto next_thread;
1050       } 
1051       /* Ignore ContinueThreads for completed threads */
1052       if (CurrentTSO->what_next == ThreadComplete) {
1053         debugBelch("ghuH: found a ContinueThread event for completed thread %d (%p) [PE %d] (ignoring ContinueThread)\n", 
1054               CurrentTSO->id, CurrentTSO, CurrentProc);
1055         goto next_thread;
1056       } 
1057       /* Ignore ContinueThreads for threads that are being migrated */
1058       if (PROCS(CurrentTSO)==Nowhere) { 
1059         debugBelch("ghuH: trying to run the migrating TSO %d (%p) [PE %d] (ignoring ContinueThread)\n",
1060               CurrentTSO->id, CurrentTSO, CurrentProc);
1061         goto next_thread;
1062       }
1063       /* The thread should be at the beginning of the run queue */
1064       if (CurrentTSO!=run_queue_hds[CurrentProc]) { 
1065         debugBelch("ghuH: TSO %d (%p) [PE %d] is not at the start of the run_queue when doing a ContinueThread\n",
1066               CurrentTSO->id, CurrentTSO, CurrentProc);
1067         break; // run the thread anyway
1068       }
1069       /*
1070       new_event(proc, proc, CurrentTime[proc],
1071                 FindWork,
1072                 (StgTSO*)NULL, (StgClosure*)NULL, (rtsSpark*)NULL);
1073       goto next_thread; 
1074       */ /* Catches superfluous CONTINUEs -- should be unnecessary */
1075       break; // now actually run the thread; DaH Qu'vam yImuHbej 
1076
1077     case FetchNode:
1078       do_the_fetchnode(event);
1079       goto next_thread;             /* handle next event in event queue  */
1080       
1081     case GlobalBlock:
1082       do_the_globalblock(event);
1083       goto next_thread;             /* handle next event in event queue  */
1084       
1085     case FetchReply:
1086       do_the_fetchreply(event);
1087       goto next_thread;             /* handle next event in event queue  */
1088       
1089     case UnblockThread:   /* Move from the blocked queue to the tail of */
1090       do_the_unblock(event);
1091       goto next_thread;             /* handle next event in event queue  */
1092       
1093     case ResumeThread:  /* Move from the blocked queue to the tail of */
1094       /* the runnable queue ( i.e. Qu' SImqa'lu') */ 
1095       event->tso->gran.blocktime += 
1096         CurrentTime[CurrentProc] - event->tso->gran.blockedat;
1097       do_the_startthread(event);
1098       goto next_thread;             /* handle next event in event queue  */
1099       
1100     case StartThread:
1101       do_the_startthread(event);
1102       goto next_thread;             /* handle next event in event queue  */
1103       
1104     case MoveThread:
1105       do_the_movethread(event);
1106       goto next_thread;             /* handle next event in event queue  */
1107       
1108     case MoveSpark:
1109       do_the_movespark(event);
1110       goto next_thread;             /* handle next event in event queue  */
1111       
1112     case FindWork:
1113       do_the_findwork(event);
1114       goto next_thread;             /* handle next event in event queue  */
1115       
1116     default:
1117       barf("Illegal event type %u\n", event->evttype);
1118     }  /* switch */
1119     
1120     /* This point was scheduler_loop in the old RTS */
1121
1122     IF_DEBUG(gran, debugBelch("GRAN: after main switch\n"));
1123
1124     TimeOfLastEvent = CurrentTime[CurrentProc];
1125     TimeOfNextEvent = get_time_of_next_event();
1126     IgnoreEvents=(TimeOfNextEvent==0); // HWL HACK
1127     // CurrentTSO = ThreadQueueHd;
1128
1129     IF_DEBUG(gran, debugBelch("GRAN: time of next event is: %ld\n", 
1130                          TimeOfNextEvent));
1131
1132     if (RtsFlags.GranFlags.Light) 
1133       GranSimLight_leave_system(event, &ActiveTSO); 
1134
1135     EndOfTimeSlice = CurrentTime[CurrentProc]+RtsFlags.GranFlags.time_slice;
1136
1137     IF_DEBUG(gran, 
1138              debugBelch("GRAN: end of time-slice is %#lx\n", EndOfTimeSlice));
1139
1140     /* in a GranSim setup the TSO stays on the run queue */
1141     t = CurrentTSO;
1142     /* Take a thread from the run queue. */
1143     POP_RUN_QUEUE(t); // take_off_run_queue(t);
1144
1145     IF_DEBUG(gran, 
1146              debugBelch("GRAN: About to run current thread, which is\n");
1147              G_TSO(t,5));
1148
1149     context_switch = 0; // turned on via GranYield, checking events and time slice
1150
1151     IF_DEBUG(gran, 
1152              DumpGranEvent(GR_SCHEDULE, t));
1153
1154     procStatus[CurrentProc] = Busy;
1155 }
1156 #endif // GRAN
1157
1158 /* ----------------------------------------------------------------------------
1159  * Send pending messages (PARALLEL_HASKELL only)
1160  * ------------------------------------------------------------------------- */
1161
1162 #if defined(PARALLEL_HASKELL)
1163 static StgTSO *
1164 scheduleSendPendingMessages(void)
1165 {
1166     StgSparkPool *pool;
1167     rtsSpark spark;
1168     StgTSO *t;
1169
1170 # if defined(PAR) // global Mem.Mgmt., omit for now
1171     if (PendingFetches != END_BF_QUEUE) {
1172         processFetches();
1173     }
1174 # endif
1175     
1176     if (RtsFlags.ParFlags.BufferTime) {
1177         // if we use message buffering, we must send away all message
1178         // packets which have become too old...
1179         sendOldBuffers(); 
1180     }
1181 }
1182 #endif
1183
1184 /* ----------------------------------------------------------------------------
1185  * Activate spark threads (PARALLEL_HASKELL only)
1186  * ------------------------------------------------------------------------- */
1187
1188 #if defined(PARALLEL_HASKELL)
1189 static void
1190 scheduleActivateSpark(void)
1191 {
1192 #if defined(SPARKS)
1193   ASSERT(emptyRunQueue());
1194 /* We get here if the run queue is empty and want some work.
1195    We try to turn a spark into a thread, and add it to the run queue,
1196    from where it will be picked up in the next iteration of the scheduler
1197    loop.
1198 */
1199
1200       /* :-[  no local threads => look out for local sparks */
1201       /* the spark pool for the current PE */
1202       pool = &(cap.r.rSparks); // JB: cap = (old) MainCap
1203       if (advisory_thread_count < RtsFlags.ParFlags.maxThreads &&
1204           pool->hd < pool->tl) {
1205         /* 
1206          * ToDo: add GC code check that we really have enough heap afterwards!!
1207          * Old comment:
1208          * If we're here (no runnable threads) and we have pending
1209          * sparks, we must have a space problem.  Get enough space
1210          * to turn one of those pending sparks into a
1211          * thread... 
1212          */
1213
1214         spark = findSpark(rtsFalse);            /* get a spark */
1215         if (spark != (rtsSpark) NULL) {
1216           tso = createThreadFromSpark(spark);       /* turn the spark into a thread */
1217           IF_PAR_DEBUG(fish, // schedule,
1218                        debugBelch("==== schedule: Created TSO %d (%p); %d threads active\n",
1219                              tso->id, tso, advisory_thread_count));
1220
1221           if (tso==END_TSO_QUEUE) { /* failed to activate spark->back to loop */
1222             IF_PAR_DEBUG(fish, // schedule,
1223                          debugBelch("==^^ failed to create thread from spark @ %lx\n",
1224                             spark));
1225             return rtsFalse; /* failed to generate a thread */
1226           }                  /* otherwise fall through & pick-up new tso */
1227         } else {
1228           IF_PAR_DEBUG(fish, // schedule,
1229                        debugBelch("==^^ no local sparks (spark pool contains only NFs: %d)\n", 
1230                              spark_queue_len(pool)));
1231           return rtsFalse;  /* failed to generate a thread */
1232         }
1233         return rtsTrue;  /* success in generating a thread */
1234   } else { /* no more threads permitted or pool empty */
1235     return rtsFalse;  /* failed to generateThread */
1236   }
1237 #else
1238   tso = NULL; // avoid compiler warning only
1239   return rtsFalse;  /* dummy in non-PAR setup */
1240 #endif // SPARKS
1241 }
1242 #endif // PARALLEL_HASKELL
1243
1244 /* ----------------------------------------------------------------------------
1245  * Get work from a remote node (PARALLEL_HASKELL only)
1246  * ------------------------------------------------------------------------- */
1247     
1248 #if defined(PARALLEL_HASKELL)
1249 static rtsBool
1250 scheduleGetRemoteWork(rtsBool *receivedFinish)
1251 {
1252   ASSERT(emptyRunQueue());
1253
1254   if (RtsFlags.ParFlags.BufferTime) {
1255         IF_PAR_DEBUG(verbose, 
1256                 debugBelch("...send all pending data,"));
1257         {
1258           nat i;
1259           for (i=1; i<=nPEs; i++)
1260             sendImmediately(i); // send all messages away immediately
1261         }
1262   }
1263 # ifndef SPARKS
1264         //++EDEN++ idle() , i.e. send all buffers, wait for work
1265         // suppress fishing in EDEN... just look for incoming messages
1266         // (blocking receive)
1267   IF_PAR_DEBUG(verbose, 
1268                debugBelch("...wait for incoming messages...\n"));
1269   *receivedFinish = processMessages(); // blocking receive...
1270
1271         // and reenter scheduling loop after having received something
1272         // (return rtsFalse below)
1273
1274 # else /* activate SPARKS machinery */
1275 /* We get here, if we have no work, tried to activate a local spark, but still
1276    have no work. We try to get a remote spark, by sending a FISH message.
1277    Thread migration should be added here, and triggered when a sequence of 
1278    fishes returns without work. */
1279         delay = (RtsFlags.ParFlags.fishDelay!=0ll ? RtsFlags.ParFlags.fishDelay : 0ll);
1280
1281       /* =8-[  no local sparks => look for work on other PEs */
1282         /*
1283          * We really have absolutely no work.  Send out a fish
1284          * (there may be some out there already), and wait for
1285          * something to arrive.  We clearly can't run any threads
1286          * until a SCHEDULE or RESUME arrives, and so that's what
1287          * we're hoping to see.  (Of course, we still have to
1288          * respond to other types of messages.)
1289          */
1290         rtsTime now = msTime() /*CURRENT_TIME*/;
1291         IF_PAR_DEBUG(verbose, 
1292                      debugBelch("--  now=%ld\n", now));
1293         IF_PAR_DEBUG(fish, // verbose,
1294              if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1295                  (last_fish_arrived_at!=0 &&
1296                   last_fish_arrived_at+delay > now)) {
1297                debugBelch("--$$ <%llu> delaying FISH until %llu (last fish %llu, delay %llu)\n",
1298                      now, last_fish_arrived_at+delay, 
1299                      last_fish_arrived_at,
1300                      delay);
1301              });
1302   
1303         if (outstandingFishes < RtsFlags.ParFlags.maxFishes &&
1304             advisory_thread_count < RtsFlags.ParFlags.maxThreads) { // send a FISH, but when?
1305           if (last_fish_arrived_at==0 ||
1306               (last_fish_arrived_at+delay <= now)) {           // send FISH now!
1307             /* outstandingFishes is set in sendFish, processFish;
1308                avoid flooding system with fishes via delay */
1309     next_fish_to_send_at = 0;  
1310   } else {
1311     /* ToDo: this should be done in the main scheduling loop to avoid the
1312              busy wait here; not so bad if fish delay is very small  */
1313     int iq = 0; // DEBUGGING -- HWL
1314     next_fish_to_send_at = last_fish_arrived_at+delay; // remember when to send  
1315     /* send a fish when ready, but process messages that arrive in the meantime */
1316     do {
1317       if (PacketsWaiting()) {
1318         iq++; // DEBUGGING
1319         *receivedFinish = processMessages();
1320       }
1321       now = msTime();
1322     } while (!*receivedFinish || now<next_fish_to_send_at);
1323     // JB: This means the fish could become obsolete, if we receive
1324     // work. Better check for work again? 
1325     // last line: while (!receivedFinish || !haveWork || now<...)
1326     // next line: if (receivedFinish || haveWork )
1327
1328     if (*receivedFinish) // no need to send a FISH if we are finishing anyway
1329       return rtsFalse;  // NB: this will leave scheduler loop
1330                         // immediately after return!
1331                           
1332     IF_PAR_DEBUG(fish, // verbose,
1333                debugBelch("--$$ <%llu> sent delayed fish (%d processMessages); active/total threads=%d/%d\n",now,iq,run_queue_len(),advisory_thread_count));
1334
1335   }
1336
1337     // JB: IMHO, this should all be hidden inside sendFish(...)
1338     /* pe = choosePE(); 
1339        sendFish(pe, thisPE, NEW_FISH_AGE, NEW_FISH_HISTORY, 
1340                 NEW_FISH_HUNGER);
1341
1342     // Global statistics: count no. of fishes
1343     if (RtsFlags.ParFlags.ParStats.Global &&
1344          RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
1345            globalParStats.tot_fish_mess++;
1346            }
1347     */ 
1348
1349   /* delayed fishes must have been sent by now! */
1350   next_fish_to_send_at = 0;  
1351   }
1352       
1353   *receivedFinish = processMessages();
1354 # endif /* SPARKS */
1355
1356  return rtsFalse;
1357  /* NB: this function always returns rtsFalse, meaning the scheduler
1358     loop continues with the next iteration; 
1359     rationale: 
1360       return code means success in finding work; we enter this function
1361       if there is no local work, thus have to send a fish which takes
1362       time until it arrives with work; in the meantime we should process
1363       messages in the main loop;
1364  */
1365 }
1366 #endif // PARALLEL_HASKELL
1367
1368 /* ----------------------------------------------------------------------------
1369  * PAR/GRAN: Report stats & debugging info(?)
1370  * ------------------------------------------------------------------------- */
1371
1372 #if defined(PAR) || defined(GRAN)
1373 static void
1374 scheduleGranParReport(void)
1375 {
1376   ASSERT(run_queue_hd != END_TSO_QUEUE);
1377
1378   /* Take a thread from the run queue, if we have work */
1379   POP_RUN_QUEUE(t);  // take_off_run_queue(END_TSO_QUEUE);
1380
1381     /* If this TSO has got its outport closed in the meantime, 
1382      *   it mustn't be run. Instead, we have to clean it up as if it was finished.
1383      * It has to be marked as TH_DEAD for this purpose.
1384      * If it is TH_TERM instead, it is supposed to have finished in the normal way.
1385
1386 JB: TODO: investigate wether state change field could be nuked
1387      entirely and replaced by the normal tso state (whatnext
1388      field). All we want to do is to kill tsos from outside.
1389      */
1390
1391     /* ToDo: write something to the log-file
1392     if (RTSflags.ParFlags.granSimStats && !sameThread)
1393         DumpGranEvent(GR_SCHEDULE, RunnableThreadsHd);
1394
1395     CurrentTSO = t;
1396     */
1397     /* the spark pool for the current PE */
1398     pool = &(cap.r.rSparks); //  cap = (old) MainCap
1399
1400     IF_DEBUG(scheduler, 
1401              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1402                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1403
1404     IF_PAR_DEBUG(fish,
1405              debugBelch("--=^ %d threads, %d sparks on [%#x]\n", 
1406                    run_queue_len(), spark_queue_len(pool), CURRENT_PROC));
1407
1408     if (RtsFlags.ParFlags.ParStats.Full && 
1409         (t->par.sparkname != (StgInt)0) && // only log spark generated threads
1410         (emitSchedule || // forced emit
1411          (t && LastTSO && t->id != LastTSO->id))) {
1412       /* 
1413          we are running a different TSO, so write a schedule event to log file
1414          NB: If we use fair scheduling we also have to write  a deschedule 
1415              event for LastTSO; with unfair scheduling we know that the
1416              previous tso has blocked whenever we switch to another tso, so
1417              we don't need it in GUM for now
1418       */
1419       IF_PAR_DEBUG(fish, // schedule,
1420                    debugBelch("____ scheduling spark generated thread %d (%lx) (%lx) via a forced emit\n",t->id,t,t->par.sparkname));
1421
1422       DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1423                        GR_SCHEDULE, t, (StgClosure *)NULL, 0, 0);
1424       emitSchedule = rtsFalse;
1425     }
1426 }     
1427 #endif
1428
1429 /* ----------------------------------------------------------------------------
1430  * After running a thread...
1431  * ------------------------------------------------------------------------- */
1432
1433 static void
1434 schedulePostRunThread(void)
1435 {
1436 #if defined(PAR)
1437     /* HACK 675: if the last thread didn't yield, make sure to print a 
1438        SCHEDULE event to the log file when StgRunning the next thread, even
1439        if it is the same one as before */
1440     LastTSO = t; 
1441     TimeOfLastYield = CURRENT_TIME;
1442 #endif
1443
1444   /* some statistics gathering in the parallel case */
1445
1446 #if defined(GRAN) || defined(PAR) || defined(EDEN)
1447   switch (ret) {
1448     case HeapOverflow:
1449 # if defined(GRAN)
1450       IF_DEBUG(gran, DumpGranEvent(GR_DESCHEDULE, t));
1451       globalGranStats.tot_heapover++;
1452 # elif defined(PAR)
1453       globalParStats.tot_heapover++;
1454 # endif
1455       break;
1456
1457      case StackOverflow:
1458 # if defined(GRAN)
1459       IF_DEBUG(gran, 
1460                DumpGranEvent(GR_DESCHEDULE, t));
1461       globalGranStats.tot_stackover++;
1462 # elif defined(PAR)
1463       // IF_DEBUG(par, 
1464       // DumpGranEvent(GR_DESCHEDULE, t);
1465       globalParStats.tot_stackover++;
1466 # endif
1467       break;
1468
1469     case ThreadYielding:
1470 # if defined(GRAN)
1471       IF_DEBUG(gran, 
1472                DumpGranEvent(GR_DESCHEDULE, t));
1473       globalGranStats.tot_yields++;
1474 # elif defined(PAR)
1475       // IF_DEBUG(par, 
1476       // DumpGranEvent(GR_DESCHEDULE, t);
1477       globalParStats.tot_yields++;
1478 # endif
1479       break; 
1480
1481     case ThreadBlocked:
1482 # if defined(GRAN)
1483       IF_DEBUG(scheduler,
1484                debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: ", 
1485                           t->id, t, whatNext_strs[t->what_next], t->block_info.closure, 
1486                           (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1487                if (t->block_info.closure!=(StgClosure*)NULL)
1488                  print_bq(t->block_info.closure);
1489                debugBelch("\n"));
1490
1491       // ??? needed; should emit block before
1492       IF_DEBUG(gran, 
1493                DumpGranEvent(GR_DESCHEDULE, t)); 
1494       prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1495       /*
1496         ngoq Dogh!
1497       ASSERT(procStatus[CurrentProc]==Busy || 
1498               ((procStatus[CurrentProc]==Fetching) && 
1499               (t->block_info.closure!=(StgClosure*)NULL)));
1500       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1501           !(!RtsFlags.GranFlags.DoAsyncFetch &&
1502             procStatus[CurrentProc]==Fetching)) 
1503         procStatus[CurrentProc] = Idle;
1504       */
1505 # elif defined(PAR)
1506 //++PAR++  blockThread() writes the event (change?)
1507 # endif
1508     break;
1509
1510   case ThreadFinished:
1511     break;
1512
1513   default:
1514     barf("parGlobalStats: unknown return code");
1515     break;
1516     }
1517 #endif
1518 }
1519
1520 /* -----------------------------------------------------------------------------
1521  * Handle a thread that returned to the scheduler with ThreadHeepOverflow
1522  * -------------------------------------------------------------------------- */
1523
1524 static rtsBool
1525 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
1526 {
1527     // did the task ask for a large block?
1528     if (cap->r.rHpAlloc > BLOCK_SIZE) {
1529         // if so, get one and push it on the front of the nursery.
1530         bdescr *bd;
1531         lnat blocks;
1532         
1533         blocks = (lnat)BLOCK_ROUND_UP(cap->r.rHpAlloc) / BLOCK_SIZE;
1534         
1535         IF_DEBUG(scheduler,
1536                  debugBelch("--<< thread %ld (%s) stopped: requesting a large block (size %ld)\n", 
1537                             (long)t->id, whatNext_strs[t->what_next], blocks));
1538         
1539         // don't do this if the nursery is (nearly) full, we'll GC first.
1540         if (cap->r.rCurrentNursery->link != NULL ||
1541             cap->r.rNursery->n_blocks == 1) {  // paranoia to prevent infinite loop
1542                                                // if the nursery has only one block.
1543             
1544             ACQUIRE_SM_LOCK
1545             bd = allocGroup( blocks );
1546             RELEASE_SM_LOCK
1547             cap->r.rNursery->n_blocks += blocks;
1548             
1549             // link the new group into the list
1550             bd->link = cap->r.rCurrentNursery;
1551             bd->u.back = cap->r.rCurrentNursery->u.back;
1552             if (cap->r.rCurrentNursery->u.back != NULL) {
1553                 cap->r.rCurrentNursery->u.back->link = bd;
1554             } else {
1555 #if !defined(THREADED_RTS)
1556                 ASSERT(g0s0->blocks == cap->r.rCurrentNursery &&
1557                        g0s0 == cap->r.rNursery);
1558 #endif
1559                 cap->r.rNursery->blocks = bd;
1560             }             
1561             cap->r.rCurrentNursery->u.back = bd;
1562             
1563             // initialise it as a nursery block.  We initialise the
1564             // step, gen_no, and flags field of *every* sub-block in
1565             // this large block, because this is easier than making
1566             // sure that we always find the block head of a large
1567             // block whenever we call Bdescr() (eg. evacuate() and
1568             // isAlive() in the GC would both have to do this, at
1569             // least).
1570             { 
1571                 bdescr *x;
1572                 for (x = bd; x < bd + blocks; x++) {
1573                     x->step = cap->r.rNursery;
1574                     x->gen_no = 0;
1575                     x->flags = 0;
1576                 }
1577             }
1578             
1579             // This assert can be a killer if the app is doing lots
1580             // of large block allocations.
1581             IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery));
1582             
1583             // now update the nursery to point to the new block
1584             cap->r.rCurrentNursery = bd;
1585             
1586             // we might be unlucky and have another thread get on the
1587             // run queue before us and steal the large block, but in that
1588             // case the thread will just end up requesting another large
1589             // block.
1590             pushOnRunQueue(cap,t);
1591             return rtsFalse;  /* not actually GC'ing */
1592         }
1593     }
1594     
1595     IF_DEBUG(scheduler,
1596              debugBelch("--<< thread %ld (%s) stopped: HeapOverflow\n", 
1597                         (long)t->id, whatNext_strs[t->what_next]));
1598 #if defined(GRAN)
1599     ASSERT(!is_on_queue(t,CurrentProc));
1600 #elif defined(PARALLEL_HASKELL)
1601     /* Currently we emit a DESCHEDULE event before GC in GUM.
1602        ToDo: either add separate event to distinguish SYSTEM time from rest
1603        or just nuke this DESCHEDULE (and the following SCHEDULE) */
1604     if (0 && RtsFlags.ParFlags.ParStats.Full) {
1605         DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
1606                          GR_DESCHEDULE, t, (StgClosure *)NULL, 0, 0);
1607         emitSchedule = rtsTrue;
1608     }
1609 #endif
1610       
1611     pushOnRunQueue(cap,t);
1612     return rtsTrue;
1613     /* actual GC is done at the end of the while loop in schedule() */
1614 }
1615
1616 /* -----------------------------------------------------------------------------
1617  * Handle a thread that returned to the scheduler with ThreadStackOverflow
1618  * -------------------------------------------------------------------------- */
1619
1620 static void
1621 scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
1622 {
1623     IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", 
1624                                   (long)t->id, whatNext_strs[t->what_next]));
1625     /* just adjust the stack for this thread, then pop it back
1626      * on the run queue.
1627      */
1628     { 
1629         /* enlarge the stack */
1630         StgTSO *new_t = threadStackOverflow(cap, t);
1631         
1632         /* The TSO attached to this Task may have moved, so update the
1633          * pointer to it.
1634          */
1635         if (task->tso == t) {
1636             task->tso = new_t;
1637         }
1638         pushOnRunQueue(cap,new_t);
1639     }
1640 }
1641
1642 /* -----------------------------------------------------------------------------
1643  * Handle a thread that returned to the scheduler with ThreadYielding
1644  * -------------------------------------------------------------------------- */
1645
1646 static rtsBool
1647 scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
1648 {
1649     // Reset the context switch flag.  We don't do this just before
1650     // running the thread, because that would mean we would lose ticks
1651     // during GC, which can lead to unfair scheduling (a thread hogs
1652     // the CPU because the tick always arrives during GC).  This way
1653     // penalises threads that do a lot of allocation, but that seems
1654     // better than the alternative.
1655     context_switch = 0;
1656     
1657     /* put the thread back on the run queue.  Then, if we're ready to
1658      * GC, check whether this is the last task to stop.  If so, wake
1659      * up the GC thread.  getThread will block during a GC until the
1660      * GC is finished.
1661      */
1662     IF_DEBUG(scheduler,
1663              if (t->what_next != prev_what_next) {
1664                  debugBelch("--<< thread %ld (%s) stopped to switch evaluators\n", 
1665                             (long)t->id, whatNext_strs[t->what_next]);
1666              } else {
1667                  debugBelch("--<< thread %ld (%s) stopped, yielding\n",
1668                             (long)t->id, whatNext_strs[t->what_next]);
1669              }
1670         );
1671     
1672     IF_DEBUG(sanity,
1673              //debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
1674              checkTSO(t));
1675     ASSERT(t->link == END_TSO_QUEUE);
1676     
1677     // Shortcut if we're just switching evaluators: don't bother
1678     // doing stack squeezing (which can be expensive), just run the
1679     // thread.
1680     if (t->what_next != prev_what_next) {
1681         return rtsTrue;
1682     }
1683     
1684 #if defined(GRAN)
1685     ASSERT(!is_on_queue(t,CurrentProc));
1686       
1687     IF_DEBUG(sanity,
1688              //debugBelch("&& Doing sanity check on all ThreadQueues (and their TSOs).");
1689              checkThreadQsSanity(rtsTrue));
1690
1691 #endif
1692
1693     addToRunQueue(cap,t);
1694
1695 #if defined(GRAN)
1696     /* add a ContinueThread event to actually process the thread */
1697     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
1698               ContinueThread,
1699               t, (StgClosure*)NULL, (rtsSpark*)NULL);
1700     IF_GRAN_DEBUG(bq, 
1701                   debugBelch("GRAN: eventq and runnableq after adding yielded thread to queue again:\n");
1702                   G_EVENTQ(0);
1703                   G_CURR_THREADQ(0));
1704 #endif
1705     return rtsFalse;
1706 }
1707
1708 /* -----------------------------------------------------------------------------
1709  * Handle a thread that returned to the scheduler with ThreadBlocked
1710  * -------------------------------------------------------------------------- */
1711
1712 static void
1713 scheduleHandleThreadBlocked( StgTSO *t
1714 #if !defined(GRAN) && !defined(DEBUG)
1715     STG_UNUSED
1716 #endif
1717     )
1718 {
1719 #if defined(GRAN)
1720     IF_DEBUG(scheduler,
1721              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p [PE %d] with BQ: \n", 
1722                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure, (t->block_info.closure==(StgClosure*)NULL ? 99 : where_is(t->block_info.closure)));
1723              if (t->block_info.closure!=(StgClosure*)NULL) print_bq(t->block_info.closure));
1724     
1725     // ??? needed; should emit block before
1726     IF_DEBUG(gran, 
1727              DumpGranEvent(GR_DESCHEDULE, t)); 
1728     prune_eventq(t, (StgClosure *)NULL); // prune ContinueThreads for t
1729     /*
1730       ngoq Dogh!
1731       ASSERT(procStatus[CurrentProc]==Busy || 
1732       ((procStatus[CurrentProc]==Fetching) && 
1733       (t->block_info.closure!=(StgClosure*)NULL)));
1734       if (run_queue_hds[CurrentProc] == END_TSO_QUEUE &&
1735       !(!RtsFlags.GranFlags.DoAsyncFetch &&
1736       procStatus[CurrentProc]==Fetching)) 
1737       procStatus[CurrentProc] = Idle;
1738     */
1739 #elif defined(PAR)
1740     IF_DEBUG(scheduler,
1741              debugBelch("--<< thread %ld (%p; %s) stopped, blocking on node %p with BQ: \n", 
1742                         t->id, t, whatNext_strs[t->what_next], t->block_info.closure));
1743     IF_PAR_DEBUG(bq,
1744                  
1745                  if (t->block_info.closure!=(StgClosure*)NULL) 
1746                  print_bq(t->block_info.closure));
1747     
1748     /* Send a fetch (if BlockedOnGA) and dump event to log file */
1749     blockThread(t);
1750     
1751     /* whatever we schedule next, we must log that schedule */
1752     emitSchedule = rtsTrue;
1753     
1754 #else /* !GRAN */
1755
1756       // We don't need to do anything.  The thread is blocked, and it
1757       // has tidied up its stack and placed itself on whatever queue
1758       // it needs to be on.
1759
1760 #if !defined(THREADED_RTS)
1761     ASSERT(t->why_blocked != NotBlocked);
1762              // This might not be true under THREADED_RTS: we don't have
1763              // exclusive access to this TSO, so someone might have
1764              // woken it up by now.  This actually happens: try
1765              // conc023 +RTS -N2.
1766 #endif
1767
1768     IF_DEBUG(scheduler,
1769              debugBelch("--<< thread %d (%s) stopped: ", 
1770                         t->id, whatNext_strs[t->what_next]);
1771              printThreadBlockage(t);
1772              debugBelch("\n"));
1773     
1774     /* Only for dumping event to log file 
1775        ToDo: do I need this in GranSim, too?
1776        blockThread(t);
1777     */
1778 #endif
1779 }
1780
1781 /* -----------------------------------------------------------------------------
1782  * Handle a thread that returned to the scheduler with ThreadFinished
1783  * -------------------------------------------------------------------------- */
1784
1785 static rtsBool
1786 scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
1787 {
1788     /* Need to check whether this was a main thread, and if so,
1789      * return with the return value.
1790      *
1791      * We also end up here if the thread kills itself with an
1792      * uncaught exception, see Exception.cmm.
1793      */
1794     IF_DEBUG(scheduler,debugBelch("--++ thread %d (%s) finished\n", 
1795                                   t->id, whatNext_strs[t->what_next]));
1796
1797 #if defined(GRAN)
1798       endThread(t, CurrentProc); // clean-up the thread
1799 #elif defined(PARALLEL_HASKELL)
1800       /* For now all are advisory -- HWL */
1801       //if(t->priority==AdvisoryPriority) ??
1802       advisory_thread_count--; // JB: Caution with this counter, buggy!
1803       
1804 # if defined(DIST)
1805       if(t->dist.priority==RevalPriority)
1806         FinishReval(t);
1807 # endif
1808     
1809 # if defined(EDENOLD)
1810       // the thread could still have an outport... (BUG)
1811       if (t->eden.outport != -1) {
1812       // delete the outport for the tso which has finished...
1813         IF_PAR_DEBUG(eden_ports,
1814                    debugBelch("WARNING: Scheduler removes outport %d for TSO %d.\n",
1815                               t->eden.outport, t->id));
1816         deleteOPT(t);
1817       }
1818       // thread still in the process (HEAVY BUG! since outport has just been closed...)
1819       if (t->eden.epid != -1) {
1820         IF_PAR_DEBUG(eden_ports,
1821                    debugBelch("WARNING: Scheduler removes TSO %d from process %d .\n",
1822                            t->id, t->eden.epid));
1823         removeTSOfromProcess(t);
1824       }
1825 # endif 
1826
1827 # if defined(PAR)
1828       if (RtsFlags.ParFlags.ParStats.Full &&
1829           !RtsFlags.ParFlags.ParStats.Suppressed) 
1830         DumpEndEvent(CURRENT_PROC, t, rtsFalse /* not mandatory */);
1831
1832       //  t->par only contains statistics: left out for now...
1833       IF_PAR_DEBUG(fish,
1834                    debugBelch("**** end thread: ended sparked thread %d (%lx); sparkname: %lx\n",
1835                               t->id,t,t->par.sparkname));
1836 # endif
1837 #endif // PARALLEL_HASKELL
1838
1839       //
1840       // Check whether the thread that just completed was a bound
1841       // thread, and if so return with the result.  
1842       //
1843       // There is an assumption here that all thread completion goes
1844       // through this point; we need to make sure that if a thread
1845       // ends up in the ThreadKilled state, that it stays on the run
1846       // queue so it can be dealt with here.
1847       //
1848
1849       if (t->bound) {
1850
1851           if (t->bound != task) {
1852 #if !defined(THREADED_RTS)
1853               // Must be a bound thread that is not the topmost one.  Leave
1854               // it on the run queue until the stack has unwound to the
1855               // point where we can deal with this.  Leaving it on the run
1856               // queue also ensures that the garbage collector knows about
1857               // this thread and its return value (it gets dropped from the
1858               // all_threads list so there's no other way to find it).
1859               appendToRunQueue(cap,t);
1860               return rtsFalse;
1861 #else
1862               // this cannot happen in the threaded RTS, because a
1863               // bound thread can only be run by the appropriate Task.
1864               barf("finished bound thread that isn't mine");
1865 #endif
1866           }
1867
1868           ASSERT(task->tso == t);
1869
1870           if (t->what_next == ThreadComplete) {
1871               if (task->ret) {
1872                   // NOTE: return val is tso->sp[1] (see StgStartup.hc)
1873                   *(task->ret) = (StgClosure *)task->tso->sp[1]; 
1874               }
1875               task->stat = Success;
1876           } else {
1877               if (task->ret) {
1878                   *(task->ret) = NULL;
1879               }
1880               if (sched_state >= SCHED_INTERRUPTING) {
1881                   task->stat = Interrupted;
1882               } else {
1883                   task->stat = Killed;
1884               }
1885           }
1886 #ifdef DEBUG
1887           removeThreadLabel((StgWord)task->tso->id);
1888 #endif
1889           return rtsTrue; // tells schedule() to return
1890       }
1891
1892       return rtsFalse;
1893 }
1894
1895 /* -----------------------------------------------------------------------------
1896  * Perform a heap census, if PROFILING
1897  * -------------------------------------------------------------------------- */
1898
1899 static rtsBool
1900 scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED )
1901 {
1902 #if defined(PROFILING)
1903     // When we have +RTS -i0 and we're heap profiling, do a census at
1904     // every GC.  This lets us get repeatable runs for debugging.
1905     if (performHeapProfile ||
1906         (RtsFlags.ProfFlags.profileInterval==0 &&
1907          RtsFlags.ProfFlags.doHeapProfile && ready_to_gc)) {
1908
1909         // checking black holes is necessary before GC, otherwise
1910         // there may be threads that are unreachable except by the
1911         // blackhole queue, which the GC will consider to be
1912         // deadlocked.
1913         scheduleCheckBlackHoles(&MainCapability);
1914
1915         IF_DEBUG(scheduler, sched_belch("garbage collecting before heap census"));
1916         GarbageCollect(GetRoots, rtsTrue);
1917
1918         IF_DEBUG(scheduler, sched_belch("performing heap census"));
1919         heapCensus();
1920
1921         performHeapProfile = rtsFalse;
1922         return rtsTrue;  // true <=> we already GC'd
1923     }
1924 #endif
1925     return rtsFalse;
1926 }
1927
1928 /* -----------------------------------------------------------------------------
1929  * Perform a garbage collection if necessary
1930  * -------------------------------------------------------------------------- */
1931
1932 static Capability *
1933 scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS,
1934               rtsBool force_major, void (*get_roots)(evac_fn))
1935 {
1936     StgTSO *t;
1937 #ifdef THREADED_RTS
1938     static volatile StgWord waiting_for_gc;
1939     rtsBool was_waiting;
1940     nat i;
1941 #endif
1942
1943 #ifdef THREADED_RTS
1944     // In order to GC, there must be no threads running Haskell code.
1945     // Therefore, the GC thread needs to hold *all* the capabilities,
1946     // and release them after the GC has completed.  
1947     //
1948     // This seems to be the simplest way: previous attempts involved
1949     // making all the threads with capabilities give up their
1950     // capabilities and sleep except for the *last* one, which
1951     // actually did the GC.  But it's quite hard to arrange for all
1952     // the other tasks to sleep and stay asleep.
1953     //
1954         
1955     was_waiting = cas(&waiting_for_gc, 0, 1);
1956     if (was_waiting) {
1957         do {
1958             IF_DEBUG(scheduler, sched_belch("someone else is trying to GC..."));
1959             if (cap) yieldCapability(&cap,task);
1960         } while (waiting_for_gc);
1961         return cap;  // NOTE: task->cap might have changed here
1962     }
1963
1964     for (i=0; i < n_capabilities; i++) {
1965         IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities));
1966         if (cap != &capabilities[i]) {
1967             Capability *pcap = &capabilities[i];
1968             // we better hope this task doesn't get migrated to
1969             // another Capability while we're waiting for this one.
1970             // It won't, because load balancing happens while we have
1971             // all the Capabilities, but even so it's a slightly
1972             // unsavoury invariant.
1973             task->cap = pcap;
1974             context_switch = 1;
1975             waitForReturnCapability(&pcap, task);
1976             if (pcap != &capabilities[i]) {
1977                 barf("scheduleDoGC: got the wrong capability");
1978             }
1979         }
1980     }
1981
1982     waiting_for_gc = rtsFalse;
1983 #endif
1984
1985     /* Kick any transactions which are invalid back to their
1986      * atomically frames.  When next scheduled they will try to
1987      * commit, this commit will fail and they will retry.
1988      */
1989     { 
1990         StgTSO *next;
1991
1992         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
1993             if (t->what_next == ThreadRelocated) {
1994                 next = t->link;
1995             } else {
1996                 next = t->global_link;
1997                 if (t -> trec != NO_TREC && t -> why_blocked == NotBlocked) {
1998                     if (!stmValidateNestOfTransactions (t -> trec)) {
1999                         IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t));
2000                         
2001                         // strip the stack back to the
2002                         // ATOMICALLY_FRAME, aborting the (nested)
2003                         // transaction, and saving the stack of any
2004                         // partially-evaluated thunks on the heap.
2005                         raiseAsync_(&capabilities[0], t, NULL, rtsTrue, NULL);
2006                         
2007 #ifdef REG_R1
2008                         ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
2009 #endif
2010                     }
2011                 }
2012             }
2013         }
2014     }
2015     
2016     // so this happens periodically:
2017     if (cap) scheduleCheckBlackHoles(cap);
2018     
2019     IF_DEBUG(scheduler, printAllThreads());
2020
2021     /*
2022      * We now have all the capabilities; if we're in an interrupting
2023      * state, then we should take the opportunity to delete all the
2024      * threads in the system.
2025      */
2026     if (sched_state >= SCHED_INTERRUPTING) {
2027         deleteAllThreads(&capabilities[0]);
2028         sched_state = SCHED_INTERRUPTED;
2029     }
2030
2031     /* everybody back, start the GC.
2032      * Could do it in this thread, or signal a condition var
2033      * to do it in another thread.  Either way, we need to
2034      * broadcast on gc_pending_cond afterward.
2035      */
2036 #if defined(THREADED_RTS)
2037     IF_DEBUG(scheduler,sched_belch("doing GC"));
2038 #endif
2039     GarbageCollect(get_roots, force_major);
2040     
2041 #if defined(THREADED_RTS)
2042     // release our stash of capabilities.
2043     for (i = 0; i < n_capabilities; i++) {
2044         if (cap != &capabilities[i]) {
2045             task->cap = &capabilities[i];
2046             releaseCapability(&capabilities[i]);
2047         }
2048     }
2049     if (cap) {
2050         task->cap = cap;
2051     } else {
2052         task->cap = NULL;
2053     }
2054 #endif
2055
2056 #if defined(GRAN)
2057     /* add a ContinueThread event to continue execution of current thread */
2058     new_event(CurrentProc, CurrentProc, CurrentTime[CurrentProc],
2059               ContinueThread,
2060               t, (StgClosure*)NULL, (rtsSpark*)NULL);
2061     IF_GRAN_DEBUG(bq, 
2062                   debugBelch("GRAN: eventq and runnableq after Garbage collection:\n\n");
2063                   G_EVENTQ(0);
2064                   G_CURR_THREADQ(0));
2065 #endif /* GRAN */
2066
2067     return cap;
2068 }
2069
2070 /* ---------------------------------------------------------------------------
2071  * rtsSupportsBoundThreads(): is the RTS built to support bound threads?
2072  * used by Control.Concurrent for error checking.
2073  * ------------------------------------------------------------------------- */
2074  
2075 StgBool
2076 rtsSupportsBoundThreads(void)
2077 {
2078 #if defined(THREADED_RTS)
2079   return rtsTrue;
2080 #else
2081   return rtsFalse;
2082 #endif
2083 }
2084
2085 /* ---------------------------------------------------------------------------
2086  * isThreadBound(tso): check whether tso is bound to an OS thread.
2087  * ------------------------------------------------------------------------- */
2088  
2089 StgBool
2090 isThreadBound(StgTSO* tso USED_IF_THREADS)
2091 {
2092 #if defined(THREADED_RTS)
2093   return (tso->bound != NULL);
2094 #endif
2095   return rtsFalse;
2096 }
2097
2098 /* ---------------------------------------------------------------------------
2099  * Singleton fork(). Do not copy any running threads.
2100  * ------------------------------------------------------------------------- */
2101
2102 #if !defined(mingw32_HOST_OS)
2103 #define FORKPROCESS_PRIMOP_SUPPORTED
2104 #endif
2105
2106 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2107 static void 
2108 deleteThread_(Capability *cap, StgTSO *tso);
2109 #endif
2110 StgInt
2111 forkProcess(HsStablePtr *entry
2112 #ifndef FORKPROCESS_PRIMOP_SUPPORTED
2113             STG_UNUSED
2114 #endif
2115            )
2116 {
2117 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
2118     Task *task;
2119     pid_t pid;
2120     StgTSO* t,*next;
2121     Capability *cap;
2122     
2123 #if defined(THREADED_RTS)
2124     if (RtsFlags.ParFlags.nNodes > 1) {
2125         errorBelch("forking not supported with +RTS -N<n> greater than 1");
2126         stg_exit(EXIT_FAILURE);
2127     }
2128 #endif
2129
2130     IF_DEBUG(scheduler,sched_belch("forking!"));
2131     
2132     // ToDo: for SMP, we should probably acquire *all* the capabilities
2133     cap = rts_lock();
2134     
2135     pid = fork();
2136     
2137     if (pid) { // parent
2138         
2139         // just return the pid
2140         rts_unlock(cap);
2141         return pid;
2142         
2143     } else { // child
2144         
2145         // Now, all OS threads except the thread that forked are
2146         // stopped.  We need to stop all Haskell threads, including
2147         // those involved in foreign calls.  Also we need to delete
2148         // all Tasks, because they correspond to OS threads that are
2149         // now gone.
2150
2151         for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2152             next = t->global_link;
2153             // don't allow threads to catch the ThreadKilled
2154             // exception, but we do want to raiseAsync() because these
2155             // threads may be evaluating thunks that we need later.
2156             deleteThread_(cap,t);
2157         }
2158         
2159         // Empty the run queue.  It seems tempting to let all the
2160         // killed threads stay on the run queue as zombies to be
2161         // cleaned up later, but some of them correspond to bound
2162         // threads for which the corresponding Task does not exist.
2163         cap->run_queue_hd = END_TSO_QUEUE;
2164         cap->run_queue_tl = END_TSO_QUEUE;
2165
2166         // Any suspended C-calling Tasks are no more, their OS threads
2167         // don't exist now:
2168         cap->suspended_ccalling_tasks = NULL;
2169
2170         // Empty the all_threads list.  Otherwise, the garbage
2171         // collector may attempt to resurrect some of these threads.
2172         all_threads = END_TSO_QUEUE;
2173
2174         // Wipe the task list, except the current Task.
2175         ACQUIRE_LOCK(&sched_mutex);
2176         for (task = all_tasks; task != NULL; task=task->all_link) {
2177             if (task != cap->running_task) {
2178                 discardTask(task);
2179             }
2180         }
2181         RELEASE_LOCK(&sched_mutex);
2182
2183 #if defined(THREADED_RTS)
2184         // Wipe our spare workers list, they no longer exist.  New
2185         // workers will be created if necessary.
2186         cap->spare_workers = NULL;
2187         cap->returning_tasks_hd = NULL;
2188         cap->returning_tasks_tl = NULL;
2189 #endif
2190
2191         cap = rts_evalStableIO(cap, entry, NULL);  // run the action
2192         rts_checkSchedStatus("forkProcess",cap);
2193         
2194         rts_unlock(cap);
2195         hs_exit();                      // clean up and exit
2196         stg_exit(EXIT_SUCCESS);
2197     }
2198 #else /* !FORKPROCESS_PRIMOP_SUPPORTED */
2199     barf("forkProcess#: primop not supported on this platform, sorry!\n");
2200     return -1;
2201 #endif
2202 }
2203
2204 /* ---------------------------------------------------------------------------
2205  * Delete all the threads in the system
2206  * ------------------------------------------------------------------------- */
2207    
2208 static void
2209 deleteAllThreads ( Capability *cap )
2210 {
2211   StgTSO* t, *next;
2212   IF_DEBUG(scheduler,sched_belch("deleting all threads"));
2213   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
2214       if (t->what_next == ThreadRelocated) {
2215           next = t->link;
2216       } else {
2217           next = t->global_link;
2218           deleteThread(cap,t);
2219       }
2220   }      
2221
2222   // The run queue now contains a bunch of ThreadKilled threads.  We
2223   // must not throw these away: the main thread(s) will be in there
2224   // somewhere, and the main scheduler loop has to deal with it.
2225   // Also, the run queue is the only thing keeping these threads from
2226   // being GC'd, and we don't want the "main thread has been GC'd" panic.
2227
2228 #if !defined(THREADED_RTS)
2229   ASSERT(blocked_queue_hd == END_TSO_QUEUE);
2230   ASSERT(sleeping_queue == END_TSO_QUEUE);
2231 #endif
2232 }
2233
2234 /* -----------------------------------------------------------------------------
2235    Managing the suspended_ccalling_tasks list.
2236    Locks required: sched_mutex
2237    -------------------------------------------------------------------------- */
2238
2239 STATIC_INLINE void
2240 suspendTask (Capability *cap, Task *task)
2241 {
2242     ASSERT(task->next == NULL && task->prev == NULL);
2243     task->next = cap->suspended_ccalling_tasks;
2244     task->prev = NULL;
2245     if (cap->suspended_ccalling_tasks) {
2246         cap->suspended_ccalling_tasks->prev = task;
2247     }
2248     cap->suspended_ccalling_tasks = task;
2249 }
2250
2251 STATIC_INLINE void
2252 recoverSuspendedTask (Capability *cap, Task *task)
2253 {
2254     if (task->prev) {
2255         task->prev->next = task->next;
2256     } else {
2257         ASSERT(cap->suspended_ccalling_tasks == task);
2258         cap->suspended_ccalling_tasks = task->next;
2259     }
2260     if (task->next) {
2261         task->next->prev = task->prev;
2262     }
2263     task->next = task->prev = NULL;
2264 }
2265
2266 /* ---------------------------------------------------------------------------
2267  * Suspending & resuming Haskell threads.
2268  * 
2269  * When making a "safe" call to C (aka _ccall_GC), the task gives back
2270  * its capability before calling the C function.  This allows another
2271  * task to pick up the capability and carry on running Haskell
2272  * threads.  It also means that if the C call blocks, it won't lock
2273  * the whole system.
2274  *
2275  * The Haskell thread making the C call is put to sleep for the
2276  * duration of the call, on the susepended_ccalling_threads queue.  We
2277  * give out a token to the task, which it can use to resume the thread
2278  * on return from the C function.
2279  * ------------------------------------------------------------------------- */
2280    
2281 void *
2282 suspendThread (StgRegTable *reg)
2283 {
2284   Capability *cap;
2285   int saved_errno = errno;
2286   StgTSO *tso;
2287   Task *task;
2288
2289   /* assume that *reg is a pointer to the StgRegTable part of a Capability.
2290    */
2291   cap = regTableToCapability(reg);
2292
2293   task = cap->running_task;
2294   tso = cap->r.rCurrentTSO;
2295
2296   IF_DEBUG(scheduler,
2297            sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id));
2298
2299   // XXX this might not be necessary --SDM
2300   tso->what_next = ThreadRunGHC;
2301
2302   threadPaused(cap,tso);
2303
2304   if(tso->blocked_exceptions == NULL)  {
2305       tso->why_blocked = BlockedOnCCall;
2306       tso->blocked_exceptions = END_TSO_QUEUE;
2307   } else {
2308       tso->why_blocked = BlockedOnCCall_NoUnblockExc;
2309   }
2310
2311   // Hand back capability
2312   task->suspended_tso = tso;
2313
2314   ACQUIRE_LOCK(&cap->lock);
2315
2316   suspendTask(cap,task);
2317   cap->in_haskell = rtsFalse;
2318   releaseCapability_(cap);
2319   
2320   RELEASE_LOCK(&cap->lock);
2321
2322 #if defined(THREADED_RTS)
2323   /* Preparing to leave the RTS, so ensure there's a native thread/task
2324      waiting to take over.
2325   */
2326   IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id));
2327 #endif
2328
2329   errno = saved_errno;
2330   return task;
2331 }
2332
2333 StgRegTable *
2334 resumeThread (void *task_)
2335 {
2336     StgTSO *tso;
2337     Capability *cap;
2338     int saved_errno = errno;
2339     Task *task = task_;
2340
2341     cap = task->cap;
2342     // Wait for permission to re-enter the RTS with the result.
2343     waitForReturnCapability(&cap,task);
2344     // we might be on a different capability now... but if so, our
2345     // entry on the suspended_ccalling_tasks list will also have been
2346     // migrated.
2347
2348     // Remove the thread from the suspended list
2349     recoverSuspendedTask(cap,task);
2350
2351     tso = task->suspended_tso;
2352     task->suspended_tso = NULL;
2353     tso->link = END_TSO_QUEUE;
2354     IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id));
2355     
2356     if (tso->why_blocked == BlockedOnCCall) {
2357         awakenBlockedQueue(cap,tso->blocked_exceptions);
2358         tso->blocked_exceptions = NULL;
2359     }
2360     
2361     /* Reset blocking status */
2362     tso->why_blocked  = NotBlocked;
2363     
2364     cap->r.rCurrentTSO = tso;
2365     cap->in_haskell = rtsTrue;
2366     errno = saved_errno;
2367
2368     /* We might have GC'd, mark the TSO dirty again */
2369     dirtyTSO(tso);
2370
2371     IF_DEBUG(sanity, checkTSO(tso));
2372
2373     return &cap->r;
2374 }
2375
2376 /* ---------------------------------------------------------------------------
2377  * Comparing Thread ids.
2378  *
2379  * This is used from STG land in the implementation of the
2380  * instances of Eq/Ord for ThreadIds.
2381  * ------------------------------------------------------------------------ */
2382
2383 int
2384 cmp_thread(StgPtr tso1, StgPtr tso2) 
2385
2386   StgThreadID id1 = ((StgTSO *)tso1)->id; 
2387   StgThreadID id2 = ((StgTSO *)tso2)->id;
2388  
2389   if (id1 < id2) return (-1);
2390   if (id1 > id2) return 1;
2391   return 0;
2392 }
2393
2394 /* ---------------------------------------------------------------------------
2395  * Fetching the ThreadID from an StgTSO.
2396  *
2397  * This is used in the implementation of Show for ThreadIds.
2398  * ------------------------------------------------------------------------ */
2399 int
2400 rts_getThreadId(StgPtr tso) 
2401 {
2402   return ((StgTSO *)tso)->id;
2403 }
2404
2405 #ifdef DEBUG
2406 void
2407 labelThread(StgPtr tso, char *label)
2408 {
2409   int len;
2410   void *buf;
2411
2412   /* Caveat: Once set, you can only set the thread name to "" */
2413   len = strlen(label)+1;
2414   buf = stgMallocBytes(len * sizeof(char), "Schedule.c:labelThread()");
2415   strncpy(buf,label,len);
2416   /* Update will free the old memory for us */
2417   updateThreadLabel(((StgTSO *)tso)->id,buf);
2418 }
2419 #endif /* DEBUG */
2420
2421 /* ---------------------------------------------------------------------------
2422    Create a new thread.
2423
2424    The new thread starts with the given stack size.  Before the
2425    scheduler can run, however, this thread needs to have a closure
2426    (and possibly some arguments) pushed on its stack.  See
2427    pushClosure() in Schedule.h.
2428
2429    createGenThread() and createIOThread() (in SchedAPI.h) are
2430    convenient packaged versions of this function.
2431
2432    currently pri (priority) is only used in a GRAN setup -- HWL
2433    ------------------------------------------------------------------------ */
2434 #if defined(GRAN)
2435 /*   currently pri (priority) is only used in a GRAN setup -- HWL */
2436 StgTSO *
2437 createThread(nat size, StgInt pri)
2438 #else
2439 StgTSO *
2440 createThread(Capability *cap, nat size)
2441 #endif
2442 {
2443     StgTSO *tso;
2444     nat stack_size;
2445
2446     /* sched_mutex is *not* required */
2447
2448     /* First check whether we should create a thread at all */
2449 #if defined(PARALLEL_HASKELL)
2450     /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */
2451     if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) {
2452         threadsIgnored++;
2453         debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n",
2454                    RtsFlags.ParFlags.maxThreads, advisory_thread_count);
2455         return END_TSO_QUEUE;
2456     }
2457     threadsCreated++;
2458 #endif
2459
2460 #if defined(GRAN)
2461     ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0);
2462 #endif
2463
2464     // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW
2465
2466     /* catch ridiculously small stack sizes */
2467     if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) {
2468         size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW;
2469     }
2470
2471     stack_size = size - TSO_STRUCT_SIZEW;
2472     
2473     tso = (StgTSO *)allocateLocal(cap, size);
2474     TICK_ALLOC_TSO(stack_size, 0);
2475
2476     SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM);
2477 #if defined(GRAN)
2478     SET_GRAN_HDR(tso, ThisPE);
2479 #endif
2480
2481     // Always start with the compiled code evaluator
2482     tso->what_next = ThreadRunGHC;
2483
2484     tso->why_blocked  = NotBlocked;
2485     tso->blocked_exceptions = NULL;
2486     tso->flags = TSO_DIRTY;
2487     
2488     tso->saved_errno = 0;
2489     tso->bound = NULL;
2490     
2491     tso->stack_size     = stack_size;
2492     tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) 
2493                           - TSO_STRUCT_SIZEW;
2494     tso->sp             = (P_)&(tso->stack) + stack_size;
2495
2496     tso->trec = NO_TREC;
2497     
2498 #ifdef PROFILING
2499     tso->prof.CCCS = CCS_MAIN;
2500 #endif
2501     
2502   /* put a stop frame on the stack */
2503     tso->sp -= sizeofW(StgStopFrame);
2504     SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM);
2505     tso->link = END_TSO_QUEUE;
2506     
2507   // ToDo: check this
2508 #if defined(GRAN)
2509     /* uses more flexible routine in GranSim */
2510     insertThread(tso, CurrentProc);
2511 #else
2512     /* In a non-GranSim setup the pushing of a TSO onto the runq is separated
2513      * from its creation
2514      */
2515 #endif
2516     
2517 #if defined(GRAN) 
2518     if (RtsFlags.GranFlags.GranSimStats.Full) 
2519         DumpGranEvent(GR_START,tso);
2520 #elif defined(PARALLEL_HASKELL)
2521     if (RtsFlags.ParFlags.ParStats.Full) 
2522         DumpGranEvent(GR_STARTQ,tso);
2523     /* HACk to avoid SCHEDULE 
2524        LastTSO = tso; */
2525 #endif
2526     
2527     /* Link the new thread on the global thread list.
2528      */
2529     ACQUIRE_LOCK(&sched_mutex);
2530     tso->id = next_thread_id++;  // while we have the mutex
2531     tso->global_link = all_threads;
2532     all_threads = tso;
2533     RELEASE_LOCK(&sched_mutex);
2534     
2535 #if defined(DIST)
2536     tso->dist.priority = MandatoryPriority; //by default that is...
2537 #endif
2538     
2539 #if defined(GRAN)
2540     tso->gran.pri = pri;
2541 # if defined(DEBUG)
2542     tso->gran.magic = TSO_MAGIC; // debugging only
2543 # endif
2544     tso->gran.sparkname   = 0;
2545     tso->gran.startedat   = CURRENT_TIME; 
2546     tso->gran.exported    = 0;
2547     tso->gran.basicblocks = 0;
2548     tso->gran.allocs      = 0;
2549     tso->gran.exectime    = 0;
2550     tso->gran.fetchtime   = 0;
2551     tso->gran.fetchcount  = 0;
2552     tso->gran.blocktime   = 0;
2553     tso->gran.blockcount  = 0;
2554     tso->gran.blockedat   = 0;
2555     tso->gran.globalsparks = 0;
2556     tso->gran.localsparks  = 0;
2557     if (RtsFlags.GranFlags.Light)
2558         tso->gran.clock  = Now; /* local clock */
2559     else
2560         tso->gran.clock  = 0;
2561     
2562     IF_DEBUG(gran,printTSO(tso));
2563 #elif defined(PARALLEL_HASKELL)
2564 # if defined(DEBUG)
2565     tso->par.magic = TSO_MAGIC; // debugging only
2566 # endif
2567     tso->par.sparkname   = 0;
2568     tso->par.startedat   = CURRENT_TIME; 
2569     tso->par.exported    = 0;
2570     tso->par.basicblocks = 0;
2571     tso->par.allocs      = 0;
2572     tso->par.exectime    = 0;
2573     tso->par.fetchtime   = 0;
2574     tso->par.fetchcount  = 0;
2575     tso->par.blocktime   = 0;
2576     tso->par.blockcount  = 0;
2577     tso->par.blockedat   = 0;
2578     tso->par.globalsparks = 0;
2579     tso->par.localsparks  = 0;
2580 #endif
2581     
2582 #if defined(GRAN)
2583     globalGranStats.tot_threads_created++;
2584     globalGranStats.threads_created_on_PE[CurrentProc]++;
2585     globalGranStats.tot_sq_len += spark_queue_len(CurrentProc);
2586     globalGranStats.tot_sq_probes++;
2587 #elif defined(PARALLEL_HASKELL)
2588     // collect parallel global statistics (currently done together with GC stats)
2589     if (RtsFlags.ParFlags.ParStats.Global &&
2590         RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
2591         //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); 
2592         globalParStats.tot_threads_created++;
2593     }
2594 #endif 
2595     
2596 #if defined(GRAN)
2597     IF_GRAN_DEBUG(pri,
2598                   sched_belch("==__ schedule: Created TSO %d (%p);",
2599                               CurrentProc, tso, tso->id));
2600 #elif defined(PARALLEL_HASKELL)
2601     IF_PAR_DEBUG(verbose,
2602                  sched_belch("==__ schedule: Created TSO %d (%p); %d threads active",
2603                              (long)tso->id, tso, advisory_thread_count));
2604 #else
2605     IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", 
2606                                    (long)tso->id, (long)tso->stack_size));
2607 #endif    
2608     return tso;
2609 }
2610
2611 #if defined(PAR)
2612 /* RFP:
2613    all parallel thread creation calls should fall through the following routine.
2614 */
2615 StgTSO *
2616 createThreadFromSpark(rtsSpark spark) 
2617 { StgTSO *tso;
2618   ASSERT(spark != (rtsSpark)NULL);
2619 // JB: TAKE CARE OF THIS COUNTER! BUGGY
2620   if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) 
2621   { threadsIgnored++;
2622     barf("{createSparkThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)",
2623           RtsFlags.ParFlags.maxThreads, advisory_thread_count);    
2624     return END_TSO_QUEUE;
2625   }
2626   else
2627   { threadsCreated++;
2628     tso = createThread(RtsFlags.GcFlags.initialStkSize);
2629     if (tso==END_TSO_QUEUE)     
2630       barf("createSparkThread: Cannot create TSO");
2631 #if defined(DIST)
2632     tso->priority = AdvisoryPriority;
2633 #endif
2634     pushClosure(tso,spark);
2635     addToRunQueue(tso);
2636     advisory_thread_count++;  // JB: TAKE CARE OF THIS COUNTER! BUGGY
2637   }
2638   return tso;
2639 }
2640 #endif
2641
2642 /*
2643   Turn a spark into a thread.
2644   ToDo: fix for SMP (needs to acquire SCHED_MUTEX!)
2645 */
2646 #if 0
2647 StgTSO *
2648 activateSpark (rtsSpark spark) 
2649 {
2650   StgTSO *tso;
2651
2652   tso = createSparkThread(spark);
2653   if (RtsFlags.ParFlags.ParStats.Full) {   
2654     //ASSERT(run_queue_hd == END_TSO_QUEUE); // I think ...
2655       IF_PAR_DEBUG(verbose,
2656                    debugBelch("==^^ activateSpark: turning spark of closure %p (%s) into a thread\n",
2657                               (StgClosure *)spark, info_type((StgClosure *)spark)));
2658   }
2659   // ToDo: fwd info on local/global spark to thread -- HWL
2660   // tso->gran.exported =  spark->exported;
2661   // tso->gran.locked =   !spark->global;
2662   // tso->gran.sparkname = spark->name;
2663
2664   return tso;
2665 }
2666 #endif
2667
2668 /* ---------------------------------------------------------------------------
2669  * scheduleThread()
2670  *
2671  * scheduleThread puts a thread on the end  of the runnable queue.
2672  * This will usually be done immediately after a thread is created.
2673  * The caller of scheduleThread must create the thread using e.g.
2674  * createThread and push an appropriate closure
2675  * on this thread's stack before the scheduler is invoked.
2676  * ------------------------------------------------------------------------ */
2677
2678 void
2679 scheduleThread(Capability *cap, StgTSO *tso)
2680 {
2681     // The thread goes at the *end* of the run-queue, to avoid possible
2682     // starvation of any threads already on the queue.
2683     appendToRunQueue(cap,tso);
2684 }
2685
2686 Capability *
2687 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
2688 {
2689     Task *task;
2690
2691     // We already created/initialised the Task
2692     task = cap->running_task;
2693
2694     // This TSO is now a bound thread; make the Task and TSO
2695     // point to each other.
2696     tso->bound = task;
2697
2698     task->tso = tso;
2699     task->ret = ret;
2700     task->stat = NoStatus;
2701
2702     appendToRunQueue(cap,tso);
2703
2704     IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id));
2705
2706 #if defined(GRAN)
2707     /* GranSim specific init */
2708     CurrentTSO = m->tso;                // the TSO to run
2709     procStatus[MainProc] = Busy;        // status of main PE
2710     CurrentProc = MainProc;             // PE to run it on
2711 #endif
2712
2713     cap = schedule(cap,task);
2714
2715     ASSERT(task->stat != NoStatus);
2716     ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
2717
2718     IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id));
2719     return cap;
2720 }
2721
2722 /* ----------------------------------------------------------------------------
2723  * Starting Tasks
2724  * ------------------------------------------------------------------------- */
2725
2726 #if defined(THREADED_RTS)
2727 void
2728 workerStart(Task *task)
2729 {
2730     Capability *cap;
2731
2732     // See startWorkerTask().
2733     ACQUIRE_LOCK(&task->lock);
2734     cap = task->cap;
2735     RELEASE_LOCK(&task->lock);
2736
2737     // set the thread-local pointer to the Task:
2738     taskEnter(task);
2739
2740     // schedule() runs without a lock.
2741     cap = schedule(cap,task);
2742
2743     // On exit from schedule(), we have a Capability.
2744     releaseCapability(cap);
2745     taskStop(task);
2746 }
2747 #endif
2748
2749 /* ---------------------------------------------------------------------------
2750  * initScheduler()
2751  *
2752  * Initialise the scheduler.  This resets all the queues - if the
2753  * queues contained any threads, they'll be garbage collected at the
2754  * next pass.
2755  *
2756  * ------------------------------------------------------------------------ */
2757
2758 void 
2759 initScheduler(void)
2760 {
2761 #if defined(GRAN)
2762   nat i;
2763   for (i=0; i<=MAX_PROC; i++) {
2764     run_queue_hds[i]      = END_TSO_QUEUE;
2765     run_queue_tls[i]      = END_TSO_QUEUE;
2766     blocked_queue_hds[i]  = END_TSO_QUEUE;
2767     blocked_queue_tls[i]  = END_TSO_QUEUE;
2768     ccalling_threadss[i]  = END_TSO_QUEUE;
2769     blackhole_queue[i]    = END_TSO_QUEUE;
2770     sleeping_queue        = END_TSO_QUEUE;
2771   }
2772 #elif !defined(THREADED_RTS)
2773   blocked_queue_hd  = END_TSO_QUEUE;
2774   blocked_queue_tl  = END_TSO_QUEUE;
2775   sleeping_queue    = END_TSO_QUEUE;
2776 #endif
2777
2778   blackhole_queue   = END_TSO_QUEUE;
2779   all_threads       = END_TSO_QUEUE;
2780
2781   context_switch = 0;
2782   sched_state    = SCHED_RUNNING;
2783
2784   RtsFlags.ConcFlags.ctxtSwitchTicks =
2785       RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS;
2786       
2787 #if defined(THREADED_RTS)
2788   /* Initialise the mutex and condition variables used by
2789    * the scheduler. */
2790   initMutex(&sched_mutex);
2791 #endif
2792   
2793   ACQUIRE_LOCK(&sched_mutex);
2794
2795   /* A capability holds the state a native thread needs in
2796    * order to execute STG code. At least one capability is
2797    * floating around (only THREADED_RTS builds have more than one).
2798    */
2799   initCapabilities();
2800
2801   initTaskManager();
2802
2803 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
2804   initSparkPools();
2805 #endif
2806
2807 #if defined(THREADED_RTS)
2808   /*
2809    * Eagerly start one worker to run each Capability, except for
2810    * Capability 0.  The idea is that we're probably going to start a
2811    * bound thread on Capability 0 pretty soon, so we don't want a
2812    * worker task hogging it.
2813    */
2814   { 
2815       nat i;
2816       Capability *cap;
2817       for (i = 1; i < n_capabilities; i++) {
2818           cap = &capabilities[i];
2819           ACQUIRE_LOCK(&cap->lock);
2820           startWorkerTask(cap, workerStart);
2821           RELEASE_LOCK(&cap->lock);
2822       }
2823   }
2824 #endif
2825
2826   RELEASE_LOCK(&sched_mutex);
2827 }
2828
2829 void
2830 exitScheduler( void )
2831 {
2832     Task *task = NULL;
2833
2834 #if defined(THREADED_RTS)
2835     ACQUIRE_LOCK(&sched_mutex);
2836     task = newBoundTask();
2837     RELEASE_LOCK(&sched_mutex);
2838 #endif
2839
2840     // If we haven't killed all the threads yet, do it now.
2841     if (sched_state < SCHED_INTERRUPTED) {
2842         sched_state = SCHED_INTERRUPTING;
2843         scheduleDoGC(NULL,task,rtsFalse,GetRoots);    
2844     }
2845     sched_state = SCHED_SHUTTING_DOWN;
2846
2847 #if defined(THREADED_RTS)
2848     { 
2849         nat i;
2850         
2851         for (i = 0; i < n_capabilities; i++) {
2852             shutdownCapability(&capabilities[i], task);
2853         }
2854         boundTaskExiting(task);
2855         stopTaskManager();
2856     }
2857 #endif
2858 }
2859
2860 /* ---------------------------------------------------------------------------
2861    Where are the roots that we know about?
2862
2863         - all the threads on the runnable queue
2864         - all the threads on the blocked queue
2865         - all the threads on the sleeping queue
2866         - all the thread currently executing a _ccall_GC
2867         - all the "main threads"
2868      
2869    ------------------------------------------------------------------------ */
2870
2871 /* This has to be protected either by the scheduler monitor, or by the
2872         garbage collection monitor (probably the latter).
2873         KH @ 25/10/99
2874 */
2875
2876 void
2877 GetRoots( evac_fn evac )
2878 {
2879     nat i;
2880     Capability *cap;
2881     Task *task;
2882
2883 #if defined(GRAN)
2884     for (i=0; i<=RtsFlags.GranFlags.proc; i++) {
2885         if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL)))
2886             evac((StgClosure **)&run_queue_hds[i]);
2887         if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL)))
2888             evac((StgClosure **)&run_queue_tls[i]);
2889         
2890         if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL)))
2891             evac((StgClosure **)&blocked_queue_hds[i]);
2892         if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL)))
2893             evac((StgClosure **)&blocked_queue_tls[i]);
2894         if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL)))
2895             evac((StgClosure **)&ccalling_threads[i]);
2896     }
2897
2898     markEventQueue();
2899
2900 #else /* !GRAN */
2901
2902     for (i = 0; i < n_capabilities; i++) {
2903         cap = &capabilities[i];
2904         evac((StgClosure **)&cap->run_queue_hd);
2905         evac((StgClosure **)&cap->run_queue_tl);
2906         
2907         for (task = cap->suspended_ccalling_tasks; task != NULL; 
2908              task=task->next) {
2909             IF_DEBUG(scheduler,sched_belch("evac'ing suspended TSO %d", task->suspended_tso->id));
2910             evac((StgClosure **)&task->suspended_tso);
2911         }
2912     }
2913     
2914 #if !defined(THREADED_RTS)
2915     evac((StgClosure **)(void *)&blocked_queue_hd);
2916     evac((StgClosure **)(void *)&blocked_queue_tl);
2917     evac((StgClosure **)(void *)&sleeping_queue);
2918 #endif 
2919 #endif
2920
2921     // evac((StgClosure **)&blackhole_queue);
2922
2923 #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) || defined(GRAN)
2924     markSparkQueue(evac);
2925 #endif
2926     
2927 #if defined(RTS_USER_SIGNALS)
2928     // mark the signal handlers (signals should be already blocked)
2929     markSignalHandlers(evac);
2930 #endif
2931 }
2932
2933 /* -----------------------------------------------------------------------------
2934    performGC
2935
2936    This is the interface to the garbage collector from Haskell land.
2937    We provide this so that external C code can allocate and garbage
2938    collect when called from Haskell via _ccall_GC.
2939
2940    It might be useful to provide an interface whereby the programmer
2941    can specify more roots (ToDo).
2942    
2943    This needs to be protected by the GC condition variable above.  KH.
2944    -------------------------------------------------------------------------- */
2945
2946 static void (*extra_roots)(evac_fn);
2947
2948 static void
2949 performGC_(rtsBool force_major, void (*get_roots)(evac_fn))
2950 {
2951     Task *task = myTask();
2952
2953     if (task == NULL) {
2954         ACQUIRE_LOCK(&sched_mutex);
2955         task = newBoundTask();
2956         RELEASE_LOCK(&sched_mutex);
2957         scheduleDoGC(NULL,task,force_major, get_roots);
2958         boundTaskExiting(task);
2959     } else {
2960         scheduleDoGC(NULL,task,force_major, get_roots);
2961     }
2962 }
2963
2964 void
2965 performGC(void)
2966 {
2967     performGC_(rtsFalse, GetRoots);
2968 }
2969
2970 void
2971 performMajorGC(void)
2972 {
2973     performGC_(rtsTrue, GetRoots);
2974 }
2975
2976 static void
2977 AllRoots(evac_fn evac)
2978 {
2979     GetRoots(evac);             // the scheduler's roots
2980     extra_roots(evac);          // the user's roots
2981 }
2982
2983 void
2984 performGCWithRoots(void (*get_roots)(evac_fn))
2985 {
2986     extra_roots = get_roots;
2987     performGC_(rtsFalse, AllRoots);
2988 }
2989
2990 /* -----------------------------------------------------------------------------
2991    Stack overflow
2992
2993    If the thread has reached its maximum stack size, then raise the
2994    StackOverflow exception in the offending thread.  Otherwise
2995    relocate the TSO into a larger chunk of memory and adjust its stack
2996    size appropriately.
2997    -------------------------------------------------------------------------- */
2998
2999 static StgTSO *
3000 threadStackOverflow(Capability *cap, StgTSO *tso)
3001 {
3002   nat new_stack_size, stack_words;
3003   lnat new_tso_size;
3004   StgPtr new_sp;
3005   StgTSO *dest;
3006
3007   IF_DEBUG(sanity,checkTSO(tso));
3008   if (tso->stack_size >= tso->max_stack_size) {
3009
3010     IF_DEBUG(gc,
3011              debugBelch("@@ threadStackOverflow of TSO %ld (%p): stack too large (now %ld; max is %ld)\n",
3012                    (long)tso->id, tso, (long)tso->stack_size, (long)tso->max_stack_size);
3013              /* If we're debugging, just print out the top of the stack */
3014              printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3015                                               tso->sp+64)));
3016
3017     /* Send this thread the StackOverflow exception */
3018     raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
3019     return tso;
3020   }
3021
3022   /* Try to double the current stack size.  If that takes us over the
3023    * maximum stack size for this thread, then use the maximum instead.
3024    * Finally round up so the TSO ends up as a whole number of blocks.
3025    */
3026   new_stack_size = stg_min(tso->stack_size * 2, tso->max_stack_size);
3027   new_tso_size   = (lnat)BLOCK_ROUND_UP(new_stack_size * sizeof(W_) + 
3028                                        TSO_STRUCT_SIZE)/sizeof(W_);
3029   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
3030   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
3031
3032   IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", (long)tso->stack_size, new_stack_size));
3033
3034   dest = (StgTSO *)allocate(new_tso_size);
3035   TICK_ALLOC_TSO(new_stack_size,0);
3036
3037   /* copy the TSO block and the old stack into the new area */
3038   memcpy(dest,tso,TSO_STRUCT_SIZE);
3039   stack_words = tso->stack + tso->stack_size - tso->sp;
3040   new_sp = (P_)dest + new_tso_size - stack_words;
3041   memcpy(new_sp, tso->sp, stack_words * sizeof(W_));
3042
3043   /* relocate the stack pointers... */
3044   dest->sp         = new_sp;
3045   dest->stack_size = new_stack_size;
3046         
3047   /* Mark the old TSO as relocated.  We have to check for relocated
3048    * TSOs in the garbage collector and any primops that deal with TSOs.
3049    *
3050    * It's important to set the sp value to just beyond the end
3051    * of the stack, so we don't attempt to scavenge any part of the
3052    * dead TSO's stack.
3053    */
3054   tso->what_next = ThreadRelocated;
3055   tso->link = dest;
3056   tso->sp = (P_)&(tso->stack[tso->stack_size]);
3057   tso->why_blocked = NotBlocked;
3058
3059   IF_PAR_DEBUG(verbose,
3060                debugBelch("@@ threadStackOverflow of TSO %d (now at %p): stack size increased to %ld\n",
3061                      tso->id, tso, tso->stack_size);
3062                /* If we're debugging, just print out the top of the stack */
3063                printStackChunk(tso->sp, stg_min(tso->stack+tso->stack_size, 
3064                                                 tso->sp+64)));
3065   
3066   IF_DEBUG(sanity,checkTSO(tso));
3067 #if 0
3068   IF_DEBUG(scheduler,printTSO(dest));
3069 #endif
3070
3071   return dest;
3072 }
3073
3074 /* ---------------------------------------------------------------------------
3075    Wake up a queue that was blocked on some resource.
3076    ------------------------------------------------------------------------ */
3077
3078 #if defined(GRAN)
3079 STATIC_INLINE void
3080 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3081 {
3082 }
3083 #elif defined(PARALLEL_HASKELL)
3084 STATIC_INLINE void
3085 unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
3086 {
3087   /* write RESUME events to log file and
3088      update blocked and fetch time (depending on type of the orig closure) */
3089   if (RtsFlags.ParFlags.ParStats.Full) {
3090     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, 
3091                      GR_RESUMEQ, ((StgTSO *)bqe), ((StgTSO *)bqe)->block_info.closure,
3092                      0, 0 /* spark_queue_len(ADVISORY_POOL) */);
3093     if (emptyRunQueue())
3094       emitSchedule = rtsTrue;
3095
3096     switch (get_itbl(node)->type) {
3097         case FETCH_ME_BQ:
3098           ((StgTSO *)bqe)->par.fetchtime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3099           break;
3100         case RBH:
3101         case FETCH_ME:
3102         case BLACKHOLE_BQ:
3103           ((StgTSO *)bqe)->par.blocktime += CURRENT_TIME-((StgTSO *)bqe)->par.blockedat;
3104           break;
3105 #ifdef DIST
3106         case MVAR:
3107           break;
3108 #endif    
3109         default:
3110           barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
3111         }
3112       }
3113 }
3114 #endif
3115
3116 #if defined(GRAN)
3117 StgBlockingQueueElement *
3118 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3119 {
3120     StgTSO *tso;
3121     PEs node_loc, tso_loc;
3122
3123     node_loc = where_is(node); // should be lifted out of loop
3124     tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3125     tso_loc = where_is((StgClosure *)tso);
3126     if (IS_LOCAL_TO(PROCS(node),tso_loc)) { // TSO is local
3127       /* !fake_fetch => TSO is on CurrentProc is same as IS_LOCAL_TO */
3128       ASSERT(CurrentProc!=node_loc || tso_loc==CurrentProc);
3129       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.lunblocktime;
3130       // insertThread(tso, node_loc);
3131       new_event(tso_loc, tso_loc, CurrentTime[CurrentProc],
3132                 ResumeThread,
3133                 tso, node, (rtsSpark*)NULL);
3134       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3135       // len_local++;
3136       // len++;
3137     } else { // TSO is remote (actually should be FMBQ)
3138       CurrentTime[CurrentProc] += RtsFlags.GranFlags.Costs.mpacktime +
3139                                   RtsFlags.GranFlags.Costs.gunblocktime +
3140                                   RtsFlags.GranFlags.Costs.latency;
3141       new_event(tso_loc, CurrentProc, CurrentTime[CurrentProc],
3142                 UnblockThread,
3143                 tso, node, (rtsSpark*)NULL);
3144       tso->link = END_TSO_QUEUE; // overwrite link just to be sure 
3145       // len++;
3146     }
3147     /* the thread-queue-overhead is accounted for in either Resume or UnblockThread */
3148     IF_GRAN_DEBUG(bq,
3149                   debugBelch(" %s TSO %d (%p) [PE %d] (block_info.closure=%p) (next=%p) ,",
3150                           (node_loc==tso_loc ? "Local" : "Global"), 
3151                           tso->id, tso, CurrentProc, tso->block_info.closure, tso->link));
3152     tso->block_info.closure = NULL;
3153     IF_DEBUG(scheduler,debugBelch("-- Waking up thread %ld (%p)\n", 
3154                              tso->id, tso));
3155 }
3156 #elif defined(PARALLEL_HASKELL)
3157 StgBlockingQueueElement *
3158 unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
3159 {
3160     StgBlockingQueueElement *next;
3161
3162     switch (get_itbl(bqe)->type) {
3163     case TSO:
3164       ASSERT(((StgTSO *)bqe)->why_blocked != NotBlocked);
3165       /* if it's a TSO just push it onto the run_queue */
3166       next = bqe->link;
3167       ((StgTSO *)bqe)->link = END_TSO_QUEUE; // debugging?
3168       APPEND_TO_RUN_QUEUE((StgTSO *)bqe); 
3169       threadRunnable();
3170       unblockCount(bqe, node);
3171       /* reset blocking status after dumping event */
3172       ((StgTSO *)bqe)->why_blocked = NotBlocked;
3173       break;
3174
3175     case BLOCKED_FETCH:
3176       /* if it's a BLOCKED_FETCH put it on the PendingFetches list */
3177       next = bqe->link;
3178       bqe->link = (StgBlockingQueueElement *)PendingFetches;
3179       PendingFetches = (StgBlockedFetch *)bqe;
3180       break;
3181
3182 # if defined(DEBUG)
3183       /* can ignore this case in a non-debugging setup; 
3184          see comments on RBHSave closures above */
3185     case CONSTR:
3186       /* check that the closure is an RBHSave closure */
3187       ASSERT(get_itbl((StgClosure *)bqe) == &stg_RBH_Save_0_info ||
3188              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_1_info ||
3189              get_itbl((StgClosure *)bqe) == &stg_RBH_Save_2_info);
3190       break;
3191
3192     default:
3193       barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n",
3194            get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), 
3195            (StgClosure *)bqe);
3196 # endif
3197     }
3198   IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe)));
3199   return next;
3200 }
3201 #endif
3202
3203 StgTSO *
3204 unblockOne(Capability *cap, StgTSO *tso)
3205 {
3206   StgTSO *next;
3207
3208   ASSERT(get_itbl(tso)->type == TSO);
3209   ASSERT(tso->why_blocked != NotBlocked);
3210   tso->why_blocked = NotBlocked;
3211   next = tso->link;
3212   tso->link = END_TSO_QUEUE;
3213
3214   // We might have just migrated this TSO to our Capability:
3215   if (tso->bound) {
3216       tso->bound->cap = cap;
3217   }
3218
3219   appendToRunQueue(cap,tso);
3220
3221   // we're holding a newly woken thread, make sure we context switch
3222   // quickly so we can migrate it if necessary.
3223   context_switch = 1;
3224   IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id));
3225   return next;
3226 }
3227
3228
3229 #if defined(GRAN)
3230 void 
3231 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3232 {
3233   StgBlockingQueueElement *bqe;
3234   PEs node_loc;
3235   nat len = 0; 
3236
3237   IF_GRAN_DEBUG(bq, 
3238                 debugBelch("##-_ AwBQ for node %p on PE %d @ %ld by TSO %d (%p): \n", \
3239                       node, CurrentProc, CurrentTime[CurrentProc], 
3240                       CurrentTSO->id, CurrentTSO));
3241
3242   node_loc = where_is(node);
3243
3244   ASSERT(q == END_BQ_QUEUE ||
3245          get_itbl(q)->type == TSO ||   // q is either a TSO or an RBHSave
3246          get_itbl(q)->type == CONSTR); // closure (type constructor)
3247   ASSERT(is_unique(node));
3248
3249   /* FAKE FETCH: magically copy the node to the tso's proc;
3250      no Fetch necessary because in reality the node should not have been 
3251      moved to the other PE in the first place
3252   */
3253   if (CurrentProc!=node_loc) {
3254     IF_GRAN_DEBUG(bq, 
3255                   debugBelch("## node %p is on PE %d but CurrentProc is %d (TSO %d); assuming fake fetch and adjusting bitmask (old: %#x)\n",
3256                         node, node_loc, CurrentProc, CurrentTSO->id, 
3257                         // CurrentTSO, where_is(CurrentTSO),
3258                         node->header.gran.procs));
3259     node->header.gran.procs = (node->header.gran.procs) | PE_NUMBER(CurrentProc);
3260     IF_GRAN_DEBUG(bq, 
3261                   debugBelch("## new bitmask of node %p is %#x\n",
3262                         node, node->header.gran.procs));
3263     if (RtsFlags.GranFlags.GranSimStats.Global) {
3264       globalGranStats.tot_fake_fetches++;
3265     }
3266   }
3267
3268   bqe = q;
3269   // ToDo: check: ASSERT(CurrentProc==node_loc);
3270   while (get_itbl(bqe)->type==TSO) { // q != END_TSO_QUEUE) {
3271     //next = bqe->link;
3272     /* 
3273        bqe points to the current element in the queue
3274        next points to the next element in the queue
3275     */
3276     //tso = (StgTSO *)bqe;  // wastes an assignment to get the type right
3277     //tso_loc = where_is(tso);
3278     len++;
3279     bqe = unblockOne(bqe, node);
3280   }
3281
3282   /* if this is the BQ of an RBH, we have to put back the info ripped out of
3283      the closure to make room for the anchor of the BQ */
3284   if (bqe!=END_BQ_QUEUE) {
3285     ASSERT(get_itbl(node)->type == RBH && get_itbl(bqe)->type == CONSTR);
3286     /*
3287     ASSERT((info_ptr==&RBH_Save_0_info) ||
3288            (info_ptr==&RBH_Save_1_info) ||
3289            (info_ptr==&RBH_Save_2_info));
3290     */
3291     /* cf. convertToRBH in RBH.c for writing the RBHSave closure */
3292     ((StgRBH *)node)->blocking_queue = (StgBlockingQueueElement *)((StgRBHSave *)bqe)->payload[0];
3293     ((StgRBH *)node)->mut_link       = (StgMutClosure *)((StgRBHSave *)bqe)->payload[1];
3294
3295     IF_GRAN_DEBUG(bq,
3296                   debugBelch("## Filled in RBH_Save for %p (%s) at end of AwBQ\n",
3297                         node, info_type(node)));
3298   }
3299
3300   /* statistics gathering */
3301   if (RtsFlags.GranFlags.GranSimStats.Global) {
3302     // globalGranStats.tot_bq_processing_time += bq_processing_time;
3303     globalGranStats.tot_bq_len += len;      // total length of all bqs awakened
3304     // globalGranStats.tot_bq_len_local += len_local;  // same for local TSOs only
3305     globalGranStats.tot_awbq++;             // total no. of bqs awakened
3306   }
3307   IF_GRAN_DEBUG(bq,
3308                 debugBelch("## BQ Stats of %p: [%d entries] %s\n",
3309                         node, len, (bqe!=END_BQ_QUEUE) ? "RBH" : ""));
3310 }
3311 #elif defined(PARALLEL_HASKELL)
3312 void 
3313 awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node)
3314 {
3315   StgBlockingQueueElement *bqe;
3316
3317   IF_PAR_DEBUG(verbose, 
3318                debugBelch("##-_ AwBQ for node %p on [%x]: \n",
3319                      node, mytid));
3320 #ifdef DIST  
3321   //RFP
3322   if(get_itbl(q)->type == CONSTR || q==END_BQ_QUEUE) {
3323     IF_PAR_DEBUG(verbose, debugBelch("## ... nothing to unblock so lets just return. RFP (BUG?)\n"));
3324     return;
3325   }
3326 #endif
3327   
3328   ASSERT(q == END_BQ_QUEUE ||
3329          get_itbl(q)->type == TSO ||           
3330          get_itbl(q)->type == BLOCKED_FETCH || 
3331          get_itbl(q)->type == CONSTR); 
3332
3333   bqe = q;
3334   while (get_itbl(bqe)->type==TSO || 
3335          get_itbl(bqe)->type==BLOCKED_FETCH) {
3336     bqe = unblockOne(bqe, node);
3337   }
3338 }
3339
3340 #else   /* !GRAN && !PARALLEL_HASKELL */
3341
3342 void
3343 awakenBlockedQueue(Capability *cap, StgTSO *tso)
3344 {
3345     if (tso == NULL) return; // hack; see bug #1235728, and comments in
3346                              // Exception.cmm
3347     while (tso != END_TSO_QUEUE) {
3348         tso = unblockOne(cap,tso);
3349     }
3350 }
3351 #endif
3352
3353 /* ---------------------------------------------------------------------------
3354    Interrupt execution
3355    - usually called inside a signal handler so it mustn't do anything fancy.   
3356    ------------------------------------------------------------------------ */
3357
3358 void
3359 interruptStgRts(void)
3360 {
3361     sched_state = SCHED_INTERRUPTING;
3362     context_switch = 1;
3363 #if defined(THREADED_RTS)
3364     prodAllCapabilities();
3365 #endif
3366 }
3367
3368 /* -----------------------------------------------------------------------------
3369    Unblock a thread
3370
3371    This is for use when we raise an exception in another thread, which
3372    may be blocked.
3373    This has nothing to do with the UnblockThread event in GranSim. -- HWL
3374    -------------------------------------------------------------------------- */
3375
3376 #if defined(GRAN) || defined(PARALLEL_HASKELL)
3377 /*
3378   NB: only the type of the blocking queue is different in GranSim and GUM
3379       the operations on the queue-elements are the same
3380       long live polymorphism!
3381
3382   Locks: sched_mutex is held upon entry and exit.
3383
3384 */
3385 static void
3386 unblockThread(Capability *cap, StgTSO *tso)
3387 {
3388   StgBlockingQueueElement *t, **last;
3389
3390   switch (tso->why_blocked) {
3391
3392   case NotBlocked:
3393     return;  /* not blocked */
3394
3395   case BlockedOnSTM:
3396     // Be careful: nothing to do here!  We tell the scheduler that the thread
3397     // is runnable and we leave it to the stack-walking code to abort the 
3398     // transaction while unwinding the stack.  We should perhaps have a debugging
3399     // test to make sure that this really happens and that the 'zombie' transaction
3400     // does not get committed.
3401     goto done;
3402
3403   case BlockedOnMVar:
3404     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3405     {
3406       StgBlockingQueueElement *last_tso = END_BQ_QUEUE;
3407       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3408
3409       last = (StgBlockingQueueElement **)&mvar->head;
3410       for (t = (StgBlockingQueueElement *)mvar->head; 
3411            t != END_BQ_QUEUE; 
3412            last = &t->link, last_tso = t, t = t->link) {
3413         if (t == (StgBlockingQueueElement *)tso) {
3414           *last = (StgBlockingQueueElement *)tso->link;
3415           if (mvar->tail == tso) {
3416             mvar->tail = (StgTSO *)last_tso;
3417           }
3418           goto done;
3419         }
3420       }
3421       barf("unblockThread (MVAR): TSO not found");
3422     }
3423
3424   case BlockedOnBlackHole:
3425     ASSERT(get_itbl(tso->block_info.closure)->type == BLACKHOLE_BQ);
3426     {
3427       StgBlockingQueue *bq = (StgBlockingQueue *)(tso->block_info.closure);
3428
3429       last = &bq->blocking_queue;
3430       for (t = bq->blocking_queue; 
3431            t != END_BQ_QUEUE; 
3432            last = &t->link, t = t->link) {
3433         if (t == (StgBlockingQueueElement *)tso) {
3434           *last = (StgBlockingQueueElement *)tso->link;
3435           goto done;
3436         }
3437       }
3438       barf("unblockThread (BLACKHOLE): TSO not found");
3439     }
3440
3441   case BlockedOnException:
3442     {
3443       StgTSO *target  = tso->block_info.tso;
3444
3445       ASSERT(get_itbl(target)->type == TSO);
3446
3447       if (target->what_next == ThreadRelocated) {
3448           target = target->link;
3449           ASSERT(get_itbl(target)->type == TSO);
3450       }
3451
3452       ASSERT(target->blocked_exceptions != NULL);
3453
3454       last = (StgBlockingQueueElement **)&target->blocked_exceptions;
3455       for (t = (StgBlockingQueueElement *)target->blocked_exceptions; 
3456            t != END_BQ_QUEUE; 
3457            last = &t->link, t = t->link) {
3458         ASSERT(get_itbl(t)->type == TSO);
3459         if (t == (StgBlockingQueueElement *)tso) {
3460           *last = (StgBlockingQueueElement *)tso->link;
3461           goto done;
3462         }
3463       }
3464       barf("unblockThread (Exception): TSO not found");
3465     }
3466
3467   case BlockedOnRead:
3468   case BlockedOnWrite:
3469 #if defined(mingw32_HOST_OS)
3470   case BlockedOnDoProc:
3471 #endif
3472     {
3473       /* take TSO off blocked_queue */
3474       StgBlockingQueueElement *prev = NULL;
3475       for (t = (StgBlockingQueueElement *)blocked_queue_hd; t != END_BQ_QUEUE; 
3476            prev = t, t = t->link) {
3477         if (t == (StgBlockingQueueElement *)tso) {
3478           if (prev == NULL) {
3479             blocked_queue_hd = (StgTSO *)t->link;
3480             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3481               blocked_queue_tl = END_TSO_QUEUE;
3482             }
3483           } else {
3484             prev->link = t->link;
3485             if ((StgBlockingQueueElement *)blocked_queue_tl == t) {
3486               blocked_queue_tl = (StgTSO *)prev;
3487             }
3488           }
3489 #if defined(mingw32_HOST_OS)
3490           /* (Cooperatively) signal that the worker thread should abort
3491            * the request.
3492            */
3493           abandonWorkRequest(tso->block_info.async_result->reqID);
3494 #endif
3495           goto done;
3496         }
3497       }
3498       barf("unblockThread (I/O): TSO not found");
3499     }
3500
3501   case BlockedOnDelay:
3502     {
3503       /* take TSO off sleeping_queue */
3504       StgBlockingQueueElement *prev = NULL;
3505       for (t = (StgBlockingQueueElement *)sleeping_queue; t != END_BQ_QUEUE; 
3506            prev = t, t = t->link) {
3507         if (t == (StgBlockingQueueElement *)tso) {
3508           if (prev == NULL) {
3509             sleeping_queue = (StgTSO *)t->link;
3510           } else {
3511             prev->link = t->link;
3512           }
3513           goto done;
3514         }
3515       }
3516       barf("unblockThread (delay): TSO not found");
3517     }
3518
3519   default:
3520     barf("unblockThread");
3521   }
3522
3523  done:
3524   tso->link = END_TSO_QUEUE;
3525   tso->why_blocked = NotBlocked;
3526   tso->block_info.closure = NULL;
3527   pushOnRunQueue(cap,tso);
3528 }
3529 #else
3530 static void
3531 unblockThread(Capability *cap, StgTSO *tso)
3532 {
3533   StgTSO *t, **last;
3534   
3535   /* To avoid locking unnecessarily. */
3536   if (tso->why_blocked == NotBlocked) {
3537     return;
3538   }
3539
3540   switch (tso->why_blocked) {
3541
3542   case BlockedOnSTM:
3543     // Be careful: nothing to do here!  We tell the scheduler that the thread
3544     // is runnable and we leave it to the stack-walking code to abort the 
3545     // transaction while unwinding the stack.  We should perhaps have a debugging
3546     // test to make sure that this really happens and that the 'zombie' transaction
3547     // does not get committed.
3548     goto done;
3549
3550   case BlockedOnMVar:
3551     ASSERT(get_itbl(tso->block_info.closure)->type == MVAR);
3552     {
3553       StgTSO *last_tso = END_TSO_QUEUE;
3554       StgMVar *mvar = (StgMVar *)(tso->block_info.closure);
3555
3556       last = &mvar->head;
3557       for (t = mvar->head; t != END_TSO_QUEUE; 
3558            last = &t->link, last_tso = t, t = t->link) {
3559         if (t == tso) {
3560           *last = tso->link;
3561           if (mvar->tail == tso) {
3562             mvar->tail = last_tso;
3563           }
3564           goto done;
3565         }
3566       }
3567       barf("unblockThread (MVAR): TSO not found");
3568     }
3569
3570   case BlockedOnBlackHole:
3571     {
3572       last = &blackhole_queue;
3573       for (t = blackhole_queue; t != END_TSO_QUEUE; 
3574            last = &t->link, t = t->link) {
3575         if (t == tso) {
3576           *last = tso->link;
3577           goto done;
3578         }
3579       }
3580       barf("unblockThread (BLACKHOLE): TSO not found");
3581     }
3582
3583   case BlockedOnException:
3584     {
3585       StgTSO *target  = tso->block_info.tso;
3586
3587       ASSERT(get_itbl(target)->type == TSO);
3588
3589       while (target->what_next == ThreadRelocated) {
3590           target = target->link;
3591           ASSERT(get_itbl(target)->type == TSO);
3592       }
3593       
3594       ASSERT(target->blocked_exceptions != NULL);
3595
3596       last = &target->blocked_exceptions;
3597       for (t = target->blocked_exceptions; t != END_TSO_QUEUE; 
3598            last = &t->link, t = t->link) {
3599         ASSERT(get_itbl(t)->type == TSO);
3600         if (t == tso) {
3601           *last = tso->link;
3602           goto done;
3603         }
3604       }
3605       barf("unblockThread (Exception): TSO not found");
3606     }
3607
3608 #if !defined(THREADED_RTS)
3609   case BlockedOnRead:
3610   case BlockedOnWrite:
3611 #if defined(mingw32_HOST_OS)
3612   case BlockedOnDoProc:
3613 #endif
3614     {
3615       StgTSO *prev = NULL;
3616       for (t = blocked_queue_hd; t != END_TSO_QUEUE; 
3617            prev = t, t = t->link) {
3618         if (t == tso) {
3619           if (prev == NULL) {
3620             blocked_queue_hd = t->link;
3621             if (blocked_queue_tl == t) {
3622               blocked_queue_tl = END_TSO_QUEUE;
3623             }
3624           } else {
3625             prev->link = t->link;
3626             if (blocked_queue_tl == t) {
3627               blocked_queue_tl = prev;
3628             }
3629           }
3630 #if defined(mingw32_HOST_OS)
3631           /* (Cooperatively) signal that the worker thread should abort
3632            * the request.
3633            */
3634           abandonWorkRequest(tso->block_info.async_result->reqID);
3635 #endif
3636           goto done;
3637         }
3638       }
3639       barf("unblockThread (I/O): TSO not found");
3640     }
3641
3642   case BlockedOnDelay:
3643     {
3644       StgTSO *prev = NULL;
3645       for (t = sleeping_queue; t != END_TSO_QUEUE; 
3646            prev = t, t = t->link) {
3647         if (t == tso) {
3648           if (prev == NULL) {
3649             sleeping_queue = t->link;
3650           } else {
3651             prev->link = t->link;
3652           }
3653           goto done;
3654         }
3655       }
3656       barf("unblockThread (delay): TSO not found");
3657     }
3658 #endif
3659
3660   default:
3661     barf("unblockThread");
3662   }
3663
3664  done:
3665   tso->link = END_TSO_QUEUE;
3666   tso->why_blocked = NotBlocked;
3667   tso->block_info.closure = NULL;
3668   appendToRunQueue(cap,tso);
3669
3670   // We might have just migrated this TSO to our Capability:
3671   if (tso->bound) {
3672       tso->bound->cap = cap;
3673   }
3674 }
3675 #endif
3676
3677 /* -----------------------------------------------------------------------------
3678  * checkBlackHoles()
3679  *
3680  * Check the blackhole_queue for threads that can be woken up.  We do
3681  * this periodically: before every GC, and whenever the run queue is
3682  * empty.
3683  *
3684  * An elegant solution might be to just wake up all the blocked
3685  * threads with awakenBlockedQueue occasionally: they'll go back to
3686  * sleep again if the object is still a BLACKHOLE.  Unfortunately this
3687  * doesn't give us a way to tell whether we've actually managed to
3688  * wake up any threads, so we would be busy-waiting.
3689  *
3690  * -------------------------------------------------------------------------- */
3691
3692 static rtsBool
3693 checkBlackHoles (Capability *cap)
3694 {
3695     StgTSO **prev, *t;
3696     rtsBool any_woke_up = rtsFalse;
3697     StgHalfWord type;
3698
3699     // blackhole_queue is global:
3700     ASSERT_LOCK_HELD(&sched_mutex);
3701
3702     IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes"));
3703
3704     // ASSUMES: sched_mutex
3705     prev = &blackhole_queue;
3706     t = blackhole_queue;
3707     while (t != END_TSO_QUEUE) {
3708         ASSERT(t->why_blocked == BlockedOnBlackHole);
3709         type = get_itbl(t->block_info.closure)->type;
3710         if (type != BLACKHOLE && type != CAF_BLACKHOLE) {
3711             IF_DEBUG(sanity,checkTSO(t));
3712             t = unblockOne(cap, t);
3713             // urk, the threads migrate to the current capability
3714             // here, but we'd like to keep them on the original one.
3715             *prev = t;
3716             any_woke_up = rtsTrue;
3717         } else {
3718             prev = &t->link;
3719             t = t->link;
3720         }
3721     }
3722
3723     return any_woke_up;
3724 }
3725
3726 /* -----------------------------------------------------------------------------
3727  * raiseAsync()
3728  *
3729  * The following function implements the magic for raising an
3730  * asynchronous exception in an existing thread.
3731  *
3732  * We first remove the thread from any queue on which it might be
3733  * blocked.  The possible blockages are MVARs and BLACKHOLE_BQs.
3734  *
3735  * We strip the stack down to the innermost CATCH_FRAME, building
3736  * thunks in the heap for all the active computations, so they can 
3737  * be restarted if necessary.  When we reach a CATCH_FRAME, we build
3738  * an application of the handler to the exception, and push it on
3739  * the top of the stack.
3740  * 
3741  * How exactly do we save all the active computations?  We create an
3742  * AP_STACK for every UpdateFrame on the stack.  Entering one of these
3743  * AP_STACKs pushes everything from the corresponding update frame
3744  * upwards onto the stack.  (Actually, it pushes everything up to the
3745  * next update frame plus a pointer to the next AP_STACK object.
3746  * Entering the next AP_STACK object pushes more onto the stack until we
3747  * reach the last AP_STACK object - at which point the stack should look
3748  * exactly as it did when we killed the TSO and we can continue
3749  * execution by entering the closure on top of the stack.
3750  *
3751  * We can also kill a thread entirely - this happens if either (a) the 
3752  * exception passed to raiseAsync is NULL, or (b) there's no
3753  * CATCH_FRAME on the stack.  In either case, we strip the entire
3754  * stack and replace the thread with a zombie.
3755  *
3756  * ToDo: in THREADED_RTS mode, this function is only safe if either
3757  * (a) we hold all the Capabilities (eg. in GC, or if there is only
3758  * one Capability), or (b) we own the Capability that the TSO is
3759  * currently blocked on or on the run queue of.
3760  *
3761  * -------------------------------------------------------------------------- */
3762  
3763 void
3764 raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
3765 {
3766     raiseAsync_(cap, tso, exception, rtsFalse, NULL);
3767 }
3768
3769 void
3770 suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
3771 {
3772     raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
3773 }
3774
3775 static void
3776 raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, 
3777             rtsBool stop_at_atomically, StgPtr stop_here)
3778 {
3779     StgRetInfoTable *info;
3780     StgPtr sp, frame;
3781     nat i;
3782   
3783     // Thread already dead?
3784     if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
3785         return;
3786     }
3787
3788     IF_DEBUG(scheduler, 
3789              sched_belch("raising exception in thread %ld.", (long)tso->id));
3790     
3791     // Remove it from any blocking queues
3792     unblockThread(cap,tso);
3793
3794     // mark it dirty; we're about to change its stack.
3795     dirtyTSO(tso);
3796
3797     sp = tso->sp;
3798     
3799     // The stack freezing code assumes there's a closure pointer on
3800     // the top of the stack, so we have to arrange that this is the case...
3801     //
3802     if (sp[0] == (W_)&stg_enter_info) {
3803         sp++;
3804     } else {
3805         sp--;
3806         sp[0] = (W_)&stg_dummy_ret_closure;
3807     }
3808
3809     frame = sp + 1;
3810     while (stop_here == NULL || frame < stop_here) {
3811
3812         // 1. Let the top of the stack be the "current closure"
3813         //
3814         // 2. Walk up the stack until we find either an UPDATE_FRAME or a
3815         // CATCH_FRAME.
3816         //
3817         // 3. If it's an UPDATE_FRAME, then make an AP_STACK containing the
3818         // current closure applied to the chunk of stack up to (but not
3819         // including) the update frame.  This closure becomes the "current
3820         // closure".  Go back to step 2.
3821         //
3822         // 4. If it's a CATCH_FRAME, then leave the exception handler on
3823         // top of the stack applied to the exception.
3824         // 
3825         // 5. If it's a STOP_FRAME, then kill the thread.
3826         // 
3827         // NB: if we pass an ATOMICALLY_FRAME then abort the associated 
3828         // transaction
3829        
3830         info = get_ret_itbl((StgClosure *)frame);
3831
3832         switch (info->i.type) {
3833
3834         case UPDATE_FRAME:
3835         {
3836             StgAP_STACK * ap;
3837             nat words;
3838             
3839             // First build an AP_STACK consisting of the stack chunk above the
3840             // current update frame, with the top word on the stack as the
3841             // fun field.
3842             //
3843             words = frame - sp - 1;
3844             ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words));
3845             
3846             ap->size = words;
3847             ap->fun  = (StgClosure *)sp[0];
3848             sp++;
3849             for(i=0; i < (nat)words; ++i) {
3850                 ap->payload[i] = (StgClosure *)*sp++;
3851             }
3852             
3853             SET_HDR(ap,&stg_AP_STACK_info,
3854                     ((StgClosure *)frame)->header.prof.ccs /* ToDo */); 
3855             TICK_ALLOC_UP_THK(words+1,0);
3856             
3857             IF_DEBUG(scheduler,
3858                      debugBelch("sched: Updating ");
3859                      printPtr((P_)((StgUpdateFrame *)frame)->updatee); 
3860                      debugBelch(" with ");
3861                      printObj((StgClosure *)ap);
3862                 );
3863
3864             // Replace the updatee with an indirection
3865             //
3866             // Warning: if we're in a loop, more than one update frame on
3867             // the stack may point to the same object.  Be careful not to
3868             // overwrite an IND_OLDGEN in this case, because we'll screw
3869             // up the mutable lists.  To be on the safe side, don't
3870             // overwrite any kind of indirection at all.  See also
3871             // threadSqueezeStack in GC.c, where we have to make a similar
3872             // check.
3873             //
3874             if (!closure_IND(((StgUpdateFrame *)frame)->updatee)) {
3875                 // revert the black hole
3876                 UPD_IND_NOLOCK(((StgUpdateFrame *)frame)->updatee,
3877                                (StgClosure *)ap);
3878             }
3879             sp += sizeofW(StgUpdateFrame) - 1;
3880             sp[0] = (W_)ap; // push onto stack
3881             frame = sp + 1;
3882             continue; //no need to bump frame
3883         }
3884
3885         case STOP_FRAME:
3886             // We've stripped the entire stack, the thread is now dead.
3887             tso->what_next = ThreadKilled;
3888             tso->sp = frame + sizeofW(StgStopFrame);
3889             return;
3890
3891         case CATCH_FRAME:
3892             // If we find a CATCH_FRAME, and we've got an exception to raise,
3893             // then build the THUNK raise(exception), and leave it on
3894             // top of the CATCH_FRAME ready to enter.
3895             //
3896         {
3897 #ifdef PROFILING
3898             StgCatchFrame *cf = (StgCatchFrame *)frame;
3899 #endif
3900             StgThunk *raise;
3901             
3902             if (exception == NULL) break;
3903
3904             // we've got an exception to raise, so let's pass it to the
3905             // handler in this frame.
3906             //
3907             raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
3908             TICK_ALLOC_SE_THK(1,0);
3909             SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
3910             raise->payload[0] = exception;
3911             
3912             // throw away the stack from Sp up to the CATCH_FRAME.
3913             //
3914             sp = frame - 1;
3915             
3916             /* Ensure that async excpetions are blocked now, so we don't get
3917              * a surprise exception before we get around to executing the
3918              * handler.
3919              */
3920             if (tso->blocked_exceptions == NULL) {
3921                 tso->blocked_exceptions = END_TSO_QUEUE;
3922             }
3923
3924             /* Put the newly-built THUNK on top of the stack, ready to execute
3925              * when the thread restarts.
3926              */
3927             sp[0] = (W_)raise;
3928             sp[-1] = (W_)&stg_enter_info;
3929             tso->sp = sp-1;
3930             tso->what_next = ThreadRunGHC;
3931             IF_DEBUG(sanity, checkTSO(tso));
3932             return;
3933         }
3934             
3935         case ATOMICALLY_FRAME:
3936             if (stop_at_atomically) {
3937                 ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
3938                 stmCondemnTransaction(cap, tso -> trec);
3939 #ifdef REG_R1
3940                 tso->sp = frame;
3941 #else
3942                 // R1 is not a register: the return convention for IO in
3943                 // this case puts the return value on the stack, so we
3944                 // need to set up the stack to return to the atomically
3945                 // frame properly...
3946                 tso->sp = frame - 2;
3947                 tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
3948                 tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
3949 #endif
3950                 tso->what_next = ThreadRunGHC;
3951                 return;
3952             }
3953             // Not stop_at_atomically... fall through and abort the
3954             // transaction.
3955             
3956         case CATCH_RETRY_FRAME:
3957             // IF we find an ATOMICALLY_FRAME then we abort the
3958             // current transaction and propagate the exception.  In
3959             // this case (unlike ordinary exceptions) we do not care
3960             // whether the transaction is valid or not because its
3961             // possible validity cannot have caused the exception
3962             // and will not be visible after the abort.
3963             IF_DEBUG(stm,
3964                      debugBelch("Found atomically block delivering async exception\n"));
3965             StgTRecHeader *trec = tso -> trec;
3966             StgTRecHeader *outer = stmGetEnclosingTRec(trec);
3967             stmAbortTransaction(cap, trec);
3968             tso -> trec = outer;
3969             break;
3970             
3971         default:
3972             break;
3973         }
3974
3975         // move on to the next stack frame
3976         frame += stack_frame_sizeW((StgClosure *)frame);
3977     }
3978
3979     // if we got here, then we stopped at stop_here
3980     ASSERT(stop_here != NULL);
3981 }
3982
3983 /* -----------------------------------------------------------------------------
3984    Deleting threads
3985
3986    This is used for interruption (^C) and forking, and corresponds to
3987    raising an exception but without letting the thread catch the
3988    exception.
3989    -------------------------------------------------------------------------- */
3990
3991 static void 
3992 deleteThread (Capability *cap, StgTSO *tso)
3993 {
3994   if (tso->why_blocked != BlockedOnCCall &&
3995       tso->why_blocked != BlockedOnCCall_NoUnblockExc) {
3996       raiseAsync(cap,tso,NULL);
3997   }
3998 }
3999
4000 #ifdef FORKPROCESS_PRIMOP_SUPPORTED
4001 static void 
4002 deleteThread_(Capability *cap, StgTSO *tso)
4003 { // for forkProcess only:
4004   // like deleteThread(), but we delete threads in foreign calls, too.
4005
4006     if (tso->why_blocked == BlockedOnCCall ||
4007         tso->why_blocked == BlockedOnCCall_NoUnblockExc) {
4008         unblockOne(cap,tso);
4009         tso->what_next = ThreadKilled;
4010     } else {
4011         deleteThread(cap,tso);
4012     }
4013 }
4014 #endif
4015
4016 /* -----------------------------------------------------------------------------
4017    raiseExceptionHelper
4018    
4019    This function is called by the raise# primitve, just so that we can
4020    move some of the tricky bits of raising an exception from C-- into
4021    C.  Who knows, it might be a useful re-useable thing here too.
4022    -------------------------------------------------------------------------- */
4023
4024 StgWord
4025 raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception)
4026 {
4027     Capability *cap = regTableToCapability(reg);
4028     StgThunk *raise_closure = NULL;
4029     StgPtr p, next;
4030     StgRetInfoTable *info;
4031     //
4032     // This closure represents the expression 'raise# E' where E
4033     // is the exception raise.  It is used to overwrite all the
4034     // thunks which are currently under evaluataion.
4035     //
4036
4037     // OLD COMMENT (we don't have MIN_UPD_SIZE now):
4038     // LDV profiling: stg_raise_info has THUNK as its closure
4039     // type. Since a THUNK takes at least MIN_UPD_SIZE words in its
4040     // payload, MIN_UPD_SIZE is more approprate than 1.  It seems that
4041     // 1 does not cause any problem unless profiling is performed.
4042     // However, when LDV profiling goes on, we need to linearly scan
4043     // small object pool, where raise_closure is stored, so we should
4044     // use MIN_UPD_SIZE.
4045     //
4046     // raise_closure = (StgClosure *)RET_STGCALL1(P_,allocate,
4047     //                                 sizeofW(StgClosure)+1);
4048     //
4049
4050     //
4051     // Walk up the stack, looking for the catch frame.  On the way,
4052     // we update any closures pointed to from update frames with the
4053     // raise closure that we just built.
4054     //
4055     p = tso->sp;
4056     while(1) {
4057         info = get_ret_itbl((StgClosure *)p);
4058         next = p + stack_frame_sizeW((StgClosure *)p);
4059         switch (info->i.type) {
4060             
4061         case UPDATE_FRAME:
4062             // Only create raise_closure if we need to.
4063             if (raise_closure == NULL) {
4064                 raise_closure = 
4065                     (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+1);
4066                 SET_HDR(raise_closure, &stg_raise_info, CCCS);
4067                 raise_closure->payload[0] = exception;
4068             }
4069             UPD_IND(((StgUpdateFrame *)p)->updatee,(StgClosure *)raise_closure);
4070             p = next;
4071             continue;
4072
4073         case ATOMICALLY_FRAME:
4074             IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p\n", p));
4075             tso->sp = p;
4076             return ATOMICALLY_FRAME;
4077             
4078         case CATCH_FRAME:
4079             tso->sp = p;
4080             return CATCH_FRAME;
4081
4082         case CATCH_STM_FRAME:
4083             IF_DEBUG(stm, debugBelch("Found CATCH_STM_FRAME at %p\n", p));
4084             tso->sp = p;
4085             return CATCH_STM_FRAME;
4086             
4087         case STOP_FRAME:
4088             tso->sp = p;
4089             return STOP_FRAME;
4090
4091         case CATCH_RETRY_FRAME:
4092         default:
4093             p = next; 
4094             continue;
4095         }
4096     }
4097 }
4098
4099
4100 /* -----------------------------------------------------------------------------
4101    findRetryFrameHelper
4102
4103    This function is called by the retry# primitive.  It traverses the stack
4104    leaving tso->sp referring to the frame which should handle the retry.  
4105
4106    This should either be a CATCH_RETRY_FRAME (if the retry# is within an orElse#) 
4107    or should be a ATOMICALLY_FRAME (if the retry# reaches the top level).  
4108
4109    We skip CATCH_STM_FRAMEs because retries are not considered to be exceptions,
4110    despite the similar implementation.
4111
4112    We should not expect to see CATCH_FRAME or STOP_FRAME because those should
4113    not be created within memory transactions.
4114    -------------------------------------------------------------------------- */
4115
4116 StgWord
4117 findRetryFrameHelper (StgTSO *tso)
4118 {
4119   StgPtr           p, next;
4120   StgRetInfoTable *info;
4121
4122   p = tso -> sp;
4123   while (1) {
4124     info = get_ret_itbl((StgClosure *)p);
4125     next = p + stack_frame_sizeW((StgClosure *)p);
4126     switch (info->i.type) {
4127       
4128     case ATOMICALLY_FRAME:
4129       IF_DEBUG(stm, debugBelch("Found ATOMICALLY_FRAME at %p during retrry\n", p));
4130       tso->sp = p;
4131       return ATOMICALLY_FRAME;
4132       
4133     case CATCH_RETRY_FRAME:
4134       IF_DEBUG(stm, debugBelch("Found CATCH_RETRY_FRAME at %p during retrry\n", p));
4135       tso->sp = p;
4136       return CATCH_RETRY_FRAME;
4137       
4138     case CATCH_STM_FRAME:
4139     default:
4140       ASSERT(info->i.type != CATCH_FRAME);
4141       ASSERT(info->i.type != STOP_FRAME);
4142       p = next; 
4143       continue;
4144     }
4145   }
4146 }
4147
4148 /* -----------------------------------------------------------------------------
4149    resurrectThreads is called after garbage collection on the list of
4150    threads found to be garbage.  Each of these threads will be woken
4151    up and sent a signal: BlockedOnDeadMVar if the thread was blocked
4152    on an MVar, or NonTermination if the thread was blocked on a Black
4153    Hole.
4154
4155    Locks: assumes we hold *all* the capabilities.
4156    -------------------------------------------------------------------------- */
4157
4158 void
4159 resurrectThreads (StgTSO *threads)
4160 {
4161     StgTSO *tso, *next;
4162     Capability *cap;
4163
4164     for (tso = threads; tso != END_TSO_QUEUE; tso = next) {
4165         next = tso->global_link;
4166         tso->global_link = all_threads;
4167         all_threads = tso;
4168         IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id));
4169         
4170         // Wake up the thread on the Capability it was last on for a
4171         // bound thread, or last_free_capability otherwise.
4172         if (tso->bound) {
4173             cap = tso->bound->cap;
4174         } else {
4175             cap = last_free_capability;
4176         }
4177         
4178         switch (tso->why_blocked) {
4179         case BlockedOnMVar:
4180         case BlockedOnException:
4181             /* Called by GC - sched_mutex lock is currently held. */
4182             raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure);
4183             break;
4184         case BlockedOnBlackHole:
4185             raiseAsync(cap, tso,(StgClosure *)NonTermination_closure);
4186             break;
4187         case BlockedOnSTM:
4188             raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure);
4189             break;
4190         case NotBlocked:
4191             /* This might happen if the thread was blocked on a black hole
4192              * belonging to a thread that we've just woken up (raiseAsync
4193              * can wake up threads, remember...).
4194              */
4195             continue;
4196         default:
4197             barf("resurrectThreads: thread blocked in a strange way");
4198         }
4199     }
4200 }
4201
4202 /* ----------------------------------------------------------------------------
4203  * Debugging: why is a thread blocked
4204  * [Also provides useful information when debugging threaded programs
4205  *  at the Haskell source code level, so enable outside of DEBUG. --sof 7/02]
4206    ------------------------------------------------------------------------- */
4207
4208 #if DEBUG
4209 static void
4210 printThreadBlockage(StgTSO *tso)
4211 {
4212   switch (tso->why_blocked) {
4213   case BlockedOnRead:
4214     debugBelch("is blocked on read from fd %d", (int)(tso->block_info.fd));
4215     break;
4216   case BlockedOnWrite:
4217     debugBelch("is blocked on write to fd %d", (int)(tso->block_info.fd));
4218     break;
4219 #if defined(mingw32_HOST_OS)
4220     case BlockedOnDoProc:
4221     debugBelch("is blocked on proc (request: %ld)", tso->block_info.async_result->reqID);
4222     break;
4223 #endif
4224   case BlockedOnDelay:
4225     debugBelch("is blocked until %ld", (long)(tso->block_info.target));
4226     break;
4227   case BlockedOnMVar:
4228     debugBelch("is blocked on an MVar @ %p", tso->block_info.closure);
4229     break;
4230   case BlockedOnException:
4231     debugBelch("is blocked on delivering an exception to thread %d",
4232             tso->block_info.tso->id);
4233     break;
4234   case BlockedOnBlackHole:
4235     debugBelch("is blocked on a black hole");
4236     break;
4237   case NotBlocked:
4238     debugBelch("is not blocked");
4239     break;
4240 #if defined(PARALLEL_HASKELL)
4241   case BlockedOnGA:
4242     debugBelch("is blocked on global address; local FM_BQ is %p (%s)",
4243             tso->block_info.closure, info_type(tso->block_info.closure));
4244     break;
4245   case BlockedOnGA_NoSend:
4246     debugBelch("is blocked on global address (no send); local FM_BQ is %p (%s)",
4247             tso->block_info.closure, info_type(tso->block_info.closure));
4248     break;
4249 #endif
4250   case BlockedOnCCall:
4251     debugBelch("is blocked on an external call");
4252     break;
4253   case BlockedOnCCall_NoUnblockExc:
4254     debugBelch("is blocked on an external call (exceptions were already blocked)");
4255     break;
4256   case BlockedOnSTM:
4257     debugBelch("is blocked on an STM operation");
4258     break;
4259   default:
4260     barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)",
4261          tso->why_blocked, tso->id, tso);
4262   }
4263 }
4264
4265 void
4266 printThreadStatus(StgTSO *t)
4267 {
4268     debugBelch("\tthread %4d @ %p ", t->id, (void *)t);
4269     {
4270       void *label = lookupThreadLabel(t->id);
4271       if (label) debugBelch("[\"%s\"] ",(char *)label);
4272     }
4273     if (t->what_next == ThreadRelocated) {
4274         debugBelch("has been relocated...\n");
4275     } else {
4276         switch (t->what_next) {
4277         case ThreadKilled:
4278             debugBelch("has been killed");
4279             break;
4280         case ThreadComplete:
4281             debugBelch("has completed");
4282             break;
4283         default:
4284             printThreadBlockage(t);
4285         }
4286         debugBelch("\n");
4287     }
4288 }
4289
4290 void
4291 printAllThreads(void)
4292 {
4293   StgTSO *t, *next;
4294   nat i;
4295   Capability *cap;
4296
4297 # if defined(GRAN)
4298   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4299   ullong_format_string(TIME_ON_PROC(CurrentProc), 
4300                        time_string, rtsFalse/*no commas!*/);
4301
4302   debugBelch("all threads at [%s]:\n", time_string);
4303 # elif defined(PARALLEL_HASKELL)
4304   char time_string[TIME_STR_LEN], node_str[NODE_STR_LEN];
4305   ullong_format_string(CURRENT_TIME,
4306                        time_string, rtsFalse/*no commas!*/);
4307
4308   debugBelch("all threads at [%s]:\n", time_string);
4309 # else
4310   debugBelch("all threads:\n");
4311 # endif
4312
4313   for (i = 0; i < n_capabilities; i++) {
4314       cap = &capabilities[i];
4315       debugBelch("threads on capability %d:\n", cap->no);
4316       for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = t->link) {
4317           printThreadStatus(t);
4318       }
4319   }
4320
4321   debugBelch("other threads:\n");
4322   for (t = all_threads; t != END_TSO_QUEUE; t = next) {
4323       if (t->why_blocked != NotBlocked) {
4324           printThreadStatus(t);
4325       }
4326       if (t->what_next == ThreadRelocated) {
4327           next = t->link;
4328       } else {
4329           next = t->global_link;
4330       }
4331   }
4332 }
4333
4334 // useful from gdb
4335 void 
4336 printThreadQueue(StgTSO *t)
4337 {
4338     nat i = 0;
4339     for (; t != END_TSO_QUEUE; t = t->link) {
4340         printThreadStatus(t);
4341         i++;
4342     }
4343     debugBelch("%d threads on queue\n", i);
4344 }
4345
4346 /* 
4347    Print a whole blocking queue attached to node (debugging only).
4348 */
4349 # if defined(PARALLEL_HASKELL)
4350 void 
4351 print_bq (StgClosure *node)
4352 {
4353   StgBlockingQueueElement *bqe;
4354   StgTSO *tso;
4355   rtsBool end;
4356
4357   debugBelch("## BQ of closure %p (%s): ",
4358           node, info_type(node));
4359
4360   /* should cover all closures that may have a blocking queue */
4361   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4362          get_itbl(node)->type == FETCH_ME_BQ ||
4363          get_itbl(node)->type == RBH ||
4364          get_itbl(node)->type == MVAR);
4365     
4366   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4367
4368   print_bqe(((StgBlockingQueue*)node)->blocking_queue);
4369 }
4370
4371 /* 
4372    Print a whole blocking queue starting with the element bqe.
4373 */
4374 void 
4375 print_bqe (StgBlockingQueueElement *bqe)
4376 {
4377   rtsBool end;
4378
4379   /* 
4380      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4381   */
4382   for (end = (bqe==END_BQ_QUEUE);
4383        !end; // iterate until bqe points to a CONSTR
4384        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), 
4385        bqe = end ? END_BQ_QUEUE : bqe->link) {
4386     ASSERT(bqe != END_BQ_QUEUE);                               // sanity check
4387     ASSERT(bqe != (StgBlockingQueueElement *)NULL);            // sanity check
4388     /* types of closures that may appear in a blocking queue */
4389     ASSERT(get_itbl(bqe)->type == TSO ||           
4390            get_itbl(bqe)->type == BLOCKED_FETCH || 
4391            get_itbl(bqe)->type == CONSTR); 
4392     /* only BQs of an RBH end with an RBH_Save closure */
4393     //ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4394
4395     switch (get_itbl(bqe)->type) {
4396     case TSO:
4397       debugBelch(" TSO %u (%x),",
4398               ((StgTSO *)bqe)->id, ((StgTSO *)bqe));
4399       break;
4400     case BLOCKED_FETCH:
4401       debugBelch(" BF (node=%p, ga=((%x, %d, %x)),",
4402               ((StgBlockedFetch *)bqe)->node, 
4403               ((StgBlockedFetch *)bqe)->ga.payload.gc.gtid,
4404               ((StgBlockedFetch *)bqe)->ga.payload.gc.slot,
4405               ((StgBlockedFetch *)bqe)->ga.weight);
4406       break;
4407     case CONSTR:
4408       debugBelch(" %s (IP %p),",
4409               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4410                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4411                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4412                "RBH_Save_?"), get_itbl(bqe));
4413       break;
4414     default:
4415       barf("Unexpected closure type %s in blocking queue", // of %p (%s)",
4416            info_type((StgClosure *)bqe)); // , node, info_type(node));
4417       break;
4418     }
4419   } /* for */
4420   debugBelch("\n");
4421 }
4422 # elif defined(GRAN)
4423 void 
4424 print_bq (StgClosure *node)
4425 {
4426   StgBlockingQueueElement *bqe;
4427   PEs node_loc, tso_loc;
4428   rtsBool end;
4429
4430   /* should cover all closures that may have a blocking queue */
4431   ASSERT(get_itbl(node)->type == BLACKHOLE_BQ ||
4432          get_itbl(node)->type == FETCH_ME_BQ ||
4433          get_itbl(node)->type == RBH);
4434     
4435   ASSERT(node!=(StgClosure*)NULL);         // sanity check
4436   node_loc = where_is(node);
4437
4438   debugBelch("## BQ of closure %p (%s) on [PE %d]: ",
4439           node, info_type(node), node_loc);
4440
4441   /* 
4442      NB: In a parallel setup a BQ of an RBH must end with an RBH_Save closure;
4443   */
4444   for (bqe = ((StgBlockingQueue*)node)->blocking_queue, end = (bqe==END_BQ_QUEUE);
4445        !end; // iterate until bqe points to a CONSTR
4446        end = (get_itbl(bqe)->type == CONSTR) || (bqe->link==END_BQ_QUEUE), bqe = end ? END_BQ_QUEUE : bqe->link) {
4447     ASSERT(bqe != END_BQ_QUEUE);             // sanity check
4448     ASSERT(bqe != (StgBlockingQueueElement *)NULL);  // sanity check
4449     /* types of closures that may appear in a blocking queue */
4450     ASSERT(get_itbl(bqe)->type == TSO ||           
4451            get_itbl(bqe)->type == CONSTR); 
4452     /* only BQs of an RBH end with an RBH_Save closure */
4453     ASSERT(get_itbl(bqe)->type != CONSTR || get_itbl(node)->type == RBH);
4454
4455     tso_loc = where_is((StgClosure *)bqe);
4456     switch (get_itbl(bqe)->type) {
4457     case TSO:
4458       debugBelch(" TSO %d (%p) on [PE %d],",
4459               ((StgTSO *)bqe)->id, (StgTSO *)bqe, tso_loc);
4460       break;
4461     case CONSTR:
4462       debugBelch(" %s (IP %p),",
4463               (get_itbl(bqe) == &stg_RBH_Save_0_info ? "RBH_Save_0" :
4464                get_itbl(bqe) == &stg_RBH_Save_1_info ? "RBH_Save_1" :
4465                get_itbl(bqe) == &stg_RBH_Save_2_info ? "RBH_Save_2" :
4466                "RBH_Save_?"), get_itbl(bqe));
4467       break;
4468     default:
4469       barf("Unexpected closure type %s in blocking queue of %p (%s)",
4470            info_type((StgClosure *)bqe), node, info_type(node));
4471       break;
4472     }
4473   } /* for */
4474   debugBelch("\n");
4475 }
4476 # endif
4477
4478 #if defined(PARALLEL_HASKELL)
4479 static nat
4480 run_queue_len(void)
4481 {
4482     nat i;
4483     StgTSO *tso;
4484     
4485     for (i=0, tso=run_queue_hd; 
4486          tso != END_TSO_QUEUE;
4487          i++, tso=tso->link) {
4488         /* nothing */
4489     }
4490         
4491     return i;
4492 }
4493 #endif
4494
4495 void
4496 sched_belch(char *s, ...)
4497 {
4498     va_list ap;
4499     va_start(ap,s);
4500 #ifdef THREADED_RTS
4501     debugBelch("sched (task %p, pid %d): ", (void *)(unsigned long)(unsigned int)osThreadId(), getpid());
4502 #elif defined(PARALLEL_HASKELL)
4503     debugBelch("== ");
4504 #else
4505     debugBelch("sched: ");
4506 #endif
4507     vdebugBelch(s, ap);
4508     debugBelch("\n");
4509     va_end(ap);
4510 }
4511
4512 #endif /* DEBUG */